s2_lite/backend/
core.rs

1use std::{sync::Arc, time::Duration};
2
3use bytesize::ByteSize;
4use dashmap::DashMap;
5use enum_ordinalize::Ordinalize;
6use s2_common::{
7    record::{SeqNum, StreamPosition},
8    types::{
9        basin::BasinName,
10        config::{BasinConfig, OptionalStreamConfig},
11        resources::CreateMode,
12        stream::StreamName,
13    },
14};
15use slatedb::config::{DurabilityLevel, ScanOptions};
16use tokio::time::Instant;
17
18use super::{
19    error::{
20        BasinDeletionPendingError, BasinNotFoundError, CreateStreamError, GetBasinConfigError,
21        StorageError, StreamDeletionPendingError, StreamNotFoundError, StreamerError,
22        TransactionConflictError,
23    },
24    kv,
25    stream_id::StreamId,
26    streamer::{StreamerClient, StreamerClientState},
27};
28
/// Maximum lag tolerated for a follower.
///
/// NOTE(review): this constant is not referenced in this file, so its unit (records, batches,
/// messages?) and exact semantics are defined by its consumers elsewhere — confirm there.
pub const FOLLOWER_MAX_LAG: usize = 25;
30
31#[derive(Clone)]
32pub struct Backend {
33    pub(super) db: slatedb::Db,
34    client_states: Arc<DashMap<StreamId, StreamerClientState>>,
35    append_inflight_max: ByteSize,
36}
37
38impl Backend {
39    const FAILED_INIT_MEMORY: Duration = Duration::from_secs(1);
40
41    pub fn new(db: slatedb::Db, append_inflight_max: ByteSize) -> Self {
42        Self {
43            db,
44            client_states: Arc::new(DashMap::new()),
45            append_inflight_max,
46        }
47    }
48
49    async fn start_streamer(
50        &self,
51        basin: BasinName,
52        stream: StreamName,
53    ) -> Result<StreamerClient, StreamerError> {
54        let stream_id = StreamId::new(&basin, &stream);
55
56        let (meta, tail_pos, fencing_token, trim_point) = tokio::try_join!(
57            self.db_get(
58                kv::stream_meta::ser_key(&basin, &stream),
59                kv::stream_meta::deser_value,
60            ),
61            self.db_get(
62                kv::stream_tail_position::ser_key(stream_id),
63                kv::stream_tail_position::deser_value,
64            ),
65            self.db_get(
66                kv::stream_fencing_token::ser_key(stream_id),
67                kv::stream_fencing_token::deser_value,
68            ),
69            self.db_get(
70                kv::stream_trim_point::ser_key(stream_id),
71                kv::stream_trim_point::deser_value,
72            )
73        )?;
74
75        let Some(meta) = meta else {
76            return Err(StreamNotFoundError { basin, stream }.into());
77        };
78
79        let tail_pos = tail_pos.unwrap_or(StreamPosition::MIN);
80        self.assert_no_records_following_tail(stream_id, &basin, &stream, tail_pos)
81            .await?;
82
83        let fencing_token = fencing_token.unwrap_or_default();
84        let trim_point = trim_point.unwrap_or(..SeqNum::MIN);
85
86        if trim_point.end == SeqNum::MAX {
87            return Err(StreamDeletionPendingError { basin, stream }.into());
88        }
89
90        let client_states = self.client_states.clone();
91        Ok(super::streamer::Spawner {
92            db: self.db.clone(),
93            stream_id,
94            config: meta.config,
95            tail_pos,
96            fencing_token,
97            trim_point,
98            append_inflight_max: self.append_inflight_max,
99        }
100        .spawn(move || {
101            client_states.remove(&stream_id);
102        }))
103    }
104
105    async fn assert_no_records_following_tail(
106        &self,
107        stream_id: StreamId,
108        basin: &BasinName,
109        stream: &StreamName,
110        tail_pos: StreamPosition,
111    ) -> Result<(), StorageError> {
112        let start_key = kv::stream_record_data::ser_key(
113            stream_id,
114            StreamPosition {
115                seq_num: tail_pos.seq_num,
116                timestamp: 0,
117            },
118        );
119        static SCAN_OPTS: ScanOptions = ScanOptions {
120            durability_filter: DurabilityLevel::Remote,
121            dirty: false,
122            read_ahead_bytes: 1,
123            cache_blocks: false,
124            max_fetch_tasks: 1,
125        };
126        let mut it = self.db.scan_with_options(start_key.., &SCAN_OPTS).await?;
127        let Some(kv) = it.next().await? else {
128            return Ok(());
129        };
130        if kv.key.first().copied() != Some(kv::KeyType::StreamRecordData.ordinal()) {
131            return Ok(());
132        }
133        let (deser_stream_id, pos) = kv::stream_record_data::deser_key(kv.key)?;
134        assert!(
135            deser_stream_id != stream_id,
136            "invariant violation: stream `{basin}/{stream}` tail_pos {tail_pos:?} but found record at {pos:?}"
137        );
138        Ok(())
139    }
140
141    fn streamer_client_state(&self, basin: &BasinName, stream: &StreamName) -> StreamerClientState {
142        match self.client_states.entry(StreamId::new(basin, stream)) {
143            dashmap::Entry::Occupied(oe) => oe.get().clone(),
144            dashmap::Entry::Vacant(ve) => {
145                let this = self.clone();
146                let stream_id = *(ve.key());
147                let basin = basin.clone();
148                let stream = stream.clone();
149                tokio::spawn(async move {
150                    let state = match this.start_streamer(basin, stream).await {
151                        Ok(client) => StreamerClientState::Ready { client },
152                        Err(error) => StreamerClientState::InitError {
153                            error: Box::new(error),
154                            timestamp: Instant::now(),
155                        },
156                    };
157                    let replaced_state = this.client_states.insert(stream_id, state);
158                    let Some(StreamerClientState::Blocked { notify }) = replaced_state else {
159                        panic!("expected Blocked client but replaced: {replaced_state:?}");
160                    };
161                    notify.notify_waiters();
162                });
163                ve.insert(StreamerClientState::Blocked {
164                    notify: Default::default(),
165                })
166                .value()
167                .clone()
168            }
169        }
170    }
171
172    fn streamer_remove_unready(&self, stream_id: StreamId) {
173        if let dashmap::Entry::Occupied(oe) = self.client_states.entry(stream_id)
174            && let StreamerClientState::InitError { .. } = oe.get()
175        {
176            oe.remove();
177        }
178    }
179
180    pub(super) async fn streamer_client(
181        &self,
182        basin: &BasinName,
183        stream: &StreamName,
184    ) -> Result<StreamerClient, StreamerError> {
185        let mut waited = false;
186        loop {
187            match self.streamer_client_state(basin, stream) {
188                StreamerClientState::Blocked { notify } => {
189                    notify.notified().await;
190                    waited = true;
191                }
192                StreamerClientState::InitError { error, timestamp } => {
193                    if !waited || timestamp.elapsed() > Self::FAILED_INIT_MEMORY {
194                        self.streamer_remove_unready(StreamId::new(basin, stream));
195                    } else {
196                        return Err(*error);
197                    }
198                }
199                StreamerClientState::Ready { client } => {
200                    return Ok(client);
201                }
202            }
203        }
204    }
205
206    pub(super) fn streamer_client_if_active(
207        &self,
208        basin: &BasinName,
209        stream: &StreamName,
210    ) -> Option<StreamerClient> {
211        match self.streamer_client_state(basin, stream) {
212            StreamerClientState::Ready { client } => Some(client),
213            _ => None,
214        }
215    }
216
217    pub(super) async fn streamer_client_with_auto_create<E>(
218        &self,
219        basin: &BasinName,
220        stream: &StreamName,
221        should_auto_create: impl FnOnce(&BasinConfig) -> bool,
222    ) -> Result<StreamerClient, E>
223    where
224        E: From<StreamerError>
225            + From<StorageError>
226            + From<BasinNotFoundError>
227            + From<TransactionConflictError>
228            + From<BasinDeletionPendingError>
229            + From<StreamDeletionPendingError>
230            + From<StreamNotFoundError>,
231    {
232        match self.streamer_client(basin, stream).await {
233            Ok(client) => Ok(client),
234            Err(StreamerError::StreamNotFound(e)) => {
235                let config = match self.get_basin_config(basin.clone()).await {
236                    Ok(config) => config,
237                    Err(GetBasinConfigError::Storage(e)) => Err(e)?,
238                    Err(GetBasinConfigError::BasinNotFound(e)) => Err(e)?,
239                };
240                if should_auto_create(&config) {
241                    if let Err(e) = self
242                        .create_stream(
243                            basin.clone(),
244                            stream.clone(),
245                            OptionalStreamConfig::default(),
246                            CreateMode::CreateOnly(None),
247                        )
248                        .await
249                    {
250                        match e {
251                            CreateStreamError::Storage(e) => Err(e)?,
252                            CreateStreamError::TransactionConflict(e) => Err(e)?,
253                            CreateStreamError::BasinDeletionPending(e) => Err(e)?,
254                            CreateStreamError::StreamDeletionPending(e) => Err(e)?,
255                            CreateStreamError::BasinNotFound(e) => Err(e)?,
256                            CreateStreamError::StreamAlreadyExists(_) => {}
257                        }
258                    }
259                    Ok(self.streamer_client(basin, stream).await?)
260                } else {
261                    Err(e.into())
262                }
263            }
264            Err(e) => Err(e.into()),
265        }
266    }
267}
268
#[cfg(test)]
mod tests {
    use std::str::FromStr as _;

    use bytes::Bytes;
    use s2_common::record::{Metered, Record, StreamPosition};
    use slatedb::{WriteBatch, config::WriteOptions, object_store};
    use time::OffsetDateTime;

    use super::*;

    /// Fixture writes wait until the batch is durable before returning.
    static DURABLE_WRITES: WriteOptions = WriteOptions {
        await_durable: true,
    };

    /// Builds a `Backend` over a fresh in-memory SlateDB instance.
    ///
    /// The database handle is returned too, so tests can seed keys directly.
    async fn in_memory_backend() -> (slatedb::Db, Backend) {
        let store: Arc<dyn object_store::ObjectStore> =
            Arc::new(object_store::memory::InMemory::new());
        let db = slatedb::Db::builder("test", store)
            .build()
            .await
            .unwrap();
        (db.clone(), Backend::new(db, ByteSize::b(1)))
    }

    /// Seeds a stream whose stored tail position equals the position of a stored record.
    /// `start_streamer` must then trip the tail-consistency invariant.
    #[tokio::test]
    #[should_panic(expected = "invariant violation: stream `testbasin1/stream1` tail_pos")]
    async fn start_streamer_fails_if_records_exist_after_tail_pos() {
        let (db, backend) = in_memory_backend().await;

        let basin = BasinName::from_str("testbasin1").unwrap();
        let stream = StreamName::from_str("stream1").unwrap();
        let stream_id = StreamId::new(&basin, &stream);

        let tail_pos = StreamPosition {
            seq_num: 1,
            timestamp: 123,
        };
        // The offending record sits exactly at the tail position.
        let record_pos = tail_pos;

        let meta = kv::stream_meta::StreamMeta {
            config: OptionalStreamConfig::default(),
            created_at: OffsetDateTime::now_utc(),
            deleted_at: None,
            creation_idempotency_key: None,
        };
        let record: Metered<Record> =
            Record::try_from_parts(vec![], Bytes::from_static(b"hello"))
                .unwrap()
                .into();

        let mut batch = WriteBatch::new();
        batch.put(
            kv::stream_meta::ser_key(&basin, &stream),
            kv::stream_meta::ser_value(&meta),
        );
        batch.put(
            kv::stream_tail_position::ser_key(stream_id),
            kv::stream_tail_position::ser_value(tail_pos),
        );
        batch.put(
            kv::stream_record_data::ser_key(stream_id, record_pos),
            kv::stream_record_data::ser_value(record.as_ref()),
        );
        db.write_with_options(batch, &DURABLE_WRITES).await.unwrap();

        backend.start_streamer(basin, stream).await.unwrap();
    }
}