Skip to main content

s2_sdk/session/
append.rs

1use std::{
2    collections::VecDeque,
3    future::Future,
4    num::NonZeroU32,
5    pin::Pin,
6    sync::{Arc, OnceLock},
7    task::{Context, Poll},
8    time::Duration,
9};
10
11use futures::StreamExt;
12use tokio::{
13    sync::{OwnedSemaphorePermit, Semaphore, mpsc, oneshot},
14    time::Instant,
15};
16use tokio_muxt::{CoalesceMode, MuxTimer};
17use tokio_stream::wrappers::ReceiverStream;
18use tokio_util::task::AbortOnDropHandle;
19use tracing::debug;
20
21use crate::{
22    api::{ApiError, BasinClient, Streaming, retry_builder},
23    frame_signal::FrameSignal,
24    retry::RetryBackoffBuilder,
25    types::{
26        AppendAck, AppendInput, AppendRetryPolicy, EncryptionKey, MeteredBytes, ONE_MIB, S2Error,
27        StreamName, StreamPosition, ValidationError,
28    },
29};
30
31#[derive(Debug, thiserror::Error)]
32pub enum AppendSessionError {
33    #[error(transparent)]
34    Api(#[from] ApiError),
35    #[error("append acknowledgement timed out")]
36    AckTimeout,
37    #[error("server disconnected")]
38    ServerDisconnected,
39    #[error("response stream closed early while appends in flight")]
40    StreamClosedEarly,
41    #[error("session already closed")]
42    SessionClosed,
43    #[error("session is closing")]
44    SessionClosing,
45    #[error("session dropped without calling close")]
46    SessionDropped,
47    #[error("invalid append acknowledgement: {0}")]
48    InvalidAck(String),
49}
50
51impl AppendSessionError {
52    pub fn is_retryable(&self) -> bool {
53        match self {
54            Self::Api(err) => err.is_retryable(),
55            Self::AckTimeout => true,
56            Self::ServerDisconnected => true,
57            _ => false,
58        }
59    }
60
61    pub fn has_no_side_effects(&self) -> bool {
62        match self {
63            Self::Api(err) => err.has_no_side_effects(),
64            _ => false,
65        }
66    }
67}
68
69impl From<AppendSessionError> for S2Error {
70    fn from(err: AppendSessionError) -> Self {
71        match err {
72            AppendSessionError::Api(api_err) => api_err.into(),
73            other => S2Error::Client(other.to_string()),
74        }
75    }
76}
77
78/// A [`Future`] that resolves to an acknowledgement once the batch of records is appended.
79pub struct BatchSubmitTicket {
80    rx: oneshot::Receiver<Result<AppendAck, S2Error>>,
81    terminal_err: Arc<OnceLock<S2Error>>,
82}
83
84impl Future for BatchSubmitTicket {
85    type Output = Result<AppendAck, S2Error>;
86
87    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
88        match Pin::new(&mut self.rx).poll(cx) {
89            Poll::Ready(Ok(res)) => Poll::Ready(res),
90            Poll::Ready(Err(_)) => Poll::Ready(Err(self
91                .terminal_err
92                .get()
93                .cloned()
94                .unwrap_or_else(|| AppendSessionError::SessionDropped.into()))),
95            Poll::Pending => Poll::Pending,
96        }
97    }
98}
99
100#[derive(Debug, Clone)]
101/// Configuration for an [`AppendSession`].
102pub struct AppendSessionConfig {
103    max_unacked_bytes: u32,
104    max_unacked_batches: Option<u32>,
105}
106
107impl Default for AppendSessionConfig {
108    fn default() -> Self {
109        Self {
110            max_unacked_bytes: 5 * ONE_MIB,
111            max_unacked_batches: None,
112        }
113    }
114}
115
116impl AppendSessionConfig {
117    /// Create a new [`AppendSessionConfig`] with default settings.
118    pub fn new() -> Self {
119        Self::default()
120    }
121
122    /// Set the limit on total metered bytes of unacknowledged [`AppendInput`]s held in memory.
123    ///
124    /// **Note:** It must be at least `1MiB`.
125    ///
126    /// Defaults to `5MiB`.
127    pub fn with_max_unacked_bytes(self, max_unacked_bytes: u32) -> Result<Self, ValidationError> {
128        if max_unacked_bytes < ONE_MIB {
129            return Err(format!("max_unacked_bytes must be at least {ONE_MIB}").into());
130        }
131        Ok(Self {
132            max_unacked_bytes,
133            ..self
134        })
135    }
136
137    /// Set the limit on number of unacknowledged [`AppendInput`]s held in memory.
138    ///
139    /// Defaults to no limit.
140    pub fn with_max_unacked_batches(self, max_unacked_batches: NonZeroU32) -> Self {
141        Self {
142            max_unacked_batches: Some(max_unacked_batches.get()),
143            ..self
144        }
145    }
146}
147
148struct SessionState {
149    cmd_rx: mpsc::Receiver<Command>,
150    inflight_appends: VecDeque<InflightAppend>,
151    inflight_bytes: usize,
152    close_tx: Option<oneshot::Sender<Result<(), S2Error>>>,
153    total_records: usize,
154    total_acked_records: usize,
155    prev_ack_end: Option<StreamPosition>,
156    stashed_submission: Option<StashedSubmission>,
157}
158
159/// A session for high-throughput appending with backpressure control. It can be created from
160/// [`append_session`](crate::S2Stream::append_session).
161///
162/// Supports pipelining multiple [`AppendInput`]s while preserving submission order.
163pub struct AppendSession {
164    cmd_tx: mpsc::Sender<Command>,
165    permits: AppendPermits,
166    terminal_err: Arc<OnceLock<S2Error>>,
167    _handle: AbortOnDropHandle<()>,
168}
169
170impl AppendSession {
171    pub(crate) fn new(
172        client: BasinClient,
173        stream: StreamName,
174        encryption: Option<EncryptionKey>,
175        config: AppendSessionConfig,
176    ) -> Self {
177        let buffer_size = config
178            .max_unacked_batches
179            .map(|mib| mib as usize)
180            .unwrap_or(DEFAULT_CHANNEL_BUFFER_SIZE);
181        let (cmd_tx, cmd_rx) = mpsc::channel(buffer_size);
182        let permits = AppendPermits::new(config.max_unacked_batches, config.max_unacked_bytes);
183        let retry_builder = retry_builder(&client.config.retry);
184        let terminal_err = Arc::new(OnceLock::new());
185        let handle = AbortOnDropHandle::new(tokio::spawn(run_session_with_retry(
186            client,
187            stream,
188            encryption,
189            cmd_rx,
190            retry_builder,
191            buffer_size,
192            terminal_err.clone(),
193        )));
194        Self {
195            cmd_tx,
196            permits,
197            terminal_err,
198            _handle: handle,
199        }
200    }
201
202    /// Submit a batch of records for appending.
203    ///
204    /// Internally, it waits on [`reserve`](Self::reserve), then submits using the permit.
205    /// This provides backpressure when inflight limits are reached.
206    /// For explicit control, use [`reserve`](Self::reserve) followed by
207    /// [`BatchSubmitPermit::submit`].
208    ///
209    /// **Note**: After all submits, you must call [`close`](Self::close) to ensure all batches are
210    /// appended.
211    pub async fn submit(&self, input: AppendInput) -> Result<BatchSubmitTicket, S2Error> {
212        let permit = self.reserve(input.records.metered_bytes() as u32).await?;
213        Ok(permit.submit(input))
214    }
215
216    /// Reserve capacity for a batch to be submitted. Useful in [`select!`](tokio::select) loops
217    /// where you want to interleave submission with other async work. See [`submit`](Self::submit)
218    /// for a simpler API.
219    ///
220    /// Waits when inflight limits are reached, providing explicit backpressure control.
221    /// The returned permit must be used to submit the batch.
222    ///
223    /// **Note**: After all submits, you must call [`close`](Self::close) to ensure all batches are
224    /// appended.
225    ///
226    /// # Cancel safety
227    ///
228    /// This method is cancel safe. Internally, it only awaits
229    /// [`Semaphore::acquire_many_owned`](tokio::sync::Semaphore::acquire_many_owned) and
230    /// [`Sender::reserve_owned`](tokio::sync::mpsc::Sender::reserve), both of which are cancel
231    /// safe.
232    pub async fn reserve(&self, bytes: u32) -> Result<BatchSubmitPermit, S2Error> {
233        let append_permit = self.permits.acquire(bytes).await;
234        let cmd_tx_permit = self
235            .cmd_tx
236            .clone()
237            .reserve_owned()
238            .await
239            .map_err(|_| self.terminal_err())?;
240        Ok(BatchSubmitPermit {
241            append_permit,
242            cmd_tx_permit,
243            terminal_err: self.terminal_err.clone(),
244        })
245    }
246
247    /// Close the session and wait for all submitted batch of records to be appended.
248    pub async fn close(self) -> Result<(), S2Error> {
249        let (done_tx, done_rx) = oneshot::channel();
250        self.cmd_tx
251            .send(Command::Close { done_tx })
252            .await
253            .map_err(|_| self.terminal_err())?;
254        done_rx.await.map_err(|_| self.terminal_err())??;
255        Ok(())
256    }
257
258    fn terminal_err(&self) -> S2Error {
259        self.terminal_err
260            .get()
261            .cloned()
262            .unwrap_or_else(|| AppendSessionError::SessionClosed.into())
263    }
264}
265
266/// A permit to submit a batch after reserving capacity.
267pub struct BatchSubmitPermit {
268    append_permit: AppendPermit,
269    cmd_tx_permit: mpsc::OwnedPermit<Command>,
270    terminal_err: Arc<OnceLock<S2Error>>,
271}
272
273impl BatchSubmitPermit {
274    /// Submit the batch using this permit.
275    pub fn submit(self, input: AppendInput) -> BatchSubmitTicket {
276        let (ack_tx, ack_rx) = oneshot::channel();
277        self.cmd_tx_permit.send(Command::Submit {
278            input,
279            ack_tx,
280            permit: Some(self.append_permit),
281        });
282        BatchSubmitTicket {
283            rx: ack_rx,
284            terminal_err: self.terminal_err,
285        }
286    }
287}
288
289pub(crate) struct AppendSessionInternal {
290    cmd_tx: mpsc::Sender<Command>,
291    terminal_err: Arc<OnceLock<S2Error>>,
292    _handle: AbortOnDropHandle<()>,
293}
294
295impl AppendSessionInternal {
296    pub(crate) fn new(
297        client: BasinClient,
298        stream: StreamName,
299        encryption: Option<EncryptionKey>,
300    ) -> Self {
301        let buffer_size = DEFAULT_CHANNEL_BUFFER_SIZE;
302        let (cmd_tx, cmd_rx) = mpsc::channel(buffer_size);
303        let retry_builder = retry_builder(&client.config.retry);
304        let terminal_err = Arc::new(OnceLock::new());
305        let handle = AbortOnDropHandle::new(tokio::spawn(run_session_with_retry(
306            client,
307            stream,
308            encryption,
309            cmd_rx,
310            retry_builder,
311            buffer_size,
312            terminal_err.clone(),
313        )));
314        Self {
315            cmd_tx,
316            terminal_err,
317            _handle: handle,
318        }
319    }
320
321    pub(crate) fn submit(
322        &self,
323        input: AppendInput,
324    ) -> impl Future<Output = Result<BatchSubmitTicket, S2Error>> + Send + 'static {
325        let cmd_tx = self.cmd_tx.clone();
326        let terminal_err = self.terminal_err.clone();
327        async move {
328            let (ack_tx, ack_rx) = oneshot::channel();
329            cmd_tx
330                .send(Command::Submit {
331                    input,
332                    ack_tx,
333                    permit: None,
334                })
335                .await
336                .map_err(|_| {
337                    terminal_err
338                        .get()
339                        .cloned()
340                        .unwrap_or_else(|| AppendSessionError::SessionClosed.into())
341                })?;
342            Ok(BatchSubmitTicket {
343                rx: ack_rx,
344                terminal_err,
345            })
346        }
347    }
348
349    pub(crate) async fn close(self) -> Result<(), S2Error> {
350        let (done_tx, done_rx) = oneshot::channel();
351        self.cmd_tx
352            .send(Command::Close { done_tx })
353            .await
354            .map_err(|_| self.terminal_err())?;
355        done_rx.await.map_err(|_| self.terminal_err())??;
356        Ok(())
357    }
358
359    fn terminal_err(&self) -> S2Error {
360        self.terminal_err
361            .get()
362            .cloned()
363            .unwrap_or_else(|| AppendSessionError::SessionClosed.into())
364    }
365}
366
367#[derive(Debug)]
368pub(crate) struct AppendPermit {
369    _count: Option<OwnedSemaphorePermit>,
370    _bytes: OwnedSemaphorePermit,
371}
372
373#[derive(Clone)]
374pub(crate) struct AppendPermits {
375    count: Option<Arc<Semaphore>>,
376    bytes: Arc<Semaphore>,
377}
378
379impl AppendPermits {
380    pub(crate) fn new(count_permits: Option<u32>, bytes_permits: u32) -> Self {
381        Self {
382            count: count_permits.map(|permits| Arc::new(Semaphore::new(permits as usize))),
383            bytes: Arc::new(Semaphore::new(bytes_permits as usize)),
384        }
385    }
386
387    pub(crate) async fn acquire(&self, bytes: u32) -> AppendPermit {
388        AppendPermit {
389            _count: if let Some(count) = self.count.as_ref() {
390                Some(
391                    count
392                        .clone()
393                        .acquire_many_owned(1)
394                        .await
395                        .expect("semaphore should not be closed"),
396                )
397            } else {
398                None
399            },
400            _bytes: self
401                .bytes
402                .clone()
403                .acquire_many_owned(bytes)
404                .await
405                .expect("semaphore should not be closed"),
406        }
407    }
408}
409
410async fn run_session_with_retry(
411    client: BasinClient,
412    stream: StreamName,
413    encryption: Option<EncryptionKey>,
414    cmd_rx: mpsc::Receiver<Command>,
415    retry_builder: RetryBackoffBuilder,
416    buffer_size: usize,
417    terminal_err: Arc<OnceLock<S2Error>>,
418) {
419    let frame_signal = match client.config.retry.append_retry_policy {
420        AppendRetryPolicy::NoSideEffects => Some(FrameSignal::new()),
421        AppendRetryPolicy::All => None,
422    };
423
424    let mut state = SessionState {
425        cmd_rx,
426        inflight_appends: VecDeque::new(),
427        inflight_bytes: 0,
428        close_tx: None,
429        total_records: 0,
430        total_acked_records: 0,
431        prev_ack_end: None,
432        stashed_submission: None,
433    };
434    let mut prev_total_acked_records = 0;
435    let mut retry_backoff = retry_builder.build();
436
437    loop {
438        let result = run_session(
439            &client,
440            &stream,
441            encryption.as_ref(),
442            &mut state,
443            buffer_size,
444            &frame_signal,
445        )
446        .await;
447
448        match result {
449            Ok(()) => {
450                break;
451            }
452            Err(err) => {
453                if prev_total_acked_records < state.total_acked_records {
454                    prev_total_acked_records = state.total_acked_records;
455                    retry_backoff.reset();
456                }
457
458                if is_safe_to_retry(
459                    &err,
460                    client.config.retry.append_retry_policy,
461                    !state.inflight_appends.is_empty(),
462                    frame_signal.as_ref(),
463                ) && let Some(backoff) = retry_backoff.next()
464                {
465                    debug!(
466                        %err,
467                        ?backoff,
468                        num_retries_remaining = retry_backoff.remaining(),
469                        "retrying append session"
470                    );
471                    tokio::time::sleep(backoff).await;
472                } else {
473                    debug!(
474                        %err,
475                        retries_exhausted = retry_backoff.is_exhausted(),
476                        "not retrying append session"
477                    );
478
479                    let err: S2Error = err.into();
480
481                    let _ = terminal_err.set(err.clone());
482
483                    for inflight_append in state.inflight_appends.drain(..) {
484                        let _ = inflight_append.ack_tx.send(Err(err.clone()));
485                    }
486
487                    if let Some(stashed) = state.stashed_submission.take() {
488                        let _ = stashed.ack_tx.send(Err(err.clone()));
489                    }
490
491                    if let Some(done_tx) = state.close_tx.take() {
492                        let _ = done_tx.send(Err(err.clone()));
493                    }
494
495                    state.cmd_rx.close();
496                    while let Some(cmd) = state.cmd_rx.recv().await {
497                        cmd.reject(err.clone());
498                    }
499                    break;
500                }
501            }
502        }
503    }
504
505    if let Some(done_tx) = state.close_tx.take() {
506        let _ = done_tx.send(Ok(()));
507    }
508}
509
510async fn run_session(
511    client: &BasinClient,
512    stream: &StreamName,
513    encryption: Option<&EncryptionKey>,
514    state: &mut SessionState,
515    buffer_size: usize,
516    frame_signal: &Option<FrameSignal>,
517) -> Result<(), AppendSessionError> {
518    if let Some(s) = frame_signal {
519        s.reset();
520    }
521
522    let (input_tx, mut acks) = connect(
523        client,
524        stream,
525        encryption,
526        buffer_size,
527        frame_signal.clone(),
528    )
529    .await?;
530    let ack_timeout = client.config.request_timeout;
531
532    if !state.inflight_appends.is_empty() {
533        resend(state, &input_tx, &mut acks, ack_timeout).await?;
534
535        if let Some(s) = frame_signal {
536            s.reset();
537        }
538
539        assert!(state.inflight_appends.is_empty());
540        assert_eq!(state.inflight_bytes, 0);
541    }
542
543    let timer = MuxTimer::<N_TIMER_VARIANTS>::default();
544    tokio::pin!(timer);
545
546    loop {
547        tokio::select! {
548            (event_ord, _deadline) = &mut timer, if timer.is_armed() => {
549                match TimerEvent::from(event_ord) {
550                    TimerEvent::AckDeadline => {
551                        return Err(AppendSessionError::AckTimeout);
552                    }
553                }
554            }
555
556            input_tx_permit = input_tx.reserve(), if state.stashed_submission.is_some() => {
557                let input_tx_permit = input_tx_permit
558                    .map_err(|_| AppendSessionError::ServerDisconnected)?;
559                let submission = state.stashed_submission
560                    .take()
561                    .expect("stashed_submission should not be None");
562
563                let ack_deadline = Instant::now() + ack_timeout;
564                input_tx_permit.send(submission.input.clone());
565
566                state.total_records += submission.input.records.len();
567                state.inflight_bytes += submission.input_metered_bytes;
568
569                timer.as_mut().fire_at(
570                    TimerEvent::AckDeadline,
571                    ack_deadline,
572                    CoalesceMode::Earliest,
573                );
574                state.inflight_appends.push_back(InflightAppend {
575                    input: submission.input,
576                    input_metered_bytes: submission.input_metered_bytes,
577                    ack_tx: submission.ack_tx,
578                    ack_deadline,
579                    _permit: submission.permit,
580                });
581            }
582
583            cmd = state.cmd_rx.recv(), if state.stashed_submission.is_none() => {
584                match cmd {
585                    Some(Command::Submit { input, ack_tx, permit }) => {
586                        if state.close_tx.is_some() {
587                            let _ = ack_tx.send(
588                                Err(AppendSessionError::SessionClosing.into())
589                            );
590                        } else {
591                            let input_metered_bytes = input.records.metered_bytes();
592                            state.stashed_submission = Some(StashedSubmission {
593                                input,
594                                input_metered_bytes,
595                                ack_tx,
596                                permit,
597                            });
598                        }
599                    }
600                    Some(Command::Close { done_tx }) => {
601                        state.close_tx = Some(done_tx);
602                    }
603                    None => {
604                        return Err(AppendSessionError::SessionDropped);
605                    }
606                }
607            }
608
609            ack = acks.next() => {
610                match ack {
611                    Some(Ok(ack)) => {
612                        process_ack(
613                            ack,
614                            state,
615                            timer.as_mut(),
616                        )?;
617                    }
618                    Some(Err(err)) => {
619                        return Err(err.into());
620                    }
621                    None => {
622                        if !state.inflight_appends.is_empty() || state.stashed_submission.is_some() {
623                            return Err(AppendSessionError::StreamClosedEarly);
624                        }
625                        break;
626                    }
627                }
628            }
629        }
630
631        if state.close_tx.is_some()
632            && state.inflight_appends.is_empty()
633            && state.stashed_submission.is_none()
634        {
635            break;
636        }
637    }
638
639    assert!(state.inflight_appends.is_empty());
640    assert_eq!(state.inflight_bytes, 0);
641    assert!(state.stashed_submission.is_none());
642
643    Ok(())
644}
645
646async fn resend(
647    state: &mut SessionState,
648    input_tx: &mpsc::Sender<AppendInput>,
649    acks: &mut Streaming<AppendAck>,
650    ack_timeout: Duration,
651) -> Result<(), AppendSessionError> {
652    debug!(
653        inflight_appends_len = state.inflight_appends.len(),
654        inflight_bytes = state.inflight_bytes,
655        "resending inflight appends"
656    );
657
658    let mut resend_index = 0;
659    let mut resend_finished = false;
660
661    let timer = MuxTimer::<N_TIMER_VARIANTS>::default();
662    tokio::pin!(timer);
663
664    while !state.inflight_appends.is_empty() {
665        tokio::select! {
666            (event_ord, _deadline) = &mut timer, if timer.is_armed() => {
667                match TimerEvent::from(event_ord) {
668                    TimerEvent::AckDeadline => {
669                        return Err(AppendSessionError::AckTimeout);
670                    }
671                }
672            }
673
674            input_tx_permit = input_tx.reserve(), if !resend_finished => {
675                let input_tx_permit = input_tx_permit
676                    .map_err(|_| AppendSessionError::ServerDisconnected)?;
677
678                if let Some(inflight_append) = state.inflight_appends.get_mut(resend_index) {
679                    inflight_append.ack_deadline = Instant::now() + ack_timeout;
680                    timer.as_mut().fire_at(
681                        TimerEvent::AckDeadline,
682                        inflight_append.ack_deadline,
683                        CoalesceMode::Latest,
684                    );
685                    input_tx_permit.send(inflight_append.input.clone());
686                    resend_index += 1;
687                } else {
688                    resend_finished = true;
689                }
690            }
691
692            ack = acks.next() => {
693                match ack {
694                    Some(Ok(ack)) => {
695                        process_ack(
696                            ack,
697                            state,
698                            timer.as_mut(),
699                        )?;
700                        resend_index = resend_index.checked_sub(1).ok_or_else(|| {
701                            AppendSessionError::InvalidAck(
702                                "received ack without a corresponding resent append in flight".to_string(),
703                            )
704                        })?;
705                    }
706                    Some(Err(err)) => {
707                        return Err(err.into());
708                    }
709                    None => {
710                        return Err(AppendSessionError::StreamClosedEarly);
711                    }
712                }
713            }
714        }
715    }
716
717    assert_eq!(
718        resend_index, 0,
719        "resend_index should be 0 after resend completes"
720    );
721    debug!("finished resending inflight appends");
722    Ok(())
723}
724
725async fn connect(
726    client: &BasinClient,
727    stream: &StreamName,
728    encryption: Option<&EncryptionKey>,
729    buffer_size: usize,
730    frame_signal: Option<FrameSignal>,
731) -> Result<(mpsc::Sender<AppendInput>, Streaming<AppendAck>), AppendSessionError> {
732    let (input_tx, input_rx) = mpsc::channel::<AppendInput>(buffer_size);
733    let ack_stream = Box::pin(
734        client
735            .append_session(
736                stream,
737                ReceiverStream::new(input_rx).map(|i| i.into()),
738                encryption,
739                frame_signal,
740            )
741            .await?
742            .map(|ack| match ack {
743                Ok(ack) => Ok(ack.into()),
744                Err(err) => Err(err),
745            }),
746    );
747    Ok((input_tx, ack_stream))
748}
749
750fn process_ack(
751    ack: AppendAck,
752    state: &mut SessionState,
753    timer: Pin<&mut MuxTimer<N_TIMER_VARIANTS>>,
754) -> Result<(), AppendSessionError> {
755    let corresponding_append = state.inflight_appends.pop_front().ok_or_else(|| {
756        AppendSessionError::InvalidAck(
757            "received ack without a corresponding append in flight".to_string(),
758        )
759    })?;
760
761    if ack.end.seq_num < ack.start.seq_num {
762        return Err(AppendSessionError::InvalidAck(
763            "ack end seq_num should be greater than or equal to start seq_num".to_string(),
764        ));
765    }
766
767    if state
768        .prev_ack_end
769        .is_some_and(|end| ack.end.seq_num <= end.seq_num)
770    {
771        return Err(AppendSessionError::InvalidAck(
772            "ack end seq_num should be greater than previous ack end".to_string(),
773        ));
774    }
775
776    let num_acked_records = (ack.end.seq_num - ack.start.seq_num) as usize;
777    let expected_records = corresponding_append.input.records.len();
778    if num_acked_records != expected_records {
779        return Err(AppendSessionError::InvalidAck(format!(
780            "acked record count {num_acked_records} does not match submitted batch size {expected_records}"
781        )));
782    }
783
784    state.total_acked_records += num_acked_records;
785    state.inflight_bytes -= corresponding_append.input_metered_bytes;
786    state.prev_ack_end = Some(ack.end);
787
788    let _ = corresponding_append.ack_tx.send(Ok(ack));
789
790    if let Some(oldest_append) = state.inflight_appends.front() {
791        timer.fire_at(
792            TimerEvent::AckDeadline,
793            oldest_append.ack_deadline,
794            CoalesceMode::Latest,
795        );
796    } else {
797        timer.cancel(TimerEvent::AckDeadline);
798        assert_eq!(
799            state.total_records, state.total_acked_records,
800            "all records should be acked when inflight is empty"
801        );
802    }
803
804    Ok(())
805}
806
807struct StashedSubmission {
808    input: AppendInput,
809    input_metered_bytes: usize,
810    ack_tx: oneshot::Sender<Result<AppendAck, S2Error>>,
811    permit: Option<AppendPermit>,
812}
813
814struct InflightAppend {
815    input: AppendInput,
816    input_metered_bytes: usize,
817    ack_tx: oneshot::Sender<Result<AppendAck, S2Error>>,
818    ack_deadline: Instant,
819    _permit: Option<AppendPermit>,
820}
821
822enum Command {
823    Submit {
824        input: AppendInput,
825        ack_tx: oneshot::Sender<Result<AppendAck, S2Error>>,
826        permit: Option<AppendPermit>,
827    },
828    Close {
829        done_tx: oneshot::Sender<Result<(), S2Error>>,
830    },
831}
832
833impl Command {
834    fn reject(self, err: S2Error) {
835        match self {
836            Command::Submit { ack_tx, .. } => {
837                let _ = ack_tx.send(Err(err));
838            }
839            Command::Close { done_tx } => {
840                let _ = done_tx.send(Err(err));
841            }
842        }
843    }
844}
845
846fn is_safe_to_retry(
847    err: &AppendSessionError,
848    policy: AppendRetryPolicy,
849    has_inflight: bool,
850    frame_signal: Option<&FrameSignal>,
851) -> bool {
852    let policy_compliant = match policy {
853        AppendRetryPolicy::All => true,
854        AppendRetryPolicy::NoSideEffects => {
855            !has_inflight
856                || !frame_signal.is_none_or(|s| s.is_signalled())
857                || err.has_no_side_effects()
858        }
859    };
860    policy_compliant && err.is_retryable()
861}
862
/// Channel capacity used when no limit on unacknowledged batches is configured.
const DEFAULT_CHANNEL_BUFFER_SIZE: usize = 100;

/// Events multiplexed onto the session's `MuxTimer`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TimerEvent {
    /// The deadline for receiving an acknowledgement has passed.
    AckDeadline,
}

/// Number of [`TimerEvent`] variants; used as the timer's size parameter.
const N_TIMER_VARIANTS: usize = 1;

/// Maps an event to its timer ordinal.
impl From<TimerEvent> for usize {
    fn from(event: TimerEvent) -> Self {
        match event {
            TimerEvent::AckDeadline => 0,
        }
    }
}

/// Maps a timer ordinal back to its event.
///
/// # Panics
///
/// Panics on an ordinal that does not correspond to a variant.
impl From<usize> for TimerEvent {
    fn from(value: usize) -> Self {
        if value == 0 {
            TimerEvent::AckDeadline
        } else {
            panic!("invalid ordinal")
        }
    }
}
888
#[cfg(test)]
mod tests {
    use http::StatusCode;

    use super::{AppendSessionError, is_safe_to_retry};
    use crate::{
        api::{ApiError, ApiErrorResponse},
        frame_signal::FrameSignal,
        types::AppendRetryPolicy,
    };

    /// Builds a server-side API error with the given status and error code.
    fn server_error(status: StatusCode, code: &str) -> AppendSessionError {
        let response = ApiErrorResponse {
            code: code.to_owned(),
            message: "test".to_owned(),
        };
        AppendSessionError::Api(ApiError::Server(status, response))
    }

    #[test]
    fn safe_to_retry_session_all_policy() {
        let policy = AppendRetryPolicy::All;
        let retryable = server_error(StatusCode::INTERNAL_SERVER_ERROR, "internal");
        let non_retryable = server_error(StatusCode::BAD_REQUEST, "bad_request");

        // Under `All` the policy never objects; only retryability of the error matters.
        assert!(is_safe_to_retry(&retryable, policy, true, None));
        assert!(!is_safe_to_retry(&non_retryable, policy, true, None));
    }

    #[test]
    fn safe_to_retry_session_no_side_effects_policy() {
        let policy = AppendRetryPolicy::NoSideEffects;
        let signal = FrameSignal::new();
        let retryable = server_error(StatusCode::INTERNAL_SERVER_ERROR, "internal");
        let no_side_effect = server_error(StatusCode::TOO_MANY_REQUESTS, "rate_limited");

        // Nothing in flight: safe even though the signal is set.
        signal.signal();
        assert!(is_safe_to_retry(&retryable, policy, false, Some(&signal)));

        // In flight, signal cleared: safe, nothing was sent during this attempt.
        signal.reset();
        assert!(is_safe_to_retry(&retryable, policy, true, Some(&signal)));

        // In flight, signal set, error may have had side effects: not safe.
        signal.signal();
        assert!(!is_safe_to_retry(&retryable, policy, true, Some(&signal)));

        // In flight, signal set, error without side effects: safe.
        assert!(is_safe_to_retry(
            &no_side_effect,
            policy,
            true,
            Some(&signal)
        ));

        // An ack timeout is retryable but may have had side effects: not safe.
        assert!(!is_safe_to_retry(
            &AppendSessionError::AckTimeout,
            policy,
            true,
            Some(&signal),
        ));
    }
}