use std::{sync::Arc, time::Duration};

use bytesize::ByteSize;
use dashmap::DashMap;
use enum_ordinalize::Ordinalize;
use s2_common::{
    record::{SeqNum, StreamPosition},
    types::{
        basin::BasinName,
        config::{BasinConfig, OptionalStreamConfig},
        resources::CreateMode,
        stream::StreamName,
    },
};
use slatedb::config::{DurabilityLevel, ScanOptions};
use tokio::time::Instant;

use super::{
    error::{
        BasinDeletionPendingError, BasinNotFoundError, CreateStreamError, GetBasinConfigError,
        StorageError, StreamDeletionPendingError, StreamNotFoundError, StreamerError,
        TransactionConflictError,
    },
    kv,
    stream_id::StreamId,
    streamer::{StreamerClient, StreamerClientState},
};

pub const FOLLOWER_MAX_LAG: usize = 25;

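/// Storage backend that owns the underlying SlateDB handle and caches a
/// per-stream [`StreamerClientState`], spawning streamer tasks on demand.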
#[derive(Clone)]
pub struct Backend {
    pub(super) db: slatedb::Db,
    client_states: Arc<DashMap<StreamId, StreamerClientState>>,
    append_inflight_max: ByteSize,
}

impl Backend {
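    /// How long a failed streamer initialization is remembered: a caller that
    /// waited on the attempt gets the cached error back until it is older than
    /// this, after which initialization is retried.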
    const FAILED_INIT_MEMORY: Duration = Duration::from_secs(1);

    pub fn new(db: slatedb::Db, append_inflight_max: ByteSize) -> Self {
        Self {
            db,
            client_states: Arc::new(DashMap::new()),
            append_inflight_max,
        }
    }

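    /// Loads stream metadata, tail position, fencing token, and trim point
    /// from the database, validates that the stream exists and is not pending
    /// deletion, and spawns a streamer task, handing it a cleanup closure that
    /// removes the cached client state for this stream.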
    async fn start_streamer(
        &self,
        basin: BasinName,
        stream: StreamName,
    ) -> Result<StreamerClient, StreamerError> {
        let stream_id = StreamId::new(&basin, &stream);

        let (meta, tail_pos, fencing_token, trim_point) = tokio::try_join!(
            self.db_get(
                kv::stream_meta::ser_key(&basin, &stream),
                kv::stream_meta::deser_value,
            ),
            self.db_get(
                kv::stream_tail_position::ser_key(stream_id),
                kv::stream_tail_position::deser_value,
            ),
            self.db_get(
                kv::stream_fencing_token::ser_key(stream_id),
                kv::stream_fencing_token::deser_value,
            ),
            self.db_get(
                kv::stream_trim_point::ser_key(stream_id),
                kv::stream_trim_point::deser_value,
            )
        )?;

        let Some(meta) = meta else {
            return Err(StreamNotFoundError { basin, stream }.into());
        };

        let tail_pos = tail_pos.unwrap_or(StreamPosition::MIN);
        self.assert_no_records_following_tail(stream_id, &basin, &stream, tail_pos)
            .await?;

        let fencing_token = fencing_token.unwrap_or_default();
        let trim_point = trim_point.unwrap_or(..SeqNum::MIN);

        if trim_point.end == SeqNum::MAX {
            return Err(StreamDeletionPendingError { basin, stream }.into());
        }

        let client_states = self.client_states.clone();
        Ok(super::streamer::Spawner {
            db: self.db.clone(),
            stream_id,
            config: meta.config,
            tail_pos,
            fencing_token,
            trim_point,
            append_inflight_max: self.append_inflight_max,
        }
        .spawn(move || {
            client_states.remove(&stream_id);
        }))
    }

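    /// Scans the record keyspace starting at `tail_pos` and panics if a record
    /// belonging to this stream exists at or beyond it, which would violate
    /// the invariant that the stored tail position accounts for all records.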
    async fn assert_no_records_following_tail(
        &self,
        stream_id: StreamId,
        basin: &BasinName,
        stream: &StreamName,
        tail_pos: StreamPosition,
    ) -> Result<(), StorageError> {
        let start_key = kv::stream_record_data::ser_key(
            stream_id,
            StreamPosition {
                seq_num: tail_pos.seq_num,
                timestamp: 0,
            },
        );
        static SCAN_OPTS: ScanOptions = ScanOptions {
            durability_filter: DurabilityLevel::Remote,
            dirty: false,
            read_ahead_bytes: 1,
            cache_blocks: false,
            max_fetch_tasks: 1,
        };
        let mut it = self.db.scan_with_options(start_key.., &SCAN_OPTS).await?;
        let Some(kv) = it.next().await? else {
            return Ok(());
        };
        if kv.key.first().copied() != Some(kv::KeyType::StreamRecordData.ordinal()) {
            return Ok(());
        }
        let (deser_stream_id, pos) = kv::stream_record_data::deser_key(kv.key)?;
        assert!(
            deser_stream_id != stream_id,
            "invariant violation: stream `{basin}/{stream}` tail_pos {tail_pos:?} but found record at {pos:?}"
        );
        Ok(())
    }

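    /// Returns the cached client state for the stream, inserting a `Blocked`
    /// placeholder and kicking off streamer initialization in a background
    /// task if no entry exists yet.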
    fn streamer_client_state(&self, basin: &BasinName, stream: &StreamName) -> StreamerClientState {
        match self.client_states.entry(StreamId::new(basin, stream)) {
            dashmap::Entry::Occupied(oe) => oe.get().clone(),
            dashmap::Entry::Vacant(ve) => {
                let this = self.clone();
                let stream_id = *(ve.key());
                let basin = basin.clone();
                let stream = stream.clone();
                tokio::spawn(async move {
                    let state = match this.start_streamer(basin, stream).await {
                        Ok(client) => StreamerClientState::Ready { client },
                        Err(error) => StreamerClientState::InitError {
                            error: Box::new(error),
                            timestamp: Instant::now(),
                        },
                    };
                    let replaced_state = this.client_states.insert(stream_id, state);
                    let Some(StreamerClientState::Blocked { notify }) = replaced_state else {
                        panic!("expected Blocked client but replaced: {replaced_state:?}");
                    };
                    notify.notify_waiters();
                });
                ve.insert(StreamerClientState::Blocked {
                    notify: Default::default(),
                })
                .value()
                .clone()
            }
        }
    }

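    /// Drops a cached `InitError` entry so that the next lookup retries
    /// streamer initialization. Ready and blocked entries are left untouched.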
    fn streamer_remove_unready(&self, stream_id: StreamId) {
        if let dashmap::Entry::Occupied(oe) = self.client_states.entry(stream_id)
            && let StreamerClientState::InitError { .. } = oe.get()
        {
            oe.remove();
        }
    }

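    /// Resolves a ready streamer client, waiting for any in-flight
    /// initialization to finish. A cached initialization error is returned
    /// only to a caller that waited on that attempt and only while it is
    /// fresher than [`Self::FAILED_INIT_MEMORY`]; otherwise initialization is
    /// retried.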
    pub(super) async fn streamer_client(
        &self,
        basin: &BasinName,
        stream: &StreamName,
    ) -> Result<StreamerClient, StreamerError> {
        let mut waited = false;
        loop {
            match self.streamer_client_state(basin, stream) {
                StreamerClientState::Blocked { notify } => {
                    notify.notified().await;
                    waited = true;
                }
                StreamerClientState::InitError { error, timestamp } => {
                    if !waited || timestamp.elapsed() > Self::FAILED_INIT_MEMORY {
                        self.streamer_remove_unready(StreamId::new(basin, stream));
                    } else {
                        return Err(*error);
                    }
                }
                StreamerClientState::Ready { client } => {
                    return Ok(client);
                }
            }
        }
    }

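    /// Returns a streamer client only if one is already in the `Ready` state;
    /// otherwise returns `None` without waiting.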
    pub(super) fn streamer_client_if_active(
        &self,
        basin: &BasinName,
        stream: &StreamName,
    ) -> Option<StreamerClient> {
        match self.streamer_client_state(basin, stream) {
            StreamerClientState::Ready { client } => Some(client),
            _ => None,
        }
    }

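    /// Like [`Self::streamer_client`], but if the stream does not exist and
    /// `should_auto_create` approves based on the basin config, the stream is
    /// created with default configuration before retrying the lookup. A
    /// concurrent creation (`StreamAlreadyExists`) is treated as success.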
    pub(super) async fn streamer_client_with_auto_create<E>(
        &self,
        basin: &BasinName,
        stream: &StreamName,
        should_auto_create: impl FnOnce(&BasinConfig) -> bool,
    ) -> Result<StreamerClient, E>
    where
        E: From<StreamerError>
            + From<StorageError>
            + From<BasinNotFoundError>
            + From<TransactionConflictError>
            + From<BasinDeletionPendingError>
            + From<StreamDeletionPendingError>
            + From<StreamNotFoundError>,
    {
        match self.streamer_client(basin, stream).await {
            Ok(client) => Ok(client),
            Err(StreamerError::StreamNotFound(e)) => {
                let config = match self.get_basin_config(basin.clone()).await {
                    Ok(config) => config,
                    Err(GetBasinConfigError::Storage(e)) => Err(e)?,
                    Err(GetBasinConfigError::BasinNotFound(e)) => Err(e)?,
                };
                if should_auto_create(&config) {
                    if let Err(e) = self
                        .create_stream(
                            basin.clone(),
                            stream.clone(),
                            OptionalStreamConfig::default(),
                            CreateMode::CreateOnly(None),
                        )
                        .await
                    {
                        match e {
                            CreateStreamError::Storage(e) => Err(e)?,
                            CreateStreamError::TransactionConflict(e) => Err(e)?,
                            CreateStreamError::BasinDeletionPending(e) => Err(e)?,
                            CreateStreamError::StreamDeletionPending(e) => Err(e)?,
                            CreateStreamError::BasinNotFound(e) => Err(e)?,
                            CreateStreamError::StreamAlreadyExists(_) => {}
                        }
                    }
                    Ok(self.streamer_client(basin, stream).await?)
                } else {
                    Err(e.into())
                }
            }
            Err(e) => Err(e.into()),
        }
    }
}

#[cfg(test)]
mod tests {
    use std::str::FromStr as _;

    use bytes::Bytes;
    use s2_common::record::{Metered, Record, StreamPosition};
    use slatedb::{WriteBatch, config::WriteOptions, object_store};
    use time::OffsetDateTime;

    use super::*;

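    // Seeds the database with a record at the stored tail position, i.e. a
    // record the tail does not account for, so starting the streamer must trip
    // the tail invariant check and panic.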
    #[tokio::test]
    #[should_panic(expected = "invariant violation: stream `testbasin1/stream1` tail_pos")]
    async fn start_streamer_fails_if_records_exist_after_tail_pos() {
        let object_store: Arc<dyn object_store::ObjectStore> =
            Arc::new(object_store::memory::InMemory::new());
        let db = slatedb::Db::builder("test", object_store)
            .build()
            .await
            .unwrap();

        let backend = Backend::new(db.clone(), ByteSize::b(1));

        let basin = BasinName::from_str("testbasin1").unwrap();
        let stream = StreamName::from_str("stream1").unwrap();
        let stream_id = StreamId::new(&basin, &stream);

        let meta = kv::stream_meta::StreamMeta {
            config: OptionalStreamConfig::default(),
            created_at: OffsetDateTime::now_utc(),
            deleted_at: None,
            creation_idempotency_key: None,
        };

        let tail_pos = StreamPosition {
            seq_num: 1,
            timestamp: 123,
        };
        let record_pos = StreamPosition {
            seq_num: tail_pos.seq_num,
            timestamp: tail_pos.timestamp,
        };

        let record = Record::try_from_parts(vec![], Bytes::from_static(b"hello")).unwrap();
        let metered_record: Metered<Record> = record.into();

        let mut wb = WriteBatch::new();
        wb.put(
            kv::stream_meta::ser_key(&basin, &stream),
            kv::stream_meta::ser_value(&meta),
        );
        wb.put(
            kv::stream_tail_position::ser_key(stream_id),
            kv::stream_tail_position::ser_value(tail_pos),
        );
        wb.put(
            kv::stream_record_data::ser_key(stream_id, record_pos),
            kv::stream_record_data::ser_value(metered_record.as_ref()),
        );
        static WRITE_OPTS: WriteOptions = WriteOptions {
            await_durable: true,
        };
        db.write_with_options(wb, &WRITE_OPTS).await.unwrap();

        backend
            .start_streamer(basin.clone(), stream.clone())
            .await
            .unwrap();
    }
}