1use std::{
2 collections::VecDeque,
3 ops::{DerefMut as _, Range, RangeTo},
4 sync::Arc,
5};
6
7use futures::{Stream, StreamExt as _, future::OptionFuture, stream::FuturesOrdered};
8use s2_common::{
9 record::{SeqNum, StreamPosition},
10 types::{
11 basin::BasinName,
12 stream::{AppendAck, AppendInput, StreamName},
13 },
14};
15use tokio::sync::oneshot;
16
17use super::Backend;
18use crate::backend::error::{AppendError, AppendErrorInternal, StorageError};
19
20impl Backend {
21 pub async fn append(
22 &self,
23 basin: BasinName,
24 stream: StreamName,
25 input: AppendInput,
26 ) -> Result<AppendAck, AppendError> {
27 let client = self
28 .streamer_client_with_auto_create::<AppendError>(&basin, &stream, |config| {
29 config.create_stream_on_append
30 })
31 .await?;
32 let ack = client.append_permit(input).await?.submit().await?;
33 Ok(ack)
34 }
35
36 pub async fn append_session(
37 self,
38 basin: BasinName,
39 stream: StreamName,
40 inputs: impl Stream<Item = AppendInput>,
41 ) -> Result<impl Stream<Item = Result<AppendAck, AppendError>>, AppendError> {
42 let client = self
43 .streamer_client_with_auto_create::<AppendError>(&basin, &stream, |config| {
44 config.create_stream_on_append
45 })
46 .await?;
47 let session = SessionHandle::new();
48 Ok(async_stream::stream! {
49 tokio::pin!(inputs);
50 let mut permit_opt = None;
51 let mut append_futs = FuturesOrdered::new();
52 loop {
53 tokio::select! {
54 Some(input) = inputs.next(), if permit_opt.is_none() => {
55 permit_opt = Some(Box::pin(client.append_permit(input)));
56 }
57 Some(res) = OptionFuture::from(permit_opt.as_mut()) => {
58 permit_opt = None;
59 match res {
60 Ok(permit) => append_futs.push_back(permit.submit_session(session.clone())),
61 Err(e) => {
62 yield Err(e.into());
63 break;
64 }
65 }
66 }
67 Some(res) = append_futs.next(), if !append_futs.is_empty() => {
68 match res {
69 Ok(ack) => {
70 yield Ok(ack);
71 },
72 Err(e) => {
73 yield Err(e.into());
74 break;
75 }
76 }
77 }
78 else => {
79 break;
80 }
81 }
82 }
83 })
84 }
85}
86
/// Mutable state shared by all appends of one session, guarded by the mutex inside
/// [`SessionHandle`].
#[derive(Debug)]
struct SessionState {
    /// Exclusive upper bound on sequence numbers of the append most recently accepted in
    /// this session (written by `Ticket::accept`); starts out as `..SeqNum::MIN`.
    last_ack_end: RangeTo<SeqNum>,
    /// Set by `Ticket::reject` when an append of this session is rejected; `admit` hands out
    /// no further tickets for the session once this is `true`.
    poisoned: bool,
}
92
93#[derive(Debug, Clone)]
94pub struct SessionHandle(Arc<parking_lot::Mutex<SessionState>>);
95
96impl SessionHandle {
97 pub fn new() -> Self {
98 Self(Arc::new(parking_lot::Mutex::new(SessionState {
99 last_ack_end: ..SeqNum::MIN,
100 poisoned: false,
101 })))
102 }
103}
104
105#[must_use]
106pub fn admit(
107 tx: oneshot::Sender<Result<AppendAck, AppendErrorInternal>>,
108 session: Option<SessionHandle>,
109) -> Option<Ticket> {
110 if tx.is_closed() {
111 return None;
112 }
113 match session {
114 None => Some(Ticket { tx, session: None }),
115 Some(session) => {
116 let session = session.0.lock_arc();
117 if session.poisoned {
118 None
119 } else {
120 Some(Ticket {
121 tx,
122 session: Some(session),
123 })
124 }
125 }
126 }
127}
128
129#[derive(Debug, Default)]
130pub struct PendingAppends {
131 queue: VecDeque<BlockedReplySender>,
132 next_ack_pos: Option<StreamPosition>,
133}
134
135impl PendingAppends {
136 pub fn new() -> Self {
137 Self {
138 queue: VecDeque::new(),
139 next_ack_pos: None,
140 }
141 }
142
143 pub fn next_ack_pos(&self) -> Option<StreamPosition> {
144 self.next_ack_pos
145 }
146
147 pub fn accept(&mut self, ticket: Ticket, ack_range: Range<StreamPosition>) {
148 if let Some(prev_pos) = self.next_ack_pos.replace(StreamPosition {
149 seq_num: ack_range.end.seq_num,
150 timestamp: ack_range.end.timestamp,
151 }) {
152 assert_eq!(ack_range.start.seq_num, prev_pos.seq_num);
153 assert!(ack_range.start.timestamp >= prev_pos.timestamp);
154 }
155 let sender = ticket.accept(ack_range);
156 if let Some(prev) = self.queue.back() {
157 assert!(prev.durability_dependency.end < sender.durability_dependency.end);
158 }
159 self.queue.push_back(sender);
160 }
161
162 pub fn reject(&mut self, ticket: Ticket, err: AppendErrorInternal, stable_pos: StreamPosition) {
163 if let Some(sender) = ticket.reject(err, stable_pos) {
164 let dd = sender.durability_dependency;
165 let insert_pos = self
166 .queue
167 .partition_point(|x| x.durability_dependency.end <= dd.end);
168 self.queue.insert(insert_pos, sender);
169 }
170 }
171
172 pub fn on_stable(&mut self, stable_pos: StreamPosition) {
173 let completable = self
174 .queue
175 .iter()
176 .take_while(|sender| sender.durability_dependency.end <= stable_pos.seq_num)
177 .count();
178 for sender in self.queue.drain(..completable) {
179 sender.unblock(Ok(stable_pos));
180 }
181 if self.queue.capacity() >= 4 * self.queue.len() {
184 self.queue.shrink_to(self.queue.len() * 2);
185 }
186 }
187
188 pub fn on_durability_failed(self, err: slatedb::Error) {
189 let err = StorageError::from(err);
190 for sender in self.queue {
191 sender.unblock(Err(err.clone()));
192 }
193 }
194}
195
196pub struct Ticket {
197 tx: oneshot::Sender<Result<AppendAck, AppendErrorInternal>>,
198 session: Option<parking_lot::ArcMutexGuard<parking_lot::RawMutex, SessionState>>,
199}
200
201impl Ticket {
202 #[must_use]
203 fn accept(self, ack_range: Range<StreamPosition>) -> BlockedReplySender {
204 let durability_dependency = ..ack_range.end.seq_num;
205 if let Some(mut session) = self.session {
206 let session = session.deref_mut();
207 assert!(!session.poisoned, "thanks to typestate");
208 session.last_ack_end = durability_dependency;
209 }
210 BlockedReplySender {
211 reply: Ok(ack_range),
212 durability_dependency,
213 tx: self.tx,
214 }
215 }
216
217 #[must_use]
218 fn reject(
219 self,
220 append_err: AppendErrorInternal,
221 stable_pos: StreamPosition,
222 ) -> Option<BlockedReplySender> {
223 let mut durability_dependency =
224 if let AppendErrorInternal::ConditionFailed(cond_fail) = &append_err {
225 cond_fail.durability_dependency()
226 } else {
227 ..0
228 };
229 if let Some(mut session) = self.session {
230 let session = session.deref_mut();
231 assert!(!session.poisoned, "thanks to typestate");
232 session.poisoned = true;
233 durability_dependency = ..durability_dependency.end.max(session.last_ack_end.end);
234 }
235 if durability_dependency.end <= stable_pos.seq_num {
236 let _ = self.tx.send(Err(append_err));
237 None
238 } else {
239 Some(BlockedReplySender {
240 reply: Err(append_err),
241 durability_dependency,
242 tx: self.tx,
243 })
244 }
245 }
246}
247
248#[derive(Debug)]
249struct BlockedReplySender {
250 reply: Result<Range<StreamPosition>, AppendErrorInternal>,
251 durability_dependency: RangeTo<SeqNum>,
252 tx: oneshot::Sender<Result<AppendAck, AppendErrorInternal>>,
253}
254
255impl BlockedReplySender {
256 fn unblock(self, stable_pos: Result<StreamPosition, StorageError>) {
257 let reply = match stable_pos {
258 Ok(tail) => {
259 assert!(self.durability_dependency.end <= tail.seq_num);
260 self.reply.map(|ack| AppendAck {
261 start: ack.start,
262 end: ack.end,
263 tail,
264 })
265 }
266 Err(e) => Err(e.into()),
267 };
268 let _ = self.tx.send(reply);
269 }
270}