1use tokio::io::{self, AsyncWriteExt};
2
3use crate::{
4 encode::{AsyncEncodeRecord, AsyncEncodeRecordRef, AsyncEncodeRecordTextExt, DbnEncodable},
5 rtype_dispatch, Error, HasRType, Metadata, RecordRef, Result,
6};
7
8use super::serialize::{to_json_in_buf, to_json_with_sym_in_buf};
9
10pub struct Encoder<W>
12where
13 W: io::AsyncWriteExt + Unpin,
14{
15 writer: W,
16 buf: String,
17 should_pretty_print: bool,
18 use_pretty_px: bool,
19 use_pretty_ts: bool,
20}
21
22impl<W> Encoder<W>
23where
24 W: io::AsyncWriteExt + Unpin,
25{
26 pub fn new(
30 writer: W,
31 should_pretty_print: bool,
32 use_pretty_px: bool,
33 use_pretty_ts: bool,
34 ) -> Self {
35 Self {
36 writer,
37 buf: String::new(),
38 should_pretty_print,
39 use_pretty_px,
40 use_pretty_ts,
41 }
42 }
43
44 pub async fn encode_metadata(&mut self, metadata: &Metadata) -> Result<()> {
55 to_json_in_buf(
56 &mut self.buf,
57 metadata,
58 self.should_pretty_print,
59 self.use_pretty_px,
60 self.use_pretty_ts,
61 );
62 let io_err = |e| Error::io(e, "writing metadata");
63 self.write_buf(io_err).await?;
64 self.writer.flush().await.map_err(io_err)?;
65 Ok(())
66 }
67
68 pub fn get_ref(&self) -> &W {
70 &self.writer
71 }
72
73 pub fn get_mut(&mut self) -> &mut W {
75 &mut self.writer
76 }
77
78 fn encode_to_buf<R: DbnEncodable>(&mut self, record: &R) {
80 to_json_in_buf(
81 &mut self.buf,
82 record,
83 self.should_pretty_print,
84 self.use_pretty_px,
85 self.use_pretty_ts,
86 );
87 }
88
89 async fn write_buf<F>(&mut self, handle_err: F) -> crate::Result<()>
90 where
91 F: FnOnce(io::Error) -> Error,
92 {
93 let res = self
94 .writer
95 .write_all(self.buf.as_bytes())
96 .await
97 .map_err(handle_err);
98 self.buf.clear();
100 res
101 }
102
103 async fn encode_with_ts_out<R: DbnEncodable + HasRType + Clone>(
104 &mut self,
105 rec: &R,
106 ts_out: u64,
107 ) -> Result<()> {
108 self.encode_record(&crate::WithTsOut::new(rec.clone(), ts_out))
109 .await
110 }
111}
112
113impl<W> AsyncEncodeRecord for Encoder<W>
114where
115 W: AsyncWriteExt + Unpin,
116{
117 async fn encode_record<R: DbnEncodable>(&mut self, record: &R) -> Result<()> {
118 self.encode_to_buf(record);
119 self.write_buf(|e| Error::io(e, "writing record")).await
120 }
121
122 async fn encode_records<R: DbnEncodable>(&mut self, records: &[R]) -> Result<()> {
133 for record in records {
134 self.encode_to_buf(record);
135 }
136 self.write_buf(|e| Error::io(e, format!("writing {} records", records.len())))
137 .await
138 }
139 async fn flush(&mut self) -> Result<()> {
140 self.writer
141 .flush()
142 .await
143 .map_err(|e| Error::io(e, "flushing output"))
144 }
145
146 async fn shutdown(&mut self) -> Result<()> {
147 self.writer
148 .shutdown()
149 .await
150 .map_err(|e| Error::io(e, "shutting down"))
151 }
152}
153
154impl<W> AsyncEncodeRecordRef for Encoder<W>
155where
156 W: AsyncWriteExt + Unpin,
157{
158 async fn encode_record_ref(&mut self, record_ref: RecordRef<'_>) -> Result<()> {
159 rtype_dispatch!(record_ref, self.encode_record().await)?
160 }
161
162 async fn encode_record_refs(&mut self, record_refs: &[RecordRef<'_>]) -> Result<()> {
163 for record_ref in record_refs {
164 rtype_dispatch!(record_ref, self.encode_to_buf())?;
165 }
166 self.write_buf(|e| Error::io(e, format!("writing {} records", record_refs.len())))
167 .await
168 }
169
170 async unsafe fn encode_record_ref_ts_out(
171 &mut self,
172 record_ref: RecordRef<'_>,
173 ts_out: bool,
174 ) -> Result<()> {
175 rtype_dispatch!(record_ref, ts_out: ts_out, self.encode_record().await)?
176 }
177
178 async fn encode_record_ref_with_ts_out(
179 &mut self,
180 record_ref: RecordRef<'_>,
181 ts_out: u64,
182 ) -> Result<()> {
183 rtype_dispatch!(record_ref, self.encode_with_ts_out(ts_out).await)?
184 }
185}
186
187impl<W> AsyncEncodeRecordTextExt for Encoder<W>
188where
189 W: AsyncWriteExt + Unpin,
190{
191 async fn encode_record_with_sym<R: DbnEncodable>(
192 &mut self,
193 record: &R,
194 symbol: Option<&str>,
195 ) -> Result<()> {
196 to_json_with_sym_in_buf(
197 &mut self.buf,
198 record,
199 self.should_pretty_print,
200 self.use_pretty_px,
201 self.use_pretty_ts,
202 symbol,
203 );
204 self.write_buf(|e| Error::io(e, "writing record")).await
205 }
206}
207
208#[cfg(test)]
209mod tests {
210 use std::ffi::c_char;
211
212 use tokio::io::{AsyncWriteExt, BufWriter};
213
214 use crate::{encode::test_data::RECORD_HEADER, enums::rtype, MboMsg, RecordHeader};
215
216 use super::*;
217
218 async fn write_to_json_string<R>(
219 record: &R,
220 should_pretty_print: bool,
221 use_pretty_px: bool,
222 use_pretty_ts: bool,
223 ) -> String
224 where
225 R: DbnEncodable,
226 {
227 let mut buffer = Vec::new();
228 let mut writer = BufWriter::new(&mut buffer);
229 Encoder::new(
230 &mut writer,
231 should_pretty_print,
232 use_pretty_px,
233 use_pretty_ts,
234 )
235 .encode_record(record)
236 .await
237 .unwrap();
238 writer.flush().await.unwrap();
239 String::from_utf8(buffer).expect("valid UTF-8")
240 }
241
242 async fn write_ref_to_json_string(
243 record: RecordRef<'_>,
244 should_pretty_print: bool,
245 use_pretty_px: bool,
246 use_pretty_ts: bool,
247 ) -> String {
248 let mut buffer = Vec::new();
249 let mut writer = BufWriter::new(&mut buffer);
250 Encoder::new(
251 &mut writer,
252 should_pretty_print,
253 use_pretty_px,
254 use_pretty_ts,
255 )
256 .encode_record_ref(record)
257 .await
258 .unwrap();
259 writer.flush().await.unwrap();
260 String::from_utf8(buffer).expect("valid UTF-8")
261 }
262
263 #[tokio::test]
264 async fn test_mbo_write_json() {
265 let record = MboMsg {
266 hd: RecordHeader::new::<MboMsg>(
267 rtype::MBO,
268 RECORD_HEADER.publisher_id,
269 RECORD_HEADER.instrument_id,
270 RECORD_HEADER.ts_event,
271 ),
272 order_id: 16,
273 price: 5500,
274 size: 3,
275 flags: 128.into(),
276 channel_id: 14,
277 action: 'R' as c_char,
278 side: 'N' as c_char,
279 ts_recv: 1658441891000000000,
280 ts_in_delta: 22_000,
281 sequence: 1_002_375,
282 };
283 let res = write_to_json_string(&record, false, true, false).await;
284 let ref_res = write_ref_to_json_string(RecordRef::from(&record), false, true, false).await;
285
286 assert_eq!(res, ref_res);
287 assert_eq!(
288 ref_res,
289 format!(
290 "{{{},{},{}}}\n",
291 r#""ts_recv":"1658441891000000000""#,
292 r#""hd":{"ts_event":"1658441851000000000","rtype":160,"publisher_id":1,"instrument_id":323}"#,
293 r#""action":"R","side":"N","price":"0.000005500","size":3,"channel_id":14,"order_id":"16","flags":128,"ts_in_delta":22000,"sequence":1002375"#
294 )
295 );
296 }
297}