// dbn/encode/json/async.rs

1use tokio::io::{self, AsyncWriteExt};
2
3use crate::{
4    encode::{AsyncEncodeRecord, AsyncEncodeRecordRef, AsyncEncodeRecordTextExt, DbnEncodable},
5    rtype_dispatch, Error, HasRType, Metadata, RecordRef, Result,
6};
7
8use super::serialize::{to_json_in_buf, to_json_with_sym_in_buf};
9
10/// Type for encoding files and streams of DBN records in JSON lines.
11pub struct Encoder<W>
12where
13    W: io::AsyncWriteExt + Unpin,
14{
15    writer: W,
16    buf: String,
17    should_pretty_print: bool,
18    use_pretty_px: bool,
19    use_pretty_ts: bool,
20}
21
22impl<W> Encoder<W>
23where
24    W: io::AsyncWriteExt + Unpin,
25{
26    /// Creates a new instance of [`Encoder`]. If `should_pretty_print` is `true`,
27    /// each JSON object will be nicely formatted and indented, instead of the default
28    /// compact output with no whitespace between key-value pairs.
29    pub fn new(
30        writer: W,
31        should_pretty_print: bool,
32        use_pretty_px: bool,
33        use_pretty_ts: bool,
34    ) -> Self {
35        Self {
36            writer,
37            buf: String::new(),
38            should_pretty_print,
39            use_pretty_px,
40            use_pretty_ts,
41        }
42    }
43
44    /// Encodes `metadata` into JSON.
45    ///
46    /// # Errors
47    /// This function returns an error if there's an error writing to `writer`.
48    ///
49    /// # Cancel safety
50    /// This method is not cancellation safe. If this method is used in a
51    /// `tokio::select!` statement and another branch completes first, then the
52    /// metadata JSON may have been partially written, but future calls will begin writing
53    /// the metadata JSON from the beginning.
54    pub async fn encode_metadata(&mut self, metadata: &Metadata) -> Result<()> {
55        to_json_in_buf(
56            &mut self.buf,
57            metadata,
58            self.should_pretty_print,
59            self.use_pretty_px,
60            self.use_pretty_ts,
61        );
62        let io_err = |e| Error::io(e, "writing metadata");
63        self.write_buf(io_err).await?;
64        self.writer.flush().await.map_err(io_err)?;
65        Ok(())
66    }
67
68    /// Returns a reference to the underlying writer.
69    pub fn get_ref(&self) -> &W {
70        &self.writer
71    }
72
73    /// Returns a mutable reference to the underlying writer.
74    pub fn get_mut(&mut self) -> &mut W {
75        &mut self.writer
76    }
77
78    /// Writes to `self.buf`, but not the writer.
79    fn encode_to_buf<R: DbnEncodable>(&mut self, record: &R) {
80        to_json_in_buf(
81            &mut self.buf,
82            record,
83            self.should_pretty_print,
84            self.use_pretty_px,
85            self.use_pretty_ts,
86        );
87    }
88
89    async fn write_buf<F>(&mut self, handle_err: F) -> crate::Result<()>
90    where
91        F: FnOnce(io::Error) -> Error,
92    {
93        let res = self
94            .writer
95            .write_all(self.buf.as_bytes())
96            .await
97            .map_err(handle_err);
98        // Always clear `buf`
99        self.buf.clear();
100        res
101    }
102
103    async fn encode_with_ts_out<R: DbnEncodable + HasRType + Clone>(
104        &mut self,
105        rec: &R,
106        ts_out: u64,
107    ) -> Result<()> {
108        self.encode_record(&crate::WithTsOut::new(rec.clone(), ts_out))
109            .await
110    }
111}
112
113impl<W> AsyncEncodeRecord for Encoder<W>
114where
115    W: AsyncWriteExt + Unpin,
116{
117    async fn encode_record<R: DbnEncodable>(&mut self, record: &R) -> Result<()> {
118        self.encode_to_buf(record);
119        self.write_buf(|e| Error::io(e, "writing record")).await
120    }
121
122    /// Encodes a slice of DBN records.
123    ///
124    /// # Errors
125    /// This function returns an error if it's unable to write to the underlying writer.
126    ///
127    /// # Cancel safety
128    /// This method is not cancellation safe. If this method is used in a
129    /// `tokio::select!` statement and another branch completes first, then the
130    /// record may have been partially written, but future calls will begin writing the
131    /// encoded record from the beginning.
132    async fn encode_records<R: DbnEncodable>(&mut self, records: &[R]) -> Result<()> {
133        for record in records {
134            self.encode_to_buf(record);
135        }
136        self.write_buf(|e| Error::io(e, format!("writing {} records", records.len())))
137            .await
138    }
139    async fn flush(&mut self) -> Result<()> {
140        self.writer
141            .flush()
142            .await
143            .map_err(|e| Error::io(e, "flushing output"))
144    }
145
146    async fn shutdown(&mut self) -> Result<()> {
147        self.writer
148            .shutdown()
149            .await
150            .map_err(|e| Error::io(e, "shutting down"))
151    }
152}
153
154impl<W> AsyncEncodeRecordRef for Encoder<W>
155where
156    W: AsyncWriteExt + Unpin,
157{
158    async fn encode_record_ref(&mut self, record_ref: RecordRef<'_>) -> Result<()> {
159        rtype_dispatch!(record_ref, self.encode_record().await)?
160    }
161
162    async fn encode_record_refs(&mut self, record_refs: &[RecordRef<'_>]) -> Result<()> {
163        for record_ref in record_refs {
164            rtype_dispatch!(record_ref, self.encode_to_buf())?;
165        }
166        self.write_buf(|e| Error::io(e, format!("writing {} records", record_refs.len())))
167            .await
168    }
169
170    async unsafe fn encode_record_ref_ts_out(
171        &mut self,
172        record_ref: RecordRef<'_>,
173        ts_out: bool,
174    ) -> Result<()> {
175        rtype_dispatch!(record_ref, ts_out: ts_out, self.encode_record().await)?
176    }
177
178    async fn encode_record_ref_with_ts_out(
179        &mut self,
180        record_ref: RecordRef<'_>,
181        ts_out: u64,
182    ) -> Result<()> {
183        rtype_dispatch!(record_ref, self.encode_with_ts_out(ts_out).await)?
184    }
185}
186
187impl<W> AsyncEncodeRecordTextExt for Encoder<W>
188where
189    W: AsyncWriteExt + Unpin,
190{
191    async fn encode_record_with_sym<R: DbnEncodable>(
192        &mut self,
193        record: &R,
194        symbol: Option<&str>,
195    ) -> Result<()> {
196        to_json_with_sym_in_buf(
197            &mut self.buf,
198            record,
199            self.should_pretty_print,
200            self.use_pretty_px,
201            self.use_pretty_ts,
202            symbol,
203        );
204        self.write_buf(|e| Error::io(e, "writing record")).await
205    }
206}
207
#[cfg(test)]
mod tests {
    use std::ffi::c_char;

    use tokio::io::{AsyncWriteExt, BufWriter};

    use crate::{encode::test_data::RECORD_HEADER, enums::rtype, MboMsg, RecordHeader};

    use super::*;

    /// Encodes `record` with a fresh [`Encoder`] over an in-memory buffer and
    /// returns everything that was written as a `String`.
    async fn write_to_json_string<R>(
        record: &R,
        should_pretty_print: bool,
        use_pretty_px: bool,
        use_pretty_ts: bool,
    ) -> String
    where
        R: DbnEncodable,
    {
        let mut buffer = Vec::new();
        let mut writer = BufWriter::new(&mut buffer);
        {
            // Scoped so the encoder's borrow of `writer` ends before the flush.
            let mut encoder = Encoder::new(
                &mut writer,
                should_pretty_print,
                use_pretty_px,
                use_pretty_ts,
            );
            encoder.encode_record(record).await.unwrap();
        }
        writer.flush().await.unwrap();
        String::from_utf8(buffer).expect("valid UTF-8")
    }

    /// Same as [`write_to_json_string`], but goes through the type-erased
    /// [`RecordRef`] encoding path.
    async fn write_ref_to_json_string(
        record: RecordRef<'_>,
        should_pretty_print: bool,
        use_pretty_px: bool,
        use_pretty_ts: bool,
    ) -> String {
        let mut buffer = Vec::new();
        let mut writer = BufWriter::new(&mut buffer);
        {
            // Scoped so the encoder's borrow of `writer` ends before the flush.
            let mut encoder = Encoder::new(
                &mut writer,
                should_pretty_print,
                use_pretty_px,
                use_pretty_ts,
            );
            encoder.encode_record_ref(record).await.unwrap();
        }
        writer.flush().await.unwrap();
        String::from_utf8(buffer).expect("valid UTF-8")
    }

    /// The typed and the type-erased path must produce the same JSON line for an
    /// MBO record.
    #[tokio::test]
    async fn test_mbo_write_json() {
        let hd = RecordHeader::new::<MboMsg>(
            rtype::MBO,
            RECORD_HEADER.publisher_id,
            RECORD_HEADER.instrument_id,
            RECORD_HEADER.ts_event,
        );
        let record = MboMsg {
            hd,
            order_id: 16,
            price: 5500,
            size: 3,
            flags: 128.into(),
            channel_id: 14,
            action: 'R' as c_char,
            side: 'N' as c_char,
            ts_recv: 1658441891000000000,
            ts_in_delta: 22_000,
            sequence: 1_002_375,
        };
        let res = write_to_json_string(&record, false, true, false).await;
        let ref_res = write_ref_to_json_string(RecordRef::from(&record), false, true, false).await;

        let expected_fields = [
            r#""ts_recv":"1658441891000000000""#,
            r#""hd":{"ts_event":"1658441851000000000","rtype":160,"publisher_id":1,"instrument_id":323}"#,
            r#""action":"R","side":"N","price":"0.000005500","size":3,"channel_id":14,"order_id":"16","flags":128,"ts_in_delta":22000,"sequence":1002375"#,
        ]
        .join(",");
        let expected = format!("{{{expected_fields}}}\n");

        assert_eq!(res, ref_res);
        assert_eq!(ref_res, expected);
    }
}