1use std::{
2 fs::File,
3 io::{self, BufReader, Read},
4 marker::PhantomData,
5 mem,
6 path::Path,
7};
8
9use anyhow::{anyhow, Context};
10use log::{debug, warn};
11use serde::Serialize;
12use streaming_iterator::StreamingIterator;
13use zstd::Decoder;
14
15use databento_defs::{
16 enums::{Compression, SType, Schema},
17 record::{transmute_record_bytes, ConstTypeId},
18};
19
20use crate::write::dbz::SCHEMA_VERSION;
21
22#[derive(Debug)]
24pub struct Dbz<R: io::BufRead> {
25 reader: R,
26 metadata: Metadata,
27}
28
/// Information about the data contained in a DBZ file or stream, as decoded from
/// its metadata header.
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub struct Metadata {
    /// DBZ version byte from the header. Decoding rejects values greater than
    /// `SCHEMA_VERSION`.
    pub version: u8,
    /// Dataset name, stored in the header as a fixed-width, null-padded UTF-8 string.
    pub dataset: String,
    /// Schema of the data records.
    pub schema: Schema,
    /// Start of the data's time range.
    /// NOTE(review): presumably a UNIX timestamp in nanoseconds — confirm units.
    pub start: u64,
    /// End of the data's time range.
    /// NOTE(review): presumably a UNIX timestamp in nanoseconds — confirm units.
    pub end: u64,
    /// Record limit stored in the header.
    /// NOTE(review): presumably the maximum number of records requested — confirm.
    pub limit: u64,
    /// Number of data records. `DbzStreamIter` stops yielding records once this
    /// many have been read.
    pub record_count: u64,
    /// Compression mode recorded in the header.
    pub compression: Compression,
    /// Symbology type recorded as the input type.
    pub stype_in: SType,
    /// Symbology type recorded as the output type.
    pub stype_out: SType,
    /// First list of symbols in the variable-length section.
    /// NOTE(review): presumably the symbols originally requested — confirm.
    pub symbols: Vec<String>,
    /// Second list of symbols in the variable-length section.
    /// NOTE(review): presumably symbols that resolved for only part of the range — confirm.
    pub partial: Vec<String>,
    /// Third list of symbols in the variable-length section.
    /// NOTE(review): presumably symbols that did not resolve at all — confirm.
    pub not_found: Vec<String>,
    /// Symbol mappings, each pairing a native symbol with dated intervals.
    pub mappings: Vec<SymbolMapping>,
}
61
/// A native symbol together with the dated intervals decoded for it.
///
/// When the `python` or `python-test` feature is enabled, the type additionally
/// derives `pyo3::FromPyObject`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
#[cfg_attr(
    any(feature = "python", feature = "python-test"),
    derive(pyo3::FromPyObject)
)]
pub struct SymbolMapping {
    /// The native symbol; it is the first field of each encoded mapping.
    pub native: String,
    /// The mapping intervals that follow the native symbol in the encoding.
    pub intervals: Vec<MappingInterval>,
}
74
/// A date range and the symbol associated with it inside a [`SymbolMapping`].
///
/// Both dates are serialized as strings through `serialize_date`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub struct MappingInterval {
    /// First date of the interval.
    /// NOTE(review): inclusive vs. exclusive is not visible here — confirm.
    #[serde(serialize_with = "serialize_date")]
    pub start_date: time::Date,
    /// Last date of the interval.
    /// NOTE(review): inclusive vs. exclusive is not visible here — confirm.
    #[serde(serialize_with = "serialize_date")]
    pub end_date: time::Date,
    /// Symbol recorded for this interval.
    pub symbol: String,
}
87
88fn serialize_date<S: serde::Serializer>(
90 date: &time::Date,
91 serializer: S,
92) -> Result<S::Ok, S::Error> {
93 serializer.serialize_str(&date.to_string()) }
95
96impl Dbz<BufReader<File>> {
97 pub fn from_file(path: impl AsRef<Path>) -> anyhow::Result<Self> {
104 let file = File::open(path.as_ref()).with_context(|| {
105 format!(
106 "Error opening dbz file at path '{}'",
107 path.as_ref().display()
108 )
109 })?;
110 let reader = BufReader::new(file);
111 Self::new(reader)
112 }
113}
114
115impl<R: io::BufRead> Dbz<R> {
118 pub fn new(mut reader: R) -> anyhow::Result<Self> {
123 let metadata = Metadata::read(&mut reader)?;
124 Ok(Self { reader, metadata })
125 }
126
127 pub fn schema(&self) -> Schema {
130 self.metadata.schema
131 }
132
133 pub fn metadata(&self) -> &Metadata {
135 &self.metadata
136 }
137
138 pub fn try_into_iter<T: ConstTypeId>(self) -> anyhow::Result<DbzStreamIter<R, T>> {
145 DbzStreamIter::new(self.reader, self.metadata)
146 }
147}
148
149pub struct DbzStreamIter<R: io::BufRead, T> {
152 metadata: Metadata,
154 decoder: Decoder<'static, R>,
158 i: usize,
160 buffer: Vec<u8>,
162 _item: PhantomData<T>,
164}
165
166impl<R: io::BufRead, T> DbzStreamIter<R, T> {
167 pub(crate) fn new(reader: R, metadata: Metadata) -> anyhow::Result<Self> {
168 let decoder = Decoder::with_buffer(reader)?;
169 Ok(DbzStreamIter {
170 metadata,
171 decoder,
172 i: 0,
173 buffer: vec![0; mem::size_of::<T>()],
174 _item: PhantomData {},
175 })
176 }
177}
178
179impl<R: io::BufRead, T: ConstTypeId> StreamingIterator for DbzStreamIter<R, T> {
180 type Item = T;
181
182 fn advance(&mut self) {
183 if let Err(e) = self.decoder.read_exact(&mut self.buffer) {
184 warn!("Failed to read from DBZ decoder: {e:?}");
185 self.i = self.metadata.record_count as usize + 1;
186 }
187 self.i += 1;
188 }
189
190 fn get(&self) -> Option<&Self::Item> {
191 if self.i > self.metadata.record_count as usize {
192 return None;
193 }
194 unsafe { transmute_record_bytes(self.buffer.as_slice()) }
196 }
197
198 fn size_hint(&self) -> (usize, Option<usize>) {
200 let remaining = self.metadata.record_count as usize - self.i;
201 (remaining, Some(remaining))
204 }
205}
206
/// Decodes a fixed-width integer from the front of a little-endian byte slice.
pub(crate) trait FromLittleEndianSlice {
    /// Interprets the first `size_of::<Self>()` bytes of `slice` as a
    /// little-endian value. Any bytes after that are ignored.
    ///
    /// # Panics
    /// Panics if `slice` is shorter than `size_of::<Self>()`.
    fn from_le_slice(slice: &[u8]) -> Self;
}

/// Implements [`FromLittleEndianSlice`] for each listed integer type, so the
/// decoding logic is written only once.
macro_rules! impl_from_le_slice {
    ($($int:ty),+) => {
        $(
            impl FromLittleEndianSlice for $int {
                fn from_le_slice(slice: &[u8]) -> Self {
                    const WIDTH: usize = mem::size_of::<$int>();
                    let mut bytes = [0u8; WIDTH];
                    // Slicing panics on input that is too short, which is the
                    // documented contract of this method.
                    bytes.copy_from_slice(&slice[..WIDTH]);
                    <$int>::from_le_bytes(bytes)
                }
            }
        )+
    };
}

impl_from_le_slice!(u64, i32, u32, u16);
242
243impl Metadata {
244 const U32_SIZE: usize = mem::size_of::<u32>();
245
246 pub(crate) fn read(reader: &mut impl io::Read) -> anyhow::Result<Self> {
247 let mut prelude_buffer = [0u8; 2 * mem::size_of::<i32>()];
248 reader
249 .read_exact(&mut prelude_buffer)
250 .with_context(|| "Failed to read metadata prelude")?;
251 let magic = u32::from_le_slice(&prelude_buffer[..4]);
252 if !Self::ZSTD_MAGIC_RANGE.contains(&magic) {
253 return Err(anyhow!("Invalid metadata: no zstd magic number"));
254 }
255 let frame_size = u32::from_le_slice(&prelude_buffer[4..]);
256 debug!("magic={magic}, frame_size={frame_size}");
257 if (frame_size as usize) < Self::FIXED_METADATA_LEN {
258 return Err(anyhow!(
259 "Frame length cannot be shorter than the fixed metadata size"
260 ));
261 }
262
263 let mut metadata_buffer = vec![0u8; frame_size as usize];
264 reader
265 .read_exact(&mut metadata_buffer)
266 .with_context(|| "Failed to read metadata")?;
267 Self::decode(metadata_buffer)
268 }
269
270 fn decode(metadata_buffer: Vec<u8>) -> anyhow::Result<Self> {
271 const U64_SIZE: usize = mem::size_of::<u64>();
272 let mut pos = 0;
273 if &metadata_buffer[pos..pos + 3] != b"DBZ" {
274 return Err(anyhow!("Invalid version string"));
275 }
276 let version = metadata_buffer[pos + 3] as u8;
278 if version > SCHEMA_VERSION {
280 return Err(anyhow!("Can't read newer version of DBZ"));
281 }
282 pos += Self::VERSION_CSTR_LEN;
283 let dataset = std::str::from_utf8(&metadata_buffer[pos..pos + Self::DATASET_CSTR_LEN])
284 .with_context(|| "Failed to read dataset from metadata")?
285 .trim_end_matches('\0')
287 .to_owned();
288 pos += Self::DATASET_CSTR_LEN;
289 let schema = Schema::try_from(u16::from_le_slice(&metadata_buffer[pos..]))
290 .with_context(|| format!("Failed to read schema: '{}'", metadata_buffer[pos]))?;
291 pos += mem::size_of::<Schema>();
292 let start = u64::from_le_slice(&metadata_buffer[pos..]);
293 pos += U64_SIZE;
294 let end = u64::from_le_slice(&metadata_buffer[pos..]);
295 pos += U64_SIZE;
296 let limit = u64::from_le_slice(&metadata_buffer[pos..]);
297 pos += U64_SIZE;
298 let record_count = u64::from_le_slice(&metadata_buffer[pos..]);
299 pos += U64_SIZE;
300 let compression = Compression::try_from(metadata_buffer[pos])
301 .with_context(|| format!("Failed to parse compression '{}'", metadata_buffer[pos]))?;
302 pos += mem::size_of::<Compression>();
303 let stype_in = SType::try_from(metadata_buffer[pos])
304 .with_context(|| format!("Failed to read stype_in: '{}'", metadata_buffer[pos]))?;
305 pos += mem::size_of::<SType>();
306 let stype_out = SType::try_from(metadata_buffer[pos])
307 .with_context(|| format!("Failed to read stype_out: '{}'", metadata_buffer[pos]))?;
308 pos += mem::size_of::<SType>();
309 pos += Self::RESERVED_LEN;
311 let mut zstd_decoder = Decoder::new(&metadata_buffer[pos..])
313 .with_context(|| "Failed to read zstd-zipped variable-length metadata".to_owned())?;
314
315 let buffer_capacity = (metadata_buffer.len() - pos) * 3; let mut var_buffer = Vec::with_capacity(buffer_capacity);
318 zstd_decoder.read_to_end(&mut var_buffer)?;
319 pos = 0;
320 let schema_definition_length = u32::from_le_slice(&var_buffer[pos..]);
321 if schema_definition_length != 0 {
322 return Err(anyhow!(
323 "This version of dbz can't parse schema definitions"
324 ));
325 }
326 pos += Self::U32_SIZE + (schema_definition_length as usize);
327 let symbols = Self::decode_repeated_symbol_cstr(var_buffer.as_slice(), &mut pos)
328 .with_context(|| "Failed to parse symbols")?;
329 let partial = Self::decode_repeated_symbol_cstr(var_buffer.as_slice(), &mut pos)
330 .with_context(|| "Failed to parse partial")?;
331 let not_found = Self::decode_repeated_symbol_cstr(var_buffer.as_slice(), &mut pos)
332 .with_context(|| "Failed to parse not_found")?;
333 let mappings = Self::decode_symbol_mappings(var_buffer.as_slice(), &mut pos)?;
334
335 Ok(Self {
336 version,
337 dataset,
338 schema,
339 stype_in,
340 stype_out,
341 start,
342 end,
343 limit,
344 compression,
345 record_count,
346 symbols,
347 partial,
348 not_found,
349 mappings,
350 })
351 }
352
353 fn decode_repeated_symbol_cstr(buffer: &[u8], pos: &mut usize) -> anyhow::Result<Vec<String>> {
354 if *pos + Self::U32_SIZE > buffer.len() {
355 return Err(anyhow!("Unexpected end of metadata buffer"));
356 }
357 let count = u32::from_le_slice(&buffer[*pos..]) as usize;
358 *pos += Self::U32_SIZE;
359 let read_size = count * Self::SYMBOL_CSTR_LEN;
360 if *pos + read_size > buffer.len() {
361 return Err(anyhow!("Unexpected end of metadata buffer"));
362 }
363 let mut res = Vec::with_capacity(count);
364 for i in 0..count {
365 res.push(
366 Self::decode_symbol(buffer, pos)
367 .with_context(|| format!("Failed to decode symbol at index {i}"))?,
368 );
369 }
370 Ok(res)
371 }
372
373 fn decode_symbol_mappings(
374 buffer: &[u8],
375 pos: &mut usize,
376 ) -> anyhow::Result<Vec<SymbolMapping>> {
377 if *pos + Self::U32_SIZE > buffer.len() {
378 return Err(anyhow!("Unexpected end of metadata buffer"));
379 }
380 let count = u32::from_le_slice(&buffer[*pos..]) as usize;
381 *pos += Self::U32_SIZE;
382 let mut res = Vec::with_capacity(count);
383 for i in 0..count {
385 res.push(
386 Self::decode_symbol_mapping(buffer, pos)
387 .with_context(|| format!("Failed to parse symbol mapping at index {i}"))?,
388 );
389 }
390 Ok(res)
391 }
392
393 fn decode_symbol_mapping(buffer: &[u8], pos: &mut usize) -> anyhow::Result<SymbolMapping> {
394 const MIN_SYMBOL_MAPPING_ENCODED_SIZE: usize =
395 Metadata::SYMBOL_CSTR_LEN + Metadata::U32_SIZE;
396 const MAPPING_INTERVAL_ENCODED_SIZE: usize =
397 Metadata::U32_SIZE * 2 + Metadata::SYMBOL_CSTR_LEN;
398
399 if *pos + MIN_SYMBOL_MAPPING_ENCODED_SIZE > buffer.len() {
400 return Err(anyhow!(
401 "Unexpected end of metadata buffer while parsing symbol mapping"
402 ));
403 }
404 let native =
405 Self::decode_symbol(buffer, pos).with_context(|| "Couldn't parse native symbol")?;
406 let interval_count = u32::from_le_slice(&buffer[*pos..]) as usize;
407 *pos += Self::U32_SIZE;
408 let read_size = interval_count * MAPPING_INTERVAL_ENCODED_SIZE;
409 if *pos + read_size > buffer.len() {
410 return Err(anyhow!(
411 "Symbol mapping interval_count ({interval_count}) doesn't match size of buffer \
412 which only contains space for {} intervals",
413 (buffer.len() - *pos) / MAPPING_INTERVAL_ENCODED_SIZE
414 ));
415 }
416 let mut intervals = Vec::with_capacity(interval_count);
417 for i in 0..interval_count {
418 let raw_start_date = u32::from_le_slice(&buffer[*pos..]);
419 *pos += Metadata::U32_SIZE;
420 let start_date = Self::decode_iso8601(raw_start_date).with_context(|| {
421 format!("Failed to parse start date of mapping interval at index {i}")
422 })?;
423 let raw_end_date = u32::from_le_slice(&buffer[*pos..]);
424 *pos += Metadata::U32_SIZE;
425 let end_date = Self::decode_iso8601(raw_end_date).with_context(|| {
426 format!("Failed to parse end date of mapping interval at index {i}")
427 })?;
428 let symbol = Self::decode_symbol(buffer, pos).with_context(|| {
429 format!("Failed to parse symbol for mapping interval at index {i}")
430 })?;
431 intervals.push(MappingInterval {
432 start_date,
433 end_date,
434 symbol,
435 });
436 }
437 Ok(SymbolMapping { native, intervals })
438 }
439
440 fn decode_symbol(buffer: &[u8], pos: &mut usize) -> anyhow::Result<String> {
441 let symbol_slice = &buffer[*pos..*pos + Self::SYMBOL_CSTR_LEN];
442 let symbol = std::str::from_utf8(symbol_slice)
443 .with_context(|| format!("Failed to decode bytes {symbol_slice:?}"))?
444 .trim_end_matches('\0')
446 .to_owned();
447 *pos += Self::SYMBOL_CSTR_LEN;
448 Ok(symbol)
449 }
450
451 fn decode_iso8601(raw: u32) -> anyhow::Result<time::Date> {
452 let year = raw / 10_000;
453 let remaining = raw % 10_000;
454 let raw_month = remaining / 100;
455 let month = u8::try_from(raw_month)
456 .map_err(|e| anyhow!(e))
457 .and_then(|m| time::Month::try_from(m).map_err(|e| anyhow!(e)))
458 .with_context(|| {
459 format!("Invalid month {raw_month} while parsing {raw} into a date")
460 })?;
461 let day = remaining % 100;
462 time::Date::from_calendar_date(year as i32, month, day as u8)
463 .with_context(|| format!("Couldn't convert {raw} to a valid date"))
464 }
465}
466
#[cfg(test)]
mod tests {
    use super::*;
    use databento_defs::record::{Mbp10Msg, Mbp1Msg, OhlcvMsg, TbboMsg, TickMsg, TradeMsg};

    /// Directory holding the DBZ fixture files, relative to the crate manifest.
    const DBZ_PATH: &str = concat!(env!("CARGO_MANIFEST_DIR"), "/../../tests/data");

    /// Generates a test that opens the fixture file for `$schema`, checks the
    /// schema reported by the reader, and checks that iterating as
    /// `$record_type` yields exactly `record_count` records.
    macro_rules! test_reading_dbz {
        ($test_name:ident, $record_type:ident, $schema:expr) => {
            #[test]
            fn $test_name() {
                let fixture = format!("{DBZ_PATH}/test_data.{}.dbz", $schema.as_str());
                let dbz = Dbz::from_file(fixture).unwrap();
                let expected = dbz.metadata().record_count as usize;
                assert_eq!(dbz.schema(), $schema);
                let yielded = dbz.try_into_iter::<$record_type>().unwrap().count();
                assert_eq!(expected, yielded);
            }
        };
    }

    test_reading_dbz!(test_reading_mbo, TickMsg, Schema::Mbo);
    test_reading_dbz!(test_reading_mbp1, Mbp1Msg, Schema::Mbp1);
    test_reading_dbz!(test_reading_mbp10, Mbp10Msg, Schema::Mbp10);
    test_reading_dbz!(test_reading_ohlcv1d, OhlcvMsg, Schema::Ohlcv1D);
    test_reading_dbz!(test_reading_ohlcv1h, OhlcvMsg, Schema::Ohlcv1H);
    test_reading_dbz!(test_reading_ohlcv1m, OhlcvMsg, Schema::Ohlcv1M);
    test_reading_dbz!(test_reading_ohlcv1s, OhlcvMsg, Schema::Ohlcv1S);
    test_reading_dbz!(test_reading_tbbo, TbboMsg, Schema::Tbbo);
    test_reading_dbz!(test_reading_trades, TradeMsg, Schema::Trades);

    /// A null-padded symbol decodes to its text and moves the cursor by one
    /// full symbol width.
    #[test]
    fn test_decode_symbol() {
        let raw = b"SPX.1.2\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0";
        assert_eq!(raw.len(), Metadata::SYMBOL_CSTR_LEN);
        let mut cursor = 0;
        let symbol = Metadata::decode_symbol(raw.as_slice(), &mut cursor).unwrap();
        assert_eq!(cursor, Metadata::SYMBOL_CSTR_LEN);
        assert_eq!(&symbol, "SPX.1.2");
    }

    /// Bytes that are not valid UTF-8 produce an error that shows the bytes.
    #[test]
    fn test_decode_symbol_invalid_utf8() {
        // 0x80 is a lone continuation byte and can never start a UTF-8 sequence.
        let mut raw = [0u8; 22];
        raw[0] = 0x80;
        let mut cursor = 0;
        let err = Metadata::decode_symbol(raw.as_slice(), &mut cursor).unwrap_err();
        assert!(err.to_string().contains("Failed to decode bytes ["));
    }

    /// A well-formed `YYYYMMDD` value decodes to the matching calendar date.
    #[test]
    fn test_decode_iso8601_valid() {
        let expected = time::Date::from_calendar_date(2015, time::Month::October, 31).unwrap();
        let decoded = Metadata::decode_iso8601(20151031).unwrap();
        assert_eq!(decoded, expected);
    }

    /// Month 13 is rejected with an "Invalid month" error.
    #[test]
    fn test_decode_iso8601_invalid_month() {
        let err = Metadata::decode_iso8601(20101305).unwrap_err();
        assert!(err.to_string().contains("Invalid month"));
    }

    /// Day 0 is rejected when the calendar date is constructed.
    #[test]
    fn test_decode_iso8601_invalid_day() {
        let err = Metadata::decode_iso8601(20100600).unwrap_err();
        assert!(err.to_string().contains("a valid date"));
    }
}