Skip to main content

dbn/
metadata.rs

1//! Contains [`Metadata`] struct which comes at the beginning of any DBN file or
2//! stream and [`MetadataBuilder`] for creating a [`Metadata`] with defaults.
3
4mod merge;
5
6use std::num::NonZeroU64;
7
8// Dummy derive macro to get around `cfg_attr` incompatibility of several
9// of pyo3's attribute macros. See https://github.com/PyO3/pyo3/issues/780
10#[cfg(not(feature = "python"))]
11use dbn_macros::MockPyo3;
12
13use merge::MetadataMerger;
14#[cfg(feature = "serde")]
15use serde::Deserialize;
16
17use crate::{
18    compat::version_symbol_cstr_len, record::as_u8_slice, PitSymbolMap, SType, Schema, TsSymbolMap,
19    VersionUpgradePolicy,
20};
21
/// Information about the data contained in a DBN file or stream. DBN requires the
/// Metadata to be included at the start of the encoded data.
///
/// Instances are typically created with [`Metadata::builder()`], which checks at
/// compile time that the required fields are set and derives
/// [`symbol_cstr_len`](Self::symbol_cstr_len) from [`version`](Self::version).
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(eq, from_py_object, module = "databento_dbn")
)]
#[cfg_attr(not(feature = "python"), derive(MockPyo3))] // bring `pyo3` attribute into scope
pub struct Metadata {
    /// The DBN schema version number. Newly-encoded DBN files will use
    /// [`crate::DBN_VERSION`]. See [`Metadata::upgrade()`] for changing it in place.
    #[pyo3(get)]
    pub version: u8,
    /// The dataset code.
    #[pyo3(get)]
    pub dataset: String,
    /// The data record schema. Specifies which record types are in the DBN stream.
    /// `None` indicates the DBN stream _may_ contain more than one record type.
    #[pyo3(get)]
    pub schema: Option<Schema>,
    /// The UNIX nanosecond timestamp of the query start, or the first record if the
    /// file was split. [`Metadata::start()`] returns this as a datetime.
    #[pyo3(get)]
    pub start: u64,
    /// The UNIX nanosecond timestamp of the query end, or the last record if the file
    /// was split. `None` if the end time was not specified; [`Metadata::end()`] returns
    /// this as a datetime.
    #[pyo3(get)]
    pub end: Option<NonZeroU64>,
    /// The optional maximum number of records for the query. `None` if no limit was
    /// set.
    #[pyo3(get)]
    pub limit: Option<NonZeroU64>,
    /// The input symbology type to map from. `None` indicates a mix, such as in the
    /// case of live data.
    #[pyo3(get)]
    pub stype_in: Option<SType>,
    /// The output symbology type to map to.
    #[pyo3(get)]
    pub stype_out: SType,
    /// `true` if this store contains live data with send timestamps appended to each
    /// record.
    #[pyo3(get)]
    pub ts_out: bool,
    /// The length in bytes of fixed-length symbol strings, including a null terminator
    /// byte.
    ///
    /// [`MetadataBuilder::build()`] sets this from `version`, and
    /// [`Metadata::upgrade()`] updates it when it changes the version.
    #[pyo3(get)]
    pub symbol_cstr_len: usize,
    /// The original query input symbols from the request.
    #[pyo3(get)]
    pub symbols: Vec<String>,
    /// Symbols that did not resolve for _at least one day_ in the query time range.
    #[pyo3(get)]
    pub partial: Vec<String>,
    /// Symbols that did not resolve for _any_ day in the query time range.
    #[pyo3(get)]
    pub not_found: Vec<String>,
    /// Symbol mappings containing a raw symbol and its mapping intervals.
    // NOTE(review): unlike the other fields, this one has no `#[pyo3(get)]` attribute;
    // presumably it is exposed to Python elsewhere — verify.
    pub mappings: Vec<SymbolMapping>,
}
80
81impl Metadata {
82    /// Creates a builder for building `Metadata`. Call `.dataset(...)`, `.schema(...)`,
83    /// `.start(...)` `.stype_in(...)`, and `.stype_out(...)` on the builder to set the
84    /// required fields. Finally call `.build()` to create the `Metadata` instance.
85    pub fn builder() -> MetadataBuilder<Unset, Unset, Unset, Unset, Unset> {
86        MetadataBuilder::default()
87    }
88
89    /// Parses the raw query start into a datetime.
90    pub fn start(&self) -> time::OffsetDateTime {
91        // `u64::MAX` is within the allowable range for `OffsetDateTime`s
92        time::OffsetDateTime::from_unix_timestamp_nanos(self.start as i128).unwrap()
93    }
94
95    /// Parses the raw query end time or the timestamp of the last record into a
96    /// datetime. Returns `None` if  the end time was not specified.
97    pub fn end(&self) -> Option<time::OffsetDateTime> {
98        self.end
99            .map(|end| time::OffsetDateTime::from_unix_timestamp_nanos(end.get() as i128).unwrap())
100    }
101
102    /// Creates a symbology mapping from instrument ID to text symbol for the given
103    /// date.
104    ///
105    /// This method is useful when working with a historical request over a single day
106    /// or in other situations where you're sure the mappings don't change during the
107    /// time range of the request. Otherwise, [`Self::symbol_map()`] is recommended.
108    ///
109    /// # Errors
110    /// This function returns an error if `stype_out` is not [`SType::InstrumentId`] or
111    /// it can't parse a symbol into a `u32` instrument ID. It will also return an error
112    /// if `date` is outside the query range.
113    pub fn symbol_map_for_date(&self, date: time::Date) -> crate::Result<PitSymbolMap> {
114        PitSymbolMap::from_metadata(self, date)
115    }
116
117    /// Creates a symbology mapping from instrument ID and date to text symbol.
118    ///
119    /// If you're working with a single date or otherwise don't expect the mappings to
120    /// change, [`Self::symbol_map_for_date()`] is recommended.
121    ///
122    /// # Errors
123    /// This function returns an error if `stype_out` is not [`SType::InstrumentId`] or
124    /// it can't parse a symbol into a `u32` instrument ID.
125    pub fn symbol_map(&self) -> crate::Result<TsSymbolMap> {
126        TsSymbolMap::from_metadata(self)
127    }
128
129    /// Upgrades the metadata according to `upgrade_policy` if necessary.
130    pub fn upgrade(&mut self, upgrade_policy: VersionUpgradePolicy) {
131        if self.version < 2 {
132            match upgrade_policy {
133                VersionUpgradePolicy::AsIs => {
134                    self.symbol_cstr_len = crate::v1::SYMBOL_CSTR_LEN;
135                }
136                VersionUpgradePolicy::UpgradeToV2 => {
137                    self.version = 2;
138                    self.symbol_cstr_len = crate::v2::SYMBOL_CSTR_LEN;
139                }
140                VersionUpgradePolicy::UpgradeToV3 => {
141                    self.version = 3;
142                    self.symbol_cstr_len = crate::v3::SYMBOL_CSTR_LEN;
143                }
144            }
145        } else if self.version == 2 && upgrade_policy == VersionUpgradePolicy::UpgradeToV3 {
146            self.version = 3;
147        }
148    }
149
150    /// Attempts to merge another metadata into this one. This is useful for merging
151    /// DBN streams.
152    ///
153    /// If merging data from multiple schemas, the resulting metadata will have a schema
154    /// of `None`.
155    ///
156    /// # Errors
157    /// Merging metadata where any of the following fields don't match will result in
158    /// an error:
159    /// - `version`: upgrade the metadata of the lower version before merging
160    /// - `dataset`
161    /// - `stype_in`
162    /// - `stype_out`
163    /// - `ts_out`
164    /// - `symbol_cstr_len`: upgrade the metadata of the lower version before merging
165    ///
166    /// This function will also return an error if there are conflicting symbology
167    /// mappings.
168    pub fn merge(self, other: impl IntoIterator<Item = Metadata>) -> crate::Result<Self> {
169        let mut merger = MetadataMerger::new(self);
170        for metadata in other {
171            merger.merge(metadata)?;
172        }
173        Ok(merger.finalize())
174    }
175
176    /// Returns `true` if the metadata is for inverse mappings, where `stype_in` is
177    /// [`SType::InstrumentId`].
178    ///
179    /// # Errors
180    /// This function returns an error if neither `stype_in` and `stype_out` are
181    /// [`SType::InstrumentId`].
182    pub fn is_inverse(&self) -> crate::Result<bool> {
183        match (self.stype_in, self.stype_out) {
184        (_, SType::InstrumentId) => Ok(false),
185        (Some(SType::InstrumentId), _) => Ok(true),
186        _ => {
187            Err(crate::Error::BadArgument {
188                param_name: "self".to_owned(),
189                desc: "Can only create symbol maps from metadata where either stype_out or stype_in is instrument ID".to_owned(),
190            })
191        }
192    }
193    }
194}
195
/// Helper for constructing [`Metadata`] structs with defaults.
///
/// This struct uses type state to ensure at compile time that all the required fields
/// are set. If a required field is not set, `build()` won't be visible.
///
/// Each required field has its own type parameter. The parameter is [`Unset`] until
/// the corresponding setter is called, after which it becomes the field's concrete
/// type:
/// - `D` → `String`
/// - `Sch` → `Option<Schema>`
/// - `Start` → `u64`
/// - `StIn` → `Option<SType>`
/// - `StOut` → `SType`
///
/// # Required fields
/// - [`dataset`](Metadata::dataset)
/// - [`schema`](Metadata::schema)
/// - [`start`](Metadata::start)
/// - [`stype_in`](Metadata::stype_in)
/// - [`stype_out`](Metadata::stype_out)
///
/// # Optional fields
/// If not set explicitly, fields take these defaults:
/// - [`version`](Metadata::version): [`crate::DBN_VERSION`]
/// - [`ts_out`](Metadata::ts_out): `false`
/// - `end` and `limit`: `None`
/// - the symbol lists and mappings: empty
#[derive(Debug)]
pub struct MetadataBuilder<D, Sch, Start, StIn, StOut> {
    version: u8,
    // Type-state slot: `Unset` or `String`.
    dataset: D,
    // Type-state slot: `Unset` or `Option<Schema>`.
    schema: Sch,
    // Type-state slot: `Unset` or `u64` (UNIX nanoseconds).
    start: Start,
    end: Option<NonZeroU64>,
    limit: Option<NonZeroU64>,
    // Type-state slot: `Unset` or `Option<SType>`.
    stype_in: StIn,
    // Type-state slot: `Unset` or `SType`.
    stype_out: StOut,
    ts_out: bool,
    symbols: Vec<String>,
    partial: Vec<String>,
    not_found: Vec<String>,
    mappings: Vec<SymbolMapping>,
}
223
/// Sentinel type for a required field that has not yet been set.
///
/// Derives [`Debug`] so that a [`MetadataBuilder`] with unset required fields can
/// still be debug-formatted. The builder's derived `Debug` impl requires every type
/// parameter to implement `Debug`.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct Unset {}
226
227impl MetadataBuilder<Unset, Unset, Unset, Unset, Unset> {
228    /// Creates a new instance of the builder.
229    pub fn new() -> Self {
230        Self::default()
231    }
232}
233
/// Exposes the in-memory representation of a [`Metadata`] value as a byte slice.
///
/// NOTE(review): `Metadata` owns heap data (`String` and `Vec` fields). Assuming
/// `as_u8_slice` views `size_of::<Metadata>()` bytes of `self`, the returned bytes
/// contain pointer/length/capacity values rather than the string and vector
/// contents. They are therefore presumably not a DBN encoding of the metadata. The
/// struct also has no `#[repr(C)]`, so field order is compiler-chosen and padding
/// bytes may be included. Reading uninitialized padding is undefined behavior.
/// Confirm this impl is intended before relying on it.
impl AsRef<[u8]> for Metadata {
    fn as_ref(&self) -> &[u8] {
        // NOTE(review): no `// SAFETY:` justification is provided for this call; see
        // the note on this impl.
        unsafe { as_u8_slice(self) }
    }
}
239
240impl<D, Sch, Start, StIn, StOut> MetadataBuilder<D, Sch, Start, StIn, StOut> {
241    /// Sets [`version`](Metadata::version) and returns the builder.
242    pub fn version(mut self, version: u8) -> Self {
243        self.version = version;
244        self
245    }
246
247    /// Sets [`dataset`](Metadata::dataset) and returns the builder.
248    pub fn dataset(
249        self,
250        dataset: impl ToString,
251    ) -> MetadataBuilder<String, Sch, Start, StIn, StOut> {
252        MetadataBuilder {
253            version: self.version,
254            dataset: dataset.to_string(),
255            schema: self.schema,
256            start: self.start,
257            end: self.end,
258            limit: self.limit,
259            stype_in: self.stype_in,
260            stype_out: self.stype_out,
261            ts_out: self.ts_out,
262            symbols: self.symbols,
263            partial: self.partial,
264            not_found: self.not_found,
265            mappings: self.mappings,
266        }
267    }
268
269    /// Sets [`schema`](Metadata::schema) and returns the builder.
270    pub fn schema(
271        self,
272        schema: Option<Schema>,
273    ) -> MetadataBuilder<D, Option<Schema>, Start, StIn, StOut> {
274        MetadataBuilder {
275            version: self.version,
276            dataset: self.dataset,
277            schema,
278            start: self.start,
279            end: self.end,
280            limit: self.limit,
281            stype_in: self.stype_in,
282            stype_out: self.stype_out,
283            ts_out: self.ts_out,
284            symbols: self.symbols,
285            partial: self.partial,
286            not_found: self.not_found,
287            mappings: self.mappings,
288        }
289    }
290
291    /// Sets [`start`](Metadata::start) and returns the builder.
292    pub fn start(self, start: u64) -> MetadataBuilder<D, Sch, u64, StIn, StOut> {
293        MetadataBuilder {
294            version: self.version,
295            dataset: self.dataset,
296            schema: self.schema,
297            start,
298            end: self.end,
299            limit: self.limit,
300            stype_in: self.stype_in,
301            stype_out: self.stype_out,
302            symbols: self.symbols,
303            ts_out: self.ts_out,
304            partial: self.partial,
305            not_found: self.not_found,
306            mappings: self.mappings,
307        }
308    }
309
310    /// Sets [`end`](Metadata::end) and returns the builder.
311    pub fn end(mut self, end: Option<NonZeroU64>) -> Self {
312        self.end = end;
313        self
314    }
315
316    /// Sets [`limit`](Metadata::limit) and returns the builder.
317    pub fn limit(mut self, limit: Option<NonZeroU64>) -> Self {
318        self.limit = limit;
319        self
320    }
321
322    /// Sets [`stype_in`](Metadata::stype_in) and returns the builder.
323    pub fn stype_in(
324        self,
325        stype_in: Option<SType>,
326    ) -> MetadataBuilder<D, Sch, Start, Option<SType>, StOut> {
327        MetadataBuilder {
328            version: self.version,
329            dataset: self.dataset,
330            schema: self.schema,
331            start: self.start,
332            end: self.end,
333            limit: self.limit,
334            stype_in,
335            stype_out: self.stype_out,
336            ts_out: self.ts_out,
337            symbols: self.symbols,
338            partial: self.partial,
339            not_found: self.not_found,
340            mappings: self.mappings,
341        }
342    }
343
344    /// Sets [`stype_out`](Metadata::stype_out) and returns the builder.
345    pub fn stype_out(self, stype_out: SType) -> MetadataBuilder<D, Sch, Start, StIn, SType> {
346        MetadataBuilder {
347            version: self.version,
348            dataset: self.dataset,
349            schema: self.schema,
350            start: self.start,
351            end: self.end,
352            limit: self.limit,
353            stype_in: self.stype_in,
354            stype_out,
355            ts_out: self.ts_out,
356            symbols: self.symbols,
357            partial: self.partial,
358            not_found: self.not_found,
359            mappings: self.mappings,
360        }
361    }
362
363    /// Sets [`ts_out`](Metadata::ts_out) and returns the builder.
364    pub fn ts_out(mut self, ts_out: bool) -> Self {
365        self.ts_out = ts_out;
366        self
367    }
368
369    /// Sets [`symbols`](Metadata::symbols) and returns the builder.
370    pub fn symbols(mut self, symbols: Vec<String>) -> Self {
371        self.symbols = symbols;
372        self
373    }
374
375    /// Sets [`partial`](Metadata::partial) and returns the builder.
376    pub fn partial(mut self, partial: Vec<String>) -> Self {
377        self.partial = partial;
378        self
379    }
380
381    /// Sets [`not_found`](Metadata::not_found) and returns the builder.
382    pub fn not_found(mut self, not_found: Vec<String>) -> Self {
383        self.not_found = not_found;
384        self
385    }
386
387    /// Sets [`mappings`](Metadata::mappings) and returns the builder.
388    pub fn mappings(mut self, mappings: Vec<SymbolMapping>) -> Self {
389        self.mappings = mappings;
390        self
391    }
392}
393
394impl MetadataBuilder<String, Option<Schema>, u64, Option<SType>, SType> {
395    /// Constructs a [`Metadata`] object. The availability of this method indicates all
396    /// required fields have been set.
397    pub fn build(self) -> Metadata {
398        Metadata {
399            version: self.version,
400            dataset: self.dataset,
401            schema: self.schema,
402            start: self.start,
403            end: self.end,
404            limit: self.limit,
405            stype_in: self.stype_in,
406            stype_out: self.stype_out,
407            ts_out: self.ts_out,
408            symbols: self.symbols,
409            partial: self.partial,
410            not_found: self.not_found,
411            mappings: self.mappings,
412            symbol_cstr_len: version_symbol_cstr_len(self.version),
413        }
414    }
415}
416
417impl Default for MetadataBuilder<Unset, Unset, Unset, Unset, Unset> {
418    fn default() -> Self {
419        Self {
420            version: crate::DBN_VERSION,
421            dataset: Unset {},
422            schema: Unset {},
423            start: Unset {},
424            end: None,
425            limit: None,
426            stype_in: Unset {},
427            stype_out: Unset {},
428            ts_out: false,
429            symbols: vec![],
430            partial: vec![],
431            not_found: vec![],
432            mappings: vec![],
433        }
434    }
435}
436
/// A raw symbol and its symbol mappings for different time ranges within the query range.
///
/// With the `serde` feature enabled this type can be deserialized. With the `python`
/// feature enabled it can be extracted from a Python object via `FromPyObject`.
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(Deserialize))]
#[cfg_attr(feature = "python", derive(pyo3::FromPyObject))]
pub struct SymbolMapping {
    /// The `stype_in` symbol.
    pub raw_symbol: String,
    /// The mappings of `raw_symbol` to `stype_out` for different date ranges.
    pub intervals: Vec<MappingInterval>,
}
447
/// The resolved symbol for a date range.
///
/// The interval is half-open: it covers `start_date` up to but not including
/// `end_date`. With the `serde` feature enabled, it deserializes from an object with
/// the short keys `d0`, `d1`, and `s`. Both dates are strings in [`DATE_FORMAT`]
/// (`YYYY-MM-DD`).
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(Deserialize))]
pub struct MappingInterval {
    /// The UTC start date of interval (inclusive).
    #[cfg_attr(
        feature = "serde",
        serde(rename = "d0", deserialize_with = "deserialize_date")
    )]
    pub start_date: time::Date,
    /// The UTC end date of interval (exclusive).
    #[cfg_attr(
        feature = "serde",
        serde(rename = "d1", deserialize_with = "deserialize_date")
    )]
    pub end_date: time::Date,
    /// The resolved symbol for this interval (in `stype_out`).
    #[cfg_attr(feature = "serde", serde(rename = "s"))]
    pub symbol: String,
}
468
469/// The date format used for date strings when serializing [`Metadata`].
470pub const DATE_FORMAT: &[time::format_description::BorrowedFormatItem<'static>] =
471    time::macros::format_description!("[year]-[month]-[day]");
472
#[cfg(feature = "serde")]
fn deserialize_date<'de, D: serde::Deserializer<'de>>(
    deserializer: D,
) -> Result<time::Date, D::Error> {
    // Dates arrive as strings such as "2023-07-01". Parse them with `DATE_FORMAT` and
    // report any parse failure as a deserializer error.
    let text = String::deserialize(deserializer)?;
    match time::Date::parse(&text, DATE_FORMAT) {
        Ok(date) => Ok(date),
        Err(parse_err) => Err(<D::Error as serde::de::Error>::custom(parse_err)),
    }
}
480
#[cfg(test)]
mod tests {
    use rstest::*;

    use crate::Dataset;

    use super::*;

    /// Builds minimal metadata at the given DBN `version`.
    fn metadata_with_version(version: u8) -> Metadata {
        Metadata::builder()
            .version(version)
            .dataset(Dataset::OpraPillar)
            .schema(Some(Schema::Mbp1))
            .start(0)
            .stype_in(None)
            .stype_out(SType::InstrumentId)
            .build()
    }

    #[rstest]
    #[case(VersionUpgradePolicy::AsIs, 1, crate::v1::SYMBOL_CSTR_LEN)]
    #[case(VersionUpgradePolicy::UpgradeToV2, 2, crate::v2::SYMBOL_CSTR_LEN)]
    #[case(VersionUpgradePolicy::UpgradeToV3, 3, crate::v3::SYMBOL_CSTR_LEN)]
    fn test_upgrade_metadata(
        #[case] upgrade_policy: VersionUpgradePolicy,
        #[case] exp_version: u8,
        #[case] exp_symbol_cstr_len: usize,
    ) {
        let mut target = metadata_with_version(1);
        assert_eq!(target.version, 1);
        target.upgrade(upgrade_policy);
        assert_eq!(target.version, exp_version);
        // The symbol length must track the version so fixed-width symbol fields are
        // interpreted with the right size.
        assert_eq!(target.symbol_cstr_len, exp_symbol_cstr_len);
    }

    #[rstest]
    #[case(VersionUpgradePolicy::AsIs, 2)]
    #[case(VersionUpgradePolicy::UpgradeToV2, 2)]
    #[case(VersionUpgradePolicy::UpgradeToV3, 3)]
    fn test_upgrade_v2_metadata(
        #[case] upgrade_policy: VersionUpgradePolicy,
        #[case] exp_version: u8,
    ) {
        let mut target = metadata_with_version(2);
        let orig_symbol_cstr_len = target.symbol_cstr_len;
        target.upgrade(upgrade_policy);
        assert_eq!(target.version, exp_version);
        // Upgrading from v2 never changes the symbol length.
        assert_eq!(target.symbol_cstr_len, orig_symbol_cstr_len);
    }

    #[rstest]
    #[case(None, SType::InstrumentId, false)]
    #[case(Some(SType::InstrumentId), SType::InstrumentId, false)]
    #[case(Some(SType::InstrumentId), SType::RawSymbol, true)]
    fn test_is_inverse(
        #[case] stype_in: Option<SType>,
        #[case] stype_out: SType,
        #[case] exp: bool,
    ) {
        let mut target = metadata_with_version(crate::DBN_VERSION);
        target.stype_in = stype_in;
        target.stype_out = stype_out;
        assert_eq!(target.is_inverse().unwrap(), exp);
    }

    #[test]
    fn test_is_inverse_error_without_instrument_id() {
        let mut target = metadata_with_version(crate::DBN_VERSION);
        target.stype_in = Some(SType::RawSymbol);
        target.stype_out = SType::RawSymbol;
        assert!(target.is_inverse().is_err());
    }
}