Skip to main content

s2_lite/backend/
append.rs

1use std::{
2    collections::VecDeque,
3    ops::{DerefMut as _, Range, RangeTo},
4    sync::Arc,
5};
6
7use futures::{Stream, StreamExt as _, future::OptionFuture, stream::FuturesOrdered};
8use s2_common::{
9    encryption::{EncryptionKey, EncryptionSpec},
10    record::{SeqNum, StreamPosition},
11    types::{
12        basin::BasinName,
13        stream::{AppendAck, AppendInput, StreamName},
14    },
15};
16use tokio::sync::oneshot;
17
18use super::{Backend, StreamHandle};
19use crate::backend::error::{AppendError, AppendErrorInternal, StorageError};
20
21impl Backend {
22    pub async fn open_for_append(
23        &self,
24        basin: &BasinName,
25        stream: &StreamName,
26        encryption_key: Option<EncryptionKey>,
27    ) -> Result<StreamHandle, AppendError> {
28        self.stream_handle_with_auto_create::<AppendError>(
29            basin,
30            stream,
31            |config| config.create_stream_on_append,
32            |cipher| Ok(EncryptionSpec::resolve(cipher, encryption_key)?),
33        )
34        .await
35    }
36}
37
38impl StreamHandle {
39    pub async fn append(self, input: AppendInput) -> Result<AppendAck, AppendError> {
40        let input = input.encrypt(&self.encryption, self.client.stream_id().as_bytes());
41        let ack = self.client.append_permit(input).await?.submit().await?;
42        Ok(ack)
43    }
44
45    pub fn append_session<S>(self, inputs: S) -> impl Stream<Item = Result<AppendAck, AppendError>>
46    where
47        S: Stream<Item = AppendInput>,
48    {
49        let stream_id = self.client.stream_id();
50        let StreamHandle {
51            client, encryption, ..
52        } = self;
53        let session = SessionHandle::new();
54        async_stream::stream! {
55            tokio::pin!(inputs);
56            let mut permit_opt = None;
57            let mut append_futs = FuturesOrdered::new();
58            loop {
59                tokio::select! {
60                    Some(input) = inputs.next(), if permit_opt.is_none() => {
61                        permit_opt = Some(Box::pin(client.append_permit(
62                            input.encrypt(&encryption, stream_id.as_bytes()),
63                        )));
64                    }
65                    Some(res) = OptionFuture::from(permit_opt.as_mut()) => {
66                        permit_opt = None;
67                        match res {
68                            Ok(permit) => append_futs.push_back(permit.submit_session(session.clone())),
69                            Err(e) => {
70                                yield Err(e.into());
71                                break;
72                            }
73                        }
74                    }
75                    Some(res) = append_futs.next(), if !append_futs.is_empty() => {
76                        match res {
77                            Ok(ack) => {
78                                yield Ok(ack);
79                            }
80                            Err(e) => {
81                                yield Err(e.into());
82                                break;
83                            }
84                        }
85                    }
86                    else => {
87                        break;
88                    }
89                }
90            }
91        }
92    }
93}
94
95#[derive(Debug)]
96struct SessionState {
97    last_ack_end: RangeTo<SeqNum>,
98    poisoned: bool,
99}
100
101#[derive(Debug, Clone)]
102pub struct SessionHandle(Arc<parking_lot::Mutex<SessionState>>);
103
104impl SessionHandle {
105    pub fn new() -> Self {
106        Self(Arc::new(parking_lot::Mutex::new(SessionState {
107            last_ack_end: ..SeqNum::MIN,
108            poisoned: false,
109        })))
110    }
111}
112
113#[must_use]
114pub fn admit(
115    tx: oneshot::Sender<Result<AppendAck, AppendErrorInternal>>,
116    session: Option<SessionHandle>,
117) -> Option<Ticket> {
118    if tx.is_closed() {
119        return None;
120    }
121    match session {
122        None => Some(Ticket { tx, session: None }),
123        Some(session) => {
124            let session = session.0.lock_arc();
125            if session.poisoned {
126                None
127            } else {
128                Some(Ticket {
129                    tx,
130                    session: Some(session),
131                })
132            }
133        }
134    }
135}
136
137#[derive(Debug, Default)]
138pub struct PendingAppends {
139    queue: VecDeque<BlockedReplySender>,
140    next_ack_pos: Option<StreamPosition>,
141}
142
143impl PendingAppends {
144    pub fn new() -> Self {
145        Self {
146            queue: VecDeque::new(),
147            next_ack_pos: None,
148        }
149    }
150
151    pub fn next_ack_pos(&self) -> Option<StreamPosition> {
152        self.next_ack_pos
153    }
154
155    pub fn accept(&mut self, ticket: Ticket, ack_range: Range<StreamPosition>) {
156        if let Some(prev_pos) = self.next_ack_pos.replace(StreamPosition {
157            seq_num: ack_range.end.seq_num,
158            timestamp: ack_range.end.timestamp,
159        }) {
160            assert_eq!(ack_range.start.seq_num, prev_pos.seq_num);
161            assert!(ack_range.start.timestamp >= prev_pos.timestamp);
162        }
163        let sender = ticket.accept(ack_range);
164        if let Some(prev) = self.queue.back() {
165            assert!(prev.durability_dependency.end < sender.durability_dependency.end);
166        }
167        self.queue.push_back(sender);
168    }
169
170    pub fn reject(&mut self, ticket: Ticket, err: AppendErrorInternal, stable_pos: StreamPosition) {
171        if let Some(sender) = ticket.reject(err, stable_pos) {
172            let dd = sender.durability_dependency;
173            let insert_pos = self
174                .queue
175                .partition_point(|x| x.durability_dependency.end <= dd.end);
176            self.queue.insert(insert_pos, sender);
177        }
178    }
179
180    pub fn on_stable(&mut self, stable_pos: StreamPosition) {
181        let completable = self
182            .queue
183            .iter()
184            .take_while(|sender| sender.durability_dependency.end <= stable_pos.seq_num)
185            .count();
186        for sender in self.queue.drain(..completable) {
187            sender.unblock(Ok(stable_pos));
188        }
189        // Lots of small appends could cause this,
190        // as we bound only on total bytes not num batches.
191        if self.queue.capacity() >= 4 * self.queue.len() {
192            self.queue.shrink_to(self.queue.len() * 2);
193        }
194    }
195
196    pub fn on_durability_failed(self, err: slatedb::Error) {
197        let err = StorageError::from(err);
198        for sender in self.queue {
199            sender.unblock(Err(err.clone()));
200        }
201    }
202}
203
204pub struct Ticket {
205    tx: oneshot::Sender<Result<AppendAck, AppendErrorInternal>>,
206    session: Option<parking_lot::ArcMutexGuard<parking_lot::RawMutex, SessionState>>,
207}
208
209impl Ticket {
210    #[must_use]
211    fn accept(self, ack_range: Range<StreamPosition>) -> BlockedReplySender {
212        let durability_dependency = ..ack_range.end.seq_num;
213        if let Some(mut session) = self.session {
214            let session = session.deref_mut();
215            assert!(!session.poisoned, "thanks to typestate");
216            session.last_ack_end = durability_dependency;
217        }
218        BlockedReplySender {
219            reply: Ok(ack_range),
220            durability_dependency,
221            tx: self.tx,
222        }
223    }
224
225    #[must_use]
226    fn reject(
227        self,
228        append_err: AppendErrorInternal,
229        stable_pos: StreamPosition,
230    ) -> Option<BlockedReplySender> {
231        let mut durability_dependency = append_err.durability_dependency();
232        if let Some(mut session) = self.session {
233            let session = session.deref_mut();
234            assert!(!session.poisoned, "thanks to typestate");
235            session.poisoned = true;
236            durability_dependency = ..durability_dependency.end.max(session.last_ack_end.end);
237        }
238        if durability_dependency.end <= stable_pos.seq_num {
239            let _ = self.tx.send(Err(append_err));
240            None
241        } else {
242            Some(BlockedReplySender {
243                reply: Err(append_err),
244                durability_dependency,
245                tx: self.tx,
246            })
247        }
248    }
249}
250
251#[derive(Debug)]
252struct BlockedReplySender {
253    reply: Result<Range<StreamPosition>, AppendErrorInternal>,
254    durability_dependency: RangeTo<SeqNum>,
255    tx: oneshot::Sender<Result<AppendAck, AppendErrorInternal>>,
256}
257
258impl BlockedReplySender {
259    fn unblock(self, stable_pos: Result<StreamPosition, StorageError>) {
260        let reply = match stable_pos {
261            Ok(tail) => {
262                assert!(self.durability_dependency.end <= tail.seq_num);
263                self.reply.map(|ack| AppendAck {
264                    start: ack.start,
265                    end: ack.end,
266                    tail,
267                })
268            }
269            Err(e) => Err(e.into()),
270        };
271        let _ = self.tx.send(reply);
272    }
273}