1use s2_common::{
2 bash::Bash,
3 record::StreamPosition,
4 types::{
5 basin::BasinName,
6 config::{OptionalStreamConfig, StreamReconfiguration},
7 resources::{CreateMode, ListItemsRequestParts, Page, RequestToken},
8 stream::{ListStreamsRequest, StreamInfo, StreamName},
9 },
10};
11use slatedb::{
12 IsolationLevel, IterationOrder,
13 config::{DurabilityLevel, ScanOptions, WriteOptions},
14};
15use time::OffsetDateTime;
16use tracing::instrument;
17
18use super::{
19 Backend, CreatedOrReconfigured,
20 store::db_txn_get,
21 streamer::{doe_arm_delay, retention_age_or_zero},
22};
23use crate::{
24 backend::{
25 error::{
26 BasinDeletionPendingError, BasinNotFoundError, CreateStreamError, DeleteStreamError,
27 GetStreamConfigError, ListStreamsError, ReconfigureStreamError, StorageError,
28 StreamAlreadyExistsError, StreamDeletionPendingError, StreamNotFoundError,
29 StreamerError,
30 },
31 kv,
32 },
33 stream_id::StreamId,
34};
35
36impl Backend {
37 pub async fn list_streams(
38 &self,
39 basin: BasinName,
40 request: ListStreamsRequest,
41 ) -> Result<Page<StreamInfo>, ListStreamsError> {
42 let ListItemsRequestParts {
43 prefix,
44 start_after,
45 limit,
46 } = request.into();
47
48 let key_range = kv::stream_meta::ser_key_range(&basin, &prefix, &start_after);
49 if key_range.is_empty() {
50 return Ok(Page::new_empty());
51 }
52
53 static SCAN_OPTS: ScanOptions = ScanOptions {
54 durability_filter: DurabilityLevel::Remote,
55 dirty: false,
56 read_ahead_bytes: 1,
57 cache_blocks: false,
58 max_fetch_tasks: 1,
59 order: IterationOrder::Ascending,
60 };
61 let mut it = self.db.scan_with_options(key_range, &SCAN_OPTS).await?;
62
63 let mut streams = Vec::with_capacity(limit.as_usize());
64 let mut has_more = false;
65 while let Some(kv) = it.next().await? {
66 let (deser_basin, stream) = kv::stream_meta::deser_key(kv.key)?;
67 assert_eq!(deser_basin.as_ref(), basin.as_ref());
68 assert!(stream.as_ref() > start_after.as_ref());
69 assert!(stream.as_ref() >= prefix.as_ref());
70 if streams.len() == limit.as_usize() {
71 has_more = true;
72 break;
73 }
74 let meta = kv::stream_meta::deser_value(kv.value)?;
75 streams.push(StreamInfo {
76 name: stream,
77 created_at: meta.created_at,
78 deleted_at: meta.deleted_at,
79 cipher: meta.cipher,
80 });
81 }
82 Ok(Page::new(streams, has_more))
83 }
84
85 pub async fn create_stream(
86 &self,
87 basin: BasinName,
88 stream: StreamName,
89 config: impl Into<StreamReconfiguration>,
90 mode: CreateMode,
91 ) -> Result<CreatedOrReconfigured<StreamInfo>, CreateStreamError> {
92 let config = config.into();
93 let txn = self.db.begin(IsolationLevel::SerializableSnapshot).await?;
94
95 let Some(basin_meta) = db_txn_get(
96 &txn,
97 kv::basin_meta::ser_key(&basin),
98 kv::basin_meta::deser_value,
99 )
100 .await?
101 else {
102 return Err(BasinNotFoundError { basin }.into());
103 };
104
105 if basin_meta.deleted_at.is_some() {
106 return Err(BasinDeletionPendingError { basin }.into());
107 }
108
109 let stream_meta_key = kv::stream_meta::ser_key(&basin, &stream);
110
111 let creation_idempotency_key = match &mode {
112 CreateMode::CreateOnly(Some(req_token)) => {
113 let resolved = OptionalStreamConfig::default().reconfigure(config.clone());
114 Some(creation_idempotency_key(req_token, &resolved))
115 }
116 _ => None,
117 };
118
119 let mut existing_meta_opt = None;
120 let mut prior_doe_min_age = None;
121
122 if let Some(existing_meta) =
123 db_txn_get(&txn, &stream_meta_key, kv::stream_meta::deser_value).await?
124 {
125 if existing_meta.deleted_at.is_some() {
126 return Err(CreateStreamError::StreamDeletionPending(
127 StreamDeletionPendingError { basin, stream },
128 ));
129 }
130 prior_doe_min_age = existing_meta
131 .config
132 .delete_on_empty
133 .min_age
134 .filter(|age| !age.is_zero());
135 match mode {
136 CreateMode::CreateOnly(_) => {
137 return if creation_idempotency_key.is_some()
138 && existing_meta.creation_idempotency_key == creation_idempotency_key
139 {
140 Ok(CreatedOrReconfigured::Created(StreamInfo {
141 name: stream,
142 created_at: existing_meta.created_at,
143 deleted_at: None,
144 cipher: existing_meta.cipher,
145 }))
146 } else {
147 Err(StreamAlreadyExistsError { basin, stream }.into())
148 };
149 }
150 CreateMode::CreateOrReconfigure => {
151 existing_meta_opt = Some(existing_meta);
152 }
153 }
154 }
155
156 let is_reconfigure = existing_meta_opt.is_some();
157 let (resolved, created_at, cipher) = match existing_meta_opt {
158 Some(existing) => (
159 existing.config.reconfigure(config),
160 existing.created_at,
161 existing.cipher,
162 ),
163 None => (
164 OptionalStreamConfig::default().reconfigure(config),
165 OffsetDateTime::now_utc(),
166 basin_meta.config.stream_cipher,
167 ),
168 };
169 let basin_defaults = &basin_meta.config.default_stream_config;
170 let resolved: OptionalStreamConfig = resolved.merge(basin_defaults.clone()).into();
171
172 let meta = kv::stream_meta::StreamMeta {
173 config: resolved.clone(),
174 cipher,
175 created_at,
176 deleted_at: None,
177 creation_idempotency_key,
178 };
179
180 txn.put(&stream_meta_key, kv::stream_meta::ser_value(&meta))?;
181 let stream_id = StreamId::new(&basin, &stream);
182 if !is_reconfigure {
183 txn.put(
184 kv::stream_id_mapping::ser_key(stream_id),
185 kv::stream_id_mapping::ser_value(&basin, &stream),
186 )?;
187 let created_secs = created_at.unix_timestamp();
188 let created_secs = if created_secs <= 0 {
189 0
190 } else if created_secs >= i64::from(u32::MAX) {
191 u32::MAX
192 } else {
193 created_secs as u32
194 };
195 txn.put(
196 kv::stream_tail_position::ser_key(stream_id),
197 kv::stream_tail_position::ser_value(
198 StreamPosition::MIN,
199 kv::timestamp::TimestampSecs::from_secs(created_secs),
200 ),
201 )?;
202 }
203 if let Some(min_age) = meta
204 .config
205 .delete_on_empty
206 .min_age
207 .filter(|age| !age.is_zero())
208 && (!is_reconfigure || prior_doe_min_age.is_none())
209 {
210 txn.put(
211 kv::stream_doe_deadline::ser_key(
212 kv::timestamp::TimestampSecs::after(doe_arm_delay(
213 retention_age_or_zero(&meta.config),
214 min_age,
215 )),
216 stream_id,
217 ),
218 kv::stream_doe_deadline::ser_value(min_age),
219 )?;
220 }
221
222 static WRITE_OPTS: WriteOptions = WriteOptions {
223 await_durable: true,
224 };
225 txn.commit_with_options(&WRITE_OPTS).await?;
226
227 if is_reconfigure && let Some(client) = self.streamer_client_if_active(&basin, &stream) {
228 client.advise_reconfig(resolved);
229 }
230
231 let info = StreamInfo {
232 name: stream,
233 created_at,
234 deleted_at: None,
235 cipher,
236 };
237
238 Ok(if is_reconfigure {
239 CreatedOrReconfigured::Reconfigured(info)
240 } else {
241 CreatedOrReconfigured::Created(info)
242 })
243 }
244
245 pub(super) async fn stream_id_mapping(
246 &self,
247 stream_id: StreamId,
248 ) -> Result<Option<(BasinName, StreamName)>, StorageError> {
249 self.db_get(
250 kv::stream_id_mapping::ser_key(stream_id),
251 kv::stream_id_mapping::deser_value,
252 )
253 .await
254 }
255
256 pub async fn get_stream_config(
257 &self,
258 basin: BasinName,
259 stream: StreamName,
260 ) -> Result<OptionalStreamConfig, GetStreamConfigError> {
261 let meta = self
262 .db_get(
263 kv::stream_meta::ser_key(&basin, &stream),
264 kv::stream_meta::deser_value,
265 )
266 .await?
267 .ok_or_else(|| StreamNotFoundError {
268 basin: basin.clone(),
269 stream: stream.clone(),
270 })?;
271 if meta.deleted_at.is_some() {
272 return Err(StreamDeletionPendingError { basin, stream }.into());
273 }
274 Ok(meta.config)
275 }
276
277 pub async fn reconfigure_stream(
278 &self,
279 basin: BasinName,
280 stream: StreamName,
281 reconfig: StreamReconfiguration,
282 ) -> Result<OptionalStreamConfig, ReconfigureStreamError> {
283 let txn = self.db.begin(IsolationLevel::SerializableSnapshot).await?;
284
285 let meta_key = kv::stream_meta::ser_key(&basin, &stream);
286
287 let mut meta = db_txn_get(&txn, &meta_key, kv::stream_meta::deser_value)
288 .await?
289 .ok_or_else(|| StreamNotFoundError {
290 basin: basin.clone(),
291 stream: stream.clone(),
292 })?;
293
294 if meta.deleted_at.is_some() {
295 return Err(StreamDeletionPendingError { basin, stream }.into());
296 }
297
298 let prior_doe_min_age = meta
299 .config
300 .delete_on_empty
301 .min_age
302 .filter(|age| !age.is_zero());
303
304 meta.config = meta.config.reconfigure(reconfig);
305
306 txn.put(&meta_key, kv::stream_meta::ser_value(&meta))?;
307
308 let stream_id = StreamId::new(&basin, &stream);
309 if let Some(min_age) = meta
310 .config
311 .delete_on_empty
312 .min_age
313 .filter(|age| !age.is_zero())
314 && prior_doe_min_age.is_none()
315 {
316 txn.put(
317 kv::stream_doe_deadline::ser_key(
318 kv::timestamp::TimestampSecs::after(doe_arm_delay(
319 retention_age_or_zero(&meta.config),
320 min_age,
321 )),
322 stream_id,
323 ),
324 kv::stream_doe_deadline::ser_value(min_age),
325 )?;
326 }
327
328 static WRITE_OPTS: WriteOptions = WriteOptions {
329 await_durable: true,
330 };
331 txn.commit_with_options(&WRITE_OPTS).await?;
332
333 if let Some(client) = self.streamer_client_if_active(&basin, &stream) {
334 client.advise_reconfig(meta.config.clone());
335 }
336
337 Ok(meta.config)
338 }
339
340 #[instrument(ret, err, skip(self))]
341 pub async fn delete_stream(
342 &self,
343 basin: BasinName,
344 stream: StreamName,
345 ) -> Result<(), DeleteStreamError> {
346 match self.streamer_client_guarded(&basin, &stream).await {
347 Ok(client) => {
348 client.terminal_trim().await?;
349 }
350 Err(StreamerError::Storage(e)) => {
351 return Err(DeleteStreamError::Storage(e));
352 }
353 Err(StreamerError::StreamNotFound(e)) => {
354 return Err(DeleteStreamError::StreamNotFound(e));
355 }
356 Err(StreamerError::StreamDeletionPending(e)) => {
357 assert_eq!(e.basin, basin);
358 assert_eq!(e.stream, stream);
359 }
360 }
361
362 let txn = self.db.begin(IsolationLevel::SerializableSnapshot).await?;
363 let meta_key = kv::stream_meta::ser_key(&basin, &stream);
364 let mut meta = db_txn_get(&txn, &meta_key, kv::stream_meta::deser_value)
365 .await?
366 .ok_or_else(|| StreamNotFoundError {
367 basin,
368 stream: stream.clone(),
369 })?;
370 if meta.deleted_at.is_none() {
371 meta.deleted_at = Some(OffsetDateTime::now_utc());
372 txn.put(&meta_key, kv::stream_meta::ser_value(&meta))?;
373 static WRITE_OPTS: WriteOptions = WriteOptions {
374 await_durable: true,
375 };
376 txn.commit_with_options(&WRITE_OPTS).await?;
377 }
378
379 Ok(())
380 }
381}
382
383fn creation_idempotency_key(req_token: &RequestToken, config: &OptionalStreamConfig) -> Bash {
384 Bash::length_prefixed(&[
385 req_token.as_bytes(),
386 &s2_api::v1::config::StreamConfig::to_opt(config.clone())
387 .as_ref()
388 .map(|v| serde_json::to_vec(v).expect("serializable"))
389 .unwrap_or_default(),
390 ])
391}