1use std::{
2 collections::VecDeque,
3 future::Future,
4 num::NonZeroU32,
5 pin::Pin,
6 sync::{Arc, OnceLock},
7 task::{Context, Poll},
8 time::Duration,
9};
10
11use futures::StreamExt;
12use tokio::{
13 sync::{OwnedSemaphorePermit, Semaphore, mpsc, oneshot},
14 time::Instant,
15};
16use tokio_muxt::{CoalesceMode, MuxTimer};
17use tokio_stream::wrappers::ReceiverStream;
18use tokio_util::task::AbortOnDropHandle;
19use tracing::debug;
20
21use crate::{
22 api::{ApiError, BasinClient, Streaming, retry_builder},
23 frame_signal::FrameSignal,
24 retry::RetryBackoffBuilder,
25 types::{
26 AppendAck, AppendInput, AppendRetryPolicy, EncryptionKey, MeteredBytes, ONE_MIB, S2Error,
27 StreamName, StreamPosition, ValidationError,
28 },
29};
30
31#[derive(Debug, thiserror::Error)]
32pub enum AppendSessionError {
33 #[error(transparent)]
34 Api(#[from] ApiError),
35 #[error("append acknowledgement timed out")]
36 AckTimeout,
37 #[error("server disconnected")]
38 ServerDisconnected,
39 #[error("response stream closed early while appends in flight")]
40 StreamClosedEarly,
41 #[error("session already closed")]
42 SessionClosed,
43 #[error("session is closing")]
44 SessionClosing,
45 #[error("session dropped without calling close")]
46 SessionDropped,
47 #[error("invalid append acknowledgement: {0}")]
48 InvalidAck(String),
49}
50
51impl AppendSessionError {
52 pub fn is_retryable(&self) -> bool {
53 match self {
54 Self::Api(err) => err.is_retryable(),
55 Self::AckTimeout => true,
56 Self::ServerDisconnected => true,
57 _ => false,
58 }
59 }
60
61 pub fn has_no_side_effects(&self) -> bool {
62 match self {
63 Self::Api(err) => err.has_no_side_effects(),
64 _ => false,
65 }
66 }
67}
68
69impl From<AppendSessionError> for S2Error {
70 fn from(err: AppendSessionError) -> Self {
71 match err {
72 AppendSessionError::Api(api_err) => api_err.into(),
73 other => S2Error::Client(other.to_string()),
74 }
75 }
76}
77
78pub struct BatchSubmitTicket {
80 rx: oneshot::Receiver<Result<AppendAck, S2Error>>,
81 terminal_err: Arc<OnceLock<S2Error>>,
82}
83
84impl Future for BatchSubmitTicket {
85 type Output = Result<AppendAck, S2Error>;
86
87 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
88 match Pin::new(&mut self.rx).poll(cx) {
89 Poll::Ready(Ok(res)) => Poll::Ready(res),
90 Poll::Ready(Err(_)) => Poll::Ready(Err(self
91 .terminal_err
92 .get()
93 .cloned()
94 .unwrap_or_else(|| AppendSessionError::SessionDropped.into()))),
95 Poll::Pending => Poll::Pending,
96 }
97 }
98}
99
100#[derive(Debug, Clone)]
101pub struct AppendSessionConfig {
103 max_unacked_bytes: u32,
104 max_unacked_batches: Option<u32>,
105}
106
107impl Default for AppendSessionConfig {
108 fn default() -> Self {
109 Self {
110 max_unacked_bytes: 5 * ONE_MIB,
111 max_unacked_batches: None,
112 }
113 }
114}
115
116impl AppendSessionConfig {
117 pub fn new() -> Self {
119 Self::default()
120 }
121
122 pub fn with_max_unacked_bytes(self, max_unacked_bytes: u32) -> Result<Self, ValidationError> {
128 if max_unacked_bytes < ONE_MIB {
129 return Err(format!("max_unacked_bytes must be at least {ONE_MIB}").into());
130 }
131 Ok(Self {
132 max_unacked_bytes,
133 ..self
134 })
135 }
136
137 pub fn with_max_unacked_batches(self, max_unacked_batches: NonZeroU32) -> Self {
141 Self {
142 max_unacked_batches: Some(max_unacked_batches.get()),
143 ..self
144 }
145 }
146}
147
148struct SessionState {
149 cmd_rx: mpsc::Receiver<Command>,
150 inflight_appends: VecDeque<InflightAppend>,
151 inflight_bytes: usize,
152 close_tx: Option<oneshot::Sender<Result<(), S2Error>>>,
153 total_records: usize,
154 total_acked_records: usize,
155 prev_ack_end: Option<StreamPosition>,
156 stashed_submission: Option<StashedSubmission>,
157}
158
159pub struct AppendSession {
164 cmd_tx: mpsc::Sender<Command>,
165 permits: AppendPermits,
166 terminal_err: Arc<OnceLock<S2Error>>,
167 _handle: AbortOnDropHandle<()>,
168}
169
170impl AppendSession {
171 pub(crate) fn new(
172 client: BasinClient,
173 stream: StreamName,
174 encryption: Option<EncryptionKey>,
175 config: AppendSessionConfig,
176 ) -> Self {
177 let buffer_size = config
178 .max_unacked_batches
179 .map(|mib| mib as usize)
180 .unwrap_or(DEFAULT_CHANNEL_BUFFER_SIZE);
181 let (cmd_tx, cmd_rx) = mpsc::channel(buffer_size);
182 let permits = AppendPermits::new(config.max_unacked_batches, config.max_unacked_bytes);
183 let retry_builder = retry_builder(&client.config.retry);
184 let terminal_err = Arc::new(OnceLock::new());
185 let handle = AbortOnDropHandle::new(tokio::spawn(run_session_with_retry(
186 client,
187 stream,
188 encryption,
189 cmd_rx,
190 retry_builder,
191 buffer_size,
192 terminal_err.clone(),
193 )));
194 Self {
195 cmd_tx,
196 permits,
197 terminal_err,
198 _handle: handle,
199 }
200 }
201
202 pub async fn submit(&self, input: AppendInput) -> Result<BatchSubmitTicket, S2Error> {
212 let permit = self.reserve(input.records.metered_bytes() as u32).await?;
213 Ok(permit.submit(input))
214 }
215
216 pub async fn reserve(&self, bytes: u32) -> Result<BatchSubmitPermit, S2Error> {
233 let append_permit = self.permits.acquire(bytes).await;
234 let cmd_tx_permit = self
235 .cmd_tx
236 .clone()
237 .reserve_owned()
238 .await
239 .map_err(|_| self.terminal_err())?;
240 Ok(BatchSubmitPermit {
241 append_permit,
242 cmd_tx_permit,
243 terminal_err: self.terminal_err.clone(),
244 })
245 }
246
247 pub async fn close(self) -> Result<(), S2Error> {
249 let (done_tx, done_rx) = oneshot::channel();
250 self.cmd_tx
251 .send(Command::Close { done_tx })
252 .await
253 .map_err(|_| self.terminal_err())?;
254 done_rx.await.map_err(|_| self.terminal_err())??;
255 Ok(())
256 }
257
258 fn terminal_err(&self) -> S2Error {
259 self.terminal_err
260 .get()
261 .cloned()
262 .unwrap_or_else(|| AppendSessionError::SessionClosed.into())
263 }
264}
265
266pub struct BatchSubmitPermit {
268 append_permit: AppendPermit,
269 cmd_tx_permit: mpsc::OwnedPermit<Command>,
270 terminal_err: Arc<OnceLock<S2Error>>,
271}
272
273impl BatchSubmitPermit {
274 pub fn submit(self, input: AppendInput) -> BatchSubmitTicket {
276 let (ack_tx, ack_rx) = oneshot::channel();
277 self.cmd_tx_permit.send(Command::Submit {
278 input,
279 ack_tx,
280 permit: Some(self.append_permit),
281 });
282 BatchSubmitTicket {
283 rx: ack_rx,
284 terminal_err: self.terminal_err,
285 }
286 }
287}
288
289pub(crate) struct AppendSessionInternal {
290 cmd_tx: mpsc::Sender<Command>,
291 terminal_err: Arc<OnceLock<S2Error>>,
292 _handle: AbortOnDropHandle<()>,
293}
294
295impl AppendSessionInternal {
296 pub(crate) fn new(
297 client: BasinClient,
298 stream: StreamName,
299 encryption: Option<EncryptionKey>,
300 ) -> Self {
301 let buffer_size = DEFAULT_CHANNEL_BUFFER_SIZE;
302 let (cmd_tx, cmd_rx) = mpsc::channel(buffer_size);
303 let retry_builder = retry_builder(&client.config.retry);
304 let terminal_err = Arc::new(OnceLock::new());
305 let handle = AbortOnDropHandle::new(tokio::spawn(run_session_with_retry(
306 client,
307 stream,
308 encryption,
309 cmd_rx,
310 retry_builder,
311 buffer_size,
312 terminal_err.clone(),
313 )));
314 Self {
315 cmd_tx,
316 terminal_err,
317 _handle: handle,
318 }
319 }
320
321 pub(crate) fn submit(
322 &self,
323 input: AppendInput,
324 ) -> impl Future<Output = Result<BatchSubmitTicket, S2Error>> + Send + 'static {
325 let cmd_tx = self.cmd_tx.clone();
326 let terminal_err = self.terminal_err.clone();
327 async move {
328 let (ack_tx, ack_rx) = oneshot::channel();
329 cmd_tx
330 .send(Command::Submit {
331 input,
332 ack_tx,
333 permit: None,
334 })
335 .await
336 .map_err(|_| {
337 terminal_err
338 .get()
339 .cloned()
340 .unwrap_or_else(|| AppendSessionError::SessionClosed.into())
341 })?;
342 Ok(BatchSubmitTicket {
343 rx: ack_rx,
344 terminal_err,
345 })
346 }
347 }
348
349 pub(crate) async fn close(self) -> Result<(), S2Error> {
350 let (done_tx, done_rx) = oneshot::channel();
351 self.cmd_tx
352 .send(Command::Close { done_tx })
353 .await
354 .map_err(|_| self.terminal_err())?;
355 done_rx.await.map_err(|_| self.terminal_err())??;
356 Ok(())
357 }
358
359 fn terminal_err(&self) -> S2Error {
360 self.terminal_err
361 .get()
362 .cloned()
363 .unwrap_or_else(|| AppendSessionError::SessionClosed.into())
364 }
365}
366
367#[derive(Debug)]
368pub(crate) struct AppendPermit {
369 _count: Option<OwnedSemaphorePermit>,
370 _bytes: OwnedSemaphorePermit,
371}
372
373#[derive(Clone)]
374pub(crate) struct AppendPermits {
375 count: Option<Arc<Semaphore>>,
376 bytes: Arc<Semaphore>,
377}
378
379impl AppendPermits {
380 pub(crate) fn new(count_permits: Option<u32>, bytes_permits: u32) -> Self {
381 Self {
382 count: count_permits.map(|permits| Arc::new(Semaphore::new(permits as usize))),
383 bytes: Arc::new(Semaphore::new(bytes_permits as usize)),
384 }
385 }
386
387 pub(crate) async fn acquire(&self, bytes: u32) -> AppendPermit {
388 AppendPermit {
389 _count: if let Some(count) = self.count.as_ref() {
390 Some(
391 count
392 .clone()
393 .acquire_many_owned(1)
394 .await
395 .expect("semaphore should not be closed"),
396 )
397 } else {
398 None
399 },
400 _bytes: self
401 .bytes
402 .clone()
403 .acquire_many_owned(bytes)
404 .await
405 .expect("semaphore should not be closed"),
406 }
407 }
408}
409
410async fn run_session_with_retry(
411 client: BasinClient,
412 stream: StreamName,
413 encryption: Option<EncryptionKey>,
414 cmd_rx: mpsc::Receiver<Command>,
415 retry_builder: RetryBackoffBuilder,
416 buffer_size: usize,
417 terminal_err: Arc<OnceLock<S2Error>>,
418) {
419 let frame_signal = match client.config.retry.append_retry_policy {
420 AppendRetryPolicy::NoSideEffects => Some(FrameSignal::new()),
421 AppendRetryPolicy::All => None,
422 };
423
424 let mut state = SessionState {
425 cmd_rx,
426 inflight_appends: VecDeque::new(),
427 inflight_bytes: 0,
428 close_tx: None,
429 total_records: 0,
430 total_acked_records: 0,
431 prev_ack_end: None,
432 stashed_submission: None,
433 };
434 let mut prev_total_acked_records = 0;
435 let mut retry_backoff = retry_builder.build();
436
437 loop {
438 let result = run_session(
439 &client,
440 &stream,
441 encryption.as_ref(),
442 &mut state,
443 buffer_size,
444 &frame_signal,
445 )
446 .await;
447
448 match result {
449 Ok(()) => {
450 break;
451 }
452 Err(err) => {
453 if prev_total_acked_records < state.total_acked_records {
454 prev_total_acked_records = state.total_acked_records;
455 retry_backoff.reset();
456 }
457
458 if is_safe_to_retry(
459 &err,
460 client.config.retry.append_retry_policy,
461 !state.inflight_appends.is_empty(),
462 frame_signal.as_ref(),
463 ) && let Some(backoff) = retry_backoff.next()
464 {
465 debug!(
466 %err,
467 ?backoff,
468 num_retries_remaining = retry_backoff.remaining(),
469 "retrying append session"
470 );
471 tokio::time::sleep(backoff).await;
472 } else {
473 debug!(
474 %err,
475 retries_exhausted = retry_backoff.is_exhausted(),
476 "not retrying append session"
477 );
478
479 let err: S2Error = err.into();
480
481 let _ = terminal_err.set(err.clone());
482
483 for inflight_append in state.inflight_appends.drain(..) {
484 let _ = inflight_append.ack_tx.send(Err(err.clone()));
485 }
486
487 if let Some(stashed) = state.stashed_submission.take() {
488 let _ = stashed.ack_tx.send(Err(err.clone()));
489 }
490
491 if let Some(done_tx) = state.close_tx.take() {
492 let _ = done_tx.send(Err(err.clone()));
493 }
494
495 state.cmd_rx.close();
496 while let Some(cmd) = state.cmd_rx.recv().await {
497 cmd.reject(err.clone());
498 }
499 break;
500 }
501 }
502 }
503 }
504
505 if let Some(done_tx) = state.close_tx.take() {
506 let _ = done_tx.send(Ok(()));
507 }
508}
509
510async fn run_session(
511 client: &BasinClient,
512 stream: &StreamName,
513 encryption: Option<&EncryptionKey>,
514 state: &mut SessionState,
515 buffer_size: usize,
516 frame_signal: &Option<FrameSignal>,
517) -> Result<(), AppendSessionError> {
518 if let Some(s) = frame_signal {
519 s.reset();
520 }
521
522 let (input_tx, mut acks) = connect(
523 client,
524 stream,
525 encryption,
526 buffer_size,
527 frame_signal.clone(),
528 )
529 .await?;
530 let ack_timeout = client.config.request_timeout;
531
532 if !state.inflight_appends.is_empty() {
533 resend(state, &input_tx, &mut acks, ack_timeout).await?;
534
535 if let Some(s) = frame_signal {
536 s.reset();
537 }
538
539 assert!(state.inflight_appends.is_empty());
540 assert_eq!(state.inflight_bytes, 0);
541 }
542
543 let timer = MuxTimer::<N_TIMER_VARIANTS>::default();
544 tokio::pin!(timer);
545
546 loop {
547 tokio::select! {
548 (event_ord, _deadline) = &mut timer, if timer.is_armed() => {
549 match TimerEvent::from(event_ord) {
550 TimerEvent::AckDeadline => {
551 return Err(AppendSessionError::AckTimeout);
552 }
553 }
554 }
555
556 input_tx_permit = input_tx.reserve(), if state.stashed_submission.is_some() => {
557 let input_tx_permit = input_tx_permit
558 .map_err(|_| AppendSessionError::ServerDisconnected)?;
559 let submission = state.stashed_submission
560 .take()
561 .expect("stashed_submission should not be None");
562
563 input_tx_permit.send(submission.input.clone());
564
565 state.total_records += submission.input.records.len();
566 state.inflight_bytes += submission.input_metered_bytes;
567
568 timer.as_mut().fire_at(
569 TimerEvent::AckDeadline,
570 submission.since + ack_timeout,
571 CoalesceMode::Earliest,
572 );
573 state.inflight_appends.push_back(submission.into());
574 }
575
576 cmd = state.cmd_rx.recv(), if state.stashed_submission.is_none() => {
577 match cmd {
578 Some(Command::Submit { input, ack_tx, permit }) => {
579 if state.close_tx.is_some() {
580 let _ = ack_tx.send(
581 Err(AppendSessionError::SessionClosing.into())
582 );
583 } else {
584 let input_metered_bytes = input.records.metered_bytes();
585 state.stashed_submission = Some(StashedSubmission {
586 input,
587 input_metered_bytes,
588 ack_tx,
589 permit,
590 since: Instant::now(),
591 });
592 }
593 }
594 Some(Command::Close { done_tx }) => {
595 state.close_tx = Some(done_tx);
596 }
597 None => {
598 return Err(AppendSessionError::SessionDropped);
599 }
600 }
601 }
602
603 ack = acks.next() => {
604 match ack {
605 Some(Ok(ack)) => {
606 process_ack(
607 ack,
608 state,
609 timer.as_mut(),
610 ack_timeout,
611 )?;
612 }
613 Some(Err(err)) => {
614 return Err(err.into());
615 }
616 None => {
617 if !state.inflight_appends.is_empty() || state.stashed_submission.is_some() {
618 return Err(AppendSessionError::StreamClosedEarly);
619 }
620 break;
621 }
622 }
623 }
624 }
625
626 if state.close_tx.is_some()
627 && state.inflight_appends.is_empty()
628 && state.stashed_submission.is_none()
629 {
630 break;
631 }
632 }
633
634 assert!(state.inflight_appends.is_empty());
635 assert_eq!(state.inflight_bytes, 0);
636 assert!(state.stashed_submission.is_none());
637
638 Ok(())
639}
640
641async fn resend(
642 state: &mut SessionState,
643 input_tx: &mpsc::Sender<AppendInput>,
644 acks: &mut Streaming<AppendAck>,
645 ack_timeout: Duration,
646) -> Result<(), AppendSessionError> {
647 debug!(
648 inflight_appends_len = state.inflight_appends.len(),
649 inflight_bytes = state.inflight_bytes,
650 "resending inflight appends"
651 );
652
653 let mut resend_index = 0;
654 let mut resend_finished = false;
655
656 let timer = MuxTimer::<N_TIMER_VARIANTS>::default();
657 tokio::pin!(timer);
658
659 while !state.inflight_appends.is_empty() {
660 tokio::select! {
661 (event_ord, _deadline) = &mut timer, if timer.is_armed() => {
662 match TimerEvent::from(event_ord) {
663 TimerEvent::AckDeadline => {
664 return Err(AppendSessionError::AckTimeout);
665 }
666 }
667 }
668
669 input_tx_permit = input_tx.reserve(), if !resend_finished => {
670 let input_tx_permit = input_tx_permit
671 .map_err(|_| AppendSessionError::ServerDisconnected)?;
672
673 if let Some(inflight_append) = state.inflight_appends.get_mut(resend_index) {
674 inflight_append.since = Instant::now();
675 timer.as_mut().fire_at(
676 TimerEvent::AckDeadline,
677 inflight_append.since + ack_timeout,
678 CoalesceMode::Earliest,
679 );
680 input_tx_permit.send(inflight_append.input.clone());
681 resend_index += 1;
682 } else {
683 resend_finished = true;
684 }
685 }
686
687 ack = acks.next() => {
688 match ack {
689 Some(Ok(ack)) => {
690 process_ack(
691 ack,
692 state,
693 timer.as_mut(),
694 ack_timeout,
695 )?;
696 resend_index = resend_index.checked_sub(1).ok_or_else(|| {
697 AppendSessionError::InvalidAck(
698 "received ack without a corresponding resent append in flight".to_string(),
699 )
700 })?;
701 }
702 Some(Err(err)) => {
703 return Err(err.into());
704 }
705 None => {
706 return Err(AppendSessionError::StreamClosedEarly);
707 }
708 }
709 }
710 }
711 }
712
713 assert_eq!(
714 resend_index, 0,
715 "resend_index should be 0 after resend completes"
716 );
717 debug!("finished resending inflight appends");
718 Ok(())
719}
720
721async fn connect(
722 client: &BasinClient,
723 stream: &StreamName,
724 encryption: Option<&EncryptionKey>,
725 buffer_size: usize,
726 frame_signal: Option<FrameSignal>,
727) -> Result<(mpsc::Sender<AppendInput>, Streaming<AppendAck>), AppendSessionError> {
728 let (input_tx, input_rx) = mpsc::channel::<AppendInput>(buffer_size);
729 let ack_stream = Box::pin(
730 client
731 .append_session(
732 stream,
733 ReceiverStream::new(input_rx).map(|i| i.into()),
734 encryption,
735 frame_signal,
736 )
737 .await?
738 .map(|ack| match ack {
739 Ok(ack) => Ok(ack.into()),
740 Err(err) => Err(err),
741 }),
742 );
743 Ok((input_tx, ack_stream))
744}
745
746fn process_ack(
747 ack: AppendAck,
748 state: &mut SessionState,
749 timer: Pin<&mut MuxTimer<N_TIMER_VARIANTS>>,
750 ack_timeout: Duration,
751) -> Result<(), AppendSessionError> {
752 let corresponding_append = state.inflight_appends.pop_front().ok_or_else(|| {
753 AppendSessionError::InvalidAck(
754 "received ack without a corresponding append in flight".to_string(),
755 )
756 })?;
757
758 if ack.end.seq_num < ack.start.seq_num {
759 return Err(AppendSessionError::InvalidAck(
760 "ack end seq_num should be greater than or equal to start seq_num".to_string(),
761 ));
762 }
763
764 if state
765 .prev_ack_end
766 .is_some_and(|end| ack.end.seq_num <= end.seq_num)
767 {
768 return Err(AppendSessionError::InvalidAck(
769 "ack end seq_num should be greater than previous ack end".to_string(),
770 ));
771 }
772
773 let num_acked_records = (ack.end.seq_num - ack.start.seq_num) as usize;
774 let expected_records = corresponding_append.input.records.len();
775 if num_acked_records != expected_records {
776 return Err(AppendSessionError::InvalidAck(format!(
777 "acked record count {num_acked_records} does not match submitted batch size {expected_records}"
778 )));
779 }
780
781 state.total_acked_records += num_acked_records;
782 state.inflight_bytes -= corresponding_append.input_metered_bytes;
783 state.prev_ack_end = Some(ack.end);
784
785 let _ = corresponding_append.ack_tx.send(Ok(ack));
786
787 if let Some(oldest_append) = state.inflight_appends.front() {
788 timer.fire_at(
789 TimerEvent::AckDeadline,
790 oldest_append.since + ack_timeout,
791 CoalesceMode::Latest,
792 );
793 } else {
794 timer.cancel(TimerEvent::AckDeadline);
795 assert_eq!(
796 state.total_records, state.total_acked_records,
797 "all records should be acked when inflight is empty"
798 );
799 }
800
801 Ok(())
802}
803
804struct StashedSubmission {
805 input: AppendInput,
806 input_metered_bytes: usize,
807 ack_tx: oneshot::Sender<Result<AppendAck, S2Error>>,
808 permit: Option<AppendPermit>,
809 since: Instant,
810}
811
812struct InflightAppend {
813 input: AppendInput,
814 input_metered_bytes: usize,
815 ack_tx: oneshot::Sender<Result<AppendAck, S2Error>>,
816 since: Instant,
817 _permit: Option<AppendPermit>,
818}
819
820impl From<StashedSubmission> for InflightAppend {
821 fn from(value: StashedSubmission) -> Self {
822 Self {
823 input: value.input,
824 input_metered_bytes: value.input_metered_bytes,
825 ack_tx: value.ack_tx,
826 since: value.since,
827 _permit: value.permit,
828 }
829 }
830}
831
832enum Command {
833 Submit {
834 input: AppendInput,
835 ack_tx: oneshot::Sender<Result<AppendAck, S2Error>>,
836 permit: Option<AppendPermit>,
837 },
838 Close {
839 done_tx: oneshot::Sender<Result<(), S2Error>>,
840 },
841}
842
843impl Command {
844 fn reject(self, err: S2Error) {
845 match self {
846 Command::Submit { ack_tx, .. } => {
847 let _ = ack_tx.send(Err(err));
848 }
849 Command::Close { done_tx } => {
850 let _ = done_tx.send(Err(err));
851 }
852 }
853 }
854}
855
856fn is_safe_to_retry(
857 err: &AppendSessionError,
858 policy: AppendRetryPolicy,
859 has_inflight: bool,
860 frame_signal: Option<&FrameSignal>,
861) -> bool {
862 let policy_compliant = match policy {
863 AppendRetryPolicy::All => true,
864 AppendRetryPolicy::NoSideEffects => {
865 !has_inflight
866 || !frame_signal.is_none_or(|s| s.is_signalled())
867 || err.has_no_side_effects()
868 }
869 };
870 policy_compliant && err.is_retryable()
871}
872
873const DEFAULT_CHANNEL_BUFFER_SIZE: usize = 100;
874
875#[derive(Debug, Clone, Copy, PartialEq, Eq)]
876enum TimerEvent {
877 AckDeadline,
878}
879
880const N_TIMER_VARIANTS: usize = 1;
881
882impl From<TimerEvent> for usize {
883 fn from(event: TimerEvent) -> Self {
884 match event {
885 TimerEvent::AckDeadline => 0,
886 }
887 }
888}
889
890impl From<usize> for TimerEvent {
891 fn from(value: usize) -> Self {
892 match value {
893 0 => TimerEvent::AckDeadline,
894 _ => panic!("invalid ordinal"),
895 }
896 }
897}
898
899#[cfg(test)]
900mod tests {
901 use http::StatusCode;
902
903 use super::{AppendSessionError, is_safe_to_retry};
904 use crate::{
905 api::{ApiError, ApiErrorResponse},
906 frame_signal::FrameSignal,
907 types::AppendRetryPolicy,
908 };
909
910 fn server_error(status: StatusCode, code: &str) -> AppendSessionError {
911 AppendSessionError::Api(ApiError::Server(
912 status,
913 ApiErrorResponse {
914 code: code.to_owned(),
915 message: "test".to_owned(),
916 },
917 ))
918 }
919
920 #[test]
921 fn safe_to_retry_session_all_policy() {
922 let retryable = server_error(StatusCode::INTERNAL_SERVER_ERROR, "internal");
923 let non_retryable = server_error(StatusCode::BAD_REQUEST, "bad_request");
924 let policy = AppendRetryPolicy::All;
925
926 assert!(is_safe_to_retry(&retryable, policy, true, None));
928 assert!(!is_safe_to_retry(&non_retryable, policy, true, None));
929 }
930
931 #[test]
932 fn safe_to_retry_session_no_side_effects_policy() {
933 let retryable = server_error(StatusCode::INTERNAL_SERVER_ERROR, "internal");
934 let no_side_effect = server_error(StatusCode::TOO_MANY_REQUESTS, "rate_limited");
935 let policy = AppendRetryPolicy::NoSideEffects;
936 let signal = FrameSignal::new();
937
938 signal.signal();
940 assert!(is_safe_to_retry(&retryable, policy, false, Some(&signal)));
941
942 signal.reset();
944 assert!(is_safe_to_retry(&retryable, policy, true, Some(&signal)));
945
946 signal.signal();
948 assert!(!is_safe_to_retry(&retryable, policy, true, Some(&signal)));
949
950 assert!(is_safe_to_retry(
952 &no_side_effect,
953 policy,
954 true,
955 Some(&signal)
956 ));
957
958 assert!(!is_safe_to_retry(
960 &AppendSessionError::AckTimeout,
961 policy,
962 true,
963 Some(&signal),
964 ));
965 }
966}