Skip to main content

simulator_client/
session.rs

1use std::{
2    borrow::Cow,
3    collections::{BTreeMap, VecDeque},
4    future::Future,
5    time::Duration,
6};
7
8use futures::{SinkExt, StreamExt, stream};
9use simulator_api::{
10    AccountData, BacktestRequest, BacktestResponse, BacktestStatus, ContinueParams,
11    CreateSessionParams, SessionSummary,
12};
13use solana_address::Address;
14use solana_client::{
15    nonblocking::rpc_client::RpcClient,
16    rpc_response::{Response, RpcLogsResponse},
17};
18use solana_commitment_config::CommitmentConfig;
19use thiserror::Error;
20use tokio::net::TcpStream;
21use tokio_tungstenite::{
22    MaybeTlsStream, WebSocketStream,
23    tungstenite::{
24        Error as WsError, Message,
25        error::ProtocolError,
26        protocol::{CloseFrame, frame::coding::CloseCode},
27    },
28};
29
30use crate::{
31    BacktestClientError, BacktestClientResult, Continue,
32    injection::{ProgramModError, build_program_injection},
33    subscriptions::{LogSubscriptionHandle, SubscriptionError},
34};
35
36/// Outcome of waiting for readiness on a backtest session.
37#[derive(Debug, Clone, Copy, PartialEq, Eq)]
38pub enum ReadyOutcome {
39    /// The session is ready to accept a `Continue` request.
40    Ready,
41    /// The session completed before becoming ready.
42    Completed,
43}
44
45/// Summary of responses collected while advancing a backtest.
46#[derive(Debug, Default)]
47pub struct ContinueResult {
48    /// Number of slot notifications observed.
49    pub slot_notifications: u64,
50    /// Last slot received via notification.
51    pub last_slot: Option<u64>,
52    /// Status messages received.
53    pub statuses: Vec<BacktestStatus>,
54    /// Whether the session became ready for the next `Continue`.
55    pub ready_for_continue: bool,
56    /// Whether the session completed while advancing.
57    pub completed: bool,
58}
59
60/// Mutable state for driving a session with `advance_step`.
61#[derive(Debug)]
62pub struct AdvanceState {
63    /// Expected number of slot notifications for this step.
64    pub expected_slots: u64,
65    /// Count of slot notifications received so far.
66    pub slot_notifications: u64,
67    /// Most recent slot notification.
68    pub last_slot: Option<u64>,
69    /// Status messages received so far.
70    pub statuses: Vec<BacktestStatus>,
71    /// Whether the session is ready for another `Continue`.
72    pub ready_for_continue: bool,
73    /// Whether the session completed while advancing.
74    pub completed: bool,
75    /// Session summary received on completion (if send_summary was enabled).
76    pub summary: Option<SessionSummary>,
77}
78
79impl AdvanceState {
80    /// Create new tracking state for a step that expects `expected_slots` notifications.
81    pub fn new(expected_slots: u64) -> Self {
82        Self {
83            expected_slots,
84            slot_notifications: 0,
85            last_slot: None,
86            statuses: Vec::new(),
87            ready_for_continue: false,
88            completed: false,
89            summary: None,
90        }
91    }
92
93    /// Return true when the step is complete based on readiness and slot count.
94    pub fn is_done(&self, wait_for_slots: bool) -> bool {
95        if self.completed {
96            return true;
97        }
98
99        if !self.ready_for_continue {
100            return false;
101        }
102
103        !wait_for_slots || self.slot_notifications >= self.expected_slots
104    }
105}
106
107/// Coverage of a session's observed slot notifications and completion state.
108#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
109pub struct SessionCoverage {
110    completed: bool,
111    highest_slot_seen: Option<u64>,
112}
113
114impl SessionCoverage {
115    /// Record a slot notification.
116    pub fn observe_slot(&mut self, slot: u64) {
117        self.highest_slot_seen = Some(
118            self.highest_slot_seen
119                .map_or(slot, |current| current.max(slot)),
120        );
121    }
122
123    /// Mark the session as completed.
124    pub fn mark_completed(&mut self) {
125        self.completed = true;
126    }
127
128    /// Update coverage state from a backtest response.
129    pub fn observe_response(&mut self, response: &BacktestResponse) {
130        match response {
131            BacktestResponse::SlotNotification(slot) => self.observe_slot(*slot),
132            BacktestResponse::Completed { .. } => self.mark_completed(),
133            _ => {}
134        }
135    }
136
137    /// Return whether completion has been observed.
138    pub fn is_completed(&self) -> bool {
139        self.completed
140    }
141
142    /// Return the highest slot observed via slot notifications.
143    pub fn highest_slot_seen(&self) -> Option<u64> {
144        self.highest_slot_seen
145    }
146
147    /// Validate that the session completed and reached `expected_end_slot`.
148    pub fn validate_end_slot(&self, expected_end_slot: u64) -> Result<(), CoverageError> {
149        if !self.completed {
150            return Err(CoverageError::NotCompleted);
151        }
152
153        let Some(actual_end_slot) = self.highest_slot_seen else {
154            return Err(CoverageError::NoSlotsObserved);
155        };
156
157        if actual_end_slot < expected_end_slot {
158            return Err(CoverageError::RangeNotReached {
159                actual_end_slot,
160                expected_end_slot,
161            });
162        }
163
164        Ok(())
165    }
166}
167
168/// Coverage validation failures for a backtest session.
169#[derive(Debug, Clone, Copy, PartialEq, Eq, Error)]
170pub enum CoverageError {
171    #[error("ended before completion")]
172    NotCompleted,
173    #[error("completed without slot notifications")]
174    NoSlotsObserved,
175    #[error("completed at slot {actual_end_slot} but expected at least {expected_end_slot}")]
176    RangeNotReached {
177        actual_end_slot: u64,
178        expected_end_slot: u64,
179    },
180}
181
182/// Active backtest session over a WebSocket connection.
183///
184/// Dropping the session sends a best-effort close frame if a Tokio runtime is
185/// available. Call [`BacktestSession::close`] to request server-side cleanup.
186pub struct BacktestSession {
187    ws: Option<WebSocketStream<MaybeTlsStream<TcpStream>>>,
188    session_id: Option<String>,
189    rpc_endpoint: Option<String>,
190    rpc: Option<RpcClient>,
191    ready_for_continue: bool,
192    request_timeout: Option<Duration>,
193    log_raw: bool,
194    backlog: VecDeque<BacktestResponse>,
195}
196
197impl BacktestSession {
198    pub(crate) fn new(
199        ws: WebSocketStream<MaybeTlsStream<TcpStream>>,
200        request_timeout: Option<Duration>,
201        log_raw: bool,
202    ) -> Self {
203        Self {
204            ws: Some(ws),
205            session_id: None,
206            rpc_endpoint: None,
207            rpc: None,
208            ready_for_continue: false,
209            request_timeout,
210            log_raw,
211            backlog: VecDeque::new(),
212        }
213    }
214
215    /// Return the server-assigned session id, if known.
216    pub fn session_id(&self) -> Option<&str> {
217        self.session_id.as_deref()
218    }
219
220    /// Return the session-scoped RPC endpoint if provided.
221    pub fn rpc_endpoint(&self) -> Option<&str> {
222        self.rpc_endpoint.as_deref()
223    }
224
225    /// Return the RPC client for this session's endpoint.
226    ///
227    /// Always available after [`BacktestClient::create_session`](crate::BacktestClient::create_session) completes.
228    pub fn rpc(&self) -> &RpcClient {
229        self.rpc
230            .as_ref()
231            .expect("rpc is set during session creation")
232    }
233
234    /// Return whether the session is currently ready to accept `Continue`.
235    pub fn is_ready_for_continue(&self) -> bool {
236        self.ready_for_continue
237    }
238
239    /// Update internal readiness state based on a response.
240    pub fn apply_response(&mut self, response: &BacktestResponse) {
241        match response {
242            BacktestResponse::ReadyForContinue => {
243                self.ready_for_continue = true;
244            }
245            BacktestResponse::Completed { .. } => {
246                self.ready_for_continue = false;
247            }
248            _ => {}
249        }
250    }
251
252    fn ws_mut(&mut self) -> BacktestClientResult<&mut WebSocketStream<MaybeTlsStream<TcpStream>>> {
253        self.ws.as_mut().ok_or_else(|| BacktestClientError::Closed {
254            reason: "websocket closed".to_string(),
255        })
256    }
257
258    pub(crate) async fn create(
259        &mut self,
260        params: CreateSessionParams,
261        rpc_base_url: String,
262    ) -> BacktestClientResult<()> {
263        self.send(&BacktestRequest::CreateBacktestSession(params), None)
264            .await?;
265
266        loop {
267            let response =
268                self.next_response(None)
269                    .await?
270                    .ok_or_else(|| BacktestClientError::Closed {
271                        reason: "websocket ended before SessionCreated".to_string(),
272                    })?;
273
274            match response {
275                BacktestResponse::SessionCreated {
276                    session_id,
277                    rpc_endpoint,
278                } => {
279                    self.session_id = Some(session_id);
280                    let resolved = resolve_rpc_url(&rpc_base_url, &rpc_endpoint);
281                    self.rpc = Some(RpcClient::new_with_commitment(
282                        resolved.clone(),
283                        CommitmentConfig::confirmed(),
284                    ));
285                    self.rpc_endpoint = Some(resolved);
286                    return Ok(());
287                }
288                BacktestResponse::ReadyForContinue => {
289                    self.ready_for_continue = true;
290                }
291                BacktestResponse::Error(err) => return Err(BacktestClientError::Remote(err)),
292                other => {
293                    self.backlog.push_back(other);
294                }
295            }
296        }
297    }
298
299    /// Send a raw backtest request over the WebSocket.
300    pub async fn send(
301        &mut self,
302        request: &BacktestRequest,
303        timeout: Option<Duration>,
304    ) -> BacktestClientResult<()> {
305        let text = serde_json::to_string(request)
306            .map_err(|source| BacktestClientError::SerializeRequest { source })?;
307
308        let request_timeout = self.request_timeout;
309        let timeout = timeout.or(request_timeout);
310
311        let send_fut = self.ws_mut()?.send(Message::Text(text));
312        let send_result = match timeout {
313            Some(duration) => tokio::time::timeout(duration, send_fut)
314                .await
315                .map_err(|_| BacktestClientError::Timeout {
316                    action: "sending",
317                    duration,
318                })?,
319            None => send_fut.await,
320        };
321
322        send_result.map_err(|source| BacktestClientError::WebSocket {
323            action: "sending",
324            source: Box::new(source),
325        })?;
326
327        Ok(())
328    }
329
330    /// Receive the next response, using the backlog first.
331    pub async fn next_response(
332        &mut self,
333        timeout: Option<Duration>,
334    ) -> BacktestClientResult<Option<BacktestResponse>> {
335        if let Some(response) = self.backlog.pop_front() {
336            return Ok(Some(response));
337        }
338
339        let text = match self.next_text(timeout).await? {
340            Some(text) => text,
341            None => return Ok(None),
342        };
343
344        let response = serde_json::from_str::<BacktestResponse>(&text).map_err(|source| {
345            BacktestClientError::DeserializeResponse {
346                raw: text.clone(),
347                source,
348            }
349        })?;
350
351        Ok(Some(response))
352    }
353
354    /// Receive the next response and update readiness state.
355    pub async fn next_event(
356        &mut self,
357        timeout: Option<Duration>,
358    ) -> BacktestClientResult<Option<BacktestResponse>> {
359        let response = self.next_response(timeout).await?;
360        if let Some(ref response) = response {
361            self.apply_response(response);
362        }
363        Ok(response)
364    }
365
366    /// Stream responses, updating readiness state as items arrive.
367    ///
368    /// This consumes the session and yields responses until the connection ends.
369    pub fn responses(
370        self,
371        timeout: Option<Duration>,
372    ) -> impl futures::Stream<Item = BacktestClientResult<BacktestResponse>> {
373        stream::unfold(Some(self), move |state| async move {
374            let mut session = match state {
375                Some(session) => session,
376                None => return None,
377            };
378
379            match session.next_response(timeout).await {
380                Ok(Some(response)) => {
381                    session.apply_response(&response);
382                    Some((Ok(response), Some(session)))
383                }
384                Ok(None) => None,
385                Err(err) => Some((Err(err), None)),
386            }
387        })
388    }
389
390    /// Wait for the session to become ready or completed.
391    pub async fn ensure_ready(
392        &mut self,
393        timeout: Option<Duration>,
394    ) -> BacktestClientResult<ReadyOutcome> {
395        if self.ready_for_continue {
396            return Ok(ReadyOutcome::Ready);
397        }
398
399        loop {
400            let response =
401                self.next_response(timeout)
402                    .await?
403                    .ok_or_else(|| BacktestClientError::Closed {
404                        reason: "websocket ended while waiting for ReadyForContinue".to_string(),
405                    })?;
406
407            match response {
408                BacktestResponse::ReadyForContinue => {
409                    self.ready_for_continue = true;
410                    return Ok(ReadyOutcome::Ready);
411                }
412                BacktestResponse::Completed { .. } => return Ok(ReadyOutcome::Completed),
413                BacktestResponse::Error(err) => return Err(BacktestClientError::Remote(err)),
414                _ => {}
415            }
416        }
417    }
418
419    /// Wait for a specific status to be emitted.
420    pub async fn wait_for_status(
421        &mut self,
422        desired: BacktestStatus,
423        timeout: Option<Duration>,
424    ) -> BacktestClientResult<()> {
425        let desired = std::mem::discriminant(&desired);
426
427        loop {
428            let response =
429                self.next_response(timeout)
430                    .await?
431                    .ok_or_else(|| BacktestClientError::Closed {
432                        reason: "websocket ended while waiting for status".to_string(),
433                    })?;
434
435            match response {
436                BacktestResponse::Status { status }
437                    if std::mem::discriminant(&status) == desired =>
438                {
439                    return Ok(());
440                }
441                BacktestResponse::Error(err) => return Err(BacktestClientError::Remote(err)),
442                BacktestResponse::Completed { summary } => {
443                    return Err(BacktestClientError::UnexpectedResponse {
444                        context: "waiting for status",
445                        response: Box::new(BacktestResponse::Completed { summary }),
446                    });
447                }
448                _ => {}
449            }
450        }
451    }
452
453    /// Send a `Continue` request and reset readiness.
454    pub async fn send_continue(
455        &mut self,
456        params: ContinueParams,
457        timeout: Option<Duration>,
458    ) -> BacktestClientResult<()> {
459        self.ready_for_continue = false;
460        self.send(&BacktestRequest::Continue(params), timeout).await
461    }
462
463    /// Read and apply a single response while advancing.
464    pub async fn advance_step<F>(
465        &mut self,
466        state: &mut AdvanceState,
467        wait_for_slots: bool,
468        timeout: Option<Duration>,
469        on_event: &mut F,
470    ) -> BacktestClientResult<()>
471    where
472        F: FnMut(&BacktestResponse),
473    {
474        let Some(response) = self.next_response(timeout).await? else {
475            return Err(BacktestClientError::Closed {
476                reason: "websocket ended while awaiting continue responses".to_string(),
477            });
478        };
479
480        if self.log_raw {
481            tracing::debug!("<- {response:?}");
482        }
483
484        on_event(&response);
485
486        match response {
487            BacktestResponse::ReadyForContinue => {
488                self.ready_for_continue = true;
489                state.ready_for_continue = true;
490            }
491            BacktestResponse::SlotNotification(slot) => {
492                state.slot_notifications += 1;
493                state.last_slot = Some(slot);
494            }
495            BacktestResponse::Status { status } => {
496                state.statuses.push(status);
497            }
498            BacktestResponse::Success => {}
499            BacktestResponse::Completed { summary } => {
500                state.completed = true;
501                state.summary = summary;
502            }
503            BacktestResponse::Error(err) => return Err(BacktestClientError::Remote(err)),
504            BacktestResponse::SessionCreated { .. } | BacktestResponse::SessionAttached { .. } => {
505                return Err(BacktestClientError::UnexpectedResponse {
506                    context: "continuing",
507                    response: Box::new(response),
508                });
509            }
510        }
511
512        if wait_for_slots && state.slot_notifications > state.expected_slots {
513            tracing::warn!(
514                "received {} slot notifications (expected {})",
515                state.slot_notifications,
516                state.expected_slots
517            );
518        }
519
520        Ok(())
521    }
522
523    /// Advance until the session becomes ready for another `Continue`.
524    pub async fn continue_until_ready<F>(
525        &mut self,
526        cont: Continue,
527        timeout: Option<Duration>,
528        mut on_event: F,
529    ) -> BacktestClientResult<ContinueResult>
530    where
531        F: FnMut(&BacktestResponse),
532    {
533        let expected_slots = cont.advance_count;
534        self.advance_internal(
535            cont.into_params(),
536            expected_slots,
537            false,
538            timeout,
539            &mut on_event,
540        )
541        .await
542    }
543
544    /// Advance and wait for both readiness and slot notifications.
545    pub async fn advance<F>(
546        &mut self,
547        cont: Continue,
548        timeout: Option<Duration>,
549        mut on_event: F,
550    ) -> BacktestClientResult<ContinueResult>
551    where
552        F: FnMut(&BacktestResponse),
553    {
554        let expected_slots = cont.advance_count;
555        self.advance_internal(
556            cont.into_params(),
557            expected_slots,
558            true,
559            timeout,
560            &mut on_event,
561        )
562        .await
563    }
564
565    async fn advance_internal<F>(
566        &mut self,
567        params: ContinueParams,
568        expected_slots: u64,
569        wait_for_slots: bool,
570        timeout: Option<Duration>,
571        on_event: &mut F,
572    ) -> BacktestClientResult<ContinueResult>
573    where
574        F: FnMut(&BacktestResponse),
575    {
576        self.send_continue(params, timeout).await?;
577
578        let mut state = AdvanceState::new(expected_slots);
579        while !state.is_done(wait_for_slots) {
580            self.advance_step(&mut state, wait_for_slots, timeout, on_event)
581                .await?;
582        }
583
584        Ok(ContinueResult {
585            slot_notifications: state.slot_notifications,
586            last_slot: state.last_slot,
587            statuses: state.statuses,
588            ready_for_continue: state.ready_for_continue,
589            completed: state.completed,
590        })
591    }
592
593    /// Build a program-data account modification from raw ELF bytes.
594    ///
595    /// Derives the `ProgramData` address from `program_id`, queries the session's RPC endpoint
596    /// for the current slot and rent-exempt minimum, then returns a modification map ready to
597    /// pass to [`Continue::builder().modify_accounts(...)`](crate::Continue).
598    ///
599    /// The deploy slot is set to `current_slot - 1` so the program appears deployed before the
600    /// next executed slot.
601    pub async fn modify_program(
602        &self,
603        program_id: &str,
604        elf: &[u8],
605    ) -> Result<BTreeMap<Address, AccountData>, ProgramModError> {
606        let rpc = self.rpc.as_ref().ok_or(ProgramModError::NoRpcEndpoint)?;
607
608        let program_addr: Address =
609            program_id
610                .parse()
611                .map_err(|_| ProgramModError::InvalidProgramId {
612                    id: program_id.to_string(),
613                })?;
614        let programdata_addr = solana_loader_v3_interface::get_program_data_address(&program_addr);
615
616        let slot = rpc.get_slot().await.map_err(|e| ProgramModError::Rpc {
617            source: Box::new(e),
618        })?;
619
620        // 13 = 4 (variant) + 8 (slot) + 1 (None upgrade-authority discriminant)
621        let data_len = 13 + elf.len();
622        let lamports = rpc
623            .get_minimum_balance_for_rent_exemption(data_len)
624            .await
625            .map_err(|e| ProgramModError::Rpc {
626                source: Box::new(e),
627            })?;
628
629        Ok(build_program_injection(
630            programdata_addr,
631            elf,
632            slot.saturating_sub(1),
633            None,
634            lamports,
635        ))
636    }
637
638    /// Subscribe to program log notifications using the session's RPC endpoint.
639    ///
640    /// Equivalent to calling [`subscribe_program_logs`](crate::subscribe_program_logs) with
641    /// the endpoint from [`rpc_endpoint`](Self::rpc_endpoint), which is set after
642    /// [`BacktestClient::create_session`](crate::BacktestClient::create_session) completes.
643    pub async fn subscribe_program_logs<F, Fut>(
644        &self,
645        program_id: &str,
646        commitment: CommitmentConfig,
647        on_notification: F,
648    ) -> Result<LogSubscriptionHandle, SubscriptionError>
649    where
650        F: Fn(Response<RpcLogsResponse>) -> Fut + Send + Sync + 'static,
651        Fut: Future<Output = ()> + Send + 'static,
652    {
653        let rpc_endpoint = self
654            .rpc_endpoint
655            .as_deref()
656            .ok_or(SubscriptionError::NoRpcEndpoint)?;
657        crate::subscriptions::subscribe_program_logs(
658            rpc_endpoint,
659            program_id,
660            commitment,
661            on_notification,
662        )
663        .await
664    }
665
666    /// Request server cleanup and close the underlying WebSocket.
667    ///
668    /// This is idempotent and will return `Ok(())` if the connection is already closed.
669    pub async fn close(&mut self, timeout: Option<Duration>) -> BacktestClientResult<()> {
670        self.close_with_frame(timeout, None).await
671    }
672
673    /// Close the session with a specific WebSocket close frame.
674    pub async fn close_with_frame(
675        &mut self,
676        timeout: Option<Duration>,
677        frame: Option<CloseFrame<'static>>,
678    ) -> BacktestClientResult<()> {
679        if self.ws.is_none() {
680            return Ok(());
681        }
682
683        let mut sent = false;
684        match self
685            .send(&BacktestRequest::CloseBacktestSession, timeout)
686            .await
687        {
688            Ok(()) => sent = true,
689            Err(err) if is_close_ok(&err) => {}
690            Err(err) => return Err(err),
691        }
692
693        if sent {
694            let response = match self.next_response(timeout).await {
695                Ok(Some(r)) => r,
696                Ok(None) => {
697                    self.ws.take();
698                    return Ok(());
699                }
700                Err(BacktestClientError::Closed { .. }) => {
701                    self.ws.take();
702                    return Ok(());
703                }
704                Err(BacktestClientError::WebSocket {
705                    action: "receiving",
706                    source,
707                }) if is_reset_without_close(&source) => {
708                    self.ws.take();
709                    return Ok(());
710                }
711                Err(err) => return Err(err),
712            };
713
714            match response {
715                BacktestResponse::Success | BacktestResponse::Completed { .. } => {}
716                BacktestResponse::Error(err) => return Err(BacktestClientError::Remote(err)),
717                other => {
718                    return Err(BacktestClientError::UnexpectedResponse {
719                        context: "closing session",
720                        response: Box::new(other),
721                    });
722                }
723            }
724        }
725
726        if let Some(ws) = self.ws.as_mut() {
727            let close_result = ws.close(frame).await;
728            if let Err(source) = close_result
729                && !is_ws_closed_error(&source)
730            {
731                return Err(BacktestClientError::WebSocket {
732                    action: "closing",
733                    source: Box::new(source),
734                });
735            }
736        }
737
738        // Await the close confirmation
739        match self.next_response(timeout).await {
740            Ok(_) => {}
741            Err(BacktestClientError::Closed { .. }) => {}
742            Err(BacktestClientError::WebSocket {
743                action: "receiving",
744                source,
745            }) if is_reset_without_close(&source) => {}
746            Err(err) => return Err(err),
747        }
748
749        // Give some time for the close to propagate
750        tokio::time::sleep(Duration::from_millis(100)).await;
751        self.ws.take();
752        Ok(())
753    }
754
755    /// Close the session with a close code and reason.
756    pub async fn close_with_reason(
757        &mut self,
758        timeout: Option<Duration>,
759        code: CloseCode,
760        reason: impl Into<String>,
761    ) -> BacktestClientResult<()> {
762        let frame = CloseFrame {
763            code,
764            reason: Cow::Owned(reason.into()),
765        };
766        self.close_with_frame(timeout, Some(frame)).await
767    }
768
769    async fn next_text(
770        &mut self,
771        timeout: Option<Duration>,
772    ) -> BacktestClientResult<Option<String>> {
773        loop {
774            let request_timeout = self.request_timeout;
775            let timeout = timeout.or(request_timeout);
776
777            let next_fut = self.ws_mut()?.next();
778            let msg = match timeout {
779                Some(duration) => tokio::time::timeout(duration, next_fut)
780                    .await
781                    .map_err(|_| BacktestClientError::Timeout {
782                        action: "receiving",
783                        duration,
784                    })?,
785                None => next_fut.await,
786            };
787
788            let Some(msg) = msg else {
789                return Ok(None);
790            };
791
792            let msg = match msg {
793                Ok(msg) => msg,
794                Err(source) => {
795                    return Err(BacktestClientError::WebSocket {
796                        action: "receiving",
797                        source: Box::new(source),
798                    });
799                }
800            };
801
802            match msg {
803                Message::Text(text) => {
804                    if self.log_raw {
805                        tracing::debug!("<- raw: {text}");
806                    }
807                    return Ok(Some(text));
808                }
809                Message::Binary(bin) => match String::from_utf8(bin) {
810                    Ok(text) => {
811                        if self.log_raw {
812                            tracing::debug!("<- raw(bin): {text}");
813                        }
814                        return Ok(Some(text));
815                    }
816                    Err(err) => {
817                        tracing::warn!("discarding non-utf8 binary message: {err}");
818                        continue;
819                    }
820                },
821                Message::Close(frame) => {
822                    let reason = close_reason(frame);
823                    return Err(BacktestClientError::Closed { reason });
824                }
825                Message::Ping(_) | Message::Pong(_) => continue,
826                Message::Frame(_) => continue,
827            }
828        }
829    }
830}
831
832impl std::fmt::Debug for BacktestSession {
833    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
834        f.debug_struct("BacktestSession")
835            .field("session_id", &self.session_id)
836            .field("rpc_endpoint", &self.rpc_endpoint)
837            .field(
838                "rpc",
839                &self
840                    .rpc
841                    .as_ref()
842                    .map(|_| "<RpcClient>")
843                    .unwrap_or("<not set>"),
844            )
845            .field("ready_for_continue", &self.ready_for_continue)
846            .field("request_timeout", &self.request_timeout)
847            .finish_non_exhaustive()
848    }
849}
850
851impl Drop for BacktestSession {
852    fn drop(&mut self) {
853        let Some(ws) = self.ws.take() else {
854            return;
855        };
856
857        if let Ok(handle) = tokio::runtime::Handle::try_current() {
858            handle.spawn(async move {
859                let mut ws = ws;
860                let _ = ws.close(None).await;
861            });
862        }
863    }
864}
865
866fn resolve_rpc_url(base: &str, endpoint: &str) -> String {
867    if endpoint.starts_with("http://") || endpoint.starts_with("https://") {
868        endpoint.to_string()
869    } else {
870        format!("{}/{}", base, endpoint.trim_start_matches('/'))
871    }
872}
873
874fn close_reason(frame: Option<CloseFrame<'static>>) -> String {
875    match frame {
876        Some(frame) => format!("{:?}: {}", frame.code, frame.reason),
877        None => "no close frame".to_string(),
878    }
879}
880
881fn is_reset_without_close(err: &WsError) -> bool {
882    matches!(
883        err,
884        WsError::Protocol(ProtocolError::ResetWithoutClosingHandshake)
885    )
886}
887
888fn is_ws_closed_error(err: &WsError) -> bool {
889    matches!(
890        err,
891        WsError::ConnectionClosed
892            | WsError::AlreadyClosed
893            | WsError::Protocol(ProtocolError::ResetWithoutClosingHandshake)
894    )
895}
896
897fn is_close_ok(err: &BacktestClientError) -> bool {
898    match err {
899        BacktestClientError::Closed { .. } => true,
900        BacktestClientError::WebSocket { source, .. } => is_ws_closed_error(source),
901        _ => false,
902    }
903}
904
905#[cfg(test)]
906mod tests {
907    use super::*;
908
909    #[test]
910    fn coverage_tracks_slot_and_completion_from_responses() {
911        let mut coverage = SessionCoverage::default();
912        coverage.observe_response(&BacktestResponse::SlotNotification(10));
913        coverage.observe_response(&BacktestResponse::SlotNotification(12));
914        coverage.observe_response(&BacktestResponse::Completed { summary: None });
915
916        assert!(coverage.is_completed());
917        assert_eq!(coverage.highest_slot_seen(), Some(12));
918    }
919
920    #[test]
921    fn coverage_validate_end_slot_checks_completion_and_range() {
922        let mut coverage = SessionCoverage::default();
923        assert_eq!(
924            coverage.validate_end_slot(5),
925            Err(CoverageError::NotCompleted)
926        );
927
928        coverage.mark_completed();
929        assert_eq!(
930            coverage.validate_end_slot(5),
931            Err(CoverageError::NoSlotsObserved)
932        );
933
934        coverage.observe_slot(4);
935        assert_eq!(
936            coverage.validate_end_slot(5),
937            Err(CoverageError::RangeNotReached {
938                actual_end_slot: 4,
939                expected_end_slot: 5,
940            })
941        );
942
943        coverage.observe_slot(6);
944        assert_eq!(coverage.validate_end_slot(5), Ok(()));
945    }
946}