dbn/encode.rs
1//! Encoding DBN and Zstd-compressed DBN files and streams. Encoders implement the
2//! [`EncodeDbn`] trait.
3pub mod csv;
4pub mod dbn;
5mod dyn_encoder;
6mod dyn_writer;
7pub mod json;
8mod split;
9
10use std::{fmt, io, num::NonZeroU64};
11
12use fallible_streaming_iterator::FallibleStreamingIterator;
13
14// Re-exports
15pub use self::{
16 csv::Encoder as CsvEncoder,
17 dbn::{
18 Encoder as DbnEncoder, MetadataEncoder as DbnMetadataEncoder,
19 RecordEncoder as DbnRecordEncoder,
20 },
21 json::Encoder as JsonEncoder,
22 split::{
23 NoSchemaBehavior, SchemaSplitter, SplitDuration, SplitEncoder, Splitter, SymbolSplitter,
24 TimeSplitter,
25 },
26};
27#[cfg(feature = "async")]
28pub use self::{
29 dbn::{
30 AsyncEncoder as AsyncDbnEncoder, AsyncMetadataEncoder as AsyncDbnMetadataEncoder,
31 AsyncRecordEncoder as AsyncDbnRecordEncoder,
32 },
33 json::AsyncEncoder as AsyncJsonEncoder,
34};
35#[doc(inline)]
36pub use self::{
37 dyn_encoder::{DynEncoder, DynEncoderBuilder},
38 dyn_writer::DynWriter,
39};
40
41#[cfg(feature = "async")]
42#[doc(inline)]
43pub use self::dyn_writer::{DynAsyncBufWriter, DynAsyncWriter};
44
45use crate::{
46 decode::{DbnMetadata, DecodeRecordRef},
47 rtype_dispatch, Error, Record, RecordRef, Result,
48};
49
50use self::{csv::serialize::CsvSerialize, json::serialize::JsonSerialize};
51
52/// Trait alias for [`Record`], `CsvSerialize`, [`fmt::Debug`], and `JsonSerialize`.
53pub trait DbnEncodable: Record + CsvSerialize + fmt::Debug + JsonSerialize {}
54impl<T> DbnEncodable for T where T: Record + CsvSerialize + fmt::Debug + JsonSerialize {}
55
56/// Trait for types that encode a DBN record of a specific type.
57pub trait EncodeRecord {
58 /// Encodes a single DBN record of type `R`.
59 ///
60 /// # Errors
61 /// This function returns an error if it's unable to write to the underlying writer
62 /// or there's a serialization error.
63 fn encode_record<R: DbnEncodable>(&mut self, record: &R) -> Result<()>;
64
65 /// Flushes any buffered content to the true output.
66 ///
67 /// # Errors
68 /// This function returns an error if it's unable to flush the underlying writer.
69 fn flush(&mut self) -> Result<()>;
70}
71
72/// Trait for types that encode DBN records with mixed schemas.
73pub trait EncodeRecordRef {
74 /// Encodes a single DBN [`RecordRef`].
75 ///
76 /// # Errors
77 /// This function returns an error if it's unable to write to the underlying writer
78 /// or there's a serialization error.
79 fn encode_record_ref(&mut self, record: RecordRef) -> Result<()>;
80
81 /// Encodes a single DBN [`RecordRef`] with an optional `ts_out` (see
82 /// [`record::WithTsOut`](crate::record::WithTsOut)).
83 ///
84 /// # Safety
85 /// `ts_out` must be `false` if `record` does not have an appended `ts_out`.
86 ///
87 /// # Errors
88 /// This function returns an error if it's unable to write to the underlying writer
89 /// or there's a serialization error.
90 unsafe fn encode_record_ref_ts_out(&mut self, record: RecordRef, ts_out: bool) -> Result<()>;
91}
92
93/// Trait for types that encode DBN records with a specific record type.
94pub trait EncodeDbn: EncodeRecord + EncodeRecordRef {
95 /// Encodes a slice of DBN records.
96 ///
97 /// # Errors
98 /// This function returns an error if it's unable to write to the underlying writer
99 /// or there's a serialization error.
100 fn encode_records<R: DbnEncodable>(&mut self, records: &[R]) -> Result<()> {
101 for record in records {
102 self.encode_record(record)?;
103 }
104 self.flush()?;
105 Ok(())
106 }
107
108 /// Encodes a stream of DBN records.
109 ///
110 /// # Errors
111 /// This function returns an error if it's unable to write to the underlying writer
112 /// or there's a serialization error.
113 fn encode_stream<R: DbnEncodable>(
114 &mut self,
115 mut stream: impl FallibleStreamingIterator<Item = R, Error = Error>,
116 ) -> Result<()> {
117 while let Some(record) = stream.next()? {
118 self.encode_record(record)?;
119 }
120 self.flush()?;
121 Ok(())
122 }
123
124 /// Encodes DBN records directly from a DBN decoder.
125 ///
126 /// # Errors
127 /// This function returns an error if it's unable to write to the underlying writer
128 /// or there's a serialization error.
129 fn encode_decoded<D: DecodeRecordRef + DbnMetadata>(&mut self, mut decoder: D) -> Result<()> {
130 let ts_out = decoder.metadata().ts_out;
131 while let Some(record) = decoder.decode_record_ref()? {
132 // Safety: It's safe to cast to `WithTsOut` because we're passing in the `ts_out`
133 // from the metadata header.
134 unsafe { self.encode_record_ref_ts_out(record, ts_out) }?;
135 }
136 self.flush()?;
137 Ok(())
138 }
139
140 /// Encodes DBN records directly from a DBN decoder, outputting no more than
141 /// `limit` records.
142 ///
143 /// # Errors
144 /// This function returns an error if it's unable to write to the underlying writer
145 /// or there's a serialization error.
146 fn encode_decoded_with_limit<D: DecodeRecordRef + DbnMetadata>(
147 &mut self,
148 mut decoder: D,
149 limit: NonZeroU64,
150 ) -> Result<()> {
151 let ts_out = decoder.metadata().ts_out;
152 let mut i = 0;
153 while let Some(record) = decoder.decode_record_ref()? {
154 // Safety: It's safe to cast to `WithTsOut` because we're passing in the `ts_out`
155 // from the metadata header.
156 unsafe { self.encode_record_ref_ts_out(record, ts_out) }?;
157 i += 1;
158 if i == limit.get() {
159 break;
160 }
161 }
162 self.flush()?;
163 Ok(())
164 }
165}
166
167/// Extension trait for text encodings.
168pub trait EncodeRecordTextExt: EncodeRecord + EncodeRecordRef {
169 /// Encodes a single DBN record of type `R` along with the record's text symbol.
170 ///
171 /// # Errors
172 /// This function returns an error if it's unable to write to the underlying writer
173 /// or there's a serialization error.
174 fn encode_record_with_sym<R: DbnEncodable>(
175 &mut self,
176 record: &R,
177 symbol: Option<&str>,
178 ) -> Result<()>;
179
180 /// Encodes a single DBN [`RecordRef`] along with the record's text symbol.
181 ///
182 /// # Errors
183 /// This function returns an error if it's unable to write to the underlying writer
184 /// or there's a serialization error.
185 fn encode_ref_with_sym(&mut self, record: RecordRef, symbol: Option<&str>) -> Result<()> {
186 rtype_dispatch!(record, self.encode_record_with_sym(symbol))?
187 }
188
189 /// Encodes a single DBN [`RecordRef`] with an optional `ts_out` (see
190 /// [`record::WithTsOut`](crate::record::WithTsOut)) along with the record's text
191 /// symbol.
192 ///
193 /// # Safety
194 /// `ts_out` must be `false` if `record` does not have an appended `ts_out`.
195 ///
196 /// # Errors
197 /// This function returns an error if it's unable to write to the underlying writer
198 /// or there's a serialization error.
199 unsafe fn encode_ref_ts_out_with_sym(
200 &mut self,
201 record: RecordRef,
202 ts_out: bool,
203 symbol: Option<&str>,
204 ) -> Result<()> {
205 rtype_dispatch!(record, ts_out: ts_out, self.encode_record_with_sym(symbol))?
206 }
207}
208
209/// The default Zstandard compression level used.
210pub const ZSTD_COMPRESSION_LEVEL: i32 = 0;
211
212fn zstd_encoder<'a, W: io::Write>(writer: W) -> Result<zstd::stream::AutoFinishEncoder<'a, W>> {
213 zstd_encoder_with_clevel(writer, ZSTD_COMPRESSION_LEVEL)
214}
215
216fn zstd_encoder_with_clevel<'a, W: io::Write>(
217 writer: W,
218 level: i32,
219) -> Result<zstd::stream::AutoFinishEncoder<'a, W>> {
220 let mut zstd_encoder =
221 zstd::Encoder::new(writer, level).map_err(|e| Error::io(e, "creating zstd encoder"))?;
222 zstd_encoder
223 .include_checksum(true)
224 .map_err(|e| Error::io(e, "setting zstd checksum"))?;
225 Ok(zstd_encoder.auto_finish())
226}
227
/// Wraps `writer` in an async Zstandard encoder configured with
/// [`ZSTD_COMPRESSION_LEVEL`].
#[cfg(feature = "async")]
fn async_zstd_encoder<W>(writer: W) -> async_compression::tokio::write::ZstdEncoder<W>
where
    W: tokio::io::AsyncWriteExt + Unpin,
{
    async_zstd_encoder_with_clevel(writer, ZSTD_COMPRESSION_LEVEL)
}
234
/// Wraps `writer` in an async Zstandard encoder using exactly the given
/// compression `level`, with the checksum flag enabled.
#[cfg(feature = "async")]
fn async_zstd_encoder_with_clevel<W>(
    writer: W,
    level: i32,
) -> async_compression::tokio::write::ZstdEncoder<W>
where
    W: tokio::io::AsyncWriteExt + Unpin,
{
    use async_compression::{tokio::write::ZstdEncoder, zstd::CParameter, Level};

    let quality = Level::Precise(level);
    let params = [CParameter::checksum_flag(true)];
    ZstdEncoder::with_quality_and_params(writer, quality, &params)
}
246
/// Trait for asynchronously encoding DBN records whose concrete type is known at
/// compile time.
#[cfg(feature = "async")]
#[allow(async_fn_in_trait)] // the futures can't be Send because self is borrowed mutably
pub trait AsyncEncodeRecord {
    /// Encodes one DBN record of type `R`.
    ///
    /// # Errors
    /// Returns an error if writing to the underlying writer fails or if the record
    /// cannot be serialized.
    ///
    /// # Cancel safety
    /// This method is not cancellation safe. When it is used as a branch of
    /// `tokio::select!` and a different branch completes first, part of the record
    /// may already have been written, yet the next call starts writing the encoded
    /// record from the beginning again.
    async fn encode_record<R>(&mut self, record: &R) -> Result<()>
    where
        R: DbnEncodable;

    /// Flushes any buffered content through to the true output.
    ///
    /// # Errors
    /// Returns an error if the underlying writer cannot be flushed.
    async fn flush(&mut self) -> Result<()>;

    /// Initiates or attempts to shut down the inner writer.
    ///
    /// # Errors
    /// Returns an error if the shutdown did not complete successfully.
    async fn shutdown(&mut self) -> Result<()>;
}
276
/// Trait for asynchronously encoding DBN records passed as type-erased
/// [`RecordRef`]s.
#[cfg(feature = "async")]
#[allow(async_fn_in_trait)] // the futures can't be Send because self is borrowed mutably
pub trait AsyncEncodeRecordRef {
    /// Encodes one [`RecordRef`].
    ///
    /// # Errors
    /// Returns an error if writing to the underlying writer fails or if the record
    /// cannot be serialized.
    ///
    /// # Cancel safety
    /// This method is not cancellation safe. When it is used as a branch of
    /// `tokio::select!` and a different branch completes first, part of the record
    /// may already have been written, yet the next call starts writing the encoded
    /// record from the beginning again.
    async fn encode_record_ref(&mut self, record_ref: RecordRef<'_>) -> Result<()>;

    /// Encodes one DBN [`RecordRef`] that may carry an appended `ts_out` (see
    /// [`record::WithTsOut`](crate::record::WithTsOut)).
    ///
    /// # Safety
    /// The caller must pass `ts_out == false` whenever `record_ref` does not have
    /// an appended `ts_out`.
    ///
    /// # Errors
    /// Returns an error if writing to the underlying writer fails or if the record
    /// cannot be serialized.
    ///
    /// # Cancel safety
    /// This method is not cancellation safe. When it is used as a branch of
    /// `tokio::select!` and a different branch completes first, part of the record
    /// may already have been written, yet the next call starts writing the encoded
    /// record from the beginning again.
    async unsafe fn encode_record_ref_ts_out(
        &mut self,
        record_ref: RecordRef<'_>,
        ts_out: bool,
    ) -> Result<()>;
}
315
/// Async extension trait for text encodings, adding the ability to write a record
/// together with its text symbol.
#[cfg(feature = "async")]
#[allow(async_fn_in_trait)] // the futures can't be Send because self is borrowed mutably
pub trait AsyncEncodeRecordTextExt: AsyncEncodeRecord + AsyncEncodeRecordRef {
    /// Encodes one DBN record of type `R` together with the record's text symbol.
    ///
    /// # Errors
    /// Returns an error if writing to the underlying writer fails or if the record
    /// cannot be serialized.
    ///
    /// # Cancel safety
    /// This method is not cancellation safe. When it is used as a branch of
    /// `tokio::select!` and a different branch completes first, part of the record
    /// may already have been written, yet the next call starts writing the encoded
    /// record from the beginning again.
    async fn encode_record_with_sym<R>(&mut self, record: &R, symbol: Option<&str>) -> Result<()>
    where
        R: DbnEncodable;

    /// Encodes one DBN [`RecordRef`] together with the record's text symbol.
    ///
    /// The default implementation dispatches on the record's rtype and forwards to
    /// [`encode_record_with_sym`](Self::encode_record_with_sym).
    ///
    /// # Errors
    /// Returns an error if writing to the underlying writer fails or if the record
    /// cannot be serialized.
    ///
    /// # Cancel safety
    /// This method is not cancellation safe. When it is used as a branch of
    /// `tokio::select!` and a different branch completes first, part of the record
    /// may already have been written, yet the next call starts writing the encoded
    /// record from the beginning again.
    async fn encode_ref_with_sym(
        &mut self,
        record: RecordRef<'_>,
        symbol: Option<&str>,
    ) -> Result<()> {
        // The dispatch macro wraps the handler's own `Result` in another `Result`;
        // `?` surfaces a dispatch failure and leaves the handler's result as the
        // return value.
        let dispatched = rtype_dispatch!(record, self.encode_record_with_sym(symbol).await);
        dispatched?
    }

    /// Encodes one DBN [`RecordRef`] that may carry an appended `ts_out` (see
    /// [`record::WithTsOut`](crate::record::WithTsOut)) together with the record's
    /// text symbol.
    ///
    /// # Safety
    /// The caller must pass `ts_out == false` whenever `record` does not have an
    /// appended `ts_out`.
    ///
    /// # Errors
    /// Returns an error if writing to the underlying writer fails or if the record
    /// cannot be serialized.
    ///
    /// # Cancel safety
    /// This method is not cancellation safe. When it is used as a branch of
    /// `tokio::select!` and a different branch completes first, part of the record
    /// may already have been written, yet the next call starts writing the encoded
    /// record from the beginning again.
    async unsafe fn encode_ref_ts_out_with_sym(
        &mut self,
        record: RecordRef<'_>,
        ts_out: bool,
        symbol: Option<&str>,
    ) -> Result<()> {
        // Same two-level `Result` as the variant without `ts_out`.
        let dispatched =
            rtype_dispatch!(record, ts_out: ts_out, self.encode_record_with_sym(symbol).await);
        dispatched?
    }
}
381
#[cfg(test)]
mod test_data {
    // Fixture values shared by multiple tests.
    use crate::record::{BidAskPair, RecordHeader};

    /// Record header reused across tests.
    pub const RECORD_HEADER: RecordHeader = RecordHeader {
        length: 30,
        rtype: 4,
        publisher_id: 1,
        instrument_id: 323,
        ts_event: 1_658_441_851_000_000_000,
    };

    /// Bid/ask level reused across tests.
    pub const BID_ASK: BidAskPair = BidAskPair {
        bid_px: 372_000_000_000_000,
        ask_px: 372_500_000_000_000,
        bid_sz: 10,
        ask_sz: 5,
        bid_ct: 5,
        ask_ct: 2,
    };
}