1use s2_common::{
2 bash::Bash,
3 record::StreamPosition,
4 types::{
5 basin::BasinName,
6 config::{OptionalStreamConfig, StreamReconfiguration},
7 resources::{ListItemsRequestParts, Page, RequestToken},
8 stream::{CreateStreamIntent, ListStreamsRequest, StreamInfo, StreamName},
9 },
10};
11use slatedb::{
12 IsolationLevel, IterationOrder,
13 config::{DurabilityLevel, ScanOptions, WriteOptions},
14};
15use time::OffsetDateTime;
16use tracing::instrument;
17
18use super::{
19 Backend, CreatedOrReconfigured,
20 store::db_txn_get,
21 streamer::{doe_arm_delay, retention_age_or_zero},
22};
23use crate::{
24 backend::{
25 error::{
26 BasinDeletionPendingError, BasinNotFoundError, CreateStreamError, DeleteStreamError,
27 GetStreamConfigError, ListStreamsError, ReconfigureStreamError, StorageError,
28 StreamAlreadyExistsError, StreamDeletionPendingError, StreamNotFoundError,
29 StreamerError,
30 },
31 kv,
32 },
33 stream_id::StreamId,
34};
35
36impl Backend {
37 pub async fn list_streams(
38 &self,
39 basin: BasinName,
40 request: ListStreamsRequest,
41 ) -> Result<Page<StreamInfo>, ListStreamsError> {
42 let ListItemsRequestParts {
43 prefix,
44 start_after,
45 limit,
46 } = request.into();
47
48 let key_range = kv::stream_meta::ser_key_range(&basin, &prefix, &start_after);
49 if key_range.is_empty() {
50 return Ok(Page::new_empty());
51 }
52
53 static SCAN_OPTS: ScanOptions = ScanOptions {
54 durability_filter: DurabilityLevel::Remote,
55 dirty: false,
56 read_ahead_bytes: 1,
57 cache_blocks: false,
58 max_fetch_tasks: 1,
59 order: IterationOrder::Ascending,
60 };
61 let mut it = self.db.scan_with_options(key_range, &SCAN_OPTS).await?;
62
63 let mut streams = Vec::with_capacity(limit.as_usize());
64 let mut has_more = false;
65 while let Some(kv) = it.next().await? {
66 let (deser_basin, stream) = kv::stream_meta::deser_key(kv.key)?;
67 assert_eq!(deser_basin.as_ref(), basin.as_ref());
68 assert!(stream.as_ref() > start_after.as_ref());
69 assert!(stream.as_ref() >= prefix.as_ref());
70 if streams.len() == limit.as_usize() {
71 has_more = true;
72 break;
73 }
74 let meta = kv::stream_meta::deser_value(kv.value)?;
75 streams.push(StreamInfo {
76 name: stream,
77 created_at: meta.created_at,
78 deleted_at: meta.deleted_at,
79 cipher: meta.cipher,
80 });
81 }
82 Ok(Page::new(streams, has_more))
83 }
84
85 pub async fn create_stream(
86 &self,
87 basin: BasinName,
88 stream: StreamName,
89 intent: CreateStreamIntent,
90 ) -> Result<CreatedOrReconfigured<StreamInfo>, CreateStreamError> {
91 let txn = self.db.begin(IsolationLevel::SerializableSnapshot).await?;
92
93 let Some(basin_meta) = db_txn_get(
94 &txn,
95 kv::basin_meta::ser_key(&basin),
96 kv::basin_meta::deser_value,
97 )
98 .await?
99 else {
100 return Err(BasinNotFoundError { basin }.into());
101 };
102
103 if basin_meta.deleted_at.is_some() {
104 return Err(BasinDeletionPendingError { basin }.into());
105 }
106
107 let stream_meta_key = kv::stream_meta::ser_key(&basin, &stream);
108
109 let existing_meta =
110 db_txn_get(&txn, &stream_meta_key, kv::stream_meta::deser_value).await?;
111 if let Some(existing_meta) = &existing_meta
112 && existing_meta.deleted_at.is_some()
113 {
114 return Err(CreateStreamError::StreamDeletionPending(
115 StreamDeletionPendingError { basin, stream },
116 ));
117 }
118
119 let (mut meta, is_reconfigure, prior_doe_min_age) = match (existing_meta, intent) {
120 (
121 Some(existing),
122 CreateStreamIntent::CreateOnly {
123 config,
124 request_token,
125 },
126 ) => {
127 let new_creation_idempotency_key = request_token
128 .as_ref()
129 .map(|req_token| creation_idempotency_key(req_token, &config));
130 return if new_creation_idempotency_key.is_some()
131 && existing.creation_idempotency_key == new_creation_idempotency_key
132 {
133 Ok(CreatedOrReconfigured::Created(StreamInfo {
134 name: stream,
135 created_at: existing.created_at,
136 deleted_at: None,
137 cipher: existing.cipher,
138 }))
139 } else {
140 Err(StreamAlreadyExistsError { basin, stream }.into())
141 };
142 }
143 (Some(existing), CreateStreamIntent::CreateOrReconfigure { reconfiguration }) => {
144 let prior_doe_min_age = existing
145 .config
146 .delete_on_empty
147 .min_age
148 .filter(|age| !age.is_zero());
149 (
150 kv::stream_meta::StreamMeta {
151 config: existing.config.reconfigure(reconfiguration),
152 cipher: existing.cipher,
153 created_at: existing.created_at,
154 deleted_at: None,
155 creation_idempotency_key: existing.creation_idempotency_key,
156 },
157 true,
158 prior_doe_min_age,
159 )
160 }
161 (
162 None,
163 CreateStreamIntent::CreateOnly {
164 config,
165 request_token,
166 },
167 ) => {
168 let new_creation_idempotency_key = request_token
169 .as_ref()
170 .map(|req_token| creation_idempotency_key(req_token, &config));
171 (
172 kv::stream_meta::StreamMeta {
173 config,
174 cipher: basin_meta.config.stream_cipher,
175 created_at: OffsetDateTime::now_utc(),
176 deleted_at: None,
177 creation_idempotency_key: new_creation_idempotency_key,
178 },
179 false,
180 None,
181 )
182 }
183 (None, CreateStreamIntent::CreateOrReconfigure { reconfiguration }) => (
184 kv::stream_meta::StreamMeta {
185 config: OptionalStreamConfig::default().reconfigure(reconfiguration),
186 cipher: basin_meta.config.stream_cipher,
187 created_at: OffsetDateTime::now_utc(),
188 deleted_at: None,
189 creation_idempotency_key: None,
190 },
191 false,
192 None,
193 ),
194 };
195 let basin_defaults = basin_meta.config.default_stream_config;
196 meta.config = meta.config.merge(basin_defaults).into();
197
198 txn.put(&stream_meta_key, kv::stream_meta::ser_value(&meta))?;
199 let stream_id = StreamId::new(&basin, &stream);
200 if !is_reconfigure {
201 txn.put(
202 kv::stream_id_mapping::ser_key(stream_id),
203 kv::stream_id_mapping::ser_value(&basin, &stream),
204 )?;
205 let created_secs = meta.created_at.unix_timestamp();
206 let created_secs = if created_secs <= 0 {
207 0
208 } else if created_secs >= i64::from(u32::MAX) {
209 u32::MAX
210 } else {
211 created_secs as u32
212 };
213 txn.put(
214 kv::stream_tail_position::ser_key(stream_id),
215 kv::stream_tail_position::ser_value(
216 StreamPosition::MIN,
217 kv::timestamp::TimestampSecs::from_secs(created_secs),
218 ),
219 )?;
220 }
221 if let Some(min_age) = meta
222 .config
223 .delete_on_empty
224 .min_age
225 .filter(|age| !age.is_zero())
226 && (!is_reconfigure || prior_doe_min_age.is_none())
227 {
228 txn.put(
229 kv::stream_doe_deadline::ser_key(
230 kv::timestamp::TimestampSecs::after(doe_arm_delay(
231 retention_age_or_zero(&meta.config),
232 min_age,
233 )),
234 stream_id,
235 ),
236 kv::stream_doe_deadline::ser_value(min_age),
237 )?;
238 }
239
240 static WRITE_OPTS: WriteOptions = WriteOptions {
241 await_durable: true,
242 };
243 txn.commit_with_options(&WRITE_OPTS).await?;
244
245 if is_reconfigure && let Some(client) = self.streamer_client_if_active(&basin, &stream) {
246 client.advise_reconfig(meta.config.clone());
247 }
248
249 let info = StreamInfo {
250 name: stream,
251 created_at: meta.created_at,
252 deleted_at: None,
253 cipher: meta.cipher,
254 };
255
256 Ok(if is_reconfigure {
257 CreatedOrReconfigured::Reconfigured(info)
258 } else {
259 CreatedOrReconfigured::Created(info)
260 })
261 }
262
263 pub(super) async fn stream_id_mapping(
264 &self,
265 stream_id: StreamId,
266 ) -> Result<Option<(BasinName, StreamName)>, StorageError> {
267 self.db_get(
268 kv::stream_id_mapping::ser_key(stream_id),
269 kv::stream_id_mapping::deser_value,
270 )
271 .await
272 }
273
274 pub async fn get_stream_config(
275 &self,
276 basin: BasinName,
277 stream: StreamName,
278 ) -> Result<OptionalStreamConfig, GetStreamConfigError> {
279 let meta = self
280 .db_get(
281 kv::stream_meta::ser_key(&basin, &stream),
282 kv::stream_meta::deser_value,
283 )
284 .await?
285 .ok_or_else(|| StreamNotFoundError {
286 basin: basin.clone(),
287 stream: stream.clone(),
288 })?;
289 if meta.deleted_at.is_some() {
290 return Err(StreamDeletionPendingError { basin, stream }.into());
291 }
292 Ok(meta.config)
293 }
294
295 pub async fn reconfigure_stream(
296 &self,
297 basin: BasinName,
298 stream: StreamName,
299 reconfig: StreamReconfiguration,
300 ) -> Result<OptionalStreamConfig, ReconfigureStreamError> {
301 let txn = self.db.begin(IsolationLevel::SerializableSnapshot).await?;
302
303 let meta_key = kv::stream_meta::ser_key(&basin, &stream);
304
305 let mut meta = db_txn_get(&txn, &meta_key, kv::stream_meta::deser_value)
306 .await?
307 .ok_or_else(|| StreamNotFoundError {
308 basin: basin.clone(),
309 stream: stream.clone(),
310 })?;
311
312 if meta.deleted_at.is_some() {
313 return Err(StreamDeletionPendingError { basin, stream }.into());
314 }
315
316 let prior_doe_min_age = meta
317 .config
318 .delete_on_empty
319 .min_age
320 .filter(|age| !age.is_zero());
321
322 meta.config = meta.config.reconfigure(reconfig);
323
324 txn.put(&meta_key, kv::stream_meta::ser_value(&meta))?;
325
326 let stream_id = StreamId::new(&basin, &stream);
327 if let Some(min_age) = meta
328 .config
329 .delete_on_empty
330 .min_age
331 .filter(|age| !age.is_zero())
332 && prior_doe_min_age.is_none()
333 {
334 txn.put(
335 kv::stream_doe_deadline::ser_key(
336 kv::timestamp::TimestampSecs::after(doe_arm_delay(
337 retention_age_or_zero(&meta.config),
338 min_age,
339 )),
340 stream_id,
341 ),
342 kv::stream_doe_deadline::ser_value(min_age),
343 )?;
344 }
345
346 static WRITE_OPTS: WriteOptions = WriteOptions {
347 await_durable: true,
348 };
349 txn.commit_with_options(&WRITE_OPTS).await?;
350
351 if let Some(client) = self.streamer_client_if_active(&basin, &stream) {
352 client.advise_reconfig(meta.config.clone());
353 }
354
355 Ok(meta.config)
356 }
357
358 #[instrument(ret, err, skip(self))]
359 pub async fn delete_stream(
360 &self,
361 basin: BasinName,
362 stream: StreamName,
363 ) -> Result<(), DeleteStreamError> {
364 match self.streamer_client_guarded(&basin, &stream).await {
365 Ok(client) => {
366 client.terminal_trim().await?;
367 }
368 Err(StreamerError::Storage(e)) => {
369 return Err(DeleteStreamError::Storage(e));
370 }
371 Err(StreamerError::StreamNotFound(e)) => {
372 return Err(DeleteStreamError::StreamNotFound(e));
373 }
374 Err(StreamerError::StreamDeletionPending(e)) => {
375 assert_eq!(e.basin, basin);
376 assert_eq!(e.stream, stream);
377 }
378 }
379
380 let txn = self.db.begin(IsolationLevel::SerializableSnapshot).await?;
381 let meta_key = kv::stream_meta::ser_key(&basin, &stream);
382 let mut meta = db_txn_get(&txn, &meta_key, kv::stream_meta::deser_value)
383 .await?
384 .ok_or_else(|| StreamNotFoundError {
385 basin,
386 stream: stream.clone(),
387 })?;
388 if meta.deleted_at.is_none() {
389 meta.deleted_at = Some(OffsetDateTime::now_utc());
390 txn.put(&meta_key, kv::stream_meta::ser_value(&meta))?;
391 static WRITE_OPTS: WriteOptions = WriteOptions {
392 await_durable: true,
393 };
394 txn.commit_with_options(&WRITE_OPTS).await?;
395 }
396
397 Ok(())
398 }
399}
400
401fn creation_idempotency_key(req_token: &RequestToken, config: &OptionalStreamConfig) -> Bash {
402 Bash::length_prefixed(&[
403 req_token.as_bytes(),
404 &s2_api::v1::config::StreamConfig::to_opt(config.clone())
405 .as_ref()
406 .map(|v| serde_json::to_vec(v).expect("serializable"))
407 .unwrap_or_default(),
408 ])
409}