Skip to main content

dbn/
decode.rs

1//! Decoding DBN and Zstd-compressed DBN files and streams.
2//!
3//! The primary entry point is [`DbnDecoder`], which reads DBN data from any
4//! [`io::Read`](std::io::Read) source (files, network streams, in-memory buffers).
5//! When the format or compression is unknown at compile time, use [`DynDecoder`]
6//! to auto-detect from the first few bytes.
7//!
8//! Sync decoders implement the [`DecodeDbn`] trait. With the `async` feature flag,
9//! async variants are also available.
10//!
11//! # Examples
12//!
13//! Decode a DBN file, dispatching on record type:
14//! ```no_run
15//! use dbn::decode::{DbnDecoder, DecodeRecordRef, DbnMetadata};
16//! use dbn::{TradeMsg, OhlcvMsg, Record};
17//!
18//! let mut decoder = DbnDecoder::from_zstd_file("20241007.dbn.zst")?;
19//! println!("schema: {:?}", decoder.metadata().schema);
20//!
21//! while let Some(rec_ref) = decoder.decode_record_ref()? {
22//!     if let Some(trade) = rec_ref.get::<TradeMsg>() {
23//!         println!("trade: instrument={} price={}", trade.hd.instrument_id, trade.price);
24//!     } else if let Some(bar) = rec_ref.get::<OhlcvMsg>() {
25//!         println!("bar: instrument={} close={}", bar.hd.instrument_id, bar.close);
26//!     }
27//! }
28//! # Ok::<(), dbn::Error>(())
29//! ```
30//!
31//! Decode all records of a known type into a `Vec`:
32//! ```no_run
33//! use dbn::decode::{DbnDecoder, DecodeRecord};
34//! use dbn::MboMsg;
35//!
36//! let decoder = DbnDecoder::from_zstd_file("20241007.mbo.dbn.zst")?;
37//! let records: Vec<MboMsg> = decoder.decode_records()?;
38//! println!("{} MBO records", records.len());
39//! # Ok::<(), dbn::Error>(())
40//! ```
41pub mod dbn;
42// Having any tests in a deprecated module emits many warnings that can't be silenced, see
43// https://github.com/rust-lang/rust/issues/47238
44#[cfg_attr(
45    not(test),
46    deprecated(
47        since = "0.3.0",
48        note = "DBZ was renamed to DBN and the format was changed to no longer rely on Zstd."
49    )
50)]
51pub mod dbz;
52mod dyn_decoder;
53mod dyn_reader;
54mod merge;
55mod stream;
56// used in databento_dbn
57#[doc(hidden)]
58pub mod zstd;
59
60// Re-exports
61pub use self::dbn::{
62    Decoder as DbnDecoder, MetadataDecoder as DbnMetadataDecoder, RecordDecoder as DbnRecordDecoder,
63};
64#[doc(inline)]
65pub use dyn_decoder::DynDecoder;
66#[doc(inline)]
67pub use dyn_reader::*;
68#[doc(inline)]
69pub use merge::{Decoder as MergeDecoder, RecordDecoder as MergeRecordDecoder};
70#[doc(inline)]
71pub use stream::StreamIterDecoder;
72
73use std::{io::Seek, mem};
74
75use crate::{HasRType, Metadata, RecordBuf, RecordRef, VersionUpgradePolicy};
76
77/// Trait for types that decode references to DBN records of a dynamic type.
78pub trait DecodeRecordRef {
79    /// Tries to decode a generic reference a record. Returns `Ok(None)` if input
80    /// has been exhausted.
81    ///
82    /// # Errors
83    /// This function returns an error if the underlying reader returns an error of a
84    /// kind other than `io::ErrorKind::UnexpectedEof` upon reading.
85    ///
86    /// If the `length` property of the record is invalid, an
87    /// [`Error::Decode`](crate::Error::Decode) will be returned.
88    fn decode_record_ref(&mut self) -> crate::Result<Option<RecordRef<'_>>>;
89
90    /// Returns an [`Iterator`] of owned [`RecordBuf`]s decoded from this reader.
91    ///
92    /// Each record is copied from the internal decode buffer into a `RecordBuf`.
93    /// For zero-copy access, use [`decode_record_ref()`](Self::decode_record_ref) instead.
94    fn decode_buf_iter(&mut self) -> impl Iterator<Item = crate::Result<RecordBuf>> + '_ {
95        std::iter::from_fn(move || match self.decode_record_ref() {
96            Ok(Some(rec_ref)) => Some(Ok(rec_ref.to_owned())),
97            Ok(None) => None,
98            Err(e) => Some(Err(e)),
99        })
100    }
101}
102
/// Trait for decoders with metadata about what's being decoded.
///
/// Gives shared and exclusive access to the [`Metadata`] held by the implementer.
pub trait DbnMetadata {
    /// Returns an immutable reference to the decoded [`Metadata`].
    fn metadata(&self) -> &Metadata;

    /// Returns a mutable reference to the decoded [`Metadata`], allowing the caller
    /// to modify it in place.
    fn metadata_mut(&mut self) -> &mut Metadata;
}
111
112/// Trait for types that decode DBN records of a particular type.
113pub trait DecodeRecord {
114    /// Tries to decode a reference to a single record of type `T`. Returns `Ok(None)`
115    /// if the input has been exhausted.
116    ///
117    /// # Errors
118    /// This function returns an error if the underlying reader returns an error of a
119    /// kind other than `io::ErrorKind::UnexpectedEof` upon reading.
120    ///
121    /// If the next record is of a different type than `T`, an
122    /// [`Error::Conversion`](crate::Error::Conversion) will be returned.
123    ///
124    /// If the `length` property of the record is invalid, an
125    /// [`Error::Decode`](crate::Error::Decode) will be returned.
126    fn decode_record<T: HasRType>(&mut self) -> crate::Result<Option<&T>>;
127
128    /// Tries to decode all records into a `Vec`. This eagerly decodes the data.
129    ///
130    /// # Errors
131    /// This function returns an error if the underlying reader returns an error of a
132    /// kind other than `io::ErrorKind::UnexpectedEof` upon reading.
133    ///
134    /// If any of the records is of a different type than `T`, an
135    /// [`Error::Conversion`](crate::Error::Conversion) will be returned.
136    ///
137    /// If the `length` property of any of the records is invalid, a
138    /// [`Error::Decode`](crate::Error::Decode) will be returned.
139    fn decode_records<T: HasRType + Clone>(mut self) -> crate::Result<Vec<T>>
140    where
141        Self: Sized,
142    {
143        let mut res = Vec::new();
144        while let Some(rec) = self.decode_record::<T>()? {
145            res.push(rec.clone());
146        }
147        Ok(res)
148    }
149}
150
/// A trait alias for DBN decoders with metadata.
///
/// It bundles [`DecodeRecord`], [`DecodeRecordRef`], and [`DbnMetadata`] as
/// supertraits and declares no methods of its own.
pub trait DecodeDbn: DecodeRecord + DecodeRecordRef + DbnMetadata {}
153
/// A trait for decoders that can be converted to streaming iterators.
///
/// Implementers must also implement [`DecodeRecord`] and the hidden
/// `private::LastRecord` trait.
pub trait DecodeStream: DecodeRecord + private::LastRecord {
    /// Converts the decoder into a streaming iterator of records of type `T`. This
    /// lazily decodes the data.
    ///
    /// The decoder is consumed and wrapped in the returned [`StreamIterDecoder`].
    fn decode_stream<T: HasRType>(self) -> StreamIterDecoder<Self, T>
    where
        Self: Sized;
}
/// Like [`Seek`], but only allows seeking forward from the current
/// position.
///
/// A blanket implementation in this module covers every type that implements
/// [`Seek`].
pub trait SkipBytes {
    /// Skips `n_bytes` ahead.
    ///
    /// # Errors
    /// This function returns an error if the I/O operations fail.
    fn skip_bytes(&mut self, n_bytes: usize) -> crate::Result<()>;
}
171
172impl<T> SkipBytes for T
173where
174    T: Seek,
175{
176    fn skip_bytes(&mut self, n_bytes: usize) -> crate::Result<()> {
177        self.seek(std::io::SeekFrom::Current(n_bytes as i64))
178            .map(drop)
179            .map_err(|err| crate::Error::io(err, format!("seeking ahead {n_bytes} bytes")))
180    }
181}
182
/// Async trait for types that decode references to DBN records of a dynamic type.
#[cfg(feature = "async")]
#[allow(async_fn_in_trait)] // the futures can't be Send because self is borrowed mutably
pub trait AsyncDecodeRecordRef {
    /// Tries to decode a generic reference to a record. Returns `Ok(None)` if input
    /// has been exhausted.
    ///
    /// # Errors
    /// This function returns an error if the underlying reader returns an error of a
    /// kind other than `io::ErrorKind::UnexpectedEof` upon reading.
    ///
    /// If the `length` property of the record is invalid, an
    /// [`Error::Decode`](crate::Error::Decode) will be returned.
    ///
    /// # Cancel safety
    /// This method is cancel safe. It can be used within a `tokio::select!` statement
    /// without the potential for corrupting the input stream.
    async fn decode_record_ref(&mut self) -> crate::Result<Option<RecordRef<'_>>>;

    /// Returns a [`Stream`](futures_core::Stream) of owned [`RecordBuf`]s decoded
    /// from this reader.
    ///
    /// Each record is copied from the internal decode buffer into a `RecordBuf`.
    /// For zero-copy access, use [`decode_record_ref()`](Self::decode_record_ref) instead.
    ///
    /// The stream yields one item per decoded record and finishes when
    /// `decode_record_ref()` returns `Ok(None)`.
    fn decode_stream(&mut self) -> impl futures_core::Stream<Item = crate::Result<RecordBuf>> + '_ {
        async_stream::try_stream! {
            // An error from `decode_record_ref()` is propagated with `?`.
            while let Some(rec_ref) = self.decode_record_ref().await? {
                // Copy the record out of the decoder before yielding it.
                yield rec_ref.to_owned();
            }
        }
    }
}
215
/// Async trait for types that decode DBN records of a particular type.
#[cfg(feature = "async")]
#[allow(async_fn_in_trait)] // the futures can't be Send because self is borrowed mutably
pub trait AsyncDecodeRecord {
    /// Tries to decode a reference to a single record of type `T`. Returns `Ok(None)`
    /// once the input has been exhausted.
    ///
    /// # Errors
    /// This function returns an error if the underlying reader returns an error of a
    /// kind other than `io::ErrorKind::UnexpectedEof` upon reading.
    ///
    /// If the next record is of a different type than `T`, an
    /// [`Error::Conversion`](crate::Error::Conversion) will be returned.
    ///
    /// If the `length` property of the record is invalid, an
    /// [`Error::Decode`](crate::Error::Decode) will be returned.
    ///
    /// # Cancel safety
    /// This method is cancel safe. It can be used within a `tokio::select!` statement
    /// without the potential for corrupting the input stream.
    async fn decode_record<'a, T: HasRType + 'a>(&'a mut self) -> crate::Result<Option<&'a T>>;

    /// Eagerly decodes every remaining record into a `Vec`.
    ///
    /// Records are cloned out of the decoder one at a time until
    /// [`decode_record()`](Self::decode_record) returns `Ok(None)`. The first error
    /// aborts decoding and is returned; records decoded before it are discarded.
    ///
    /// # Errors
    /// This function returns an error if the underlying reader returns an error of a
    /// kind other than `io::ErrorKind::UnexpectedEof` upon reading.
    ///
    /// If any of the records is of a different type than `T`, an
    /// [`Error::Conversion`](crate::Error::Conversion) will be returned.
    ///
    /// If the `length` property of any of the records is invalid, an
    /// [`Error::Decode`](crate::Error::Decode) will be returned.
    ///
    /// # Cancel safety
    /// This method is not cancellation safe. If used within a `tokio::select!` statement
    /// partially decoded records will be lost and the stream may be corrupted.
    async fn decode_records<T: HasRType + Clone>(&mut self) -> crate::Result<Vec<T>>
    where
        Self: Sized,
    {
        let mut decoded = Vec::new();
        loop {
            match self.decode_record::<T>().await? {
                // The reference borrows from the decoder, so an owned copy is stored.
                Some(rec) => decoded.push(rec.clone()),
                None => return Ok(decoded),
            }
        }
    }
}
264
/// Like [`AsyncSeek`](tokio::io::AsyncSeek), but only allows seeking forward from the current position.
///
/// This is the async counterpart of [`SkipBytes`].
#[cfg(feature = "async")]
#[allow(async_fn_in_trait)] // the futures can't be Send because self is borrowed mutably
pub trait AsyncSkipBytes {
    /// Skips ahead `n_bytes` bytes.
    ///
    /// # Errors
    /// This function returns an error if the I/O operations fail.
    async fn skip_bytes(&mut self, n_bytes: usize) -> crate::Result<()>;
}
275
// 1 MiB (`1 << 20` bytes). Only compiled with the `async` feature.
// NOTE(review): presumably the buffer capacity used when reading Zstd-compressed
// files in the async decoders; confirm at the use sites.
#[cfg(feature = "async")]
const ZSTD_FILE_BUFFER_CAPACITY: usize = 1 << 20;
278
// Public so that `LastRecord` can appear in the bounds of the public
// `DecodeStream` trait, but hidden from the generated documentation.
#[doc(hidden)]
pub mod private {
    use crate::RecordRef;

    /// An implementation detail for the interaction between `StreamingIterator` and
    /// implementers of [`DecodeRecord`](super::DecodeRecord).
    ///
    /// It is a supertrait of `DecodeStream`.
    #[doc(hidden)]
    pub trait LastRecord {
        /// Returns a reference to the last record, or `None` if there isn't one.
        // NOTE(review): presumably "last" means the most recently decoded record;
        // confirm against the implementers.
        fn last_record(&self) -> Option<RecordRef<'_>>;
    }
}
290
/// Crate-internal helper for constructing a value from the leading bytes of a
/// little-endian byte slice.
///
/// The integer implementations in this module read the first
/// `size_of::<Self>()` bytes of the slice and panic if it is shorter than that.
pub(crate) trait FromLittleEndianSlice {
    /// Builds `Self` from the start of `slice`, interpreting the bytes as
    /// little-endian.
    fn from_le_slice(slice: &[u8]) -> Self;
}
294
295impl FromLittleEndianSlice for u64 {
296    /// # Panics
297    /// Panics if the length of `slice` is less than 8 bytes.
298    fn from_le_slice(slice: &[u8]) -> Self {
299        let (bytes, _) = slice.split_at(mem::size_of::<Self>());
300        Self::from_le_bytes(bytes.try_into().unwrap())
301    }
302}
303
304impl FromLittleEndianSlice for i32 {
305    /// # Panics
306    /// Panics if the length of `slice` is less than 4 bytes.
307    fn from_le_slice(slice: &[u8]) -> Self {
308        let (bytes, _) = slice.split_at(mem::size_of::<Self>());
309        Self::from_le_bytes(bytes.try_into().unwrap())
310    }
311}
312
313impl FromLittleEndianSlice for u32 {
314    /// # Panics
315    /// Panics if the length of `slice` is less than 4 bytes.
316    fn from_le_slice(slice: &[u8]) -> Self {
317        let (bytes, _) = slice.split_at(mem::size_of::<Self>());
318        Self::from_le_bytes(bytes.try_into().unwrap())
319    }
320}
321
322impl FromLittleEndianSlice for u16 {
323    /// # Panics
324    /// Panics if the length of `slice` is less than 2 bytes.
325    fn from_le_slice(slice: &[u8]) -> Self {
326        let (bytes, _) = slice.split_at(mem::size_of::<Self>());
327        Self::from_le_bytes(bytes.try_into().unwrap())
328    }
329}
330
331#[cfg(feature = "async")]
332pub use self::dbn::{
333    AsyncDecoder as AsyncDbnDecoder, AsyncMetadataDecoder as AsyncDbnMetadataDecoder,
334    AsyncRecordDecoder as AsyncDbnRecordDecoder,
335};
336
// Shared constants for unit tests; only compiled for test builds.
#[cfg(test)]
mod tests {
    /// Path of the test data directory, built at compile time as
    /// `$CARGO_MANIFEST_DIR/../../tests/data`, i.e. two levels above this crate's
    /// manifest directory.
    pub const TEST_DATA_PATH: &str = concat!(env!("CARGO_MANIFEST_DIR"), "/../../tests/data");
}