1mod csv;
2pub(crate) mod dbz;
3mod json;
4
5use std::{fmt, io};
6
7use anyhow::anyhow;
8use serde_json::ser::CompactFormatter;
9
10use databento_defs::{
11 enums::Schema,
12 record::{
13 ConstTypeId, Mbp10Msg, Mbp1Msg, OhlcvMsg, StatusMsg, SymDefMsg, TbboMsg, TickMsg, TradeMsg,
14 },
15};
16
17use self::{
18 csv::{serialize::CsvSerialize, write_csv},
19 json::{pretty_formatter, write_json, write_json_metadata},
20};
21use crate::{Dbz, Metadata};
22
/// The text format used when encoding DBZ records or metadata for output.
#[derive(Clone, Copy, Debug)]
pub enum OutputEncoding {
    /// Comma-separated values, one record per line.
    Csv,
    /// JSON, one object per record; `should_pretty_print` selects between a
    /// multi-line, indented rendering and the compact single-line form.
    Json { should_pretty_print: bool },
}
31
32impl<R: io::BufRead> Dbz<R> {
33 pub fn write_to(self, writer: impl io::Write, encoding: OutputEncoding) -> anyhow::Result<()> {
41 match self.schema() {
42 Schema::Mbo => self.write_with_tick_to::<TickMsg, _>(writer, encoding),
43 Schema::Mbp1 => self.write_with_tick_to::<Mbp1Msg, _>(writer, encoding),
44 Schema::Mbp10 => self.write_with_tick_to::<Mbp10Msg, _>(writer, encoding),
45 Schema::Tbbo => self.write_with_tick_to::<TbboMsg, _>(writer, encoding),
46 Schema::Trades => self.write_with_tick_to::<TradeMsg, _>(writer, encoding),
47 Schema::Ohlcv1S | Schema::Ohlcv1M | Schema::Ohlcv1H | Schema::Ohlcv1D => {
48 self.write_with_tick_to::<OhlcvMsg, _>(writer, encoding)
49 }
50 Schema::Definition => self.write_with_tick_to::<SymDefMsg, _>(writer, encoding),
51 Schema::Statistics => Err(anyhow!("Not implemented for schema Statistics")),
52 Schema::Status => self.write_with_tick_to::<StatusMsg, _>(writer, encoding),
53 }
54 }
55
56 fn write_with_tick_to<T, W>(self, writer: W, encoding: OutputEncoding) -> anyhow::Result<()>
57 where
58 T: ConstTypeId + CsvSerialize + fmt::Debug,
59 W: io::Write,
60 {
61 let iter = self.try_into_iter::<T>()?;
62 match encoding {
63 OutputEncoding::Csv => write_csv(writer, iter),
64 OutputEncoding::Json {
65 should_pretty_print,
66 } => {
67 if should_pretty_print {
68 write_json(writer, pretty_formatter(), iter)
69 } else {
70 write_json(writer, CompactFormatter, iter)
71 }
72 }
73 }
74 }
75}
76
77impl Metadata {
78 pub fn write_to(&self, writer: impl io::Write, encoding: OutputEncoding) -> anyhow::Result<()> {
88 match encoding {
89 OutputEncoding::Csv => Err(anyhow!(
90 "Encode metadata as a CSV is unsupported because it isn't tabular"
91 )),
92 OutputEncoding::Json {
93 should_pretty_print,
94 } => {
95 if should_pretty_print {
96 write_json_metadata(writer, pretty_formatter(), self)
97 } else {
98 write_json_metadata(writer, CompactFormatter, self)
99 }
100 }
101 }
102 }
103}
104
#[cfg(test)]
mod test_data {
    use databento_defs::record::{BidAskPair, RecordHeader};
    use streaming_iterator::StreamingIterator;

    /// Canned record header shared by the encoding tests.
    pub const RECORD_HEADER: RecordHeader = RecordHeader {
        length: 30,
        rtype: 4,
        publisher_id: 1,
        product_id: 323,
        ts_event: 1658441851000000000,
    };

    /// Canned bid/ask book level shared by the encoding tests.
    pub const BID_ASK: BidAskPair = BidAskPair {
        bid_px: 372000000000000,
        ask_px: 372500000000000,
        bid_sz: 10,
        ask_sz: 5,
        bid_ct: 5,
        ask_ct: 2,
    };

    /// Adapts a `Vec` into a [`StreamingIterator`] so tests can feed
    /// in-memory records to the encoders.
    pub struct VecStream<T> {
        vec: Vec<T>,
        // Index of the current element; `None` until `advance` is first
        // called, so `get` yields `None` before the stream is started.
        idx: Option<usize>,
    }

    impl<T> VecStream<T> {
        /// Creates a stream positioned before the first element, per the
        /// `StreamingIterator` contract that `advance` precedes `get`.
        pub fn new(vec: Vec<T>) -> Self {
            Self { vec, idx: None }
        }
    }

    impl<T> StreamingIterator for VecStream<T> {
        type Item = T;

        fn advance(&mut self) {
            // Move from "before the start" to 0, then step forward.
            self.idx = Some(self.idx.map_or(0, |i| i + 1));
        }

        fn get(&self) -> Option<&Self::Item> {
            // Checked lookup: `None` both before the start and past the end.
            self.vec.get(self.idx?)
        }
    }
}