use std::{
borrow::Cow,
collections::{BTreeMap, VecDeque},
future::Future,
time::Duration,
};
use futures::{SinkExt, StreamExt, stream};
use simulator_api::{
AccountData, AccountModifications, AgentStatsReport, BacktestError, BacktestRequest,
BacktestResponse, BacktestStatus, ContinueParams, CreateBacktestSessionRequest,
CreateBacktestSessionRequestV1, SequencedResponse, SessionSummary,
};
use solana_address::Address;
use solana_client::{
nonblocking::rpc_client::RpcClient,
rpc_response::{Response, RpcLogsResponse},
};
use solana_commitment_config::CommitmentConfig;
use thiserror::Error;
use tokio::net::TcpStream;
use tokio_tungstenite::{
MaybeTlsStream, WebSocketStream,
tungstenite::{
Error as WsError, Message,
error::ProtocolError,
protocol::{CloseFrame, frame::coding::CloseCode},
},
};
use crate::{
BacktestClientError, BacktestClientResult, Continue,
injection::ProgramModError,
subscriptions::{
AccountDiffNotification, AccountDiffSubscriptionHandle, LogSubscriptionHandle,
SubscriptionError,
},
};
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ReadyOutcome {
Ready,
Completed,
}
#[derive(Debug, Default)]
pub struct ContinueResult {
pub slot_notifications: u64,
pub last_slot: Option<u64>,
pub statuses: Vec<BacktestStatus>,
pub ready_for_continue: bool,
pub completed: bool,
}
#[derive(Debug)]
pub struct AdvanceState {
pub expected_slots: u64,
pub slot_notifications: u64,
pub last_slot: Option<u64>,
pub statuses: Vec<BacktestStatus>,
pub ready_for_continue: bool,
pub completed: bool,
pub summary: Option<SessionSummary>,
pub agent_stats: Option<Vec<AgentStatsReport>>,
}
impl AdvanceState {
pub fn new(expected_slots: u64) -> Self {
Self {
expected_slots,
slot_notifications: 0,
last_slot: None,
statuses: Vec::new(),
ready_for_continue: false,
completed: false,
summary: None,
agent_stats: None,
}
}
pub fn is_done(&self, wait_for_slots: bool) -> bool {
if self.completed {
return true;
}
if !self.ready_for_continue {
return false;
}
!wait_for_slots || self.slot_notifications >= self.expected_slots
}
}
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct SessionCoverage {
completed: bool,
highest_slot_seen: Option<u64>,
}
impl SessionCoverage {
pub fn observe_slot(&mut self, slot: u64) {
self.highest_slot_seen = Some(
self.highest_slot_seen
.map_or(slot, |current| current.max(slot)),
);
}
pub fn mark_completed(&mut self) {
self.completed = true;
}
pub fn observe_response(&mut self, response: &BacktestResponse) {
match response {
BacktestResponse::SlotNotification(slot) => self.observe_slot(*slot),
BacktestResponse::Completed { .. } => self.mark_completed(),
_ => {}
}
}
pub fn is_completed(&self) -> bool {
self.completed
}
pub fn highest_slot_seen(&self) -> Option<u64> {
self.highest_slot_seen
}
pub fn validate_end_slot(&self, expected_end_slot: u64) -> Result<(), CoverageError> {
if !self.completed {
return Err(CoverageError::NotCompleted);
}
let Some(actual_end_slot) = self.highest_slot_seen else {
return Err(CoverageError::NoSlotsObserved);
};
if actual_end_slot < expected_end_slot {
return Err(CoverageError::RangeNotReached {
actual_end_slot,
expected_end_slot,
});
}
Ok(())
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Error)]
pub enum CoverageError {
#[error("ended before completion")]
NotCompleted,
#[error("completed without slot notifications")]
NoSlotsObserved,
#[error("completed at slot {actual_end_slot} but expected at least {expected_end_slot}")]
RangeNotReached {
actual_end_slot: u64,
expected_end_slot: u64,
},
}
pub struct BacktestSession {
ws: Option<WebSocketStream<MaybeTlsStream<TcpStream>>>,
session_id: Option<String>,
rpc_endpoint: Option<String>,
rpc: Option<RpcClient>,
last_sequence: Option<u64>,
pub(crate) ready_for_continue: bool,
request_timeout: Option<Duration>,
log_raw: bool,
backlog: VecDeque<(Option<u64>, BacktestResponse)>,
}
impl BacktestSession {
pub(crate) fn new(
ws: WebSocketStream<MaybeTlsStream<TcpStream>>,
request_timeout: Option<Duration>,
log_raw: bool,
) -> Self {
Self {
ws: Some(ws),
session_id: None,
rpc_endpoint: None,
rpc: None,
last_sequence: None,
ready_for_continue: false,
request_timeout,
log_raw,
backlog: VecDeque::new(),
}
}
pub fn session_id(&self) -> Option<&str> {
self.session_id.as_deref()
}
pub fn rpc_endpoint(&self) -> Option<&str> {
self.rpc_endpoint.as_deref()
}
pub fn last_sequence(&self) -> Option<u64> {
self.last_sequence
}
pub fn rpc(&self) -> &RpcClient {
self.rpc
.as_ref()
.expect("rpc is set during session creation")
}
pub fn is_ready_for_continue(&self) -> bool {
self.ready_for_continue
}
pub fn apply_response(&mut self, response: &BacktestResponse) {
match response {
BacktestResponse::ReadyForContinue | BacktestResponse::Paused(_) => {
self.ready_for_continue = true;
}
BacktestResponse::Completed { .. } => {
self.ready_for_continue = false;
}
_ => {}
}
}
fn ws_mut(&mut self) -> BacktestClientResult<&mut WebSocketStream<MaybeTlsStream<TcpStream>>> {
self.ws.as_mut().ok_or_else(|| BacktestClientError::Closed {
reason: "websocket closed".to_string(),
})
}
pub(crate) async fn create_with_request(
&mut self,
request: CreateBacktestSessionRequest,
rpc_base_url: String,
mut on_parallel_session_created: Option<&mut (dyn FnMut(String) + Send)>,
) -> BacktestClientResult<CreateRequestResult> {
let expect_parallel = matches!(
&request,
CreateBacktestSessionRequest::V1(CreateBacktestSessionRequestV1 { parallel: true, .. })
);
self.send(&BacktestRequest::CreateBacktestSession(request), None)
.await?;
let mut streamed_parallel_session_ids = Vec::new();
loop {
let response =
self.next_response(None)
.await?
.ok_or_else(|| BacktestClientError::Closed {
reason: "websocket ended before SessionCreated".to_string(),
})?;
match response {
BacktestResponse::SessionCreated {
session_id,
rpc_endpoint,
} => {
if expect_parallel {
if let Some(callback) = on_parallel_session_created.as_mut() {
(**callback)(session_id.clone());
}
streamed_parallel_session_ids.push(session_id);
continue;
}
let created_session_id = session_id.clone();
self.session_id = Some(session_id);
let resolved = resolve_rpc_url(&rpc_base_url, &rpc_endpoint);
self.rpc = Some(RpcClient::new_with_commitment(
resolved.clone(),
CommitmentConfig::confirmed(),
));
self.rpc_endpoint = Some(resolved);
return Ok(CreateRequestResult::Single {
session_id: created_session_id,
});
}
BacktestResponse::SessionsCreated { session_ids } => {
if expect_parallel && session_ids.is_empty() {
return Ok(CreateRequestResult::Parallel {
session_ids: streamed_parallel_session_ids,
});
}
return Ok(CreateRequestResult::Parallel { session_ids });
}
BacktestResponse::SessionsCreatedV2 { session_ids, .. } => {
if expect_parallel && session_ids.is_empty() {
return Ok(CreateRequestResult::Parallel {
session_ids: streamed_parallel_session_ids,
});
}
return Ok(CreateRequestResult::Parallel { session_ids });
}
BacktestResponse::ReadyForContinue => {
self.ready_for_continue = true;
}
BacktestResponse::Error(err) => return Err(BacktestClientError::Remote(err)),
other => {
self.backlog.push_back((self.last_sequence, other));
}
}
}
}
pub(crate) async fn attach(
&mut self,
session_id: String,
last_sequence: Option<u64>,
rpc_base_url: String,
) -> BacktestClientResult<()> {
self.send(
&BacktestRequest::AttachBacktestSession {
session_id,
last_sequence,
},
None,
)
.await?;
self.wait_for_response(
|| BacktestClientError::Closed {
reason: "websocket ended before SessionAttached".to_string(),
},
move |session, response| match response {
BacktestResponse::SessionAttached {
session_id,
rpc_endpoint,
} => {
session.session_id = Some(session_id);
let resolved = resolve_rpc_url(&rpc_base_url, &rpc_endpoint);
session.rpc = Some(RpcClient::new_with_commitment(
resolved.clone(),
CommitmentConfig::confirmed(),
));
session.rpc_endpoint = Some(resolved);
Ok(Some(()))
}
BacktestResponse::ReadyForContinue => {
session.ready_for_continue = true;
Ok(None)
}
BacktestResponse::Error(err) => Err(BacktestClientError::Remote(err)),
other => {
session.backlog.push_back((session.last_sequence, other));
Ok(None)
}
},
)
.await
}
pub async fn resume_attached_session(&mut self) -> BacktestClientResult<()> {
self.send(&BacktestRequest::ResumeAttachedSession, None)
.await?;
self.wait_for_response(
|| BacktestClientError::Closed {
reason: "websocket ended before ResumeAttachedSession acknowledgement".to_string(),
},
|session, response| match response {
BacktestResponse::Success => Ok(Some(())),
BacktestResponse::Error(err) => Err(BacktestClientError::Remote(err)),
other => {
session.backlog.push_back((session.last_sequence, other));
Ok(None)
}
},
)
.await
}
async fn wait_for_response<T, E, F>(
&mut self,
closed_error: E,
mut handle_response: F,
) -> BacktestClientResult<T>
where
E: FnOnce() -> BacktestClientError,
F: FnMut(&mut Self, BacktestResponse) -> BacktestClientResult<Option<T>>,
{
let mut closed_error = Some(closed_error);
loop {
let response = self
.next_response(None)
.await?
.ok_or_else(|| closed_error.take().expect("closed error set")())?;
if let Some(result) = handle_response(self, response)? {
return Ok(result);
}
}
}
pub async fn send(
&mut self,
request: &BacktestRequest,
timeout: Option<Duration>,
) -> BacktestClientResult<()> {
let text = serde_json::to_string(request)
.map_err(|source| BacktestClientError::SerializeRequest { source })?;
let request_timeout = self.request_timeout;
let timeout = timeout.or(request_timeout);
let send_fut = self.ws_mut()?.send(Message::Text(text));
let send_result = match timeout {
Some(duration) => tokio::time::timeout(duration, send_fut)
.await
.map_err(|_| BacktestClientError::Timeout {
action: "sending",
duration,
})?,
None => send_fut.await,
};
send_result.map_err(|source| BacktestClientError::WebSocket {
action: "sending",
source: Box::new(source),
})?;
Ok(())
}
pub async fn next_response(
&mut self,
timeout: Option<Duration>,
) -> BacktestClientResult<Option<BacktestResponse>> {
if let Some((sequence, response)) = self.backlog.pop_front() {
self.last_sequence = sequence.or(self.last_sequence);
return Ok(Some(response));
}
let text = match self.next_text(timeout).await? {
Some(text) => text,
None => return Ok(None),
};
let (sequence, response) = match serde_json::from_str::<SequencedResponse>(&text) {
Ok(sequenced) => (Some(sequenced.seq_id), sequenced.response),
Err(_) => {
let response =
serde_json::from_str::<BacktestResponse>(&text).map_err(|source| {
BacktestClientError::DeserializeResponse {
raw: text.clone(),
source,
}
})?;
(None, response)
}
};
self.last_sequence = sequence.or(self.last_sequence);
Ok(Some(response))
}
pub async fn next_event(
&mut self,
timeout: Option<Duration>,
) -> BacktestClientResult<Option<BacktestResponse>> {
let response = self.next_response(timeout).await?;
if let Some(ref response) = response {
self.apply_response(response);
}
Ok(response)
}
pub fn responses(
self,
timeout: Option<Duration>,
) -> impl futures::Stream<Item = BacktestClientResult<BacktestResponse>> {
stream::unfold(Some(self), move |state| async move {
let mut session = match state {
Some(session) => session,
None => return None,
};
match session.next_response(timeout).await {
Ok(Some(response)) => {
session.apply_response(&response);
Some((Ok(response), Some(session)))
}
Ok(None) => None,
Err(err) => Some((Err(err), None)),
}
})
}
pub async fn ensure_ready(
&mut self,
timeout: Option<Duration>,
) -> BacktestClientResult<ReadyOutcome> {
if self.ready_for_continue {
return Ok(ReadyOutcome::Ready);
}
loop {
let response =
self.next_response(timeout)
.await?
.ok_or_else(|| BacktestClientError::Closed {
reason: "websocket ended while waiting for ReadyForContinue".to_string(),
})?;
match response {
BacktestResponse::ReadyForContinue => {
self.ready_for_continue = true;
return Ok(ReadyOutcome::Ready);
}
BacktestResponse::Completed { .. } => return Ok(ReadyOutcome::Completed),
BacktestResponse::Error(err) => return Err(BacktestClientError::Remote(err)),
_ => {}
}
}
}
pub async fn wait_for_status(
&mut self,
desired: BacktestStatus,
timeout: Option<Duration>,
) -> BacktestClientResult<()> {
let desired = std::mem::discriminant(&desired);
loop {
let response =
self.next_response(timeout)
.await?
.ok_or_else(|| BacktestClientError::Closed {
reason: "websocket ended while waiting for status".to_string(),
})?;
match response {
BacktestResponse::Status { status }
if std::mem::discriminant(&status) == desired =>
{
return Ok(());
}
BacktestResponse::Error(err) => return Err(BacktestClientError::Remote(err)),
BacktestResponse::Completed {
summary,
agent_stats,
} => {
return Err(BacktestClientError::UnexpectedResponse {
context: "waiting for status",
response: Box::new(BacktestResponse::Completed {
summary,
agent_stats,
}),
});
}
_ => {}
}
}
}
pub(crate) fn push_backlog(&mut self, response: BacktestResponse) {
self.backlog.push_back((None, response));
}
pub async fn send_continue(
&mut self,
params: ContinueParams,
timeout: Option<Duration>,
) -> BacktestClientResult<()> {
self.ready_for_continue = false;
self.send(&BacktestRequest::Continue(params), timeout).await
}
pub async fn advance_step<F>(
&mut self,
state: &mut AdvanceState,
wait_for_slots: bool,
timeout: Option<Duration>,
on_event: &mut F,
) -> BacktestClientResult<()>
where
F: FnMut(&BacktestResponse),
{
let Some(response) = self.next_response(timeout).await? else {
return Err(BacktestClientError::Closed {
reason: "websocket ended while awaiting continue responses".to_string(),
});
};
if self.log_raw {
tracing::debug!("<- {response:?}");
}
on_event(&response);
match response {
BacktestResponse::ReadyForContinue => {
self.ready_for_continue = true;
state.ready_for_continue = true;
}
BacktestResponse::SlotNotification(slot) => {
state.slot_notifications += 1;
state.last_slot = Some(slot);
}
BacktestResponse::Status { status } => {
state.statuses.push(status);
}
BacktestResponse::Success => {}
BacktestResponse::Completed {
summary,
agent_stats,
} => {
state.completed = true;
state.summary = summary;
state.agent_stats = agent_stats;
}
BacktestResponse::Error(err @ BacktestError::SimulationError { .. }) => {
tracing::warn!(error = %crate::error::err_chain(&err), "simulation error");
}
BacktestResponse::Error(err) => return Err(BacktestClientError::Remote(err)),
BacktestResponse::SessionCreated { .. }
| BacktestResponse::SessionAttached { .. }
| BacktestResponse::SessionsCreated { .. }
| BacktestResponse::SessionsCreatedV2 { .. }
| BacktestResponse::ParallelSessionAttachedV2 { .. }
| BacktestResponse::SessionEventV1 { .. }
| BacktestResponse::SessionEventV2 { .. }
| BacktestResponse::Paused(_)
| BacktestResponse::DiscoveryBatch(_) => {
return Err(BacktestClientError::UnexpectedResponse {
context: "continuing",
response: Box::new(response),
});
}
}
if wait_for_slots && state.slot_notifications > state.expected_slots {
tracing::warn!(
"received {} slot notifications (expected {})",
state.slot_notifications,
state.expected_slots
);
}
Ok(())
}
pub async fn continue_until_ready<F>(
&mut self,
cont: Continue,
timeout: Option<Duration>,
mut on_event: F,
) -> BacktestClientResult<ContinueResult>
where
F: FnMut(&BacktestResponse),
{
let expected_slots = cont.advance_count;
self.advance_internal(
cont.into_params(),
expected_slots,
false,
timeout,
&mut on_event,
)
.await
}
pub async fn advance<F>(
&mut self,
cont: Continue,
timeout: Option<Duration>,
mut on_event: F,
) -> BacktestClientResult<ContinueResult>
where
F: FnMut(&BacktestResponse),
{
let expected_slots = cont.advance_count;
self.advance_internal(
cont.into_params(),
expected_slots,
true,
timeout,
&mut on_event,
)
.await
}
async fn advance_internal<F>(
&mut self,
params: ContinueParams,
expected_slots: u64,
wait_for_slots: bool,
timeout: Option<Duration>,
on_event: &mut F,
) -> BacktestClientResult<ContinueResult>
where
F: FnMut(&BacktestResponse),
{
self.send_continue(params, timeout).await?;
let mut state = AdvanceState::new(expected_slots);
while !state.is_done(wait_for_slots) {
self.advance_step(&mut state, wait_for_slots, timeout, on_event)
.await?;
}
Ok(ContinueResult {
slot_notifications: state.slot_notifications,
last_slot: state.last_slot,
statuses: state.statuses,
ready_for_continue: state.ready_for_continue,
completed: state.completed,
})
}
pub async fn modify_program(
&self,
program_id: &str,
elf: &[u8],
) -> Result<BTreeMap<Address, AccountData>, ProgramModError> {
let rpc = self.rpc.as_ref().ok_or(ProgramModError::NoRpcEndpoint)?;
crate::injection::modify_program_via_rpc(rpc, program_id, elf).await
}
pub async fn modify_accounts(
&self,
modifications: &AccountModifications,
) -> BacktestClientResult<usize> {
let rpc_endpoint =
self.rpc_endpoint
.as_deref()
.ok_or_else(|| BacktestClientError::Closed {
reason: "no RPC endpoint available".to_string(),
})?;
crate::rpc::modify_accounts(rpc_endpoint, modifications).await
}
pub async fn subscribe_program_logs<F, Fut>(
&self,
program_id: &str,
commitment: CommitmentConfig,
on_notification: F,
) -> Result<LogSubscriptionHandle, SubscriptionError>
where
F: Fn(Response<RpcLogsResponse>) -> Fut + Send + Sync + 'static,
Fut: Future<Output = ()> + Send + 'static,
{
let rpc_endpoint = self
.rpc_endpoint
.as_deref()
.ok_or(SubscriptionError::NoRpcEndpoint)?;
crate::subscriptions::subscribe_program_logs(
rpc_endpoint,
program_id,
commitment,
on_notification,
)
.await
}
pub async fn subscribe_account_diffs<F, Fut>(
&self,
account: &str,
on_notification: F,
) -> Result<AccountDiffSubscriptionHandle, SubscriptionError>
where
F: Fn(AccountDiffNotification) -> Fut + Send + Sync + 'static,
Fut: Future<Output = ()> + Send + 'static,
{
let rpc_endpoint = self
.rpc_endpoint
.as_deref()
.ok_or(SubscriptionError::NoRpcEndpoint)?;
crate::subscriptions::subscribe_account_diffs(rpc_endpoint, account, on_notification).await
}
pub async fn close(&mut self, timeout: Option<Duration>) -> BacktestClientResult<()> {
self.close_with_frame(timeout, None).await
}
pub async fn close_with_frame(
&mut self,
timeout: Option<Duration>,
frame: Option<CloseFrame<'static>>,
) -> BacktestClientResult<()> {
if self.ws.is_none() {
return Ok(());
}
let mut sent = false;
match self
.send(&BacktestRequest::CloseBacktestSession, timeout)
.await
{
Ok(()) => sent = true,
Err(err) if is_close_ok(&err) => {}
Err(err) => return Err(err),
}
if sent {
let response = match self.next_response(timeout).await {
Ok(Some(r)) => r,
Ok(None) => {
self.ws.take();
return Ok(());
}
Err(BacktestClientError::Closed { .. }) => {
self.ws.take();
return Ok(());
}
Err(BacktestClientError::WebSocket {
action: "receiving",
source,
}) if is_reset_without_close(&source) => {
self.ws.take();
return Ok(());
}
Err(err) => return Err(err),
};
match response {
BacktestResponse::Success | BacktestResponse::Completed { .. } => {}
BacktestResponse::Error(err) => return Err(BacktestClientError::Remote(err)),
other => {
return Err(BacktestClientError::UnexpectedResponse {
context: "closing session",
response: Box::new(other),
});
}
}
}
if let Some(ws) = self.ws.as_mut() {
let close_result = ws.close(frame).await;
if let Err(source) = close_result
&& !is_ws_closed_error(&source)
{
return Err(BacktestClientError::WebSocket {
action: "closing",
source: Box::new(source),
});
}
}
match self.next_response(timeout).await {
Ok(_) => {}
Err(BacktestClientError::Closed { .. }) => {}
Err(BacktestClientError::WebSocket {
action: "receiving",
source,
}) if is_reset_without_close(&source) => {}
Err(err) => return Err(err),
}
tokio::time::sleep(Duration::from_millis(100)).await;
self.ws.take();
Ok(())
}
pub async fn close_with_reason(
&mut self,
timeout: Option<Duration>,
code: CloseCode,
reason: impl Into<String>,
) -> BacktestClientResult<()> {
let frame = CloseFrame {
code,
reason: Cow::Owned(reason.into()),
};
self.close_with_frame(timeout, Some(frame)).await
}
async fn next_text(
&mut self,
timeout: Option<Duration>,
) -> BacktestClientResult<Option<String>> {
loop {
let request_timeout = self.request_timeout;
let timeout = timeout.or(request_timeout);
let next_fut = self.ws_mut()?.next();
let msg = match timeout {
Some(duration) => tokio::time::timeout(duration, next_fut)
.await
.map_err(|_| BacktestClientError::Timeout {
action: "receiving",
duration,
})?,
None => next_fut.await,
};
let Some(msg) = msg else {
return Ok(None);
};
let msg = match msg {
Ok(msg) => msg,
Err(source) => {
return Err(BacktestClientError::WebSocket {
action: "receiving",
source: Box::new(source),
});
}
};
match msg {
Message::Text(text) => {
if self.log_raw {
tracing::debug!("<- raw: {text}");
}
return Ok(Some(text));
}
Message::Binary(bin) => match String::from_utf8(bin) {
Ok(text) => {
if self.log_raw {
tracing::debug!("<- raw(bin): {text}");
}
return Ok(Some(text));
}
Err(err) => {
tracing::warn!("discarding non-utf8 binary message: {err}");
continue;
}
},
Message::Close(frame) => {
let reason = close_reason(frame);
return Err(BacktestClientError::Closed { reason });
}
Message::Ping(_) | Message::Pong(_) => continue,
Message::Frame(_) => continue,
}
}
}
}
impl std::fmt::Debug for BacktestSession {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("BacktestSession")
.field("session_id", &self.session_id)
.field("rpc_endpoint", &self.rpc_endpoint)
.field(
"rpc",
&self
.rpc
.as_ref()
.map(|_| "<RpcClient>")
.unwrap_or("<not set>"),
)
.field("ready_for_continue", &self.ready_for_continue)
.field("request_timeout", &self.request_timeout)
.finish_non_exhaustive()
}
}
#[derive(Debug)]
pub(crate) enum CreateRequestResult {
Single { session_id: String },
Parallel { session_ids: Vec<String> },
}
impl Drop for BacktestSession {
fn drop(&mut self) {
let Some(ws) = self.ws.take() else {
return;
};
if let Ok(handle) = tokio::runtime::Handle::try_current() {
handle.spawn(async move {
let mut ws = ws;
let _ = ws.close(None).await;
});
}
}
}
fn resolve_rpc_url(base: &str, endpoint: &str) -> String {
if endpoint.starts_with("http://") || endpoint.starts_with("https://") {
endpoint.to_string()
} else {
format!("{}/{}", base, endpoint.trim_start_matches('/'))
}
}
fn close_reason(frame: Option<CloseFrame<'static>>) -> String {
match frame {
Some(frame) => format!("{:?}: {}", frame.code, frame.reason),
None => "no close frame".to_string(),
}
}
fn is_reset_without_close(err: &WsError) -> bool {
matches!(
err,
WsError::Protocol(ProtocolError::ResetWithoutClosingHandshake)
)
}
fn is_ws_closed_error(err: &WsError) -> bool {
matches!(
err,
WsError::ConnectionClosed
| WsError::AlreadyClosed
| WsError::Protocol(ProtocolError::ResetWithoutClosingHandshake)
)
}
fn is_close_ok(err: &BacktestClientError) -> bool {
match err {
BacktestClientError::Closed { .. } => true,
BacktestClientError::WebSocket { source, .. } => is_ws_closed_error(source),
_ => false,
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn coverage_tracks_slot_and_completion_from_responses() {
let mut coverage = SessionCoverage::default();
coverage.observe_response(&BacktestResponse::SlotNotification(10));
coverage.observe_response(&BacktestResponse::SlotNotification(12));
coverage.observe_response(&BacktestResponse::Completed {
summary: None,
agent_stats: None,
});
assert!(coverage.is_completed());
assert_eq!(coverage.highest_slot_seen(), Some(12));
}
#[test]
fn coverage_validate_end_slot_checks_completion_and_range() {
let mut coverage = SessionCoverage::default();
assert_eq!(
coverage.validate_end_slot(5),
Err(CoverageError::NotCompleted)
);
coverage.mark_completed();
assert_eq!(
coverage.validate_end_slot(5),
Err(CoverageError::NoSlotsObserved)
);
coverage.observe_slot(4);
assert_eq!(
coverage.validate_end_slot(5),
Err(CoverageError::RangeNotReached {
actual_end_slot: 4,
expected_end_slot: 5,
})
);
coverage.observe_slot(6);
assert_eq!(coverage.validate_end_slot(5), Ok(()));
}
}