Skip to main content

s2_lite/backend/
append.rs

1use std::{
2    collections::VecDeque,
3    ops::{DerefMut as _, Range, RangeTo},
4    sync::Arc,
5};
6
7use futures::{Stream, StreamExt as _, future::OptionFuture, stream::FuturesOrdered};
8use s2_common::{
9    record::{SeqNum, StreamPosition},
10    types::{
11        basin::BasinName,
12        stream::{AppendAck, AppendInput, StreamName},
13    },
14};
15use tokio::sync::oneshot;
16
17use super::Backend;
18use crate::backend::error::{AppendError, AppendErrorInternal, StorageError};
19
20impl Backend {
21    pub async fn append(
22        &self,
23        basin: BasinName,
24        stream: StreamName,
25        input: AppendInput,
26    ) -> Result<AppendAck, AppendError> {
27        let client = self
28            .streamer_client_with_auto_create::<AppendError>(&basin, &stream, |config| {
29                config.create_stream_on_append
30            })
31            .await?;
32        let ack = client.append_permit(input).await?.submit().await?;
33        Ok(ack)
34    }
35
36    pub async fn append_session(
37        self,
38        basin: BasinName,
39        stream: StreamName,
40        inputs: impl Stream<Item = AppendInput>,
41    ) -> Result<impl Stream<Item = Result<AppendAck, AppendError>>, AppendError> {
42        let client = self
43            .streamer_client_with_auto_create::<AppendError>(&basin, &stream, |config| {
44                config.create_stream_on_append
45            })
46            .await?;
47        let session = SessionHandle::new();
48        Ok(async_stream::stream! {
49            tokio::pin!(inputs);
50            let mut permit_opt = None;
51            let mut append_futs = FuturesOrdered::new();
52            loop {
53                tokio::select! {
54                    Some(input) = inputs.next(), if permit_opt.is_none() => {
55                        permit_opt = Some(Box::pin(client.append_permit(input)));
56                    }
57                    Some(res) = OptionFuture::from(permit_opt.as_mut()) => {
58                        permit_opt = None;
59                        match res {
60                            Ok(permit) => append_futs.push_back(permit.submit_session(session.clone())),
61                            Err(e) => {
62                                yield Err(e.into());
63                                break;
64                            }
65                        }
66                    }
67                    Some(res) = append_futs.next(), if !append_futs.is_empty() => {
68                        match res {
69                            Ok(ack) => {
70                                yield Ok(ack);
71                            },
72                            Err(e) => {
73                                yield Err(e.into());
74                                break;
75                            }
76                        }
77                    }
78                    else => {
79                        break;
80                    }
81                }
82            }
83        })
84    }
85}
86
87#[derive(Debug)]
88struct SessionState {
89    last_ack_end: RangeTo<SeqNum>,
90    poisoned: bool,
91}
92
93#[derive(Debug, Clone)]
94pub struct SessionHandle(Arc<parking_lot::Mutex<SessionState>>);
95
96impl SessionHandle {
97    pub fn new() -> Self {
98        Self(Arc::new(parking_lot::Mutex::new(SessionState {
99            last_ack_end: ..SeqNum::MIN,
100            poisoned: false,
101        })))
102    }
103}
104
105#[must_use]
106pub fn admit(
107    tx: oneshot::Sender<Result<AppendAck, AppendErrorInternal>>,
108    session: Option<SessionHandle>,
109) -> Option<Ticket> {
110    if tx.is_closed() {
111        return None;
112    }
113    match session {
114        None => Some(Ticket { tx, session: None }),
115        Some(session) => {
116            let session = session.0.lock_arc();
117            if session.poisoned {
118                None
119            } else {
120                Some(Ticket {
121                    tx,
122                    session: Some(session),
123                })
124            }
125        }
126    }
127}
128
129#[derive(Debug, Default)]
130pub struct PendingAppends {
131    queue: VecDeque<BlockedReplySender>,
132    next_ack_pos: Option<StreamPosition>,
133}
134
135impl PendingAppends {
136    pub fn new() -> Self {
137        Self {
138            queue: VecDeque::new(),
139            next_ack_pos: None,
140        }
141    }
142
143    pub fn next_ack_pos(&self) -> Option<StreamPosition> {
144        self.next_ack_pos
145    }
146
147    pub fn accept(&mut self, ticket: Ticket, ack_range: Range<StreamPosition>) {
148        if let Some(prev_pos) = self.next_ack_pos.replace(StreamPosition {
149            seq_num: ack_range.end.seq_num,
150            timestamp: ack_range.end.timestamp,
151        }) {
152            assert_eq!(ack_range.start.seq_num, prev_pos.seq_num);
153            assert!(ack_range.start.timestamp >= prev_pos.timestamp);
154        }
155        let sender = ticket.accept(ack_range);
156        if let Some(prev) = self.queue.back() {
157            assert!(prev.durability_dependency.end < sender.durability_dependency.end);
158        }
159        self.queue.push_back(sender);
160    }
161
162    pub fn reject(&mut self, ticket: Ticket, err: AppendErrorInternal, stable_pos: StreamPosition) {
163        if let Some(sender) = ticket.reject(err, stable_pos) {
164            let dd = sender.durability_dependency;
165            let insert_pos = self
166                .queue
167                .partition_point(|x| x.durability_dependency.end <= dd.end);
168            self.queue.insert(insert_pos, sender);
169        }
170    }
171
172    pub fn on_stable(&mut self, stable_pos: StreamPosition) {
173        let completable = self
174            .queue
175            .iter()
176            .take_while(|sender| sender.durability_dependency.end <= stable_pos.seq_num)
177            .count();
178        for sender in self.queue.drain(..completable) {
179            sender.unblock(Ok(stable_pos));
180        }
181        // Lots of small appends could cause this,
182        // as we bound only on total bytes not num batches.
183        if self.queue.capacity() >= 4 * self.queue.len() {
184            self.queue.shrink_to(self.queue.len() * 2);
185        }
186    }
187
188    pub fn on_durability_failed(self, err: slatedb::Error) {
189        let err = StorageError::from(err);
190        for sender in self.queue {
191            sender.unblock(Err(err.clone()));
192        }
193    }
194}
195
196pub struct Ticket {
197    tx: oneshot::Sender<Result<AppendAck, AppendErrorInternal>>,
198    session: Option<parking_lot::ArcMutexGuard<parking_lot::RawMutex, SessionState>>,
199}
200
201impl Ticket {
202    #[must_use]
203    fn accept(self, ack_range: Range<StreamPosition>) -> BlockedReplySender {
204        let durability_dependency = ..ack_range.end.seq_num;
205        if let Some(mut session) = self.session {
206            let session = session.deref_mut();
207            assert!(!session.poisoned, "thanks to typestate");
208            session.last_ack_end = durability_dependency;
209        }
210        BlockedReplySender {
211            reply: Ok(ack_range),
212            durability_dependency,
213            tx: self.tx,
214        }
215    }
216
217    #[must_use]
218    fn reject(
219        self,
220        append_err: AppendErrorInternal,
221        stable_pos: StreamPosition,
222    ) -> Option<BlockedReplySender> {
223        let mut durability_dependency =
224            if let AppendErrorInternal::ConditionFailed(cond_fail) = &append_err {
225                cond_fail.durability_dependency()
226            } else {
227                ..0
228            };
229        if let Some(mut session) = self.session {
230            let session = session.deref_mut();
231            assert!(!session.poisoned, "thanks to typestate");
232            session.poisoned = true;
233            durability_dependency = ..durability_dependency.end.max(session.last_ack_end.end);
234        }
235        if durability_dependency.end <= stable_pos.seq_num {
236            let _ = self.tx.send(Err(append_err));
237            None
238        } else {
239            Some(BlockedReplySender {
240                reply: Err(append_err),
241                durability_dependency,
242                tx: self.tx,
243            })
244        }
245    }
246}
247
248#[derive(Debug)]
249struct BlockedReplySender {
250    reply: Result<Range<StreamPosition>, AppendErrorInternal>,
251    durability_dependency: RangeTo<SeqNum>,
252    tx: oneshot::Sender<Result<AppendAck, AppendErrorInternal>>,
253}
254
255impl BlockedReplySender {
256    fn unblock(self, stable_pos: Result<StreamPosition, StorageError>) {
257        let reply = match stable_pos {
258            Ok(tail) => {
259                assert!(self.durability_dependency.end <= tail.seq_num);
260                self.reply.map(|ack| AppendAck {
261                    start: ack.start,
262                    end: ack.end,
263                    tail,
264                })
265            }
266            Err(e) => Err(e.into()),
267        };
268        let _ = self.tx.send(reply);
269    }
270}