Skip to main content

s2_lite/backend/
core.rs

1use std::{sync::Arc, time::Duration};
2
3use bytesize::ByteSize;
4use dashmap::DashMap;
5use enum_ordinalize::Ordinalize;
6use s2_common::{
7    record::{NonZeroSeqNum, SeqNum, StreamPosition},
8    types::{
9        basin::BasinName,
10        config::{BasinConfig, OptionalStreamConfig},
11        resources::CreateMode,
12        stream::StreamName,
13    },
14};
15use slatedb::config::{DurabilityLevel, ScanOptions};
16use tokio::{sync::broadcast, time::Instant};
17
18use super::{
19    error::{
20        BasinDeletionPendingError, BasinNotFoundError, CreateStreamError, GetBasinConfigError,
21        StorageError, StreamDeletionPendingError, StreamNotFoundError, StreamerError,
22        TransactionConflictError,
23    },
24    kv,
25    stream_id::StreamId,
26    streamer::{StreamerClient, StreamerClientState},
27};
28use crate::backend::bgtasks::BgtaskTrigger;
29
/// Backend handle wrapping the SlateDB instance and the per-stream streamer registry.
///
/// Clones share the same `client_states` map (it is behind an `Arc`) and the same
/// background-task broadcast channel.
#[derive(Clone)]
pub struct Backend {
    /// Underlying SlateDB database; a clone is handed to every spawned streamer.
    pub(super) db: slatedb::Db,
    /// Per-stream streamer state (initializing / ready / failed initialization), keyed by
    /// stream id.
    client_states: Arc<DashMap<StreamId, StreamerClientState>>,
    /// Passed to each spawned streamer as its `append_inflight_max`
    /// (presumably a cap on bytes of appends in flight — confirm in `streamer`).
    append_inflight_max: ByteSize,
    /// Sender side of the broadcast channel used to trigger background tasks.
    bgtask_trigger_tx: broadcast::Sender<BgtaskTrigger>,
}
37
38impl Backend {
39    const FAILED_INIT_MEMORY: Duration = Duration::from_secs(1);
40
41    pub fn new(db: slatedb::Db, append_inflight_max: ByteSize) -> Self {
42        let (bgtask_trigger_tx, _) = broadcast::channel(16);
43        Self {
44            db,
45            client_states: Arc::new(DashMap::new()),
46            append_inflight_max,
47            bgtask_trigger_tx,
48        }
49    }
50
51    pub(super) fn bgtask_trigger(&self, trigger: BgtaskTrigger) {
52        let _ = self.bgtask_trigger_tx.send(trigger);
53    }
54
55    pub(super) fn bgtask_trigger_subscribe(&self) -> broadcast::Receiver<BgtaskTrigger> {
56        self.bgtask_trigger_tx.subscribe()
57    }
58
59    async fn start_streamer(
60        &self,
61        basin: BasinName,
62        stream: StreamName,
63    ) -> Result<StreamerClient, StreamerError> {
64        let stream_id = StreamId::new(&basin, &stream);
65
66        let (meta, tail_pos, fencing_token, trim_point) = tokio::try_join!(
67            self.db_get(
68                kv::stream_meta::ser_key(&basin, &stream),
69                kv::stream_meta::deser_value,
70            ),
71            self.db_get(
72                kv::stream_tail_position::ser_key(stream_id),
73                kv::stream_tail_position::deser_value,
74            ),
75            self.db_get(
76                kv::stream_fencing_token::ser_key(stream_id),
77                kv::stream_fencing_token::deser_value,
78            ),
79            self.db_get(
80                kv::stream_trim_point::ser_key(stream_id),
81                kv::stream_trim_point::deser_value,
82            )
83        )?;
84
85        let Some(meta) = meta else {
86            return Err(StreamNotFoundError { basin, stream }.into());
87        };
88
89        let tail_pos = tail_pos.map(|(pos, _)| pos).unwrap_or(StreamPosition::MIN);
90        self.assert_no_records_following_tail(stream_id, &basin, &stream, tail_pos)
91            .await?;
92
93        let fencing_token = fencing_token.unwrap_or_default();
94
95        if trim_point == Some(..NonZeroSeqNum::MAX) {
96            return Err(StreamDeletionPendingError { basin, stream }.into());
97        }
98
99        let client_states = self.client_states.clone();
100        Ok(super::streamer::Spawner {
101            db: self.db.clone(),
102            stream_id,
103            config: meta.config,
104            tail_pos,
105            fencing_token,
106            trim_point: ..trim_point.map_or(SeqNum::MIN, |tp| tp.end.get()),
107            append_inflight_max: self.append_inflight_max,
108            bgtask_trigger_tx: self.bgtask_trigger_tx.clone(),
109        }
110        .spawn(move || {
111            client_states.remove(&stream_id);
112        }))
113    }
114
115    async fn assert_no_records_following_tail(
116        &self,
117        stream_id: StreamId,
118        basin: &BasinName,
119        stream: &StreamName,
120        tail_pos: StreamPosition,
121    ) -> Result<(), StorageError> {
122        let start_key = kv::stream_record_data::ser_key(
123            stream_id,
124            StreamPosition {
125                seq_num: tail_pos.seq_num,
126                timestamp: 0,
127            },
128        );
129        static SCAN_OPTS: ScanOptions = ScanOptions {
130            durability_filter: DurabilityLevel::Remote,
131            dirty: false,
132            read_ahead_bytes: 1,
133            cache_blocks: false,
134            max_fetch_tasks: 1,
135        };
136        let mut it = self.db.scan_with_options(start_key.., &SCAN_OPTS).await?;
137        let Some(kv) = it.next().await? else {
138            return Ok(());
139        };
140        if kv.key.first().copied() != Some(kv::KeyType::StreamRecordData.ordinal()) {
141            return Ok(());
142        }
143        let (deser_stream_id, pos) = kv::stream_record_data::deser_key(kv.key)?;
144        assert!(
145            deser_stream_id != stream_id,
146            "invariant violation: stream `{basin}/{stream}` tail_pos {tail_pos:?} but found record at {pos:?}"
147        );
148        Ok(())
149    }
150
151    fn streamer_client_state(&self, basin: &BasinName, stream: &StreamName) -> StreamerClientState {
152        match self.client_states.entry(StreamId::new(basin, stream)) {
153            dashmap::Entry::Occupied(oe) => oe.get().clone(),
154            dashmap::Entry::Vacant(ve) => {
155                let this = self.clone();
156                let stream_id = *(ve.key());
157                let basin = basin.clone();
158                let stream = stream.clone();
159                tokio::spawn(async move {
160                    let state = match this.start_streamer(basin, stream).await {
161                        Ok(client) => StreamerClientState::Ready { client },
162                        Err(error) => StreamerClientState::InitError {
163                            error: Box::new(error),
164                            timestamp: Instant::now(),
165                        },
166                    };
167                    let replaced_state = this.client_states.insert(stream_id, state);
168                    let Some(StreamerClientState::Blocked { notify }) = replaced_state else {
169                        panic!("expected Blocked client but replaced: {replaced_state:?}");
170                    };
171                    notify.notify_waiters();
172                });
173                ve.insert(StreamerClientState::Blocked {
174                    notify: Default::default(),
175                })
176                .value()
177                .clone()
178            }
179        }
180    }
181
182    fn streamer_remove_unready(&self, stream_id: StreamId) {
183        if let dashmap::Entry::Occupied(oe) = self.client_states.entry(stream_id)
184            && let StreamerClientState::InitError { .. } = oe.get()
185        {
186            oe.remove();
187        }
188    }
189
190    pub(super) async fn streamer_client(
191        &self,
192        basin: &BasinName,
193        stream: &StreamName,
194    ) -> Result<StreamerClient, StreamerError> {
195        let mut waited = false;
196        loop {
197            match self.streamer_client_state(basin, stream) {
198                StreamerClientState::Blocked { notify } => {
199                    notify.notified().await;
200                    waited = true;
201                }
202                StreamerClientState::InitError { error, timestamp } => {
203                    if !waited || timestamp.elapsed() > Self::FAILED_INIT_MEMORY {
204                        self.streamer_remove_unready(StreamId::new(basin, stream));
205                    } else {
206                        return Err(*error);
207                    }
208                }
209                StreamerClientState::Ready { client } => {
210                    return Ok(client);
211                }
212            }
213        }
214    }
215
216    pub(super) fn streamer_client_if_active(
217        &self,
218        basin: &BasinName,
219        stream: &StreamName,
220    ) -> Option<StreamerClient> {
221        match self.streamer_client_state(basin, stream) {
222            StreamerClientState::Ready { client } => Some(client),
223            _ => None,
224        }
225    }
226
227    pub(super) async fn streamer_client_with_auto_create<E>(
228        &self,
229        basin: &BasinName,
230        stream: &StreamName,
231        should_auto_create: impl FnOnce(&BasinConfig) -> bool,
232    ) -> Result<StreamerClient, E>
233    where
234        E: From<StreamerError>
235            + From<StorageError>
236            + From<BasinNotFoundError>
237            + From<TransactionConflictError>
238            + From<BasinDeletionPendingError>
239            + From<StreamDeletionPendingError>
240            + From<StreamNotFoundError>,
241    {
242        match self.streamer_client(basin, stream).await {
243            Ok(client) => Ok(client),
244            Err(StreamerError::StreamNotFound(e)) => {
245                let config = match self.get_basin_config(basin.clone()).await {
246                    Ok(config) => config,
247                    Err(GetBasinConfigError::Storage(e)) => Err(e)?,
248                    Err(GetBasinConfigError::BasinNotFound(e)) => Err(e)?,
249                };
250                if should_auto_create(&config) {
251                    if let Err(e) = self
252                        .create_stream(
253                            basin.clone(),
254                            stream.clone(),
255                            OptionalStreamConfig::default(),
256                            CreateMode::CreateOnly(None),
257                        )
258                        .await
259                    {
260                        match e {
261                            CreateStreamError::Storage(e) => Err(e)?,
262                            CreateStreamError::TransactionConflict(e) => Err(e)?,
263                            CreateStreamError::BasinDeletionPending(e) => Err(e)?,
264                            CreateStreamError::StreamDeletionPending(e) => Err(e)?,
265                            CreateStreamError::BasinNotFound(e) => Err(e)?,
266                            CreateStreamError::StreamAlreadyExists(_) => {}
267                        }
268                    }
269                    Ok(self.streamer_client(basin, stream).await?)
270                } else {
271                    Err(e.into())
272                }
273            }
274            Err(e) => Err(e.into()),
275        }
276    }
277}
278
#[cfg(test)]
mod tests {
    use std::str::FromStr as _;

    use bytes::Bytes;
    use s2_common::record::{Metered, Record, StreamPosition};
    use slatedb::{WriteBatch, config::WriteOptions, object_store};
    use time::OffsetDateTime;

    use super::*;

    /// Persists a stream whose tail position says the next sequence number is 1, while a record
    /// already exists at sequence number 1. Starting its streamer must then trip the
    /// records-after-tail invariant assertion.
    #[tokio::test]
    #[should_panic(expected = "invariant violation: stream `testbasin1/stream1` tail_pos")]
    async fn start_streamer_fails_if_records_exist_after_tail_pos() {
        let store: Arc<dyn object_store::ObjectStore> =
            Arc::new(object_store::memory::InMemory::new());
        let db = slatedb::Db::builder("test", store).build().await.unwrap();
        let backend = Backend::new(db.clone(), ByteSize::b(1));

        let basin = BasinName::from_str("testbasin1").unwrap();
        let stream = StreamName::from_str("stream1").unwrap();
        let stream_id = StreamId::new(&basin, &stream);

        let tail_pos = StreamPosition {
            seq_num: 1,
            timestamp: 123,
        };
        // A record stored exactly at the tail position violates the invariant.
        let stray_pos = StreamPosition {
            seq_num: tail_pos.seq_num,
            timestamp: tail_pos.timestamp,
        };
        let stray_record: Metered<Record> =
            Record::try_from_parts(vec![], Bytes::from_static(b"hello"))
                .unwrap()
                .into();

        let meta = kv::stream_meta::StreamMeta {
            config: OptionalStreamConfig::default(),
            created_at: OffsetDateTime::now_utc(),
            deleted_at: None,
            creation_idempotency_key: None,
        };

        let mut batch = WriteBatch::new();
        batch.put(
            kv::stream_meta::ser_key(&basin, &stream),
            kv::stream_meta::ser_value(&meta),
        );
        batch.put(
            kv::stream_tail_position::ser_key(stream_id),
            kv::stream_tail_position::ser_value(
                tail_pos,
                kv::timestamp::TimestampSecs::from_secs(1),
            ),
        );
        batch.put(
            kv::stream_record_data::ser_key(stream_id, stray_pos),
            kv::stream_record_data::ser_value(stray_record.as_ref()),
        );
        let durable = WriteOptions {
            await_durable: true,
        };
        db.write_with_options(batch, &durable).await.unwrap();

        backend.start_streamer(basin, stream).await.unwrap();
    }
}