1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
use std::time::Duration;
use simulator_api::{
BacktestRequest, BacktestResponse, ContinueToParams, DiscoveryBatchEvent, PausedEvent,
};
use crate::{BacktestClientError, BacktestClientResult, BacktestSession};
/// A batch discovery paired with the pause location immediately before it.
#[derive(Debug, Clone)]
pub struct DiscoveryPause {
/// The discovery event that triggered the pause.
pub discovery: DiscoveryBatchEvent,
/// Where the session paused; state through `batch_index - 1` is visible via RPC.
pub paused: PausedEvent,
}
/// Result of a single [`BacktestSession::advance_to_discovery`] call.
#[derive(Debug)]
pub enum DiscoveryStepResult {
/// Session paused before the discovered batch. Inspect state via RPC or
/// simulate custom transactions, then call `advance_to_discovery` again.
Paused(DiscoveryPause),
/// No more discoveries in the session range.
Completed,
}
impl BacktestSession {
/// Wait for the next batch matching a registered discovery filter and pause
/// immediately before it executes.
///
/// `DiscoveryBatch` events are emitted by the server as a background scan
/// independent of execution — no `Continue` is needed to trigger them.
/// This method consumes the next queued `DiscoveryBatch`, sends `ContinueTo`
/// to execute up to (but not including) that batch, then waits for `Paused`.
///
/// While paused, the session's RPC endpoint reflects state through
/// `batch_index - 1` of the discovered slot — no transaction in the
/// discovered batch has executed yet.
pub async fn advance_to_discovery(
&mut self,
timeout: Option<Duration>,
) -> BacktestClientResult<DiscoveryStepResult> {
// Phase 1: fetch next DiscoveryBatch or fail if none arrives within the timeout.
let Some(response) = self.next_response(timeout).await? else {
return Err(BacktestClientError::Closed {
reason: "websocket closed while waiting for DiscoveryBatch".to_string(),
});
};
let discovery: DiscoveryBatchEvent = match response {
BacktestResponse::DiscoveryBatch(event) => event,
BacktestResponse::Error(err) => return Err(BacktestClientError::Remote(err)),
other => {
return Err(BacktestClientError::UnexpectedResponse {
context: "waiting for DiscoveryBatch",
response: Box::new(other),
});
}
};
// Phase 2: execute up to (not including) the discovered batch.
self.send(
&BacktestRequest::ContinueTo(ContinueToParams {
slot: discovery.slot,
batch_index: Some(discovery.batch_index),
}),
timeout,
)
.await?;
// Phase 3: consume until Paused.
// Extra DiscoveryBatch events are held in a local buffer (not self.backlog)
// so next_response keeps reading from the websocket. After Paused arrives
// they are flushed to self.backlog for the next advance_to_discovery call.
let mut pending_discoveries: Vec<BacktestResponse> = Vec::new();
let paused: PausedEvent = loop {
let Some(response) = self.next_response(timeout).await? else {
return Err(BacktestClientError::Closed {
reason: "websocket closed while waiting for Paused after ContinueTo"
.to_string(),
});
};
tracing::debug!("<- advance {response:?}");
match response {
BacktestResponse::Paused(event) => {
self.ready_for_continue = true;
for d in pending_discoveries {
self.push_backlog(d);
}
break event;
}
next @ BacktestResponse::DiscoveryBatch(_) => {
pending_discoveries.push(next);
}
BacktestResponse::SlotNotification(_)
| BacktestResponse::Status { .. }
| BacktestResponse::Success
| BacktestResponse::ReadyForContinue => {}
BacktestResponse::Completed { .. } => {
return Ok(DiscoveryStepResult::Completed);
}
BacktestResponse::Error(err) => return Err(BacktestClientError::Remote(err)),
other => {
return Err(BacktestClientError::UnexpectedResponse {
context: "waiting for Paused after ContinueTo",
response: Box::new(other),
});
}
}
};
Ok(DiscoveryStepResult::Paused(DiscoveryPause {
discovery,
paused,
}))
}
}