Skip to main content

s2_lite/backend/
streams.rs

1use s2_common::{
2    basin::BasinName,
3    config::{OptionalStreamConfig, StreamConfig, StreamReconfiguration},
4    record::StreamPosition,
5    resources::{Page, ProvisionMode, ProvisionResult, RequestToken},
6    stream::{ListStreamsRequest, StreamInfo, StreamName},
7};
8use s2_storage::bash::Bash;
9use slatedb::{
10    IsolationLevel,
11    config::{DurabilityLevel, ScanOptions},
12};
13use time::OffsetDateTime;
14use tracing::instrument;
15
16use super::{
17    Backend,
18    store::db_txn_get,
19    streamer::{TerminalTrimCondition, TerminalTrimOutcome, doe_arm_delay},
20};
21use crate::{
22    backend::{
23        error::{
24            BasinDeletionPendingError, BasinNotFoundError, DeleteStreamError, GetStreamConfigError,
25            ListStreamsError, ProvisionStreamError, ReconfigureStreamError, StorageError,
26            StreamAlreadyExistsError, StreamDeletionPendingError, StreamNotFoundError,
27            StreamerError,
28        },
29        kv,
30    },
31    stream_id::StreamId,
32};
33
34impl Backend {
35    pub async fn list_streams(
36        &self,
37        basin: BasinName,
38        request: ListStreamsRequest,
39    ) -> Result<Page<StreamInfo>, ListStreamsError> {
40        let ListStreamsRequest {
41            prefix,
42            start_after,
43            limit,
44        } = request;
45
46        let key_range = kv::stream_meta::ser_key_range(&basin, &prefix, &start_after);
47        if key_range.is_empty() {
48            return Ok(Page::new_empty());
49        }
50
51        let scan_opts = ScanOptions {
52            durability_filter: DurabilityLevel::Remote,
53            ..Default::default()
54        };
55        let mut it = self.db.scan_with_options(key_range, &scan_opts).await?;
56
57        let mut streams = Vec::with_capacity(limit.as_usize());
58        let mut has_more = false;
59        while let Some(kv) = it.next().await? {
60            let (deser_basin, stream) = kv::stream_meta::deser_key(kv.key)?;
61            assert_eq!(deser_basin.as_ref(), basin.as_ref());
62            assert!(stream.as_ref() > start_after.as_ref());
63            assert!(stream.as_ref() >= prefix.as_ref());
64            if streams.len() == limit.as_usize() {
65                has_more = true;
66                break;
67            }
68            let meta = kv::stream_meta::deser_value(kv.value)?;
69            streams.push(StreamInfo {
70                name: stream,
71                created_at: meta.created_at,
72                deleted_at: meta.deleted_at,
73                cipher: meta.cipher,
74            });
75        }
76        Ok(Page::new(streams, has_more))
77    }
78
79    pub async fn provision_stream(
80        &self,
81        basin: BasinName,
82        stream: StreamName,
83        config: OptionalStreamConfig,
84        mode: ProvisionMode,
85    ) -> Result<ProvisionResult<StreamInfo>, ProvisionStreamError> {
86        let txn = self.db.begin(IsolationLevel::SerializableSnapshot).await?;
87
88        let Some(basin_meta) = db_txn_get(
89            &txn,
90            kv::basin_meta::ser_key(&basin),
91            kv::basin_meta::deser_value,
92        )
93        .await?
94        else {
95            return Err(BasinNotFoundError { basin }.into());
96        };
97
98        if basin_meta.deleted_at.is_some() {
99            return Err(BasinDeletionPendingError { basin }.into());
100        }
101
102        let stream_meta_key = kv::stream_meta::ser_key(&basin, &stream);
103
104        let existing_meta =
105            db_txn_get(&txn, &stream_meta_key, kv::stream_meta::deser_value).await?;
106        if let Some(existing_meta) = &existing_meta
107            && existing_meta.deleted_at.is_some()
108        {
109            return Err(ProvisionStreamError::StreamDeletionPending(
110                StreamDeletionPendingError,
111            ));
112        }
113
114        let basin_defaults = basin_meta.config.default_stream_config;
115        let (outcome, prior_doe_min_age) = match (existing_meta, mode) {
116            (Some(existing), ProvisionMode::CreateOnly { request_token }) => {
117                let new_creation_idempotency_key = request_token
118                    .as_ref()
119                    .map(|req_token| creation_idempotency_key(req_token, &config));
120                return if new_creation_idempotency_key.is_some()
121                    && existing.creation_idempotency_key == new_creation_idempotency_key
122                {
123                    Ok(ProvisionResult::Noop(StreamInfo {
124                        name: stream,
125                        created_at: existing.created_at,
126                        deleted_at: None,
127                        cipher: existing.cipher,
128                    }))
129                } else {
130                    Err(StreamAlreadyExistsError { basin, stream }.into())
131                };
132            }
133            (Some(existing), ProvisionMode::Ensure) => {
134                let desired_config = config.merge(basin_defaults);
135                let config_unchanged = existing.config == desired_config;
136                let meta = kv::stream_meta::StreamMeta {
137                    config: desired_config,
138                    cipher: existing.cipher,
139                    created_at: existing.created_at,
140                    deleted_at: None,
141                    creation_idempotency_key: existing.creation_idempotency_key,
142                };
143                (
144                    if config_unchanged {
145                        ProvisionResult::Noop(meta)
146                    } else {
147                        ProvisionResult::Updated(meta)
148                    },
149                    existing.config.delete_on_empty.min_age(),
150                )
151            }
152            (None, ProvisionMode::CreateOnly { request_token }) => {
153                let new_creation_idempotency_key = request_token
154                    .as_ref()
155                    .map(|req_token| creation_idempotency_key(req_token, &config));
156                (
157                    ProvisionResult::Created(kv::stream_meta::StreamMeta {
158                        config: config.merge(basin_defaults),
159                        cipher: basin_meta.config.stream_cipher,
160                        created_at: OffsetDateTime::now_utc(),
161                        deleted_at: None,
162                        creation_idempotency_key: new_creation_idempotency_key,
163                    }),
164                    None,
165                )
166            }
167            (None, ProvisionMode::Ensure) => (
168                ProvisionResult::Created(kv::stream_meta::StreamMeta {
169                    config: config.merge(basin_defaults),
170                    cipher: basin_meta.config.stream_cipher,
171                    created_at: OffsetDateTime::now_utc(),
172                    deleted_at: None,
173                    creation_idempotency_key: None,
174                }),
175                None,
176            ),
177        };
178
179        if !matches!(&outcome, ProvisionResult::Noop(_)) {
180            let meta = outcome.inner();
181
182            txn.put(&stream_meta_key, kv::stream_meta::ser_value(meta))?;
183
184            let stream_id = StreamId::new(&basin, &stream);
185
186            if matches!(&outcome, ProvisionResult::Created(_)) {
187                txn.put(
188                    kv::stream_id_mapping::ser_key(stream_id),
189                    kv::stream_id_mapping::ser_value(&basin, &stream),
190                )?;
191                txn.put(
192                    kv::stream_tail_position::ser_key(stream_id),
193                    kv::stream_tail_position::ser_value(StreamPosition::MIN),
194                )?;
195            }
196
197            if let Some(min_age) = meta.config.delete_on_empty.min_age()
198                && (matches!(&outcome, ProvisionResult::Created(_)) || prior_doe_min_age.is_none())
199            {
200                txn.put(
201                    kv::stream_doe_deadline::ser_key(
202                        kv::timestamp::TimestampSecs::after(doe_arm_delay(
203                            meta.config.retention_policy.age().unwrap_or_default(),
204                            min_age,
205                        )),
206                        stream_id,
207                    ),
208                    kv::stream_doe_deadline::ser_value(min_age),
209                )?;
210            }
211
212            txn.commit().await?;
213        }
214
215        if let ProvisionResult::Updated(meta) = &outcome
216            && let Some(client) = self.streamer_client_if_active(&basin, &stream)
217        {
218            client.advise_reconfig(meta.config.clone());
219        }
220
221        Ok(outcome.map(|meta| StreamInfo {
222            name: stream,
223            created_at: meta.created_at,
224            deleted_at: None,
225            cipher: meta.cipher,
226        }))
227    }
228
229    pub(super) async fn stream_id_mapping(
230        &self,
231        stream_id: StreamId,
232    ) -> Result<Option<(BasinName, StreamName)>, StorageError> {
233        self.db_get(
234            kv::stream_id_mapping::ser_key(stream_id),
235            kv::stream_id_mapping::deser_value,
236        )
237        .await
238    }
239
240    pub async fn get_stream_config(
241        &self,
242        basin: BasinName,
243        stream: StreamName,
244    ) -> Result<StreamConfig, GetStreamConfigError> {
245        let meta = self
246            .db_get(
247                kv::stream_meta::ser_key(&basin, &stream),
248                kv::stream_meta::deser_value,
249            )
250            .await?
251            .ok_or_else(|| StreamNotFoundError {
252                basin: basin.clone(),
253                stream: stream.clone(),
254            })?;
255        if meta.deleted_at.is_some() {
256            return Err(StreamDeletionPendingError.into());
257        }
258        Ok(meta.config)
259    }
260
261    pub async fn reconfigure_stream(
262        &self,
263        basin: BasinName,
264        stream: StreamName,
265        reconfig: StreamReconfiguration,
266    ) -> Result<StreamConfig, ReconfigureStreamError> {
267        let txn = self.db.begin(IsolationLevel::SerializableSnapshot).await?;
268
269        let meta_key = kv::stream_meta::ser_key(&basin, &stream);
270        let (basin_meta, meta) = tokio::try_join!(
271            db_txn_get(
272                &txn,
273                kv::basin_meta::ser_key(&basin),
274                kv::basin_meta::deser_value,
275            ),
276            db_txn_get(&txn, &meta_key, kv::stream_meta::deser_value),
277        )?;
278
279        let basin_meta = basin_meta.ok_or_else(|| BasinNotFoundError {
280            basin: basin.clone(),
281        })?;
282        if basin_meta.deleted_at.is_some() {
283            return Err(BasinDeletionPendingError { basin }.into());
284        }
285
286        let mut meta = meta.ok_or_else(|| StreamNotFoundError {
287            basin: basin.clone(),
288            stream: stream.clone(),
289        })?;
290
291        if meta.deleted_at.is_some() {
292            return Err(StreamDeletionPendingError.into());
293        }
294
295        let prior_doe_min_age = meta.config.delete_on_empty.min_age();
296
297        meta.config = OptionalStreamConfig::from(meta.config)
298            .reconfigure(reconfig)
299            .merge(basin_meta.config.default_stream_config);
300
301        txn.put(&meta_key, kv::stream_meta::ser_value(&meta))?;
302
303        let stream_id = StreamId::new(&basin, &stream);
304        if let Some(min_age) = meta.config.delete_on_empty.min_age()
305            && prior_doe_min_age.is_none()
306        {
307            txn.put(
308                kv::stream_doe_deadline::ser_key(
309                    kv::timestamp::TimestampSecs::after(doe_arm_delay(
310                        meta.config.retention_policy.age().unwrap_or_default(),
311                        min_age,
312                    )),
313                    stream_id,
314                ),
315                kv::stream_doe_deadline::ser_value(min_age),
316            )?;
317        }
318
319        txn.commit().await?;
320
321        if let Some(client) = self.streamer_client_if_active(&basin, &stream) {
322            client.advise_reconfig(meta.config.clone());
323        }
324
325        Ok(meta.config)
326    }
327
328    #[instrument(ret, err, skip(self))]
329    pub async fn delete_stream(
330        &self,
331        basin: BasinName,
332        stream: StreamName,
333    ) -> Result<(), DeleteStreamError> {
334        self.delete_stream_with_condition(basin, stream, TerminalTrimCondition::Always)
335            .await
336    }
337
338    pub(super) async fn delete_stream_with_condition(
339        &self,
340        basin: BasinName,
341        stream: StreamName,
342        condition: TerminalTrimCondition,
343    ) -> Result<(), DeleteStreamError> {
344        let outcome = match self.streamer_client_guarded(&basin, &stream).await {
345            Ok(client) => client.terminal_trim(condition).await?,
346            Err(StreamerError::Storage(e)) => {
347                return Err(DeleteStreamError::Storage(e));
348            }
349            Err(StreamerError::StreamNotFound(e)) => {
350                return Err(DeleteStreamError::StreamNotFound(e));
351            }
352            Err(StreamerError::StreamDeletionPending(_)) => TerminalTrimOutcome::DeletionPending,
353        };
354        match outcome {
355            TerminalTrimOutcome::DeletionPending => self.mark_stream_deleted(basin, stream).await,
356            TerminalTrimOutcome::Ineligible => Ok(()),
357        }
358    }
359
360    async fn mark_stream_deleted(
361        &self,
362        basin: BasinName,
363        stream: StreamName,
364    ) -> Result<(), DeleteStreamError> {
365        let txn = self.db.begin(IsolationLevel::SerializableSnapshot).await?;
366        let meta_key = kv::stream_meta::ser_key(&basin, &stream);
367        let mut meta = db_txn_get(&txn, &meta_key, kv::stream_meta::deser_value)
368            .await?
369            .ok_or_else(|| StreamNotFoundError {
370                basin,
371                stream: stream.clone(),
372            })?;
373        if meta.deleted_at.is_none() {
374            meta.deleted_at = Some(OffsetDateTime::now_utc());
375            txn.put(&meta_key, kv::stream_meta::ser_value(&meta))?;
376            txn.commit().await?;
377        }
378        Ok(())
379    }
380}
381
382fn creation_idempotency_key(req_token: &RequestToken, config: &OptionalStreamConfig) -> Bash {
383    Bash::length_prefixed(&[
384        req_token.as_bytes(),
385        &s2_api::v1::config::StreamConfig::to_opt(config.clone())
386            .as_ref()
387            .map(|v| serde_json::to_vec(v).expect("serializable"))
388            .unwrap_or_default(),
389    ])
390}