Skip to main content

s2_sdk/session/
append.rs

1use std::{
2    collections::VecDeque,
3    future::Future,
4    num::NonZeroU32,
5    pin::Pin,
6    sync::{Arc, OnceLock},
7    task::{Context, Poll},
8    time::Duration,
9};
10
11use futures::StreamExt;
12use tokio::{
13    sync::{OwnedSemaphorePermit, Semaphore, mpsc, oneshot},
14    time::Instant,
15};
16use tokio_muxt::{CoalesceMode, MuxTimer};
17use tokio_stream::wrappers::ReceiverStream;
18use tokio_util::task::AbortOnDropHandle;
19use tracing::debug;
20
21use crate::{
22    api::{ApiError, BasinClient, Streaming, retry_builder},
23    frame_signal::FrameSignal,
24    retry::RetryBackoffBuilder,
25    types::{
26        AppendAck, AppendInput, AppendRetryPolicy, MeteredBytes, ONE_MIB, S2Error, StreamName,
27        StreamPosition, ValidationError,
28    },
29};
30
#[derive(Debug, thiserror::Error)]
/// Errors raised by an append session and its background task.
pub enum AppendSessionError {
    /// Error surfaced by the underlying API client.
    #[error(transparent)]
    Api(#[from] ApiError),
    /// The oldest inflight append missed its acknowledgement deadline.
    #[error("append acknowledgement timed out")]
    AckTimeout,
    /// The server closed the connection.
    #[error("server disconnected")]
    ServerDisconnected,
    /// The acknowledgement stream ended while appends were still inflight.
    #[error("response stream closed early while appends in flight")]
    StreamClosedEarly,
    /// The session task already terminated.
    #[error("session already closed")]
    SessionClosed,
    /// A submission arrived after a close was requested.
    #[error("session is closing")]
    SessionClosing,
    /// All session handles were dropped without an explicit close.
    #[error("session dropped without calling close")]
    SessionDropped,
    /// An acknowledgement arrived for a batch that was not yet re-sent
    /// during resend after a reconnect.
    #[error("unexpected append acknowledgement during resend")]
    UnexpectedAck,
}
50
51impl AppendSessionError {
52    pub fn is_retryable(&self) -> bool {
53        match self {
54            Self::Api(err) => err.is_retryable(),
55            Self::AckTimeout => true,
56            Self::ServerDisconnected => true,
57            _ => false,
58        }
59    }
60
61    pub fn has_no_side_effects(&self) -> bool {
62        match self {
63            Self::Api(err) => err.has_no_side_effects(),
64            _ => false,
65        }
66    }
67}
68
69impl From<AppendSessionError> for S2Error {
70    fn from(err: AppendSessionError) -> Self {
71        match err {
72            AppendSessionError::Api(api_err) => api_err.into(),
73            other => S2Error::Client(other.to_string()),
74        }
75    }
76}
77
/// A [`Future`] that resolves to an acknowledgement once the batch of records is appended.
pub struct BatchSubmitTicket {
    // Receives this batch's acknowledgement (or error) from the session task.
    rx: oneshot::Receiver<Result<AppendAck, S2Error>>,
    // Shared slot holding the session's terminal error; used as a fallback
    // when the sender side is dropped without responding.
    terminal_err: Arc<OnceLock<S2Error>>,
}
83
84impl Future for BatchSubmitTicket {
85    type Output = Result<AppendAck, S2Error>;
86
87    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
88        match Pin::new(&mut self.rx).poll(cx) {
89            Poll::Ready(Ok(res)) => Poll::Ready(res),
90            Poll::Ready(Err(_)) => Poll::Ready(Err(self
91                .terminal_err
92                .get()
93                .cloned()
94                .unwrap_or_else(|| AppendSessionError::SessionDropped.into()))),
95            Poll::Pending => Poll::Pending,
96        }
97    }
98}
99
#[derive(Debug, Clone)]
/// Configuration for an [`AppendSession`].
pub struct AppendSessionConfig {
    // Cap on total metered bytes of unacknowledged inputs held in memory.
    max_unacked_bytes: u32,
    // Optional cap on the number of unacknowledged batches (`None` = unlimited).
    max_unacked_batches: Option<u32>,
}
106
107impl Default for AppendSessionConfig {
108    fn default() -> Self {
109        Self {
110            max_unacked_bytes: 5 * ONE_MIB,
111            max_unacked_batches: None,
112        }
113    }
114}
115
116impl AppendSessionConfig {
117    /// Create a new [`AppendSessionConfig`] with default settings.
118    pub fn new() -> Self {
119        Self::default()
120    }
121
122    /// Set the limit on total metered bytes of unacknowledged [`AppendInput`]s held in memory.
123    ///
124    /// **Note:** It must be at least `1MiB`.
125    ///
126    /// Defaults to `5MiB`.
127    pub fn with_max_unacked_bytes(self, max_unacked_bytes: u32) -> Result<Self, ValidationError> {
128        if max_unacked_bytes < ONE_MIB {
129            return Err(format!("max_unacked_bytes must be at least {ONE_MIB}").into());
130        }
131        Ok(Self {
132            max_unacked_bytes,
133            ..self
134        })
135    }
136
137    /// Set the limit on number of unacknowledged [`AppendInput`]s held in memory.
138    ///
139    /// Defaults to no limit.
140    pub fn with_max_unacked_batches(self, max_unacked_batches: NonZeroU32) -> Self {
141        Self {
142            max_unacked_batches: Some(max_unacked_batches.get()),
143            ..self
144        }
145    }
146}
147
/// Mutable state carried across reconnect attempts of a single session task.
struct SessionState {
    // Commands (submissions / close) from the session handle(s).
    cmd_rx: mpsc::Receiver<Command>,
    // Batches sent to the server but not yet acknowledged, oldest first.
    inflight_appends: VecDeque<InflightAppend>,
    // Sum of metered bytes across `inflight_appends`.
    inflight_bytes: usize,
    // Present once a close was requested; resolved when the session finishes.
    close_tx: Option<oneshot::Sender<Result<(), S2Error>>>,
    // Records handed to the input stream so far.
    total_records: usize,
    // Records acknowledged by the server so far.
    total_acked_records: usize,
    // End position of the last acknowledgement, for monotonicity checks.
    prev_ack_end: Option<StreamPosition>,
    // A submission accepted from the command channel but not yet passed to
    // the input stream (kept here so it survives a reconnect).
    stashed_submission: Option<StashedSubmission>,
}
158
/// A session for high-throughput appending with backpressure control. It can be created from
/// [`append_session`](crate::S2Stream::append_session).
///
/// Supports pipelining multiple [`AppendInput`]s while preserving submission order.
pub struct AppendSession {
    // Sends submissions and the close request to the background session task.
    cmd_tx: mpsc::Sender<Command>,
    // Semaphores bounding inflight bytes/batches for backpressure.
    permits: AppendPermits,
    // Set once by the session task when it exits with an error.
    terminal_err: Arc<OnceLock<S2Error>>,
    // Aborts the background session task when this handle is dropped.
    _handle: AbortOnDropHandle<()>,
}
169
170impl AppendSession {
171    pub(crate) fn new(
172        client: BasinClient,
173        stream: StreamName,
174        config: AppendSessionConfig,
175    ) -> Self {
176        let buffer_size = config
177            .max_unacked_batches
178            .map(|mib| mib as usize)
179            .unwrap_or(DEFAULT_CHANNEL_BUFFER_SIZE);
180        let (cmd_tx, cmd_rx) = mpsc::channel(buffer_size);
181        let permits = AppendPermits::new(config.max_unacked_batches, config.max_unacked_bytes);
182        let retry_builder = retry_builder(&client.config.retry);
183        let terminal_err = Arc::new(OnceLock::new());
184        let handle = AbortOnDropHandle::new(tokio::spawn(run_session_with_retry(
185            client,
186            stream,
187            cmd_rx,
188            retry_builder,
189            buffer_size,
190            terminal_err.clone(),
191        )));
192        Self {
193            cmd_tx,
194            permits,
195            terminal_err,
196            _handle: handle,
197        }
198    }
199
200    /// Submit a batch of records for appending.
201    ///
202    /// Internally, it waits on [`reserve`](Self::reserve), then submits using the permit.
203    /// This provides backpressure when inflight limits are reached.
204    /// For explicit control, use [`reserve`](Self::reserve) followed by
205    /// [`BatchSubmitPermit::submit`].
206    ///
207    /// **Note**: After all submits, you must call [`close`](Self::close) to ensure all batches are
208    /// appended.
209    pub async fn submit(&self, input: AppendInput) -> Result<BatchSubmitTicket, S2Error> {
210        let permit = self.reserve(input.records.metered_bytes() as u32).await?;
211        Ok(permit.submit(input))
212    }
213
214    /// Reserve capacity for a batch to be submitted. Useful in [`select!`](tokio::select) loops
215    /// where you want to interleave submission with other async work. See [`submit`](Self::submit)
216    /// for a simpler API.
217    ///
218    /// Waits when inflight limits are reached, providing explicit backpressure control.
219    /// The returned permit must be used to submit the batch.
220    ///
221    /// **Note**: After all submits, you must call [`close`](Self::close) to ensure all batches are
222    /// appended.
223    ///
224    /// # Cancel safety
225    ///
226    /// This method is cancel safe. Internally, it only awaits
227    /// [`Semaphore::acquire_many_owned`](tokio::sync::Semaphore::acquire_many_owned) and
228    /// [`Sender::reserve_owned`](tokio::sync::mpsc::Sender::reserve), both of which are cancel
229    /// safe.
230    pub async fn reserve(&self, bytes: u32) -> Result<BatchSubmitPermit, S2Error> {
231        let append_permit = self.permits.acquire(bytes).await;
232        let cmd_tx_permit = self
233            .cmd_tx
234            .clone()
235            .reserve_owned()
236            .await
237            .map_err(|_| self.terminal_err())?;
238        Ok(BatchSubmitPermit {
239            append_permit,
240            cmd_tx_permit,
241            terminal_err: self.terminal_err.clone(),
242        })
243    }
244
245    /// Close the session and wait for all submitted batch of records to be appended.
246    pub async fn close(self) -> Result<(), S2Error> {
247        let (done_tx, done_rx) = oneshot::channel();
248        self.cmd_tx
249            .send(Command::Close { done_tx })
250            .await
251            .map_err(|_| self.terminal_err())?;
252        done_rx.await.map_err(|_| self.terminal_err())??;
253        Ok(())
254    }
255
256    fn terminal_err(&self) -> S2Error {
257        self.terminal_err
258            .get()
259            .cloned()
260            .unwrap_or_else(|| AppendSessionError::SessionClosed.into())
261    }
262}
263
/// A permit to submit a batch after reserving capacity.
pub struct BatchSubmitPermit {
    // Holds the inflight byte/batch capacity until the append is acknowledged.
    append_permit: AppendPermit,
    // Pre-reserved slot on the session's command channel.
    cmd_tx_permit: mpsc::OwnedPermit<Command>,
    // Shared slot holding the session's terminal error.
    terminal_err: Arc<OnceLock<S2Error>>,
}
270
271impl BatchSubmitPermit {
272    /// Submit the batch using this permit.
273    pub fn submit(self, input: AppendInput) -> BatchSubmitTicket {
274        let (ack_tx, ack_rx) = oneshot::channel();
275        self.cmd_tx_permit.send(Command::Submit {
276            input,
277            ack_tx,
278            permit: Some(self.append_permit),
279        });
280        BatchSubmitTicket {
281            rx: ack_rx,
282            terminal_err: self.terminal_err,
283        }
284    }
285}
286
/// Crate-internal append session handle; unlike [`AppendSession`], it holds
/// no client-side inflight permits (submissions are sent with `permit: None`).
pub(crate) struct AppendSessionInternal {
    // Sends submissions and the close request to the background session task.
    cmd_tx: mpsc::Sender<Command>,
    // Set once by the session task when it exits with an error.
    terminal_err: Arc<OnceLock<S2Error>>,
    // Aborts the background session task when this handle is dropped.
    _handle: AbortOnDropHandle<()>,
}
292
293impl AppendSessionInternal {
294    pub(crate) fn new(client: BasinClient, stream: StreamName) -> Self {
295        let buffer_size = DEFAULT_CHANNEL_BUFFER_SIZE;
296        let (cmd_tx, cmd_rx) = mpsc::channel(buffer_size);
297        let retry_builder = retry_builder(&client.config.retry);
298        let terminal_err = Arc::new(OnceLock::new());
299        let handle = AbortOnDropHandle::new(tokio::spawn(run_session_with_retry(
300            client,
301            stream,
302            cmd_rx,
303            retry_builder,
304            buffer_size,
305            terminal_err.clone(),
306        )));
307        Self {
308            cmd_tx,
309            terminal_err,
310            _handle: handle,
311        }
312    }
313
314    pub(crate) fn submit(
315        &self,
316        input: AppendInput,
317    ) -> impl Future<Output = Result<BatchSubmitTicket, S2Error>> + Send + 'static {
318        let cmd_tx = self.cmd_tx.clone();
319        let terminal_err = self.terminal_err.clone();
320        async move {
321            let (ack_tx, ack_rx) = oneshot::channel();
322            cmd_tx
323                .send(Command::Submit {
324                    input,
325                    ack_tx,
326                    permit: None,
327                })
328                .await
329                .map_err(|_| {
330                    terminal_err
331                        .get()
332                        .cloned()
333                        .unwrap_or_else(|| AppendSessionError::SessionClosed.into())
334                })?;
335            Ok(BatchSubmitTicket {
336                rx: ack_rx,
337                terminal_err,
338            })
339        }
340    }
341
342    pub(crate) async fn close(self) -> Result<(), S2Error> {
343        let (done_tx, done_rx) = oneshot::channel();
344        self.cmd_tx
345            .send(Command::Close { done_tx })
346            .await
347            .map_err(|_| self.terminal_err())?;
348        done_rx.await.map_err(|_| self.terminal_err())??;
349        Ok(())
350    }
351
352    fn terminal_err(&self) -> S2Error {
353        self.terminal_err
354            .get()
355            .cloned()
356            .unwrap_or_else(|| AppendSessionError::SessionClosed.into())
357    }
358}
359
#[derive(Debug)]
/// Capacity held for one inflight batch; releases back to the session's
/// semaphores on drop.
pub(crate) struct AppendPermit {
    // Permit from the batch-count semaphore (when a batch limit is configured).
    _count: Option<OwnedSemaphorePermit>,
    // Permits from the byte semaphore, one per metered byte of the batch.
    _bytes: OwnedSemaphorePermit,
}
365
#[derive(Clone)]
/// Shared semaphores bounding inflight batches and bytes for a session.
pub(crate) struct AppendPermits {
    // Limits the number of inflight batches; `None` means unlimited.
    count: Option<Arc<Semaphore>>,
    // Limits total inflight metered bytes.
    bytes: Arc<Semaphore>,
}
371
372impl AppendPermits {
373    pub(crate) fn new(count_permits: Option<u32>, bytes_permits: u32) -> Self {
374        Self {
375            count: count_permits.map(|permits| Arc::new(Semaphore::new(permits as usize))),
376            bytes: Arc::new(Semaphore::new(bytes_permits as usize)),
377        }
378    }
379
380    pub(crate) async fn acquire(&self, bytes: u32) -> AppendPermit {
381        AppendPermit {
382            _count: if let Some(count) = self.count.as_ref() {
383                Some(
384                    count
385                        .clone()
386                        .acquire_many_owned(1)
387                        .await
388                        .expect("semaphore should not be closed"),
389                )
390            } else {
391                None
392            },
393            _bytes: self
394                .bytes
395                .clone()
396                .acquire_many_owned(bytes)
397                .await
398                .expect("semaphore should not be closed"),
399        }
400    }
401}
402
/// Drives the append session, transparently reconnecting and retrying on
/// retryable errors until the session completes or a terminal error occurs.
async fn run_session_with_retry(
    client: BasinClient,
    stream: StreamName,
    cmd_rx: mpsc::Receiver<Command>,
    retry_builder: RetryBackoffBuilder,
    buffer_size: usize,
    terminal_err: Arc<OnceLock<S2Error>>,
) {
    // Only track wire-level frame transmission when retries must be limited
    // to attempts that produced no server-side effects.
    let frame_signal = match client.config.retry.append_retry_policy {
        AppendRetryPolicy::NoSideEffects => Some(FrameSignal::new()),
        AppendRetryPolicy::All => None,
    };

    let mut state = SessionState {
        cmd_rx,
        inflight_appends: VecDeque::new(),
        inflight_bytes: 0,
        close_tx: None,
        total_records: 0,
        total_acked_records: 0,
        prev_ack_end: None,
        stashed_submission: None,
    };
    let mut prev_total_acked_records = 0;
    let mut retry_backoff = retry_builder.build();

    loop {
        let result = run_session(&client, &stream, &mut state, buffer_size, &frame_signal).await;

        match result {
            Ok(()) => {
                break;
            }
            Err(err) => {
                // Progress (new acks) since the last attempt resets the backoff.
                if prev_total_acked_records < state.total_acked_records {
                    prev_total_acked_records = state.total_acked_records;
                    retry_backoff.reset();
                }

                if is_safe_to_retry(
                    &err,
                    client.config.retry.append_retry_policy,
                    !state.inflight_appends.is_empty(),
                    frame_signal.as_ref(),
                ) && let Some(backoff) = retry_backoff.next()
                {
                    debug!(
                        %err,
                        ?backoff,
                        num_retries_remaining = retry_backoff.remaining(),
                        "retrying append session"
                    );
                    tokio::time::sleep(backoff).await;
                } else {
                    debug!(
                        %err,
                        retries_exhausted = retry_backoff.is_exhausted(),
                        "not retrying append session"
                    );

                    let err: S2Error = err.into();

                    // Record the terminal error so tickets and new submissions
                    // observe it after the task exits.
                    let _ = terminal_err.set(err.clone());

                    // Fail every pending acknowledgement...
                    for inflight_append in state.inflight_appends.drain(..) {
                        let _ = inflight_append.ack_tx.send(Err(err.clone()));
                    }

                    // ...the submission not yet handed to the input stream...
                    if let Some(stashed) = state.stashed_submission.take() {
                        let _ = stashed.ack_tx.send(Err(err.clone()));
                    }

                    // ...and a pending close, if any.
                    if let Some(done_tx) = state.close_tx.take() {
                        let _ = done_tx.send(Err(err.clone()));
                    }

                    // Reject any commands already queued on the channel.
                    state.cmd_rx.close();
                    while let Some(cmd) = state.cmd_rx.recv().await {
                        cmd.reject(err.clone());
                    }
                    break;
                }
            }
        }
    }

    // Successful completion: acknowledge the close request (on the error path
    // close_tx was already taken and answered above).
    if let Some(done_tx) = state.close_tx.take() {
        let _ = done_tx.send(Ok(()));
    }
}
493
/// Runs a single connection attempt of the append session. Returns `Ok(())`
/// when the session drains cleanly after a close, or an error for the retry
/// loop to handle.
async fn run_session(
    client: &BasinClient,
    stream: &StreamName,
    state: &mut SessionState,
    buffer_size: usize,
    frame_signal: &Option<FrameSignal>,
) -> Result<(), AppendSessionError> {
    // Fresh attempt: no frames have been sent on this connection yet.
    if let Some(s) = frame_signal {
        s.reset();
    }

    let (input_tx, mut acks) = connect(client, stream, buffer_size, frame_signal.clone()).await?;
    let ack_timeout = client.config.request_timeout;

    // Re-send batches that were inflight when the previous attempt failed.
    if !state.inflight_appends.is_empty() {
        resend(state, &input_tx, &mut acks, ack_timeout).await?;

        // Resent data is fully acknowledged; clear the signal so a failure
        // before any new sends still counts as side-effect free.
        if let Some(s) = frame_signal {
            s.reset();
        }

        assert!(state.inflight_appends.is_empty());
        assert_eq!(state.inflight_bytes, 0);
    }

    let timer = MuxTimer::<N_TIMER_VARIANTS>::default();
    tokio::pin!(timer);

    loop {
        tokio::select! {
            // The ack deadline for the oldest inflight append elapsed.
            (event_ord, _deadline) = &mut timer, if timer.is_armed() => {
                match TimerEvent::from(event_ord) {
                    TimerEvent::AckDeadline => {
                        return Err(AppendSessionError::AckTimeout);
                    }
                }
            }

            // Forward the stashed submission once the input stream has capacity.
            input_tx_permit = input_tx.reserve(), if state.stashed_submission.is_some() => {
                let input_tx_permit = input_tx_permit
                    .map_err(|_| AppendSessionError::ServerDisconnected)?;
                let submission = state.stashed_submission
                    .take()
                    .expect("stashed_submission should not be None");

                // Clone so the input is retained for a potential resend.
                input_tx_permit.send(submission.input.clone());

                state.total_records += submission.input.records.len();
                state.inflight_bytes += submission.input_metered_bytes;

                timer.as_mut().fire_at(
                    TimerEvent::AckDeadline,
                    submission.since + ack_timeout,
                    CoalesceMode::Earliest,
                );
                state.inflight_appends.push_back(submission.into());
            }

            // Accept at most one pending submission at a time from the handle;
            // gated on the stash being empty so submissions stay ordered.
            cmd = state.cmd_rx.recv(), if state.stashed_submission.is_none() => {
                match cmd {
                    Some(Command::Submit { input, ack_tx, permit }) => {
                        if state.close_tx.is_some() {
                            // No submissions are accepted once close was requested.
                            let _ = ack_tx.send(
                                Err(AppendSessionError::SessionClosing.into())
                            );
                        } else {
                            let input_metered_bytes = input.records.metered_bytes();
                            state.stashed_submission = Some(StashedSubmission {
                                input,
                                input_metered_bytes,
                                ack_tx,
                                permit,
                                since: Instant::now(),
                            });
                        }
                    }
                    Some(Command::Close { done_tx }) => {
                        state.close_tx = Some(done_tx);
                    }
                    None => {
                        // All handles dropped without an explicit close.
                        return Err(AppendSessionError::SessionDropped);
                    }
                }
            }

            ack = acks.next() => {
                match ack {
                    Some(Ok(ack)) => {
                        process_ack(
                            ack,
                            state,
                            timer.as_mut(),
                            ack_timeout,
                        );
                    }
                    Some(Err(err)) => {
                        return Err(err.into());
                    }
                    None => {
                        // Server closed the ack stream; fatal if work remains.
                        if !state.inflight_appends.is_empty() || state.stashed_submission.is_some() {
                            return Err(AppendSessionError::StreamClosedEarly);
                        }
                        break;
                    }
                }
            }
        }

        // A requested close completes once all submitted work is acknowledged.
        if state.close_tx.is_some()
            && state.inflight_appends.is_empty()
            && state.stashed_submission.is_none()
        {
            break;
        }
    }

    assert!(state.inflight_appends.is_empty());
    assert_eq!(state.inflight_bytes, 0);
    assert!(state.stashed_submission.is_none());

    Ok(())
}
616
/// Re-sends all inflight appends on a fresh connection and waits for their
/// acknowledgements before the session resumes accepting new submissions.
async fn resend(
    state: &mut SessionState,
    input_tx: &mpsc::Sender<AppendInput>,
    acks: &mut Streaming<AppendAck>,
    ack_timeout: Duration,
) -> Result<(), AppendSessionError> {
    debug!(
        inflight_appends_len = state.inflight_appends.len(),
        inflight_bytes = state.inflight_bytes,
        "resending inflight appends"
    );

    // Index of the next inflight append to re-send. Each processed ack pops
    // the queue's front, shifting remaining entries left by one, so the index
    // is decremented to keep pointing at the same element.
    let mut resend_index = 0;
    let mut resend_finished = false;

    let timer = MuxTimer::<N_TIMER_VARIANTS>::default();
    tokio::pin!(timer);

    while !state.inflight_appends.is_empty() {
        tokio::select! {
            (event_ord, _deadline) = &mut timer, if timer.is_armed() => {
                match TimerEvent::from(event_ord) {
                    TimerEvent::AckDeadline => {
                        return Err(AppendSessionError::AckTimeout);
                    }
                }
            }

            input_tx_permit = input_tx.reserve(), if !resend_finished => {
                let input_tx_permit = input_tx_permit
                    .map_err(|_| AppendSessionError::ServerDisconnected)?;

                if let Some(inflight_append) = state.inflight_appends.get_mut(resend_index) {
                    // Restart the ack deadline from the moment of re-send.
                    inflight_append.since = Instant::now();
                    timer.as_mut().fire_at(
                        TimerEvent::AckDeadline,
                        inflight_append.since + ack_timeout,
                        CoalesceMode::Earliest,
                    );
                    input_tx_permit.send(inflight_append.input.clone());
                    resend_index += 1;
                } else {
                    // Everything has been re-sent; only acks remain.
                    resend_finished = true;
                }
            }

            ack = acks.next() => {
                match ack {
                    Some(Ok(ack)) => {
                        process_ack(
                            ack,
                            state,
                            timer.as_mut(),
                            ack_timeout,
                        );
                        // An ack must correspond to an already re-sent batch;
                        // index 0 here means the server acked something we
                        // never re-sent on this connection.
                        resend_index = resend_index
                            .checked_sub(1)
                            .ok_or(AppendSessionError::UnexpectedAck)?;
                    }
                    Some(Err(err)) => {
                        return Err(err.into());
                    }
                    None => {
                        return Err(AppendSessionError::StreamClosedEarly);
                    }
                }
            }
        }
    }

    assert_eq!(
        resend_index, 0,
        "resend_index should be 0 after resend completes"
    );
    debug!("finished resending inflight appends");
    Ok(())
}
694
695async fn connect(
696    client: &BasinClient,
697    stream: &StreamName,
698    buffer_size: usize,
699    frame_signal: Option<FrameSignal>,
700) -> Result<(mpsc::Sender<AppendInput>, Streaming<AppendAck>), AppendSessionError> {
701    let (input_tx, input_rx) = mpsc::channel::<AppendInput>(buffer_size);
702    let ack_stream = Box::pin(
703        client
704            .append_session(
705                stream,
706                ReceiverStream::new(input_rx).map(|i| i.into()),
707                frame_signal,
708            )
709            .await?
710            .map(|ack| match ack {
711                Ok(ack) => Ok(ack.into()),
712                Err(err) => Err(err),
713            }),
714    );
715    Ok((input_tx, ack_stream))
716}
717
/// Matches an acknowledgement against the oldest inflight append, updates
/// session accounting, resolves the caller's ticket, and re-arms (or cancels)
/// the ack-deadline timer.
///
/// Panics (via asserts) if the server violates ordering or count invariants.
fn process_ack(
    ack: AppendAck,
    state: &mut SessionState,
    timer: Pin<&mut MuxTimer<N_TIMER_VARIANTS>>,
    ack_timeout: Duration,
) {
    // Acks arrive in submission order, so each matches the queue front.
    let corresponding_append = state
        .inflight_appends
        .pop_front()
        .expect("corresponding append should be present for an ack");

    assert!(
        ack.end.seq_num >= ack.start.seq_num,
        "ack end seq_num should be greater than or equal to start seq_num"
    );

    // Sequence numbers must advance strictly across acks.
    if let Some(end) = state.prev_ack_end {
        assert!(
            ack.end.seq_num > end.seq_num,
            "ack end seq_num should be greater than previous ack end"
        );
    }

    let num_acked_records = (ack.end.seq_num - ack.start.seq_num) as usize;
    assert_eq!(
        num_acked_records,
        corresponding_append.input.records.len(),
        "ack record count should match submitted batch size"
    );

    state.total_acked_records += num_acked_records;
    state.inflight_bytes -= corresponding_append.input_metered_bytes;
    state.prev_ack_end = Some(ack.end);

    // Resolve the ticket; the caller may have dropped it, which is fine.
    let _ = corresponding_append.ack_tx.send(Ok(ack));

    if let Some(oldest_append) = state.inflight_appends.front() {
        // Re-arm the deadline for the new oldest inflight append.
        timer.fire_at(
            TimerEvent::AckDeadline,
            oldest_append.since + ack_timeout,
            CoalesceMode::Latest,
        );
    } else {
        timer.cancel(TimerEvent::AckDeadline);
        assert_eq!(
            state.total_records, state.total_acked_records,
            "all records should be acked when inflight is empty"
        );
    }
}
768
/// A submission accepted from the command channel, waiting for input-stream
/// capacity before it becomes inflight.
struct StashedSubmission {
    input: AppendInput,
    // Metered size of `input`, cached at submission time.
    input_metered_bytes: usize,
    // Resolves the caller's [`BatchSubmitTicket`].
    ack_tx: oneshot::Sender<Result<AppendAck, S2Error>>,
    // Capacity reservation; `None` for internal sessions without limits.
    permit: Option<AppendPermit>,
    // When the submission was accepted; basis for the ack deadline.
    since: Instant,
}
776
/// A batch that has been written to the input stream and is awaiting its
/// acknowledgement; `input` is retained so it can be re-sent on reconnect.
struct InflightAppend {
    input: AppendInput,
    // Metered size of `input`, cached at submission time.
    input_metered_bytes: usize,
    // Resolves the caller's [`BatchSubmitTicket`].
    ack_tx: oneshot::Sender<Result<AppendAck, S2Error>>,
    // When the batch was (most recently) sent; basis for the ack deadline.
    since: Instant,
    // Capacity reservation released when the append leaves the queue.
    _permit: Option<AppendPermit>,
}
784
785impl From<StashedSubmission> for InflightAppend {
786    fn from(value: StashedSubmission) -> Self {
787        Self {
788            input: value.input,
789            input_metered_bytes: value.input_metered_bytes,
790            ack_tx: value.ack_tx,
791            since: value.since,
792            _permit: value.permit,
793        }
794    }
795}
796
/// Messages from session handles to the background session task.
enum Command {
    /// Append a batch; `ack_tx` resolves with the acknowledgement.
    Submit {
        input: AppendInput,
        ack_tx: oneshot::Sender<Result<AppendAck, S2Error>>,
        // Capacity reservation held until the batch is acknowledged;
        // `None` for internal sessions without limits.
        permit: Option<AppendPermit>,
    },
    /// Finish all inflight work and shut the session down.
    Close {
        done_tx: oneshot::Sender<Result<(), S2Error>>,
    },
}
807
808impl Command {
809    fn reject(self, err: S2Error) {
810        match self {
811            Command::Submit { ack_tx, .. } => {
812                let _ = ack_tx.send(Err(err));
813            }
814            Command::Close { done_tx } => {
815                let _ = done_tx.send(Err(err));
816            }
817        }
818    }
819}
820
821fn is_safe_to_retry(
822    err: &AppendSessionError,
823    policy: AppendRetryPolicy,
824    has_inflight: bool,
825    frame_signal: Option<&FrameSignal>,
826) -> bool {
827    let policy_compliant = match policy {
828        AppendRetryPolicy::All => true,
829        AppendRetryPolicy::NoSideEffects => {
830            !has_inflight
831                || !frame_signal.is_none_or(|s| s.is_signalled())
832                || err.has_no_side_effects()
833        }
834    };
835    policy_compliant && err.is_retryable()
836}
837
// Command-channel depth used when no batch limit is configured.
const DEFAULT_CHANNEL_BUFFER_SIZE: usize = 100;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
/// Events multiplexed onto the session's [`MuxTimer`].
enum TimerEvent {
    /// Fires when the oldest inflight append misses its ack deadline.
    AckDeadline,
}

// Number of `TimerEvent` variants; sizes the `MuxTimer`.
const N_TIMER_VARIANTS: usize = 1;
846
impl From<TimerEvent> for usize {
    // Maps each timer event to its ordinal slot in the `MuxTimer`.
    fn from(event: TimerEvent) -> Self {
        match event {
            TimerEvent::AckDeadline => 0,
        }
    }
}
854
855impl From<usize> for TimerEvent {
856    fn from(value: usize) -> Self {
857        match value {
858            0 => TimerEvent::AckDeadline,
859            _ => panic!("invalid ordinal"),
860        }
861    }
862}
863
#[cfg(test)]
mod tests {
    use http::StatusCode;

    use super::{AppendSessionError, is_safe_to_retry};
    use crate::{
        api::{ApiError, ApiErrorResponse},
        frame_signal::FrameSignal,
        types::AppendRetryPolicy,
    };

    /// Builds an [`AppendSessionError`] wrapping a server-side API error with
    /// the given HTTP status and error code.
    fn server_error(status: StatusCode, code: &str) -> AppendSessionError {
        AppendSessionError::Api(ApiError::Server(
            status,
            ApiErrorResponse {
                code: code.to_owned(),
                message: "test".to_owned(),
            },
        ))
    }

    #[test]
    fn safe_to_retry_session_all_policy() {
        let retryable = server_error(StatusCode::INTERNAL_SERVER_ERROR, "internal");
        let non_retryable = server_error(StatusCode::BAD_REQUEST, "bad_request");
        let policy = AppendRetryPolicy::All;

        // All policy — always policy-compliant, just needs retryable.
        assert!(is_safe_to_retry(&retryable, policy, true, None));
        assert!(!is_safe_to_retry(&non_retryable, policy, true, None));
    }

    #[test]
    fn safe_to_retry_session_no_side_effects_policy() {
        let retryable = server_error(StatusCode::INTERNAL_SERVER_ERROR, "internal");
        let no_side_effect = server_error(StatusCode::TOO_MANY_REQUESTS, "rate_limited");
        let policy = AppendRetryPolicy::NoSideEffects;
        let signal = FrameSignal::new();

        // No inflight — always safe.
        signal.signal();
        assert!(is_safe_to_retry(&retryable, policy, false, Some(&signal)));

        // Inflight + signal not set — safe (no data sent this attempt).
        signal.reset();
        assert!(is_safe_to_retry(&retryable, policy, true, Some(&signal)));

        // Inflight + signal set + error with possible side effects — not safe.
        signal.signal();
        assert!(!is_safe_to_retry(&retryable, policy, true, Some(&signal)));

        // Inflight + signal set + no-side-effect error — safe.
        assert!(is_safe_to_retry(
            &no_side_effect,
            policy,
            true,
            Some(&signal)
        ));

        // AckTimeout — retryable but has possible side effects.
        assert!(!is_safe_to_retry(
            &AppendSessionError::AckTimeout,
            policy,
            true,
            Some(&signal),
        ));
    }
}
931}