1use std::{fmt::Debug, path::PathBuf};
19
20use nautilus_core::{
21 python::{IntoPyObjectNautilusExt, to_pyexception, to_pyvalue_err},
22 time::get_atomic_clock_realtime,
23};
24use nautilus_model::{
25 enums::BarAggregation,
26 identifiers::{InstrumentId, Symbol},
27 python::instruments::instrument_any_to_pyobject,
28};
29use pyo3::{
30 IntoPyObjectExt,
31 prelude::*,
32 types::{PyDict, PyList},
33};
34
35use crate::{
36 common::Credential,
37 historical::{DatabentoHistoricalClient as CoreDatabentoHistoricalClient, RangeQueryParams},
38};
39
40#[cfg_attr(
42 feature = "python",
43 pyo3::pyclass(module = "nautilus_trader.core.nautilus_pyo3.databento")
44)]
45#[cfg_attr(
46 feature = "python",
47 pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.databento")
48)]
49pub struct DatabentoHistoricalClient {
50 inner: CoreDatabentoHistoricalClient,
51}
52
53impl Debug for DatabentoHistoricalClient {
54 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
55 f.debug_struct(stringify!(DatabentoHistoricalClient))
56 .field("inner", &self.inner)
57 .finish()
58 }
59}
60
61#[pymethods]
62#[pyo3_stub_gen::derive::gen_stub_pymethods]
63impl DatabentoHistoricalClient {
64 #[new]
69 fn py_new(
70 key: String,
71 publishers_filepath: PathBuf,
72 use_exchange_as_venue: bool,
73 ) -> PyResult<Self> {
74 let clock = get_atomic_clock_realtime();
75 let inner = CoreDatabentoHistoricalClient::new(
76 Credential::new(key),
77 publishers_filepath,
78 clock,
79 use_exchange_as_venue,
80 )
81 .map_err(to_pyvalue_err)?;
82
83 Ok(Self { inner })
84 }
85
86 #[getter]
88 #[pyo3(name = "api_key")]
89 fn py_api_key(&self) -> &str {
90 self.inner.api_key()
91 }
92
93 #[pyo3(name = "set_price_precision")]
99 fn py_set_price_precision(&self, symbol: &str, price_precision: u8) {
100 self.inner
101 .set_price_precision(Symbol::from(symbol), price_precision);
102 }
103
104 #[pyo3(name = "get_dataset_range")]
106 fn py_get_dataset_range<'py>(
107 &self,
108 py: Python<'py>,
109 dataset: String,
110 ) -> PyResult<Bound<'py, PyAny>> {
111 let inner = self.inner.clone();
112
113 pyo3_async_runtimes::tokio::future_into_py(py, async move {
114 let response = inner.get_dataset_range(&dataset).await;
115 match response {
116 Ok(res) => Python::attach(|py| {
117 let dict = PyDict::new(py);
118 dict.set_item("start", res.start)?;
119 dict.set_item("end", res.end)?;
120 dict.into_py_any(py)
121 }),
122 Err(e) => Err(to_pyexception(format!("Error handling response: {e}"))),
123 }
124 })
125 }
126
127 #[pyo3(name = "get_range_instruments")]
129 #[pyo3(signature = (dataset, instrument_ids, start, end=None, limit=None))]
130 #[expect(clippy::needless_pass_by_value)]
131 fn py_get_range_instruments<'py>(
132 &self,
133 py: Python<'py>,
134 dataset: String,
135 instrument_ids: Vec<InstrumentId>,
136 start: u64,
137 end: Option<u64>,
138 limit: Option<u64>,
139 ) -> PyResult<Bound<'py, PyAny>> {
140 let inner = self.inner.clone();
141 let symbols = inner.prepare_symbols_from_instrument_ids(&instrument_ids);
142
143 let params = RangeQueryParams {
144 dataset,
145 symbols,
146 start: start.into(),
147 end: end.map(Into::into),
148 limit,
149 price_precision: None,
150 };
151
152 pyo3_async_runtimes::tokio::future_into_py(py, async move {
153 let instruments = inner
154 .get_range_instruments(params)
155 .await
156 .map_err(to_pyvalue_err)?;
157
158 Python::attach(|py| -> PyResult<Py<PyAny>> {
159 let objs: Vec<Py<PyAny>> = instruments
160 .into_iter()
161 .map(|inst| instrument_any_to_pyobject(py, inst))
162 .collect::<PyResult<Vec<Py<PyAny>>>>()?;
163
164 let list = PyList::new(py, &objs).expect("Invalid `ExactSizeIterator`");
165 Ok(list.into_py_any_unwrap(py))
166 })
167 })
168 }
169
170 #[pyo3(name = "get_range_quotes")]
172 #[pyo3(signature = (dataset, instrument_ids, start, end=None, limit=None, price_precision=None, schema=None))]
173 #[expect(clippy::too_many_arguments, clippy::needless_pass_by_value)]
174 fn py_get_range_quotes<'py>(
175 &self,
176 py: Python<'py>,
177 dataset: String,
178 instrument_ids: Vec<InstrumentId>,
179 start: u64,
180 end: Option<u64>,
181 limit: Option<u64>,
182 price_precision: Option<u8>,
183 schema: Option<String>,
184 ) -> PyResult<Bound<'py, PyAny>> {
185 let inner = self.inner.clone();
186 let symbols = inner.prepare_symbols_from_instrument_ids(&instrument_ids);
187
188 let params = RangeQueryParams {
189 dataset,
190 symbols,
191 start: start.into(),
192 end: end.map(Into::into),
193 limit,
194 price_precision,
195 };
196
197 pyo3_async_runtimes::tokio::future_into_py(py, async move {
198 let quotes = inner
199 .get_range_quotes(params, schema)
200 .await
201 .map_err(to_pyvalue_err)?;
202 Python::attach(|py| quotes.into_py_any(py))
203 })
204 }
205
206 #[pyo3(name = "get_range_trades")]
208 #[pyo3(signature = (dataset, instrument_ids, start, end=None, limit=None, price_precision=None))]
209 #[expect(clippy::too_many_arguments, clippy::needless_pass_by_value)]
210 fn py_get_range_trades<'py>(
211 &self,
212 py: Python<'py>,
213 dataset: String,
214 instrument_ids: Vec<InstrumentId>,
215 start: u64,
216 end: Option<u64>,
217 limit: Option<u64>,
218 price_precision: Option<u8>,
219 ) -> PyResult<Bound<'py, PyAny>> {
220 let inner = self.inner.clone();
221 let symbols = inner.prepare_symbols_from_instrument_ids(&instrument_ids);
222
223 let params = RangeQueryParams {
224 dataset,
225 symbols,
226 start: start.into(),
227 end: end.map(Into::into),
228 limit,
229 price_precision,
230 };
231
232 pyo3_async_runtimes::tokio::future_into_py(py, async move {
233 let trades = inner
234 .get_range_trades(params)
235 .await
236 .map_err(to_pyvalue_err)?;
237 Python::attach(|py| trades.into_py_any(py))
238 })
239 }
240
241 #[pyo3(name = "get_range_bars")]
243 #[pyo3(signature = (dataset, instrument_ids, aggregation, start, end=None, limit=None, price_precision=None, timestamp_on_close=true))]
244 #[expect(clippy::too_many_arguments, clippy::needless_pass_by_value)]
245 fn py_get_range_bars<'py>(
246 &self,
247 py: Python<'py>,
248 dataset: String,
249 instrument_ids: Vec<InstrumentId>,
250 aggregation: BarAggregation,
251 start: u64,
252 end: Option<u64>,
253 limit: Option<u64>,
254 price_precision: Option<u8>,
255 timestamp_on_close: bool,
256 ) -> PyResult<Bound<'py, PyAny>> {
257 let inner = self.inner.clone();
258 let symbols = inner.prepare_symbols_from_instrument_ids(&instrument_ids);
259
260 let params = RangeQueryParams {
261 dataset,
262 symbols,
263 start: start.into(),
264 end: end.map(Into::into),
265 limit,
266 price_precision,
267 };
268
269 pyo3_async_runtimes::tokio::future_into_py(py, async move {
270 let bars = inner
271 .get_range_bars(params, aggregation, timestamp_on_close)
272 .await
273 .map_err(to_pyvalue_err)?;
274 Python::attach(|py| bars.into_py_any(py))
275 })
276 }
277
278 #[pyo3(name = "get_order_book_depth10")]
279 #[pyo3(signature = (dataset, instrument_ids, start, end=None, depth=None))]
280 #[expect(clippy::needless_pass_by_value)]
281 fn py_get_order_book_depth10<'py>(
282 &self,
283 py: Python<'py>,
284 dataset: String,
285 instrument_ids: Vec<InstrumentId>,
286 start: u64,
287 end: Option<u64>,
288 depth: Option<usize>,
289 ) -> PyResult<Bound<'py, PyAny>> {
290 let inner = self.inner.clone();
291 let symbols = inner.prepare_symbols_from_instrument_ids(&instrument_ids);
292
293 let params = RangeQueryParams {
294 dataset,
295 symbols,
296 start: start.into(),
297 end: end.map(Into::into),
298 limit: None,
299 price_precision: None,
300 };
301
302 pyo3_async_runtimes::tokio::future_into_py(py, async move {
303 let depths = inner
304 .get_range_order_book_depth10(params, depth)
305 .await
306 .map_err(to_pyvalue_err)?;
307 Python::attach(|py| depths.into_py_any(py))
308 })
309 }
310
311 #[pyo3(name = "get_range_order_book_deltas")]
313 #[pyo3(signature = (dataset, instrument_ids, start, end=None, limit=None, price_precision=None))]
314 #[expect(clippy::too_many_arguments, clippy::needless_pass_by_value)]
315 fn py_get_range_order_book_deltas<'py>(
316 &self,
317 py: Python<'py>,
318 dataset: String,
319 instrument_ids: Vec<InstrumentId>,
320 start: u64,
321 end: Option<u64>,
322 limit: Option<u64>,
323 price_precision: Option<u8>,
324 ) -> PyResult<Bound<'py, PyAny>> {
325 let inner = self.inner.clone();
326 let symbols = inner.prepare_symbols_from_instrument_ids(&instrument_ids);
327
328 let params = RangeQueryParams {
329 dataset,
330 symbols,
331 start: start.into(),
332 end: end.map(Into::into),
333 limit,
334 price_precision,
335 };
336
337 pyo3_async_runtimes::tokio::future_into_py(py, async move {
338 let deltas = inner
339 .get_range_order_book_deltas(params)
340 .await
341 .map_err(to_pyvalue_err)?;
342 Python::attach(|py| deltas.into_py_any(py))
343 })
344 }
345
346 #[pyo3(name = "get_range_imbalance")]
348 #[pyo3(signature = (dataset, instrument_ids, start, end=None, limit=None, price_precision=None))]
349 #[expect(clippy::too_many_arguments, clippy::needless_pass_by_value)]
350 fn py_get_range_imbalance<'py>(
351 &self,
352 py: Python<'py>,
353 dataset: String,
354 instrument_ids: Vec<InstrumentId>,
355 start: u64,
356 end: Option<u64>,
357 limit: Option<u64>,
358 price_precision: Option<u8>,
359 ) -> PyResult<Bound<'py, PyAny>> {
360 let inner = self.inner.clone();
361 let symbols = inner.prepare_symbols_from_instrument_ids(&instrument_ids);
362
363 let params = RangeQueryParams {
364 dataset,
365 symbols,
366 start: start.into(),
367 end: end.map(Into::into),
368 limit,
369 price_precision,
370 };
371
372 pyo3_async_runtimes::tokio::future_into_py(py, async move {
373 let imbalances = inner
374 .get_range_imbalance(params)
375 .await
376 .map_err(to_pyvalue_err)?;
377 Python::attach(|py| imbalances.into_py_any(py))
378 })
379 }
380
381 #[pyo3(name = "get_range_statistics")]
383 #[pyo3(signature = (dataset, instrument_ids, start, end=None, limit=None, price_precision=None))]
384 #[expect(clippy::too_many_arguments, clippy::needless_pass_by_value)]
385 fn py_get_range_statistics<'py>(
386 &self,
387 py: Python<'py>,
388 dataset: String,
389 instrument_ids: Vec<InstrumentId>,
390 start: u64,
391 end: Option<u64>,
392 limit: Option<u64>,
393 price_precision: Option<u8>,
394 ) -> PyResult<Bound<'py, PyAny>> {
395 let inner = self.inner.clone();
396 let symbols = inner.prepare_symbols_from_instrument_ids(&instrument_ids);
397
398 let params = RangeQueryParams {
399 dataset,
400 symbols,
401 start: start.into(),
402 end: end.map(Into::into),
403 limit,
404 price_precision,
405 };
406
407 pyo3_async_runtimes::tokio::future_into_py(py, async move {
408 let statistics = inner
409 .get_range_statistics(params)
410 .await
411 .map_err(to_pyvalue_err)?;
412 Python::attach(|py| statistics.into_py_any(py))
413 })
414 }
415
416 #[pyo3(name = "get_range_status")]
418 #[pyo3(signature = (dataset, instrument_ids, start, end=None, limit=None))]
419 #[expect(clippy::needless_pass_by_value)]
420 fn py_get_range_status<'py>(
421 &self,
422 py: Python<'py>,
423 dataset: String,
424 instrument_ids: Vec<InstrumentId>,
425 start: u64,
426 end: Option<u64>,
427 limit: Option<u64>,
428 ) -> PyResult<Bound<'py, PyAny>> {
429 let inner = self.inner.clone();
430 let symbols = inner.prepare_symbols_from_instrument_ids(&instrument_ids);
431
432 let params = RangeQueryParams {
433 dataset,
434 symbols,
435 start: start.into(),
436 end: end.map(Into::into),
437 limit,
438 price_precision: None,
439 };
440
441 pyo3_async_runtimes::tokio::future_into_py(py, async move {
442 let statuses = inner
443 .get_range_status(params)
444 .await
445 .map_err(to_pyvalue_err)?;
446 Python::attach(|py| statuses.into_py_any(py))
447 })
448 }
449}