Skip to main content

s2_lite/backend/
streams.rs

1use s2_common::{
2    bash::Bash,
3    record::StreamPosition,
4    types::{
5        basin::BasinName,
6        config::{OptionalStreamConfig, StreamConfig, StreamReconfiguration},
7        resources::{ListItemsRequestParts, Page, ProvisionMode, ProvisionResult, RequestToken},
8        stream::{ListStreamsRequest, StreamInfo, StreamName},
9    },
10};
11use slatedb::{
12    IsolationLevel,
13    config::{DurabilityLevel, ScanOptions},
14};
15use time::OffsetDateTime;
16use tracing::instrument;
17
18use super::{
19    Backend,
20    store::db_txn_get,
21    streamer::{TerminalTrimCondition, TerminalTrimOutcome, doe_arm_delay},
22};
23use crate::{
24    backend::{
25        error::{
26            BasinDeletionPendingError, BasinNotFoundError, DeleteStreamError, GetStreamConfigError,
27            ListStreamsError, ProvisionStreamError, ReconfigureStreamError, StorageError,
28            StreamAlreadyExistsError, StreamDeletionPendingError, StreamNotFoundError,
29            StreamerError,
30        },
31        kv,
32    },
33    stream_id::StreamId,
34};
35
36impl Backend {
37    pub async fn list_streams(
38        &self,
39        basin: BasinName,
40        request: ListStreamsRequest,
41    ) -> Result<Page<StreamInfo>, ListStreamsError> {
42        let ListItemsRequestParts {
43            prefix,
44            start_after,
45            limit,
46        } = request.into();
47
48        let key_range = kv::stream_meta::ser_key_range(&basin, &prefix, &start_after);
49        if key_range.is_empty() {
50            return Ok(Page::new_empty());
51        }
52
53        let scan_opts = ScanOptions {
54            durability_filter: DurabilityLevel::Remote,
55            ..Default::default()
56        };
57        let mut it = self.db.scan_with_options(key_range, &scan_opts).await?;
58
59        let mut streams = Vec::with_capacity(limit.as_usize());
60        let mut has_more = false;
61        while let Some(kv) = it.next().await? {
62            let (deser_basin, stream) = kv::stream_meta::deser_key(kv.key)?;
63            assert_eq!(deser_basin.as_ref(), basin.as_ref());
64            assert!(stream.as_ref() > start_after.as_ref());
65            assert!(stream.as_ref() >= prefix.as_ref());
66            if streams.len() == limit.as_usize() {
67                has_more = true;
68                break;
69            }
70            let meta = kv::stream_meta::deser_value(kv.value)?;
71            streams.push(StreamInfo {
72                name: stream,
73                created_at: meta.created_at,
74                deleted_at: meta.deleted_at,
75                cipher: meta.cipher,
76            });
77        }
78        Ok(Page::new(streams, has_more))
79    }
80
    /// Provisions `stream` in `basin` according to `mode`.
    ///
    /// All reads and writes happen in one `SerializableSnapshot` transaction.
    ///
    /// Outcomes:
    /// - `Created`: the stream did not exist; its metadata is written together with
    ///   the stream-id mapping and an initial tail position of `StreamPosition::MIN`.
    /// - `Updated`: `ProvisionMode::Ensure` on an existing stream whose stored config
    ///   differs from `config` merged with the basin's default stream config.
    /// - `Noop`: either `ProvisionMode::Ensure` with an identical config, or
    ///   `ProvisionMode::CreateOnly` with a request token whose idempotency key
    ///   matches the one stored at creation. Nothing is written or committed.
    ///
    /// # Errors
    ///
    /// - `BasinNotFoundError` if the basin has no metadata.
    /// - `BasinDeletionPendingError` if the basin's `deleted_at` is set.
    /// - `StreamDeletionPendingError` if the stream exists and its `deleted_at` is set.
    /// - `StreamAlreadyExistsError` for `CreateOnly` on an existing stream when no
    ///   request token was supplied or the idempotency key does not match.
    /// - Transaction and (de)serialization failures are propagated.
    pub async fn provision_stream(
        &self,
        basin: BasinName,
        stream: StreamName,
        config: OptionalStreamConfig,
        mode: ProvisionMode,
    ) -> Result<ProvisionResult<StreamInfo>, ProvisionStreamError> {
        let txn = self.db.begin(IsolationLevel::SerializableSnapshot).await?;

        // The basin must exist and must not be pending deletion.
        let Some(basin_meta) = db_txn_get(
            &txn,
            kv::basin_meta::ser_key(&basin),
            kv::basin_meta::deser_value,
        )
        .await?
        else {
            return Err(BasinNotFoundError { basin }.into());
        };

        if basin_meta.deleted_at.is_some() {
            return Err(BasinDeletionPendingError { basin }.into());
        }

        let stream_meta_key = kv::stream_meta::ser_key(&basin, &stream);

        // A stream that is pending deletion cannot be provisioned in any mode.
        let existing_meta =
            db_txn_get(&txn, &stream_meta_key, kv::stream_meta::deser_value).await?;
        if let Some(existing_meta) = &existing_meta
            && existing_meta.deleted_at.is_some()
        {
            return Err(ProvisionStreamError::StreamDeletionPending(
                StreamDeletionPendingError,
            ));
        }

        let basin_defaults = basin_meta.config.default_stream_config;
        // `prior_doe_min_age` is the delete-on-empty min age the stream had before
        // this call; it is `None` for streams that are being created.
        let (outcome, prior_doe_min_age) = match (existing_meta, mode) {
            (Some(existing), ProvisionMode::CreateOnly { request_token }) => {
                // Only a request carrying a token whose derived key equals the key
                // stored at creation is treated as an idempotent replay.
                let new_creation_idempotency_key = request_token
                    .as_ref()
                    .map(|req_token| creation_idempotency_key(req_token, &config));
                return if new_creation_idempotency_key.is_some()
                    && existing.creation_idempotency_key == new_creation_idempotency_key
                {
                    Ok(ProvisionResult::Noop(StreamInfo {
                        name: stream,
                        created_at: existing.created_at,
                        deleted_at: None,
                        cipher: existing.cipher,
                    }))
                } else {
                    Err(StreamAlreadyExistsError { basin, stream }.into())
                };
            }
            (Some(existing), ProvisionMode::Ensure) => {
                // The requested config (merged with basin defaults) replaces the
                // stored one; cipher, creation time and idempotency key are kept.
                let desired_config = config.merge(basin_defaults);
                let config_unchanged = existing.config == desired_config;
                let meta = kv::stream_meta::StreamMeta {
                    config: desired_config,
                    cipher: existing.cipher,
                    created_at: existing.created_at,
                    deleted_at: None,
                    creation_idempotency_key: existing.creation_idempotency_key,
                };
                (
                    if config_unchanged {
                        ProvisionResult::Noop(meta)
                    } else {
                        ProvisionResult::Updated(meta)
                    },
                    existing.config.delete_on_empty.min_age(),
                )
            }
            (None, ProvisionMode::CreateOnly { request_token }) => {
                // The key is derived before `config` is consumed by `merge` below.
                let new_creation_idempotency_key = request_token
                    .as_ref()
                    .map(|req_token| creation_idempotency_key(req_token, &config));
                (
                    ProvisionResult::Created(kv::stream_meta::StreamMeta {
                        config: config.merge(basin_defaults),
                        cipher: basin_meta.config.stream_cipher,
                        created_at: OffsetDateTime::now_utc(),
                        deleted_at: None,
                        creation_idempotency_key: new_creation_idempotency_key,
                    }),
                    None,
                )
            }
            (None, ProvisionMode::Ensure) => (
                // `Ensure` carries no request token, so no idempotency key is stored.
                ProvisionResult::Created(kv::stream_meta::StreamMeta {
                    config: config.merge(basin_defaults),
                    cipher: basin_meta.config.stream_cipher,
                    created_at: OffsetDateTime::now_utc(),
                    deleted_at: None,
                    creation_idempotency_key: None,
                }),
                None,
            ),
        };

        // `Noop` writes nothing; the transaction is dropped without a commit.
        if !matches!(&outcome, ProvisionResult::Noop(_)) {
            let meta = outcome.inner();

            txn.put(&stream_meta_key, kv::stream_meta::ser_value(meta))?;

            let stream_id = StreamId::new(&basin, &stream);

            // Records written only when the stream is first created.
            if matches!(&outcome, ProvisionResult::Created(_)) {
                txn.put(
                    kv::stream_id_mapping::ser_key(stream_id),
                    kv::stream_id_mapping::ser_value(&basin, &stream),
                )?;
                txn.put(
                    kv::stream_tail_position::ser_key(stream_id),
                    kv::stream_tail_position::ser_value(StreamPosition::MIN),
                )?;
            }

            // Write a delete-on-empty deadline entry when the resulting config has a
            // min age and the stream is new or previously had none.
            if let Some(min_age) = meta.config.delete_on_empty.min_age()
                && (matches!(&outcome, ProvisionResult::Created(_)) || prior_doe_min_age.is_none())
            {
                txn.put(
                    kv::stream_doe_deadline::ser_key(
                        kv::timestamp::TimestampSecs::after(doe_arm_delay(
                            meta.config.retention_policy.age().unwrap_or_default(),
                            min_age,
                        )),
                        stream_id,
                    ),
                    kv::stream_doe_deadline::ser_value(min_age),
                )?;
            }

            txn.commit().await?;
        }

        // After a committed config change, pass the new config to the stream's
        // streamer client if one is currently active.
        if let ProvisionResult::Updated(meta) = &outcome
            && let Some(client) = self.streamer_client_if_active(&basin, &stream)
        {
            client.advise_reconfig(meta.config.clone());
        }

        Ok(outcome.map(|meta| StreamInfo {
            name: stream,
            created_at: meta.created_at,
            deleted_at: None,
            cipher: meta.cipher,
        }))
    }
230
231    pub(super) async fn stream_id_mapping(
232        &self,
233        stream_id: StreamId,
234    ) -> Result<Option<(BasinName, StreamName)>, StorageError> {
235        self.db_get(
236            kv::stream_id_mapping::ser_key(stream_id),
237            kv::stream_id_mapping::deser_value,
238        )
239        .await
240    }
241
242    pub async fn get_stream_config(
243        &self,
244        basin: BasinName,
245        stream: StreamName,
246    ) -> Result<StreamConfig, GetStreamConfigError> {
247        let meta = self
248            .db_get(
249                kv::stream_meta::ser_key(&basin, &stream),
250                kv::stream_meta::deser_value,
251            )
252            .await?
253            .ok_or_else(|| StreamNotFoundError {
254                basin: basin.clone(),
255                stream: stream.clone(),
256            })?;
257        if meta.deleted_at.is_some() {
258            return Err(StreamDeletionPendingError.into());
259        }
260        Ok(meta.config)
261    }
262
263    pub async fn reconfigure_stream(
264        &self,
265        basin: BasinName,
266        stream: StreamName,
267        reconfig: StreamReconfiguration,
268    ) -> Result<StreamConfig, ReconfigureStreamError> {
269        let txn = self.db.begin(IsolationLevel::SerializableSnapshot).await?;
270
271        let meta_key = kv::stream_meta::ser_key(&basin, &stream);
272        let (basin_meta, meta) = tokio::try_join!(
273            db_txn_get(
274                &txn,
275                kv::basin_meta::ser_key(&basin),
276                kv::basin_meta::deser_value,
277            ),
278            db_txn_get(&txn, &meta_key, kv::stream_meta::deser_value),
279        )?;
280
281        let basin_meta = basin_meta.ok_or_else(|| BasinNotFoundError {
282            basin: basin.clone(),
283        })?;
284        if basin_meta.deleted_at.is_some() {
285            return Err(BasinDeletionPendingError { basin }.into());
286        }
287
288        let mut meta = meta.ok_or_else(|| StreamNotFoundError {
289            basin: basin.clone(),
290            stream: stream.clone(),
291        })?;
292
293        if meta.deleted_at.is_some() {
294            return Err(StreamDeletionPendingError.into());
295        }
296
297        let prior_doe_min_age = meta.config.delete_on_empty.min_age();
298
299        meta.config = OptionalStreamConfig::from(meta.config)
300            .reconfigure(reconfig)
301            .merge(basin_meta.config.default_stream_config);
302
303        txn.put(&meta_key, kv::stream_meta::ser_value(&meta))?;
304
305        let stream_id = StreamId::new(&basin, &stream);
306        if let Some(min_age) = meta.config.delete_on_empty.min_age()
307            && prior_doe_min_age.is_none()
308        {
309            txn.put(
310                kv::stream_doe_deadline::ser_key(
311                    kv::timestamp::TimestampSecs::after(doe_arm_delay(
312                        meta.config.retention_policy.age().unwrap_or_default(),
313                        min_age,
314                    )),
315                    stream_id,
316                ),
317                kv::stream_doe_deadline::ser_value(min_age),
318            )?;
319        }
320
321        txn.commit().await?;
322
323        if let Some(client) = self.streamer_client_if_active(&basin, &stream) {
324            client.advise_reconfig(meta.config.clone());
325        }
326
327        Ok(meta.config)
328    }
329
330    #[instrument(ret, err, skip(self))]
331    pub async fn delete_stream(
332        &self,
333        basin: BasinName,
334        stream: StreamName,
335    ) -> Result<(), DeleteStreamError> {
336        self.delete_stream_with_condition(basin, stream, TerminalTrimCondition::Always)
337            .await
338    }
339
340    pub(super) async fn delete_stream_with_condition(
341        &self,
342        basin: BasinName,
343        stream: StreamName,
344        condition: TerminalTrimCondition,
345    ) -> Result<(), DeleteStreamError> {
346        let outcome = match self.streamer_client_guarded(&basin, &stream).await {
347            Ok(client) => client.terminal_trim(condition).await?,
348            Err(StreamerError::Storage(e)) => {
349                return Err(DeleteStreamError::Storage(e));
350            }
351            Err(StreamerError::StreamNotFound(e)) => {
352                return Err(DeleteStreamError::StreamNotFound(e));
353            }
354            Err(StreamerError::StreamDeletionPending(_)) => TerminalTrimOutcome::DeletionPending,
355        };
356        match outcome {
357            TerminalTrimOutcome::DeletionPending => self.mark_stream_deleted(basin, stream).await,
358            TerminalTrimOutcome::Ineligible => Ok(()),
359        }
360    }
361
362    async fn mark_stream_deleted(
363        &self,
364        basin: BasinName,
365        stream: StreamName,
366    ) -> Result<(), DeleteStreamError> {
367        let txn = self.db.begin(IsolationLevel::SerializableSnapshot).await?;
368        let meta_key = kv::stream_meta::ser_key(&basin, &stream);
369        let mut meta = db_txn_get(&txn, &meta_key, kv::stream_meta::deser_value)
370            .await?
371            .ok_or_else(|| StreamNotFoundError {
372                basin,
373                stream: stream.clone(),
374            })?;
375        if meta.deleted_at.is_none() {
376            meta.deleted_at = Some(OffsetDateTime::now_utc());
377            txn.put(&meta_key, kv::stream_meta::ser_value(&meta))?;
378            txn.commit().await?;
379        }
380        Ok(())
381    }
382}
383
384fn creation_idempotency_key(req_token: &RequestToken, config: &OptionalStreamConfig) -> Bash {
385    Bash::length_prefixed(&[
386        req_token.as_bytes(),
387        &s2_api::v1::config::StreamConfig::to_opt(config.clone())
388            .as_ref()
389            .map(|v| serde_json::to_vec(v).expect("serializable"))
390            .unwrap_or_default(),
391    ])
392}