1use std::sync::Arc;
2
3use bytesize::ByteSize;
4use dashmap::DashMap;
5use futures::{
6 FutureExt as _,
7 future::{BoxFuture, Shared},
8};
9use s2_common::{
10 basin::BasinName,
11 config::{BasinConfig, OptionalStreamConfig},
12 encryption::{EncryptionAlgorithm, EncryptionSpec},
13 record::{NonZeroSeqNum, SeqNum, StreamPosition},
14 resources::ProvisionMode,
15 stream::StreamName,
16};
17use slatedb::config::{DurabilityLevel, ReadOptions, ScanOptions};
18use tokio::sync::{Semaphore, broadcast};
19
20use super::{
21 StreamHandle,
22 durability_notifier::DurabilityNotifier,
23 error::{
24 BasinDeletionPendingError, BasinNotFoundError, GetBasinConfigError, ProvisionStreamError,
25 StorageError, StreamDeletionPendingError, StreamNotFoundError, StreamerError,
26 StreamerMissingInActionError, TransactionConflictError,
27 },
28 kv,
29 streamer::{GuardedStreamerClient, StreamerClient, StreamerGenerationId},
30};
31use crate::{backend::bgtasks::BgtaskTrigger, stream_id::StreamId};
32
33type StreamerInitFuture = Shared<BoxFuture<'static, Result<StreamerClient, StreamerError>>>;
34
35#[derive(Clone)]
36enum StreamerClientSlot {
37 Initializing {
38 generation_id: StreamerGenerationId,
39 future: StreamerInitFuture,
40 },
41 Ready {
42 client: StreamerClient,
43 },
44}
45
/// Storage backend: the SlateDB handle plus per-stream streamer bookkeeping.
///
/// Clones share the same slot map and append semaphore (both behind `Arc`).
#[derive(Clone)]
pub struct Backend {
    /// Underlying SlateDB database.
    pub(super) db: slatedb::Db,
    /// Streamer state per stream: either an in-progress initialization or a ready client.
    streamer_slots: Arc<DashMap<StreamId, StreamerClientSlot>>,
    /// Semaphore sized in bytes that bounds in-flight appends; handed to every spawned streamer.
    append_inflight_bytes_sema: Arc<Semaphore>,
    /// Durability notifier handed to every spawned streamer.
    durability_notifier: DurabilityNotifier,
    /// Sender side of the background-task trigger broadcast channel.
    bgtask_trigger_tx: broadcast::Sender<BgtaskTrigger>,
}
54
55impl Backend {
56 pub fn new(db: slatedb::Db, append_inflight_bytes: ByteSize) -> Self {
57 let (bgtask_trigger_tx, _) = broadcast::channel(16);
58 let append_inflight_bytes = Arc::new(Semaphore::new(
59 (append_inflight_bytes.as_u64() as usize).clamp(
60 s2_common::caps::RECORD_BATCH_MAX.bytes,
61 Semaphore::MAX_PERMITS,
62 ),
63 ));
64 let durability_notifier = DurabilityNotifier::spawn(&db);
65 Self {
66 db,
67 streamer_slots: Arc::new(DashMap::new()),
68 append_inflight_bytes_sema: append_inflight_bytes,
69 durability_notifier,
70 bgtask_trigger_tx,
71 }
72 }
73
74 pub(super) fn bgtask_trigger(&self, trigger: BgtaskTrigger) {
75 let _ = self.bgtask_trigger_tx.send(trigger);
76 }
77
78 pub(super) fn bgtask_trigger_subscribe(&self) -> broadcast::Receiver<BgtaskTrigger> {
79 self.bgtask_trigger_tx.subscribe()
80 }
81
82 async fn start_streamer(
83 &self,
84 generation_id: StreamerGenerationId,
85 basin: BasinName,
86 stream: StreamName,
87 ) -> Result<StreamerClient, StreamerError> {
88 let stream_id = StreamId::new(&basin, &stream);
89
90 let (meta, persisted_tail, fencing_token, trim_point) = tokio::try_join!(
91 self.db_get(
92 kv::stream_meta::ser_key(&basin, &stream),
93 kv::stream_meta::deser_value,
94 ),
95 self.load_persisted_stream_tail(stream_id),
96 self.db_get(
97 kv::stream_fencing_token::ser_key(stream_id),
98 kv::stream_fencing_token::deser_value,
99 ),
100 self.db_get(
101 kv::stream_trim_point::ser_key(stream_id),
102 kv::stream_trim_point::deser_value,
103 )
104 )?;
105
106 let Some(meta) = meta else {
107 return Err(StreamNotFoundError { basin, stream }.into());
108 };
109
110 let (tail_pos, last_tail_write_timestamp) =
111 persisted_tail.unwrap_or((StreamPosition::MIN, kv::timestamp::TimestampSecs::ZERO));
112
113 self.assert_no_records_following_tail(stream_id, &basin, &stream, tail_pos)
114 .await?;
115
116 let fencing_token = fencing_token.unwrap_or_default();
117
118 if trim_point == Some(..NonZeroSeqNum::MAX) {
119 return Err(StreamDeletionPendingError.into());
120 }
121
122 let streamer_slots = self.streamer_slots.clone();
123 Ok(super::streamer::Spawner {
124 generation_id,
125 db: self.db.clone(),
126 stream_id,
127 config: meta.config,
128 cipher: meta.cipher,
129 tail_pos,
130 last_tail_write_timestamp,
131 fencing_token,
132 trim_point: ..trim_point.map_or(SeqNum::MIN, |tp| tp.end.get()),
133 append_inflight_bytes_sema: self.append_inflight_bytes_sema.clone(),
134 durability_notifier: self.durability_notifier.clone(),
135 bgtask_trigger_tx: self.bgtask_trigger_tx.clone(),
136 }
137 .spawn(move |client_id| {
138 streamer_slots.remove_if(&stream_id, |_, slot| {
139 matches!(slot, StreamerClientSlot::Ready { client } if client.generation_id() == client_id)
140 });
141 }))
142 }
143
144 async fn load_persisted_stream_tail(
145 &self,
146 stream_id: StreamId,
147 ) -> Result<Option<(StreamPosition, kv::timestamp::TimestampSecs)>, StorageError> {
148 let read_opts = ReadOptions {
149 durability_filter: DurabilityLevel::Remote,
150 ..Default::default()
151 };
152 let Some(entry) = self
153 .db
154 .get_key_value_with_options(kv::stream_tail_position::ser_key(stream_id), &read_opts)
155 .await?
156 else {
157 return Ok(None);
158 };
159 Ok(Some((
160 kv::stream_tail_position::deser_value(entry.value)?,
161 kv::timestamp::TimestampSecs::from_millis(entry.create_ts),
162 )))
163 }
164
165 async fn assert_no_records_following_tail(
166 &self,
167 stream_id: StreamId,
168 basin: &BasinName,
169 stream: &StreamName,
170 tail_pos: StreamPosition,
171 ) -> Result<(), StorageError> {
172 let start_key = kv::stream_record_data::ser_key(
173 stream_id,
174 StreamPosition {
175 seq_num: tail_pos.seq_num,
176 timestamp: 0,
177 },
178 );
179 let scan_opts = ScanOptions {
180 durability_filter: DurabilityLevel::Remote,
181 ..Default::default()
182 };
183 let mut it = self.db.scan_with_options(start_key.., &scan_opts).await?;
184 let Some(kv) = it.next().await? else {
185 return Ok(());
186 };
187 if kv.key.first().copied() != Some(kv::KeyType::StreamRecordData as u8) {
188 return Ok(());
189 }
190 let (deser_stream_id, pos) = kv::stream_record_data::deser_key(kv.key)?;
191 assert!(
192 deser_stream_id != stream_id,
193 "invariant violation: stream `{basin}/{stream}` tail_pos {tail_pos:?} but found record at {pos:?}"
194 );
195 Ok(())
196 }
197
198 fn streamer_client_slot(&self, basin: &BasinName, stream: &StreamName) -> StreamerClientSlot {
199 match self.streamer_slots.entry(StreamId::new(basin, stream)) {
200 dashmap::Entry::Occupied(mut oe) => {
201 if matches!(oe.get(), StreamerClientSlot::Ready { client } if client.is_dead()) {
202 let slot = self.clone().new_initializing_slot(basin, stream);
203 oe.insert(slot.clone());
204 slot
205 } else {
206 oe.get().clone()
207 }
208 }
209 dashmap::Entry::Vacant(ve) => {
210 let slot = self.clone().new_initializing_slot(basin, stream);
211 ve.insert(slot.clone());
212 slot
213 }
214 }
215 }
216
217 fn new_initializing_slot(self, basin: &BasinName, stream: &StreamName) -> StreamerClientSlot {
218 let basin = basin.clone();
219 let stream = stream.clone();
220 let generation_id = StreamerGenerationId::next();
221 let future = async move { self.start_streamer(generation_id, basin, stream).await }
222 .boxed()
223 .shared();
224 StreamerClientSlot::Initializing {
225 generation_id,
226 future,
227 }
228 }
229
230 fn streamer_finish_initialization(
231 &self,
232 stream_id: StreamId,
233 generation_id: StreamerGenerationId,
234 result: &Result<StreamerClient, StreamerError>,
235 ) {
236 if let dashmap::Entry::Occupied(mut oe) = self.streamer_slots.entry(stream_id) {
237 let is_same_init = matches!(
238 oe.get(),
239 StreamerClientSlot::Initializing {
240 generation_id: state_generation_id,
241 ..
242 } if *state_generation_id == generation_id
243 );
244 if is_same_init {
245 match result {
246 Ok(client) => {
247 debug_assert_eq!(client.generation_id(), generation_id);
248 if client.is_dead() {
249 oe.remove();
250 } else {
251 oe.insert(StreamerClientSlot::Ready {
252 client: client.clone(),
253 });
254 }
255 }
256 Err(_) => {
257 oe.remove();
258 }
259 }
260 }
261 }
262 }
263
264 pub(super) async fn streamer_client(
265 &self,
266 basin: &BasinName,
267 stream: &StreamName,
268 ) -> Result<StreamerClient, StreamerError> {
269 let stream_id = StreamId::new(basin, stream);
270 match self.streamer_client_slot(basin, stream) {
271 StreamerClientSlot::Initializing {
272 generation_id,
273 future,
274 } => {
275 let result = future.await;
276 self.streamer_finish_initialization(stream_id, generation_id, &result);
277 result
278 }
279 StreamerClientSlot::Ready { client } => Ok(client),
280 }
281 }
282
283 pub(super) fn streamer_client_if_active(
284 &self,
285 basin: &BasinName,
286 stream: &StreamName,
287 ) -> Option<StreamerClient> {
288 let stream_id = StreamId::new(basin, stream);
289 let slot = self.streamer_slots.get(&stream_id)?;
290 match slot.value() {
291 StreamerClientSlot::Ready { client } if !client.is_dead() => Some(client.clone()),
292 _ => None,
293 }
294 }
295
296 pub(super) async fn streamer_client_guarded(
297 &self,
298 basin: &BasinName,
299 stream: &StreamName,
300 ) -> Result<GuardedStreamerClient, StreamerError> {
301 loop {
302 let client = self.streamer_client(basin, stream).await?;
303 match client.guard() {
304 Ok(client) => return Ok(client),
305 Err(StreamerMissingInActionError) => continue,
306 }
307 }
308 }
309
310 pub(super) async fn stream_handle_with_auto_create<E>(
311 &self,
312 basin: &BasinName,
313 stream: &StreamName,
314 should_auto_create: impl FnOnce(&BasinConfig) -> bool,
315 resolve_encryption: impl FnOnce(Option<EncryptionAlgorithm>) -> Result<EncryptionSpec, E>,
316 ) -> Result<StreamHandle, E>
317 where
318 E: From<StreamerError>
319 + From<StorageError>
320 + From<BasinNotFoundError>
321 + From<TransactionConflictError>
322 + From<BasinDeletionPendingError>
323 + From<StreamDeletionPendingError>
324 + From<StreamNotFoundError>,
325 {
326 match self.streamer_client_guarded(basin, stream).await {
327 Ok(client) => Ok(StreamHandle {
328 db: self.db.clone(),
329 encryption: resolve_encryption(client.cipher())?,
330 client,
331 }),
332 Err(StreamerError::StreamNotFound(e)) => {
333 let config = match self.get_basin_config(basin.clone()).await {
334 Ok(config) => config,
335 Err(GetBasinConfigError::Storage(e)) => Err(e)?,
336 Err(GetBasinConfigError::BasinNotFound(e)) => Err(e)?,
337 };
338 if should_auto_create(&config) {
339 if let Err(e) = self
340 .provision_stream(
341 basin.clone(),
342 stream.clone(),
343 OptionalStreamConfig::default(),
344 ProvisionMode::CreateOnly {
345 request_token: None,
346 },
347 )
348 .await
349 {
350 match e {
351 ProvisionStreamError::Storage(e) => Err(e)?,
352 ProvisionStreamError::TransactionConflict(e) => Err(e)?,
353 ProvisionStreamError::BasinDeletionPending(e) => Err(e)?,
354 ProvisionStreamError::StreamDeletionPending(e) => Err(e)?,
355 ProvisionStreamError::BasinNotFound(e) => Err(e)?,
356 ProvisionStreamError::StreamAlreadyExists(_) => {}
357 ProvisionStreamError::Validation(_) => {
358 unreachable!("auto-create uses default config")
359 }
360 }
361 }
362 let client = self.streamer_client_guarded(basin, stream).await?;
363 let encryption = resolve_encryption(client.cipher())?;
364 Ok(StreamHandle {
365 db: self.db.clone(),
366 encryption,
367 client,
368 })
369 } else {
370 Err(e.into())
371 }
372 }
373 Err(e) => Err(e.into()),
374 }
375 }
376}
377
#[cfg(test)]
mod tests {
    use std::str::FromStr as _;

    use bytes::Bytes;
    use s2_common::{
        config::{BasinConfig, OptionalStreamConfig, StreamConfig},
        record::{Metered, MeteredExt as _, Record, StreamPosition},
        resources::ProvisionMode,
    };
    use s2_storage::record::StoredRecord;
    use slatedb::{WriteBatch, object_store};
    use time::OffsetDateTime;

    use super::*;

    /// Builds a backend over a fresh in-memory object store.
    async fn new_test_backend() -> Backend {
        let object_store: Arc<dyn object_store::ObjectStore> =
            Arc::new(object_store::memory::InMemory::new());
        let db = slatedb::Db::builder("test", object_store)
            .build()
            .await
            .unwrap();
        Backend::new(db, ByteSize::b(1))
    }

    #[tokio::test]
    #[should_panic(expected = "invariant violation: stream `testbasin1/stream1` tail_pos")]
    async fn start_streamer_fails_if_records_exist_after_tail_pos() {
        let backend = new_test_backend().await;

        let basin = BasinName::from_str("testbasin1").unwrap();
        let stream = StreamName::from_str("stream1").unwrap();
        let stream_id = StreamId::new(&basin, &stream);

        let meta = kv::stream_meta::StreamMeta {
            config: StreamConfig::default(),
            cipher: None,
            created_at: OffsetDateTime::now_utc(),
            deleted_at: None,
            creation_idempotency_key: None,
        };

        let tail_pos = StreamPosition {
            seq_num: 1,
            timestamp: 123,
        };
        // A record at the tail's own sequence number is already past the tail.
        let record_pos = StreamPosition {
            seq_num: tail_pos.seq_num,
            timestamp: tail_pos.timestamp,
        };

        let record = Record::try_from_parts(vec![], Bytes::from_static(b"hello")).unwrap();
        let metered_record: Metered<StoredRecord> = StoredRecord::from(record).metered();

        let mut wb = WriteBatch::new();
        wb.put(
            kv::stream_meta::ser_key(&basin, &stream),
            kv::stream_meta::ser_value(&meta),
        );
        wb.put(
            kv::stream_tail_position::ser_key(stream_id),
            kv::stream_tail_position::ser_value(tail_pos),
        );
        wb.put(
            kv::stream_record_data::ser_key(stream_id, record_pos),
            kv::stream_record_data::ser_value(metered_record.as_ref()),
        );
        backend.db.write(wb).await.unwrap();

        backend
            .start_streamer(StreamerGenerationId::next(), basin.clone(), stream.clone())
            .await
            .unwrap();
    }

    #[tokio::test]
    async fn streamer_client_slot_uses_single_initializer() {
        let backend = new_test_backend().await;
        let basin = BasinName::from_str("testbasin2").unwrap();
        let stream = StreamName::from_str("stream2").unwrap();

        let slot_1 = backend.streamer_client_slot(&basin, &stream);
        let slot_2 = backend.streamer_client_slot(&basin, &stream);

        let (generation_id_1, generation_id_2) = match (slot_1, slot_2) {
            (
                StreamerClientSlot::Initializing {
                    generation_id: generation_id_1,
                    ..
                },
                StreamerClientSlot::Initializing {
                    generation_id: generation_id_2,
                    ..
                },
            ) => (generation_id_1, generation_id_2),
            _ => panic!("expected both slots to be Initializing"),
        };
        assert_eq!(generation_id_1, generation_id_2);
        assert_eq!(backend.streamer_slots.len(), 1);
    }

    #[tokio::test]
    async fn streamer_client_if_active_is_peek_only() {
        let backend = new_test_backend().await;
        let basin = BasinName::from_str("testbasin3").unwrap();
        let stream = StreamName::from_str("stream3").unwrap();

        backend
            .provision_basin(
                basin.clone(),
                BasinConfig::default(),
                ProvisionMode::CreateOnly {
                    request_token: None,
                },
            )
            .await
            .unwrap();
        backend
            .provision_stream(
                basin.clone(),
                stream.clone(),
                OptionalStreamConfig::default(),
                ProvisionMode::CreateOnly {
                    request_token: None,
                },
            )
            .await
            .unwrap();

        assert!(backend.streamer_slots.is_empty());
        assert!(backend.streamer_client_if_active(&basin, &stream).is_none());
        assert!(backend.streamer_slots.is_empty());
    }

    #[tokio::test]
    async fn streamer_client_failed_init_is_not_memoized() {
        let backend = new_test_backend().await;
        let basin = BasinName::from_str("testbasin4").unwrap();
        let stream = StreamName::from_str("stream4").unwrap();
        let stream_id = StreamId::new(&basin, &stream);

        for _ in 0..2 {
            let err = backend.streamer_client(&basin, &stream).await;
            assert!(matches!(err, Err(StreamerError::StreamNotFound(_))));
            assert!(
                backend.streamer_slots.get(&stream_id).is_none(),
                "failed init should not be cached"
            );
        }
    }

    #[tokio::test]
    async fn streamer_finish_initialization_ignores_stale_generation_id() {
        let backend = new_test_backend().await;
        let basin = BasinName::from_str("testbasin5").unwrap();
        let stream = StreamName::from_str("stream5").unwrap();
        let stream_id = StreamId::new(&basin, &stream);

        let stale_generation_id = StreamerGenerationId::next();
        let current_generation_id = StreamerGenerationId::next();
        let future = futures::future::pending::<Result<StreamerClient, StreamerError>>()
            .boxed()
            .shared();
        backend.streamer_slots.insert(
            stream_id,
            StreamerClientSlot::Initializing {
                generation_id: current_generation_id,
                future,
            },
        );

        let stale_result = Err(StreamNotFoundError { basin, stream }.into());
        backend.streamer_finish_initialization(stream_id, stale_generation_id, &stale_result);

        let Some(slot) = backend.streamer_slots.get(&stream_id) else {
            panic!("stale init completion should not alter slot state");
        };
        match slot.value() {
            StreamerClientSlot::Initializing { generation_id, .. } => {
                assert_eq!(*generation_id, current_generation_id)
            }
            _ => panic!("expected initializing slot to remain unchanged"),
        }
    }

    #[tokio::test]
    async fn streamer_finish_initialization_clears_slot_on_matching_failure() {
        let backend = new_test_backend().await;
        let basin = BasinName::from_str("testbasin6").unwrap();
        let stream = StreamName::from_str("stream6").unwrap();
        let stream_id = StreamId::new(&basin, &stream);

        let generation_id = StreamerGenerationId::next();
        let future = futures::future::pending::<Result<StreamerClient, StreamerError>>()
            .boxed()
            .shared();
        backend.streamer_slots.insert(
            stream_id,
            StreamerClientSlot::Initializing {
                generation_id,
                future,
            },
        );

        // A failure reported by the generation that currently owns the slot must clear it so
        // that the next caller retries initialization.
        let result = Err(StreamNotFoundError { basin, stream }.into());
        backend.streamer_finish_initialization(stream_id, generation_id, &result);

        assert!(
            backend.streamer_slots.get(&stream_id).is_none(),
            "failed init for the current generation should clear the slot"
        );
    }
}