Skip to main content

simulator_client/
transaction_step.rs

1use std::time::Duration;
2
3use simulator_api::{
4    BacktestRequest, BacktestResponse, ContinueToParams, DiscoveryBatchEvent, PausedEvent,
5};
6
7use crate::{BacktestClientError, BacktestClientResult, BacktestSession};
8
9/// A batch discovery paired with the pause location immediately before it.
10#[derive(Debug, Clone)]
11pub struct DiscoveryPause {
12    /// The discovery event that triggered the pause.
13    pub discovery: DiscoveryBatchEvent,
14    /// Where the session paused; state through `batch_index - 1` is visible via RPC.
15    pub paused: PausedEvent,
16}
17
18/// Result of a single [`BacktestSession::advance_to_discovery`] call.
19#[derive(Debug)]
20pub enum DiscoveryStepResult {
21    /// Session paused before the discovered batch. Inspect state via RPC or
22    /// simulate custom transactions, then call `advance_to_discovery` again.
23    Paused(DiscoveryPause),
24    /// No more discoveries in the session range.
25    Completed,
26}
27
28impl BacktestSession {
29    /// Wait for the next batch matching a registered discovery filter and pause
30    /// immediately before it executes.
31    ///
32    /// `DiscoveryBatch` events are emitted by the server as a background scan
33    /// independent of execution — no `Continue` is needed to trigger them.
34    /// This method consumes the next queued `DiscoveryBatch`, sends `ContinueTo`
35    /// to execute up to (but not including) that batch, then waits for `Paused`.
36    ///
37    /// While paused, the session's RPC endpoint reflects state through
38    /// `batch_index - 1` of the discovered slot — no transaction in the
39    /// discovered batch has executed yet.
40    pub async fn advance_to_discovery(
41        &mut self,
42        timeout: Option<Duration>,
43    ) -> BacktestClientResult<DiscoveryStepResult> {
44        // Phase 1: fetch next DiscoveryBatch or fail if none arrives within the timeout.
45        let discovery: DiscoveryBatchEvent = loop {
46            let Some(response) = self.next_response(timeout).await? else {
47                return Err(BacktestClientError::Closed {
48                    reason: "websocket closed while waiting for DiscoveryBatch".to_string(),
49                });
50            };
51            match response {
52                BacktestResponse::DiscoveryBatch(event) => break event,
53                BacktestResponse::Error(err) => return Err(BacktestClientError::Remote(err)),
54                other => {
55                    // We don't expect any non-DiscoveryBatch events, but if this happens skip instead of failing
56                    tracing::warn!("ignoring {other:?} while waiting for DiscoveryBatch");
57                }
58            }
59        };
60
61        // Phase 2: execute up to (not including) the discovered batch.
62        self.send(
63            &BacktestRequest::ContinueTo(ContinueToParams {
64                slot: discovery.slot,
65                batch_index: Some(discovery.batch_index),
66            }),
67            timeout,
68        )
69        .await?;
70
71        // Phase 3: consume until Paused.
72        // Extra DiscoveryBatch events are held in a local buffer (not self.backlog)
73        // so next_response keeps reading from the websocket. After Paused arrives
74        // they are flushed to self.backlog for the next advance_to_discovery call.
75        let mut pending_discoveries: Vec<BacktestResponse> = Vec::new();
76        let paused: PausedEvent = loop {
77            let Some(response) = self.next_response(timeout).await? else {
78                return Err(BacktestClientError::Closed {
79                    reason: "websocket closed while waiting for Paused after ContinueTo"
80                        .to_string(),
81                });
82            };
83            match response {
84                BacktestResponse::Paused(event) => {
85                    self.ready_for_continue = true;
86                    for d in pending_discoveries {
87                        self.push_backlog(d);
88                    }
89                    break event;
90                }
91                next @ BacktestResponse::DiscoveryBatch(_) => {
92                    pending_discoveries.push(next);
93                }
94                BacktestResponse::SlotNotification(_)
95                | BacktestResponse::Status { .. }
96                | BacktestResponse::Success
97                | BacktestResponse::ReadyForContinue => {}
98                BacktestResponse::Completed { .. } => {
99                    return Ok(DiscoveryStepResult::Completed);
100                }
101                BacktestResponse::Error(err) => return Err(BacktestClientError::Remote(err)),
102                other => {
103                    return Err(BacktestClientError::UnexpectedResponse {
104                        context: "waiting for Paused after ContinueTo",
105                        response: Box::new(other),
106                    });
107                }
108            }
109        };
110
111        Ok(DiscoveryStepResult::Paused(DiscoveryPause {
112            discovery,
113            paused,
114        }))
115    }
116}