1use std::{
22 fmt::Debug,
23 path::PathBuf,
24 str::FromStr,
25 sync::{
26 Arc, Mutex,
27 atomic::{AtomicBool, Ordering},
28 },
29};
30
31use ahash::AHashMap;
32use databento::{dbn, live::Subscription};
33use indexmap::IndexMap;
34use nautilus_common::{
35 clients::DataClient,
36 live::{runner::get_data_event_sender, runtime::get_runtime},
37 messages::{
38 DataEvent, DataResponse,
39 data::{
40 BarsResponse, BookDeltasResponse, BookDepthResponse, InstrumentResponse,
41 InstrumentsResponse, QuotesResponse, RequestBars, RequestBookDeltas, RequestBookDepth,
42 RequestInstrument, RequestInstruments, RequestQuotes, RequestTrades,
43 SubscribeBookDeltas, SubscribeInstrument, SubscribeInstrumentStatus, SubscribeQuotes,
44 SubscribeTrades, TradesResponse, UnsubscribeBookDeltas, UnsubscribeInstrumentStatus,
45 UnsubscribeQuotes, UnsubscribeTrades,
46 },
47 },
48};
49use nautilus_core::{
50 AtomicMap, MUTEX_POISONED, Params, UnixNanos,
51 datetime::{NANOSECONDS_IN_DAY, datetime_to_unix_nanos},
52 string::secret::REDACTED,
53 time::{AtomicTime, get_atomic_clock_realtime},
54};
55use nautilus_model::{
56 data::{CustomData, Data},
57 enums::BarAggregation,
58 identifiers::{ClientId, InstrumentId, Symbol, Venue},
59 instruments::{Instrument, InstrumentAny},
60};
61use tokio::task::JoinHandle;
62use tokio_util::sync::CancellationToken;
63
64use crate::{
65 common::{Credential, DATABENTO_VENUE},
66 historical::{DatabentoHistoricalClient, RangeQueryParams},
67 live::{DatabentoFeedHandler, DatabentoMessage, HandlerCommand},
68 loader::DatabentoDataLoader,
69 symbology::instrument_id_to_symbol_string,
70 types::{Dataset, PublisherId},
71};
72
73const PRICE_PRECISION_PARAM: &str = "price_precision";
74const SCHEMA_PARAM: &str = "schema";
75const QUOTE_SCHEMAS: &[dbn::Schema] = &[
76 dbn::Schema::Mbp1,
77 dbn::Schema::Bbo1S,
78 dbn::Schema::Bbo1M,
79 dbn::Schema::Cmbp1,
80 dbn::Schema::Cbbo1S,
81 dbn::Schema::Cbbo1M,
82 dbn::Schema::Tbbo,
83 dbn::Schema::Tcbbo,
84];
85const TRADE_SCHEMAS: &[dbn::Schema] = &[
86 dbn::Schema::Trades,
87 dbn::Schema::Tbbo,
88 dbn::Schema::Tcbbo,
89 dbn::Schema::Mbp1,
90 dbn::Schema::Cmbp1,
91];
92
93#[derive(Clone)]
95pub struct DatabentoDataClientConfig {
96 pub(crate) credential: Credential,
98 pub publishers_filepath: PathBuf,
100 pub venue_dataset_map: IndexMap<String, String>,
102 pub use_exchange_as_venue: bool,
104 pub bars_timestamp_on_close: bool,
106 pub reconnect_timeout_mins: Option<u64>,
108}
109
110impl Debug for DatabentoDataClientConfig {
111 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
112 f.debug_struct(stringify!(DatabentoDataClientConfig))
113 .field("credential", &REDACTED)
114 .field("publishers_filepath", &self.publishers_filepath)
115 .field("venue_dataset_map", &self.venue_dataset_map)
116 .field("use_exchange_as_venue", &self.use_exchange_as_venue)
117 .field("bars_timestamp_on_close", &self.bars_timestamp_on_close)
118 .field("reconnect_timeout_mins", &self.reconnect_timeout_mins)
119 .finish()
120 }
121}
122
123impl DatabentoDataClientConfig {
124 #[must_use]
126 pub fn new(
127 api_key: impl Into<String>,
128 publishers_filepath: PathBuf,
129 use_exchange_as_venue: bool,
130 bars_timestamp_on_close: bool,
131 ) -> Self {
132 Self {
133 credential: Credential::new(api_key),
134 publishers_filepath,
135 venue_dataset_map: IndexMap::new(),
136 use_exchange_as_venue,
137 bars_timestamp_on_close,
138 reconnect_timeout_mins: Some(10), }
140 }
141
142 #[must_use]
144 pub fn api_key(&self) -> &str {
145 self.credential.api_key()
146 }
147
148 #[must_use]
150 pub fn api_key_masked(&self) -> String {
151 self.credential.api_key_masked()
152 }
153}
154
155#[cfg_attr(feature = "python", pyo3::pyclass)]
161#[cfg_attr(
162 feature = "python",
163 pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.adapters.databento")
164)]
165#[derive(Debug)]
166pub struct DatabentoDataClient {
167 client_id: ClientId,
169 config: DatabentoDataClientConfig,
171 is_connected: AtomicBool,
173 historical: DatabentoHistoricalClient,
175 loader: DatabentoDataLoader,
177 cmd_channels: Arc<Mutex<AHashMap<String, tokio::sync::mpsc::UnboundedSender<HandlerCommand>>>>,
179 task_handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
181 cancellation_token: CancellationToken,
183 publisher_venue_map: Arc<IndexMap<PublisherId, Venue>>,
185 symbol_venue_map: Arc<AtomicMap<Symbol, Venue>>,
187 data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
189}
190
191impl DatabentoDataClient {
192 pub fn new(
198 client_id: ClientId,
199 config: DatabentoDataClientConfig,
200 clock: &'static AtomicTime,
201 ) -> anyhow::Result<Self> {
202 let historical = DatabentoHistoricalClient::new(
203 config.credential.clone(),
204 config.publishers_filepath.clone(),
205 clock,
206 config.use_exchange_as_venue,
207 )?;
208
209 let mut loader = DatabentoDataLoader::new(Some(config.publishers_filepath.clone()))?;
211 for (venue, dataset) in &config.venue_dataset_map {
212 loader.set_dataset_for_venue(
213 Dataset::from(dataset.as_str()),
214 Venue::from(venue.as_str()),
215 );
216 }
217
218 let file_content = std::fs::read_to_string(&config.publishers_filepath)?;
220 let publishers_vec: Vec<crate::types::DatabentoPublisher> =
221 serde_json::from_str(&file_content)?;
222
223 let publisher_venue_map = publishers_vec
224 .into_iter()
225 .map(|p| (p.publisher_id, Venue::from(p.venue.as_str())))
226 .collect::<IndexMap<u16, Venue>>();
227
228 let data_sender = get_data_event_sender();
229
230 Ok(Self {
231 client_id,
232 config,
233 is_connected: AtomicBool::new(false),
234 historical,
235 loader,
236 cmd_channels: Arc::new(Mutex::new(AHashMap::new())),
237 task_handles: Arc::new(Mutex::new(Vec::new())),
238 cancellation_token: CancellationToken::new(),
239 publisher_venue_map: Arc::new(publisher_venue_map),
240 symbol_venue_map: Arc::new(AtomicMap::new()),
241 data_sender,
242 })
243 }
244
245 fn get_dataset_for_venue(&self, venue: Venue) -> anyhow::Result<String> {
251 self.loader
252 .get_dataset_for_venue(&venue)
253 .map(ToString::to_string)
254 .ok_or_else(|| anyhow::anyhow!("No dataset found for venue: {venue}"))
255 }
256
257 fn get_or_create_feed_handler(&self, dataset: &str) -> bool {
259 let mut channels = self.cmd_channels.lock().expect(MUTEX_POISONED);
260
261 if !channels.contains_key(dataset) {
262 log::debug!("Creating new feed handler for dataset: {dataset}");
263 let cmd_tx = self.initialize_live_feed(dataset.to_string());
264 channels.insert(dataset.to_string(), cmd_tx);
265
266 log::debug!("Feed handler created for dataset: {dataset}, channel stored");
267 return true;
268 }
269
270 false
271 }
272
273 fn send_subscription_to_dataset(
274 &self,
275 dataset: &str,
276 price_precision: Option<(Symbol, u8)>,
277 subscription: Subscription,
278 start_after_subscribe: bool,
279 ) -> anyhow::Result<()> {
280 let tx = {
281 let channels = self.cmd_channels.lock().expect(MUTEX_POISONED);
282 channels
283 .get(dataset)
284 .cloned()
285 .ok_or_else(|| anyhow::anyhow!("No feed handler found for dataset: {dataset}"))?
286 };
287
288 send_subscription_commands(
289 &tx,
290 dataset,
291 price_precision,
292 subscription,
293 start_after_subscribe,
294 )
295 }
296
297 fn send_close_to_active_feeds(&self) {
298 let channels = self.cmd_channels.lock().expect(MUTEX_POISONED);
299 for (dataset, tx) in channels.iter() {
300 if let Err(e) = tx.send(HandlerCommand::Close) {
301 log::warn!("Failed to send close command to dataset {dataset}: {e}");
302 }
303 }
304 }
305
306 fn clear_feed_channels(&self) {
307 let mut channels = self.cmd_channels.lock().expect(MUTEX_POISONED);
308 channels.clear();
309 }
310
311 fn abort_active_tasks(&self) {
312 let handles = {
313 let mut task_handles = self.task_handles.lock().expect(MUTEX_POISONED);
314 std::mem::take(&mut *task_handles)
315 };
316
317 for handle in handles {
318 handle.abort();
319 }
320 }
321
322 fn initialize_live_feed(
324 &self,
325 dataset: String,
326 ) -> tokio::sync::mpsc::UnboundedSender<HandlerCommand> {
327 let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel();
328 let (msg_tx, msg_rx) = tokio::sync::mpsc::unbounded_channel();
329 let feed_dataset = dataset.clone();
330 let feed_channels = self.cmd_channels.clone();
331
332 let mut feed_handler = DatabentoFeedHandler::new(
333 self.config.credential.clone(),
334 dataset,
335 cmd_rx,
336 msg_tx,
337 (*self.publisher_venue_map).clone(),
338 self.symbol_venue_map.clone(),
339 self.config.use_exchange_as_venue,
340 self.config.bars_timestamp_on_close,
341 self.config.reconnect_timeout_mins,
342 );
343
344 let feed_handle = get_runtime().spawn(async move {
345 if let Err(e) = feed_handler.run().await {
346 log::error!("Feed handler error: {e}");
347 }
348 feed_channels
349 .lock()
350 .expect(MUTEX_POISONED)
351 .remove(&feed_dataset);
352 });
353
354 let cancellation_token = self.cancellation_token.clone();
355 let data_sender = self.data_sender.clone();
356
357 let msg_handle = get_runtime().spawn(async move {
359 let mut msg_rx = msg_rx;
360
361 loop {
362 tokio::select! {
363 msg = msg_rx.recv() => {
364 match msg {
365 Some(DatabentoMessage::Data(data)) => {
366 log::debug!("Received data: {data:?}");
367 if let Err(e) = data_sender.send(DataEvent::Data(data)) {
368 log::error!("Failed to send data event: {e}");
369 }
370 }
371 Some(DatabentoMessage::Instrument(instrument)) => {
372 log::debug!("Received instrument definition: {}", instrument.id());
373 if let Err(e) = data_sender.send(DataEvent::Instrument(*instrument)) {
374 log::error!("Failed to send instrument: {e}");
375 }
376 }
377 Some(DatabentoMessage::Status(status)) => {
378 log::debug!("Received status: {status:?}");
379 if let Err(e) =
380 data_sender.send(DataEvent::Data(Data::InstrumentStatus(status)))
381 {
382 log::error!("Failed to send status data event: {e}");
383 }
384 }
385 Some(DatabentoMessage::Imbalance(imbalance)) => {
386 log::debug!("Received imbalance: {imbalance:?}");
387 let data = Data::Custom(CustomData::from_arc(Arc::new(imbalance)));
388 if let Err(e) = data_sender.send(DataEvent::Data(data)) {
389 log::error!("Failed to send imbalance data event: {e}");
390 }
391 }
392 Some(DatabentoMessage::Statistics(statistics)) => {
393 log::debug!("Received statistics: {statistics:?}");
394 let data = Data::Custom(CustomData::from_arc(Arc::new(statistics)));
395 if let Err(e) = data_sender.send(DataEvent::Data(data)) {
396 log::error!("Failed to send statistics data event: {e}");
397 }
398 }
399 Some(DatabentoMessage::SubscriptionAck(ack)) => {
400 log::debug!("Received subscription ack: {}", ack.message);
401 }
402 Some(DatabentoMessage::Error(error)) => {
403 log::error!("Feed handler error: {error}");
404 }
405 Some(DatabentoMessage::Close) => {
406 log::debug!("Feed handler closed");
407 break;
408 }
409 None => {
410 log::debug!("Message channel closed");
411 break;
412 }
413 }
414 }
415 () = cancellation_token.cancelled() => {
416 log::debug!("Message processing cancelled");
417 break;
418 }
419 }
420 }
421 });
422
423 {
424 let mut handles = self.task_handles.lock().expect(MUTEX_POISONED);
425 handles.push(feed_handle);
426 handles.push(msg_handle);
427 }
428
429 cmd_tx
430 }
431}
432
433#[async_trait::async_trait(?Send)]
434impl DataClient for DatabentoDataClient {
435 fn client_id(&self) -> ClientId {
437 self.client_id
438 }
439
440 fn venue(&self) -> Option<Venue> {
442 None
443 }
444
445 fn start(&mut self) -> anyhow::Result<()> {
451 log::debug!("Starting");
452 Ok(())
453 }
454
455 fn stop(&mut self) -> anyhow::Result<()> {
461 log::debug!("Stopping");
462
463 self.send_close_to_active_feeds();
464 self.clear_feed_channels();
465 self.cancellation_token.cancel();
466 self.abort_active_tasks();
467
468 self.cancellation_token = CancellationToken::new();
469
470 self.is_connected.store(false, Ordering::Relaxed);
471 Ok(())
472 }
473
474 fn reset(&mut self) -> anyhow::Result<()> {
475 log::debug!("Resetting");
476 self.is_connected.store(false, Ordering::Relaxed);
477 Ok(())
478 }
479
480 fn dispose(&mut self) -> anyhow::Result<()> {
481 log::debug!("Disposing");
482 self.stop()
483 }
484
485 async fn connect(&mut self) -> anyhow::Result<()> {
486 log::debug!("Connecting...");
487
488 if self.cancellation_token.is_cancelled() {
489 self.cancellation_token = CancellationToken::new();
490 }
491
492 self.is_connected.store(true, Ordering::Relaxed);
493
494 log::info!("Connected");
495 Ok(())
496 }
497
498 async fn disconnect(&mut self) -> anyhow::Result<()> {
499 log::debug!("Disconnecting...");
500
501 self.send_close_to_active_feeds();
502 self.clear_feed_channels();
503
504 let handles = {
505 let mut task_handles = self.task_handles.lock().expect(MUTEX_POISONED);
506 std::mem::take(&mut *task_handles)
507 };
508
509 for handle in handles {
510 if let Err(e) = handle.await
511 && !e.is_cancelled()
512 {
513 log::error!("Task join error: {e}");
514 }
515 }
516
517 self.is_connected.store(false, Ordering::Relaxed);
518 self.cancellation_token = CancellationToken::new();
519
520 log::info!("Disconnected");
521 Ok(())
522 }
523
524 fn is_connected(&self) -> bool {
526 self.is_connected.load(Ordering::Relaxed)
527 }
528
529 fn is_disconnected(&self) -> bool {
530 !self.is_connected()
531 }
532
533 fn subscribe_instrument(&mut self, cmd: SubscribeInstrument) -> anyhow::Result<()> {
539 let dataset = self.get_dataset_for_venue(cmd.instrument_id.venue)?;
540 let start_after_subscribe = self.get_or_create_feed_handler(&dataset);
541
542 self.symbol_venue_map
543 .insert(cmd.instrument_id.symbol, cmd.instrument_id.venue);
544 let symbol = cmd.instrument_id.symbol.to_string();
545
546 let subscription = Subscription::builder()
547 .schema(databento::dbn::Schema::Definition)
548 .symbols(symbol)
549 .build();
550
551 self.send_subscription_to_dataset(&dataset, None, subscription, start_after_subscribe)?;
552
553 Ok(())
554 }
555
556 fn subscribe_quotes(&mut self, cmd: SubscribeQuotes) -> anyhow::Result<()> {
562 let dataset = self.get_dataset_for_venue(cmd.instrument_id.venue)?;
563 let symbol = cmd.instrument_id.symbol.to_string();
564 let price_precision = price_precision_from_params(cmd.params.as_ref())?
565 .map(|precision| (cmd.instrument_id.symbol, precision));
566 let schema = schema_from_params(cmd.params.as_ref(), dbn::Schema::Mbp1, QUOTE_SCHEMAS)?;
567
568 let subscription = Subscription::builder()
569 .schema(schema)
570 .symbols(symbol)
571 .build();
572
573 let start_after_subscribe = self.get_or_create_feed_handler(&dataset);
574 self.symbol_venue_map
575 .insert(cmd.instrument_id.symbol, cmd.instrument_id.venue);
576
577 self.send_subscription_to_dataset(
578 &dataset,
579 price_precision,
580 subscription,
581 start_after_subscribe,
582 )?;
583
584 Ok(())
585 }
586
587 fn subscribe_trades(&mut self, cmd: SubscribeTrades) -> anyhow::Result<()> {
593 let dataset = self.get_dataset_for_venue(cmd.instrument_id.venue)?;
594 let symbol = cmd.instrument_id.symbol.to_string();
595 let price_precision = price_precision_from_params(cmd.params.as_ref())?
596 .map(|precision| (cmd.instrument_id.symbol, precision));
597 let schema = schema_from_params(cmd.params.as_ref(), dbn::Schema::Trades, TRADE_SCHEMAS)?;
598
599 let subscription = Subscription::builder()
600 .schema(schema)
601 .symbols(symbol)
602 .build();
603
604 let start_after_subscribe = self.get_or_create_feed_handler(&dataset);
605 self.symbol_venue_map
606 .insert(cmd.instrument_id.symbol, cmd.instrument_id.venue);
607
608 self.send_subscription_to_dataset(
609 &dataset,
610 price_precision,
611 subscription,
612 start_after_subscribe,
613 )?;
614
615 Ok(())
616 }
617
618 fn subscribe_book_deltas(&mut self, cmd: SubscribeBookDeltas) -> anyhow::Result<()> {
624 let dataset = self.get_dataset_for_venue(cmd.instrument_id.venue)?;
625 let start_after_subscribe = self.get_or_create_feed_handler(&dataset);
626
627 self.symbol_venue_map
628 .insert(cmd.instrument_id.symbol, cmd.instrument_id.venue);
629 let symbol = cmd.instrument_id.symbol.to_string();
630
631 let subscription = Subscription::builder()
632 .schema(databento::dbn::Schema::Mbo) .symbols(symbol)
634 .build();
635
636 self.send_subscription_to_dataset(&dataset, None, subscription, start_after_subscribe)?;
637
638 Ok(())
639 }
640
641 fn subscribe_instrument_status(
647 &mut self,
648 cmd: SubscribeInstrumentStatus,
649 ) -> anyhow::Result<()> {
650 let dataset = self.get_dataset_for_venue(cmd.instrument_id.venue)?;
651 let start_after_subscribe = self.get_or_create_feed_handler(&dataset);
652
653 self.symbol_venue_map
654 .insert(cmd.instrument_id.symbol, cmd.instrument_id.venue);
655 let symbol = cmd.instrument_id.symbol.to_string();
656
657 let subscription = Subscription::builder()
658 .schema(databento::dbn::Schema::Status)
659 .symbols(symbol)
660 .build();
661
662 self.send_subscription_to_dataset(&dataset, None, subscription, start_after_subscribe)?;
663
664 Ok(())
665 }
666
667 fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
669 log::warn!(
673 "Databento does not support granular unsubscribing - ignoring unsubscribe request for {}",
674 cmd.instrument_id
675 );
676
677 Ok(())
678 }
679
680 fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
681 log::warn!(
685 "Databento does not support granular unsubscribing - ignoring unsubscribe request for {}",
686 cmd.instrument_id
687 );
688
689 Ok(())
690 }
691
692 fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
693 log::warn!(
697 "Databento does not support granular unsubscribing - ignoring unsubscribe request for {}",
698 cmd.instrument_id
699 );
700
701 Ok(())
702 }
703
704 fn unsubscribe_instrument_status(
705 &mut self,
706 cmd: &UnsubscribeInstrumentStatus,
707 ) -> anyhow::Result<()> {
708 log::warn!(
712 "Databento does not support granular unsubscribing - ignoring unsubscribe request for {}",
713 cmd.instrument_id
714 );
715
716 Ok(())
717 }
718
719 fn request_instruments(&self, request: RequestInstruments) -> anyhow::Result<()> {
720 log::debug!("Request instruments: {request:?}");
721
722 let historical_client = self.historical.clone();
723 let data_sender = self.data_sender.clone();
724 let dataset = request
725 .venue
726 .map(|venue| self.get_dataset_for_venue(venue))
727 .transpose()?
728 .unwrap_or_else(|| "GLBX.MDP3".to_string());
729 let request_id = request.request_id;
730 let client_id = request.client_id.unwrap_or(self.client_id);
731 let venue = request.venue.unwrap_or(*DATABENTO_VENUE);
732 let start_nanos = datetime_to_unix_nanos(request.start);
733 let end_nanos = datetime_to_unix_nanos(request.end);
734 let request_params = request.params;
735 let (query_start, query_end) = resolve_request_time_range(start_nanos, end_nanos);
736
737 get_runtime().spawn(async move {
738 let query_params = instruments_query_params(dataset, query_start, query_end);
739
740 match historical_client.get_range_instruments(query_params).await {
741 Ok(instruments) => {
742 log::debug!("Retrieved {} instruments", instruments.len());
743
744 let response = DataResponse::Instruments(InstrumentsResponse::new(
745 request_id,
746 client_id,
747 venue,
748 instruments,
749 start_nanos,
750 end_nanos,
751 get_atomic_clock_realtime().get_time_ns(),
752 request_params,
753 ));
754
755 if let Err(e) = data_sender.send(DataEvent::Response(response)) {
756 log::error!("Failed to send instruments response: {e}");
757 }
758 }
759 Err(e) => {
760 log::error!("Failed to request instruments: {e}");
761 let response = DataResponse::Instruments(InstrumentsResponse::new(
762 request_id,
763 client_id,
764 venue,
765 Vec::new(),
766 start_nanos,
767 end_nanos,
768 get_atomic_clock_realtime().get_time_ns(),
769 request_params,
770 ));
771
772 send_data_response(&data_sender, response, "empty instruments");
773 }
774 }
775 });
776
777 Ok(())
778 }
779
780 fn request_instrument(&self, request: RequestInstrument) -> anyhow::Result<()> {
781 log::debug!("Request instrument: {request:?}");
782
783 let dataset = self.get_dataset_for_venue(request.instrument_id.venue)?;
784 let historical_client = self.historical.clone();
785 let data_sender = self.data_sender.clone();
786 let instrument_id = request.instrument_id;
787 let request_id = request.request_id;
788 let client_id = request.client_id.unwrap_or(self.client_id);
789 let start_nanos = datetime_to_unix_nanos(request.start);
790 let end_nanos = datetime_to_unix_nanos(request.end);
791 let request_params = request.params;
792 let (query_start, query_end) = resolve_request_time_range(start_nanos, end_nanos);
793
794 get_runtime().spawn(async move {
795 let query_params =
796 instrument_query_params(dataset, instrument_id, query_start, query_end);
797
798 match historical_client.get_range_instruments(query_params).await {
799 Ok(instruments) => {
800 let instrument = requested_instrument(instruments, instrument_id);
801
802 let Some(instrument) = instrument else {
803 log::error!("Instrument not found: {instrument_id}");
804 return;
805 };
806
807 let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
808 request_id,
809 client_id,
810 instrument.id(),
811 instrument,
812 start_nanos,
813 end_nanos,
814 get_atomic_clock_realtime().get_time_ns(),
815 request_params,
816 )));
817
818 if let Err(e) = data_sender.send(DataEvent::Response(response)) {
819 log::error!("Failed to send instrument response: {e}");
820 }
821 }
822 Err(e) => {
823 log::error!("Failed to request instrument {instrument_id}: {e}");
824 }
825 }
826 });
827
828 Ok(())
829 }
830
831 fn request_quotes(&self, request: RequestQuotes) -> anyhow::Result<()> {
832 log::debug!("Request quotes: {request:?}");
833
834 let historical_client = self.historical.clone();
835 let data_sender = self.data_sender.clone();
836 let dataset = self.get_dataset_for_venue(request.instrument_id.venue)?;
837 let instrument_id = request.instrument_id;
838 let symbols = historical_client.prepare_symbols_from_instrument_ids(&[instrument_id]);
839 let request_id = request.request_id;
840 let client_id = request.client_id.unwrap_or(self.client_id);
841 let start_nanos = datetime_to_unix_nanos(request.start);
842 let end_nanos = datetime_to_unix_nanos(request.end);
843 let limit = request.limit.map(|limit| limit.get() as u64);
844 let request_params = request.params;
845 let price_precision = price_precision_from_params(request_params.as_ref())?;
846 let schema = schema_from_params(request_params.as_ref(), dbn::Schema::Mbp1, QUOTE_SCHEMAS)?
847 .to_string();
848 let (query_start, query_end) = resolve_request_time_range(start_nanos, end_nanos);
849
850 get_runtime().spawn(async move {
851 seed_price_precision_if_needed(
852 &historical_client,
853 dataset.as_str(),
854 instrument_id,
855 query_start,
856 query_end,
857 price_precision,
858 )
859 .await;
860
861 let params = RangeQueryParams {
862 dataset,
863 symbols,
864 start: query_start,
865 end: query_end,
866 limit,
867 price_precision,
868 };
869
870 match historical_client
871 .get_range_quotes(params, Some(schema))
872 .await
873 {
874 Ok(quotes) => {
875 log::debug!("Retrieved {} quotes", quotes.len());
876 let response = DataResponse::Quotes(QuotesResponse::new(
877 request_id,
878 client_id,
879 instrument_id,
880 quotes,
881 start_nanos,
882 end_nanos,
883 get_atomic_clock_realtime().get_time_ns(),
884 request_params,
885 ));
886
887 if let Err(e) = data_sender.send(DataEvent::Response(response)) {
888 log::error!("Failed to send quotes response: {e}");
889 }
890 }
891 Err(e) => {
892 log::error!("Failed to request quotes: {e}");
893 let response = DataResponse::Quotes(QuotesResponse::new(
894 request_id,
895 client_id,
896 instrument_id,
897 Vec::new(),
898 start_nanos,
899 end_nanos,
900 get_atomic_clock_realtime().get_time_ns(),
901 request_params,
902 ));
903
904 send_data_response(&data_sender, response, "empty quotes");
905 }
906 }
907 });
908
909 Ok(())
910 }
911
912 fn request_trades(&self, request: RequestTrades) -> anyhow::Result<()> {
913 log::debug!("Request trades: {request:?}");
914
915 let historical_client = self.historical.clone();
916 let data_sender = self.data_sender.clone();
917 let dataset = self.get_dataset_for_venue(request.instrument_id.venue)?;
918 let instrument_id = request.instrument_id;
919 let symbols = historical_client.prepare_symbols_from_instrument_ids(&[instrument_id]);
920 let request_id = request.request_id;
921 let client_id = request.client_id.unwrap_or(self.client_id);
922 let start_nanos = datetime_to_unix_nanos(request.start);
923 let end_nanos = datetime_to_unix_nanos(request.end);
924 let limit = request.limit.map(|limit| limit.get() as u64);
925 let request_params = request.params;
926 let price_precision = price_precision_from_params(request_params.as_ref())?;
927 let schema =
928 schema_from_params(request_params.as_ref(), dbn::Schema::Trades, TRADE_SCHEMAS)?
929 .to_string();
930 let (query_start, query_end) = resolve_request_time_range(start_nanos, end_nanos);
931
932 get_runtime().spawn(async move {
933 seed_price_precision_if_needed(
934 &historical_client,
935 dataset.as_str(),
936 instrument_id,
937 query_start,
938 query_end,
939 price_precision,
940 )
941 .await;
942
943 let params = RangeQueryParams {
944 dataset,
945 symbols,
946 start: query_start,
947 end: query_end,
948 limit,
949 price_precision,
950 };
951
952 match historical_client
953 .get_range_trades(params, Some(schema))
954 .await
955 {
956 Ok(trades) => {
957 log::debug!("Retrieved {} trades", trades.len());
958 let response = DataResponse::Trades(TradesResponse::new(
959 request_id,
960 client_id,
961 instrument_id,
962 trades,
963 start_nanos,
964 end_nanos,
965 get_atomic_clock_realtime().get_time_ns(),
966 request_params,
967 ));
968
969 if let Err(e) = data_sender.send(DataEvent::Response(response)) {
970 log::error!("Failed to send trades response: {e}");
971 }
972 }
973 Err(e) => {
974 log::error!("Failed to request trades: {e}");
975 let response = DataResponse::Trades(TradesResponse::new(
976 request_id,
977 client_id,
978 instrument_id,
979 Vec::new(),
980 start_nanos,
981 end_nanos,
982 get_atomic_clock_realtime().get_time_ns(),
983 request_params,
984 ));
985
986 send_data_response(&data_sender, response, "empty trades");
987 }
988 }
989 });
990
991 Ok(())
992 }
993
994 fn request_bars(&self, request: RequestBars) -> anyhow::Result<()> {
995 log::debug!("Request bars: {request:?}");
996
997 let historical_client = self.historical.clone();
998 let data_sender = self.data_sender.clone();
999 let instrument_id = request.bar_type.instrument_id();
1000 let dataset = self.get_dataset_for_venue(instrument_id.venue)?;
1001 let symbols = historical_client.prepare_symbols_from_instrument_ids(&[instrument_id]);
1002 let request_id = request.request_id;
1003 let client_id = request.client_id.unwrap_or(self.client_id);
1004 let bar_type = request.bar_type;
1005 let start_nanos = datetime_to_unix_nanos(request.start);
1006 let end_nanos = datetime_to_unix_nanos(request.end);
1007 let limit = request.limit.map(|limit| limit.get() as u64);
1008 let request_params = request.params;
1009 let price_precision = price_precision_from_params(request_params.as_ref())?;
1010 let timestamp_on_close = self.config.bars_timestamp_on_close;
1011 let (query_start, query_end) = resolve_request_time_range(start_nanos, end_nanos);
1012
1013 get_runtime().spawn(async move {
1014 seed_price_precision_if_needed(
1015 &historical_client,
1016 dataset.as_str(),
1017 instrument_id,
1018 query_start,
1019 query_end,
1020 price_precision,
1021 )
1022 .await;
1023
1024 let params = RangeQueryParams {
1025 dataset,
1026 symbols,
1027 start: query_start,
1028 end: query_end,
1029 limit,
1030 price_precision,
1031 };
1032
1033 let aggregation = match bar_type.spec().aggregation {
1034 BarAggregation::Second => BarAggregation::Second,
1035 BarAggregation::Minute => BarAggregation::Minute,
1036 BarAggregation::Hour => BarAggregation::Hour,
1037 BarAggregation::Day => BarAggregation::Day,
1038 _ => {
1039 log::error!(
1040 "Unsupported bar aggregation: {:?}",
1041 bar_type.spec().aggregation
1042 );
1043 let response = DataResponse::Bars(BarsResponse::new(
1044 request_id,
1045 client_id,
1046 bar_type,
1047 Vec::new(),
1048 start_nanos,
1049 end_nanos,
1050 get_atomic_clock_realtime().get_time_ns(),
1051 request_params,
1052 ));
1053
1054 send_data_response(&data_sender, response, "empty bars");
1055 return;
1056 }
1057 };
1058
1059 match historical_client
1060 .get_range_bars(params, aggregation, timestamp_on_close)
1061 .await
1062 {
1063 Ok(bars) => {
1064 log::debug!("Retrieved {} bars", bars.len());
1065 let response = DataResponse::Bars(BarsResponse::new(
1066 request_id,
1067 client_id,
1068 bar_type,
1069 bars,
1070 start_nanos,
1071 end_nanos,
1072 get_atomic_clock_realtime().get_time_ns(),
1073 request_params,
1074 ));
1075
1076 if let Err(e) = data_sender.send(DataEvent::Response(response)) {
1077 log::error!("Failed to send bars response: {e}");
1078 }
1079 }
1080 Err(e) => {
1081 log::error!("Failed to request bars: {e}");
1082 let response = DataResponse::Bars(BarsResponse::new(
1083 request_id,
1084 client_id,
1085 bar_type,
1086 Vec::new(),
1087 start_nanos,
1088 end_nanos,
1089 get_atomic_clock_realtime().get_time_ns(),
1090 request_params,
1091 ));
1092
1093 send_data_response(&data_sender, response, "empty bars");
1094 }
1095 }
1096 });
1097
1098 Ok(())
1099 }
1100
1101 fn request_book_depth(&self, request: RequestBookDepth) -> anyhow::Result<()> {
1102 log::debug!("Request book depth: {request:?}");
1103
1104 let historical_client = self.historical.clone();
1105 let data_sender = self.data_sender.clone();
1106 let dataset = self.get_dataset_for_venue(request.instrument_id.venue)?;
1107 let instrument_id = request.instrument_id;
1108 let symbols = historical_client.prepare_symbols_from_instrument_ids(&[instrument_id]);
1109 let request_id = request.request_id;
1110 let client_id = request.client_id.unwrap_or(self.client_id);
1111 let start_nanos = datetime_to_unix_nanos(request.start);
1112 let end_nanos = datetime_to_unix_nanos(request.end);
1113 let limit = request.limit.map(|limit| limit.get() as u64);
1114 let depth = request.depth.map(|depth| depth.get());
1115 let request_params = request.params;
1116 let price_precision = price_precision_from_params(request_params.as_ref())?;
1117 let (query_start, query_end) = resolve_request_time_range(start_nanos, end_nanos);
1118
1119 get_runtime().spawn(async move {
1120 seed_price_precision_if_needed(
1121 &historical_client,
1122 dataset.as_str(),
1123 instrument_id,
1124 query_start,
1125 query_end,
1126 price_precision,
1127 )
1128 .await;
1129
1130 let params = RangeQueryParams {
1131 dataset,
1132 symbols,
1133 start: query_start,
1134 end: query_end,
1135 limit,
1136 price_precision,
1137 };
1138
1139 match historical_client
1140 .get_range_order_book_depth10(params, depth)
1141 .await
1142 {
1143 Ok(depths) => {
1144 log::debug!("Retrieved {} order book depths", depths.len());
1145 let response = DataResponse::BookDepth(BookDepthResponse::new(
1146 request_id,
1147 client_id,
1148 instrument_id,
1149 depths,
1150 start_nanos,
1151 end_nanos,
1152 get_atomic_clock_realtime().get_time_ns(),
1153 request_params,
1154 ));
1155
1156 send_data_response(&data_sender, response, "book depth");
1157 }
1158 Err(e) => {
1159 log::error!("Failed to request order book depths: {e}");
1160 let response = DataResponse::BookDepth(BookDepthResponse::new(
1161 request_id,
1162 client_id,
1163 instrument_id,
1164 Vec::new(),
1165 start_nanos,
1166 end_nanos,
1167 get_atomic_clock_realtime().get_time_ns(),
1168 request_params,
1169 ));
1170
1171 send_data_response(&data_sender, response, "empty book depth");
1172 }
1173 }
1174 });
1175
1176 Ok(())
1177 }
1178
1179 fn request_book_deltas(&self, request: RequestBookDeltas) -> anyhow::Result<()> {
1180 log::debug!("Request book deltas: {request:?}");
1181
1182 let historical_client = self.historical.clone();
1183 let data_sender = self.data_sender.clone();
1184 let dataset = self.get_dataset_for_venue(request.instrument_id.venue)?;
1185 let instrument_id = request.instrument_id;
1186 let symbols = historical_client.prepare_symbols_from_instrument_ids(&[instrument_id]);
1187 let request_id = request.request_id;
1188 let client_id = request.client_id.unwrap_or(self.client_id);
1189 let start_nanos = datetime_to_unix_nanos(request.start);
1190 let end_nanos = datetime_to_unix_nanos(request.end);
1191 let limit = request.limit.map(|limit| limit.get() as u64);
1192 let request_params = request.params;
1193 let price_precision = price_precision_from_params(request_params.as_ref())?;
1194 let (query_start, query_end) = resolve_request_time_range(start_nanos, end_nanos);
1195
1196 get_runtime().spawn(async move {
1197 seed_price_precision_if_needed(
1198 &historical_client,
1199 dataset.as_str(),
1200 instrument_id,
1201 query_start,
1202 query_end,
1203 price_precision,
1204 )
1205 .await;
1206
1207 let params = RangeQueryParams {
1208 dataset,
1209 symbols,
1210 start: query_start,
1211 end: query_end,
1212 limit,
1213 price_precision,
1214 };
1215
1216 match historical_client.get_range_order_book_deltas(params).await {
1217 Ok(deltas) => {
1218 log::debug!("Retrieved {} order book deltas", deltas.len());
1219 let response = DataResponse::BookDeltas(BookDeltasResponse::new(
1220 request_id,
1221 client_id,
1222 instrument_id,
1223 deltas,
1224 start_nanos,
1225 end_nanos,
1226 get_atomic_clock_realtime().get_time_ns(),
1227 request_params,
1228 ));
1229
1230 send_data_response(&data_sender, response, "book deltas");
1231 }
1232 Err(e) => {
1233 log::error!("Failed to request order book deltas: {e}");
1234 let response = DataResponse::BookDeltas(BookDeltasResponse::new(
1235 request_id,
1236 client_id,
1237 instrument_id,
1238 Vec::new(),
1239 start_nanos,
1240 end_nanos,
1241 get_atomic_clock_realtime().get_time_ns(),
1242 request_params,
1243 ));
1244
1245 send_data_response(&data_sender, response, "empty book deltas");
1246 }
1247 }
1248 });
1249
1250 Ok(())
1251 }
1252}
1253
1254fn instruments_query_params(
1255 dataset: String,
1256 start_nanos: UnixNanos,
1257 end_nanos: Option<UnixNanos>,
1258) -> RangeQueryParams {
1259 RangeQueryParams {
1260 dataset,
1261 symbols: vec!["ALL_SYMBOLS".to_string()],
1262 start: start_nanos,
1263 end: end_nanos,
1264 limit: None,
1265 price_precision: None,
1266 }
1267}
1268
1269fn instrument_query_params(
1270 dataset: String,
1271 instrument_id: InstrumentId,
1272 start_nanos: UnixNanos,
1273 end_nanos: Option<UnixNanos>,
1274) -> RangeQueryParams {
1275 RangeQueryParams {
1276 dataset,
1277 symbols: vec![instrument_id_to_symbol_string(
1278 instrument_id,
1279 &mut AHashMap::new(),
1280 )],
1281 start: start_nanos,
1282 end: end_nanos,
1283 limit: None,
1284 price_precision: None,
1285 }
1286}
1287
1288fn resolve_request_time_range(
1289 start_nanos: Option<UnixNanos>,
1290 end_nanos: Option<UnixNanos>,
1291) -> (UnixNanos, Option<UnixNanos>) {
1292 let mut end = end_nanos.unwrap_or_else(|| get_atomic_clock_realtime().get_time_ns());
1293 let mut start = start_nanos.unwrap_or_else(|| start_of_utc_day(end));
1294
1295 if start > end {
1296 start = end;
1297 }
1298
1299 if start == end {
1300 if end.as_u64() > 0 {
1301 start = UnixNanos::from(end.as_u64() - 1);
1302 } else {
1303 end = UnixNanos::from(1);
1304 }
1305 }
1306
1307 (start, Some(end))
1308}
1309
1310fn start_of_utc_day(timestamp: UnixNanos) -> UnixNanos {
1311 UnixNanos::from((timestamp.as_u64() / NANOSECONDS_IN_DAY) * NANOSECONDS_IN_DAY)
1312}
1313
1314async fn seed_price_precision_if_needed(
1315 historical_client: &DatabentoHistoricalClient,
1316 dataset: &str,
1317 instrument_id: InstrumentId,
1318 start_nanos: UnixNanos,
1319 end_nanos: Option<UnixNanos>,
1320 price_precision: Option<u8>,
1321) {
1322 if price_precision.is_some()
1323 || historical_client
1324 .price_precision(instrument_id.symbol)
1325 .is_some()
1326 {
1327 return;
1328 }
1329
1330 let query_params =
1331 instrument_query_params(dataset.to_string(), instrument_id, start_nanos, end_nanos);
1332
1333 if let Err(e) = historical_client.get_range_instruments(query_params).await {
1334 log::warn!("Failed to seed price precision for {instrument_id}: {e}");
1335 }
1336}
1337
1338fn send_data_response(
1339 data_sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
1340 response: DataResponse,
1341 label: &str,
1342) {
1343 if let Err(e) = data_sender.send(DataEvent::Response(response)) {
1344 log::error!("Failed to send {label} response: {e}");
1345 }
1346}
1347
1348fn requested_instrument(
1349 instruments: Vec<InstrumentAny>,
1350 instrument_id: InstrumentId,
1351) -> Option<InstrumentAny> {
1352 instruments
1353 .into_iter()
1354 .rev()
1355 .find(|instrument| instrument.id() == instrument_id)
1356}
1357
1358fn price_precision_from_params(params: Option<&Params>) -> anyhow::Result<Option<u8>> {
1359 let Some(price_precision) = params.and_then(|params| params.get_u64(PRICE_PRECISION_PARAM))
1360 else {
1361 return Ok(None);
1362 };
1363
1364 Ok(Some(u8::try_from(price_precision).map_err(|_| {
1365 anyhow::anyhow!(
1366 "`{PRICE_PRECISION_PARAM}` must be less than or equal to {}",
1367 u8::MAX
1368 )
1369 })?))
1370}
1371
1372fn schema_from_params(
1373 params: Option<&Params>,
1374 default_schema: dbn::Schema,
1375 allowed_schemas: &[dbn::Schema],
1376) -> anyhow::Result<dbn::Schema> {
1377 let schema = if let Some(schema) = params.and_then(|params| params.get_str(SCHEMA_PARAM)) {
1378 dbn::Schema::from_str(schema)?
1379 } else {
1380 default_schema
1381 };
1382
1383 if allowed_schemas.contains(&schema) {
1384 return Ok(schema);
1385 }
1386
1387 let allowed = allowed_schemas
1388 .iter()
1389 .map(dbn::Schema::as_str)
1390 .collect::<Vec<_>>()
1391 .join(", ");
1392 anyhow::bail!(
1393 "Invalid `{SCHEMA_PARAM}` '{}'. Must be one of: {allowed}",
1394 schema.as_str()
1395 );
1396}
1397
1398fn send_subscription_commands(
1399 tx: &tokio::sync::mpsc::UnboundedSender<HandlerCommand>,
1400 dataset: &str,
1401 price_precision: Option<(Symbol, u8)>,
1402 subscription: Subscription,
1403 start_after_subscribe: bool,
1404) -> anyhow::Result<()> {
1405 if let Some((symbol, precision)) = price_precision {
1406 tx.send(HandlerCommand::SetPricePrecision(symbol, precision))
1407 .map_err(|e| anyhow::anyhow!("Failed to send command to dataset {dataset}: {e}"))?;
1408 }
1409
1410 tx.send(HandlerCommand::Subscribe(subscription))
1411 .map_err(|e| anyhow::anyhow!("Failed to send command to dataset {dataset}: {e}"))?;
1412
1413 if start_after_subscribe {
1414 tx.send(HandlerCommand::Start)
1415 .map_err(|e| anyhow::anyhow!("Failed to send command to dataset {dataset}: {e}"))?;
1416 }
1417
1418 Ok(())
1419}
1420
1421#[cfg(test)]
1422mod tests {
1423 use std::path::PathBuf;
1424
1425 use nautilus_common::live::runner::replace_data_event_sender;
1426 use nautilus_core::UUID4;
1427 use nautilus_model::{
1428 identifiers::{ClientId, InstrumentId},
1429 instruments::{CurrencyPair, InstrumentAny},
1430 types::{Currency, Price, Quantity},
1431 };
1432 use rstest::rstest;
1433 use serde_json::json;
1434
1435 use super::*;
1436
1437 #[derive(Clone, Copy)]
1438 enum SubscribeKind {
1439 Quotes,
1440 Trades,
1441 }
1442
1443 fn currency_pair(instrument_id: &str) -> InstrumentAny {
1444 currency_pair_with_ts_init(instrument_id, UnixNanos::default())
1445 }
1446
1447 fn test_data_client() -> DatabentoDataClient {
1448 let (sender, _receiver) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
1449 replace_data_event_sender(sender);
1450
1451 let config = DatabentoDataClientConfig::new(
1452 "32-character-with-lots-of-filler",
1453 PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("publishers.json"),
1454 true,
1455 true,
1456 );
1457 DatabentoDataClient::new(
1458 ClientId::from("DATABENTO-TEST"),
1459 config,
1460 get_atomic_clock_realtime(),
1461 )
1462 .expect("test client should initialize")
1463 }
1464
1465 #[rstest]
1466 #[tokio::test]
1467 async fn test_stop_aborts_active_tasks_and_marks_disconnected() {
1468 let mut client = test_data_client();
1469
1470 let handle = tokio::spawn(async { std::future::pending::<()>().await });
1471 {
1472 let mut handles = client.task_handles.lock().expect(MUTEX_POISONED);
1473 handles.push(handle);
1474 }
1475 client.is_connected.store(true, Ordering::Relaxed);
1476
1477 client.stop().unwrap();
1478
1479 assert!(client.task_handles.lock().expect(MUTEX_POISONED).is_empty());
1480 assert!(client.is_disconnected());
1481 }
1482
1483 #[rstest]
1484 #[case("EQUS", "EQUS.PLUS")] #[case("GLBX", "EQUS.MINI")] fn test_venue_dataset_map_overrides_default(#[case] venue: &str, #[case] dataset: &str) {
1487 let (sender, _receiver) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
1488 replace_data_event_sender(sender);
1489
1490 let mut config = DatabentoDataClientConfig::new(
1491 "32-character-with-lots-of-filler",
1492 PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("publishers.json"),
1493 true,
1494 true,
1495 );
1496 config.venue_dataset_map = IndexMap::from([(venue.to_string(), dataset.to_string())]);
1497
1498 let client = DatabentoDataClient::new(
1499 ClientId::from("DATABENTO-TEST"),
1500 config,
1501 get_atomic_clock_realtime(),
1502 )
1503 .expect("test client should initialize");
1504
1505 assert_eq!(
1506 client.get_dataset_for_venue(Venue::from(venue)).unwrap(),
1507 dataset
1508 );
1509
1510 assert_eq!(
1512 client.get_dataset_for_venue(Venue::from("XCBO")).unwrap(),
1513 "OPRA.PILLAR"
1514 );
1515 }
1516
1517 fn subscribe_quotes_cmd(params: Option<Params>) -> SubscribeQuotes {
1518 SubscribeQuotes::new(
1519 InstrumentId::from("ESM4.GLBX"),
1520 Some(ClientId::from("DATABENTO-TEST")),
1521 None,
1522 UUID4::new(),
1523 UnixNanos::default(),
1524 None,
1525 params,
1526 )
1527 }
1528
1529 fn subscribe_trades_cmd(params: Option<Params>) -> SubscribeTrades {
1530 SubscribeTrades::new(
1531 InstrumentId::from("ESM4.GLBX"),
1532 Some(ClientId::from("DATABENTO-TEST")),
1533 None,
1534 UUID4::new(),
1535 UnixNanos::default(),
1536 None,
1537 params,
1538 )
1539 }
1540
1541 fn currency_pair_with_ts_init(instrument_id: &str, ts_init: UnixNanos) -> InstrumentAny {
1542 let instrument_id = InstrumentId::from(instrument_id);
1543 InstrumentAny::CurrencyPair(CurrencyPair::new(
1544 instrument_id,
1545 instrument_id.symbol,
1546 Currency::from("BTC"),
1547 Currency::from("USDT"),
1548 2,
1549 6,
1550 Price::from("0.01"),
1551 Quantity::from("0.000001"),
1552 None,
1553 None,
1554 None,
1555 None,
1556 None,
1557 None,
1558 None,
1559 None,
1560 None,
1561 None,
1562 None,
1563 None,
1564 None,
1565 None,
1566 UnixNanos::default(),
1567 ts_init,
1568 ))
1569 }
1570
1571 #[rstest]
1572 fn test_instruments_query_params_requests_all_symbols() {
1573 let start = UnixNanos::from(1_000_000_000);
1574 let end = UnixNanos::from(2_000_000_000);
1575
1576 let params = instruments_query_params("GLBX.MDP3".to_string(), start, Some(end));
1577
1578 assert_eq!(params.dataset, "GLBX.MDP3");
1579 assert_eq!(params.symbols, vec!["ALL_SYMBOLS"]);
1580 assert_eq!(params.start, start);
1581 assert_eq!(params.end, Some(end));
1582 assert_eq!(params.limit, None);
1583 assert_eq!(params.price_precision, None);
1584 }
1585
1586 #[rstest]
1587 fn test_instrument_query_params_requests_single_symbol() {
1588 let instrument_id = InstrumentId::from("ESM4.GLBX");
1589
1590 let start = UnixNanos::from(1_000_000_000);
1591 let end = UnixNanos::from(2_000_000_000);
1592
1593 let params =
1594 instrument_query_params("GLBX.MDP3".to_string(), instrument_id, start, Some(end));
1595
1596 assert_eq!(params.dataset, "GLBX.MDP3");
1597 assert_eq!(params.symbols, vec!["ESM4"]);
1598 assert_eq!(params.start, start);
1599 assert_eq!(params.end, Some(end));
1600 assert_eq!(params.limit, None);
1601 assert_eq!(params.price_precision, None);
1602 }
1603
1604 #[rstest]
1605 fn test_resolve_request_time_range_defaults_to_end_day() {
1606 let end = UnixNanos::from(1_706_443_200_000_000_001);
1607
1608 let (start, resolved_end) = resolve_request_time_range(None, Some(end));
1609
1610 assert_eq!(start, UnixNanos::from(1_706_400_000_000_000_000));
1611 assert_eq!(resolved_end, Some(end));
1612 }
1613
1614 #[rstest]
1615 fn test_resolve_request_time_range_makes_empty_interval_non_empty() {
1616 let end = UnixNanos::from(1_706_443_200_000_000_001);
1617
1618 let (start, resolved_end) = resolve_request_time_range(Some(end), Some(end));
1619
1620 assert_eq!(start, UnixNanos::from(end.as_u64() - 1));
1621 assert_eq!(resolved_end, Some(end));
1622 }
1623
1624 #[rstest]
1625 fn test_requested_instrument_filters_exact_id() {
1626 let requested_id = InstrumentId::from("BTCUSDT.BINANCE");
1627 let instruments = vec![
1628 currency_pair("ETHUSDT.BINANCE"),
1629 currency_pair("BTCUSDT.BINANCE"),
1630 ];
1631
1632 let instrument = requested_instrument(instruments, requested_id).expect("instrument");
1633
1634 assert_eq!(instrument.id(), requested_id);
1635 }
1636
1637 #[rstest]
1638 fn test_requested_instrument_returns_latest_matching_id() {
1639 let requested_id = InstrumentId::from("BTCUSDT.BINANCE");
1640 let instruments = vec![
1641 currency_pair_with_ts_init("BTCUSDT.BINANCE", UnixNanos::from(1)),
1642 currency_pair_with_ts_init("BTCUSDT.BINANCE", UnixNanos::from(2)),
1643 ];
1644
1645 let instrument = requested_instrument(instruments, requested_id).expect("instrument");
1646
1647 assert_eq!(instrument.ts_init(), UnixNanos::from(2));
1648 }
1649
1650 #[rstest]
1651 fn test_requested_instrument_returns_none_on_miss() {
1652 let instruments = vec![currency_pair("ETHUSDT.BINANCE")];
1653
1654 let instrument = requested_instrument(instruments, InstrumentId::from("BTCUSDT.BINANCE"));
1655
1656 assert!(instrument.is_none());
1657 }
1658
1659 #[rstest]
1660 fn test_price_precision_from_params() {
1661 let mut params = Params::new();
1662 params.insert(PRICE_PRECISION_PARAM.to_string(), json!(5));
1663
1664 let price_precision = price_precision_from_params(Some(¶ms)).unwrap();
1665
1666 assert_eq!(price_precision, Some(5));
1667 }
1668
1669 #[rstest]
1670 fn test_price_precision_from_params_rejects_out_of_range_value() {
1671 let mut params = Params::new();
1672 params.insert(
1673 PRICE_PRECISION_PARAM.to_string(),
1674 json!(u64::from(u8::MAX) + 1),
1675 );
1676
1677 let result = price_precision_from_params(Some(¶ms));
1678
1679 assert!(result.is_err());
1680 }
1681
1682 #[rstest]
1683 fn test_schema_from_params_returns_default() {
1684 let schema = schema_from_params(None, dbn::Schema::Mbp1, QUOTE_SCHEMAS).unwrap();
1685
1686 assert_eq!(schema, dbn::Schema::Mbp1);
1687 }
1688
1689 #[rstest]
1690 fn test_schema_from_params_accepts_allowed_value() {
1691 let mut params = Params::new();
1692 params.insert(SCHEMA_PARAM.to_string(), json!("tbbo"));
1693
1694 let schema = schema_from_params(Some(¶ms), dbn::Schema::Mbp1, QUOTE_SCHEMAS).unwrap();
1695
1696 assert_eq!(schema, dbn::Schema::Tbbo);
1697 }
1698
1699 #[rstest]
1700 fn test_schema_from_params_rejects_disallowed_value() {
1701 let mut params = Params::new();
1702 params.insert(SCHEMA_PARAM.to_string(), json!("mbo"));
1703
1704 let result = schema_from_params(Some(¶ms), dbn::Schema::Mbp1, QUOTE_SCHEMAS);
1705
1706 assert!(result.is_err());
1707 }
1708
1709 #[rstest]
1710 #[case::quotes(SubscribeKind::Quotes)]
1711 #[case::trades(SubscribeKind::Trades)]
1712 fn test_invalid_subscribe_params_do_not_create_feed_handler(#[case] kind: SubscribeKind) {
1713 let mut client = test_data_client();
1714 let mut params = Params::new();
1715 params.insert(SCHEMA_PARAM.to_string(), json!("definition"));
1716
1717 let result = match kind {
1718 SubscribeKind::Quotes => client.subscribe_quotes(subscribe_quotes_cmd(Some(params))),
1719 SubscribeKind::Trades => client.subscribe_trades(subscribe_trades_cmd(Some(params))),
1720 };
1721
1722 assert!(result.is_err());
1723 assert!(client.cmd_channels.lock().expect(MUTEX_POISONED).is_empty());
1724 }
1725
1726 #[rstest]
1727 fn test_send_subscription_commands_starts_after_subscribe() {
1728 let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
1729 let subscription = Subscription::builder()
1730 .schema(dbn::Schema::Mbp1)
1731 .symbols(vec!["ESM4"])
1732 .build();
1733
1734 send_subscription_commands(
1735 &tx,
1736 "GLBX.MDP3",
1737 Some((Symbol::from("ESM4"), 2)),
1738 subscription,
1739 true,
1740 )
1741 .unwrap();
1742
1743 assert!(matches!(
1744 rx.try_recv().unwrap(),
1745 HandlerCommand::SetPricePrecision(symbol, 2) if symbol == Symbol::from("ESM4")
1746 ));
1747 assert!(matches!(
1748 rx.try_recv().unwrap(),
1749 HandlerCommand::Subscribe(sub) if sub.schema == dbn::Schema::Mbp1
1750 ));
1751 assert!(matches!(rx.try_recv().unwrap(), HandlerCommand::Start));
1752 }
1753
1754 #[rstest]
1755 fn test_send_subscription_commands_without_precision_or_start() {
1756 let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
1757 let subscription = Subscription::builder()
1758 .schema(dbn::Schema::Mbp1)
1759 .symbols(vec!["ESM4"])
1760 .build();
1761
1762 send_subscription_commands(&tx, "GLBX.MDP3", None, subscription, false).unwrap();
1763
1764 assert!(matches!(
1765 rx.try_recv().unwrap(),
1766 HandlerCommand::Subscribe(sub) if sub.schema == dbn::Schema::Mbp1
1767 ));
1768 assert!(matches!(
1769 rx.try_recv(),
1770 Err(tokio::sync::mpsc::error::TryRecvError::Empty)
1771 ));
1772 }
1773}