Skip to main content

simulator_client/
transaction_step.rs

1use std::time::Duration;
2
3use simulator_api::{
4    BacktestRequest, BacktestResponse, ContinueToParams, DiscoveryBatchEvent, PausedEvent,
5};
6
7use crate::{BacktestClientError, BacktestClientResult, BacktestSession};
8
9/// A batch discovery paired with the pause location immediately before it.
10#[derive(Debug, Clone)]
11pub struct DiscoveryPause {
12    /// The discovery event that triggered the pause.
13    pub discovery: DiscoveryBatchEvent,
14    /// Where the session paused; state through `batch_index - 1` is visible via RPC.
15    pub paused: PausedEvent,
16}
17
18/// Result of a single [`BacktestSession::advance_to_discovery`] call.
19#[derive(Debug)]
20pub enum DiscoveryStepResult {
21    /// Session paused before the discovered batch. Inspect state via RPC or
22    /// simulate custom transactions, then call `advance_to_discovery` again.
23    Paused(DiscoveryPause),
24    /// No more discoveries in the session range.
25    Completed,
26}
27
28impl BacktestSession {
29    /// Wait for the next batch matching a registered discovery filter and pause
30    /// immediately before it executes.
31    ///
32    /// `DiscoveryBatch` events are emitted by the server as a background scan
33    /// independent of execution — no `Continue` is needed to trigger them.
34    /// This method consumes the next queued `DiscoveryBatch`, sends `ContinueTo`
35    /// to execute up to (but not including) that batch, then waits for `Paused`.
36    ///
37    /// While paused, the session's RPC endpoint reflects state through
38    /// `batch_index - 1` of the discovered slot — no transaction in the
39    /// discovered batch has executed yet.
40    pub async fn advance_to_discovery(
41        &mut self,
42        timeout: Option<Duration>,
43    ) -> BacktestClientResult<DiscoveryStepResult> {
44        // Phase 1: fetch next DiscoveryBatch or fail if none arrives within the timeout.
45        let Some(response) = self.next_response(timeout).await? else {
46            return Err(BacktestClientError::Closed {
47                reason: "websocket closed while waiting for DiscoveryBatch".to_string(),
48            });
49        };
50        let discovery: DiscoveryBatchEvent = match response {
51            BacktestResponse::DiscoveryBatch(event) => event,
52            BacktestResponse::Error(err) => return Err(BacktestClientError::Remote(err)),
53            other => {
54                return Err(BacktestClientError::UnexpectedResponse {
55                    context: "waiting for DiscoveryBatch",
56                    response: Box::new(other),
57                });
58            }
59        };
60
61        // Phase 2: execute up to (not including) the discovered batch.
62        self.send(
63            &BacktestRequest::ContinueTo(ContinueToParams {
64                slot: discovery.slot,
65                batch_index: Some(discovery.batch_index),
66            }),
67            timeout,
68        )
69        .await?;
70
71        // Phase 3: consume until Paused.
72        // Extra DiscoveryBatch events are held in a local buffer (not self.backlog)
73        // so next_response keeps reading from the websocket. After Paused arrives
74        // they are flushed to self.backlog for the next advance_to_discovery call.
75        let mut pending_discoveries: Vec<BacktestResponse> = Vec::new();
76        let paused: PausedEvent = loop {
77            let Some(response) = self.next_response(timeout).await? else {
78                return Err(BacktestClientError::Closed {
79                    reason: "websocket closed while waiting for Paused after ContinueTo"
80                        .to_string(),
81                });
82            };
83            tracing::debug!("<- advance {response:?}");
84            match response {
85                BacktestResponse::Paused(event) => {
86                    self.ready_for_continue = true;
87                    for d in pending_discoveries {
88                        self.push_backlog(d);
89                    }
90                    break event;
91                }
92                next @ BacktestResponse::DiscoveryBatch(_) => {
93                    pending_discoveries.push(next);
94                }
95                BacktestResponse::SlotNotification(_)
96                | BacktestResponse::Status { .. }
97                | BacktestResponse::Success
98                | BacktestResponse::ReadyForContinue => {}
99                BacktestResponse::Completed { .. } => {
100                    return Ok(DiscoveryStepResult::Completed);
101                }
102                BacktestResponse::Error(err) => return Err(BacktestClientError::Remote(err)),
103                other => {
104                    return Err(BacktestClientError::UnexpectedResponse {
105                        context: "waiting for Paused after ContinueTo",
106                        response: Box::new(other),
107                    });
108                }
109            }
110        };
111
112        Ok(DiscoveryStepResult::Paused(DiscoveryPause {
113            discovery,
114            paused,
115        }))
116    }
117}