1use tokio::io::{self, AsyncWriteExt};
2
3use crate::{
4 encode::{AsyncEncodeRecord, AsyncEncodeRecordRef, AsyncEncodeRecordTextExt, DbnEncodable},
5 rtype_dispatch, Error, Metadata, RecordRef, Result,
6};
7
8use super::serialize::{to_json_in_buf, to_json_with_sym_in_buf};
9
10pub struct Encoder<W>
12where
13 W: io::AsyncWriteExt + Unpin,
14{
15 writer: W,
16 buf: String,
17 should_pretty_print: bool,
18 use_pretty_px: bool,
19 use_pretty_ts: bool,
20}
21
22impl<W> Encoder<W>
23where
24 W: io::AsyncWriteExt + Unpin,
25{
26 pub fn new(
30 writer: W,
31 should_pretty_print: bool,
32 use_pretty_px: bool,
33 use_pretty_ts: bool,
34 ) -> Self {
35 Self {
36 writer,
37 buf: String::new(),
38 should_pretty_print,
39 use_pretty_px,
40 use_pretty_ts,
41 }
42 }
43
44 pub async fn encode_metadata(&mut self, metadata: &Metadata) -> Result<()> {
55 to_json_in_buf(
56 &mut self.buf,
57 metadata,
58 self.should_pretty_print,
59 self.use_pretty_px,
60 self.use_pretty_ts,
61 );
62 let io_err = |e| Error::io(e, "writing metadata");
63 self.write_buf(io_err).await?;
64 self.writer.flush().await.map_err(io_err)?;
65 Ok(())
66 }
67
68 pub fn get_ref(&self) -> &W {
70 &self.writer
71 }
72
73 pub fn get_mut(&mut self) -> &mut W {
75 &mut self.writer
76 }
77
78 fn encode_to_buf<R: DbnEncodable>(&mut self, record: &R) {
80 to_json_in_buf(
81 &mut self.buf,
82 record,
83 self.should_pretty_print,
84 self.use_pretty_px,
85 self.use_pretty_ts,
86 );
87 }
88
89 async fn write_buf<F>(&mut self, handle_err: F) -> crate::Result<()>
90 where
91 F: FnOnce(io::Error) -> Error,
92 {
93 let res = self
94 .writer
95 .write_all(self.buf.as_bytes())
96 .await
97 .map_err(handle_err);
98 self.buf.clear();
100 res
101 }
102}
103
104impl<W> AsyncEncodeRecord for Encoder<W>
105where
106 W: AsyncWriteExt + Unpin,
107{
108 async fn encode_record<R: DbnEncodable>(&mut self, record: &R) -> Result<()> {
109 self.encode_to_buf(record);
110 self.write_buf(|e| Error::io(e, "writing record")).await
111 }
112
113 async fn encode_records<R: DbnEncodable>(&mut self, records: &[R]) -> Result<()> {
124 for record in records {
125 self.encode_to_buf(record);
126 }
127 self.write_buf(|e| Error::io(e, format!("writing {} records", records.len())))
128 .await
129 }
130 async fn flush(&mut self) -> Result<()> {
131 self.writer
132 .flush()
133 .await
134 .map_err(|e| Error::io(e, "flushing output"))
135 }
136
137 async fn shutdown(&mut self) -> Result<()> {
138 self.writer
139 .shutdown()
140 .await
141 .map_err(|e| Error::io(e, "shutting down"))
142 }
143}
144
145impl<W> AsyncEncodeRecordRef for Encoder<W>
146where
147 W: AsyncWriteExt + Unpin,
148{
149 async fn encode_record_ref(&mut self, record_ref: RecordRef<'_>) -> Result<()> {
150 rtype_dispatch!(record_ref, self.encode_record().await)?
151 }
152
153 async fn encode_record_refs(&mut self, record_refs: &[RecordRef<'_>]) -> Result<()> {
154 for record_ref in record_refs {
155 rtype_dispatch!(record_ref, self.encode_to_buf())?;
156 }
157 self.write_buf(|e| Error::io(e, format!("writing {} records", record_refs.len())))
158 .await
159 }
160
161 async unsafe fn encode_record_ref_ts_out(
162 &mut self,
163 record_ref: RecordRef<'_>,
164 ts_out: bool,
165 ) -> Result<()> {
166 rtype_dispatch!(record_ref, ts_out: ts_out, self.encode_record().await)?
167 }
168}
169
170impl<W> AsyncEncodeRecordTextExt for Encoder<W>
171where
172 W: AsyncWriteExt + Unpin,
173{
174 async fn encode_record_with_sym<R: DbnEncodable>(
175 &mut self,
176 record: &R,
177 symbol: Option<&str>,
178 ) -> Result<()> {
179 to_json_with_sym_in_buf(
180 &mut self.buf,
181 record,
182 self.should_pretty_print,
183 self.use_pretty_px,
184 self.use_pretty_ts,
185 symbol,
186 );
187 self.write_buf(|e| Error::io(e, "writing record")).await
188 }
189}
190
191#[cfg(test)]
192mod tests {
193 use std::ffi::c_char;
194
195 use tokio::io::{AsyncWriteExt, BufWriter};
196
197 use crate::{encode::test_data::RECORD_HEADER, enums::rtype, MboMsg, RecordHeader};
198
199 use super::*;
200
201 async fn write_to_json_string<R>(
202 record: &R,
203 should_pretty_print: bool,
204 use_pretty_px: bool,
205 use_pretty_ts: bool,
206 ) -> String
207 where
208 R: DbnEncodable,
209 {
210 let mut buffer = Vec::new();
211 let mut writer = BufWriter::new(&mut buffer);
212 Encoder::new(
213 &mut writer,
214 should_pretty_print,
215 use_pretty_px,
216 use_pretty_ts,
217 )
218 .encode_record(record)
219 .await
220 .unwrap();
221 writer.flush().await.unwrap();
222 String::from_utf8(buffer).expect("valid UTF-8")
223 }
224
225 async fn write_ref_to_json_string(
226 record: RecordRef<'_>,
227 should_pretty_print: bool,
228 use_pretty_px: bool,
229 use_pretty_ts: bool,
230 ) -> String {
231 let mut buffer = Vec::new();
232 let mut writer = BufWriter::new(&mut buffer);
233 Encoder::new(
234 &mut writer,
235 should_pretty_print,
236 use_pretty_px,
237 use_pretty_ts,
238 )
239 .encode_record_ref(record)
240 .await
241 .unwrap();
242 writer.flush().await.unwrap();
243 String::from_utf8(buffer).expect("valid UTF-8")
244 }
245
246 #[tokio::test]
247 async fn test_mbo_write_json() {
248 let record = MboMsg {
249 hd: RecordHeader::new::<MboMsg>(
250 rtype::MBO,
251 RECORD_HEADER.publisher_id,
252 RECORD_HEADER.instrument_id,
253 RECORD_HEADER.ts_event,
254 ),
255 order_id: 16,
256 price: 5500,
257 size: 3,
258 flags: 128.into(),
259 channel_id: 14,
260 action: 'R' as c_char,
261 side: 'N' as c_char,
262 ts_recv: 1658441891000000000,
263 ts_in_delta: 22_000,
264 sequence: 1_002_375,
265 };
266 let res = write_to_json_string(&record, false, true, false).await;
267 let ref_res = write_ref_to_json_string(RecordRef::from(&record), false, true, false).await;
268
269 assert_eq!(res, ref_res);
270 assert_eq!(
271 ref_res,
272 format!(
273 "{{{},{},{}}}\n",
274 r#""ts_recv":"1658441891000000000""#,
275 r#""hd":{"ts_event":"1658441851000000000","rtype":160,"publisher_id":1,"instrument_id":323}"#,
276 r#""action":"R","side":"N","price":"0.000005500","size":3,"channel_id":14,"order_id":"16","flags":128,"ts_in_delta":22000,"sequence":1002375"#
277 )
278 );
279 }
280}