Skip to main content

nautilus_databento/python/
loader.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Python bindings for the Databento data loader.
17
18use std::{collections::HashMap, path::PathBuf};
19
20use databento::dbn;
21use nautilus_core::{
22    ffi::cvec::CVec,
23    python::{IntoPyObjectNautilusExt, to_pyvalue_err},
24};
25use nautilus_model::{
26    data::{
27        Bar, Data, DataFFI, InstrumentStatus, OrderBookDelta, OrderBookDepth10, QuoteTick,
28        TradeTick,
29    },
30    identifiers::{InstrumentId, Symbol, Venue},
31    python::instruments::instrument_any_to_pyobject,
32};
33use pyo3::{
34    prelude::*,
35    types::{PyCapsule, PyList},
36};
37use ustr::Ustr;
38
39use crate::{
40    loader::DatabentoDataLoader,
41    types::{DatabentoImbalance, DatabentoPublisher, DatabentoStatistics, PublisherId},
42};
43
44#[expect(clippy::needless_pass_by_value)]
45#[pymethods]
46#[pyo3_stub_gen::derive::gen_stub_pymethods]
47impl DatabentoDataLoader {
48    /// A Nautilus data loader for Databento Binary Encoding (DBN) format data.
49    ///
50    /// # Supported Schemas
51    ///  - `MBO` -> `OrderBookDelta`
52    ///  - `MBP_1` -> `(QuoteTick, Option<TradeTick>)`
53    ///  - `MBP_10` -> `OrderBookDepth10`
54    ///  - `BBO_1S` -> `QuoteTick`
55    ///  - `BBO_1M` -> `QuoteTick`
56    ///  - `CMBP_1` -> `(QuoteTick, Option<TradeTick>)`
57    ///  - `CBBO_1S` -> `QuoteTick`
58    ///  - `CBBO_1M` -> `QuoteTick`
59    ///  - `TCBBO` -> `(QuoteTick, TradeTick)`
60    ///  - `TBBO` -> `(QuoteTick, TradeTick)`
61    ///  - `TRADES` -> `TradeTick`
62    ///  - `OHLCV_1S` -> `Bar`
63    ///  - `OHLCV_1M` -> `Bar`
64    ///  - `OHLCV_1H` -> `Bar`
65    ///  - `OHLCV_1D` -> `Bar`
66    ///  - `OHLCV_EOD` -> `Bar`
67    ///  - `DEFINITION` -> `Instrument`
68    ///  - `IMBALANCE` -> `DatabentoImbalance`
69    ///  - `STATISTICS` -> `DatabentoStatistics`
70    ///  - `STATUS` -> `InstrumentStatus`
71    ///
72    /// # References
73    ///
74    /// <https://databento.com/docs/schemas-and-data-formats>
75    #[new]
76    #[pyo3(signature = (publishers_filepath=None))]
77    fn py_new(publishers_filepath: Option<PathBuf>) -> PyResult<Self> {
78        Self::new(publishers_filepath).map_err(to_pyvalue_err)
79    }
80
81    /// Load the publishers data from the file at the given `filepath`.
82    ///
83    /// # Errors
84    ///
85    /// Returns an error if the file cannot be read or parsed as JSON.
86    #[pyo3(name = "load_publishers")]
87    fn py_load_publishers(&mut self, publishers_filepath: PathBuf) -> PyResult<()> {
88        self.load_publishers(publishers_filepath)
89            .map_err(to_pyvalue_err)
90    }
91
92    /// Returns the internal Databento publishers currently held by the loader.
93    #[must_use]
94    #[pyo3(name = "get_publishers")]
95    fn py_get_publishers(&self) -> HashMap<u16, DatabentoPublisher> {
96        self.get_publishers()
97            .iter()
98            .map(|(&key, value)| (key, value.clone()))
99            .collect::<HashMap<u16, DatabentoPublisher>>()
100    }
101
102    /// Sets the `venue` to map to the given `dataset`.
103    #[pyo3(name = "set_dataset_for_venue")]
104    fn py_set_dataset_for_venue(&mut self, dataset: String, venue: Venue) {
105        self.set_dataset_for_venue(Ustr::from(&dataset), venue);
106    }
107
108    /// Returns the dataset which matches the given `venue` (if found).
109    #[must_use]
110    #[pyo3(name = "get_dataset_for_venue")]
111    fn py_get_dataset_for_venue(&self, venue: &Venue) -> Option<String> {
112        self.get_dataset_for_venue(venue).map(ToString::to_string)
113    }
114
115    /// Returns the venue which matches the given `publisher_id` (if found).
116    #[must_use]
117    #[pyo3(name = "get_venue_for_publisher")]
118    fn py_get_venue_for_publisher(&self, publisher_id: PublisherId) -> Option<String> {
119        self.get_venue_for_publisher(publisher_id)
120            .map(ToString::to_string)
121    }
122
123    /// Caches a `price_precision` for the given `symbol`.
124    ///
125    /// When market data is read without an explicit `price_precision` argument,
126    /// the loader resolves precision per record from this cache. Definitions
127    /// loaded via `Self.load_instruments` are inserted automatically.
128    #[pyo3(name = "set_price_precision")]
129    fn py_set_price_precision(&mut self, symbol: &str, price_precision: u8) {
130        self.set_price_precision(Symbol::from(symbol), price_precision);
131    }
132
133    /// Returns the cached price precisions keyed by symbol.
134    #[must_use]
135    #[pyo3(name = "get_price_precisions")]
136    fn py_get_price_precisions(&self) -> HashMap<String, u8> {
137        self.get_price_precisions()
138            .iter()
139            .map(|(symbol, precision)| (symbol.to_string(), *precision))
140            .collect()
141    }
142
143    #[pyo3(name = "schema_for_file")]
144    fn py_schema_for_file(&self, filepath: PathBuf) -> PyResult<Option<String>> {
145        self.schema_from_file(&filepath).map_err(to_pyvalue_err)
146    }
147
148    /// Loads all instrument definitions from a DBN file.
149    ///
150    /// When `skip_on_error` is true, instruments that fail to decode are logged
151    /// as warnings and skipped. When false (default), any decode error is propagated.
152    #[pyo3(name = "load_instruments")]
153    #[pyo3(signature = (filepath, use_exchange_as_venue, skip_on_error=false))]
154    fn py_load_instruments(
155        &mut self,
156        py: Python,
157        filepath: PathBuf,
158        use_exchange_as_venue: bool,
159        skip_on_error: bool,
160    ) -> PyResult<Py<PyAny>> {
161        let iter = self
162            .load_instruments(&filepath, use_exchange_as_venue, skip_on_error)
163            .map_err(to_pyvalue_err)?;
164
165        let mut data = Vec::new();
166
167        for instrument in iter {
168            let py_object = instrument_any_to_pyobject(py, instrument)?;
169            data.push(py_object);
170        }
171
172        let list = PyList::new(py, &data).expect("Invalid `ExactSizeIterator`");
173
174        Ok(list.into_py_any_unwrap(py))
175    }
176
177    // Cannot include trades
178    /// Loads order book delta messages from a DBN MBO schema file.
179    ///
180    /// Cannot include trades.
181    #[pyo3(name = "load_order_book_deltas")]
182    #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
183    fn py_load_order_book_deltas(
184        &self,
185        filepath: PathBuf,
186        instrument_id: Option<InstrumentId>,
187        price_precision: Option<u8>,
188    ) -> PyResult<Vec<OrderBookDelta>> {
189        self.load_order_book_deltas(&filepath, instrument_id, price_precision)
190            .map_err(to_pyvalue_err)
191    }
192
193    #[pyo3(name = "load_order_book_deltas_as_pycapsule")]
194    #[pyo3(signature = (filepath, instrument_id=None, price_precision=None, include_trades=None))]
195    fn py_load_order_book_deltas_as_pycapsule(
196        &self,
197        py: Python,
198        filepath: PathBuf,
199        instrument_id: Option<InstrumentId>,
200        price_precision: Option<u8>,
201        include_trades: Option<bool>,
202    ) -> PyResult<Py<PyAny>> {
203        let iter = self
204            .read_records::<dbn::MboMsg>(
205                &filepath,
206                instrument_id,
207                price_precision,
208                include_trades.unwrap_or(false),
209                None,
210            )
211            .map_err(to_pyvalue_err)?;
212
213        exhaust_data_iter_to_pycapsule(py, iter).map_err(to_pyvalue_err)
214    }
215
216    /// Loads order book depth10 snapshots from a DBN MBP-10 schema file.
217    #[pyo3(name = "load_order_book_depth10")]
218    #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
219    fn py_load_order_book_depth10(
220        &self,
221        filepath: PathBuf,
222        instrument_id: Option<InstrumentId>,
223        price_precision: Option<u8>,
224    ) -> PyResult<Vec<OrderBookDepth10>> {
225        self.load_order_book_depth10(&filepath, instrument_id, price_precision)
226            .map_err(to_pyvalue_err)
227    }
228
229    #[pyo3(name = "load_order_book_depth10_as_pycapsule")]
230    #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
231    fn py_load_order_book_depth10_as_pycapsule(
232        &self,
233        py: Python,
234        filepath: PathBuf,
235        instrument_id: Option<InstrumentId>,
236        price_precision: Option<u8>,
237    ) -> PyResult<Py<PyAny>> {
238        let iter = self
239            .read_records::<dbn::Mbp10Msg>(&filepath, instrument_id, price_precision, false, None)
240            .map_err(to_pyvalue_err)?;
241
242        exhaust_data_iter_to_pycapsule(py, iter).map_err(to_pyvalue_err)
243    }
244
245    /// Loads quote tick messages from a DBN MBP-1 or TBBO schema file.
246    #[pyo3(name = "load_quotes")]
247    #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
248    fn py_load_quotes(
249        &self,
250        filepath: PathBuf,
251        instrument_id: Option<InstrumentId>,
252        price_precision: Option<u8>,
253    ) -> PyResult<Vec<QuoteTick>> {
254        self.load_quotes(&filepath, instrument_id, price_precision)
255            .map_err(to_pyvalue_err)
256    }
257
258    #[pyo3(name = "load_quotes_as_pycapsule")]
259    #[pyo3(signature = (filepath, instrument_id=None, price_precision=None, include_trades=None))]
260    fn py_load_quotes_as_pycapsule(
261        &self,
262        py: Python,
263        filepath: PathBuf,
264        instrument_id: Option<InstrumentId>,
265        price_precision: Option<u8>,
266        include_trades: Option<bool>,
267    ) -> PyResult<Py<PyAny>> {
268        let iter = self
269            .read_records::<dbn::Mbp1Msg>(
270                &filepath,
271                instrument_id,
272                price_precision,
273                include_trades.unwrap_or(false),
274                None,
275            )
276            .map_err(to_pyvalue_err)?;
277
278        exhaust_data_iter_to_pycapsule(py, iter).map_err(to_pyvalue_err)
279    }
280
281    /// Loads best bid/offer quote messages from a DBN BBO schema file.
282    #[pyo3(name = "load_bbo_quotes")]
283    #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
284    fn py_load_bbo_quotes(
285        &self,
286        filepath: PathBuf,
287        instrument_id: Option<InstrumentId>,
288        price_precision: Option<u8>,
289    ) -> PyResult<Vec<QuoteTick>> {
290        self.load_bbo_quotes(&filepath, instrument_id, price_precision)
291            .map_err(to_pyvalue_err)
292    }
293
294    #[pyo3(name = "load_bbo_quotes_as_pycapsule")]
295    #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
296    fn py_load_bbo_quotes_as_pycapsule(
297        &self,
298        py: Python,
299        filepath: PathBuf,
300        instrument_id: Option<InstrumentId>,
301        price_precision: Option<u8>,
302    ) -> PyResult<Py<PyAny>> {
303        let iter = self
304            .read_records::<dbn::BboMsg>(&filepath, instrument_id, price_precision, false, None)
305            .map_err(to_pyvalue_err)?;
306
307        exhaust_data_iter_to_pycapsule(py, iter).map_err(to_pyvalue_err)
308    }
309
310    /// Loads consolidated MBP-1 quote messages from a DBN CMBP-1 schema file.
311    #[pyo3(name = "load_cmbp_quotes")]
312    #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
313    fn py_load_cmbp_quotes(
314        &self,
315        filepath: PathBuf,
316        instrument_id: Option<InstrumentId>,
317        price_precision: Option<u8>,
318    ) -> PyResult<Vec<QuoteTick>> {
319        self.load_cmbp_quotes(&filepath, instrument_id, price_precision)
320            .map_err(to_pyvalue_err)
321    }
322
323    #[pyo3(name = "load_cmbp_quotes_as_pycapsule")]
324    #[pyo3(signature = (filepath, instrument_id=None, price_precision=None, include_trades=None))]
325    fn py_load_cmbp_quotes_as_pycapsule(
326        &self,
327        py: Python,
328        filepath: PathBuf,
329        instrument_id: Option<InstrumentId>,
330        price_precision: Option<u8>,
331        include_trades: Option<bool>,
332    ) -> PyResult<Py<PyAny>> {
333        let iter = self
334            .read_records::<dbn::Cmbp1Msg>(
335                &filepath,
336                instrument_id,
337                price_precision,
338                include_trades.unwrap_or(false),
339                None,
340            )
341            .map_err(to_pyvalue_err)?;
342
343        exhaust_data_iter_to_pycapsule(py, iter).map_err(to_pyvalue_err)
344    }
345
346    /// Loads consolidated best bid/offer quote messages from a DBN CBBO schema file.
347    #[pyo3(name = "load_cbbo_quotes")]
348    #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
349    fn py_load_cbbo_quotes(
350        &self,
351        filepath: PathBuf,
352        instrument_id: Option<InstrumentId>,
353        price_precision: Option<u8>,
354    ) -> PyResult<Vec<QuoteTick>> {
355        self.load_cbbo_quotes(&filepath, instrument_id, price_precision)
356            .map_err(to_pyvalue_err)
357    }
358
359    #[pyo3(name = "load_cbbo_quotes_as_pycapsule")]
360    #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
361    fn py_load_cbbo_quotes_as_pycapsule(
362        &self,
363        py: Python,
364        filepath: PathBuf,
365        instrument_id: Option<InstrumentId>,
366        price_precision: Option<u8>,
367    ) -> PyResult<Py<PyAny>> {
368        let iter = self
369            .read_records::<dbn::CbboMsg>(&filepath, instrument_id, price_precision, false, None)
370            .map_err(to_pyvalue_err)?;
371
372        exhaust_data_iter_to_pycapsule(py, iter).map_err(to_pyvalue_err)
373    }
374
375    /// Loads trade messages from a DBN TBBO schema file.
376    #[pyo3(name = "load_tbbo_trades")]
377    #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
378    fn py_load_tbbo_trades(
379        &self,
380        filepath: PathBuf,
381        instrument_id: Option<InstrumentId>,
382        price_precision: Option<u8>,
383    ) -> PyResult<Vec<TradeTick>> {
384        self.load_tbbo_trades(&filepath, instrument_id, price_precision)
385            .map_err(to_pyvalue_err)
386    }
387
388    #[pyo3(name = "load_tbbo_trades_as_pycapsule")]
389    #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
390    fn py_load_tbbo_trades_as_pycapsule(
391        &self,
392        py: Python,
393        filepath: PathBuf,
394        instrument_id: Option<InstrumentId>,
395        price_precision: Option<u8>,
396    ) -> PyResult<Py<PyAny>> {
397        let iter = self
398            .read_records::<dbn::TbboMsg>(&filepath, instrument_id, price_precision, false, None)
399            .map_err(to_pyvalue_err)?;
400
401        exhaust_data_iter_to_pycapsule(py, iter).map_err(to_pyvalue_err)
402    }
403
404    /// Loads trade messages from a DBN TCBBO schema file.
405    #[pyo3(name = "load_tcbbo_trades")]
406    #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
407    fn py_load_tcbbo_trades(
408        &self,
409        filepath: PathBuf,
410        instrument_id: Option<InstrumentId>,
411        price_precision: Option<u8>,
412    ) -> PyResult<Vec<TradeTick>> {
413        self.load_tcbbo_trades(&filepath, instrument_id, price_precision)
414            .map_err(to_pyvalue_err)
415    }
416
417    #[pyo3(name = "load_tcbbo_trades_as_pycapsule")]
418    #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
419    fn py_load_tcbbo_trades_as_pycapsule(
420        &self,
421        py: Python,
422        filepath: PathBuf,
423        instrument_id: Option<InstrumentId>,
424        price_precision: Option<u8>,
425    ) -> PyResult<Py<PyAny>> {
426        let iter = self
427            .read_records::<dbn::CbboMsg>(&filepath, instrument_id, price_precision, false, None)
428            .map_err(to_pyvalue_err)?;
429
430        exhaust_data_iter_to_pycapsule(py, iter).map_err(to_pyvalue_err)
431    }
432
433    /// Loads trade messages from a DBN TRADES schema file.
434    #[pyo3(name = "load_trades")]
435    #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
436    fn py_load_trades(
437        &self,
438        filepath: PathBuf,
439        instrument_id: Option<InstrumentId>,
440        price_precision: Option<u8>,
441    ) -> PyResult<Vec<TradeTick>> {
442        self.load_trades(&filepath, instrument_id, price_precision)
443            .map_err(to_pyvalue_err)
444    }
445
446    #[pyo3(name = "load_trades_as_pycapsule")]
447    #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
448    fn py_load_trades_as_pycapsule(
449        &self,
450        py: Python,
451        filepath: PathBuf,
452        instrument_id: Option<InstrumentId>,
453        price_precision: Option<u8>,
454    ) -> PyResult<Py<PyAny>> {
455        let iter = self
456            .read_records::<dbn::TradeMsg>(&filepath, instrument_id, price_precision, false, None)
457            .map_err(to_pyvalue_err)?;
458
459        exhaust_data_iter_to_pycapsule(py, iter).map_err(to_pyvalue_err)
460    }
461
462    /// Loads OHLCV bar messages from a DBN OHLCV schema file.
463    #[pyo3(name = "load_bars")]
464    #[pyo3(signature = (filepath, instrument_id=None, price_precision=None, timestamp_on_close=true))]
465    fn py_load_bars(
466        &self,
467        filepath: PathBuf,
468        instrument_id: Option<InstrumentId>,
469        price_precision: Option<u8>,
470        timestamp_on_close: bool,
471    ) -> PyResult<Vec<Bar>> {
472        self.load_bars(
473            &filepath,
474            instrument_id,
475            price_precision,
476            Some(timestamp_on_close),
477        )
478        .map_err(to_pyvalue_err)
479    }
480
481    #[pyo3(name = "load_bars_as_pycapsule")]
482    #[pyo3(signature = (filepath, instrument_id=None, price_precision=None, timestamp_on_close=true))]
483    fn py_load_bars_as_pycapsule(
484        &self,
485        py: Python,
486        filepath: PathBuf,
487        instrument_id: Option<InstrumentId>,
488        price_precision: Option<u8>,
489        timestamp_on_close: bool,
490    ) -> PyResult<Py<PyAny>> {
491        let iter = self
492            .read_records::<dbn::OhlcvMsg>(
493                &filepath,
494                instrument_id,
495                price_precision,
496                false,
497                Some(timestamp_on_close),
498            )
499            .map_err(to_pyvalue_err)?;
500
501        exhaust_data_iter_to_pycapsule(py, iter).map_err(to_pyvalue_err)
502    }
503
504    #[pyo3(name = "load_status")]
505    #[pyo3(signature = (filepath, instrument_id=None))]
506    fn py_load_status(
507        &self,
508        filepath: PathBuf,
509        instrument_id: Option<InstrumentId>,
510    ) -> PyResult<Vec<InstrumentStatus>> {
511        let iter = self
512            .load_status_records::<dbn::StatusMsg>(&filepath, instrument_id)
513            .map_err(to_pyvalue_err)?;
514
515        let mut data = Vec::new();
516
517        for result in iter {
518            match result {
519                Ok(item) => data.push(item),
520                Err(e) => return Err(to_pyvalue_err(e)),
521            }
522        }
523
524        Ok(data)
525    }
526
527    #[pyo3(name = "load_imbalance")]
528    #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
529    fn py_load_imbalance(
530        &self,
531        filepath: PathBuf,
532        instrument_id: Option<InstrumentId>,
533        price_precision: Option<u8>,
534    ) -> PyResult<Vec<DatabentoImbalance>> {
535        let iter = self
536            .read_imbalance_records::<dbn::ImbalanceMsg>(&filepath, instrument_id, price_precision)
537            .map_err(to_pyvalue_err)?;
538
539        let mut data = Vec::new();
540
541        for result in iter {
542            match result {
543                Ok(item) => data.push(item),
544                Err(e) => return Err(to_pyvalue_err(e)),
545            }
546        }
547
548        Ok(data)
549    }
550
551    #[pyo3(name = "load_statistics")]
552    #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
553    fn py_load_statistics(
554        &self,
555        filepath: PathBuf,
556        instrument_id: Option<InstrumentId>,
557        price_precision: Option<u8>,
558    ) -> PyResult<Vec<DatabentoStatistics>> {
559        let iter = self
560            .read_statistics_records::<dbn::StatMsg>(&filepath, instrument_id, price_precision)
561            .map_err(to_pyvalue_err)?;
562
563        let mut data = Vec::new();
564
565        for result in iter {
566            match result {
567                Ok(item) => data.push(item),
568                Err(e) => return Err(to_pyvalue_err(e)),
569            }
570        }
571
572        Ok(data)
573    }
574}
575
576fn exhaust_data_iter_to_pycapsule(
577    py: Python,
578    iter: impl Iterator<Item = anyhow::Result<(Option<Data>, Option<Data>)>>,
579) -> anyhow::Result<Py<PyAny>> {
580    let mut data = Vec::new();
581
582    for result in iter {
583        match result {
584            Ok((Some(item1), None)) => data.push(item1),
585            Ok((None, Some(item2))) => data.push(item2),
586            Ok((Some(item1), Some(item2))) => {
587                data.push(item1);
588                data.push(item2);
589            }
590            Ok((None, None)) => {}
591            Err(e) => return Err(e),
592        }
593    }
594
595    let ffi_data: Vec<DataFFI> = data
596        .into_iter()
597        .map(DataFFI::try_from)
598        .collect::<Result<Vec<_>, _>>()
599        .map_err(to_pyvalue_err)?;
600    let cvec: CVec = ffi_data.into();
601    // No destructor: Python must call drop_cvec_pycapsule to take ownership and free.
602    let capsule = PyCapsule::new_with_destructor::<CVec, _>(py, cvec, None, |_, _| {})?;
603
604    // TODO: Improve error domain. Replace anyhow errors with nautilus
605    // errors to unify pyo3 and anyhow errors.
606    Ok(capsule.into_py_any_unwrap(py))
607}