Skip to main content

s2_lite/backend/
core.rs

1use std::sync::Arc;
2
3use bytesize::ByteSize;
4use dashmap::DashMap;
5use futures::{
6    FutureExt as _,
7    future::{BoxFuture, Shared},
8};
9use s2_common::{
10    basin::BasinName,
11    config::{BasinConfig, OptionalStreamConfig},
12    encryption::{EncryptionAlgorithm, EncryptionSpec},
13    record::{NonZeroSeqNum, SeqNum, StreamPosition},
14    resources::ProvisionMode,
15    stream::StreamName,
16};
17use slatedb::config::{DurabilityLevel, ReadOptions, ScanOptions};
18use tokio::sync::{Semaphore, broadcast};
19
20use super::{
21    StreamHandle,
22    durability_notifier::DurabilityNotifier,
23    error::{
24        BasinDeletionPendingError, BasinNotFoundError, GetBasinConfigError, ProvisionStreamError,
25        StorageError, StreamDeletionPendingError, StreamNotFoundError, StreamerError,
26        StreamerMissingInActionError, TransactionConflictError,
27    },
28    kv,
29    streamer::{GuardedStreamerClient, StreamerClient, StreamerGenerationId},
30};
31use crate::{backend::bgtasks::BgtaskTrigger, stream_id::StreamId};
32
/// Cloneable handle to an in-flight streamer initialization.
///
/// The boxed future resolves to the spawned [`StreamerClient`] or the
/// [`StreamerError`] that prevented startup. It is wrapped in [`Shared`] so the
/// slot holding it can be cloned and handed to several callers.
type StreamerInitFuture = Shared<BoxFuture<'static, Result<StreamerClient, StreamerError>>>;
34
/// State of a stream's entry in `Backend::streamer_slots`.
#[derive(Clone)]
enum StreamerClientSlot {
    /// A streamer start-up is in progress (or has not yet been recorded as finished).
    Initializing {
        /// Generation assigned to this initialization attempt; used to tell
        /// the current attempt apart from stale ones when recording the result.
        generation_id: StreamerGenerationId,
        /// Shared future that resolves to the outcome of the attempt.
        future: StreamerInitFuture,
    },
    /// Initialization succeeded and produced a client.
    Ready {
        /// Client for the spawned streamer. It may still turn out to be dead;
        /// readers check `is_dead()` before trusting it.
        client: StreamerClient,
    },
}
45
/// Handle to the storage backend and its per-stream streamer bookkeeping.
///
/// Cloning yields another handle: the slot map and semaphore are behind `Arc`,
/// so those are shared between clones.
#[derive(Clone)]
pub struct Backend {
    /// SlateDB handle used for all reads, scans and writes in this module.
    pub(super) db: slatedb::Db,
    /// Per-stream slot tracking an initializing or ready streamer client.
    streamer_slots: Arc<DashMap<StreamId, StreamerClientSlot>>,
    /// Semaphore whose permit count is derived from the `append_inflight_bytes`
    /// budget given to [`Backend::new`]; handed to every spawned streamer.
    append_inflight_bytes_sema: Arc<Semaphore>,
    /// Notifier spawned from `db` in [`Backend::new`]; handed to every spawned streamer.
    durability_notifier: DurabilityNotifier,
    /// Sending half of the broadcast channel carrying [`BgtaskTrigger`]s.
    bgtask_trigger_tx: broadcast::Sender<BgtaskTrigger>,
}
54
55impl Backend {
56    pub fn new(db: slatedb::Db, append_inflight_bytes: ByteSize) -> Self {
57        let (bgtask_trigger_tx, _) = broadcast::channel(16);
58        let append_inflight_bytes = Arc::new(Semaphore::new(
59            (append_inflight_bytes.as_u64() as usize).clamp(
60                s2_common::caps::RECORD_BATCH_MAX.bytes,
61                Semaphore::MAX_PERMITS,
62            ),
63        ));
64        let durability_notifier = DurabilityNotifier::spawn(&db);
65        Self {
66            db,
67            streamer_slots: Arc::new(DashMap::new()),
68            append_inflight_bytes_sema: append_inflight_bytes,
69            durability_notifier,
70            bgtask_trigger_tx,
71        }
72    }
73
74    pub(super) fn bgtask_trigger(&self, trigger: BgtaskTrigger) {
75        let _ = self.bgtask_trigger_tx.send(trigger);
76    }
77
78    pub(super) fn bgtask_trigger_subscribe(&self) -> broadcast::Receiver<BgtaskTrigger> {
79        self.bgtask_trigger_tx.subscribe()
80    }
81
82    async fn start_streamer(
83        &self,
84        generation_id: StreamerGenerationId,
85        basin: BasinName,
86        stream: StreamName,
87    ) -> Result<StreamerClient, StreamerError> {
88        let stream_id = StreamId::new(&basin, &stream);
89
90        let (meta, persisted_tail, fencing_token, trim_point) = tokio::try_join!(
91            self.db_get(
92                kv::stream_meta::ser_key(&basin, &stream),
93                kv::stream_meta::deser_value,
94            ),
95            self.load_persisted_stream_tail(stream_id),
96            self.db_get(
97                kv::stream_fencing_token::ser_key(stream_id),
98                kv::stream_fencing_token::deser_value,
99            ),
100            self.db_get(
101                kv::stream_trim_point::ser_key(stream_id),
102                kv::stream_trim_point::deser_value,
103            )
104        )?;
105
106        let Some(meta) = meta else {
107            return Err(StreamNotFoundError { basin, stream }.into());
108        };
109
110        let (tail_pos, last_tail_write_timestamp) =
111            persisted_tail.unwrap_or((StreamPosition::MIN, kv::timestamp::TimestampSecs::ZERO));
112
113        self.assert_no_records_following_tail(stream_id, &basin, &stream, tail_pos)
114            .await?;
115
116        let fencing_token = fencing_token.unwrap_or_default();
117
118        if trim_point == Some(..NonZeroSeqNum::MAX) {
119            return Err(StreamDeletionPendingError.into());
120        }
121
122        let streamer_slots = self.streamer_slots.clone();
123        Ok(super::streamer::Spawner {
124            generation_id,
125            db: self.db.clone(),
126            stream_id,
127            config: meta.config,
128            cipher: meta.cipher,
129            tail_pos,
130            last_tail_write_timestamp,
131            fencing_token,
132            trim_point: ..trim_point.map_or(SeqNum::MIN, |tp| tp.end.get()),
133            append_inflight_bytes_sema: self.append_inflight_bytes_sema.clone(),
134            durability_notifier: self.durability_notifier.clone(),
135            bgtask_trigger_tx: self.bgtask_trigger_tx.clone(),
136        }
137        .spawn(move |client_id| {
138            streamer_slots.remove_if(&stream_id, |_, slot| {
139                matches!(slot, StreamerClientSlot::Ready { client } if client.generation_id() == client_id)
140            });
141        }))
142    }
143
144    async fn load_persisted_stream_tail(
145        &self,
146        stream_id: StreamId,
147    ) -> Result<Option<(StreamPosition, kv::timestamp::TimestampSecs)>, StorageError> {
148        let read_opts = ReadOptions {
149            durability_filter: DurabilityLevel::Remote,
150            ..Default::default()
151        };
152        let Some(entry) = self
153            .db
154            .get_key_value_with_options(kv::stream_tail_position::ser_key(stream_id), &read_opts)
155            .await?
156        else {
157            return Ok(None);
158        };
159        Ok(Some((
160            kv::stream_tail_position::deser_value(entry.value)?,
161            kv::timestamp::TimestampSecs::from_millis(entry.create_ts),
162        )))
163    }
164
165    async fn assert_no_records_following_tail(
166        &self,
167        stream_id: StreamId,
168        basin: &BasinName,
169        stream: &StreamName,
170        tail_pos: StreamPosition,
171    ) -> Result<(), StorageError> {
172        let start_key = kv::stream_record_data::ser_key(
173            stream_id,
174            StreamPosition {
175                seq_num: tail_pos.seq_num,
176                timestamp: 0,
177            },
178        );
179        let scan_opts = ScanOptions {
180            durability_filter: DurabilityLevel::Remote,
181            ..Default::default()
182        };
183        let mut it = self.db.scan_with_options(start_key.., &scan_opts).await?;
184        let Some(kv) = it.next().await? else {
185            return Ok(());
186        };
187        if kv.key.first().copied() != Some(kv::KeyType::StreamRecordData as u8) {
188            return Ok(());
189        }
190        let (deser_stream_id, pos) = kv::stream_record_data::deser_key(kv.key)?;
191        assert!(
192            deser_stream_id != stream_id,
193            "invariant violation: stream `{basin}/{stream}` tail_pos {tail_pos:?} but found record at {pos:?}"
194        );
195        Ok(())
196    }
197
198    fn streamer_client_slot(&self, basin: &BasinName, stream: &StreamName) -> StreamerClientSlot {
199        match self.streamer_slots.entry(StreamId::new(basin, stream)) {
200            dashmap::Entry::Occupied(mut oe) => {
201                if matches!(oe.get(), StreamerClientSlot::Ready { client } if client.is_dead()) {
202                    let slot = self.clone().new_initializing_slot(basin, stream);
203                    oe.insert(slot.clone());
204                    slot
205                } else {
206                    oe.get().clone()
207                }
208            }
209            dashmap::Entry::Vacant(ve) => {
210                let slot = self.clone().new_initializing_slot(basin, stream);
211                ve.insert(slot.clone());
212                slot
213            }
214        }
215    }
216
217    fn new_initializing_slot(self, basin: &BasinName, stream: &StreamName) -> StreamerClientSlot {
218        let basin = basin.clone();
219        let stream = stream.clone();
220        let generation_id = StreamerGenerationId::next();
221        let future = async move { self.start_streamer(generation_id, basin, stream).await }
222            .boxed()
223            .shared();
224        StreamerClientSlot::Initializing {
225            generation_id,
226            future,
227        }
228    }
229
230    fn streamer_finish_initialization(
231        &self,
232        stream_id: StreamId,
233        generation_id: StreamerGenerationId,
234        result: &Result<StreamerClient, StreamerError>,
235    ) {
236        if let dashmap::Entry::Occupied(mut oe) = self.streamer_slots.entry(stream_id) {
237            let is_same_init = matches!(
238                oe.get(),
239                StreamerClientSlot::Initializing {
240                    generation_id: state_generation_id,
241                    ..
242                } if *state_generation_id == generation_id
243            );
244            if is_same_init {
245                match result {
246                    Ok(client) => {
247                        debug_assert_eq!(client.generation_id(), generation_id);
248                        if client.is_dead() {
249                            oe.remove();
250                        } else {
251                            oe.insert(StreamerClientSlot::Ready {
252                                client: client.clone(),
253                            });
254                        }
255                    }
256                    Err(_) => {
257                        oe.remove();
258                    }
259                }
260            }
261        }
262    }
263
264    pub(super) async fn streamer_client(
265        &self,
266        basin: &BasinName,
267        stream: &StreamName,
268    ) -> Result<StreamerClient, StreamerError> {
269        let stream_id = StreamId::new(basin, stream);
270        match self.streamer_client_slot(basin, stream) {
271            StreamerClientSlot::Initializing {
272                generation_id,
273                future,
274            } => {
275                let result = future.await;
276                self.streamer_finish_initialization(stream_id, generation_id, &result);
277                result
278            }
279            StreamerClientSlot::Ready { client } => Ok(client),
280        }
281    }
282
283    pub(super) fn streamer_client_if_active(
284        &self,
285        basin: &BasinName,
286        stream: &StreamName,
287    ) -> Option<StreamerClient> {
288        let stream_id = StreamId::new(basin, stream);
289        let slot = self.streamer_slots.get(&stream_id)?;
290        match slot.value() {
291            StreamerClientSlot::Ready { client } if !client.is_dead() => Some(client.clone()),
292            _ => None,
293        }
294    }
295
296    pub(super) async fn streamer_client_guarded(
297        &self,
298        basin: &BasinName,
299        stream: &StreamName,
300    ) -> Result<GuardedStreamerClient, StreamerError> {
301        loop {
302            let client = self.streamer_client(basin, stream).await?;
303            match client.guard() {
304                Ok(client) => return Ok(client),
305                Err(StreamerMissingInActionError) => continue,
306            }
307        }
308    }
309
310    pub(super) async fn stream_handle_with_auto_create<E>(
311        &self,
312        basin: &BasinName,
313        stream: &StreamName,
314        should_auto_create: impl FnOnce(&BasinConfig) -> bool,
315        resolve_encryption: impl FnOnce(Option<EncryptionAlgorithm>) -> Result<EncryptionSpec, E>,
316    ) -> Result<StreamHandle, E>
317    where
318        E: From<StreamerError>
319            + From<StorageError>
320            + From<BasinNotFoundError>
321            + From<TransactionConflictError>
322            + From<BasinDeletionPendingError>
323            + From<StreamDeletionPendingError>
324            + From<StreamNotFoundError>,
325    {
326        match self.streamer_client_guarded(basin, stream).await {
327            Ok(client) => Ok(StreamHandle {
328                db: self.db.clone(),
329                encryption: resolve_encryption(client.cipher())?,
330                client,
331            }),
332            Err(StreamerError::StreamNotFound(e)) => {
333                let config = match self.get_basin_config(basin.clone()).await {
334                    Ok(config) => config,
335                    Err(GetBasinConfigError::Storage(e)) => Err(e)?,
336                    Err(GetBasinConfigError::BasinNotFound(e)) => Err(e)?,
337                };
338                if should_auto_create(&config) {
339                    if let Err(e) = self
340                        .provision_stream(
341                            basin.clone(),
342                            stream.clone(),
343                            OptionalStreamConfig::default(),
344                            ProvisionMode::CreateOnly {
345                                request_token: None,
346                            },
347                        )
348                        .await
349                    {
350                        match e {
351                            ProvisionStreamError::Storage(e) => Err(e)?,
352                            ProvisionStreamError::TransactionConflict(e) => Err(e)?,
353                            ProvisionStreamError::BasinDeletionPending(e) => Err(e)?,
354                            ProvisionStreamError::StreamDeletionPending(e) => Err(e)?,
355                            ProvisionStreamError::BasinNotFound(e) => Err(e)?,
356                            ProvisionStreamError::StreamAlreadyExists(_) => {}
357                            ProvisionStreamError::Validation(_) => {
358                                unreachable!("auto-create uses default config")
359                            }
360                        }
361                    }
362                    let client = self.streamer_client_guarded(basin, stream).await?;
363                    let encryption = resolve_encryption(client.cipher())?;
364                    Ok(StreamHandle {
365                        db: self.db.clone(),
366                        encryption,
367                        client,
368                    })
369                } else {
370                    Err(e.into())
371                }
372            }
373            Err(e) => Err(e.into()),
374        }
375    }
376}
377
378#[cfg(test)]
379mod tests {
380    use std::str::FromStr as _;
381
382    use bytes::Bytes;
383    use s2_common::{
384        config::{BasinConfig, OptionalStreamConfig, StreamConfig},
385        record::{Metered, MeteredExt as _, Record, StreamPosition},
386        resources::ProvisionMode,
387    };
388    use s2_storage::record::StoredRecord;
389    use slatedb::{WriteBatch, object_store};
390    use time::OffsetDateTime;
391
392    use super::*;
393
394    async fn new_test_backend() -> Backend {
395        let object_store: Arc<dyn object_store::ObjectStore> =
396            Arc::new(object_store::memory::InMemory::new());
397        let db = slatedb::Db::builder("test", object_store)
398            .build()
399            .await
400            .unwrap();
401        Backend::new(db, ByteSize::b(1))
402    }
403
404    #[tokio::test]
405    #[should_panic(expected = "invariant violation: stream `testbasin1/stream1` tail_pos")]
406    async fn start_streamer_fails_if_records_exist_after_tail_pos() {
407        let backend = new_test_backend().await;
408
409        let basin = BasinName::from_str("testbasin1").unwrap();
410        let stream = StreamName::from_str("stream1").unwrap();
411        let stream_id = StreamId::new(&basin, &stream);
412
413        let meta = kv::stream_meta::StreamMeta {
414            config: StreamConfig::default(),
415            cipher: None,
416            created_at: OffsetDateTime::now_utc(),
417            deleted_at: None,
418            creation_idempotency_key: None,
419        };
420
421        let tail_pos = StreamPosition {
422            seq_num: 1,
423            timestamp: 123,
424        };
425        let record_pos = StreamPosition {
426            seq_num: tail_pos.seq_num,
427            timestamp: tail_pos.timestamp,
428        };
429
430        let record = Record::try_from_parts(vec![], Bytes::from_static(b"hello")).unwrap();
431        let metered_record: Metered<StoredRecord> = StoredRecord::from(record).metered();
432
433        let mut wb = WriteBatch::new();
434        wb.put(
435            kv::stream_meta::ser_key(&basin, &stream),
436            kv::stream_meta::ser_value(&meta),
437        );
438        wb.put(
439            kv::stream_tail_position::ser_key(stream_id),
440            kv::stream_tail_position::ser_value(tail_pos),
441        );
442        wb.put(
443            kv::stream_record_data::ser_key(stream_id, record_pos),
444            kv::stream_record_data::ser_value(metered_record.as_ref()),
445        );
446        backend.db.write(wb).await.unwrap();
447
448        backend
449            .start_streamer(StreamerGenerationId::next(), basin.clone(), stream.clone())
450            .await
451            .unwrap();
452    }
453
454    #[tokio::test]
455    async fn streamer_client_slot_uses_single_initializer() {
456        let backend = new_test_backend().await;
457        let basin = BasinName::from_str("testbasin2").unwrap();
458        let stream = StreamName::from_str("stream2").unwrap();
459
460        let slot_1 = backend.streamer_client_slot(&basin, &stream);
461        let slot_2 = backend.streamer_client_slot(&basin, &stream);
462
463        let (generation_id_1, generation_id_2) = match (slot_1, slot_2) {
464            (
465                StreamerClientSlot::Initializing {
466                    generation_id: generation_id_1,
467                    ..
468                },
469                StreamerClientSlot::Initializing {
470                    generation_id: generation_id_2,
471                    ..
472                },
473            ) => (generation_id_1, generation_id_2),
474            _ => panic!("expected both slots to be Initializing"),
475        };
476        assert_eq!(generation_id_1, generation_id_2);
477        assert_eq!(backend.streamer_slots.len(), 1);
478    }
479
480    #[tokio::test]
481    async fn streamer_client_if_active_is_peek_only() {
482        let backend = new_test_backend().await;
483        let basin = BasinName::from_str("testbasin3").unwrap();
484        let stream = StreamName::from_str("stream3").unwrap();
485
486        backend
487            .provision_basin(
488                basin.clone(),
489                BasinConfig::default(),
490                ProvisionMode::CreateOnly {
491                    request_token: None,
492                },
493            )
494            .await
495            .unwrap();
496        backend
497            .provision_stream(
498                basin.clone(),
499                stream.clone(),
500                OptionalStreamConfig::default(),
501                ProvisionMode::CreateOnly {
502                    request_token: None,
503                },
504            )
505            .await
506            .unwrap();
507
508        assert!(backend.streamer_slots.is_empty());
509        assert!(backend.streamer_client_if_active(&basin, &stream).is_none());
510        assert!(backend.streamer_slots.is_empty());
511    }
512
513    #[tokio::test]
514    async fn streamer_client_failed_init_is_not_memoized() {
515        let backend = new_test_backend().await;
516        let basin = BasinName::from_str("testbasin4").unwrap();
517        let stream = StreamName::from_str("stream4").unwrap();
518        let stream_id = StreamId::new(&basin, &stream);
519
520        for _ in 0..2 {
521            let err = backend.streamer_client(&basin, &stream).await;
522            assert!(matches!(err, Err(StreamerError::StreamNotFound(_))));
523            assert!(
524                backend.streamer_slots.get(&stream_id).is_none(),
525                "failed init should not be cached"
526            );
527        }
528    }
529
530    #[tokio::test]
531    async fn streamer_finish_initialization_ignores_stale_generation_id() {
532        let backend = new_test_backend().await;
533        let basin = BasinName::from_str("testbasin5").unwrap();
534        let stream = StreamName::from_str("stream5").unwrap();
535        let stream_id = StreamId::new(&basin, &stream);
536
537        let stale_generation_id = StreamerGenerationId::next();
538        let current_generation_id = StreamerGenerationId::next();
539        let future = futures::future::pending::<Result<StreamerClient, StreamerError>>()
540            .boxed()
541            .shared();
542        backend.streamer_slots.insert(
543            stream_id,
544            StreamerClientSlot::Initializing {
545                generation_id: current_generation_id,
546                future: future.clone(),
547            },
548        );
549
550        let stale_result = Err(StreamNotFoundError { basin, stream }.into());
551        backend.streamer_finish_initialization(stream_id, stale_generation_id, &stale_result);
552
553        let Some(slot) = backend.streamer_slots.get(&stream_id) else {
554            panic!("stale init completion should not alter slot state");
555        };
556        match slot.value() {
557            StreamerClientSlot::Initializing { generation_id, .. } => {
558                assert_eq!(*generation_id, current_generation_id)
559            }
560            _ => panic!("expected initializing slot to remain unchanged"),
561        }
562    }
563}