Skip to main content

dbn/encode/json/
async.rs

1use tokio::io::{self, AsyncWriteExt};
2
3use crate::{
4    encode::{AsyncEncodeRecord, AsyncEncodeRecordRef, AsyncEncodeRecordTextExt, DbnEncodable},
5    rtype_dispatch, Error, Metadata, RecordRef, Result,
6};
7
8use super::serialize::{to_json_in_buf, to_json_with_sym_in_buf};
9
10/// Type for encoding files and streams of DBN records in JSON lines.
11pub struct Encoder<W>
12where
13    W: io::AsyncWriteExt + Unpin,
14{
15    writer: W,
16    buf: String,
17    should_pretty_print: bool,
18    use_pretty_px: bool,
19    use_pretty_ts: bool,
20}
21
22impl<W> Encoder<W>
23where
24    W: io::AsyncWriteExt + Unpin,
25{
26    /// Creates a new instance of [`Encoder`]. If `should_pretty_print` is `true`,
27    /// each JSON object will be nicely formatted and indented, instead of the default
28    /// compact output with no whitespace between key-value pairs.
29    pub fn new(
30        writer: W,
31        should_pretty_print: bool,
32        use_pretty_px: bool,
33        use_pretty_ts: bool,
34    ) -> Self {
35        Self {
36            writer,
37            buf: String::new(),
38            should_pretty_print,
39            use_pretty_px,
40            use_pretty_ts,
41        }
42    }
43
44    /// Encodes `metadata` into JSON.
45    ///
46    /// # Errors
47    /// This function returns an error if there's an error writing to `writer`.
48    ///
49    /// # Cancel safety
50    /// This method is not cancellation safe. If this method is used in a
51    /// `tokio::select!` statement and another branch completes first, then the
52    /// metadata JSON may have been partially written, but future calls will begin writing
53    /// the metadata JSON from the beginning.
54    pub async fn encode_metadata(&mut self, metadata: &Metadata) -> Result<()> {
55        to_json_in_buf(
56            &mut self.buf,
57            metadata,
58            self.should_pretty_print,
59            self.use_pretty_px,
60            self.use_pretty_ts,
61        );
62        let io_err = |e| Error::io(e, "writing metadata");
63        self.write_buf(io_err).await?;
64        self.writer.flush().await.map_err(io_err)?;
65        Ok(())
66    }
67
68    /// Returns a reference to the underlying writer.
69    pub fn get_ref(&self) -> &W {
70        &self.writer
71    }
72
73    /// Returns a mutable reference to the underlying writer.
74    pub fn get_mut(&mut self) -> &mut W {
75        &mut self.writer
76    }
77
78    /// Writes to `self.buf`, but not the writer.
79    fn encode_to_buf<R: DbnEncodable>(&mut self, record: &R) {
80        to_json_in_buf(
81            &mut self.buf,
82            record,
83            self.should_pretty_print,
84            self.use_pretty_px,
85            self.use_pretty_ts,
86        );
87    }
88
89    async fn write_buf<F>(&mut self, handle_err: F) -> crate::Result<()>
90    where
91        F: FnOnce(io::Error) -> Error,
92    {
93        let res = self
94            .writer
95            .write_all(self.buf.as_bytes())
96            .await
97            .map_err(handle_err);
98        // Always clear `buf`
99        self.buf.clear();
100        res
101    }
102}
103
104impl<W> AsyncEncodeRecord for Encoder<W>
105where
106    W: AsyncWriteExt + Unpin,
107{
108    async fn encode_record<R: DbnEncodable>(&mut self, record: &R) -> Result<()> {
109        self.encode_to_buf(record);
110        self.write_buf(|e| Error::io(e, "writing record")).await
111    }
112
113    /// Encodes a slice of DBN records.
114    ///
115    /// # Errors
116    /// This function returns an error if it's unable to write to the underlying writer.
117    ///
118    /// # Cancel safety
119    /// This method is not cancellation safe. If this method is used in a
120    /// `tokio::select!` statement and another branch completes first, then the
121    /// record may have been partially written, but future calls will begin writing the
122    /// encoded record from the beginning.
123    async fn encode_records<R: DbnEncodable>(&mut self, records: &[R]) -> Result<()> {
124        for record in records {
125            self.encode_to_buf(record);
126        }
127        self.write_buf(|e| Error::io(e, format!("writing {} records", records.len())))
128            .await
129    }
130    async fn flush(&mut self) -> Result<()> {
131        self.writer
132            .flush()
133            .await
134            .map_err(|e| Error::io(e, "flushing output"))
135    }
136
137    async fn shutdown(&mut self) -> Result<()> {
138        self.writer
139            .shutdown()
140            .await
141            .map_err(|e| Error::io(e, "shutting down"))
142    }
143}
144
145impl<W> AsyncEncodeRecordRef for Encoder<W>
146where
147    W: AsyncWriteExt + Unpin,
148{
149    async fn encode_record_ref(&mut self, record_ref: RecordRef<'_>) -> Result<()> {
150        rtype_dispatch!(record_ref, self.encode_record().await)?
151    }
152
153    async fn encode_record_refs(&mut self, record_refs: &[RecordRef<'_>]) -> Result<()> {
154        for record_ref in record_refs {
155            rtype_dispatch!(record_ref, self.encode_to_buf())?;
156        }
157        self.write_buf(|e| Error::io(e, format!("writing {} records", record_refs.len())))
158            .await
159    }
160
161    async unsafe fn encode_record_ref_ts_out(
162        &mut self,
163        record_ref: RecordRef<'_>,
164        ts_out: bool,
165    ) -> Result<()> {
166        rtype_dispatch!(record_ref, ts_out: ts_out, self.encode_record().await)?
167    }
168}
169
170impl<W> AsyncEncodeRecordTextExt for Encoder<W>
171where
172    W: AsyncWriteExt + Unpin,
173{
174    async fn encode_record_with_sym<R: DbnEncodable>(
175        &mut self,
176        record: &R,
177        symbol: Option<&str>,
178    ) -> Result<()> {
179        to_json_with_sym_in_buf(
180            &mut self.buf,
181            record,
182            self.should_pretty_print,
183            self.use_pretty_px,
184            self.use_pretty_ts,
185            symbol,
186        );
187        self.write_buf(|e| Error::io(e, "writing record")).await
188    }
189}
190
#[cfg(test)]
mod tests {
    use std::ffi::c_char;

    use tokio::io::{AsyncWriteExt, BufWriter};

    use crate::{encode::test_data::RECORD_HEADER, enums::rtype, MboMsg, RecordHeader};

    use super::*;

    /// Encodes `record` with a fresh [`Encoder`] backed by an in-memory buffer and
    /// returns everything that was written as a `String`.
    async fn write_to_json_string<R: DbnEncodable>(
        record: &R,
        should_pretty_print: bool,
        use_pretty_px: bool,
        use_pretty_ts: bool,
    ) -> String {
        let mut out = Vec::new();
        let mut buffered = BufWriter::new(&mut out);
        let mut encoder = Encoder::new(
            &mut buffered,
            should_pretty_print,
            use_pretty_px,
            use_pretty_ts,
        );
        encoder.encode_record(record).await.unwrap();
        // `encode_record` does not flush, so drain the `BufWriter` explicitly.
        buffered.flush().await.unwrap();
        String::from_utf8(out).expect("valid UTF-8")
    }

    /// Same as `write_to_json_string`, but goes through the `RecordRef` entry point.
    async fn write_ref_to_json_string(
        record: RecordRef<'_>,
        should_pretty_print: bool,
        use_pretty_px: bool,
        use_pretty_ts: bool,
    ) -> String {
        let mut out = Vec::new();
        let mut buffered = BufWriter::new(&mut out);
        let mut encoder = Encoder::new(
            &mut buffered,
            should_pretty_print,
            use_pretty_px,
            use_pretty_ts,
        );
        encoder.encode_record_ref(record).await.unwrap();
        // `encode_record_ref` does not flush, so drain the `BufWriter` explicitly.
        buffered.flush().await.unwrap();
        String::from_utf8(out).expect("valid UTF-8")
    }

    /// An MBO record must produce the same JSON line whether it is encoded
    /// directly or through a `RecordRef`, and that line must match the expected
    /// text exactly.
    #[tokio::test]
    async fn test_mbo_write_json() {
        let header = RecordHeader::new::<MboMsg>(
            rtype::MBO,
            RECORD_HEADER.publisher_id,
            RECORD_HEADER.instrument_id,
            RECORD_HEADER.ts_event,
        );
        let mbo = MboMsg {
            hd: header,
            order_id: 16,
            price: 5500,
            size: 3,
            flags: 128.into(),
            channel_id: 14,
            action: 'R' as c_char,
            side: 'N' as c_char,
            ts_recv: 1658441891000000000,
            ts_in_delta: 22_000,
            sequence: 1_002_375,
        };
        let res = write_to_json_string(&mbo, false, true, false).await;
        let ref_res = write_ref_to_json_string(RecordRef::from(&mbo), false, true, false).await;

        assert_eq!(res, ref_res);

        let expected = format!(
            "{{{},{},{}}}\n",
            r#""ts_recv":"1658441891000000000""#,
            r#""hd":{"ts_event":"1658441851000000000","rtype":160,"publisher_id":1,"instrument_id":323}"#,
            r#""action":"R","side":"N","price":"0.000005500","size":3,"channel_id":14,"order_id":"16","flags":128,"ts_in_delta":22000,"sequence":1002375"#
        );
        assert_eq!(ref_res, expected);
    }
}