s2_lite/backend/
streams.rs

1use s2_common::{
2    bash::Bash,
3    types::{
4        basin::BasinName,
5        config::{OptionalStreamConfig, StreamReconfiguration},
6        resources::{CreateMode, ListItemsRequestParts, Page, RequestToken},
7        stream::{ListStreamsRequest, StreamInfo, StreamName},
8    },
9};
10use slatedb::{
11    IsolationLevel,
12    config::{DurabilityLevel, ScanOptions, WriteOptions},
13};
14use time::OffsetDateTime;
15
16use super::{Backend, CreatedOrReconfigured, store::db_txn_get};
17use crate::backend::{
18    error::{
19        BasinDeletionPendingError, BasinNotFoundError, CreateStreamError, DeleteStreamError,
20        GetStreamConfigError, ListStreamsError, ReconfigureStreamError, StreamAlreadyExistsError,
21        StreamDeletionPendingError, StreamNotFoundError, StreamerError,
22    },
23    kv,
24};
25
26impl Backend {
27    pub async fn list_streams(
28        &self,
29        basin: BasinName,
30        request: ListStreamsRequest,
31    ) -> Result<Page<StreamInfo>, ListStreamsError> {
32        let ListItemsRequestParts {
33            prefix,
34            start_after,
35            limit,
36        } = request.into();
37
38        let key_range = kv::stream_meta::ser_key_range(&basin, &prefix, &start_after);
39        if key_range.is_empty() {
40            return Ok(Page::new_empty());
41        }
42
43        const SCAN_OPTS: ScanOptions = ScanOptions {
44            durability_filter: DurabilityLevel::Remote,
45            dirty: false,
46            read_ahead_bytes: 1,
47            cache_blocks: false,
48            max_fetch_tasks: 1,
49        };
50        let mut it = self.db.scan_with_options(key_range, &SCAN_OPTS).await?;
51
52        let mut streams = Vec::with_capacity(limit.as_usize());
53        let mut has_more = false;
54        while let Some(kv) = it.next().await? {
55            let (deser_basin, stream) = kv::stream_meta::deser_key(kv.key)?;
56            assert_eq!(deser_basin.as_ref(), basin.as_ref());
57            assert!(stream.as_ref() > start_after.as_ref());
58            assert!(stream.as_ref() >= prefix.as_ref());
59            if streams.len() == limit.as_usize() {
60                has_more = true;
61                break;
62            }
63            let meta = kv::stream_meta::deser_value(kv.value)?;
64            streams.push(StreamInfo {
65                name: stream,
66                created_at: meta.created_at,
67                deleted_at: meta.deleted_at,
68            });
69        }
70        Ok(Page::new(streams, has_more))
71    }
72
73    pub async fn create_stream(
74        &self,
75        basin: BasinName,
76        stream: StreamName,
77        mut config: OptionalStreamConfig,
78        mode: CreateMode,
79    ) -> Result<CreatedOrReconfigured<StreamInfo>, CreateStreamError> {
80        let txn = self.db.begin(IsolationLevel::SerializableSnapshot).await?;
81
82        let Some(basin_meta) = db_txn_get(
83            &txn,
84            kv::basin_meta::ser_key(&basin),
85            kv::basin_meta::deser_value,
86        )
87        .await?
88        else {
89            return Err(BasinNotFoundError { basin }.into());
90        };
91
92        if basin_meta.deleted_at.is_some() {
93            return Err(BasinDeletionPendingError { basin }.into());
94        }
95
96        let stream_meta_key = kv::stream_meta::ser_key(&basin, &stream);
97
98        let creation_idempotency_key = match &mode {
99            CreateMode::CreateOnly(Some(req_token)) => {
100                Some(creation_idempotency_key(req_token, &config))
101            }
102            _ => None,
103        };
104
105        let mut existing_created_at = None;
106
107        if let Some(existing_meta) =
108            db_txn_get(&txn, &stream_meta_key, kv::stream_meta::deser_value).await?
109        {
110            if existing_meta.deleted_at.is_some() {
111                return Err(CreateStreamError::StreamDeletionPending(
112                    StreamDeletionPendingError { basin, stream },
113                ));
114            }
115            match mode {
116                CreateMode::CreateOnly(_) => {
117                    return if creation_idempotency_key.is_some()
118                        && existing_meta.creation_idempotency_key == creation_idempotency_key
119                    {
120                        Ok(CreatedOrReconfigured::Created(StreamInfo {
121                            name: stream,
122                            created_at: existing_meta.created_at,
123                            deleted_at: None,
124                        }))
125                    } else {
126                        Err(StreamAlreadyExistsError { basin, stream }.into())
127                    };
128                }
129                CreateMode::CreateOrReconfigure => {
130                    existing_created_at = Some(existing_meta.created_at);
131                }
132            }
133        }
134
135        config = config.merge(basin_meta.config.default_stream_config).into();
136
137        let created_at = existing_created_at.unwrap_or_else(OffsetDateTime::now_utc);
138        let meta = kv::stream_meta::StreamMeta {
139            config: config.clone(),
140            created_at,
141            deleted_at: None,
142            creation_idempotency_key,
143        };
144
145        txn.put(&stream_meta_key, kv::stream_meta::ser_value(&meta))?;
146
147        static WRITE_OPTS: WriteOptions = WriteOptions {
148            await_durable: true,
149        };
150        txn.commit_with_options(&WRITE_OPTS).await?;
151
152        if existing_created_at.is_some()
153            && let Some(client) = self.streamer_client_if_active(&basin, &stream)
154        {
155            client.advise_reconfig(config);
156        }
157
158        let info = StreamInfo {
159            name: stream,
160            created_at,
161            deleted_at: None,
162        };
163
164        Ok(if existing_created_at.is_some() {
165            CreatedOrReconfigured::Reconfigured(info)
166        } else {
167            CreatedOrReconfigured::Created(info)
168        })
169    }
170
171    pub async fn get_stream_config(
172        &self,
173        basin: BasinName,
174        stream: StreamName,
175    ) -> Result<OptionalStreamConfig, GetStreamConfigError> {
176        let meta = self
177            .db_get(
178                kv::stream_meta::ser_key(&basin, &stream),
179                kv::stream_meta::deser_value,
180            )
181            .await?
182            .ok_or_else(|| StreamNotFoundError { basin, stream })?;
183        Ok(meta.config)
184    }
185
186    pub async fn reconfigure_stream(
187        &self,
188        basin: BasinName,
189        stream: StreamName,
190        reconfig: StreamReconfiguration,
191    ) -> Result<OptionalStreamConfig, ReconfigureStreamError> {
192        let txn = self.db.begin(IsolationLevel::SerializableSnapshot).await?;
193
194        let meta_key = kv::stream_meta::ser_key(&basin, &stream);
195
196        let mut meta = db_txn_get(&txn, &meta_key, kv::stream_meta::deser_value)
197            .await?
198            .ok_or_else(|| StreamNotFoundError {
199                basin: basin.clone(),
200                stream: stream.clone(),
201            })?;
202
203        if meta.deleted_at.is_some() {
204            return Err(StreamDeletionPendingError { basin, stream }.into());
205        }
206
207        meta.config = meta.config.reconfigure(reconfig);
208
209        txn.put(&meta_key, kv::stream_meta::ser_value(&meta))?;
210
211        static WRITE_OPTS: WriteOptions = WriteOptions {
212            await_durable: true,
213        };
214        txn.commit_with_options(&WRITE_OPTS).await?;
215
216        if let Some(client) = self.streamer_client_if_active(&basin, &stream) {
217            client.advise_reconfig(meta.config.clone());
218        }
219
220        Ok(meta.config)
221    }
222
223    pub async fn delete_stream(
224        &self,
225        basin: BasinName,
226        stream: StreamName,
227    ) -> Result<(), DeleteStreamError> {
228        match self.streamer_client(&basin, &stream).await {
229            Ok(client) => {
230                client.terminal_trim().await?;
231            }
232            Err(StreamerError::Storage(e)) => {
233                return Err(DeleteStreamError::Storage(e));
234            }
235            Err(StreamerError::StreamNotFound(e)) => {
236                return Err(DeleteStreamError::StreamNotFound(e));
237            }
238            Err(StreamerError::StreamDeletionPending(e)) => {
239                assert_eq!(e.basin, basin);
240                assert_eq!(e.stream, stream);
241            }
242        }
243
244        let txn = self.db.begin(IsolationLevel::SerializableSnapshot).await?;
245        let meta_key = kv::stream_meta::ser_key(&basin, &stream);
246        let mut meta = db_txn_get(&txn, &meta_key, kv::stream_meta::deser_value)
247            .await?
248            .ok_or_else(|| StreamNotFoundError {
249                basin,
250                stream: stream.clone(),
251            })?;
252        if meta.deleted_at.is_none() {
253            meta.deleted_at = Some(OffsetDateTime::now_utc());
254            txn.put(&meta_key, kv::stream_meta::ser_value(&meta))?;
255            static WRITE_OPTS: WriteOptions = WriteOptions {
256                await_durable: true,
257            };
258            txn.commit_with_options(&WRITE_OPTS).await?;
259        }
260
261        Ok(())
262    }
263}
264
265fn creation_idempotency_key(req_token: &RequestToken, config: &OptionalStreamConfig) -> Bash {
266    Bash::new(&[
267        req_token.as_bytes(),
268        &s2_api::v1::config::StreamConfig::to_opt(config.clone())
269            .as_ref()
270            .map(|v| serde_json::to_vec(v).expect("serializable"))
271            .unwrap_or_default(),
272    ])
273}