Skip to main content

s2_lite/backend/
streams.rs

1use s2_common::{
2    bash::Bash,
3    record::StreamPosition,
4    types::{
5        basin::BasinName,
6        config::{OptionalStreamConfig, StreamReconfiguration},
7        resources::{ListItemsRequestParts, Page, ProvisionMode, ProvisionResult, RequestToken},
8        stream::{ListStreamsRequest, StreamInfo, StreamName},
9    },
10};
11use slatedb::{
12    IsolationLevel, IterationOrder,
13    config::{DurabilityLevel, ScanOptions, WriteOptions},
14};
15use time::OffsetDateTime;
16use tracing::instrument;
17
18use super::{
19    Backend,
20    store::db_txn_get,
21    streamer::{doe_arm_delay, retention_age_or_zero},
22};
23use crate::{
24    backend::{
25        error::{
26            BasinDeletionPendingError, BasinNotFoundError, DeleteStreamError, GetStreamConfigError,
27            ListStreamsError, ProvisionStreamError, ReconfigureStreamError, StorageError,
28            StreamAlreadyExistsError, StreamDeletionPendingError, StreamNotFoundError,
29            StreamerError,
30        },
31        kv,
32    },
33    stream_id::StreamId,
34};
35
36impl Backend {
37    pub async fn list_streams(
38        &self,
39        basin: BasinName,
40        request: ListStreamsRequest,
41    ) -> Result<Page<StreamInfo>, ListStreamsError> {
42        let ListItemsRequestParts {
43            prefix,
44            start_after,
45            limit,
46        } = request.into();
47
48        let key_range = kv::stream_meta::ser_key_range(&basin, &prefix, &start_after);
49        if key_range.is_empty() {
50            return Ok(Page::new_empty());
51        }
52
53        static SCAN_OPTS: ScanOptions = ScanOptions {
54            durability_filter: DurabilityLevel::Remote,
55            dirty: false,
56            read_ahead_bytes: 1,
57            cache_blocks: false,
58            max_fetch_tasks: 1,
59            order: IterationOrder::Ascending,
60        };
61        let mut it = self.db.scan_with_options(key_range, &SCAN_OPTS).await?;
62
63        let mut streams = Vec::with_capacity(limit.as_usize());
64        let mut has_more = false;
65        while let Some(kv) = it.next().await? {
66            let (deser_basin, stream) = kv::stream_meta::deser_key(kv.key)?;
67            assert_eq!(deser_basin.as_ref(), basin.as_ref());
68            assert!(stream.as_ref() > start_after.as_ref());
69            assert!(stream.as_ref() >= prefix.as_ref());
70            if streams.len() == limit.as_usize() {
71                has_more = true;
72                break;
73            }
74            let meta = kv::stream_meta::deser_value(kv.value)?;
75            streams.push(StreamInfo {
76                name: stream,
77                created_at: meta.created_at,
78                deleted_at: meta.deleted_at,
79                cipher: meta.cipher,
80            });
81        }
82        Ok(Page::new(streams, has_more))
83    }
84
85    pub async fn provision_stream(
86        &self,
87        basin: BasinName,
88        stream: StreamName,
89        config: OptionalStreamConfig,
90        mode: ProvisionMode,
91    ) -> Result<ProvisionResult<StreamInfo>, ProvisionStreamError> {
92        let txn = self.db.begin(IsolationLevel::SerializableSnapshot).await?;
93
94        let Some(basin_meta) = db_txn_get(
95            &txn,
96            kv::basin_meta::ser_key(&basin),
97            kv::basin_meta::deser_value,
98        )
99        .await?
100        else {
101            return Err(BasinNotFoundError { basin }.into());
102        };
103
104        if basin_meta.deleted_at.is_some() {
105            return Err(BasinDeletionPendingError { basin }.into());
106        }
107
108        let stream_meta_key = kv::stream_meta::ser_key(&basin, &stream);
109
110        let existing_meta =
111            db_txn_get(&txn, &stream_meta_key, kv::stream_meta::deser_value).await?;
112        if let Some(existing_meta) = &existing_meta
113            && existing_meta.deleted_at.is_some()
114        {
115            return Err(ProvisionStreamError::StreamDeletionPending(
116                StreamDeletionPendingError { basin, stream },
117            ));
118        }
119
120        let basin_defaults = basin_meta.config.default_stream_config;
121        let (outcome, prior_doe_min_age) = match (existing_meta, mode) {
122            (Some(existing), ProvisionMode::CreateOnly { request_token }) => {
123                let new_creation_idempotency_key = request_token
124                    .as_ref()
125                    .map(|req_token| creation_idempotency_key(req_token, &config));
126                return if new_creation_idempotency_key.is_some()
127                    && existing.creation_idempotency_key == new_creation_idempotency_key
128                {
129                    Ok(ProvisionResult::Noop(StreamInfo {
130                        name: stream,
131                        created_at: existing.created_at,
132                        deleted_at: None,
133                        cipher: existing.cipher,
134                    }))
135                } else {
136                    Err(StreamAlreadyExistsError { basin, stream }.into())
137                };
138            }
139            (Some(existing), ProvisionMode::Ensure) => {
140                let prior_doe_min_age = existing
141                    .config
142                    .delete_on_empty
143                    .min_age
144                    .filter(|age| !age.is_zero());
145                let desired_config = config.merge(basin_defaults.clone());
146                let current_config = existing.config.clone().merge(basin_defaults);
147                let meta = kv::stream_meta::StreamMeta {
148                    config: desired_config.clone().into(),
149                    cipher: existing.cipher,
150                    created_at: existing.created_at,
151                    deleted_at: None,
152                    creation_idempotency_key: existing.creation_idempotency_key,
153                };
154                (
155                    if current_config == desired_config {
156                        ProvisionResult::Noop(meta)
157                    } else {
158                        ProvisionResult::Updated(meta)
159                    },
160                    prior_doe_min_age,
161                )
162            }
163            (None, ProvisionMode::CreateOnly { request_token }) => {
164                let new_creation_idempotency_key = request_token
165                    .as_ref()
166                    .map(|req_token| creation_idempotency_key(req_token, &config));
167                (
168                    ProvisionResult::Created(kv::stream_meta::StreamMeta {
169                        config: config.merge(basin_defaults).into(),
170                        cipher: basin_meta.config.stream_cipher,
171                        created_at: OffsetDateTime::now_utc(),
172                        deleted_at: None,
173                        creation_idempotency_key: new_creation_idempotency_key,
174                    }),
175                    None,
176                )
177            }
178            (None, ProvisionMode::Ensure) => (
179                ProvisionResult::Created(kv::stream_meta::StreamMeta {
180                    config: config.merge(basin_defaults).into(),
181                    cipher: basin_meta.config.stream_cipher,
182                    created_at: OffsetDateTime::now_utc(),
183                    deleted_at: None,
184                    creation_idempotency_key: None,
185                }),
186                None,
187            ),
188        };
189
190        if !matches!(&outcome, ProvisionResult::Noop(_)) {
191            let meta = outcome.inner();
192
193            txn.put(&stream_meta_key, kv::stream_meta::ser_value(meta))?;
194            let stream_id = StreamId::new(&basin, &stream);
195            if matches!(&outcome, ProvisionResult::Created(_)) {
196                txn.put(
197                    kv::stream_id_mapping::ser_key(stream_id),
198                    kv::stream_id_mapping::ser_value(&basin, &stream),
199                )?;
200                let created_secs = meta.created_at.unix_timestamp();
201                let created_secs = if created_secs <= 0 {
202                    0
203                } else if created_secs >= i64::from(u32::MAX) {
204                    u32::MAX
205                } else {
206                    created_secs as u32
207                };
208                txn.put(
209                    kv::stream_tail_position::ser_key(stream_id),
210                    kv::stream_tail_position::ser_value(
211                        StreamPosition::MIN,
212                        kv::timestamp::TimestampSecs::from_secs(created_secs),
213                    ),
214                )?;
215            }
216            if let Some(min_age) = meta
217                .config
218                .delete_on_empty
219                .min_age
220                .filter(|age| !age.is_zero())
221                && (matches!(&outcome, ProvisionResult::Created(_)) || prior_doe_min_age.is_none())
222            {
223                txn.put(
224                    kv::stream_doe_deadline::ser_key(
225                        kv::timestamp::TimestampSecs::after(doe_arm_delay(
226                            retention_age_or_zero(&meta.config),
227                            min_age,
228                        )),
229                        stream_id,
230                    ),
231                    kv::stream_doe_deadline::ser_value(min_age),
232                )?;
233            }
234
235            static WRITE_OPTS: WriteOptions = WriteOptions {
236                await_durable: true,
237            };
238            txn.commit_with_options(&WRITE_OPTS).await?;
239        }
240
241        if let ProvisionResult::Updated(meta) = &outcome
242            && let Some(client) = self.streamer_client_if_active(&basin, &stream)
243        {
244            client.advise_reconfig(meta.config.clone());
245        }
246
247        Ok(outcome.map(|meta| StreamInfo {
248            name: stream,
249            created_at: meta.created_at,
250            deleted_at: None,
251            cipher: meta.cipher,
252        }))
253    }
254
255    pub(super) async fn stream_id_mapping(
256        &self,
257        stream_id: StreamId,
258    ) -> Result<Option<(BasinName, StreamName)>, StorageError> {
259        self.db_get(
260            kv::stream_id_mapping::ser_key(stream_id),
261            kv::stream_id_mapping::deser_value,
262        )
263        .await
264    }
265
266    pub async fn get_stream_config(
267        &self,
268        basin: BasinName,
269        stream: StreamName,
270    ) -> Result<OptionalStreamConfig, GetStreamConfigError> {
271        let meta = self
272            .db_get(
273                kv::stream_meta::ser_key(&basin, &stream),
274                kv::stream_meta::deser_value,
275            )
276            .await?
277            .ok_or_else(|| StreamNotFoundError {
278                basin: basin.clone(),
279                stream: stream.clone(),
280            })?;
281        if meta.deleted_at.is_some() {
282            return Err(StreamDeletionPendingError { basin, stream }.into());
283        }
284        Ok(meta.config)
285    }
286
287    pub async fn reconfigure_stream(
288        &self,
289        basin: BasinName,
290        stream: StreamName,
291        reconfig: StreamReconfiguration,
292    ) -> Result<OptionalStreamConfig, ReconfigureStreamError> {
293        let txn = self.db.begin(IsolationLevel::SerializableSnapshot).await?;
294
295        let meta_key = kv::stream_meta::ser_key(&basin, &stream);
296
297        let mut meta = db_txn_get(&txn, &meta_key, kv::stream_meta::deser_value)
298            .await?
299            .ok_or_else(|| StreamNotFoundError {
300                basin: basin.clone(),
301                stream: stream.clone(),
302            })?;
303
304        if meta.deleted_at.is_some() {
305            return Err(StreamDeletionPendingError { basin, stream }.into());
306        }
307
308        let prior_doe_min_age = meta
309            .config
310            .delete_on_empty
311            .min_age
312            .filter(|age| !age.is_zero());
313
314        meta.config = meta.config.reconfigure(reconfig);
315
316        txn.put(&meta_key, kv::stream_meta::ser_value(&meta))?;
317
318        let stream_id = StreamId::new(&basin, &stream);
319        if let Some(min_age) = meta
320            .config
321            .delete_on_empty
322            .min_age
323            .filter(|age| !age.is_zero())
324            && prior_doe_min_age.is_none()
325        {
326            txn.put(
327                kv::stream_doe_deadline::ser_key(
328                    kv::timestamp::TimestampSecs::after(doe_arm_delay(
329                        retention_age_or_zero(&meta.config),
330                        min_age,
331                    )),
332                    stream_id,
333                ),
334                kv::stream_doe_deadline::ser_value(min_age),
335            )?;
336        }
337
338        static WRITE_OPTS: WriteOptions = WriteOptions {
339            await_durable: true,
340        };
341        txn.commit_with_options(&WRITE_OPTS).await?;
342
343        if let Some(client) = self.streamer_client_if_active(&basin, &stream) {
344            client.advise_reconfig(meta.config.clone());
345        }
346
347        Ok(meta.config)
348    }
349
350    #[instrument(ret, err, skip(self))]
351    pub async fn delete_stream(
352        &self,
353        basin: BasinName,
354        stream: StreamName,
355    ) -> Result<(), DeleteStreamError> {
356        match self.streamer_client_guarded(&basin, &stream).await {
357            Ok(client) => {
358                client.terminal_trim().await?;
359            }
360            Err(StreamerError::Storage(e)) => {
361                return Err(DeleteStreamError::Storage(e));
362            }
363            Err(StreamerError::StreamNotFound(e)) => {
364                return Err(DeleteStreamError::StreamNotFound(e));
365            }
366            Err(StreamerError::StreamDeletionPending(e)) => {
367                assert_eq!(e.basin, basin);
368                assert_eq!(e.stream, stream);
369            }
370        }
371
372        let txn = self.db.begin(IsolationLevel::SerializableSnapshot).await?;
373        let meta_key = kv::stream_meta::ser_key(&basin, &stream);
374        let mut meta = db_txn_get(&txn, &meta_key, kv::stream_meta::deser_value)
375            .await?
376            .ok_or_else(|| StreamNotFoundError {
377                basin,
378                stream: stream.clone(),
379            })?;
380        if meta.deleted_at.is_none() {
381            meta.deleted_at = Some(OffsetDateTime::now_utc());
382            txn.put(&meta_key, kv::stream_meta::ser_value(&meta))?;
383            static WRITE_OPTS: WriteOptions = WriteOptions {
384                await_durable: true,
385            };
386            txn.commit_with_options(&WRITE_OPTS).await?;
387        }
388
389        Ok(())
390    }
391}
392
393fn creation_idempotency_key(req_token: &RequestToken, config: &OptionalStreamConfig) -> Bash {
394    Bash::length_prefixed(&[
395        req_token.as_bytes(),
396        &s2_api::v1::config::StreamConfig::to_opt(config.clone())
397            .as_ref()
398            .map(|v| serde_json::to_vec(v).expect("serializable"))
399            .unwrap_or_default(),
400    ])
401}