1use std::{collections::HashMap, path::PathBuf};
19
20use databento::dbn;
21use nautilus_core::{
22 ffi::cvec::CVec,
23 python::{IntoPyObjectNautilusExt, to_pyvalue_err},
24};
25use nautilus_model::{
26 data::{
27 Bar, Data, DataFFI, InstrumentStatus, OrderBookDelta, OrderBookDepth10, QuoteTick,
28 TradeTick,
29 },
30 identifiers::{InstrumentId, Symbol, Venue},
31 python::instruments::instrument_any_to_pyobject,
32};
33use pyo3::{
34 prelude::*,
35 types::{PyCapsule, PyList},
36};
37use ustr::Ustr;
38
39use crate::{
40 loader::DatabentoDataLoader,
41 types::{DatabentoImbalance, DatabentoPublisher, DatabentoStatistics, PublisherId},
42};
43
44#[expect(clippy::needless_pass_by_value)]
45#[pymethods]
46#[pyo3_stub_gen::derive::gen_stub_pymethods]
47impl DatabentoDataLoader {
48 #[new]
76 #[pyo3(signature = (publishers_filepath=None))]
77 fn py_new(publishers_filepath: Option<PathBuf>) -> PyResult<Self> {
78 Self::new(publishers_filepath).map_err(to_pyvalue_err)
79 }
80
81 #[pyo3(name = "load_publishers")]
87 fn py_load_publishers(&mut self, publishers_filepath: PathBuf) -> PyResult<()> {
88 self.load_publishers(publishers_filepath)
89 .map_err(to_pyvalue_err)
90 }
91
92 #[must_use]
94 #[pyo3(name = "get_publishers")]
95 fn py_get_publishers(&self) -> HashMap<u16, DatabentoPublisher> {
96 self.get_publishers()
97 .iter()
98 .map(|(&key, value)| (key, value.clone()))
99 .collect::<HashMap<u16, DatabentoPublisher>>()
100 }
101
102 #[pyo3(name = "set_dataset_for_venue")]
104 fn py_set_dataset_for_venue(&mut self, dataset: String, venue: Venue) {
105 self.set_dataset_for_venue(Ustr::from(&dataset), venue);
106 }
107
108 #[must_use]
110 #[pyo3(name = "get_dataset_for_venue")]
111 fn py_get_dataset_for_venue(&self, venue: &Venue) -> Option<String> {
112 self.get_dataset_for_venue(venue).map(ToString::to_string)
113 }
114
115 #[must_use]
117 #[pyo3(name = "get_venue_for_publisher")]
118 fn py_get_venue_for_publisher(&self, publisher_id: PublisherId) -> Option<String> {
119 self.get_venue_for_publisher(publisher_id)
120 .map(ToString::to_string)
121 }
122
123 #[pyo3(name = "set_price_precision")]
129 fn py_set_price_precision(&mut self, symbol: &str, price_precision: u8) {
130 self.set_price_precision(Symbol::from(symbol), price_precision);
131 }
132
133 #[must_use]
135 #[pyo3(name = "get_price_precisions")]
136 fn py_get_price_precisions(&self) -> HashMap<String, u8> {
137 self.get_price_precisions()
138 .iter()
139 .map(|(symbol, precision)| (symbol.to_string(), *precision))
140 .collect()
141 }
142
143 #[pyo3(name = "schema_for_file")]
144 fn py_schema_for_file(&self, filepath: PathBuf) -> PyResult<Option<String>> {
145 self.schema_from_file(&filepath).map_err(to_pyvalue_err)
146 }
147
148 #[pyo3(name = "load_instruments")]
153 #[pyo3(signature = (filepath, use_exchange_as_venue, skip_on_error=false))]
154 fn py_load_instruments(
155 &mut self,
156 py: Python,
157 filepath: PathBuf,
158 use_exchange_as_venue: bool,
159 skip_on_error: bool,
160 ) -> PyResult<Py<PyAny>> {
161 let iter = self
162 .load_instruments(&filepath, use_exchange_as_venue, skip_on_error)
163 .map_err(to_pyvalue_err)?;
164
165 let mut data = Vec::new();
166
167 for instrument in iter {
168 let py_object = instrument_any_to_pyobject(py, instrument)?;
169 data.push(py_object);
170 }
171
172 let list = PyList::new(py, &data).expect("Invalid `ExactSizeIterator`");
173
174 Ok(list.into_py_any_unwrap(py))
175 }
176
177 #[pyo3(name = "load_order_book_deltas")]
182 #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
183 fn py_load_order_book_deltas(
184 &self,
185 filepath: PathBuf,
186 instrument_id: Option<InstrumentId>,
187 price_precision: Option<u8>,
188 ) -> PyResult<Vec<OrderBookDelta>> {
189 self.load_order_book_deltas(&filepath, instrument_id, price_precision)
190 .map_err(to_pyvalue_err)
191 }
192
193 #[pyo3(name = "load_order_book_deltas_as_pycapsule")]
194 #[pyo3(signature = (filepath, instrument_id=None, price_precision=None, include_trades=None))]
195 fn py_load_order_book_deltas_as_pycapsule(
196 &self,
197 py: Python,
198 filepath: PathBuf,
199 instrument_id: Option<InstrumentId>,
200 price_precision: Option<u8>,
201 include_trades: Option<bool>,
202 ) -> PyResult<Py<PyAny>> {
203 let iter = self
204 .read_records::<dbn::MboMsg>(
205 &filepath,
206 instrument_id,
207 price_precision,
208 include_trades.unwrap_or(false),
209 None,
210 )
211 .map_err(to_pyvalue_err)?;
212
213 exhaust_data_iter_to_pycapsule(py, iter).map_err(to_pyvalue_err)
214 }
215
216 #[pyo3(name = "load_order_book_depth10")]
218 #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
219 fn py_load_order_book_depth10(
220 &self,
221 filepath: PathBuf,
222 instrument_id: Option<InstrumentId>,
223 price_precision: Option<u8>,
224 ) -> PyResult<Vec<OrderBookDepth10>> {
225 self.load_order_book_depth10(&filepath, instrument_id, price_precision)
226 .map_err(to_pyvalue_err)
227 }
228
229 #[pyo3(name = "load_order_book_depth10_as_pycapsule")]
230 #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
231 fn py_load_order_book_depth10_as_pycapsule(
232 &self,
233 py: Python,
234 filepath: PathBuf,
235 instrument_id: Option<InstrumentId>,
236 price_precision: Option<u8>,
237 ) -> PyResult<Py<PyAny>> {
238 let iter = self
239 .read_records::<dbn::Mbp10Msg>(&filepath, instrument_id, price_precision, false, None)
240 .map_err(to_pyvalue_err)?;
241
242 exhaust_data_iter_to_pycapsule(py, iter).map_err(to_pyvalue_err)
243 }
244
245 #[pyo3(name = "load_quotes")]
247 #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
248 fn py_load_quotes(
249 &self,
250 filepath: PathBuf,
251 instrument_id: Option<InstrumentId>,
252 price_precision: Option<u8>,
253 ) -> PyResult<Vec<QuoteTick>> {
254 self.load_quotes(&filepath, instrument_id, price_precision)
255 .map_err(to_pyvalue_err)
256 }
257
258 #[pyo3(name = "load_quotes_as_pycapsule")]
259 #[pyo3(signature = (filepath, instrument_id=None, price_precision=None, include_trades=None))]
260 fn py_load_quotes_as_pycapsule(
261 &self,
262 py: Python,
263 filepath: PathBuf,
264 instrument_id: Option<InstrumentId>,
265 price_precision: Option<u8>,
266 include_trades: Option<bool>,
267 ) -> PyResult<Py<PyAny>> {
268 let iter = self
269 .read_records::<dbn::Mbp1Msg>(
270 &filepath,
271 instrument_id,
272 price_precision,
273 include_trades.unwrap_or(false),
274 None,
275 )
276 .map_err(to_pyvalue_err)?;
277
278 exhaust_data_iter_to_pycapsule(py, iter).map_err(to_pyvalue_err)
279 }
280
281 #[pyo3(name = "load_bbo_quotes")]
283 #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
284 fn py_load_bbo_quotes(
285 &self,
286 filepath: PathBuf,
287 instrument_id: Option<InstrumentId>,
288 price_precision: Option<u8>,
289 ) -> PyResult<Vec<QuoteTick>> {
290 self.load_bbo_quotes(&filepath, instrument_id, price_precision)
291 .map_err(to_pyvalue_err)
292 }
293
294 #[pyo3(name = "load_bbo_quotes_as_pycapsule")]
295 #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
296 fn py_load_bbo_quotes_as_pycapsule(
297 &self,
298 py: Python,
299 filepath: PathBuf,
300 instrument_id: Option<InstrumentId>,
301 price_precision: Option<u8>,
302 ) -> PyResult<Py<PyAny>> {
303 let iter = self
304 .read_records::<dbn::BboMsg>(&filepath, instrument_id, price_precision, false, None)
305 .map_err(to_pyvalue_err)?;
306
307 exhaust_data_iter_to_pycapsule(py, iter).map_err(to_pyvalue_err)
308 }
309
310 #[pyo3(name = "load_cmbp_quotes")]
312 #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
313 fn py_load_cmbp_quotes(
314 &self,
315 filepath: PathBuf,
316 instrument_id: Option<InstrumentId>,
317 price_precision: Option<u8>,
318 ) -> PyResult<Vec<QuoteTick>> {
319 self.load_cmbp_quotes(&filepath, instrument_id, price_precision)
320 .map_err(to_pyvalue_err)
321 }
322
323 #[pyo3(name = "load_cmbp_quotes_as_pycapsule")]
324 #[pyo3(signature = (filepath, instrument_id=None, price_precision=None, include_trades=None))]
325 fn py_load_cmbp_quotes_as_pycapsule(
326 &self,
327 py: Python,
328 filepath: PathBuf,
329 instrument_id: Option<InstrumentId>,
330 price_precision: Option<u8>,
331 include_trades: Option<bool>,
332 ) -> PyResult<Py<PyAny>> {
333 let iter = self
334 .read_records::<dbn::Cmbp1Msg>(
335 &filepath,
336 instrument_id,
337 price_precision,
338 include_trades.unwrap_or(false),
339 None,
340 )
341 .map_err(to_pyvalue_err)?;
342
343 exhaust_data_iter_to_pycapsule(py, iter).map_err(to_pyvalue_err)
344 }
345
346 #[pyo3(name = "load_cbbo_quotes")]
348 #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
349 fn py_load_cbbo_quotes(
350 &self,
351 filepath: PathBuf,
352 instrument_id: Option<InstrumentId>,
353 price_precision: Option<u8>,
354 ) -> PyResult<Vec<QuoteTick>> {
355 self.load_cbbo_quotes(&filepath, instrument_id, price_precision)
356 .map_err(to_pyvalue_err)
357 }
358
359 #[pyo3(name = "load_cbbo_quotes_as_pycapsule")]
360 #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
361 fn py_load_cbbo_quotes_as_pycapsule(
362 &self,
363 py: Python,
364 filepath: PathBuf,
365 instrument_id: Option<InstrumentId>,
366 price_precision: Option<u8>,
367 ) -> PyResult<Py<PyAny>> {
368 let iter = self
369 .read_records::<dbn::CbboMsg>(&filepath, instrument_id, price_precision, false, None)
370 .map_err(to_pyvalue_err)?;
371
372 exhaust_data_iter_to_pycapsule(py, iter).map_err(to_pyvalue_err)
373 }
374
375 #[pyo3(name = "load_tbbo_trades")]
377 #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
378 fn py_load_tbbo_trades(
379 &self,
380 filepath: PathBuf,
381 instrument_id: Option<InstrumentId>,
382 price_precision: Option<u8>,
383 ) -> PyResult<Vec<TradeTick>> {
384 self.load_tbbo_trades(&filepath, instrument_id, price_precision)
385 .map_err(to_pyvalue_err)
386 }
387
388 #[pyo3(name = "load_tbbo_trades_as_pycapsule")]
389 #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
390 fn py_load_tbbo_trades_as_pycapsule(
391 &self,
392 py: Python,
393 filepath: PathBuf,
394 instrument_id: Option<InstrumentId>,
395 price_precision: Option<u8>,
396 ) -> PyResult<Py<PyAny>> {
397 let iter = self
398 .read_records::<dbn::TbboMsg>(&filepath, instrument_id, price_precision, false, None)
399 .map_err(to_pyvalue_err)?;
400
401 exhaust_data_iter_to_pycapsule(py, iter).map_err(to_pyvalue_err)
402 }
403
404 #[pyo3(name = "load_tcbbo_trades")]
406 #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
407 fn py_load_tcbbo_trades(
408 &self,
409 filepath: PathBuf,
410 instrument_id: Option<InstrumentId>,
411 price_precision: Option<u8>,
412 ) -> PyResult<Vec<TradeTick>> {
413 self.load_tcbbo_trades(&filepath, instrument_id, price_precision)
414 .map_err(to_pyvalue_err)
415 }
416
417 #[pyo3(name = "load_tcbbo_trades_as_pycapsule")]
418 #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
419 fn py_load_tcbbo_trades_as_pycapsule(
420 &self,
421 py: Python,
422 filepath: PathBuf,
423 instrument_id: Option<InstrumentId>,
424 price_precision: Option<u8>,
425 ) -> PyResult<Py<PyAny>> {
426 let iter = self
427 .read_records::<dbn::CbboMsg>(&filepath, instrument_id, price_precision, false, None)
428 .map_err(to_pyvalue_err)?;
429
430 exhaust_data_iter_to_pycapsule(py, iter).map_err(to_pyvalue_err)
431 }
432
433 #[pyo3(name = "load_trades")]
435 #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
436 fn py_load_trades(
437 &self,
438 filepath: PathBuf,
439 instrument_id: Option<InstrumentId>,
440 price_precision: Option<u8>,
441 ) -> PyResult<Vec<TradeTick>> {
442 self.load_trades(&filepath, instrument_id, price_precision)
443 .map_err(to_pyvalue_err)
444 }
445
446 #[pyo3(name = "load_trades_as_pycapsule")]
447 #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
448 fn py_load_trades_as_pycapsule(
449 &self,
450 py: Python,
451 filepath: PathBuf,
452 instrument_id: Option<InstrumentId>,
453 price_precision: Option<u8>,
454 ) -> PyResult<Py<PyAny>> {
455 let iter = self
456 .read_records::<dbn::TradeMsg>(&filepath, instrument_id, price_precision, false, None)
457 .map_err(to_pyvalue_err)?;
458
459 exhaust_data_iter_to_pycapsule(py, iter).map_err(to_pyvalue_err)
460 }
461
462 #[pyo3(name = "load_bars")]
464 #[pyo3(signature = (filepath, instrument_id=None, price_precision=None, timestamp_on_close=true))]
465 fn py_load_bars(
466 &self,
467 filepath: PathBuf,
468 instrument_id: Option<InstrumentId>,
469 price_precision: Option<u8>,
470 timestamp_on_close: bool,
471 ) -> PyResult<Vec<Bar>> {
472 self.load_bars(
473 &filepath,
474 instrument_id,
475 price_precision,
476 Some(timestamp_on_close),
477 )
478 .map_err(to_pyvalue_err)
479 }
480
481 #[pyo3(name = "load_bars_as_pycapsule")]
482 #[pyo3(signature = (filepath, instrument_id=None, price_precision=None, timestamp_on_close=true))]
483 fn py_load_bars_as_pycapsule(
484 &self,
485 py: Python,
486 filepath: PathBuf,
487 instrument_id: Option<InstrumentId>,
488 price_precision: Option<u8>,
489 timestamp_on_close: bool,
490 ) -> PyResult<Py<PyAny>> {
491 let iter = self
492 .read_records::<dbn::OhlcvMsg>(
493 &filepath,
494 instrument_id,
495 price_precision,
496 false,
497 Some(timestamp_on_close),
498 )
499 .map_err(to_pyvalue_err)?;
500
501 exhaust_data_iter_to_pycapsule(py, iter).map_err(to_pyvalue_err)
502 }
503
504 #[pyo3(name = "load_status")]
505 #[pyo3(signature = (filepath, instrument_id=None))]
506 fn py_load_status(
507 &self,
508 filepath: PathBuf,
509 instrument_id: Option<InstrumentId>,
510 ) -> PyResult<Vec<InstrumentStatus>> {
511 let iter = self
512 .load_status_records::<dbn::StatusMsg>(&filepath, instrument_id)
513 .map_err(to_pyvalue_err)?;
514
515 let mut data = Vec::new();
516
517 for result in iter {
518 match result {
519 Ok(item) => data.push(item),
520 Err(e) => return Err(to_pyvalue_err(e)),
521 }
522 }
523
524 Ok(data)
525 }
526
527 #[pyo3(name = "load_imbalance")]
528 #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
529 fn py_load_imbalance(
530 &self,
531 filepath: PathBuf,
532 instrument_id: Option<InstrumentId>,
533 price_precision: Option<u8>,
534 ) -> PyResult<Vec<DatabentoImbalance>> {
535 let iter = self
536 .read_imbalance_records::<dbn::ImbalanceMsg>(&filepath, instrument_id, price_precision)
537 .map_err(to_pyvalue_err)?;
538
539 let mut data = Vec::new();
540
541 for result in iter {
542 match result {
543 Ok(item) => data.push(item),
544 Err(e) => return Err(to_pyvalue_err(e)),
545 }
546 }
547
548 Ok(data)
549 }
550
551 #[pyo3(name = "load_statistics")]
552 #[pyo3(signature = (filepath, instrument_id=None, price_precision=None))]
553 fn py_load_statistics(
554 &self,
555 filepath: PathBuf,
556 instrument_id: Option<InstrumentId>,
557 price_precision: Option<u8>,
558 ) -> PyResult<Vec<DatabentoStatistics>> {
559 let iter = self
560 .read_statistics_records::<dbn::StatMsg>(&filepath, instrument_id, price_precision)
561 .map_err(to_pyvalue_err)?;
562
563 let mut data = Vec::new();
564
565 for result in iter {
566 match result {
567 Ok(item) => data.push(item),
568 Err(e) => return Err(to_pyvalue_err(e)),
569 }
570 }
571
572 Ok(data)
573 }
574}
575
576fn exhaust_data_iter_to_pycapsule(
577 py: Python,
578 iter: impl Iterator<Item = anyhow::Result<(Option<Data>, Option<Data>)>>,
579) -> anyhow::Result<Py<PyAny>> {
580 let mut data = Vec::new();
581
582 for result in iter {
583 match result {
584 Ok((Some(item1), None)) => data.push(item1),
585 Ok((None, Some(item2))) => data.push(item2),
586 Ok((Some(item1), Some(item2))) => {
587 data.push(item1);
588 data.push(item2);
589 }
590 Ok((None, None)) => {}
591 Err(e) => return Err(e),
592 }
593 }
594
595 let ffi_data: Vec<DataFFI> = data
596 .into_iter()
597 .map(DataFFI::try_from)
598 .collect::<Result<Vec<_>, _>>()
599 .map_err(to_pyvalue_err)?;
600 let cvec: CVec = ffi_data.into();
601 let capsule = PyCapsule::new_with_destructor::<CVec, _>(py, cvec, None, |_, _| {})?;
603
604 Ok(capsule.into_py_any_unwrap(py))
607}