dbz_lib/write/
mod.rs

1mod csv;
2pub(crate) mod dbz;
3mod json;
4
5use std::{fmt, io};
6
7use anyhow::anyhow;
8use serde_json::ser::CompactFormatter;
9
10use databento_defs::{
11    enums::Schema,
12    record::{
13        ConstTypeId, Mbp10Msg, Mbp1Msg, OhlcvMsg, StatusMsg, SymDefMsg, TbboMsg, TickMsg, TradeMsg,
14    },
15};
16
17use self::{
18    csv::{serialize::CsvSerialize, write_csv},
19    json::{pretty_formatter, write_json, write_json_metadata},
20};
21use crate::{Dbz, Metadata};
22
/// An encoding that DBZs can be translated to.
#[derive(Clone, Copy, Debug)]
pub enum OutputEncoding {
    /// Comma-separated values.
    Csv,
    /// JavaScript object notation.
    Json {
        /// If `true`, the output is formatted with newlines and indentation for
        /// human readability; otherwise it is written compactly.
        should_pretty_print: bool,
    },
}
31
32impl<R: io::BufRead> Dbz<R> {
33    /// Streams the contents of the [`Dbz`] to `writer` encoding it using `encoding`. Consumes the
34    /// [`Dbz`] object.
35    ///
36    /// # Errors
37    /// This function returns an error if [`Dbz::schema()`] is
38    /// [`Schema::Statistics`](databento_defs::enums::Schema::Statistics). It will also
39    /// return an error if there's an issue writing the output to `writer`.
40    pub fn write_to(self, writer: impl io::Write, encoding: OutputEncoding) -> anyhow::Result<()> {
41        match self.schema() {
42            Schema::Mbo => self.write_with_tick_to::<TickMsg, _>(writer, encoding),
43            Schema::Mbp1 => self.write_with_tick_to::<Mbp1Msg, _>(writer, encoding),
44            Schema::Mbp10 => self.write_with_tick_to::<Mbp10Msg, _>(writer, encoding),
45            Schema::Tbbo => self.write_with_tick_to::<TbboMsg, _>(writer, encoding),
46            Schema::Trades => self.write_with_tick_to::<TradeMsg, _>(writer, encoding),
47            Schema::Ohlcv1S | Schema::Ohlcv1M | Schema::Ohlcv1H | Schema::Ohlcv1D => {
48                self.write_with_tick_to::<OhlcvMsg, _>(writer, encoding)
49            }
50            Schema::Definition => self.write_with_tick_to::<SymDefMsg, _>(writer, encoding),
51            Schema::Statistics => Err(anyhow!("Not implemented for schema Statistics")),
52            Schema::Status => self.write_with_tick_to::<StatusMsg, _>(writer, encoding),
53        }
54    }
55
56    fn write_with_tick_to<T, W>(self, writer: W, encoding: OutputEncoding) -> anyhow::Result<()>
57    where
58        T: ConstTypeId + CsvSerialize + fmt::Debug,
59        W: io::Write,
60    {
61        let iter = self.try_into_iter::<T>()?;
62        match encoding {
63            OutputEncoding::Csv => write_csv(writer, iter),
64            OutputEncoding::Json {
65                should_pretty_print,
66            } => {
67                if should_pretty_print {
68                    write_json(writer, pretty_formatter(), iter)
69                } else {
70                    write_json(writer, CompactFormatter, iter)
71                }
72            }
73        }
74    }
75}
76
77impl Metadata {
78    /// Writes the metadata to `writer` encoding it using `encoding`, if supported.
79    ///
80    /// # Note
81    /// Encoding Metadata as CSV is unsupported.
82    ///
83    /// # Errors
84    /// This function returns an error if [`Dbz::schema()`] is
85    /// [`Schema::Statistics`](databento_defs::enums::Schema::Statistics). It will also
86    /// return an error if there's an issue writing the output to `writer`.
87    pub fn write_to(&self, writer: impl io::Write, encoding: OutputEncoding) -> anyhow::Result<()> {
88        match encoding {
89            OutputEncoding::Csv => Err(anyhow!(
90                "Encode metadata as a CSV is unsupported because it isn't tabular"
91            )),
92            OutputEncoding::Json {
93                should_pretty_print,
94            } => {
95                if should_pretty_print {
96                    write_json_metadata(writer, pretty_formatter(), self)
97                } else {
98                    write_json_metadata(writer, CompactFormatter, self)
99                }
100            }
101        }
102    }
103}
104
#[cfg(test)]
mod test_data {
    use databento_defs::record::{BidAskPair, RecordHeader};
    use streaming_iterator::StreamingIterator;

    // Common data used in multiple tests
    pub const RECORD_HEADER: RecordHeader = RecordHeader {
        length: 30,
        rtype: 4,
        publisher_id: 1,
        product_id: 323,
        ts_event: 1658441851000000000,
    };

    pub const BID_ASK: BidAskPair = BidAskPair {
        bid_px: 372000000000000,
        ask_px: 372500000000000,
        bid_sz: 10,
        ask_sz: 5,
        bid_ct: 5,
        ask_ct: 2,
    };

    /// A testing shim to get a streaming iterator from a [`Vec`].
    pub struct VecStream<T> {
        vec: Vec<T>,
        // `None` before the first `advance()` call; `Some(i)` afterwards.
        // This replaces an `isize` counter starting at -1 that relied on a
        // wrapping `as usize` cast to yield `None` from `get()`.
        idx: Option<usize>,
    }

    impl<T> VecStream<T> {
        pub fn new(vec: Vec<T>) -> Self {
            // Start "before" the first element because `advance()` is always
            // called before `get()`.
            Self { vec, idx: None }
        }
    }

    impl<T> StreamingIterator for VecStream<T> {
        type Item = T;

        fn advance(&mut self) {
            // First call lands on index 0; subsequent calls step forward.
            self.idx = Some(self.idx.map_or(0, |i| i + 1));
        }

        fn get(&self) -> Option<&Self::Item> {
            // `None` before the first `advance()` and after the end of `vec`.
            self.vec.get(self.idx?)
        }
    }
}
153}