Skip to main content

dbn/
metadata.rs

//! Contains the [`Metadata`] struct, which comes at the beginning of any DBN file or
//! stream, and [`MetadataBuilder`] for creating a [`Metadata`] with defaults.
3
4mod merge;
5
6use std::num::NonZeroU64;
7
8// Dummy derive macro to get around `cfg_attr` incompatibility of several
9// of pyo3's attribute macros. See https://github.com/PyO3/pyo3/issues/780
10#[cfg(not(feature = "python"))]
11use dbn_macros::MockPyo3;
12
13use merge::MetadataMerger;
14#[cfg(feature = "serde")]
15use serde::Deserialize;
16
17use crate::{
18    compat::version_symbol_cstr_len, record::as_u8_slice, PitSymbolMap, SType, Schema, TsSymbolMap,
19    VersionUpgradePolicy,
20};
21
/// Information about the data contained in a DBN file or stream. DBN requires the
/// Metadata to be included at the start of the encoded data.
///
/// Use [`Metadata::builder()`] to construct an instance; the builder fills in
/// defaults for the optional fields.
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(
    feature = "python",
    pyo3::pyclass(eq, from_py_object, module = "databento_dbn")
)]
#[cfg_attr(not(feature = "python"), derive(MockPyo3))] // bring `pyo3` attribute into scope
pub struct Metadata {
    /// The DBN schema version number. Newly-encoded DBN files will use
    /// [`crate::DBN_VERSION`].
    #[pyo3(get)]
    pub version: u8,
    /// The dataset code.
    #[pyo3(get)]
    pub dataset: String,
    /// The data record schema. Specifies which record types are in the DBN stream.
    /// `None` indicates the DBN stream _may_ contain more than one record type.
    #[pyo3(get)]
    pub schema: Option<Schema>,
    /// The UNIX nanosecond timestamp of the query start, or the first record if the
    /// file was split. [`Metadata::start()`] returns the same instant as a datetime.
    #[pyo3(get)]
    pub start: u64,
    /// The UNIX nanosecond timestamp of the query end, or the last record if the file
    /// was split. `None` if the end time was not specified. [`Metadata::end()`]
    /// returns the same instant as a datetime.
    #[pyo3(get)]
    pub end: Option<NonZeroU64>,
    /// The optional maximum number of records for the query.
    #[pyo3(get)]
    pub limit: Option<NonZeroU64>,
    /// The input symbology type to map from. `None` indicates a mix, such as in the
    /// case of live data.
    #[pyo3(get)]
    pub stype_in: Option<SType>,
    /// The output symbology type to map to.
    #[pyo3(get)]
    pub stype_out: SType,
    /// `true` if this store contains live data with send timestamps appended to each
    /// record.
    #[pyo3(get)]
    pub ts_out: bool,
    /// The length in bytes of fixed-length symbol strings, including a null terminator
    /// byte. [`MetadataBuilder`] derives this value from `version`.
    #[pyo3(get)]
    pub symbol_cstr_len: usize,
    /// The original query input symbols from the request.
    #[pyo3(get)]
    pub symbols: Vec<String>,
    /// Symbols that did not resolve for _at least one day_ in the query time range.
    #[pyo3(get)]
    pub partial: Vec<String>,
    /// Symbols that did not resolve for _any_ day in the query time range.
    #[pyo3(get)]
    pub not_found: Vec<String>,
    /// Symbol mappings containing a raw symbol and its mapping intervals.
    // Unlike the fields above, this one carries no `#[pyo3(get)]` attribute.
    pub mappings: Vec<SymbolMapping>,
}
80
81impl Metadata {
82    /// Creates a builder for building `Metadata`. Call `.dataset(...)`, `.schema(...)`,
83    /// `.start(...)` `.stype_in(...)`, and `.stype_out(...)` on the builder to set the
84    /// required fields. Finally call `.build()` to create the `Metadata` instance.
85    pub fn builder() -> MetadataBuilder<Unset, Unset, Unset, Unset, Unset> {
86        MetadataBuilder::default()
87    }
88
89    /// Parses the raw query start into a datetime.
90    pub fn start(&self) -> time::OffsetDateTime {
91        // `u64::MAX` is within the allowable range for `OffsetDateTime`s
92        time::OffsetDateTime::from_unix_timestamp_nanos(self.start as i128).unwrap()
93    }
94
95    /// Parses the raw query end time or the timestamp of the last record into a
96    /// datetime. Returns `None` if  the end time was not specified.
97    pub fn end(&self) -> Option<time::OffsetDateTime> {
98        self.end
99            .map(|end| time::OffsetDateTime::from_unix_timestamp_nanos(end.get() as i128).unwrap())
100    }
101
102    /// Creates a symbology mapping from instrument ID to text symbol for the given
103    /// date.
104    ///
105    /// This method is useful when working with a historical request over a single day
106    /// or in other situations where you're sure the mappings don't change during the
107    /// time range of the request. Otherwise, [`Self::symbol_map()`] is recommended.
108    ///
109    /// # Errors
110    /// This function returns an error if neither `stype_in` nor `stype_out` is
111    /// [`SType::InstrumentId`], or if a symbol cannot be parsed into a `u32` instrument
112    /// ID, or if `date` is outside the query range.
113    pub fn symbol_map_for_date(&self, date: time::Date) -> crate::Result<PitSymbolMap> {
114        PitSymbolMap::from_metadata(self, date)
115    }
116
117    /// Creates a symbology mapping from instrument ID and date to text symbol.
118    ///
119    /// If you're working with a single date or otherwise don't expect the mappings to
120    /// change, [`Self::symbol_map_for_date()`] is recommended.
121    ///
122    /// # Errors
123    /// This function returns an error if neither `stype_in` nor `stype_out` is
124    /// [`SType::InstrumentId`], or if a symbol cannot be parsed into a `u32` instrument
125    /// ID.
126    pub fn symbol_map(&self) -> crate::Result<TsSymbolMap> {
127        TsSymbolMap::from_metadata(self)
128    }
129
130    /// Upgrades the metadata according to `upgrade_policy` if necessary.
131    pub fn upgrade(&mut self, upgrade_policy: VersionUpgradePolicy) {
132        if self.version < 2 {
133            match upgrade_policy {
134                VersionUpgradePolicy::AsIs => {
135                    self.symbol_cstr_len = crate::v1::SYMBOL_CSTR_LEN;
136                }
137                VersionUpgradePolicy::UpgradeToV2 => {
138                    self.version = 2;
139                    self.symbol_cstr_len = crate::v2::SYMBOL_CSTR_LEN;
140                }
141                VersionUpgradePolicy::UpgradeToV3 => {
142                    self.version = 3;
143                    self.symbol_cstr_len = crate::v3::SYMBOL_CSTR_LEN;
144                }
145            }
146        } else if self.version == 2 && upgrade_policy == VersionUpgradePolicy::UpgradeToV3 {
147            self.version = 3;
148        }
149    }
150
151    /// Attempts to merge another metadata into this one. This is useful for merging
152    /// DBN streams.
153    ///
154    /// If merging data from multiple schemas, the resulting metadata will have a schema
155    /// of `None`.
156    ///
157    /// # Errors
158    /// Merging metadata where any of the following fields don't match will result in
159    /// an error:
160    /// - `version`: upgrade the metadata of the lower version before merging
161    /// - `dataset`
162    /// - `stype_in`
163    /// - `stype_out`
164    /// - `ts_out`
165    /// - `symbol_cstr_len`: upgrade the metadata of the lower version before merging
166    ///
167    /// This function will also return an error if there are conflicting symbology
168    /// mappings.
169    pub fn merge(self, other: impl IntoIterator<Item = Metadata>) -> crate::Result<Self> {
170        let mut merger = MetadataMerger::new(self);
171        for metadata in other {
172            merger.merge(metadata)?;
173        }
174        Ok(merger.finalize())
175    }
176
177    /// Returns `true` if the metadata is for inverse mappings, where `stype_in` is
178    /// [`SType::InstrumentId`].
179    ///
180    /// # Errors
181    /// This function returns an error if neither `stype_in` and `stype_out` are
182    /// [`SType::InstrumentId`].
183    pub fn is_inverse(&self) -> crate::Result<bool> {
184        match (self.stype_in, self.stype_out) {
185        (_, SType::InstrumentId) => Ok(false),
186        (Some(SType::InstrumentId), _) => Ok(true),
187        _ => {
188            Err(crate::Error::BadArgument {
189                param_name: "self".to_owned(),
190                desc: "Can only create symbol maps from metadata where either stype_out or stype_in is instrument ID".to_owned(),
191            })
192        }
193    }
194    }
195}
196
/// Helper for constructing [`Metadata`] structs with defaults.
///
/// This struct uses type state to ensure at compile time that all the required fields
/// are set. If a required field is not set, `build()` won't be visible.
///
/// Each of the five type parameters tracks one required field: it starts out as
/// [`Unset`] and is replaced by the field's real type once the matching setter has
/// been called.
///
/// The derived `Debug` implementation is only available when every type parameter
/// implements `Debug`.
///
/// # Required fields
/// - [`dataset`](Metadata::dataset)
/// - [`schema`](Metadata::schema)
/// - [`start`](Metadata::start)
/// - [`stype_in`](Metadata::stype_in)
/// - [`stype_out`](Metadata::stype_out)
#[derive(Debug)]
pub struct MetadataBuilder<D, Sch, Start, StIn, StOut> {
    // Optional; defaults to `crate::DBN_VERSION`.
    version: u8,
    // Required; type-state parameter `D`.
    dataset: D,
    // Required; type-state parameter `Sch`.
    schema: Sch,
    // Required; type-state parameter `Start`.
    start: Start,
    // Optional; defaults to `None`.
    end: Option<NonZeroU64>,
    // Optional; defaults to `None`.
    limit: Option<NonZeroU64>,
    // Required; type-state parameter `StIn`.
    stype_in: StIn,
    // Required; type-state parameter `StOut`.
    stype_out: StOut,
    // Optional; defaults to `false`.
    ts_out: bool,
    // Optional; the remaining fields all default to empty vectors.
    symbols: Vec<String>,
    partial: Vec<String>,
    not_found: Vec<String>,
    mappings: Vec<SymbolMapping>,
}
224
/// Sentinel type for a required field that has not yet been set.
///
/// It carries no data; it only occupies the type-state slot of a builder field until
/// the corresponding setter replaces it with the field's real type.
// The standard traits are derived so that generic code with bounds on the type-state
// parameters (for example a derived `Debug`) also works while fields are unset.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub struct Unset {}
227
228impl MetadataBuilder<Unset, Unset, Unset, Unset, Unset> {
229    /// Creates a new instance of the builder.
230    pub fn new() -> Self {
231        Self::default()
232    }
233}
234
impl AsRef<[u8]> for Metadata {
    /// Returns the byte-slice view of this `Metadata` value produced by
    /// `as_u8_slice`.
    fn as_ref(&self) -> &[u8] {
        // NOTE(review): `Metadata` owns heap data (`String` and `Vec` fields) and has
        // no visible `#[repr(...)]` attribute, so this is presumably a view of the
        // in-memory struct (pointers, lengths, possibly padding) rather than the
        // encoded DBN metadata. Confirm that `as_u8_slice`'s safety contract holds
        // for this type and that no caller relies on the contents of these bytes.
        unsafe { as_u8_slice(self) }
    }
}
240
241impl<D, Sch, Start, StIn, StOut> MetadataBuilder<D, Sch, Start, StIn, StOut> {
242    /// Sets [`version`](Metadata::version) and returns the builder.
243    pub fn version(mut self, version: u8) -> Self {
244        self.version = version;
245        self
246    }
247
248    /// Sets [`dataset`](Metadata::dataset) and returns the builder.
249    pub fn dataset(
250        self,
251        dataset: impl ToString,
252    ) -> MetadataBuilder<String, Sch, Start, StIn, StOut> {
253        MetadataBuilder {
254            version: self.version,
255            dataset: dataset.to_string(),
256            schema: self.schema,
257            start: self.start,
258            end: self.end,
259            limit: self.limit,
260            stype_in: self.stype_in,
261            stype_out: self.stype_out,
262            ts_out: self.ts_out,
263            symbols: self.symbols,
264            partial: self.partial,
265            not_found: self.not_found,
266            mappings: self.mappings,
267        }
268    }
269
270    /// Sets [`schema`](Metadata::schema) and returns the builder.
271    pub fn schema(
272        self,
273        schema: Option<Schema>,
274    ) -> MetadataBuilder<D, Option<Schema>, Start, StIn, StOut> {
275        MetadataBuilder {
276            version: self.version,
277            dataset: self.dataset,
278            schema,
279            start: self.start,
280            end: self.end,
281            limit: self.limit,
282            stype_in: self.stype_in,
283            stype_out: self.stype_out,
284            ts_out: self.ts_out,
285            symbols: self.symbols,
286            partial: self.partial,
287            not_found: self.not_found,
288            mappings: self.mappings,
289        }
290    }
291
292    /// Sets [`start`](Metadata::start) and returns the builder.
293    pub fn start(self, start: u64) -> MetadataBuilder<D, Sch, u64, StIn, StOut> {
294        MetadataBuilder {
295            version: self.version,
296            dataset: self.dataset,
297            schema: self.schema,
298            start,
299            end: self.end,
300            limit: self.limit,
301            stype_in: self.stype_in,
302            stype_out: self.stype_out,
303            symbols: self.symbols,
304            ts_out: self.ts_out,
305            partial: self.partial,
306            not_found: self.not_found,
307            mappings: self.mappings,
308        }
309    }
310
311    /// Sets [`end`](Metadata::end) and returns the builder.
312    pub fn end(mut self, end: Option<NonZeroU64>) -> Self {
313        self.end = end;
314        self
315    }
316
317    /// Sets [`limit`](Metadata::limit) and returns the builder.
318    pub fn limit(mut self, limit: Option<NonZeroU64>) -> Self {
319        self.limit = limit;
320        self
321    }
322
323    /// Sets [`stype_in`](Metadata::stype_in) and returns the builder.
324    pub fn stype_in(
325        self,
326        stype_in: Option<SType>,
327    ) -> MetadataBuilder<D, Sch, Start, Option<SType>, StOut> {
328        MetadataBuilder {
329            version: self.version,
330            dataset: self.dataset,
331            schema: self.schema,
332            start: self.start,
333            end: self.end,
334            limit: self.limit,
335            stype_in,
336            stype_out: self.stype_out,
337            ts_out: self.ts_out,
338            symbols: self.symbols,
339            partial: self.partial,
340            not_found: self.not_found,
341            mappings: self.mappings,
342        }
343    }
344
345    /// Sets [`stype_out`](Metadata::stype_out) and returns the builder.
346    pub fn stype_out(self, stype_out: SType) -> MetadataBuilder<D, Sch, Start, StIn, SType> {
347        MetadataBuilder {
348            version: self.version,
349            dataset: self.dataset,
350            schema: self.schema,
351            start: self.start,
352            end: self.end,
353            limit: self.limit,
354            stype_in: self.stype_in,
355            stype_out,
356            ts_out: self.ts_out,
357            symbols: self.symbols,
358            partial: self.partial,
359            not_found: self.not_found,
360            mappings: self.mappings,
361        }
362    }
363
364    /// Sets [`ts_out`](Metadata::ts_out) and returns the builder.
365    pub fn ts_out(mut self, ts_out: bool) -> Self {
366        self.ts_out = ts_out;
367        self
368    }
369
370    /// Sets [`symbols`](Metadata::symbols) and returns the builder.
371    pub fn symbols(mut self, symbols: Vec<String>) -> Self {
372        self.symbols = symbols;
373        self
374    }
375
376    /// Sets [`partial`](Metadata::partial) and returns the builder.
377    pub fn partial(mut self, partial: Vec<String>) -> Self {
378        self.partial = partial;
379        self
380    }
381
382    /// Sets [`not_found`](Metadata::not_found) and returns the builder.
383    pub fn not_found(mut self, not_found: Vec<String>) -> Self {
384        self.not_found = not_found;
385        self
386    }
387
388    /// Sets [`mappings`](Metadata::mappings) and returns the builder.
389    pub fn mappings(mut self, mappings: Vec<SymbolMapping>) -> Self {
390        self.mappings = mappings;
391        self
392    }
393}
394
395impl MetadataBuilder<String, Option<Schema>, u64, Option<SType>, SType> {
396    /// Constructs a [`Metadata`] object. The availability of this method indicates all
397    /// required fields have been set.
398    pub fn build(self) -> Metadata {
399        Metadata {
400            version: self.version,
401            dataset: self.dataset,
402            schema: self.schema,
403            start: self.start,
404            end: self.end,
405            limit: self.limit,
406            stype_in: self.stype_in,
407            stype_out: self.stype_out,
408            ts_out: self.ts_out,
409            symbols: self.symbols,
410            partial: self.partial,
411            not_found: self.not_found,
412            mappings: self.mappings,
413            symbol_cstr_len: version_symbol_cstr_len(self.version),
414        }
415    }
416}
417
418impl Default for MetadataBuilder<Unset, Unset, Unset, Unset, Unset> {
419    fn default() -> Self {
420        Self {
421            version: crate::DBN_VERSION,
422            dataset: Unset {},
423            schema: Unset {},
424            start: Unset {},
425            end: None,
426            limit: None,
427            stype_in: Unset {},
428            stype_out: Unset {},
429            ts_out: false,
430            symbols: vec![],
431            partial: vec![],
432            not_found: vec![],
433            mappings: vec![],
434        }
435    }
436}
437
/// A raw symbol and its symbol mappings for different time ranges within the query range.
///
/// Instances are stored in [`Metadata::mappings`].
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(Deserialize))]
#[cfg_attr(feature = "python", derive(pyo3::FromPyObject))]
pub struct SymbolMapping {
    /// The `stype_in` symbol.
    pub raw_symbol: String,
    /// The mappings of `raw_symbol` to `stype_out` for different date ranges, one
    /// [`MappingInterval`] per range.
    pub intervals: Vec<MappingInterval>,
}
448
/// The resolved symbol for a date range.
///
/// With the `serde` feature enabled, this type deserializes from the short keys
/// `d0`, `d1`, and `s`.
#[derive(Debug, Clone, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(Deserialize))]
pub struct MappingInterval {
    /// The UTC start date of interval (inclusive).
    // serde: read from key `d0` as a date string parsed with `DATE_FORMAT`.
    #[cfg_attr(
        feature = "serde",
        serde(rename = "d0", deserialize_with = "deserialize_date")
    )]
    pub start_date: time::Date,
    /// The UTC end date of interval (exclusive).
    // serde: read from key `d1` as a date string parsed with `DATE_FORMAT`.
    #[cfg_attr(
        feature = "serde",
        serde(rename = "d1", deserialize_with = "deserialize_date")
    )]
    pub end_date: time::Date,
    /// The resolved symbol for this interval (in `stype_out`).
    // serde: read from key `s`.
    #[cfg_attr(feature = "serde", serde(rename = "s"))]
    pub symbol: String,
}
469
/// The date format used for date strings when serializing [`Metadata`]:
/// `[year]-[month]-[day]`.
///
/// With the `serde` feature enabled, the same format is used to parse the interval
/// dates of a [`MappingInterval`] during deserialization.
pub const DATE_FORMAT: &[time::format_description::BorrowedFormatItem<'static>] =
    time::macros::format_description!("[year]-[month]-[day]");
473
/// Deserializes a string and parses it into a [`time::Date`] using [`DATE_FORMAT`].
///
/// # Errors
/// Returns the deserializer's error if the value is not a string, or a custom
/// deserialization error if the string does not match [`DATE_FORMAT`].
#[cfg(feature = "serde")]
fn deserialize_date<'de, D>(deserializer: D) -> Result<time::Date, D::Error>
where
    D: serde::Deserializer<'de>,
{
    String::deserialize(deserializer).and_then(|raw| {
        time::Date::parse(&raw, DATE_FORMAT).map_err(serde::de::Error::custom)
    })
}
481
#[cfg(test)]
mod tests {
    use rstest::*;

    use crate::Dataset;

    use super::*;

    /// Upgrading version 1 metadata must produce the version implied by the policy.
    #[rstest]
    #[case(VersionUpgradePolicy::AsIs, 1)]
    #[case(VersionUpgradePolicy::UpgradeToV2, 2)]
    #[case(VersionUpgradePolicy::UpgradeToV3, 3)]
    fn test_upgrade_metadata(
        #[case] upgrade_policy: VersionUpgradePolicy,
        #[case] exp_version: u8,
    ) {
        // Start from explicit version 1 metadata with only the required fields set.
        let v1_builder = Metadata::builder()
            .version(1)
            .dataset(Dataset::OpraPillar)
            .schema(Some(Schema::Mbp1))
            .start(0)
            .stype_in(None)
            .stype_out(SType::InstrumentId);
        let mut metadata = v1_builder.build();
        assert_eq!(metadata.version, 1);

        metadata.upgrade(upgrade_policy);

        assert_eq!(metadata.version, exp_version);
    }
}