1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
use std::time::Duration;
use simulator_api::{
BacktestRequest, BacktestResponse, ContinueParams, ContinueToParams, DiscoveryBatchEvent,
PausedEvent,
};
use crate::{BacktestClientError, BacktestClientResult, BacktestSession};
/// A batch discovery paired with the pause location immediately before it.
///
/// Carried by [`DiscoveryStepResult::Paused`]: it bundles the `DiscoveryBatch`
/// event that was consumed with the `Paused` event that arrived after the
/// matching `ContinueTo` request was sent.
#[derive(Debug, Clone)]
pub struct DiscoveryPause {
/// The discovery event that triggered the pause. Its `slot` and
/// `batch_index` are the values used as the `ContinueTo` target.
pub discovery: DiscoveryBatchEvent,
/// Where the session paused; state through `batch_index - 1` is visible via RPC.
/// This is the `Paused` event received after the `ContinueTo` request.
pub paused: PausedEvent,
}
/// Result of a single [`BacktestSession::advance_to_discovery`] call.
///
/// Only the two successful outcomes are represented here; failures are
/// reported through the `Err` side of the call's result instead.
#[derive(Debug)]
pub enum DiscoveryStepResult {
/// Session paused before the discovered batch. Inspect state via RPC or
/// simulate custom transactions, then call `advance_to_discovery` again.
Paused(DiscoveryPause),
/// No more discoveries in the session range. Returned when the server sends
/// `Completed`, either while waiting for the next `DiscoveryBatch` or while
/// waiting for `Paused` after `ContinueTo`.
Completed,
}
impl BacktestSession {
/// Wait for the next batch matching a registered discovery filter and pause
/// immediately before it executes.
///
/// `DiscoveryBatch` events are emitted by the server as a background scan
/// independent of execution — no `Continue` is needed to trigger them.
/// This method consumes the next queued `DiscoveryBatch`, sends `ContinueTo`
/// to execute up to (but not including) that batch, then waits for `Paused`.
///
/// While paused, the session's RPC endpoint reflects state through
/// `batch_index - 1` of the discovered slot — no transaction in the
/// discovered batch has executed yet.
pub async fn advance_to_discovery(
&mut self,
timeout: Option<Duration>,
) -> BacktestClientResult<DiscoveryStepResult> {
// Phase 1: fetch next DiscoveryBatch or fail if none arrives within the timeout.
let discovery: DiscoveryBatchEvent = loop {
let Some(response) = self.next_response(timeout).await? else {
return Err(BacktestClientError::Closed {
reason: "websocket closed while waiting for DiscoveryBatch".to_string(),
});
};
match response {
BacktestResponse::DiscoveryBatch(event) => break event,
BacktestResponse::Completed { .. } => return Ok(DiscoveryStepResult::Completed),
BacktestResponse::Error(err) => return Err(BacktestClientError::Remote(err)),
// Server is paused waiting for a Continue. Send one so it can advance
// to the next discovery or reach end-of-range and emit Completed.
BacktestResponse::ReadyForContinue => {
self.send(
&BacktestRequest::Continue(ContinueParams {
advance_count: u64::MAX,
..Default::default()
}),
timeout,
)
.await?;
}
other => {
tracing::warn!("ignoring {other:?} while waiting for DiscoveryBatch");
}
}
};
// Phase 2: execute up to (not including) the discovered batch.
self.send(
&BacktestRequest::ContinueTo(ContinueToParams {
slot: discovery.slot,
batch_index: Some(discovery.batch_index),
}),
timeout,
)
.await?;
// Phase 3: consume until Paused.
// Extra DiscoveryBatch events are held in a local buffer (not self.backlog)
// so next_response keeps reading from the websocket. After Paused arrives
// they are flushed to self.backlog for the next advance_to_discovery call.
let mut pending_discoveries: Vec<BacktestResponse> = Vec::new();
let paused: PausedEvent = loop {
let Some(response) = self.next_response(timeout).await? else {
return Err(BacktestClientError::Closed {
reason: "websocket closed while waiting for Paused after ContinueTo"
.to_string(),
});
};
match response {
BacktestResponse::Paused(event) => {
self.ready_for_continue = true;
for d in pending_discoveries {
self.push_backlog(d);
}
break event;
}
next @ BacktestResponse::DiscoveryBatch(_) => {
pending_discoveries.push(next);
}
BacktestResponse::SlotNotification(_)
| BacktestResponse::Status { .. }
| BacktestResponse::Success
| BacktestResponse::ReadyForContinue => {}
BacktestResponse::Completed { .. } => {
return Ok(DiscoveryStepResult::Completed);
}
BacktestResponse::Error(err) => return Err(BacktestClientError::Remote(err)),
other => {
return Err(BacktestClientError::UnexpectedResponse {
context: "waiting for Paused after ContinueTo",
response: Box::new(other),
});
}
}
};
Ok(DiscoveryStepResult::Paused(DiscoveryPause {
discovery,
paused,
}))
}
}