Skip to main content

s2_lite/backend/
streams.rs

1use s2_common::{
2    bash::Bash,
3    record::StreamPosition,
4    types::{
5        basin::BasinName,
6        config::{OptionalStreamConfig, StreamReconfiguration},
7        resources::{CreateMode, ListItemsRequestParts, Page, RequestToken},
8        stream::{ListStreamsRequest, StreamInfo, StreamName},
9    },
10};
11use slatedb::{
12    IsolationLevel,
13    config::{DurabilityLevel, ScanOptions, WriteOptions},
14};
15use time::OffsetDateTime;
16use tracing::instrument;
17
18use super::{Backend, CreatedOrReconfigured, store::db_txn_get};
19use crate::backend::{
20    error::{
21        BasinDeletionPendingError, BasinNotFoundError, CreateStreamError, DeleteStreamError,
22        GetStreamConfigError, ListStreamsError, ReconfigureStreamError, StorageError,
23        StreamAlreadyExistsError, StreamDeletionPendingError, StreamNotFoundError, StreamerError,
24    },
25    kv,
26    stream_id::StreamId,
27};
28
29impl Backend {
30    pub async fn list_streams(
31        &self,
32        basin: BasinName,
33        request: ListStreamsRequest,
34    ) -> Result<Page<StreamInfo>, ListStreamsError> {
35        let ListItemsRequestParts {
36            prefix,
37            start_after,
38            limit,
39        } = request.into();
40
41        let key_range = kv::stream_meta::ser_key_range(&basin, &prefix, &start_after);
42        if key_range.is_empty() {
43            return Ok(Page::new_empty());
44        }
45
46        static SCAN_OPTS: ScanOptions = ScanOptions {
47            durability_filter: DurabilityLevel::Remote,
48            dirty: false,
49            read_ahead_bytes: 1,
50            cache_blocks: false,
51            max_fetch_tasks: 1,
52        };
53        let mut it = self.db.scan_with_options(key_range, &SCAN_OPTS).await?;
54
55        let mut streams = Vec::with_capacity(limit.as_usize());
56        let mut has_more = false;
57        while let Some(kv) = it.next().await? {
58            let (deser_basin, stream) = kv::stream_meta::deser_key(kv.key)?;
59            assert_eq!(deser_basin.as_ref(), basin.as_ref());
60            assert!(stream.as_ref() > start_after.as_ref());
61            assert!(stream.as_ref() >= prefix.as_ref());
62            if streams.len() == limit.as_usize() {
63                has_more = true;
64                break;
65            }
66            let meta = kv::stream_meta::deser_value(kv.value)?;
67            streams.push(StreamInfo {
68                name: stream,
69                created_at: meta.created_at,
70                deleted_at: meta.deleted_at,
71            });
72        }
73        Ok(Page::new(streams, has_more))
74    }
75
76    pub async fn create_stream(
77        &self,
78        basin: BasinName,
79        stream: StreamName,
80        mut config: OptionalStreamConfig,
81        mode: CreateMode,
82    ) -> Result<CreatedOrReconfigured<StreamInfo>, CreateStreamError> {
83        let txn = self.db.begin(IsolationLevel::SerializableSnapshot).await?;
84
85        let Some(basin_meta) = db_txn_get(
86            &txn,
87            kv::basin_meta::ser_key(&basin),
88            kv::basin_meta::deser_value,
89        )
90        .await?
91        else {
92            return Err(BasinNotFoundError { basin }.into());
93        };
94
95        if basin_meta.deleted_at.is_some() {
96            return Err(BasinDeletionPendingError { basin }.into());
97        }
98
99        let stream_meta_key = kv::stream_meta::ser_key(&basin, &stream);
100
101        let creation_idempotency_key = match &mode {
102            CreateMode::CreateOnly(Some(req_token)) => {
103                Some(creation_idempotency_key(req_token, &config))
104            }
105            _ => None,
106        };
107
108        let mut existing_created_at = None;
109        let mut prior_doe_min_age = None;
110
111        if let Some(existing_meta) =
112            db_txn_get(&txn, &stream_meta_key, kv::stream_meta::deser_value).await?
113        {
114            if existing_meta.deleted_at.is_some() {
115                return Err(CreateStreamError::StreamDeletionPending(
116                    StreamDeletionPendingError { basin, stream },
117                ));
118            }
119            prior_doe_min_age = existing_meta
120                .config
121                .delete_on_empty
122                .min_age
123                .filter(|age| !age.is_zero());
124            match mode {
125                CreateMode::CreateOnly(_) => {
126                    return if creation_idempotency_key.is_some()
127                        && existing_meta.creation_idempotency_key == creation_idempotency_key
128                    {
129                        Ok(CreatedOrReconfigured::Created(StreamInfo {
130                            name: stream,
131                            created_at: existing_meta.created_at,
132                            deleted_at: None,
133                        }))
134                    } else {
135                        Err(StreamAlreadyExistsError { basin, stream }.into())
136                    };
137                }
138                CreateMode::CreateOrReconfigure => {
139                    existing_created_at = Some(existing_meta.created_at);
140                }
141            }
142        }
143
144        config = config.merge(basin_meta.config.default_stream_config).into();
145
146        let created_at = existing_created_at.unwrap_or_else(OffsetDateTime::now_utc);
147        let meta = kv::stream_meta::StreamMeta {
148            config: config.clone(),
149            created_at,
150            deleted_at: None,
151            creation_idempotency_key,
152        };
153
154        txn.put(&stream_meta_key, kv::stream_meta::ser_value(&meta))?;
155        let stream_id = StreamId::new(&basin, &stream);
156        if existing_created_at.is_none() {
157            txn.put(
158                kv::stream_id_mapping::ser_key(stream_id),
159                kv::stream_id_mapping::ser_value(&basin, &stream),
160            )?;
161            let created_secs = created_at.unix_timestamp();
162            let created_secs = if created_secs <= 0 {
163                0
164            } else if created_secs >= i64::from(u32::MAX) {
165                u32::MAX
166            } else {
167                created_secs as u32
168            };
169            txn.put(
170                kv::stream_tail_position::ser_key(stream_id),
171                kv::stream_tail_position::ser_value(
172                    StreamPosition::MIN,
173                    kv::timestamp::TimestampSecs::from_secs(created_secs),
174                ),
175            )?;
176        }
177        if let Some(min_age) = meta
178            .config
179            .delete_on_empty
180            .min_age
181            .filter(|age| !age.is_zero())
182            && prior_doe_min_age != Some(min_age)
183        {
184            txn.put(
185                kv::stream_doe_deadline::ser_key(
186                    kv::timestamp::TimestampSecs::after(min_age),
187                    stream_id,
188                ),
189                kv::stream_doe_deadline::ser_value(min_age),
190            )?;
191        }
192
193        static WRITE_OPTS: WriteOptions = WriteOptions {
194            await_durable: true,
195        };
196        txn.commit_with_options(&WRITE_OPTS).await?;
197
198        if existing_created_at.is_some()
199            && let Some(client) = self.streamer_client_if_active(&basin, &stream)
200        {
201            client.advise_reconfig(config);
202        }
203
204        let info = StreamInfo {
205            name: stream,
206            created_at,
207            deleted_at: None,
208        };
209
210        Ok(if existing_created_at.is_some() {
211            CreatedOrReconfigured::Reconfigured(info)
212        } else {
213            CreatedOrReconfigured::Created(info)
214        })
215    }
216
217    pub(super) async fn stream_id_mapping(
218        &self,
219        stream_id: StreamId,
220    ) -> Result<Option<(BasinName, StreamName)>, StorageError> {
221        self.db_get(
222            kv::stream_id_mapping::ser_key(stream_id),
223            kv::stream_id_mapping::deser_value,
224        )
225        .await
226    }
227
228    pub async fn get_stream_config(
229        &self,
230        basin: BasinName,
231        stream: StreamName,
232    ) -> Result<OptionalStreamConfig, GetStreamConfigError> {
233        let meta = self
234            .db_get(
235                kv::stream_meta::ser_key(&basin, &stream),
236                kv::stream_meta::deser_value,
237            )
238            .await?
239            .ok_or_else(|| StreamNotFoundError { basin, stream })?;
240        Ok(meta.config)
241    }
242
243    pub async fn reconfigure_stream(
244        &self,
245        basin: BasinName,
246        stream: StreamName,
247        reconfig: StreamReconfiguration,
248    ) -> Result<OptionalStreamConfig, ReconfigureStreamError> {
249        let txn = self.db.begin(IsolationLevel::SerializableSnapshot).await?;
250
251        let meta_key = kv::stream_meta::ser_key(&basin, &stream);
252
253        let mut meta = db_txn_get(&txn, &meta_key, kv::stream_meta::deser_value)
254            .await?
255            .ok_or_else(|| StreamNotFoundError {
256                basin: basin.clone(),
257                stream: stream.clone(),
258            })?;
259
260        if meta.deleted_at.is_some() {
261            return Err(StreamDeletionPendingError { basin, stream }.into());
262        }
263
264        let prior_doe_min_age = meta
265            .config
266            .delete_on_empty
267            .min_age
268            .filter(|age| !age.is_zero());
269
270        meta.config = meta.config.reconfigure(reconfig);
271
272        txn.put(&meta_key, kv::stream_meta::ser_value(&meta))?;
273
274        let stream_id = StreamId::new(&basin, &stream);
275        if let Some(min_age) = meta
276            .config
277            .delete_on_empty
278            .min_age
279            .filter(|age| !age.is_zero())
280            && prior_doe_min_age != Some(min_age)
281        {
282            txn.put(
283                kv::stream_doe_deadline::ser_key(
284                    kv::timestamp::TimestampSecs::after(min_age),
285                    stream_id,
286                ),
287                kv::stream_doe_deadline::ser_value(min_age),
288            )?;
289        }
290
291        static WRITE_OPTS: WriteOptions = WriteOptions {
292            await_durable: true,
293        };
294        txn.commit_with_options(&WRITE_OPTS).await?;
295
296        if let Some(client) = self.streamer_client_if_active(&basin, &stream) {
297            client.advise_reconfig(meta.config.clone());
298        }
299
300        Ok(meta.config)
301    }
302
303    #[instrument(ret, err, skip(self))]
304    pub async fn delete_stream(
305        &self,
306        basin: BasinName,
307        stream: StreamName,
308    ) -> Result<(), DeleteStreamError> {
309        match self.streamer_client(&basin, &stream).await {
310            Ok(client) => {
311                client.terminal_trim().await?;
312            }
313            Err(StreamerError::Storage(e)) => {
314                return Err(DeleteStreamError::Storage(e));
315            }
316            Err(StreamerError::StreamNotFound(e)) => {
317                return Err(DeleteStreamError::StreamNotFound(e));
318            }
319            Err(StreamerError::StreamDeletionPending(e)) => {
320                assert_eq!(e.basin, basin);
321                assert_eq!(e.stream, stream);
322            }
323        }
324
325        let txn = self.db.begin(IsolationLevel::SerializableSnapshot).await?;
326        let meta_key = kv::stream_meta::ser_key(&basin, &stream);
327        let mut meta = db_txn_get(&txn, &meta_key, kv::stream_meta::deser_value)
328            .await?
329            .ok_or_else(|| StreamNotFoundError {
330                basin,
331                stream: stream.clone(),
332            })?;
333        if meta.deleted_at.is_none() {
334            meta.deleted_at = Some(OffsetDateTime::now_utc());
335            txn.put(&meta_key, kv::stream_meta::ser_value(&meta))?;
336            static WRITE_OPTS: WriteOptions = WriteOptions {
337                await_durable: true,
338            };
339            txn.commit_with_options(&WRITE_OPTS).await?;
340        }
341
342        Ok(())
343    }
344}
345
346fn creation_idempotency_key(req_token: &RequestToken, config: &OptionalStreamConfig) -> Bash {
347    Bash::new(&[
348        req_token.as_bytes(),
349        &s2_api::v1::config::StreamConfig::to_opt(config.clone())
350            .as_ref()
351            .map(|v| serde_json::to_vec(v).expect("serializable"))
352            .unwrap_or_default(),
353    ])
354}