dbn/decode.rs
1//! Decoding DBN and Zstd-compressed DBN files and streams.
2//!
3//! The primary entry point is [`DbnDecoder`], which reads DBN data from any
4//! [`io::Read`](std::io::Read) source (files, network streams, in-memory buffers).
5//! When the format or compression is unknown at compile time, use [`DynDecoder`]
6//! to auto-detect from the first few bytes.
7//!
8//! Sync decoders implement the [`DecodeDbn`] trait. With the `async` feature flag,
9//! async variants are also available.
10//!
11//! # Examples
12//!
13//! Decode a DBN file, dispatching on record type:
14//! ```no_run
15//! use dbn::decode::{DbnDecoder, DecodeRecordRef, DbnMetadata};
16//! use dbn::{TradeMsg, OhlcvMsg, Record};
17//!
18//! let mut decoder = DbnDecoder::from_zstd_file("20241007.dbn.zst")?;
19//! println!("schema: {:?}", decoder.metadata().schema);
20//!
21//! while let Some(rec_ref) = decoder.decode_record_ref()? {
22//! if let Some(trade) = rec_ref.get::<TradeMsg>() {
23//! println!("trade: instrument={} price={}", trade.hd.instrument_id, trade.price);
24//! } else if let Some(bar) = rec_ref.get::<OhlcvMsg>() {
25//! println!("bar: instrument={} close={}", bar.hd.instrument_id, bar.close);
26//! }
27//! }
28//! # Ok::<(), dbn::Error>(())
29//! ```
30//!
31//! Decode all records of a known type into a `Vec`:
32//! ```no_run
33//! use dbn::decode::{DbnDecoder, DecodeRecord};
34//! use dbn::MboMsg;
35//!
36//! let decoder = DbnDecoder::from_zstd_file("20241007.mbo.dbn.zst")?;
37//! let records: Vec<MboMsg> = decoder.decode_records()?;
38//! println!("{} MBO records", records.len());
39//! # Ok::<(), dbn::Error>(())
40//! ```
41pub mod dbn;
42// Having any tests in a deprecated module emits many warnings that can't be silenced, see
43// https://github.com/rust-lang/rust/issues/47238
44#[cfg_attr(
45 not(test),
46 deprecated(
47 since = "0.3.0",
48 note = "DBZ was renamed to DBN and the format was changed to no longer rely on Zstd."
49 )
50)]
51pub mod dbz;
52mod dyn_decoder;
53mod dyn_reader;
54mod merge;
55mod stream;
56// used in databento_dbn
57#[doc(hidden)]
58pub mod zstd;
59
60// Re-exports
61pub use self::dbn::{
62 Decoder as DbnDecoder, MetadataDecoder as DbnMetadataDecoder, RecordDecoder as DbnRecordDecoder,
63};
64#[doc(inline)]
65pub use dyn_decoder::DynDecoder;
66#[doc(inline)]
67pub use dyn_reader::*;
68#[doc(inline)]
69pub use merge::{Decoder as MergeDecoder, RecordDecoder as MergeRecordDecoder};
70#[doc(inline)]
71pub use stream::StreamIterDecoder;
72
73use std::{io::Seek, mem};
74
75use crate::{HasRType, Metadata, RecordBuf, RecordRef, VersionUpgradePolicy};
76
77/// Trait for types that decode references to DBN records of a dynamic type.
78pub trait DecodeRecordRef {
79 /// Tries to decode a generic reference a record. Returns `Ok(None)` if input
80 /// has been exhausted.
81 ///
82 /// # Errors
83 /// This function returns an error if the underlying reader returns an error of a
84 /// kind other than `io::ErrorKind::UnexpectedEof` upon reading.
85 ///
86 /// If the `length` property of the record is invalid, an
87 /// [`Error::Decode`](crate::Error::Decode) will be returned.
88 fn decode_record_ref(&mut self) -> crate::Result<Option<RecordRef<'_>>>;
89
90 /// Returns an [`Iterator`] of owned [`RecordBuf`]s decoded from this reader.
91 ///
92 /// Each record is copied from the internal decode buffer into a `RecordBuf`.
93 /// For zero-copy access, use [`decode_record_ref()`](Self::decode_record_ref) instead.
94 fn decode_buf_iter(&mut self) -> impl Iterator<Item = crate::Result<RecordBuf>> + '_ {
95 std::iter::from_fn(move || match self.decode_record_ref() {
96 Ok(Some(rec_ref)) => Some(Ok(rec_ref.to_owned())),
97 Ok(None) => None,
98 Err(e) => Some(Err(e)),
99 })
100 }
101}
102
103/// Trait for decoders with metadata about what's being decoded.
104pub trait DbnMetadata {
105 /// Returns an immutable reference to the decoded [`Metadata`].
106 fn metadata(&self) -> &Metadata;
107
108 /// Returns a mutable reference to the decoded [`Metadata`].
109 fn metadata_mut(&mut self) -> &mut Metadata;
110}
111
112/// Trait for types that decode DBN records of a particular type.
113pub trait DecodeRecord {
114 /// Tries to decode a reference to a single record of type `T`. Returns `Ok(None)`
115 /// if the input has been exhausted.
116 ///
117 /// # Errors
118 /// This function returns an error if the underlying reader returns an error of a
119 /// kind other than `io::ErrorKind::UnexpectedEof` upon reading.
120 ///
121 /// If the next record is of a different type than `T`, an
122 /// [`Error::Conversion`](crate::Error::Conversion) will be returned.
123 ///
124 /// If the `length` property of the record is invalid, an
125 /// [`Error::Decode`](crate::Error::Decode) will be returned.
126 fn decode_record<T: HasRType>(&mut self) -> crate::Result<Option<&T>>;
127
128 /// Tries to decode all records into a `Vec`. This eagerly decodes the data.
129 ///
130 /// # Errors
131 /// This function returns an error if the underlying reader returns an error of a
132 /// kind other than `io::ErrorKind::UnexpectedEof` upon reading.
133 ///
134 /// If any of the records is of a different type than `T`, an
135 /// [`Error::Conversion`](crate::Error::Conversion) will be returned.
136 ///
137 /// If the `length` property of any of the records is invalid, a
138 /// [`Error::Decode`](crate::Error::Decode) will be returned.
139 fn decode_records<T: HasRType + Clone>(mut self) -> crate::Result<Vec<T>>
140 where
141 Self: Sized,
142 {
143 let mut res = Vec::new();
144 while let Some(rec) = self.decode_record::<T>()? {
145 res.push(rec.clone());
146 }
147 Ok(res)
148 }
149}
150
151/// A trait alias for DBN decoders with metadata.
152pub trait DecodeDbn: DecodeRecord + DecodeRecordRef + DbnMetadata {}
153
154/// A trait for decoders that can be converted to streaming iterators.
155pub trait DecodeStream: DecodeRecord + private::LastRecord {
156 /// Converts the decoder into a streaming iterator of records of type `T`. This
157 /// lazily decodes the data.
158 fn decode_stream<T: HasRType>(self) -> StreamIterDecoder<Self, T>
159 where
160 Self: Sized;
161}
162/// Like [`Seek`], but only allows seeking forward from the current
163/// position.
164pub trait SkipBytes {
165 /// Skips `n_bytes` ahead.
166 ///
167 /// # Errors
168 /// This function returns an error if the I/O operations fail.
169 fn skip_bytes(&mut self, n_bytes: usize) -> crate::Result<()>;
170}
171
172impl<T> SkipBytes for T
173where
174 T: Seek,
175{
176 fn skip_bytes(&mut self, n_bytes: usize) -> crate::Result<()> {
177 self.seek(std::io::SeekFrom::Current(n_bytes as i64))
178 .map(drop)
179 .map_err(|err| crate::Error::io(err, format!("seeking ahead {n_bytes} bytes")))
180 }
181}
182
/// Async counterpart of a decoder that yields type-erased references to DBN records.
#[cfg(feature = "async")]
#[allow(async_fn_in_trait)] // the futures can't be Send because self is borrowed mutably
pub trait AsyncDecodeRecordRef {
    /// Attempts to decode the next record as a generic [`RecordRef`]. Yields
    /// `Ok(None)` once the input has been exhausted.
    ///
    /// # Errors
    /// This function returns an error if the underlying reader returns an error of a
    /// kind other than `io::ErrorKind::UnexpectedEof` upon reading.
    ///
    /// If the `length` property of the record is invalid, an
    /// [`Error::Decode`](crate::Error::Decode) will be returned.
    ///
    /// # Cancel safety
    /// This method is cancel safe. It can be used within a `tokio::select!` statement
    /// without the potential for corrupting the input stream.
    async fn decode_record_ref(&mut self) -> crate::Result<Option<RecordRef<'_>>>;

    /// Returns a [`Stream`](futures_core::Stream) over owned [`RecordBuf`]s decoded
    /// from this reader.
    ///
    /// Every item is produced by awaiting
    /// [`decode_record_ref()`](Self::decode_record_ref) and converting the borrowed
    /// record into an owned `RecordBuf`. Use `decode_record_ref()` directly to avoid
    /// that copy.
    fn decode_stream(&mut self) -> impl futures_core::Stream<Item = crate::Result<RecordBuf>> + '_ {
        async_stream::try_stream! {
            while let Some(rec_ref) = self.decode_record_ref().await? {
                // Detach the record from the decoder's buffer before handing it out.
                let owned = rec_ref.to_owned();
                yield owned;
            }
        }
    }
}
215
/// Async counterpart of a decoder that yields DBN records of a statically known type.
#[cfg(feature = "async")]
#[allow(async_fn_in_trait)] // the futures can't be Send because self is borrowed mutably
pub trait AsyncDecodeRecord {
    /// Attempts to decode the next record as a reference to a `T`. Yields `Ok(None)`
    /// once the input has been exhausted.
    ///
    /// # Errors
    /// This function returns an error if the underlying reader returns an error of a
    /// kind other than `io::ErrorKind::UnexpectedEof` upon reading.
    ///
    /// If the next record is of a different type than `T`, an
    /// [`Error::Conversion`](crate::Error::Conversion) will be returned.
    ///
    /// If the `length` property of the record is invalid, an
    /// [`Error::Decode`](crate::Error::Decode) will be returned.
    ///
    /// # Cancel safety
    /// This method is cancel safe. It can be used within a `tokio::select!` statement
    /// without the potential for corrupting the input stream.
    async fn decode_record<'a, T: HasRType + 'a>(&'a mut self) -> crate::Result<Option<&'a T>>;

    /// Eagerly decodes every remaining record and collects clones of them into a
    /// `Vec`.
    ///
    /// # Errors
    /// This function returns an error if the underlying reader returns an error of a
    /// kind other than `io::ErrorKind::UnexpectedEof` upon reading.
    ///
    /// If any of the records is of a different type than `T`, an
    /// [`Error::Conversion`](crate::Error::Conversion) will be returned.
    ///
    /// If the `length` property of any of the records is invalid, an
    /// [`Error::Decode`](crate::Error::Decode) will be returned.
    ///
    /// # Cancel safety
    /// This method is not cancellation safe. If used within a `tokio::select!` statement
    /// partially decoded records will be lost and the stream may be corrupted.
    async fn decode_records<T: HasRType + Clone>(&mut self) -> crate::Result<Vec<T>>
    where
        Self: Sized,
    {
        let mut records = Vec::new();
        loop {
            // The first error aborts decoding; `None` marks the end of the input.
            match self.decode_record::<T>().await? {
                Some(rec) => records.push(rec.clone()),
                None => return Ok(records),
            }
        }
    }
}
264
/// A restricted form of [`AsyncSeek`](tokio::io::AsyncSeek) that can only move
/// forward relative to the current position.
#[cfg(feature = "async")]
#[allow(async_fn_in_trait)] // the futures can't be Send because self is borrowed mutably
pub trait AsyncSkipBytes {
    /// Advances the current position by `n_bytes`.
    ///
    /// # Errors
    /// This function returns an error if the I/O operations fail.
    async fn skip_bytes(&mut self, n_bytes: usize) -> crate::Result<()>;
}
275
/// A capacity of 1 MiB (2^20 bytes). Only compiled with the `async` feature.
// NOTE(review): going by the name, this sizes the buffer used when reading Zstd
// files; the use sites are outside this file, so confirm there.
#[cfg(feature = "async")]
const ZSTD_FILE_BUFFER_CAPACITY: usize = 1024 * 1024;
278
279#[doc(hidden)]
280pub mod private {
281 use crate::RecordRef;
282
283 /// An implementation detail for the interaction between [`StreamingIterator`] and
284 /// implementers of [`DecodeRecord`].
285 #[doc(hidden)]
286 pub trait LastRecord {
287 fn last_record(&self) -> Option<RecordRef<'_>>;
288 }
289}
290
/// Construction of a fixed-width integer from the leading bytes of a
/// little-endian byte slice.
pub(crate) trait FromLittleEndianSlice {
    /// Interprets the first `size_of::<Self>()` bytes of `slice` as a little-endian
    /// value. Any trailing bytes are ignored.
    fn from_le_slice(slice: &[u8]) -> Self;
}

/// Implements [`FromLittleEndianSlice`] for each listed integer type.
macro_rules! impl_from_le_slice {
    ($($int:ty),+ $(,)?) => {
        $(
            impl FromLittleEndianSlice for $int {
                /// # Panics
                /// Panics if the length of `slice` is less than the size of `Self`
                /// in bytes.
                fn from_le_slice(slice: &[u8]) -> Self {
                    // `split_at` panics when `slice` is too short.
                    let (head, _rest) = slice.split_at(mem::size_of::<$int>());
                    let mut raw = [0u8; mem::size_of::<$int>()];
                    raw.copy_from_slice(head);
                    <$int>::from_le_bytes(raw)
                }
            }
        )+
    };
}

impl_from_le_slice!(u64, i32, u32, u16);
330
331#[cfg(feature = "async")]
332pub use self::dbn::{
333 AsyncDecoder as AsyncDbnDecoder, AsyncMetadataDecoder as AsyncDbnMetadataDecoder,
334 AsyncRecordDecoder as AsyncDbnRecordDecoder,
335};
336
/// Test-only helpers for this module.
#[cfg(test)]
mod tests {
    /// Path to the test data directory, built at compile time from the crate's
    /// manifest directory plus `/../../tests/data`.
    pub const TEST_DATA_PATH: &str =
        concat!(env!("CARGO_MANIFEST_DIR"), "/../../tests/data");
}