1use s2_common::{
2 bash::Bash,
3 types::{
4 basin::BasinName,
5 config::{OptionalStreamConfig, StreamReconfiguration},
6 resources::{CreateMode, ListItemsRequestParts, Page, RequestToken},
7 stream::{ListStreamsRequest, StreamInfo, StreamName},
8 },
9};
10use slatedb::{
11 IsolationLevel,
12 config::{DurabilityLevel, ScanOptions, WriteOptions},
13};
14use time::OffsetDateTime;
15use tracing::instrument;
16
17use super::{Backend, CreatedOrReconfigured, store::db_txn_get};
18use crate::backend::{
19 error::{
20 BasinDeletionPendingError, BasinNotFoundError, CreateStreamError, DeleteStreamError,
21 GetStreamConfigError, ListStreamsError, ReconfigureStreamError, StreamAlreadyExistsError,
22 StreamDeletionPendingError, StreamNotFoundError, StreamerError,
23 },
24 kv,
25 stream_id::StreamId,
26};
27
28impl Backend {
29 pub async fn list_streams(
30 &self,
31 basin: BasinName,
32 request: ListStreamsRequest,
33 ) -> Result<Page<StreamInfo>, ListStreamsError> {
34 let ListItemsRequestParts {
35 prefix,
36 start_after,
37 limit,
38 } = request.into();
39
40 let key_range = kv::stream_meta::ser_key_range(&basin, &prefix, &start_after);
41 if key_range.is_empty() {
42 return Ok(Page::new_empty());
43 }
44
45 static SCAN_OPTS: ScanOptions = ScanOptions {
46 durability_filter: DurabilityLevel::Remote,
47 dirty: false,
48 read_ahead_bytes: 1,
49 cache_blocks: false,
50 max_fetch_tasks: 1,
51 };
52 let mut it = self.db.scan_with_options(key_range, &SCAN_OPTS).await?;
53
54 let mut streams = Vec::with_capacity(limit.as_usize());
55 let mut has_more = false;
56 while let Some(kv) = it.next().await? {
57 let (deser_basin, stream) = kv::stream_meta::deser_key(kv.key)?;
58 assert_eq!(deser_basin.as_ref(), basin.as_ref());
59 assert!(stream.as_ref() > start_after.as_ref());
60 assert!(stream.as_ref() >= prefix.as_ref());
61 if streams.len() == limit.as_usize() {
62 has_more = true;
63 break;
64 }
65 let meta = kv::stream_meta::deser_value(kv.value)?;
66 streams.push(StreamInfo {
67 name: stream,
68 created_at: meta.created_at,
69 deleted_at: meta.deleted_at,
70 });
71 }
72 Ok(Page::new(streams, has_more))
73 }
74
75 pub async fn create_stream(
76 &self,
77 basin: BasinName,
78 stream: StreamName,
79 mut config: OptionalStreamConfig,
80 mode: CreateMode,
81 ) -> Result<CreatedOrReconfigured<StreamInfo>, CreateStreamError> {
82 let txn = self.db.begin(IsolationLevel::SerializableSnapshot).await?;
83
84 let Some(basin_meta) = db_txn_get(
85 &txn,
86 kv::basin_meta::ser_key(&basin),
87 kv::basin_meta::deser_value,
88 )
89 .await?
90 else {
91 return Err(BasinNotFoundError { basin }.into());
92 };
93
94 if basin_meta.deleted_at.is_some() {
95 return Err(BasinDeletionPendingError { basin }.into());
96 }
97
98 let stream_meta_key = kv::stream_meta::ser_key(&basin, &stream);
99
100 let creation_idempotency_key = match &mode {
101 CreateMode::CreateOnly(Some(req_token)) => {
102 Some(creation_idempotency_key(req_token, &config))
103 }
104 _ => None,
105 };
106
107 let mut existing_created_at = None;
108
109 if let Some(existing_meta) =
110 db_txn_get(&txn, &stream_meta_key, kv::stream_meta::deser_value).await?
111 {
112 if existing_meta.deleted_at.is_some() {
113 return Err(CreateStreamError::StreamDeletionPending(
114 StreamDeletionPendingError { basin, stream },
115 ));
116 }
117 match mode {
118 CreateMode::CreateOnly(_) => {
119 return if creation_idempotency_key.is_some()
120 && existing_meta.creation_idempotency_key == creation_idempotency_key
121 {
122 Ok(CreatedOrReconfigured::Created(StreamInfo {
123 name: stream,
124 created_at: existing_meta.created_at,
125 deleted_at: None,
126 }))
127 } else {
128 Err(StreamAlreadyExistsError { basin, stream }.into())
129 };
130 }
131 CreateMode::CreateOrReconfigure => {
132 existing_created_at = Some(existing_meta.created_at);
133 }
134 }
135 }
136
137 config = config.merge(basin_meta.config.default_stream_config).into();
138
139 let created_at = existing_created_at.unwrap_or_else(OffsetDateTime::now_utc);
140 let meta = kv::stream_meta::StreamMeta {
141 config: config.clone(),
142 created_at,
143 deleted_at: None,
144 creation_idempotency_key,
145 };
146
147 txn.put(&stream_meta_key, kv::stream_meta::ser_value(&meta))?;
148 if existing_created_at.is_none() {
149 txn.put(
150 kv::stream_id_mapping::ser_key(StreamId::new(&basin, &stream)),
151 kv::stream_id_mapping::ser_value(&basin, &stream),
152 )?;
153 }
154
155 static WRITE_OPTS: WriteOptions = WriteOptions {
156 await_durable: true,
157 };
158 txn.commit_with_options(&WRITE_OPTS).await?;
159
160 if existing_created_at.is_some()
161 && let Some(client) = self.streamer_client_if_active(&basin, &stream)
162 {
163 client.advise_reconfig(config);
164 }
165
166 let info = StreamInfo {
167 name: stream,
168 created_at,
169 deleted_at: None,
170 };
171
172 Ok(if existing_created_at.is_some() {
173 CreatedOrReconfigured::Reconfigured(info)
174 } else {
175 CreatedOrReconfigured::Created(info)
176 })
177 }
178
179 pub async fn get_stream_config(
180 &self,
181 basin: BasinName,
182 stream: StreamName,
183 ) -> Result<OptionalStreamConfig, GetStreamConfigError> {
184 let meta = self
185 .db_get(
186 kv::stream_meta::ser_key(&basin, &stream),
187 kv::stream_meta::deser_value,
188 )
189 .await?
190 .ok_or_else(|| StreamNotFoundError { basin, stream })?;
191 Ok(meta.config)
192 }
193
194 pub async fn reconfigure_stream(
195 &self,
196 basin: BasinName,
197 stream: StreamName,
198 reconfig: StreamReconfiguration,
199 ) -> Result<OptionalStreamConfig, ReconfigureStreamError> {
200 let txn = self.db.begin(IsolationLevel::SerializableSnapshot).await?;
201
202 let meta_key = kv::stream_meta::ser_key(&basin, &stream);
203
204 let mut meta = db_txn_get(&txn, &meta_key, kv::stream_meta::deser_value)
205 .await?
206 .ok_or_else(|| StreamNotFoundError {
207 basin: basin.clone(),
208 stream: stream.clone(),
209 })?;
210
211 if meta.deleted_at.is_some() {
212 return Err(StreamDeletionPendingError { basin, stream }.into());
213 }
214
215 meta.config = meta.config.reconfigure(reconfig);
216
217 txn.put(&meta_key, kv::stream_meta::ser_value(&meta))?;
218
219 static WRITE_OPTS: WriteOptions = WriteOptions {
220 await_durable: true,
221 };
222 txn.commit_with_options(&WRITE_OPTS).await?;
223
224 if let Some(client) = self.streamer_client_if_active(&basin, &stream) {
225 client.advise_reconfig(meta.config.clone());
226 }
227
228 Ok(meta.config)
229 }
230
231 #[instrument(ret, err, skip(self))]
232 pub async fn delete_stream(
233 &self,
234 basin: BasinName,
235 stream: StreamName,
236 ) -> Result<(), DeleteStreamError> {
237 match self.streamer_client(&basin, &stream).await {
238 Ok(client) => {
239 client.terminal_trim().await?;
240 }
241 Err(StreamerError::Storage(e)) => {
242 return Err(DeleteStreamError::Storage(e));
243 }
244 Err(StreamerError::StreamNotFound(e)) => {
245 return Err(DeleteStreamError::StreamNotFound(e));
246 }
247 Err(StreamerError::StreamDeletionPending(e)) => {
248 assert_eq!(e.basin, basin);
249 assert_eq!(e.stream, stream);
250 }
251 }
252
253 let txn = self.db.begin(IsolationLevel::SerializableSnapshot).await?;
254 let meta_key = kv::stream_meta::ser_key(&basin, &stream);
255 let mut meta = db_txn_get(&txn, &meta_key, kv::stream_meta::deser_value)
256 .await?
257 .ok_or_else(|| StreamNotFoundError {
258 basin,
259 stream: stream.clone(),
260 })?;
261 if meta.deleted_at.is_none() {
262 meta.deleted_at = Some(OffsetDateTime::now_utc());
263 txn.put(&meta_key, kv::stream_meta::ser_value(&meta))?;
264 static WRITE_OPTS: WriteOptions = WriteOptions {
265 await_durable: true,
266 };
267 txn.commit_with_options(&WRITE_OPTS).await?;
268 }
269
270 Ok(())
271 }
272}
273
274fn creation_idempotency_key(req_token: &RequestToken, config: &OptionalStreamConfig) -> Bash {
275 Bash::new(&[
276 req_token.as_bytes(),
277 &s2_api::v1::config::StreamConfig::to_opt(config.clone())
278 .as_ref()
279 .map(|v| serde_json::to_vec(v).expect("serializable"))
280 .unwrap_or_default(),
281 ])
282}