1use std::{
2 collections::VecDeque,
3 ops::{DerefMut as _, Range, RangeTo},
4 sync::Arc,
5};
6
7use futures::{Stream, StreamExt as _, future::OptionFuture, stream::FuturesOrdered};
8use s2_common::{
9 record::{SeqNum, StreamPosition},
10 types::{
11 basin::BasinName,
12 stream::{AppendAck, AppendInput, StreamName},
13 },
14};
15use tokio::sync::oneshot;
16
17use super::Backend;
18use crate::backend::error::{AppendError, AppendErrorInternal, StorageError};
19
20impl Backend {
21 pub async fn append(
22 &self,
23 basin: BasinName,
24 stream: StreamName,
25 input: AppendInput,
26 ) -> Result<AppendAck, AppendError> {
27 let client = self
28 .streamer_client_with_auto_create::<AppendError>(&basin, &stream, |config| {
29 config.create_stream_on_append
30 })
31 .await?;
32 let ack = client.append_permit(input).await?.submit().await?;
33 Ok(ack)
34 }
35
36 pub async fn append_session(
37 self,
38 basin: BasinName,
39 stream: StreamName,
40 inputs: impl Stream<Item = AppendInput>,
41 ) -> Result<impl Stream<Item = Result<AppendAck, AppendError>>, AppendError> {
42 let client = self.streamer_client(&basin, &stream).await?;
43 let session = SessionHandle::new();
44 Ok(async_stream::stream! {
45 tokio::pin!(inputs);
46 let mut permit_opt = None;
47 let mut append_futs = FuturesOrdered::new();
48 loop {
49 tokio::select! {
50 Some(input) = inputs.next(), if permit_opt.is_none() => {
51 permit_opt = Some(Box::pin(client.append_permit(input)));
52 }
53 Some(res) = OptionFuture::from(permit_opt.as_mut()) => {
54 permit_opt = None;
55 match res {
56 Ok(permit) => append_futs.push_back(permit.submit_session(session.clone())),
57 Err(e) => {
58 yield Err(e.into());
59 break;
60 }
61 }
62 }
63 Some(res) = append_futs.next(), if !append_futs.is_empty() => {
64 match res {
65 Ok(ack) => {
66 yield Ok(ack);
67 },
68 Err(e) => {
69 yield Err(e.into());
70 break;
71 }
72 }
73 }
74 else => {
75 break;
76 }
77 }
78 }
79 })
80 }
81}
82
83#[derive(Debug)]
84struct SessionState {
85 last_ack_end: RangeTo<SeqNum>,
86 poisoned: bool,
87}
88
89#[derive(Debug, Clone)]
90pub struct SessionHandle(Arc<parking_lot::Mutex<SessionState>>);
91
92impl SessionHandle {
93 pub fn new() -> Self {
94 Self(Arc::new(parking_lot::Mutex::new(SessionState {
95 last_ack_end: ..SeqNum::MIN,
96 poisoned: false,
97 })))
98 }
99}
100
101#[must_use]
102pub fn admit(
103 tx: oneshot::Sender<Result<AppendAck, AppendErrorInternal>>,
104 session: Option<SessionHandle>,
105) -> Option<Ticket> {
106 if tx.is_closed() {
107 return None;
108 }
109 match session {
110 None => Some(Ticket { tx, session: None }),
111 Some(session) => {
112 let session = session.0.lock_arc();
113 if session.poisoned {
114 None
115 } else {
116 Some(Ticket {
117 tx,
118 session: Some(session),
119 })
120 }
121 }
122 }
123}
124
125#[derive(Debug, Default)]
126pub struct PendingAppends {
127 queue: VecDeque<BlockedReplySender>,
128 next_ack_pos: Option<StreamPosition>,
129}
130
131impl PendingAppends {
132 pub fn new() -> Self {
133 Self {
134 queue: VecDeque::new(),
135 next_ack_pos: None,
136 }
137 }
138
139 pub fn next_ack_pos(&self) -> Option<StreamPosition> {
140 self.next_ack_pos
141 }
142
143 pub fn accept(&mut self, ticket: Ticket, ack_range: Range<StreamPosition>) {
144 if let Some(prev_pos) = self.next_ack_pos.replace(StreamPosition {
145 seq_num: ack_range.end.seq_num,
146 timestamp: ack_range.end.timestamp,
147 }) {
148 assert_eq!(ack_range.start.seq_num, prev_pos.seq_num);
149 assert!(ack_range.start.timestamp >= prev_pos.timestamp);
150 }
151 let sender = ticket.accept(ack_range);
152 if let Some(prev) = self.queue.back() {
153 assert!(prev.durability_dependency.end < sender.durability_dependency.end);
154 }
155 self.queue.push_back(sender);
156 }
157
158 pub fn reject(&mut self, ticket: Ticket, err: AppendErrorInternal, stable_pos: StreamPosition) {
159 if let Some(sender) = ticket.reject(err, stable_pos) {
160 let dd = sender.durability_dependency;
161 let insert_pos = self
162 .queue
163 .partition_point(|x| x.durability_dependency.end <= dd.end);
164 self.queue.insert(insert_pos, sender);
165 }
166 }
167
168 pub fn on_stable(&mut self, stable_pos: StreamPosition) {
169 let completable = self
170 .queue
171 .iter()
172 .take_while(|sender| sender.durability_dependency.end <= stable_pos.seq_num)
173 .count();
174 for sender in self.queue.drain(..completable) {
175 sender.unblock(Ok(stable_pos));
176 }
177 if self.queue.capacity() >= 4 * self.queue.len() {
180 self.queue.shrink_to(self.queue.len() * 2);
181 }
182 }
183
184 pub fn on_durability_failed(self, err: slatedb::Error) {
185 let err = StorageError::from(err);
186 for sender in self.queue {
187 sender.unblock(Err(err.clone()));
188 }
189 }
190}
191
192pub struct Ticket {
193 tx: oneshot::Sender<Result<AppendAck, AppendErrorInternal>>,
194 session: Option<parking_lot::ArcMutexGuard<parking_lot::RawMutex, SessionState>>,
195}
196
197impl Ticket {
198 #[must_use]
199 fn accept(self, ack_range: Range<StreamPosition>) -> BlockedReplySender {
200 let durability_dependency = ..ack_range.end.seq_num;
201 if let Some(mut session) = self.session {
202 let session = session.deref_mut();
203 assert!(!session.poisoned, "thanks to typestate");
204 session.last_ack_end = durability_dependency;
205 }
206 BlockedReplySender {
207 reply: Ok(ack_range),
208 durability_dependency,
209 tx: self.tx,
210 }
211 }
212
213 #[must_use]
214 fn reject(
215 self,
216 append_err: AppendErrorInternal,
217 stable_pos: StreamPosition,
218 ) -> Option<BlockedReplySender> {
219 let mut durability_dependency =
220 if let AppendErrorInternal::ConditionFailed(cond_fail) = &append_err {
221 cond_fail.durability_dependency()
222 } else {
223 ..0
224 };
225 if let Some(mut session) = self.session {
226 let session = session.deref_mut();
227 assert!(!session.poisoned, "thanks to typestate");
228 session.poisoned = true;
229 durability_dependency = ..durability_dependency.end.max(session.last_ack_end.end);
230 }
231 if durability_dependency.end <= stable_pos.seq_num {
232 let _ = self.tx.send(Err(append_err));
233 None
234 } else {
235 Some(BlockedReplySender {
236 reply: Err(append_err),
237 durability_dependency,
238 tx: self.tx,
239 })
240 }
241 }
242}
243
244#[derive(Debug)]
245struct BlockedReplySender {
246 reply: Result<Range<StreamPosition>, AppendErrorInternal>,
247 durability_dependency: RangeTo<SeqNum>,
248 tx: oneshot::Sender<Result<AppendAck, AppendErrorInternal>>,
249}
250
251impl BlockedReplySender {
252 fn unblock(self, stable_pos: Result<StreamPosition, StorageError>) {
253 let reply = match stable_pos {
254 Ok(tail) => {
255 assert!(self.durability_dependency.end <= tail.seq_num);
256 self.reply.map(|ack| AppendAck {
257 start: ack.start,
258 end: ack.end,
259 tail,
260 })
261 }
262 Err(e) => Err(e.into()),
263 };
264 let _ = self.tx.send(reply);
265 }
266}