1use s2_common::{
2 basin::BasinName,
3 config::{OptionalStreamConfig, StreamConfig, StreamReconfiguration},
4 record::StreamPosition,
5 resources::{Page, ProvisionMode, ProvisionResult, RequestToken},
6 stream::{ListStreamsRequest, StreamInfo, StreamName},
7};
8use s2_storage::bash::Bash;
9use slatedb::{
10 IsolationLevel,
11 config::{DurabilityLevel, ScanOptions},
12};
13use time::OffsetDateTime;
14use tracing::instrument;
15
16use super::{
17 Backend,
18 store::db_txn_get,
19 streamer::{TerminalTrimCondition, TerminalTrimOutcome, doe_arm_delay},
20};
21use crate::{
22 backend::{
23 error::{
24 BasinDeletionPendingError, BasinNotFoundError, DeleteStreamError, GetStreamConfigError,
25 ListStreamsError, ProvisionStreamError, ReconfigureStreamError, StorageError,
26 StreamAlreadyExistsError, StreamDeletionPendingError, StreamNotFoundError,
27 StreamerError,
28 },
29 kv,
30 },
31 stream_id::StreamId,
32};
33
34impl Backend {
35 pub async fn list_streams(
36 &self,
37 basin: BasinName,
38 request: ListStreamsRequest,
39 ) -> Result<Page<StreamInfo>, ListStreamsError> {
40 let ListStreamsRequest {
41 prefix,
42 start_after,
43 limit,
44 } = request;
45
46 let key_range = kv::stream_meta::ser_key_range(&basin, &prefix, &start_after);
47 if key_range.is_empty() {
48 return Ok(Page::new_empty());
49 }
50
51 let scan_opts = ScanOptions {
52 durability_filter: DurabilityLevel::Remote,
53 ..Default::default()
54 };
55 let mut it = self.db.scan_with_options(key_range, &scan_opts).await?;
56
57 let mut streams = Vec::with_capacity(limit.as_usize());
58 let mut has_more = false;
59 while let Some(kv) = it.next().await? {
60 let (deser_basin, stream) = kv::stream_meta::deser_key(kv.key)?;
61 assert_eq!(deser_basin.as_ref(), basin.as_ref());
62 assert!(stream.as_ref() > start_after.as_ref());
63 assert!(stream.as_ref() >= prefix.as_ref());
64 if streams.len() == limit.as_usize() {
65 has_more = true;
66 break;
67 }
68 let meta = kv::stream_meta::deser_value(kv.value)?;
69 streams.push(StreamInfo {
70 name: stream,
71 created_at: meta.created_at,
72 deleted_at: meta.deleted_at,
73 cipher: meta.cipher,
74 });
75 }
76 Ok(Page::new(streams, has_more))
77 }
78
79 pub async fn provision_stream(
80 &self,
81 basin: BasinName,
82 stream: StreamName,
83 config: OptionalStreamConfig,
84 mode: ProvisionMode,
85 ) -> Result<ProvisionResult<StreamInfo>, ProvisionStreamError> {
86 let txn = self.db.begin(IsolationLevel::SerializableSnapshot).await?;
87
88 let Some(basin_meta) = db_txn_get(
89 &txn,
90 kv::basin_meta::ser_key(&basin),
91 kv::basin_meta::deser_value,
92 )
93 .await?
94 else {
95 return Err(BasinNotFoundError { basin }.into());
96 };
97
98 if basin_meta.deleted_at.is_some() {
99 return Err(BasinDeletionPendingError { basin }.into());
100 }
101
102 let stream_meta_key = kv::stream_meta::ser_key(&basin, &stream);
103
104 let existing_meta =
105 db_txn_get(&txn, &stream_meta_key, kv::stream_meta::deser_value).await?;
106 if let Some(existing_meta) = &existing_meta
107 && existing_meta.deleted_at.is_some()
108 {
109 return Err(ProvisionStreamError::StreamDeletionPending(
110 StreamDeletionPendingError,
111 ));
112 }
113
114 let basin_defaults = basin_meta.config.default_stream_config;
115 let (outcome, prior_doe_min_age) = match (existing_meta, mode) {
116 (Some(existing), ProvisionMode::CreateOnly { request_token }) => {
117 let new_creation_idempotency_key = request_token
118 .as_ref()
119 .map(|req_token| creation_idempotency_key(req_token, &config));
120 return if new_creation_idempotency_key.is_some()
121 && existing.creation_idempotency_key == new_creation_idempotency_key
122 {
123 Ok(ProvisionResult::Noop(StreamInfo {
124 name: stream,
125 created_at: existing.created_at,
126 deleted_at: None,
127 cipher: existing.cipher,
128 }))
129 } else {
130 Err(StreamAlreadyExistsError { basin, stream }.into())
131 };
132 }
133 (Some(existing), ProvisionMode::Ensure) => {
134 let desired_config = config.merge(basin_defaults);
135 let config_unchanged = existing.config == desired_config;
136 let meta = kv::stream_meta::StreamMeta {
137 config: desired_config,
138 cipher: existing.cipher,
139 created_at: existing.created_at,
140 deleted_at: None,
141 creation_idempotency_key: existing.creation_idempotency_key,
142 };
143 (
144 if config_unchanged {
145 ProvisionResult::Noop(meta)
146 } else {
147 ProvisionResult::Updated(meta)
148 },
149 existing.config.delete_on_empty.min_age(),
150 )
151 }
152 (None, ProvisionMode::CreateOnly { request_token }) => {
153 let new_creation_idempotency_key = request_token
154 .as_ref()
155 .map(|req_token| creation_idempotency_key(req_token, &config));
156 (
157 ProvisionResult::Created(kv::stream_meta::StreamMeta {
158 config: config.merge(basin_defaults),
159 cipher: basin_meta.config.stream_cipher,
160 created_at: OffsetDateTime::now_utc(),
161 deleted_at: None,
162 creation_idempotency_key: new_creation_idempotency_key,
163 }),
164 None,
165 )
166 }
167 (None, ProvisionMode::Ensure) => (
168 ProvisionResult::Created(kv::stream_meta::StreamMeta {
169 config: config.merge(basin_defaults),
170 cipher: basin_meta.config.stream_cipher,
171 created_at: OffsetDateTime::now_utc(),
172 deleted_at: None,
173 creation_idempotency_key: None,
174 }),
175 None,
176 ),
177 };
178
179 if !matches!(&outcome, ProvisionResult::Noop(_)) {
180 let meta = outcome.inner();
181
182 txn.put(&stream_meta_key, kv::stream_meta::ser_value(meta))?;
183
184 let stream_id = StreamId::new(&basin, &stream);
185
186 if matches!(&outcome, ProvisionResult::Created(_)) {
187 txn.put(
188 kv::stream_id_mapping::ser_key(stream_id),
189 kv::stream_id_mapping::ser_value(&basin, &stream),
190 )?;
191 txn.put(
192 kv::stream_tail_position::ser_key(stream_id),
193 kv::stream_tail_position::ser_value(StreamPosition::MIN),
194 )?;
195 }
196
197 if let Some(min_age) = meta.config.delete_on_empty.min_age()
198 && (matches!(&outcome, ProvisionResult::Created(_)) || prior_doe_min_age.is_none())
199 {
200 txn.put(
201 kv::stream_doe_deadline::ser_key(
202 kv::timestamp::TimestampSecs::after(doe_arm_delay(
203 meta.config.retention_policy.age().unwrap_or_default(),
204 min_age,
205 )),
206 stream_id,
207 ),
208 kv::stream_doe_deadline::ser_value(min_age),
209 )?;
210 }
211
212 txn.commit().await?;
213 }
214
215 if let ProvisionResult::Updated(meta) = &outcome
216 && let Some(client) = self.streamer_client_if_active(&basin, &stream)
217 {
218 client.advise_reconfig(meta.config.clone());
219 }
220
221 Ok(outcome.map(|meta| StreamInfo {
222 name: stream,
223 created_at: meta.created_at,
224 deleted_at: None,
225 cipher: meta.cipher,
226 }))
227 }
228
229 pub(super) async fn stream_id_mapping(
230 &self,
231 stream_id: StreamId,
232 ) -> Result<Option<(BasinName, StreamName)>, StorageError> {
233 self.db_get(
234 kv::stream_id_mapping::ser_key(stream_id),
235 kv::stream_id_mapping::deser_value,
236 )
237 .await
238 }
239
240 pub async fn get_stream_config(
241 &self,
242 basin: BasinName,
243 stream: StreamName,
244 ) -> Result<StreamConfig, GetStreamConfigError> {
245 let meta = self
246 .db_get(
247 kv::stream_meta::ser_key(&basin, &stream),
248 kv::stream_meta::deser_value,
249 )
250 .await?
251 .ok_or_else(|| StreamNotFoundError {
252 basin: basin.clone(),
253 stream: stream.clone(),
254 })?;
255 if meta.deleted_at.is_some() {
256 return Err(StreamDeletionPendingError.into());
257 }
258 Ok(meta.config)
259 }
260
261 pub async fn reconfigure_stream(
262 &self,
263 basin: BasinName,
264 stream: StreamName,
265 reconfig: StreamReconfiguration,
266 ) -> Result<StreamConfig, ReconfigureStreamError> {
267 let txn = self.db.begin(IsolationLevel::SerializableSnapshot).await?;
268
269 let meta_key = kv::stream_meta::ser_key(&basin, &stream);
270 let (basin_meta, meta) = tokio::try_join!(
271 db_txn_get(
272 &txn,
273 kv::basin_meta::ser_key(&basin),
274 kv::basin_meta::deser_value,
275 ),
276 db_txn_get(&txn, &meta_key, kv::stream_meta::deser_value),
277 )?;
278
279 let basin_meta = basin_meta.ok_or_else(|| BasinNotFoundError {
280 basin: basin.clone(),
281 })?;
282 if basin_meta.deleted_at.is_some() {
283 return Err(BasinDeletionPendingError { basin }.into());
284 }
285
286 let mut meta = meta.ok_or_else(|| StreamNotFoundError {
287 basin: basin.clone(),
288 stream: stream.clone(),
289 })?;
290
291 if meta.deleted_at.is_some() {
292 return Err(StreamDeletionPendingError.into());
293 }
294
295 let prior_doe_min_age = meta.config.delete_on_empty.min_age();
296
297 meta.config = OptionalStreamConfig::from(meta.config)
298 .reconfigure(reconfig)
299 .merge(basin_meta.config.default_stream_config);
300
301 txn.put(&meta_key, kv::stream_meta::ser_value(&meta))?;
302
303 let stream_id = StreamId::new(&basin, &stream);
304 if let Some(min_age) = meta.config.delete_on_empty.min_age()
305 && prior_doe_min_age.is_none()
306 {
307 txn.put(
308 kv::stream_doe_deadline::ser_key(
309 kv::timestamp::TimestampSecs::after(doe_arm_delay(
310 meta.config.retention_policy.age().unwrap_or_default(),
311 min_age,
312 )),
313 stream_id,
314 ),
315 kv::stream_doe_deadline::ser_value(min_age),
316 )?;
317 }
318
319 txn.commit().await?;
320
321 if let Some(client) = self.streamer_client_if_active(&basin, &stream) {
322 client.advise_reconfig(meta.config.clone());
323 }
324
325 Ok(meta.config)
326 }
327
328 #[instrument(ret, err, skip(self))]
329 pub async fn delete_stream(
330 &self,
331 basin: BasinName,
332 stream: StreamName,
333 ) -> Result<(), DeleteStreamError> {
334 self.delete_stream_with_condition(basin, stream, TerminalTrimCondition::Always)
335 .await
336 }
337
338 pub(super) async fn delete_stream_with_condition(
339 &self,
340 basin: BasinName,
341 stream: StreamName,
342 condition: TerminalTrimCondition,
343 ) -> Result<(), DeleteStreamError> {
344 let outcome = match self.streamer_client_guarded(&basin, &stream).await {
345 Ok(client) => client.terminal_trim(condition).await?,
346 Err(StreamerError::Storage(e)) => {
347 return Err(DeleteStreamError::Storage(e));
348 }
349 Err(StreamerError::StreamNotFound(e)) => {
350 return Err(DeleteStreamError::StreamNotFound(e));
351 }
352 Err(StreamerError::StreamDeletionPending(_)) => TerminalTrimOutcome::DeletionPending,
353 };
354 match outcome {
355 TerminalTrimOutcome::DeletionPending => self.mark_stream_deleted(basin, stream).await,
356 TerminalTrimOutcome::Ineligible => Ok(()),
357 }
358 }
359
360 async fn mark_stream_deleted(
361 &self,
362 basin: BasinName,
363 stream: StreamName,
364 ) -> Result<(), DeleteStreamError> {
365 let txn = self.db.begin(IsolationLevel::SerializableSnapshot).await?;
366 let meta_key = kv::stream_meta::ser_key(&basin, &stream);
367 let mut meta = db_txn_get(&txn, &meta_key, kv::stream_meta::deser_value)
368 .await?
369 .ok_or_else(|| StreamNotFoundError {
370 basin,
371 stream: stream.clone(),
372 })?;
373 if meta.deleted_at.is_none() {
374 meta.deleted_at = Some(OffsetDateTime::now_utc());
375 txn.put(&meta_key, kv::stream_meta::ser_value(&meta))?;
376 txn.commit().await?;
377 }
378 Ok(())
379 }
380}
381
382fn creation_idempotency_key(req_token: &RequestToken, config: &OptionalStreamConfig) -> Bash {
383 Bash::length_prefixed(&[
384 req_token.as_bytes(),
385 &s2_api::v1::config::StreamConfig::to_opt(config.clone())
386 .as_ref()
387 .map(|v| serde_json::to_vec(v).expect("serializable"))
388 .unwrap_or_default(),
389 ])
390}