1use s2_common::{
2 bash::Bash,
3 record::StreamPosition,
4 types::{
5 basin::BasinName,
6 config::{OptionalStreamConfig, StreamConfig, StreamReconfiguration},
7 resources::{ListItemsRequestParts, Page, ProvisionMode, ProvisionResult, RequestToken},
8 stream::{ListStreamsRequest, StreamInfo, StreamName},
9 },
10};
11use slatedb::{
12 IsolationLevel,
13 config::{DurabilityLevel, ScanOptions},
14};
15use time::OffsetDateTime;
16use tracing::instrument;
17
18use super::{
19 Backend,
20 store::db_txn_get,
21 streamer::{TerminalTrimCondition, TerminalTrimOutcome, doe_arm_delay},
22};
23use crate::{
24 backend::{
25 error::{
26 BasinDeletionPendingError, BasinNotFoundError, DeleteStreamError, GetStreamConfigError,
27 ListStreamsError, ProvisionStreamError, ReconfigureStreamError, StorageError,
28 StreamAlreadyExistsError, StreamDeletionPendingError, StreamNotFoundError,
29 StreamerError,
30 },
31 kv,
32 },
33 stream_id::StreamId,
34};
35
36impl Backend {
37 pub async fn list_streams(
38 &self,
39 basin: BasinName,
40 request: ListStreamsRequest,
41 ) -> Result<Page<StreamInfo>, ListStreamsError> {
42 let ListItemsRequestParts {
43 prefix,
44 start_after,
45 limit,
46 } = request.into();
47
48 let key_range = kv::stream_meta::ser_key_range(&basin, &prefix, &start_after);
49 if key_range.is_empty() {
50 return Ok(Page::new_empty());
51 }
52
53 let scan_opts = ScanOptions {
54 durability_filter: DurabilityLevel::Remote,
55 ..Default::default()
56 };
57 let mut it = self.db.scan_with_options(key_range, &scan_opts).await?;
58
59 let mut streams = Vec::with_capacity(limit.as_usize());
60 let mut has_more = false;
61 while let Some(kv) = it.next().await? {
62 let (deser_basin, stream) = kv::stream_meta::deser_key(kv.key)?;
63 assert_eq!(deser_basin.as_ref(), basin.as_ref());
64 assert!(stream.as_ref() > start_after.as_ref());
65 assert!(stream.as_ref() >= prefix.as_ref());
66 if streams.len() == limit.as_usize() {
67 has_more = true;
68 break;
69 }
70 let meta = kv::stream_meta::deser_value(kv.value)?;
71 streams.push(StreamInfo {
72 name: stream,
73 created_at: meta.created_at,
74 deleted_at: meta.deleted_at,
75 cipher: meta.cipher,
76 });
77 }
78 Ok(Page::new(streams, has_more))
79 }
80
81 pub async fn provision_stream(
82 &self,
83 basin: BasinName,
84 stream: StreamName,
85 config: OptionalStreamConfig,
86 mode: ProvisionMode,
87 ) -> Result<ProvisionResult<StreamInfo>, ProvisionStreamError> {
88 let txn = self.db.begin(IsolationLevel::SerializableSnapshot).await?;
89
90 let Some(basin_meta) = db_txn_get(
91 &txn,
92 kv::basin_meta::ser_key(&basin),
93 kv::basin_meta::deser_value,
94 )
95 .await?
96 else {
97 return Err(BasinNotFoundError { basin }.into());
98 };
99
100 if basin_meta.deleted_at.is_some() {
101 return Err(BasinDeletionPendingError { basin }.into());
102 }
103
104 let stream_meta_key = kv::stream_meta::ser_key(&basin, &stream);
105
106 let existing_meta =
107 db_txn_get(&txn, &stream_meta_key, kv::stream_meta::deser_value).await?;
108 if let Some(existing_meta) = &existing_meta
109 && existing_meta.deleted_at.is_some()
110 {
111 return Err(ProvisionStreamError::StreamDeletionPending(
112 StreamDeletionPendingError,
113 ));
114 }
115
116 let basin_defaults = basin_meta.config.default_stream_config;
117 let (outcome, prior_doe_min_age) = match (existing_meta, mode) {
118 (Some(existing), ProvisionMode::CreateOnly { request_token }) => {
119 let new_creation_idempotency_key = request_token
120 .as_ref()
121 .map(|req_token| creation_idempotency_key(req_token, &config));
122 return if new_creation_idempotency_key.is_some()
123 && existing.creation_idempotency_key == new_creation_idempotency_key
124 {
125 Ok(ProvisionResult::Noop(StreamInfo {
126 name: stream,
127 created_at: existing.created_at,
128 deleted_at: None,
129 cipher: existing.cipher,
130 }))
131 } else {
132 Err(StreamAlreadyExistsError { basin, stream }.into())
133 };
134 }
135 (Some(existing), ProvisionMode::Ensure) => {
136 let desired_config = config.merge(basin_defaults);
137 let config_unchanged = existing.config == desired_config;
138 let meta = kv::stream_meta::StreamMeta {
139 config: desired_config,
140 cipher: existing.cipher,
141 created_at: existing.created_at,
142 deleted_at: None,
143 creation_idempotency_key: existing.creation_idempotency_key,
144 };
145 (
146 if config_unchanged {
147 ProvisionResult::Noop(meta)
148 } else {
149 ProvisionResult::Updated(meta)
150 },
151 existing.config.delete_on_empty.min_age(),
152 )
153 }
154 (None, ProvisionMode::CreateOnly { request_token }) => {
155 let new_creation_idempotency_key = request_token
156 .as_ref()
157 .map(|req_token| creation_idempotency_key(req_token, &config));
158 (
159 ProvisionResult::Created(kv::stream_meta::StreamMeta {
160 config: config.merge(basin_defaults),
161 cipher: basin_meta.config.stream_cipher,
162 created_at: OffsetDateTime::now_utc(),
163 deleted_at: None,
164 creation_idempotency_key: new_creation_idempotency_key,
165 }),
166 None,
167 )
168 }
169 (None, ProvisionMode::Ensure) => (
170 ProvisionResult::Created(kv::stream_meta::StreamMeta {
171 config: config.merge(basin_defaults),
172 cipher: basin_meta.config.stream_cipher,
173 created_at: OffsetDateTime::now_utc(),
174 deleted_at: None,
175 creation_idempotency_key: None,
176 }),
177 None,
178 ),
179 };
180
181 if !matches!(&outcome, ProvisionResult::Noop(_)) {
182 let meta = outcome.inner();
183
184 txn.put(&stream_meta_key, kv::stream_meta::ser_value(meta))?;
185
186 let stream_id = StreamId::new(&basin, &stream);
187
188 if matches!(&outcome, ProvisionResult::Created(_)) {
189 txn.put(
190 kv::stream_id_mapping::ser_key(stream_id),
191 kv::stream_id_mapping::ser_value(&basin, &stream),
192 )?;
193 txn.put(
194 kv::stream_tail_position::ser_key(stream_id),
195 kv::stream_tail_position::ser_value(StreamPosition::MIN),
196 )?;
197 }
198
199 if let Some(min_age) = meta.config.delete_on_empty.min_age()
200 && (matches!(&outcome, ProvisionResult::Created(_)) || prior_doe_min_age.is_none())
201 {
202 txn.put(
203 kv::stream_doe_deadline::ser_key(
204 kv::timestamp::TimestampSecs::after(doe_arm_delay(
205 meta.config.retention_policy.age().unwrap_or_default(),
206 min_age,
207 )),
208 stream_id,
209 ),
210 kv::stream_doe_deadline::ser_value(min_age),
211 )?;
212 }
213
214 txn.commit().await?;
215 }
216
217 if let ProvisionResult::Updated(meta) = &outcome
218 && let Some(client) = self.streamer_client_if_active(&basin, &stream)
219 {
220 client.advise_reconfig(meta.config.clone());
221 }
222
223 Ok(outcome.map(|meta| StreamInfo {
224 name: stream,
225 created_at: meta.created_at,
226 deleted_at: None,
227 cipher: meta.cipher,
228 }))
229 }
230
231 pub(super) async fn stream_id_mapping(
232 &self,
233 stream_id: StreamId,
234 ) -> Result<Option<(BasinName, StreamName)>, StorageError> {
235 self.db_get(
236 kv::stream_id_mapping::ser_key(stream_id),
237 kv::stream_id_mapping::deser_value,
238 )
239 .await
240 }
241
242 pub async fn get_stream_config(
243 &self,
244 basin: BasinName,
245 stream: StreamName,
246 ) -> Result<StreamConfig, GetStreamConfigError> {
247 let meta = self
248 .db_get(
249 kv::stream_meta::ser_key(&basin, &stream),
250 kv::stream_meta::deser_value,
251 )
252 .await?
253 .ok_or_else(|| StreamNotFoundError {
254 basin: basin.clone(),
255 stream: stream.clone(),
256 })?;
257 if meta.deleted_at.is_some() {
258 return Err(StreamDeletionPendingError.into());
259 }
260 Ok(meta.config)
261 }
262
263 pub async fn reconfigure_stream(
264 &self,
265 basin: BasinName,
266 stream: StreamName,
267 reconfig: StreamReconfiguration,
268 ) -> Result<StreamConfig, ReconfigureStreamError> {
269 let txn = self.db.begin(IsolationLevel::SerializableSnapshot).await?;
270
271 let meta_key = kv::stream_meta::ser_key(&basin, &stream);
272 let (basin_meta, meta) = tokio::try_join!(
273 db_txn_get(
274 &txn,
275 kv::basin_meta::ser_key(&basin),
276 kv::basin_meta::deser_value,
277 ),
278 db_txn_get(&txn, &meta_key, kv::stream_meta::deser_value),
279 )?;
280
281 let basin_meta = basin_meta.ok_or_else(|| BasinNotFoundError {
282 basin: basin.clone(),
283 })?;
284 if basin_meta.deleted_at.is_some() {
285 return Err(BasinDeletionPendingError { basin }.into());
286 }
287
288 let mut meta = meta.ok_or_else(|| StreamNotFoundError {
289 basin: basin.clone(),
290 stream: stream.clone(),
291 })?;
292
293 if meta.deleted_at.is_some() {
294 return Err(StreamDeletionPendingError.into());
295 }
296
297 let prior_doe_min_age = meta.config.delete_on_empty.min_age();
298
299 meta.config = OptionalStreamConfig::from(meta.config)
300 .reconfigure(reconfig)
301 .merge(basin_meta.config.default_stream_config);
302
303 txn.put(&meta_key, kv::stream_meta::ser_value(&meta))?;
304
305 let stream_id = StreamId::new(&basin, &stream);
306 if let Some(min_age) = meta.config.delete_on_empty.min_age()
307 && prior_doe_min_age.is_none()
308 {
309 txn.put(
310 kv::stream_doe_deadline::ser_key(
311 kv::timestamp::TimestampSecs::after(doe_arm_delay(
312 meta.config.retention_policy.age().unwrap_or_default(),
313 min_age,
314 )),
315 stream_id,
316 ),
317 kv::stream_doe_deadline::ser_value(min_age),
318 )?;
319 }
320
321 txn.commit().await?;
322
323 if let Some(client) = self.streamer_client_if_active(&basin, &stream) {
324 client.advise_reconfig(meta.config.clone());
325 }
326
327 Ok(meta.config)
328 }
329
330 #[instrument(ret, err, skip(self))]
331 pub async fn delete_stream(
332 &self,
333 basin: BasinName,
334 stream: StreamName,
335 ) -> Result<(), DeleteStreamError> {
336 self.delete_stream_with_condition(basin, stream, TerminalTrimCondition::Always)
337 .await
338 }
339
340 pub(super) async fn delete_stream_with_condition(
341 &self,
342 basin: BasinName,
343 stream: StreamName,
344 condition: TerminalTrimCondition,
345 ) -> Result<(), DeleteStreamError> {
346 let outcome = match self.streamer_client_guarded(&basin, &stream).await {
347 Ok(client) => client.terminal_trim(condition).await?,
348 Err(StreamerError::Storage(e)) => {
349 return Err(DeleteStreamError::Storage(e));
350 }
351 Err(StreamerError::StreamNotFound(e)) => {
352 return Err(DeleteStreamError::StreamNotFound(e));
353 }
354 Err(StreamerError::StreamDeletionPending(_)) => TerminalTrimOutcome::DeletionPending,
355 };
356 match outcome {
357 TerminalTrimOutcome::DeletionPending => self.mark_stream_deleted(basin, stream).await,
358 TerminalTrimOutcome::Ineligible => Ok(()),
359 }
360 }
361
362 async fn mark_stream_deleted(
363 &self,
364 basin: BasinName,
365 stream: StreamName,
366 ) -> Result<(), DeleteStreamError> {
367 let txn = self.db.begin(IsolationLevel::SerializableSnapshot).await?;
368 let meta_key = kv::stream_meta::ser_key(&basin, &stream);
369 let mut meta = db_txn_get(&txn, &meta_key, kv::stream_meta::deser_value)
370 .await?
371 .ok_or_else(|| StreamNotFoundError {
372 basin,
373 stream: stream.clone(),
374 })?;
375 if meta.deleted_at.is_none() {
376 meta.deleted_at = Some(OffsetDateTime::now_utc());
377 txn.put(&meta_key, kv::stream_meta::ser_value(&meta))?;
378 txn.commit().await?;
379 }
380 Ok(())
381 }
382}
383
384fn creation_idempotency_key(req_token: &RequestToken, config: &OptionalStreamConfig) -> Bash {
385 Bash::length_prefixed(&[
386 req_token.as_bytes(),
387 &s2_api::v1::config::StreamConfig::to_opt(config.clone())
388 .as_ref()
389 .map(|v| serde_json::to_vec(v).expect("serializable"))
390 .unwrap_or_default(),
391 ])
392}