Skip to main content

s2_lite/backend/
streams.rs

1use s2_common::{
2    bash::Bash,
3    types::{
4        basin::BasinName,
5        config::{OptionalStreamConfig, StreamReconfiguration},
6        resources::{CreateMode, ListItemsRequestParts, Page, RequestToken},
7        stream::{ListStreamsRequest, StreamInfo, StreamName},
8    },
9};
10use slatedb::{
11    IsolationLevel,
12    config::{DurabilityLevel, ScanOptions, WriteOptions},
13};
14use time::OffsetDateTime;
15use tracing::instrument;
16
17use super::{Backend, CreatedOrReconfigured, store::db_txn_get};
18use crate::backend::{
19    error::{
20        BasinDeletionPendingError, BasinNotFoundError, CreateStreamError, DeleteStreamError,
21        GetStreamConfigError, ListStreamsError, ReconfigureStreamError, StreamAlreadyExistsError,
22        StreamDeletionPendingError, StreamNotFoundError, StreamerError,
23    },
24    kv,
25    stream_id::StreamId,
26};
27
28impl Backend {
29    pub async fn list_streams(
30        &self,
31        basin: BasinName,
32        request: ListStreamsRequest,
33    ) -> Result<Page<StreamInfo>, ListStreamsError> {
34        let ListItemsRequestParts {
35            prefix,
36            start_after,
37            limit,
38        } = request.into();
39
40        let key_range = kv::stream_meta::ser_key_range(&basin, &prefix, &start_after);
41        if key_range.is_empty() {
42            return Ok(Page::new_empty());
43        }
44
45        static SCAN_OPTS: ScanOptions = ScanOptions {
46            durability_filter: DurabilityLevel::Remote,
47            dirty: false,
48            read_ahead_bytes: 1,
49            cache_blocks: false,
50            max_fetch_tasks: 1,
51        };
52        let mut it = self.db.scan_with_options(key_range, &SCAN_OPTS).await?;
53
54        let mut streams = Vec::with_capacity(limit.as_usize());
55        let mut has_more = false;
56        while let Some(kv) = it.next().await? {
57            let (deser_basin, stream) = kv::stream_meta::deser_key(kv.key)?;
58            assert_eq!(deser_basin.as_ref(), basin.as_ref());
59            assert!(stream.as_ref() > start_after.as_ref());
60            assert!(stream.as_ref() >= prefix.as_ref());
61            if streams.len() == limit.as_usize() {
62                has_more = true;
63                break;
64            }
65            let meta = kv::stream_meta::deser_value(kv.value)?;
66            streams.push(StreamInfo {
67                name: stream,
68                created_at: meta.created_at,
69                deleted_at: meta.deleted_at,
70            });
71        }
72        Ok(Page::new(streams, has_more))
73    }
74
75    pub async fn create_stream(
76        &self,
77        basin: BasinName,
78        stream: StreamName,
79        mut config: OptionalStreamConfig,
80        mode: CreateMode,
81    ) -> Result<CreatedOrReconfigured<StreamInfo>, CreateStreamError> {
82        let txn = self.db.begin(IsolationLevel::SerializableSnapshot).await?;
83
84        let Some(basin_meta) = db_txn_get(
85            &txn,
86            kv::basin_meta::ser_key(&basin),
87            kv::basin_meta::deser_value,
88        )
89        .await?
90        else {
91            return Err(BasinNotFoundError { basin }.into());
92        };
93
94        if basin_meta.deleted_at.is_some() {
95            return Err(BasinDeletionPendingError { basin }.into());
96        }
97
98        let stream_meta_key = kv::stream_meta::ser_key(&basin, &stream);
99
100        let creation_idempotency_key = match &mode {
101            CreateMode::CreateOnly(Some(req_token)) => {
102                Some(creation_idempotency_key(req_token, &config))
103            }
104            _ => None,
105        };
106
107        let mut existing_created_at = None;
108
109        if let Some(existing_meta) =
110            db_txn_get(&txn, &stream_meta_key, kv::stream_meta::deser_value).await?
111        {
112            if existing_meta.deleted_at.is_some() {
113                return Err(CreateStreamError::StreamDeletionPending(
114                    StreamDeletionPendingError { basin, stream },
115                ));
116            }
117            match mode {
118                CreateMode::CreateOnly(_) => {
119                    return if creation_idempotency_key.is_some()
120                        && existing_meta.creation_idempotency_key == creation_idempotency_key
121                    {
122                        Ok(CreatedOrReconfigured::Created(StreamInfo {
123                            name: stream,
124                            created_at: existing_meta.created_at,
125                            deleted_at: None,
126                        }))
127                    } else {
128                        Err(StreamAlreadyExistsError { basin, stream }.into())
129                    };
130                }
131                CreateMode::CreateOrReconfigure => {
132                    existing_created_at = Some(existing_meta.created_at);
133                }
134            }
135        }
136
137        config = config.merge(basin_meta.config.default_stream_config).into();
138
139        let created_at = existing_created_at.unwrap_or_else(OffsetDateTime::now_utc);
140        let meta = kv::stream_meta::StreamMeta {
141            config: config.clone(),
142            created_at,
143            deleted_at: None,
144            creation_idempotency_key,
145        };
146
147        txn.put(&stream_meta_key, kv::stream_meta::ser_value(&meta))?;
148        if existing_created_at.is_none() {
149            txn.put(
150                kv::stream_id_mapping::ser_key(StreamId::new(&basin, &stream)),
151                kv::stream_id_mapping::ser_value(&basin, &stream),
152            )?;
153        }
154
155        static WRITE_OPTS: WriteOptions = WriteOptions {
156            await_durable: true,
157        };
158        txn.commit_with_options(&WRITE_OPTS).await?;
159
160        if existing_created_at.is_some()
161            && let Some(client) = self.streamer_client_if_active(&basin, &stream)
162        {
163            client.advise_reconfig(config);
164        }
165
166        let info = StreamInfo {
167            name: stream,
168            created_at,
169            deleted_at: None,
170        };
171
172        Ok(if existing_created_at.is_some() {
173            CreatedOrReconfigured::Reconfigured(info)
174        } else {
175            CreatedOrReconfigured::Created(info)
176        })
177    }
178
179    pub async fn get_stream_config(
180        &self,
181        basin: BasinName,
182        stream: StreamName,
183    ) -> Result<OptionalStreamConfig, GetStreamConfigError> {
184        let meta = self
185            .db_get(
186                kv::stream_meta::ser_key(&basin, &stream),
187                kv::stream_meta::deser_value,
188            )
189            .await?
190            .ok_or_else(|| StreamNotFoundError { basin, stream })?;
191        Ok(meta.config)
192    }
193
194    pub async fn reconfigure_stream(
195        &self,
196        basin: BasinName,
197        stream: StreamName,
198        reconfig: StreamReconfiguration,
199    ) -> Result<OptionalStreamConfig, ReconfigureStreamError> {
200        let txn = self.db.begin(IsolationLevel::SerializableSnapshot).await?;
201
202        let meta_key = kv::stream_meta::ser_key(&basin, &stream);
203
204        let mut meta = db_txn_get(&txn, &meta_key, kv::stream_meta::deser_value)
205            .await?
206            .ok_or_else(|| StreamNotFoundError {
207                basin: basin.clone(),
208                stream: stream.clone(),
209            })?;
210
211        if meta.deleted_at.is_some() {
212            return Err(StreamDeletionPendingError { basin, stream }.into());
213        }
214
215        meta.config = meta.config.reconfigure(reconfig);
216
217        txn.put(&meta_key, kv::stream_meta::ser_value(&meta))?;
218
219        static WRITE_OPTS: WriteOptions = WriteOptions {
220            await_durable: true,
221        };
222        txn.commit_with_options(&WRITE_OPTS).await?;
223
224        if let Some(client) = self.streamer_client_if_active(&basin, &stream) {
225            client.advise_reconfig(meta.config.clone());
226        }
227
228        Ok(meta.config)
229    }
230
231    #[instrument(ret, err, skip(self))]
232    pub async fn delete_stream(
233        &self,
234        basin: BasinName,
235        stream: StreamName,
236    ) -> Result<(), DeleteStreamError> {
237        match self.streamer_client(&basin, &stream).await {
238            Ok(client) => {
239                client.terminal_trim().await?;
240            }
241            Err(StreamerError::Storage(e)) => {
242                return Err(DeleteStreamError::Storage(e));
243            }
244            Err(StreamerError::StreamNotFound(e)) => {
245                return Err(DeleteStreamError::StreamNotFound(e));
246            }
247            Err(StreamerError::StreamDeletionPending(e)) => {
248                assert_eq!(e.basin, basin);
249                assert_eq!(e.stream, stream);
250            }
251        }
252
253        let txn = self.db.begin(IsolationLevel::SerializableSnapshot).await?;
254        let meta_key = kv::stream_meta::ser_key(&basin, &stream);
255        let mut meta = db_txn_get(&txn, &meta_key, kv::stream_meta::deser_value)
256            .await?
257            .ok_or_else(|| StreamNotFoundError {
258                basin,
259                stream: stream.clone(),
260            })?;
261        if meta.deleted_at.is_none() {
262            meta.deleted_at = Some(OffsetDateTime::now_utc());
263            txn.put(&meta_key, kv::stream_meta::ser_value(&meta))?;
264            static WRITE_OPTS: WriteOptions = WriteOptions {
265                await_durable: true,
266            };
267            txn.commit_with_options(&WRITE_OPTS).await?;
268        }
269
270        Ok(())
271    }
272}
273
274fn creation_idempotency_key(req_token: &RequestToken, config: &OptionalStreamConfig) -> Bash {
275    Bash::new(&[
276        req_token.as_bytes(),
277        &s2_api::v1::config::StreamConfig::to_opt(config.clone())
278            .as_ref()
279            .map(|v| serde_json::to_vec(v).expect("serializable"))
280            .unwrap_or_default(),
281    ])
282}