pub mod csv;
pub mod dbn;
pub mod json;
use std::{fmt, io, num::NonZeroU64};
use streaming_iterator::StreamingIterator;
#[cfg(feature = "async")]
pub use self::dbn::{
AsyncMetadataEncoder as AsyncDbnMetadataEncoder, AsyncRecordEncoder as AsyncDbnRecordEncoder,
};
#[cfg(feature = "async")]
pub use self::json::AsyncEncoder as AsyncJsonEncoder;
pub use self::{
csv::Encoder as CsvEncoder,
dbn::{
Encoder as DbnEncoder, MetadataEncoder as DbnMetadataEncoder,
RecordEncoder as DbnRecordEncoder,
},
json::Encoder as JsonEncoder,
};
use crate::{
decode::DecodeDbn, rtype_method_dispatch, rtype_ts_out_method_dispatch, Compression, Encoding,
Error, HasRType, Metadata, Record, RecordRef, Result, Schema,
};
use self::{csv::serialize::CsvSerialize, json::serialize::JsonSerialize};
/// Marker trait bundling everything a record type must provide to be accepted by the
/// encoders in this module: `Record`, a byte view through `AsRef<[u8]>`,
/// `CsvSerialize`, `JsonSerialize`, and `fmt::Debug`.
///
/// The trait has no methods of its own.
pub trait DbnEncodable: Record + AsRef<[u8]> + CsvSerialize + fmt::Debug + JsonSerialize {}
// Blanket implementation: every type satisfying the bounds below is `DbnEncodable`.
// NOTE(review): the bound used here is `HasRType` while the supertrait list names
// `Record`; presumably `HasRType` implies `Record` — confirm against their definitions.
impl<T> DbnEncodable for T where
T: HasRType + AsRef<[u8]> + CsvSerialize + fmt::Debug + JsonSerialize
{
}
/// Trait for encoders that accept one statically-typed record at a time.
pub trait EncodeRecord {
/// Encodes a single `record`.
///
/// # Errors
/// Implementations report encoding or output failures through the returned `Result`.
fn encode_record<R: DbnEncodable>(&mut self, record: &R) -> Result<()>;
/// Flushes the encoder. The batch helpers of `EncodeDbn` call this once after the
/// last record has been encoded.
///
/// # Errors
/// Implementations report flush failures through the returned `Result`.
fn flush(&mut self) -> Result<()>;
}
/// Trait for encoders that accept dynamically-typed records in the form of a
/// `RecordRef`.
pub trait EncodeRecordRef {
/// Encodes the record referenced by `record`.
///
/// # Errors
/// Implementations report failures through the returned `Result`.
fn encode_record_ref(&mut self, record: RecordRef) -> Result<()>;
/// Encodes the record referenced by `record`, with `ts_out` supplied by the caller.
///
/// # Safety
/// NOTE(review): the exact contract is not stated in this file. The provided callers
/// (`EncodeDbn::encode_decoded*`) pass the `ts_out` flag taken from the metadata of the
/// decoder that produced `record`; presumably `ts_out` must truthfully describe whether
/// `record` carries an appended `ts_out` value — confirm against the implementations.
///
/// # Errors
/// Implementations report failures through the returned `Result`.
unsafe fn encode_record_ref_ts_out(&mut self, record: RecordRef, ts_out: bool) -> Result<()>;
}
/// Batch-encoding helpers layered on top of [`EncodeRecord`] and [`EncodeRecordRef`].
///
/// Every provided method encodes its input record by record and then flushes the
/// encoder exactly once. The first error encountered aborts the method and is
/// returned; in that case no flush is performed.
pub trait EncodeDbn: EncodeRecord + EncodeRecordRef {
    /// Encodes every element of `records` in order, then flushes.
    ///
    /// # Errors
    /// Returns the first error from `encode_record`, or the error from `flush`.
    fn encode_records<R: DbnEncodable>(&mut self, records: &[R]) -> Result<()> {
        records
            .iter()
            .try_for_each(|rec| self.encode_record(rec))?;
        self.flush()
    }

    /// Encodes every item yielded by `stream` until it is exhausted, then flushes.
    ///
    /// # Errors
    /// Returns the first error from `encode_record`, or the error from `flush`.
    fn encode_stream<R: DbnEncodable>(
        &mut self,
        mut stream: impl StreamingIterator<Item = R>,
    ) -> Result<()> {
        while let Some(rec) = stream.next() {
            self.encode_record(rec)?;
        }
        self.flush()
    }

    /// Re-encodes every record produced by `decoder` until it reports no more records,
    /// then flushes.
    ///
    /// # Errors
    /// Returns the first error from decoding, from encoding, or from `flush`.
    fn encode_decoded<D: DecodeDbn>(&mut self, mut decoder: D) -> Result<()> {
        let ts_out = decoder.metadata().ts_out;
        while let Some(rec) = decoder.decode_record_ref()? {
            // SAFETY: `ts_out` is read from the metadata of the very decoder that yields
            // `rec`, which is presumed to be what `encode_record_ref_ts_out` requires.
            unsafe { self.encode_record_ref_ts_out(rec, ts_out) }?;
        }
        self.flush()
    }

    /// Like [`encode_decoded`](Self::encode_decoded), but stops after `limit` records
    /// have been encoded even if `decoder` could produce more. Flushes afterwards.
    ///
    /// # Errors
    /// Returns the first error from decoding, from encoding, or from `flush`.
    fn encode_decoded_with_limit<D: DecodeDbn>(
        &mut self,
        mut decoder: D,
        limit: NonZeroU64,
    ) -> Result<()> {
        let ts_out = decoder.metadata().ts_out;
        // Counts down from `limit`; it is non-zero, so the first decrement cannot underflow.
        let mut remaining = limit.get();
        while let Some(rec) = decoder.decode_record_ref()? {
            // SAFETY: `ts_out` is read from the metadata of the very decoder that yields
            // `rec`, which is presumed to be what `encode_record_ref_ts_out` requires.
            unsafe { self.encode_record_ref_ts_out(rec, ts_out) }?;
            remaining -= 1;
            if remaining == 0 {
                break;
            }
        }
        self.flush()
    }
}
/// Extension trait for encoders that can emit a record together with an optional
/// symbol string.
pub trait EncodeRecordTextExt: EncodeRecord + EncodeRecordRef {
/// Encodes `record` along with `symbol`, which may be absent (`None`).
///
/// # Errors
/// Implementations report failures through the returned `Result`.
fn encode_record_with_sym<R: DbnEncodable>(
&mut self,
record: &R,
symbol: Option<&str>,
) -> Result<()>;
/// Routes the dynamically-typed `record` through `rtype_method_dispatch!` to
/// `encode_record_with_sym`, forwarding `symbol`.
///
/// # Errors
/// The trailing `?` propagates an error produced by the dispatch macro itself;
/// otherwise the result of `encode_record_with_sym` is returned.
/// NOTE(review): presumably the macro fails when the record type cannot be
/// resolved — confirm against the macro definition.
fn encode_ref_with_sym(&mut self, record: RecordRef, symbol: Option<&str>) -> Result<()> {
rtype_method_dispatch!(record, self, encode_record_with_sym, symbol)?
}
/// Same as `encode_ref_with_sym`, but dispatches through
/// `rtype_ts_out_method_dispatch!`, which additionally receives `ts_out`.
///
/// # Safety
/// NOTE(review): the contract is not stated in this file; presumably `ts_out` must
/// truthfully describe whether `record` carries an appended `ts_out` value — confirm
/// against the macro definition.
///
/// # Errors
/// The trailing `?` propagates an error produced by the dispatch macro itself;
/// otherwise the result of `encode_record_with_sym` is returned.
unsafe fn encode_ref_ts_out_with_sym(
&mut self,
record: RecordRef,
ts_out: bool,
symbol: Option<&str>,
) -> Result<()> {
rtype_ts_out_method_dispatch!(record, ts_out, self, encode_record_with_sym, symbol)?
}
}
/// Compression level handed to `zstd::Encoder::new` by `zstd_encoder`.
/// NOTE(review): the zstd crate documents level `0` as "use the library's default
/// level" — confirm against the crate version in use.
const ZSTD_COMPRESSION_LEVEL: i32 = 0;
/// An `io::Write` adapter around a `W` whose compression is chosen at runtime: bytes
/// are either passed to `W` unchanged or written through a zstd encoder, depending on
/// the `Compression` given to `DynWriter::new`.
pub struct DynWriter<'a, W>(DynWriterImpl<'a, W>)
where
W: io::Write;
/// Private representation of `DynWriter`: one variant per supported `Compression`.
enum DynWriterImpl<'a, W>
where
W: io::Write,
{
// The wrapped writer itself; bytes are forwarded unchanged.
Uncompressed(W),
// The wrapped writer behind a zstd encoder built by `zstd_encoder`.
ZStd(zstd::stream::AutoFinishEncoder<'a, W>),
}
impl<'a, W> DynWriter<'a, W>
where
    W: io::Write,
{
    /// Wraps `writer`, applying the requested `compression` to everything written
    /// through the returned value.
    ///
    /// # Errors
    /// Returns an error if `compression` is `Compression::ZStd` and the zstd encoder
    /// cannot be set up. `Compression::None` never fails.
    pub fn new(writer: W, compression: Compression) -> Result<Self> {
        let inner = match compression {
            Compression::None => DynWriterImpl::Uncompressed(writer),
            Compression::ZStd => DynWriterImpl::ZStd(zstd_encoder(writer)?),
        };
        Ok(Self(inner))
    }

    /// Returns a mutable reference to the wrapped writer, reaching through the zstd
    /// encoder when compression is enabled.
    pub fn get_mut(&mut self) -> &mut W {
        match self.0 {
            DynWriterImpl::Uncompressed(ref mut plain) => plain,
            DynWriterImpl::ZStd(ref mut compressed) => compressed.get_mut(),
        }
    }
}
fn zstd_encoder<'a, W: io::Write>(writer: W) -> Result<zstd::stream::AutoFinishEncoder<'a, W>> {
let mut zstd_encoder = zstd::Encoder::new(writer, ZSTD_COMPRESSION_LEVEL)
.map_err(|e| Error::io(e, "creating zstd encoder"))?;
zstd_encoder
.include_checksum(true)
.map_err(|e| Error::io(e, "setting zstd checksum"))?;
Ok(zstd_encoder.auto_finish())
}
impl<'a, W> io::Write for DynWriter<'a, W>
where
W: io::Write,
{
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
match &mut self.0 {
DynWriterImpl::Uncompressed(writer) => writer.write(buf),
DynWriterImpl::ZStd(writer) => writer.write(buf),
}
}
fn flush(&mut self) -> io::Result<()> {
match &mut self.0 {
DynWriterImpl::Uncompressed(writer) => writer.flush(),
DynWriterImpl::ZStd(writer) => writer.flush(),
}
}
fn write_vectored(&mut self, bufs: &[io::IoSlice<'_>]) -> io::Result<usize> {
match &mut self.0 {
DynWriterImpl::Uncompressed(writer) => writer.write_vectored(bufs),
DynWriterImpl::ZStd(writer) => writer.write_vectored(bufs),
}
}
fn write_all(&mut self, buf: &[u8]) -> io::Result<()> {
match &mut self.0 {
DynWriterImpl::Uncompressed(writer) => writer.write_all(buf),
DynWriterImpl::ZStd(writer) => writer.write_all(buf),
}
}
fn write_fmt(&mut self, fmt: std::fmt::Arguments<'_>) -> io::Result<()> {
match &mut self.0 {
DynWriterImpl::Uncompressed(writer) => writer.write_fmt(fmt),
DynWriterImpl::ZStd(writer) => writer.write_fmt(fmt),
}
}
}
/// An encoder whose output encoding (DBN, CSV, or JSON) and compression are both
/// selected at runtime through the arguments of `DynEncoder::new`.
pub struct DynEncoder<'a, W>(DynEncoderImpl<'a, W>)
where
W: io::Write;
/// Private representation of `DynEncoder`: one variant per supported `Encoding`, each
/// holding the concrete encoder writing into a `DynWriter`.
// NOTE(review): the lint is silenced rather than boxing a variant; presumably the size
// difference between the encoders is considered acceptable — confirm.
#[allow(clippy::large_enum_variant)]
enum DynEncoderImpl<'a, W>
where
W: io::Write,
{
Dbn(dbn::Encoder<DynWriter<'a, W>>),
Csv(csv::Encoder<DynWriter<'a, W>>),
Json(json::Encoder<DynWriter<'a, W>>),
}
impl<'a, W> DynEncoder<'a, W>
where
    W: io::Write,
{
    /// Creates an encoder that writes to `writer` in the given `encoding`, with the
    /// given `compression` applied to the output.
    ///
    /// Which of the remaining arguments are used depends on `encoding`:
    /// - `metadata` is only passed to the DBN encoder;
    /// - `should_pretty_print` is only passed to the JSON encoder;
    /// - `use_pretty_px` and `use_pretty_ts` are passed to the CSV and JSON encoders.
    ///
    /// # Errors
    /// Returns an error if the compressing writer cannot be created or if constructing
    /// the DBN encoder fails.
    pub fn new(
        writer: W,
        encoding: Encoding,
        compression: Compression,
        metadata: &Metadata,
        should_pretty_print: bool,
        use_pretty_px: bool,
        use_pretty_ts: bool,
    ) -> Result<Self> {
        let output = DynWriter::new(writer, compression)?;
        let inner = match encoding {
            Encoding::Dbn => DynEncoderImpl::Dbn(dbn::Encoder::new(output, metadata)?),
            Encoding::Csv => {
                DynEncoderImpl::Csv(csv::Encoder::new(output, use_pretty_px, use_pretty_ts))
            }
            Encoding::Json => DynEncoderImpl::Json(json::Encoder::new(
                output,
                should_pretty_print,
                use_pretty_px,
                use_pretty_ts,
            )),
        };
        Ok(Self(inner))
    }

    /// Asks the CSV encoder to encode the header for record type `R`, forwarding
    /// `with_symbol`. For the DBN and JSON encodings this does nothing and returns
    /// `Ok(())`.
    ///
    /// # Errors
    /// Returns whatever error the CSV encoder reports.
    pub fn encode_header<R: DbnEncodable>(&mut self, with_symbol: bool) -> Result<()> {
        if let DynEncoderImpl::Csv(csv_encoder) = &mut self.0 {
            csv_encoder.encode_header::<R>(with_symbol)
        } else {
            Ok(())
        }
    }

    /// Asks the CSV encoder to encode the header for `schema`, forwarding `ts_out` and
    /// `with_symbol`. For the DBN and JSON encodings this does nothing and returns
    /// `Ok(())`.
    ///
    /// # Errors
    /// Returns whatever error the CSV encoder reports.
    pub fn encode_header_for_schema(
        &mut self,
        schema: Schema,
        ts_out: bool,
        with_symbol: bool,
    ) -> Result<()> {
        if let DynEncoderImpl::Csv(csv_encoder) = &mut self.0 {
            csv_encoder.encode_header_for_schema(schema, ts_out, with_symbol)
        } else {
            Ok(())
        }
    }
}
impl<'a, W> EncodeRecord for DynEncoder<'a, W>
where
W: io::Write,
{
fn encode_record<R: DbnEncodable>(&mut self, record: &R) -> Result<()> {
self.0.encode_record(record)
}
fn flush(&mut self) -> Result<()> {
self.0.flush()
}
}
impl<'a, W> EncodeRecordRef for DynEncoder<'a, W>
where
W: io::Write,
{
fn encode_record_ref(&mut self, record: RecordRef) -> Result<()> {
self.0.encode_record_ref(record)
}
unsafe fn encode_record_ref_ts_out(&mut self, record: RecordRef, ts_out: bool) -> Result<()> {
self.0.encode_record_ref_ts_out(record, ts_out)
}
}
impl<'a, W> EncodeDbn for DynEncoder<'a, W>
where
W: io::Write,
{
fn encode_records<R: DbnEncodable>(&mut self, records: &[R]) -> Result<()> {
self.0.encode_records(records)
}
fn encode_stream<R: DbnEncodable>(
&mut self,
stream: impl StreamingIterator<Item = R>,
) -> Result<()> {
self.0.encode_stream(stream)
}
fn encode_decoded<D: DecodeDbn>(&mut self, decoder: D) -> Result<()> {
self.0.encode_decoded(decoder)
}
}
/// Dispatches each call to the concrete encoder held by the active variant.
impl<'a, W> EncodeRecord for DynEncoderImpl<'a, W>
where
    W: io::Write,
{
    fn encode_record<R: DbnEncodable>(&mut self, record: &R) -> Result<()> {
        match self {
            Self::Dbn(inner) => inner.encode_record(record),
            Self::Csv(inner) => inner.encode_record(record),
            Self::Json(inner) => inner.encode_record(record),
        }
    }

    fn flush(&mut self) -> Result<()> {
        match self {
            Self::Dbn(inner) => inner.flush(),
            Self::Csv(inner) => inner.flush(),
            Self::Json(inner) => inner.flush(),
        }
    }
}
/// Dispatches each call to the concrete encoder held by the active variant.
impl<'a, W> EncodeRecordRef for DynEncoderImpl<'a, W>
where
    W: io::Write,
{
    fn encode_record_ref(&mut self, record: RecordRef) -> Result<()> {
        match self {
            Self::Dbn(inner) => inner.encode_record_ref(record),
            Self::Csv(inner) => inner.encode_record_ref(record),
            Self::Json(inner) => inner.encode_record_ref(record),
        }
    }

    /// # Safety
    /// The arguments are passed on untouched, so the caller must uphold whatever
    /// `EncodeRecordRef::encode_record_ref_ts_out` requires of `record` and `ts_out`.
    unsafe fn encode_record_ref_ts_out(&mut self, record: RecordRef, ts_out: bool) -> Result<()> {
        match self {
            Self::Dbn(inner) => inner.encode_record_ref_ts_out(record, ts_out),
            Self::Csv(inner) => inner.encode_record_ref_ts_out(record, ts_out),
            Self::Json(inner) => inner.encode_record_ref_ts_out(record, ts_out),
        }
    }
}
// `EncodeDbn` for the private enum: the macro below generates `encode_records`,
// `encode_stream`, and `encode_decoded`, each matching on `self` and forwarding to the
// encoder stored in the `Dbn`, `Csv`, or `Json` variant. `encode_decoded_with_limit` is
// not generated and therefore uses the trait's provided implementation.
impl<'a, W> EncodeDbn for DynEncoderImpl<'a, W>
where
W: io::Write,
{
encoder_enum_dispatch! {Dbn, Csv, Json}
}
/// Generates the `EncodeDbn` methods `encode_records`, `encode_stream`, and
/// `encode_decoded` for an enum of encoders.
///
/// Invoke it inside an `impl EncodeDbn for <enum>` body with a comma-separated list of
/// the enum's single-field variants, e.g. `encoder_enum_dispatch! {Dbn, Csv, Json}`.
/// Each generated method matches on `self` and forwards its arguments to the same-named
/// method of the value held by the active variant. `encode_decoded_with_limit` is not
/// generated.
macro_rules! encoder_enum_dispatch {
($($variant:ident),*) => {
fn encode_records<R: DbnEncodable>(&mut self, records: &[R]) -> Result<()> {
match self {
$(Self::$variant(v) => v.encode_records(records),)*
}
}
fn encode_stream<R: DbnEncodable>(
&mut self,
stream: impl StreamingIterator<Item = R>,
) -> Result<()> {
match self {
$(Self::$variant(v) => v.encode_stream(stream),)*
}
}
fn encode_decoded<D: DecodeDbn>(
&mut self,
decoder: D,
) -> Result<()> {
match self {
$(Self::$variant(v) => v.encode_decoded(decoder),)*
}
}
};
}
pub(crate) use encoder_enum_dispatch;
#[cfg(test)]
mod test_data {
    //! Fixtures shared by the encoder tests.

    use streaming_iterator::StreamingIterator;

    use crate::record::{BidAskPair, RecordHeader};

    /// A fixed record header for building test records.
    pub const RECORD_HEADER: RecordHeader = RecordHeader {
        length: 30,
        rtype: 4,
        publisher_id: 1,
        instrument_id: 323,
        ts_event: 1658441851000000000,
    };

    /// A fixed bid/ask level for building test records.
    pub const BID_ASK: BidAskPair = BidAskPair {
        bid_px: 372000000000000,
        ask_px: 372500000000000,
        bid_sz: 10,
        ask_sz: 5,
        bid_ct: 5,
        ask_ct: 2,
    };

    /// A `StreamingIterator` over the elements of a `Vec`, in order.
    pub struct VecStream<T> {
        vec: Vec<T>,
        /// Index of the current element; `None` until `advance` is first called.
        idx: Option<usize>,
    }

    impl<T> VecStream<T> {
        /// Creates a stream positioned before the first element of `vec`.
        pub fn new(vec: Vec<T>) -> Self {
            Self { vec, idx: None }
        }
    }

    impl<T> StreamingIterator for VecStream<T> {
        type Item = T;

        fn advance(&mut self) {
            self.idx = Some(match self.idx {
                Some(current) => current + 1,
                None => 0,
            });
        }

        fn get(&self) -> Option<&Self::Item> {
            // Yields `None` both before the first `advance` and once past the end.
            self.idx.and_then(|current| self.vec.get(current))
        }
    }
}
#[cfg(feature = "async")]
pub use r#async::DynWriter as DynAsyncWriter;
#[cfg(feature = "async")]
mod r#async {
    //! Async counterpart of the synchronous `DynWriter`.

    use std::{
        pin::Pin,
        task::{Context, Poll},
    };

    use async_compression::tokio::write::ZstdEncoder;
    use tokio::io;

    use crate::enums::Compression;

    /// An async writer around a `W` whose compression is chosen at runtime: bytes are
    /// either passed to `W` unchanged or written through a `ZstdEncoder`.
    pub struct DynWriter<W>(DynWriterImpl<W>)
    where
        W: io::AsyncWriteExt + Unpin;

    /// Private representation of `DynWriter`: one variant per supported `Compression`.
    enum DynWriterImpl<W>
    where
        W: io::AsyncWriteExt + Unpin,
    {
        Uncompressed(W),
        ZStd(ZstdEncoder<W>),
    }

    impl<W> DynWriter<W>
    where
        W: io::AsyncWriteExt + Unpin,
    {
        /// Wraps `writer`, applying the requested `compression` to everything written
        /// through the returned value.
        pub fn new(writer: W, compression: Compression) -> Self {
            let inner = match compression {
                Compression::None => DynWriterImpl::Uncompressed(writer),
                Compression::ZStd => DynWriterImpl::ZStd(ZstdEncoder::new(writer)),
            };
            Self(inner)
        }

        /// Returns a mutable reference to the wrapped writer, reaching through the zstd
        /// encoder when compression is enabled.
        pub fn get_mut(&mut self) -> &mut W {
            match self.0 {
                DynWriterImpl::Uncompressed(ref mut plain) => plain,
                DynWriterImpl::ZStd(ref mut compressed) => compressed.get_mut(),
            }
        }
    }

    /// Each poll method is forwarded to the same poll method of the active inner
    /// writer. Re-pinning with `Pin::new` is possible because `W` is `Unpin`.
    impl<W> io::AsyncWrite for DynWriter<W>
    where
        W: io::AsyncWrite + io::AsyncWriteExt + Unpin,
    {
        fn poll_write(
            mut self: Pin<&mut Self>,
            cx: &mut Context<'_>,
            buf: &[u8],
        ) -> Poll<io::Result<usize>> {
            match &mut self.0 {
                DynWriterImpl::Uncompressed(plain) => {
                    <W as io::AsyncWrite>::poll_write(Pin::new(plain), cx, buf)
                }
                DynWriterImpl::ZStd(compressed) => {
                    <ZstdEncoder<W> as io::AsyncWrite>::poll_write(Pin::new(compressed), cx, buf)
                }
            }
        }

        fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
            match &mut self.0 {
                DynWriterImpl::Uncompressed(plain) => {
                    <W as io::AsyncWrite>::poll_flush(Pin::new(plain), cx)
                }
                DynWriterImpl::ZStd(compressed) => {
                    <ZstdEncoder<W> as io::AsyncWrite>::poll_flush(Pin::new(compressed), cx)
                }
            }
        }

        fn poll_shutdown(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<()>> {
            match &mut self.0 {
                DynWriterImpl::Uncompressed(plain) => {
                    <W as io::AsyncWrite>::poll_shutdown(Pin::new(plain), cx)
                }
                DynWriterImpl::ZStd(compressed) => {
                    <ZstdEncoder<W> as io::AsyncWrite>::poll_shutdown(Pin::new(compressed), cx)
                }
            }
        }
    }
}