1use std::{
2 collections::VecDeque,
3 ops::{DerefMut as _, Range, RangeTo},
4 sync::Arc,
5};
6
7use futures::{Stream, StreamExt as _, future::OptionFuture, stream::FuturesOrdered};
8use s2_common::{
9 encryption::{EncryptionKey, EncryptionSpec},
10 record::{SeqNum, StreamPosition},
11 types::{
12 basin::BasinName,
13 stream::{AppendAck, AppendInput, StreamName},
14 },
15};
16use tokio::sync::oneshot;
17
18use super::{Backend, StreamHandle};
19use crate::backend::error::{AppendError, AppendErrorInternal, StorageError};
20
21impl Backend {
22 pub async fn open_for_append(
23 &self,
24 basin: &BasinName,
25 stream: &StreamName,
26 encryption_key: Option<EncryptionKey>,
27 ) -> Result<StreamHandle, AppendError> {
28 self.stream_handle_with_auto_create::<AppendError>(
29 basin,
30 stream,
31 |config| config.create_stream_on_append,
32 |cipher| Ok(EncryptionSpec::resolve(cipher, encryption_key)?),
33 )
34 .await
35 }
36}
37
impl StreamHandle {
    /// Performs a single append and waits for its acknowledgement.
    ///
    /// The input is first passed through `AppendInput::encrypt` with this handle's
    /// encryption and the bytes of the client's stream id, then an append permit is
    /// awaited from the client and submitted.
    ///
    /// # Errors
    ///
    /// Returns an [`AppendError`] if acquiring the permit or submitting the append fails.
    pub async fn append(self, input: AppendInput) -> Result<AppendAck, AppendError> {
        let input = input.encrypt(&self.encryption, self.client.stream_id().as_bytes());
        let ack = self.client.append_permit(input).await?.submit().await?;
        Ok(ack)
    }

    /// Runs an append session: consumes `inputs` and yields one result per submitted append.
    ///
    /// All appends of the session share a single [`SessionHandle`]. At most one permit
    /// acquisition is pending at a time, while any number of already-submitted appends
    /// may be in flight; their results are yielded in submission order.
    ///
    /// The returned stream ends after the first error (which is yielded), or once the
    /// input stream is exhausted and nothing is pending or in flight.
    pub fn append_session<S>(self, inputs: S) -> impl Stream<Item = Result<AppendAck, AppendError>>
    where
        S: Stream<Item = AppendInput>,
    {
        let stream_id = self.client.stream_id();
        let StreamHandle {
            client, encryption, ..
        } = self;
        let session = SessionHandle::new();
        async_stream::stream! {
            tokio::pin!(inputs);
            // The permit acquisition currently being awaited, if any.
            let mut permit_opt = None;
            // Submitted appends whose acknowledgements are still outstanding.
            let mut append_futs = FuturesOrdered::new();
            loop {
                tokio::select! {
                    // Pull the next input only when no permit acquisition is pending.
                    Some(input) = inputs.next(), if permit_opt.is_none() => {
                        permit_opt = Some(Box::pin(client.append_permit(
                            input.encrypt(&encryption, stream_id.as_bytes()),
                        )));
                    }
                    // Permit resolved: submit the append under the shared session, or
                    // surface the error and end the session.
                    Some(res) = OptionFuture::from(permit_opt.as_mut()) => {
                        permit_opt = None;
                        match res {
                            Ok(permit) => append_futs.push_back(permit.submit_session(session.clone())),
                            Err(e) => {
                                yield Err(e.into());
                                break;
                            }
                        }
                    }
                    // An in-flight append completed; an error ends the session.
                    Some(res) = append_futs.next(), if !append_futs.is_empty() => {
                        match res {
                            Ok(ack) => {
                                yield Ok(ack);
                            }
                            Err(e) => {
                                yield Err(e.into());
                                break;
                            }
                        }
                    }
                    // No branch can make progress: inputs are exhausted, no permit is
                    // pending, and no appends are in flight.
                    else => {
                        break;
                    }
                }
            }
        }
    }
}
94
/// Mutable state shared by every append submitted through one [`SessionHandle`].
#[derive(Debug)]
struct SessionState {
    /// Durability dependency (`..end.seq_num`) recorded by the most recently accepted
    /// ticket of this session; initialised to `..SeqNum::MIN`.
    last_ack_end: RangeTo<SeqNum>,
    /// Set when a ticket of this session is rejected; `admit` refuses to issue
    /// further tickets for a poisoned session.
    poisoned: bool,
}
100
101#[derive(Debug, Clone)]
102pub struct SessionHandle(Arc<parking_lot::Mutex<SessionState>>);
103
104impl SessionHandle {
105 pub fn new() -> Self {
106 Self(Arc::new(parking_lot::Mutex::new(SessionState {
107 last_ack_end: ..SeqNum::MIN,
108 poisoned: false,
109 })))
110 }
111}
112
113#[must_use]
114pub fn admit(
115 tx: oneshot::Sender<Result<AppendAck, AppendErrorInternal>>,
116 session: Option<SessionHandle>,
117) -> Option<Ticket> {
118 if tx.is_closed() {
119 return None;
120 }
121 match session {
122 None => Some(Ticket { tx, session: None }),
123 Some(session) => {
124 let session = session.0.lock_arc();
125 if session.poisoned {
126 None
127 } else {
128 Some(Ticket {
129 tx,
130 session: Some(session),
131 })
132 }
133 }
134 }
135}
136
/// Replies to appends that are waiting for durability before they can be sent.
#[derive(Debug, Default)]
pub struct PendingAppends {
    /// Blocked replies, kept ordered by the end of their durability dependency
    /// (`accept` pushes to the back; `reject` inserts at the matching position).
    queue: VecDeque<BlockedReplySender>,
    /// End position of the most recently accepted append range; `None` until the
    /// first `accept`.
    next_ack_pos: Option<StreamPosition>,
}
142
143impl PendingAppends {
144 pub fn new() -> Self {
145 Self {
146 queue: VecDeque::new(),
147 next_ack_pos: None,
148 }
149 }
150
151 pub fn next_ack_pos(&self) -> Option<StreamPosition> {
152 self.next_ack_pos
153 }
154
155 pub fn accept(&mut self, ticket: Ticket, ack_range: Range<StreamPosition>) {
156 if let Some(prev_pos) = self.next_ack_pos.replace(StreamPosition {
157 seq_num: ack_range.end.seq_num,
158 timestamp: ack_range.end.timestamp,
159 }) {
160 assert_eq!(ack_range.start.seq_num, prev_pos.seq_num);
161 assert!(ack_range.start.timestamp >= prev_pos.timestamp);
162 }
163 let sender = ticket.accept(ack_range);
164 if let Some(prev) = self.queue.back() {
165 assert!(prev.durability_dependency.end < sender.durability_dependency.end);
166 }
167 self.queue.push_back(sender);
168 }
169
170 pub fn reject(&mut self, ticket: Ticket, err: AppendErrorInternal, stable_pos: StreamPosition) {
171 if let Some(sender) = ticket.reject(err, stable_pos) {
172 let dd = sender.durability_dependency;
173 let insert_pos = self
174 .queue
175 .partition_point(|x| x.durability_dependency.end <= dd.end);
176 self.queue.insert(insert_pos, sender);
177 }
178 }
179
180 pub fn on_stable(&mut self, stable_pos: StreamPosition) {
181 let completable = self
182 .queue
183 .iter()
184 .take_while(|sender| sender.durability_dependency.end <= stable_pos.seq_num)
185 .count();
186 for sender in self.queue.drain(..completable) {
187 sender.unblock(Ok(stable_pos));
188 }
189 if self.queue.capacity() >= 4 * self.queue.len() {
192 self.queue.shrink_to(self.queue.len() * 2);
193 }
194 }
195
196 pub fn on_durability_failed(self, err: slatedb::Error) {
197 let err = StorageError::from(err);
198 for sender in self.queue {
199 sender.unblock(Err(err.clone()));
200 }
201 }
202}
203
/// Permission for one admitted append to be either accepted or rejected.
///
/// Produced by `admit`; consumed by exactly one of `Ticket::accept` / `Ticket::reject`.
pub struct Ticket {
    /// Channel on which the append's final result is delivered.
    tx: oneshot::Sender<Result<AppendAck, AppendErrorInternal>>,
    /// For session appends, the locked session state; the lock is held for as long
    /// as the guard lives inside this ticket.
    session: Option<parking_lot::ArcMutexGuard<parking_lot::RawMutex, SessionState>>,
}
208
209impl Ticket {
210 #[must_use]
211 fn accept(self, ack_range: Range<StreamPosition>) -> BlockedReplySender {
212 let durability_dependency = ..ack_range.end.seq_num;
213 if let Some(mut session) = self.session {
214 let session = session.deref_mut();
215 assert!(!session.poisoned, "thanks to typestate");
216 session.last_ack_end = durability_dependency;
217 }
218 BlockedReplySender {
219 reply: Ok(ack_range),
220 durability_dependency,
221 tx: self.tx,
222 }
223 }
224
225 #[must_use]
226 fn reject(
227 self,
228 append_err: AppendErrorInternal,
229 stable_pos: StreamPosition,
230 ) -> Option<BlockedReplySender> {
231 let mut durability_dependency = append_err.durability_dependency();
232 if let Some(mut session) = self.session {
233 let session = session.deref_mut();
234 assert!(!session.poisoned, "thanks to typestate");
235 session.poisoned = true;
236 durability_dependency = ..durability_dependency.end.max(session.last_ack_end.end);
237 }
238 if durability_dependency.end <= stable_pos.seq_num {
239 let _ = self.tx.send(Err(append_err));
240 None
241 } else {
242 Some(BlockedReplySender {
243 reply: Err(append_err),
244 durability_dependency,
245 tx: self.tx,
246 })
247 }
248 }
249}
250
/// A prepared reply that is held back until its durability dependency is satisfied.
#[derive(Debug)]
struct BlockedReplySender {
    /// The outcome to deliver: the acknowledged range on success, or the append error.
    reply: Result<Range<StreamPosition>, AppendErrorInternal>,
    /// The reply may be sent once the stable position's sequence number reaches this end.
    durability_dependency: RangeTo<SeqNum>,
    /// Channel on which the final result is delivered.
    tx: oneshot::Sender<Result<AppendAck, AppendErrorInternal>>,
}
257
258impl BlockedReplySender {
259 fn unblock(self, stable_pos: Result<StreamPosition, StorageError>) {
260 let reply = match stable_pos {
261 Ok(tail) => {
262 assert!(self.durability_dependency.end <= tail.seq_num);
263 self.reply.map(|ack| AppendAck {
264 start: ack.start,
265 end: ack.end,
266 tail,
267 })
268 }
269 Err(e) => Err(e.into()),
270 };
271 let _ = self.tx.send(reply);
272 }
273}