Skip to main content

nautilus_testkit/testers/data/
actor.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::time::Duration;
17
18use ahash::{AHashMap, AHashSet};
19use chrono::Duration as ChronoDuration;
20use nautilus_common::{
21    actor::{DataActor, DataActorCore},
22    enums::LogColor,
23    log_info, nautilus_actor,
24    timer::TimeEvent,
25};
26use nautilus_model::{
27    data::{
28        Bar, FundingRateUpdate, IndexPriceUpdate, InstrumentClose, InstrumentStatus,
29        MarkPriceUpdate, OrderBookDeltas, QuoteTick, TradeTick, option_chain::OptionGreeks,
30    },
31    identifiers::InstrumentId,
32    instruments::InstrumentAny,
33    orderbook::OrderBook,
34};
35
36use super::config::DataTesterConfig;
37
38/// A data tester actor for live testing market data subscriptions.
39///
40/// Subscribes to configured data types for specified instruments and logs
41/// received data to demonstrate the data flow. Useful for testing adapters
42/// and validating data connectivity.
43///
44/// This actor provides equivalent functionality to the Python `DataTester`
45/// in the test kit.
46#[derive(Debug)]
47pub struct DataTester {
48    pub(super) core: DataActorCore,
49    pub(super) config: DataTesterConfig,
50    pub(super) books: AHashMap<InstrumentId, OrderBook>,
51}
52
53nautilus_actor!(DataTester);
54
55impl DataActor for DataTester {
56    fn on_start(&mut self) -> anyhow::Result<()> {
57        let instrument_ids = self.config.instrument_ids.clone();
58        let client_id = self.config.client_id;
59        let subscribe_params = self.config.subscribe_params.clone();
60        let request_params = self.config.request_params.clone();
61        let stats_interval_secs = self.config.stats_interval_secs;
62
63        // Request instruments if configured
64        if self.config.request_instruments {
65            let mut venues = AHashSet::new();
66            for instrument_id in &instrument_ids {
67                venues.insert(instrument_id.venue);
68            }
69
70            for venue in venues {
71                let _ = self.request_instruments(
72                    Some(venue),
73                    None,
74                    None,
75                    client_id,
76                    request_params.clone(),
77                );
78            }
79        }
80
81        // Subscribe to data for each instrument
82        for instrument_id in instrument_ids {
83            if self.config.subscribe_instrument {
84                self.subscribe_instrument(instrument_id, client_id, subscribe_params.clone());
85            }
86
87            if self.config.subscribe_book_deltas {
88                self.subscribe_book_deltas(
89                    instrument_id,
90                    self.config.book_type,
91                    None,
92                    client_id,
93                    self.config.manage_book,
94                    subscribe_params.clone(),
95                );
96
97                if self.config.manage_book {
98                    let book = OrderBook::new(instrument_id, self.config.book_type);
99                    self.books.insert(instrument_id, book);
100                }
101            }
102
103            if self.config.subscribe_book_at_interval {
104                self.subscribe_book_at_interval(
105                    instrument_id,
106                    self.config.book_type,
107                    self.config.book_depth,
108                    self.config.book_interval_ms,
109                    client_id,
110                    subscribe_params.clone(),
111                );
112            }
113
114            // TODO: Support subscribe_book_depth when the method is available
115            // if self.config.subscribe_book_depth {
116            //     self.subscribe_book_depth(
117            //         instrument_id,
118            //         self.config.book_type,
119            //         self.config.book_depth,
120            //         client_id,
121            //         subscribe_params.clone(),
122            //     );
123            // }
124
125            if self.config.subscribe_quotes {
126                self.subscribe_quotes(instrument_id, client_id, subscribe_params.clone());
127            }
128
129            if self.config.subscribe_trades {
130                self.subscribe_trades(instrument_id, client_id, subscribe_params.clone());
131            }
132
133            if self.config.subscribe_mark_prices {
134                self.subscribe_mark_prices(instrument_id, client_id, subscribe_params.clone());
135            }
136
137            if self.config.subscribe_index_prices {
138                self.subscribe_index_prices(instrument_id, client_id, subscribe_params.clone());
139            }
140
141            if self.config.subscribe_funding_rates {
142                self.subscribe_funding_rates(instrument_id, client_id, subscribe_params.clone());
143            }
144
145            if self.config.subscribe_instrument_status {
146                self.subscribe_instrument_status(
147                    instrument_id,
148                    client_id,
149                    subscribe_params.clone(),
150                );
151            }
152
153            if self.config.subscribe_instrument_close {
154                self.subscribe_instrument_close(instrument_id, client_id, subscribe_params.clone());
155            }
156
157            if self.config.subscribe_option_greeks {
158                self.subscribe_option_greeks(instrument_id, client_id, subscribe_params.clone());
159            }
160
161            // TODO: Implement historical data requests
162            // if self.config.request_quotes {
163            //     self.request_quote_ticks(...);
164            // }
165
166            // Request order book snapshot if configured
167            if self.config.request_book_snapshot {
168                let _ = self.request_book_snapshot(
169                    instrument_id,
170                    self.config.book_depth,
171                    client_id,
172                    request_params.clone(),
173                );
174            }
175
176            // TODO: Request book deltas when Rust data engine has RequestBookDeltas
177
178            // Request historical trades (default to last 1 hour)
179            if self.config.request_trades {
180                let start = self.clock().utc_now() - ChronoDuration::hours(1);
181
182                if let Err(e) = self.request_trades(
183                    instrument_id,
184                    Some(start),
185                    None,
186                    None,
187                    client_id,
188                    request_params.clone(),
189                ) {
190                    log::error!("Failed to request trades for {instrument_id}: {e}");
191                }
192            }
193
194            // Request historical funding rates (default to last 7 days)
195            if self.config.request_funding_rates {
196                let start = self.clock().utc_now() - ChronoDuration::days(7);
197
198                if let Err(e) = self.request_funding_rates(
199                    instrument_id,
200                    Some(start),
201                    None,
202                    None,
203                    client_id,
204                    request_params.clone(),
205                ) {
206                    log::error!("Failed to request funding rates for {instrument_id}: {e}");
207                }
208            }
209        }
210
211        // Subscribe to bars
212        if let Some(bar_types) = self.config.bar_types.clone() {
213            for bar_type in bar_types {
214                if self.config.subscribe_bars {
215                    self.subscribe_bars(bar_type, client_id, subscribe_params.clone());
216                }
217
218                // Request historical bars (default to last 1 hour)
219                if self.config.request_bars {
220                    let start = self.clock().utc_now() - ChronoDuration::hours(1);
221
222                    if let Err(e) = self.request_bars(
223                        bar_type,
224                        Some(start),
225                        None,
226                        None,
227                        client_id,
228                        request_params.clone(),
229                    ) {
230                        log::error!("Failed to request bars for {bar_type}: {e}");
231                    }
232                }
233            }
234        }
235
236        // Set up stats timer
237        if stats_interval_secs > 0 {
238            self.clock().set_timer(
239                "STATS-TIMER",
240                Duration::from_secs(stats_interval_secs),
241                None,
242                None,
243                None,
244                Some(true),
245                Some(false),
246            )?;
247        }
248
249        Ok(())
250    }
251
252    fn on_stop(&mut self) -> anyhow::Result<()> {
253        if !self.config.can_unsubscribe {
254            return Ok(());
255        }
256
257        let instrument_ids = self.config.instrument_ids.clone();
258        let client_id = self.config.client_id;
259        let subscribe_params = self.config.subscribe_params.clone();
260
261        for instrument_id in instrument_ids {
262            if self.config.subscribe_instrument {
263                self.unsubscribe_instrument(instrument_id, client_id, subscribe_params.clone());
264            }
265
266            if self.config.subscribe_book_deltas {
267                self.unsubscribe_book_deltas(instrument_id, client_id, subscribe_params.clone());
268            }
269
270            if self.config.subscribe_book_at_interval {
271                self.unsubscribe_book_at_interval(
272                    instrument_id,
273                    self.config.book_interval_ms,
274                    client_id,
275                    subscribe_params.clone(),
276                );
277            }
278
279            // TODO: Support unsubscribe_book_depth when the method is available
280            // if self.config.subscribe_book_depth {
281            //     self.unsubscribe_book_depth(instrument_id, client_id, subscribe_params.clone());
282            // }
283
284            if self.config.subscribe_quotes {
285                self.unsubscribe_quotes(instrument_id, client_id, subscribe_params.clone());
286            }
287
288            if self.config.subscribe_trades {
289                self.unsubscribe_trades(instrument_id, client_id, subscribe_params.clone());
290            }
291
292            if self.config.subscribe_mark_prices {
293                self.unsubscribe_mark_prices(instrument_id, client_id, subscribe_params.clone());
294            }
295
296            if self.config.subscribe_index_prices {
297                self.unsubscribe_index_prices(instrument_id, client_id, subscribe_params.clone());
298            }
299
300            if self.config.subscribe_funding_rates {
301                self.unsubscribe_funding_rates(instrument_id, client_id, subscribe_params.clone());
302            }
303
304            if self.config.subscribe_instrument_status {
305                self.unsubscribe_instrument_status(
306                    instrument_id,
307                    client_id,
308                    subscribe_params.clone(),
309                );
310            }
311
312            if self.config.subscribe_instrument_close {
313                self.unsubscribe_instrument_close(
314                    instrument_id,
315                    client_id,
316                    subscribe_params.clone(),
317                );
318            }
319
320            if self.config.subscribe_option_greeks {
321                self.unsubscribe_option_greeks(instrument_id, client_id, subscribe_params.clone());
322            }
323        }
324
325        if let Some(bar_types) = self.config.bar_types.clone() {
326            for bar_type in bar_types {
327                if self.config.subscribe_bars {
328                    self.unsubscribe_bars(bar_type, client_id, subscribe_params.clone());
329                }
330            }
331        }
332
333        Ok(())
334    }
335
336    fn on_time_event(&mut self, _event: &TimeEvent) -> anyhow::Result<()> {
337        // Timer events are used by the actor but don't require specific handling
338        Ok(())
339    }
340
341    fn on_instrument(&mut self, instrument: &InstrumentAny) -> anyhow::Result<()> {
342        if self.config.log_data {
343            log_info!("{instrument:?}", color = LogColor::Cyan);
344        }
345        Ok(())
346    }
347
348    fn on_book(&mut self, book: &OrderBook) -> anyhow::Result<()> {
349        if self.config.log_data {
350            let levels = self.config.book_levels_to_print;
351            let instrument_id = book.instrument_id;
352            let book_str = book.pprint(levels, None);
353            log_info!("\n{instrument_id}\n{book_str}", color = LogColor::Cyan);
354        }
355
356        Ok(())
357    }
358
359    fn on_book_deltas(&mut self, deltas: &OrderBookDeltas) -> anyhow::Result<()> {
360        if self.config.manage_book {
361            if let Some(book) = self.books.get_mut(&deltas.instrument_id) {
362                book.apply_deltas(deltas)?;
363
364                if self.config.log_data {
365                    let levels = self.config.book_levels_to_print;
366                    let instrument_id = deltas.instrument_id;
367                    let book_str = book.pprint(levels, None);
368                    log_info!("\n{instrument_id}\n{book_str}", color = LogColor::Cyan);
369                }
370            }
371        } else if self.config.log_data {
372            log_info!("{deltas:?}", color = LogColor::Cyan);
373        }
374        Ok(())
375    }
376
377    fn on_quote(&mut self, quote: &QuoteTick) -> anyhow::Result<()> {
378        if self.config.log_data {
379            log_info!("{quote:?}", color = LogColor::Cyan);
380        }
381        Ok(())
382    }
383
384    fn on_trade(&mut self, trade: &TradeTick) -> anyhow::Result<()> {
385        if self.config.log_data {
386            log_info!("{trade:?}", color = LogColor::Cyan);
387        }
388        Ok(())
389    }
390
391    fn on_bar(&mut self, bar: &Bar) -> anyhow::Result<()> {
392        if self.config.log_data {
393            log_info!("{bar:?}", color = LogColor::Cyan);
394        }
395        Ok(())
396    }
397
398    fn on_mark_price(&mut self, mark_price: &MarkPriceUpdate) -> anyhow::Result<()> {
399        if self.config.log_data {
400            log_info!("{mark_price:?}", color = LogColor::Cyan);
401        }
402        Ok(())
403    }
404
405    fn on_index_price(&mut self, index_price: &IndexPriceUpdate) -> anyhow::Result<()> {
406        if self.config.log_data {
407            log_info!("{index_price:?}", color = LogColor::Cyan);
408        }
409        Ok(())
410    }
411
412    fn on_funding_rate(&mut self, funding_rate: &FundingRateUpdate) -> anyhow::Result<()> {
413        if self.config.log_data {
414            log_info!("{funding_rate:?}", color = LogColor::Cyan);
415        }
416        Ok(())
417    }
418
419    fn on_instrument_status(&mut self, data: &InstrumentStatus) -> anyhow::Result<()> {
420        if self.config.log_data {
421            log_info!("{data:?}", color = LogColor::Cyan);
422        }
423        Ok(())
424    }
425
426    fn on_instrument_close(&mut self, update: &InstrumentClose) -> anyhow::Result<()> {
427        if self.config.log_data {
428            log_info!("{update:?}", color = LogColor::Cyan);
429        }
430        Ok(())
431    }
432
433    fn on_option_greeks(&mut self, greeks: &OptionGreeks) -> anyhow::Result<()> {
434        if self.config.log_data {
435            log_info!("{greeks:?}", color = LogColor::Cyan);
436        }
437        Ok(())
438    }
439
440    fn on_historical_trades(&mut self, trades: &[TradeTick]) -> anyhow::Result<()> {
441        if self.config.log_data {
442            log_info!(
443                "Received {} historical trades",
444                trades.len(),
445                color = LogColor::Cyan
446            );
447            for trade in trades.iter().take(5) {
448                log_info!("  {trade:?}", color = LogColor::Cyan);
449            }
450
451            if trades.len() > 5 {
452                log_info!(
453                    "  ... and {} more trades",
454                    trades.len() - 5,
455                    color = LogColor::Cyan
456                );
457            }
458        }
459        Ok(())
460    }
461
462    fn on_historical_funding_rates(
463        &mut self,
464        funding_rates: &[FundingRateUpdate],
465    ) -> anyhow::Result<()> {
466        if self.config.log_data {
467            log_info!(
468                "Received {} historical funding rates",
469                funding_rates.len(),
470                color = LogColor::Cyan
471            );
472            for rate in funding_rates.iter().take(5) {
473                log_info!("  {rate:?}", color = LogColor::Cyan);
474            }
475
476            if funding_rates.len() > 5 {
477                log_info!(
478                    "  ... and {} more funding rates",
479                    funding_rates.len() - 5,
480                    color = LogColor::Cyan
481                );
482            }
483        }
484        Ok(())
485    }
486
487    fn on_historical_bars(&mut self, bars: &[Bar]) -> anyhow::Result<()> {
488        if self.config.log_data {
489            log_info!(
490                "Received {} historical bars",
491                bars.len(),
492                color = LogColor::Cyan
493            );
494            for bar in bars.iter().take(5) {
495                log_info!("  {bar:?}", color = LogColor::Cyan);
496            }
497
498            if bars.len() > 5 {
499                log_info!(
500                    "  ... and {} more bars",
501                    bars.len() - 5,
502                    color = LogColor::Cyan
503                );
504            }
505        }
506        Ok(())
507    }
508}
509
510impl DataTester {
511    /// Creates a new [`DataTester`] instance.
512    #[must_use]
513    pub fn new(config: DataTesterConfig) -> Self {
514        Self {
515            core: DataActorCore::new(config.base.clone()),
516            config,
517            books: AHashMap::new(),
518        }
519    }
520}