Skip to main content

nautilus_databento/python/
historical.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Python bindings for the Databento historical client.
17
18use std::{fmt::Debug, path::PathBuf};
19
20use nautilus_core::{
21    python::{IntoPyObjectNautilusExt, to_pyexception, to_pyvalue_err},
22    time::get_atomic_clock_realtime,
23};
24use nautilus_model::{
25    enums::BarAggregation,
26    identifiers::{InstrumentId, Symbol},
27    python::instruments::instrument_any_to_pyobject,
28};
29use pyo3::{
30    IntoPyObjectExt,
31    prelude::*,
32    types::{PyDict, PyList},
33};
34
35use crate::{
36    common::Credential,
37    historical::{DatabentoHistoricalClient as CoreDatabentoHistoricalClient, RangeQueryParams},
38};
39
40/// Python wrapper for the core Databento historical client.
41#[cfg_attr(
42    feature = "python",
43    pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.databento")
44)]
45#[cfg_attr(
46    feature = "python",
47    pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.databento")
48)]
49pub struct DatabentoHistoricalClient {
50    inner: CoreDatabentoHistoricalClient,
51}
52
53impl Debug for DatabentoHistoricalClient {
54    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
55        f.debug_struct(stringify!(DatabentoHistoricalClient))
56            .field("inner", &self.inner)
57            .finish()
58    }
59}
60
61#[pymethods]
62#[pyo3_stub_gen::derive::gen_stub_pymethods]
63impl DatabentoHistoricalClient {
64    /// Core Databento historical client for fetching historical market data.
65    ///
66    /// This client provides both synchronous and asynchronous interfaces for fetching
67    /// various types of historical market data from Databento.
68    #[new]
69    fn py_new(
70        key: String,
71        publishers_filepath: PathBuf,
72        use_exchange_as_venue: bool,
73    ) -> PyResult<Self> {
74        let clock = get_atomic_clock_realtime();
75        let inner = CoreDatabentoHistoricalClient::new(
76            Credential::new(key),
77            publishers_filepath,
78            clock,
79            use_exchange_as_venue,
80        )
81        .map_err(to_pyvalue_err)?;
82
83        Ok(Self { inner })
84    }
85
86    /// Returns the API key from the stored credential.
87    #[getter]
88    #[pyo3(name = "api_key")]
89    fn py_api_key(&self) -> &str {
90        self.inner.api_key()
91    }
92
93    /// Caches a `price_precision` for the given `symbol`.
94    ///
95    /// When market data is fetched without an explicit `price_precision`, the
96    /// client resolves precision per record from this cache. Instruments
97    /// returned by `Self.get_range_instruments` are inserted automatically.
98    #[pyo3(name = "set_price_precision")]
99    fn py_set_price_precision(&self, symbol: &str, price_precision: u8) {
100        self.inner
101            .set_price_precision(Symbol::from(symbol), price_precision);
102    }
103
104    /// Gets the date range for a specific dataset.
105    #[pyo3(name = "get_dataset_range")]
106    fn py_get_dataset_range<'py>(
107        &self,
108        py: Python<'py>,
109        dataset: String,
110    ) -> PyResult<Bound<'py, PyAny>> {
111        let inner = self.inner.clone();
112
113        pyo3_async_runtimes::tokio::future_into_py(py, async move {
114            let response = inner.get_dataset_range(&dataset).await;
115            match response {
116                Ok(res) => Python::attach(|py| {
117                    let dict = PyDict::new(py);
118                    dict.set_item("start", res.start)?;
119                    dict.set_item("end", res.end)?;
120                    dict.into_py_any(py)
121                }),
122                Err(e) => Err(to_pyexception(format!("Error handling response: {e}"))),
123            }
124        })
125    }
126
127    /// Fetches instrument definitions for the given parameters.
128    #[pyo3(name = "get_range_instruments")]
129    #[pyo3(signature = (dataset, instrument_ids, start, end=None, limit=None))]
130    #[expect(clippy::needless_pass_by_value)]
131    fn py_get_range_instruments<'py>(
132        &self,
133        py: Python<'py>,
134        dataset: String,
135        instrument_ids: Vec<InstrumentId>,
136        start: u64,
137        end: Option<u64>,
138        limit: Option<u64>,
139    ) -> PyResult<Bound<'py, PyAny>> {
140        let inner = self.inner.clone();
141        let symbols = inner.prepare_symbols_from_instrument_ids(&instrument_ids);
142
143        let params = RangeQueryParams {
144            dataset,
145            symbols,
146            start: start.into(),
147            end: end.map(Into::into),
148            limit,
149            price_precision: None,
150        };
151
152        pyo3_async_runtimes::tokio::future_into_py(py, async move {
153            let instruments = inner
154                .get_range_instruments(params)
155                .await
156                .map_err(to_pyvalue_err)?;
157
158            Python::attach(|py| -> PyResult<Py<PyAny>> {
159                let objs: Vec<Py<PyAny>> = instruments
160                    .into_iter()
161                    .map(|inst| instrument_any_to_pyobject(py, inst))
162                    .collect::<PyResult<Vec<Py<PyAny>>>>()?;
163
164                let list = PyList::new(py, &objs).expect("Invalid `ExactSizeIterator`");
165                Ok(list.into_py_any_unwrap(py))
166            })
167        })
168    }
169
170    /// Fetches quote ticks for the given parameters.
171    #[pyo3(name = "get_range_quotes")]
172    #[pyo3(signature = (dataset, instrument_ids, start, end=None, limit=None, price_precision=None, schema=None))]
173    #[expect(clippy::too_many_arguments, clippy::needless_pass_by_value)]
174    fn py_get_range_quotes<'py>(
175        &self,
176        py: Python<'py>,
177        dataset: String,
178        instrument_ids: Vec<InstrumentId>,
179        start: u64,
180        end: Option<u64>,
181        limit: Option<u64>,
182        price_precision: Option<u8>,
183        schema: Option<String>,
184    ) -> PyResult<Bound<'py, PyAny>> {
185        let inner = self.inner.clone();
186        let symbols = inner.prepare_symbols_from_instrument_ids(&instrument_ids);
187
188        let params = RangeQueryParams {
189            dataset,
190            symbols,
191            start: start.into(),
192            end: end.map(Into::into),
193            limit,
194            price_precision,
195        };
196
197        pyo3_async_runtimes::tokio::future_into_py(py, async move {
198            let quotes = inner
199                .get_range_quotes(params, schema)
200                .await
201                .map_err(to_pyvalue_err)?;
202            Python::attach(|py| quotes.into_py_any(py))
203        })
204    }
205
206    /// Fetches trade ticks for the given parameters.
207    #[pyo3(name = "get_range_trades")]
208    #[pyo3(signature = (dataset, instrument_ids, start, end=None, limit=None, price_precision=None))]
209    #[expect(clippy::too_many_arguments, clippy::needless_pass_by_value)]
210    fn py_get_range_trades<'py>(
211        &self,
212        py: Python<'py>,
213        dataset: String,
214        instrument_ids: Vec<InstrumentId>,
215        start: u64,
216        end: Option<u64>,
217        limit: Option<u64>,
218        price_precision: Option<u8>,
219    ) -> PyResult<Bound<'py, PyAny>> {
220        let inner = self.inner.clone();
221        let symbols = inner.prepare_symbols_from_instrument_ids(&instrument_ids);
222
223        let params = RangeQueryParams {
224            dataset,
225            symbols,
226            start: start.into(),
227            end: end.map(Into::into),
228            limit,
229            price_precision,
230        };
231
232        pyo3_async_runtimes::tokio::future_into_py(py, async move {
233            let trades = inner
234                .get_range_trades(params)
235                .await
236                .map_err(to_pyvalue_err)?;
237            Python::attach(|py| trades.into_py_any(py))
238        })
239    }
240
241    /// Fetches bars for the given parameters.
242    #[pyo3(name = "get_range_bars")]
243    #[pyo3(signature = (dataset, instrument_ids, aggregation, start, end=None, limit=None, price_precision=None, timestamp_on_close=true))]
244    #[expect(clippy::too_many_arguments, clippy::needless_pass_by_value)]
245    fn py_get_range_bars<'py>(
246        &self,
247        py: Python<'py>,
248        dataset: String,
249        instrument_ids: Vec<InstrumentId>,
250        aggregation: BarAggregation,
251        start: u64,
252        end: Option<u64>,
253        limit: Option<u64>,
254        price_precision: Option<u8>,
255        timestamp_on_close: bool,
256    ) -> PyResult<Bound<'py, PyAny>> {
257        let inner = self.inner.clone();
258        let symbols = inner.prepare_symbols_from_instrument_ids(&instrument_ids);
259
260        let params = RangeQueryParams {
261            dataset,
262            symbols,
263            start: start.into(),
264            end: end.map(Into::into),
265            limit,
266            price_precision,
267        };
268
269        pyo3_async_runtimes::tokio::future_into_py(py, async move {
270            let bars = inner
271                .get_range_bars(params, aggregation, timestamp_on_close)
272                .await
273                .map_err(to_pyvalue_err)?;
274            Python::attach(|py| bars.into_py_any(py))
275        })
276    }
277
278    #[pyo3(name = "get_order_book_depth10")]
279    #[pyo3(signature = (dataset, instrument_ids, start, end=None, depth=None))]
280    #[expect(clippy::needless_pass_by_value)]
281    fn py_get_order_book_depth10<'py>(
282        &self,
283        py: Python<'py>,
284        dataset: String,
285        instrument_ids: Vec<InstrumentId>,
286        start: u64,
287        end: Option<u64>,
288        depth: Option<usize>,
289    ) -> PyResult<Bound<'py, PyAny>> {
290        let inner = self.inner.clone();
291        let symbols = inner.prepare_symbols_from_instrument_ids(&instrument_ids);
292
293        let params = RangeQueryParams {
294            dataset,
295            symbols,
296            start: start.into(),
297            end: end.map(Into::into),
298            limit: None,
299            price_precision: None,
300        };
301
302        pyo3_async_runtimes::tokio::future_into_py(py, async move {
303            let depths = inner
304                .get_range_order_book_depth10(params, depth)
305                .await
306                .map_err(to_pyvalue_err)?;
307            Python::attach(|py| depths.into_py_any(py))
308        })
309    }
310
311    /// Fetches order book deltas for the given parameters.
312    #[pyo3(name = "get_range_order_book_deltas")]
313    #[pyo3(signature = (dataset, instrument_ids, start, end=None, limit=None, price_precision=None))]
314    #[expect(clippy::too_many_arguments, clippy::needless_pass_by_value)]
315    fn py_get_range_order_book_deltas<'py>(
316        &self,
317        py: Python<'py>,
318        dataset: String,
319        instrument_ids: Vec<InstrumentId>,
320        start: u64,
321        end: Option<u64>,
322        limit: Option<u64>,
323        price_precision: Option<u8>,
324    ) -> PyResult<Bound<'py, PyAny>> {
325        let inner = self.inner.clone();
326        let symbols = inner.prepare_symbols_from_instrument_ids(&instrument_ids);
327
328        let params = RangeQueryParams {
329            dataset,
330            symbols,
331            start: start.into(),
332            end: end.map(Into::into),
333            limit,
334            price_precision,
335        };
336
337        pyo3_async_runtimes::tokio::future_into_py(py, async move {
338            let deltas = inner
339                .get_range_order_book_deltas(params)
340                .await
341                .map_err(to_pyvalue_err)?;
342            Python::attach(|py| deltas.into_py_any(py))
343        })
344    }
345
346    /// Fetches imbalance data for the given parameters.
347    #[pyo3(name = "get_range_imbalance")]
348    #[pyo3(signature = (dataset, instrument_ids, start, end=None, limit=None, price_precision=None))]
349    #[expect(clippy::too_many_arguments, clippy::needless_pass_by_value)]
350    fn py_get_range_imbalance<'py>(
351        &self,
352        py: Python<'py>,
353        dataset: String,
354        instrument_ids: Vec<InstrumentId>,
355        start: u64,
356        end: Option<u64>,
357        limit: Option<u64>,
358        price_precision: Option<u8>,
359    ) -> PyResult<Bound<'py, PyAny>> {
360        let inner = self.inner.clone();
361        let symbols = inner.prepare_symbols_from_instrument_ids(&instrument_ids);
362
363        let params = RangeQueryParams {
364            dataset,
365            symbols,
366            start: start.into(),
367            end: end.map(Into::into),
368            limit,
369            price_precision,
370        };
371
372        pyo3_async_runtimes::tokio::future_into_py(py, async move {
373            let imbalances = inner
374                .get_range_imbalance(params)
375                .await
376                .map_err(to_pyvalue_err)?;
377            Python::attach(|py| imbalances.into_py_any(py))
378        })
379    }
380
381    /// Fetches statistics data for the given parameters.
382    #[pyo3(name = "get_range_statistics")]
383    #[pyo3(signature = (dataset, instrument_ids, start, end=None, limit=None, price_precision=None))]
384    #[expect(clippy::too_many_arguments, clippy::needless_pass_by_value)]
385    fn py_get_range_statistics<'py>(
386        &self,
387        py: Python<'py>,
388        dataset: String,
389        instrument_ids: Vec<InstrumentId>,
390        start: u64,
391        end: Option<u64>,
392        limit: Option<u64>,
393        price_precision: Option<u8>,
394    ) -> PyResult<Bound<'py, PyAny>> {
395        let inner = self.inner.clone();
396        let symbols = inner.prepare_symbols_from_instrument_ids(&instrument_ids);
397
398        let params = RangeQueryParams {
399            dataset,
400            symbols,
401            start: start.into(),
402            end: end.map(Into::into),
403            limit,
404            price_precision,
405        };
406
407        pyo3_async_runtimes::tokio::future_into_py(py, async move {
408            let statistics = inner
409                .get_range_statistics(params)
410                .await
411                .map_err(to_pyvalue_err)?;
412            Python::attach(|py| statistics.into_py_any(py))
413        })
414    }
415
416    /// Fetches status data for the given parameters.
417    #[pyo3(name = "get_range_status")]
418    #[pyo3(signature = (dataset, instrument_ids, start, end=None, limit=None))]
419    #[expect(clippy::needless_pass_by_value)]
420    fn py_get_range_status<'py>(
421        &self,
422        py: Python<'py>,
423        dataset: String,
424        instrument_ids: Vec<InstrumentId>,
425        start: u64,
426        end: Option<u64>,
427        limit: Option<u64>,
428    ) -> PyResult<Bound<'py, PyAny>> {
429        let inner = self.inner.clone();
430        let symbols = inner.prepare_symbols_from_instrument_ids(&instrument_ids);
431
432        let params = RangeQueryParams {
433            dataset,
434            symbols,
435            start: start.into(),
436            end: end.map(Into::into),
437            limit,
438            price_precision: None,
439        };
440
441        pyo3_async_runtimes::tokio::future_into_py(py, async move {
442            let statuses = inner
443                .get_range_status(params)
444                .await
445                .map_err(to_pyvalue_err)?;
446            Python::attach(|py| statuses.into_py_any(py))
447        })
448    }
449}