1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
mod csv;
pub(crate) mod dbz;
mod json;
use std::{fmt, io};
use anyhow::anyhow;
use serde_json::ser::CompactFormatter;
use databento_defs::{
enums::Schema,
record::{
ConstTypeId, Mbp10Msg, Mbp1Msg, OhlcvMsg, StatusMsg, SymDefMsg, TbboMsg, TickMsg, TradeMsg,
},
};
use self::{
csv::{serialize::CsvSerialize, write_csv},
json::{pretty_formatter, write_json, write_json_metadata},
};
use crate::{Dbz, Metadata};
/// The output encodings accepted by the `write_to` methods in this module.
#[derive(Clone, Copy, Debug)]
pub enum OutputEncoding {
/// Comma-separated values. Rejected with an error by `Metadata::write_to`.
Csv,
/// JSON output. When `should_pretty_print` is `true` the writers are given
/// `pretty_formatter()`; otherwise they are given `CompactFormatter`.
Json { should_pretty_print: bool },
}
impl<R: io::BufRead> Dbz<R> {
    /// Consumes the [`Dbz`] and writes its records to `writer` using `encoding`.
    ///
    /// The record type used to iterate the data is selected from the value returned by
    /// `self.schema()`; the four OHLCV schemas all map to [`OhlcvMsg`].
    ///
    /// # Errors
    /// Returns an error if the schema is [`Schema::Statistics`], which has no record type
    /// wired up here. Any error from creating the record iterator or from the selected
    /// encoder is propagated unchanged.
    pub fn write_to(self, writer: impl io::Write, encoding: OutputEncoding) -> anyhow::Result<()> {
        match self.schema() {
            // The one schema this function cannot encode.
            Schema::Statistics => Err(anyhow!("Not implemented for schema Statistics")),
            // Every OHLCV interval shares a single record layout.
            Schema::Ohlcv1S | Schema::Ohlcv1M | Schema::Ohlcv1H | Schema::Ohlcv1D => {
                self.write_with_tick_to::<OhlcvMsg, _>(writer, encoding)
            }
            Schema::Trades => self.write_with_tick_to::<TradeMsg, _>(writer, encoding),
            Schema::Tbbo => self.write_with_tick_to::<TbboMsg, _>(writer, encoding),
            Schema::Mbp10 => self.write_with_tick_to::<Mbp10Msg, _>(writer, encoding),
            Schema::Mbp1 => self.write_with_tick_to::<Mbp1Msg, _>(writer, encoding),
            Schema::Mbo => self.write_with_tick_to::<TickMsg, _>(writer, encoding),
            Schema::Definition => self.write_with_tick_to::<SymDefMsg, _>(writer, encoding),
            Schema::Status => self.write_with_tick_to::<StatusMsg, _>(writer, encoding),
        }
    }

    /// Converts `self` into an iterator over records of type `T` and hands it, together
    /// with `writer`, to the encoder chosen by `encoding`.
    ///
    /// # Errors
    /// Propagates a failure from `try_into_iter` as well as any error returned by
    /// `write_csv` or `write_json`.
    fn write_with_tick_to<T, W>(self, writer: W, encoding: OutputEncoding) -> anyhow::Result<()>
    where
        T: ConstTypeId + CsvSerialize + fmt::Debug,
        W: io::Write,
    {
        // Build the iterator up front so a conversion failure is reported before any output
        // is attempted.
        let records = self.try_into_iter::<T>()?;
        match encoding {
            OutputEncoding::Json {
                should_pretty_print: true,
            } => write_json(writer, pretty_formatter(), records),
            OutputEncoding::Json {
                should_pretty_print: false,
            } => write_json(writer, CompactFormatter, records),
            OutputEncoding::Csv => write_csv(writer, records),
        }
    }
}
impl Metadata {
    /// Writes this [`Metadata`] to `writer` using `encoding`.
    ///
    /// Only the JSON encoding is supported; the `should_pretty_print` flag selects between
    /// `pretty_formatter()` and `CompactFormatter`.
    ///
    /// # Errors
    /// Always returns an error for [`OutputEncoding::Csv`]. For JSON, any error returned by
    /// `write_json_metadata` is propagated.
    pub fn write_to(&self, writer: impl io::Write, encoding: OutputEncoding) -> anyhow::Result<()> {
        match encoding {
            OutputEncoding::Json {
                should_pretty_print: true,
            } => write_json_metadata(writer, pretty_formatter(), self),
            OutputEncoding::Json {
                should_pretty_print: false,
            } => write_json_metadata(writer, CompactFormatter, self),
            // Rejected outright: no attempt is made to write anything to `writer`.
            OutputEncoding::Csv => Err(anyhow!(
                "Encode metadata as a CSV is unsupported because it isn't tabular"
            )),
        }
    }
}
/// Fixtures compiled only for tests: sample record pieces and a `Vec`-backed
/// [`StreamingIterator`](streaming_iterator::StreamingIterator).
#[cfg(test)]
mod test_data {
    use databento_defs::record::{BidAskPair, RecordHeader};
    use streaming_iterator::StreamingIterator;

    /// Fixture record header for building test records.
    pub const RECORD_HEADER: RecordHeader = RecordHeader {
        length: 30,
        rtype: 4,
        publisher_id: 1,
        product_id: 323,
        ts_event: 1658441851000000000,
    };

    /// Fixture bid/ask level for building test records.
    pub const BID_ASK: BidAskPair = BidAskPair {
        bid_px: 372000000000000,
        ask_px: 372500000000000,
        bid_sz: 10,
        ask_sz: 5,
        bid_ct: 5,
        ask_ct: 2,
    };

    /// A streaming iterator over the elements of an owned [`Vec`].
    pub struct VecStream<T> {
        vec: Vec<T>,
        /// Position of the current element. `None` until `advance` has been called, so
        /// "not started" is an explicit state rather than a negative sentinel index.
        idx: Option<usize>,
    }

    impl<T> VecStream<T> {
        /// Creates a stream positioned before the first element of `vec`.
        pub fn new(vec: Vec<T>) -> Self {
            Self { vec, idx: None }
        }
    }

    impl<T> StreamingIterator for VecStream<T> {
        type Item = T;

        /// Moves to the first element on the first call and to the next element afterwards.
        fn advance(&mut self) {
            self.idx = Some(self.idx.map_or(0, |current| current + 1));
        }

        /// Returns the current element, or `None` before the first `advance` and once the
        /// position has moved past the end of the vector.
        fn get(&self) -> Option<&Self::Item> {
            self.idx.and_then(|current| self.vec.get(current))
        }
    }
}