1use std::sync::{
2 Arc,
3 atomic::{AtomicU64, Ordering},
4};
5
6use bytesize::ByteSize;
7use dashmap::DashMap;
8use enum_ordinalize::Ordinalize;
9use futures::{
10 FutureExt as _,
11 future::{BoxFuture, Shared},
12};
13use s2_common::{
14 record::{NonZeroSeqNum, SeqNum, StreamPosition},
15 types::{
16 basin::BasinName,
17 config::{BasinConfig, OptionalStreamConfig},
18 resources::CreateMode,
19 stream::StreamName,
20 },
21};
22use slatedb::config::{DurabilityLevel, ScanOptions};
23use tokio::sync::broadcast;
24
25use super::{
26 error::{
27 BasinDeletionPendingError, BasinNotFoundError, CreateStreamError, GetBasinConfigError,
28 StorageError, StreamDeletionPendingError, StreamNotFoundError, StreamerError,
29 TransactionConflictError,
30 },
31 kv,
32 stream_id::StreamId,
33 streamer::StreamerClient,
34};
35use crate::backend::bgtasks::BgtaskTrigger;
36
37type StreamerInitFuture = Shared<BoxFuture<'static, Result<StreamerClient, StreamerError>>>;
38
/// Process-unique identifier of a single streamer initialization attempt.
///
/// A finishing attempt compares its id with the one stored in the slot, so it can tell whether
/// the slot still belongs to it or has been taken over by a newer attempt.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
struct StreamerInitId(u64);

impl StreamerInitId {
    /// Hands out the next id from a global counter that starts at 1.
    fn next() -> Self {
        // Only uniqueness matters here; no other memory is published through this counter,
        // so relaxed ordering suffices.
        static NEXT_ID: AtomicU64 = AtomicU64::new(1);
        let raw = NEXT_ID.fetch_add(1, Ordering::Relaxed);
        StreamerInitId(raw)
    }
}
48
49#[derive(Clone)]
50enum StreamerClientSlot {
51 Initializing {
52 init_id: StreamerInitId,
53 future: StreamerInitFuture,
54 },
55 Ready {
56 client: StreamerClient,
57 },
58}
59
60#[derive(Clone)]
61pub struct Backend {
62 pub(super) db: slatedb::Db,
63 streamer_slots: Arc<DashMap<StreamId, StreamerClientSlot>>,
64 append_inflight_max: ByteSize,
65 bgtask_trigger_tx: broadcast::Sender<BgtaskTrigger>,
66}
67
68impl Backend {
69 pub fn new(db: slatedb::Db, append_inflight_max: ByteSize) -> Self {
70 let (bgtask_trigger_tx, _) = broadcast::channel(16);
71 Self {
72 db,
73 streamer_slots: Arc::new(DashMap::new()),
74 append_inflight_max,
75 bgtask_trigger_tx,
76 }
77 }
78
79 pub(super) fn bgtask_trigger(&self, trigger: BgtaskTrigger) {
80 let _ = self.bgtask_trigger_tx.send(trigger);
81 }
82
83 pub(super) fn bgtask_trigger_subscribe(&self) -> broadcast::Receiver<BgtaskTrigger> {
84 self.bgtask_trigger_tx.subscribe()
85 }
86
87 async fn start_streamer(
88 &self,
89 basin: BasinName,
90 stream: StreamName,
91 ) -> Result<StreamerClient, StreamerError> {
92 let stream_id = StreamId::new(&basin, &stream);
93
94 let (meta, tail_pos, fencing_token, trim_point) = tokio::try_join!(
95 self.db_get(
96 kv::stream_meta::ser_key(&basin, &stream),
97 kv::stream_meta::deser_value,
98 ),
99 self.db_get(
100 kv::stream_tail_position::ser_key(stream_id),
101 kv::stream_tail_position::deser_value,
102 ),
103 self.db_get(
104 kv::stream_fencing_token::ser_key(stream_id),
105 kv::stream_fencing_token::deser_value,
106 ),
107 self.db_get(
108 kv::stream_trim_point::ser_key(stream_id),
109 kv::stream_trim_point::deser_value,
110 )
111 )?;
112
113 let Some(meta) = meta else {
114 return Err(StreamNotFoundError { basin, stream }.into());
115 };
116
117 let tail_pos = tail_pos.map(|(pos, _)| pos).unwrap_or(StreamPosition::MIN);
118 self.assert_no_records_following_tail(stream_id, &basin, &stream, tail_pos)
119 .await?;
120
121 let fencing_token = fencing_token.unwrap_or_default();
122
123 if trim_point == Some(..NonZeroSeqNum::MAX) {
124 return Err(StreamDeletionPendingError { basin, stream }.into());
125 }
126
127 let streamer_slots = self.streamer_slots.clone();
128 Ok(super::streamer::Spawner {
129 db: self.db.clone(),
130 stream_id,
131 config: meta.config,
132 tail_pos,
133 fencing_token,
134 trim_point: ..trim_point.map_or(SeqNum::MIN, |tp| tp.end.get()),
135 append_inflight_max: self.append_inflight_max,
136 bgtask_trigger_tx: self.bgtask_trigger_tx.clone(),
137 }
138 .spawn(move |client_id| {
139 streamer_slots.remove_if(&stream_id, |_, slot| {
140 matches!(slot, StreamerClientSlot::Ready { client } if client.id() == client_id)
141 });
142 }))
143 }
144
145 async fn assert_no_records_following_tail(
146 &self,
147 stream_id: StreamId,
148 basin: &BasinName,
149 stream: &StreamName,
150 tail_pos: StreamPosition,
151 ) -> Result<(), StorageError> {
152 let start_key = kv::stream_record_data::ser_key(
153 stream_id,
154 StreamPosition {
155 seq_num: tail_pos.seq_num,
156 timestamp: 0,
157 },
158 );
159 static SCAN_OPTS: ScanOptions = ScanOptions {
160 durability_filter: DurabilityLevel::Remote,
161 dirty: false,
162 read_ahead_bytes: 1,
163 cache_blocks: false,
164 max_fetch_tasks: 1,
165 };
166 let mut it = self.db.scan_with_options(start_key.., &SCAN_OPTS).await?;
167 let Some(kv) = it.next().await? else {
168 return Ok(());
169 };
170 if kv.key.first().copied() != Some(kv::KeyType::StreamRecordData.ordinal()) {
171 return Ok(());
172 }
173 let (deser_stream_id, pos) = kv::stream_record_data::deser_key(kv.key)?;
174 assert!(
175 deser_stream_id != stream_id,
176 "invariant violation: stream `{basin}/{stream}` tail_pos {tail_pos:?} but found record at {pos:?}"
177 );
178 Ok(())
179 }
180
181 fn streamer_client_slot(&self, basin: &BasinName, stream: &StreamName) -> StreamerClientSlot {
182 match self.streamer_slots.entry(StreamId::new(basin, stream)) {
183 dashmap::Entry::Occupied(oe) => oe.get().clone(),
184 dashmap::Entry::Vacant(ve) => {
185 let this = self.clone();
186 let basin = basin.clone();
187 let stream = stream.clone();
188 let init_id = StreamerInitId::next();
189 let future = async move { this.start_streamer(basin, stream).await }
190 .boxed()
191 .shared();
192 let slot = StreamerClientSlot::Initializing {
193 init_id,
194 future: future.clone(),
195 };
196 ve.insert(slot.clone());
197 slot
198 }
199 }
200 }
201
202 fn streamer_finish_initialization(
203 &self,
204 stream_id: StreamId,
205 init_id: StreamerInitId,
206 result: &Result<StreamerClient, StreamerError>,
207 ) {
208 if let dashmap::Entry::Occupied(mut oe) = self.streamer_slots.entry(stream_id) {
209 let is_same_init = matches!(
210 oe.get(),
211 StreamerClientSlot::Initializing {
212 init_id: state_init_id,
213 ..
214 } if *state_init_id == init_id
215 );
216 if is_same_init {
217 match result {
218 Ok(client) => {
219 oe.insert(StreamerClientSlot::Ready {
220 client: client.clone(),
221 });
222 }
223 Err(_) => {
224 oe.remove();
225 }
226 }
227 }
228 }
229 }
230
231 pub(super) async fn streamer_client(
232 &self,
233 basin: &BasinName,
234 stream: &StreamName,
235 ) -> Result<StreamerClient, StreamerError> {
236 let stream_id = StreamId::new(basin, stream);
237 match self.streamer_client_slot(basin, stream) {
238 StreamerClientSlot::Initializing { init_id, future } => {
239 let result = future.await;
240 self.streamer_finish_initialization(stream_id, init_id, &result);
241 result
242 }
243 StreamerClientSlot::Ready { client } => Ok(client),
244 }
245 }
246
247 pub(super) fn streamer_client_if_active(
248 &self,
249 basin: &BasinName,
250 stream: &StreamName,
251 ) -> Option<StreamerClient> {
252 let stream_id = StreamId::new(basin, stream);
253 let slot = self.streamer_slots.get(&stream_id)?;
254 match slot.value() {
255 StreamerClientSlot::Ready { client } => Some(client.clone()),
256 _ => None,
257 }
258 }
259
260 pub(super) async fn streamer_client_with_auto_create<E>(
261 &self,
262 basin: &BasinName,
263 stream: &StreamName,
264 should_auto_create: impl FnOnce(&BasinConfig) -> bool,
265 ) -> Result<StreamerClient, E>
266 where
267 E: From<StreamerError>
268 + From<StorageError>
269 + From<BasinNotFoundError>
270 + From<TransactionConflictError>
271 + From<BasinDeletionPendingError>
272 + From<StreamDeletionPendingError>
273 + From<StreamNotFoundError>,
274 {
275 match self.streamer_client(basin, stream).await {
276 Ok(client) => Ok(client),
277 Err(StreamerError::StreamNotFound(e)) => {
278 let config = match self.get_basin_config(basin.clone()).await {
279 Ok(config) => config,
280 Err(GetBasinConfigError::Storage(e)) => Err(e)?,
281 Err(GetBasinConfigError::BasinNotFound(e)) => Err(e)?,
282 };
283 if should_auto_create(&config) {
284 if let Err(e) = self
285 .create_stream(
286 basin.clone(),
287 stream.clone(),
288 OptionalStreamConfig::default(),
289 CreateMode::CreateOnly(None),
290 )
291 .await
292 {
293 match e {
294 CreateStreamError::Storage(e) => Err(e)?,
295 CreateStreamError::TransactionConflict(e) => Err(e)?,
296 CreateStreamError::BasinDeletionPending(e) => Err(e)?,
297 CreateStreamError::StreamDeletionPending(e) => Err(e)?,
298 CreateStreamError::BasinNotFound(e) => Err(e)?,
299 CreateStreamError::StreamAlreadyExists(_) => {}
300 }
301 }
302 Ok(self.streamer_client(basin, stream).await?)
303 } else {
304 Err(e.into())
305 }
306 }
307 Err(e) => Err(e.into()),
308 }
309 }
310}
311
// Tests for streamer start-up and streamer-slot bookkeeping on `Backend`.
#[cfg(test)]
mod tests {
    use std::str::FromStr as _;

    use bytes::Bytes;
    use s2_common::{
        record::{Metered, Record, StreamPosition},
        types::{config::BasinConfig, resources::CreateMode},
    };
    use slatedb::{WriteBatch, config::WriteOptions, object_store};
    use time::OffsetDateTime;

    use super::*;

    /// Builds a `Backend` over a fresh in-memory object store, with a 1-byte
    /// `append_inflight_max`.
    async fn new_test_backend() -> Backend {
        let object_store: Arc<dyn object_store::ObjectStore> =
            Arc::new(object_store::memory::InMemory::new());
        let db = slatedb::Db::builder("test", object_store)
            .build()
            .await
            .unwrap();
        Backend::new(db, ByteSize::b(1))
    }

    /// Seeds a stream whose persisted tail is at seq_num 1, plus a record stored at that same
    /// position, and expects streamer start-up to panic with the invariant-violation message.
    #[tokio::test]
    #[should_panic(expected = "invariant violation: stream `testbasin1/stream1` tail_pos")]
    async fn start_streamer_fails_if_records_exist_after_tail_pos() {
        let backend = new_test_backend().await;

        let basin = BasinName::from_str("testbasin1").unwrap();
        let stream = StreamName::from_str("stream1").unwrap();
        let stream_id = StreamId::new(&basin, &stream);

        let meta = kv::stream_meta::StreamMeta {
            config: OptionalStreamConfig::default(),
            created_at: OffsetDateTime::now_utc(),
            deleted_at: None,
            creation_idempotency_key: None,
        };

        let tail_pos = StreamPosition {
            seq_num: 1,
            timestamp: 123,
        };
        // The record sits exactly at the tail position, which the start-up check must already
        // treat as following the tail.
        let record_pos = StreamPosition {
            seq_num: tail_pos.seq_num,
            timestamp: tail_pos.timestamp,
        };

        let record = Record::try_from_parts(vec![], Bytes::from_static(b"hello")).unwrap();
        let metered_record: Metered<Record> = record.into();

        // Metadata, tail position, and the offending record go into a single batch.
        let mut wb = WriteBatch::new();
        wb.put(
            kv::stream_meta::ser_key(&basin, &stream),
            kv::stream_meta::ser_value(&meta),
        );
        wb.put(
            kv::stream_tail_position::ser_key(stream_id),
            kv::stream_tail_position::ser_value(
                tail_pos,
                kv::timestamp::TimestampSecs::from_secs(1),
            ),
        );
        wb.put(
            kv::stream_record_data::ser_key(stream_id, record_pos),
            kv::stream_record_data::ser_value(metered_record.as_ref()),
        );
        // Wait for durability; the start-up scan filters on `DurabilityLevel::Remote`.
        static WRITE_OPTS: WriteOptions = WriteOptions {
            await_durable: true,
        };
        backend
            .db
            .write_with_options(wb, &WRITE_OPTS)
            .await
            .unwrap();

        // Expected to panic in the tail invariant check.
        backend
            .start_streamer(basin.clone(), stream.clone())
            .await
            .unwrap();
    }

    /// Two back-to-back slot lookups must see the same pending initialization instead of
    /// creating two. The futures are never awaited, so no start-up work runs.
    #[tokio::test]
    async fn streamer_client_slot_uses_single_initializer() {
        let backend = new_test_backend().await;
        let basin = BasinName::from_str("testbasin2").unwrap();
        let stream = StreamName::from_str("stream2").unwrap();

        let slot_1 = backend.streamer_client_slot(&basin, &stream);
        let slot_2 = backend.streamer_client_slot(&basin, &stream);

        let (init_id_1, init_id_2) = match (slot_1, slot_2) {
            (
                StreamerClientSlot::Initializing {
                    init_id: init_id_1, ..
                },
                StreamerClientSlot::Initializing {
                    init_id: init_id_2, ..
                },
            ) => (init_id_1, init_id_2),
            _ => panic!("expected both slots to be Initializing"),
        };
        assert_eq!(init_id_1, init_id_2);
        assert_eq!(backend.streamer_slots.len(), 1);
    }

    /// Creating a stream does not start a streamer, and `streamer_client_if_active` must not
    /// start one or insert a slot either.
    #[tokio::test]
    async fn streamer_client_if_active_is_peek_only() {
        let backend = new_test_backend().await;
        let basin = BasinName::from_str("testbasin3").unwrap();
        let stream = StreamName::from_str("stream3").unwrap();

        backend
            .create_basin(
                basin.clone(),
                BasinConfig::default(),
                CreateMode::CreateOnly(None),
            )
            .await
            .unwrap();
        backend
            .create_stream(
                basin.clone(),
                stream.clone(),
                OptionalStreamConfig::default(),
                CreateMode::CreateOnly(None),
            )
            .await
            .unwrap();

        assert!(backend.streamer_slots.is_empty());
        assert!(backend.streamer_client_if_active(&basin, &stream).is_none());
        assert!(backend.streamer_slots.is_empty());
    }

    /// Looking up a stream that was never created fails with `StreamNotFound`. The failure must
    /// not be cached: every attempt leaves no slot behind.
    #[tokio::test]
    async fn streamer_client_failed_init_is_not_memoized() {
        let backend = new_test_backend().await;
        let basin = BasinName::from_str("testbasin4").unwrap();
        let stream = StreamName::from_str("stream4").unwrap();
        let stream_id = StreamId::new(&basin, &stream);

        for _ in 0..2 {
            let err = backend.streamer_client(&basin, &stream).await;
            assert!(matches!(err, Err(StreamerError::StreamNotFound(_))));
            assert!(
                backend.streamer_slots.get(&stream_id).is_none(),
                "failed init should not be cached"
            );
        }
    }

    /// A completion reported under an init id other than the slot's current one must leave the
    /// slot untouched. The slot's own future is `pending`, so it never resolves.
    #[tokio::test]
    async fn streamer_finish_initialization_ignores_stale_init_id() {
        let backend = new_test_backend().await;
        let basin = BasinName::from_str("testbasin5").unwrap();
        let stream = StreamName::from_str("stream5").unwrap();
        let stream_id = StreamId::new(&basin, &stream);

        let stale_init_id = StreamerInitId::next();
        let current_init_id = StreamerInitId::next();
        let future = futures::future::pending::<Result<StreamerClient, StreamerError>>()
            .boxed()
            .shared();
        backend.streamer_slots.insert(
            stream_id,
            StreamerClientSlot::Initializing {
                init_id: current_init_id,
                future: future.clone(),
            },
        );

        // An error result would remove the slot if the init ids matched.
        let stale_result = Err(StreamNotFoundError { basin, stream }.into());
        backend.streamer_finish_initialization(stream_id, stale_init_id, &stale_result);

        let Some(slot) = backend.streamer_slots.get(&stream_id) else {
            panic!("stale init completion should not alter slot state");
        };
        match slot.value() {
            StreamerClientSlot::Initializing { init_id, .. } => {
                assert_eq!(*init_id, current_init_id)
            }
            _ => panic!("expected initializing slot to remain unchanged"),
        }
    }
}