1use std::{
2 collections::VecDeque,
3 future::Future,
4 num::NonZeroU32,
5 pin::Pin,
6 sync::{Arc, OnceLock},
7 task::{Context, Poll},
8 time::Duration,
9};
10
11use futures::StreamExt;
12use tokio::{
13 sync::{OwnedSemaphorePermit, Semaphore, mpsc, oneshot},
14 time::Instant,
15};
16use tokio_muxt::{CoalesceMode, MuxTimer};
17use tokio_stream::wrappers::ReceiverStream;
18use tokio_util::task::AbortOnDropHandle;
19use tracing::debug;
20
21use crate::{
22 api::{ApiError, BasinClient, Streaming, retry_builder},
23 frame_signal::FrameSignal,
24 retry::RetryBackoffBuilder,
25 types::{
26 AppendAck, AppendInput, AppendRetryPolicy, MeteredBytes, ONE_MIB, S2Error, StreamName,
27 StreamPosition, ValidationError,
28 },
29};
30
/// Errors that can interrupt or terminate an append session.
///
/// Converted into [`S2Error`] before being handed to callers.
#[derive(Debug, thiserror::Error)]
pub enum AppendSessionError {
    /// An error produced by the underlying API layer.
    #[error(transparent)]
    Api(#[from] ApiError),
    /// The acknowledgement deadline fired before the expected ack arrived.
    #[error("append acknowledgement timed out")]
    AckTimeout,
    /// Capacity on the channel feeding the server request could not be reserved because the
    /// channel was closed.
    #[error("server disconnected")]
    ServerDisconnected,
    /// The acknowledgement stream ended while appends were still awaiting acks.
    #[error("response stream closed early while appends in flight")]
    StreamClosedEarly,
    /// A command could not be delivered to the session and no terminal error was recorded.
    #[error("session already closed")]
    SessionClosed,
    /// A submission reached the session after a close had been requested.
    #[error("session is closing")]
    SessionClosing,
    /// The command channel closed, or a ticket's sender was dropped, without a close request
    /// or a recorded terminal error.
    #[error("session dropped without calling close")]
    SessionDropped,
    /// An acknowledgement arrived during resend while no resent append was awaiting one.
    #[error("unexpected append acknowledgement during resend")]
    UnexpectedAck,
}
50
51impl AppendSessionError {
52 pub fn is_retryable(&self) -> bool {
53 match self {
54 Self::Api(err) => err.is_retryable(),
55 Self::AckTimeout => true,
56 Self::ServerDisconnected => true,
57 _ => false,
58 }
59 }
60
61 pub fn has_no_side_effects(&self) -> bool {
62 match self {
63 Self::Api(err) => err.has_no_side_effects(),
64 _ => false,
65 }
66 }
67}
68
69impl From<AppendSessionError> for S2Error {
70 fn from(err: AppendSessionError) -> Self {
71 match err {
72 AppendSessionError::Api(api_err) => api_err.into(),
73 other => S2Error::Client(other.to_string()),
74 }
75 }
76}
77
/// Future that resolves to the outcome of a submitted batch.
///
/// Yields the [`AppendAck`] on success or the error reported for the batch. If the sending
/// side goes away without reporting a result, it yields the session's recorded terminal
/// error, or a "session dropped" error when none was recorded.
pub struct BatchSubmitTicket {
    /// Receives the result for this batch from the session task.
    rx: oneshot::Receiver<Result<AppendAck, S2Error>>,
    /// Slot shared with the session task holding its terminal error, if any.
    terminal_err: Arc<OnceLock<S2Error>>,
}
83
84impl Future for BatchSubmitTicket {
85 type Output = Result<AppendAck, S2Error>;
86
87 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
88 match Pin::new(&mut self.rx).poll(cx) {
89 Poll::Ready(Ok(res)) => Poll::Ready(res),
90 Poll::Ready(Err(_)) => Poll::Ready(Err(self
91 .terminal_err
92 .get()
93 .cloned()
94 .unwrap_or_else(|| AppendSessionError::SessionDropped.into()))),
95 Poll::Pending => Poll::Pending,
96 }
97 }
98}
99
/// Backpressure limits for an [`AppendSession`].
#[derive(Debug, Clone)]
pub struct AppendSessionConfig {
    /// Size of the byte budget that submissions reserve from.
    /// Defaults to `5 * ONE_MIB`; the builder rejects values below `ONE_MIB`.
    max_unacked_bytes: u32,
    /// Optional limit on the number of batches holding a permit at once. When set it also
    /// sizes the session's command channel. `None` (the default) means no count limit.
    max_unacked_batches: Option<u32>,
}
106
107impl Default for AppendSessionConfig {
108 fn default() -> Self {
109 Self {
110 max_unacked_bytes: 5 * ONE_MIB,
111 max_unacked_batches: None,
112 }
113 }
114}
115
116impl AppendSessionConfig {
117 pub fn new() -> Self {
119 Self::default()
120 }
121
122 pub fn with_max_unacked_bytes(self, max_unacked_bytes: u32) -> Result<Self, ValidationError> {
128 if max_unacked_bytes < ONE_MIB {
129 return Err(format!("max_unacked_bytes must be at least {ONE_MIB}").into());
130 }
131 Ok(Self {
132 max_unacked_bytes,
133 ..self
134 })
135 }
136
137 pub fn with_max_unacked_batches(self, max_unacked_batches: NonZeroU32) -> Self {
141 Self {
142 max_unacked_batches: Some(max_unacked_batches.get()),
143 ..self
144 }
145 }
146}
147
/// Mutable state of the session task that is carried across reconnect attempts.
struct SessionState {
    /// Receiving side of the command channel fed by the session handles.
    cmd_rx: mpsc::Receiver<Command>,
    /// Appends handed to the server connection and not yet acknowledged, oldest first.
    inflight_appends: VecDeque<InflightAppend>,
    /// Sum of the metered bytes of everything in `inflight_appends`.
    inflight_bytes: usize,
    /// Set once a close command has been received; used to report the final outcome.
    close_tx: Option<oneshot::Sender<Result<(), S2Error>>>,
    /// Number of records handed to the server connection by the main loop.
    total_records: usize,
    /// Number of records acknowledged so far.
    total_acked_records: usize,
    /// End position of the most recently processed acknowledgement.
    prev_ack_end: Option<StreamPosition>,
    /// A submission that has been received but not yet handed to the server connection.
    stashed_submission: Option<StashedSubmission>,
}
158
/// Handle to a background append session with permit-based backpressure.
///
/// Batches are submitted through a command channel to a spawned task; each submission first
/// acquires permits bounding unacknowledged bytes (and optionally batches).
pub struct AppendSession {
    /// Sends commands to the session task.
    cmd_tx: mpsc::Sender<Command>,
    /// Semaphores bounding unacknowledged bytes / batches.
    permits: AppendPermits,
    /// Slot shared with the session task holding its terminal error, if any.
    terminal_err: Arc<OnceLock<S2Error>>,
    /// Keeps the session task alive; the wrapper aborts the task when dropped.
    _handle: AbortOnDropHandle<()>,
}
169
170impl AppendSession {
171 pub(crate) fn new(
172 client: BasinClient,
173 stream: StreamName,
174 config: AppendSessionConfig,
175 ) -> Self {
176 let buffer_size = config
177 .max_unacked_batches
178 .map(|mib| mib as usize)
179 .unwrap_or(DEFAULT_CHANNEL_BUFFER_SIZE);
180 let (cmd_tx, cmd_rx) = mpsc::channel(buffer_size);
181 let permits = AppendPermits::new(config.max_unacked_batches, config.max_unacked_bytes);
182 let retry_builder = retry_builder(&client.config.retry);
183 let terminal_err = Arc::new(OnceLock::new());
184 let handle = AbortOnDropHandle::new(tokio::spawn(run_session_with_retry(
185 client,
186 stream,
187 cmd_rx,
188 retry_builder,
189 buffer_size,
190 terminal_err.clone(),
191 )));
192 Self {
193 cmd_tx,
194 permits,
195 terminal_err,
196 _handle: handle,
197 }
198 }
199
200 pub async fn submit(&self, input: AppendInput) -> Result<BatchSubmitTicket, S2Error> {
210 let permit = self.reserve(input.records.metered_bytes() as u32).await?;
211 Ok(permit.submit(input))
212 }
213
214 pub async fn reserve(&self, bytes: u32) -> Result<BatchSubmitPermit, S2Error> {
231 let append_permit = self.permits.acquire(bytes).await;
232 let cmd_tx_permit = self
233 .cmd_tx
234 .clone()
235 .reserve_owned()
236 .await
237 .map_err(|_| self.terminal_err())?;
238 Ok(BatchSubmitPermit {
239 append_permit,
240 cmd_tx_permit,
241 terminal_err: self.terminal_err.clone(),
242 })
243 }
244
245 pub async fn close(self) -> Result<(), S2Error> {
247 let (done_tx, done_rx) = oneshot::channel();
248 self.cmd_tx
249 .send(Command::Close { done_tx })
250 .await
251 .map_err(|_| self.terminal_err())?;
252 done_rx.await.map_err(|_| self.terminal_err())??;
253 Ok(())
254 }
255
256 fn terminal_err(&self) -> S2Error {
257 self.terminal_err
258 .get()
259 .cloned()
260 .unwrap_or_else(|| AppendSessionError::SessionClosed.into())
261 }
262}
263
/// Reserved capacity for submitting exactly one batch without waiting.
pub struct BatchSubmitPermit {
    /// Backpressure permits held for the batch.
    append_permit: AppendPermit,
    /// Reserved slot on the session's command channel.
    cmd_tx_permit: mpsc::OwnedPermit<Command>,
    /// Slot shared with the session task holding its terminal error, if any.
    terminal_err: Arc<OnceLock<S2Error>>,
}
270
271impl BatchSubmitPermit {
272 pub fn submit(self, input: AppendInput) -> BatchSubmitTicket {
274 let (ack_tx, ack_rx) = oneshot::channel();
275 self.cmd_tx_permit.send(Command::Submit {
276 input,
277 ack_tx,
278 permit: Some(self.append_permit),
279 });
280 BatchSubmitTicket {
281 rx: ack_rx,
282 terminal_err: self.terminal_err,
283 }
284 }
285}
286
/// Crate-internal session handle without permit-based backpressure.
///
/// Submissions carry no append permit and wait only for command channel capacity.
pub(crate) struct AppendSessionInternal {
    /// Sends commands to the session task.
    cmd_tx: mpsc::Sender<Command>,
    /// Slot shared with the session task holding its terminal error, if any.
    terminal_err: Arc<OnceLock<S2Error>>,
    /// Keeps the session task alive; the wrapper aborts the task when dropped.
    _handle: AbortOnDropHandle<()>,
}
292
293impl AppendSessionInternal {
294 pub(crate) fn new(client: BasinClient, stream: StreamName) -> Self {
295 let buffer_size = DEFAULT_CHANNEL_BUFFER_SIZE;
296 let (cmd_tx, cmd_rx) = mpsc::channel(buffer_size);
297 let retry_builder = retry_builder(&client.config.retry);
298 let terminal_err = Arc::new(OnceLock::new());
299 let handle = AbortOnDropHandle::new(tokio::spawn(run_session_with_retry(
300 client,
301 stream,
302 cmd_rx,
303 retry_builder,
304 buffer_size,
305 terminal_err.clone(),
306 )));
307 Self {
308 cmd_tx,
309 terminal_err,
310 _handle: handle,
311 }
312 }
313
314 pub(crate) fn submit(
315 &self,
316 input: AppendInput,
317 ) -> impl Future<Output = Result<BatchSubmitTicket, S2Error>> + Send + 'static {
318 let cmd_tx = self.cmd_tx.clone();
319 let terminal_err = self.terminal_err.clone();
320 async move {
321 let (ack_tx, ack_rx) = oneshot::channel();
322 cmd_tx
323 .send(Command::Submit {
324 input,
325 ack_tx,
326 permit: None,
327 })
328 .await
329 .map_err(|_| {
330 terminal_err
331 .get()
332 .cloned()
333 .unwrap_or_else(|| AppendSessionError::SessionClosed.into())
334 })?;
335 Ok(BatchSubmitTicket {
336 rx: ack_rx,
337 terminal_err,
338 })
339 }
340 }
341
342 pub(crate) async fn close(self) -> Result<(), S2Error> {
343 let (done_tx, done_rx) = oneshot::channel();
344 self.cmd_tx
345 .send(Command::Close { done_tx })
346 .await
347 .map_err(|_| self.terminal_err())?;
348 done_rx.await.map_err(|_| self.terminal_err())??;
349 Ok(())
350 }
351
352 fn terminal_err(&self) -> S2Error {
353 self.terminal_err
354 .get()
355 .cloned()
356 .unwrap_or_else(|| AppendSessionError::SessionClosed.into())
357 }
358}
359
/// Backpressure permits held on behalf of one batch; they are released when this is dropped.
#[derive(Debug)]
pub(crate) struct AppendPermit {
    /// One permit from the batch-count semaphore, when a count limit is configured.
    _count: Option<OwnedSemaphorePermit>,
    /// Permits from the byte-budget semaphore covering the batch's bytes.
    _bytes: OwnedSemaphorePermit,
}
365
/// Semaphores that bound how much unacknowledged data may be outstanding.
#[derive(Clone)]
pub(crate) struct AppendPermits {
    /// Optional semaphore bounding the number of batches.
    count: Option<Arc<Semaphore>>,
    /// Semaphore bounding the number of bytes.
    bytes: Arc<Semaphore>,
}
371
372impl AppendPermits {
373 pub(crate) fn new(count_permits: Option<u32>, bytes_permits: u32) -> Self {
374 Self {
375 count: count_permits.map(|permits| Arc::new(Semaphore::new(permits as usize))),
376 bytes: Arc::new(Semaphore::new(bytes_permits as usize)),
377 }
378 }
379
380 pub(crate) async fn acquire(&self, bytes: u32) -> AppendPermit {
381 AppendPermit {
382 _count: if let Some(count) = self.count.as_ref() {
383 Some(
384 count
385 .clone()
386 .acquire_many_owned(1)
387 .await
388 .expect("semaphore should not be closed"),
389 )
390 } else {
391 None
392 },
393 _bytes: self
394 .bytes
395 .clone()
396 .acquire_many_owned(bytes)
397 .await
398 .expect("semaphore should not be closed"),
399 }
400 }
401}
402
/// Body of the background session task: runs the session and reconnects on retryable errors.
///
/// Repeatedly calls [`run_session`] with the same [`SessionState`], so unacknowledged appends
/// survive reconnects. On a failure that is not safe to retry (or once backoff is exhausted),
/// the error is recorded in `terminal_err` and delivered to every pending append, any pending
/// close request, and every command still queued on the channel.
async fn run_session_with_retry(
    client: BasinClient,
    stream: StreamName,
    cmd_rx: mpsc::Receiver<Command>,
    retry_builder: RetryBackoffBuilder,
    buffer_size: usize,
    terminal_err: Arc<OnceLock<S2Error>>,
) {
    // A frame signal is only tracked under the `NoSideEffects` policy, where it feeds the
    // retry-safety decision in `is_safe_to_retry`.
    let frame_signal = match client.config.retry.append_retry_policy {
        AppendRetryPolicy::NoSideEffects => Some(FrameSignal::new()),
        AppendRetryPolicy::All => None,
    };

    let mut state = SessionState {
        cmd_rx,
        inflight_appends: VecDeque::new(),
        inflight_bytes: 0,
        close_tx: None,
        total_records: 0,
        total_acked_records: 0,
        prev_ack_end: None,
        stashed_submission: None,
    };
    let mut prev_total_acked_records = 0;
    let mut retry_backoff = retry_builder.build();

    loop {
        let result = run_session(&client, &stream, &mut state, buffer_size, &frame_signal).await;

        match result {
            Ok(()) => {
                break;
            }
            Err(err) => {
                // Progress since the previous failure (more records acked) resets the backoff.
                if prev_total_acked_records < state.total_acked_records {
                    prev_total_acked_records = state.total_acked_records;
                    retry_backoff.reset();
                }

                if is_safe_to_retry(
                    &err,
                    client.config.retry.append_retry_policy,
                    !state.inflight_appends.is_empty(),
                    frame_signal.as_ref(),
                ) && let Some(backoff) = retry_backoff.next()
                {
                    debug!(
                        %err,
                        ?backoff,
                        num_retries_remaining = retry_backoff.remaining(),
                        "retrying append session"
                    );
                    tokio::time::sleep(backoff).await;
                } else {
                    debug!(
                        %err,
                        retries_exhausted = retry_backoff.is_exhausted(),
                        "not retrying append session"
                    );

                    let err: S2Error = err.into();

                    // Record the error first so handles that observe a closed channel or a
                    // dropped sender can report it.
                    let _ = terminal_err.set(err.clone());

                    // Fail everything that was waiting on this session.
                    for inflight_append in state.inflight_appends.drain(..) {
                        let _ = inflight_append.ack_tx.send(Err(err.clone()));
                    }

                    if let Some(stashed) = state.stashed_submission.take() {
                        let _ = stashed.ack_tx.send(Err(err.clone()));
                    }

                    if let Some(done_tx) = state.close_tx.take() {
                        let _ = done_tx.send(Err(err.clone()));
                    }

                    // Stop accepting commands, then reject whatever is already queued.
                    state.cmd_rx.close();
                    while let Some(cmd) = state.cmd_rx.recv().await {
                        cmd.reject(err.clone());
                    }
                    break;
                }
            }
        }
    }

    // On the failure path `close_tx` has already been taken, so this only fires after a
    // successful run.
    if let Some(done_tx) = state.close_tx.take() {
        let _ = done_tx.send(Ok(()));
    }
}
493
/// Runs a single connection of the append session until it finishes or fails.
///
/// Connects to the server, resends any appends left unacknowledged in `state`, and then
/// multiplexes the ack deadline timer, forwarding of the stashed submission, incoming
/// commands, and incoming acknowledgements.
///
/// Returns `Ok(())` when the ack stream ends with nothing pending, or when a close was
/// requested and nothing is pending. Any error is returned to the caller, which decides
/// whether to reconnect.
async fn run_session(
    client: &BasinClient,
    stream: &StreamName,
    state: &mut SessionState,
    buffer_size: usize,
    frame_signal: &Option<FrameSignal>,
) -> Result<(), AppendSessionError> {
    if let Some(s) = frame_signal {
        s.reset();
    }

    let (input_tx, mut acks) = connect(client, stream, buffer_size, frame_signal.clone()).await?;
    let ack_timeout = client.config.request_timeout;

    if !state.inflight_appends.is_empty() {
        // Appends left over from a previous connection are resent and fully acknowledged
        // before any new submission is accepted.
        resend(state, &input_tx, &mut acks, ack_timeout).await?;

        if let Some(s) = frame_signal {
            s.reset();
        }

        assert!(state.inflight_appends.is_empty());
        assert_eq!(state.inflight_bytes, 0);
    }

    let timer = MuxTimer::<N_TIMER_VARIANTS>::default();
    tokio::pin!(timer);

    loop {
        tokio::select! {
            // The ack deadline elapsed.
            (event_ord, _deadline) = &mut timer, if timer.is_armed() => {
                match TimerEvent::from(event_ord) {
                    TimerEvent::AckDeadline => {
                        return Err(AppendSessionError::AckTimeout);
                    }
                }
            }

            // A submission is stashed: wait for capacity on the input channel, then send it.
            input_tx_permit = input_tx.reserve(), if state.stashed_submission.is_some() => {
                let input_tx_permit = input_tx_permit
                    .map_err(|_| AppendSessionError::ServerDisconnected)?;
                let submission = state.stashed_submission
                    .take()
                    .expect("stashed_submission should not be None");

                input_tx_permit.send(submission.input.clone());

                state.total_records += submission.input.records.len();
                state.inflight_bytes += submission.input_metered_bytes;

                // The deadline counts from when the submission was received (`since`), not
                // from when it was sent.
                timer.as_mut().fire_at(
                    TimerEvent::AckDeadline,
                    submission.since + ack_timeout,
                    CoalesceMode::Earliest,
                );
                state.inflight_appends.push_back(submission.into());
            }

            // Only accept the next command once the previous submission has been forwarded.
            cmd = state.cmd_rx.recv(), if state.stashed_submission.is_none() => {
                match cmd {
                    Some(Command::Submit { input, ack_tx, permit }) => {
                        if state.close_tx.is_some() {
                            // A close was already requested: reject new submissions.
                            let _ = ack_tx.send(
                                Err(AppendSessionError::SessionClosing.into())
                            );
                        } else {
                            let input_metered_bytes = input.records.metered_bytes();
                            state.stashed_submission = Some(StashedSubmission {
                                input,
                                input_metered_bytes,
                                ack_tx,
                                permit,
                                since: Instant::now(),
                            });
                        }
                    }
                    Some(Command::Close { done_tx }) => {
                        state.close_tx = Some(done_tx);
                    }
                    None => {
                        // The command channel closed without a close request.
                        return Err(AppendSessionError::SessionDropped);
                    }
                }
            }

            ack = acks.next() => {
                match ack {
                    Some(Ok(ack)) => {
                        process_ack(
                            ack,
                            state,
                            timer.as_mut(),
                            ack_timeout,
                        );
                    }
                    Some(Err(err)) => {
                        return Err(err.into());
                    }
                    None => {
                        // The ack stream ended; that is only acceptable when nothing is pending.
                        if !state.inflight_appends.is_empty() || state.stashed_submission.is_some() {
                            return Err(AppendSessionError::StreamClosedEarly);
                        }
                        break;
                    }
                }
            }
        }

        // A requested close completes once every accepted submission has been acknowledged.
        if state.close_tx.is_some()
            && state.inflight_appends.is_empty()
            && state.stashed_submission.is_none()
        {
            break;
        }
    }

    assert!(state.inflight_appends.is_empty());
    assert_eq!(state.inflight_bytes, 0);
    assert!(state.stashed_submission.is_none());

    Ok(())
}
616
617async fn resend(
618 state: &mut SessionState,
619 input_tx: &mpsc::Sender<AppendInput>,
620 acks: &mut Streaming<AppendAck>,
621 ack_timeout: Duration,
622) -> Result<(), AppendSessionError> {
623 debug!(
624 inflight_appends_len = state.inflight_appends.len(),
625 inflight_bytes = state.inflight_bytes,
626 "resending inflight appends"
627 );
628
629 let mut resend_index = 0;
630 let mut resend_finished = false;
631
632 let timer = MuxTimer::<N_TIMER_VARIANTS>::default();
633 tokio::pin!(timer);
634
635 while !state.inflight_appends.is_empty() {
636 tokio::select! {
637 (event_ord, _deadline) = &mut timer, if timer.is_armed() => {
638 match TimerEvent::from(event_ord) {
639 TimerEvent::AckDeadline => {
640 return Err(AppendSessionError::AckTimeout);
641 }
642 }
643 }
644
645 input_tx_permit = input_tx.reserve(), if !resend_finished => {
646 let input_tx_permit = input_tx_permit
647 .map_err(|_| AppendSessionError::ServerDisconnected)?;
648
649 if let Some(inflight_append) = state.inflight_appends.get_mut(resend_index) {
650 inflight_append.since = Instant::now();
651 timer.as_mut().fire_at(
652 TimerEvent::AckDeadline,
653 inflight_append.since + ack_timeout,
654 CoalesceMode::Earliest,
655 );
656 input_tx_permit.send(inflight_append.input.clone());
657 resend_index += 1;
658 } else {
659 resend_finished = true;
660 }
661 }
662
663 ack = acks.next() => {
664 match ack {
665 Some(Ok(ack)) => {
666 process_ack(
667 ack,
668 state,
669 timer.as_mut(),
670 ack_timeout,
671 );
672 resend_index = resend_index
673 .checked_sub(1)
674 .ok_or(AppendSessionError::UnexpectedAck)?;
675 }
676 Some(Err(err)) => {
677 return Err(err.into());
678 }
679 None => {
680 return Err(AppendSessionError::StreamClosedEarly);
681 }
682 }
683 }
684 }
685 }
686
687 assert_eq!(
688 resend_index, 0,
689 "resend_index should be 0 after resend completes"
690 );
691 debug!("finished resending inflight appends");
692 Ok(())
693}
694
695async fn connect(
696 client: &BasinClient,
697 stream: &StreamName,
698 buffer_size: usize,
699 frame_signal: Option<FrameSignal>,
700) -> Result<(mpsc::Sender<AppendInput>, Streaming<AppendAck>), AppendSessionError> {
701 let (input_tx, input_rx) = mpsc::channel::<AppendInput>(buffer_size);
702 let ack_stream = Box::pin(
703 client
704 .append_session(
705 stream,
706 ReceiverStream::new(input_rx).map(|i| i.into()),
707 frame_signal,
708 )
709 .await?
710 .map(|ack| match ack {
711 Ok(ack) => Ok(ack.into()),
712 Err(err) => Err(err),
713 }),
714 );
715 Ok((input_tx, ack_stream))
716}
717
718fn process_ack(
719 ack: AppendAck,
720 state: &mut SessionState,
721 timer: Pin<&mut MuxTimer<N_TIMER_VARIANTS>>,
722 ack_timeout: Duration,
723) {
724 let corresponding_append = state
725 .inflight_appends
726 .pop_front()
727 .expect("corresponding append should be present for an ack");
728
729 assert!(
730 ack.end.seq_num >= ack.start.seq_num,
731 "ack end seq_num should be greater than or equal to start seq_num"
732 );
733
734 if let Some(end) = state.prev_ack_end {
735 assert!(
736 ack.end.seq_num > end.seq_num,
737 "ack end seq_num should be greater than previous ack end"
738 );
739 }
740
741 let num_acked_records = (ack.end.seq_num - ack.start.seq_num) as usize;
742 assert_eq!(
743 num_acked_records,
744 corresponding_append.input.records.len(),
745 "ack record count should match submitted batch size"
746 );
747
748 state.total_acked_records += num_acked_records;
749 state.inflight_bytes -= corresponding_append.input_metered_bytes;
750 state.prev_ack_end = Some(ack.end);
751
752 let _ = corresponding_append.ack_tx.send(Ok(ack));
753
754 if let Some(oldest_append) = state.inflight_appends.front() {
755 timer.fire_at(
756 TimerEvent::AckDeadline,
757 oldest_append.since + ack_timeout,
758 CoalesceMode::Latest,
759 );
760 } else {
761 timer.cancel(TimerEvent::AckDeadline);
762 assert_eq!(
763 state.total_records, state.total_acked_records,
764 "all records should be acked when inflight is empty"
765 );
766 }
767}
768
/// A submission received from the command channel but not yet handed to the server
/// connection.
struct StashedSubmission {
    /// The batch to append.
    input: AppendInput,
    /// Metered size of `input`, computed once when the submission was received.
    input_metered_bytes: usize,
    /// Delivers the batch's result to its ticket.
    ack_tx: oneshot::Sender<Result<AppendAck, S2Error>>,
    /// Backpressure permits held for the batch, if it was submitted with any.
    permit: Option<AppendPermit>,
    /// When the submission was received; the ack deadline is measured from here.
    since: Instant,
}
776
/// An append that has been handed to the server connection and awaits acknowledgement.
struct InflightAppend {
    /// The batch, retained so it can be resent after a reconnect.
    input: AppendInput,
    /// Metered size of `input`.
    input_metered_bytes: usize,
    /// Delivers the batch's result to its ticket.
    ack_tx: oneshot::Sender<Result<AppendAck, S2Error>>,
    /// Start of the ack deadline window; refreshed when the append is resent.
    since: Instant,
    /// Backpressure permits held until this append is dropped.
    _permit: Option<AppendPermit>,
}
784
785impl From<StashedSubmission> for InflightAppend {
786 fn from(value: StashedSubmission) -> Self {
787 Self {
788 input: value.input,
789 input_metered_bytes: value.input_metered_bytes,
790 ack_tx: value.ack_tx,
791 since: value.since,
792 _permit: value.permit,
793 }
794 }
795}
796
/// Messages sent from session handles to the background session task.
enum Command {
    /// Append a batch.
    Submit {
        /// The batch to append.
        input: AppendInput,
        /// Delivers the batch's result to its ticket.
        ack_tx: oneshot::Sender<Result<AppendAck, S2Error>>,
        /// Backpressure permits held for the batch; `None` for internal sessions.
        permit: Option<AppendPermit>,
    },
    /// Finish outstanding appends and shut the session down.
    Close {
        /// Receives the final outcome of the session.
        done_tx: oneshot::Sender<Result<(), S2Error>>,
    },
}
807
808impl Command {
809 fn reject(self, err: S2Error) {
810 match self {
811 Command::Submit { ack_tx, .. } => {
812 let _ = ack_tx.send(Err(err));
813 }
814 Command::Close { done_tx } => {
815 let _ = done_tx.send(Err(err));
816 }
817 }
818 }
819}
820
821fn is_safe_to_retry(
822 err: &AppendSessionError,
823 policy: AppendRetryPolicy,
824 has_inflight: bool,
825 frame_signal: Option<&FrameSignal>,
826) -> bool {
827 let policy_compliant = match policy {
828 AppendRetryPolicy::All => true,
829 AppendRetryPolicy::NoSideEffects => {
830 !has_inflight
831 || !frame_signal.is_none_or(|s| s.is_signalled())
832 || err.has_no_side_effects()
833 }
834 };
835 policy_compliant && err.is_retryable()
836}
837
/// Channel capacity used when no `max_unacked_batches` limit is configured, and always for
/// internal sessions.
const DEFAULT_CHANNEL_BUFFER_SIZE: usize = 100;
839
/// Events multiplexed on the session's `MuxTimer`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TimerEvent {
    /// The deadline for the oldest outstanding acknowledgement elapsed.
    AckDeadline,
}
844
/// Number of [`TimerEvent`] variants; used as the const parameter of `MuxTimer`.
const N_TIMER_VARIANTS: usize = 1;
846
impl From<TimerEvent> for usize {
    /// Maps a timer event to its ordinal.
    fn from(event: TimerEvent) -> Self {
        // Exhaustive on purpose: adding a variant must force a new ordinal to be chosen here.
        match event {
            TimerEvent::AckDeadline => 0,
        }
    }
}
854
855impl From<usize> for TimerEvent {
856 fn from(value: usize) -> Self {
857 match value {
858 0 => TimerEvent::AckDeadline,
859 _ => panic!("invalid ordinal"),
860 }
861 }
862}
863
/// Unit tests for the retry-safety decision in `is_safe_to_retry`.
#[cfg(test)]
mod tests {
    use http::StatusCode;

    use super::{AppendSessionError, is_safe_to_retry};
    use crate::{
        api::{ApiError, ApiErrorResponse},
        frame_signal::FrameSignal,
        types::AppendRetryPolicy,
    };

    /// Builds a server-side API error with the given status and error code.
    fn server_error(status: StatusCode, code: &str) -> AppendSessionError {
        AppendSessionError::Api(ApiError::Server(
            status,
            ApiErrorResponse {
                code: code.to_owned(),
                message: "test".to_owned(),
            },
        ))
    }

    /// Under the `All` policy only the retryability of the error matters.
    #[test]
    fn safe_to_retry_session_all_policy() {
        let retryable = server_error(StatusCode::INTERNAL_SERVER_ERROR, "internal");
        let non_retryable = server_error(StatusCode::BAD_REQUEST, "bad_request");
        let policy = AppendRetryPolicy::All;

        // Retryable error: retried even with appends in flight.
        assert!(is_safe_to_retry(&retryable, policy, true, None));
        // Non-retryable error: never retried.
        assert!(!is_safe_to_retry(&non_retryable, policy, true, None));
    }

    /// Under the `NoSideEffects` policy a retry additionally needs evidence that nothing
    /// could have been applied.
    #[test]
    fn safe_to_retry_session_no_side_effects_policy() {
        let retryable = server_error(StatusCode::INTERNAL_SERVER_ERROR, "internal");
        let no_side_effect = server_error(StatusCode::TOO_MANY_REQUESTS, "rate_limited");
        let policy = AppendRetryPolicy::NoSideEffects;
        let signal = FrameSignal::new();

        signal.signal();
        // Nothing in flight: safe even though the signal is set.
        assert!(is_safe_to_retry(&retryable, policy, false, Some(&signal)));

        signal.reset();
        // In flight, but the signal is not set: safe.
        assert!(is_safe_to_retry(&retryable, policy, true, Some(&signal)));

        signal.signal();
        // In flight with the signal set and an error that may have had side effects: unsafe.
        assert!(!is_safe_to_retry(&retryable, policy, true, Some(&signal)));

        // Same situation, but the error itself reports no side effects: safe.
        assert!(is_safe_to_retry(
            &no_side_effect,
            policy,
            true,
            Some(&signal)
        ));

        // An ack timeout cannot vouch for the absence of side effects: unsafe.
        assert!(!is_safe_to_retry(
            &AppendSessionError::AckTimeout,
            policy,
            true,
            Some(&signal),
        ));
    }
}