1use std::time::Duration;
17
18use ahash::{AHashMap, AHashSet};
19use chrono::Duration as ChronoDuration;
20use nautilus_common::{
21 actor::{DataActor, DataActorCore},
22 enums::LogColor,
23 log_info, nautilus_actor,
24 timer::TimeEvent,
25};
26use nautilus_model::{
27 data::{
28 Bar, FundingRateUpdate, IndexPriceUpdate, InstrumentClose, InstrumentStatus,
29 MarkPriceUpdate, OrderBookDeltas, QuoteTick, TradeTick, option_chain::OptionGreeks,
30 },
31 identifiers::InstrumentId,
32 instruments::InstrumentAny,
33 orderbook::OrderBook,
34};
35
36use super::config::DataTesterConfig;
37
38#[derive(Debug)]
47pub struct DataTester {
48 pub(super) core: DataActorCore,
49 pub(super) config: DataTesterConfig,
50 pub(super) books: AHashMap<InstrumentId, OrderBook>,
51}
52
53nautilus_actor!(DataTester);
54
55impl DataActor for DataTester {
56 fn on_start(&mut self) -> anyhow::Result<()> {
57 let instrument_ids = self.config.instrument_ids.clone();
58 let client_id = self.config.client_id;
59 let subscribe_params = self.config.subscribe_params.clone();
60 let request_params = self.config.request_params.clone();
61 let stats_interval_secs = self.config.stats_interval_secs;
62
63 if self.config.request_instruments {
65 let mut venues = AHashSet::new();
66 for instrument_id in &instrument_ids {
67 venues.insert(instrument_id.venue);
68 }
69
70 for venue in venues {
71 let _ = self.request_instruments(
72 Some(venue),
73 None,
74 None,
75 client_id,
76 request_params.clone(),
77 );
78 }
79 }
80
81 for instrument_id in instrument_ids {
83 if self.config.subscribe_instrument {
84 self.subscribe_instrument(instrument_id, client_id, subscribe_params.clone());
85 }
86
87 if self.config.subscribe_book_deltas {
88 self.subscribe_book_deltas(
89 instrument_id,
90 self.config.book_type,
91 None,
92 client_id,
93 self.config.manage_book,
94 subscribe_params.clone(),
95 );
96
97 if self.config.manage_book {
98 let book = OrderBook::new(instrument_id, self.config.book_type);
99 self.books.insert(instrument_id, book);
100 }
101 }
102
103 if self.config.subscribe_book_at_interval {
104 self.subscribe_book_at_interval(
105 instrument_id,
106 self.config.book_type,
107 self.config.book_depth,
108 self.config.book_interval_ms,
109 client_id,
110 subscribe_params.clone(),
111 );
112 }
113
114 if self.config.subscribe_quotes {
126 self.subscribe_quotes(instrument_id, client_id, subscribe_params.clone());
127 }
128
129 if self.config.subscribe_trades {
130 self.subscribe_trades(instrument_id, client_id, subscribe_params.clone());
131 }
132
133 if self.config.subscribe_mark_prices {
134 self.subscribe_mark_prices(instrument_id, client_id, subscribe_params.clone());
135 }
136
137 if self.config.subscribe_index_prices {
138 self.subscribe_index_prices(instrument_id, client_id, subscribe_params.clone());
139 }
140
141 if self.config.subscribe_funding_rates {
142 self.subscribe_funding_rates(instrument_id, client_id, subscribe_params.clone());
143 }
144
145 if self.config.subscribe_instrument_status {
146 self.subscribe_instrument_status(
147 instrument_id,
148 client_id,
149 subscribe_params.clone(),
150 );
151 }
152
153 if self.config.subscribe_instrument_close {
154 self.subscribe_instrument_close(instrument_id, client_id, subscribe_params.clone());
155 }
156
157 if self.config.subscribe_option_greeks {
158 self.subscribe_option_greeks(instrument_id, client_id, subscribe_params.clone());
159 }
160
161 if self.config.request_book_snapshot {
168 let _ = self.request_book_snapshot(
169 instrument_id,
170 self.config.book_depth,
171 client_id,
172 request_params.clone(),
173 );
174 }
175
176 if self.config.request_trades {
180 let start = self.clock().utc_now() - ChronoDuration::hours(1);
181
182 if let Err(e) = self.request_trades(
183 instrument_id,
184 Some(start),
185 None,
186 None,
187 client_id,
188 request_params.clone(),
189 ) {
190 log::error!("Failed to request trades for {instrument_id}: {e}");
191 }
192 }
193
194 if self.config.request_funding_rates {
196 let start = self.clock().utc_now() - ChronoDuration::days(7);
197
198 if let Err(e) = self.request_funding_rates(
199 instrument_id,
200 Some(start),
201 None,
202 None,
203 client_id,
204 request_params.clone(),
205 ) {
206 log::error!("Failed to request funding rates for {instrument_id}: {e}");
207 }
208 }
209 }
210
211 if let Some(bar_types) = self.config.bar_types.clone() {
213 for bar_type in bar_types {
214 if self.config.subscribe_bars {
215 self.subscribe_bars(bar_type, client_id, subscribe_params.clone());
216 }
217
218 if self.config.request_bars {
220 let start = self.clock().utc_now() - ChronoDuration::hours(1);
221
222 if let Err(e) = self.request_bars(
223 bar_type,
224 Some(start),
225 None,
226 None,
227 client_id,
228 request_params.clone(),
229 ) {
230 log::error!("Failed to request bars for {bar_type}: {e}");
231 }
232 }
233 }
234 }
235
236 if stats_interval_secs > 0 {
238 self.clock().set_timer(
239 "STATS-TIMER",
240 Duration::from_secs(stats_interval_secs),
241 None,
242 None,
243 None,
244 Some(true),
245 Some(false),
246 )?;
247 }
248
249 Ok(())
250 }
251
252 fn on_stop(&mut self) -> anyhow::Result<()> {
253 if !self.config.can_unsubscribe {
254 return Ok(());
255 }
256
257 let instrument_ids = self.config.instrument_ids.clone();
258 let client_id = self.config.client_id;
259 let subscribe_params = self.config.subscribe_params.clone();
260
261 for instrument_id in instrument_ids {
262 if self.config.subscribe_instrument {
263 self.unsubscribe_instrument(instrument_id, client_id, subscribe_params.clone());
264 }
265
266 if self.config.subscribe_book_deltas {
267 self.unsubscribe_book_deltas(instrument_id, client_id, subscribe_params.clone());
268 }
269
270 if self.config.subscribe_book_at_interval {
271 self.unsubscribe_book_at_interval(
272 instrument_id,
273 self.config.book_interval_ms,
274 client_id,
275 subscribe_params.clone(),
276 );
277 }
278
279 if self.config.subscribe_quotes {
285 self.unsubscribe_quotes(instrument_id, client_id, subscribe_params.clone());
286 }
287
288 if self.config.subscribe_trades {
289 self.unsubscribe_trades(instrument_id, client_id, subscribe_params.clone());
290 }
291
292 if self.config.subscribe_mark_prices {
293 self.unsubscribe_mark_prices(instrument_id, client_id, subscribe_params.clone());
294 }
295
296 if self.config.subscribe_index_prices {
297 self.unsubscribe_index_prices(instrument_id, client_id, subscribe_params.clone());
298 }
299
300 if self.config.subscribe_funding_rates {
301 self.unsubscribe_funding_rates(instrument_id, client_id, subscribe_params.clone());
302 }
303
304 if self.config.subscribe_instrument_status {
305 self.unsubscribe_instrument_status(
306 instrument_id,
307 client_id,
308 subscribe_params.clone(),
309 );
310 }
311
312 if self.config.subscribe_instrument_close {
313 self.unsubscribe_instrument_close(
314 instrument_id,
315 client_id,
316 subscribe_params.clone(),
317 );
318 }
319
320 if self.config.subscribe_option_greeks {
321 self.unsubscribe_option_greeks(instrument_id, client_id, subscribe_params.clone());
322 }
323 }
324
325 if let Some(bar_types) = self.config.bar_types.clone() {
326 for bar_type in bar_types {
327 if self.config.subscribe_bars {
328 self.unsubscribe_bars(bar_type, client_id, subscribe_params.clone());
329 }
330 }
331 }
332
333 Ok(())
334 }
335
336 fn on_time_event(&mut self, _event: &TimeEvent) -> anyhow::Result<()> {
337 Ok(())
339 }
340
341 fn on_instrument(&mut self, instrument: &InstrumentAny) -> anyhow::Result<()> {
342 if self.config.log_data {
343 log_info!("{instrument:?}", color = LogColor::Cyan);
344 }
345 Ok(())
346 }
347
348 fn on_book(&mut self, book: &OrderBook) -> anyhow::Result<()> {
349 if self.config.log_data {
350 let levels = self.config.book_levels_to_print;
351 let instrument_id = book.instrument_id;
352 let book_str = book.pprint(levels, None);
353 log_info!("\n{instrument_id}\n{book_str}", color = LogColor::Cyan);
354 }
355
356 Ok(())
357 }
358
359 fn on_book_deltas(&mut self, deltas: &OrderBookDeltas) -> anyhow::Result<()> {
360 if self.config.manage_book {
361 if let Some(book) = self.books.get_mut(&deltas.instrument_id) {
362 book.apply_deltas(deltas)?;
363
364 if self.config.log_data {
365 let levels = self.config.book_levels_to_print;
366 let instrument_id = deltas.instrument_id;
367 let book_str = book.pprint(levels, None);
368 log_info!("\n{instrument_id}\n{book_str}", color = LogColor::Cyan);
369 }
370 }
371 } else if self.config.log_data {
372 log_info!("{deltas:?}", color = LogColor::Cyan);
373 }
374 Ok(())
375 }
376
377 fn on_quote(&mut self, quote: &QuoteTick) -> anyhow::Result<()> {
378 if self.config.log_data {
379 log_info!("{quote:?}", color = LogColor::Cyan);
380 }
381 Ok(())
382 }
383
384 fn on_trade(&mut self, trade: &TradeTick) -> anyhow::Result<()> {
385 if self.config.log_data {
386 log_info!("{trade:?}", color = LogColor::Cyan);
387 }
388 Ok(())
389 }
390
391 fn on_bar(&mut self, bar: &Bar) -> anyhow::Result<()> {
392 if self.config.log_data {
393 log_info!("{bar:?}", color = LogColor::Cyan);
394 }
395 Ok(())
396 }
397
398 fn on_mark_price(&mut self, mark_price: &MarkPriceUpdate) -> anyhow::Result<()> {
399 if self.config.log_data {
400 log_info!("{mark_price:?}", color = LogColor::Cyan);
401 }
402 Ok(())
403 }
404
405 fn on_index_price(&mut self, index_price: &IndexPriceUpdate) -> anyhow::Result<()> {
406 if self.config.log_data {
407 log_info!("{index_price:?}", color = LogColor::Cyan);
408 }
409 Ok(())
410 }
411
412 fn on_funding_rate(&mut self, funding_rate: &FundingRateUpdate) -> anyhow::Result<()> {
413 if self.config.log_data {
414 log_info!("{funding_rate:?}", color = LogColor::Cyan);
415 }
416 Ok(())
417 }
418
419 fn on_instrument_status(&mut self, data: &InstrumentStatus) -> anyhow::Result<()> {
420 if self.config.log_data {
421 log_info!("{data:?}", color = LogColor::Cyan);
422 }
423 Ok(())
424 }
425
426 fn on_instrument_close(&mut self, update: &InstrumentClose) -> anyhow::Result<()> {
427 if self.config.log_data {
428 log_info!("{update:?}", color = LogColor::Cyan);
429 }
430 Ok(())
431 }
432
433 fn on_option_greeks(&mut self, greeks: &OptionGreeks) -> anyhow::Result<()> {
434 if self.config.log_data {
435 log_info!("{greeks:?}", color = LogColor::Cyan);
436 }
437 Ok(())
438 }
439
440 fn on_historical_trades(&mut self, trades: &[TradeTick]) -> anyhow::Result<()> {
441 if self.config.log_data {
442 log_info!(
443 "Received {} historical trades",
444 trades.len(),
445 color = LogColor::Cyan
446 );
447 for trade in trades.iter().take(5) {
448 log_info!(" {trade:?}", color = LogColor::Cyan);
449 }
450
451 if trades.len() > 5 {
452 log_info!(
453 " ... and {} more trades",
454 trades.len() - 5,
455 color = LogColor::Cyan
456 );
457 }
458 }
459 Ok(())
460 }
461
462 fn on_historical_funding_rates(
463 &mut self,
464 funding_rates: &[FundingRateUpdate],
465 ) -> anyhow::Result<()> {
466 if self.config.log_data {
467 log_info!(
468 "Received {} historical funding rates",
469 funding_rates.len(),
470 color = LogColor::Cyan
471 );
472 for rate in funding_rates.iter().take(5) {
473 log_info!(" {rate:?}", color = LogColor::Cyan);
474 }
475
476 if funding_rates.len() > 5 {
477 log_info!(
478 " ... and {} more funding rates",
479 funding_rates.len() - 5,
480 color = LogColor::Cyan
481 );
482 }
483 }
484 Ok(())
485 }
486
487 fn on_historical_bars(&mut self, bars: &[Bar]) -> anyhow::Result<()> {
488 if self.config.log_data {
489 log_info!(
490 "Received {} historical bars",
491 bars.len(),
492 color = LogColor::Cyan
493 );
494 for bar in bars.iter().take(5) {
495 log_info!(" {bar:?}", color = LogColor::Cyan);
496 }
497
498 if bars.len() > 5 {
499 log_info!(
500 " ... and {} more bars",
501 bars.len() - 5,
502 color = LogColor::Cyan
503 );
504 }
505 }
506 Ok(())
507 }
508}
509
510impl DataTester {
511 #[must_use]
513 pub fn new(config: DataTesterConfig) -> Self {
514 Self {
515 core: DataActorCore::new(config.base.clone()),
516 config,
517 books: AHashMap::new(),
518 }
519 }
520}