Skip to main content

nautilus_testkit/testers/
data.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16//! Data tester actor for live testing market data subscriptions.
17
18use std::{
19    num::NonZeroUsize,
20    ops::{Deref, DerefMut},
21    time::Duration,
22};
23
24use ahash::{AHashMap, AHashSet};
25use chrono::Duration as ChronoDuration;
26use nautilus_common::{
27    actor::{DataActor, DataActorConfig, DataActorCore},
28    enums::LogColor,
29    log_info,
30    timer::TimeEvent,
31};
32use nautilus_core::Params;
33use nautilus_model::{
34    data::{
35        Bar, FundingRateUpdate, IndexPriceUpdate, InstrumentClose, InstrumentStatus,
36        MarkPriceUpdate, OrderBookDeltas, QuoteTick, TradeTick, bar::BarType,
37    },
38    enums::BookType,
39    identifiers::{ClientId, InstrumentId},
40    instruments::InstrumentAny,
41    orderbook::OrderBook,
42};
43
44/// Configuration for the data tester actor.
45#[derive(Debug, Clone)]
46pub struct DataTesterConfig {
47    /// Base data actor configuration.
48    pub base: DataActorConfig,
49    /// Instrument IDs to subscribe to.
50    pub instrument_ids: Vec<InstrumentId>,
51    /// Client ID to use for subscriptions.
52    pub client_id: Option<ClientId>,
53    /// Bar types to subscribe to.
54    pub bar_types: Option<Vec<BarType>>,
55    /// Whether to subscribe to order book deltas.
56    pub subscribe_book_deltas: bool,
57    /// Whether to subscribe to order book depth snapshots.
58    pub subscribe_book_depth: bool,
59    /// Whether to subscribe to order book at interval.
60    pub subscribe_book_at_interval: bool,
61    /// Whether to subscribe to quotes.
62    pub subscribe_quotes: bool,
63    /// Whether to subscribe to trades.
64    pub subscribe_trades: bool,
65    /// Whether to subscribe to mark prices.
66    pub subscribe_mark_prices: bool,
67    /// Whether to subscribe to index prices.
68    pub subscribe_index_prices: bool,
69    /// Whether to subscribe to funding rates.
70    pub subscribe_funding_rates: bool,
71    /// Whether to subscribe to bars.
72    pub subscribe_bars: bool,
73    /// Whether to subscribe to instrument updates.
74    pub subscribe_instrument: bool,
75    /// Whether to subscribe to instrument status.
76    pub subscribe_instrument_status: bool,
77    /// Whether to subscribe to instrument close.
78    pub subscribe_instrument_close: bool,
79    /// Optional parameters passed to all subscribe calls.
80    pub subscribe_params: Option<Params>,
81    /// Optional parameters passed to all request calls.
82    pub request_params: Option<Params>,
83    /// Whether unsubscribe is supported on stop.
84    pub can_unsubscribe: bool,
85    /// Whether to request instruments on start.
86    pub request_instruments: bool,
87    // TODO: Support request_quotes when historical data requests are available
88    /// Whether to request historical quotes (not yet implemented).
89    pub request_quotes: bool,
90    // TODO: Support request_trades when historical data requests are available
91    /// Whether to request historical trades (not yet implemented).
92    pub request_trades: bool,
93    /// Whether to request historical bars.
94    pub request_bars: bool,
95    /// Whether to request order book snapshots.
96    pub request_book_snapshot: bool,
97    // TODO: Support request_book_deltas when Rust data engine has RequestBookDeltas
98    /// Whether to request historical order book deltas (not yet implemented).
99    pub request_book_deltas: bool,
100    /// Whether to request historical funding rates.
101    pub request_funding_rates: bool,
102    // TODO: Support requests_start_delta when we implement historical data requests
103    /// Book type for order book subscriptions.
104    pub book_type: BookType,
105    /// Order book depth for subscriptions.
106    pub book_depth: Option<NonZeroUsize>,
107    // TODO: Support book_group_size when order book grouping is implemented
108    /// Order book interval in milliseconds for at_interval subscriptions.
109    pub book_interval_ms: NonZeroUsize,
110    /// Number of order book levels to print when logging.
111    pub book_levels_to_print: usize,
112    /// Whether to manage local order book from deltas.
113    pub manage_book: bool,
114    /// Whether to log received data.
115    pub log_data: bool,
116    /// Stats logging interval in seconds (0 to disable).
117    pub stats_interval_secs: u64,
118}
119
120impl DataTesterConfig {
121    /// Creates a new [`DataTesterConfig`] instance with minimal settings.
122    ///
123    /// # Panics
124    ///
125    /// Panics if `NonZeroUsize::new(1000)` fails (which should never happen).
126    #[must_use]
127    pub fn new(client_id: ClientId, instrument_ids: Vec<InstrumentId>) -> Self {
128        Self {
129            base: DataActorConfig::default(),
130            instrument_ids,
131            client_id: Some(client_id),
132            bar_types: None,
133            subscribe_book_deltas: false,
134            subscribe_book_depth: false,
135            subscribe_book_at_interval: false,
136            subscribe_quotes: false,
137            subscribe_trades: false,
138            subscribe_mark_prices: false,
139            subscribe_index_prices: false,
140            subscribe_funding_rates: false,
141            subscribe_bars: false,
142
143            subscribe_instrument: false,
144            subscribe_instrument_status: false,
145            subscribe_instrument_close: false,
146            subscribe_params: None,
147            request_params: None,
148            can_unsubscribe: true,
149            request_instruments: false,
150            request_quotes: false,
151            request_trades: false,
152            request_bars: false,
153            request_book_snapshot: false,
154            request_book_deltas: false,
155            request_funding_rates: false,
156            book_type: BookType::L2_MBP,
157            book_depth: None,
158            book_interval_ms: NonZeroUsize::new(1000).unwrap(),
159            book_levels_to_print: 10,
160            manage_book: true,
161            log_data: true,
162            stats_interval_secs: 5,
163        }
164    }
165
166    #[must_use]
167    pub fn with_log_data(mut self, log_data: bool) -> Self {
168        self.log_data = log_data;
169        self
170    }
171
172    #[must_use]
173    pub fn with_subscribe_book_deltas(mut self, subscribe: bool) -> Self {
174        self.subscribe_book_deltas = subscribe;
175        self
176    }
177
178    #[must_use]
179    pub fn with_subscribe_book_depth(mut self, subscribe: bool) -> Self {
180        self.subscribe_book_depth = subscribe;
181        self
182    }
183
184    #[must_use]
185    pub fn with_subscribe_book_at_interval(mut self, subscribe: bool) -> Self {
186        self.subscribe_book_at_interval = subscribe;
187        self
188    }
189
190    #[must_use]
191    pub fn with_subscribe_quotes(mut self, subscribe: bool) -> Self {
192        self.subscribe_quotes = subscribe;
193        self
194    }
195
196    #[must_use]
197    pub fn with_subscribe_trades(mut self, subscribe: bool) -> Self {
198        self.subscribe_trades = subscribe;
199        self
200    }
201
202    #[must_use]
203    pub fn with_subscribe_mark_prices(mut self, subscribe: bool) -> Self {
204        self.subscribe_mark_prices = subscribe;
205        self
206    }
207
208    #[must_use]
209    pub fn with_subscribe_index_prices(mut self, subscribe: bool) -> Self {
210        self.subscribe_index_prices = subscribe;
211        self
212    }
213
214    #[must_use]
215    pub fn with_subscribe_funding_rates(mut self, subscribe: bool) -> Self {
216        self.subscribe_funding_rates = subscribe;
217        self
218    }
219
220    #[must_use]
221    pub fn with_subscribe_bars(mut self, subscribe: bool) -> Self {
222        self.subscribe_bars = subscribe;
223        self
224    }
225
226    #[must_use]
227    pub fn with_bar_types(mut self, bar_types: Vec<BarType>) -> Self {
228        self.bar_types = Some(bar_types);
229        self
230    }
231
232    #[must_use]
233    pub fn with_subscribe_instrument(mut self, subscribe: bool) -> Self {
234        self.subscribe_instrument = subscribe;
235        self
236    }
237
238    #[must_use]
239    pub fn with_subscribe_instrument_status(mut self, subscribe: bool) -> Self {
240        self.subscribe_instrument_status = subscribe;
241        self
242    }
243
244    #[must_use]
245    pub fn with_subscribe_instrument_close(mut self, subscribe: bool) -> Self {
246        self.subscribe_instrument_close = subscribe;
247        self
248    }
249
250    #[must_use]
251    pub fn with_book_type(mut self, book_type: BookType) -> Self {
252        self.book_type = book_type;
253        self
254    }
255
256    #[must_use]
257    pub fn with_book_depth(mut self, depth: Option<NonZeroUsize>) -> Self {
258        self.book_depth = depth;
259        self
260    }
261
262    #[must_use]
263    pub fn with_book_interval_ms(mut self, interval_ms: NonZeroUsize) -> Self {
264        self.book_interval_ms = interval_ms;
265        self
266    }
267
268    #[must_use]
269    pub fn with_manage_book(mut self, manage: bool) -> Self {
270        self.manage_book = manage;
271        self
272    }
273
274    #[must_use]
275    pub fn with_request_instruments(mut self, request: bool) -> Self {
276        self.request_instruments = request;
277        self
278    }
279
280    #[must_use]
281    pub fn with_request_book_snapshot(mut self, request: bool) -> Self {
282        self.request_book_snapshot = request;
283        self
284    }
285
286    #[must_use]
287    pub fn with_request_book_deltas(mut self, request: bool) -> Self {
288        self.request_book_deltas = request;
289        self
290    }
291
292    #[must_use]
293    pub fn with_request_trades(mut self, request: bool) -> Self {
294        self.request_trades = request;
295        self
296    }
297
298    #[must_use]
299    pub fn with_request_bars(mut self, request: bool) -> Self {
300        self.request_bars = request;
301        self
302    }
303
304    #[must_use]
305    pub fn with_request_quotes(mut self, request: bool) -> Self {
306        self.request_quotes = request;
307        self
308    }
309
310    #[must_use]
311    pub fn with_request_funding_rates(mut self, request: bool) -> Self {
312        self.request_funding_rates = request;
313        self
314    }
315
316    #[must_use]
317    pub fn with_book_levels_to_print(mut self, levels: usize) -> Self {
318        self.book_levels_to_print = levels;
319        self
320    }
321
322    #[must_use]
323    pub fn with_can_unsubscribe(mut self, can_unsubscribe: bool) -> Self {
324        self.can_unsubscribe = can_unsubscribe;
325        self
326    }
327
328    #[must_use]
329    pub fn with_subscribe_params(mut self, params: Option<Params>) -> Self {
330        self.subscribe_params = params;
331        self
332    }
333
334    #[must_use]
335    pub fn with_request_params(mut self, params: Option<Params>) -> Self {
336        self.request_params = params;
337        self
338    }
339
340    #[must_use]
341    pub fn with_stats_interval_secs(mut self, interval_secs: u64) -> Self {
342        self.stats_interval_secs = interval_secs;
343        self
344    }
345}
346
347impl Default for DataTesterConfig {
348    fn default() -> Self {
349        Self {
350            base: DataActorConfig::default(),
351            instrument_ids: Vec::new(),
352            client_id: None,
353            bar_types: None,
354            subscribe_book_deltas: false,
355            subscribe_book_depth: false,
356            subscribe_book_at_interval: false,
357            subscribe_quotes: false,
358            subscribe_trades: false,
359            subscribe_mark_prices: false,
360            subscribe_index_prices: false,
361            subscribe_funding_rates: false,
362            subscribe_bars: false,
363            subscribe_instrument: false,
364            subscribe_instrument_status: false,
365            subscribe_instrument_close: false,
366            subscribe_params: None,
367            request_params: None,
368            can_unsubscribe: true,
369            request_instruments: false,
370            request_quotes: false,
371            request_trades: false,
372            request_bars: false,
373            request_book_snapshot: false,
374            request_book_deltas: false,
375            request_funding_rates: false,
376            book_type: BookType::L2_MBP,
377            book_depth: None,
378            book_interval_ms: NonZeroUsize::new(1000).unwrap(),
379            book_levels_to_print: 10,
380            manage_book: false,
381            log_data: true,
382            stats_interval_secs: 5,
383        }
384    }
385}
386
387/// A data tester actor for live testing market data subscriptions.
388///
389/// Subscribes to configured data types for specified instruments and logs
390/// received data to demonstrate the data flow. Useful for testing adapters
391/// and validating data connectivity.
392///
393/// This actor provides equivalent functionality to the Python `DataTester`
394/// in the test kit.
395#[derive(Debug)]
396pub struct DataTester {
397    core: DataActorCore,
398    config: DataTesterConfig,
399    books: AHashMap<InstrumentId, OrderBook>,
400}
401
402impl Deref for DataTester {
403    type Target = DataActorCore;
404
405    fn deref(&self) -> &Self::Target {
406        &self.core
407    }
408}
409
410impl DerefMut for DataTester {
411    fn deref_mut(&mut self) -> &mut Self::Target {
412        &mut self.core
413    }
414}
415
416impl DataActor for DataTester {
417    fn on_start(&mut self) -> anyhow::Result<()> {
418        let instrument_ids = self.config.instrument_ids.clone();
419        let client_id = self.config.client_id;
420        let subscribe_params = self.config.subscribe_params.clone();
421        let request_params = self.config.request_params.clone();
422        let stats_interval_secs = self.config.stats_interval_secs;
423
424        // Request instruments if configured
425        if self.config.request_instruments {
426            let mut venues = AHashSet::new();
427            for instrument_id in &instrument_ids {
428                venues.insert(instrument_id.venue);
429            }
430
431            for venue in venues {
432                let _ = self.request_instruments(
433                    Some(venue),
434                    None,
435                    None,
436                    client_id,
437                    request_params.clone(),
438                );
439            }
440        }
441
442        // Subscribe to data for each instrument
443        for instrument_id in instrument_ids {
444            if self.config.subscribe_instrument {
445                self.subscribe_instrument(instrument_id, client_id, subscribe_params.clone());
446            }
447
448            if self.config.subscribe_book_deltas {
449                self.subscribe_book_deltas(
450                    instrument_id,
451                    self.config.book_type,
452                    None,
453                    client_id,
454                    self.config.manage_book,
455                    subscribe_params.clone(),
456                );
457
458                if self.config.manage_book {
459                    let book = OrderBook::new(instrument_id, self.config.book_type);
460                    self.books.insert(instrument_id, book);
461                }
462            }
463
464            if self.config.subscribe_book_at_interval {
465                self.subscribe_book_at_interval(
466                    instrument_id,
467                    self.config.book_type,
468                    self.config.book_depth,
469                    self.config.book_interval_ms,
470                    client_id,
471                    subscribe_params.clone(),
472                );
473            }
474
475            // TODO: Support subscribe_book_depth when the method is available
476            // if self.config.subscribe_book_depth {
477            //     self.subscribe_book_depth(
478            //         instrument_id,
479            //         self.config.book_type,
480            //         self.config.book_depth,
481            //         client_id,
482            //         subscribe_params.clone(),
483            //     );
484            // }
485
486            if self.config.subscribe_quotes {
487                self.subscribe_quotes(instrument_id, client_id, subscribe_params.clone());
488            }
489
490            if self.config.subscribe_trades {
491                self.subscribe_trades(instrument_id, client_id, subscribe_params.clone());
492            }
493
494            if self.config.subscribe_mark_prices {
495                self.subscribe_mark_prices(instrument_id, client_id, subscribe_params.clone());
496            }
497
498            if self.config.subscribe_index_prices {
499                self.subscribe_index_prices(instrument_id, client_id, subscribe_params.clone());
500            }
501
502            if self.config.subscribe_funding_rates {
503                self.subscribe_funding_rates(instrument_id, client_id, subscribe_params.clone());
504            }
505
506            if self.config.subscribe_instrument_status {
507                self.subscribe_instrument_status(
508                    instrument_id,
509                    client_id,
510                    subscribe_params.clone(),
511                );
512            }
513
514            if self.config.subscribe_instrument_close {
515                self.subscribe_instrument_close(instrument_id, client_id, subscribe_params.clone());
516            }
517
518            // TODO: Implement historical data requests
519            // if self.config.request_quotes {
520            //     self.request_quote_ticks(...);
521            // }
522
523            // Request order book snapshot if configured
524            if self.config.request_book_snapshot {
525                let _ = self.request_book_snapshot(
526                    instrument_id,
527                    self.config.book_depth,
528                    client_id,
529                    request_params.clone(),
530                );
531            }
532
533            // TODO: Request book deltas when Rust data engine has RequestBookDeltas
534
535            // Request historical trades (default to last 1 hour)
536            if self.config.request_trades {
537                let start = self.clock().utc_now() - ChronoDuration::hours(1);
538
539                if let Err(e) = self.request_trades(
540                    instrument_id,
541                    Some(start),
542                    None,
543                    None,
544                    client_id,
545                    request_params.clone(),
546                ) {
547                    log::error!("Failed to request trades for {instrument_id}: {e}");
548                }
549            }
550
551            // Request historical funding rates (default to last 7 days)
552            if self.config.request_funding_rates {
553                let start = self.clock().utc_now() - ChronoDuration::days(7);
554
555                if let Err(e) = self.request_funding_rates(
556                    instrument_id,
557                    Some(start),
558                    None,
559                    None,
560                    client_id,
561                    request_params.clone(),
562                ) {
563                    log::error!("Failed to request funding rates for {instrument_id}: {e}");
564                }
565            }
566        }
567
568        // Subscribe to bars
569        if let Some(bar_types) = self.config.bar_types.clone() {
570            for bar_type in bar_types {
571                if self.config.subscribe_bars {
572                    self.subscribe_bars(bar_type, client_id, subscribe_params.clone());
573                }
574
575                // Request historical bars (default to last 1 hour)
576                if self.config.request_bars {
577                    let start = self.clock().utc_now() - ChronoDuration::hours(1);
578
579                    if let Err(e) = self.request_bars(
580                        bar_type,
581                        Some(start),
582                        None,
583                        None,
584                        client_id,
585                        request_params.clone(),
586                    ) {
587                        log::error!("Failed to request bars for {bar_type}: {e}");
588                    }
589                }
590            }
591        }
592
593        // Set up stats timer
594        if stats_interval_secs > 0 {
595            self.clock().set_timer(
596                "STATS-TIMER",
597                Duration::from_secs(stats_interval_secs),
598                None,
599                None,
600                None,
601                Some(true),
602                Some(false),
603            )?;
604        }
605
606        Ok(())
607    }
608
609    fn on_stop(&mut self) -> anyhow::Result<()> {
610        if !self.config.can_unsubscribe {
611            return Ok(());
612        }
613
614        let instrument_ids = self.config.instrument_ids.clone();
615        let client_id = self.config.client_id;
616        let subscribe_params = self.config.subscribe_params.clone();
617
618        for instrument_id in instrument_ids {
619            if self.config.subscribe_instrument {
620                self.unsubscribe_instrument(instrument_id, client_id, subscribe_params.clone());
621            }
622
623            if self.config.subscribe_book_deltas {
624                self.unsubscribe_book_deltas(instrument_id, client_id, subscribe_params.clone());
625            }
626
627            if self.config.subscribe_book_at_interval {
628                self.unsubscribe_book_at_interval(
629                    instrument_id,
630                    self.config.book_interval_ms,
631                    client_id,
632                    subscribe_params.clone(),
633                );
634            }
635
636            // TODO: Support unsubscribe_book_depth when the method is available
637            // if self.config.subscribe_book_depth {
638            //     self.unsubscribe_book_depth(instrument_id, client_id, subscribe_params.clone());
639            // }
640
641            if self.config.subscribe_quotes {
642                self.unsubscribe_quotes(instrument_id, client_id, subscribe_params.clone());
643            }
644
645            if self.config.subscribe_trades {
646                self.unsubscribe_trades(instrument_id, client_id, subscribe_params.clone());
647            }
648
649            if self.config.subscribe_mark_prices {
650                self.unsubscribe_mark_prices(instrument_id, client_id, subscribe_params.clone());
651            }
652
653            if self.config.subscribe_index_prices {
654                self.unsubscribe_index_prices(instrument_id, client_id, subscribe_params.clone());
655            }
656
657            if self.config.subscribe_funding_rates {
658                self.unsubscribe_funding_rates(instrument_id, client_id, subscribe_params.clone());
659            }
660
661            if self.config.subscribe_instrument_status {
662                self.unsubscribe_instrument_status(
663                    instrument_id,
664                    client_id,
665                    subscribe_params.clone(),
666                );
667            }
668
669            if self.config.subscribe_instrument_close {
670                self.unsubscribe_instrument_close(
671                    instrument_id,
672                    client_id,
673                    subscribe_params.clone(),
674                );
675            }
676        }
677
678        if let Some(bar_types) = self.config.bar_types.clone() {
679            for bar_type in bar_types {
680                if self.config.subscribe_bars {
681                    self.unsubscribe_bars(bar_type, client_id, subscribe_params.clone());
682                }
683            }
684        }
685
686        Ok(())
687    }
688
689    fn on_time_event(&mut self, _event: &TimeEvent) -> anyhow::Result<()> {
690        // Timer events are used by the actor but don't require specific handling
691        Ok(())
692    }
693
694    fn on_instrument(&mut self, instrument: &InstrumentAny) -> anyhow::Result<()> {
695        if self.config.log_data {
696            log_info!("{instrument:?}", color = LogColor::Cyan);
697        }
698        Ok(())
699    }
700
701    fn on_book(&mut self, book: &OrderBook) -> anyhow::Result<()> {
702        if self.config.log_data {
703            let levels = self.config.book_levels_to_print;
704            let instrument_id = book.instrument_id;
705            let book_str = book.pprint(levels, None);
706            log_info!("\n{instrument_id}\n{book_str}", color = LogColor::Cyan);
707        }
708
709        Ok(())
710    }
711
712    fn on_book_deltas(&mut self, deltas: &OrderBookDeltas) -> anyhow::Result<()> {
713        if self.config.manage_book {
714            if let Some(book) = self.books.get_mut(&deltas.instrument_id) {
715                book.apply_deltas(deltas)?;
716
717                if self.config.log_data {
718                    let levels = self.config.book_levels_to_print;
719                    let instrument_id = deltas.instrument_id;
720                    let book_str = book.pprint(levels, None);
721                    log_info!("\n{instrument_id}\n{book_str}", color = LogColor::Cyan);
722                }
723            }
724        } else if self.config.log_data {
725            log_info!("{deltas:?}", color = LogColor::Cyan);
726        }
727        Ok(())
728    }
729
730    fn on_quote(&mut self, quote: &QuoteTick) -> anyhow::Result<()> {
731        if self.config.log_data {
732            log_info!("{quote:?}", color = LogColor::Cyan);
733        }
734        Ok(())
735    }
736
737    fn on_trade(&mut self, trade: &TradeTick) -> anyhow::Result<()> {
738        if self.config.log_data {
739            log_info!("{trade:?}", color = LogColor::Cyan);
740        }
741        Ok(())
742    }
743
744    fn on_bar(&mut self, bar: &Bar) -> anyhow::Result<()> {
745        if self.config.log_data {
746            log_info!("{bar:?}", color = LogColor::Cyan);
747        }
748        Ok(())
749    }
750
751    fn on_mark_price(&mut self, mark_price: &MarkPriceUpdate) -> anyhow::Result<()> {
752        if self.config.log_data {
753            log_info!("{mark_price:?}", color = LogColor::Cyan);
754        }
755        Ok(())
756    }
757
758    fn on_index_price(&mut self, index_price: &IndexPriceUpdate) -> anyhow::Result<()> {
759        if self.config.log_data {
760            log_info!("{index_price:?}", color = LogColor::Cyan);
761        }
762        Ok(())
763    }
764
765    fn on_funding_rate(&mut self, funding_rate: &FundingRateUpdate) -> anyhow::Result<()> {
766        if self.config.log_data {
767            log_info!("{funding_rate:?}", color = LogColor::Cyan);
768        }
769        Ok(())
770    }
771
772    fn on_instrument_status(&mut self, data: &InstrumentStatus) -> anyhow::Result<()> {
773        if self.config.log_data {
774            log_info!("{data:?}", color = LogColor::Cyan);
775        }
776        Ok(())
777    }
778
779    fn on_instrument_close(&mut self, update: &InstrumentClose) -> anyhow::Result<()> {
780        if self.config.log_data {
781            log_info!("{update:?}", color = LogColor::Cyan);
782        }
783        Ok(())
784    }
785
786    fn on_historical_trades(&mut self, trades: &[TradeTick]) -> anyhow::Result<()> {
787        if self.config.log_data {
788            log_info!(
789                "Received {} historical trades",
790                trades.len(),
791                color = LogColor::Cyan
792            );
793            for trade in trades.iter().take(5) {
794                log_info!("  {trade:?}", color = LogColor::Cyan);
795            }
796
797            if trades.len() > 5 {
798                log_info!(
799                    "  ... and {} more trades",
800                    trades.len() - 5,
801                    color = LogColor::Cyan
802                );
803            }
804        }
805        Ok(())
806    }
807
808    fn on_historical_funding_rates(
809        &mut self,
810        funding_rates: &[FundingRateUpdate],
811    ) -> anyhow::Result<()> {
812        if self.config.log_data {
813            log_info!(
814                "Received {} historical funding rates",
815                funding_rates.len(),
816                color = LogColor::Cyan
817            );
818            for rate in funding_rates.iter().take(5) {
819                log_info!("  {rate:?}", color = LogColor::Cyan);
820            }
821
822            if funding_rates.len() > 5 {
823                log_info!(
824                    "  ... and {} more funding rates",
825                    funding_rates.len() - 5,
826                    color = LogColor::Cyan
827                );
828            }
829        }
830        Ok(())
831    }
832
833    fn on_historical_bars(&mut self, bars: &[Bar]) -> anyhow::Result<()> {
834        if self.config.log_data {
835            log_info!(
836                "Received {} historical bars",
837                bars.len(),
838                color = LogColor::Cyan
839            );
840            for bar in bars.iter().take(5) {
841                log_info!("  {bar:?}", color = LogColor::Cyan);
842            }
843
844            if bars.len() > 5 {
845                log_info!(
846                    "  ... and {} more bars",
847                    bars.len() - 5,
848                    color = LogColor::Cyan
849                );
850            }
851        }
852        Ok(())
853    }
854}
855
856impl DataTester {
857    /// Creates a new [`DataTester`] instance.
858    #[must_use]
859    pub fn new(config: DataTesterConfig) -> Self {
860        Self {
861            core: DataActorCore::new(config.base.clone()),
862            config,
863            books: AHashMap::new(),
864        }
865    }
866}
867
868#[cfg(test)]
869mod tests {
870    use nautilus_core::{UUID4, UnixNanos};
871    use nautilus_model::{
872        data::OrderBookDelta,
873        enums::{InstrumentCloseType, MarketStatusAction},
874        identifiers::Symbol,
875        instruments::CurrencyPair,
876        types::{Currency, Price, Quantity},
877    };
878    use rstest::*;
879    use rust_decimal::Decimal;
880
881    use super::*;
882
883    #[fixture]
884    fn config() -> DataTesterConfig {
885        let client_id = ClientId::new("TEST");
886        let instrument_ids = vec![
887            InstrumentId::from("BTC-USDT.TEST"),
888            InstrumentId::from("ETH-USDT.TEST"),
889        ];
890        DataTesterConfig::new(client_id, instrument_ids)
891            .with_subscribe_quotes(true)
892            .with_subscribe_trades(true)
893    }
894
895    #[rstest]
896    fn test_config_creation() {
897        let client_id = ClientId::new("TEST");
898        let instrument_ids = vec![InstrumentId::from("BTC-USDT.TEST")];
899        let config =
900            DataTesterConfig::new(client_id, instrument_ids.clone()).with_subscribe_quotes(true);
901
902        assert_eq!(config.client_id, Some(client_id));
903        assert_eq!(config.instrument_ids, instrument_ids);
904        assert!(config.subscribe_quotes);
905        assert!(!config.subscribe_trades);
906        assert!(config.log_data);
907        assert_eq!(config.stats_interval_secs, 5);
908    }
909
910    #[rstest]
911    fn test_config_default() {
912        let config = DataTesterConfig::default();
913
914        assert_eq!(config.client_id, None);
915        assert!(config.instrument_ids.is_empty());
916        assert!(!config.subscribe_quotes);
917        assert!(!config.subscribe_trades);
918        assert!(!config.subscribe_bars);
919        assert!(!config.request_instruments);
920        assert!(!config.request_book_snapshot);
921        assert!(!config.request_book_deltas);
922        assert!(!config.request_trades);
923        assert!(!config.request_bars);
924        assert!(!config.request_funding_rates);
925        assert!(config.can_unsubscribe);
926        assert!(config.log_data);
927        assert!(config.subscribe_params.is_none());
928        assert!(config.request_params.is_none());
929    }
930
931    #[rstest]
932    fn test_config_with_params() {
933        let client_id = ClientId::new("TEST");
934        let instrument_ids = vec![InstrumentId::from("BTC-USDT.TEST")];
935
936        let mut sub_params = Params::new();
937        sub_params.insert("key".to_string(), serde_json::json!("value"));
938
939        let mut req_params = Params::new();
940        req_params.insert("limit".to_string(), serde_json::json!(100));
941
942        let config = DataTesterConfig::new(client_id, instrument_ids)
943            .with_subscribe_params(Some(sub_params.clone()))
944            .with_request_params(Some(req_params.clone()));
945
946        assert_eq!(config.subscribe_params, Some(sub_params));
947        assert_eq!(config.request_params, Some(req_params));
948    }
949
950    #[rstest]
951    fn test_actor_creation(config: DataTesterConfig) {
952        let actor = DataTester::new(config);
953
954        assert_eq!(actor.config.client_id, Some(ClientId::new("TEST")));
955        assert_eq!(actor.config.instrument_ids.len(), 2);
956    }
957
958    #[rstest]
959    fn test_on_quote_with_logging_enabled(config: DataTesterConfig) {
960        let mut actor = DataTester::new(config);
961
962        let quote = QuoteTick::default();
963        let result = actor.on_quote(&quote);
964
965        assert!(result.is_ok());
966    }
967
968    #[rstest]
969    fn test_on_quote_with_logging_disabled(mut config: DataTesterConfig) {
970        config.log_data = false;
971        let mut actor = DataTester::new(config);
972
973        let quote = QuoteTick::default();
974        let result = actor.on_quote(&quote);
975
976        assert!(result.is_ok());
977    }
978
979    #[rstest]
980    fn test_on_trade(config: DataTesterConfig) {
981        let mut actor = DataTester::new(config);
982
983        let trade = TradeTick::default();
984        let result = actor.on_trade(&trade);
985
986        assert!(result.is_ok());
987    }
988
989    #[rstest]
990    fn test_on_bar(config: DataTesterConfig) {
991        let mut actor = DataTester::new(config);
992
993        let bar = Bar::default();
994        let result = actor.on_bar(&bar);
995
996        assert!(result.is_ok());
997    }
998
999    #[rstest]
1000    fn test_on_instrument(config: DataTesterConfig) {
1001        let mut actor = DataTester::new(config);
1002
1003        let instrument_id = InstrumentId::from("BTC-USDT.TEST");
1004        let instrument = CurrencyPair::new(
1005            instrument_id,
1006            Symbol::from("BTC/USDT"),
1007            Currency::USD(),
1008            Currency::USD(),
1009            4,
1010            3,
1011            Price::from("0.0001"),
1012            Quantity::from("0.001"),
1013            None,
1014            None,
1015            None,
1016            None,
1017            None,
1018            None,
1019            None,
1020            None,
1021            None,
1022            None,
1023            None,
1024            None,
1025            None, // info
1026            UnixNanos::default(),
1027            UnixNanos::default(),
1028        );
1029        let result = actor.on_instrument(&InstrumentAny::CurrencyPair(instrument));
1030
1031        assert!(result.is_ok());
1032    }
1033
1034    #[rstest]
1035    fn test_on_book_deltas_without_managed_book(config: DataTesterConfig) {
1036        let mut actor = DataTester::new(config);
1037
1038        let instrument_id = InstrumentId::from("BTC-USDT.TEST");
1039        let delta =
1040            OrderBookDelta::clear(instrument_id, 0, UnixNanos::default(), UnixNanos::default());
1041        let deltas = OrderBookDeltas::new(instrument_id, vec![delta]);
1042        let result = actor.on_book_deltas(&deltas);
1043
1044        assert!(result.is_ok());
1045    }
1046
1047    #[rstest]
1048    fn test_on_mark_price(config: DataTesterConfig) {
1049        let mut actor = DataTester::new(config);
1050
1051        let instrument_id = InstrumentId::from("BTC-USDT.TEST");
1052        let price = Price::from("50000.0");
1053        let mark_price = MarkPriceUpdate::new(
1054            instrument_id,
1055            price,
1056            UnixNanos::default(),
1057            UnixNanos::default(),
1058        );
1059        let result = actor.on_mark_price(&mark_price);
1060
1061        assert!(result.is_ok());
1062    }
1063
1064    #[rstest]
1065    fn test_on_index_price(config: DataTesterConfig) {
1066        let mut actor = DataTester::new(config);
1067
1068        let instrument_id = InstrumentId::from("BTC-USDT.TEST");
1069        let price = Price::from("50000.0");
1070        let index_price = IndexPriceUpdate::new(
1071            instrument_id,
1072            price,
1073            UnixNanos::default(),
1074            UnixNanos::default(),
1075        );
1076        let result = actor.on_index_price(&index_price);
1077
1078        assert!(result.is_ok());
1079    }
1080
1081    #[rstest]
1082    fn test_on_funding_rate(config: DataTesterConfig) {
1083        let mut actor = DataTester::new(config);
1084
1085        let instrument_id = InstrumentId::from("BTC-USDT.TEST");
1086        let funding_rate = FundingRateUpdate::new(
1087            instrument_id,
1088            Decimal::new(1, 4),
1089            None,
1090            UnixNanos::default(),
1091            UnixNanos::default(),
1092        );
1093        let result = actor.on_funding_rate(&funding_rate);
1094
1095        assert!(result.is_ok());
1096    }
1097
1098    #[rstest]
1099    fn test_on_historical_funding_rates(config: DataTesterConfig) {
1100        let mut actor = DataTester::new(config);
1101
1102        let instrument_id = InstrumentId::from("BTC-USDT.TEST");
1103        let rates = vec![
1104            FundingRateUpdate::new(
1105                instrument_id,
1106                Decimal::new(1, 4),
1107                None,
1108                UnixNanos::default(),
1109                UnixNanos::default(),
1110            ),
1111            FundingRateUpdate::new(
1112                instrument_id,
1113                Decimal::new(2, 4),
1114                None,
1115                UnixNanos::default(),
1116                UnixNanos::default(),
1117            ),
1118        ];
1119        let result = actor.on_historical_funding_rates(&rates);
1120
1121        assert!(result.is_ok());
1122    }
1123
1124    #[rstest]
1125    fn test_config_request_funding_rates() {
1126        let client_id = ClientId::new("TEST");
1127        let instrument_ids = vec![InstrumentId::from("BTC-USDT.TEST")];
1128        let config =
1129            DataTesterConfig::new(client_id, instrument_ids).with_request_funding_rates(true);
1130
1131        assert!(config.request_funding_rates);
1132    }
1133
1134    #[rstest]
1135    fn test_config_request_book_deltas() {
1136        let client_id = ClientId::new("TEST");
1137        let instrument_ids = vec![InstrumentId::from("BTC-USDT.TEST")];
1138        let config =
1139            DataTesterConfig::new(client_id, instrument_ids).with_request_book_deltas(true);
1140
1141        assert!(config.request_book_deltas);
1142    }
1143
1144    #[rstest]
1145    fn test_on_instrument_status(config: DataTesterConfig) {
1146        let mut actor = DataTester::new(config);
1147
1148        let instrument_id = InstrumentId::from("BTC-USDT.TEST");
1149        let status = InstrumentStatus::new(
1150            instrument_id,
1151            MarketStatusAction::Trading,
1152            UnixNanos::default(),
1153            UnixNanos::default(),
1154            None,
1155            None,
1156            None,
1157            None,
1158            None,
1159        );
1160        let result = actor.on_instrument_status(&status);
1161
1162        assert!(result.is_ok());
1163    }
1164
1165    #[rstest]
1166    fn test_on_instrument_close(config: DataTesterConfig) {
1167        let mut actor = DataTester::new(config);
1168
1169        let instrument_id = InstrumentId::from("BTC-USDT.TEST");
1170        let price = Price::from("50000.0");
1171        let close = InstrumentClose::new(
1172            instrument_id,
1173            price,
1174            InstrumentCloseType::EndOfSession,
1175            UnixNanos::default(),
1176            UnixNanos::default(),
1177        );
1178        let result = actor.on_instrument_close(&close);
1179
1180        assert!(result.is_ok());
1181    }
1182
1183    #[rstest]
1184    fn test_on_time_event(config: DataTesterConfig) {
1185        let mut actor = DataTester::new(config);
1186
1187        let event = TimeEvent::new(
1188            "TEST".into(),
1189            UUID4::default(),
1190            UnixNanos::default(),
1191            UnixNanos::default(),
1192        );
1193        let result = actor.on_time_event(&event);
1194
1195        assert!(result.is_ok());
1196    }
1197
1198    #[rstest]
1199    fn test_config_with_all_subscriptions_enabled(mut config: DataTesterConfig) {
1200        config.subscribe_book_deltas = true;
1201        config.subscribe_book_at_interval = true;
1202        config.subscribe_bars = true;
1203        config.subscribe_mark_prices = true;
1204        config.subscribe_index_prices = true;
1205        config.subscribe_funding_rates = true;
1206        config.subscribe_instrument = true;
1207        config.subscribe_instrument_status = true;
1208        config.subscribe_instrument_close = true;
1209
1210        let actor = DataTester::new(config);
1211
1212        assert!(actor.config.subscribe_book_deltas);
1213        assert!(actor.config.subscribe_book_at_interval);
1214        assert!(actor.config.subscribe_bars);
1215        assert!(actor.config.subscribe_mark_prices);
1216        assert!(actor.config.subscribe_index_prices);
1217        assert!(actor.config.subscribe_funding_rates);
1218        assert!(actor.config.subscribe_instrument);
1219        assert!(actor.config.subscribe_instrument_status);
1220        assert!(actor.config.subscribe_instrument_close);
1221    }
1222
1223    #[rstest]
1224    fn test_config_with_book_management(mut config: DataTesterConfig) {
1225        config.manage_book = true;
1226        config.book_levels_to_print = 5;
1227
1228        let actor = DataTester::new(config);
1229
1230        assert!(actor.config.manage_book);
1231        assert_eq!(actor.config.book_levels_to_print, 5);
1232        assert!(actor.books.is_empty());
1233    }
1234
1235    #[rstest]
1236    fn test_config_with_custom_stats_interval(mut config: DataTesterConfig) {
1237        config.stats_interval_secs = 10;
1238
1239        let actor = DataTester::new(config);
1240
1241        assert_eq!(actor.config.stats_interval_secs, 10);
1242    }
1243
1244    #[rstest]
1245    fn test_config_with_unsubscribe_disabled(mut config: DataTesterConfig) {
1246        config.can_unsubscribe = false;
1247
1248        let actor = DataTester::new(config);
1249
1250        assert!(!actor.config.can_unsubscribe);
1251    }
1252}