simulator_client/managed/
session.rs1use std::{collections::VecDeque, sync::Arc, time::Duration};
2
3use simulator_api::{
4 AgentStatsReport, BacktestError, BacktestStatus, ContinueParams, ContinueToParams,
5 CreateBacktestSessionRequest, DiscoveryBatchEvent, PausedEvent, SessionSummary,
6};
7use solana_transaction_status::EncodedConfirmedTransactionWithStatusMeta;
8use thiserror::Error;
9use tokio::sync::watch;
10use tokio_util::sync::CancellationToken;
11
12use super::{
13 ConnectionStatus, ControlEvent, ControlHandle, ReconnectCoordinator, SessionInfo,
14 SubscriptionHandle, SubscriptionNotification, spawn_account_diff_subscription_manager,
15 spawn_action_subscription_manager, spawn_control_manager,
16 spawn_transaction_subscription_manager,
17};
18use crate::subscriptions::{AccountDiffNotification, ActionResultNotification};
19
20#[derive(Debug, Error)]
22pub enum ManagedSessionError {
23 #[error("session create failed: {0}")]
24 Create(String),
25
26 #[error("control channel closed")]
27 ControlClosed,
28
29 #[error("control failed: {0}")]
30 ControlFailed(String),
31
32 #[error("subscription failed: {0}")]
33 SubscriptionFailed(String),
34
35 #[error("cancelled")]
36 Cancelled,
37
38 #[error("control closed while sending continue: {0}")]
39 ContinueSend(String),
40}
41
42#[derive(Debug)]
43pub enum ManagedEvent {
44 ReadyForContinue,
45 Paused(PausedEvent),
48 DiscoveryBatch(DiscoveryBatchEvent),
52 Slot(u64),
53 Status(BacktestStatus),
54 Completed {
57 summary: Option<SessionSummary>,
58 agent_stats: Option<Vec<AgentStatsReport>>,
59 },
60 Error(BacktestError),
61 Transaction(Box<EncodedConfirmedTransactionWithStatusMeta>),
62 AccountDiff(AccountDiffNotification),
63 ActionResult(ActionResultNotification),
64}
65
66const DEFAULT_COMPLETION_DRAIN_TIMEOUT: Duration = Duration::from_secs(60);
71
72pub(super) enum DrainOutcome {
74 Complete(Vec<ManagedEvent>),
77 Stalled(Vec<ManagedEvent>),
81}
82
83pub struct ManagedBacktestSession {
89 session_info: SessionInfo,
90 control: Option<ControlHandle>,
91 subscriptions: Vec<SubscriptionHandle>,
92 session_cancel: CancellationToken,
93 post_completion: Option<VecDeque<ManagedEvent>>,
96 post_completion_error: Option<ManagedSessionError>,
100 completion_drain_timeout: Duration,
101 reconnect_coordinator: Option<Arc<ReconnectCoordinator>>,
104}
105
106impl ManagedBacktestSession {
107 pub async fn start(
109 url: String,
110 api_key: String,
111 create: CreateBacktestSessionRequest,
112 ) -> Result<Self, ManagedSessionError> {
113 Self::start_with_cancel(url, api_key, create, CancellationToken::new(), None).await
114 }
115
116 pub async fn start_with_cancel(
126 url: String,
127 api_key: String,
128 create: CreateBacktestSessionRequest,
129 parent_cancel: CancellationToken,
130 reconnect_coordinator: Option<Arc<ReconnectCoordinator>>,
131 ) -> Result<Self, ManagedSessionError> {
132 let session_cancel = parent_cancel.child_token();
133 let mut control = spawn_control_manager(url, api_key, create, session_cancel.clone());
134
135 let session_info = tokio::select! {
136 biased;
137 _ = parent_cancel.cancelled() => {
138 session_cancel.cancel();
139 control.join().await;
140 return Err(ManagedSessionError::Cancelled);
141 }
142 result = control.wait_for_session() => {
143 result.map_err(ManagedSessionError::Create)?
144 }
145 };
146
147 Ok(Self {
148 session_info,
149 control: Some(control),
150 subscriptions: Vec::new(),
151 session_cancel,
152 post_completion: None,
153 post_completion_error: None,
154 completion_drain_timeout: DEFAULT_COMPLETION_DRAIN_TIMEOUT,
155 reconnect_coordinator,
156 })
157 }
158
159 pub fn session_info(&self) -> &SessionInfo {
161 &self.session_info
162 }
163
164 pub fn set_completion_drain_timeout(&mut self, idle_timeout: std::time::Duration) {
168 self.completion_drain_timeout = idle_timeout;
169 }
170
171 pub fn subscribe_transactions(&mut self, program_ids: Vec<String>) {
173 self.subscriptions
174 .push(spawn_transaction_subscription_manager(
175 self.session_info.rpc_endpoint.clone(),
176 program_ids,
177 self.session_cancel.clone(),
178 self.reconnect_coordinator.clone(),
179 ));
180 }
181
182 pub fn subscribe_account_diffs(&mut self, program_ids: Vec<String>) {
184 self.subscriptions
185 .push(spawn_account_diff_subscription_manager(
186 self.session_info.rpc_endpoint.clone(),
187 program_ids,
188 self.session_cancel.clone(),
189 self.reconnect_coordinator.clone(),
190 ));
191 }
192
193 pub fn subscribe_actions(&mut self) {
196 self.subscriptions.push(spawn_action_subscription_manager(
197 self.session_info.rpc_endpoint.clone(),
198 self.session_cancel.clone(),
199 self.reconnect_coordinator.clone(),
200 ));
201 }
202
203 async fn drain_until_subscriptions_complete(
208 &mut self,
209 idle_timeout: std::time::Duration,
210 ) -> DrainOutcome {
211 drain_subscriptions_until_complete(
212 &mut self.subscriptions,
213 &self.session_cancel,
214 idle_timeout,
215 )
216 .await
217 }
218
219 pub async fn next_event(&mut self) -> Result<ManagedEvent, ManagedSessionError> {
224 if let Some(buffered) = self.post_completion.as_mut() {
227 if let Some(event) = buffered.pop_front() {
228 return Ok(event);
229 }
230 return Err(self
233 .post_completion_error
234 .take()
235 .unwrap_or(ManagedSessionError::ControlClosed));
236 }
237
238 if let Some(event) = try_next_subscription_event(&mut self.subscriptions) {
239 return Ok(event);
240 }
241
242 let event = {
245 let cancel = self.session_cancel.clone();
246 let control = self
247 .control
248 .as_mut()
249 .ok_or(ManagedSessionError::ControlClosed)?;
250 let subscriptions = &mut self.subscriptions;
251 tokio::select! {
252 biased;
253 _ = cancel.cancelled() => return Err(ManagedSessionError::Cancelled),
254 event = control.events.recv() => {
255 event.map(ManagedEvent::from).ok_or(ManagedSessionError::ControlClosed)?
256 }
257 event = wait_any_subscription_event(subscriptions) => event,
258 }
259 };
260
261 let ManagedEvent::Completed {
263 summary,
264 agent_stats,
265 } = event
266 else {
267 return Ok(event);
268 };
269
270 let (mut buffered, terminal): (VecDeque<ManagedEvent>, _) = match self
273 .drain_until_subscriptions_complete(self.completion_drain_timeout)
274 .await
275 {
276 DrainOutcome::Complete(events) => (
277 events.into(),
278 Ok(ManagedEvent::Completed {
279 summary,
280 agent_stats,
281 }),
282 ),
283 DrainOutcome::Stalled(events) => (
288 events.into(),
289 Err(ManagedSessionError::SubscriptionFailed(
290 "completion drain stalled: subscriptions did not deliver their \
291 end-of-stream terminals; the captured stream is incomplete"
292 .to_string(),
293 )),
294 ),
295 };
296 match terminal {
297 Ok(completed) => buffered.push_back(completed),
298 Err(err) => self.post_completion_error = Some(err),
299 }
300 let first = buffered.pop_front();
301 self.post_completion = Some(buffered);
302 match first {
303 Some(event) => Ok(event),
304 None => Err(self
306 .post_completion_error
307 .take()
308 .unwrap_or(ManagedSessionError::ControlClosed)),
309 }
310 }
311
312 pub async fn send_continue(
319 &mut self,
320 params: ContinueParams,
321 ) -> Result<(), ManagedSessionError> {
322 self.wait_all_up().await?;
323 self.control_mut()?
324 .send_continue(params)
325 .await
326 .map_err(|e| ManagedSessionError::ContinueSend(e.to_string()))
327 }
328
329 pub async fn send_continue_to(
337 &mut self,
338 params: ContinueToParams,
339 ) -> Result<(), ManagedSessionError> {
340 self.wait_all_up().await?;
341 self.control_mut()?
342 .send_continue_to(params)
343 .await
344 .map_err(|e| ManagedSessionError::ContinueSend(e.to_string()))
345 }
346
347 pub async fn shutdown(mut self) {
349 self.session_cancel.cancel();
350 if let Some(control) = self.control.take() {
351 control.join().await;
352 }
353 for sub in std::mem::take(&mut self.subscriptions) {
354 let _ = sub.join.await;
355 }
356 }
357
358 fn control_mut(&mut self) -> Result<&mut ControlHandle, ManagedSessionError> {
359 self.control
360 .as_mut()
361 .ok_or(ManagedSessionError::ControlClosed)
362 }
363
364 async fn wait_all_up(&self) -> Result<(), ManagedSessionError> {
365 let control = self
366 .control
367 .as_ref()
368 .ok_or(ManagedSessionError::ControlClosed)?
369 .status
370 .clone();
371 let subscriptions = self
372 .subscriptions
373 .iter()
374 .map(|s| s.status.clone())
375 .collect();
376 wait_connections_up(control, subscriptions, &self.session_cancel).await
377 }
378}
379
380pub(super) async fn wait_connections_up(
384 mut control: watch::Receiver<ConnectionStatus>,
385 mut subscriptions: Vec<watch::Receiver<ConnectionStatus>>,
386 cancel: &CancellationToken,
387) -> Result<(), ManagedSessionError> {
388 loop {
389 let control_status = control.borrow().clone();
390 if let ConnectionStatus::Failed(why) = &control_status {
391 return Err(ManagedSessionError::ControlFailed(why.clone()));
392 }
393
394 let mut all_subscriptions_up = true;
395 for subscription in &subscriptions {
396 match &*subscription.borrow() {
397 ConnectionStatus::Failed(why) => {
398 return Err(ManagedSessionError::SubscriptionFailed(why.clone()));
399 }
400 ConnectionStatus::Up => {}
401 _ => all_subscriptions_up = false,
402 }
403 }
404
405 if control_status == ConnectionStatus::Up && all_subscriptions_up {
406 return Ok(());
407 }
408
409 tokio::select! {
410 _ = cancel.cancelled() => return Err(ManagedSessionError::Cancelled),
411 _ = control.changed() => {}
412 _ = wait_any_subscription_change(&mut subscriptions) => {}
413 }
414 }
415}
416
417pub(super) async fn drain_subscriptions_until_complete(
432 subscriptions: &mut [SubscriptionHandle],
433 cancel: &CancellationToken,
434 idle_timeout: std::time::Duration,
435) -> DrainOutcome {
436 let mut events = Vec::new();
437 if subscriptions.is_empty() {
438 return DrainOutcome::Complete(events);
439 }
440 loop {
441 while let Some(event) = try_next_subscription_event(subscriptions) {
442 events.push(event);
443 }
444 if subscriptions.iter().all(|s| s.notifications.is_closed()) {
445 let any_failed = subscriptions
446 .iter()
447 .any(|s| matches!(*s.status.borrow(), ConnectionStatus::Failed(_)));
448 return if any_failed {
449 DrainOutcome::Stalled(events)
450 } else {
451 DrainOutcome::Complete(events)
452 };
453 }
454 tokio::select! {
455 biased;
456 _ = cancel.cancelled() => return DrainOutcome::Complete(events),
458 _ = tokio::time::sleep(idle_timeout) => {
463 let any_up = subscriptions.iter().any(|s| {
464 !s.notifications.is_closed()
465 && matches!(*s.status.borrow(), ConnectionStatus::Up)
466 });
467 if any_up {
468 return DrainOutcome::Stalled(events);
469 }
470 }
471 received = recv_any_open_subscription(subscriptions) => {
472 if let Some(event) = received {
474 events.push(event);
475 }
476 }
477 }
478 }
479}
480
481impl Drop for ManagedBacktestSession {
482 fn drop(&mut self) {
483 self.session_cancel.cancel();
484 }
485}
486
487pub(super) async fn wait_any_subscription_change(
488 subscriptions: &mut [watch::Receiver<ConnectionStatus>],
489) {
490 if subscriptions.is_empty() {
491 std::future::pending::<()>().await;
492 return;
493 }
494 let _ =
495 futures::future::select_all(subscriptions.iter_mut().map(|s| Box::pin(s.changed()))).await;
496}
497
498pub(super) async fn wait_any_subscription_event(
499 subscriptions: &mut [SubscriptionHandle],
500) -> ManagedEvent {
501 loop {
502 if let Some(event) = try_next_subscription_event(subscriptions) {
503 return event;
504 }
505
506 let futures: Vec<_> = subscriptions
507 .iter_mut()
508 .filter(|s| !s.notifications.is_closed())
509 .map(|s| Box::pin(s.notifications.recv()))
510 .collect();
511
512 if futures.is_empty() {
513 std::future::pending::<()>().await;
514 }
515
516 let (notification, _, _) = futures::future::select_all(futures).await;
517 if let Some(notification) = notification {
518 return notification.into();
519 }
520 }
521}
522
523pub(super) async fn recv_any_open_subscription(
528 subscriptions: &mut [SubscriptionHandle],
529) -> Option<ManagedEvent> {
530 let futures: Vec<_> = subscriptions
531 .iter_mut()
532 .filter(|s| !s.notifications.is_closed())
533 .map(|s| Box::pin(s.notifications.recv()))
534 .collect();
535
536 if futures.is_empty() {
537 return None;
538 }
539
540 let (notification, _, _) = futures::future::select_all(futures).await;
541 notification.map(Into::into)
542}
543
544pub(super) fn try_next_subscription_event(
545 subscriptions: &mut [SubscriptionHandle],
546) -> Option<ManagedEvent> {
547 for subscription in subscriptions {
548 if let Ok(notification) = subscription.notifications.try_recv() {
549 return Some(notification.into());
550 }
551 }
552 None
553}
554
555impl From<ControlEvent> for ManagedEvent {
556 fn from(event: ControlEvent) -> Self {
557 match event {
558 ControlEvent::ReadyForContinue => Self::ReadyForContinue,
559 ControlEvent::Paused(event) => Self::Paused(event),
560 ControlEvent::DiscoveryBatch(event) => Self::DiscoveryBatch(event),
561 ControlEvent::Slot(slot) => Self::Slot(slot),
562 ControlEvent::Status(status) => Self::Status(status),
563 ControlEvent::Completed {
564 summary,
565 agent_stats,
566 } => Self::Completed {
567 summary,
568 agent_stats,
569 },
570 ControlEvent::Error(error) => Self::Error(error),
571 }
572 }
573}
574
575impl From<SubscriptionNotification> for ManagedEvent {
576 fn from(notification: SubscriptionNotification) -> Self {
577 match notification {
578 SubscriptionNotification::Transaction(transaction) => Self::Transaction(transaction),
579 SubscriptionNotification::AccountDiff(diff) => Self::AccountDiff(diff),
580 SubscriptionNotification::ActionResult(result) => Self::ActionResult(result),
581 }
582 }
583}