// simulator_client/managed/session.rs

1use std::{collections::VecDeque, sync::Arc, time::Duration};
2
3use simulator_api::{
4    AgentStatsReport, BacktestError, BacktestStatus, ContinueParams, ContinueToParams,
5    CreateBacktestSessionRequest, DiscoveryBatchEvent, PausedEvent, SessionSummary,
6};
7use solana_transaction_status::EncodedConfirmedTransactionWithStatusMeta;
8use thiserror::Error;
9use tokio::sync::watch;
10use tokio_util::sync::CancellationToken;
11
12use super::{
13    ConnectionStatus, ControlEvent, ControlHandle, ReconnectCoordinator, SessionInfo,
14    SubscriptionHandle, SubscriptionNotification, spawn_account_diff_subscription_manager,
15    spawn_action_subscription_manager, spawn_control_manager,
16    spawn_transaction_subscription_manager,
17};
18use crate::subscriptions::{AccountDiffNotification, ActionResultNotification};
19
20/// Error returned by the high-level managed session wrapper.
21#[derive(Debug, Error)]
22pub enum ManagedSessionError {
23    #[error("session create failed: {0}")]
24    Create(String),
25
26    #[error("control channel closed")]
27    ControlClosed,
28
29    #[error("control failed: {0}")]
30    ControlFailed(String),
31
32    #[error("subscription failed: {0}")]
33    SubscriptionFailed(String),
34
35    #[error("cancelled")]
36    Cancelled,
37
38    #[error("control closed while sending continue: {0}")]
39    ContinueSend(String),
40}
41
42#[derive(Debug)]
43pub enum ManagedEvent {
44    ReadyForContinue,
45    /// Server paused at a `ContinueTo` target. The session is ready for
46    /// another `Continue` or `ContinueTo` from this point.
47    Paused(PausedEvent),
48    /// Server discovered an upcoming batch matching a registered
49    /// `DiscoveryFilter`. Send `send_continue_to(slot, batch_index)` to pause
50    /// immediately before it executes.
51    DiscoveryBatch(DiscoveryBatchEvent),
52    Slot(u64),
53    Status(BacktestStatus),
54    /// `agent_stats` is always `None` for [`super::ParallelSubSession`] —
55    /// the multiplexed wire carries only the summary.
56    Completed {
57        summary: Option<SessionSummary>,
58        agent_stats: Option<Vec<AgentStatsReport>>,
59    },
60    Error(BacktestError),
61    Transaction(Box<EncodedConfirmedTransactionWithStatusMeta>),
62    AccountDiff(AccountDiffNotification),
63    ActionResult(ActionResultNotification),
64}
65
/// Default idle backstop for the completion drain.
///
/// If no trailing notification arrives for this long after `Completed`, the
/// drain assumes the data plane is stalled and stops waiting. It limits the
/// gap *between* notifications rather than the total drain time, so a
/// slow-but-flowing stream (large backlog over a slow link) is never truncated.
const DEFAULT_COMPLETION_DRAIN_TIMEOUT: Duration = Duration::from_secs(60);
71
72/// Result of [`ManagedBacktestSession::drain_until_subscriptions_complete`].
73pub(super) enum DrainOutcome {
74    /// Every subscription delivered its terminal (channels closed), or the
75    /// session was cancelled. The drained events are complete up to the stop.
76    Complete(Vec<ManagedEvent>),
77    /// The idle backstop fired with subscriptions still open — trailing
78    /// notifications never arrived, so the run is incomplete. Carries whatever
79    /// was drained before the stall.
80    Stalled(Vec<ManagedEvent>),
81}
82
83/// High-level managed backtest session.
84///
85/// This wrapper owns the control manager, supported subscription managers,
86/// cancellation, status gating, and shutdown order. Callers only need to react
87/// to [`ManagedEvent`]s and send [`ContinueParams`] after `ReadyForContinue`.
88pub struct ManagedBacktestSession {
89    session_info: SessionInfo,
90    control: Option<ControlHandle>,
91    subscriptions: Vec<SubscriptionHandle>,
92    session_cancel: CancellationToken,
93    /// Notifications drained on `Completed`, followed by `Completed`; served in
94    /// order by `next_event`. `None` until completion.
95    post_completion: Option<VecDeque<ManagedEvent>>,
96    /// Surfaced after `post_completion` drains: set when the completion drain
97    /// stalled (idle backstop fired with subscriptions still open), so a
98    /// silently-truncated run fails loudly instead of reporting `Completed`.
99    post_completion_error: Option<ManagedSessionError>,
100    completion_drain_timeout: Duration,
101    /// Shared across a parallel batch so this session's subscription reconnects
102    /// step aside for still-streaming siblings. `None` for a standalone session.
103    reconnect_coordinator: Option<Arc<ReconnectCoordinator>>,
104}
105
106impl ManagedBacktestSession {
107    /// Start a managed session with an internally owned cancellation token.
108    pub async fn start(
109        url: String,
110        api_key: String,
111        create: CreateBacktestSessionRequest,
112    ) -> Result<Self, ManagedSessionError> {
113        Self::start_with_cancel(url, api_key, create, CancellationToken::new(), None).await
114    }
115
116    /// Start a managed session tied to a caller-owned cancellation token.
117    ///
118    /// Cancelling `parent_cancel` aborts startup and stops manager tasks.
119    ///
120    /// `reconnect_coordinator` is an optional coordinator shared across sibling
121    /// sessions in a parallel batch; a dropped subscription parks on it until
122    /// streaming siblings finish, then reconnects into the freed bandwidth. It
123    /// is handed to the subscription managers spawned by `subscribe_*`. Pass
124    /// `None` for a standalone session.
125    pub async fn start_with_cancel(
126        url: String,
127        api_key: String,
128        create: CreateBacktestSessionRequest,
129        parent_cancel: CancellationToken,
130        reconnect_coordinator: Option<Arc<ReconnectCoordinator>>,
131    ) -> Result<Self, ManagedSessionError> {
132        let session_cancel = parent_cancel.child_token();
133        let mut control = spawn_control_manager(url, api_key, create, session_cancel.clone());
134
135        let session_info = tokio::select! {
136            biased;
137            _ = parent_cancel.cancelled() => {
138                session_cancel.cancel();
139                control.join().await;
140                return Err(ManagedSessionError::Cancelled);
141            }
142            result = control.wait_for_session() => {
143                result.map_err(ManagedSessionError::Create)?
144            }
145        };
146
147        Ok(Self {
148            session_info,
149            control: Some(control),
150            subscriptions: Vec::new(),
151            session_cancel,
152            post_completion: None,
153            post_completion_error: None,
154            completion_drain_timeout: DEFAULT_COMPLETION_DRAIN_TIMEOUT,
155            reconnect_coordinator,
156        })
157    }
158
159    /// Metadata reported by the server when the session was created.
160    pub fn session_info(&self) -> &SessionInfo {
161        &self.session_info
162    }
163
164    /// Override the completion-drain idle timeout (default
165    /// [`DEFAULT_COMPLETION_DRAIN_TIMEOUT`]). The drain gives up only after this
166    /// long with no trailing notification; it does not cap total drain time.
167    pub fn set_completion_drain_timeout(&mut self, idle_timeout: std::time::Duration) {
168        self.completion_drain_timeout = idle_timeout;
169    }
170
171    /// Subscribe to transaction notifications for the configured programs.
172    pub fn subscribe_transactions(&mut self, program_ids: Vec<String>) {
173        self.subscriptions
174            .push(spawn_transaction_subscription_manager(
175                self.session_info.rpc_endpoint.clone(),
176                program_ids,
177                self.session_cancel.clone(),
178                self.reconnect_coordinator.clone(),
179            ));
180    }
181
182    /// Subscribe to account-diff notifications for the configured programs.
183    pub fn subscribe_account_diffs(&mut self, program_ids: Vec<String>) {
184        self.subscriptions
185            .push(spawn_account_diff_subscription_manager(
186                self.session_info.rpc_endpoint.clone(),
187                program_ids,
188                self.session_cancel.clone(),
189                self.reconnect_coordinator.clone(),
190            ));
191    }
192
193    /// Subscribe to scheduled-action results. Actions themselves are registered
194    /// in `CreateSessionParams`; this only attaches the result consumer.
195    pub fn subscribe_actions(&mut self) {
196        self.subscriptions.push(spawn_action_subscription_manager(
197            self.session_info.rpc_endpoint.clone(),
198            self.session_cancel.clone(),
199            self.reconnect_coordinator.clone(),
200        ));
201    }
202
203    /// Drain trailing notifications until every subscription delivers its
204    /// end-of-stream terminal (closing its channel), the session is cancelled,
205    /// or the data plane is found to have failed. See the `idle_timeout` arm for
206    /// how reconnecting/parked subscriptions are distinguished from a stall.
207    async fn drain_until_subscriptions_complete(
208        &mut self,
209        idle_timeout: std::time::Duration,
210    ) -> DrainOutcome {
211        drain_subscriptions_until_complete(
212            &mut self.subscriptions,
213            &self.session_cancel,
214            idle_timeout,
215        )
216        .await
217    }
218
219    /// Receive the next control or subscription event.
220    ///
221    /// On `Completed`, trailing subscription notifications are drained and
222    /// delivered before the `Completed` event.
223    pub async fn next_event(&mut self) -> Result<ManagedEvent, ManagedSessionError> {
224        // Serve buffered post-completion events (trailing notifications, then
225        // `Completed`); the control stream is gone once they're exhausted.
226        if let Some(buffered) = self.post_completion.as_mut() {
227            if let Some(event) = buffered.pop_front() {
228                return Ok(event);
229            }
230            // Buffer drained: surface a pending stall error so an incomplete
231            // run fails loudly, else the control stream is simply done.
232            return Err(self
233                .post_completion_error
234                .take()
235                .unwrap_or(ManagedSessionError::ControlClosed));
236        }
237
238        if let Some(event) = try_next_subscription_event(&mut self.subscriptions) {
239            return Ok(event);
240        }
241
242        // Scope the borrows to the `select!` so the completion drain below can
243        // re-borrow `self`.
244        let event = {
245            let cancel = self.session_cancel.clone();
246            let control = self
247                .control
248                .as_mut()
249                .ok_or(ManagedSessionError::ControlClosed)?;
250            let subscriptions = &mut self.subscriptions;
251            tokio::select! {
252                biased;
253                _ = cancel.cancelled() => return Err(ManagedSessionError::Cancelled),
254                event = control.events.recv() => {
255                    event.map(ManagedEvent::from).ok_or(ManagedSessionError::ControlClosed)?
256                }
257                event = wait_any_subscription_event(subscriptions) => event,
258            }
259        };
260
261        // Bind the payload so the re-emitted `Completed` below carries it.
262        let ManagedEvent::Completed {
263            summary,
264            agent_stats,
265        } = event
266        else {
267            return Ok(event);
268        };
269
270        // Flush trailing notifications up to each subscription's terminal,
271        // delivering them before `Completed` so none are dropped.
272        let (mut buffered, terminal): (VecDeque<ManagedEvent>, _) = match self
273            .drain_until_subscriptions_complete(self.completion_drain_timeout)
274            .await
275        {
276            DrainOutcome::Complete(events) => (
277                events.into(),
278                Ok(ManagedEvent::Completed {
279                    summary,
280                    agent_stats,
281                }),
282            ),
283            // The data plane stalled before every subscription finished:
284            // trailing notifications are missing, so report failure rather
285            // than a silently-truncated `Completed`. Deliver whatever was
286            // drained first, then surface the error once the buffer empties.
287            DrainOutcome::Stalled(events) => (
288                events.into(),
289                Err(ManagedSessionError::SubscriptionFailed(
290                    "completion drain stalled: subscriptions did not deliver their \
291                     end-of-stream terminals; the captured stream is incomplete"
292                        .to_string(),
293                )),
294            ),
295        };
296        match terminal {
297            Ok(completed) => buffered.push_back(completed),
298            Err(err) => self.post_completion_error = Some(err),
299        }
300        let first = buffered.pop_front();
301        self.post_completion = Some(buffered);
302        match first {
303            Some(event) => Ok(event),
304            // Nothing buffered and the run failed: surface the error now.
305            None => Err(self
306                .post_completion_error
307                .take()
308                .unwrap_or(ManagedSessionError::ControlClosed)),
309        }
310    }
311
312    /// Wait until the control connection and all subscription connections are
313    /// up, then send a `Continue` request.
314    ///
315    /// Call this after receiving [`ManagedEvent::ReadyForContinue`] or
316    /// [`ManagedEvent::Paused`]. If there are no subscriptions, only the
317    /// control connection is gated.
318    pub async fn send_continue(
319        &mut self,
320        params: ContinueParams,
321    ) -> Result<(), ManagedSessionError> {
322        self.wait_all_up().await?;
323        self.control_mut()?
324            .send_continue(params)
325            .await
326            .map_err(|e| ManagedSessionError::ContinueSend(e.to_string()))
327    }
328
329    /// Wait until the control connection and all subscription connections are
330    /// up, then send a `ContinueTo` request to step to a specific slot/batch
331    /// boundary.
332    ///
333    /// Pair with [`ManagedEvent::DiscoveryBatch`] to drive a discovery-paced
334    /// loop: receive a discovery event, send `ContinueTo(slot, batch_index)`,
335    /// and wait for [`ManagedEvent::Paused`] before inspecting state.
336    pub async fn send_continue_to(
337        &mut self,
338        params: ContinueToParams,
339    ) -> Result<(), ManagedSessionError> {
340        self.wait_all_up().await?;
341        self.control_mut()?
342            .send_continue_to(params)
343            .await
344            .map_err(|e| ManagedSessionError::ContinueSend(e.to_string()))
345    }
346
347    /// Cancel the session and join all manager tasks.
348    pub async fn shutdown(mut self) {
349        self.session_cancel.cancel();
350        if let Some(control) = self.control.take() {
351            control.join().await;
352        }
353        for sub in std::mem::take(&mut self.subscriptions) {
354            let _ = sub.join.await;
355        }
356    }
357
358    fn control_mut(&mut self) -> Result<&mut ControlHandle, ManagedSessionError> {
359        self.control
360            .as_mut()
361            .ok_or(ManagedSessionError::ControlClosed)
362    }
363
364    async fn wait_all_up(&self) -> Result<(), ManagedSessionError> {
365        let control = self
366            .control
367            .as_ref()
368            .ok_or(ManagedSessionError::ControlClosed)?
369            .status
370            .clone();
371        let subscriptions = self
372            .subscriptions
373            .iter()
374            .map(|s| s.status.clone())
375            .collect();
376        wait_connections_up(control, subscriptions, &self.session_cancel).await
377    }
378}
379
380/// Block until the control connection and every subscription connection report
381/// `Up`, returning an error if any reports `Failed` or the token is cancelled.
382/// Shared by the single-session and parallel sub-session drivers.
383pub(super) async fn wait_connections_up(
384    mut control: watch::Receiver<ConnectionStatus>,
385    mut subscriptions: Vec<watch::Receiver<ConnectionStatus>>,
386    cancel: &CancellationToken,
387) -> Result<(), ManagedSessionError> {
388    loop {
389        let control_status = control.borrow().clone();
390        if let ConnectionStatus::Failed(why) = &control_status {
391            return Err(ManagedSessionError::ControlFailed(why.clone()));
392        }
393
394        let mut all_subscriptions_up = true;
395        for subscription in &subscriptions {
396            match &*subscription.borrow() {
397                ConnectionStatus::Failed(why) => {
398                    return Err(ManagedSessionError::SubscriptionFailed(why.clone()));
399                }
400                ConnectionStatus::Up => {}
401                _ => all_subscriptions_up = false,
402            }
403        }
404
405        if control_status == ConnectionStatus::Up && all_subscriptions_up {
406            return Ok(());
407        }
408
409        tokio::select! {
410            _ = cancel.cancelled() => return Err(ManagedSessionError::Cancelled),
411            _ = control.changed() => {}
412            _ = wait_any_subscription_change(&mut subscriptions) => {}
413        }
414    }
415}
416
417/// Drain trailing notifications until every subscription delivers its
418/// end-of-stream terminal (closing its channel), `cancel` fires, or the data
419/// plane is found to have stalled. Shared by the single-session and parallel
420/// sub-session drivers. See
421/// [`ManagedBacktestSession::drain_until_subscriptions_complete`].
422///
423/// `idle_timeout` bounds the gap *between* notifications, not total drain time,
424/// so a slow-but-flowing stream (large backlog over a slow link) is never
425/// truncated. A subscription that's `Down` — reconnecting, or parked behind the
426/// reconnect coordinator waiting for siblings to free bandwidth — is *not* a
427/// stall: it resumes via `replayFromSlot` or exhausts its budget and reports
428/// `Failed`. So the idle gap only declares a `Stalled` outcome when an open
429/// subscription is still `Up` (connected but not delivering — a genuinely hung
430/// data plane); a subscription that closed in `Failed` left a truncated stream.
431pub(super) async fn drain_subscriptions_until_complete(
432    subscriptions: &mut [SubscriptionHandle],
433    cancel: &CancellationToken,
434    idle_timeout: std::time::Duration,
435) -> DrainOutcome {
436    let mut events = Vec::new();
437    if subscriptions.is_empty() {
438        return DrainOutcome::Complete(events);
439    }
440    loop {
441        while let Some(event) = try_next_subscription_event(subscriptions) {
442            events.push(event);
443        }
444        if subscriptions.iter().all(|s| s.notifications.is_closed()) {
445            let any_failed = subscriptions
446                .iter()
447                .any(|s| matches!(*s.status.borrow(), ConnectionStatus::Failed(_)));
448            return if any_failed {
449                DrainOutcome::Stalled(events)
450            } else {
451                DrainOutcome::Complete(events)
452            };
453        }
454        tokio::select! {
455            biased;
456            // A cancel is an intentional stop, not a data-plane failure.
457            _ = cancel.cancelled() => return DrainOutcome::Complete(events),
458            // Idle gap elapsed. A still-`Up` subscription should be delivering
459            // but isn't — a hung data plane (stall). If the open subscriptions
460            // are all reconnecting/parked (`Down`), keep waiting: the
461            // coordinator is deferring them on purpose, not failing.
462            _ = tokio::time::sleep(idle_timeout) => {
463                let any_up = subscriptions.iter().any(|s| {
464                    !s.notifications.is_closed()
465                        && matches!(*s.status.borrow(), ConnectionStatus::Up)
466                });
467                if any_up {
468                    return DrainOutcome::Stalled(events);
469                }
470            }
471            received = recv_any_open_subscription(subscriptions) => {
472                // `None` means a channel closed; the loop re-checks all-closed.
473                if let Some(event) = received {
474                    events.push(event);
475                }
476            }
477        }
478    }
479}
480
481impl Drop for ManagedBacktestSession {
482    fn drop(&mut self) {
483        self.session_cancel.cancel();
484    }
485}
486
487pub(super) async fn wait_any_subscription_change(
488    subscriptions: &mut [watch::Receiver<ConnectionStatus>],
489) {
490    if subscriptions.is_empty() {
491        std::future::pending::<()>().await;
492        return;
493    }
494    let _ =
495        futures::future::select_all(subscriptions.iter_mut().map(|s| Box::pin(s.changed()))).await;
496}
497
498pub(super) async fn wait_any_subscription_event(
499    subscriptions: &mut [SubscriptionHandle],
500) -> ManagedEvent {
501    loop {
502        if let Some(event) = try_next_subscription_event(subscriptions) {
503            return event;
504        }
505
506        let futures: Vec<_> = subscriptions
507            .iter_mut()
508            .filter(|s| !s.notifications.is_closed())
509            .map(|s| Box::pin(s.notifications.recv()))
510            .collect();
511
512        if futures.is_empty() {
513            std::future::pending::<()>().await;
514        }
515
516        let (notification, _, _) = futures::future::select_all(futures).await;
517        if let Some(notification) = notification {
518            return notification.into();
519        }
520    }
521}
522
523/// Await the next notification from any still-open subscription channel,
524/// returning `None` when one closes. Unlike [`wait_any_subscription_event`],
525/// which never resolves on closure, this lets the completion drain observe
526/// per-channel end-of-stream.
527pub(super) async fn recv_any_open_subscription(
528    subscriptions: &mut [SubscriptionHandle],
529) -> Option<ManagedEvent> {
530    let futures: Vec<_> = subscriptions
531        .iter_mut()
532        .filter(|s| !s.notifications.is_closed())
533        .map(|s| Box::pin(s.notifications.recv()))
534        .collect();
535
536    if futures.is_empty() {
537        return None;
538    }
539
540    let (notification, _, _) = futures::future::select_all(futures).await;
541    notification.map(Into::into)
542}
543
544pub(super) fn try_next_subscription_event(
545    subscriptions: &mut [SubscriptionHandle],
546) -> Option<ManagedEvent> {
547    for subscription in subscriptions {
548        if let Ok(notification) = subscription.notifications.try_recv() {
549            return Some(notification.into());
550        }
551    }
552    None
553}
554
555impl From<ControlEvent> for ManagedEvent {
556    fn from(event: ControlEvent) -> Self {
557        match event {
558            ControlEvent::ReadyForContinue => Self::ReadyForContinue,
559            ControlEvent::Paused(event) => Self::Paused(event),
560            ControlEvent::DiscoveryBatch(event) => Self::DiscoveryBatch(event),
561            ControlEvent::Slot(slot) => Self::Slot(slot),
562            ControlEvent::Status(status) => Self::Status(status),
563            ControlEvent::Completed {
564                summary,
565                agent_stats,
566            } => Self::Completed {
567                summary,
568                agent_stats,
569            },
570            ControlEvent::Error(error) => Self::Error(error),
571        }
572    }
573}
574
575impl From<SubscriptionNotification> for ManagedEvent {
576    fn from(notification: SubscriptionNotification) -> Self {
577        match notification {
578            SubscriptionNotification::Transaction(transaction) => Self::Transaction(transaction),
579            SubscriptionNotification::AccountDiff(diff) => Self::AccountDiff(diff),
580            SubscriptionNotification::ActionResult(result) => Self::ActionResult(result),
581        }
582    }
583}