simulator_client/transaction_step.rs
1use std::time::Duration;
2
3use simulator_api::{
4 BacktestRequest, BacktestResponse, ContinueToParams, DiscoveryBatchEvent, PausedEvent,
5};
6
7use crate::{BacktestClientError, BacktestClientResult, BacktestSession};
8
9/// A batch discovery paired with the pause location immediately before it.
10#[derive(Debug, Clone)]
11pub struct DiscoveryPause {
12 /// The discovery event that triggered the pause.
13 pub discovery: DiscoveryBatchEvent,
14 /// Where the session paused; state through `batch_index - 1` is visible via RPC.
15 pub paused: PausedEvent,
16}
17
18/// Result of a single [`BacktestSession::advance_to_discovery`] call.
19#[derive(Debug)]
20pub enum DiscoveryStepResult {
21 /// Session paused before the discovered batch. Inspect state via RPC or
22 /// simulate custom transactions, then call `advance_to_discovery` again.
23 Paused(DiscoveryPause),
24 /// No more discoveries in the session range.
25 Completed,
26}
27
28impl BacktestSession {
29 /// Wait for the next batch matching a registered discovery filter and pause
30 /// immediately before it executes.
31 ///
32 /// `DiscoveryBatch` events are emitted by the server as a background scan
33 /// independent of execution — no `Continue` is needed to trigger them.
34 /// This method consumes the next queued `DiscoveryBatch`, sends `ContinueTo`
35 /// to execute up to (but not including) that batch, then waits for `Paused`.
36 ///
37 /// While paused, the session's RPC endpoint reflects state through
38 /// `batch_index - 1` of the discovered slot — no transaction in the
39 /// discovered batch has executed yet.
40 pub async fn advance_to_discovery(
41 &mut self,
42 timeout: Option<Duration>,
43 ) -> BacktestClientResult<DiscoveryStepResult> {
44 // Phase 1: fetch next DiscoveryBatch or fail if none arrives within the timeout.
45 let discovery: DiscoveryBatchEvent = loop {
46 let Some(response) = self.next_response(timeout).await? else {
47 return Err(BacktestClientError::Closed {
48 reason: "websocket closed while waiting for DiscoveryBatch".to_string(),
49 });
50 };
51 match response {
52 BacktestResponse::DiscoveryBatch(event) => break event,
53 BacktestResponse::Error(err) => return Err(BacktestClientError::Remote(err)),
54 other => {
55 // We don't expect any non-DiscoveryBatch events, but if this happens skip instead of failing
56 tracing::warn!("ignoring {other:?} while waiting for DiscoveryBatch");
57 }
58 }
59 };
60
61 // Phase 2: execute up to (not including) the discovered batch.
62 self.send(
63 &BacktestRequest::ContinueTo(ContinueToParams {
64 slot: discovery.slot,
65 batch_index: Some(discovery.batch_index),
66 }),
67 timeout,
68 )
69 .await?;
70
71 // Phase 3: consume until Paused.
72 // Extra DiscoveryBatch events are held in a local buffer (not self.backlog)
73 // so next_response keeps reading from the websocket. After Paused arrives
74 // they are flushed to self.backlog for the next advance_to_discovery call.
75 let mut pending_discoveries: Vec<BacktestResponse> = Vec::new();
76 let paused: PausedEvent = loop {
77 let Some(response) = self.next_response(timeout).await? else {
78 return Err(BacktestClientError::Closed {
79 reason: "websocket closed while waiting for Paused after ContinueTo"
80 .to_string(),
81 });
82 };
83 match response {
84 BacktestResponse::Paused(event) => {
85 self.ready_for_continue = true;
86 for d in pending_discoveries {
87 self.push_backlog(d);
88 }
89 break event;
90 }
91 next @ BacktestResponse::DiscoveryBatch(_) => {
92 pending_discoveries.push(next);
93 }
94 BacktestResponse::SlotNotification(_)
95 | BacktestResponse::Status { .. }
96 | BacktestResponse::Success
97 | BacktestResponse::ReadyForContinue => {}
98 BacktestResponse::Completed { .. } => {
99 return Ok(DiscoveryStepResult::Completed);
100 }
101 BacktestResponse::Error(err) => return Err(BacktestClientError::Remote(err)),
102 other => {
103 return Err(BacktestClientError::UnexpectedResponse {
104 context: "waiting for Paused after ContinueTo",
105 response: Box::new(other),
106 });
107 }
108 }
109 };
110
111 Ok(DiscoveryStepResult::Paused(DiscoveryPause {
112 discovery,
113 paused,
114 }))
115 }
116}