Skip to main content

nautilus_databento/
data.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Provides a unified data client that combines Databento's live streaming and historical data capabilities.
17//!
18//! This module implements a data client that manages connections to multiple Databento datasets,
19//! handles live market data subscriptions, and provides access to historical data on demand.
20
21use std::{
22    fmt::Debug,
23    path::PathBuf,
24    str::FromStr,
25    sync::{
26        Arc, Mutex,
27        atomic::{AtomicBool, Ordering},
28    },
29};
30
31use ahash::AHashMap;
32use databento::{dbn, live::Subscription};
33use indexmap::IndexMap;
34use nautilus_common::{
35    clients::DataClient,
36    live::{runner::get_data_event_sender, runtime::get_runtime},
37    messages::{
38        DataEvent, DataResponse,
39        data::{
40            BarsResponse, BookDeltasResponse, BookDepthResponse, InstrumentResponse,
41            InstrumentsResponse, QuotesResponse, RequestBars, RequestBookDeltas, RequestBookDepth,
42            RequestInstrument, RequestInstruments, RequestQuotes, RequestTrades,
43            SubscribeBookDeltas, SubscribeInstrument, SubscribeInstrumentStatus, SubscribeQuotes,
44            SubscribeTrades, TradesResponse, UnsubscribeBookDeltas, UnsubscribeInstrumentStatus,
45            UnsubscribeQuotes, UnsubscribeTrades,
46        },
47    },
48};
49use nautilus_core::{
50    AtomicMap, MUTEX_POISONED, Params, UnixNanos,
51    datetime::{NANOSECONDS_IN_DAY, datetime_to_unix_nanos},
52    string::secret::REDACTED,
53    time::{AtomicTime, get_atomic_clock_realtime},
54};
55use nautilus_model::{
56    data::{CustomData, Data},
57    enums::BarAggregation,
58    identifiers::{ClientId, InstrumentId, Symbol, Venue},
59    instruments::{Instrument, InstrumentAny},
60};
61use tokio::task::JoinHandle;
62use tokio_util::sync::CancellationToken;
63
64use crate::{
65    common::{Credential, DATABENTO_VENUE},
66    historical::{DatabentoHistoricalClient, RangeQueryParams},
67    live::{DatabentoFeedHandler, DatabentoMessage, HandlerCommand},
68    loader::DatabentoDataLoader,
69    symbology::instrument_id_to_symbol_string,
70    types::{Dataset, PublisherId},
71};
72
// Key for a price precision override in request/subscription `params`.
// Presumably the key read by `price_precision_from_params` - TODO confirm
// against that helper (it is not defined in this part of the file).
73const PRICE_PRECISION_PARAM: &str = "price_precision";
// Key for a schema override in request/subscription `params`.
// Presumably the key read by `schema_from_params` - TODO confirm against
// that helper (it is not defined in this part of the file).
74const SCHEMA_PARAM: &str = "schema";
// Schema list handed to `schema_from_params` by `subscribe_quotes`, which uses
// `Mbp1` as its default. Presumably the set of schemas accepted for quote
// subscriptions - TODO confirm against the helper.
75const QUOTE_SCHEMAS: &[dbn::Schema] = &[
76    dbn::Schema::Mbp1,
77    dbn::Schema::Bbo1S,
78    dbn::Schema::Bbo1M,
79    dbn::Schema::Cmbp1,
80    dbn::Schema::Cbbo1S,
81    dbn::Schema::Cbbo1M,
82    dbn::Schema::Tbbo,
83    dbn::Schema::Tcbbo,
84];
// Schema list handed to `schema_from_params` by `subscribe_trades`, which uses
// `Trades` as its default. Presumably the set of schemas accepted for trade
// subscriptions - TODO confirm against the helper.
85const TRADE_SCHEMAS: &[dbn::Schema] = &[
86    dbn::Schema::Trades,
87    dbn::Schema::Tbbo,
88    dbn::Schema::Tcbbo,
89    dbn::Schema::Mbp1,
90    dbn::Schema::Cmbp1,
91];
92
93/// Configuration for the Databento data client.
///
/// `Debug` is implemented by hand for this type so that the credential is
/// printed as a redaction marker instead of its value.
94#[derive(Clone)]
95pub struct DatabentoDataClientConfig {
96    /// Databento API credential (crate-private; read through `api_key` or `api_key_masked`).
97    pub(crate) credential: Credential,
98    /// Path to publishers.json file.
99    pub publishers_filepath: PathBuf,
100    /// Venue-to-dataset overrides applied on top of the publishers.json mappings.
    ///
    /// Keys are venue names and values are dataset names; each pair is registered
    /// on the data loader when the client is constructed.
101    pub venue_dataset_map: IndexMap<String, String>,
102    /// Whether to use exchange as venue for GLBX instruments.
103    pub use_exchange_as_venue: bool,
104    /// Whether to timestamp bars on close.
105    pub bars_timestamp_on_close: bool,
106    /// Reconnection timeout in minutes (None for infinite retries).
    ///
    /// `DatabentoDataClientConfig::new` sets this to `Some(10)`.
107    pub reconnect_timeout_mins: Option<u64>,
108}
109
110impl Debug for DatabentoDataClientConfig {
111    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
112        f.debug_struct(stringify!(DatabentoDataClientConfig))
113            .field("credential", &REDACTED)
114            .field("publishers_filepath", &self.publishers_filepath)
115            .field("venue_dataset_map", &self.venue_dataset_map)
116            .field("use_exchange_as_venue", &self.use_exchange_as_venue)
117            .field("bars_timestamp_on_close", &self.bars_timestamp_on_close)
118            .field("reconnect_timeout_mins", &self.reconnect_timeout_mins)
119            .finish()
120    }
121}
122
123impl DatabentoDataClientConfig {
124    /// Creates a new [`DatabentoDataClientConfig`] instance.
125    #[must_use]
126    pub fn new(
127        api_key: impl Into<String>,
128        publishers_filepath: PathBuf,
129        use_exchange_as_venue: bool,
130        bars_timestamp_on_close: bool,
131    ) -> Self {
132        Self {
133            credential: Credential::new(api_key),
134            publishers_filepath,
135            venue_dataset_map: IndexMap::new(),
136            use_exchange_as_venue,
137            bars_timestamp_on_close,
138            reconnect_timeout_mins: Some(10), // Default: 10 minutes
139        }
140    }
141
142    /// Returns the API key associated with this config.
143    #[must_use]
144    pub fn api_key(&self) -> &str {
145        self.credential.api_key()
146    }
147
148    /// Returns a masked version of the API key for logging purposes.
149    #[must_use]
150    pub fn api_key_masked(&self) -> String {
151        self.credential.api_key_masked()
152    }
153}
154
155/// A Databento data client that combines live streaming and historical data functionality.
156///
157/// This client uses the existing `DatabentoFeedHandler` for live data subscriptions
158/// and `DatabentoHistoricalClient` for historical data requests. It supports multiple
159/// datasets simultaneously, with separate feed handlers per dataset.
///
/// Feed handlers are not started at construction; one is created the first time a
/// subscription targets a dataset that has no command channel yet.
160#[cfg_attr(feature = "python", pyo3::pyclass)]
161#[cfg_attr(
162    feature = "python",
163    pyo3_stub_gen::derive::gen_stub_pyclass(module = "nautilus_trader.adapters.databento")
164)]
165#[derive(Debug)]
166pub struct DatabentoDataClient {
167    /// Client identifier.
168    client_id: ClientId,
169    /// Client configuration.
170    config: DatabentoDataClientConfig,
171    /// Connection state: set by `connect`, cleared by `stop`, `reset` and `disconnect`.
172    is_connected: AtomicBool,
173    /// Historical client for on-demand data requests (cloned into each spawned request task).
174    historical: DatabentoHistoricalClient,
175    /// Data loader for venue-to-dataset mapping, including the overrides from the config.
176    loader: DatabentoDataLoader,
177    /// Feed handler command senders, keyed by dataset name.
    ///
    /// An entry is inserted when a feed handler is created, removed by the feed
    /// task once the handler's `run` returns, and the whole map is cleared on
    /// `stop` and `disconnect`.
178    cmd_channels: Arc<Mutex<AHashMap<String, tokio::sync::mpsc::UnboundedSender<HandlerCommand>>>>,
179    /// Task handles for lifecycle management.
    ///
    /// Two handles are pushed per feed (the feed handler task and its message
    /// forwarding task); they are aborted on `stop` and awaited on `disconnect`.
180    task_handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
181    /// Cancellation token for graceful shutdown.
    ///
    /// Observed by the message forwarding tasks; replaced with a fresh token
    /// after `stop` and `disconnect`.
182    cancellation_token: CancellationToken,
183    /// Publisher to venue mapping, loaded from the publishers file at construction.
184    publisher_venue_map: Arc<IndexMap<PublisherId, Venue>>,
185    /// Symbol to venue mapping (for caching); filled on subscribe and shared with feed handlers.
186    symbol_venue_map: Arc<AtomicMap<Symbol, Venue>>,
187    /// Data event sender for forwarding data to the async runner.
188    data_sender: tokio::sync::mpsc::UnboundedSender<DataEvent>,
189}
190
191impl DatabentoDataClient {
192    /// Creates a new [`DatabentoDataClient`] instance.
193    ///
194    /// # Errors
195    ///
196    /// Returns an error if client creation or publisher configuration loading fails.
197    pub fn new(
198        client_id: ClientId,
199        config: DatabentoDataClientConfig,
200        clock: &'static AtomicTime,
201    ) -> anyhow::Result<Self> {
202        let historical = DatabentoHistoricalClient::new(
203            config.credential.clone(),
204            config.publishers_filepath.clone(),
205            clock,
206            config.use_exchange_as_venue,
207        )?;
208
209        // Create data loader for venue-to-dataset mapping
210        let mut loader = DatabentoDataLoader::new(Some(config.publishers_filepath.clone()))?;
211        for (venue, dataset) in &config.venue_dataset_map {
212            loader.set_dataset_for_venue(
213                Dataset::from(dataset.as_str()),
214                Venue::from(venue.as_str()),
215            );
216        }
217
218        // Load publisher configuration
219        let file_content = std::fs::read_to_string(&config.publishers_filepath)?;
220        let publishers_vec: Vec<crate::types::DatabentoPublisher> =
221            serde_json::from_str(&file_content)?;
222
223        let publisher_venue_map = publishers_vec
224            .into_iter()
225            .map(|p| (p.publisher_id, Venue::from(p.venue.as_str())))
226            .collect::<IndexMap<u16, Venue>>();
227
228        let data_sender = get_data_event_sender();
229
230        Ok(Self {
231            client_id,
232            config,
233            is_connected: AtomicBool::new(false),
234            historical,
235            loader,
236            cmd_channels: Arc::new(Mutex::new(AHashMap::new())),
237            task_handles: Arc::new(Mutex::new(Vec::new())),
238            cancellation_token: CancellationToken::new(),
239            publisher_venue_map: Arc::new(publisher_venue_map),
240            symbol_venue_map: Arc::new(AtomicMap::new()),
241            data_sender,
242        })
243    }
244
245    /// Gets the dataset for a given venue using the data loader.
246    ///
247    /// # Errors
248    ///
249    /// Returns an error if the venue-to-dataset mapping cannot be found.
250    fn get_dataset_for_venue(&self, venue: Venue) -> anyhow::Result<String> {
251        self.loader
252            .get_dataset_for_venue(&venue)
253            .map(ToString::to_string)
254            .ok_or_else(|| anyhow::anyhow!("No dataset found for venue: {venue}"))
255    }
256
257    /// Gets or creates a feed handler for the specified dataset.
258    fn get_or_create_feed_handler(&self, dataset: &str) -> bool {
259        let mut channels = self.cmd_channels.lock().expect(MUTEX_POISONED);
260
261        if !channels.contains_key(dataset) {
262            log::debug!("Creating new feed handler for dataset: {dataset}");
263            let cmd_tx = self.initialize_live_feed(dataset.to_string());
264            channels.insert(dataset.to_string(), cmd_tx);
265
266            log::debug!("Feed handler created for dataset: {dataset}, channel stored");
267            return true;
268        }
269
270        false
271    }
272
273    fn send_subscription_to_dataset(
274        &self,
275        dataset: &str,
276        price_precision: Option<(Symbol, u8)>,
277        subscription: Subscription,
278        start_after_subscribe: bool,
279    ) -> anyhow::Result<()> {
280        let tx = {
281            let channels = self.cmd_channels.lock().expect(MUTEX_POISONED);
282            channels
283                .get(dataset)
284                .cloned()
285                .ok_or_else(|| anyhow::anyhow!("No feed handler found for dataset: {dataset}"))?
286        };
287
288        send_subscription_commands(
289            &tx,
290            dataset,
291            price_precision,
292            subscription,
293            start_after_subscribe,
294        )
295    }
296
297    fn send_close_to_active_feeds(&self) {
298        let channels = self.cmd_channels.lock().expect(MUTEX_POISONED);
299        for (dataset, tx) in channels.iter() {
300            if let Err(e) = tx.send(HandlerCommand::Close) {
301                log::warn!("Failed to send close command to dataset {dataset}: {e}");
302            }
303        }
304    }
305
306    fn clear_feed_channels(&self) {
307        let mut channels = self.cmd_channels.lock().expect(MUTEX_POISONED);
308        channels.clear();
309    }
310
311    fn abort_active_tasks(&self) {
312        let handles = {
313            let mut task_handles = self.task_handles.lock().expect(MUTEX_POISONED);
314            std::mem::take(&mut *task_handles)
315        };
316
317        for handle in handles {
318            handle.abort();
319        }
320    }
321
322    /// Initializes the live feed handler for streaming data.
323    fn initialize_live_feed(
324        &self,
325        dataset: String,
326    ) -> tokio::sync::mpsc::UnboundedSender<HandlerCommand> {
327        let (cmd_tx, cmd_rx) = tokio::sync::mpsc::unbounded_channel();
328        let (msg_tx, msg_rx) = tokio::sync::mpsc::unbounded_channel();
329        let feed_dataset = dataset.clone();
330        let feed_channels = self.cmd_channels.clone();
331
332        let mut feed_handler = DatabentoFeedHandler::new(
333            self.config.credential.clone(),
334            dataset,
335            cmd_rx,
336            msg_tx,
337            (*self.publisher_venue_map).clone(),
338            self.symbol_venue_map.clone(),
339            self.config.use_exchange_as_venue,
340            self.config.bars_timestamp_on_close,
341            self.config.reconnect_timeout_mins,
342        );
343
344        let feed_handle = get_runtime().spawn(async move {
345            if let Err(e) = feed_handler.run().await {
346                log::error!("Feed handler error: {e}");
347            }
348            feed_channels
349                .lock()
350                .expect(MUTEX_POISONED)
351                .remove(&feed_dataset);
352        });
353
354        let cancellation_token = self.cancellation_token.clone();
355        let data_sender = self.data_sender.clone();
356
357        // Spawn message processing task with cancellation support
358        let msg_handle = get_runtime().spawn(async move {
359            let mut msg_rx = msg_rx;
360
361            loop {
362                tokio::select! {
363                    msg = msg_rx.recv() => {
364                        match msg {
365                            Some(DatabentoMessage::Data(data)) => {
366                                log::debug!("Received data: {data:?}");
367                                if let Err(e) = data_sender.send(DataEvent::Data(data)) {
368                                    log::error!("Failed to send data event: {e}");
369                                }
370                            }
371                            Some(DatabentoMessage::Instrument(instrument)) => {
372                                log::debug!("Received instrument definition: {}", instrument.id());
373                                if let Err(e) = data_sender.send(DataEvent::Instrument(*instrument)) {
374                                    log::error!("Failed to send instrument: {e}");
375                                }
376                            }
377                            Some(DatabentoMessage::Status(status)) => {
378                                log::debug!("Received status: {status:?}");
379                                if let Err(e) =
380                                    data_sender.send(DataEvent::Data(Data::InstrumentStatus(status)))
381                                {
382                                    log::error!("Failed to send status data event: {e}");
383                                }
384                            }
385                            Some(DatabentoMessage::Imbalance(imbalance)) => {
386                                log::debug!("Received imbalance: {imbalance:?}");
387                                let data = Data::Custom(CustomData::from_arc(Arc::new(imbalance)));
388                                if let Err(e) = data_sender.send(DataEvent::Data(data)) {
389                                    log::error!("Failed to send imbalance data event: {e}");
390                                }
391                            }
392                            Some(DatabentoMessage::Statistics(statistics)) => {
393                                log::debug!("Received statistics: {statistics:?}");
394                                let data = Data::Custom(CustomData::from_arc(Arc::new(statistics)));
395                                if let Err(e) = data_sender.send(DataEvent::Data(data)) {
396                                    log::error!("Failed to send statistics data event: {e}");
397                                }
398                            }
399                            Some(DatabentoMessage::SubscriptionAck(ack)) => {
400                                log::debug!("Received subscription ack: {}", ack.message);
401                            }
402                            Some(DatabentoMessage::Error(error)) => {
403                                log::error!("Feed handler error: {error}");
404                            }
405                            Some(DatabentoMessage::Close) => {
406                                log::debug!("Feed handler closed");
407                                break;
408                            }
409                            None => {
410                                log::debug!("Message channel closed");
411                                break;
412                            }
413                        }
414                    }
415                    () = cancellation_token.cancelled() => {
416                        log::debug!("Message processing cancelled");
417                        break;
418                    }
419                }
420            }
421        });
422
423        {
424            let mut handles = self.task_handles.lock().expect(MUTEX_POISONED);
425            handles.push(feed_handle);
426            handles.push(msg_handle);
427        }
428
429        cmd_tx
430    }
431}
432
433#[async_trait::async_trait(?Send)]
434impl DataClient for DatabentoDataClient {
435    /// Returns the client identifier.
436    fn client_id(&self) -> ClientId {
437        self.client_id
438    }
439
440    /// Returns the venue associated with this client (None for multi-venue clients).
441    fn venue(&self) -> Option<Venue> {
442        None
443    }
444
445    /// Starts the data client.
446    ///
447    /// # Errors
448    ///
449    /// Returns an error if the client fails to start.
450    fn start(&mut self) -> anyhow::Result<()> {
451        log::debug!("Starting");
452        Ok(())
453    }
454
455    /// Stops the data client and cancels all active subscriptions.
456    ///
457    /// # Errors
458    ///
459    /// Returns an error if the client fails to stop cleanly.
460    fn stop(&mut self) -> anyhow::Result<()> {
461        log::debug!("Stopping");
462
463        self.send_close_to_active_feeds();
464        self.clear_feed_channels();
465        self.cancellation_token.cancel();
466        self.abort_active_tasks();
467
468        self.cancellation_token = CancellationToken::new();
469
470        self.is_connected.store(false, Ordering::Relaxed);
471        Ok(())
472    }
473
474    fn reset(&mut self) -> anyhow::Result<()> {
475        log::debug!("Resetting");
476        self.is_connected.store(false, Ordering::Relaxed);
477        Ok(())
478    }
479
480    fn dispose(&mut self) -> anyhow::Result<()> {
481        log::debug!("Disposing");
482        self.stop()
483    }
484
485    async fn connect(&mut self) -> anyhow::Result<()> {
486        log::debug!("Connecting...");
487
488        if self.cancellation_token.is_cancelled() {
489            self.cancellation_token = CancellationToken::new();
490        }
491
492        self.is_connected.store(true, Ordering::Relaxed);
493
494        log::info!("Connected");
495        Ok(())
496    }
497
498    async fn disconnect(&mut self) -> anyhow::Result<()> {
499        log::debug!("Disconnecting...");
500
501        self.send_close_to_active_feeds();
502        self.clear_feed_channels();
503
504        let handles = {
505            let mut task_handles = self.task_handles.lock().expect(MUTEX_POISONED);
506            std::mem::take(&mut *task_handles)
507        };
508
509        for handle in handles {
510            if let Err(e) = handle.await
511                && !e.is_cancelled()
512            {
513                log::error!("Task join error: {e}");
514            }
515        }
516
517        self.is_connected.store(false, Ordering::Relaxed);
518        self.cancellation_token = CancellationToken::new();
519
520        log::info!("Disconnected");
521        Ok(())
522    }
523
524    /// Returns whether the client is currently connected.
525    fn is_connected(&self) -> bool {
526        self.is_connected.load(Ordering::Relaxed)
527    }
528
529    fn is_disconnected(&self) -> bool {
530        !self.is_connected()
531    }
532
533    /// Subscribes to instrument definition data for the specified instrument.
534    ///
535    /// # Errors
536    ///
537    /// Returns an error if the subscription request fails.
538    fn subscribe_instrument(&mut self, cmd: SubscribeInstrument) -> anyhow::Result<()> {
539        let dataset = self.get_dataset_for_venue(cmd.instrument_id.venue)?;
540        let start_after_subscribe = self.get_or_create_feed_handler(&dataset);
541
542        self.symbol_venue_map
543            .insert(cmd.instrument_id.symbol, cmd.instrument_id.venue);
544        let symbol = cmd.instrument_id.symbol.to_string();
545
546        let subscription = Subscription::builder()
547            .schema(databento::dbn::Schema::Definition)
548            .symbols(symbol)
549            .build();
550
551        self.send_subscription_to_dataset(&dataset, None, subscription, start_after_subscribe)?;
552
553        Ok(())
554    }
555
556    /// Subscribes to quote tick data for the specified instruments.
557    ///
558    /// # Errors
559    ///
560    /// Returns an error if the subscription request fails.
561    fn subscribe_quotes(&mut self, cmd: SubscribeQuotes) -> anyhow::Result<()> {
562        let dataset = self.get_dataset_for_venue(cmd.instrument_id.venue)?;
563        let symbol = cmd.instrument_id.symbol.to_string();
564        let price_precision = price_precision_from_params(cmd.params.as_ref())?
565            .map(|precision| (cmd.instrument_id.symbol, precision));
566        let schema = schema_from_params(cmd.params.as_ref(), dbn::Schema::Mbp1, QUOTE_SCHEMAS)?;
567
568        let subscription = Subscription::builder()
569            .schema(schema)
570            .symbols(symbol)
571            .build();
572
573        let start_after_subscribe = self.get_or_create_feed_handler(&dataset);
574        self.symbol_venue_map
575            .insert(cmd.instrument_id.symbol, cmd.instrument_id.venue);
576
577        self.send_subscription_to_dataset(
578            &dataset,
579            price_precision,
580            subscription,
581            start_after_subscribe,
582        )?;
583
584        Ok(())
585    }
586
587    /// Subscribes to trade tick data for the specified instruments.
588    ///
589    /// # Errors
590    ///
591    /// Returns an error if the subscription request fails.
592    fn subscribe_trades(&mut self, cmd: SubscribeTrades) -> anyhow::Result<()> {
593        let dataset = self.get_dataset_for_venue(cmd.instrument_id.venue)?;
594        let symbol = cmd.instrument_id.symbol.to_string();
595        let price_precision = price_precision_from_params(cmd.params.as_ref())?
596            .map(|precision| (cmd.instrument_id.symbol, precision));
597        let schema = schema_from_params(cmd.params.as_ref(), dbn::Schema::Trades, TRADE_SCHEMAS)?;
598
599        let subscription = Subscription::builder()
600            .schema(schema)
601            .symbols(symbol)
602            .build();
603
604        let start_after_subscribe = self.get_or_create_feed_handler(&dataset);
605        self.symbol_venue_map
606            .insert(cmd.instrument_id.symbol, cmd.instrument_id.venue);
607
608        self.send_subscription_to_dataset(
609            &dataset,
610            price_precision,
611            subscription,
612            start_after_subscribe,
613        )?;
614
615        Ok(())
616    }
617
618    /// Subscribes to order book delta updates for the specified instruments.
619    ///
620    /// # Errors
621    ///
622    /// Returns an error if the subscription request fails.
623    fn subscribe_book_deltas(&mut self, cmd: SubscribeBookDeltas) -> anyhow::Result<()> {
624        let dataset = self.get_dataset_for_venue(cmd.instrument_id.venue)?;
625        let start_after_subscribe = self.get_or_create_feed_handler(&dataset);
626
627        self.symbol_venue_map
628            .insert(cmd.instrument_id.symbol, cmd.instrument_id.venue);
629        let symbol = cmd.instrument_id.symbol.to_string();
630
631        let subscription = Subscription::builder()
632            .schema(databento::dbn::Schema::Mbo) // Market by order for book deltas
633            .symbols(symbol)
634            .build();
635
636        self.send_subscription_to_dataset(&dataset, None, subscription, start_after_subscribe)?;
637
638        Ok(())
639    }
640
641    /// Subscribes to instrument status updates for the specified instruments.
642    ///
643    /// # Errors
644    ///
645    /// Returns an error if the subscription request fails.
646    fn subscribe_instrument_status(
647        &mut self,
648        cmd: SubscribeInstrumentStatus,
649    ) -> anyhow::Result<()> {
650        let dataset = self.get_dataset_for_venue(cmd.instrument_id.venue)?;
651        let start_after_subscribe = self.get_or_create_feed_handler(&dataset);
652
653        self.symbol_venue_map
654            .insert(cmd.instrument_id.symbol, cmd.instrument_id.venue);
655        let symbol = cmd.instrument_id.symbol.to_string();
656
657        let subscription = Subscription::builder()
658            .schema(databento::dbn::Schema::Status)
659            .symbols(symbol)
660            .build();
661
662        self.send_subscription_to_dataset(&dataset, None, subscription, start_after_subscribe)?;
663
664        Ok(())
665    }
666
667    // Unsubscribe methods
668    fn unsubscribe_quotes(&mut self, cmd: &UnsubscribeQuotes) -> anyhow::Result<()> {
669        // Note: Databento live API doesn't support granular unsubscribing.
670        // The feed handler manages subscriptions and can handle reconnections
671        // with the appropriate subscription state.
672        log::warn!(
673            "Databento does not support granular unsubscribing - ignoring unsubscribe request for {}",
674            cmd.instrument_id
675        );
676
677        Ok(())
678    }
679
680    fn unsubscribe_trades(&mut self, cmd: &UnsubscribeTrades) -> anyhow::Result<()> {
681        // Note: Databento live API doesn't support granular unsubscribing.
682        // The feed handler manages subscriptions and can handle reconnections
683        // with the appropriate subscription state.
684        log::warn!(
685            "Databento does not support granular unsubscribing - ignoring unsubscribe request for {}",
686            cmd.instrument_id
687        );
688
689        Ok(())
690    }
691
692    fn unsubscribe_book_deltas(&mut self, cmd: &UnsubscribeBookDeltas) -> anyhow::Result<()> {
693        // Note: Databento live API doesn't support granular unsubscribing.
694        // The feed handler manages subscriptions and can handle reconnections
695        // with the appropriate subscription state.
696        log::warn!(
697            "Databento does not support granular unsubscribing - ignoring unsubscribe request for {}",
698            cmd.instrument_id
699        );
700
701        Ok(())
702    }
703
704    fn unsubscribe_instrument_status(
705        &mut self,
706        cmd: &UnsubscribeInstrumentStatus,
707    ) -> anyhow::Result<()> {
708        // Note: Databento live API doesn't support granular unsubscribing.
709        // The feed handler manages subscriptions and can handle reconnections
710        // with the appropriate subscription state.
711        log::warn!(
712            "Databento does not support granular unsubscribing - ignoring unsubscribe request for {}",
713            cmd.instrument_id
714        );
715
716        Ok(())
717    }
718
719    fn request_instruments(&self, request: RequestInstruments) -> anyhow::Result<()> {
720        log::debug!("Request instruments: {request:?}");
721
722        let historical_client = self.historical.clone();
723        let data_sender = self.data_sender.clone();
724        let dataset = request
725            .venue
726            .map(|venue| self.get_dataset_for_venue(venue))
727            .transpose()?
728            .unwrap_or_else(|| "GLBX.MDP3".to_string());
729        let request_id = request.request_id;
730        let client_id = request.client_id.unwrap_or(self.client_id);
731        let venue = request.venue.unwrap_or(*DATABENTO_VENUE);
732        let start_nanos = datetime_to_unix_nanos(request.start);
733        let end_nanos = datetime_to_unix_nanos(request.end);
734        let request_params = request.params;
735        let (query_start, query_end) = resolve_request_time_range(start_nanos, end_nanos);
736
737        get_runtime().spawn(async move {
738            let query_params = instruments_query_params(dataset, query_start, query_end);
739
740            match historical_client.get_range_instruments(query_params).await {
741                Ok(instruments) => {
742                    log::debug!("Retrieved {} instruments", instruments.len());
743
744                    let response = DataResponse::Instruments(InstrumentsResponse::new(
745                        request_id,
746                        client_id,
747                        venue,
748                        instruments,
749                        start_nanos,
750                        end_nanos,
751                        get_atomic_clock_realtime().get_time_ns(),
752                        request_params,
753                    ));
754
755                    if let Err(e) = data_sender.send(DataEvent::Response(response)) {
756                        log::error!("Failed to send instruments response: {e}");
757                    }
758                }
759                Err(e) => {
760                    log::error!("Failed to request instruments: {e}");
761                    let response = DataResponse::Instruments(InstrumentsResponse::new(
762                        request_id,
763                        client_id,
764                        venue,
765                        Vec::new(),
766                        start_nanos,
767                        end_nanos,
768                        get_atomic_clock_realtime().get_time_ns(),
769                        request_params,
770                    ));
771
772                    send_data_response(&data_sender, response, "empty instruments");
773                }
774            }
775        });
776
777        Ok(())
778    }
779
780    fn request_instrument(&self, request: RequestInstrument) -> anyhow::Result<()> {
781        log::debug!("Request instrument: {request:?}");
782
783        let dataset = self.get_dataset_for_venue(request.instrument_id.venue)?;
784        let historical_client = self.historical.clone();
785        let data_sender = self.data_sender.clone();
786        let instrument_id = request.instrument_id;
787        let request_id = request.request_id;
788        let client_id = request.client_id.unwrap_or(self.client_id);
789        let start_nanos = datetime_to_unix_nanos(request.start);
790        let end_nanos = datetime_to_unix_nanos(request.end);
791        let request_params = request.params;
792        let (query_start, query_end) = resolve_request_time_range(start_nanos, end_nanos);
793
794        get_runtime().spawn(async move {
795            let query_params =
796                instrument_query_params(dataset, instrument_id, query_start, query_end);
797
798            match historical_client.get_range_instruments(query_params).await {
799                Ok(instruments) => {
800                    let instrument = requested_instrument(instruments, instrument_id);
801
802                    let Some(instrument) = instrument else {
803                        log::error!("Instrument not found: {instrument_id}");
804                        return;
805                    };
806
807                    let response = DataResponse::Instrument(Box::new(InstrumentResponse::new(
808                        request_id,
809                        client_id,
810                        instrument.id(),
811                        instrument,
812                        start_nanos,
813                        end_nanos,
814                        get_atomic_clock_realtime().get_time_ns(),
815                        request_params,
816                    )));
817
818                    if let Err(e) = data_sender.send(DataEvent::Response(response)) {
819                        log::error!("Failed to send instrument response: {e}");
820                    }
821                }
822                Err(e) => {
823                    log::error!("Failed to request instrument {instrument_id}: {e}");
824                }
825            }
826        });
827
828        Ok(())
829    }
830
831    fn request_quotes(&self, request: RequestQuotes) -> anyhow::Result<()> {
832        log::debug!("Request quotes: {request:?}");
833
834        let historical_client = self.historical.clone();
835        let data_sender = self.data_sender.clone();
836        let dataset = self.get_dataset_for_venue(request.instrument_id.venue)?;
837        let instrument_id = request.instrument_id;
838        let symbols = historical_client.prepare_symbols_from_instrument_ids(&[instrument_id]);
839        let request_id = request.request_id;
840        let client_id = request.client_id.unwrap_or(self.client_id);
841        let start_nanos = datetime_to_unix_nanos(request.start);
842        let end_nanos = datetime_to_unix_nanos(request.end);
843        let limit = request.limit.map(|limit| limit.get() as u64);
844        let request_params = request.params;
845        let price_precision = price_precision_from_params(request_params.as_ref())?;
846        let schema = schema_from_params(request_params.as_ref(), dbn::Schema::Mbp1, QUOTE_SCHEMAS)?
847            .to_string();
848        let (query_start, query_end) = resolve_request_time_range(start_nanos, end_nanos);
849
850        get_runtime().spawn(async move {
851            seed_price_precision_if_needed(
852                &historical_client,
853                dataset.as_str(),
854                instrument_id,
855                query_start,
856                query_end,
857                price_precision,
858            )
859            .await;
860
861            let params = RangeQueryParams {
862                dataset,
863                symbols,
864                start: query_start,
865                end: query_end,
866                limit,
867                price_precision,
868            };
869
870            match historical_client
871                .get_range_quotes(params, Some(schema))
872                .await
873            {
874                Ok(quotes) => {
875                    log::debug!("Retrieved {} quotes", quotes.len());
876                    let response = DataResponse::Quotes(QuotesResponse::new(
877                        request_id,
878                        client_id,
879                        instrument_id,
880                        quotes,
881                        start_nanos,
882                        end_nanos,
883                        get_atomic_clock_realtime().get_time_ns(),
884                        request_params,
885                    ));
886
887                    if let Err(e) = data_sender.send(DataEvent::Response(response)) {
888                        log::error!("Failed to send quotes response: {e}");
889                    }
890                }
891                Err(e) => {
892                    log::error!("Failed to request quotes: {e}");
893                    let response = DataResponse::Quotes(QuotesResponse::new(
894                        request_id,
895                        client_id,
896                        instrument_id,
897                        Vec::new(),
898                        start_nanos,
899                        end_nanos,
900                        get_atomic_clock_realtime().get_time_ns(),
901                        request_params,
902                    ));
903
904                    send_data_response(&data_sender, response, "empty quotes");
905                }
906            }
907        });
908
909        Ok(())
910    }
911
912    fn request_trades(&self, request: RequestTrades) -> anyhow::Result<()> {
913        log::debug!("Request trades: {request:?}");
914
915        let historical_client = self.historical.clone();
916        let data_sender = self.data_sender.clone();
917        let dataset = self.get_dataset_for_venue(request.instrument_id.venue)?;
918        let instrument_id = request.instrument_id;
919        let symbols = historical_client.prepare_symbols_from_instrument_ids(&[instrument_id]);
920        let request_id = request.request_id;
921        let client_id = request.client_id.unwrap_or(self.client_id);
922        let start_nanos = datetime_to_unix_nanos(request.start);
923        let end_nanos = datetime_to_unix_nanos(request.end);
924        let limit = request.limit.map(|limit| limit.get() as u64);
925        let request_params = request.params;
926        let price_precision = price_precision_from_params(request_params.as_ref())?;
927        let schema =
928            schema_from_params(request_params.as_ref(), dbn::Schema::Trades, TRADE_SCHEMAS)?
929                .to_string();
930        let (query_start, query_end) = resolve_request_time_range(start_nanos, end_nanos);
931
932        get_runtime().spawn(async move {
933            seed_price_precision_if_needed(
934                &historical_client,
935                dataset.as_str(),
936                instrument_id,
937                query_start,
938                query_end,
939                price_precision,
940            )
941            .await;
942
943            let params = RangeQueryParams {
944                dataset,
945                symbols,
946                start: query_start,
947                end: query_end,
948                limit,
949                price_precision,
950            };
951
952            match historical_client
953                .get_range_trades(params, Some(schema))
954                .await
955            {
956                Ok(trades) => {
957                    log::debug!("Retrieved {} trades", trades.len());
958                    let response = DataResponse::Trades(TradesResponse::new(
959                        request_id,
960                        client_id,
961                        instrument_id,
962                        trades,
963                        start_nanos,
964                        end_nanos,
965                        get_atomic_clock_realtime().get_time_ns(),
966                        request_params,
967                    ));
968
969                    if let Err(e) = data_sender.send(DataEvent::Response(response)) {
970                        log::error!("Failed to send trades response: {e}");
971                    }
972                }
973                Err(e) => {
974                    log::error!("Failed to request trades: {e}");
975                    let response = DataResponse::Trades(TradesResponse::new(
976                        request_id,
977                        client_id,
978                        instrument_id,
979                        Vec::new(),
980                        start_nanos,
981                        end_nanos,
982                        get_atomic_clock_realtime().get_time_ns(),
983                        request_params,
984                    ));
985
986                    send_data_response(&data_sender, response, "empty trades");
987                }
988            }
989        });
990
991        Ok(())
992    }
993
994    fn request_bars(&self, request: RequestBars) -> anyhow::Result<()> {
995        log::debug!("Request bars: {request:?}");
996
997        let historical_client = self.historical.clone();
998        let data_sender = self.data_sender.clone();
999        let instrument_id = request.bar_type.instrument_id();
1000        let dataset = self.get_dataset_for_venue(instrument_id.venue)?;
1001        let symbols = historical_client.prepare_symbols_from_instrument_ids(&[instrument_id]);
1002        let request_id = request.request_id;
1003        let client_id = request.client_id.unwrap_or(self.client_id);
1004        let bar_type = request.bar_type;
1005        let start_nanos = datetime_to_unix_nanos(request.start);
1006        let end_nanos = datetime_to_unix_nanos(request.end);
1007        let limit = request.limit.map(|limit| limit.get() as u64);
1008        let request_params = request.params;
1009        let price_precision = price_precision_from_params(request_params.as_ref())?;
1010        let timestamp_on_close = self.config.bars_timestamp_on_close;
1011        let (query_start, query_end) = resolve_request_time_range(start_nanos, end_nanos);
1012
1013        get_runtime().spawn(async move {
1014            seed_price_precision_if_needed(
1015                &historical_client,
1016                dataset.as_str(),
1017                instrument_id,
1018                query_start,
1019                query_end,
1020                price_precision,
1021            )
1022            .await;
1023
1024            let params = RangeQueryParams {
1025                dataset,
1026                symbols,
1027                start: query_start,
1028                end: query_end,
1029                limit,
1030                price_precision,
1031            };
1032
1033            let aggregation = match bar_type.spec().aggregation {
1034                BarAggregation::Second => BarAggregation::Second,
1035                BarAggregation::Minute => BarAggregation::Minute,
1036                BarAggregation::Hour => BarAggregation::Hour,
1037                BarAggregation::Day => BarAggregation::Day,
1038                _ => {
1039                    log::error!(
1040                        "Unsupported bar aggregation: {:?}",
1041                        bar_type.spec().aggregation
1042                    );
1043                    let response = DataResponse::Bars(BarsResponse::new(
1044                        request_id,
1045                        client_id,
1046                        bar_type,
1047                        Vec::new(),
1048                        start_nanos,
1049                        end_nanos,
1050                        get_atomic_clock_realtime().get_time_ns(),
1051                        request_params,
1052                    ));
1053
1054                    send_data_response(&data_sender, response, "empty bars");
1055                    return;
1056                }
1057            };
1058
1059            match historical_client
1060                .get_range_bars(params, aggregation, timestamp_on_close)
1061                .await
1062            {
1063                Ok(bars) => {
1064                    log::debug!("Retrieved {} bars", bars.len());
1065                    let response = DataResponse::Bars(BarsResponse::new(
1066                        request_id,
1067                        client_id,
1068                        bar_type,
1069                        bars,
1070                        start_nanos,
1071                        end_nanos,
1072                        get_atomic_clock_realtime().get_time_ns(),
1073                        request_params,
1074                    ));
1075
1076                    if let Err(e) = data_sender.send(DataEvent::Response(response)) {
1077                        log::error!("Failed to send bars response: {e}");
1078                    }
1079                }
1080                Err(e) => {
1081                    log::error!("Failed to request bars: {e}");
1082                    let response = DataResponse::Bars(BarsResponse::new(
1083                        request_id,
1084                        client_id,
1085                        bar_type,
1086                        Vec::new(),
1087                        start_nanos,
1088                        end_nanos,
1089                        get_atomic_clock_realtime().get_time_ns(),
1090                        request_params,
1091                    ));
1092
1093                    send_data_response(&data_sender, response, "empty bars");
1094                }
1095            }
1096        });
1097
1098        Ok(())
1099    }
1100
1101    fn request_book_depth(&self, request: RequestBookDepth) -> anyhow::Result<()> {
1102        log::debug!("Request book depth: {request:?}");
1103
1104        let historical_client = self.historical.clone();
1105        let data_sender = self.data_sender.clone();
1106        let dataset = self.get_dataset_for_venue(request.instrument_id.venue)?;
1107        let instrument_id = request.instrument_id;
1108        let symbols = historical_client.prepare_symbols_from_instrument_ids(&[instrument_id]);
1109        let request_id = request.request_id;
1110        let client_id = request.client_id.unwrap_or(self.client_id);
1111        let start_nanos = datetime_to_unix_nanos(request.start);
1112        let end_nanos = datetime_to_unix_nanos(request.end);
1113        let limit = request.limit.map(|limit| limit.get() as u64);
1114        let depth = request.depth.map(|depth| depth.get());
1115        let request_params = request.params;
1116        let price_precision = price_precision_from_params(request_params.as_ref())?;
1117        let (query_start, query_end) = resolve_request_time_range(start_nanos, end_nanos);
1118
1119        get_runtime().spawn(async move {
1120            seed_price_precision_if_needed(
1121                &historical_client,
1122                dataset.as_str(),
1123                instrument_id,
1124                query_start,
1125                query_end,
1126                price_precision,
1127            )
1128            .await;
1129
1130            let params = RangeQueryParams {
1131                dataset,
1132                symbols,
1133                start: query_start,
1134                end: query_end,
1135                limit,
1136                price_precision,
1137            };
1138
1139            match historical_client
1140                .get_range_order_book_depth10(params, depth)
1141                .await
1142            {
1143                Ok(depths) => {
1144                    log::debug!("Retrieved {} order book depths", depths.len());
1145                    let response = DataResponse::BookDepth(BookDepthResponse::new(
1146                        request_id,
1147                        client_id,
1148                        instrument_id,
1149                        depths,
1150                        start_nanos,
1151                        end_nanos,
1152                        get_atomic_clock_realtime().get_time_ns(),
1153                        request_params,
1154                    ));
1155
1156                    send_data_response(&data_sender, response, "book depth");
1157                }
1158                Err(e) => {
1159                    log::error!("Failed to request order book depths: {e}");
1160                    let response = DataResponse::BookDepth(BookDepthResponse::new(
1161                        request_id,
1162                        client_id,
1163                        instrument_id,
1164                        Vec::new(),
1165                        start_nanos,
1166                        end_nanos,
1167                        get_atomic_clock_realtime().get_time_ns(),
1168                        request_params,
1169                    ));
1170
1171                    send_data_response(&data_sender, response, "empty book depth");
1172                }
1173            }
1174        });
1175
1176        Ok(())
1177    }
1178
1179    fn request_book_deltas(&self, request: RequestBookDeltas) -> anyhow::Result<()> {
1180        log::debug!("Request book deltas: {request:?}");
1181
1182        let historical_client = self.historical.clone();
1183        let data_sender = self.data_sender.clone();
1184        let dataset = self.get_dataset_for_venue(request.instrument_id.venue)?;
1185        let instrument_id = request.instrument_id;
1186        let symbols = historical_client.prepare_symbols_from_instrument_ids(&[instrument_id]);
1187        let request_id = request.request_id;
1188        let client_id = request.client_id.unwrap_or(self.client_id);
1189        let start_nanos = datetime_to_unix_nanos(request.start);
1190        let end_nanos = datetime_to_unix_nanos(request.end);
1191        let limit = request.limit.map(|limit| limit.get() as u64);
1192        let request_params = request.params;
1193        let price_precision = price_precision_from_params(request_params.as_ref())?;
1194        let (query_start, query_end) = resolve_request_time_range(start_nanos, end_nanos);
1195
1196        get_runtime().spawn(async move {
1197            seed_price_precision_if_needed(
1198                &historical_client,
1199                dataset.as_str(),
1200                instrument_id,
1201                query_start,
1202                query_end,
1203                price_precision,
1204            )
1205            .await;
1206
1207            let params = RangeQueryParams {
1208                dataset,
1209                symbols,
1210                start: query_start,
1211                end: query_end,
1212                limit,
1213                price_precision,
1214            };
1215
1216            match historical_client.get_range_order_book_deltas(params).await {
1217                Ok(deltas) => {
1218                    log::debug!("Retrieved {} order book deltas", deltas.len());
1219                    let response = DataResponse::BookDeltas(BookDeltasResponse::new(
1220                        request_id,
1221                        client_id,
1222                        instrument_id,
1223                        deltas,
1224                        start_nanos,
1225                        end_nanos,
1226                        get_atomic_clock_realtime().get_time_ns(),
1227                        request_params,
1228                    ));
1229
1230                    send_data_response(&data_sender, response, "book deltas");
1231                }
1232                Err(e) => {
1233                    log::error!("Failed to request order book deltas: {e}");
1234                    let response = DataResponse::BookDeltas(BookDeltasResponse::new(
1235                        request_id,
1236                        client_id,
1237                        instrument_id,
1238                        Vec::new(),
1239                        start_nanos,
1240                        end_nanos,
1241                        get_atomic_clock_realtime().get_time_ns(),
1242                        request_params,
1243                    ));
1244
1245                    send_data_response(&data_sender, response, "empty book deltas");
1246                }
1247            }
1248        });
1249
1250        Ok(())
1251    }
1252}
1253
1254fn instruments_query_params(
1255    dataset: String,
1256    start_nanos: UnixNanos,
1257    end_nanos: Option<UnixNanos>,
1258) -> RangeQueryParams {
1259    RangeQueryParams {
1260        dataset,
1261        symbols: vec!["ALL_SYMBOLS".to_string()],
1262        start: start_nanos,
1263        end: end_nanos,
1264        limit: None,
1265        price_precision: None,
1266    }
1267}
1268
1269fn instrument_query_params(
1270    dataset: String,
1271    instrument_id: InstrumentId,
1272    start_nanos: UnixNanos,
1273    end_nanos: Option<UnixNanos>,
1274) -> RangeQueryParams {
1275    RangeQueryParams {
1276        dataset,
1277        symbols: vec![instrument_id_to_symbol_string(
1278            instrument_id,
1279            &mut AHashMap::new(),
1280        )],
1281        start: start_nanos,
1282        end: end_nanos,
1283        limit: None,
1284        price_precision: None,
1285    }
1286}
1287
1288fn resolve_request_time_range(
1289    start_nanos: Option<UnixNanos>,
1290    end_nanos: Option<UnixNanos>,
1291) -> (UnixNanos, Option<UnixNanos>) {
1292    let mut end = end_nanos.unwrap_or_else(|| get_atomic_clock_realtime().get_time_ns());
1293    let mut start = start_nanos.unwrap_or_else(|| start_of_utc_day(end));
1294
1295    if start > end {
1296        start = end;
1297    }
1298
1299    if start == end {
1300        if end.as_u64() > 0 {
1301            start = UnixNanos::from(end.as_u64() - 1);
1302        } else {
1303            end = UnixNanos::from(1);
1304        }
1305    }
1306
1307    (start, Some(end))
1308}
1309
1310fn start_of_utc_day(timestamp: UnixNanos) -> UnixNanos {
1311    UnixNanos::from((timestamp.as_u64() / NANOSECONDS_IN_DAY) * NANOSECONDS_IN_DAY)
1312}
1313
1314async fn seed_price_precision_if_needed(
1315    historical_client: &DatabentoHistoricalClient,
1316    dataset: &str,
1317    instrument_id: InstrumentId,
1318    start_nanos: UnixNanos,
1319    end_nanos: Option<UnixNanos>,
1320    price_precision: Option<u8>,
1321) {
1322    if price_precision.is_some()
1323        || historical_client
1324            .price_precision(instrument_id.symbol)
1325            .is_some()
1326    {
1327        return;
1328    }
1329
1330    let query_params =
1331        instrument_query_params(dataset.to_string(), instrument_id, start_nanos, end_nanos);
1332
1333    if let Err(e) = historical_client.get_range_instruments(query_params).await {
1334        log::warn!("Failed to seed price precision for {instrument_id}: {e}");
1335    }
1336}
1337
1338fn send_data_response(
1339    data_sender: &tokio::sync::mpsc::UnboundedSender<DataEvent>,
1340    response: DataResponse,
1341    label: &str,
1342) {
1343    if let Err(e) = data_sender.send(DataEvent::Response(response)) {
1344        log::error!("Failed to send {label} response: {e}");
1345    }
1346}
1347
1348fn requested_instrument(
1349    instruments: Vec<InstrumentAny>,
1350    instrument_id: InstrumentId,
1351) -> Option<InstrumentAny> {
1352    instruments
1353        .into_iter()
1354        .rev()
1355        .find(|instrument| instrument.id() == instrument_id)
1356}
1357
1358fn price_precision_from_params(params: Option<&Params>) -> anyhow::Result<Option<u8>> {
1359    let Some(price_precision) = params.and_then(|params| params.get_u64(PRICE_PRECISION_PARAM))
1360    else {
1361        return Ok(None);
1362    };
1363
1364    Ok(Some(u8::try_from(price_precision).map_err(|_| {
1365        anyhow::anyhow!(
1366            "`{PRICE_PRECISION_PARAM}` must be less than or equal to {}",
1367            u8::MAX
1368        )
1369    })?))
1370}
1371
1372fn schema_from_params(
1373    params: Option<&Params>,
1374    default_schema: dbn::Schema,
1375    allowed_schemas: &[dbn::Schema],
1376) -> anyhow::Result<dbn::Schema> {
1377    let schema = if let Some(schema) = params.and_then(|params| params.get_str(SCHEMA_PARAM)) {
1378        dbn::Schema::from_str(schema)?
1379    } else {
1380        default_schema
1381    };
1382
1383    if allowed_schemas.contains(&schema) {
1384        return Ok(schema);
1385    }
1386
1387    let allowed = allowed_schemas
1388        .iter()
1389        .map(dbn::Schema::as_str)
1390        .collect::<Vec<_>>()
1391        .join(", ");
1392    anyhow::bail!(
1393        "Invalid `{SCHEMA_PARAM}` '{}'. Must be one of: {allowed}",
1394        schema.as_str()
1395    );
1396}
1397
1398fn send_subscription_commands(
1399    tx: &tokio::sync::mpsc::UnboundedSender<HandlerCommand>,
1400    dataset: &str,
1401    price_precision: Option<(Symbol, u8)>,
1402    subscription: Subscription,
1403    start_after_subscribe: bool,
1404) -> anyhow::Result<()> {
1405    if let Some((symbol, precision)) = price_precision {
1406        tx.send(HandlerCommand::SetPricePrecision(symbol, precision))
1407            .map_err(|e| anyhow::anyhow!("Failed to send command to dataset {dataset}: {e}"))?;
1408    }
1409
1410    tx.send(HandlerCommand::Subscribe(subscription))
1411        .map_err(|e| anyhow::anyhow!("Failed to send command to dataset {dataset}: {e}"))?;
1412
1413    if start_after_subscribe {
1414        tx.send(HandlerCommand::Start)
1415            .map_err(|e| anyhow::anyhow!("Failed to send command to dataset {dataset}: {e}"))?;
1416    }
1417
1418    Ok(())
1419}
1420
1421#[cfg(test)]
1422mod tests {
1423    use std::path::PathBuf;
1424
1425    use nautilus_common::live::runner::replace_data_event_sender;
1426    use nautilus_core::UUID4;
1427    use nautilus_model::{
1428        identifiers::{ClientId, InstrumentId},
1429        instruments::{CurrencyPair, InstrumentAny},
1430        types::{Currency, Price, Quantity},
1431    };
1432    use rstest::rstest;
1433    use serde_json::json;
1434
1435    use super::*;
1436
    // Test-only selector with one variant per subscription flavor.
    // NOTE(review): the consuming tests are outside this view - presumably it picks
    // between the quote and trade subscribe commands; confirm at the use sites.
    #[derive(Clone, Copy)]
    enum SubscribeKind {
        Quotes,
        Trades,
    }
1442
1443    fn currency_pair(instrument_id: &str) -> InstrumentAny {
1444        currency_pair_with_ts_init(instrument_id, UnixNanos::default())
1445    }
1446
1447    fn test_data_client() -> DatabentoDataClient {
1448        let (sender, _receiver) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
1449        replace_data_event_sender(sender);
1450
1451        let config = DatabentoDataClientConfig::new(
1452            "32-character-with-lots-of-filler",
1453            PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("publishers.json"),
1454            true,
1455            true,
1456        );
1457        DatabentoDataClient::new(
1458            ClientId::from("DATABENTO-TEST"),
1459            config,
1460            get_atomic_clock_realtime(),
1461        )
1462        .expect("test client should initialize")
1463    }
1464
1465    #[rstest]
1466    #[tokio::test]
1467    async fn test_stop_aborts_active_tasks_and_marks_disconnected() {
1468        let mut client = test_data_client();
1469
1470        let handle = tokio::spawn(async { std::future::pending::<()>().await });
1471        {
1472            let mut handles = client.task_handles.lock().expect(MUTEX_POISONED);
1473            handles.push(handle);
1474        }
1475        client.is_connected.store(true, Ordering::Relaxed);
1476
1477        client.stop().unwrap();
1478
1479        assert!(client.task_handles.lock().expect(MUTEX_POISONED).is_empty());
1480        assert!(client.is_disconnected());
1481    }
1482
1483    #[rstest]
1484    #[case("EQUS", "EQUS.PLUS")] // overrides the apply_default EQUS -> EQUS.MINI mapping
1485    #[case("GLBX", "EQUS.MINI")] // overrides the apply_default GLBX -> GLBX.MDP3 mapping
1486    fn test_venue_dataset_map_overrides_default(#[case] venue: &str, #[case] dataset: &str) {
1487        let (sender, _receiver) = tokio::sync::mpsc::unbounded_channel::<DataEvent>();
1488        replace_data_event_sender(sender);
1489
1490        let mut config = DatabentoDataClientConfig::new(
1491            "32-character-with-lots-of-filler",
1492            PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("publishers.json"),
1493            true,
1494            true,
1495        );
1496        config.venue_dataset_map = IndexMap::from([(venue.to_string(), dataset.to_string())]);
1497
1498        let client = DatabentoDataClient::new(
1499            ClientId::from("DATABENTO-TEST"),
1500            config,
1501            get_atomic_clock_realtime(),
1502        )
1503        .expect("test client should initialize");
1504
1505        assert_eq!(
1506            client.get_dataset_for_venue(Venue::from(venue)).unwrap(),
1507            dataset
1508        );
1509
1510        // The override is targeted: an unrelated venue keeps its default.
1511        assert_eq!(
1512            client.get_dataset_for_venue(Venue::from("XCBO")).unwrap(),
1513            "OPRA.PILLAR"
1514        );
1515    }
1516
1517    fn subscribe_quotes_cmd(params: Option<Params>) -> SubscribeQuotes {
1518        SubscribeQuotes::new(
1519            InstrumentId::from("ESM4.GLBX"),
1520            Some(ClientId::from("DATABENTO-TEST")),
1521            None,
1522            UUID4::new(),
1523            UnixNanos::default(),
1524            None,
1525            params,
1526        )
1527    }
1528
1529    fn subscribe_trades_cmd(params: Option<Params>) -> SubscribeTrades {
1530        SubscribeTrades::new(
1531            InstrumentId::from("ESM4.GLBX"),
1532            Some(ClientId::from("DATABENTO-TEST")),
1533            None,
1534            UUID4::new(),
1535            UnixNanos::default(),
1536            None,
1537            params,
1538        )
1539    }
1540
    // Builds a BTC/USDT `CurrencyPair` test instrument for the given ID string.
    //
    // The instrument's own symbol is reused as the second constructor argument, all
    // optional arguments are `None`, and the final argument is the caller's `ts_init`
    // so tests can tell otherwise identical instruments apart.
    fn currency_pair_with_ts_init(instrument_id: &str, ts_init: UnixNanos) -> InstrumentAny {
        let instrument_id = InstrumentId::from(instrument_id);
        InstrumentAny::CurrencyPair(CurrencyPair::new(
            instrument_id,
            instrument_id.symbol,
            Currency::from("BTC"),
            Currency::from("USDT"),
            // NOTE(review): presumably price and size precision, matching the
            // increments below - confirm against `CurrencyPair::new`
            2,
            6,
            Price::from("0.01"),
            Quantity::from("0.000001"),
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            UnixNanos::default(),
            ts_init,
        ))
    }
1570
1571    #[rstest]
1572    fn test_instruments_query_params_requests_all_symbols() {
1573        let start = UnixNanos::from(1_000_000_000);
1574        let end = UnixNanos::from(2_000_000_000);
1575
1576        let params = instruments_query_params("GLBX.MDP3".to_string(), start, Some(end));
1577
1578        assert_eq!(params.dataset, "GLBX.MDP3");
1579        assert_eq!(params.symbols, vec!["ALL_SYMBOLS"]);
1580        assert_eq!(params.start, start);
1581        assert_eq!(params.end, Some(end));
1582        assert_eq!(params.limit, None);
1583        assert_eq!(params.price_precision, None);
1584    }
1585
1586    #[rstest]
1587    fn test_instrument_query_params_requests_single_symbol() {
1588        let instrument_id = InstrumentId::from("ESM4.GLBX");
1589
1590        let start = UnixNanos::from(1_000_000_000);
1591        let end = UnixNanos::from(2_000_000_000);
1592
1593        let params =
1594            instrument_query_params("GLBX.MDP3".to_string(), instrument_id, start, Some(end));
1595
1596        assert_eq!(params.dataset, "GLBX.MDP3");
1597        assert_eq!(params.symbols, vec!["ESM4"]);
1598        assert_eq!(params.start, start);
1599        assert_eq!(params.end, Some(end));
1600        assert_eq!(params.limit, None);
1601        assert_eq!(params.price_precision, None);
1602    }
1603
1604    #[rstest]
1605    fn test_resolve_request_time_range_defaults_to_end_day() {
1606        let end = UnixNanos::from(1_706_443_200_000_000_001);
1607
1608        let (start, resolved_end) = resolve_request_time_range(None, Some(end));
1609
1610        assert_eq!(start, UnixNanos::from(1_706_400_000_000_000_000));
1611        assert_eq!(resolved_end, Some(end));
1612    }
1613
1614    #[rstest]
1615    fn test_resolve_request_time_range_makes_empty_interval_non_empty() {
1616        let end = UnixNanos::from(1_706_443_200_000_000_001);
1617
1618        let (start, resolved_end) = resolve_request_time_range(Some(end), Some(end));
1619
1620        assert_eq!(start, UnixNanos::from(end.as_u64() - 1));
1621        assert_eq!(resolved_end, Some(end));
1622    }
1623
1624    #[rstest]
1625    fn test_requested_instrument_filters_exact_id() {
1626        let requested_id = InstrumentId::from("BTCUSDT.BINANCE");
1627        let instruments = vec![
1628            currency_pair("ETHUSDT.BINANCE"),
1629            currency_pair("BTCUSDT.BINANCE"),
1630        ];
1631
1632        let instrument = requested_instrument(instruments, requested_id).expect("instrument");
1633
1634        assert_eq!(instrument.id(), requested_id);
1635    }
1636
1637    #[rstest]
1638    fn test_requested_instrument_returns_latest_matching_id() {
1639        let requested_id = InstrumentId::from("BTCUSDT.BINANCE");
1640        let instruments = vec![
1641            currency_pair_with_ts_init("BTCUSDT.BINANCE", UnixNanos::from(1)),
1642            currency_pair_with_ts_init("BTCUSDT.BINANCE", UnixNanos::from(2)),
1643        ];
1644
1645        let instrument = requested_instrument(instruments, requested_id).expect("instrument");
1646
1647        assert_eq!(instrument.ts_init(), UnixNanos::from(2));
1648    }
1649
1650    #[rstest]
1651    fn test_requested_instrument_returns_none_on_miss() {
1652        let instruments = vec![currency_pair("ETHUSDT.BINANCE")];
1653
1654        let instrument = requested_instrument(instruments, InstrumentId::from("BTCUSDT.BINANCE"));
1655
1656        assert!(instrument.is_none());
1657    }
1658
1659    #[rstest]
1660    fn test_price_precision_from_params() {
1661        let mut params = Params::new();
1662        params.insert(PRICE_PRECISION_PARAM.to_string(), json!(5));
1663
1664        let price_precision = price_precision_from_params(Some(&params)).unwrap();
1665
1666        assert_eq!(price_precision, Some(5));
1667    }
1668
1669    #[rstest]
1670    fn test_price_precision_from_params_rejects_out_of_range_value() {
1671        let mut params = Params::new();
1672        params.insert(
1673            PRICE_PRECISION_PARAM.to_string(),
1674            json!(u64::from(u8::MAX) + 1),
1675        );
1676
1677        let result = price_precision_from_params(Some(&params));
1678
1679        assert!(result.is_err());
1680    }
1681
1682    #[rstest]
1683    fn test_schema_from_params_returns_default() {
1684        let schema = schema_from_params(None, dbn::Schema::Mbp1, QUOTE_SCHEMAS).unwrap();
1685
1686        assert_eq!(schema, dbn::Schema::Mbp1);
1687    }
1688
1689    #[rstest]
1690    fn test_schema_from_params_accepts_allowed_value() {
1691        let mut params = Params::new();
1692        params.insert(SCHEMA_PARAM.to_string(), json!("tbbo"));
1693
1694        let schema = schema_from_params(Some(&params), dbn::Schema::Mbp1, QUOTE_SCHEMAS).unwrap();
1695
1696        assert_eq!(schema, dbn::Schema::Tbbo);
1697    }
1698
1699    #[rstest]
1700    fn test_schema_from_params_rejects_disallowed_value() {
1701        let mut params = Params::new();
1702        params.insert(SCHEMA_PARAM.to_string(), json!("mbo"));
1703
1704        let result = schema_from_params(Some(&params), dbn::Schema::Mbp1, QUOTE_SCHEMAS);
1705
1706        assert!(result.is_err());
1707    }
1708
1709    #[rstest]
1710    #[case::quotes(SubscribeKind::Quotes)]
1711    #[case::trades(SubscribeKind::Trades)]
1712    fn test_invalid_subscribe_params_do_not_create_feed_handler(#[case] kind: SubscribeKind) {
1713        let mut client = test_data_client();
1714        let mut params = Params::new();
1715        params.insert(SCHEMA_PARAM.to_string(), json!("definition"));
1716
1717        let result = match kind {
1718            SubscribeKind::Quotes => client.subscribe_quotes(subscribe_quotes_cmd(Some(params))),
1719            SubscribeKind::Trades => client.subscribe_trades(subscribe_trades_cmd(Some(params))),
1720        };
1721
1722        assert!(result.is_err());
1723        assert!(client.cmd_channels.lock().expect(MUTEX_POISONED).is_empty());
1724    }
1725
1726    #[rstest]
1727    fn test_send_subscription_commands_starts_after_subscribe() {
1728        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
1729        let subscription = Subscription::builder()
1730            .schema(dbn::Schema::Mbp1)
1731            .symbols(vec!["ESM4"])
1732            .build();
1733
1734        send_subscription_commands(
1735            &tx,
1736            "GLBX.MDP3",
1737            Some((Symbol::from("ESM4"), 2)),
1738            subscription,
1739            true,
1740        )
1741        .unwrap();
1742
1743        assert!(matches!(
1744            rx.try_recv().unwrap(),
1745            HandlerCommand::SetPricePrecision(symbol, 2) if symbol == Symbol::from("ESM4")
1746        ));
1747        assert!(matches!(
1748            rx.try_recv().unwrap(),
1749            HandlerCommand::Subscribe(sub) if sub.schema == dbn::Schema::Mbp1
1750        ));
1751        assert!(matches!(rx.try_recv().unwrap(), HandlerCommand::Start));
1752    }
1753
1754    #[rstest]
1755    fn test_send_subscription_commands_without_precision_or_start() {
1756        let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
1757        let subscription = Subscription::builder()
1758            .schema(dbn::Schema::Mbp1)
1759            .symbols(vec!["ESM4"])
1760            .build();
1761
1762        send_subscription_commands(&tx, "GLBX.MDP3", None, subscription, false).unwrap();
1763
1764        assert!(matches!(
1765            rx.try_recv().unwrap(),
1766            HandlerCommand::Subscribe(sub) if sub.schema == dbn::Schema::Mbp1
1767        ));
1768        assert!(matches!(
1769            rx.try_recv(),
1770            Err(tokio::sync::mpsc::error::TryRecvError::Empty)
1771        ));
1772    }
1773}