1use std::{
2 collections::HashMap,
3 io::Error as IoError,
4 marker::PhantomData,
5 ops::{Deref, DerefMut},
6};
7
8pub use data::DataSource;
9use data::Reader;
10use models::FeeModel;
11use thiserror::Error;
12
13pub use crate::backtest::{
14 models::L3QueueModel,
15 proc::{L3Local, L3NoPartialFillExchange},
16};
17use crate::{
18 backtest::{
19 assettype::AssetType,
20 data::{Data, FeedLatencyAdjustment, NpyDTyped},
21 evs::{EventIntentKind, EventSet},
22 models::{LatencyModel, QueueModel},
23 order::order_bus,
24 proc::{Local, LocalProcessor, NoPartialFillExchange, PartialFillExchange, Processor},
25 state::State,
26 },
27 depth::{HashMapMarketDepth, L2MarketDepth, L3MarketDepth, MarketDepth},
28 prelude::{
29 Bot,
30 OrdType,
31 Order,
32 OrderId,
33 OrderRequest,
34 Side,
35 StateValues,
36 TimeInForce,
37 UNTIL_END_OF_DATA,
38 WaitOrderResponse,
39 },
40 types::{BuildError, ElapseResult, Event},
41};
42
43pub mod assettype;
45
46pub mod models;
47
48pub mod order;
50
51pub mod proc;
53
54pub mod state;
56
57pub mod recorder;
59
60pub mod data;
61mod evs;
62
63#[derive(Error, Debug)]
65pub enum BacktestError {
66 #[error("Order related to a given order id already exists")]
67 OrderIdExist,
68 #[error("Order request is in process")]
69 OrderRequestInProcess,
70 #[error("Order not found")]
71 OrderNotFound,
72 #[error("order request is invalid")]
73 InvalidOrderRequest,
74 #[error("order status is invalid to proceed the request")]
75 InvalidOrderStatus,
76 #[error("end of data")]
77 EndOfData,
78 #[error("data error: {0:?}")]
79 DataError(#[from] IoError),
80}
81
82pub struct Asset<L: ?Sized, E: ?Sized, D: NpyDTyped + Clone > {
84 pub local: Box<L>,
85 pub exch: Box<E>,
86 pub reader: Reader<D>,
87}
88
89impl<L, E, D: NpyDTyped + Clone> Asset<L, E, D> {
90 pub fn new(local: L, exch: E, reader: Reader<D>) -> Self {
93 Self {
94 local: Box::new(local),
95 exch: Box::new(exch),
96 reader,
97 }
98 }
99
100 pub fn l2_builder<LM, AT, QM, MD, FM>() -> L2AssetBuilder<LM, AT, QM, MD, FM>
102 where
103 AT: AssetType + Clone + 'static,
104 MD: MarketDepth + L2MarketDepth + 'static,
105 QM: QueueModel<MD> + 'static,
106 LM: LatencyModel + Clone + 'static,
107 FM: FeeModel + Clone + 'static,
108 {
109 L2AssetBuilder::new()
110 }
111
112 pub fn l3_builder<LM, AT, QM, MD, FM>() -> L3AssetBuilder<LM, AT, QM, MD, FM>
114 where
115 AT: AssetType + Clone + 'static,
116 MD: MarketDepth + L3MarketDepth + 'static,
117 QM: L3QueueModel<MD> + 'static,
118 LM: LatencyModel + Clone + 'static,
119 FM: FeeModel + Clone + 'static,
120 BacktestError: From<<MD as L3MarketDepth>::Error>,
121 {
122 L3AssetBuilder::new()
123 }
124}
125
126pub enum ExchangeKind {
128 NoPartialFillExchange,
130 PartialFillExchange,
132}
133
134pub struct L2AssetBuilder<LM, AT, QM, MD, FM> {
136 latency_model: Option<LM>,
137 asset_type: Option<AT>,
138 data: Vec<DataSource<Event>>,
139 parallel_load: bool,
140 latency_offset: i64,
141 fee_model: Option<FM>,
142 exch_kind: ExchangeKind,
143 last_trades_cap: usize,
144 queue_model: Option<QM>,
145 depth_builder: Option<Box<dyn Fn() -> MD>>,
146}
147
148impl<LM, AT, QM, MD, FM> L2AssetBuilder<LM, AT, QM, MD, FM>
149where
150 AT: AssetType + Clone + 'static,
151 MD: MarketDepth + L2MarketDepth + 'static,
152 QM: QueueModel<MD> + 'static,
153 LM: LatencyModel + Clone + 'static,
154 FM: FeeModel + Clone + 'static,
155{
156 pub fn new() -> Self {
158 Self {
159 latency_model: None,
160 asset_type: None,
161 data: vec![],
162 parallel_load: false,
163 latency_offset: 0,
164 fee_model: None,
165 exch_kind: ExchangeKind::NoPartialFillExchange,
166 last_trades_cap: 0,
167 queue_model: None,
168 depth_builder: None,
169 }
170 }
171
172 pub fn data(self, data: Vec<DataSource<Event>>) -> Self {
174 Self { data, ..self }
175 }
176
177 pub fn parallel_load(self, parallel_load: bool) -> Self {
181 Self {
182 parallel_load,
183 ..self
184 }
185 }
186
187 pub fn latency_offset(self, latency_offset: i64) -> Self {
191 Self {
192 latency_offset,
193 ..self
194 }
195 }
196
197 pub fn latency_model(self, latency_model: LM) -> Self {
199 Self {
200 latency_model: Some(latency_model),
201 ..self
202 }
203 }
204
205 pub fn asset_type(self, asset_type: AT) -> Self {
207 Self {
208 asset_type: Some(asset_type),
209 ..self
210 }
211 }
212
213 pub fn fee_model(self, fee_model: FM) -> Self {
215 Self {
216 fee_model: Some(fee_model),
217 ..self
218 }
219 }
220
221 pub fn exchange(self, exch_kind: ExchangeKind) -> Self {
223 Self { exch_kind, ..self }
224 }
225
226 pub fn last_trades_capacity(self, capacity: usize) -> Self {
229 Self {
230 last_trades_cap: capacity,
231 ..self
232 }
233 }
234
235 pub fn queue_model(self, queue_model: QM) -> Self {
237 Self {
238 queue_model: Some(queue_model),
239 ..self
240 }
241 }
242
243 pub fn depth<Builder>(self, builder: Builder) -> Self
245 where
246 Builder: Fn() -> MD + 'static,
247 {
248 Self {
249 depth_builder: Some(Box::new(builder)),
250 ..self
251 }
252 }
253
254 pub fn build(self) -> Result<Asset<dyn LocalProcessor<MD>, dyn Processor, Event>, BuildError> {
256 let reader = if self.latency_offset == 0 {
257 Reader::builder()
258 .parallel_load(self.parallel_load)
259 .data(self.data)
260 .build()
261 .map_err(|err| BuildError::Error(err.into()))?
262 } else {
263 Reader::builder()
264 .parallel_load(self.parallel_load)
265 .data(self.data)
266 .preprocessor(FeedLatencyAdjustment::new(self.latency_offset))
267 .build()
268 .map_err(|err| BuildError::Error(err.into()))?
269 };
270
271 let create_depth = self
272 .depth_builder
273 .as_ref()
274 .ok_or(BuildError::BuilderIncomplete("depth"))?;
275 let order_latency = self
276 .latency_model
277 .clone()
278 .ok_or(BuildError::BuilderIncomplete("order_latency"))?;
279 let asset_type = self
280 .asset_type
281 .clone()
282 .ok_or(BuildError::BuilderIncomplete("asset_type"))?;
283 let fee_model = self
284 .fee_model
285 .clone()
286 .ok_or(BuildError::BuilderIncomplete("fee_model"))?;
287
288 let (order_e2l, order_l2e) = order_bus(order_latency);
289
290 let local = Local::new(
291 create_depth(),
292 State::new(asset_type, fee_model),
293 self.last_trades_cap,
294 order_l2e,
295 );
296
297 let queue_model = self
298 .queue_model
299 .ok_or(BuildError::BuilderIncomplete("queue_model"))?;
300 let asset_type = self
301 .asset_type
302 .clone()
303 .ok_or(BuildError::BuilderIncomplete("asset_type"))?;
304 let fee_model = self
305 .fee_model
306 .clone()
307 .ok_or(BuildError::BuilderIncomplete("fee_model"))?;
308
309 match self.exch_kind {
310 ExchangeKind::NoPartialFillExchange => {
311 let exch = NoPartialFillExchange::new(
312 create_depth(),
313 State::new(asset_type, fee_model),
314 queue_model,
315 order_e2l,
316 );
317
318 Ok(Asset {
319 local: Box::new(local),
320 exch: Box::new(exch),
321 reader,
322 })
323 }
324 ExchangeKind::PartialFillExchange => {
325 let exch = PartialFillExchange::new(
326 create_depth(),
327 State::new(asset_type, fee_model),
328 queue_model,
329 order_e2l,
330 );
331
332 Ok(Asset {
333 local: Box::new(local),
334 exch: Box::new(exch),
335 reader,
336 })
337 }
338 }
339 }
340}
341
342impl<LM, AT, QM, MD, FM> Default for L2AssetBuilder<LM, AT, QM, MD, FM>
343where
344 AT: AssetType + Clone + 'static,
345 MD: MarketDepth + L2MarketDepth + 'static,
346 QM: QueueModel<MD> + 'static,
347 LM: LatencyModel + Clone + 'static,
348 FM: FeeModel + Clone + 'static,
349{
350 fn default() -> Self {
351 Self::new()
352 }
353}
354
355pub struct L3AssetBuilder<LM, AT, QM, MD, FM> {
357 latency_model: Option<LM>,
358 asset_type: Option<AT>,
359 data: Vec<DataSource<Event>>,
360 parallel_load: bool,
361 latency_offset: i64,
362 fee_model: Option<FM>,
363 exch_kind: ExchangeKind,
364 last_trades_cap: usize,
365 queue_model: Option<QM>,
366 depth_builder: Option<Box<dyn Fn() -> MD>>,
367}
368
369impl<LM, AT, QM, MD, FM> L3AssetBuilder<LM, AT, QM, MD, FM>
370where
371 AT: AssetType + Clone + 'static,
372 MD: MarketDepth + L3MarketDepth + 'static,
373 QM: L3QueueModel<MD> + 'static,
374 LM: LatencyModel + Clone + 'static,
375 FM: FeeModel + Clone + 'static,
376 BacktestError: From<<MD as L3MarketDepth>::Error>,
377{
378 pub fn new() -> Self {
380 Self {
381 latency_model: None,
382 asset_type: None,
383 data: vec![],
384 parallel_load: false,
385 latency_offset: 0,
386 fee_model: None,
387 exch_kind: ExchangeKind::NoPartialFillExchange,
388 last_trades_cap: 0,
389 queue_model: None,
390 depth_builder: None,
391 }
392 }
393
394 pub fn data(self, data: Vec<DataSource<Event>>) -> Self {
396 Self { data, ..self }
397 }
398
399 pub fn parallel_load(self, parallel_load: bool) -> Self {
403 Self {
404 parallel_load,
405 ..self
406 }
407 }
408
409 pub fn latency_offset(self, latency_offset: i64) -> Self {
413 Self {
414 latency_offset,
415 ..self
416 }
417 }
418
419 pub fn latency_model(self, latency_model: LM) -> Self {
421 Self {
422 latency_model: Some(latency_model),
423 ..self
424 }
425 }
426
427 pub fn asset_type(self, asset_type: AT) -> Self {
429 Self {
430 asset_type: Some(asset_type),
431 ..self
432 }
433 }
434
435 pub fn fee_model(self, fee_model: FM) -> Self {
437 Self {
438 fee_model: Some(fee_model),
439 ..self
440 }
441 }
442
443 pub fn exchange(self, exch_kind: ExchangeKind) -> Self {
445 Self { exch_kind, ..self }
446 }
447
448 pub fn last_trades_capacity(self, capacity: usize) -> Self {
451 Self {
452 last_trades_cap: capacity,
453 ..self
454 }
455 }
456
457 pub fn queue_model(self, queue_model: QM) -> Self {
459 Self {
460 queue_model: Some(queue_model),
461 ..self
462 }
463 }
464
465 pub fn depth<Builder>(self, builder: Builder) -> Self
467 where
468 Builder: Fn() -> MD + 'static,
469 {
470 Self {
471 depth_builder: Some(Box::new(builder)),
472 ..self
473 }
474 }
475
476 pub fn build(self) -> Result<Asset<dyn LocalProcessor<MD>, dyn Processor, Event>, BuildError> {
478 let reader = if self.latency_offset == 0 {
479 Reader::builder()
480 .parallel_load(self.parallel_load)
481 .data(self.data)
482 .build()
483 .map_err(|err| BuildError::Error(err.into()))?
484 } else {
485 Reader::builder()
486 .parallel_load(self.parallel_load)
487 .data(self.data)
488 .preprocessor(FeedLatencyAdjustment::new(self.latency_offset))
489 .build()
490 .map_err(|err| BuildError::Error(err.into()))?
491 };
492
493 let create_depth = self
494 .depth_builder
495 .as_ref()
496 .ok_or(BuildError::BuilderIncomplete("depth"))?;
497 let order_latency = self
498 .latency_model
499 .clone()
500 .ok_or(BuildError::BuilderIncomplete("order_latency"))?;
501 let asset_type = self
502 .asset_type
503 .clone()
504 .ok_or(BuildError::BuilderIncomplete("asset_type"))?;
505 let fee_model = self
506 .fee_model
507 .clone()
508 .ok_or(BuildError::BuilderIncomplete("fee_model"))?;
509
510 let (order_e2l, order_l2e) = order_bus(order_latency);
511
512 let local = L3Local::new(
513 create_depth(),
514 State::new(asset_type, fee_model),
515 self.last_trades_cap,
516 order_l2e,
517 );
518
519 let queue_model = self
520 .queue_model
521 .ok_or(BuildError::BuilderIncomplete("queue_model"))?;
522 let asset_type = self
523 .asset_type
524 .clone()
525 .ok_or(BuildError::BuilderIncomplete("asset_type"))?;
526 let fee_model = self
527 .fee_model
528 .clone()
529 .ok_or(BuildError::BuilderIncomplete("fee_model"))?;
530
531 match self.exch_kind {
532 ExchangeKind::NoPartialFillExchange => {
533 let exch = L3NoPartialFillExchange::new(
534 create_depth(),
535 State::new(asset_type, fee_model),
536 queue_model,
537 order_e2l,
538 );
539
540 Ok(Asset {
541 local: Box::new(local),
542 exch: Box::new(exch),
543 reader,
544 })
545 }
546 ExchangeKind::PartialFillExchange => {
547 unimplemented!();
548 }
549 }
550 }
551}
552
553impl<LM, AT, QM, MD, FM> Default for L3AssetBuilder<LM, AT, QM, MD, FM>
554where
555 AT: AssetType + Clone + 'static,
556 MD: MarketDepth + L3MarketDepth + 'static,
557 QM: L3QueueModel<MD> + 'static,
558 LM: LatencyModel + Clone + 'static,
559 FM: FeeModel + Clone + 'static,
560 BacktestError: From<<MD as L3MarketDepth>::Error>,
561{
562 fn default() -> Self {
563 Self::new()
564 }
565}
566
567pub struct BacktestBuilder<MD> {
569 local: Vec<BacktestProcessorState<Box<dyn LocalProcessor<MD>>>>,
570 exch: Vec<BacktestProcessorState<Box<dyn Processor>>>,
571}
572
573impl<MD> BacktestBuilder<MD> {
574 pub fn add_asset(self, asset: Asset<dyn LocalProcessor<MD>, dyn Processor, Event>) -> Self {
576 let mut self_ = Self { ..self };
577 self_.local.push(BacktestProcessorState::new(
578 asset.local,
579 asset.reader.clone(),
580 ));
581 self_
582 .exch
583 .push(BacktestProcessorState::new(asset.exch, asset.reader));
584 self_
585 }
586
587 pub fn build(self) -> Result<Backtest<MD>, BuildError> {
589 let num_assets = self.local.len();
590 if self.local.len() != num_assets || self.exch.len() != num_assets {
591 panic!();
592 }
593 Ok(Backtest {
594 cur_ts: i64::MAX,
595 evs: EventSet::new(num_assets),
596 local: self.local,
597 exch: self.exch,
598 })
599 }
600}
601
602pub struct Backtest<MD> {
606 cur_ts: i64,
607 evs: EventSet,
608 local: Vec<BacktestProcessorState<Box<dyn LocalProcessor<MD>>>>,
609 exch: Vec<BacktestProcessorState<Box<dyn Processor>>>,
610}
611
612impl<P: Processor> Deref for BacktestProcessorState<P> {
613 type Target = P;
614
615 fn deref(&self) -> &Self::Target {
616 &self.processor
617 }
618}
619
620impl<P: Processor> DerefMut for BacktestProcessorState<P> {
621 fn deref_mut(&mut self) -> &mut Self::Target {
622 &mut self.processor
623 }
624}
625
626pub struct BacktestProcessorState<P: Processor> {
628 data: Data<Event>,
629 processor: P,
630 reader: Reader<Event>,
631 row: Option<usize>,
632}
633
634impl<P: Processor> BacktestProcessorState<P> {
635 fn new(processor: P, reader: Reader<Event>) -> BacktestProcessorState<P> {
636 Self {
637 data: Data::empty(),
638 processor,
639 reader,
640 row: None,
641 }
642 }
643
644 fn next_row(&mut self) -> Result<usize, BacktestError> {
647 if self.row.is_none() {
648 let _ = self.advance()?;
649 }
650
651 self.row.ok_or(BacktestError::EndOfData)
652 }
653
654 fn advance(&mut self) -> Result<i64, BacktestError> {
657 loop {
658 let start = self.row.map(|rn| rn + 1).unwrap_or(0);
659
660 for rn in start..self.data.len() {
661 if let Some(ts) = self.processor.event_seen_timestamp(&self.data[rn]) {
662 self.row = Some(rn);
663 return Ok(ts);
664 }
665 }
666
667 let next = self.reader.next_data()?;
668
669 self.reader.release(std::mem::replace(&mut self.data, next));
670 self.row = None;
671 }
672 }
673}
674
675impl<MD> Backtest<MD>
676where
677 MD: MarketDepth,
678{
679 pub fn builder() -> BacktestBuilder<MD> {
680 BacktestBuilder {
681 local: vec![],
682 exch: vec![],
683 }
684 }
685
686 pub fn new(
687 local: Vec<Box<dyn LocalProcessor<MD>>>,
688 exch: Vec<Box<dyn Processor>>,
689 reader: Vec<Reader<Event>>,
690 ) -> Self {
691 let num_assets = local.len();
692 if local.len() != num_assets || exch.len() != num_assets || reader.len() != num_assets {
693 panic!();
694 }
695
696 let local = local
697 .into_iter()
698 .zip(reader.iter())
699 .map(|(proc, reader)| BacktestProcessorState::new(proc, reader.clone()))
700 .collect();
701 let exch = exch
702 .into_iter()
703 .zip(reader.iter())
704 .map(|(proc, reader)| BacktestProcessorState::new(proc, reader.clone()))
705 .collect();
706
707 Self {
708 local,
709 exch,
710 cur_ts: i64::MAX,
711 evs: EventSet::new(num_assets),
712 }
713 }
714
715 fn initialize_evs(&mut self) -> Result<(), BacktestError> {
716 for (asset_no, local) in self.local.iter_mut().enumerate() {
717 match local.advance() {
718 Ok(ts) => self.evs.update_local_data(asset_no, ts),
719 Err(BacktestError::EndOfData) => {
720 self.evs.invalidate_local_data(asset_no);
721 }
722 Err(e) => {
723 return Err(e);
724 }
725 }
726 }
727 for (asset_no, exch) in self.exch.iter_mut().enumerate() {
728 match exch.advance() {
729 Ok(ts) => self.evs.update_exch_data(asset_no, ts),
730 Err(BacktestError::EndOfData) => {
731 self.evs.invalidate_exch_data(asset_no);
732 }
733 Err(e) => {
734 return Err(e);
735 }
736 }
737 }
738 Ok(())
739 }
740
741 pub fn goto_end(&mut self) -> Result<ElapseResult, BacktestError> {
742 if self.cur_ts == i64::MAX {
743 self.initialize_evs()?;
744 match self.evs.next() {
745 Some(ev) => {
746 self.cur_ts = ev.timestamp;
747 }
748 None => {
749 return Ok(ElapseResult::EndOfData);
750 }
751 }
752 }
753 self.goto::<false>(UNTIL_END_OF_DATA, WaitOrderResponse::None)
754 }
755
756 fn goto<const WAIT_NEXT_FEED: bool>(
757 &mut self,
758 timestamp: i64,
759 wait_order_response: WaitOrderResponse,
760 ) -> Result<ElapseResult, BacktestError> {
761 let mut result = ElapseResult::Ok;
762 let mut timestamp = timestamp;
763 for (asset_no, local) in self.local.iter().enumerate() {
764 self.evs
765 .update_exch_order(asset_no, local.earliest_send_order_timestamp());
766 self.evs
767 .update_local_order(asset_no, local.earliest_recv_order_timestamp());
768 }
769 loop {
770 match self.evs.next() {
771 Some(ev) => {
772 if ev.timestamp > timestamp {
773 self.cur_ts = timestamp;
774 return Ok(result);
775 }
776 match ev.kind {
777 EventIntentKind::LocalData => {
778 let local = unsafe { self.local.get_unchecked_mut(ev.asset_no) };
779 let next = local.next_row().and_then(|row| {
780 local.processor.process(&local.data[row])?;
781 local.advance()
782 });
783
784 match next {
785 Ok(next_ts) => {
786 self.evs.update_local_data(ev.asset_no, next_ts);
787 }
788 Err(BacktestError::EndOfData) => {
789 self.evs.invalidate_local_data(ev.asset_no);
790 }
791 Err(e) => {
792 return Err(e);
793 }
794 }
795 if WAIT_NEXT_FEED {
796 timestamp = ev.timestamp;
797 result = ElapseResult::MarketFeed;
798 }
799 }
800 EventIntentKind::LocalOrder => {
801 let local = unsafe { self.local.get_unchecked_mut(ev.asset_no) };
802 let wait_order_resp_id = match wait_order_response {
803 WaitOrderResponse::Specified {
804 asset_no: wait_order_asset_no,
805 order_id: wait_order_id,
806 } if ev.asset_no == wait_order_asset_no => Some(wait_order_id),
807 _ => None,
808 };
809 if local.process_recv_order(ev.timestamp, wait_order_resp_id)?
810 || wait_order_response == WaitOrderResponse::Any
811 {
812 timestamp = ev.timestamp;
813 if WAIT_NEXT_FEED {
814 result = ElapseResult::OrderResponse;
815 }
816 }
817 self.evs.update_local_order(
818 ev.asset_no,
819 local.earliest_recv_order_timestamp(),
820 );
821 }
822 EventIntentKind::ExchData => {
823 let exch = unsafe { self.exch.get_unchecked_mut(ev.asset_no) };
824 let next = exch.next_row().and_then(|row| {
825 exch.processor.process(&exch.data[row])?;
826 exch.advance()
827 });
828
829 match next {
830 Ok(next_ts) => {
831 self.evs.update_exch_data(ev.asset_no, next_ts);
832 }
833 Err(BacktestError::EndOfData) => {
834 self.evs.invalidate_exch_data(ev.asset_no);
835 }
836 Err(e) => {
837 return Err(e);
838 }
839 }
840 self.evs.update_local_order(
841 ev.asset_no,
842 exch.earliest_send_order_timestamp(),
843 );
844 }
845 EventIntentKind::ExchOrder => {
846 let exch = unsafe { self.exch.get_unchecked_mut(ev.asset_no) };
847 let _ = exch.process_recv_order(ev.timestamp, None)?;
848 self.evs.update_exch_order(
849 ev.asset_no,
850 exch.earliest_recv_order_timestamp(),
851 );
852 self.evs.update_local_order(
853 ev.asset_no,
854 exch.earliest_send_order_timestamp(),
855 );
856 }
857 }
858 }
859 None => {
860 return Ok(ElapseResult::EndOfData);
861 }
862 }
863 }
864 }
865}
866
867impl<MD> Bot<MD> for Backtest<MD>
868where
869 MD: MarketDepth,
870{
871 type Error = BacktestError;
872
873 #[inline]
874 fn current_timestamp(&self) -> i64 {
875 self.cur_ts
876 }
877
878 #[inline]
879 fn num_assets(&self) -> usize {
880 self.local.len()
881 }
882
883 #[inline]
884 fn position(&self, asset_no: usize) -> f64 {
885 self.local.get(asset_no).unwrap().position()
886 }
887
888 #[inline]
889 fn state_values(&self, asset_no: usize) -> &StateValues {
890 self.local.get(asset_no).unwrap().state_values()
891 }
892
893 fn depth(&self, asset_no: usize) -> &MD {
894 self.local.get(asset_no).unwrap().depth()
895 }
896
897 fn last_trades(&self, asset_no: usize) -> &[Event] {
898 self.local.get(asset_no).unwrap().last_trades()
899 }
900
901 #[inline]
902 fn clear_last_trades(&mut self, asset_no: Option<usize>) {
903 match asset_no {
904 Some(an) => {
905 let local = self.local.get_mut(an).unwrap();
906 local.clear_last_trades();
907 }
908 None => {
909 for local in self.local.iter_mut() {
910 local.clear_last_trades();
911 }
912 }
913 }
914 }
915
916 #[inline]
917 fn orders(&self, asset_no: usize) -> &HashMap<u64, Order> {
918 self.local.get(asset_no).unwrap().orders()
919 }
920
921 #[inline]
922 fn submit_buy_order(
923 &mut self,
924 asset_no: usize,
925 order_id: OrderId,
926 price: f64,
927 qty: f64,
928 time_in_force: TimeInForce,
929 order_type: OrdType,
930 wait: bool,
931 ) -> Result<ElapseResult, Self::Error> {
932 let local = self.local.get_mut(asset_no).unwrap();
933 local.submit_order(
934 order_id,
935 Side::Buy,
936 price,
937 qty,
938 order_type,
939 time_in_force,
940 self.cur_ts,
941 )?;
942
943 if wait {
944 return self.goto::<false>(
945 UNTIL_END_OF_DATA,
946 WaitOrderResponse::Specified { asset_no, order_id },
947 );
948 }
949 Ok(ElapseResult::Ok)
950 }
951
952 #[inline]
953 fn submit_sell_order(
954 &mut self,
955 asset_no: usize,
956 order_id: OrderId,
957 price: f64,
958 qty: f64,
959 time_in_force: TimeInForce,
960 order_type: OrdType,
961 wait: bool,
962 ) -> Result<ElapseResult, Self::Error> {
963 let local = self.local.get_mut(asset_no).unwrap();
964 local.submit_order(
965 order_id,
966 Side::Sell,
967 price,
968 qty,
969 order_type,
970 time_in_force,
971 self.cur_ts,
972 )?;
973
974 if wait {
975 return self.goto::<false>(
976 UNTIL_END_OF_DATA,
977 WaitOrderResponse::Specified { asset_no, order_id },
978 );
979 }
980 Ok(ElapseResult::Ok)
981 }
982
983 fn submit_order(
984 &mut self,
985 asset_no: usize,
986 order: OrderRequest,
987 wait: bool,
988 ) -> Result<ElapseResult, Self::Error> {
989 let local = self.local.get_mut(asset_no).unwrap();
990 local.submit_order(
991 order.order_id,
992 Side::Sell,
993 order.price,
994 order.qty,
995 order.order_type,
996 order.time_in_force,
997 self.cur_ts,
998 )?;
999
1000 if wait {
1001 return self.goto::<false>(
1002 UNTIL_END_OF_DATA,
1003 WaitOrderResponse::Specified {
1004 asset_no,
1005 order_id: order.order_id,
1006 },
1007 );
1008 }
1009 Ok(ElapseResult::Ok)
1010 }
1011
1012 #[inline]
1013 fn modify(
1014 &mut self,
1015 asset_no: usize,
1016 order_id: OrderId,
1017 price: f64,
1018 qty: f64,
1019 wait: bool,
1020 ) -> Result<ElapseResult, Self::Error> {
1021 let local = self.local.get_mut(asset_no).unwrap();
1022 local.modify(order_id, price, qty, self.cur_ts)?;
1023
1024 if wait {
1025 return self.goto::<false>(
1026 UNTIL_END_OF_DATA,
1027 WaitOrderResponse::Specified { asset_no, order_id },
1028 );
1029 }
1030 Ok(ElapseResult::Ok)
1031 }
1032
1033 #[inline]
1034 fn cancel(
1035 &mut self,
1036 asset_no: usize,
1037 order_id: OrderId,
1038 wait: bool,
1039 ) -> Result<ElapseResult, Self::Error> {
1040 let local = self.local.get_mut(asset_no).unwrap();
1041 local.cancel(order_id, self.cur_ts)?;
1042
1043 if wait {
1044 return self.goto::<false>(
1045 UNTIL_END_OF_DATA,
1046 WaitOrderResponse::Specified { asset_no, order_id },
1047 );
1048 }
1049 Ok(ElapseResult::Ok)
1050 }
1051
1052 #[inline]
1053 fn clear_inactive_orders(&mut self, asset_no: Option<usize>) {
1054 match asset_no {
1055 Some(asset_no) => {
1056 self.local
1057 .get_mut(asset_no)
1058 .unwrap()
1059 .clear_inactive_orders();
1060 }
1061 None => {
1062 for local in self.local.iter_mut() {
1063 local.clear_inactive_orders();
1064 }
1065 }
1066 }
1067 }
1068
1069 #[inline]
1070 fn wait_order_response(
1071 &mut self,
1072 asset_no: usize,
1073 order_id: OrderId,
1074 timeout: i64,
1075 ) -> Result<ElapseResult, BacktestError> {
1076 self.goto::<false>(
1077 self.cur_ts + timeout,
1078 WaitOrderResponse::Specified { asset_no, order_id },
1079 )
1080 }
1081
1082 #[inline]
1083 fn wait_next_feed(
1084 &mut self,
1085 include_order_resp: bool,
1086 timeout: i64,
1087 ) -> Result<ElapseResult, Self::Error> {
1088 if self.cur_ts == i64::MAX {
1089 self.initialize_evs()?;
1090 match self.evs.next() {
1091 Some(ev) => {
1092 self.cur_ts = ev.timestamp;
1093 }
1094 None => {
1095 return Ok(ElapseResult::EndOfData);
1096 }
1097 }
1098 }
1099 if include_order_resp {
1100 self.goto::<true>(self.cur_ts + timeout, WaitOrderResponse::Any)
1101 } else {
1102 self.goto::<true>(self.cur_ts + timeout, WaitOrderResponse::None)
1103 }
1104 }
1105
1106 #[inline]
1107 fn elapse(&mut self, duration: i64) -> Result<ElapseResult, Self::Error> {
1108 if self.cur_ts == i64::MAX {
1109 self.initialize_evs()?;
1110 match self.evs.next() {
1111 Some(ev) => {
1112 self.cur_ts = ev.timestamp;
1113 }
1114 None => {
1115 return Ok(ElapseResult::EndOfData);
1116 }
1117 }
1118 }
1119 self.goto::<false>(self.cur_ts + duration, WaitOrderResponse::None)
1120 }
1121
1122 #[inline]
1123 fn elapse_bt(&mut self, duration: i64) -> Result<ElapseResult, Self::Error> {
1124 self.elapse(duration)
1125 }
1126
1127 #[inline]
1128 fn close(&mut self) -> Result<(), Self::Error> {
1129 Ok(())
1130 }
1131
1132 #[inline]
1133 fn feed_latency(&self, asset_no: usize) -> Option<(i64, i64)> {
1134 self.local.get(asset_no).unwrap().feed_latency()
1135 }
1136
1137 #[inline]
1138 fn order_latency(&self, asset_no: usize) -> Option<(i64, i64, i64)> {
1139 self.local.get(asset_no).unwrap().order_latency()
1140 }
1141}
1142
1143pub struct MultiAssetSingleExchangeBacktestBuilder<Local: Processor, Exchange: Processor> {
1145 local: Vec<BacktestProcessorState<Local>>,
1146 exch: Vec<BacktestProcessorState<Exchange>>,
1147}
1148
1149impl<Local, Exchange> MultiAssetSingleExchangeBacktestBuilder<Local, Exchange>
1150where
1151 Local: LocalProcessor<HashMapMarketDepth> + 'static,
1152 Exchange: Processor + 'static,
1153{
1154 pub fn add_asset(self, asset: Asset<Local, Exchange, Event>) -> Self {
1156 let mut self_ = Self { ..self };
1157 self_.local.push(BacktestProcessorState::new(
1158 *asset.local,
1159 asset.reader.clone(),
1160 ));
1161 self_.exch.push(BacktestProcessorState::new(
1162 *asset.exch,
1163 asset.reader.clone(),
1164 ));
1165 self_
1166 }
1167
1168 pub fn build(
1170 self,
1171 ) -> Result<MultiAssetSingleExchangeBacktest<HashMapMarketDepth, Local, Exchange>, BuildError>
1172 {
1173 let num_assets = self.local.len();
1174 if self.local.len() != num_assets || self.exch.len() != num_assets {
1175 panic!();
1176 }
1177 Ok(MultiAssetSingleExchangeBacktest {
1178 cur_ts: i64::MAX,
1179 evs: EventSet::new(num_assets),
1180 local: self.local,
1181 exch: self.exch,
1182 _md_marker: Default::default(),
1183 })
1184 }
1185}
1186
1187pub struct MultiAssetSingleExchangeBacktest<MD, Local, Exchange>
1192where
1193 MD: MarketDepth,
1194 Local: LocalProcessor<MD>,
1195 Exchange: Processor,
1196{
1197 cur_ts: i64,
1198 evs: EventSet,
1199 local: Vec<BacktestProcessorState<Local>>,
1200 exch: Vec<BacktestProcessorState<Exchange>>,
1201 _md_marker: PhantomData<MD>,
1202}
1203
1204impl<MD, Local, Exchange> MultiAssetSingleExchangeBacktest<MD, Local, Exchange>
1205where
1206 MD: MarketDepth,
1207 Local: LocalProcessor<MD>,
1208 Exchange: Processor,
1209{
1210 pub fn builder() -> MultiAssetSingleExchangeBacktestBuilder<Local, Exchange> {
1211 MultiAssetSingleExchangeBacktestBuilder {
1212 local: vec![],
1213 exch: vec![],
1214 }
1215 }
1216
1217 pub fn new(local: Vec<Local>, exch: Vec<Exchange>, reader: Vec<Reader<Event>>) -> Self {
1218 let num_assets = local.len();
1219 if local.len() != num_assets || exch.len() != num_assets || reader.len() != num_assets {
1220 panic!();
1221 }
1222
1223 let local = local
1224 .into_iter()
1225 .zip(reader.iter())
1226 .map(|(proc, reader)| BacktestProcessorState::new(proc, reader.clone()))
1227 .collect();
1228 let exch = exch
1229 .into_iter()
1230 .zip(reader.iter())
1231 .map(|(proc, reader)| BacktestProcessorState::new(proc, reader.clone()))
1232 .collect();
1233
1234 Self {
1235 local,
1236 exch,
1237 cur_ts: i64::MAX,
1238 evs: EventSet::new(num_assets),
1239 _md_marker: Default::default(),
1240 }
1241 }
1242
1243 fn initialize_evs(&mut self) -> Result<(), BacktestError> {
1244 for (asset_no, local) in self.local.iter_mut().enumerate() {
1245 match local.advance() {
1246 Ok(ts) => self.evs.update_local_data(asset_no, ts),
1247 Err(BacktestError::EndOfData) => {
1248 self.evs.invalidate_local_data(asset_no);
1249 }
1250 Err(e) => {
1251 return Err(e);
1252 }
1253 }
1254 }
1255 for (asset_no, exch) in self.exch.iter_mut().enumerate() {
1256 match exch.advance() {
1257 Ok(ts) => self.evs.update_exch_data(asset_no, ts),
1258 Err(BacktestError::EndOfData) => {
1259 self.evs.invalidate_exch_data(asset_no);
1260 }
1261 Err(e) => {
1262 return Err(e);
1263 }
1264 }
1265 }
1266 Ok(())
1267 }
1268
1269 pub fn goto<const WAIT_NEXT_FEED: bool>(
1270 &mut self,
1271 timestamp: i64,
1272 wait_order_response: WaitOrderResponse,
1273 ) -> Result<ElapseResult, BacktestError> {
1274 let mut result = ElapseResult::Ok;
1275 let mut timestamp = timestamp;
1276 for (asset_no, local) in self.local.iter().enumerate() {
1277 self.evs
1278 .update_exch_order(asset_no, local.earliest_send_order_timestamp());
1279 self.evs
1280 .update_local_order(asset_no, local.earliest_recv_order_timestamp());
1281 }
1282 loop {
1283 match self.evs.next() {
1284 Some(ev) => {
1285 if ev.timestamp > timestamp {
1286 self.cur_ts = timestamp;
1287 return Ok(result);
1288 }
1289 match ev.kind {
1290 EventIntentKind::LocalData => {
1291 let local = unsafe { self.local.get_unchecked_mut(ev.asset_no) };
1292 let next = local.next_row().and_then(|row| {
1293 local.processor.process(&local.data[row])?;
1294 local.advance()
1295 });
1296
1297 match next {
1298 Ok(next_ts) => {
1299 self.evs.update_local_data(ev.asset_no, next_ts);
1300 }
1301 Err(BacktestError::EndOfData) => {
1302 self.evs.invalidate_local_data(ev.asset_no);
1303 }
1304 Err(e) => {
1305 return Err(e);
1306 }
1307 }
1308 if WAIT_NEXT_FEED {
1309 timestamp = ev.timestamp;
1310 result = ElapseResult::MarketFeed;
1311 }
1312 }
1313 EventIntentKind::LocalOrder => {
1314 let local = unsafe { self.local.get_unchecked_mut(ev.asset_no) };
1315 let wait_order_resp_id = match wait_order_response {
1316 WaitOrderResponse::Specified {
1317 asset_no: wait_order_asset_no,
1318 order_id: wait_order_id,
1319 } if ev.asset_no == wait_order_asset_no => Some(wait_order_id),
1320 _ => None,
1321 };
1322 if local.process_recv_order(ev.timestamp, wait_order_resp_id)?
1323 || wait_order_response == WaitOrderResponse::Any
1324 {
1325 timestamp = ev.timestamp;
1326 if WAIT_NEXT_FEED {
1327 result = ElapseResult::OrderResponse;
1328 }
1329 }
1330 self.evs.update_local_order(
1331 ev.asset_no,
1332 local.earliest_recv_order_timestamp(),
1333 );
1334 }
1335 EventIntentKind::ExchData => {
1336 let exch = unsafe { self.exch.get_unchecked_mut(ev.asset_no) };
1337 let next = exch.next_row().and_then(|row| {
1338 exch.processor.process(&exch.data[row])?;
1339 exch.advance()
1340 });
1341
1342 match next {
1343 Ok(next_ts) => {
1344 self.evs.update_exch_data(ev.asset_no, next_ts);
1345 }
1346 Err(BacktestError::EndOfData) => {
1347 self.evs.invalidate_exch_data(ev.asset_no);
1348 }
1349 Err(e) => {
1350 return Err(e);
1351 }
1352 }
1353 self.evs.update_local_order(
1354 ev.asset_no,
1355 exch.earliest_send_order_timestamp(),
1356 );
1357 }
1358 EventIntentKind::ExchOrder => {
1359 let exch = unsafe { self.exch.get_unchecked_mut(ev.asset_no) };
1360 let _ = exch.process_recv_order(ev.timestamp, None)?;
1361 self.evs.update_exch_order(
1362 ev.asset_no,
1363 exch.earliest_recv_order_timestamp(),
1364 );
1365 self.evs.update_local_order(
1366 ev.asset_no,
1367 exch.earliest_send_order_timestamp(),
1368 );
1369 }
1370 }
1371 }
1372 None => {
1373 return Ok(ElapseResult::EndOfData);
1374 }
1375 }
1376 }
1377 }
1378}
1379
1380impl<MD, Local, Exchange> Bot<MD> for MultiAssetSingleExchangeBacktest<MD, Local, Exchange>
1381where
1382 MD: MarketDepth,
1383 Local: LocalProcessor<MD>,
1384 Exchange: Processor,
1385{
1386 type Error = BacktestError;
1387
1388 #[inline]
1389 fn current_timestamp(&self) -> i64 {
1390 self.cur_ts
1391 }
1392
1393 #[inline]
1394 fn num_assets(&self) -> usize {
1395 self.local.len()
1396 }
1397
1398 #[inline]
1399 fn position(&self, asset_no: usize) -> f64 {
1400 self.local.get(asset_no).unwrap().position()
1401 }
1402
1403 #[inline]
1404 fn state_values(&self, asset_no: usize) -> &StateValues {
1405 self.local.get(asset_no).unwrap().state_values()
1406 }
1407
1408 fn depth(&self, asset_no: usize) -> &MD {
1409 self.local.get(asset_no).unwrap().depth()
1410 }
1411
1412 fn last_trades(&self, asset_no: usize) -> &[Event] {
1413 self.local.get(asset_no).unwrap().last_trades()
1414 }
1415
1416 #[inline]
1417 fn clear_last_trades(&mut self, asset_no: Option<usize>) {
1418 match asset_no {
1419 Some(an) => {
1420 let local = self.local.get_mut(an).unwrap();
1421 local.clear_last_trades();
1422 }
1423 None => {
1424 for local in self.local.iter_mut() {
1425 local.clear_last_trades();
1426 }
1427 }
1428 }
1429 }
1430
1431 #[inline]
1432 fn orders(&self, asset_no: usize) -> &HashMap<OrderId, Order> {
1433 self.local.get(asset_no).unwrap().orders()
1434 }
1435
1436 #[inline]
1437 fn submit_buy_order(
1438 &mut self,
1439 asset_no: usize,
1440 order_id: OrderId,
1441 price: f64,
1442 qty: f64,
1443 time_in_force: TimeInForce,
1444 order_type: OrdType,
1445 wait: bool,
1446 ) -> Result<ElapseResult, Self::Error> {
1447 let local = self.local.get_mut(asset_no).unwrap();
1448 local.submit_order(
1449 order_id,
1450 Side::Buy,
1451 price,
1452 qty,
1453 order_type,
1454 time_in_force,
1455 self.cur_ts,
1456 )?;
1457
1458 if wait {
1459 return self.goto::<false>(
1460 UNTIL_END_OF_DATA,
1461 WaitOrderResponse::Specified { asset_no, order_id },
1462 );
1463 }
1464 Ok(ElapseResult::Ok)
1465 }
1466
1467 #[inline]
1468 fn submit_sell_order(
1469 &mut self,
1470 asset_no: usize,
1471 order_id: OrderId,
1472 price: f64,
1473 qty: f64,
1474 time_in_force: TimeInForce,
1475 order_type: OrdType,
1476 wait: bool,
1477 ) -> Result<ElapseResult, Self::Error> {
1478 let local = self.local.get_mut(asset_no).unwrap();
1479 local.submit_order(
1480 order_id,
1481 Side::Sell,
1482 price,
1483 qty,
1484 order_type,
1485 time_in_force,
1486 self.cur_ts,
1487 )?;
1488
1489 if wait {
1490 return self.goto::<false>(
1491 UNTIL_END_OF_DATA,
1492 WaitOrderResponse::Specified { asset_no, order_id },
1493 );
1494 }
1495 Ok(ElapseResult::Ok)
1496 }
1497
1498 fn submit_order(
1499 &mut self,
1500 asset_no: usize,
1501 order: OrderRequest,
1502 wait: bool,
1503 ) -> Result<ElapseResult, Self::Error> {
1504 let local = self.local.get_mut(asset_no).unwrap();
1505 local.submit_order(
1506 order.order_id,
1507 Side::Sell,
1508 order.price,
1509 order.qty,
1510 order.order_type,
1511 order.time_in_force,
1512 self.cur_ts,
1513 )?;
1514
1515 if wait {
1516 return self.goto::<false>(
1517 UNTIL_END_OF_DATA,
1518 WaitOrderResponse::Specified {
1519 asset_no,
1520 order_id: order.order_id,
1521 },
1522 );
1523 }
1524 Ok(ElapseResult::Ok)
1525 }
1526
1527 #[inline]
1528 fn modify(
1529 &mut self,
1530 asset_no: usize,
1531 order_id: OrderId,
1532 price: f64,
1533 qty: f64,
1534 wait: bool,
1535 ) -> Result<ElapseResult, Self::Error> {
1536 let local = self.local.get_mut(asset_no).unwrap();
1537 local.modify(order_id, price, qty, self.cur_ts)?;
1538
1539 if wait {
1540 return self.goto::<false>(
1541 UNTIL_END_OF_DATA,
1542 WaitOrderResponse::Specified { asset_no, order_id },
1543 );
1544 }
1545 Ok(ElapseResult::Ok)
1546 }
1547
1548 #[inline]
1549 fn cancel(
1550 &mut self,
1551 asset_no: usize,
1552 order_id: OrderId,
1553 wait: bool,
1554 ) -> Result<ElapseResult, Self::Error> {
1555 let local = self.local.get_mut(asset_no).unwrap();
1556 local.cancel(order_id, self.cur_ts)?;
1557
1558 if wait {
1559 return self.goto::<false>(
1560 UNTIL_END_OF_DATA,
1561 WaitOrderResponse::Specified { asset_no, order_id },
1562 );
1563 }
1564 Ok(ElapseResult::Ok)
1565 }
1566
1567 #[inline]
1568 fn clear_inactive_orders(&mut self, asset_no: Option<usize>) {
1569 match asset_no {
1570 Some(asset_no) => {
1571 self.local
1572 .get_mut(asset_no)
1573 .unwrap()
1574 .clear_inactive_orders();
1575 }
1576 None => {
1577 for local in self.local.iter_mut() {
1578 local.clear_inactive_orders();
1579 }
1580 }
1581 }
1582 }
1583
1584 #[inline]
1585 fn wait_order_response(
1586 &mut self,
1587 asset_no: usize,
1588 order_id: OrderId,
1589 timeout: i64,
1590 ) -> Result<ElapseResult, BacktestError> {
1591 self.goto::<false>(
1592 self.cur_ts + timeout,
1593 WaitOrderResponse::Specified { asset_no, order_id },
1594 )
1595 }
1596
1597 fn wait_next_feed(
1598 &mut self,
1599 include_order_resp: bool,
1600 timeout: i64,
1601 ) -> Result<ElapseResult, Self::Error> {
1602 if self.cur_ts == i64::MAX {
1603 self.initialize_evs()?;
1604 match self.evs.next() {
1605 Some(ev) => {
1606 self.cur_ts = ev.timestamp;
1607 }
1608 None => {
1609 return Ok(ElapseResult::EndOfData);
1610 }
1611 }
1612 }
1613 if include_order_resp {
1614 self.goto::<true>(self.cur_ts + timeout, WaitOrderResponse::Any)
1615 } else {
1616 self.goto::<true>(self.cur_ts + timeout, WaitOrderResponse::None)
1617 }
1618 }
1619
1620 #[inline]
1621 fn elapse(&mut self, duration: i64) -> Result<ElapseResult, Self::Error> {
1622 if self.cur_ts == i64::MAX {
1623 self.initialize_evs()?;
1624 match self.evs.next() {
1625 Some(ev) => {
1626 self.cur_ts = ev.timestamp;
1627 }
1628 None => {
1629 return Ok(ElapseResult::EndOfData);
1630 }
1631 }
1632 }
1633 self.goto::<false>(self.cur_ts + duration, WaitOrderResponse::None)
1634 }
1635
1636 #[inline]
1637 fn elapse_bt(&mut self, duration: i64) -> Result<ElapseResult, Self::Error> {
1638 self.elapse(duration)
1639 }
1640
1641 #[inline]
1642 fn close(&mut self) -> Result<(), Self::Error> {
1643 Ok(())
1644 }
1645
1646 #[inline]
1647 fn feed_latency(&self, asset_no: usize) -> Option<(i64, i64)> {
1648 self.local.get(asset_no).unwrap().feed_latency()
1649 }
1650
1651 #[inline]
1652 fn order_latency(&self, asset_no: usize) -> Option<(i64, i64, i64)> {
1653 self.local.get(asset_no).unwrap().order_latency()
1654 }
1655}
1656
1657#[cfg(test)]
1658mod test {
1659 use std::error::Error;
1660
1661 use crate::{
1662 backtest::{
1663 Backtest,
1664 DataSource,
1665 ExchangeKind::NoPartialFillExchange,
1666 L2AssetBuilder,
1667 assettype::LinearAsset,
1668 data::Data,
1669 models::{
1670 CommonFees,
1671 ConstantLatency,
1672 PowerProbQueueFunc3,
1673 ProbQueueModel,
1674 TradingValueFeeModel,
1675 },
1676 },
1677 depth::HashMapMarketDepth,
1678 prelude::{Bot, Event},
1679 types::{EXCH_EVENT, LOCAL_EVENT},
1680 };
1681
1682 #[test]
1683 fn skips_unseen_events() -> Result<(), Box<dyn Error>> {
1684 let data = Data::from_data(&[
1685 Event {
1686 ev: EXCH_EVENT | LOCAL_EVENT,
1687 exch_ts: 0,
1688 local_ts: 0,
1689 px: 0.0,
1690 qty: 0.0,
1691 order_id: 0,
1692 ival: 0,
1693 fval: 0.0,
1694 },
1695 Event {
1696 ev: LOCAL_EVENT | EXCH_EVENT,
1697 exch_ts: 1,
1698 local_ts: 1,
1699 px: 0.0,
1700 qty: 0.0,
1701 order_id: 0,
1702 ival: 0,
1703 fval: 0.0,
1704 },
1705 Event {
1706 ev: EXCH_EVENT,
1707 exch_ts: 3,
1708 local_ts: 4,
1709 px: 0.0,
1710 qty: 0.0,
1711 order_id: 0,
1712 ival: 0,
1713 fval: 0.0,
1714 },
1715 Event {
1716 ev: LOCAL_EVENT,
1717 exch_ts: 3,
1718 local_ts: 4,
1719 px: 0.0,
1720 qty: 0.0,
1721 order_id: 0,
1722 ival: 0,
1723 fval: 0.0,
1724 },
1725 ]);
1726
1727 let mut backtester = Backtest::builder()
1728 .add_asset(
1729 L2AssetBuilder::default()
1730 .data(vec![DataSource::Data(data)])
1731 .latency_model(ConstantLatency::new(50, 50))
1732 .asset_type(LinearAsset::new(1.0))
1733 .fee_model(TradingValueFeeModel::new(CommonFees::new(0.0, 0.0)))
1734 .queue_model(ProbQueueModel::new(PowerProbQueueFunc3::new(3.0)))
1735 .exchange(NoPartialFillExchange)
1736 .depth(|| HashMapMarketDepth::new(0.01, 1.0))
1737 .build()?,
1738 )
1739 .build()?;
1740
1741 backtester.elapse_bt(1)?;
1743 assert_eq!(1, backtester.cur_ts);
1744
1745 backtester.elapse_bt(1)?;
1747 assert_eq!(2, backtester.cur_ts);
1748 assert_eq!(Some(3), backtester.local[0].row);
1749 assert_eq!(Some(2), backtester.exch[0].row);
1750
1751 backtester.elapse_bt(1)?;
1752 assert_eq!(3, backtester.cur_ts);
1753
1754 Ok(())
1755 }
1756}