Skip to main content

s2_lite/backend/
streams.rs

1use s2_common::{
2    bash::Bash,
3    record::StreamPosition,
4    types::{
5        basin::BasinName,
6        config::{OptionalStreamConfig, StreamReconfiguration},
7        resources::{ListItemsRequestParts, Page, RequestToken},
8        stream::{CreateStreamIntent, ListStreamsRequest, StreamInfo, StreamName},
9    },
10};
11use slatedb::{
12    IsolationLevel, IterationOrder,
13    config::{DurabilityLevel, ScanOptions, WriteOptions},
14};
15use time::OffsetDateTime;
16use tracing::instrument;
17
18use super::{
19    Backend, CreatedOrReconfigured,
20    store::db_txn_get,
21    streamer::{doe_arm_delay, retention_age_or_zero},
22};
23use crate::{
24    backend::{
25        error::{
26            BasinDeletionPendingError, BasinNotFoundError, CreateStreamError, DeleteStreamError,
27            GetStreamConfigError, ListStreamsError, ReconfigureStreamError, StorageError,
28            StreamAlreadyExistsError, StreamDeletionPendingError, StreamNotFoundError,
29            StreamerError,
30        },
31        kv,
32    },
33    stream_id::StreamId,
34};
35
36impl Backend {
37    pub async fn list_streams(
38        &self,
39        basin: BasinName,
40        request: ListStreamsRequest,
41    ) -> Result<Page<StreamInfo>, ListStreamsError> {
42        let ListItemsRequestParts {
43            prefix,
44            start_after,
45            limit,
46        } = request.into();
47
48        let key_range = kv::stream_meta::ser_key_range(&basin, &prefix, &start_after);
49        if key_range.is_empty() {
50            return Ok(Page::new_empty());
51        }
52
53        static SCAN_OPTS: ScanOptions = ScanOptions {
54            durability_filter: DurabilityLevel::Remote,
55            dirty: false,
56            read_ahead_bytes: 1,
57            cache_blocks: false,
58            max_fetch_tasks: 1,
59            order: IterationOrder::Ascending,
60        };
61        let mut it = self.db.scan_with_options(key_range, &SCAN_OPTS).await?;
62
63        let mut streams = Vec::with_capacity(limit.as_usize());
64        let mut has_more = false;
65        while let Some(kv) = it.next().await? {
66            let (deser_basin, stream) = kv::stream_meta::deser_key(kv.key)?;
67            assert_eq!(deser_basin.as_ref(), basin.as_ref());
68            assert!(stream.as_ref() > start_after.as_ref());
69            assert!(stream.as_ref() >= prefix.as_ref());
70            if streams.len() == limit.as_usize() {
71                has_more = true;
72                break;
73            }
74            let meta = kv::stream_meta::deser_value(kv.value)?;
75            streams.push(StreamInfo {
76                name: stream,
77                created_at: meta.created_at,
78                deleted_at: meta.deleted_at,
79                cipher: meta.cipher,
80            });
81        }
82        Ok(Page::new(streams, has_more))
83    }
84
85    pub async fn create_stream(
86        &self,
87        basin: BasinName,
88        stream: StreamName,
89        intent: CreateStreamIntent,
90    ) -> Result<CreatedOrReconfigured<StreamInfo>, CreateStreamError> {
91        let txn = self.db.begin(IsolationLevel::SerializableSnapshot).await?;
92
93        let Some(basin_meta) = db_txn_get(
94            &txn,
95            kv::basin_meta::ser_key(&basin),
96            kv::basin_meta::deser_value,
97        )
98        .await?
99        else {
100            return Err(BasinNotFoundError { basin }.into());
101        };
102
103        if basin_meta.deleted_at.is_some() {
104            return Err(BasinDeletionPendingError { basin }.into());
105        }
106
107        let stream_meta_key = kv::stream_meta::ser_key(&basin, &stream);
108
109        let existing_meta =
110            db_txn_get(&txn, &stream_meta_key, kv::stream_meta::deser_value).await?;
111        if let Some(existing_meta) = &existing_meta
112            && existing_meta.deleted_at.is_some()
113        {
114            return Err(CreateStreamError::StreamDeletionPending(
115                StreamDeletionPendingError { basin, stream },
116            ));
117        }
118
119        let (mut meta, is_reconfigure, prior_doe_min_age) = match (existing_meta, intent) {
120            (
121                Some(existing),
122                CreateStreamIntent::CreateOnly {
123                    config,
124                    request_token,
125                },
126            ) => {
127                let new_creation_idempotency_key = request_token
128                    .as_ref()
129                    .map(|req_token| creation_idempotency_key(req_token, &config));
130                return if new_creation_idempotency_key.is_some()
131                    && existing.creation_idempotency_key == new_creation_idempotency_key
132                {
133                    Ok(CreatedOrReconfigured::Created(StreamInfo {
134                        name: stream,
135                        created_at: existing.created_at,
136                        deleted_at: None,
137                        cipher: existing.cipher,
138                    }))
139                } else {
140                    Err(StreamAlreadyExistsError { basin, stream }.into())
141                };
142            }
143            (Some(existing), CreateStreamIntent::CreateOrReconfigure { reconfiguration }) => {
144                let prior_doe_min_age = existing
145                    .config
146                    .delete_on_empty
147                    .min_age
148                    .filter(|age| !age.is_zero());
149                (
150                    kv::stream_meta::StreamMeta {
151                        config: existing.config.reconfigure(reconfiguration),
152                        cipher: existing.cipher,
153                        created_at: existing.created_at,
154                        deleted_at: None,
155                        creation_idempotency_key: existing.creation_idempotency_key,
156                    },
157                    true,
158                    prior_doe_min_age,
159                )
160            }
161            (
162                None,
163                CreateStreamIntent::CreateOnly {
164                    config,
165                    request_token,
166                },
167            ) => {
168                let new_creation_idempotency_key = request_token
169                    .as_ref()
170                    .map(|req_token| creation_idempotency_key(req_token, &config));
171                (
172                    kv::stream_meta::StreamMeta {
173                        config,
174                        cipher: basin_meta.config.stream_cipher,
175                        created_at: OffsetDateTime::now_utc(),
176                        deleted_at: None,
177                        creation_idempotency_key: new_creation_idempotency_key,
178                    },
179                    false,
180                    None,
181                )
182            }
183            (None, CreateStreamIntent::CreateOrReconfigure { reconfiguration }) => (
184                kv::stream_meta::StreamMeta {
185                    config: OptionalStreamConfig::default().reconfigure(reconfiguration),
186                    cipher: basin_meta.config.stream_cipher,
187                    created_at: OffsetDateTime::now_utc(),
188                    deleted_at: None,
189                    creation_idempotency_key: None,
190                },
191                false,
192                None,
193            ),
194        };
195        let basin_defaults = basin_meta.config.default_stream_config;
196        meta.config = meta.config.merge(basin_defaults).into();
197
198        txn.put(&stream_meta_key, kv::stream_meta::ser_value(&meta))?;
199        let stream_id = StreamId::new(&basin, &stream);
200        if !is_reconfigure {
201            txn.put(
202                kv::stream_id_mapping::ser_key(stream_id),
203                kv::stream_id_mapping::ser_value(&basin, &stream),
204            )?;
205            let created_secs = meta.created_at.unix_timestamp();
206            let created_secs = if created_secs <= 0 {
207                0
208            } else if created_secs >= i64::from(u32::MAX) {
209                u32::MAX
210            } else {
211                created_secs as u32
212            };
213            txn.put(
214                kv::stream_tail_position::ser_key(stream_id),
215                kv::stream_tail_position::ser_value(
216                    StreamPosition::MIN,
217                    kv::timestamp::TimestampSecs::from_secs(created_secs),
218                ),
219            )?;
220        }
221        if let Some(min_age) = meta
222            .config
223            .delete_on_empty
224            .min_age
225            .filter(|age| !age.is_zero())
226            && (!is_reconfigure || prior_doe_min_age.is_none())
227        {
228            txn.put(
229                kv::stream_doe_deadline::ser_key(
230                    kv::timestamp::TimestampSecs::after(doe_arm_delay(
231                        retention_age_or_zero(&meta.config),
232                        min_age,
233                    )),
234                    stream_id,
235                ),
236                kv::stream_doe_deadline::ser_value(min_age),
237            )?;
238        }
239
240        static WRITE_OPTS: WriteOptions = WriteOptions {
241            await_durable: true,
242        };
243        txn.commit_with_options(&WRITE_OPTS).await?;
244
245        if is_reconfigure && let Some(client) = self.streamer_client_if_active(&basin, &stream) {
246            client.advise_reconfig(meta.config.clone());
247        }
248
249        let info = StreamInfo {
250            name: stream,
251            created_at: meta.created_at,
252            deleted_at: None,
253            cipher: meta.cipher,
254        };
255
256        Ok(if is_reconfigure {
257            CreatedOrReconfigured::Reconfigured(info)
258        } else {
259            CreatedOrReconfigured::Created(info)
260        })
261    }
262
263    pub(super) async fn stream_id_mapping(
264        &self,
265        stream_id: StreamId,
266    ) -> Result<Option<(BasinName, StreamName)>, StorageError> {
267        self.db_get(
268            kv::stream_id_mapping::ser_key(stream_id),
269            kv::stream_id_mapping::deser_value,
270        )
271        .await
272    }
273
274    pub async fn get_stream_config(
275        &self,
276        basin: BasinName,
277        stream: StreamName,
278    ) -> Result<OptionalStreamConfig, GetStreamConfigError> {
279        let meta = self
280            .db_get(
281                kv::stream_meta::ser_key(&basin, &stream),
282                kv::stream_meta::deser_value,
283            )
284            .await?
285            .ok_or_else(|| StreamNotFoundError {
286                basin: basin.clone(),
287                stream: stream.clone(),
288            })?;
289        if meta.deleted_at.is_some() {
290            return Err(StreamDeletionPendingError { basin, stream }.into());
291        }
292        Ok(meta.config)
293    }
294
295    pub async fn reconfigure_stream(
296        &self,
297        basin: BasinName,
298        stream: StreamName,
299        reconfig: StreamReconfiguration,
300    ) -> Result<OptionalStreamConfig, ReconfigureStreamError> {
301        let txn = self.db.begin(IsolationLevel::SerializableSnapshot).await?;
302
303        let meta_key = kv::stream_meta::ser_key(&basin, &stream);
304
305        let mut meta = db_txn_get(&txn, &meta_key, kv::stream_meta::deser_value)
306            .await?
307            .ok_or_else(|| StreamNotFoundError {
308                basin: basin.clone(),
309                stream: stream.clone(),
310            })?;
311
312        if meta.deleted_at.is_some() {
313            return Err(StreamDeletionPendingError { basin, stream }.into());
314        }
315
316        let prior_doe_min_age = meta
317            .config
318            .delete_on_empty
319            .min_age
320            .filter(|age| !age.is_zero());
321
322        meta.config = meta.config.reconfigure(reconfig);
323
324        txn.put(&meta_key, kv::stream_meta::ser_value(&meta))?;
325
326        let stream_id = StreamId::new(&basin, &stream);
327        if let Some(min_age) = meta
328            .config
329            .delete_on_empty
330            .min_age
331            .filter(|age| !age.is_zero())
332            && prior_doe_min_age.is_none()
333        {
334            txn.put(
335                kv::stream_doe_deadline::ser_key(
336                    kv::timestamp::TimestampSecs::after(doe_arm_delay(
337                        retention_age_or_zero(&meta.config),
338                        min_age,
339                    )),
340                    stream_id,
341                ),
342                kv::stream_doe_deadline::ser_value(min_age),
343            )?;
344        }
345
346        static WRITE_OPTS: WriteOptions = WriteOptions {
347            await_durable: true,
348        };
349        txn.commit_with_options(&WRITE_OPTS).await?;
350
351        if let Some(client) = self.streamer_client_if_active(&basin, &stream) {
352            client.advise_reconfig(meta.config.clone());
353        }
354
355        Ok(meta.config)
356    }
357
358    #[instrument(ret, err, skip(self))]
359    pub async fn delete_stream(
360        &self,
361        basin: BasinName,
362        stream: StreamName,
363    ) -> Result<(), DeleteStreamError> {
364        match self.streamer_client_guarded(&basin, &stream).await {
365            Ok(client) => {
366                client.terminal_trim().await?;
367            }
368            Err(StreamerError::Storage(e)) => {
369                return Err(DeleteStreamError::Storage(e));
370            }
371            Err(StreamerError::StreamNotFound(e)) => {
372                return Err(DeleteStreamError::StreamNotFound(e));
373            }
374            Err(StreamerError::StreamDeletionPending(e)) => {
375                assert_eq!(e.basin, basin);
376                assert_eq!(e.stream, stream);
377            }
378        }
379
380        let txn = self.db.begin(IsolationLevel::SerializableSnapshot).await?;
381        let meta_key = kv::stream_meta::ser_key(&basin, &stream);
382        let mut meta = db_txn_get(&txn, &meta_key, kv::stream_meta::deser_value)
383            .await?
384            .ok_or_else(|| StreamNotFoundError {
385                basin,
386                stream: stream.clone(),
387            })?;
388        if meta.deleted_at.is_none() {
389            meta.deleted_at = Some(OffsetDateTime::now_utc());
390            txn.put(&meta_key, kv::stream_meta::ser_value(&meta))?;
391            static WRITE_OPTS: WriteOptions = WriteOptions {
392                await_durable: true,
393            };
394            txn.commit_with_options(&WRITE_OPTS).await?;
395        }
396
397        Ok(())
398    }
399}
400
401fn creation_idempotency_key(req_token: &RequestToken, config: &OptionalStreamConfig) -> Bash {
402    Bash::length_prefixed(&[
403        req_token.as_bytes(),
404        &s2_api::v1::config::StreamConfig::to_opt(config.clone())
405            .as_ref()
406            .map(|v| serde_json::to_vec(v).expect("serializable"))
407            .unwrap_or_default(),
408    ])
409}