hftbacktest/backtest/
mod.rs

1use std::{
2    collections::HashMap,
3    io::Error as IoError,
4    marker::PhantomData,
5    ops::{Deref, DerefMut},
6};
7
8pub use data::DataSource;
9use data::Reader;
10use models::FeeModel;
11use thiserror::Error;
12
13pub use crate::backtest::{
14    models::L3QueueModel,
15    proc::{L3Local, L3NoPartialFillExchange},
16};
17use crate::{
18    backtest::{
19        assettype::AssetType,
20        data::{Data, FeedLatencyAdjustment, NpyDTyped},
21        evs::{EventIntentKind, EventSet},
22        models::{LatencyModel, QueueModel},
23        order::order_bus,
24        proc::{Local, LocalProcessor, NoPartialFillExchange, PartialFillExchange, Processor},
25        state::State,
26    },
27    depth::{HashMapMarketDepth, L2MarketDepth, L3MarketDepth, MarketDepth},
28    prelude::{
29        Bot,
30        OrdType,
31        Order,
32        OrderId,
33        OrderRequest,
34        Side,
35        StateValues,
36        TimeInForce,
37        UNTIL_END_OF_DATA,
38        WaitOrderResponse,
39    },
40    types::{BuildError, ElapseResult, Event},
41};
42
43/// Provides asset types.
44pub mod assettype;
45
46pub mod models;
47
48/// OrderBus implementation
49pub mod order;
50
51/// Local and exchange models
52pub mod proc;
53
54/// Trading state.
55pub mod state;
56
57/// Recorder for a bot's trading statistics.
58pub mod recorder;
59
60pub mod data;
61mod evs;
62
63/// Errors that can occur during backtesting.
64#[derive(Error, Debug)]
65pub enum BacktestError {
66    #[error("Order related to a given order id already exists")]
67    OrderIdExist,
68    #[error("Order request is in process")]
69    OrderRequestInProcess,
70    #[error("Order not found")]
71    OrderNotFound,
72    #[error("order request is invalid")]
73    InvalidOrderRequest,
74    #[error("order status is invalid to proceed the request")]
75    InvalidOrderStatus,
76    #[error("end of data")]
77    EndOfData,
78    #[error("data error: {0:?}")]
79    DataError(#[from] IoError),
80}
81
82/// Backtesting Asset
83pub struct Asset<L: ?Sized, E: ?Sized, D: NpyDTyped + Clone /* todo: ugly bounds */> {
84    pub local: Box<L>,
85    pub exch: Box<E>,
86    pub reader: Reader<D>,
87}
88
89impl<L, E, D: NpyDTyped + Clone> Asset<L, E, D> {
90    /// Constructs an instance of `Asset`. Use this method if a custom local processor or an
91    /// exchange processor is needed.
92    pub fn new(local: L, exch: E, reader: Reader<D>) -> Self {
93        Self {
94            local: Box::new(local),
95            exch: Box::new(exch),
96            reader,
97        }
98    }
99
100    /// Returns an `L2AssetBuilder`.
101    pub fn l2_builder<LM, AT, QM, MD, FM>() -> L2AssetBuilder<LM, AT, QM, MD, FM>
102    where
103        AT: AssetType + Clone + 'static,
104        MD: MarketDepth + L2MarketDepth + 'static,
105        QM: QueueModel<MD> + 'static,
106        LM: LatencyModel + Clone + 'static,
107        FM: FeeModel + Clone + 'static,
108    {
109        L2AssetBuilder::new()
110    }
111
112    /// Returns an `L3AssetBuilder`.
113    pub fn l3_builder<LM, AT, QM, MD, FM>() -> L3AssetBuilder<LM, AT, QM, MD, FM>
114    where
115        AT: AssetType + Clone + 'static,
116        MD: MarketDepth + L3MarketDepth + 'static,
117        QM: L3QueueModel<MD> + 'static,
118        LM: LatencyModel + Clone + 'static,
119        FM: FeeModel + Clone + 'static,
120        BacktestError: From<<MD as L3MarketDepth>::Error>,
121    {
122        L3AssetBuilder::new()
123    }
124}
125
126/// Exchange model kind.
127pub enum ExchangeKind {
128    /// Uses [NoPartialFillExchange](`NoPartialFillExchange`).
129    NoPartialFillExchange,
130    /// Uses [PartialFillExchange](`PartialFillExchange`).
131    PartialFillExchange,
132}
133
134/// A level-2 asset builder.
135pub struct L2AssetBuilder<LM, AT, QM, MD, FM> {
136    latency_model: Option<LM>,
137    asset_type: Option<AT>,
138    data: Vec<DataSource<Event>>,
139    parallel_load: bool,
140    latency_offset: i64,
141    fee_model: Option<FM>,
142    exch_kind: ExchangeKind,
143    last_trades_cap: usize,
144    queue_model: Option<QM>,
145    depth_builder: Option<Box<dyn Fn() -> MD>>,
146}
147
148impl<LM, AT, QM, MD, FM> L2AssetBuilder<LM, AT, QM, MD, FM>
149where
150    AT: AssetType + Clone + 'static,
151    MD: MarketDepth + L2MarketDepth + 'static,
152    QM: QueueModel<MD> + 'static,
153    LM: LatencyModel + Clone + 'static,
154    FM: FeeModel + Clone + 'static,
155{
156    /// Constructs an `L2AssetBuilder`.
157    pub fn new() -> Self {
158        Self {
159            latency_model: None,
160            asset_type: None,
161            data: vec![],
162            parallel_load: false,
163            latency_offset: 0,
164            fee_model: None,
165            exch_kind: ExchangeKind::NoPartialFillExchange,
166            last_trades_cap: 0,
167            queue_model: None,
168            depth_builder: None,
169        }
170    }
171
172    /// Sets the feed data.
173    pub fn data(self, data: Vec<DataSource<Event>>) -> Self {
174        Self { data, ..self }
175    }
176
177    /// Sets whether to load the next data in parallel with backtesting. This can speed up the
178    /// backtest by reducing data loading time, but it also increases memory usage.
179    /// The default value is `true`.
180    pub fn parallel_load(self, parallel_load: bool) -> Self {
181        Self {
182            parallel_load,
183            ..self
184        }
185    }
186
187    /// Sets the latency offset to adjust the feed latency by the specified amount. This is
188    /// particularly useful in cross-exchange backtesting, where the feed data is collected from a
189    /// different site than the one where the strategy is intended to run.
190    pub fn latency_offset(self, latency_offset: i64) -> Self {
191        Self {
192            latency_offset,
193            ..self
194        }
195    }
196
197    /// Sets a latency model.
198    pub fn latency_model(self, latency_model: LM) -> Self {
199        Self {
200            latency_model: Some(latency_model),
201            ..self
202        }
203    }
204
205    /// Sets an asset type.
206    pub fn asset_type(self, asset_type: AT) -> Self {
207        Self {
208            asset_type: Some(asset_type),
209            ..self
210        }
211    }
212
213    /// Sets a fee model.
214    pub fn fee_model(self, fee_model: FM) -> Self {
215        Self {
216            fee_model: Some(fee_model),
217            ..self
218        }
219    }
220
221    /// Sets an exchange model. The default value is [`NoPartialFillExchange`].
222    pub fn exchange(self, exch_kind: ExchangeKind) -> Self {
223        Self { exch_kind, ..self }
224    }
225
226    /// Sets the initial capacity of the vector storing the last market trades.
227    /// The default value is `0`, indicating that no last trades are stored.
228    pub fn last_trades_capacity(self, capacity: usize) -> Self {
229        Self {
230            last_trades_cap: capacity,
231            ..self
232        }
233    }
234
235    /// Sets a queue model.
236    pub fn queue_model(self, queue_model: QM) -> Self {
237        Self {
238            queue_model: Some(queue_model),
239            ..self
240        }
241    }
242
243    /// Sets a market depth builder.
244    pub fn depth<Builder>(self, builder: Builder) -> Self
245    where
246        Builder: Fn() -> MD + 'static,
247    {
248        Self {
249            depth_builder: Some(Box::new(builder)),
250            ..self
251        }
252    }
253
254    /// Builds an `Asset`.
255    pub fn build(self) -> Result<Asset<dyn LocalProcessor<MD>, dyn Processor, Event>, BuildError> {
256        let reader = if self.latency_offset == 0 {
257            Reader::builder()
258                .parallel_load(self.parallel_load)
259                .data(self.data)
260                .build()
261                .map_err(|err| BuildError::Error(err.into()))?
262        } else {
263            Reader::builder()
264                .parallel_load(self.parallel_load)
265                .data(self.data)
266                .preprocessor(FeedLatencyAdjustment::new(self.latency_offset))
267                .build()
268                .map_err(|err| BuildError::Error(err.into()))?
269        };
270
271        let create_depth = self
272            .depth_builder
273            .as_ref()
274            .ok_or(BuildError::BuilderIncomplete("depth"))?;
275        let order_latency = self
276            .latency_model
277            .clone()
278            .ok_or(BuildError::BuilderIncomplete("order_latency"))?;
279        let asset_type = self
280            .asset_type
281            .clone()
282            .ok_or(BuildError::BuilderIncomplete("asset_type"))?;
283        let fee_model = self
284            .fee_model
285            .clone()
286            .ok_or(BuildError::BuilderIncomplete("fee_model"))?;
287
288        let (order_e2l, order_l2e) = order_bus(order_latency);
289
290        let local = Local::new(
291            create_depth(),
292            State::new(asset_type, fee_model),
293            self.last_trades_cap,
294            order_l2e,
295        );
296
297        let queue_model = self
298            .queue_model
299            .ok_or(BuildError::BuilderIncomplete("queue_model"))?;
300        let asset_type = self
301            .asset_type
302            .clone()
303            .ok_or(BuildError::BuilderIncomplete("asset_type"))?;
304        let fee_model = self
305            .fee_model
306            .clone()
307            .ok_or(BuildError::BuilderIncomplete("fee_model"))?;
308
309        match self.exch_kind {
310            ExchangeKind::NoPartialFillExchange => {
311                let exch = NoPartialFillExchange::new(
312                    create_depth(),
313                    State::new(asset_type, fee_model),
314                    queue_model,
315                    order_e2l,
316                );
317
318                Ok(Asset {
319                    local: Box::new(local),
320                    exch: Box::new(exch),
321                    reader,
322                })
323            }
324            ExchangeKind::PartialFillExchange => {
325                let exch = PartialFillExchange::new(
326                    create_depth(),
327                    State::new(asset_type, fee_model),
328                    queue_model,
329                    order_e2l,
330                );
331
332                Ok(Asset {
333                    local: Box::new(local),
334                    exch: Box::new(exch),
335                    reader,
336                })
337            }
338        }
339    }
340}
341
342impl<LM, AT, QM, MD, FM> Default for L2AssetBuilder<LM, AT, QM, MD, FM>
343where
344    AT: AssetType + Clone + 'static,
345    MD: MarketDepth + L2MarketDepth + 'static,
346    QM: QueueModel<MD> + 'static,
347    LM: LatencyModel + Clone + 'static,
348    FM: FeeModel + Clone + 'static,
349{
350    fn default() -> Self {
351        Self::new()
352    }
353}
354
355/// A level-3 asset builder.
356pub struct L3AssetBuilder<LM, AT, QM, MD, FM> {
357    latency_model: Option<LM>,
358    asset_type: Option<AT>,
359    data: Vec<DataSource<Event>>,
360    parallel_load: bool,
361    latency_offset: i64,
362    fee_model: Option<FM>,
363    exch_kind: ExchangeKind,
364    last_trades_cap: usize,
365    queue_model: Option<QM>,
366    depth_builder: Option<Box<dyn Fn() -> MD>>,
367}
368
369impl<LM, AT, QM, MD, FM> L3AssetBuilder<LM, AT, QM, MD, FM>
370where
371    AT: AssetType + Clone + 'static,
372    MD: MarketDepth + L3MarketDepth + 'static,
373    QM: L3QueueModel<MD> + 'static,
374    LM: LatencyModel + Clone + 'static,
375    FM: FeeModel + Clone + 'static,
376    BacktestError: From<<MD as L3MarketDepth>::Error>,
377{
378    /// Constructs an `L3AssetBuilder`.
379    pub fn new() -> Self {
380        Self {
381            latency_model: None,
382            asset_type: None,
383            data: vec![],
384            parallel_load: false,
385            latency_offset: 0,
386            fee_model: None,
387            exch_kind: ExchangeKind::NoPartialFillExchange,
388            last_trades_cap: 0,
389            queue_model: None,
390            depth_builder: None,
391        }
392    }
393
394    /// Sets the feed data.
395    pub fn data(self, data: Vec<DataSource<Event>>) -> Self {
396        Self { data, ..self }
397    }
398
399    /// Sets whether to load the next data in parallel with backtesting. This can speed up the
400    /// backtest by reducing data loading time, but it also increases memory usage.
401    /// The default value is `true`.
402    pub fn parallel_load(self, parallel_load: bool) -> Self {
403        Self {
404            parallel_load,
405            ..self
406        }
407    }
408
409    /// Sets the latency offset to adjust the feed latency by the specified amount. This is
410    /// particularly useful in cross-exchange backtesting, where the feed data is collected from a
411    /// different site than the one where the strategy is intended to run.
412    pub fn latency_offset(self, latency_offset: i64) -> Self {
413        Self {
414            latency_offset,
415            ..self
416        }
417    }
418
419    /// Sets a latency model.
420    pub fn latency_model(self, latency_model: LM) -> Self {
421        Self {
422            latency_model: Some(latency_model),
423            ..self
424        }
425    }
426
427    /// Sets an asset type.
428    pub fn asset_type(self, asset_type: AT) -> Self {
429        Self {
430            asset_type: Some(asset_type),
431            ..self
432        }
433    }
434
435    /// Sets a fee model.
436    pub fn fee_model(self, fee_model: FM) -> Self {
437        Self {
438            fee_model: Some(fee_model),
439            ..self
440        }
441    }
442
443    /// Sets an exchange model. The default value is [`NoPartialFillExchange`].
444    pub fn exchange(self, exch_kind: ExchangeKind) -> Self {
445        Self { exch_kind, ..self }
446    }
447
448    /// Sets the initial capacity of the vector storing the last market trades.
449    /// The default value is `0`, indicating that no last trades are stored.
450    pub fn last_trades_capacity(self, capacity: usize) -> Self {
451        Self {
452            last_trades_cap: capacity,
453            ..self
454        }
455    }
456
457    /// Sets a queue model.
458    pub fn queue_model(self, queue_model: QM) -> Self {
459        Self {
460            queue_model: Some(queue_model),
461            ..self
462        }
463    }
464
465    /// Sets a market depth builder.
466    pub fn depth<Builder>(self, builder: Builder) -> Self
467    where
468        Builder: Fn() -> MD + 'static,
469    {
470        Self {
471            depth_builder: Some(Box::new(builder)),
472            ..self
473        }
474    }
475
476    /// Builds an `Asset`.
477    pub fn build(self) -> Result<Asset<dyn LocalProcessor<MD>, dyn Processor, Event>, BuildError> {
478        let reader = if self.latency_offset == 0 {
479            Reader::builder()
480                .parallel_load(self.parallel_load)
481                .data(self.data)
482                .build()
483                .map_err(|err| BuildError::Error(err.into()))?
484        } else {
485            Reader::builder()
486                .parallel_load(self.parallel_load)
487                .data(self.data)
488                .preprocessor(FeedLatencyAdjustment::new(self.latency_offset))
489                .build()
490                .map_err(|err| BuildError::Error(err.into()))?
491        };
492
493        let create_depth = self
494            .depth_builder
495            .as_ref()
496            .ok_or(BuildError::BuilderIncomplete("depth"))?;
497        let order_latency = self
498            .latency_model
499            .clone()
500            .ok_or(BuildError::BuilderIncomplete("order_latency"))?;
501        let asset_type = self
502            .asset_type
503            .clone()
504            .ok_or(BuildError::BuilderIncomplete("asset_type"))?;
505        let fee_model = self
506            .fee_model
507            .clone()
508            .ok_or(BuildError::BuilderIncomplete("fee_model"))?;
509
510        let (order_e2l, order_l2e) = order_bus(order_latency);
511
512        let local = L3Local::new(
513            create_depth(),
514            State::new(asset_type, fee_model),
515            self.last_trades_cap,
516            order_l2e,
517        );
518
519        let queue_model = self
520            .queue_model
521            .ok_or(BuildError::BuilderIncomplete("queue_model"))?;
522        let asset_type = self
523            .asset_type
524            .clone()
525            .ok_or(BuildError::BuilderIncomplete("asset_type"))?;
526        let fee_model = self
527            .fee_model
528            .clone()
529            .ok_or(BuildError::BuilderIncomplete("fee_model"))?;
530
531        match self.exch_kind {
532            ExchangeKind::NoPartialFillExchange => {
533                let exch = L3NoPartialFillExchange::new(
534                    create_depth(),
535                    State::new(asset_type, fee_model),
536                    queue_model,
537                    order_e2l,
538                );
539
540                Ok(Asset {
541                    local: Box::new(local),
542                    exch: Box::new(exch),
543                    reader,
544                })
545            }
546            ExchangeKind::PartialFillExchange => {
547                unimplemented!();
548            }
549        }
550    }
551}
552
553impl<LM, AT, QM, MD, FM> Default for L3AssetBuilder<LM, AT, QM, MD, FM>
554where
555    AT: AssetType + Clone + 'static,
556    MD: MarketDepth + L3MarketDepth + 'static,
557    QM: L3QueueModel<MD> + 'static,
558    LM: LatencyModel + Clone + 'static,
559    FM: FeeModel + Clone + 'static,
560    BacktestError: From<<MD as L3MarketDepth>::Error>,
561{
562    fn default() -> Self {
563        Self::new()
564    }
565}
566
567/// [`Backtest`] builder.
568pub struct BacktestBuilder<MD> {
569    local: Vec<BacktestProcessorState<Box<dyn LocalProcessor<MD>>>>,
570    exch: Vec<BacktestProcessorState<Box<dyn Processor>>>,
571}
572
573impl<MD> BacktestBuilder<MD> {
574    /// Adds [`Asset`], which will undergo simulation within the backtester.
575    pub fn add_asset(self, asset: Asset<dyn LocalProcessor<MD>, dyn Processor, Event>) -> Self {
576        let mut self_ = Self { ..self };
577        self_.local.push(BacktestProcessorState::new(
578            asset.local,
579            asset.reader.clone(),
580        ));
581        self_
582            .exch
583            .push(BacktestProcessorState::new(asset.exch, asset.reader));
584        self_
585    }
586
587    /// Builds [`Backtest`].
588    pub fn build(self) -> Result<Backtest<MD>, BuildError> {
589        let num_assets = self.local.len();
590        if self.local.len() != num_assets || self.exch.len() != num_assets {
591            panic!();
592        }
593        Ok(Backtest {
594            cur_ts: i64::MAX,
595            evs: EventSet::new(num_assets),
596            local: self.local,
597            exch: self.exch,
598        })
599    }
600}
601
602/// This backtester provides multi-asset and multi-exchange model backtesting, allowing you to
603/// configure different setups such as queue models or asset types for each asset. However, this may
604/// result in slightly slower performance compared to [`Backtest`].
605pub struct Backtest<MD> {
606    cur_ts: i64,
607    evs: EventSet,
608    local: Vec<BacktestProcessorState<Box<dyn LocalProcessor<MD>>>>,
609    exch: Vec<BacktestProcessorState<Box<dyn Processor>>>,
610}
611
612impl<P: Processor> Deref for BacktestProcessorState<P> {
613    type Target = P;
614
615    fn deref(&self) -> &Self::Target {
616        &self.processor
617    }
618}
619
620impl<P: Processor> DerefMut for BacktestProcessorState<P> {
621    fn deref_mut(&mut self) -> &mut Self::Target {
622        &mut self.processor
623    }
624}
625
626/// Per asset backtesting state used internally to advance event buffers.
627pub struct BacktestProcessorState<P: Processor> {
628    data: Data<Event>,
629    processor: P,
630    reader: Reader<Event>,
631    row: Option<usize>,
632}
633
634impl<P: Processor> BacktestProcessorState<P> {
635    fn new(processor: P, reader: Reader<Event>) -> BacktestProcessorState<P> {
636        Self {
637            data: Data::empty(),
638            processor,
639            reader,
640            row: None,
641        }
642    }
643
644    /// Get the index of the next available row, only advancing the reader if there's no
645    /// row currently available.
646    fn next_row(&mut self) -> Result<usize, BacktestError> {
647        if self.row.is_none() {
648            let _ = self.advance()?;
649        }
650
651        self.row.ok_or(BacktestError::EndOfData)
652    }
653
654    /// Advance the state of this processor to the next available event and return the
655    /// timestamp it occurred at, if any.
656    fn advance(&mut self) -> Result<i64, BacktestError> {
657        loop {
658            let start = self.row.map(|rn| rn + 1).unwrap_or(0);
659
660            for rn in start..self.data.len() {
661                if let Some(ts) = self.processor.event_seen_timestamp(&self.data[rn]) {
662                    self.row = Some(rn);
663                    return Ok(ts);
664                }
665            }
666
667            let next = self.reader.next_data()?;
668
669            self.reader.release(std::mem::replace(&mut self.data, next));
670            self.row = None;
671        }
672    }
673}
674
675impl<MD> Backtest<MD>
676where
677    MD: MarketDepth,
678{
679    pub fn builder() -> BacktestBuilder<MD> {
680        BacktestBuilder {
681            local: vec![],
682            exch: vec![],
683        }
684    }
685
686    pub fn new(
687        local: Vec<Box<dyn LocalProcessor<MD>>>,
688        exch: Vec<Box<dyn Processor>>,
689        reader: Vec<Reader<Event>>,
690    ) -> Self {
691        let num_assets = local.len();
692        if local.len() != num_assets || exch.len() != num_assets || reader.len() != num_assets {
693            panic!();
694        }
695
696        let local = local
697            .into_iter()
698            .zip(reader.iter())
699            .map(|(proc, reader)| BacktestProcessorState::new(proc, reader.clone()))
700            .collect();
701        let exch = exch
702            .into_iter()
703            .zip(reader.iter())
704            .map(|(proc, reader)| BacktestProcessorState::new(proc, reader.clone()))
705            .collect();
706
707        Self {
708            local,
709            exch,
710            cur_ts: i64::MAX,
711            evs: EventSet::new(num_assets),
712        }
713    }
714
715    fn initialize_evs(&mut self) -> Result<(), BacktestError> {
716        for (asset_no, local) in self.local.iter_mut().enumerate() {
717            match local.advance() {
718                Ok(ts) => self.evs.update_local_data(asset_no, ts),
719                Err(BacktestError::EndOfData) => {
720                    self.evs.invalidate_local_data(asset_no);
721                }
722                Err(e) => {
723                    return Err(e);
724                }
725            }
726        }
727        for (asset_no, exch) in self.exch.iter_mut().enumerate() {
728            match exch.advance() {
729                Ok(ts) => self.evs.update_exch_data(asset_no, ts),
730                Err(BacktestError::EndOfData) => {
731                    self.evs.invalidate_exch_data(asset_no);
732                }
733                Err(e) => {
734                    return Err(e);
735                }
736            }
737        }
738        Ok(())
739    }
740
741    pub fn goto_end(&mut self) -> Result<ElapseResult, BacktestError> {
742        if self.cur_ts == i64::MAX {
743            self.initialize_evs()?;
744            match self.evs.next() {
745                Some(ev) => {
746                    self.cur_ts = ev.timestamp;
747                }
748                None => {
749                    return Ok(ElapseResult::EndOfData);
750                }
751            }
752        }
753        self.goto::<false>(UNTIL_END_OF_DATA, WaitOrderResponse::None)
754    }
755
756    fn goto<const WAIT_NEXT_FEED: bool>(
757        &mut self,
758        timestamp: i64,
759        wait_order_response: WaitOrderResponse,
760    ) -> Result<ElapseResult, BacktestError> {
761        let mut result = ElapseResult::Ok;
762        let mut timestamp = timestamp;
763        for (asset_no, local) in self.local.iter().enumerate() {
764            self.evs
765                .update_exch_order(asset_no, local.earliest_send_order_timestamp());
766            self.evs
767                .update_local_order(asset_no, local.earliest_recv_order_timestamp());
768        }
769        loop {
770            match self.evs.next() {
771                Some(ev) => {
772                    if ev.timestamp > timestamp {
773                        self.cur_ts = timestamp;
774                        return Ok(result);
775                    }
776                    match ev.kind {
777                        EventIntentKind::LocalData => {
778                            let local = unsafe { self.local.get_unchecked_mut(ev.asset_no) };
779                            let next = local.next_row().and_then(|row| {
780                                local.processor.process(&local.data[row])?;
781                                local.advance()
782                            });
783
784                            match next {
785                                Ok(next_ts) => {
786                                    self.evs.update_local_data(ev.asset_no, next_ts);
787                                }
788                                Err(BacktestError::EndOfData) => {
789                                    self.evs.invalidate_local_data(ev.asset_no);
790                                }
791                                Err(e) => {
792                                    return Err(e);
793                                }
794                            }
795                            if WAIT_NEXT_FEED {
796                                timestamp = ev.timestamp;
797                                result = ElapseResult::MarketFeed;
798                            }
799                        }
800                        EventIntentKind::LocalOrder => {
801                            let local = unsafe { self.local.get_unchecked_mut(ev.asset_no) };
802                            let wait_order_resp_id = match wait_order_response {
803                                WaitOrderResponse::Specified {
804                                    asset_no: wait_order_asset_no,
805                                    order_id: wait_order_id,
806                                } if ev.asset_no == wait_order_asset_no => Some(wait_order_id),
807                                _ => None,
808                            };
809                            if local.process_recv_order(ev.timestamp, wait_order_resp_id)?
810                                || wait_order_response == WaitOrderResponse::Any
811                            {
812                                timestamp = ev.timestamp;
813                                if WAIT_NEXT_FEED {
814                                    result = ElapseResult::OrderResponse;
815                                }
816                            }
817                            self.evs.update_local_order(
818                                ev.asset_no,
819                                local.earliest_recv_order_timestamp(),
820                            );
821                        }
822                        EventIntentKind::ExchData => {
823                            let exch = unsafe { self.exch.get_unchecked_mut(ev.asset_no) };
824                            let next = exch.next_row().and_then(|row| {
825                                exch.processor.process(&exch.data[row])?;
826                                exch.advance()
827                            });
828
829                            match next {
830                                Ok(next_ts) => {
831                                    self.evs.update_exch_data(ev.asset_no, next_ts);
832                                }
833                                Err(BacktestError::EndOfData) => {
834                                    self.evs.invalidate_exch_data(ev.asset_no);
835                                }
836                                Err(e) => {
837                                    return Err(e);
838                                }
839                            }
840                            self.evs.update_local_order(
841                                ev.asset_no,
842                                exch.earliest_send_order_timestamp(),
843                            );
844                        }
845                        EventIntentKind::ExchOrder => {
846                            let exch = unsafe { self.exch.get_unchecked_mut(ev.asset_no) };
847                            let _ = exch.process_recv_order(ev.timestamp, None)?;
848                            self.evs.update_exch_order(
849                                ev.asset_no,
850                                exch.earliest_recv_order_timestamp(),
851                            );
852                            self.evs.update_local_order(
853                                ev.asset_no,
854                                exch.earliest_send_order_timestamp(),
855                            );
856                        }
857                    }
858                }
859                None => {
860                    return Ok(ElapseResult::EndOfData);
861                }
862            }
863        }
864    }
865}
866
867impl<MD> Bot<MD> for Backtest<MD>
868where
869    MD: MarketDepth,
870{
871    type Error = BacktestError;
872
873    #[inline]
874    fn current_timestamp(&self) -> i64 {
875        self.cur_ts
876    }
877
878    #[inline]
879    fn num_assets(&self) -> usize {
880        self.local.len()
881    }
882
883    #[inline]
884    fn position(&self, asset_no: usize) -> f64 {
885        self.local.get(asset_no).unwrap().position()
886    }
887
888    #[inline]
889    fn state_values(&self, asset_no: usize) -> &StateValues {
890        self.local.get(asset_no).unwrap().state_values()
891    }
892
893    fn depth(&self, asset_no: usize) -> &MD {
894        self.local.get(asset_no).unwrap().depth()
895    }
896
897    fn last_trades(&self, asset_no: usize) -> &[Event] {
898        self.local.get(asset_no).unwrap().last_trades()
899    }
900
901    #[inline]
902    fn clear_last_trades(&mut self, asset_no: Option<usize>) {
903        match asset_no {
904            Some(an) => {
905                let local = self.local.get_mut(an).unwrap();
906                local.clear_last_trades();
907            }
908            None => {
909                for local in self.local.iter_mut() {
910                    local.clear_last_trades();
911                }
912            }
913        }
914    }
915
916    #[inline]
917    fn orders(&self, asset_no: usize) -> &HashMap<u64, Order> {
918        self.local.get(asset_no).unwrap().orders()
919    }
920
921    #[inline]
922    fn submit_buy_order(
923        &mut self,
924        asset_no: usize,
925        order_id: OrderId,
926        price: f64,
927        qty: f64,
928        time_in_force: TimeInForce,
929        order_type: OrdType,
930        wait: bool,
931    ) -> Result<ElapseResult, Self::Error> {
932        let local = self.local.get_mut(asset_no).unwrap();
933        local.submit_order(
934            order_id,
935            Side::Buy,
936            price,
937            qty,
938            order_type,
939            time_in_force,
940            self.cur_ts,
941        )?;
942
943        if wait {
944            return self.goto::<false>(
945                UNTIL_END_OF_DATA,
946                WaitOrderResponse::Specified { asset_no, order_id },
947            );
948        }
949        Ok(ElapseResult::Ok)
950    }
951
952    #[inline]
953    fn submit_sell_order(
954        &mut self,
955        asset_no: usize,
956        order_id: OrderId,
957        price: f64,
958        qty: f64,
959        time_in_force: TimeInForce,
960        order_type: OrdType,
961        wait: bool,
962    ) -> Result<ElapseResult, Self::Error> {
963        let local = self.local.get_mut(asset_no).unwrap();
964        local.submit_order(
965            order_id,
966            Side::Sell,
967            price,
968            qty,
969            order_type,
970            time_in_force,
971            self.cur_ts,
972        )?;
973
974        if wait {
975            return self.goto::<false>(
976                UNTIL_END_OF_DATA,
977                WaitOrderResponse::Specified { asset_no, order_id },
978            );
979        }
980        Ok(ElapseResult::Ok)
981    }
982
983    fn submit_order(
984        &mut self,
985        asset_no: usize,
986        order: OrderRequest,
987        wait: bool,
988    ) -> Result<ElapseResult, Self::Error> {
989        let local = self.local.get_mut(asset_no).unwrap();
990        local.submit_order(
991            order.order_id,
992            Side::Sell,
993            order.price,
994            order.qty,
995            order.order_type,
996            order.time_in_force,
997            self.cur_ts,
998        )?;
999
1000        if wait {
1001            return self.goto::<false>(
1002                UNTIL_END_OF_DATA,
1003                WaitOrderResponse::Specified {
1004                    asset_no,
1005                    order_id: order.order_id,
1006                },
1007            );
1008        }
1009        Ok(ElapseResult::Ok)
1010    }
1011
1012    #[inline]
1013    fn modify(
1014        &mut self,
1015        asset_no: usize,
1016        order_id: OrderId,
1017        price: f64,
1018        qty: f64,
1019        wait: bool,
1020    ) -> Result<ElapseResult, Self::Error> {
1021        let local = self.local.get_mut(asset_no).unwrap();
1022        local.modify(order_id, price, qty, self.cur_ts)?;
1023
1024        if wait {
1025            return self.goto::<false>(
1026                UNTIL_END_OF_DATA,
1027                WaitOrderResponse::Specified { asset_no, order_id },
1028            );
1029        }
1030        Ok(ElapseResult::Ok)
1031    }
1032
1033    #[inline]
1034    fn cancel(
1035        &mut self,
1036        asset_no: usize,
1037        order_id: OrderId,
1038        wait: bool,
1039    ) -> Result<ElapseResult, Self::Error> {
1040        let local = self.local.get_mut(asset_no).unwrap();
1041        local.cancel(order_id, self.cur_ts)?;
1042
1043        if wait {
1044            return self.goto::<false>(
1045                UNTIL_END_OF_DATA,
1046                WaitOrderResponse::Specified { asset_no, order_id },
1047            );
1048        }
1049        Ok(ElapseResult::Ok)
1050    }
1051
1052    #[inline]
1053    fn clear_inactive_orders(&mut self, asset_no: Option<usize>) {
1054        match asset_no {
1055            Some(asset_no) => {
1056                self.local
1057                    .get_mut(asset_no)
1058                    .unwrap()
1059                    .clear_inactive_orders();
1060            }
1061            None => {
1062                for local in self.local.iter_mut() {
1063                    local.clear_inactive_orders();
1064                }
1065            }
1066        }
1067    }
1068
1069    #[inline]
1070    fn wait_order_response(
1071        &mut self,
1072        asset_no: usize,
1073        order_id: OrderId,
1074        timeout: i64,
1075    ) -> Result<ElapseResult, BacktestError> {
1076        self.goto::<false>(
1077            self.cur_ts + timeout,
1078            WaitOrderResponse::Specified { asset_no, order_id },
1079        )
1080    }
1081
1082    #[inline]
1083    fn wait_next_feed(
1084        &mut self,
1085        include_order_resp: bool,
1086        timeout: i64,
1087    ) -> Result<ElapseResult, Self::Error> {
1088        if self.cur_ts == i64::MAX {
1089            self.initialize_evs()?;
1090            match self.evs.next() {
1091                Some(ev) => {
1092                    self.cur_ts = ev.timestamp;
1093                }
1094                None => {
1095                    return Ok(ElapseResult::EndOfData);
1096                }
1097            }
1098        }
1099        if include_order_resp {
1100            self.goto::<true>(self.cur_ts + timeout, WaitOrderResponse::Any)
1101        } else {
1102            self.goto::<true>(self.cur_ts + timeout, WaitOrderResponse::None)
1103        }
1104    }
1105
1106    #[inline]
1107    fn elapse(&mut self, duration: i64) -> Result<ElapseResult, Self::Error> {
1108        if self.cur_ts == i64::MAX {
1109            self.initialize_evs()?;
1110            match self.evs.next() {
1111                Some(ev) => {
1112                    self.cur_ts = ev.timestamp;
1113                }
1114                None => {
1115                    return Ok(ElapseResult::EndOfData);
1116                }
1117            }
1118        }
1119        self.goto::<false>(self.cur_ts + duration, WaitOrderResponse::None)
1120    }
1121
1122    #[inline]
1123    fn elapse_bt(&mut self, duration: i64) -> Result<ElapseResult, Self::Error> {
1124        self.elapse(duration)
1125    }
1126
1127    #[inline]
1128    fn close(&mut self) -> Result<(), Self::Error> {
1129        Ok(())
1130    }
1131
1132    #[inline]
1133    fn feed_latency(&self, asset_no: usize) -> Option<(i64, i64)> {
1134        self.local.get(asset_no).unwrap().feed_latency()
1135    }
1136
1137    #[inline]
1138    fn order_latency(&self, asset_no: usize) -> Option<(i64, i64, i64)> {
1139        self.local.get(asset_no).unwrap().order_latency()
1140    }
1141}
1142
1143/// `MultiAssetSingleExchangeBacktest` builder.
1144pub struct MultiAssetSingleExchangeBacktestBuilder<Local: Processor, Exchange: Processor> {
1145    local: Vec<BacktestProcessorState<Local>>,
1146    exch: Vec<BacktestProcessorState<Exchange>>,
1147}
1148
1149impl<Local, Exchange> MultiAssetSingleExchangeBacktestBuilder<Local, Exchange>
1150where
1151    Local: LocalProcessor<HashMapMarketDepth> + 'static,
1152    Exchange: Processor + 'static,
1153{
1154    /// Adds [`Asset`], which will undergo simulation within the backtester.
1155    pub fn add_asset(self, asset: Asset<Local, Exchange, Event>) -> Self {
1156        let mut self_ = Self { ..self };
1157        self_.local.push(BacktestProcessorState::new(
1158            *asset.local,
1159            asset.reader.clone(),
1160        ));
1161        self_.exch.push(BacktestProcessorState::new(
1162            *asset.exch,
1163            asset.reader.clone(),
1164        ));
1165        self_
1166    }
1167
1168    /// Builds [`MultiAssetSingleExchangeBacktest`].
1169    pub fn build(
1170        self,
1171    ) -> Result<MultiAssetSingleExchangeBacktest<HashMapMarketDepth, Local, Exchange>, BuildError>
1172    {
1173        let num_assets = self.local.len();
1174        if self.local.len() != num_assets || self.exch.len() != num_assets {
1175            panic!();
1176        }
1177        Ok(MultiAssetSingleExchangeBacktest {
1178            cur_ts: i64::MAX,
1179            evs: EventSet::new(num_assets),
1180            local: self.local,
1181            exch: self.exch,
1182            _md_marker: Default::default(),
1183        })
1184    }
1185}
1186
1187/// This backtester provides multi-asset and single-exchange model backtesting, meaning all assets
1188/// have the same setups for models such as asset type or queue model. However, this can be slightly
1189/// faster than [`Backtest`]. If you need to configure different models for each asset, use
1190/// [`Backtest`].
1191pub struct MultiAssetSingleExchangeBacktest<MD, Local, Exchange>
1192where
1193    MD: MarketDepth,
1194    Local: LocalProcessor<MD>,
1195    Exchange: Processor,
1196{
1197    cur_ts: i64,
1198    evs: EventSet,
1199    local: Vec<BacktestProcessorState<Local>>,
1200    exch: Vec<BacktestProcessorState<Exchange>>,
1201    _md_marker: PhantomData<MD>,
1202}
1203
1204impl<MD, Local, Exchange> MultiAssetSingleExchangeBacktest<MD, Local, Exchange>
1205where
1206    MD: MarketDepth,
1207    Local: LocalProcessor<MD>,
1208    Exchange: Processor,
1209{
1210    pub fn builder() -> MultiAssetSingleExchangeBacktestBuilder<Local, Exchange> {
1211        MultiAssetSingleExchangeBacktestBuilder {
1212            local: vec![],
1213            exch: vec![],
1214        }
1215    }
1216
1217    pub fn new(local: Vec<Local>, exch: Vec<Exchange>, reader: Vec<Reader<Event>>) -> Self {
1218        let num_assets = local.len();
1219        if local.len() != num_assets || exch.len() != num_assets || reader.len() != num_assets {
1220            panic!();
1221        }
1222
1223        let local = local
1224            .into_iter()
1225            .zip(reader.iter())
1226            .map(|(proc, reader)| BacktestProcessorState::new(proc, reader.clone()))
1227            .collect();
1228        let exch = exch
1229            .into_iter()
1230            .zip(reader.iter())
1231            .map(|(proc, reader)| BacktestProcessorState::new(proc, reader.clone()))
1232            .collect();
1233
1234        Self {
1235            local,
1236            exch,
1237            cur_ts: i64::MAX,
1238            evs: EventSet::new(num_assets),
1239            _md_marker: Default::default(),
1240        }
1241    }
1242
1243    fn initialize_evs(&mut self) -> Result<(), BacktestError> {
1244        for (asset_no, local) in self.local.iter_mut().enumerate() {
1245            match local.advance() {
1246                Ok(ts) => self.evs.update_local_data(asset_no, ts),
1247                Err(BacktestError::EndOfData) => {
1248                    self.evs.invalidate_local_data(asset_no);
1249                }
1250                Err(e) => {
1251                    return Err(e);
1252                }
1253            }
1254        }
1255        for (asset_no, exch) in self.exch.iter_mut().enumerate() {
1256            match exch.advance() {
1257                Ok(ts) => self.evs.update_exch_data(asset_no, ts),
1258                Err(BacktestError::EndOfData) => {
1259                    self.evs.invalidate_exch_data(asset_no);
1260                }
1261                Err(e) => {
1262                    return Err(e);
1263                }
1264            }
1265        }
1266        Ok(())
1267    }
1268
1269    pub fn goto<const WAIT_NEXT_FEED: bool>(
1270        &mut self,
1271        timestamp: i64,
1272        wait_order_response: WaitOrderResponse,
1273    ) -> Result<ElapseResult, BacktestError> {
1274        let mut result = ElapseResult::Ok;
1275        let mut timestamp = timestamp;
1276        for (asset_no, local) in self.local.iter().enumerate() {
1277            self.evs
1278                .update_exch_order(asset_no, local.earliest_send_order_timestamp());
1279            self.evs
1280                .update_local_order(asset_no, local.earliest_recv_order_timestamp());
1281        }
1282        loop {
1283            match self.evs.next() {
1284                Some(ev) => {
1285                    if ev.timestamp > timestamp {
1286                        self.cur_ts = timestamp;
1287                        return Ok(result);
1288                    }
1289                    match ev.kind {
1290                        EventIntentKind::LocalData => {
1291                            let local = unsafe { self.local.get_unchecked_mut(ev.asset_no) };
1292                            let next = local.next_row().and_then(|row| {
1293                                local.processor.process(&local.data[row])?;
1294                                local.advance()
1295                            });
1296
1297                            match next {
1298                                Ok(next_ts) => {
1299                                    self.evs.update_local_data(ev.asset_no, next_ts);
1300                                }
1301                                Err(BacktestError::EndOfData) => {
1302                                    self.evs.invalidate_local_data(ev.asset_no);
1303                                }
1304                                Err(e) => {
1305                                    return Err(e);
1306                                }
1307                            }
1308                            if WAIT_NEXT_FEED {
1309                                timestamp = ev.timestamp;
1310                                result = ElapseResult::MarketFeed;
1311                            }
1312                        }
1313                        EventIntentKind::LocalOrder => {
1314                            let local = unsafe { self.local.get_unchecked_mut(ev.asset_no) };
1315                            let wait_order_resp_id = match wait_order_response {
1316                                WaitOrderResponse::Specified {
1317                                    asset_no: wait_order_asset_no,
1318                                    order_id: wait_order_id,
1319                                } if ev.asset_no == wait_order_asset_no => Some(wait_order_id),
1320                                _ => None,
1321                            };
1322                            if local.process_recv_order(ev.timestamp, wait_order_resp_id)?
1323                                || wait_order_response == WaitOrderResponse::Any
1324                            {
1325                                timestamp = ev.timestamp;
1326                                if WAIT_NEXT_FEED {
1327                                    result = ElapseResult::OrderResponse;
1328                                }
1329                            }
1330                            self.evs.update_local_order(
1331                                ev.asset_no,
1332                                local.earliest_recv_order_timestamp(),
1333                            );
1334                        }
1335                        EventIntentKind::ExchData => {
1336                            let exch = unsafe { self.exch.get_unchecked_mut(ev.asset_no) };
1337                            let next = exch.next_row().and_then(|row| {
1338                                exch.processor.process(&exch.data[row])?;
1339                                exch.advance()
1340                            });
1341
1342                            match next {
1343                                Ok(next_ts) => {
1344                                    self.evs.update_exch_data(ev.asset_no, next_ts);
1345                                }
1346                                Err(BacktestError::EndOfData) => {
1347                                    self.evs.invalidate_exch_data(ev.asset_no);
1348                                }
1349                                Err(e) => {
1350                                    return Err(e);
1351                                }
1352                            }
1353                            self.evs.update_local_order(
1354                                ev.asset_no,
1355                                exch.earliest_send_order_timestamp(),
1356                            );
1357                        }
1358                        EventIntentKind::ExchOrder => {
1359                            let exch = unsafe { self.exch.get_unchecked_mut(ev.asset_no) };
1360                            let _ = exch.process_recv_order(ev.timestamp, None)?;
1361                            self.evs.update_exch_order(
1362                                ev.asset_no,
1363                                exch.earliest_recv_order_timestamp(),
1364                            );
1365                            self.evs.update_local_order(
1366                                ev.asset_no,
1367                                exch.earliest_send_order_timestamp(),
1368                            );
1369                        }
1370                    }
1371                }
1372                None => {
1373                    return Ok(ElapseResult::EndOfData);
1374                }
1375            }
1376        }
1377    }
1378}
1379
1380impl<MD, Local, Exchange> Bot<MD> for MultiAssetSingleExchangeBacktest<MD, Local, Exchange>
1381where
1382    MD: MarketDepth,
1383    Local: LocalProcessor<MD>,
1384    Exchange: Processor,
1385{
1386    type Error = BacktestError;
1387
1388    #[inline]
1389    fn current_timestamp(&self) -> i64 {
1390        self.cur_ts
1391    }
1392
1393    #[inline]
1394    fn num_assets(&self) -> usize {
1395        self.local.len()
1396    }
1397
1398    #[inline]
1399    fn position(&self, asset_no: usize) -> f64 {
1400        self.local.get(asset_no).unwrap().position()
1401    }
1402
1403    #[inline]
1404    fn state_values(&self, asset_no: usize) -> &StateValues {
1405        self.local.get(asset_no).unwrap().state_values()
1406    }
1407
1408    fn depth(&self, asset_no: usize) -> &MD {
1409        self.local.get(asset_no).unwrap().depth()
1410    }
1411
1412    fn last_trades(&self, asset_no: usize) -> &[Event] {
1413        self.local.get(asset_no).unwrap().last_trades()
1414    }
1415
1416    #[inline]
1417    fn clear_last_trades(&mut self, asset_no: Option<usize>) {
1418        match asset_no {
1419            Some(an) => {
1420                let local = self.local.get_mut(an).unwrap();
1421                local.clear_last_trades();
1422            }
1423            None => {
1424                for local in self.local.iter_mut() {
1425                    local.clear_last_trades();
1426                }
1427            }
1428        }
1429    }
1430
1431    #[inline]
1432    fn orders(&self, asset_no: usize) -> &HashMap<OrderId, Order> {
1433        self.local.get(asset_no).unwrap().orders()
1434    }
1435
1436    #[inline]
1437    fn submit_buy_order(
1438        &mut self,
1439        asset_no: usize,
1440        order_id: OrderId,
1441        price: f64,
1442        qty: f64,
1443        time_in_force: TimeInForce,
1444        order_type: OrdType,
1445        wait: bool,
1446    ) -> Result<ElapseResult, Self::Error> {
1447        let local = self.local.get_mut(asset_no).unwrap();
1448        local.submit_order(
1449            order_id,
1450            Side::Buy,
1451            price,
1452            qty,
1453            order_type,
1454            time_in_force,
1455            self.cur_ts,
1456        )?;
1457
1458        if wait {
1459            return self.goto::<false>(
1460                UNTIL_END_OF_DATA,
1461                WaitOrderResponse::Specified { asset_no, order_id },
1462            );
1463        }
1464        Ok(ElapseResult::Ok)
1465    }
1466
1467    #[inline]
1468    fn submit_sell_order(
1469        &mut self,
1470        asset_no: usize,
1471        order_id: OrderId,
1472        price: f64,
1473        qty: f64,
1474        time_in_force: TimeInForce,
1475        order_type: OrdType,
1476        wait: bool,
1477    ) -> Result<ElapseResult, Self::Error> {
1478        let local = self.local.get_mut(asset_no).unwrap();
1479        local.submit_order(
1480            order_id,
1481            Side::Sell,
1482            price,
1483            qty,
1484            order_type,
1485            time_in_force,
1486            self.cur_ts,
1487        )?;
1488
1489        if wait {
1490            return self.goto::<false>(
1491                UNTIL_END_OF_DATA,
1492                WaitOrderResponse::Specified { asset_no, order_id },
1493            );
1494        }
1495        Ok(ElapseResult::Ok)
1496    }
1497
1498    fn submit_order(
1499        &mut self,
1500        asset_no: usize,
1501        order: OrderRequest,
1502        wait: bool,
1503    ) -> Result<ElapseResult, Self::Error> {
1504        let local = self.local.get_mut(asset_no).unwrap();
1505        local.submit_order(
1506            order.order_id,
1507            Side::Sell,
1508            order.price,
1509            order.qty,
1510            order.order_type,
1511            order.time_in_force,
1512            self.cur_ts,
1513        )?;
1514
1515        if wait {
1516            return self.goto::<false>(
1517                UNTIL_END_OF_DATA,
1518                WaitOrderResponse::Specified {
1519                    asset_no,
1520                    order_id: order.order_id,
1521                },
1522            );
1523        }
1524        Ok(ElapseResult::Ok)
1525    }
1526
1527    #[inline]
1528    fn modify(
1529        &mut self,
1530        asset_no: usize,
1531        order_id: OrderId,
1532        price: f64,
1533        qty: f64,
1534        wait: bool,
1535    ) -> Result<ElapseResult, Self::Error> {
1536        let local = self.local.get_mut(asset_no).unwrap();
1537        local.modify(order_id, price, qty, self.cur_ts)?;
1538
1539        if wait {
1540            return self.goto::<false>(
1541                UNTIL_END_OF_DATA,
1542                WaitOrderResponse::Specified { asset_no, order_id },
1543            );
1544        }
1545        Ok(ElapseResult::Ok)
1546    }
1547
1548    #[inline]
1549    fn cancel(
1550        &mut self,
1551        asset_no: usize,
1552        order_id: OrderId,
1553        wait: bool,
1554    ) -> Result<ElapseResult, Self::Error> {
1555        let local = self.local.get_mut(asset_no).unwrap();
1556        local.cancel(order_id, self.cur_ts)?;
1557
1558        if wait {
1559            return self.goto::<false>(
1560                UNTIL_END_OF_DATA,
1561                WaitOrderResponse::Specified { asset_no, order_id },
1562            );
1563        }
1564        Ok(ElapseResult::Ok)
1565    }
1566
1567    #[inline]
1568    fn clear_inactive_orders(&mut self, asset_no: Option<usize>) {
1569        match asset_no {
1570            Some(asset_no) => {
1571                self.local
1572                    .get_mut(asset_no)
1573                    .unwrap()
1574                    .clear_inactive_orders();
1575            }
1576            None => {
1577                for local in self.local.iter_mut() {
1578                    local.clear_inactive_orders();
1579                }
1580            }
1581        }
1582    }
1583
1584    #[inline]
1585    fn wait_order_response(
1586        &mut self,
1587        asset_no: usize,
1588        order_id: OrderId,
1589        timeout: i64,
1590    ) -> Result<ElapseResult, BacktestError> {
1591        self.goto::<false>(
1592            self.cur_ts + timeout,
1593            WaitOrderResponse::Specified { asset_no, order_id },
1594        )
1595    }
1596
1597    fn wait_next_feed(
1598        &mut self,
1599        include_order_resp: bool,
1600        timeout: i64,
1601    ) -> Result<ElapseResult, Self::Error> {
1602        if self.cur_ts == i64::MAX {
1603            self.initialize_evs()?;
1604            match self.evs.next() {
1605                Some(ev) => {
1606                    self.cur_ts = ev.timestamp;
1607                }
1608                None => {
1609                    return Ok(ElapseResult::EndOfData);
1610                }
1611            }
1612        }
1613        if include_order_resp {
1614            self.goto::<true>(self.cur_ts + timeout, WaitOrderResponse::Any)
1615        } else {
1616            self.goto::<true>(self.cur_ts + timeout, WaitOrderResponse::None)
1617        }
1618    }
1619
1620    #[inline]
1621    fn elapse(&mut self, duration: i64) -> Result<ElapseResult, Self::Error> {
1622        if self.cur_ts == i64::MAX {
1623            self.initialize_evs()?;
1624            match self.evs.next() {
1625                Some(ev) => {
1626                    self.cur_ts = ev.timestamp;
1627                }
1628                None => {
1629                    return Ok(ElapseResult::EndOfData);
1630                }
1631            }
1632        }
1633        self.goto::<false>(self.cur_ts + duration, WaitOrderResponse::None)
1634    }
1635
1636    #[inline]
1637    fn elapse_bt(&mut self, duration: i64) -> Result<ElapseResult, Self::Error> {
1638        self.elapse(duration)
1639    }
1640
1641    #[inline]
1642    fn close(&mut self) -> Result<(), Self::Error> {
1643        Ok(())
1644    }
1645
1646    #[inline]
1647    fn feed_latency(&self, asset_no: usize) -> Option<(i64, i64)> {
1648        self.local.get(asset_no).unwrap().feed_latency()
1649    }
1650
1651    #[inline]
1652    fn order_latency(&self, asset_no: usize) -> Option<(i64, i64, i64)> {
1653        self.local.get(asset_no).unwrap().order_latency()
1654    }
1655}
1656
1657#[cfg(test)]
1658mod test {
1659    use std::error::Error;
1660
1661    use crate::{
1662        backtest::{
1663            Backtest,
1664            DataSource,
1665            ExchangeKind::NoPartialFillExchange,
1666            L2AssetBuilder,
1667            assettype::LinearAsset,
1668            data::Data,
1669            models::{
1670                CommonFees,
1671                ConstantLatency,
1672                PowerProbQueueFunc3,
1673                ProbQueueModel,
1674                TradingValueFeeModel,
1675            },
1676        },
1677        depth::HashMapMarketDepth,
1678        prelude::{Bot, Event},
1679        types::{EXCH_EVENT, LOCAL_EVENT},
1680    };
1681
1682    #[test]
1683    fn skips_unseen_events() -> Result<(), Box<dyn Error>> {
1684        let data = Data::from_data(&[
1685            Event {
1686                ev: EXCH_EVENT | LOCAL_EVENT,
1687                exch_ts: 0,
1688                local_ts: 0,
1689                px: 0.0,
1690                qty: 0.0,
1691                order_id: 0,
1692                ival: 0,
1693                fval: 0.0,
1694            },
1695            Event {
1696                ev: LOCAL_EVENT | EXCH_EVENT,
1697                exch_ts: 1,
1698                local_ts: 1,
1699                px: 0.0,
1700                qty: 0.0,
1701                order_id: 0,
1702                ival: 0,
1703                fval: 0.0,
1704            },
1705            Event {
1706                ev: EXCH_EVENT,
1707                exch_ts: 3,
1708                local_ts: 4,
1709                px: 0.0,
1710                qty: 0.0,
1711                order_id: 0,
1712                ival: 0,
1713                fval: 0.0,
1714            },
1715            Event {
1716                ev: LOCAL_EVENT,
1717                exch_ts: 3,
1718                local_ts: 4,
1719                px: 0.0,
1720                qty: 0.0,
1721                order_id: 0,
1722                ival: 0,
1723                fval: 0.0,
1724            },
1725        ]);
1726
1727        let mut backtester = Backtest::builder()
1728            .add_asset(
1729                L2AssetBuilder::default()
1730                    .data(vec![DataSource::Data(data)])
1731                    .latency_model(ConstantLatency::new(50, 50))
1732                    .asset_type(LinearAsset::new(1.0))
1733                    .fee_model(TradingValueFeeModel::new(CommonFees::new(0.0, 0.0)))
1734                    .queue_model(ProbQueueModel::new(PowerProbQueueFunc3::new(3.0)))
1735                    .exchange(NoPartialFillExchange)
1736                    .depth(|| HashMapMarketDepth::new(0.01, 1.0))
1737                    .build()?,
1738            )
1739            .build()?;
1740
1741        // Process first events and advance a single timestep
1742        backtester.elapse_bt(1)?;
1743        assert_eq!(1, backtester.cur_ts);
1744
1745        // Check that we correctly skip past events that aren't seen by a given processor
1746        backtester.elapse_bt(1)?;
1747        assert_eq!(2, backtester.cur_ts);
1748        assert_eq!(Some(3), backtester.local[0].row);
1749        assert_eq!(Some(2), backtester.exch[0].row);
1750
1751        backtester.elapse_bt(1)?;
1752        assert_eq!(3, backtester.cur_ts);
1753
1754        Ok(())
1755    }
1756}