use tokio::io::{self, AsyncWriteExt};
use crate::{
encode::{AsyncEncodeRecord, AsyncEncodeRecordRef, AsyncEncodeRecordTextExt, DbnEncodable},
rtype_dispatch, Error, Metadata, RecordRef, Result,
};
use super::serialize::{to_json_in_buf, to_json_with_sym_in_buf};
pub struct Encoder<W>
where
W: io::AsyncWriteExt + Unpin,
{
writer: W,
buf: String,
should_pretty_print: bool,
use_pretty_px: bool,
use_pretty_ts: bool,
}
impl<W> Encoder<W>
where
W: io::AsyncWriteExt + Unpin,
{
pub fn new(
writer: W,
should_pretty_print: bool,
use_pretty_px: bool,
use_pretty_ts: bool,
) -> Self {
Self {
writer,
buf: String::new(),
should_pretty_print,
use_pretty_px,
use_pretty_ts,
}
}
pub async fn encode_metadata(&mut self, metadata: &Metadata) -> Result<()> {
to_json_in_buf(
&mut self.buf,
metadata,
self.should_pretty_print,
self.use_pretty_px,
self.use_pretty_ts,
);
let io_err = |e| Error::io(e, "writing metadata");
self.write_buf(io_err).await?;
self.writer.flush().await.map_err(io_err)?;
Ok(())
}
pub fn get_ref(&self) -> &W {
&self.writer
}
pub fn get_mut(&mut self) -> &mut W {
&mut self.writer
}
fn encode_to_buf<R: DbnEncodable>(&mut self, record: &R) {
to_json_in_buf(
&mut self.buf,
record,
self.should_pretty_print,
self.use_pretty_px,
self.use_pretty_ts,
);
}
async fn write_buf<F>(&mut self, handle_err: F) -> crate::Result<()>
where
F: FnOnce(io::Error) -> Error,
{
let res = self
.writer
.write_all(self.buf.as_bytes())
.await
.map_err(handle_err);
self.buf.clear();
res
}
}
impl<W> AsyncEncodeRecord for Encoder<W>
where
W: AsyncWriteExt + Unpin,
{
async fn encode_record<R: DbnEncodable>(&mut self, record: &R) -> Result<()> {
self.encode_to_buf(record);
self.write_buf(|e| Error::io(e, "writing record")).await
}
async fn encode_records<R: DbnEncodable>(&mut self, records: &[R]) -> Result<()> {
for record in records {
self.encode_to_buf(record);
}
self.write_buf(|e| Error::io(e, format!("writing {} records", records.len())))
.await
}
async fn flush(&mut self) -> Result<()> {
self.writer
.flush()
.await
.map_err(|e| Error::io(e, "flushing output"))
}
async fn shutdown(&mut self) -> Result<()> {
self.writer
.shutdown()
.await
.map_err(|e| Error::io(e, "shutting down"))
}
}
impl<W> AsyncEncodeRecordRef for Encoder<W>
where
W: AsyncWriteExt + Unpin,
{
async fn encode_record_ref(&mut self, record_ref: RecordRef<'_>) -> Result<()> {
rtype_dispatch!(record_ref, self.encode_record().await)?
}
async fn encode_record_refs(&mut self, record_refs: &[RecordRef<'_>]) -> Result<()> {
for record_ref in record_refs {
rtype_dispatch!(record_ref, self.encode_to_buf())?;
}
self.write_buf(|e| Error::io(e, format!("writing {} records", record_refs.len())))
.await
}
async unsafe fn encode_record_ref_ts_out(
&mut self,
record_ref: RecordRef<'_>,
ts_out: bool,
) -> Result<()> {
rtype_dispatch!(record_ref, ts_out: ts_out, self.encode_record().await)?
}
}
impl<W> AsyncEncodeRecordTextExt for Encoder<W>
where
W: AsyncWriteExt + Unpin,
{
async fn encode_record_with_sym<R: DbnEncodable>(
&mut self,
record: &R,
symbol: Option<&str>,
) -> Result<()> {
to_json_with_sym_in_buf(
&mut self.buf,
record,
self.should_pretty_print,
self.use_pretty_px,
self.use_pretty_ts,
symbol,
);
self.write_buf(|e| Error::io(e, "writing record")).await
}
}
#[cfg(test)]
mod tests {
use std::ffi::c_char;
use tokio::io::{AsyncWriteExt, BufWriter};
use crate::{encode::test_data::RECORD_HEADER, enums::rtype, MboMsg, RecordHeader};
use super::*;
async fn write_to_json_string<R>(
record: &R,
should_pretty_print: bool,
use_pretty_px: bool,
use_pretty_ts: bool,
) -> String
where
R: DbnEncodable,
{
let mut buffer = Vec::new();
let mut writer = BufWriter::new(&mut buffer);
Encoder::new(
&mut writer,
should_pretty_print,
use_pretty_px,
use_pretty_ts,
)
.encode_record(record)
.await
.unwrap();
writer.flush().await.unwrap();
String::from_utf8(buffer).expect("valid UTF-8")
}
async fn write_ref_to_json_string(
record: RecordRef<'_>,
should_pretty_print: bool,
use_pretty_px: bool,
use_pretty_ts: bool,
) -> String {
let mut buffer = Vec::new();
let mut writer = BufWriter::new(&mut buffer);
Encoder::new(
&mut writer,
should_pretty_print,
use_pretty_px,
use_pretty_ts,
)
.encode_record_ref(record)
.await
.unwrap();
writer.flush().await.unwrap();
String::from_utf8(buffer).expect("valid UTF-8")
}
#[tokio::test]
async fn test_mbo_write_json() {
let record = MboMsg {
hd: RecordHeader::new::<MboMsg>(
rtype::MBO,
RECORD_HEADER.publisher_id,
RECORD_HEADER.instrument_id,
RECORD_HEADER.ts_event,
),
order_id: 16,
price: 5500,
size: 3,
flags: 128.into(),
channel_id: 14,
action: 'R' as c_char,
side: 'N' as c_char,
ts_recv: 1658441891000000000,
ts_in_delta: 22_000,
sequence: 1_002_375,
};
let res = write_to_json_string(&record, false, true, false).await;
let ref_res = write_ref_to_json_string(RecordRef::from(&record), false, true, false).await;
assert_eq!(res, ref_res);
assert_eq!(
ref_res,
format!(
"{{{},{},{}}}\n",
r#""ts_recv":"1658441891000000000""#,
r#""hd":{"ts_event":"1658441851000000000","rtype":160,"publisher_id":1,"instrument_id":323}"#,
r#""action":"R","side":"N","price":"0.000005500","size":3,"channel_id":14,"order_id":"16","flags":128,"ts_in_delta":22000,"sequence":1002375"#
)
);
}
}