Skip to main content

dbn/
encode.rs

1//! Encoding DBN and Zstd-compressed DBN files and streams. Encoders implement the
2//! [`EncodeDbn`] trait.
3pub mod csv;
4pub mod dbn;
5mod dyn_encoder;
6mod dyn_writer;
7pub mod json;
8mod split;
9
10use std::{fmt, io, num::NonZeroU64};
11
12use fallible_streaming_iterator::FallibleStreamingIterator;
13
14// Re-exports
15pub use self::{
16    csv::Encoder as CsvEncoder,
17    dbn::{
18        Encoder as DbnEncoder, MetadataEncoder as DbnMetadataEncoder,
19        RecordEncoder as DbnRecordEncoder,
20    },
21    json::Encoder as JsonEncoder,
22    split::{
23        NoSchemaBehavior, SchemaSplitter, SplitDuration, SplitEncoder, Splitter, SymbolSplitter,
24        TimeSplitter,
25    },
26};
27#[cfg(feature = "async")]
28pub use self::{
29    dbn::{
30        AsyncEncoder as AsyncDbnEncoder, AsyncMetadataEncoder as AsyncDbnMetadataEncoder,
31        AsyncRecordEncoder as AsyncDbnRecordEncoder,
32    },
33    json::AsyncEncoder as AsyncJsonEncoder,
34};
35#[doc(inline)]
36pub use self::{
37    dyn_encoder::{DynEncoder, DynEncoderBuilder},
38    dyn_writer::DynWriter,
39};
40
41#[cfg(feature = "async")]
42#[doc(inline)]
43pub use self::dyn_writer::{DynAsyncBufWriter, DynAsyncWriter};
44
45use crate::{
46    decode::{DbnMetadata, DecodeRecordRef},
47    rtype_dispatch, Error, Record, RecordRef, Result,
48};
49
50use self::{csv::serialize::CsvSerialize, json::serialize::JsonSerialize};
51
52/// Trait alias for [`Record`], `CsvSerialize`, [`fmt::Debug`], and `JsonSerialize`.
53pub trait DbnEncodable: Record + CsvSerialize + fmt::Debug + JsonSerialize {}
54impl<T> DbnEncodable for T where T: Record + CsvSerialize + fmt::Debug + JsonSerialize {}
55
56/// Trait for types that encode a DBN record of a specific type.
57pub trait EncodeRecord {
58    /// Encodes a single DBN record of type `R`.
59    ///
60    /// # Errors
61    /// This function returns an error if it's unable to write to the underlying writer
62    /// or there's a serialization error.
63    fn encode_record<R: DbnEncodable>(&mut self, record: &R) -> Result<()>;
64
65    /// Flushes any buffered content to the true output.
66    ///
67    /// # Errors
68    /// This function returns an error if it's unable to flush the underlying writer.
69    fn flush(&mut self) -> Result<()>;
70}
71
72/// Trait for types that encode DBN records with mixed schemas.
73pub trait EncodeRecordRef {
74    /// Encodes a single DBN [`RecordRef`].
75    ///
76    /// # Errors
77    /// This function returns an error if it's unable to write to the underlying writer
78    /// or there's a serialization error.
79    fn encode_record_ref(&mut self, record: RecordRef) -> Result<()>;
80
81    /// Encodes a single DBN [`RecordRef`] with an optional `ts_out` (see
82    /// [`record::WithTsOut`](crate::record::WithTsOut)).
83    ///
84    /// # Safety
85    /// `ts_out` must be `false` if `record` does not have an appended `ts_out`.
86    ///
87    /// # Errors
88    /// This function returns an error if it's unable to write to the underlying writer
89    /// or there's a serialization error.
90    unsafe fn encode_record_ref_ts_out(&mut self, record: RecordRef, ts_out: bool) -> Result<()>;
91}
92
93/// Trait for types that encode DBN records with a specific record type.
94pub trait EncodeDbn: EncodeRecord + EncodeRecordRef {
95    /// Encodes a slice of DBN records.
96    ///
97    /// # Errors
98    /// This function returns an error if it's unable to write to the underlying writer
99    /// or there's a serialization error.
100    fn encode_records<R: DbnEncodable>(&mut self, records: &[R]) -> Result<()> {
101        for record in records {
102            self.encode_record(record)?;
103        }
104        self.flush()?;
105        Ok(())
106    }
107
108    /// Encodes a stream of DBN records.
109    ///
110    /// # Errors
111    /// This function returns an error if it's unable to write to the underlying writer
112    /// or there's a serialization error.
113    fn encode_stream<R: DbnEncodable>(
114        &mut self,
115        mut stream: impl FallibleStreamingIterator<Item = R, Error = Error>,
116    ) -> Result<()> {
117        while let Some(record) = stream.next()? {
118            self.encode_record(record)?;
119        }
120        self.flush()?;
121        Ok(())
122    }
123
124    /// Encodes DBN records directly from a DBN decoder.
125    ///
126    /// # Errors
127    /// This function returns an error if it's unable to write to the underlying writer
128    /// or there's a serialization error.
129    fn encode_decoded<D: DecodeRecordRef + DbnMetadata>(&mut self, mut decoder: D) -> Result<()> {
130        let ts_out = decoder.metadata().ts_out;
131        while let Some(record) = decoder.decode_record_ref()? {
132            // Safety: It's safe to cast to `WithTsOut` because we're passing in the `ts_out`
133            // from the metadata header.
134            unsafe { self.encode_record_ref_ts_out(record, ts_out) }?;
135        }
136        self.flush()?;
137        Ok(())
138    }
139
140    /// Encodes DBN records directly from a DBN decoder, outputting no more than
141    /// `limit` records.
142    ///
143    /// # Errors
144    /// This function returns an error if it's unable to write to the underlying writer
145    /// or there's a serialization error.
146    fn encode_decoded_with_limit<D: DecodeRecordRef + DbnMetadata>(
147        &mut self,
148        mut decoder: D,
149        limit: NonZeroU64,
150    ) -> Result<()> {
151        let ts_out = decoder.metadata().ts_out;
152        let mut i = 0;
153        while let Some(record) = decoder.decode_record_ref()? {
154            // Safety: It's safe to cast to `WithTsOut` because we're passing in the `ts_out`
155            // from the metadata header.
156            unsafe { self.encode_record_ref_ts_out(record, ts_out) }?;
157            i += 1;
158            if i == limit.get() {
159                break;
160            }
161        }
162        self.flush()?;
163        Ok(())
164    }
165}
166
167/// Extension trait for text encodings.
168pub trait EncodeRecordTextExt: EncodeRecord + EncodeRecordRef {
169    /// Encodes a single DBN record of type `R` along with the record's text symbol.
170    ///
171    /// # Errors
172    /// This function returns an error if it's unable to write to the underlying writer
173    /// or there's a serialization error.
174    fn encode_record_with_sym<R: DbnEncodable>(
175        &mut self,
176        record: &R,
177        symbol: Option<&str>,
178    ) -> Result<()>;
179
180    /// Encodes a single DBN [`RecordRef`] along with the record's text symbol.
181    ///
182    /// # Errors
183    /// This function returns an error if it's unable to write to the underlying writer
184    /// or there's a serialization error.
185    fn encode_ref_with_sym(&mut self, record: RecordRef, symbol: Option<&str>) -> Result<()> {
186        rtype_dispatch!(record, self.encode_record_with_sym(symbol))?
187    }
188
189    /// Encodes a single DBN [`RecordRef`] with an optional `ts_out` (see
190    /// [`record::WithTsOut`](crate::record::WithTsOut)) along with the record's text
191    /// symbol.
192    ///
193    /// # Safety
194    /// `ts_out` must be `false` if `record` does not have an appended `ts_out`.
195    ///
196    /// # Errors
197    /// This function returns an error if it's unable to write to the underlying writer
198    /// or there's a serialization error.
199    unsafe fn encode_ref_ts_out_with_sym(
200        &mut self,
201        record: RecordRef,
202        ts_out: bool,
203        symbol: Option<&str>,
204    ) -> Result<()> {
205        rtype_dispatch!(record, ts_out: ts_out, self.encode_record_with_sym(symbol))?
206    }
207}
208
/// Zstandard compression level that this module's encoders use when no explicit level is
/// requested.
///
/// A level of `0` asks zstd to apply its own built-in default level.
pub const ZSTD_COMPRESSION_LEVEL: i32 = 0;
211
212fn zstd_encoder<'a, W: io::Write>(writer: W) -> Result<zstd::stream::AutoFinishEncoder<'a, W>> {
213    zstd_encoder_with_clevel(writer, ZSTD_COMPRESSION_LEVEL)
214}
215
216fn zstd_encoder_with_clevel<'a, W: io::Write>(
217    writer: W,
218    level: i32,
219) -> Result<zstd::stream::AutoFinishEncoder<'a, W>> {
220    let mut zstd_encoder =
221        zstd::Encoder::new(writer, level).map_err(|e| Error::io(e, "creating zstd encoder"))?;
222    zstd_encoder
223        .include_checksum(true)
224        .map_err(|e| Error::io(e, "setting zstd checksum"))?;
225    Ok(zstd_encoder.auto_finish())
226}
227
/// Async counterpart of `zstd_encoder`: wraps `writer` in a checksummed Zstandard encoder
/// at [`ZSTD_COMPRESSION_LEVEL`].
#[cfg(feature = "async")]
fn async_zstd_encoder<W>(writer: W) -> async_compression::tokio::write::ZstdEncoder<W>
where
    W: tokio::io::AsyncWriteExt + Unpin,
{
    async_zstd_encoder_with_clevel(writer, ZSTD_COMPRESSION_LEVEL)
}
234
/// Async counterpart of `zstd_encoder_with_clevel`: wraps `writer` in a Zstandard encoder
/// at compression `level`, with the zstd checksum flag enabled.
#[cfg(feature = "async")]
fn async_zstd_encoder_with_clevel<W>(
    writer: W,
    level: i32,
) -> async_compression::tokio::write::ZstdEncoder<W>
where
    W: tokio::io::AsyncWriteExt + Unpin,
{
    use async_compression::{tokio::write::ZstdEncoder, zstd::CParameter, Level};

    let params = [CParameter::checksum_flag(true)];
    ZstdEncoder::with_quality_and_params(writer, Level::Precise(level), &params)
}
246
/// Async encoding of individual DBN records whose concrete type is known statically.
#[cfg(feature = "async")]
#[allow(async_fn_in_trait)] // callers can't require `Send` on these futures; accepted here
pub trait AsyncEncodeRecord {
    /// Encodes `record`, a single DBN record of type `R`.
    ///
    /// # Errors
    /// Returns an error when writing to the underlying writer fails or when `record`
    /// can't be serialized.
    ///
    /// # Cancel safety
    /// Not cancellation safe. If the future is dropped before completing (for example
    /// because another `tokio::select!` branch finished first), the record may have been
    /// only partially written. A later call starts writing its encoded record from the
    /// beginning.
    async fn encode_record<R: DbnEncodable>(&mut self, record: &R) -> Result<()>;

    /// Pushes any buffered output through to the underlying writer.
    ///
    /// # Errors
    /// Returns an error when the underlying writer fails to flush.
    async fn flush(&mut self) -> Result<()>;

    /// Starts shutting down the inner writer, or attempts to complete a shutdown already
    /// in progress.
    ///
    /// # Errors
    /// Returns an error when the shutdown doesn't complete successfully.
    async fn shutdown(&mut self) -> Result<()>;
}
276
/// Async encoding of type-erased DBN records ([`RecordRef`]), which lets records of
/// different schemas be mixed in a single output.
#[cfg(feature = "async")]
#[allow(async_fn_in_trait)] // callers can't require `Send` on these futures; accepted here
pub trait AsyncEncodeRecordRef {
    /// Encodes `record_ref`, a single type-erased [`RecordRef`].
    ///
    /// # Errors
    /// Returns an error when writing to the underlying writer fails or when the record
    /// can't be serialized.
    ///
    /// # Cancel safety
    /// Not cancellation safe. If the future is dropped before completing (for example
    /// because another `tokio::select!` branch finished first), the record may have been
    /// only partially written. A later call starts writing its encoded record from the
    /// beginning.
    async fn encode_record_ref(&mut self, record_ref: RecordRef) -> Result<()>;

    /// Variant of [`encode_record_ref`](Self::encode_record_ref) for records that may carry
    /// an appended `ts_out` (see [`record::WithTsOut`](crate::record::WithTsOut)); pass
    /// `ts_out = true` to encode it as such.
    ///
    /// # Safety
    /// `ts_out` may only be `true` when `record_ref` really does have a `ts_out` appended;
    /// otherwise it must be `false`.
    ///
    /// # Errors
    /// Returns an error when writing to the underlying writer fails or when the record
    /// can't be serialized.
    ///
    /// # Cancel safety
    /// Not cancellation safe. If the future is dropped before completing (for example
    /// because another `tokio::select!` branch finished first), the record may have been
    /// only partially written. A later call starts writing its encoded record from the
    /// beginning.
    async unsafe fn encode_record_ref_ts_out(
        &mut self,
        record_ref: RecordRef,
        ts_out: bool,
    ) -> Result<()>;
}
315
/// Async extension for text encodings that can write a record's text symbol alongside
/// the record itself.
#[cfg(feature = "async")]
#[allow(async_fn_in_trait)] // callers can't require `Send` on these futures; accepted here
pub trait AsyncEncodeRecordTextExt: AsyncEncodeRecord + AsyncEncodeRecordRef {
    /// Encodes `record`, a single DBN record of type `R`, together with its text `symbol`
    /// when one is provided.
    ///
    /// # Errors
    /// Returns an error when writing to the underlying writer fails or when `record`
    /// can't be serialized.
    ///
    /// # Cancel safety
    /// Not cancellation safe. If the future is dropped before completing (for example
    /// because another `tokio::select!` branch finished first), the record may have been
    /// only partially written. A later call starts writing its encoded record from the
    /// beginning.
    async fn encode_record_with_sym<R: DbnEncodable>(
        &mut self,
        record: &R,
        symbol: Option<&str>,
    ) -> Result<()>;

    /// Type-erased counterpart of [`encode_record_with_sym`](Self::encode_record_with_sym).
    /// It dispatches on the record's `rtype` to the concrete record type, then encodes it
    /// with `symbol`.
    ///
    /// # Errors
    /// Returns an error when dispatching on the record fails, when writing to the
    /// underlying writer fails, or when the record can't be serialized.
    ///
    /// # Cancel safety
    /// Not cancellation safe. If the future is dropped before completing (for example
    /// because another `tokio::select!` branch finished first), the record may have been
    /// only partially written. A later call starts writing its encoded record from the
    /// beginning.
    async fn encode_ref_with_sym(
        &mut self,
        record: RecordRef<'_>,
        symbol: Option<&str>,
    ) -> Result<()> {
        // Outer `?` unwraps the dispatch result; the inner encoding result is returned.
        rtype_dispatch!(record, self.encode_record_with_sym(symbol).await)?
    }

    /// Type-erased counterpart of [`encode_record_with_sym`](Self::encode_record_with_sym)
    /// for records that may carry an appended `ts_out` (see
    /// [`record::WithTsOut`](crate::record::WithTsOut)), encoded with the text `symbol`.
    ///
    /// # Safety
    /// `ts_out` may only be `true` when `record` really does have a `ts_out` appended;
    /// otherwise it must be `false`.
    ///
    /// # Errors
    /// Returns an error when dispatching on the record fails, when writing to the
    /// underlying writer fails, or when the record can't be serialized.
    ///
    /// # Cancel safety
    /// Not cancellation safe. If the future is dropped before completing (for example
    /// because another `tokio::select!` branch finished first), the record may have been
    /// only partially written. A later call starts writing its encoded record from the
    /// beginning.
    async unsafe fn encode_ref_ts_out_with_sym(
        &mut self,
        record: RecordRef<'_>,
        ts_out: bool,
        symbol: Option<&str>,
    ) -> Result<()> {
        // Outer `?` unwraps the dispatch result; the inner encoding result is returned.
        rtype_dispatch!(record, ts_out: ts_out, self.encode_record_with_sym(symbol).await)?
    }
}
381
#[cfg(test)]
mod test_data {
    //! Fixture values shared by several encoder tests.
    use crate::record::{BidAskPair, RecordHeader};

    /// Shared record header: `rtype` 4, publisher 1, instrument 323.
    pub const RECORD_HEADER: RecordHeader = RecordHeader {
        length: 30,
        rtype: 4,
        publisher_id: 1,
        instrument_id: 323,
        ts_event: 1_658_441_851_000_000_000,
    };

    /// Shared bid/ask level.
    pub const BID_ASK: BidAskPair = BidAskPair {
        bid_px: 372_000_000_000_000,
        ask_px: 372_500_000_000_000,
        bid_sz: 10,
        ask_sz: 5,
        bid_ct: 5,
        ask_ct: 2,
    };
}