//! `dbz_lib/read.rs` — reading and decoding Databento Binary Encoding (DBZ) files.

1use std::{
2    fs::File,
3    io::{self, BufReader, Read},
4    marker::PhantomData,
5    mem,
6    path::Path,
7};
8
9use anyhow::{anyhow, Context};
10use log::{debug, warn};
11use serde::Serialize;
12use streaming_iterator::StreamingIterator;
13use zstd::Decoder;
14
15use databento_defs::{
16    enums::{Compression, SType, Schema},
17    record::{transmute_record_bytes, ConstTypeId},
18};
19
20use crate::write::dbz::SCHEMA_VERSION;
21
/// Object for reading, parsing, and serializing a Databento Binary Encoding (DBZ) file.
#[derive(Debug)]
pub struct Dbz<R: io::BufRead> {
    /// Source of the raw DBZ bytes; positioned just past the metadata once constructed
    /// (see [`Dbz::new`], which reads the metadata eagerly).
    reader: R,
    /// Metadata decoded from the start of the stream by [`Metadata::read`].
    metadata: Metadata,
}
28
/// Information about the data contained in a DBZ file.
///
/// Decoded from the frame at the start of a DBZ file by [`Metadata::read`].
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub struct Metadata {
    /// The DBZ schema version number.
    pub version: u8,
    /// The dataset name.
    pub dataset: String,
    /// The data record schema. Specifies which record type is stored in the DBZ file.
    pub schema: Schema,
    /// The UNIX nanosecond timestamp of the query start, or the first record if the file was split.
    pub start: u64,
    /// The UNIX nanosecond timestamp of the query end, or the last record if the file was split.
    pub end: u64,
    /// The maximum number of records for the query.
    pub limit: u64,
    /// The total number of data records.
    pub record_count: u64,
    /// The data compression format (if any).
    pub compression: Compression,
    /// The input symbology type to map from.
    pub stype_in: SType,
    /// The output symbology type to map to.
    pub stype_out: SType,
    /// The original query input symbols from the request.
    pub symbols: Vec<String>,
    /// Symbols that did not resolve for _at least one day_ in the query time range.
    pub partial: Vec<String>,
    /// Symbols that did not resolve for _any_ day in the query time range.
    pub not_found: Vec<String>,
    /// Symbol mappings containing a native symbol and its mapping intervals.
    pub mappings: Vec<SymbolMapping>,
}
61
/// A native symbol and its symbol mappings for different time ranges within the query range.
// `FromPyObject` is derived only when a Python feature is enabled so the core
// crate doesn't depend on pyo3.
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
#[cfg_attr(
    any(feature = "python", feature = "python-test"),
    derive(pyo3::FromPyObject)
)]
pub struct SymbolMapping {
    /// The native symbol.
    pub native: String,
    /// The mappings of `native` for different date ranges.
    pub intervals: Vec<MappingInterval>,
}
74
/// The resolved symbol for a date range.
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub struct MappingInterval {
    /// UTC start date of interval.
    // Serialized as an ISO 8601 string rather than `time::Date`'s default format.
    #[serde(serialize_with = "serialize_date")]
    pub start_date: time::Date,
    /// UTC end date of interval.
    #[serde(serialize_with = "serialize_date")]
    pub end_date: time::Date,
    /// The resolved symbol for this interval.
    pub symbol: String,
}
87
88// Override `time::Date`'s serialization format to be ISO 8601.
89fn serialize_date<S: serde::Serializer>(
90    date: &time::Date,
91    serializer: S,
92) -> Result<S::Ok, S::Error> {
93    serializer.serialize_str(&date.to_string()) // ISO 8601
94}
95
96impl Dbz<BufReader<File>> {
97    /// Creates a new [`Dbz`] from the file at `path`. This function reads the metadata,
98    /// but does not read the body of the file.
99    ///
100    /// # Errors
101    /// This function will return an error if `path` doesn't exist. It will also return an error
102    /// if it is unable to parse the metadata from the file.
103    pub fn from_file(path: impl AsRef<Path>) -> anyhow::Result<Self> {
104        let file = File::open(path.as_ref()).with_context(|| {
105            format!(
106                "Error opening dbz file at path '{}'",
107                path.as_ref().display()
108            )
109        })?;
110        let reader = BufReader::new(file);
111        Self::new(reader)
112    }
113}
114
115// `BufRead` instead of `Read` because the [zstd::Decoder] works with `BufRead` so accepting
116// a `Read` could result in redundant `BufReader`s being created.
117impl<R: io::BufRead> Dbz<R> {
118    /// Creates a new [`Dbz`] from `reader`.
119    ///
120    /// # Errors
121    /// This function will return an error if it is unable to parse the metadata in `reader`.
122    pub fn new(mut reader: R) -> anyhow::Result<Self> {
123        let metadata = Metadata::read(&mut reader)?;
124        Ok(Self { reader, metadata })
125    }
126
127    /// Returns the [`Schema`] of the DBZ data. The schema also indicates the record type `T` for
128    /// [`Self::try_into_iter`].
129    pub fn schema(&self) -> Schema {
130        self.metadata.schema
131    }
132
133    /// Returns a reference to all metadata read from the Dbz data in a [`Metadata`] object.
134    pub fn metadata(&self) -> &Metadata {
135        &self.metadata
136    }
137
138    /// Try to decode the DBZ file into a streaming iterator. This decodes the
139    /// data lazily.
140    ///
141    /// # Errors
142    /// This function will return an error if the zstd portion of the DBZ file
143    /// was compressed in an unexpected manner.
144    pub fn try_into_iter<T: ConstTypeId>(self) -> anyhow::Result<DbzStreamIter<R, T>> {
145        DbzStreamIter::new(self.reader, self.metadata)
146    }
147}
148
/// A consuming iterator over a [`Dbz`]. Lazily decompresses and translates the contents of the file
/// or other buffer. This struct is created by the [`Dbz::try_into_iter`] method.
pub struct DbzStreamIter<R: io::BufRead, T> {
    /// [`Metadata`] about the file being iterated
    metadata: Metadata,
    /// Buffered zstd decoder of the DBZ file, so each call to [`DbzStreamIter::next()`] doesn't result in a
    /// separate system call.
    decoder: Decoder<'static, R>,
    /// Number of elements that have been decoded. Used for [`StreamingIterator::size_hint`].
    i: usize,
    /// Reusable buffer for reading into.
    buffer: Vec<u8>,
    /// Required to associate [`DbzStreamIter`] with a `T`.
    _item: PhantomData<T>,
}
165
166impl<R: io::BufRead, T> DbzStreamIter<R, T> {
167    pub(crate) fn new(reader: R, metadata: Metadata) -> anyhow::Result<Self> {
168        let decoder = Decoder::with_buffer(reader)?;
169        Ok(DbzStreamIter {
170            metadata,
171            decoder,
172            i: 0,
173            buffer: vec![0; mem::size_of::<T>()],
174            _item: PhantomData {},
175        })
176    }
177}
178
179impl<R: io::BufRead, T: ConstTypeId> StreamingIterator for DbzStreamIter<R, T> {
180    type Item = T;
181
182    fn advance(&mut self) {
183        if let Err(e) = self.decoder.read_exact(&mut self.buffer) {
184            warn!("Failed to read from DBZ decoder: {e:?}");
185            self.i = self.metadata.record_count as usize + 1;
186        }
187        self.i += 1;
188    }
189
190    fn get(&self) -> Option<&Self::Item> {
191        if self.i > self.metadata.record_count as usize {
192            return None;
193        }
194        // Safety: `buffer` is specifically sized to `T`
195        unsafe { transmute_record_bytes(self.buffer.as_slice()) }
196    }
197
198    /// Returns the lower bound and upper bounds of remaining length of iterator.
199    fn size_hint(&self) -> (usize, Option<usize>) {
200        let remaining = self.metadata.record_count as usize - self.i;
201        // assumes `record_count` is accurate. If it is not, the program won't crash but
202        // performance will be suboptimal
203        (remaining, Some(remaining))
204    }
205}
206
/// Conversion of the leading bytes of a little-endian byte slice into an integer.
pub(crate) trait FromLittleEndianSlice {
    /// Parses `Self` from the first `size_of::<Self>()` bytes of `slice`,
    /// interpreted as little-endian. Extra trailing bytes are ignored.
    fn from_le_slice(slice: &[u8]) -> Self;
}

/// Implements [`FromLittleEndianSlice`] for an integer type. The four previous
/// hand-written impls were byte-for-byte identical; this macro keeps them in sync.
macro_rules! impl_from_le_slice {
    ($ty:ty) => {
        impl FromLittleEndianSlice for $ty {
            /// NOTE: panics if the length of `slice` is less than the size of the type
            fn from_le_slice(slice: &[u8]) -> Self {
                let (bytes, _) = slice.split_at(mem::size_of::<Self>());
                // `split_at` yields exactly `size_of::<Self>()` bytes, so the
                // fixed-size conversion cannot fail
                Self::from_le_bytes(bytes.try_into().unwrap())
            }
        }
    };
}

impl_from_le_slice!(u64);
impl_from_le_slice!(i32);
impl_from_le_slice!(u32);
impl_from_le_slice!(u16);
242
impl Metadata {
    /// Byte width of a little-endian `u32`, the integer type used for counts and
    /// lengths throughout the encoded metadata.
    const U32_SIZE: usize = mem::size_of::<u32>();

    /// Reads and decodes [`Metadata`] from `reader`: an 8-byte prelude (a zstd
    /// magic number followed by the metadata frame size) and then the metadata
    /// frame itself.
    ///
    /// # Errors
    /// This function will return an error if the prelude or frame can't be read,
    /// the magic number is outside the expected zstd range, the frame is shorter
    /// than the fixed-length metadata, or decoding the frame contents fails.
    pub(crate) fn read(reader: &mut impl io::Read) -> anyhow::Result<Self> {
        let mut prelude_buffer = [0u8; 2 * mem::size_of::<i32>()];
        reader
            .read_exact(&mut prelude_buffer)
            .with_context(|| "Failed to read metadata prelude")?;
        let magic = u32::from_le_slice(&prelude_buffer[..4]);
        if !Self::ZSTD_MAGIC_RANGE.contains(&magic) {
            return Err(anyhow!("Invalid metadata: no zstd magic number"));
        }
        // size in bytes of the metadata frame that follows the prelude
        let frame_size = u32::from_le_slice(&prelude_buffer[4..]);
        debug!("magic={magic}, frame_size={frame_size}");
        if (frame_size as usize) < Self::FIXED_METADATA_LEN {
            return Err(anyhow!(
                "Frame length cannot be shorter than the fixed metadata size"
            ));
        }

        let mut metadata_buffer = vec![0u8; frame_size as usize];
        reader
            .read_exact(&mut metadata_buffer)
            .with_context(|| "Failed to read metadata")?;
        Self::decode(metadata_buffer)
    }

    /// Decodes the metadata frame: a fixed-length section (version, dataset,
    /// schema, timestamps, counts, compression, symbology types) followed by a
    /// zstd-compressed variable-length section (symbol lists and mappings).
    ///
    /// `pos` tracks the current read offset; each field read advances it by that
    /// field's encoded width, so the statement order below must match the format.
    fn decode(metadata_buffer: Vec<u8>) -> anyhow::Result<Self> {
        const U64_SIZE: usize = mem::size_of::<u64>();
        let mut pos = 0;
        if &metadata_buffer[pos..pos + 3] != b"DBZ" {
            return Err(anyhow!("Invalid version string"));
        }
        // Interpret 4th character as an u8, not a char to allow for 254 versions (0 omitted)
        let version = metadata_buffer[pos + 3] as u8;
        // assume not forwards compatible
        if version > SCHEMA_VERSION {
            return Err(anyhow!("Can't read newer version of DBZ"));
        }
        pos += Self::VERSION_CSTR_LEN;
        let dataset = std::str::from_utf8(&metadata_buffer[pos..pos + Self::DATASET_CSTR_LEN])
            .with_context(|| "Failed to read dataset from metadata")?
            // remove null bytes
            .trim_end_matches('\0')
            .to_owned();
        pos += Self::DATASET_CSTR_LEN;
        let schema = Schema::try_from(u16::from_le_slice(&metadata_buffer[pos..]))
            .with_context(|| format!("Failed to read schema: '{}'", metadata_buffer[pos]))?;
        pos += mem::size_of::<Schema>();
        let start = u64::from_le_slice(&metadata_buffer[pos..]);
        pos += U64_SIZE;
        let end = u64::from_le_slice(&metadata_buffer[pos..]);
        pos += U64_SIZE;
        let limit = u64::from_le_slice(&metadata_buffer[pos..]);
        pos += U64_SIZE;
        let record_count = u64::from_le_slice(&metadata_buffer[pos..]);
        pos += U64_SIZE;
        let compression = Compression::try_from(metadata_buffer[pos])
            .with_context(|| format!("Failed to parse compression '{}'", metadata_buffer[pos]))?;
        pos += mem::size_of::<Compression>();
        let stype_in = SType::try_from(metadata_buffer[pos])
            .with_context(|| format!("Failed to read stype_in: '{}'", metadata_buffer[pos]))?;
        pos += mem::size_of::<SType>();
        let stype_out = SType::try_from(metadata_buffer[pos])
            .with_context(|| format!("Failed to read stype_out: '{}'", metadata_buffer[pos]))?;
        pos += mem::size_of::<SType>();
        // skip reserved
        pos += Self::RESERVED_LEN;
        // remaining metadata is compressed
        let mut zstd_decoder = Decoder::new(&metadata_buffer[pos..])
            .with_context(|| "Failed to read zstd-zipped variable-length metadata".to_owned())?;

        // decompressed variable-length metadata buffer
        let buffer_capacity = (metadata_buffer.len() - pos) * 3; // 3x is arbitrary
        let mut var_buffer = Vec::with_capacity(buffer_capacity);
        zstd_decoder.read_to_end(&mut var_buffer)?;
        // `pos` now tracks the offset within the decompressed buffer
        pos = 0;
        let schema_definition_length = u32::from_le_slice(&var_buffer[pos..]);
        if schema_definition_length != 0 {
            return Err(anyhow!(
                "This version of dbz can't parse schema definitions"
            ));
        }
        pos += Self::U32_SIZE + (schema_definition_length as usize);
        let symbols = Self::decode_repeated_symbol_cstr(var_buffer.as_slice(), &mut pos)
            .with_context(|| "Failed to parse symbols")?;
        let partial = Self::decode_repeated_symbol_cstr(var_buffer.as_slice(), &mut pos)
            .with_context(|| "Failed to parse partial")?;
        let not_found = Self::decode_repeated_symbol_cstr(var_buffer.as_slice(), &mut pos)
            .with_context(|| "Failed to parse not_found")?;
        let mappings = Self::decode_symbol_mappings(var_buffer.as_slice(), &mut pos)?;

        Ok(Self {
            version,
            dataset,
            schema,
            stype_in,
            stype_out,
            start,
            end,
            limit,
            compression,
            record_count,
            symbols,
            partial,
            not_found,
            mappings,
        })
    }

    /// Decodes a `u32`-count-prefixed list of fixed-width, null-padded symbol
    /// strings from `buffer`, advancing `pos` past the bytes consumed.
    ///
    /// # Errors
    /// This function will return an error if the buffer ends before `count`
    /// symbols can be read or a symbol isn't valid UTF-8.
    fn decode_repeated_symbol_cstr(buffer: &[u8], pos: &mut usize) -> anyhow::Result<Vec<String>> {
        if *pos + Self::U32_SIZE > buffer.len() {
            return Err(anyhow!("Unexpected end of metadata buffer"));
        }
        let count = u32::from_le_slice(&buffer[*pos..]) as usize;
        *pos += Self::U32_SIZE;
        // all symbols are fixed-width, so the total size is known up front
        let read_size = count * Self::SYMBOL_CSTR_LEN;
        if *pos + read_size > buffer.len() {
            return Err(anyhow!("Unexpected end of metadata buffer"));
        }
        let mut res = Vec::with_capacity(count);
        for i in 0..count {
            res.push(
                Self::decode_symbol(buffer, pos)
                    .with_context(|| format!("Failed to decode symbol at index {i}"))?,
            );
        }
        Ok(res)
    }

    /// Decodes a `u32`-count-prefixed list of [`SymbolMapping`]s from `buffer`,
    /// advancing `pos` past the bytes consumed.
    ///
    /// # Errors
    /// This function will return an error if the count prefix can't be read or
    /// any individual mapping fails to decode.
    fn decode_symbol_mappings(
        buffer: &[u8],
        pos: &mut usize,
    ) -> anyhow::Result<Vec<SymbolMapping>> {
        if *pos + Self::U32_SIZE > buffer.len() {
            return Err(anyhow!("Unexpected end of metadata buffer"));
        }
        let count = u32::from_le_slice(&buffer[*pos..]) as usize;
        *pos += Self::U32_SIZE;
        let mut res = Vec::with_capacity(count);
        // Because each `SymbolMapping` itself is of a variable length, decoding it requires frequent bounds checks
        for i in 0..count {
            res.push(
                Self::decode_symbol_mapping(buffer, pos)
                    .with_context(|| format!("Failed to parse symbol mapping at index {i}"))?,
            );
        }
        Ok(res)
    }

    /// Decodes a single [`SymbolMapping`] — a native symbol followed by a
    /// `u32`-count-prefixed list of mapping intervals — advancing `pos`.
    ///
    /// # Errors
    /// This function will return an error if the buffer is too short for the
    /// declared interval count or any interval field fails to parse.
    fn decode_symbol_mapping(buffer: &[u8], pos: &mut usize) -> anyhow::Result<SymbolMapping> {
        // native symbol + interval count: the smallest possible encoded mapping
        const MIN_SYMBOL_MAPPING_ENCODED_SIZE: usize =
            Metadata::SYMBOL_CSTR_LEN + Metadata::U32_SIZE;
        // start date + end date + symbol: the fixed size of each interval
        const MAPPING_INTERVAL_ENCODED_SIZE: usize =
            Metadata::U32_SIZE * 2 + Metadata::SYMBOL_CSTR_LEN;

        if *pos + MIN_SYMBOL_MAPPING_ENCODED_SIZE > buffer.len() {
            return Err(anyhow!(
                "Unexpected end of metadata buffer while parsing symbol mapping"
            ));
        }
        let native =
            Self::decode_symbol(buffer, pos).with_context(|| "Couldn't parse native symbol")?;
        let interval_count = u32::from_le_slice(&buffer[*pos..]) as usize;
        *pos += Self::U32_SIZE;
        let read_size = interval_count * MAPPING_INTERVAL_ENCODED_SIZE;
        if *pos + read_size > buffer.len() {
            return Err(anyhow!(
                "Symbol mapping interval_count ({interval_count}) doesn't match size of buffer \
                which only contains space for {} intervals",
                (buffer.len() - *pos) / MAPPING_INTERVAL_ENCODED_SIZE
            ));
        }
        let mut intervals = Vec::with_capacity(interval_count);
        for i in 0..interval_count {
            let raw_start_date = u32::from_le_slice(&buffer[*pos..]);
            *pos += Metadata::U32_SIZE;
            let start_date = Self::decode_iso8601(raw_start_date).with_context(|| {
                format!("Failed to parse start date of mapping interval at index {i}")
            })?;
            let raw_end_date = u32::from_le_slice(&buffer[*pos..]);
            *pos += Metadata::U32_SIZE;
            let end_date = Self::decode_iso8601(raw_end_date).with_context(|| {
                format!("Failed to parse end date of mapping interval at index {i}")
            })?;
            let symbol = Self::decode_symbol(buffer, pos).with_context(|| {
                format!("Failed to parse symbol for mapping interval at index {i}")
            })?;
            intervals.push(MappingInterval {
                start_date,
                end_date,
                symbol,
            });
        }
        Ok(SymbolMapping { native, intervals })
    }

    /// Decodes one fixed-width ([`Self::SYMBOL_CSTR_LEN`] bytes) null-padded
    /// symbol from `buffer`, trimming the padding, and advances `pos`.
    ///
    /// NOTE: the slice indexing panics if fewer than [`Self::SYMBOL_CSTR_LEN`]
    /// bytes remain; callers are expected to bounds-check beforehand.
    ///
    /// # Errors
    /// This function will return an error if the bytes aren't valid UTF-8.
    fn decode_symbol(buffer: &[u8], pos: &mut usize) -> anyhow::Result<String> {
        let symbol_slice = &buffer[*pos..*pos + Self::SYMBOL_CSTR_LEN];
        let symbol = std::str::from_utf8(symbol_slice)
            .with_context(|| format!("Failed to decode bytes {symbol_slice:?}"))?
            // remove null bytes
            .trim_end_matches('\0')
            .to_owned();
        *pos += Self::SYMBOL_CSTR_LEN;
        Ok(symbol)
    }

    /// Converts a date packed into a `u32` as decimal `YYYYMMDD`
    /// (e.g. `20151031`) into a [`time::Date`].
    ///
    /// # Errors
    /// This function will return an error if the month digits or the resulting
    /// calendar date are invalid.
    fn decode_iso8601(raw: u32) -> anyhow::Result<time::Date> {
        let year = raw / 10_000;
        let remaining = raw % 10_000;
        let raw_month = remaining / 100;
        let month = u8::try_from(raw_month)
            .map_err(|e| anyhow!(e))
            .and_then(|m| time::Month::try_from(m).map_err(|e| anyhow!(e)))
            .with_context(|| {
                format!("Invalid month {raw_month} while parsing {raw} into a date")
            })?;
        let day = remaining % 100;
        time::Date::from_calendar_date(year as i32, month, day as u8)
            .with_context(|| format!("Couldn't convert {raw} to a valid date"))
    }
}
466
#[cfg(test)]
mod tests {
    use super::*;
    use databento_defs::record::{Mbp10Msg, Mbp1Msg, OhlcvMsg, TbboMsg, TickMsg, TradeMsg};

    // Directory containing the DBZ fixture files used by the reading tests.
    const DBZ_PATH: &str = concat!(env!("CARGO_MANIFEST_DIR"), "/../../tests/data");

    /// there are crates like rstest that provide pytest-like parameterized tests, however
    /// they don't support passing types
    macro_rules! test_reading_dbz {
        // Rust doesn't allow concatenating identifiers in stable rust, so each test case needs
        // to be named explicitly
        ($test_name:ident, $record_type:ident, $schema:expr) => {
            #[test]
            fn $test_name() {
                let target =
                    Dbz::from_file(format!("{DBZ_PATH}/test_data.{}.dbz", $schema.as_str()))
                        .unwrap();
                let exp_row_count = target.metadata().record_count;
                assert_eq!(target.schema(), $schema);
                let actual_row_count = target.try_into_iter::<$record_type>().unwrap().count();
                assert_eq!(exp_row_count as usize, actual_row_count);
            }
        };
    }

    test_reading_dbz!(test_reading_mbo, TickMsg, Schema::Mbo);
    test_reading_dbz!(test_reading_mbp1, Mbp1Msg, Schema::Mbp1);
    test_reading_dbz!(test_reading_mbp10, Mbp10Msg, Schema::Mbp10);
    test_reading_dbz!(test_reading_ohlcv1d, OhlcvMsg, Schema::Ohlcv1D);
    test_reading_dbz!(test_reading_ohlcv1h, OhlcvMsg, Schema::Ohlcv1H);
    test_reading_dbz!(test_reading_ohlcv1m, OhlcvMsg, Schema::Ohlcv1M);
    test_reading_dbz!(test_reading_ohlcv1s, OhlcvMsg, Schema::Ohlcv1S);
    test_reading_dbz!(test_reading_tbbo, TbboMsg, Schema::Tbbo);
    test_reading_dbz!(test_reading_trades, TradeMsg, Schema::Trades);

    #[test]
    fn test_decode_symbol() {
        // 7-character symbol padded with nulls to the fixed width
        let bytes = b"SPX.1.2\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0";
        assert_eq!(bytes.len(), Metadata::SYMBOL_CSTR_LEN);
        let mut pos = 0;
        let res = Metadata::decode_symbol(bytes.as_slice(), &mut pos).unwrap();
        assert_eq!(pos, Metadata::SYMBOL_CSTR_LEN);
        assert_eq!(&res, "SPX.1.2");
    }

    #[test]
    fn test_decode_symbol_invalid_utf8() {
        const BYTES: [u8; 22] = [
            // continuation byte
            0x80, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
        ];
        let mut pos = 0;
        let res = Metadata::decode_symbol(BYTES.as_slice(), &mut pos);
        assert!(matches!(res, Err(e) if e.to_string().contains("Failed to decode bytes [")));
    }

    #[test]
    fn test_decode_iso8601_valid() {
        let res = Metadata::decode_iso8601(20151031).unwrap();
        let exp: time::Date =
            time::Date::from_calendar_date(2015, time::Month::October, 31).unwrap();
        assert_eq!(res, exp);
    }

    #[test]
    fn test_decode_iso8601_invalid_month() {
        // month digits are 13
        let res = Metadata::decode_iso8601(20101305);
        assert!(matches!(res, Err(e) if e.to_string().contains("Invalid month")));
    }

    #[test]
    fn test_decode_iso8601_invalid_day() {
        // day digits are 00
        let res = Metadata::decode_iso8601(20100600);
        assert!(matches!(res, Err(e) if e.to_string().contains("a valid date")));
    }
}
543}