1use std::{
2 collections::VecDeque,
3 ops::{DerefMut as _, Range, RangeTo},
4 sync::Arc,
5};
6
7use futures::{FutureExt as _, Stream, StreamExt as _, stream::FuturesOrdered};
8use s2_common::{
9 record::{MeteredSize as _, SeqNum, StreamPosition},
10 types::{
11 basin::BasinName,
12 stream::{AppendAck, AppendInput, StreamName},
13 },
14};
15use tokio::sync::oneshot;
16
17use super::Backend;
18use crate::backend::error::{AppendError, AppendErrorInternal, StorageError};
19
/// Largest number of appended batches an append session keeps awaiting acks for at once.
const MAX_INFLIGHT_BATCHES: usize = 100;
/// Byte budget (10 MiB of metered record size) for an append session's in-flight batches.
/// New input is pulled only while the in-flight total is at most this value, so one more
/// batch can push the total past it.
const MAX_INFLIGHT_BYTES: usize = 10 << 20;
23
24impl Backend {
25 pub async fn append(
26 &self,
27 basin: BasinName,
28 stream: StreamName,
29 input: AppendInput,
30 ) -> Result<AppendAck, AppendError> {
31 let client = self
32 .streamer_client_with_auto_create::<AppendError>(&basin, &stream, |config| {
33 config.create_stream_on_append
34 })
35 .await?;
36 let ack = client.append(input, None).await?;
37 Ok(ack)
38 }
39
40 pub async fn append_session(
41 self,
42 basin: BasinName,
43 stream: StreamName,
44 inputs: impl Stream<Item = AppendInput>,
45 ) -> Result<impl Stream<Item = Result<AppendAck, AppendError>>, AppendError> {
46 let client = self.streamer_client(&basin, &stream).await?;
47 let session = SessionHandle::new();
48 Ok(async_stream::stream! {
49 tokio::pin!(inputs);
50 let mut futs = FuturesOrdered::new();
51 let mut inflight_bytes = 0;
52 loop {
53 tokio::select! {
54 Some(input) = inputs.next(), if (
55 futs.len() < MAX_INFLIGHT_BATCHES
56 && inflight_bytes <= MAX_INFLIGHT_BYTES
57 ) => {
58 let metered_size = input.records.metered_size();
59 inflight_bytes += metered_size;
60 let fut = client
61 .append(input, Some(session.clone()))
62 .map(move |res| (metered_size, res));
63 futs.push_back(fut);
64 }
65 Some((metered_size, res)) = futs.next() => {
66 inflight_bytes -= metered_size;
67 yield res.map_err(AppendError::from);
68 }
69 else => {
70 break;
71 }
72 }
73 }
74 })
75 }
76}
77
/// Mutable state shared by every append issued through one [`SessionHandle`].
#[derive(Debug)]
struct SessionState {
    /// `..end_seq_num` of the most recently accepted append in this session (written by
    /// `Ticket::accept`). A later rejection in the session inherits this as a minimum
    /// durability dependency.
    last_ack_end: RangeTo<SeqNum>,
    /// Set by `Ticket::reject`; once set, `admit` refuses any further tickets for the session.
    poisoned: bool,
}
83
84#[derive(Debug, Clone)]
85pub struct SessionHandle(Arc<parking_lot::Mutex<SessionState>>);
86
87impl SessionHandle {
88 pub fn new() -> Self {
89 Self(Arc::new(parking_lot::Mutex::new(SessionState {
90 last_ack_end: ..SeqNum::MIN,
91 poisoned: false,
92 })))
93 }
94}
95
96#[must_use]
97pub fn admit(
98 tx: oneshot::Sender<Result<AppendAck, AppendErrorInternal>>,
99 session: Option<SessionHandle>,
100) -> Option<Ticket> {
101 if tx.is_closed() {
102 return None;
103 }
104 match session {
105 None => Some(Ticket { tx, session: None }),
106 Some(session) => {
107 let session = session.0.lock_arc();
108 if session.poisoned {
109 None
110 } else {
111 Some(Ticket {
112 tx,
113 session: Some(session),
114 })
115 }
116 }
117 }
118}
119
/// Appends whose outcome is decided but whose reply is held back until the stable
/// position reaches the reply's durability dependency.
#[derive(Debug, Default)]
pub struct PendingAppends {
    /// Blocked replies, ordered by non-decreasing `durability_dependency.end`
    /// (maintained by `accept`'s assertion and `reject`'s ordered insert).
    queue: VecDeque<BlockedReplySender>,
    /// Position just past the most recently accepted append; `None` until the first accept.
    next_ack_pos: Option<StreamPosition>,
}
125
126impl PendingAppends {
127 pub fn new(capacity: usize) -> Self {
128 Self {
129 queue: VecDeque::with_capacity(capacity),
130 next_ack_pos: None,
131 }
132 }
133
134 pub fn next_ack_pos(&self) -> Option<StreamPosition> {
135 self.next_ack_pos
136 }
137
138 pub fn accept(&mut self, ticket: Ticket, ack_range: Range<StreamPosition>) {
139 if let Some(prev_pos) = self.next_ack_pos.replace(StreamPosition {
140 seq_num: ack_range.end.seq_num,
141 timestamp: ack_range.end.timestamp,
142 }) {
143 assert_eq!(ack_range.start.seq_num, prev_pos.seq_num);
144 assert!(ack_range.start.timestamp >= prev_pos.timestamp);
145 }
146 let sender = ticket.accept(ack_range);
147 if let Some(prev) = self.queue.back() {
148 assert!(prev.durability_dependency.end < sender.durability_dependency.end);
149 }
150 self.queue.push_back(sender);
151 }
152
153 pub fn reject(&mut self, ticket: Ticket, err: AppendErrorInternal, stable_pos: StreamPosition) {
154 if let Some(sender) = ticket.reject(err, stable_pos) {
155 let dd = sender.durability_dependency;
156 let insert_pos = self
157 .queue
158 .partition_point(|x| x.durability_dependency.end <= dd.end);
159 self.queue.insert(insert_pos, sender);
160 }
161 }
162
163 pub fn on_stable(&mut self, stable_pos: StreamPosition) {
164 let completable = self
165 .queue
166 .iter()
167 .take_while(|sender| sender.durability_dependency.end <= stable_pos.seq_num)
168 .count();
169 for sender in self.queue.drain(..completable) {
170 sender.unblock(Ok(stable_pos));
171 }
172 }
173
174 pub fn on_durability_failed(self, err: slatedb::Error) {
175 let err = StorageError::from(err);
176 for sender in self.queue {
177 sender.unblock(Err(err.clone()));
178 }
179 }
180}
181
/// An admitted append awaiting its decision; consumed by `accept` or `reject`.
///
/// For session appends the ticket owns the session's mutex guard, so the session state
/// stays locked until the ticket is consumed or dropped.
pub struct Ticket {
    /// Channel on which the final reply for this append is delivered.
    tx: oneshot::Sender<Result<AppendAck, AppendErrorInternal>>,
    /// Locked session state, or `None` for a standalone (non-session) append.
    session: Option<parking_lot::ArcMutexGuard<parking_lot::RawMutex, SessionState>>,
}
186
187impl Ticket {
188 #[must_use]
189 fn accept(self, ack_range: Range<StreamPosition>) -> BlockedReplySender {
190 let durability_dependency = ..ack_range.end.seq_num;
191 if let Some(mut session) = self.session {
192 let session = session.deref_mut();
193 assert!(!session.poisoned, "thanks to typestate");
194 session.last_ack_end = durability_dependency;
195 }
196 BlockedReplySender {
197 reply: Ok(ack_range),
198 durability_dependency,
199 tx: self.tx,
200 }
201 }
202
203 #[must_use]
204 fn reject(
205 self,
206 append_err: AppendErrorInternal,
207 stable_pos: StreamPosition,
208 ) -> Option<BlockedReplySender> {
209 let mut durability_dependency =
210 if let AppendErrorInternal::ConditionFailed(cond_fail) = &append_err {
211 cond_fail.durability_dependency()
212 } else {
213 ..0
214 };
215 if let Some(mut session) = self.session {
216 let session = session.deref_mut();
217 assert!(!session.poisoned, "thanks to typestate");
218 session.poisoned = true;
219 durability_dependency = ..durability_dependency.end.max(session.last_ack_end.end);
220 }
221 if durability_dependency.end <= stable_pos.seq_num {
222 let _ = self.tx.send(Err(append_err));
223 None
224 } else {
225 Some(BlockedReplySender {
226 reply: Err(append_err),
227 durability_dependency,
228 tx: self.tx,
229 })
230 }
231 }
232}
233
/// A decided reply that may only be delivered once the stable position's seq num reaches
/// `durability_dependency.end`.
#[derive(Debug)]
struct BlockedReplySender {
    /// The accepted ack range, or the rejection error.
    reply: Result<Range<StreamPosition>, AppendErrorInternal>,
    /// Exclusive upper bound of sequence numbers that must be stable before replying.
    durability_dependency: RangeTo<SeqNum>,
    /// Channel on which the reply is delivered.
    tx: oneshot::Sender<Result<AppendAck, AppendErrorInternal>>,
}
240
241impl BlockedReplySender {
242 fn unblock(self, stable_pos: Result<StreamPosition, StorageError>) {
243 let reply = match stable_pos {
244 Ok(tail) => {
245 assert!(self.durability_dependency.end <= tail.seq_num);
246 self.reply.map(|ack| AppendAck {
247 start: ack.start,
248 end: ack.end,
249 tail,
250 })
251 }
252 Err(e) => Err(e.into()),
253 };
254 let _ = self.tx.send(reply);
255 }
256}