// s2_lite/backend/streams.rs

1use s2_common::{
2    bash::Bash,
3    record::StreamPosition,
4    types::{
5        basin::BasinName,
6        config::{OptionalStreamConfig, StreamReconfiguration},
7        resources::{CreateMode, ListItemsRequestParts, Page, RequestToken},
8        stream::{ListStreamsRequest, StreamInfo, StreamName},
9    },
10};
11use slatedb::{
12    IsolationLevel, IterationOrder,
13    config::{DurabilityLevel, ScanOptions, WriteOptions},
14};
15use time::OffsetDateTime;
16use tracing::instrument;
17
18use super::{
19    Backend, CreatedOrReconfigured,
20    store::db_txn_get,
21    streamer::{doe_arm_delay, retention_age_or_zero},
22};
23use crate::{
24    backend::{
25        error::{
26            BasinDeletionPendingError, BasinNotFoundError, CreateStreamError, DeleteStreamError,
27            GetStreamConfigError, ListStreamsError, ReconfigureStreamError, StorageError,
28            StreamAlreadyExistsError, StreamDeletionPendingError, StreamNotFoundError,
29            StreamerError,
30        },
31        kv,
32    },
33    stream_id::StreamId,
34};
35
36impl Backend {
37    pub async fn list_streams(
38        &self,
39        basin: BasinName,
40        request: ListStreamsRequest,
41    ) -> Result<Page<StreamInfo>, ListStreamsError> {
42        let ListItemsRequestParts {
43            prefix,
44            start_after,
45            limit,
46        } = request.into();
47
48        let key_range = kv::stream_meta::ser_key_range(&basin, &prefix, &start_after);
49        if key_range.is_empty() {
50            return Ok(Page::new_empty());
51        }
52
53        static SCAN_OPTS: ScanOptions = ScanOptions {
54            durability_filter: DurabilityLevel::Remote,
55            dirty: false,
56            read_ahead_bytes: 1,
57            cache_blocks: false,
58            max_fetch_tasks: 1,
59            order: IterationOrder::Ascending,
60        };
61        let mut it = self.db.scan_with_options(key_range, &SCAN_OPTS).await?;
62
63        let mut streams = Vec::with_capacity(limit.as_usize());
64        let mut has_more = false;
65        while let Some(kv) = it.next().await? {
66            let (deser_basin, stream) = kv::stream_meta::deser_key(kv.key)?;
67            assert_eq!(deser_basin.as_ref(), basin.as_ref());
68            assert!(stream.as_ref() > start_after.as_ref());
69            assert!(stream.as_ref() >= prefix.as_ref());
70            if streams.len() == limit.as_usize() {
71                has_more = true;
72                break;
73            }
74            let meta = kv::stream_meta::deser_value(kv.value)?;
75            streams.push(StreamInfo {
76                name: stream,
77                created_at: meta.created_at,
78                deleted_at: meta.deleted_at,
79                cipher: meta.cipher,
80            });
81        }
82        Ok(Page::new(streams, has_more))
83    }
84
85    pub async fn create_stream(
86        &self,
87        basin: BasinName,
88        stream: StreamName,
89        config: impl Into<StreamReconfiguration>,
90        mode: CreateMode,
91    ) -> Result<CreatedOrReconfigured<StreamInfo>, CreateStreamError> {
92        let config = config.into();
93        let txn = self.db.begin(IsolationLevel::SerializableSnapshot).await?;
94
95        let Some(basin_meta) = db_txn_get(
96            &txn,
97            kv::basin_meta::ser_key(&basin),
98            kv::basin_meta::deser_value,
99        )
100        .await?
101        else {
102            return Err(BasinNotFoundError { basin }.into());
103        };
104
105        if basin_meta.deleted_at.is_some() {
106            return Err(BasinDeletionPendingError { basin }.into());
107        }
108
109        let stream_meta_key = kv::stream_meta::ser_key(&basin, &stream);
110
111        let creation_idempotency_key = match &mode {
112            CreateMode::CreateOnly(Some(req_token)) => {
113                let resolved = OptionalStreamConfig::default().reconfigure(config.clone());
114                Some(creation_idempotency_key(req_token, &resolved))
115            }
116            _ => None,
117        };
118
119        let mut existing_meta_opt = None;
120        let mut prior_doe_min_age = None;
121
122        if let Some(existing_meta) =
123            db_txn_get(&txn, &stream_meta_key, kv::stream_meta::deser_value).await?
124        {
125            if existing_meta.deleted_at.is_some() {
126                return Err(CreateStreamError::StreamDeletionPending(
127                    StreamDeletionPendingError { basin, stream },
128                ));
129            }
130            prior_doe_min_age = existing_meta
131                .config
132                .delete_on_empty
133                .min_age
134                .filter(|age| !age.is_zero());
135            match mode {
136                CreateMode::CreateOnly(_) => {
137                    return if creation_idempotency_key.is_some()
138                        && existing_meta.creation_idempotency_key == creation_idempotency_key
139                    {
140                        Ok(CreatedOrReconfigured::Created(StreamInfo {
141                            name: stream,
142                            created_at: existing_meta.created_at,
143                            deleted_at: None,
144                            cipher: existing_meta.cipher,
145                        }))
146                    } else {
147                        Err(StreamAlreadyExistsError { basin, stream }.into())
148                    };
149                }
150                CreateMode::CreateOrReconfigure => {
151                    existing_meta_opt = Some(existing_meta);
152                }
153            }
154        }
155
156        let is_reconfigure = existing_meta_opt.is_some();
157        let (resolved, created_at, cipher) = match existing_meta_opt {
158            Some(existing) => (
159                existing.config.reconfigure(config),
160                existing.created_at,
161                existing.cipher,
162            ),
163            None => (
164                OptionalStreamConfig::default().reconfigure(config),
165                OffsetDateTime::now_utc(),
166                basin_meta.config.stream_cipher,
167            ),
168        };
169        let basin_defaults = &basin_meta.config.default_stream_config;
170        let resolved: OptionalStreamConfig = resolved.merge(basin_defaults.clone()).into();
171
172        let meta = kv::stream_meta::StreamMeta {
173            config: resolved.clone(),
174            cipher,
175            created_at,
176            deleted_at: None,
177            creation_idempotency_key,
178        };
179
180        txn.put(&stream_meta_key, kv::stream_meta::ser_value(&meta))?;
181        let stream_id = StreamId::new(&basin, &stream);
182        if !is_reconfigure {
183            txn.put(
184                kv::stream_id_mapping::ser_key(stream_id),
185                kv::stream_id_mapping::ser_value(&basin, &stream),
186            )?;
187            let created_secs = created_at.unix_timestamp();
188            let created_secs = if created_secs <= 0 {
189                0
190            } else if created_secs >= i64::from(u32::MAX) {
191                u32::MAX
192            } else {
193                created_secs as u32
194            };
195            txn.put(
196                kv::stream_tail_position::ser_key(stream_id),
197                kv::stream_tail_position::ser_value(
198                    StreamPosition::MIN,
199                    kv::timestamp::TimestampSecs::from_secs(created_secs),
200                ),
201            )?;
202        }
203        if let Some(min_age) = meta
204            .config
205            .delete_on_empty
206            .min_age
207            .filter(|age| !age.is_zero())
208            && (!is_reconfigure || prior_doe_min_age.is_none())
209        {
210            txn.put(
211                kv::stream_doe_deadline::ser_key(
212                    kv::timestamp::TimestampSecs::after(doe_arm_delay(
213                        retention_age_or_zero(&meta.config),
214                        min_age,
215                    )),
216                    stream_id,
217                ),
218                kv::stream_doe_deadline::ser_value(min_age),
219            )?;
220        }
221
222        static WRITE_OPTS: WriteOptions = WriteOptions {
223            await_durable: true,
224        };
225        txn.commit_with_options(&WRITE_OPTS).await?;
226
227        if is_reconfigure && let Some(client) = self.streamer_client_if_active(&basin, &stream) {
228            client.advise_reconfig(resolved);
229        }
230
231        let info = StreamInfo {
232            name: stream,
233            created_at,
234            deleted_at: None,
235            cipher,
236        };
237
238        Ok(if is_reconfigure {
239            CreatedOrReconfigured::Reconfigured(info)
240        } else {
241            CreatedOrReconfigured::Created(info)
242        })
243    }
244
245    pub(super) async fn stream_id_mapping(
246        &self,
247        stream_id: StreamId,
248    ) -> Result<Option<(BasinName, StreamName)>, StorageError> {
249        self.db_get(
250            kv::stream_id_mapping::ser_key(stream_id),
251            kv::stream_id_mapping::deser_value,
252        )
253        .await
254    }
255
256    pub async fn get_stream_config(
257        &self,
258        basin: BasinName,
259        stream: StreamName,
260    ) -> Result<OptionalStreamConfig, GetStreamConfigError> {
261        let meta = self
262            .db_get(
263                kv::stream_meta::ser_key(&basin, &stream),
264                kv::stream_meta::deser_value,
265            )
266            .await?
267            .ok_or_else(|| StreamNotFoundError {
268                basin: basin.clone(),
269                stream: stream.clone(),
270            })?;
271        if meta.deleted_at.is_some() {
272            return Err(StreamDeletionPendingError { basin, stream }.into());
273        }
274        Ok(meta.config)
275    }
276
277    pub async fn reconfigure_stream(
278        &self,
279        basin: BasinName,
280        stream: StreamName,
281        reconfig: StreamReconfiguration,
282    ) -> Result<OptionalStreamConfig, ReconfigureStreamError> {
283        let txn = self.db.begin(IsolationLevel::SerializableSnapshot).await?;
284
285        let meta_key = kv::stream_meta::ser_key(&basin, &stream);
286
287        let mut meta = db_txn_get(&txn, &meta_key, kv::stream_meta::deser_value)
288            .await?
289            .ok_or_else(|| StreamNotFoundError {
290                basin: basin.clone(),
291                stream: stream.clone(),
292            })?;
293
294        if meta.deleted_at.is_some() {
295            return Err(StreamDeletionPendingError { basin, stream }.into());
296        }
297
298        let prior_doe_min_age = meta
299            .config
300            .delete_on_empty
301            .min_age
302            .filter(|age| !age.is_zero());
303
304        meta.config = meta.config.reconfigure(reconfig);
305
306        txn.put(&meta_key, kv::stream_meta::ser_value(&meta))?;
307
308        let stream_id = StreamId::new(&basin, &stream);
309        if let Some(min_age) = meta
310            .config
311            .delete_on_empty
312            .min_age
313            .filter(|age| !age.is_zero())
314            && prior_doe_min_age.is_none()
315        {
316            txn.put(
317                kv::stream_doe_deadline::ser_key(
318                    kv::timestamp::TimestampSecs::after(doe_arm_delay(
319                        retention_age_or_zero(&meta.config),
320                        min_age,
321                    )),
322                    stream_id,
323                ),
324                kv::stream_doe_deadline::ser_value(min_age),
325            )?;
326        }
327
328        static WRITE_OPTS: WriteOptions = WriteOptions {
329            await_durable: true,
330        };
331        txn.commit_with_options(&WRITE_OPTS).await?;
332
333        if let Some(client) = self.streamer_client_if_active(&basin, &stream) {
334            client.advise_reconfig(meta.config.clone());
335        }
336
337        Ok(meta.config)
338    }
339
340    #[instrument(ret, err, skip(self))]
341    pub async fn delete_stream(
342        &self,
343        basin: BasinName,
344        stream: StreamName,
345    ) -> Result<(), DeleteStreamError> {
346        match self.streamer_client_guarded(&basin, &stream).await {
347            Ok(client) => {
348                client.terminal_trim().await?;
349            }
350            Err(StreamerError::Storage(e)) => {
351                return Err(DeleteStreamError::Storage(e));
352            }
353            Err(StreamerError::StreamNotFound(e)) => {
354                return Err(DeleteStreamError::StreamNotFound(e));
355            }
356            Err(StreamerError::StreamDeletionPending(e)) => {
357                assert_eq!(e.basin, basin);
358                assert_eq!(e.stream, stream);
359            }
360        }
361
362        let txn = self.db.begin(IsolationLevel::SerializableSnapshot).await?;
363        let meta_key = kv::stream_meta::ser_key(&basin, &stream);
364        let mut meta = db_txn_get(&txn, &meta_key, kv::stream_meta::deser_value)
365            .await?
366            .ok_or_else(|| StreamNotFoundError {
367                basin,
368                stream: stream.clone(),
369            })?;
370        if meta.deleted_at.is_none() {
371            meta.deleted_at = Some(OffsetDateTime::now_utc());
372            txn.put(&meta_key, kv::stream_meta::ser_value(&meta))?;
373            static WRITE_OPTS: WriteOptions = WriteOptions {
374                await_durable: true,
375            };
376            txn.commit_with_options(&WRITE_OPTS).await?;
377        }
378
379        Ok(())
380    }
381}
382
383fn creation_idempotency_key(req_token: &RequestToken, config: &OptionalStreamConfig) -> Bash {
384    Bash::length_prefixed(&[
385        req_token.as_bytes(),
386        &s2_api::v1::config::StreamConfig::to_opt(config.clone())
387            .as_ref()
388            .map(|v| serde_json::to_vec(v).expect("serializable"))
389            .unwrap_or_default(),
390    ])
391}