1use std::{
19 num::NonZeroUsize,
20 ops::{Deref, DerefMut},
21 time::Duration,
22};
23
24use ahash::{AHashMap, AHashSet};
25use chrono::Duration as ChronoDuration;
26use nautilus_common::{
27 actor::{DataActor, DataActorConfig, DataActorCore},
28 enums::LogColor,
29 log_info,
30 timer::TimeEvent,
31};
32use nautilus_core::Params;
33use nautilus_model::{
34 data::{
35 Bar, FundingRateUpdate, IndexPriceUpdate, InstrumentClose, InstrumentStatus,
36 MarkPriceUpdate, OrderBookDeltas, QuoteTick, TradeTick, bar::BarType,
37 },
38 enums::BookType,
39 identifiers::{ClientId, InstrumentId},
40 instruments::InstrumentAny,
41 orderbook::OrderBook,
42};
43
/// Configuration for a [`DataTester`] actor.
///
/// The `subscribe_*` flags select which subscriptions are issued in `on_start`
/// (and reversed in `on_stop`); the `request_*` flags select one-off requests
/// issued in `on_start`.
#[derive(Debug, Clone)]
pub struct DataTesterConfig {
    /// Base actor configuration; cloned into the actor's `DataActorCore` on construction.
    pub base: DataActorConfig,
    /// Instruments that per-instrument subscriptions and requests are issued for.
    pub instrument_ids: Vec<InstrumentId>,
    /// Client ID passed to every subscribe/unsubscribe/request call (`None` is passed through as-is).
    pub client_id: Option<ClientId>,
    /// Bar types used for bar subscriptions and bar requests; `None` means no bar activity.
    pub bar_types: Option<Vec<BarType>>,
    /// Subscribe to order book deltas for each instrument.
    pub subscribe_book_deltas: bool,
    /// Order book depth subscription flag.
    ///
    /// NOTE(review): no subscription is issued for this flag in `on_start`.
    pub subscribe_book_depth: bool,
    /// Subscribe to order book snapshots at `book_interval_ms` for each instrument.
    pub subscribe_book_at_interval: bool,
    /// Subscribe to quotes for each instrument.
    pub subscribe_quotes: bool,
    /// Subscribe to trades for each instrument.
    pub subscribe_trades: bool,
    /// Subscribe to mark prices for each instrument.
    pub subscribe_mark_prices: bool,
    /// Subscribe to index prices for each instrument.
    pub subscribe_index_prices: bool,
    /// Subscribe to funding rates for each instrument.
    pub subscribe_funding_rates: bool,
    /// Subscribe to bars for each entry of `bar_types`.
    pub subscribe_bars: bool,
    /// Subscribe to instrument updates for each instrument.
    pub subscribe_instrument: bool,
    /// Subscribe to instrument status updates for each instrument.
    pub subscribe_instrument_status: bool,
    /// Subscribe to instrument close updates for each instrument.
    pub subscribe_instrument_close: bool,
    /// Extra parameters passed to every subscribe and unsubscribe call.
    pub subscribe_params: Option<Params>,
    /// Extra parameters passed to every request call.
    pub request_params: Option<Params>,
    /// When `false`, `on_stop` returns without unsubscribing from anything.
    pub can_unsubscribe: bool,
    /// Request instruments once per distinct venue found in `instrument_ids`.
    pub request_instruments: bool,
    /// Historical quotes request flag.
    ///
    /// NOTE(review): no request is issued for this flag in `on_start`.
    pub request_quotes: bool,
    /// Request historical trades (from one hour before start) for each instrument.
    pub request_trades: bool,
    /// Request historical bars (from one hour before start) for each entry of `bar_types`.
    pub request_bars: bool,
    /// Request an order book snapshot (using `book_depth`) for each instrument.
    pub request_book_snapshot: bool,
    /// Historical book deltas request flag.
    ///
    /// NOTE(review): no request is issued for this flag in `on_start`.
    pub request_book_deltas: bool,
    /// Request historical funding rates (from seven days before start) for each instrument.
    pub request_funding_rates: bool,
    /// Book type used for book subscriptions and for locally managed books.
    pub book_type: BookType,
    /// Depth passed to interval book subscriptions and book snapshot requests.
    pub book_depth: Option<NonZeroUsize>,
    /// Interval passed to interval book subscriptions (milliseconds, per the field name).
    pub book_interval_ms: NonZeroUsize,
    /// Number of levels passed to `OrderBook::pprint` when logging a book.
    pub book_levels_to_print: usize,
    /// When `true`, a local `OrderBook` is created per instrument on delta subscription
    /// and incoming deltas are applied to it; when `false`, raw deltas are logged.
    pub manage_book: bool,
    /// When `true`, received data is logged by the handlers.
    pub log_data: bool,
    /// Interval in seconds of the `STATS-TIMER`; `0` means no timer is set.
    pub stats_interval_secs: u64,
}
119
120impl DataTesterConfig {
121 #[must_use]
127 pub fn new(client_id: ClientId, instrument_ids: Vec<InstrumentId>) -> Self {
128 Self {
129 base: DataActorConfig::default(),
130 instrument_ids,
131 client_id: Some(client_id),
132 bar_types: None,
133 subscribe_book_deltas: false,
134 subscribe_book_depth: false,
135 subscribe_book_at_interval: false,
136 subscribe_quotes: false,
137 subscribe_trades: false,
138 subscribe_mark_prices: false,
139 subscribe_index_prices: false,
140 subscribe_funding_rates: false,
141 subscribe_bars: false,
142
143 subscribe_instrument: false,
144 subscribe_instrument_status: false,
145 subscribe_instrument_close: false,
146 subscribe_params: None,
147 request_params: None,
148 can_unsubscribe: true,
149 request_instruments: false,
150 request_quotes: false,
151 request_trades: false,
152 request_bars: false,
153 request_book_snapshot: false,
154 request_book_deltas: false,
155 request_funding_rates: false,
156 book_type: BookType::L2_MBP,
157 book_depth: None,
158 book_interval_ms: NonZeroUsize::new(1000).unwrap(),
159 book_levels_to_print: 10,
160 manage_book: true,
161 log_data: true,
162 stats_interval_secs: 5,
163 }
164 }
165
166 #[must_use]
167 pub fn with_log_data(mut self, log_data: bool) -> Self {
168 self.log_data = log_data;
169 self
170 }
171
172 #[must_use]
173 pub fn with_subscribe_book_deltas(mut self, subscribe: bool) -> Self {
174 self.subscribe_book_deltas = subscribe;
175 self
176 }
177
178 #[must_use]
179 pub fn with_subscribe_book_depth(mut self, subscribe: bool) -> Self {
180 self.subscribe_book_depth = subscribe;
181 self
182 }
183
184 #[must_use]
185 pub fn with_subscribe_book_at_interval(mut self, subscribe: bool) -> Self {
186 self.subscribe_book_at_interval = subscribe;
187 self
188 }
189
190 #[must_use]
191 pub fn with_subscribe_quotes(mut self, subscribe: bool) -> Self {
192 self.subscribe_quotes = subscribe;
193 self
194 }
195
196 #[must_use]
197 pub fn with_subscribe_trades(mut self, subscribe: bool) -> Self {
198 self.subscribe_trades = subscribe;
199 self
200 }
201
202 #[must_use]
203 pub fn with_subscribe_mark_prices(mut self, subscribe: bool) -> Self {
204 self.subscribe_mark_prices = subscribe;
205 self
206 }
207
208 #[must_use]
209 pub fn with_subscribe_index_prices(mut self, subscribe: bool) -> Self {
210 self.subscribe_index_prices = subscribe;
211 self
212 }
213
214 #[must_use]
215 pub fn with_subscribe_funding_rates(mut self, subscribe: bool) -> Self {
216 self.subscribe_funding_rates = subscribe;
217 self
218 }
219
220 #[must_use]
221 pub fn with_subscribe_bars(mut self, subscribe: bool) -> Self {
222 self.subscribe_bars = subscribe;
223 self
224 }
225
226 #[must_use]
227 pub fn with_bar_types(mut self, bar_types: Vec<BarType>) -> Self {
228 self.bar_types = Some(bar_types);
229 self
230 }
231
232 #[must_use]
233 pub fn with_subscribe_instrument(mut self, subscribe: bool) -> Self {
234 self.subscribe_instrument = subscribe;
235 self
236 }
237
238 #[must_use]
239 pub fn with_subscribe_instrument_status(mut self, subscribe: bool) -> Self {
240 self.subscribe_instrument_status = subscribe;
241 self
242 }
243
244 #[must_use]
245 pub fn with_subscribe_instrument_close(mut self, subscribe: bool) -> Self {
246 self.subscribe_instrument_close = subscribe;
247 self
248 }
249
250 #[must_use]
251 pub fn with_book_type(mut self, book_type: BookType) -> Self {
252 self.book_type = book_type;
253 self
254 }
255
256 #[must_use]
257 pub fn with_book_depth(mut self, depth: Option<NonZeroUsize>) -> Self {
258 self.book_depth = depth;
259 self
260 }
261
262 #[must_use]
263 pub fn with_book_interval_ms(mut self, interval_ms: NonZeroUsize) -> Self {
264 self.book_interval_ms = interval_ms;
265 self
266 }
267
268 #[must_use]
269 pub fn with_manage_book(mut self, manage: bool) -> Self {
270 self.manage_book = manage;
271 self
272 }
273
274 #[must_use]
275 pub fn with_request_instruments(mut self, request: bool) -> Self {
276 self.request_instruments = request;
277 self
278 }
279
280 #[must_use]
281 pub fn with_request_book_snapshot(mut self, request: bool) -> Self {
282 self.request_book_snapshot = request;
283 self
284 }
285
286 #[must_use]
287 pub fn with_request_book_deltas(mut self, request: bool) -> Self {
288 self.request_book_deltas = request;
289 self
290 }
291
292 #[must_use]
293 pub fn with_request_trades(mut self, request: bool) -> Self {
294 self.request_trades = request;
295 self
296 }
297
298 #[must_use]
299 pub fn with_request_bars(mut self, request: bool) -> Self {
300 self.request_bars = request;
301 self
302 }
303
304 #[must_use]
305 pub fn with_request_funding_rates(mut self, request: bool) -> Self {
306 self.request_funding_rates = request;
307 self
308 }
309
310 #[must_use]
311 pub fn with_can_unsubscribe(mut self, can_unsubscribe: bool) -> Self {
312 self.can_unsubscribe = can_unsubscribe;
313 self
314 }
315
316 #[must_use]
317 pub fn with_subscribe_params(mut self, params: Option<Params>) -> Self {
318 self.subscribe_params = params;
319 self
320 }
321
322 #[must_use]
323 pub fn with_request_params(mut self, params: Option<Params>) -> Self {
324 self.request_params = params;
325 self
326 }
327
328 #[must_use]
329 pub fn with_stats_interval_secs(mut self, interval_secs: u64) -> Self {
330 self.stats_interval_secs = interval_secs;
331 self
332 }
333}
334
335impl Default for DataTesterConfig {
336 fn default() -> Self {
337 Self {
338 base: DataActorConfig::default(),
339 instrument_ids: Vec::new(),
340 client_id: None,
341 bar_types: None,
342 subscribe_book_deltas: false,
343 subscribe_book_depth: false,
344 subscribe_book_at_interval: false,
345 subscribe_quotes: false,
346 subscribe_trades: false,
347 subscribe_mark_prices: false,
348 subscribe_index_prices: false,
349 subscribe_funding_rates: false,
350 subscribe_bars: false,
351 subscribe_instrument: false,
352 subscribe_instrument_status: false,
353 subscribe_instrument_close: false,
354 subscribe_params: None,
355 request_params: None,
356 can_unsubscribe: true,
357 request_instruments: false,
358 request_quotes: false,
359 request_trades: false,
360 request_bars: false,
361 request_book_snapshot: false,
362 request_book_deltas: false,
363 request_funding_rates: false,
364 book_type: BookType::L2_MBP,
365 book_depth: None,
366 book_interval_ms: NonZeroUsize::new(1000).unwrap(),
367 book_levels_to_print: 10,
368 manage_book: false,
369 log_data: true,
370 stats_interval_secs: 5,
371 }
372 }
373}
374
/// Data actor that issues the subscriptions and requests described by its
/// [`DataTesterConfig`] and logs the data it receives.
#[derive(Debug)]
pub struct DataTester {
    /// Underlying actor core; exposed through `Deref`/`DerefMut`.
    core: DataActorCore,
    /// Settings that drive what is subscribed to, requested and logged.
    config: DataTesterConfig,
    /// Locally maintained order books keyed by instrument; filled in `on_start`
    /// when book deltas are subscribed with `manage_book` enabled.
    books: AHashMap<InstrumentId, OrderBook>,
}
389
390impl Deref for DataTester {
391 type Target = DataActorCore;
392
393 fn deref(&self) -> &Self::Target {
394 &self.core
395 }
396}
397
398impl DerefMut for DataTester {
399 fn deref_mut(&mut self) -> &mut Self::Target {
400 &mut self.core
401 }
402}
403
404impl DataActor for DataTester {
405 fn on_start(&mut self) -> anyhow::Result<()> {
406 let instrument_ids = self.config.instrument_ids.clone();
407 let client_id = self.config.client_id;
408 let subscribe_params = self.config.subscribe_params.clone();
409 let request_params = self.config.request_params.clone();
410 let stats_interval_secs = self.config.stats_interval_secs;
411
412 if self.config.request_instruments {
414 let mut venues = AHashSet::new();
415 for instrument_id in &instrument_ids {
416 venues.insert(instrument_id.venue);
417 }
418
419 for venue in venues {
420 let _ = self.request_instruments(
421 Some(venue),
422 None,
423 None,
424 client_id,
425 request_params.clone(),
426 );
427 }
428 }
429
430 for instrument_id in instrument_ids {
432 if self.config.subscribe_instrument {
433 self.subscribe_instrument(instrument_id, client_id, subscribe_params.clone());
434 }
435
436 if self.config.subscribe_book_deltas {
437 self.subscribe_book_deltas(
438 instrument_id,
439 self.config.book_type,
440 None,
441 client_id,
442 self.config.manage_book,
443 subscribe_params.clone(),
444 );
445
446 if self.config.manage_book {
447 let book = OrderBook::new(instrument_id, self.config.book_type);
448 self.books.insert(instrument_id, book);
449 }
450 }
451
452 if self.config.subscribe_book_at_interval {
453 self.subscribe_book_at_interval(
454 instrument_id,
455 self.config.book_type,
456 self.config.book_depth,
457 self.config.book_interval_ms,
458 client_id,
459 subscribe_params.clone(),
460 );
461 }
462
463 if self.config.subscribe_quotes {
475 self.subscribe_quotes(instrument_id, client_id, subscribe_params.clone());
476 }
477
478 if self.config.subscribe_trades {
479 self.subscribe_trades(instrument_id, client_id, subscribe_params.clone());
480 }
481
482 if self.config.subscribe_mark_prices {
483 self.subscribe_mark_prices(instrument_id, client_id, subscribe_params.clone());
484 }
485
486 if self.config.subscribe_index_prices {
487 self.subscribe_index_prices(instrument_id, client_id, subscribe_params.clone());
488 }
489
490 if self.config.subscribe_funding_rates {
491 self.subscribe_funding_rates(instrument_id, client_id, subscribe_params.clone());
492 }
493
494 if self.config.subscribe_instrument_status {
495 self.subscribe_instrument_status(
496 instrument_id,
497 client_id,
498 subscribe_params.clone(),
499 );
500 }
501
502 if self.config.subscribe_instrument_close {
503 self.subscribe_instrument_close(instrument_id, client_id, subscribe_params.clone());
504 }
505
506 if self.config.request_book_snapshot {
513 let _ = self.request_book_snapshot(
514 instrument_id,
515 self.config.book_depth,
516 client_id,
517 request_params.clone(),
518 );
519 }
520
521 if self.config.request_trades {
525 let start = self.clock().utc_now() - ChronoDuration::hours(1);
526 if let Err(e) = self.request_trades(
527 instrument_id,
528 Some(start),
529 None,
530 None,
531 client_id,
532 request_params.clone(),
533 ) {
534 log::error!("Failed to request trades for {instrument_id}: {e}");
535 }
536 }
537
538 if self.config.request_funding_rates {
540 let start = self.clock().utc_now() - ChronoDuration::days(7);
541 if let Err(e) = self.request_funding_rates(
542 instrument_id,
543 Some(start),
544 None,
545 None,
546 client_id,
547 request_params.clone(),
548 ) {
549 log::error!("Failed to request funding rates for {instrument_id}: {e}");
550 }
551 }
552 }
553
554 if let Some(bar_types) = self.config.bar_types.clone() {
556 for bar_type in bar_types {
557 if self.config.subscribe_bars {
558 self.subscribe_bars(bar_type, client_id, subscribe_params.clone());
559 }
560
561 if self.config.request_bars {
563 let start = self.clock().utc_now() - ChronoDuration::hours(1);
564 if let Err(e) = self.request_bars(
565 bar_type,
566 Some(start),
567 None,
568 None,
569 client_id,
570 request_params.clone(),
571 ) {
572 log::error!("Failed to request bars for {bar_type}: {e}");
573 }
574 }
575 }
576 }
577
578 if stats_interval_secs > 0 {
580 self.clock().set_timer(
581 "STATS-TIMER",
582 Duration::from_secs(stats_interval_secs),
583 None,
584 None,
585 None,
586 Some(true),
587 Some(false),
588 )?;
589 }
590
591 Ok(())
592 }
593
594 fn on_stop(&mut self) -> anyhow::Result<()> {
595 if !self.config.can_unsubscribe {
596 return Ok(());
597 }
598
599 let instrument_ids = self.config.instrument_ids.clone();
600 let client_id = self.config.client_id;
601 let subscribe_params = self.config.subscribe_params.clone();
602
603 for instrument_id in instrument_ids {
604 if self.config.subscribe_instrument {
605 self.unsubscribe_instrument(instrument_id, client_id, subscribe_params.clone());
606 }
607
608 if self.config.subscribe_book_deltas {
609 self.unsubscribe_book_deltas(instrument_id, client_id, subscribe_params.clone());
610 }
611
612 if self.config.subscribe_book_at_interval {
613 self.unsubscribe_book_at_interval(
614 instrument_id,
615 self.config.book_interval_ms,
616 client_id,
617 subscribe_params.clone(),
618 );
619 }
620
621 if self.config.subscribe_quotes {
627 self.unsubscribe_quotes(instrument_id, client_id, subscribe_params.clone());
628 }
629
630 if self.config.subscribe_trades {
631 self.unsubscribe_trades(instrument_id, client_id, subscribe_params.clone());
632 }
633
634 if self.config.subscribe_mark_prices {
635 self.unsubscribe_mark_prices(instrument_id, client_id, subscribe_params.clone());
636 }
637
638 if self.config.subscribe_index_prices {
639 self.unsubscribe_index_prices(instrument_id, client_id, subscribe_params.clone());
640 }
641
642 if self.config.subscribe_funding_rates {
643 self.unsubscribe_funding_rates(instrument_id, client_id, subscribe_params.clone());
644 }
645
646 if self.config.subscribe_instrument_status {
647 self.unsubscribe_instrument_status(
648 instrument_id,
649 client_id,
650 subscribe_params.clone(),
651 );
652 }
653
654 if self.config.subscribe_instrument_close {
655 self.unsubscribe_instrument_close(
656 instrument_id,
657 client_id,
658 subscribe_params.clone(),
659 );
660 }
661 }
662
663 if let Some(bar_types) = self.config.bar_types.clone() {
664 for bar_type in bar_types {
665 if self.config.subscribe_bars {
666 self.unsubscribe_bars(bar_type, client_id, subscribe_params.clone());
667 }
668 }
669 }
670
671 Ok(())
672 }
673
674 fn on_time_event(&mut self, _event: &TimeEvent) -> anyhow::Result<()> {
675 Ok(())
677 }
678
679 fn on_instrument(&mut self, instrument: &InstrumentAny) -> anyhow::Result<()> {
680 if self.config.log_data {
681 log_info!("{instrument:?}", color = LogColor::Cyan);
682 }
683 Ok(())
684 }
685
686 fn on_book(&mut self, book: &OrderBook) -> anyhow::Result<()> {
687 if self.config.log_data {
688 let levels = self.config.book_levels_to_print;
689 let instrument_id = book.instrument_id;
690 let book_str = book.pprint(levels, None);
691 log_info!("\n{instrument_id}\n{book_str}", color = LogColor::Cyan);
692 }
693
694 Ok(())
695 }
696
697 fn on_book_deltas(&mut self, deltas: &OrderBookDeltas) -> anyhow::Result<()> {
698 if self.config.manage_book {
699 if let Some(book) = self.books.get_mut(&deltas.instrument_id) {
700 book.apply_deltas(deltas)?;
701
702 if self.config.log_data {
703 let levels = self.config.book_levels_to_print;
704 let instrument_id = deltas.instrument_id;
705 let book_str = book.pprint(levels, None);
706 log_info!("\n{instrument_id}\n{book_str}", color = LogColor::Cyan);
707 }
708 }
709 } else if self.config.log_data {
710 log_info!("{deltas:?}", color = LogColor::Cyan);
711 }
712 Ok(())
713 }
714
715 fn on_quote(&mut self, quote: &QuoteTick) -> anyhow::Result<()> {
716 if self.config.log_data {
717 log_info!("{quote:?}", color = LogColor::Cyan);
718 }
719 Ok(())
720 }
721
722 fn on_trade(&mut self, trade: &TradeTick) -> anyhow::Result<()> {
723 if self.config.log_data {
724 log_info!("{trade:?}", color = LogColor::Cyan);
725 }
726 Ok(())
727 }
728
729 fn on_bar(&mut self, bar: &Bar) -> anyhow::Result<()> {
730 if self.config.log_data {
731 log_info!("{bar:?}", color = LogColor::Cyan);
732 }
733 Ok(())
734 }
735
736 fn on_mark_price(&mut self, mark_price: &MarkPriceUpdate) -> anyhow::Result<()> {
737 if self.config.log_data {
738 log_info!("{mark_price:?}", color = LogColor::Cyan);
739 }
740 Ok(())
741 }
742
743 fn on_index_price(&mut self, index_price: &IndexPriceUpdate) -> anyhow::Result<()> {
744 if self.config.log_data {
745 log_info!("{index_price:?}", color = LogColor::Cyan);
746 }
747 Ok(())
748 }
749
750 fn on_funding_rate(&mut self, funding_rate: &FundingRateUpdate) -> anyhow::Result<()> {
751 if self.config.log_data {
752 log_info!("{funding_rate:?}", color = LogColor::Cyan);
753 }
754 Ok(())
755 }
756
757 fn on_instrument_status(&mut self, data: &InstrumentStatus) -> anyhow::Result<()> {
758 if self.config.log_data {
759 log_info!("{data:?}", color = LogColor::Cyan);
760 }
761 Ok(())
762 }
763
764 fn on_instrument_close(&mut self, update: &InstrumentClose) -> anyhow::Result<()> {
765 if self.config.log_data {
766 log_info!("{update:?}", color = LogColor::Cyan);
767 }
768 Ok(())
769 }
770
771 fn on_historical_trades(&mut self, trades: &[TradeTick]) -> anyhow::Result<()> {
772 if self.config.log_data {
773 log_info!(
774 "Received {} historical trades",
775 trades.len(),
776 color = LogColor::Cyan
777 );
778 for trade in trades.iter().take(5) {
779 log_info!(" {trade:?}", color = LogColor::Cyan);
780 }
781 if trades.len() > 5 {
782 log_info!(
783 " ... and {} more trades",
784 trades.len() - 5,
785 color = LogColor::Cyan
786 );
787 }
788 }
789 Ok(())
790 }
791
792 fn on_historical_funding_rates(
793 &mut self,
794 funding_rates: &[FundingRateUpdate],
795 ) -> anyhow::Result<()> {
796 if self.config.log_data {
797 log_info!(
798 "Received {} historical funding rates",
799 funding_rates.len(),
800 color = LogColor::Cyan
801 );
802 for rate in funding_rates.iter().take(5) {
803 log_info!(" {rate:?}", color = LogColor::Cyan);
804 }
805 if funding_rates.len() > 5 {
806 log_info!(
807 " ... and {} more funding rates",
808 funding_rates.len() - 5,
809 color = LogColor::Cyan
810 );
811 }
812 }
813 Ok(())
814 }
815
816 fn on_historical_bars(&mut self, bars: &[Bar]) -> anyhow::Result<()> {
817 if self.config.log_data {
818 log_info!(
819 "Received {} historical bars",
820 bars.len(),
821 color = LogColor::Cyan
822 );
823 for bar in bars.iter().take(5) {
824 log_info!(" {bar:?}", color = LogColor::Cyan);
825 }
826 if bars.len() > 5 {
827 log_info!(
828 " ... and {} more bars",
829 bars.len() - 5,
830 color = LogColor::Cyan
831 );
832 }
833 }
834 Ok(())
835 }
836}
837
838impl DataTester {
839 #[must_use]
841 pub fn new(config: DataTesterConfig) -> Self {
842 Self {
843 core: DataActorCore::new(config.base.clone()),
844 config,
845 books: AHashMap::new(),
846 }
847 }
848}
849
/// Unit tests for [`DataTesterConfig`] construction and the [`DataTester`] data handlers.
#[cfg(test)]
mod tests {
    use nautilus_core::UnixNanos;
    use nautilus_model::{
        data::OrderBookDelta,
        enums::{InstrumentCloseType, MarketStatusAction},
        identifiers::Symbol,
        instruments::CurrencyPair,
        types::{Currency, Price, Quantity},
    };
    use rstest::*;
    use rust_decimal::Decimal;

    use super::*;

    /// Config for two test instruments with quote and trade subscriptions enabled.
    #[fixture]
    fn config() -> DataTesterConfig {
        let client_id = ClientId::new("TEST");
        let instrument_ids = vec![
            InstrumentId::from("BTC-USDT.TEST"),
            InstrumentId::from("ETH-USDT.TEST"),
        ];
        DataTesterConfig::new(client_id, instrument_ids)
            .with_subscribe_quotes(true)
            .with_subscribe_trades(true)
    }

    #[rstest]
    fn test_config_creation() {
        let client_id = ClientId::new("TEST");
        let instrument_ids = vec![InstrumentId::from("BTC-USDT.TEST")];
        let config =
            DataTesterConfig::new(client_id, instrument_ids.clone()).with_subscribe_quotes(true);

        assert_eq!(config.client_id, Some(client_id));
        assert_eq!(config.instrument_ids, instrument_ids);
        assert!(config.subscribe_quotes);
        assert!(!config.subscribe_trades);
        assert!(config.log_data);
        assert_eq!(config.stats_interval_secs, 5);
    }

    #[rstest]
    fn test_config_default() {
        let config = DataTesterConfig::default();

        assert_eq!(config.client_id, None);
        assert!(config.instrument_ids.is_empty());
        assert!(!config.subscribe_quotes);
        assert!(!config.subscribe_trades);
        assert!(!config.subscribe_bars);
        assert!(!config.request_instruments);
        assert!(!config.request_book_snapshot);
        assert!(!config.request_book_deltas);
        assert!(!config.request_trades);
        assert!(!config.request_bars);
        assert!(!config.request_funding_rates);
        assert!(config.can_unsubscribe);
        assert!(config.log_data);
        assert!(config.subscribe_params.is_none());
        assert!(config.request_params.is_none());
    }

    #[rstest]
    fn test_config_with_params() {
        let client_id = ClientId::new("TEST");
        let instrument_ids = vec![InstrumentId::from("BTC-USDT.TEST")];

        let mut sub_params = Params::new();
        sub_params.insert("key".to_string(), serde_json::json!("value"));

        let mut req_params = Params::new();
        req_params.insert("limit".to_string(), serde_json::json!(100));

        let config = DataTesterConfig::new(client_id, instrument_ids)
            .with_subscribe_params(Some(sub_params.clone()))
            .with_request_params(Some(req_params.clone()));

        assert_eq!(config.subscribe_params, Some(sub_params));
        assert_eq!(config.request_params, Some(req_params));
    }

    #[rstest]
    fn test_actor_creation(config: DataTesterConfig) {
        let actor = DataTester::new(config);

        assert_eq!(actor.config.client_id, Some(ClientId::new("TEST")));
        assert_eq!(actor.config.instrument_ids.len(), 2);
    }

    #[rstest]
    fn test_on_quote_with_logging_enabled(config: DataTesterConfig) {
        let mut actor = DataTester::new(config);

        let quote = QuoteTick::default();
        let result = actor.on_quote(&quote);

        assert!(result.is_ok());
    }

    #[rstest]
    fn test_on_quote_with_logging_disabled(mut config: DataTesterConfig) {
        config.log_data = false;
        let mut actor = DataTester::new(config);

        let quote = QuoteTick::default();
        let result = actor.on_quote(&quote);

        assert!(result.is_ok());
    }

    #[rstest]
    fn test_on_trade(config: DataTesterConfig) {
        let mut actor = DataTester::new(config);

        let trade = TradeTick::default();
        let result = actor.on_trade(&trade);

        assert!(result.is_ok());
    }

    #[rstest]
    fn test_on_bar(config: DataTesterConfig) {
        let mut actor = DataTester::new(config);

        let bar = Bar::default();
        let result = actor.on_bar(&bar);

        assert!(result.is_ok());
    }

    #[rstest]
    fn test_on_instrument(config: DataTesterConfig) {
        let mut actor = DataTester::new(config);

        let instrument_id = InstrumentId::from("BTC-USDT.TEST");
        let instrument = CurrencyPair::new(
            instrument_id,
            Symbol::from("BTC/USDT"),
            Currency::USD(),
            Currency::USD(),
            4,
            3,
            Price::from("0.0001"),
            Quantity::from("0.001"),
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            None,
            UnixNanos::default(),
            UnixNanos::default(),
        );
        let result = actor.on_instrument(&InstrumentAny::CurrencyPair(instrument));

        assert!(result.is_ok());
    }

    #[rstest]
    fn test_on_book_deltas_without_managed_book(mut config: DataTesterConfig) {
        // Pin the unmanaged path explicitly rather than relying on the constructor default.
        config.manage_book = false;
        let mut actor = DataTester::new(config);

        let instrument_id = InstrumentId::from("BTC-USDT.TEST");
        let delta =
            OrderBookDelta::clear(instrument_id, 0, UnixNanos::default(), UnixNanos::default());
        let deltas = OrderBookDeltas::new(instrument_id, vec![delta]);
        let result = actor.on_book_deltas(&deltas);

        assert!(result.is_ok());
    }

    #[rstest]
    fn test_on_mark_price(config: DataTesterConfig) {
        let mut actor = DataTester::new(config);

        let instrument_id = InstrumentId::from("BTC-USDT.TEST");
        let price = Price::from("50000.0");
        let mark_price = MarkPriceUpdate::new(
            instrument_id,
            price,
            UnixNanos::default(),
            UnixNanos::default(),
        );
        let result = actor.on_mark_price(&mark_price);

        assert!(result.is_ok());
    }

    #[rstest]
    fn test_on_index_price(config: DataTesterConfig) {
        let mut actor = DataTester::new(config);

        let instrument_id = InstrumentId::from("BTC-USDT.TEST");
        let price = Price::from("50000.0");
        let index_price = IndexPriceUpdate::new(
            instrument_id,
            price,
            UnixNanos::default(),
            UnixNanos::default(),
        );
        let result = actor.on_index_price(&index_price);

        assert!(result.is_ok());
    }

    #[rstest]
    fn test_on_funding_rate(config: DataTesterConfig) {
        let mut actor = DataTester::new(config);

        let instrument_id = InstrumentId::from("BTC-USDT.TEST");
        let funding_rate = FundingRateUpdate::new(
            instrument_id,
            Decimal::new(1, 4),
            None,
            UnixNanos::default(),
            UnixNanos::default(),
        );
        let result = actor.on_funding_rate(&funding_rate);

        assert!(result.is_ok());
    }

    #[rstest]
    fn test_on_historical_funding_rates(config: DataTesterConfig) {
        let mut actor = DataTester::new(config);

        let instrument_id = InstrumentId::from("BTC-USDT.TEST");
        let rates = vec![
            FundingRateUpdate::new(
                instrument_id,
                Decimal::new(1, 4),
                None,
                UnixNanos::default(),
                UnixNanos::default(),
            ),
            FundingRateUpdate::new(
                instrument_id,
                Decimal::new(2, 4),
                None,
                UnixNanos::default(),
                UnixNanos::default(),
            ),
        ];
        let result = actor.on_historical_funding_rates(&rates);

        assert!(result.is_ok());
    }

    #[rstest]
    fn test_config_request_funding_rates() {
        let client_id = ClientId::new("TEST");
        let instrument_ids = vec![InstrumentId::from("BTC-USDT.TEST")];
        let config =
            DataTesterConfig::new(client_id, instrument_ids).with_request_funding_rates(true);

        assert!(config.request_funding_rates);
    }

    #[rstest]
    fn test_config_request_book_deltas() {
        let client_id = ClientId::new("TEST");
        let instrument_ids = vec![InstrumentId::from("BTC-USDT.TEST")];
        let config =
            DataTesterConfig::new(client_id, instrument_ids).with_request_book_deltas(true);

        assert!(config.request_book_deltas);
    }

    #[rstest]
    fn test_on_instrument_status(config: DataTesterConfig) {
        let mut actor = DataTester::new(config);

        let instrument_id = InstrumentId::from("BTC-USDT.TEST");
        let status = InstrumentStatus::new(
            instrument_id,
            MarketStatusAction::Trading,
            UnixNanos::default(),
            UnixNanos::default(),
            None,
            None,
            None,
            None,
            None,
        );
        let result = actor.on_instrument_status(&status);

        assert!(result.is_ok());
    }

    #[rstest]
    fn test_on_instrument_close(config: DataTesterConfig) {
        let mut actor = DataTester::new(config);

        let instrument_id = InstrumentId::from("BTC-USDT.TEST");
        let price = Price::from("50000.0");
        let close = InstrumentClose::new(
            instrument_id,
            price,
            InstrumentCloseType::EndOfSession,
            UnixNanos::default(),
            UnixNanos::default(),
        );
        let result = actor.on_instrument_close(&close);

        assert!(result.is_ok());
    }

    #[rstest]
    fn test_on_time_event(config: DataTesterConfig) {
        let mut actor = DataTester::new(config);

        let event = TimeEvent::new(
            "TEST".into(),
            Default::default(),
            UnixNanos::default(),
            UnixNanos::default(),
        );
        let result = actor.on_time_event(&event);

        assert!(result.is_ok());
    }

    #[rstest]
    fn test_config_with_all_subscriptions_enabled(mut config: DataTesterConfig) {
        config.subscribe_book_deltas = true;
        config.subscribe_book_at_interval = true;
        config.subscribe_bars = true;
        config.subscribe_mark_prices = true;
        config.subscribe_index_prices = true;
        config.subscribe_funding_rates = true;
        config.subscribe_instrument = true;
        config.subscribe_instrument_status = true;
        config.subscribe_instrument_close = true;

        let actor = DataTester::new(config);

        assert!(actor.config.subscribe_book_deltas);
        assert!(actor.config.subscribe_book_at_interval);
        assert!(actor.config.subscribe_bars);
        assert!(actor.config.subscribe_mark_prices);
        assert!(actor.config.subscribe_index_prices);
        assert!(actor.config.subscribe_funding_rates);
        assert!(actor.config.subscribe_instrument);
        assert!(actor.config.subscribe_instrument_status);
        assert!(actor.config.subscribe_instrument_close);
    }

    #[rstest]
    fn test_config_with_book_management(mut config: DataTesterConfig) {
        config.manage_book = true;
        config.book_levels_to_print = 5;

        let actor = DataTester::new(config);

        assert!(actor.config.manage_book);
        assert_eq!(actor.config.book_levels_to_print, 5);
        // Books are only created in `on_start`, so none exist right after construction.
        assert!(actor.books.is_empty());
    }

    #[rstest]
    fn test_config_with_custom_stats_interval(mut config: DataTesterConfig) {
        config.stats_interval_secs = 10;

        let actor = DataTester::new(config);

        assert_eq!(actor.config.stats_interval_secs, 10);
    }

    #[rstest]
    fn test_config_with_unsubscribe_disabled(mut config: DataTesterConfig) {
        config.can_unsubscribe = false;

        let actor = DataTester::new(config);

        assert!(!actor.config.can_unsubscribe);
    }
}