simulator_client/transaction_step.rs
1use std::time::Duration;
2
3use simulator_api::{
4 BacktestRequest, BacktestResponse, ContinueParams, ContinueToParams, DiscoveryBatchEvent,
5 PausedEvent,
6};
7
8use crate::{BacktestClientError, BacktestClientResult, BacktestSession};
9
10/// A batch discovery paired with the pause location immediately before it.
11#[derive(Debug, Clone)]
12pub struct DiscoveryPause {
13 /// The discovery event that triggered the pause.
14 pub discovery: DiscoveryBatchEvent,
15 /// Where the session paused; state through `batch_index - 1` is visible via RPC.
16 pub paused: PausedEvent,
17}
18
19/// Result of a single [`BacktestSession::advance_to_discovery`] call.
20#[derive(Debug)]
21pub enum DiscoveryStepResult {
22 /// Session paused before the discovered batch. Inspect state via RPC or
23 /// simulate custom transactions, then call `advance_to_discovery` again.
24 Paused(DiscoveryPause),
25 /// No more discoveries in the session range.
26 Completed,
27}
28
29impl BacktestSession {
30 /// Wait for the next batch matching a registered discovery filter and pause
31 /// immediately before it executes.
32 ///
33 /// `DiscoveryBatch` events are emitted by the server as a background scan
34 /// independent of execution — no `Continue` is needed to trigger them.
35 /// This method consumes the next queued `DiscoveryBatch`, sends `ContinueTo`
36 /// to execute up to (but not including) that batch, then waits for `Paused`.
37 ///
38 /// While paused, the session's RPC endpoint reflects state through
39 /// `batch_index - 1` of the discovered slot — no transaction in the
40 /// discovered batch has executed yet.
41 pub async fn advance_to_discovery(
42 &mut self,
43 timeout: Option<Duration>,
44 ) -> BacktestClientResult<DiscoveryStepResult> {
45 // Phase 1: fetch next DiscoveryBatch or fail if none arrives within the timeout.
46 let discovery: DiscoveryBatchEvent = loop {
47 let Some(response) = self.next_response(timeout).await? else {
48 return Err(BacktestClientError::Closed {
49 reason: "websocket closed while waiting for DiscoveryBatch".to_string(),
50 });
51 };
52 match response {
53 BacktestResponse::DiscoveryBatch(event) => break event,
54 BacktestResponse::Completed { .. } => return Ok(DiscoveryStepResult::Completed),
55 BacktestResponse::Error(err) => return Err(BacktestClientError::Remote(err)),
56 // Server is paused waiting for a Continue. Send one so it can advance
57 // to the next discovery or reach end-of-range and emit Completed.
58 BacktestResponse::ReadyForContinue => {
59 self.send(
60 &BacktestRequest::Continue(ContinueParams {
61 advance_count: u64::MAX,
62 ..Default::default()
63 }),
64 timeout,
65 )
66 .await?;
67 }
68 other => {
69 tracing::warn!("ignoring {other:?} while waiting for DiscoveryBatch");
70 }
71 }
72 };
73
74 // Phase 2: execute up to (not including) the discovered batch.
75 self.send(
76 &BacktestRequest::ContinueTo(ContinueToParams {
77 slot: discovery.slot,
78 batch_index: Some(discovery.batch_index),
79 }),
80 timeout,
81 )
82 .await?;
83
84 // Phase 3: consume until Paused.
85 // Extra DiscoveryBatch events are held in a local buffer (not self.backlog)
86 // so next_response keeps reading from the websocket. After Paused arrives
87 // they are flushed to self.backlog for the next advance_to_discovery call.
88 let mut pending_discoveries: Vec<BacktestResponse> = Vec::new();
89 let paused: PausedEvent = loop {
90 let Some(response) = self.next_response(timeout).await? else {
91 return Err(BacktestClientError::Closed {
92 reason: "websocket closed while waiting for Paused after ContinueTo"
93 .to_string(),
94 });
95 };
96 match response {
97 BacktestResponse::Paused(event) => {
98 self.ready_for_continue = true;
99 for d in pending_discoveries {
100 self.push_backlog(d);
101 }
102 break event;
103 }
104 next @ BacktestResponse::DiscoveryBatch(_) => {
105 pending_discoveries.push(next);
106 }
107 BacktestResponse::SlotNotification(_)
108 | BacktestResponse::Status { .. }
109 | BacktestResponse::Success
110 | BacktestResponse::ReadyForContinue => {}
111 BacktestResponse::Completed { .. } => {
112 return Ok(DiscoveryStepResult::Completed);
113 }
114 BacktestResponse::Error(err) => return Err(BacktestClientError::Remote(err)),
115 other => {
116 return Err(BacktestClientError::UnexpectedResponse {
117 context: "waiting for Paused after ContinueTo",
118 response: Box::new(other),
119 });
120 }
121 }
122 };
123
124 Ok(DiscoveryStepResult::Paused(DiscoveryPause {
125 discovery,
126 paused,
127 }))
128 }
129}