Skip to main content

s2_lite/backend/
streams.rs

1use s2_common::{
2    bash::Bash,
3    record::StreamPosition,
4    types::{
5        basin::BasinName,
6        config::{OptionalStreamConfig, StreamReconfiguration},
7        resources::{CreateMode, ListItemsRequestParts, Page, RequestToken},
8        stream::{ListStreamsRequest, StreamInfo, StreamName},
9    },
10};
11use slatedb::{
12    IsolationLevel,
13    config::{DurabilityLevel, ScanOptions, WriteOptions},
14};
15use time::OffsetDateTime;
16use tracing::instrument;
17
18use super::{Backend, CreatedOrReconfigured, store::db_txn_get};
19use crate::backend::{
20    error::{
21        BasinDeletionPendingError, BasinNotFoundError, CreateStreamError, DeleteStreamError,
22        GetStreamConfigError, ListStreamsError, ReconfigureStreamError, StorageError,
23        StreamAlreadyExistsError, StreamDeletionPendingError, StreamNotFoundError, StreamerError,
24    },
25    kv,
26    stream_id::StreamId,
27};
28
29impl Backend {
30    pub async fn list_streams(
31        &self,
32        basin: BasinName,
33        request: ListStreamsRequest,
34    ) -> Result<Page<StreamInfo>, ListStreamsError> {
35        let ListItemsRequestParts {
36            prefix,
37            start_after,
38            limit,
39        } = request.into();
40
41        let key_range = kv::stream_meta::ser_key_range(&basin, &prefix, &start_after);
42        if key_range.is_empty() {
43            return Ok(Page::new_empty());
44        }
45
46        static SCAN_OPTS: ScanOptions = ScanOptions {
47            durability_filter: DurabilityLevel::Remote,
48            dirty: false,
49            read_ahead_bytes: 1,
50            cache_blocks: false,
51            max_fetch_tasks: 1,
52        };
53        let mut it = self.db.scan_with_options(key_range, &SCAN_OPTS).await?;
54
55        let mut streams = Vec::with_capacity(limit.as_usize());
56        let mut has_more = false;
57        while let Some(kv) = it.next().await? {
58            let (deser_basin, stream) = kv::stream_meta::deser_key(kv.key)?;
59            assert_eq!(deser_basin.as_ref(), basin.as_ref());
60            assert!(stream.as_ref() > start_after.as_ref());
61            assert!(stream.as_ref() >= prefix.as_ref());
62            if streams.len() == limit.as_usize() {
63                has_more = true;
64                break;
65            }
66            let meta = kv::stream_meta::deser_value(kv.value)?;
67            streams.push(StreamInfo {
68                name: stream,
69                created_at: meta.created_at,
70                deleted_at: meta.deleted_at,
71            });
72        }
73        Ok(Page::new(streams, has_more))
74    }
75
76    pub async fn create_stream(
77        &self,
78        basin: BasinName,
79        stream: StreamName,
80        config: impl Into<StreamReconfiguration>,
81        mode: CreateMode,
82    ) -> Result<CreatedOrReconfigured<StreamInfo>, CreateStreamError> {
83        let config = config.into();
84        let txn = self.db.begin(IsolationLevel::SerializableSnapshot).await?;
85
86        let Some(basin_meta) = db_txn_get(
87            &txn,
88            kv::basin_meta::ser_key(&basin),
89            kv::basin_meta::deser_value,
90        )
91        .await?
92        else {
93            return Err(BasinNotFoundError { basin }.into());
94        };
95
96        if basin_meta.deleted_at.is_some() {
97            return Err(BasinDeletionPendingError { basin }.into());
98        }
99
100        let stream_meta_key = kv::stream_meta::ser_key(&basin, &stream);
101
102        let creation_idempotency_key = match &mode {
103            CreateMode::CreateOnly(Some(req_token)) => {
104                let resolved = OptionalStreamConfig::default().reconfigure(config.clone());
105                Some(creation_idempotency_key(req_token, &resolved))
106            }
107            _ => None,
108        };
109
110        let mut existing_meta_opt = None;
111        let mut prior_doe_min_age = None;
112
113        if let Some(existing_meta) =
114            db_txn_get(&txn, &stream_meta_key, kv::stream_meta::deser_value).await?
115        {
116            if existing_meta.deleted_at.is_some() {
117                return Err(CreateStreamError::StreamDeletionPending(
118                    StreamDeletionPendingError { basin, stream },
119                ));
120            }
121            prior_doe_min_age = existing_meta
122                .config
123                .delete_on_empty
124                .min_age
125                .filter(|age| !age.is_zero());
126            match mode {
127                CreateMode::CreateOnly(_) => {
128                    return if creation_idempotency_key.is_some()
129                        && existing_meta.creation_idempotency_key == creation_idempotency_key
130                    {
131                        Ok(CreatedOrReconfigured::Created(StreamInfo {
132                            name: stream,
133                            created_at: existing_meta.created_at,
134                            deleted_at: None,
135                        }))
136                    } else {
137                        Err(StreamAlreadyExistsError { basin, stream }.into())
138                    };
139                }
140                CreateMode::CreateOrReconfigure => {
141                    existing_meta_opt = Some(existing_meta);
142                }
143            }
144        }
145
146        let is_reconfigure = existing_meta_opt.is_some();
147        let (resolved, created_at) = match existing_meta_opt {
148            Some(existing) => (existing.config.reconfigure(config), existing.created_at),
149            None => (
150                OptionalStreamConfig::default().reconfigure(config),
151                OffsetDateTime::now_utc(),
152            ),
153        };
154        let resolved: OptionalStreamConfig = resolved
155            .merge(basin_meta.config.default_stream_config)
156            .into();
157
158        let meta = kv::stream_meta::StreamMeta {
159            config: resolved.clone(),
160            created_at,
161            deleted_at: None,
162            creation_idempotency_key,
163        };
164
165        txn.put(&stream_meta_key, kv::stream_meta::ser_value(&meta))?;
166        let stream_id = StreamId::new(&basin, &stream);
167        if !is_reconfigure {
168            txn.put(
169                kv::stream_id_mapping::ser_key(stream_id),
170                kv::stream_id_mapping::ser_value(&basin, &stream),
171            )?;
172            let created_secs = created_at.unix_timestamp();
173            let created_secs = if created_secs <= 0 {
174                0
175            } else if created_secs >= i64::from(u32::MAX) {
176                u32::MAX
177            } else {
178                created_secs as u32
179            };
180            txn.put(
181                kv::stream_tail_position::ser_key(stream_id),
182                kv::stream_tail_position::ser_value(
183                    StreamPosition::MIN,
184                    kv::timestamp::TimestampSecs::from_secs(created_secs),
185                ),
186            )?;
187        }
188        if let Some(min_age) = meta
189            .config
190            .delete_on_empty
191            .min_age
192            .filter(|age| !age.is_zero())
193            && prior_doe_min_age != Some(min_age)
194        {
195            txn.put(
196                kv::stream_doe_deadline::ser_key(
197                    kv::timestamp::TimestampSecs::after(min_age),
198                    stream_id,
199                ),
200                kv::stream_doe_deadline::ser_value(min_age),
201            )?;
202        }
203
204        static WRITE_OPTS: WriteOptions = WriteOptions {
205            await_durable: true,
206        };
207        txn.commit_with_options(&WRITE_OPTS).await?;
208
209        if is_reconfigure && let Some(client) = self.streamer_client_if_active(&basin, &stream) {
210            client.advise_reconfig(resolved);
211        }
212
213        let info = StreamInfo {
214            name: stream,
215            created_at,
216            deleted_at: None,
217        };
218
219        Ok(if is_reconfigure {
220            CreatedOrReconfigured::Reconfigured(info)
221        } else {
222            CreatedOrReconfigured::Created(info)
223        })
224    }
225
226    pub(super) async fn stream_id_mapping(
227        &self,
228        stream_id: StreamId,
229    ) -> Result<Option<(BasinName, StreamName)>, StorageError> {
230        self.db_get(
231            kv::stream_id_mapping::ser_key(stream_id),
232            kv::stream_id_mapping::deser_value,
233        )
234        .await
235    }
236
237    pub async fn get_stream_config(
238        &self,
239        basin: BasinName,
240        stream: StreamName,
241    ) -> Result<OptionalStreamConfig, GetStreamConfigError> {
242        let meta = self
243            .db_get(
244                kv::stream_meta::ser_key(&basin, &stream),
245                kv::stream_meta::deser_value,
246            )
247            .await?
248            .ok_or_else(|| StreamNotFoundError {
249                basin: basin.clone(),
250                stream: stream.clone(),
251            })?;
252        if meta.deleted_at.is_some() {
253            return Err(StreamDeletionPendingError { basin, stream }.into());
254        }
255        Ok(meta.config)
256    }
257
258    pub async fn reconfigure_stream(
259        &self,
260        basin: BasinName,
261        stream: StreamName,
262        reconfig: StreamReconfiguration,
263    ) -> Result<OptionalStreamConfig, ReconfigureStreamError> {
264        let txn = self.db.begin(IsolationLevel::SerializableSnapshot).await?;
265
266        let meta_key = kv::stream_meta::ser_key(&basin, &stream);
267
268        let mut meta = db_txn_get(&txn, &meta_key, kv::stream_meta::deser_value)
269            .await?
270            .ok_or_else(|| StreamNotFoundError {
271                basin: basin.clone(),
272                stream: stream.clone(),
273            })?;
274
275        if meta.deleted_at.is_some() {
276            return Err(StreamDeletionPendingError { basin, stream }.into());
277        }
278
279        let prior_doe_min_age = meta
280            .config
281            .delete_on_empty
282            .min_age
283            .filter(|age| !age.is_zero());
284
285        meta.config = meta.config.reconfigure(reconfig);
286
287        txn.put(&meta_key, kv::stream_meta::ser_value(&meta))?;
288
289        let stream_id = StreamId::new(&basin, &stream);
290        if let Some(min_age) = meta
291            .config
292            .delete_on_empty
293            .min_age
294            .filter(|age| !age.is_zero())
295            && prior_doe_min_age != Some(min_age)
296        {
297            txn.put(
298                kv::stream_doe_deadline::ser_key(
299                    kv::timestamp::TimestampSecs::after(min_age),
300                    stream_id,
301                ),
302                kv::stream_doe_deadline::ser_value(min_age),
303            )?;
304        }
305
306        static WRITE_OPTS: WriteOptions = WriteOptions {
307            await_durable: true,
308        };
309        txn.commit_with_options(&WRITE_OPTS).await?;
310
311        if let Some(client) = self.streamer_client_if_active(&basin, &stream) {
312            client.advise_reconfig(meta.config.clone());
313        }
314
315        Ok(meta.config)
316    }
317
318    #[instrument(ret, err, skip(self))]
319    pub async fn delete_stream(
320        &self,
321        basin: BasinName,
322        stream: StreamName,
323    ) -> Result<(), DeleteStreamError> {
324        match self.streamer_client(&basin, &stream).await {
325            Ok(client) => {
326                client.terminal_trim().await?;
327            }
328            Err(StreamerError::Storage(e)) => {
329                return Err(DeleteStreamError::Storage(e));
330            }
331            Err(StreamerError::StreamNotFound(e)) => {
332                return Err(DeleteStreamError::StreamNotFound(e));
333            }
334            Err(StreamerError::StreamDeletionPending(e)) => {
335                assert_eq!(e.basin, basin);
336                assert_eq!(e.stream, stream);
337            }
338        }
339
340        let txn = self.db.begin(IsolationLevel::SerializableSnapshot).await?;
341        let meta_key = kv::stream_meta::ser_key(&basin, &stream);
342        let mut meta = db_txn_get(&txn, &meta_key, kv::stream_meta::deser_value)
343            .await?
344            .ok_or_else(|| StreamNotFoundError {
345                basin,
346                stream: stream.clone(),
347            })?;
348        if meta.deleted_at.is_none() {
349            meta.deleted_at = Some(OffsetDateTime::now_utc());
350            txn.put(&meta_key, kv::stream_meta::ser_value(&meta))?;
351            static WRITE_OPTS: WriteOptions = WriteOptions {
352                await_durable: true,
353            };
354            txn.commit_with_options(&WRITE_OPTS).await?;
355        }
356
357        Ok(())
358    }
359}
360
361fn creation_idempotency_key(req_token: &RequestToken, config: &OptionalStreamConfig) -> Bash {
362    Bash::length_prefixed(&[
363        req_token.as_bytes(),
364        &s2_api::v1::config::StreamConfig::to_opt(config.clone())
365            .as_ref()
366            .map(|v| serde_json::to_vec(v).expect("serializable"))
367            .unwrap_or_default(),
368    ])
369}