Skip to main content

nautilus_testkit/testers/data/
actor.rs

1// -------------------------------------------------------------------------------------------------
2//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
3//  https://nautechsystems.io
4//
5//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
6//  You may not use this file except in compliance with the License.
7//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
8//
9//  Unless required by applicable law or agreed to in writing, software
10//  distributed under the License is distributed on an "AS IS" BASIS,
11//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12//  See the License for the specific language governing permissions and
13//  limitations under the License.
14// -------------------------------------------------------------------------------------------------
15
16use std::time::Duration;
17
18use ahash::{AHashMap, AHashSet};
19use chrono::Duration as ChronoDuration;
20use nautilus_common::{
21    actor::{DataActor, DataActorCore},
22    enums::LogColor,
23    log_info, nautilus_actor,
24    timer::TimeEvent,
25};
26use nautilus_model::{
27    data::{
28        Bar, FundingRateUpdate, IndexPriceUpdate, InstrumentClose, InstrumentStatus,
29        MarkPriceUpdate, OrderBookDeltas, QuoteTick, TradeTick, option_chain::OptionGreeks,
30    },
31    identifiers::InstrumentId,
32    instruments::InstrumentAny,
33    orderbook::OrderBook,
34};
35
36use super::config::DataTesterConfig;
37
38/// A data tester actor for live testing market data subscriptions.
39///
40/// Subscribes to configured data types for specified instruments and logs
41/// received data to demonstrate the data flow. Useful for testing adapters
42/// and validating data connectivity.
43///
44/// This actor provides equivalent functionality to the Python `DataTester`
45/// in the test kit.
46#[derive(Debug)]
47pub struct DataTester {
48    pub(super) core: DataActorCore,
49    pub(super) config: DataTesterConfig,
50    pub(super) books: AHashMap<InstrumentId, OrderBook>,
51}
52
53nautilus_actor!(DataTester);
54
55impl DataActor for DataTester {
56    fn on_start(&mut self) -> anyhow::Result<()> {
57        let instrument_ids = self.config.instrument_ids.clone();
58        let client_id = self.config.client_id;
59        let subscribe_params = self.config.subscribe_params.clone();
60        let request_params = self.config.request_params.clone();
61        let stats_interval_secs = self.config.stats_interval_secs;
62
63        // Request instruments if configured
64        if self.config.request_instruments {
65            let mut venues = AHashSet::new();
66            for instrument_id in &instrument_ids {
67                venues.insert(instrument_id.venue);
68            }
69
70            for venue in venues {
71                let _ = self.request_instruments(
72                    Some(venue),
73                    None,
74                    None,
75                    client_id,
76                    request_params.clone(),
77                );
78            }
79        }
80
81        // Subscribe to data for each instrument
82        for instrument_id in instrument_ids {
83            if self.config.subscribe_instrument {
84                self.subscribe_instrument(instrument_id, client_id, subscribe_params.clone());
85            }
86
87            if self.config.subscribe_book_deltas {
88                self.subscribe_book_deltas(
89                    instrument_id,
90                    self.config.book_type,
91                    None,
92                    client_id,
93                    self.config.manage_book,
94                    subscribe_params.clone(),
95                );
96
97                if self.config.manage_book {
98                    let book = OrderBook::new(instrument_id, self.config.book_type);
99                    self.books.insert(instrument_id, book);
100                }
101            }
102
103            if self.config.subscribe_book_at_interval {
104                self.subscribe_book_at_interval(
105                    instrument_id,
106                    self.config.book_type,
107                    self.config.book_depth,
108                    self.config.book_interval_ms,
109                    client_id,
110                    subscribe_params.clone(),
111                );
112            }
113
114            // TODO: Support subscribe_book_depth when the method is available
115            // if self.config.subscribe_book_depth {
116            //     self.subscribe_book_depth(
117            //         instrument_id,
118            //         self.config.book_type,
119            //         self.config.book_depth,
120            //         client_id,
121            //         subscribe_params.clone(),
122            //     );
123            // }
124
125            if self.config.subscribe_quotes {
126                self.subscribe_quotes(instrument_id, client_id, subscribe_params.clone());
127            }
128
129            if self.config.subscribe_trades {
130                self.subscribe_trades(instrument_id, client_id, subscribe_params.clone());
131            }
132
133            if self.config.subscribe_mark_prices {
134                self.subscribe_mark_prices(instrument_id, client_id, subscribe_params.clone());
135            }
136
137            if self.config.subscribe_index_prices {
138                self.subscribe_index_prices(instrument_id, client_id, subscribe_params.clone());
139            }
140
141            if self.config.subscribe_funding_rates {
142                self.subscribe_funding_rates(instrument_id, client_id, subscribe_params.clone());
143            }
144
145            if self.config.subscribe_instrument_status {
146                self.subscribe_instrument_status(
147                    instrument_id,
148                    client_id,
149                    subscribe_params.clone(),
150                );
151            }
152
153            if self.config.subscribe_instrument_close {
154                self.subscribe_instrument_close(instrument_id, client_id, subscribe_params.clone());
155            }
156
157            if self.config.subscribe_option_greeks {
158                self.subscribe_option_greeks(instrument_id, client_id, subscribe_params.clone());
159            }
160
161            // Request historical quotes (default to last 1 hour)
162            if self.config.request_quotes {
163                let start = self.clock().utc_now() - ChronoDuration::hours(1);
164
165                if let Err(e) = self.request_quotes(
166                    instrument_id,
167                    Some(start),
168                    None,
169                    None,
170                    client_id,
171                    request_params.clone(),
172                ) {
173                    log::error!("Failed to request quotes for {instrument_id}: {e}");
174                }
175            }
176
177            // Request order book snapshot if configured
178            if self.config.request_book_snapshot {
179                let _ = self.request_book_snapshot(
180                    instrument_id,
181                    self.config.book_depth,
182                    client_id,
183                    request_params.clone(),
184                );
185            }
186
187            // TODO: Request book deltas when Rust data engine has RequestBookDeltas
188
189            // Request historical trades (default to last 1 hour)
190            if self.config.request_trades {
191                let start = self.clock().utc_now() - ChronoDuration::hours(1);
192
193                if let Err(e) = self.request_trades(
194                    instrument_id,
195                    Some(start),
196                    None,
197                    None,
198                    client_id,
199                    request_params.clone(),
200                ) {
201                    log::error!("Failed to request trades for {instrument_id}: {e}");
202                }
203            }
204
205            // Request historical funding rates (default to last 7 days)
206            if self.config.request_funding_rates {
207                let start = self.clock().utc_now() - ChronoDuration::days(7);
208
209                if let Err(e) = self.request_funding_rates(
210                    instrument_id,
211                    Some(start),
212                    None,
213                    None,
214                    client_id,
215                    request_params.clone(),
216                ) {
217                    log::error!("Failed to request funding rates for {instrument_id}: {e}");
218                }
219            }
220        }
221
222        // Subscribe to bars
223        if let Some(bar_types) = self.config.bar_types.clone() {
224            for bar_type in bar_types {
225                if self.config.subscribe_bars {
226                    self.subscribe_bars(bar_type, client_id, subscribe_params.clone());
227                }
228
229                // Request historical bars (default to last 1 hour)
230                if self.config.request_bars {
231                    let start = self.clock().utc_now() - ChronoDuration::hours(1);
232
233                    if let Err(e) = self.request_bars(
234                        bar_type,
235                        Some(start),
236                        None,
237                        None,
238                        client_id,
239                        request_params.clone(),
240                    ) {
241                        log::error!("Failed to request bars for {bar_type}: {e}");
242                    }
243                }
244            }
245        }
246
247        // Set up stats timer
248        if stats_interval_secs > 0 {
249            self.clock().set_timer(
250                "STATS-TIMER",
251                Duration::from_secs(stats_interval_secs),
252                None,
253                None,
254                None,
255                Some(true),
256                Some(false),
257            )?;
258        }
259
260        Ok(())
261    }
262
263    fn on_stop(&mut self) -> anyhow::Result<()> {
264        if !self.config.can_unsubscribe {
265            return Ok(());
266        }
267
268        let instrument_ids = self.config.instrument_ids.clone();
269        let client_id = self.config.client_id;
270        let subscribe_params = self.config.subscribe_params.clone();
271
272        for instrument_id in instrument_ids {
273            if self.config.subscribe_instrument {
274                self.unsubscribe_instrument(instrument_id, client_id, subscribe_params.clone());
275            }
276
277            if self.config.subscribe_book_deltas {
278                self.unsubscribe_book_deltas(instrument_id, client_id, subscribe_params.clone());
279            }
280
281            if self.config.subscribe_book_at_interval {
282                self.unsubscribe_book_at_interval(
283                    instrument_id,
284                    self.config.book_interval_ms,
285                    client_id,
286                    subscribe_params.clone(),
287                );
288            }
289
290            // TODO: Support unsubscribe_book_depth when the method is available
291            // if self.config.subscribe_book_depth {
292            //     self.unsubscribe_book_depth(instrument_id, client_id, subscribe_params.clone());
293            // }
294
295            if self.config.subscribe_quotes {
296                self.unsubscribe_quotes(instrument_id, client_id, subscribe_params.clone());
297            }
298
299            if self.config.subscribe_trades {
300                self.unsubscribe_trades(instrument_id, client_id, subscribe_params.clone());
301            }
302
303            if self.config.subscribe_mark_prices {
304                self.unsubscribe_mark_prices(instrument_id, client_id, subscribe_params.clone());
305            }
306
307            if self.config.subscribe_index_prices {
308                self.unsubscribe_index_prices(instrument_id, client_id, subscribe_params.clone());
309            }
310
311            if self.config.subscribe_funding_rates {
312                self.unsubscribe_funding_rates(instrument_id, client_id, subscribe_params.clone());
313            }
314
315            if self.config.subscribe_instrument_status {
316                self.unsubscribe_instrument_status(
317                    instrument_id,
318                    client_id,
319                    subscribe_params.clone(),
320                );
321            }
322
323            if self.config.subscribe_instrument_close {
324                self.unsubscribe_instrument_close(
325                    instrument_id,
326                    client_id,
327                    subscribe_params.clone(),
328                );
329            }
330
331            if self.config.subscribe_option_greeks {
332                self.unsubscribe_option_greeks(instrument_id, client_id, subscribe_params.clone());
333            }
334        }
335
336        if let Some(bar_types) = self.config.bar_types.clone() {
337            for bar_type in bar_types {
338                if self.config.subscribe_bars {
339                    self.unsubscribe_bars(bar_type, client_id, subscribe_params.clone());
340                }
341            }
342        }
343
344        Ok(())
345    }
346
347    fn on_time_event(&mut self, _event: &TimeEvent) -> anyhow::Result<()> {
348        // Timer events are used by the actor but don't require specific handling
349        Ok(())
350    }
351
352    fn on_instrument(&mut self, instrument: &InstrumentAny) -> anyhow::Result<()> {
353        if self.config.log_data {
354            log_info!("{instrument:?}", color = LogColor::Cyan);
355        }
356        Ok(())
357    }
358
359    fn on_book(&mut self, book: &OrderBook) -> anyhow::Result<()> {
360        if self.config.log_data {
361            let levels = self.config.book_levels_to_print;
362            let instrument_id = book.instrument_id;
363            let book_str = book.pprint(levels, None);
364            log_info!("\n{instrument_id}\n{book_str}", color = LogColor::Cyan);
365        }
366
367        Ok(())
368    }
369
370    fn on_book_deltas(&mut self, deltas: &OrderBookDeltas) -> anyhow::Result<()> {
371        if self.config.manage_book {
372            if let Some(book) = self.books.get_mut(&deltas.instrument_id) {
373                book.apply_deltas(deltas)?;
374
375                if self.config.log_data {
376                    let levels = self.config.book_levels_to_print;
377                    let instrument_id = deltas.instrument_id;
378                    let book_str = book.pprint(levels, None);
379                    log_info!("\n{instrument_id}\n{book_str}", color = LogColor::Cyan);
380                }
381            }
382        } else if self.config.log_data {
383            log_info!("{deltas:?}", color = LogColor::Cyan);
384        }
385        Ok(())
386    }
387
388    fn on_quote(&mut self, quote: &QuoteTick) -> anyhow::Result<()> {
389        if self.config.log_data {
390            log_info!("{quote:?}", color = LogColor::Cyan);
391        }
392        Ok(())
393    }
394
395    fn on_trade(&mut self, trade: &TradeTick) -> anyhow::Result<()> {
396        if self.config.log_data {
397            log_info!("{trade:?}", color = LogColor::Cyan);
398        }
399        Ok(())
400    }
401
402    fn on_bar(&mut self, bar: &Bar) -> anyhow::Result<()> {
403        if self.config.log_data {
404            log_info!("{bar:?}", color = LogColor::Cyan);
405        }
406        Ok(())
407    }
408
409    fn on_mark_price(&mut self, mark_price: &MarkPriceUpdate) -> anyhow::Result<()> {
410        if self.config.log_data {
411            log_info!("{mark_price:?}", color = LogColor::Cyan);
412        }
413        Ok(())
414    }
415
416    fn on_index_price(&mut self, index_price: &IndexPriceUpdate) -> anyhow::Result<()> {
417        if self.config.log_data {
418            log_info!("{index_price:?}", color = LogColor::Cyan);
419        }
420        Ok(())
421    }
422
423    fn on_funding_rate(&mut self, funding_rate: &FundingRateUpdate) -> anyhow::Result<()> {
424        if self.config.log_data {
425            log_info!("{funding_rate:?}", color = LogColor::Cyan);
426        }
427        Ok(())
428    }
429
430    fn on_instrument_status(&mut self, data: &InstrumentStatus) -> anyhow::Result<()> {
431        if self.config.log_data {
432            log_info!("{data:?}", color = LogColor::Cyan);
433        }
434        Ok(())
435    }
436
437    fn on_instrument_close(&mut self, update: &InstrumentClose) -> anyhow::Result<()> {
438        if self.config.log_data {
439            log_info!("{update:?}", color = LogColor::Cyan);
440        }
441        Ok(())
442    }
443
444    fn on_option_greeks(&mut self, greeks: &OptionGreeks) -> anyhow::Result<()> {
445        if self.config.log_data {
446            log_info!("{greeks:?}", color = LogColor::Cyan);
447        }
448        Ok(())
449    }
450
451    fn on_historical_trades(&mut self, trades: &[TradeTick]) -> anyhow::Result<()> {
452        if self.config.log_data {
453            log_info!(
454                "Received {} historical trades",
455                trades.len(),
456                color = LogColor::Cyan
457            );
458
459            for trade in trades.iter().take(5) {
460                log_info!("  {trade:?}", color = LogColor::Cyan);
461            }
462
463            if trades.len() > 5 {
464                log_info!(
465                    "  ... and {} more trades",
466                    trades.len() - 5,
467                    color = LogColor::Cyan
468                );
469            }
470        }
471        Ok(())
472    }
473
474    fn on_historical_quotes(&mut self, quotes: &[QuoteTick]) -> anyhow::Result<()> {
475        if self.config.log_data {
476            log_info!(
477                "Received {} historical quotes",
478                quotes.len(),
479                color = LogColor::Cyan
480            );
481
482            for quote in quotes.iter().take(5) {
483                log_info!("  {quote:?}", color = LogColor::Cyan);
484            }
485
486            if quotes.len() > 5 {
487                log_info!(
488                    "  ... and {} more quotes",
489                    quotes.len() - 5,
490                    color = LogColor::Cyan
491                );
492            }
493        }
494        Ok(())
495    }
496
497    fn on_historical_funding_rates(
498        &mut self,
499        funding_rates: &[FundingRateUpdate],
500    ) -> anyhow::Result<()> {
501        if self.config.log_data {
502            log_info!(
503                "Received {} historical funding rates",
504                funding_rates.len(),
505                color = LogColor::Cyan
506            );
507
508            for rate in funding_rates.iter().take(5) {
509                log_info!("  {rate:?}", color = LogColor::Cyan);
510            }
511
512            if funding_rates.len() > 5 {
513                log_info!(
514                    "  ... and {} more funding rates",
515                    funding_rates.len() - 5,
516                    color = LogColor::Cyan
517                );
518            }
519        }
520        Ok(())
521    }
522
523    fn on_historical_bars(&mut self, bars: &[Bar]) -> anyhow::Result<()> {
524        if self.config.log_data {
525            log_info!(
526                "Received {} historical bars",
527                bars.len(),
528                color = LogColor::Cyan
529            );
530
531            for bar in bars.iter().take(5) {
532                log_info!("  {bar:?}", color = LogColor::Cyan);
533            }
534
535            if bars.len() > 5 {
536                log_info!(
537                    "  ... and {} more bars",
538                    bars.len() - 5,
539                    color = LogColor::Cyan
540                );
541            }
542        }
543        Ok(())
544    }
545}
546
547impl DataTester {
548    /// Creates a new [`DataTester`] instance.
549    #[must_use]
550    pub fn new(config: DataTesterConfig) -> Self {
551        Self {
552            core: DataActorCore::new(config.base.clone()),
553            config,
554            books: AHashMap::new(),
555        }
556    }
557}