Skip to main content

s2_lite/backend/
core.rs

1use std::sync::{
2    Arc,
3    atomic::{AtomicU64, Ordering},
4};
5
6use bytesize::ByteSize;
7use dashmap::DashMap;
8use enum_ordinalize::Ordinalize;
9use futures::{
10    FutureExt as _,
11    future::{BoxFuture, Shared},
12};
13use s2_common::{
14    record::{NonZeroSeqNum, SeqNum, StreamPosition},
15    types::{
16        basin::BasinName,
17        config::{BasinConfig, OptionalStreamConfig},
18        resources::CreateMode,
19        stream::StreamName,
20    },
21};
22use slatedb::config::{DurabilityLevel, ScanOptions};
23use tokio::sync::broadcast;
24
25use super::{
26    error::{
27        BasinDeletionPendingError, BasinNotFoundError, CreateStreamError, GetBasinConfigError,
28        StorageError, StreamDeletionPendingError, StreamNotFoundError, StreamerError,
29        TransactionConflictError,
30    },
31    kv,
32    stream_id::StreamId,
33    streamer::StreamerClient,
34};
35use crate::backend::bgtasks::BgtaskTrigger;
36
37type StreamerInitFuture = Shared<BoxFuture<'static, Result<StreamerClient, StreamerError>>>;
38
/// Identifies a single streamer initialization attempt, so that completion of an
/// older attempt can be told apart from (and cannot overwrite) a newer slot.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
struct StreamerInitId(u64);

impl StreamerInitId {
    /// Returns a fresh id from a process-wide counter that starts at 1.
    ///
    /// `Relaxed` suffices: only uniqueness of the value matters, not ordering
    /// with respect to other memory operations.
    fn next() -> Self {
        static COUNTER: AtomicU64 = AtomicU64::new(1);
        let raw = COUNTER.fetch_add(1, Ordering::Relaxed);
        StreamerInitId(raw)
    }
}
48
49#[derive(Clone)]
50enum StreamerClientSlot {
51    Initializing {
52        init_id: StreamerInitId,
53        future: StreamerInitFuture,
54    },
55    Ready {
56        client: StreamerClient,
57    },
58}
59
60#[derive(Clone)]
61pub struct Backend {
62    pub(super) db: slatedb::Db,
63    streamer_slots: Arc<DashMap<StreamId, StreamerClientSlot>>,
64    append_inflight_max: ByteSize,
65    bgtask_trigger_tx: broadcast::Sender<BgtaskTrigger>,
66}
67
68impl Backend {
69    pub fn new(db: slatedb::Db, append_inflight_max: ByteSize) -> Self {
70        let (bgtask_trigger_tx, _) = broadcast::channel(16);
71        Self {
72            db,
73            streamer_slots: Arc::new(DashMap::new()),
74            append_inflight_max,
75            bgtask_trigger_tx,
76        }
77    }
78
79    pub(super) fn bgtask_trigger(&self, trigger: BgtaskTrigger) {
80        let _ = self.bgtask_trigger_tx.send(trigger);
81    }
82
83    pub(super) fn bgtask_trigger_subscribe(&self) -> broadcast::Receiver<BgtaskTrigger> {
84        self.bgtask_trigger_tx.subscribe()
85    }
86
87    async fn start_streamer(
88        &self,
89        basin: BasinName,
90        stream: StreamName,
91    ) -> Result<StreamerClient, StreamerError> {
92        let stream_id = StreamId::new(&basin, &stream);
93
94        let (meta, tail_pos, fencing_token, trim_point) = tokio::try_join!(
95            self.db_get(
96                kv::stream_meta::ser_key(&basin, &stream),
97                kv::stream_meta::deser_value,
98            ),
99            self.db_get(
100                kv::stream_tail_position::ser_key(stream_id),
101                kv::stream_tail_position::deser_value,
102            ),
103            self.db_get(
104                kv::stream_fencing_token::ser_key(stream_id),
105                kv::stream_fencing_token::deser_value,
106            ),
107            self.db_get(
108                kv::stream_trim_point::ser_key(stream_id),
109                kv::stream_trim_point::deser_value,
110            )
111        )?;
112
113        let Some(meta) = meta else {
114            return Err(StreamNotFoundError { basin, stream }.into());
115        };
116
117        let tail_pos = tail_pos.map(|(pos, _)| pos).unwrap_or(StreamPosition::MIN);
118        self.assert_no_records_following_tail(stream_id, &basin, &stream, tail_pos)
119            .await?;
120
121        let fencing_token = fencing_token.unwrap_or_default();
122
123        if trim_point == Some(..NonZeroSeqNum::MAX) {
124            return Err(StreamDeletionPendingError { basin, stream }.into());
125        }
126
127        let streamer_slots = self.streamer_slots.clone();
128        Ok(super::streamer::Spawner {
129            db: self.db.clone(),
130            stream_id,
131            config: meta.config,
132            tail_pos,
133            fencing_token,
134            trim_point: ..trim_point.map_or(SeqNum::MIN, |tp| tp.end.get()),
135            append_inflight_max: self.append_inflight_max,
136            bgtask_trigger_tx: self.bgtask_trigger_tx.clone(),
137        }
138        .spawn(move |client_id| {
139            streamer_slots.remove_if(&stream_id, |_, slot| {
140                matches!(slot, StreamerClientSlot::Ready { client } if client.id() == client_id)
141            });
142        }))
143    }
144
145    async fn assert_no_records_following_tail(
146        &self,
147        stream_id: StreamId,
148        basin: &BasinName,
149        stream: &StreamName,
150        tail_pos: StreamPosition,
151    ) -> Result<(), StorageError> {
152        let start_key = kv::stream_record_data::ser_key(
153            stream_id,
154            StreamPosition {
155                seq_num: tail_pos.seq_num,
156                timestamp: 0,
157            },
158        );
159        static SCAN_OPTS: ScanOptions = ScanOptions {
160            durability_filter: DurabilityLevel::Remote,
161            dirty: false,
162            read_ahead_bytes: 1,
163            cache_blocks: false,
164            max_fetch_tasks: 1,
165        };
166        let mut it = self.db.scan_with_options(start_key.., &SCAN_OPTS).await?;
167        let Some(kv) = it.next().await? else {
168            return Ok(());
169        };
170        if kv.key.first().copied() != Some(kv::KeyType::StreamRecordData.ordinal()) {
171            return Ok(());
172        }
173        let (deser_stream_id, pos) = kv::stream_record_data::deser_key(kv.key)?;
174        assert!(
175            deser_stream_id != stream_id,
176            "invariant violation: stream `{basin}/{stream}` tail_pos {tail_pos:?} but found record at {pos:?}"
177        );
178        Ok(())
179    }
180
181    fn streamer_client_slot(&self, basin: &BasinName, stream: &StreamName) -> StreamerClientSlot {
182        match self.streamer_slots.entry(StreamId::new(basin, stream)) {
183            dashmap::Entry::Occupied(oe) => oe.get().clone(),
184            dashmap::Entry::Vacant(ve) => {
185                let this = self.clone();
186                let basin = basin.clone();
187                let stream = stream.clone();
188                let init_id = StreamerInitId::next();
189                let future = async move { this.start_streamer(basin, stream).await }
190                    .boxed()
191                    .shared();
192                let slot = StreamerClientSlot::Initializing {
193                    init_id,
194                    future: future.clone(),
195                };
196                ve.insert(slot.clone());
197                slot
198            }
199        }
200    }
201
202    fn streamer_finish_initialization(
203        &self,
204        stream_id: StreamId,
205        init_id: StreamerInitId,
206        result: &Result<StreamerClient, StreamerError>,
207    ) {
208        if let dashmap::Entry::Occupied(mut oe) = self.streamer_slots.entry(stream_id) {
209            let is_same_init = matches!(
210                oe.get(),
211                StreamerClientSlot::Initializing {
212                    init_id: state_init_id,
213                    ..
214                } if *state_init_id == init_id
215            );
216            if is_same_init {
217                match result {
218                    Ok(client) => {
219                        oe.insert(StreamerClientSlot::Ready {
220                            client: client.clone(),
221                        });
222                    }
223                    Err(_) => {
224                        oe.remove();
225                    }
226                }
227            }
228        }
229    }
230
231    pub(super) async fn streamer_client(
232        &self,
233        basin: &BasinName,
234        stream: &StreamName,
235    ) -> Result<StreamerClient, StreamerError> {
236        let stream_id = StreamId::new(basin, stream);
237        match self.streamer_client_slot(basin, stream) {
238            StreamerClientSlot::Initializing { init_id, future } => {
239                let result = future.await;
240                self.streamer_finish_initialization(stream_id, init_id, &result);
241                result
242            }
243            StreamerClientSlot::Ready { client } => Ok(client),
244        }
245    }
246
247    pub(super) fn streamer_client_if_active(
248        &self,
249        basin: &BasinName,
250        stream: &StreamName,
251    ) -> Option<StreamerClient> {
252        let stream_id = StreamId::new(basin, stream);
253        let slot = self.streamer_slots.get(&stream_id)?;
254        match slot.value() {
255            StreamerClientSlot::Ready { client } => Some(client.clone()),
256            _ => None,
257        }
258    }
259
260    pub(super) async fn streamer_client_with_auto_create<E>(
261        &self,
262        basin: &BasinName,
263        stream: &StreamName,
264        should_auto_create: impl FnOnce(&BasinConfig) -> bool,
265    ) -> Result<StreamerClient, E>
266    where
267        E: From<StreamerError>
268            + From<StorageError>
269            + From<BasinNotFoundError>
270            + From<TransactionConflictError>
271            + From<BasinDeletionPendingError>
272            + From<StreamDeletionPendingError>
273            + From<StreamNotFoundError>,
274    {
275        match self.streamer_client(basin, stream).await {
276            Ok(client) => Ok(client),
277            Err(StreamerError::StreamNotFound(e)) => {
278                let config = match self.get_basin_config(basin.clone()).await {
279                    Ok(config) => config,
280                    Err(GetBasinConfigError::Storage(e)) => Err(e)?,
281                    Err(GetBasinConfigError::BasinNotFound(e)) => Err(e)?,
282                };
283                if should_auto_create(&config) {
284                    if let Err(e) = self
285                        .create_stream(
286                            basin.clone(),
287                            stream.clone(),
288                            OptionalStreamConfig::default(),
289                            CreateMode::CreateOnly(None),
290                        )
291                        .await
292                    {
293                        match e {
294                            CreateStreamError::Storage(e) => Err(e)?,
295                            CreateStreamError::TransactionConflict(e) => Err(e)?,
296                            CreateStreamError::BasinDeletionPending(e) => Err(e)?,
297                            CreateStreamError::StreamDeletionPending(e) => Err(e)?,
298                            CreateStreamError::BasinNotFound(e) => Err(e)?,
299                            CreateStreamError::StreamAlreadyExists(_) => {}
300                        }
301                    }
302                    Ok(self.streamer_client(basin, stream).await?)
303                } else {
304                    Err(e.into())
305                }
306            }
307            Err(e) => Err(e.into()),
308        }
309    }
310}
311
#[cfg(test)]
mod tests {
    use std::str::FromStr as _;

    use bytes::Bytes;
    use s2_common::{
        record::{Metered, Record, StreamPosition},
        types::{config::BasinConfig, resources::CreateMode},
    };
    use slatedb::{WriteBatch, config::WriteOptions, object_store};
    use time::OffsetDateTime;

    use super::*;

    /// Builds a backend on top of a fresh in-memory object store.
    async fn new_test_backend() -> Backend {
        let store: Arc<dyn object_store::ObjectStore> =
            Arc::new(object_store::memory::InMemory::new());
        let db = slatedb::Db::builder("test", store).build().await.unwrap();
        Backend::new(db, ByteSize::b(1))
    }

    /// Parses a basin/stream name pair and derives the matching stream id.
    fn names(basin: &str, stream: &str) -> (BasinName, StreamName, StreamId) {
        let basin = BasinName::from_str(basin).unwrap();
        let stream = StreamName::from_str(stream).unwrap();
        let stream_id = StreamId::new(&basin, &stream);
        (basin, stream, stream_id)
    }

    #[tokio::test]
    #[should_panic(expected = "invariant violation: stream `testbasin1/stream1` tail_pos")]
    async fn start_streamer_fails_if_records_exist_after_tail_pos() {
        let backend = new_test_backend().await;
        let (basin, stream, stream_id) = names("testbasin1", "stream1");

        let tail_pos = StreamPosition {
            seq_num: 1,
            timestamp: 123,
        };
        // A record sitting exactly at the recorded tail must trip the invariant check.
        let record_pos = tail_pos;

        let meta = kv::stream_meta::StreamMeta {
            config: OptionalStreamConfig::default(),
            created_at: OffsetDateTime::now_utc(),
            deleted_at: None,
            creation_idempotency_key: None,
        };
        let metered_record: Metered<Record> =
            Record::try_from_parts(vec![], Bytes::from_static(b"hello"))
                .unwrap()
                .into();

        let batch = {
            let mut batch = WriteBatch::new();
            batch.put(
                kv::stream_meta::ser_key(&basin, &stream),
                kv::stream_meta::ser_value(&meta),
            );
            batch.put(
                kv::stream_tail_position::ser_key(stream_id),
                kv::stream_tail_position::ser_value(
                    tail_pos,
                    kv::timestamp::TimestampSecs::from_secs(1),
                ),
            );
            batch.put(
                kv::stream_record_data::ser_key(stream_id, record_pos),
                kv::stream_record_data::ser_value(metered_record.as_ref()),
            );
            batch
        };

        // Durable write so the `DurabilityLevel::Remote` scan can observe the record.
        static WRITE_OPTS: WriteOptions = WriteOptions {
            await_durable: true,
        };
        backend
            .db
            .write_with_options(batch, &WRITE_OPTS)
            .await
            .unwrap();

        backend.start_streamer(basin, stream).await.unwrap();
    }

    #[tokio::test]
    async fn streamer_client_slot_uses_single_initializer() {
        let backend = new_test_backend().await;
        let (basin, stream, _) = names("testbasin2", "stream2");

        let init_id_of = |slot: StreamerClientSlot| match slot {
            StreamerClientSlot::Initializing { init_id, .. } => init_id,
            StreamerClientSlot::Ready { .. } => panic!("expected both slots to be Initializing"),
        };

        let first = init_id_of(backend.streamer_client_slot(&basin, &stream));
        let second = init_id_of(backend.streamer_client_slot(&basin, &stream));

        assert_eq!(first, second);
        assert_eq!(backend.streamer_slots.len(), 1);
    }

    #[tokio::test]
    async fn streamer_client_if_active_is_peek_only() {
        let backend = new_test_backend().await;
        let (basin, stream, _) = names("testbasin3", "stream3");

        backend
            .create_basin(
                basin.clone(),
                BasinConfig::default(),
                CreateMode::CreateOnly(None),
            )
            .await
            .unwrap();
        backend
            .create_stream(
                basin.clone(),
                stream.clone(),
                OptionalStreamConfig::default(),
                CreateMode::CreateOnly(None),
            )
            .await
            .unwrap();

        // Peeking must neither find a client nor start (and thus register) one.
        assert!(backend.streamer_slots.is_empty());
        assert!(backend.streamer_client_if_active(&basin, &stream).is_none());
        assert!(backend.streamer_slots.is_empty());
    }

    #[tokio::test]
    async fn streamer_client_failed_init_is_not_memoized() {
        let backend = new_test_backend().await;
        let (basin, stream, stream_id) = names("testbasin4", "stream4");

        // Repeat to make sure a second attempt is not served from a cached failure.
        for _attempt in 0..2 {
            let outcome = backend.streamer_client(&basin, &stream).await;
            assert!(matches!(outcome, Err(StreamerError::StreamNotFound(_))));
            assert!(
                !backend.streamer_slots.contains_key(&stream_id),
                "failed init should not be cached"
            );
        }
    }

    #[tokio::test]
    async fn streamer_finish_initialization_ignores_stale_init_id() {
        let backend = new_test_backend().await;
        let (basin, stream, stream_id) = names("testbasin5", "stream5");

        let stale_init_id = StreamerInitId::next();
        let current_init_id = StreamerInitId::next();
        backend.streamer_slots.insert(
            stream_id,
            StreamerClientSlot::Initializing {
                init_id: current_init_id,
                future: futures::future::pending::<Result<StreamerClient, StreamerError>>()
                    .boxed()
                    .shared(),
            },
        );

        let stale_result = Err(StreamNotFoundError { basin, stream }.into());
        backend.streamer_finish_initialization(stream_id, stale_init_id, &stale_result);

        let slot = backend
            .streamer_slots
            .get(&stream_id)
            .expect("stale init completion should not alter slot state");
        let StreamerClientSlot::Initializing { init_id, .. } = slot.value() else {
            panic!("expected initializing slot to remain unchanged");
        };
        assert_eq!(*init_id, current_init_id);
    }
}