s2_lite/backend/
append.rs

1use std::{
2    collections::VecDeque,
3    ops::{DerefMut as _, Range, RangeTo},
4    sync::Arc,
5};
6
7use futures::{Stream, StreamExt as _, future::OptionFuture, stream::FuturesOrdered};
8use s2_common::{
9    record::{SeqNum, StreamPosition},
10    types::{
11        basin::BasinName,
12        stream::{AppendAck, AppendInput, StreamName},
13    },
14};
15use tokio::sync::oneshot;
16
17use super::Backend;
18use crate::backend::error::{AppendError, AppendErrorInternal, StorageError};
19
20impl Backend {
21    pub async fn append(
22        &self,
23        basin: BasinName,
24        stream: StreamName,
25        input: AppendInput,
26    ) -> Result<AppendAck, AppendError> {
27        let client = self
28            .streamer_client_with_auto_create::<AppendError>(&basin, &stream, |config| {
29                config.create_stream_on_append
30            })
31            .await?;
32        let ack = client.append_permit(input).await?.submit().await?;
33        Ok(ack)
34    }
35
36    pub async fn append_session(
37        self,
38        basin: BasinName,
39        stream: StreamName,
40        inputs: impl Stream<Item = AppendInput>,
41    ) -> Result<impl Stream<Item = Result<AppendAck, AppendError>>, AppendError> {
42        let client = self.streamer_client(&basin, &stream).await?;
43        let session = SessionHandle::new();
44        Ok(async_stream::stream! {
45            tokio::pin!(inputs);
46            let mut permit_opt = None;
47            let mut append_futs = FuturesOrdered::new();
48            loop {
49                tokio::select! {
50                    Some(input) = inputs.next(), if permit_opt.is_none() => {
51                        permit_opt = Some(Box::pin(client.append_permit(input)));
52                    }
53                    Some(res) = OptionFuture::from(permit_opt.as_mut()) => {
54                        permit_opt = None;
55                        match res {
56                            Ok(permit) => append_futs.push_back(permit.submit_session(session.clone())),
57                            Err(e) => {
58                                yield Err(e.into());
59                                break;
60                            }
61                        }
62                    }
63                    Some(res) = append_futs.next(), if !append_futs.is_empty() => {
64                        match res {
65                            Ok(ack) => {
66                                yield Ok(ack);
67                            },
68                            Err(e) => {
69                                yield Err(e.into());
70                                break;
71                            }
72                        }
73                    }
74                    else => {
75                        break;
76                    }
77                }
78            }
79        })
80    }
81}
82
83#[derive(Debug)]
84struct SessionState {
85    last_ack_end: RangeTo<SeqNum>,
86    poisoned: bool,
87}
88
89#[derive(Debug, Clone)]
90pub struct SessionHandle(Arc<parking_lot::Mutex<SessionState>>);
91
92impl SessionHandle {
93    pub fn new() -> Self {
94        Self(Arc::new(parking_lot::Mutex::new(SessionState {
95            last_ack_end: ..SeqNum::MIN,
96            poisoned: false,
97        })))
98    }
99}
100
101#[must_use]
102pub fn admit(
103    tx: oneshot::Sender<Result<AppendAck, AppendErrorInternal>>,
104    session: Option<SessionHandle>,
105) -> Option<Ticket> {
106    if tx.is_closed() {
107        return None;
108    }
109    match session {
110        None => Some(Ticket { tx, session: None }),
111        Some(session) => {
112            let session = session.0.lock_arc();
113            if session.poisoned {
114                None
115            } else {
116                Some(Ticket {
117                    tx,
118                    session: Some(session),
119                })
120            }
121        }
122    }
123}
124
125#[derive(Debug, Default)]
126pub struct PendingAppends {
127    queue: VecDeque<BlockedReplySender>,
128    next_ack_pos: Option<StreamPosition>,
129}
130
131impl PendingAppends {
132    pub fn new() -> Self {
133        Self {
134            queue: VecDeque::new(),
135            next_ack_pos: None,
136        }
137    }
138
139    pub fn next_ack_pos(&self) -> Option<StreamPosition> {
140        self.next_ack_pos
141    }
142
143    pub fn accept(&mut self, ticket: Ticket, ack_range: Range<StreamPosition>) {
144        if let Some(prev_pos) = self.next_ack_pos.replace(StreamPosition {
145            seq_num: ack_range.end.seq_num,
146            timestamp: ack_range.end.timestamp,
147        }) {
148            assert_eq!(ack_range.start.seq_num, prev_pos.seq_num);
149            assert!(ack_range.start.timestamp >= prev_pos.timestamp);
150        }
151        let sender = ticket.accept(ack_range);
152        if let Some(prev) = self.queue.back() {
153            assert!(prev.durability_dependency.end < sender.durability_dependency.end);
154        }
155        self.queue.push_back(sender);
156    }
157
158    pub fn reject(&mut self, ticket: Ticket, err: AppendErrorInternal, stable_pos: StreamPosition) {
159        if let Some(sender) = ticket.reject(err, stable_pos) {
160            let dd = sender.durability_dependency;
161            let insert_pos = self
162                .queue
163                .partition_point(|x| x.durability_dependency.end <= dd.end);
164            self.queue.insert(insert_pos, sender);
165        }
166    }
167
168    pub fn on_stable(&mut self, stable_pos: StreamPosition) {
169        let completable = self
170            .queue
171            .iter()
172            .take_while(|sender| sender.durability_dependency.end <= stable_pos.seq_num)
173            .count();
174        for sender in self.queue.drain(..completable) {
175            sender.unblock(Ok(stable_pos));
176        }
177        // Lots of small appends could cause this,
178        // as we bound only on total bytes not num batches.
179        if self.queue.capacity() >= 4 * self.queue.len() {
180            self.queue.shrink_to(self.queue.len() * 2);
181        }
182    }
183
184    pub fn on_durability_failed(self, err: slatedb::Error) {
185        let err = StorageError::from(err);
186        for sender in self.queue {
187            sender.unblock(Err(err.clone()));
188        }
189    }
190}
191
192pub struct Ticket {
193    tx: oneshot::Sender<Result<AppendAck, AppendErrorInternal>>,
194    session: Option<parking_lot::ArcMutexGuard<parking_lot::RawMutex, SessionState>>,
195}
196
197impl Ticket {
198    #[must_use]
199    fn accept(self, ack_range: Range<StreamPosition>) -> BlockedReplySender {
200        let durability_dependency = ..ack_range.end.seq_num;
201        if let Some(mut session) = self.session {
202            let session = session.deref_mut();
203            assert!(!session.poisoned, "thanks to typestate");
204            session.last_ack_end = durability_dependency;
205        }
206        BlockedReplySender {
207            reply: Ok(ack_range),
208            durability_dependency,
209            tx: self.tx,
210        }
211    }
212
213    #[must_use]
214    fn reject(
215        self,
216        append_err: AppendErrorInternal,
217        stable_pos: StreamPosition,
218    ) -> Option<BlockedReplySender> {
219        let mut durability_dependency =
220            if let AppendErrorInternal::ConditionFailed(cond_fail) = &append_err {
221                cond_fail.durability_dependency()
222            } else {
223                ..0
224            };
225        if let Some(mut session) = self.session {
226            let session = session.deref_mut();
227            assert!(!session.poisoned, "thanks to typestate");
228            session.poisoned = true;
229            durability_dependency = ..durability_dependency.end.max(session.last_ack_end.end);
230        }
231        if durability_dependency.end <= stable_pos.seq_num {
232            let _ = self.tx.send(Err(append_err));
233            None
234        } else {
235            Some(BlockedReplySender {
236                reply: Err(append_err),
237                durability_dependency,
238                tx: self.tx,
239            })
240        }
241    }
242}
243
244#[derive(Debug)]
245struct BlockedReplySender {
246    reply: Result<Range<StreamPosition>, AppendErrorInternal>,
247    durability_dependency: RangeTo<SeqNum>,
248    tx: oneshot::Sender<Result<AppendAck, AppendErrorInternal>>,
249}
250
251impl BlockedReplySender {
252    fn unblock(self, stable_pos: Result<StreamPosition, StorageError>) {
253        let reply = match stable_pos {
254            Ok(tail) => {
255                assert!(self.durability_dependency.end <= tail.seq_num);
256                self.reply.map(|ack| AppendAck {
257                    start: ack.start,
258                    end: ack.end,
259                    tail,
260                })
261            }
262            Err(e) => Err(e.into()),
263        };
264        let _ = self.tx.send(reply);
265    }
266}