// simulator-client 0.8.0
//
// Async WebSocket client for the Solana simulator backtest API
// Documentation
use std::time::Duration;

use simulator_api::{
    BacktestRequest, BacktestResponse, ContinueParams, ContinueToParams, DiscoveryBatchEvent,
    PausedEvent,
};

use crate::{BacktestClientError, BacktestClientResult, BacktestSession};

/// A batch discovery paired with the pause location immediately before it.
///
/// Carried by [`DiscoveryStepResult::Paused`], which
/// [`BacktestSession::advance_to_discovery`] returns once the server reports
/// `Paused` after the `ContinueTo` request sent for this discovery.
#[derive(Debug, Clone)]
pub struct DiscoveryPause {
    /// The discovery event that triggered the pause. Its `slot` and
    /// `batch_index` are the target used for the `ContinueTo` request.
    pub discovery: DiscoveryBatchEvent,
    /// Where the session paused; state through `batch_index - 1` is visible via RPC.
    pub paused: PausedEvent,
}

/// Result of a single [`BacktestSession::advance_to_discovery`] call.
///
/// Failures (remote errors, a closed websocket, unexpected responses) are not
/// variants here; they are reported through the `Err` side of the call's result.
#[derive(Debug)]
pub enum DiscoveryStepResult {
    /// Session paused before the discovered batch. Inspect state via RPC or
    /// simulate custom transactions, then call `advance_to_discovery` again.
    Paused(DiscoveryPause),
    /// No more discoveries in the session range. Returned when the server
    /// sends `Completed`, either while waiting for a discovery or while
    /// waiting for the pause that follows one.
    Completed,
}

impl BacktestSession {
    /// Advance the session to just before the next batch that matches a
    /// registered discovery filter.
    ///
    /// `DiscoveryBatch` events come from a background scan on the server that
    /// is independent of execution, so no `Continue` is required to receive
    /// them. The call runs three steps in order:
    ///
    /// 1. Read responses until a `DiscoveryBatch` arrives. A
    ///    `ReadyForContinue` seen here is answered with an unbounded
    ///    `Continue`; other unrelated responses are logged and skipped.
    /// 2. Send `ContinueTo` targeting the discovery's slot and batch index so
    ///    execution stops before (not including) the discovered batch.
    /// 3. Read responses until `Paused` arrives. Additional `DiscoveryBatch`
    ///    events seen meanwhile are held locally and queued on the backlog
    ///    once the pause is confirmed, ready for the next call.
    ///
    /// While paused, the session's RPC endpoint reflects state through
    /// `batch_index - 1` of the discovered slot — no transaction in the
    /// discovered batch has executed yet.
    ///
    /// `timeout` is passed unchanged to every individual send and receive
    /// performed by this call.
    ///
    /// # Errors
    ///
    /// - [`BacktestClientError::Closed`] if the response stream ends during
    ///   step 1 or step 3.
    /// - [`BacktestClientError::Remote`] if the server reports an error.
    /// - [`BacktestClientError::UnexpectedResponse`] if step 3 receives a
    ///   response kind it does not handle.
    /// - Any error propagated from the underlying send or receive.
    pub async fn advance_to_discovery(
        &mut self,
        timeout: Option<Duration>,
    ) -> BacktestClientResult<DiscoveryStepResult> {
        // Step 1: pull responses until the next DiscoveryBatch shows up.
        let target: DiscoveryBatchEvent = loop {
            let incoming = self.next_response(timeout).await?.ok_or_else(|| {
                BacktestClientError::Closed {
                    reason: "websocket closed while waiting for DiscoveryBatch".to_string(),
                }
            })?;

            match incoming {
                BacktestResponse::Error(err) => return Err(BacktestClientError::Remote(err)),
                BacktestResponse::Completed { .. } => return Ok(DiscoveryStepResult::Completed),
                BacktestResponse::ReadyForContinue => {
                    // The server is idle until told to continue. Let it run
                    // freely so it can either surface the next discovery or
                    // hit end-of-range and report Completed.
                    let unbounded = ContinueParams {
                        advance_count: u64::MAX,
                        ..Default::default()
                    };
                    self.send(&BacktestRequest::Continue(unbounded), timeout)
                        .await?;
                }
                BacktestResponse::DiscoveryBatch(event) => break event,
                other => {
                    tracing::warn!("ignoring {other:?} while waiting for DiscoveryBatch");
                }
            }
        };

        // Step 2: run up to, but not including, the discovered batch.
        let stop_before = ContinueToParams {
            slot: target.slot,
            batch_index: Some(target.batch_index),
        };
        self.send(&BacktestRequest::ContinueTo(stop_before), timeout)
            .await?;

        // Step 3: drain responses until the server confirms the pause.
        // Discoveries that arrive in the meantime are parked in a local buffer
        // rather than self.backlog, so next_response keeps reading from the
        // websocket; they move to the backlog only once Paused is seen.
        let mut deferred: Vec<BacktestResponse> = Vec::new();
        let pause_point: PausedEvent = loop {
            let incoming = self.next_response(timeout).await?.ok_or_else(|| {
                BacktestClientError::Closed {
                    reason: "websocket closed while waiting for Paused after ContinueTo"
                        .to_string(),
                }
            })?;

            match incoming {
                BacktestResponse::Error(err) => return Err(BacktestClientError::Remote(err)),
                BacktestResponse::Completed { .. } => return Ok(DiscoveryStepResult::Completed),
                extra @ BacktestResponse::DiscoveryBatch(_) => deferred.push(extra),
                BacktestResponse::Paused(event) => {
                    self.ready_for_continue = true;
                    // Hand the parked discoveries over in arrival order.
                    deferred.drain(..).for_each(|queued| {
                        self.push_backlog(queued);
                    });
                    break event;
                }
                // Progress and acknowledgement chatter carries nothing needed here.
                BacktestResponse::SlotNotification(_)
                | BacktestResponse::Status { .. }
                | BacktestResponse::Success
                | BacktestResponse::ReadyForContinue => continue,
                unexpected => {
                    return Err(BacktestClientError::UnexpectedResponse {
                        context: "waiting for Paused after ContinueTo",
                        response: Box::new(unexpected),
                    });
                }
            }
        };

        Ok(DiscoveryStepResult::Paused(DiscoveryPause {
            discovery: target,
            paused: pause_point,
        }))
    }
}