s2_lite/backend/core.rs

1use std::{sync::Arc, time::Duration};
2
3use dashmap::DashMap;
4use enum_ordinalize::Ordinalize;
5use s2_common::{
6    record::{SeqNum, StreamPosition},
7    types::{
8        basin::BasinName,
9        config::{BasinConfig, OptionalStreamConfig},
10        resources::CreateMode,
11        stream::StreamName,
12    },
13};
14use slatedb::config::{DurabilityLevel, ScanOptions};
15use tokio::time::Instant;
16
17use super::{
18    error::{
19        BasinDeletionPendingError, BasinNotFoundError, CreateStreamError, GetBasinConfigError,
20        StorageError, StreamDeletionPendingError, StreamNotFoundError, StreamerError,
21        TransactionConflictError, UnavailableError,
22    },
23    kv,
24    stream_id::StreamId,
25    streamer::{StreamerClient, StreamerClientState},
26};
27
/// Presumably the maximum lag tolerated for a "follower".
///
/// NOTE(review): not referenced in this file; the unit (records? batches?) and what a
/// follower is are defined by its users elsewhere — confirm before relying on this doc.
pub const FOLLOWER_MAX_LAG: usize = 25;
29
30#[derive(Clone)]
31pub struct Backend {
32    pub(super) db: slatedb::Db,
33    client_states: Arc<DashMap<StreamId, StreamerClientState>>,
34}
35
36impl Backend {
37    const FAILED_INIT_MEMORY: Duration = Duration::from_secs(1);
38
39    pub fn new(db: slatedb::Db) -> Self {
40        Self {
41            db,
42            client_states: Arc::new(DashMap::new()),
43        }
44    }
45
46    async fn start_streamer(
47        &self,
48        basin: BasinName,
49        stream: StreamName,
50    ) -> Result<StreamerClient, StreamerError> {
51        let stream_id = StreamId::new(&basin, &stream);
52
53        let (meta, tail_pos, fencing_token, trim_point) = tokio::try_join!(
54            self.db_get(
55                kv::stream_meta::ser_key(&basin, &stream),
56                kv::stream_meta::deser_value,
57            ),
58            self.db_get(
59                kv::stream_tail_position::ser_key(stream_id),
60                kv::stream_tail_position::deser_value,
61            ),
62            self.db_get(
63                kv::stream_fencing_token::ser_key(stream_id),
64                kv::stream_fencing_token::deser_value,
65            ),
66            self.db_get(
67                kv::stream_trim_point::ser_key(stream_id),
68                kv::stream_trim_point::deser_value,
69            )
70        )?;
71
72        let Some(meta) = meta else {
73            return Err(StreamNotFoundError { basin, stream }.into());
74        };
75
76        let tail_pos = tail_pos.unwrap_or(StreamPosition::MIN);
77        self.assert_no_records_following_tail(stream_id, &basin, &stream, tail_pos)
78            .await?;
79
80        let fencing_token = fencing_token.unwrap_or_default();
81        let trim_point = trim_point.unwrap_or(..SeqNum::MIN);
82
83        if trim_point.end == SeqNum::MAX {
84            return Err(StreamDeletionPendingError { basin, stream }.into());
85        }
86
87        let client_states = self.client_states.clone();
88        Ok(super::streamer::spawn_streamer(
89            self.db.clone(),
90            stream_id,
91            meta.config,
92            tail_pos,
93            fencing_token,
94            trim_point,
95            client_states,
96        ))
97    }
98
99    async fn assert_no_records_following_tail(
100        &self,
101        stream_id: StreamId,
102        basin: &BasinName,
103        stream: &StreamName,
104        tail_pos: StreamPosition,
105    ) -> Result<(), StorageError> {
106        let start_key = kv::stream_record_data::ser_key(
107            stream_id,
108            StreamPosition {
109                seq_num: tail_pos.seq_num,
110                timestamp: 0,
111            },
112        );
113        static SCAN_OPTS: ScanOptions = ScanOptions {
114            durability_filter: DurabilityLevel::Remote,
115            dirty: false,
116            read_ahead_bytes: 1,
117            cache_blocks: false,
118            max_fetch_tasks: 1,
119        };
120        let mut it = self.db.scan_with_options(start_key.., &SCAN_OPTS).await?;
121        let Some(kv) = it.next().await? else {
122            return Ok(());
123        };
124        if kv.key.first().copied() != Some(kv::KeyType::StreamRecordData.ordinal()) {
125            return Ok(());
126        }
127        let (deser_stream_id, pos) = kv::stream_record_data::deser_key(kv.key)?;
128        assert!(
129            deser_stream_id != stream_id,
130            "invariant violation: stream `{basin}/{stream}` tail_pos {tail_pos:?} but found record at {pos:?}"
131        );
132        Ok(())
133    }
134
135    fn streamer_client_state(&self, basin: &BasinName, stream: &StreamName) -> StreamerClientState {
136        match self.client_states.entry(StreamId::new(basin, stream)) {
137            dashmap::Entry::Occupied(oe) => oe.get().clone(),
138            dashmap::Entry::Vacant(ve) => {
139                let this = self.clone();
140                let stream_id = *(ve.key());
141                let basin = basin.clone();
142                let stream = stream.clone();
143                tokio::spawn(async move {
144                    let state = match this.start_streamer(basin, stream).await {
145                        Ok(client) => StreamerClientState::Ready { client },
146                        Err(error) => StreamerClientState::InitError {
147                            error: Box::new(error),
148                            timestamp: Instant::now(),
149                        },
150                    };
151                    let replaced_state = this.client_states.insert(stream_id, state);
152                    let Some(StreamerClientState::Blocked { notify }) = replaced_state else {
153                        panic!("expected Blocked client but replaced: {replaced_state:?}");
154                    };
155                    notify.notify_waiters();
156                });
157                ve.insert(StreamerClientState::Blocked {
158                    notify: Default::default(),
159                })
160                .value()
161                .clone()
162            }
163        }
164    }
165
166    fn streamer_remove_unready(&self, stream_id: StreamId) {
167        if let dashmap::Entry::Occupied(oe) = self.client_states.entry(stream_id)
168            && let StreamerClientState::InitError { .. } = oe.get()
169        {
170            oe.remove();
171        }
172    }
173
174    pub(super) async fn streamer_client(
175        &self,
176        basin: &BasinName,
177        stream: &StreamName,
178    ) -> Result<StreamerClient, StreamerError> {
179        let mut waited = false;
180        loop {
181            match self.streamer_client_state(basin, stream) {
182                StreamerClientState::Blocked { notify } => {
183                    notify.notified().await;
184                    waited = true;
185                }
186                StreamerClientState::InitError { error, timestamp } => {
187                    if !waited || timestamp.elapsed() > Self::FAILED_INIT_MEMORY {
188                        self.streamer_remove_unready(StreamId::new(basin, stream));
189                    } else {
190                        return Err(*error);
191                    }
192                }
193                StreamerClientState::Ready { client } => {
194                    return Ok(client);
195                }
196            }
197        }
198    }
199
200    pub(super) fn streamer_client_if_active(
201        &self,
202        basin: &BasinName,
203        stream: &StreamName,
204    ) -> Option<StreamerClient> {
205        match self.streamer_client_state(basin, stream) {
206            StreamerClientState::Ready { client } => Some(client),
207            _ => None,
208        }
209    }
210
211    pub(super) async fn streamer_client_with_auto_create<E>(
212        &self,
213        basin: &BasinName,
214        stream: &StreamName,
215        should_auto_create: impl FnOnce(&BasinConfig) -> bool,
216    ) -> Result<StreamerClient, E>
217    where
218        E: From<StreamerError>
219            + From<StorageError>
220            + From<BasinNotFoundError>
221            + From<TransactionConflictError>
222            + From<UnavailableError>
223            + From<BasinDeletionPendingError>
224            + From<StreamDeletionPendingError>
225            + From<StreamNotFoundError>,
226    {
227        match self.streamer_client(basin, stream).await {
228            Ok(client) => Ok(client),
229            Err(StreamerError::StreamNotFound(e)) => {
230                let config = match self.get_basin_config(basin.clone()).await {
231                    Ok(config) => config,
232                    Err(GetBasinConfigError::Storage(e)) => Err(e)?,
233                    Err(GetBasinConfigError::BasinNotFound(e)) => Err(e)?,
234                };
235                if should_auto_create(&config) {
236                    if let Err(e) = self
237                        .create_stream(
238                            basin.clone(),
239                            stream.clone(),
240                            OptionalStreamConfig::default(),
241                            CreateMode::CreateOnly(None),
242                        )
243                        .await
244                    {
245                        match e {
246                            CreateStreamError::Storage(e) => Err(e)?,
247                            CreateStreamError::TransactionConflict(e) => Err(e)?,
248                            CreateStreamError::Unavailable(e) => Err(e)?,
249                            CreateStreamError::BasinDeletionPending(e) => Err(e)?,
250                            CreateStreamError::StreamDeletionPending(e) => Err(e)?,
251                            CreateStreamError::BasinNotFound(e) => Err(e)?,
252                            CreateStreamError::StreamAlreadyExists(_) => {}
253                        }
254                    }
255                    Ok(self.streamer_client(basin, stream).await?)
256                } else {
257                    Err(e.into())
258                }
259            }
260            Err(e) => Err(e.into()),
261        }
262    }
263}
264
#[cfg(test)]
mod tests {
    use std::str::FromStr as _;

    use bytes::Bytes;
    use s2_common::record::{Metered, Record, StreamPosition};
    use slatedb::{WriteBatch, config::WriteOptions, object_store};
    use time::OffsetDateTime;

    use super::*;

    /// Seeds a stream whose stored tail is at `seq_num` 1, together with a record stored
    /// at exactly that position. Starting the streamer must trip the tail invariant.
    #[tokio::test]
    #[should_panic(expected = "invariant violation: stream `testbasin1/stream1` tail_pos")]
    async fn start_streamer_fails_if_records_exist_after_tail_pos() {
        let basin = BasinName::from_str("testbasin1").unwrap();
        let stream = StreamName::from_str("stream1").unwrap();
        let stream_id = StreamId::new(&basin, &stream);

        let store: Arc<dyn object_store::ObjectStore> =
            Arc::new(object_store::memory::InMemory::new());
        let db = slatedb::Db::builder("test", store).build().await.unwrap();
        let backend = Backend::new(db.clone());

        // Stored tail position, and a record placed at the very same position.
        let tail_pos = StreamPosition {
            seq_num: 1,
            timestamp: 123,
        };
        let offending_pos = tail_pos;

        let meta = kv::stream_meta::StreamMeta {
            config: OptionalStreamConfig::default(),
            created_at: OffsetDateTime::now_utc(),
            deleted_at: None,
            creation_idempotency_key: None,
        };
        let payload: Metered<Record> =
            Record::try_from_parts(vec![], Bytes::from_static(b"hello"))
                .unwrap()
                .into();

        let mut batch = WriteBatch::new();
        batch.put(
            kv::stream_meta::ser_key(&basin, &stream),
            kv::stream_meta::ser_value(&meta),
        );
        batch.put(
            kv::stream_tail_position::ser_key(stream_id),
            kv::stream_tail_position::ser_value(tail_pos),
        );
        batch.put(
            kv::stream_record_data::ser_key(stream_id, offending_pos),
            kv::stream_record_data::ser_value(payload.as_ref()),
        );

        // Wait for durability; presumably needed because the tail check scans with a
        // remote-durability filter.
        let durable = WriteOptions {
            await_durable: true,
        };
        db.write_with_options(batch, &durable).await.unwrap();

        backend.start_streamer(basin, stream).await.unwrap();
    }
}