s2_lite/backend/
append.rs

use std::{
    collections::VecDeque,
    ops::{DerefMut as _, Range, RangeTo},
    sync::Arc,
};

use futures::{FutureExt as _, Stream, StreamExt as _, stream::FuturesOrdered};
use s2_common::{
    record::{MeteredSize as _, SeqNum, StreamPosition},
    types::{
        basin::BasinName,
        stream::{AppendAck, AppendInput, StreamName},
    },
};
use tokio::sync::oneshot;

use super::Backend;
use crate::backend::error::{AppendError, AppendErrorInternal, StorageError};

// N.B. Ultimately capped by streamer::MAX_INFLIGHT_APPENDS
const MAX_INFLIGHT_BATCHES: usize = 100;
const MAX_INFLIGHT_BYTES: usize = 10 * 1024 * 1024;

impl Backend {
    pub async fn append(
        &self,
        basin: BasinName,
        stream: StreamName,
        input: AppendInput,
    ) -> Result<AppendAck, AppendError> {
        let client = self
            .streamer_client_with_auto_create::<AppendError>(&basin, &stream, |config| {
                config.create_stream_on_append
            })
            .await?;
        let ack = client.append(input, None).await?;
        Ok(ack)
    }

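    // `append_session` pipelines up to MAX_INFLIGHT_BATCHES batches (and roughly
    // MAX_INFLIGHT_BYTES of metered record data) against the streamer, yielding
    // acks in submission order via `FuturesOrdered`. The byte cap is checked
    // before a batch is admitted, so it can be exceeded by up to one batch's
    // metered size.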
    pub async fn append_session(
        self,
        basin: BasinName,
        stream: StreamName,
        inputs: impl Stream<Item = AppendInput>,
    ) -> Result<impl Stream<Item = Result<AppendAck, AppendError>>, AppendError> {
        let client = self.streamer_client(&basin, &stream).await?;
        let session = SessionHandle::new();
        Ok(async_stream::stream! {
            tokio::pin!(inputs);
            let mut futs = FuturesOrdered::new();
            let mut inflight_bytes = 0;
            loop {
                tokio::select! {
                    Some(input) = inputs.next(), if (
                        futs.len() < MAX_INFLIGHT_BATCHES
                        && inflight_bytes <= MAX_INFLIGHT_BYTES
                    ) => {
                        let metered_size = input.records.metered_size();
                        inflight_bytes += metered_size;
                        let fut = client
                            .append(input, Some(session.clone()))
                            .map(move |res| (metered_size, res));
                        futs.push_back(fut);
                    }
                    Some((metered_size, res)) = futs.next() => {
                        inflight_bytes -= metered_size;
                        yield res.map_err(AppendError::from);
                    }
                    else => {
                        break;
                    }
                }
            }
        })
    }
}
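
// A minimal usage sketch for `append_session` (hypothetical caller: `backend`,
// `basin`, `stream`, and `inputs` are assumed to exist and are not defined here;
// `futures::StreamExt` is assumed to be in scope):
//
//     let acks = backend.append_session(basin, stream, inputs).await?;
//     tokio::pin!(acks);
//     while let Some(res) = acks.next().await {
//         let ack = res?;
//         // `ack.tail` carries the stable tail position at acknowledgement time.
//     }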

#[derive(Debug)]
struct SessionState {
    last_ack_end: RangeTo<SeqNum>,
    poisoned: bool,
}

#[derive(Debug, Clone)]
pub struct SessionHandle(Arc<parking_lot::Mutex<SessionState>>);

impl SessionHandle {
    pub fn new() -> Self {
        Self(Arc::new(parking_lot::Mutex::new(SessionState {
            last_ack_end: ..SeqNum::MIN,
            poisoned: false,
        })))
    }
}

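// Admission gate for an append: a closed reply channel or a poisoned session means
// the append is dropped up front. For session appends, the session mutex is locked
// here and the guard is carried inside the returned `Ticket`, so appends belonging
// to the same session are serialized until the ticket is accepted or rejected.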
#[must_use]
pub fn admit(
    tx: oneshot::Sender<Result<AppendAck, AppendErrorInternal>>,
    session: Option<SessionHandle>,
) -> Option<Ticket> {
    if tx.is_closed() {
        return None;
    }
    match session {
        None => Some(Ticket { tx, session: None }),
        Some(session) => {
            let session = session.0.lock_arc();
            if session.poisoned {
                None
            } else {
                Some(Ticket {
                    tx,
                    session: Some(session),
                })
            }
        }
    }
}

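// Appends whose replies are blocked on durability. The queue is kept ordered by
// `durability_dependency.end`, so `on_stable` can drain a strict prefix once the
// stable position advances; `next_ack_pos` tracks the tail assigned to the most
// recently accepted append.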
#[derive(Debug, Default)]
pub struct PendingAppends {
    queue: VecDeque<BlockedReplySender>,
    next_ack_pos: Option<StreamPosition>,
}

impl PendingAppends {
    pub fn new(capacity: usize) -> Self {
        Self {
            queue: VecDeque::with_capacity(capacity),
            next_ack_pos: None,
        }
    }

    pub fn next_ack_pos(&self) -> Option<StreamPosition> {
        self.next_ack_pos
    }

    pub fn accept(&mut self, ticket: Ticket, ack_range: Range<StreamPosition>) {
        if let Some(prev_pos) = self.next_ack_pos.replace(StreamPosition {
            seq_num: ack_range.end.seq_num,
            timestamp: ack_range.end.timestamp,
        }) {
            assert_eq!(ack_range.start.seq_num, prev_pos.seq_num);
            assert!(ack_range.start.timestamp >= prev_pos.timestamp);
        }
        let sender = ticket.accept(ack_range);
        if let Some(prev) = self.queue.back() {
            assert!(prev.durability_dependency.end < sender.durability_dependency.end);
        }
        self.queue.push_back(sender);
    }

    pub fn reject(&mut self, ticket: Ticket, err: AppendErrorInternal, stable_pos: StreamPosition) {
        if let Some(sender) = ticket.reject(err, stable_pos) {
            let dd = sender.durability_dependency;
            let insert_pos = self
                .queue
                .partition_point(|x| x.durability_dependency.end <= dd.end);
            self.queue.insert(insert_pos, sender);
        }
    }

    pub fn on_stable(&mut self, stable_pos: StreamPosition) {
        let completable = self
            .queue
            .iter()
            .take_while(|sender| sender.durability_dependency.end <= stable_pos.seq_num)
            .count();
        for sender in self.queue.drain(..completable) {
            sender.unblock(Ok(stable_pos));
        }
    }

    pub fn on_durability_failed(self, err: slatedb::Error) {
        let err = StorageError::from(err);
        for sender in self.queue {
            sender.unblock(Err(err.clone()));
        }
    }
}

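// An admitted append that has not yet been accepted or rejected. While the optional
// session guard is held, no other append in the same session can be admitted.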
pub struct Ticket {
    tx: oneshot::Sender<Result<AppendAck, AppendErrorInternal>>,
    session: Option<parking_lot::ArcMutexGuard<parking_lot::RawMutex, SessionState>>,
}

impl Ticket {
    #[must_use]
    fn accept(self, ack_range: Range<StreamPosition>) -> BlockedReplySender {
        let durability_dependency = ..ack_range.end.seq_num;
        if let Some(mut session) = self.session {
            let session = session.deref_mut();
            assert!(!session.poisoned, "thanks to typestate");
            session.last_ack_end = durability_dependency;
        }
        BlockedReplySender {
            reply: Ok(ack_range),
            durability_dependency,
            tx: self.tx,
        }
    }

    #[must_use]
    fn reject(
        self,
        append_err: AppendErrorInternal,
        stable_pos: StreamPosition,
    ) -> Option<BlockedReplySender> {
        let mut durability_dependency =
            if let AppendErrorInternal::ConditionFailed(cond_fail) = &append_err {
                cond_fail.durability_dependency()
            } else {
                ..0
            };
        if let Some(mut session) = self.session {
            let session = session.deref_mut();
            assert!(!session.poisoned, "thanks to typestate");
            session.poisoned = true;
            durability_dependency = ..durability_dependency.end.max(session.last_ack_end.end);
        }
        if durability_dependency.end <= stable_pos.seq_num {
            let _ = self.tx.send(Err(append_err));
            None
        } else {
            Some(BlockedReplySender {
                reply: Err(append_err),
                durability_dependency,
                tx: self.tx,
            })
        }
    }
}

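// A reply held back until its durability dependency is covered by the stable
// position (or durability fails, in which case a storage error is sent instead).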
#[derive(Debug)]
struct BlockedReplySender {
    reply: Result<Range<StreamPosition>, AppendErrorInternal>,
    durability_dependency: RangeTo<SeqNum>,
    tx: oneshot::Sender<Result<AppendAck, AppendErrorInternal>>,
}

impl BlockedReplySender {
    fn unblock(self, stable_pos: Result<StreamPosition, StorageError>) {
        let reply = match stable_pos {
            Ok(tail) => {
                assert!(self.durability_dependency.end <= tail.seq_num);
                self.reply.map(|ack| AppendAck {
                    start: ack.start,
                    end: ack.end,
                    tail,
                })
            }
            Err(e) => Err(e.into()),
        };
        let _ = self.tx.send(reply);
    }
}