1use s2_common::{
2 bash::Bash,
3 types::{
4 basin::BasinName,
5 config::{OptionalStreamConfig, StreamReconfiguration},
6 resources::{CreateMode, ListItemsRequestParts, Page, RequestToken},
7 stream::{ListStreamsRequest, StreamInfo, StreamName},
8 },
9};
10use slatedb::{
11 IsolationLevel,
12 config::{DurabilityLevel, ScanOptions, WriteOptions},
13};
14use time::OffsetDateTime;
15
16use super::{Backend, CreatedOrReconfigured, store::db_txn_get};
17use crate::backend::{
18 error::{
19 BasinDeletionPendingError, BasinNotFoundError, CreateStreamError, DeleteStreamError,
20 GetStreamConfigError, ListStreamsError, ReconfigureStreamError, StreamAlreadyExistsError,
21 StreamDeletionPendingError, StreamNotFoundError, StreamerError,
22 },
23 kv,
24};
25
26impl Backend {
27 pub async fn list_streams(
28 &self,
29 basin: BasinName,
30 request: ListStreamsRequest,
31 ) -> Result<Page<StreamInfo>, ListStreamsError> {
32 let ListItemsRequestParts {
33 prefix,
34 start_after,
35 limit,
36 } = request.into();
37
38 let key_range = kv::stream_meta::ser_key_range(&basin, &prefix, &start_after);
39 if key_range.is_empty() {
40 return Ok(Page::new_empty());
41 }
42
43 const SCAN_OPTS: ScanOptions = ScanOptions {
44 durability_filter: DurabilityLevel::Remote,
45 dirty: false,
46 read_ahead_bytes: 1,
47 cache_blocks: false,
48 max_fetch_tasks: 1,
49 };
50 let mut it = self.db.scan_with_options(key_range, &SCAN_OPTS).await?;
51
52 let mut streams = Vec::with_capacity(limit.as_usize());
53 let mut has_more = false;
54 while let Some(kv) = it.next().await? {
55 let (deser_basin, stream) = kv::stream_meta::deser_key(kv.key)?;
56 assert_eq!(deser_basin.as_ref(), basin.as_ref());
57 assert!(stream.as_ref() > start_after.as_ref());
58 assert!(stream.as_ref() >= prefix.as_ref());
59 if streams.len() == limit.as_usize() {
60 has_more = true;
61 break;
62 }
63 let meta = kv::stream_meta::deser_value(kv.value)?;
64 streams.push(StreamInfo {
65 name: stream,
66 created_at: meta.created_at,
67 deleted_at: meta.deleted_at,
68 });
69 }
70 Ok(Page::new(streams, has_more))
71 }
72
73 pub async fn create_stream(
74 &self,
75 basin: BasinName,
76 stream: StreamName,
77 mut config: OptionalStreamConfig,
78 mode: CreateMode,
79 ) -> Result<CreatedOrReconfigured<StreamInfo>, CreateStreamError> {
80 let txn = self.db.begin(IsolationLevel::SerializableSnapshot).await?;
81
82 let Some(basin_meta) = db_txn_get(
83 &txn,
84 kv::basin_meta::ser_key(&basin),
85 kv::basin_meta::deser_value,
86 )
87 .await?
88 else {
89 return Err(BasinNotFoundError { basin }.into());
90 };
91
92 if basin_meta.deleted_at.is_some() {
93 return Err(BasinDeletionPendingError { basin }.into());
94 }
95
96 let stream_meta_key = kv::stream_meta::ser_key(&basin, &stream);
97
98 let creation_idempotency_key = match &mode {
99 CreateMode::CreateOnly(Some(req_token)) => {
100 Some(creation_idempotency_key(req_token, &config))
101 }
102 _ => None,
103 };
104
105 let mut existing_created_at = None;
106
107 if let Some(existing_meta) =
108 db_txn_get(&txn, &stream_meta_key, kv::stream_meta::deser_value).await?
109 {
110 if existing_meta.deleted_at.is_some() {
111 return Err(CreateStreamError::StreamDeletionPending(
112 StreamDeletionPendingError { basin, stream },
113 ));
114 }
115 match mode {
116 CreateMode::CreateOnly(_) => {
117 return if creation_idempotency_key.is_some()
118 && existing_meta.creation_idempotency_key == creation_idempotency_key
119 {
120 Ok(CreatedOrReconfigured::Created(StreamInfo {
121 name: stream,
122 created_at: existing_meta.created_at,
123 deleted_at: None,
124 }))
125 } else {
126 Err(StreamAlreadyExistsError { basin, stream }.into())
127 };
128 }
129 CreateMode::CreateOrReconfigure => {
130 existing_created_at = Some(existing_meta.created_at);
131 }
132 }
133 }
134
135 config = config.merge(basin_meta.config.default_stream_config).into();
136
137 let created_at = existing_created_at.unwrap_or_else(OffsetDateTime::now_utc);
138 let meta = kv::stream_meta::StreamMeta {
139 config: config.clone(),
140 created_at,
141 deleted_at: None,
142 creation_idempotency_key,
143 };
144
145 txn.put(&stream_meta_key, kv::stream_meta::ser_value(&meta))?;
146
147 static WRITE_OPTS: WriteOptions = WriteOptions {
148 await_durable: true,
149 };
150 txn.commit_with_options(&WRITE_OPTS).await?;
151
152 if existing_created_at.is_some()
153 && let Some(client) = self.streamer_client_if_active(&basin, &stream)
154 {
155 client.advise_reconfig(config);
156 }
157
158 let info = StreamInfo {
159 name: stream,
160 created_at,
161 deleted_at: None,
162 };
163
164 Ok(if existing_created_at.is_some() {
165 CreatedOrReconfigured::Reconfigured(info)
166 } else {
167 CreatedOrReconfigured::Created(info)
168 })
169 }
170
171 pub async fn get_stream_config(
172 &self,
173 basin: BasinName,
174 stream: StreamName,
175 ) -> Result<OptionalStreamConfig, GetStreamConfigError> {
176 let meta = self
177 .db_get(
178 kv::stream_meta::ser_key(&basin, &stream),
179 kv::stream_meta::deser_value,
180 )
181 .await?
182 .ok_or_else(|| StreamNotFoundError { basin, stream })?;
183 Ok(meta.config)
184 }
185
186 pub async fn reconfigure_stream(
187 &self,
188 basin: BasinName,
189 stream: StreamName,
190 reconfig: StreamReconfiguration,
191 ) -> Result<OptionalStreamConfig, ReconfigureStreamError> {
192 let txn = self.db.begin(IsolationLevel::SerializableSnapshot).await?;
193
194 let meta_key = kv::stream_meta::ser_key(&basin, &stream);
195
196 let mut meta = db_txn_get(&txn, &meta_key, kv::stream_meta::deser_value)
197 .await?
198 .ok_or_else(|| StreamNotFoundError {
199 basin: basin.clone(),
200 stream: stream.clone(),
201 })?;
202
203 if meta.deleted_at.is_some() {
204 return Err(StreamDeletionPendingError { basin, stream }.into());
205 }
206
207 meta.config = meta.config.reconfigure(reconfig);
208
209 txn.put(&meta_key, kv::stream_meta::ser_value(&meta))?;
210
211 static WRITE_OPTS: WriteOptions = WriteOptions {
212 await_durable: true,
213 };
214 txn.commit_with_options(&WRITE_OPTS).await?;
215
216 if let Some(client) = self.streamer_client_if_active(&basin, &stream) {
217 client.advise_reconfig(meta.config.clone());
218 }
219
220 Ok(meta.config)
221 }
222
223 pub async fn delete_stream(
224 &self,
225 basin: BasinName,
226 stream: StreamName,
227 ) -> Result<(), DeleteStreamError> {
228 match self.streamer_client(&basin, &stream).await {
229 Ok(client) => {
230 client.terminal_trim().await?;
231 }
232 Err(StreamerError::Storage(e)) => {
233 return Err(DeleteStreamError::Storage(e));
234 }
235 Err(StreamerError::StreamNotFound(e)) => {
236 return Err(DeleteStreamError::StreamNotFound(e));
237 }
238 Err(StreamerError::StreamDeletionPending(e)) => {
239 assert_eq!(e.basin, basin);
240 assert_eq!(e.stream, stream);
241 }
242 }
243
244 let txn = self.db.begin(IsolationLevel::SerializableSnapshot).await?;
245 let meta_key = kv::stream_meta::ser_key(&basin, &stream);
246 let mut meta = db_txn_get(&txn, &meta_key, kv::stream_meta::deser_value)
247 .await?
248 .ok_or_else(|| StreamNotFoundError {
249 basin,
250 stream: stream.clone(),
251 })?;
252 if meta.deleted_at.is_none() {
253 meta.deleted_at = Some(OffsetDateTime::now_utc());
254 txn.put(&meta_key, kv::stream_meta::ser_value(&meta))?;
255 static WRITE_OPTS: WriteOptions = WriteOptions {
256 await_durable: true,
257 };
258 txn.commit_with_options(&WRITE_OPTS).await?;
259 }
260
261 Ok(())
262 }
263}
264
265fn creation_idempotency_key(req_token: &RequestToken, config: &OptionalStreamConfig) -> Bash {
266 Bash::new(&[
267 req_token.as_bytes(),
268 &s2_api::v1::config::StreamConfig::to_opt(config.clone())
269 .as_ref()
270 .map(|v| serde_json::to_vec(v).expect("serializable"))
271 .unwrap_or_default(),
272 ])
273}