1use s2_common::{
2 bash::Bash,
3 record::StreamPosition,
4 types::{
5 basin::BasinName,
6 config::{OptionalStreamConfig, StreamReconfiguration},
7 resources::{CreateMode, ListItemsRequestParts, Page, RequestToken},
8 stream::{ListStreamsRequest, StreamInfo, StreamName},
9 },
10};
11use slatedb::{
12 IsolationLevel,
13 config::{DurabilityLevel, ScanOptions, WriteOptions},
14};
15use time::OffsetDateTime;
16use tracing::instrument;
17
18use super::{Backend, CreatedOrReconfigured, store::db_txn_get};
19use crate::backend::{
20 error::{
21 BasinDeletionPendingError, BasinNotFoundError, CreateStreamError, DeleteStreamError,
22 GetStreamConfigError, ListStreamsError, ReconfigureStreamError, StorageError,
23 StreamAlreadyExistsError, StreamDeletionPendingError, StreamNotFoundError, StreamerError,
24 },
25 kv,
26 stream_id::StreamId,
27};
28
29impl Backend {
30 pub async fn list_streams(
31 &self,
32 basin: BasinName,
33 request: ListStreamsRequest,
34 ) -> Result<Page<StreamInfo>, ListStreamsError> {
35 let ListItemsRequestParts {
36 prefix,
37 start_after,
38 limit,
39 } = request.into();
40
41 let key_range = kv::stream_meta::ser_key_range(&basin, &prefix, &start_after);
42 if key_range.is_empty() {
43 return Ok(Page::new_empty());
44 }
45
46 static SCAN_OPTS: ScanOptions = ScanOptions {
47 durability_filter: DurabilityLevel::Remote,
48 dirty: false,
49 read_ahead_bytes: 1,
50 cache_blocks: false,
51 max_fetch_tasks: 1,
52 };
53 let mut it = self.db.scan_with_options(key_range, &SCAN_OPTS).await?;
54
55 let mut streams = Vec::with_capacity(limit.as_usize());
56 let mut has_more = false;
57 while let Some(kv) = it.next().await? {
58 let (deser_basin, stream) = kv::stream_meta::deser_key(kv.key)?;
59 assert_eq!(deser_basin.as_ref(), basin.as_ref());
60 assert!(stream.as_ref() > start_after.as_ref());
61 assert!(stream.as_ref() >= prefix.as_ref());
62 if streams.len() == limit.as_usize() {
63 has_more = true;
64 break;
65 }
66 let meta = kv::stream_meta::deser_value(kv.value)?;
67 streams.push(StreamInfo {
68 name: stream,
69 created_at: meta.created_at,
70 deleted_at: meta.deleted_at,
71 });
72 }
73 Ok(Page::new(streams, has_more))
74 }
75
76 pub async fn create_stream(
77 &self,
78 basin: BasinName,
79 stream: StreamName,
80 mut config: OptionalStreamConfig,
81 mode: CreateMode,
82 ) -> Result<CreatedOrReconfigured<StreamInfo>, CreateStreamError> {
83 let txn = self.db.begin(IsolationLevel::SerializableSnapshot).await?;
84
85 let Some(basin_meta) = db_txn_get(
86 &txn,
87 kv::basin_meta::ser_key(&basin),
88 kv::basin_meta::deser_value,
89 )
90 .await?
91 else {
92 return Err(BasinNotFoundError { basin }.into());
93 };
94
95 if basin_meta.deleted_at.is_some() {
96 return Err(BasinDeletionPendingError { basin }.into());
97 }
98
99 let stream_meta_key = kv::stream_meta::ser_key(&basin, &stream);
100
101 let creation_idempotency_key = match &mode {
102 CreateMode::CreateOnly(Some(req_token)) => {
103 Some(creation_idempotency_key(req_token, &config))
104 }
105 _ => None,
106 };
107
108 let mut existing_created_at = None;
109 let mut prior_doe_min_age = None;
110
111 if let Some(existing_meta) =
112 db_txn_get(&txn, &stream_meta_key, kv::stream_meta::deser_value).await?
113 {
114 if existing_meta.deleted_at.is_some() {
115 return Err(CreateStreamError::StreamDeletionPending(
116 StreamDeletionPendingError { basin, stream },
117 ));
118 }
119 prior_doe_min_age = existing_meta
120 .config
121 .delete_on_empty
122 .min_age
123 .filter(|age| !age.is_zero());
124 match mode {
125 CreateMode::CreateOnly(_) => {
126 return if creation_idempotency_key.is_some()
127 && existing_meta.creation_idempotency_key == creation_idempotency_key
128 {
129 Ok(CreatedOrReconfigured::Created(StreamInfo {
130 name: stream,
131 created_at: existing_meta.created_at,
132 deleted_at: None,
133 }))
134 } else {
135 Err(StreamAlreadyExistsError { basin, stream }.into())
136 };
137 }
138 CreateMode::CreateOrReconfigure => {
139 existing_created_at = Some(existing_meta.created_at);
140 }
141 }
142 }
143
144 config = config.merge(basin_meta.config.default_stream_config).into();
145
146 let created_at = existing_created_at.unwrap_or_else(OffsetDateTime::now_utc);
147 let meta = kv::stream_meta::StreamMeta {
148 config: config.clone(),
149 created_at,
150 deleted_at: None,
151 creation_idempotency_key,
152 };
153
154 txn.put(&stream_meta_key, kv::stream_meta::ser_value(&meta))?;
155 let stream_id = StreamId::new(&basin, &stream);
156 if existing_created_at.is_none() {
157 txn.put(
158 kv::stream_id_mapping::ser_key(stream_id),
159 kv::stream_id_mapping::ser_value(&basin, &stream),
160 )?;
161 let created_secs = created_at.unix_timestamp();
162 let created_secs = if created_secs <= 0 {
163 0
164 } else if created_secs >= i64::from(u32::MAX) {
165 u32::MAX
166 } else {
167 created_secs as u32
168 };
169 txn.put(
170 kv::stream_tail_position::ser_key(stream_id),
171 kv::stream_tail_position::ser_value(
172 StreamPosition::MIN,
173 kv::timestamp::TimestampSecs::from_secs(created_secs),
174 ),
175 )?;
176 }
177 if let Some(min_age) = meta
178 .config
179 .delete_on_empty
180 .min_age
181 .filter(|age| !age.is_zero())
182 && prior_doe_min_age != Some(min_age)
183 {
184 txn.put(
185 kv::stream_doe_deadline::ser_key(
186 kv::timestamp::TimestampSecs::after(min_age),
187 stream_id,
188 ),
189 kv::stream_doe_deadline::ser_value(min_age),
190 )?;
191 }
192
193 static WRITE_OPTS: WriteOptions = WriteOptions {
194 await_durable: true,
195 };
196 txn.commit_with_options(&WRITE_OPTS).await?;
197
198 if existing_created_at.is_some()
199 && let Some(client) = self.streamer_client_if_active(&basin, &stream)
200 {
201 client.advise_reconfig(config);
202 }
203
204 let info = StreamInfo {
205 name: stream,
206 created_at,
207 deleted_at: None,
208 };
209
210 Ok(if existing_created_at.is_some() {
211 CreatedOrReconfigured::Reconfigured(info)
212 } else {
213 CreatedOrReconfigured::Created(info)
214 })
215 }
216
217 pub(super) async fn stream_id_mapping(
218 &self,
219 stream_id: StreamId,
220 ) -> Result<Option<(BasinName, StreamName)>, StorageError> {
221 self.db_get(
222 kv::stream_id_mapping::ser_key(stream_id),
223 kv::stream_id_mapping::deser_value,
224 )
225 .await
226 }
227
228 pub async fn get_stream_config(
229 &self,
230 basin: BasinName,
231 stream: StreamName,
232 ) -> Result<OptionalStreamConfig, GetStreamConfigError> {
233 let meta = self
234 .db_get(
235 kv::stream_meta::ser_key(&basin, &stream),
236 kv::stream_meta::deser_value,
237 )
238 .await?
239 .ok_or_else(|| StreamNotFoundError {
240 basin: basin.clone(),
241 stream: stream.clone(),
242 })?;
243 if meta.deleted_at.is_some() {
244 return Err(StreamDeletionPendingError { basin, stream }.into());
245 }
246 Ok(meta.config)
247 }
248
249 pub async fn reconfigure_stream(
250 &self,
251 basin: BasinName,
252 stream: StreamName,
253 reconfig: StreamReconfiguration,
254 ) -> Result<OptionalStreamConfig, ReconfigureStreamError> {
255 let txn = self.db.begin(IsolationLevel::SerializableSnapshot).await?;
256
257 let meta_key = kv::stream_meta::ser_key(&basin, &stream);
258
259 let mut meta = db_txn_get(&txn, &meta_key, kv::stream_meta::deser_value)
260 .await?
261 .ok_or_else(|| StreamNotFoundError {
262 basin: basin.clone(),
263 stream: stream.clone(),
264 })?;
265
266 if meta.deleted_at.is_some() {
267 return Err(StreamDeletionPendingError { basin, stream }.into());
268 }
269
270 let prior_doe_min_age = meta
271 .config
272 .delete_on_empty
273 .min_age
274 .filter(|age| !age.is_zero());
275
276 meta.config = meta.config.reconfigure(reconfig);
277
278 txn.put(&meta_key, kv::stream_meta::ser_value(&meta))?;
279
280 let stream_id = StreamId::new(&basin, &stream);
281 if let Some(min_age) = meta
282 .config
283 .delete_on_empty
284 .min_age
285 .filter(|age| !age.is_zero())
286 && prior_doe_min_age != Some(min_age)
287 {
288 txn.put(
289 kv::stream_doe_deadline::ser_key(
290 kv::timestamp::TimestampSecs::after(min_age),
291 stream_id,
292 ),
293 kv::stream_doe_deadline::ser_value(min_age),
294 )?;
295 }
296
297 static WRITE_OPTS: WriteOptions = WriteOptions {
298 await_durable: true,
299 };
300 txn.commit_with_options(&WRITE_OPTS).await?;
301
302 if let Some(client) = self.streamer_client_if_active(&basin, &stream) {
303 client.advise_reconfig(meta.config.clone());
304 }
305
306 Ok(meta.config)
307 }
308
309 #[instrument(ret, err, skip(self))]
310 pub async fn delete_stream(
311 &self,
312 basin: BasinName,
313 stream: StreamName,
314 ) -> Result<(), DeleteStreamError> {
315 match self.streamer_client(&basin, &stream).await {
316 Ok(client) => {
317 client.terminal_trim().await?;
318 }
319 Err(StreamerError::Storage(e)) => {
320 return Err(DeleteStreamError::Storage(e));
321 }
322 Err(StreamerError::StreamNotFound(e)) => {
323 return Err(DeleteStreamError::StreamNotFound(e));
324 }
325 Err(StreamerError::StreamDeletionPending(e)) => {
326 assert_eq!(e.basin, basin);
327 assert_eq!(e.stream, stream);
328 }
329 }
330
331 let txn = self.db.begin(IsolationLevel::SerializableSnapshot).await?;
332 let meta_key = kv::stream_meta::ser_key(&basin, &stream);
333 let mut meta = db_txn_get(&txn, &meta_key, kv::stream_meta::deser_value)
334 .await?
335 .ok_or_else(|| StreamNotFoundError {
336 basin,
337 stream: stream.clone(),
338 })?;
339 if meta.deleted_at.is_none() {
340 meta.deleted_at = Some(OffsetDateTime::now_utc());
341 txn.put(&meta_key, kv::stream_meta::ser_value(&meta))?;
342 static WRITE_OPTS: WriteOptions = WriteOptions {
343 await_durable: true,
344 };
345 txn.commit_with_options(&WRITE_OPTS).await?;
346 }
347
348 Ok(())
349 }
350}
351
352fn creation_idempotency_key(req_token: &RequestToken, config: &OptionalStreamConfig) -> Bash {
353 Bash::length_prefixed(&[
354 req_token.as_bytes(),
355 &s2_api::v1::config::StreamConfig::to_opt(config.clone())
356 .as_ref()
357 .map(|v| serde_json::to_vec(v).expect("serializable"))
358 .unwrap_or_default(),
359 ])
360}