Skip to main content

nautilus_testkit/testers/
data.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Data tester actor for live testing market data subscriptions.
17
18use std::{
19    num::NonZeroUsize,
20    ops::{Deref, DerefMut},
21    time::Duration,
22};
23
24use ahash::{AHashMap, AHashSet};
25use chrono::Duration as ChronoDuration;
26use nautilus_common::{
27    actor::{DataActor, DataActorConfig, DataActorCore},
28    enums::LogColor,
29    log_info,
30    timer::TimeEvent,
31};
32use nautilus_core::Params;
33use nautilus_model::{
34    data::{
35        Bar, FundingRateUpdate, IndexPriceUpdate, InstrumentClose, InstrumentStatus,
36        MarkPriceUpdate, OrderBookDeltas, QuoteTick, TradeTick, bar::BarType,
37    },
38    enums::BookType,
39    identifiers::{ClientId, InstrumentId},
40    instruments::InstrumentAny,
41    orderbook::OrderBook,
42};
43
/// Configuration for the data tester actor.
///
/// Every `subscribe_*` and `request_*` flag is opt-in: both [`DataTesterConfig::new`] and the
/// [`Default`] implementation start with all of them disabled.
#[derive(Debug, Clone)]
pub struct DataTesterConfig {
    /// Base data actor configuration (used to construct the underlying actor core).
    pub base: DataActorConfig,
    /// Instrument IDs to subscribe to.
    ///
    /// Their venues are also the venues queried when `request_instruments` is set.
    pub instrument_ids: Vec<InstrumentId>,
    /// Client ID to use for subscriptions and requests.
    pub client_id: Option<ClientId>,
    /// Bar types to subscribe to and/or request.
    ///
    /// When `None`, `subscribe_bars` and `request_bars` have no effect.
    pub bar_types: Option<Vec<BarType>>,
    /// Whether to subscribe to order book deltas.
    pub subscribe_book_deltas: bool,
    /// Whether to subscribe to order book depth snapshots (not yet implemented, the
    /// subscription call is still commented out in `on_start`).
    pub subscribe_book_depth: bool,
    /// Whether to subscribe to order book at interval
    /// (uses `book_type`, `book_depth` and `book_interval_ms`).
    pub subscribe_book_at_interval: bool,
    /// Whether to subscribe to quotes.
    pub subscribe_quotes: bool,
    /// Whether to subscribe to trades.
    pub subscribe_trades: bool,
    /// Whether to subscribe to mark prices.
    pub subscribe_mark_prices: bool,
    /// Whether to subscribe to index prices.
    pub subscribe_index_prices: bool,
    /// Whether to subscribe to funding rates.
    pub subscribe_funding_rates: bool,
    /// Whether to subscribe to bars (one subscription per entry in `bar_types`).
    pub subscribe_bars: bool,
    /// Whether to subscribe to instrument updates.
    pub subscribe_instrument: bool,
    /// Whether to subscribe to instrument status.
    pub subscribe_instrument_status: bool,
    /// Whether to subscribe to instrument close.
    pub subscribe_instrument_close: bool,
    /// Optional parameters passed to all subscribe calls
    /// (and to the matching unsubscribe calls on stop).
    pub subscribe_params: Option<Params>,
    /// Optional parameters passed to all request calls.
    pub request_params: Option<Params>,
    /// Whether unsubscribe is supported on stop.
    ///
    /// When `false`, `on_stop` returns without unsubscribing from anything.
    pub can_unsubscribe: bool,
    /// Whether to request instruments on start (one request per distinct venue).
    pub request_instruments: bool,
    // TODO: Support request_quotes when historical data requests are available
    /// Whether to request historical quotes (not yet implemented).
    pub request_quotes: bool,
    /// Whether to request historical trades on start (covers the last hour).
    pub request_trades: bool,
    /// Whether to request historical bars on start
    /// (covers the last hour, for each entry in `bar_types`).
    pub request_bars: bool,
    /// Whether to request order book snapshots (limited to `book_depth`).
    pub request_book_snapshot: bool,
    // TODO: Support request_book_deltas when Rust data engine has RequestBookDeltas
    /// Whether to request historical order book deltas (not yet implemented).
    pub request_book_deltas: bool,
    /// Whether to request historical funding rates on start (covers the last 7 days).
    pub request_funding_rates: bool,
    // TODO: Support requests_start_delta when we implement historical data requests
    /// Book type for order book subscriptions and locally managed books.
    pub book_type: BookType,
    /// Order book depth for at-interval subscriptions and snapshot requests.
    ///
    /// The deltas subscription does not use this value.
    pub book_depth: Option<NonZeroUsize>,
    // TODO: Support book_group_size when order book grouping is implemented
    /// Order book interval in milliseconds for at_interval subscriptions.
    pub book_interval_ms: NonZeroUsize,
    /// Number of order book levels to print when logging.
    pub book_levels_to_print: usize,
    /// Whether to manage local order book from deltas.
    ///
    /// Only takes effect together with `subscribe_book_deltas`: a local book is created per
    /// instrument on start and updated from every received batch of deltas.
    pub manage_book: bool,
    /// Whether to log received data.
    pub log_data: bool,
    /// Stats logging interval in seconds (0 to disable).
    ///
    /// Drives the `STATS-TIMER` timer that is set on start.
    pub stats_interval_secs: u64,
}
119
120impl DataTesterConfig {
121    /// Creates a new [`DataTesterConfig`] instance with minimal settings.
122    ///
123    /// # Panics
124    ///
125    /// Panics if `NonZeroUsize::new(1000)` fails (which should never happen).
126    #[must_use]
127    pub fn new(client_id: ClientId, instrument_ids: Vec<InstrumentId>) -> Self {
128        Self {
129            base: DataActorConfig::default(),
130            instrument_ids,
131            client_id: Some(client_id),
132            bar_types: None,
133            subscribe_book_deltas: false,
134            subscribe_book_depth: false,
135            subscribe_book_at_interval: false,
136            subscribe_quotes: false,
137            subscribe_trades: false,
138            subscribe_mark_prices: false,
139            subscribe_index_prices: false,
140            subscribe_funding_rates: false,
141            subscribe_bars: false,
142
143            subscribe_instrument: false,
144            subscribe_instrument_status: false,
145            subscribe_instrument_close: false,
146            subscribe_params: None,
147            request_params: None,
148            can_unsubscribe: true,
149            request_instruments: false,
150            request_quotes: false,
151            request_trades: false,
152            request_bars: false,
153            request_book_snapshot: false,
154            request_book_deltas: false,
155            request_funding_rates: false,
156            book_type: BookType::L2_MBP,
157            book_depth: None,
158            book_interval_ms: NonZeroUsize::new(1000).unwrap(),
159            book_levels_to_print: 10,
160            manage_book: true,
161            log_data: true,
162            stats_interval_secs: 5,
163        }
164    }
165
166    #[must_use]
167    pub fn with_log_data(mut self, log_data: bool) -> Self {
168        self.log_data = log_data;
169        self
170    }
171
172    #[must_use]
173    pub fn with_subscribe_book_deltas(mut self, subscribe: bool) -> Self {
174        self.subscribe_book_deltas = subscribe;
175        self
176    }
177
178    #[must_use]
179    pub fn with_subscribe_book_depth(mut self, subscribe: bool) -> Self {
180        self.subscribe_book_depth = subscribe;
181        self
182    }
183
184    #[must_use]
185    pub fn with_subscribe_book_at_interval(mut self, subscribe: bool) -> Self {
186        self.subscribe_book_at_interval = subscribe;
187        self
188    }
189
190    #[must_use]
191    pub fn with_subscribe_quotes(mut self, subscribe: bool) -> Self {
192        self.subscribe_quotes = subscribe;
193        self
194    }
195
196    #[must_use]
197    pub fn with_subscribe_trades(mut self, subscribe: bool) -> Self {
198        self.subscribe_trades = subscribe;
199        self
200    }
201
202    #[must_use]
203    pub fn with_subscribe_mark_prices(mut self, subscribe: bool) -> Self {
204        self.subscribe_mark_prices = subscribe;
205        self
206    }
207
208    #[must_use]
209    pub fn with_subscribe_index_prices(mut self, subscribe: bool) -> Self {
210        self.subscribe_index_prices = subscribe;
211        self
212    }
213
214    #[must_use]
215    pub fn with_subscribe_funding_rates(mut self, subscribe: bool) -> Self {
216        self.subscribe_funding_rates = subscribe;
217        self
218    }
219
220    #[must_use]
221    pub fn with_subscribe_bars(mut self, subscribe: bool) -> Self {
222        self.subscribe_bars = subscribe;
223        self
224    }
225
226    #[must_use]
227    pub fn with_bar_types(mut self, bar_types: Vec<BarType>) -> Self {
228        self.bar_types = Some(bar_types);
229        self
230    }
231
232    #[must_use]
233    pub fn with_subscribe_instrument(mut self, subscribe: bool) -> Self {
234        self.subscribe_instrument = subscribe;
235        self
236    }
237
238    #[must_use]
239    pub fn with_subscribe_instrument_status(mut self, subscribe: bool) -> Self {
240        self.subscribe_instrument_status = subscribe;
241        self
242    }
243
244    #[must_use]
245    pub fn with_subscribe_instrument_close(mut self, subscribe: bool) -> Self {
246        self.subscribe_instrument_close = subscribe;
247        self
248    }
249
250    #[must_use]
251    pub fn with_book_type(mut self, book_type: BookType) -> Self {
252        self.book_type = book_type;
253        self
254    }
255
256    #[must_use]
257    pub fn with_book_depth(mut self, depth: Option<NonZeroUsize>) -> Self {
258        self.book_depth = depth;
259        self
260    }
261
262    #[must_use]
263    pub fn with_book_interval_ms(mut self, interval_ms: NonZeroUsize) -> Self {
264        self.book_interval_ms = interval_ms;
265        self
266    }
267
268    #[must_use]
269    pub fn with_manage_book(mut self, manage: bool) -> Self {
270        self.manage_book = manage;
271        self
272    }
273
274    #[must_use]
275    pub fn with_request_instruments(mut self, request: bool) -> Self {
276        self.request_instruments = request;
277        self
278    }
279
280    #[must_use]
281    pub fn with_request_book_snapshot(mut self, request: bool) -> Self {
282        self.request_book_snapshot = request;
283        self
284    }
285
286    #[must_use]
287    pub fn with_request_book_deltas(mut self, request: bool) -> Self {
288        self.request_book_deltas = request;
289        self
290    }
291
292    #[must_use]
293    pub fn with_request_trades(mut self, request: bool) -> Self {
294        self.request_trades = request;
295        self
296    }
297
298    #[must_use]
299    pub fn with_request_bars(mut self, request: bool) -> Self {
300        self.request_bars = request;
301        self
302    }
303
304    #[must_use]
305    pub fn with_request_funding_rates(mut self, request: bool) -> Self {
306        self.request_funding_rates = request;
307        self
308    }
309
310    #[must_use]
311    pub fn with_can_unsubscribe(mut self, can_unsubscribe: bool) -> Self {
312        self.can_unsubscribe = can_unsubscribe;
313        self
314    }
315
316    #[must_use]
317    pub fn with_subscribe_params(mut self, params: Option<Params>) -> Self {
318        self.subscribe_params = params;
319        self
320    }
321
322    #[must_use]
323    pub fn with_request_params(mut self, params: Option<Params>) -> Self {
324        self.request_params = params;
325        self
326    }
327
328    #[must_use]
329    pub fn with_stats_interval_secs(mut self, interval_secs: u64) -> Self {
330        self.stats_interval_secs = interval_secs;
331        self
332    }
333}
334
335impl Default for DataTesterConfig {
336    fn default() -> Self {
337        Self {
338            base: DataActorConfig::default(),
339            instrument_ids: Vec::new(),
340            client_id: None,
341            bar_types: None,
342            subscribe_book_deltas: false,
343            subscribe_book_depth: false,
344            subscribe_book_at_interval: false,
345            subscribe_quotes: false,
346            subscribe_trades: false,
347            subscribe_mark_prices: false,
348            subscribe_index_prices: false,
349            subscribe_funding_rates: false,
350            subscribe_bars: false,
351            subscribe_instrument: false,
352            subscribe_instrument_status: false,
353            subscribe_instrument_close: false,
354            subscribe_params: None,
355            request_params: None,
356            can_unsubscribe: true,
357            request_instruments: false,
358            request_quotes: false,
359            request_trades: false,
360            request_bars: false,
361            request_book_snapshot: false,
362            request_book_deltas: false,
363            request_funding_rates: false,
364            book_type: BookType::L2_MBP,
365            book_depth: None,
366            book_interval_ms: NonZeroUsize::new(1000).unwrap(),
367            book_levels_to_print: 10,
368            manage_book: false,
369            log_data: true,
370            stats_interval_secs: 5,
371        }
372    }
373}
374
/// A data tester actor for live testing market data subscriptions.
///
/// Subscribes to configured data types for specified instruments and logs
/// received data to demonstrate the data flow. Useful for testing adapters
/// and validating data connectivity.
///
/// This actor provides equivalent functionality to the Python `DataTester`
/// in the test kit.
///
/// The actor dereferences to its [`DataActorCore`].
#[derive(Debug)]
pub struct DataTester {
    // Underlying actor core, built from `config.base`
    core: DataActorCore,
    // Settings controlling which subscriptions and requests are issued
    config: DataTesterConfig,
    // Locally managed order books keyed by instrument; populated on start when both
    // `subscribe_book_deltas` and `manage_book` are set, updated from received deltas
    books: AHashMap<InstrumentId, OrderBook>,
}
389
390impl Deref for DataTester {
391    type Target = DataActorCore;
392
393    fn deref(&self) -> &Self::Target {
394        &self.core
395    }
396}
397
398impl DerefMut for DataTester {
399    fn deref_mut(&mut self) -> &mut Self::Target {
400        &mut self.core
401    }
402}
403
404impl DataActor for DataTester {
405    fn on_start(&mut self) -> anyhow::Result<()> {
406        let instrument_ids = self.config.instrument_ids.clone();
407        let client_id = self.config.client_id;
408        let subscribe_params = self.config.subscribe_params.clone();
409        let request_params = self.config.request_params.clone();
410        let stats_interval_secs = self.config.stats_interval_secs;
411
412        // Request instruments if configured
413        if self.config.request_instruments {
414            let mut venues = AHashSet::new();
415            for instrument_id in &instrument_ids {
416                venues.insert(instrument_id.venue);
417            }
418
419            for venue in venues {
420                let _ = self.request_instruments(
421                    Some(venue),
422                    None,
423                    None,
424                    client_id,
425                    request_params.clone(),
426                );
427            }
428        }
429
430        // Subscribe to data for each instrument
431        for instrument_id in instrument_ids {
432            if self.config.subscribe_instrument {
433                self.subscribe_instrument(instrument_id, client_id, subscribe_params.clone());
434            }
435
436            if self.config.subscribe_book_deltas {
437                self.subscribe_book_deltas(
438                    instrument_id,
439                    self.config.book_type,
440                    None,
441                    client_id,
442                    self.config.manage_book,
443                    subscribe_params.clone(),
444                );
445
446                if self.config.manage_book {
447                    let book = OrderBook::new(instrument_id, self.config.book_type);
448                    self.books.insert(instrument_id, book);
449                }
450            }
451
452            if self.config.subscribe_book_at_interval {
453                self.subscribe_book_at_interval(
454                    instrument_id,
455                    self.config.book_type,
456                    self.config.book_depth,
457                    self.config.book_interval_ms,
458                    client_id,
459                    subscribe_params.clone(),
460                );
461            }
462
463            // TODO: Support subscribe_book_depth when the method is available
464            // if self.config.subscribe_book_depth {
465            //     self.subscribe_book_depth(
466            //         instrument_id,
467            //         self.config.book_type,
468            //         self.config.book_depth,
469            //         client_id,
470            //         subscribe_params.clone(),
471            //     );
472            // }
473
474            if self.config.subscribe_quotes {
475                self.subscribe_quotes(instrument_id, client_id, subscribe_params.clone());
476            }
477
478            if self.config.subscribe_trades {
479                self.subscribe_trades(instrument_id, client_id, subscribe_params.clone());
480            }
481
482            if self.config.subscribe_mark_prices {
483                self.subscribe_mark_prices(instrument_id, client_id, subscribe_params.clone());
484            }
485
486            if self.config.subscribe_index_prices {
487                self.subscribe_index_prices(instrument_id, client_id, subscribe_params.clone());
488            }
489
490            if self.config.subscribe_funding_rates {
491                self.subscribe_funding_rates(instrument_id, client_id, subscribe_params.clone());
492            }
493
494            if self.config.subscribe_instrument_status {
495                self.subscribe_instrument_status(
496                    instrument_id,
497                    client_id,
498                    subscribe_params.clone(),
499                );
500            }
501
502            if self.config.subscribe_instrument_close {
503                self.subscribe_instrument_close(instrument_id, client_id, subscribe_params.clone());
504            }
505
506            // TODO: Implement historical data requests
507            // if self.config.request_quotes {
508            //     self.request_quote_ticks(...);
509            // }
510
511            // Request order book snapshot if configured
512            if self.config.request_book_snapshot {
513                let _ = self.request_book_snapshot(
514                    instrument_id,
515                    self.config.book_depth,
516                    client_id,
517                    request_params.clone(),
518                );
519            }
520
521            // TODO: Request book deltas when Rust data engine has RequestBookDeltas
522
523            // Request historical trades (default to last 1 hour)
524            if self.config.request_trades {
525                let start = self.clock().utc_now() - ChronoDuration::hours(1);
526                if let Err(e) = self.request_trades(
527                    instrument_id,
528                    Some(start),
529                    None,
530                    None,
531                    client_id,
532                    request_params.clone(),
533                ) {
534                    log::error!("Failed to request trades for {instrument_id}: {e}");
535                }
536            }
537
538            // Request historical funding rates (default to last 7 days)
539            if self.config.request_funding_rates {
540                let start = self.clock().utc_now() - ChronoDuration::days(7);
541                if let Err(e) = self.request_funding_rates(
542                    instrument_id,
543                    Some(start),
544                    None,
545                    None,
546                    client_id,
547                    request_params.clone(),
548                ) {
549                    log::error!("Failed to request funding rates for {instrument_id}: {e}");
550                }
551            }
552        }
553
554        // Subscribe to bars
555        if let Some(bar_types) = self.config.bar_types.clone() {
556            for bar_type in bar_types {
557                if self.config.subscribe_bars {
558                    self.subscribe_bars(bar_type, client_id, subscribe_params.clone());
559                }
560
561                // Request historical bars (default to last 1 hour)
562                if self.config.request_bars {
563                    let start = self.clock().utc_now() - ChronoDuration::hours(1);
564                    if let Err(e) = self.request_bars(
565                        bar_type,
566                        Some(start),
567                        None,
568                        None,
569                        client_id,
570                        request_params.clone(),
571                    ) {
572                        log::error!("Failed to request bars for {bar_type}: {e}");
573                    }
574                }
575            }
576        }
577
578        // Set up stats timer
579        if stats_interval_secs > 0 {
580            self.clock().set_timer(
581                "STATS-TIMER",
582                Duration::from_secs(stats_interval_secs),
583                None,
584                None,
585                None,
586                Some(true),
587                Some(false),
588            )?;
589        }
590
591        Ok(())
592    }
593
594    fn on_stop(&mut self) -> anyhow::Result<()> {
595        if !self.config.can_unsubscribe {
596            return Ok(());
597        }
598
599        let instrument_ids = self.config.instrument_ids.clone();
600        let client_id = self.config.client_id;
601        let subscribe_params = self.config.subscribe_params.clone();
602
603        for instrument_id in instrument_ids {
604            if self.config.subscribe_instrument {
605                self.unsubscribe_instrument(instrument_id, client_id, subscribe_params.clone());
606            }
607
608            if self.config.subscribe_book_deltas {
609                self.unsubscribe_book_deltas(instrument_id, client_id, subscribe_params.clone());
610            }
611
612            if self.config.subscribe_book_at_interval {
613                self.unsubscribe_book_at_interval(
614                    instrument_id,
615                    self.config.book_interval_ms,
616                    client_id,
617                    subscribe_params.clone(),
618                );
619            }
620
621            // TODO: Support unsubscribe_book_depth when the method is available
622            // if self.config.subscribe_book_depth {
623            //     self.unsubscribe_book_depth(instrument_id, client_id, subscribe_params.clone());
624            // }
625
626            if self.config.subscribe_quotes {
627                self.unsubscribe_quotes(instrument_id, client_id, subscribe_params.clone());
628            }
629
630            if self.config.subscribe_trades {
631                self.unsubscribe_trades(instrument_id, client_id, subscribe_params.clone());
632            }
633
634            if self.config.subscribe_mark_prices {
635                self.unsubscribe_mark_prices(instrument_id, client_id, subscribe_params.clone());
636            }
637
638            if self.config.subscribe_index_prices {
639                self.unsubscribe_index_prices(instrument_id, client_id, subscribe_params.clone());
640            }
641
642            if self.config.subscribe_funding_rates {
643                self.unsubscribe_funding_rates(instrument_id, client_id, subscribe_params.clone());
644            }
645
646            if self.config.subscribe_instrument_status {
647                self.unsubscribe_instrument_status(
648                    instrument_id,
649                    client_id,
650                    subscribe_params.clone(),
651                );
652            }
653
654            if self.config.subscribe_instrument_close {
655                self.unsubscribe_instrument_close(
656                    instrument_id,
657                    client_id,
658                    subscribe_params.clone(),
659                );
660            }
661        }
662
663        if let Some(bar_types) = self.config.bar_types.clone() {
664            for bar_type in bar_types {
665                if self.config.subscribe_bars {
666                    self.unsubscribe_bars(bar_type, client_id, subscribe_params.clone());
667                }
668            }
669        }
670
671        Ok(())
672    }
673
674    fn on_time_event(&mut self, _event: &TimeEvent) -> anyhow::Result<()> {
675        // Timer events are used by the actor but don't require specific handling
676        Ok(())
677    }
678
679    fn on_instrument(&mut self, instrument: &InstrumentAny) -> anyhow::Result<()> {
680        if self.config.log_data {
681            log_info!("{instrument:?}", color = LogColor::Cyan);
682        }
683        Ok(())
684    }
685
686    fn on_book(&mut self, book: &OrderBook) -> anyhow::Result<()> {
687        if self.config.log_data {
688            let levels = self.config.book_levels_to_print;
689            let instrument_id = book.instrument_id;
690            let book_str = book.pprint(levels, None);
691            log_info!("\n{instrument_id}\n{book_str}", color = LogColor::Cyan);
692        }
693
694        Ok(())
695    }
696
697    fn on_book_deltas(&mut self, deltas: &OrderBookDeltas) -> anyhow::Result<()> {
698        if self.config.manage_book {
699            if let Some(book) = self.books.get_mut(&deltas.instrument_id) {
700                book.apply_deltas(deltas)?;
701
702                if self.config.log_data {
703                    let levels = self.config.book_levels_to_print;
704                    let instrument_id = deltas.instrument_id;
705                    let book_str = book.pprint(levels, None);
706                    log_info!("\n{instrument_id}\n{book_str}", color = LogColor::Cyan);
707                }
708            }
709        } else if self.config.log_data {
710            log_info!("{deltas:?}", color = LogColor::Cyan);
711        }
712        Ok(())
713    }
714
715    fn on_quote(&mut self, quote: &QuoteTick) -> anyhow::Result<()> {
716        if self.config.log_data {
717            log_info!("{quote:?}", color = LogColor::Cyan);
718        }
719        Ok(())
720    }
721
722    fn on_trade(&mut self, trade: &TradeTick) -> anyhow::Result<()> {
723        if self.config.log_data {
724            log_info!("{trade:?}", color = LogColor::Cyan);
725        }
726        Ok(())
727    }
728
729    fn on_bar(&mut self, bar: &Bar) -> anyhow::Result<()> {
730        if self.config.log_data {
731            log_info!("{bar:?}", color = LogColor::Cyan);
732        }
733        Ok(())
734    }
735
736    fn on_mark_price(&mut self, mark_price: &MarkPriceUpdate) -> anyhow::Result<()> {
737        if self.config.log_data {
738            log_info!("{mark_price:?}", color = LogColor::Cyan);
739        }
740        Ok(())
741    }
742
743    fn on_index_price(&mut self, index_price: &IndexPriceUpdate) -> anyhow::Result<()> {
744        if self.config.log_data {
745            log_info!("{index_price:?}", color = LogColor::Cyan);
746        }
747        Ok(())
748    }
749
750    fn on_funding_rate(&mut self, funding_rate: &FundingRateUpdate) -> anyhow::Result<()> {
751        if self.config.log_data {
752            log_info!("{funding_rate:?}", color = LogColor::Cyan);
753        }
754        Ok(())
755    }
756
757    fn on_instrument_status(&mut self, data: &InstrumentStatus) -> anyhow::Result<()> {
758        if self.config.log_data {
759            log_info!("{data:?}", color = LogColor::Cyan);
760        }
761        Ok(())
762    }
763
764    fn on_instrument_close(&mut self, update: &InstrumentClose) -> anyhow::Result<()> {
765        if self.config.log_data {
766            log_info!("{update:?}", color = LogColor::Cyan);
767        }
768        Ok(())
769    }
770
771    fn on_historical_trades(&mut self, trades: &[TradeTick]) -> anyhow::Result<()> {
772        if self.config.log_data {
773            log_info!(
774                "Received {} historical trades",
775                trades.len(),
776                color = LogColor::Cyan
777            );
778            for trade in trades.iter().take(5) {
779                log_info!("  {trade:?}", color = LogColor::Cyan);
780            }
781            if trades.len() > 5 {
782                log_info!(
783                    "  ... and {} more trades",
784                    trades.len() - 5,
785                    color = LogColor::Cyan
786                );
787            }
788        }
789        Ok(())
790    }
791
792    fn on_historical_funding_rates(
793        &mut self,
794        funding_rates: &[FundingRateUpdate],
795    ) -> anyhow::Result<()> {
796        if self.config.log_data {
797            log_info!(
798                "Received {} historical funding rates",
799                funding_rates.len(),
800                color = LogColor::Cyan
801            );
802            for rate in funding_rates.iter().take(5) {
803                log_info!("  {rate:?}", color = LogColor::Cyan);
804            }
805            if funding_rates.len() > 5 {
806                log_info!(
807                    "  ... and {} more funding rates",
808                    funding_rates.len() - 5,
809                    color = LogColor::Cyan
810                );
811            }
812        }
813        Ok(())
814    }
815
816    fn on_historical_bars(&mut self, bars: &[Bar]) -> anyhow::Result<()> {
817        if self.config.log_data {
818            log_info!(
819                "Received {} historical bars",
820                bars.len(),
821                color = LogColor::Cyan
822            );
823            for bar in bars.iter().take(5) {
824                log_info!("  {bar:?}", color = LogColor::Cyan);
825            }
826            if bars.len() > 5 {
827                log_info!(
828                    "  ... and {} more bars",
829                    bars.len() - 5,
830                    color = LogColor::Cyan
831                );
832            }
833        }
834        Ok(())
835    }
836}
837
838impl DataTester {
839    /// Creates a new [`DataTester`] instance.
840    #[must_use]
841    pub fn new(config: DataTesterConfig) -> Self {
842        Self {
843            core: DataActorCore::new(config.base.clone()),
844            config,
845            books: AHashMap::new(),
846        }
847    }
848}
849
850#[cfg(test)]
851mod tests {
852    use nautilus_core::UnixNanos;
853    use nautilus_model::{
854        data::OrderBookDelta,
855        enums::{InstrumentCloseType, MarketStatusAction},
856        identifiers::Symbol,
857        instruments::CurrencyPair,
858        types::{Currency, Price, Quantity},
859    };
860    use rstest::*;
861    use rust_decimal::Decimal;
862
863    use super::*;
864
865    #[fixture]
866    fn config() -> DataTesterConfig {
867        let client_id = ClientId::new("TEST");
868        let instrument_ids = vec![
869            InstrumentId::from("BTC-USDT.TEST"),
870            InstrumentId::from("ETH-USDT.TEST"),
871        ];
872        DataTesterConfig::new(client_id, instrument_ids)
873            .with_subscribe_quotes(true)
874            .with_subscribe_trades(true)
875    }
876
877    #[rstest]
878    fn test_config_creation() {
879        let client_id = ClientId::new("TEST");
880        let instrument_ids = vec![InstrumentId::from("BTC-USDT.TEST")];
881        let config =
882            DataTesterConfig::new(client_id, instrument_ids.clone()).with_subscribe_quotes(true);
883
884        assert_eq!(config.client_id, Some(client_id));
885        assert_eq!(config.instrument_ids, instrument_ids);
886        assert!(config.subscribe_quotes);
887        assert!(!config.subscribe_trades);
888        assert!(config.log_data);
889        assert_eq!(config.stats_interval_secs, 5);
890    }
891
892    #[rstest]
893    fn test_config_default() {
894        let config = DataTesterConfig::default();
895
896        assert_eq!(config.client_id, None);
897        assert!(config.instrument_ids.is_empty());
898        assert!(!config.subscribe_quotes);
899        assert!(!config.subscribe_trades);
900        assert!(!config.subscribe_bars);
901        assert!(!config.request_instruments);
902        assert!(!config.request_book_snapshot);
903        assert!(!config.request_book_deltas);
904        assert!(!config.request_trades);
905        assert!(!config.request_bars);
906        assert!(!config.request_funding_rates);
907        assert!(config.can_unsubscribe);
908        assert!(config.log_data);
909        assert!(config.subscribe_params.is_none());
910        assert!(config.request_params.is_none());
911    }
912
913    #[rstest]
914    fn test_config_with_params() {
915        let client_id = ClientId::new("TEST");
916        let instrument_ids = vec![InstrumentId::from("BTC-USDT.TEST")];
917
918        let mut sub_params = Params::new();
919        sub_params.insert("key".to_string(), serde_json::json!("value"));
920
921        let mut req_params = Params::new();
922        req_params.insert("limit".to_string(), serde_json::json!(100));
923
924        let config = DataTesterConfig::new(client_id, instrument_ids)
925            .with_subscribe_params(Some(sub_params.clone()))
926            .with_request_params(Some(req_params.clone()));
927
928        assert_eq!(config.subscribe_params, Some(sub_params));
929        assert_eq!(config.request_params, Some(req_params));
930    }
931
932    #[rstest]
933    fn test_actor_creation(config: DataTesterConfig) {
934        let actor = DataTester::new(config);
935
936        assert_eq!(actor.config.client_id, Some(ClientId::new("TEST")));
937        assert_eq!(actor.config.instrument_ids.len(), 2);
938    }
939
940    #[rstest]
941    fn test_on_quote_with_logging_enabled(config: DataTesterConfig) {
942        let mut actor = DataTester::new(config);
943
944        let quote = QuoteTick::default();
945        let result = actor.on_quote(&quote);
946
947        assert!(result.is_ok());
948    }
949
950    #[rstest]
951    fn test_on_quote_with_logging_disabled(mut config: DataTesterConfig) {
952        config.log_data = false;
953        let mut actor = DataTester::new(config);
954
955        let quote = QuoteTick::default();
956        let result = actor.on_quote(&quote);
957
958        assert!(result.is_ok());
959    }
960
961    #[rstest]
962    fn test_on_trade(config: DataTesterConfig) {
963        let mut actor = DataTester::new(config);
964
965        let trade = TradeTick::default();
966        let result = actor.on_trade(&trade);
967
968        assert!(result.is_ok());
969    }
970
971    #[rstest]
972    fn test_on_bar(config: DataTesterConfig) {
973        let mut actor = DataTester::new(config);
974
975        let bar = Bar::default();
976        let result = actor.on_bar(&bar);
977
978        assert!(result.is_ok());
979    }
980
981    #[rstest]
982    fn test_on_instrument(config: DataTesterConfig) {
983        let mut actor = DataTester::new(config);
984
985        let instrument_id = InstrumentId::from("BTC-USDT.TEST");
986        let instrument = CurrencyPair::new(
987            instrument_id,
988            Symbol::from("BTC/USDT"),
989            Currency::USD(),
990            Currency::USD(),
991            4,
992            3,
993            Price::from("0.0001"),
994            Quantity::from("0.001"),
995            None,
996            None,
997            None,
998            None,
999            None,
1000            None,
1001            None,
1002            None,
1003            None,
1004            None,
1005            None,
1006            None,
1007            None, // info
1008            UnixNanos::default(),
1009            UnixNanos::default(),
1010        );
1011        let result = actor.on_instrument(&InstrumentAny::CurrencyPair(instrument));
1012
1013        assert!(result.is_ok());
1014    }
1015
1016    #[rstest]
1017    fn test_on_book_deltas_without_managed_book(config: DataTesterConfig) {
1018        let mut actor = DataTester::new(config);
1019
1020        let instrument_id = InstrumentId::from("BTC-USDT.TEST");
1021        let delta =
1022            OrderBookDelta::clear(instrument_id, 0, UnixNanos::default(), UnixNanos::default());
1023        let deltas = OrderBookDeltas::new(instrument_id, vec![delta]);
1024        let result = actor.on_book_deltas(&deltas);
1025
1026        assert!(result.is_ok());
1027    }
1028
1029    #[rstest]
1030    fn test_on_mark_price(config: DataTesterConfig) {
1031        let mut actor = DataTester::new(config);
1032
1033        let instrument_id = InstrumentId::from("BTC-USDT.TEST");
1034        let price = Price::from("50000.0");
1035        let mark_price = MarkPriceUpdate::new(
1036            instrument_id,
1037            price,
1038            UnixNanos::default(),
1039            UnixNanos::default(),
1040        );
1041        let result = actor.on_mark_price(&mark_price);
1042
1043        assert!(result.is_ok());
1044    }
1045
1046    #[rstest]
1047    fn test_on_index_price(config: DataTesterConfig) {
1048        let mut actor = DataTester::new(config);
1049
1050        let instrument_id = InstrumentId::from("BTC-USDT.TEST");
1051        let price = Price::from("50000.0");
1052        let index_price = IndexPriceUpdate::new(
1053            instrument_id,
1054            price,
1055            UnixNanos::default(),
1056            UnixNanos::default(),
1057        );
1058        let result = actor.on_index_price(&index_price);
1059
1060        assert!(result.is_ok());
1061    }
1062
1063    #[rstest]
1064    fn test_on_funding_rate(config: DataTesterConfig) {
1065        let mut actor = DataTester::new(config);
1066
1067        let instrument_id = InstrumentId::from("BTC-USDT.TEST");
1068        let funding_rate = FundingRateUpdate::new(
1069            instrument_id,
1070            Decimal::new(1, 4),
1071            None,
1072            UnixNanos::default(),
1073            UnixNanos::default(),
1074        );
1075        let result = actor.on_funding_rate(&funding_rate);
1076
1077        assert!(result.is_ok());
1078    }
1079
1080    #[rstest]
1081    fn test_on_historical_funding_rates(config: DataTesterConfig) {
1082        let mut actor = DataTester::new(config);
1083
1084        let instrument_id = InstrumentId::from("BTC-USDT.TEST");
1085        let rates = vec![
1086            FundingRateUpdate::new(
1087                instrument_id,
1088                Decimal::new(1, 4),
1089                None,
1090                UnixNanos::default(),
1091                UnixNanos::default(),
1092            ),
1093            FundingRateUpdate::new(
1094                instrument_id,
1095                Decimal::new(2, 4),
1096                None,
1097                UnixNanos::default(),
1098                UnixNanos::default(),
1099            ),
1100        ];
1101        let result = actor.on_historical_funding_rates(&rates);
1102
1103        assert!(result.is_ok());
1104    }
1105
1106    #[rstest]
1107    fn test_config_request_funding_rates() {
1108        let client_id = ClientId::new("TEST");
1109        let instrument_ids = vec![InstrumentId::from("BTC-USDT.TEST")];
1110        let config =
1111            DataTesterConfig::new(client_id, instrument_ids).with_request_funding_rates(true);
1112
1113        assert!(config.request_funding_rates);
1114    }
1115
1116    #[rstest]
1117    fn test_config_request_book_deltas() {
1118        let client_id = ClientId::new("TEST");
1119        let instrument_ids = vec![InstrumentId::from("BTC-USDT.TEST")];
1120        let config =
1121            DataTesterConfig::new(client_id, instrument_ids).with_request_book_deltas(true);
1122
1123        assert!(config.request_book_deltas);
1124    }
1125
1126    #[rstest]
1127    fn test_on_instrument_status(config: DataTesterConfig) {
1128        let mut actor = DataTester::new(config);
1129
1130        let instrument_id = InstrumentId::from("BTC-USDT.TEST");
1131        let status = InstrumentStatus::new(
1132            instrument_id,
1133            MarketStatusAction::Trading,
1134            UnixNanos::default(),
1135            UnixNanos::default(),
1136            None,
1137            None,
1138            None,
1139            None,
1140            None,
1141        );
1142        let result = actor.on_instrument_status(&status);
1143
1144        assert!(result.is_ok());
1145    }
1146
1147    #[rstest]
1148    fn test_on_instrument_close(config: DataTesterConfig) {
1149        let mut actor = DataTester::new(config);
1150
1151        let instrument_id = InstrumentId::from("BTC-USDT.TEST");
1152        let price = Price::from("50000.0");
1153        let close = InstrumentClose::new(
1154            instrument_id,
1155            price,
1156            InstrumentCloseType::EndOfSession,
1157            UnixNanos::default(),
1158            UnixNanos::default(),
1159        );
1160        let result = actor.on_instrument_close(&close);
1161
1162        assert!(result.is_ok());
1163    }
1164
1165    #[rstest]
1166    fn test_on_time_event(config: DataTesterConfig) {
1167        let mut actor = DataTester::new(config);
1168
1169        let event = TimeEvent::new(
1170            "TEST".into(),
1171            Default::default(),
1172            UnixNanos::default(),
1173            UnixNanos::default(),
1174        );
1175        let result = actor.on_time_event(&event);
1176
1177        assert!(result.is_ok());
1178    }
1179
1180    #[rstest]
1181    fn test_config_with_all_subscriptions_enabled(mut config: DataTesterConfig) {
1182        config.subscribe_book_deltas = true;
1183        config.subscribe_book_at_interval = true;
1184        config.subscribe_bars = true;
1185        config.subscribe_mark_prices = true;
1186        config.subscribe_index_prices = true;
1187        config.subscribe_funding_rates = true;
1188        config.subscribe_instrument = true;
1189        config.subscribe_instrument_status = true;
1190        config.subscribe_instrument_close = true;
1191
1192        let actor = DataTester::new(config);
1193
1194        assert!(actor.config.subscribe_book_deltas);
1195        assert!(actor.config.subscribe_book_at_interval);
1196        assert!(actor.config.subscribe_bars);
1197        assert!(actor.config.subscribe_mark_prices);
1198        assert!(actor.config.subscribe_index_prices);
1199        assert!(actor.config.subscribe_funding_rates);
1200        assert!(actor.config.subscribe_instrument);
1201        assert!(actor.config.subscribe_instrument_status);
1202        assert!(actor.config.subscribe_instrument_close);
1203    }
1204
1205    #[rstest]
1206    fn test_config_with_book_management(mut config: DataTesterConfig) {
1207        config.manage_book = true;
1208        config.book_levels_to_print = 5;
1209
1210        let actor = DataTester::new(config);
1211
1212        assert!(actor.config.manage_book);
1213        assert_eq!(actor.config.book_levels_to_print, 5);
1214        assert!(actor.books.is_empty());
1215    }
1216
1217    #[rstest]
1218    fn test_config_with_custom_stats_interval(mut config: DataTesterConfig) {
1219        config.stats_interval_secs = 10;
1220
1221        let actor = DataTester::new(config);
1222
1223        assert_eq!(actor.config.stats_interval_secs, 10);
1224    }
1225
1226    #[rstest]
1227    fn test_config_with_unsubscribe_disabled(mut config: DataTesterConfig) {
1228        config.can_unsubscribe = false;
1229
1230        let actor = DataTester::new(config);
1231
1232        assert!(!actor.config.can_unsubscribe);
1233    }
1234}