1use std::{
2 borrow::Cow,
3 collections::{BTreeMap, VecDeque},
4 future::Future,
5 time::Duration,
6};
7
8use futures::{SinkExt, StreamExt, stream};
9use simulator_api::{
10 AccountData, BacktestRequest, BacktestResponse, BacktestStatus, ContinueParams,
11 CreateSessionParams, SessionSummary,
12};
13use solana_address::Address;
14use solana_client::{
15 nonblocking::rpc_client::RpcClient,
16 rpc_response::{Response, RpcLogsResponse},
17};
18use solana_commitment_config::CommitmentConfig;
19use thiserror::Error;
20use tokio::net::TcpStream;
21use tokio_tungstenite::{
22 MaybeTlsStream, WebSocketStream,
23 tungstenite::{
24 Error as WsError, Message,
25 error::ProtocolError,
26 protocol::{CloseFrame, frame::coding::CloseCode},
27 },
28};
29
30use crate::{
31 BacktestClientError, BacktestClientResult, Continue,
32 injection::{ProgramModError, build_program_injection},
33 subscriptions::{LogSubscriptionHandle, SubscriptionError},
34};
35
/// Outcome of waiting for the session to accept another `Continue` request.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ReadyOutcome {
    /// The session is ready: readiness was already recorded or a
    /// `ReadyForContinue` response was received.
    Ready,
    /// A `Completed` response arrived before the session reported readiness.
    Completed,
}
44
/// Summary of everything observed while processing a single `Continue` request.
#[derive(Debug, Default)]
pub struct ContinueResult {
    /// Number of `SlotNotification` responses received.
    pub slot_notifications: u64,
    /// Slot carried by the most recent `SlotNotification`, if any was received.
    pub last_slot: Option<u64>,
    /// Every `Status` payload received, in arrival order.
    pub statuses: Vec<BacktestStatus>,
    /// Whether a `ReadyForContinue` response was received.
    pub ready_for_continue: bool,
    /// Whether a `Completed` response was received.
    pub completed: bool,
}
59
/// Mutable bookkeeping accumulated while stepping through the responses to a
/// `Continue` request.
#[derive(Debug)]
pub struct AdvanceState {
    /// Number of slot notifications the caller expects for this advance.
    pub expected_slots: u64,
    /// Number of `SlotNotification` responses received so far.
    pub slot_notifications: u64,
    /// Slot carried by the most recent `SlotNotification`, if any.
    pub last_slot: Option<u64>,
    /// Every `Status` payload received so far, in arrival order.
    pub statuses: Vec<BacktestStatus>,
    /// Set once a `ReadyForContinue` response has been received.
    pub ready_for_continue: bool,
    /// Set once a `Completed` response has been received.
    pub completed: bool,
    /// Summary attached to the `Completed` response, when one was provided.
    pub summary: Option<SessionSummary>,
}
78
79impl AdvanceState {
80 pub fn new(expected_slots: u64) -> Self {
82 Self {
83 expected_slots,
84 slot_notifications: 0,
85 last_slot: None,
86 statuses: Vec::new(),
87 ready_for_continue: false,
88 completed: false,
89 summary: None,
90 }
91 }
92
93 pub fn is_done(&self, wait_for_slots: bool) -> bool {
95 if self.completed {
96 return true;
97 }
98
99 if !self.ready_for_continue {
100 return false;
101 }
102
103 !wait_for_slots || self.slot_notifications >= self.expected_slots
104 }
105}
106
/// Tracks how far a session progressed: the highest slot seen and whether a
/// completion was observed.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct SessionCoverage {
    /// Set once completion has been recorded.
    completed: bool,
    /// Largest slot number observed so far, or `None` if no slot was observed.
    highest_slot_seen: Option<u64>,
}
113
114impl SessionCoverage {
115 pub fn observe_slot(&mut self, slot: u64) {
117 self.highest_slot_seen = Some(
118 self.highest_slot_seen
119 .map_or(slot, |current| current.max(slot)),
120 );
121 }
122
123 pub fn mark_completed(&mut self) {
125 self.completed = true;
126 }
127
128 pub fn observe_response(&mut self, response: &BacktestResponse) {
130 match response {
131 BacktestResponse::SlotNotification(slot) => self.observe_slot(*slot),
132 BacktestResponse::Completed { .. } => self.mark_completed(),
133 _ => {}
134 }
135 }
136
137 pub fn is_completed(&self) -> bool {
139 self.completed
140 }
141
142 pub fn highest_slot_seen(&self) -> Option<u64> {
144 self.highest_slot_seen
145 }
146
147 pub fn validate_end_slot(&self, expected_end_slot: u64) -> Result<(), CoverageError> {
149 if !self.completed {
150 return Err(CoverageError::NotCompleted);
151 }
152
153 let Some(actual_end_slot) = self.highest_slot_seen else {
154 return Err(CoverageError::NoSlotsObserved);
155 };
156
157 if actual_end_slot < expected_end_slot {
158 return Err(CoverageError::RangeNotReached {
159 actual_end_slot,
160 expected_end_slot,
161 });
162 }
163
164 Ok(())
165 }
166}
167
/// Reasons why [`SessionCoverage::validate_end_slot`] rejects a session.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Error)]
pub enum CoverageError {
    /// Completion was never recorded.
    #[error("ended before completion")]
    NotCompleted,
    /// Completion was recorded but no slot was ever observed.
    #[error("completed without slot notifications")]
    NoSlotsObserved,
    /// The highest observed slot is below the expected end slot.
    #[error("completed at slot {actual_end_slot} but expected at least {expected_end_slot}")]
    RangeNotReached {
        /// Highest slot that was actually observed.
        actual_end_slot: u64,
        /// Slot the caller expected the session to reach.
        expected_end_slot: u64,
    },
}
181
/// A backtest session driven over a WebSocket connection.
///
/// The session sends `BacktestRequest`s, decodes `BacktestResponse`s, and
/// tracks whether the server has signalled that it can accept another
/// `Continue` request.
pub struct BacktestSession {
    /// Underlying WebSocket; `None` once the session has been closed or dropped.
    ws: Option<WebSocketStream<MaybeTlsStream<TcpStream>>>,
    /// Identifier reported by `SessionCreated`; `None` until then.
    session_id: Option<String>,
    /// Resolved RPC URL for this session; `None` until `SessionCreated` is received.
    rpc_endpoint: Option<String>,
    /// RPC client pointed at `rpc_endpoint`; `None` until `SessionCreated` is received.
    rpc: Option<RpcClient>,
    /// Whether the most recent readiness signal says a `Continue` may be sent.
    ready_for_continue: bool,
    /// Default timeout applied to sends and receives when the caller passes `None`.
    request_timeout: Option<Duration>,
    /// When set, incoming messages are logged at debug level.
    log_raw: bool,
    /// Responses received but not yet delivered; served before the socket is read.
    backlog: VecDeque<BacktestResponse>,
}
196
197impl BacktestSession {
198 pub(crate) fn new(
199 ws: WebSocketStream<MaybeTlsStream<TcpStream>>,
200 request_timeout: Option<Duration>,
201 log_raw: bool,
202 ) -> Self {
203 Self {
204 ws: Some(ws),
205 session_id: None,
206 rpc_endpoint: None,
207 rpc: None,
208 ready_for_continue: false,
209 request_timeout,
210 log_raw,
211 backlog: VecDeque::new(),
212 }
213 }
214
215 pub fn session_id(&self) -> Option<&str> {
217 self.session_id.as_deref()
218 }
219
220 pub fn rpc_endpoint(&self) -> Option<&str> {
222 self.rpc_endpoint.as_deref()
223 }
224
225 pub fn rpc(&self) -> &RpcClient {
229 self.rpc
230 .as_ref()
231 .expect("rpc is set during session creation")
232 }
233
    /// Returns whether the session currently records that the server is ready
    /// to accept another `Continue` request.
    pub fn is_ready_for_continue(&self) -> bool {
        self.ready_for_continue
    }
238
239 pub fn apply_response(&mut self, response: &BacktestResponse) {
241 match response {
242 BacktestResponse::ReadyForContinue => {
243 self.ready_for_continue = true;
244 }
245 BacktestResponse::Completed { .. } => {
246 self.ready_for_continue = false;
247 }
248 _ => {}
249 }
250 }
251
252 fn ws_mut(&mut self) -> BacktestClientResult<&mut WebSocketStream<MaybeTlsStream<TcpStream>>> {
253 self.ws.as_mut().ok_or_else(|| BacktestClientError::Closed {
254 reason: "websocket closed".to_string(),
255 })
256 }
257
258 pub(crate) async fn create(
259 &mut self,
260 params: CreateSessionParams,
261 rpc_base_url: String,
262 ) -> BacktestClientResult<()> {
263 self.send(&BacktestRequest::CreateBacktestSession(params), None)
264 .await?;
265
266 loop {
267 let response =
268 self.next_response(None)
269 .await?
270 .ok_or_else(|| BacktestClientError::Closed {
271 reason: "websocket ended before SessionCreated".to_string(),
272 })?;
273
274 match response {
275 BacktestResponse::SessionCreated {
276 session_id,
277 rpc_endpoint,
278 } => {
279 self.session_id = Some(session_id);
280 let resolved = resolve_rpc_url(&rpc_base_url, &rpc_endpoint);
281 self.rpc = Some(RpcClient::new_with_commitment(
282 resolved.clone(),
283 CommitmentConfig::confirmed(),
284 ));
285 self.rpc_endpoint = Some(resolved);
286 return Ok(());
287 }
288 BacktestResponse::ReadyForContinue => {
289 self.ready_for_continue = true;
290 }
291 BacktestResponse::Error(err) => return Err(BacktestClientError::Remote(err)),
292 other => {
293 self.backlog.push_back(other);
294 }
295 }
296 }
297 }
298
299 pub async fn send(
301 &mut self,
302 request: &BacktestRequest,
303 timeout: Option<Duration>,
304 ) -> BacktestClientResult<()> {
305 let text = serde_json::to_string(request)
306 .map_err(|source| BacktestClientError::SerializeRequest { source })?;
307
308 let request_timeout = self.request_timeout;
309 let timeout = timeout.or(request_timeout);
310
311 let send_fut = self.ws_mut()?.send(Message::Text(text));
312 let send_result = match timeout {
313 Some(duration) => tokio::time::timeout(duration, send_fut)
314 .await
315 .map_err(|_| BacktestClientError::Timeout {
316 action: "sending",
317 duration,
318 })?,
319 None => send_fut.await,
320 };
321
322 send_result.map_err(|source| BacktestClientError::WebSocket {
323 action: "sending",
324 source: Box::new(source),
325 })?;
326
327 Ok(())
328 }
329
330 pub async fn next_response(
332 &mut self,
333 timeout: Option<Duration>,
334 ) -> BacktestClientResult<Option<BacktestResponse>> {
335 if let Some(response) = self.backlog.pop_front() {
336 return Ok(Some(response));
337 }
338
339 let text = match self.next_text(timeout).await? {
340 Some(text) => text,
341 None => return Ok(None),
342 };
343
344 let response = serde_json::from_str::<BacktestResponse>(&text).map_err(|source| {
345 BacktestClientError::DeserializeResponse {
346 raw: text.clone(),
347 source,
348 }
349 })?;
350
351 Ok(Some(response))
352 }
353
354 pub async fn next_event(
356 &mut self,
357 timeout: Option<Duration>,
358 ) -> BacktestClientResult<Option<BacktestResponse>> {
359 let response = self.next_response(timeout).await?;
360 if let Some(ref response) = response {
361 self.apply_response(response);
362 }
363 Ok(response)
364 }
365
366 pub fn responses(
370 self,
371 timeout: Option<Duration>,
372 ) -> impl futures::Stream<Item = BacktestClientResult<BacktestResponse>> {
373 stream::unfold(Some(self), move |state| async move {
374 let mut session = match state {
375 Some(session) => session,
376 None => return None,
377 };
378
379 match session.next_response(timeout).await {
380 Ok(Some(response)) => {
381 session.apply_response(&response);
382 Some((Ok(response), Some(session)))
383 }
384 Ok(None) => None,
385 Err(err) => Some((Err(err), None)),
386 }
387 })
388 }
389
390 pub async fn ensure_ready(
392 &mut self,
393 timeout: Option<Duration>,
394 ) -> BacktestClientResult<ReadyOutcome> {
395 if self.ready_for_continue {
396 return Ok(ReadyOutcome::Ready);
397 }
398
399 loop {
400 let response =
401 self.next_response(timeout)
402 .await?
403 .ok_or_else(|| BacktestClientError::Closed {
404 reason: "websocket ended while waiting for ReadyForContinue".to_string(),
405 })?;
406
407 match response {
408 BacktestResponse::ReadyForContinue => {
409 self.ready_for_continue = true;
410 return Ok(ReadyOutcome::Ready);
411 }
412 BacktestResponse::Completed { .. } => return Ok(ReadyOutcome::Completed),
413 BacktestResponse::Error(err) => return Err(BacktestClientError::Remote(err)),
414 _ => {}
415 }
416 }
417 }
418
419 pub async fn wait_for_status(
421 &mut self,
422 desired: BacktestStatus,
423 timeout: Option<Duration>,
424 ) -> BacktestClientResult<()> {
425 let desired = std::mem::discriminant(&desired);
426
427 loop {
428 let response =
429 self.next_response(timeout)
430 .await?
431 .ok_or_else(|| BacktestClientError::Closed {
432 reason: "websocket ended while waiting for status".to_string(),
433 })?;
434
435 match response {
436 BacktestResponse::Status { status }
437 if std::mem::discriminant(&status) == desired =>
438 {
439 return Ok(());
440 }
441 BacktestResponse::Error(err) => return Err(BacktestClientError::Remote(err)),
442 BacktestResponse::Completed { summary } => {
443 return Err(BacktestClientError::UnexpectedResponse {
444 context: "waiting for status",
445 response: Box::new(BacktestResponse::Completed { summary }),
446 });
447 }
448 _ => {}
449 }
450 }
451 }
452
453 pub async fn send_continue(
455 &mut self,
456 params: ContinueParams,
457 timeout: Option<Duration>,
458 ) -> BacktestClientResult<()> {
459 self.ready_for_continue = false;
460 self.send(&BacktestRequest::Continue(params), timeout).await
461 }
462
463 pub async fn advance_step<F>(
465 &mut self,
466 state: &mut AdvanceState,
467 wait_for_slots: bool,
468 timeout: Option<Duration>,
469 on_event: &mut F,
470 ) -> BacktestClientResult<()>
471 where
472 F: FnMut(&BacktestResponse),
473 {
474 let Some(response) = self.next_response(timeout).await? else {
475 return Err(BacktestClientError::Closed {
476 reason: "websocket ended while awaiting continue responses".to_string(),
477 });
478 };
479
480 if self.log_raw {
481 tracing::debug!("<- {response:?}");
482 }
483
484 on_event(&response);
485
486 match response {
487 BacktestResponse::ReadyForContinue => {
488 self.ready_for_continue = true;
489 state.ready_for_continue = true;
490 }
491 BacktestResponse::SlotNotification(slot) => {
492 state.slot_notifications += 1;
493 state.last_slot = Some(slot);
494 }
495 BacktestResponse::Status { status } => {
496 state.statuses.push(status);
497 }
498 BacktestResponse::Success => {}
499 BacktestResponse::Completed { summary } => {
500 state.completed = true;
501 state.summary = summary;
502 }
503 BacktestResponse::Error(err) => return Err(BacktestClientError::Remote(err)),
504 BacktestResponse::SessionCreated { .. } | BacktestResponse::SessionAttached { .. } => {
505 return Err(BacktestClientError::UnexpectedResponse {
506 context: "continuing",
507 response: Box::new(response),
508 });
509 }
510 }
511
512 if wait_for_slots && state.slot_notifications > state.expected_slots {
513 tracing::warn!(
514 "received {} slot notifications (expected {})",
515 state.slot_notifications,
516 state.expected_slots
517 );
518 }
519
520 Ok(())
521 }
522
523 pub async fn continue_until_ready<F>(
525 &mut self,
526 cont: Continue,
527 timeout: Option<Duration>,
528 mut on_event: F,
529 ) -> BacktestClientResult<ContinueResult>
530 where
531 F: FnMut(&BacktestResponse),
532 {
533 let expected_slots = cont.advance_count;
534 self.advance_internal(
535 cont.into_params(),
536 expected_slots,
537 false,
538 timeout,
539 &mut on_event,
540 )
541 .await
542 }
543
544 pub async fn advance<F>(
546 &mut self,
547 cont: Continue,
548 timeout: Option<Duration>,
549 mut on_event: F,
550 ) -> BacktestClientResult<ContinueResult>
551 where
552 F: FnMut(&BacktestResponse),
553 {
554 let expected_slots = cont.advance_count;
555 self.advance_internal(
556 cont.into_params(),
557 expected_slots,
558 true,
559 timeout,
560 &mut on_event,
561 )
562 .await
563 }
564
565 async fn advance_internal<F>(
566 &mut self,
567 params: ContinueParams,
568 expected_slots: u64,
569 wait_for_slots: bool,
570 timeout: Option<Duration>,
571 on_event: &mut F,
572 ) -> BacktestClientResult<ContinueResult>
573 where
574 F: FnMut(&BacktestResponse),
575 {
576 self.send_continue(params, timeout).await?;
577
578 let mut state = AdvanceState::new(expected_slots);
579 while !state.is_done(wait_for_slots) {
580 self.advance_step(&mut state, wait_for_slots, timeout, on_event)
581 .await?;
582 }
583
584 Ok(ContinueResult {
585 slot_notifications: state.slot_notifications,
586 last_slot: state.last_slot,
587 statuses: state.statuses,
588 ready_for_continue: state.ready_for_continue,
589 completed: state.completed,
590 })
591 }
592
593 pub async fn modify_program(
602 &self,
603 program_id: &str,
604 elf: &[u8],
605 ) -> Result<BTreeMap<Address, AccountData>, ProgramModError> {
606 let rpc = self.rpc.as_ref().ok_or(ProgramModError::NoRpcEndpoint)?;
607
608 let program_addr: Address =
609 program_id
610 .parse()
611 .map_err(|_| ProgramModError::InvalidProgramId {
612 id: program_id.to_string(),
613 })?;
614 let programdata_addr = solana_loader_v3_interface::get_program_data_address(&program_addr);
615
616 let slot = rpc.get_slot().await.map_err(|e| ProgramModError::Rpc {
617 source: Box::new(e),
618 })?;
619
620 let data_len = 13 + elf.len();
622 let lamports = rpc
623 .get_minimum_balance_for_rent_exemption(data_len)
624 .await
625 .map_err(|e| ProgramModError::Rpc {
626 source: Box::new(e),
627 })?;
628
629 Ok(build_program_injection(
630 programdata_addr,
631 elf,
632 slot.saturating_sub(1),
633 None,
634 lamports,
635 ))
636 }
637
638 pub async fn subscribe_program_logs<F, Fut>(
644 &self,
645 program_id: &str,
646 commitment: CommitmentConfig,
647 on_notification: F,
648 ) -> Result<LogSubscriptionHandle, SubscriptionError>
649 where
650 F: Fn(Response<RpcLogsResponse>) -> Fut + Send + Sync + 'static,
651 Fut: Future<Output = ()> + Send + 'static,
652 {
653 let rpc_endpoint = self
654 .rpc_endpoint
655 .as_deref()
656 .ok_or(SubscriptionError::NoRpcEndpoint)?;
657 crate::subscriptions::subscribe_program_logs(
658 rpc_endpoint,
659 program_id,
660 commitment,
661 on_notification,
662 )
663 .await
664 }
665
666 pub async fn close(&mut self, timeout: Option<Duration>) -> BacktestClientResult<()> {
670 self.close_with_frame(timeout, None).await
671 }
672
    /// Closes the session, optionally sending `frame` as the WebSocket close frame.
    ///
    /// Steps, in order:
    /// 1. Return immediately if the socket is already gone.
    /// 2. Send `CloseBacktestSession`; a failure that only indicates the
    ///    connection is already closed is tolerated.
    /// 3. If the request was sent, read one response: `Success` or
    ///    `Completed` is accepted, a closed/reset connection ends the close
    ///    successfully, anything else is an error.
    /// 4. Send the WebSocket close frame, ignoring "already closed" errors.
    /// 5. Read once more, pause briefly, then discard the socket.
    ///
    /// # Errors
    ///
    /// Returns [`BacktestClientError::Remote`] for a server error,
    /// [`BacktestClientError::UnexpectedResponse`] for any other unexpected
    /// reply, and any send/receive/close error not recognised as "already
    /// closed". On these error paths the socket is left in place.
    pub async fn close_with_frame(
        &mut self,
        timeout: Option<Duration>,
        frame: Option<CloseFrame<'static>>,
    ) -> BacktestClientResult<()> {
        // Already closed: nothing to do.
        if self.ws.is_none() {
            return Ok(());
        }

        // Ask the server to close the backtest session; tolerate a socket
        // that is already closed.
        let mut sent = false;
        match self
            .send(&BacktestRequest::CloseBacktestSession, timeout)
            .await
        {
            Ok(()) => sent = true,
            Err(err) if is_close_ok(&err) => {}
            Err(err) => return Err(err),
        }

        if sent {
            // NOTE(review): `next_response` serves the backlog first, so a
            // queued earlier response could be mistaken for the reply to the
            // close request — confirm whether the backlog should be drained
            // before this point.
            let response = match self.next_response(timeout).await {
                Ok(Some(r)) => r,
                // Stream ended: the peer is gone, treat as closed.
                Ok(None) => {
                    self.ws.take();
                    return Ok(());
                }
                // Peer sent a close frame.
                Err(BacktestClientError::Closed { .. }) => {
                    self.ws.take();
                    return Ok(());
                }
                // Peer dropped the connection without a closing handshake.
                Err(BacktestClientError::WebSocket {
                    action: "receiving",
                    source,
                }) if is_reset_without_close(&source) => {
                    self.ws.take();
                    return Ok(());
                }
                Err(err) => return Err(err),
            };

            match response {
                BacktestResponse::Success | BacktestResponse::Completed { .. } => {}
                BacktestResponse::Error(err) => return Err(BacktestClientError::Remote(err)),
                other => {
                    return Err(BacktestClientError::UnexpectedResponse {
                        context: "closing session",
                        response: Box::new(other),
                    });
                }
            }
        }

        // Send the WebSocket close frame; "already closed" errors are fine.
        if let Some(ws) = self.ws.as_mut() {
            let close_result = ws.close(frame).await;
            if let Err(source) = close_result
                && !is_ws_closed_error(&source)
            {
                return Err(BacktestClientError::WebSocket {
                    action: "closing",
                    source: Box::new(source),
                });
            }
        }

        // Read once more (presumably to observe the peer's close — confirm);
        // any message or a closed/reset connection is acceptable here.
        match self.next_response(timeout).await {
            Ok(_) => {}
            Err(BacktestClientError::Closed { .. }) => {}
            Err(BacktestClientError::WebSocket {
                action: "receiving",
                source,
            }) if is_reset_without_close(&source) => {}
            Err(err) => return Err(err),
        }

        // NOTE(review): fixed 100 ms pause before dropping the socket; the
        // reason is not visible in this file — confirm it is still needed.
        tokio::time::sleep(Duration::from_millis(100)).await;
        self.ws.take();
        Ok(())
    }
754
755 pub async fn close_with_reason(
757 &mut self,
758 timeout: Option<Duration>,
759 code: CloseCode,
760 reason: impl Into<String>,
761 ) -> BacktestClientResult<()> {
762 let frame = CloseFrame {
763 code,
764 reason: Cow::Owned(reason.into()),
765 };
766 self.close_with_frame(timeout, Some(frame)).await
767 }
768
769 async fn next_text(
770 &mut self,
771 timeout: Option<Duration>,
772 ) -> BacktestClientResult<Option<String>> {
773 loop {
774 let request_timeout = self.request_timeout;
775 let timeout = timeout.or(request_timeout);
776
777 let next_fut = self.ws_mut()?.next();
778 let msg = match timeout {
779 Some(duration) => tokio::time::timeout(duration, next_fut)
780 .await
781 .map_err(|_| BacktestClientError::Timeout {
782 action: "receiving",
783 duration,
784 })?,
785 None => next_fut.await,
786 };
787
788 let Some(msg) = msg else {
789 return Ok(None);
790 };
791
792 let msg = match msg {
793 Ok(msg) => msg,
794 Err(source) => {
795 return Err(BacktestClientError::WebSocket {
796 action: "receiving",
797 source: Box::new(source),
798 });
799 }
800 };
801
802 match msg {
803 Message::Text(text) => {
804 if self.log_raw {
805 tracing::debug!("<- raw: {text}");
806 }
807 return Ok(Some(text));
808 }
809 Message::Binary(bin) => match String::from_utf8(bin) {
810 Ok(text) => {
811 if self.log_raw {
812 tracing::debug!("<- raw(bin): {text}");
813 }
814 return Ok(Some(text));
815 }
816 Err(err) => {
817 tracing::warn!("discarding non-utf8 binary message: {err}");
818 continue;
819 }
820 },
821 Message::Close(frame) => {
822 let reason = close_reason(frame);
823 return Err(BacktestClientError::Closed { reason });
824 }
825 Message::Ping(_) | Message::Pong(_) => continue,
826 Message::Frame(_) => continue,
827 }
828 }
829 }
830}
831
832impl std::fmt::Debug for BacktestSession {
833 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
834 f.debug_struct("BacktestSession")
835 .field("session_id", &self.session_id)
836 .field("rpc_endpoint", &self.rpc_endpoint)
837 .field(
838 "rpc",
839 &self
840 .rpc
841 .as_ref()
842 .map(|_| "<RpcClient>")
843 .unwrap_or("<not set>"),
844 )
845 .field("ready_for_continue", &self.ready_for_continue)
846 .field("request_timeout", &self.request_timeout)
847 .finish_non_exhaustive()
848 }
849}
850
851impl Drop for BacktestSession {
852 fn drop(&mut self) {
853 let Some(ws) = self.ws.take() else {
854 return;
855 };
856
857 if let Ok(handle) = tokio::runtime::Handle::try_current() {
858 handle.spawn(async move {
859 let mut ws = ws;
860 let _ = ws.close(None).await;
861 });
862 }
863 }
864}
865
/// Resolves the RPC endpoint reported by the server against `base`.
///
/// An `endpoint` that already starts with `http://` or `https://` is returned
/// unchanged. Otherwise it is treated as a path and joined to `base` with
/// exactly one `/` between them: trailing slashes on `base` and leading
/// slashes on `endpoint` are removed first, so a base such as
/// `http://host/` does not produce `http://host//path`.
fn resolve_rpc_url(base: &str, endpoint: &str) -> String {
    if endpoint.starts_with("http://") || endpoint.starts_with("https://") {
        return endpoint.to_string();
    }

    format!(
        "{}/{}",
        base.trim_end_matches('/'),
        endpoint.trim_start_matches('/')
    )
}
873
874fn close_reason(frame: Option<CloseFrame<'static>>) -> String {
875 match frame {
876 Some(frame) => format!("{:?}: {}", frame.code, frame.reason),
877 None => "no close frame".to_string(),
878 }
879}
880
881fn is_reset_without_close(err: &WsError) -> bool {
882 matches!(
883 err,
884 WsError::Protocol(ProtocolError::ResetWithoutClosingHandshake)
885 )
886}
887
888fn is_ws_closed_error(err: &WsError) -> bool {
889 matches!(
890 err,
891 WsError::ConnectionClosed
892 | WsError::AlreadyClosed
893 | WsError::Protocol(ProtocolError::ResetWithoutClosingHandshake)
894 )
895}
896
897fn is_close_ok(err: &BacktestClientError) -> bool {
898 match err {
899 BacktestClientError::Closed { .. } => true,
900 BacktestClientError::WebSocket { source, .. } => is_ws_closed_error(source),
901 _ => false,
902 }
903}
904
#[cfg(test)]
mod tests {
    use super::*;

    /// Slot notifications raise the high-water mark and `Completed` marks the
    /// coverage as completed.
    #[test]
    fn coverage_tracks_slot_and_completion_from_responses() {
        let events = [
            BacktestResponse::SlotNotification(10),
            BacktestResponse::SlotNotification(12),
            BacktestResponse::Completed { summary: None },
        ];

        let mut tracker = SessionCoverage::default();
        for event in &events {
            tracker.observe_response(event);
        }

        assert!(tracker.is_completed());
        assert_eq!(tracker.highest_slot_seen(), Some(12));
    }

    /// Validation fails, in order, for: not completed, no slots observed, and
    /// range not reached; it succeeds once the expected slot is passed.
    #[test]
    fn coverage_validate_end_slot_checks_completion_and_range() {
        const EXPECTED_END: u64 = 5;
        let mut tracker = SessionCoverage::default();

        let not_completed = tracker.validate_end_slot(EXPECTED_END);
        assert_eq!(not_completed, Err(CoverageError::NotCompleted));

        tracker.mark_completed();
        let no_slots = tracker.validate_end_slot(EXPECTED_END);
        assert_eq!(no_slots, Err(CoverageError::NoSlotsObserved));

        tracker.observe_slot(4);
        let too_short = tracker.validate_end_slot(EXPECTED_END);
        assert_eq!(
            too_short,
            Err(CoverageError::RangeNotReached {
                actual_end_slot: 4,
                expected_end_slot: 5,
            })
        );

        tracker.observe_slot(6);
        let reached = tracker.validate_end_slot(EXPECTED_END);
        assert_eq!(reached, Ok(()));
    }
}