Skip to main content

simulator_client/
transaction_step.rs

1use std::time::Duration;
2
3use simulator_api::{
4    BacktestRequest, BacktestResponse, ContinueParams, ContinueToParams, DiscoveryBatchEvent,
5    PausedEvent,
6};
7
8use crate::{BacktestClientError, BacktestClientResult, BacktestSession};
9
10/// A batch discovery paired with the pause location immediately before it.
11#[derive(Debug, Clone)]
12pub struct DiscoveryPause {
13    /// The discovery event that triggered the pause.
14    pub discovery: DiscoveryBatchEvent,
15    /// Where the session paused; state through `batch_index - 1` is visible via RPC.
16    pub paused: PausedEvent,
17}
18
19/// Result of a single [`BacktestSession::advance_to_discovery`] call.
20#[derive(Debug)]
21pub enum DiscoveryStepResult {
22    /// Session paused before the discovered batch. Inspect state via RPC or
23    /// simulate custom transactions, then call `advance_to_discovery` again.
24    Paused(DiscoveryPause),
25    /// No more discoveries in the session range.
26    Completed,
27}
28
29impl BacktestSession {
30    /// Wait for the next batch matching a registered discovery filter and pause
31    /// immediately before it executes.
32    ///
33    /// `DiscoveryBatch` events are emitted by the server as a background scan
34    /// independent of execution — no `Continue` is needed to trigger them.
35    /// This method consumes the next queued `DiscoveryBatch`, sends `ContinueTo`
36    /// to execute up to (but not including) that batch, then waits for `Paused`.
37    ///
38    /// While paused, the session's RPC endpoint reflects state through
39    /// `batch_index - 1` of the discovered slot — no transaction in the
40    /// discovered batch has executed yet.
41    pub async fn advance_to_discovery(
42        &mut self,
43        timeout: Option<Duration>,
44    ) -> BacktestClientResult<DiscoveryStepResult> {
45        // Phase 1: fetch next DiscoveryBatch or fail if none arrives within the timeout.
46        let discovery: DiscoveryBatchEvent = loop {
47            let Some(response) = self.next_response(timeout).await? else {
48                return Err(BacktestClientError::Closed {
49                    reason: "websocket closed while waiting for DiscoveryBatch".to_string(),
50                });
51            };
52            match response {
53                BacktestResponse::DiscoveryBatch(event) => break event,
54                BacktestResponse::Completed { .. } => return Ok(DiscoveryStepResult::Completed),
55                BacktestResponse::Error(err) => return Err(BacktestClientError::Remote(err)),
56                // Server is paused waiting for a Continue. Send one so it can advance
57                // to the next discovery or reach end-of-range and emit Completed.
58                BacktestResponse::ReadyForContinue => {
59                    self.send(
60                        &BacktestRequest::Continue(ContinueParams {
61                            advance_count: u64::MAX,
62                            ..Default::default()
63                        }),
64                        timeout,
65                    )
66                    .await?;
67                }
68                other => {
69                    tracing::warn!("ignoring {other:?} while waiting for DiscoveryBatch");
70                }
71            }
72        };
73
74        // Phase 2: execute up to (not including) the discovered batch.
75        self.send(
76            &BacktestRequest::ContinueTo(ContinueToParams {
77                slot: discovery.slot,
78                batch_index: Some(discovery.batch_index),
79            }),
80            timeout,
81        )
82        .await?;
83
84        // Phase 3: consume until Paused.
85        // Extra DiscoveryBatch events are held in a local buffer (not self.backlog)
86        // so next_response keeps reading from the websocket. After Paused arrives
87        // they are flushed to self.backlog for the next advance_to_discovery call.
88        let mut pending_discoveries: Vec<BacktestResponse> = Vec::new();
89        let paused: PausedEvent = loop {
90            let Some(response) = self.next_response(timeout).await? else {
91                return Err(BacktestClientError::Closed {
92                    reason: "websocket closed while waiting for Paused after ContinueTo"
93                        .to_string(),
94                });
95            };
96            match response {
97                BacktestResponse::Paused(event) => {
98                    self.ready_for_continue = true;
99                    for d in pending_discoveries {
100                        self.push_backlog(d);
101                    }
102                    break event;
103                }
104                next @ BacktestResponse::DiscoveryBatch(_) => {
105                    pending_discoveries.push(next);
106                }
107                BacktestResponse::SlotNotification(_)
108                | BacktestResponse::Status { .. }
109                | BacktestResponse::Success
110                | BacktestResponse::ReadyForContinue => {}
111                BacktestResponse::Completed { .. } => {
112                    return Ok(DiscoveryStepResult::Completed);
113                }
114                BacktestResponse::Error(err) => return Err(BacktestClientError::Remote(err)),
115                other => {
116                    return Err(BacktestClientError::UnexpectedResponse {
117                        context: "waiting for Paused after ContinueTo",
118                        response: Box::new(other),
119                    });
120                }
121            }
122        };
123
124        Ok(DiscoveryStepResult::Paused(DiscoveryPause {
125            discovery,
126            paused,
127        }))
128    }
129}