pub mod csv;
pub mod dbn;
mod dyn_encoder;
mod dyn_writer;
mod io_utils;
pub mod json;
mod split;
use std::{fmt, io, num::NonZeroU64};
use fallible_streaming_iterator::FallibleStreamingIterator;
pub use self::{
csv::Encoder as CsvEncoder,
dbn::{
Encoder as DbnEncoder, MetadataEncoder as DbnMetadataEncoder,
RecordEncoder as DbnRecordEncoder,
},
json::Encoder as JsonEncoder,
split::{
NoSchemaBehavior, SchemaSplitter, SplitDuration, SplitEncoder, Splitter, SymbolSplitter,
TimeSplitter,
},
};
#[cfg(feature = "async")]
pub use self::{
dbn::{
AsyncEncoder as AsyncDbnEncoder, AsyncMetadataEncoder as AsyncDbnMetadataEncoder,
AsyncRecordEncoder as AsyncDbnRecordEncoder,
},
json::AsyncEncoder as AsyncJsonEncoder,
};
#[doc(inline)]
pub use self::{
dyn_encoder::{DynEncoder, DynEncoderBuilder},
dyn_writer::DynWriter,
};
#[cfg(feature = "async")]
#[doc(inline)]
pub use self::dyn_writer::{DynAsyncBufWriter, DynAsyncWriter};
use crate::{
decode::{DbnMetadata, DecodeRecordRef},
rtype_dispatch, Error, Record, RecordRef, Result,
};
use self::{csv::serialize::CsvSerialize, json::serialize::JsonSerialize};
pub trait DbnEncodable: Record + CsvSerialize + fmt::Debug + JsonSerialize {}
impl<T> DbnEncodable for T where T: Record + CsvSerialize + fmt::Debug + JsonSerialize {}
pub trait EncodeRecord {
fn encode_record<R: DbnEncodable>(&mut self, record: &R) -> Result<()>;
fn encode_records<R: DbnEncodable>(&mut self, records: &[R]) -> Result<()> {
for record in records {
self.encode_record(record)?;
}
Ok(())
}
fn flush(&mut self) -> Result<()>;
}
pub trait EncodeRecordRef {
fn encode_record_ref(&mut self, record: RecordRef) -> Result<()>;
fn encode_record_refs(&mut self, records: &[RecordRef]) -> Result<()> {
for record in records {
self.encode_record_ref(*record)?;
}
Ok(())
}
unsafe fn encode_record_ref_ts_out(&mut self, record: RecordRef, ts_out: bool) -> Result<()>;
}
pub trait EncodeDbn: EncodeRecord + EncodeRecordRef {
fn encode_stream<R: DbnEncodable>(
&mut self,
mut stream: impl FallibleStreamingIterator<Item = R, Error = Error>,
) -> Result<()> {
while let Some(record) = stream.next()? {
self.encode_record(record)?;
}
self.flush()?;
Ok(())
}
fn encode_decoded<D: DecodeRecordRef + DbnMetadata>(&mut self, mut decoder: D) -> Result<()> {
let ts_out = decoder.metadata().ts_out;
while let Some(record) = decoder.decode_record_ref()? {
unsafe { self.encode_record_ref_ts_out(record, ts_out) }?;
}
self.flush()?;
Ok(())
}
fn encode_decoded_with_limit<D: DecodeRecordRef + DbnMetadata>(
&mut self,
mut decoder: D,
limit: NonZeroU64,
) -> Result<()> {
let ts_out = decoder.metadata().ts_out;
let mut i = 0;
while let Some(record) = decoder.decode_record_ref()? {
unsafe { self.encode_record_ref_ts_out(record, ts_out) }?;
i += 1;
if i == limit.get() {
break;
}
}
self.flush()?;
Ok(())
}
}
pub trait EncodeRecordTextExt: EncodeRecord + EncodeRecordRef {
fn encode_record_with_sym<R: DbnEncodable>(
&mut self,
record: &R,
symbol: Option<&str>,
) -> Result<()>;
fn encode_ref_with_sym(&mut self, record: RecordRef, symbol: Option<&str>) -> Result<()> {
rtype_dispatch!(record, self.encode_record_with_sym(symbol))?
}
unsafe fn encode_ref_ts_out_with_sym(
&mut self,
record: RecordRef,
ts_out: bool,
symbol: Option<&str>,
) -> Result<()> {
rtype_dispatch!(record, ts_out: ts_out, self.encode_record_with_sym(symbol))?
}
}
pub const ZSTD_COMPRESSION_LEVEL: i32 = 0;
fn zstd_encoder<'a, W: io::Write>(writer: W) -> Result<zstd::stream::AutoFinishEncoder<'a, W>> {
zstd_encoder_with_clevel(writer, ZSTD_COMPRESSION_LEVEL)
}
fn zstd_encoder_with_clevel<'a, W: io::Write>(
writer: W,
level: i32,
) -> Result<zstd::stream::AutoFinishEncoder<'a, W>> {
let mut zstd_encoder =
zstd::Encoder::new(writer, level).map_err(|e| Error::io(e, "creating zstd encoder"))?;
zstd_encoder
.include_checksum(true)
.map_err(|e| Error::io(e, "setting zstd checksum"))?;
Ok(zstd_encoder.auto_finish())
}
#[cfg(feature = "async")]
fn async_zstd_encoder<W: tokio::io::AsyncWriteExt + Unpin>(
writer: W,
) -> async_compression::tokio::write::ZstdEncoder<W> {
async_zstd_encoder_with_clevel(writer, ZSTD_COMPRESSION_LEVEL)
}
#[cfg(feature = "async")]
fn async_zstd_encoder_with_clevel<W: tokio::io::AsyncWriteExt + Unpin>(
writer: W,
level: i32,
) -> async_compression::tokio::write::ZstdEncoder<W> {
async_compression::tokio::write::ZstdEncoder::with_quality_and_params(
writer,
async_compression::Level::Precise(level),
&[async_compression::zstd::CParameter::checksum_flag(true)],
)
}
#[cfg(feature = "async")]
#[allow(async_fn_in_trait)] pub trait AsyncEncodeRecord {
async fn encode_record<R: DbnEncodable>(&mut self, record: &R) -> Result<()>;
async fn encode_records<R: DbnEncodable>(&mut self, records: &[R]) -> Result<()> {
for record in records {
self.encode_record(record).await?;
}
Ok(())
}
async fn flush(&mut self) -> Result<()>;
async fn shutdown(&mut self) -> Result<()>;
}
#[cfg(feature = "async")]
#[allow(async_fn_in_trait)] pub trait AsyncEncodeRecordRef {
async fn encode_record_ref(&mut self, record_ref: RecordRef) -> Result<()>;
async fn encode_record_refs(&mut self, record_refs: &[RecordRef<'_>]) -> Result<()> {
for record_ref in record_refs {
self.encode_record_ref(*record_ref).await?;
}
Ok(())
}
async unsafe fn encode_record_ref_ts_out(
&mut self,
record_ref: RecordRef,
ts_out: bool,
) -> Result<()>;
}
#[cfg(feature = "async")]
#[allow(async_fn_in_trait)] pub trait AsyncEncodeRecordTextExt: AsyncEncodeRecord + AsyncEncodeRecordRef {
async fn encode_record_with_sym<R: DbnEncodable>(
&mut self,
record: &R,
symbol: Option<&str>,
) -> Result<()>;
async fn encode_ref_with_sym(
&mut self,
record: RecordRef<'_>,
symbol: Option<&str>,
) -> Result<()> {
rtype_dispatch!(record, self.encode_record_with_sym(symbol).await)?
}
async unsafe fn encode_ref_ts_out_with_sym(
&mut self,
record: RecordRef<'_>,
ts_out: bool,
symbol: Option<&str>,
) -> Result<()> {
rtype_dispatch!(record, ts_out: ts_out, self.encode_record_with_sym(symbol).await)?
}
}
#[cfg(test)]
mod test_data {
use crate::record::{BidAskPair, RecordHeader};
pub const RECORD_HEADER: RecordHeader = RecordHeader {
length: 30,
rtype: 4,
publisher_id: 1,
instrument_id: 323,
ts_event: 1658441851000000000,
};
pub const BID_ASK: BidAskPair = BidAskPair {
bid_px: 372000000000000,
ask_px: 372500000000000,
bid_sz: 10,
ask_sz: 5,
bid_ct: 5,
ask_ct: 2,
};
}