1use std::time::Duration;
17
18use ahash::{AHashMap, AHashSet};
19use chrono::Duration as ChronoDuration;
20use nautilus_common::{
21 actor::{DataActor, DataActorCore},
22 enums::LogColor,
23 log_info, nautilus_actor,
24 timer::TimeEvent,
25};
26use nautilus_model::{
27 data::{
28 Bar, FundingRateUpdate, IndexPriceUpdate, InstrumentClose, InstrumentStatus,
29 MarkPriceUpdate, OrderBookDeltas, QuoteTick, TradeTick, option_chain::OptionGreeks,
30 },
31 identifiers::InstrumentId,
32 instruments::InstrumentAny,
33 orderbook::OrderBook,
34};
35
36use super::config::DataTesterConfig;
37
38#[derive(Debug)]
47pub struct DataTester {
48 pub(super) core: DataActorCore,
49 pub(super) config: DataTesterConfig,
50 pub(super) books: AHashMap<InstrumentId, OrderBook>,
51}
52
53nautilus_actor!(DataTester);
54
55impl DataActor for DataTester {
56 fn on_start(&mut self) -> anyhow::Result<()> {
57 let instrument_ids = self.config.instrument_ids.clone();
58 let client_id = self.config.client_id;
59 let subscribe_params = self.config.subscribe_params.clone();
60 let request_params = self.config.request_params.clone();
61 let stats_interval_secs = self.config.stats_interval_secs;
62
63 if self.config.request_instruments {
65 let mut venues = AHashSet::new();
66 for instrument_id in &instrument_ids {
67 venues.insert(instrument_id.venue);
68 }
69
70 for venue in venues {
71 let _ = self.request_instruments(
72 Some(venue),
73 None,
74 None,
75 client_id,
76 request_params.clone(),
77 );
78 }
79 }
80
81 for instrument_id in instrument_ids {
83 if self.config.subscribe_instrument {
84 self.subscribe_instrument(instrument_id, client_id, subscribe_params.clone());
85 }
86
87 if self.config.subscribe_book_deltas {
88 self.subscribe_book_deltas(
89 instrument_id,
90 self.config.book_type,
91 None,
92 client_id,
93 self.config.manage_book,
94 subscribe_params.clone(),
95 );
96
97 if self.config.manage_book {
98 let book = OrderBook::new(instrument_id, self.config.book_type);
99 self.books.insert(instrument_id, book);
100 }
101 }
102
103 if self.config.subscribe_book_at_interval {
104 self.subscribe_book_at_interval(
105 instrument_id,
106 self.config.book_type,
107 self.config.book_depth,
108 self.config.book_interval_ms,
109 client_id,
110 subscribe_params.clone(),
111 );
112 }
113
114 if self.config.subscribe_quotes {
126 self.subscribe_quotes(instrument_id, client_id, subscribe_params.clone());
127 }
128
129 if self.config.subscribe_trades {
130 self.subscribe_trades(instrument_id, client_id, subscribe_params.clone());
131 }
132
133 if self.config.subscribe_mark_prices {
134 self.subscribe_mark_prices(instrument_id, client_id, subscribe_params.clone());
135 }
136
137 if self.config.subscribe_index_prices {
138 self.subscribe_index_prices(instrument_id, client_id, subscribe_params.clone());
139 }
140
141 if self.config.subscribe_funding_rates {
142 self.subscribe_funding_rates(instrument_id, client_id, subscribe_params.clone());
143 }
144
145 if self.config.subscribe_instrument_status {
146 self.subscribe_instrument_status(
147 instrument_id,
148 client_id,
149 subscribe_params.clone(),
150 );
151 }
152
153 if self.config.subscribe_instrument_close {
154 self.subscribe_instrument_close(instrument_id, client_id, subscribe_params.clone());
155 }
156
157 if self.config.subscribe_option_greeks {
158 self.subscribe_option_greeks(instrument_id, client_id, subscribe_params.clone());
159 }
160
161 if self.config.request_quotes {
163 let start = self.clock().utc_now() - ChronoDuration::hours(1);
164
165 if let Err(e) = self.request_quotes(
166 instrument_id,
167 Some(start),
168 None,
169 None,
170 client_id,
171 request_params.clone(),
172 ) {
173 log::error!("Failed to request quotes for {instrument_id}: {e}");
174 }
175 }
176
177 if self.config.request_book_snapshot {
179 let _ = self.request_book_snapshot(
180 instrument_id,
181 self.config.book_depth,
182 client_id,
183 request_params.clone(),
184 );
185 }
186
187 if self.config.request_trades {
191 let start = self.clock().utc_now() - ChronoDuration::hours(1);
192
193 if let Err(e) = self.request_trades(
194 instrument_id,
195 Some(start),
196 None,
197 None,
198 client_id,
199 request_params.clone(),
200 ) {
201 log::error!("Failed to request trades for {instrument_id}: {e}");
202 }
203 }
204
205 if self.config.request_funding_rates {
207 let start = self.clock().utc_now() - ChronoDuration::days(7);
208
209 if let Err(e) = self.request_funding_rates(
210 instrument_id,
211 Some(start),
212 None,
213 None,
214 client_id,
215 request_params.clone(),
216 ) {
217 log::error!("Failed to request funding rates for {instrument_id}: {e}");
218 }
219 }
220 }
221
222 if let Some(bar_types) = self.config.bar_types.clone() {
224 for bar_type in bar_types {
225 if self.config.subscribe_bars {
226 self.subscribe_bars(bar_type, client_id, subscribe_params.clone());
227 }
228
229 if self.config.request_bars {
231 let start = self.clock().utc_now() - ChronoDuration::hours(1);
232
233 if let Err(e) = self.request_bars(
234 bar_type,
235 Some(start),
236 None,
237 None,
238 client_id,
239 request_params.clone(),
240 ) {
241 log::error!("Failed to request bars for {bar_type}: {e}");
242 }
243 }
244 }
245 }
246
247 if stats_interval_secs > 0 {
249 self.clock().set_timer(
250 "STATS-TIMER",
251 Duration::from_secs(stats_interval_secs),
252 None,
253 None,
254 None,
255 Some(true),
256 Some(false),
257 )?;
258 }
259
260 Ok(())
261 }
262
263 fn on_stop(&mut self) -> anyhow::Result<()> {
264 if !self.config.can_unsubscribe {
265 return Ok(());
266 }
267
268 let instrument_ids = self.config.instrument_ids.clone();
269 let client_id = self.config.client_id;
270 let subscribe_params = self.config.subscribe_params.clone();
271
272 for instrument_id in instrument_ids {
273 if self.config.subscribe_instrument {
274 self.unsubscribe_instrument(instrument_id, client_id, subscribe_params.clone());
275 }
276
277 if self.config.subscribe_book_deltas {
278 self.unsubscribe_book_deltas(instrument_id, client_id, subscribe_params.clone());
279 }
280
281 if self.config.subscribe_book_at_interval {
282 self.unsubscribe_book_at_interval(
283 instrument_id,
284 self.config.book_interval_ms,
285 client_id,
286 subscribe_params.clone(),
287 );
288 }
289
290 if self.config.subscribe_quotes {
296 self.unsubscribe_quotes(instrument_id, client_id, subscribe_params.clone());
297 }
298
299 if self.config.subscribe_trades {
300 self.unsubscribe_trades(instrument_id, client_id, subscribe_params.clone());
301 }
302
303 if self.config.subscribe_mark_prices {
304 self.unsubscribe_mark_prices(instrument_id, client_id, subscribe_params.clone());
305 }
306
307 if self.config.subscribe_index_prices {
308 self.unsubscribe_index_prices(instrument_id, client_id, subscribe_params.clone());
309 }
310
311 if self.config.subscribe_funding_rates {
312 self.unsubscribe_funding_rates(instrument_id, client_id, subscribe_params.clone());
313 }
314
315 if self.config.subscribe_instrument_status {
316 self.unsubscribe_instrument_status(
317 instrument_id,
318 client_id,
319 subscribe_params.clone(),
320 );
321 }
322
323 if self.config.subscribe_instrument_close {
324 self.unsubscribe_instrument_close(
325 instrument_id,
326 client_id,
327 subscribe_params.clone(),
328 );
329 }
330
331 if self.config.subscribe_option_greeks {
332 self.unsubscribe_option_greeks(instrument_id, client_id, subscribe_params.clone());
333 }
334 }
335
336 if let Some(bar_types) = self.config.bar_types.clone() {
337 for bar_type in bar_types {
338 if self.config.subscribe_bars {
339 self.unsubscribe_bars(bar_type, client_id, subscribe_params.clone());
340 }
341 }
342 }
343
344 Ok(())
345 }
346
347 fn on_time_event(&mut self, _event: &TimeEvent) -> anyhow::Result<()> {
348 Ok(())
350 }
351
352 fn on_instrument(&mut self, instrument: &InstrumentAny) -> anyhow::Result<()> {
353 if self.config.log_data {
354 log_info!("{instrument:?}", color = LogColor::Cyan);
355 }
356 Ok(())
357 }
358
359 fn on_book(&mut self, book: &OrderBook) -> anyhow::Result<()> {
360 if self.config.log_data {
361 let levels = self.config.book_levels_to_print;
362 let instrument_id = book.instrument_id;
363 let book_str = book.pprint(levels, None);
364 log_info!("\n{instrument_id}\n{book_str}", color = LogColor::Cyan);
365 }
366
367 Ok(())
368 }
369
370 fn on_book_deltas(&mut self, deltas: &OrderBookDeltas) -> anyhow::Result<()> {
371 if self.config.manage_book {
372 if let Some(book) = self.books.get_mut(&deltas.instrument_id) {
373 book.apply_deltas(deltas)?;
374
375 if self.config.log_data {
376 let levels = self.config.book_levels_to_print;
377 let instrument_id = deltas.instrument_id;
378 let book_str = book.pprint(levels, None);
379 log_info!("\n{instrument_id}\n{book_str}", color = LogColor::Cyan);
380 }
381 }
382 } else if self.config.log_data {
383 log_info!("{deltas:?}", color = LogColor::Cyan);
384 }
385 Ok(())
386 }
387
388 fn on_quote(&mut self, quote: &QuoteTick) -> anyhow::Result<()> {
389 if self.config.log_data {
390 log_info!("{quote:?}", color = LogColor::Cyan);
391 }
392 Ok(())
393 }
394
395 fn on_trade(&mut self, trade: &TradeTick) -> anyhow::Result<()> {
396 if self.config.log_data {
397 log_info!("{trade:?}", color = LogColor::Cyan);
398 }
399 Ok(())
400 }
401
402 fn on_bar(&mut self, bar: &Bar) -> anyhow::Result<()> {
403 if self.config.log_data {
404 log_info!("{bar:?}", color = LogColor::Cyan);
405 }
406 Ok(())
407 }
408
409 fn on_mark_price(&mut self, mark_price: &MarkPriceUpdate) -> anyhow::Result<()> {
410 if self.config.log_data {
411 log_info!("{mark_price:?}", color = LogColor::Cyan);
412 }
413 Ok(())
414 }
415
416 fn on_index_price(&mut self, index_price: &IndexPriceUpdate) -> anyhow::Result<()> {
417 if self.config.log_data {
418 log_info!("{index_price:?}", color = LogColor::Cyan);
419 }
420 Ok(())
421 }
422
423 fn on_funding_rate(&mut self, funding_rate: &FundingRateUpdate) -> anyhow::Result<()> {
424 if self.config.log_data {
425 log_info!("{funding_rate:?}", color = LogColor::Cyan);
426 }
427 Ok(())
428 }
429
430 fn on_instrument_status(&mut self, data: &InstrumentStatus) -> anyhow::Result<()> {
431 if self.config.log_data {
432 log_info!("{data:?}", color = LogColor::Cyan);
433 }
434 Ok(())
435 }
436
437 fn on_instrument_close(&mut self, update: &InstrumentClose) -> anyhow::Result<()> {
438 if self.config.log_data {
439 log_info!("{update:?}", color = LogColor::Cyan);
440 }
441 Ok(())
442 }
443
444 fn on_option_greeks(&mut self, greeks: &OptionGreeks) -> anyhow::Result<()> {
445 if self.config.log_data {
446 log_info!("{greeks:?}", color = LogColor::Cyan);
447 }
448 Ok(())
449 }
450
451 fn on_historical_trades(&mut self, trades: &[TradeTick]) -> anyhow::Result<()> {
452 if self.config.log_data {
453 log_info!(
454 "Received {} historical trades",
455 trades.len(),
456 color = LogColor::Cyan
457 );
458
459 for trade in trades.iter().take(5) {
460 log_info!(" {trade:?}", color = LogColor::Cyan);
461 }
462
463 if trades.len() > 5 {
464 log_info!(
465 " ... and {} more trades",
466 trades.len() - 5,
467 color = LogColor::Cyan
468 );
469 }
470 }
471 Ok(())
472 }
473
474 fn on_historical_quotes(&mut self, quotes: &[QuoteTick]) -> anyhow::Result<()> {
475 if self.config.log_data {
476 log_info!(
477 "Received {} historical quotes",
478 quotes.len(),
479 color = LogColor::Cyan
480 );
481
482 for quote in quotes.iter().take(5) {
483 log_info!(" {quote:?}", color = LogColor::Cyan);
484 }
485
486 if quotes.len() > 5 {
487 log_info!(
488 " ... and {} more quotes",
489 quotes.len() - 5,
490 color = LogColor::Cyan
491 );
492 }
493 }
494 Ok(())
495 }
496
497 fn on_historical_funding_rates(
498 &mut self,
499 funding_rates: &[FundingRateUpdate],
500 ) -> anyhow::Result<()> {
501 if self.config.log_data {
502 log_info!(
503 "Received {} historical funding rates",
504 funding_rates.len(),
505 color = LogColor::Cyan
506 );
507
508 for rate in funding_rates.iter().take(5) {
509 log_info!(" {rate:?}", color = LogColor::Cyan);
510 }
511
512 if funding_rates.len() > 5 {
513 log_info!(
514 " ... and {} more funding rates",
515 funding_rates.len() - 5,
516 color = LogColor::Cyan
517 );
518 }
519 }
520 Ok(())
521 }
522
523 fn on_historical_bars(&mut self, bars: &[Bar]) -> anyhow::Result<()> {
524 if self.config.log_data {
525 log_info!(
526 "Received {} historical bars",
527 bars.len(),
528 color = LogColor::Cyan
529 );
530
531 for bar in bars.iter().take(5) {
532 log_info!(" {bar:?}", color = LogColor::Cyan);
533 }
534
535 if bars.len() > 5 {
536 log_info!(
537 " ... and {} more bars",
538 bars.len() - 5,
539 color = LogColor::Cyan
540 );
541 }
542 }
543 Ok(())
544 }
545}
546
547impl DataTester {
548 #[must_use]
550 pub fn new(config: DataTesterConfig) -> Self {
551 Self {
552 core: DataActorCore::new(config.base.clone()),
553 config,
554 books: AHashMap::new(),
555 }
556 }
557}