1use s2_common::{
2 bash::Bash,
3 record::StreamPosition,
4 types::{
5 basin::BasinName,
6 config::{OptionalStreamConfig, StreamReconfiguration},
7 resources::{CreateMode, ListItemsRequestParts, Page, RequestToken},
8 stream::{ListStreamsRequest, StreamInfo, StreamName},
9 },
10};
11use slatedb::{
12 IsolationLevel,
13 config::{DurabilityLevel, ScanOptions, WriteOptions},
14};
15use time::OffsetDateTime;
16use tracing::instrument;
17
18use super::{Backend, CreatedOrReconfigured, store::db_txn_get};
19use crate::backend::{
20 error::{
21 BasinDeletionPendingError, BasinNotFoundError, CreateStreamError, DeleteStreamError,
22 GetStreamConfigError, ListStreamsError, ReconfigureStreamError, StorageError,
23 StreamAlreadyExistsError, StreamDeletionPendingError, StreamNotFoundError, StreamerError,
24 },
25 kv,
26 stream_id::StreamId,
27};
28
29impl Backend {
30 pub async fn list_streams(
31 &self,
32 basin: BasinName,
33 request: ListStreamsRequest,
34 ) -> Result<Page<StreamInfo>, ListStreamsError> {
35 let ListItemsRequestParts {
36 prefix,
37 start_after,
38 limit,
39 } = request.into();
40
41 let key_range = kv::stream_meta::ser_key_range(&basin, &prefix, &start_after);
42 if key_range.is_empty() {
43 return Ok(Page::new_empty());
44 }
45
46 static SCAN_OPTS: ScanOptions = ScanOptions {
47 durability_filter: DurabilityLevel::Remote,
48 dirty: false,
49 read_ahead_bytes: 1,
50 cache_blocks: false,
51 max_fetch_tasks: 1,
52 };
53 let mut it = self.db.scan_with_options(key_range, &SCAN_OPTS).await?;
54
55 let mut streams = Vec::with_capacity(limit.as_usize());
56 let mut has_more = false;
57 while let Some(kv) = it.next().await? {
58 let (deser_basin, stream) = kv::stream_meta::deser_key(kv.key)?;
59 assert_eq!(deser_basin.as_ref(), basin.as_ref());
60 assert!(stream.as_ref() > start_after.as_ref());
61 assert!(stream.as_ref() >= prefix.as_ref());
62 if streams.len() == limit.as_usize() {
63 has_more = true;
64 break;
65 }
66 let meta = kv::stream_meta::deser_value(kv.value)?;
67 streams.push(StreamInfo {
68 name: stream,
69 created_at: meta.created_at,
70 deleted_at: meta.deleted_at,
71 });
72 }
73 Ok(Page::new(streams, has_more))
74 }
75
76 pub async fn create_stream(
77 &self,
78 basin: BasinName,
79 stream: StreamName,
80 config: impl Into<StreamReconfiguration>,
81 mode: CreateMode,
82 ) -> Result<CreatedOrReconfigured<StreamInfo>, CreateStreamError> {
83 let config = config.into();
84 let txn = self.db.begin(IsolationLevel::SerializableSnapshot).await?;
85
86 let Some(basin_meta) = db_txn_get(
87 &txn,
88 kv::basin_meta::ser_key(&basin),
89 kv::basin_meta::deser_value,
90 )
91 .await?
92 else {
93 return Err(BasinNotFoundError { basin }.into());
94 };
95
96 if basin_meta.deleted_at.is_some() {
97 return Err(BasinDeletionPendingError { basin }.into());
98 }
99
100 let stream_meta_key = kv::stream_meta::ser_key(&basin, &stream);
101
102 let creation_idempotency_key = match &mode {
103 CreateMode::CreateOnly(Some(req_token)) => {
104 let resolved = OptionalStreamConfig::default().reconfigure(config.clone());
105 Some(creation_idempotency_key(req_token, &resolved))
106 }
107 _ => None,
108 };
109
110 let mut existing_meta_opt = None;
111 let mut prior_doe_min_age = None;
112
113 if let Some(existing_meta) =
114 db_txn_get(&txn, &stream_meta_key, kv::stream_meta::deser_value).await?
115 {
116 if existing_meta.deleted_at.is_some() {
117 return Err(CreateStreamError::StreamDeletionPending(
118 StreamDeletionPendingError { basin, stream },
119 ));
120 }
121 prior_doe_min_age = existing_meta
122 .config
123 .delete_on_empty
124 .min_age
125 .filter(|age| !age.is_zero());
126 match mode {
127 CreateMode::CreateOnly(_) => {
128 return if creation_idempotency_key.is_some()
129 && existing_meta.creation_idempotency_key == creation_idempotency_key
130 {
131 Ok(CreatedOrReconfigured::Created(StreamInfo {
132 name: stream,
133 created_at: existing_meta.created_at,
134 deleted_at: None,
135 }))
136 } else {
137 Err(StreamAlreadyExistsError { basin, stream }.into())
138 };
139 }
140 CreateMode::CreateOrReconfigure => {
141 existing_meta_opt = Some(existing_meta);
142 }
143 }
144 }
145
146 let is_reconfigure = existing_meta_opt.is_some();
147 let (resolved, created_at) = match existing_meta_opt {
148 Some(existing) => (existing.config.reconfigure(config), existing.created_at),
149 None => (
150 OptionalStreamConfig::default().reconfigure(config),
151 OffsetDateTime::now_utc(),
152 ),
153 };
154 let resolved: OptionalStreamConfig = resolved
155 .merge(basin_meta.config.default_stream_config)
156 .into();
157
158 let meta = kv::stream_meta::StreamMeta {
159 config: resolved.clone(),
160 created_at,
161 deleted_at: None,
162 creation_idempotency_key,
163 };
164
165 txn.put(&stream_meta_key, kv::stream_meta::ser_value(&meta))?;
166 let stream_id = StreamId::new(&basin, &stream);
167 if !is_reconfigure {
168 txn.put(
169 kv::stream_id_mapping::ser_key(stream_id),
170 kv::stream_id_mapping::ser_value(&basin, &stream),
171 )?;
172 let created_secs = created_at.unix_timestamp();
173 let created_secs = if created_secs <= 0 {
174 0
175 } else if created_secs >= i64::from(u32::MAX) {
176 u32::MAX
177 } else {
178 created_secs as u32
179 };
180 txn.put(
181 kv::stream_tail_position::ser_key(stream_id),
182 kv::stream_tail_position::ser_value(
183 StreamPosition::MIN,
184 kv::timestamp::TimestampSecs::from_secs(created_secs),
185 ),
186 )?;
187 }
188 if let Some(min_age) = meta
189 .config
190 .delete_on_empty
191 .min_age
192 .filter(|age| !age.is_zero())
193 && prior_doe_min_age != Some(min_age)
194 {
195 txn.put(
196 kv::stream_doe_deadline::ser_key(
197 kv::timestamp::TimestampSecs::after(min_age),
198 stream_id,
199 ),
200 kv::stream_doe_deadline::ser_value(min_age),
201 )?;
202 }
203
204 static WRITE_OPTS: WriteOptions = WriteOptions {
205 await_durable: true,
206 };
207 txn.commit_with_options(&WRITE_OPTS).await?;
208
209 if is_reconfigure && let Some(client) = self.streamer_client_if_active(&basin, &stream) {
210 client.advise_reconfig(resolved);
211 }
212
213 let info = StreamInfo {
214 name: stream,
215 created_at,
216 deleted_at: None,
217 };
218
219 Ok(if is_reconfigure {
220 CreatedOrReconfigured::Reconfigured(info)
221 } else {
222 CreatedOrReconfigured::Created(info)
223 })
224 }
225
226 pub(super) async fn stream_id_mapping(
227 &self,
228 stream_id: StreamId,
229 ) -> Result<Option<(BasinName, StreamName)>, StorageError> {
230 self.db_get(
231 kv::stream_id_mapping::ser_key(stream_id),
232 kv::stream_id_mapping::deser_value,
233 )
234 .await
235 }
236
237 pub async fn get_stream_config(
238 &self,
239 basin: BasinName,
240 stream: StreamName,
241 ) -> Result<OptionalStreamConfig, GetStreamConfigError> {
242 let meta = self
243 .db_get(
244 kv::stream_meta::ser_key(&basin, &stream),
245 kv::stream_meta::deser_value,
246 )
247 .await?
248 .ok_or_else(|| StreamNotFoundError {
249 basin: basin.clone(),
250 stream: stream.clone(),
251 })?;
252 if meta.deleted_at.is_some() {
253 return Err(StreamDeletionPendingError { basin, stream }.into());
254 }
255 Ok(meta.config)
256 }
257
258 pub async fn reconfigure_stream(
259 &self,
260 basin: BasinName,
261 stream: StreamName,
262 reconfig: StreamReconfiguration,
263 ) -> Result<OptionalStreamConfig, ReconfigureStreamError> {
264 let txn = self.db.begin(IsolationLevel::SerializableSnapshot).await?;
265
266 let meta_key = kv::stream_meta::ser_key(&basin, &stream);
267
268 let mut meta = db_txn_get(&txn, &meta_key, kv::stream_meta::deser_value)
269 .await?
270 .ok_or_else(|| StreamNotFoundError {
271 basin: basin.clone(),
272 stream: stream.clone(),
273 })?;
274
275 if meta.deleted_at.is_some() {
276 return Err(StreamDeletionPendingError { basin, stream }.into());
277 }
278
279 let prior_doe_min_age = meta
280 .config
281 .delete_on_empty
282 .min_age
283 .filter(|age| !age.is_zero());
284
285 meta.config = meta.config.reconfigure(reconfig);
286
287 txn.put(&meta_key, kv::stream_meta::ser_value(&meta))?;
288
289 let stream_id = StreamId::new(&basin, &stream);
290 if let Some(min_age) = meta
291 .config
292 .delete_on_empty
293 .min_age
294 .filter(|age| !age.is_zero())
295 && prior_doe_min_age != Some(min_age)
296 {
297 txn.put(
298 kv::stream_doe_deadline::ser_key(
299 kv::timestamp::TimestampSecs::after(min_age),
300 stream_id,
301 ),
302 kv::stream_doe_deadline::ser_value(min_age),
303 )?;
304 }
305
306 static WRITE_OPTS: WriteOptions = WriteOptions {
307 await_durable: true,
308 };
309 txn.commit_with_options(&WRITE_OPTS).await?;
310
311 if let Some(client) = self.streamer_client_if_active(&basin, &stream) {
312 client.advise_reconfig(meta.config.clone());
313 }
314
315 Ok(meta.config)
316 }
317
318 #[instrument(ret, err, skip(self))]
319 pub async fn delete_stream(
320 &self,
321 basin: BasinName,
322 stream: StreamName,
323 ) -> Result<(), DeleteStreamError> {
324 match self.streamer_client(&basin, &stream).await {
325 Ok(client) => {
326 client.terminal_trim().await?;
327 }
328 Err(StreamerError::Storage(e)) => {
329 return Err(DeleteStreamError::Storage(e));
330 }
331 Err(StreamerError::StreamNotFound(e)) => {
332 return Err(DeleteStreamError::StreamNotFound(e));
333 }
334 Err(StreamerError::StreamDeletionPending(e)) => {
335 assert_eq!(e.basin, basin);
336 assert_eq!(e.stream, stream);
337 }
338 }
339
340 let txn = self.db.begin(IsolationLevel::SerializableSnapshot).await?;
341 let meta_key = kv::stream_meta::ser_key(&basin, &stream);
342 let mut meta = db_txn_get(&txn, &meta_key, kv::stream_meta::deser_value)
343 .await?
344 .ok_or_else(|| StreamNotFoundError {
345 basin,
346 stream: stream.clone(),
347 })?;
348 if meta.deleted_at.is_none() {
349 meta.deleted_at = Some(OffsetDateTime::now_utc());
350 txn.put(&meta_key, kv::stream_meta::ser_value(&meta))?;
351 static WRITE_OPTS: WriteOptions = WriteOptions {
352 await_durable: true,
353 };
354 txn.commit_with_options(&WRITE_OPTS).await?;
355 }
356
357 Ok(())
358 }
359}
360
361fn creation_idempotency_key(req_token: &RequestToken, config: &OptionalStreamConfig) -> Bash {
362 Bash::length_prefixed(&[
363 req_token.as_bytes(),
364 &s2_api::v1::config::StreamConfig::to_opt(config.clone())
365 .as_ref()
366 .map(|v| serde_json::to_vec(v).expect("serializable"))
367 .unwrap_or_default(),
368 ])
369}