1use s2_common::{
2 bash::Bash,
3 types::{
4 basin::{BasinInfo, BasinName, CreateBasinIntent, ListBasinsRequest},
5 config::{BasinConfig, BasinReconfiguration},
6 resources::{ListItemsRequestParts, Page, RequestToken},
7 stream::StreamNameStartAfter,
8 },
9};
10use slatedb::{
11 IsolationLevel, IterationOrder,
12 config::{DurabilityLevel, ScanOptions, WriteOptions},
13};
14use time::OffsetDateTime;
15
16use super::{Backend, CreatedOrReconfigured, bgtasks::BgtaskTrigger, store::db_txn_get};
17use crate::backend::{
18 error::{
19 BasinAlreadyExistsError, BasinDeletionPendingError, BasinNotFoundError, CreateBasinError,
20 DeleteBasinError, GetBasinConfigError, ListBasinsError, ReconfigureBasinError,
21 },
22 kv,
23};
24
25impl Backend {
26 pub async fn list_basins(
27 &self,
28 request: ListBasinsRequest,
29 ) -> Result<Page<BasinInfo>, ListBasinsError> {
30 let ListItemsRequestParts {
31 prefix,
32 start_after,
33 limit,
34 } = request.into();
35
36 let key_range = kv::basin_meta::ser_key_range(&prefix, &start_after);
37 if key_range.is_empty() {
38 return Ok(Page::new_empty());
39 }
40
41 static SCAN_OPTS: ScanOptions = ScanOptions {
42 durability_filter: DurabilityLevel::Remote,
43 dirty: false,
44 read_ahead_bytes: 1,
45 cache_blocks: false,
46 max_fetch_tasks: 1,
47 order: IterationOrder::Ascending,
48 };
49 let mut it = self.db.scan_with_options(key_range, &SCAN_OPTS).await?;
50
51 let mut basins = Vec::with_capacity(limit.as_usize());
52 let mut has_more = false;
53 while let Some(kv) = it.next().await? {
54 let basin = kv::basin_meta::deser_key(kv.key)?;
55 assert!(basin.as_ref() > start_after.as_ref());
56 assert!(basin.as_ref() >= prefix.as_ref());
57 if basins.len() == limit.as_usize() {
58 has_more = true;
59 break;
60 }
61 let meta = kv::basin_meta::deser_value(kv.value)?;
62 basins.push(BasinInfo {
63 name: basin,
64 scope: None,
65 created_at: meta.created_at,
66 deleted_at: meta.deleted_at,
67 });
68 }
69 Ok(Page::new(basins, has_more))
70 }
71
72 pub async fn create_basin(
73 &self,
74 basin: BasinName,
75 intent: CreateBasinIntent,
76 ) -> Result<CreatedOrReconfigured<BasinInfo>, CreateBasinError> {
77 let meta_key = kv::basin_meta::ser_key(&basin);
78
79 let txn = self.db.begin(IsolationLevel::SerializableSnapshot).await?;
80
81 let existing_meta = db_txn_get(&txn, &meta_key, kv::basin_meta::deser_value).await?;
82 if let Some(existing_meta) = &existing_meta
83 && existing_meta.deleted_at.is_some()
84 {
85 return Err(BasinDeletionPendingError { basin }.into());
86 }
87
88 let (meta, is_reconfigure) = match (existing_meta, intent) {
89 (
90 Some(existing),
91 CreateBasinIntent::CreateOnly {
92 config,
93 request_token,
94 },
95 ) => {
96 let new_creation_idempotency_key = request_token
97 .as_ref()
98 .map(|req_token| creation_idempotency_key(req_token, &config));
99 return if new_creation_idempotency_key.is_some()
100 && existing.creation_idempotency_key == new_creation_idempotency_key
101 {
102 Ok(CreatedOrReconfigured::Created(BasinInfo {
103 name: basin,
104 scope: None,
105 created_at: existing.created_at,
106 deleted_at: None,
107 }))
108 } else {
109 Err(BasinAlreadyExistsError { basin }.into())
110 };
111 }
112 (Some(existing), CreateBasinIntent::CreateOrReconfigure { reconfiguration }) => (
113 kv::basin_meta::BasinMeta {
114 config: existing.config.reconfigure(reconfiguration),
115 created_at: existing.created_at,
116 deleted_at: None,
117 creation_idempotency_key: existing.creation_idempotency_key,
118 },
119 true,
120 ),
121 (
122 None,
123 CreateBasinIntent::CreateOnly {
124 config,
125 request_token,
126 },
127 ) => {
128 let new_creation_idempotency_key = request_token
129 .as_ref()
130 .map(|req_token| creation_idempotency_key(req_token, &config));
131 (
132 kv::basin_meta::BasinMeta {
133 config,
134 created_at: OffsetDateTime::now_utc(),
135 deleted_at: None,
136 creation_idempotency_key: new_creation_idempotency_key,
137 },
138 false,
139 )
140 }
141 (None, CreateBasinIntent::CreateOrReconfigure { reconfiguration }) => (
142 kv::basin_meta::BasinMeta {
143 config: BasinConfig::default().reconfigure(reconfiguration),
144 created_at: OffsetDateTime::now_utc(),
145 deleted_at: None,
146 creation_idempotency_key: None,
147 },
148 false,
149 ),
150 };
151
152 txn.put(&meta_key, kv::basin_meta::ser_value(&meta))?;
153
154 static WRITE_OPTS: WriteOptions = WriteOptions {
155 await_durable: true,
156 };
157 txn.commit_with_options(&WRITE_OPTS).await?;
158
159 let info = BasinInfo {
160 name: basin,
161 scope: None,
162 created_at: meta.created_at,
163 deleted_at: None,
164 };
165
166 Ok(if is_reconfigure {
167 CreatedOrReconfigured::Reconfigured(info)
168 } else {
169 CreatedOrReconfigured::Created(info)
170 })
171 }
172
173 pub async fn get_basin_config(
174 &self,
175 basin: BasinName,
176 ) -> Result<BasinConfig, GetBasinConfigError> {
177 let Some(meta) = self
178 .db_get(kv::basin_meta::ser_key(&basin), kv::basin_meta::deser_value)
179 .await?
180 else {
181 return Err(BasinNotFoundError { basin }.into());
182 };
183 Ok(meta.config)
184 }
185
186 pub async fn reconfigure_basin(
187 &self,
188 basin: BasinName,
189 reconfig: BasinReconfiguration,
190 ) -> Result<BasinConfig, ReconfigureBasinError> {
191 let meta_key = kv::basin_meta::ser_key(&basin);
192
193 let txn = self.db.begin(IsolationLevel::SerializableSnapshot).await?;
194
195 let Some(mut meta) = db_txn_get(&txn, &meta_key, kv::basin_meta::deser_value).await? else {
196 return Err(BasinNotFoundError { basin }.into());
197 };
198
199 if meta.deleted_at.is_some() {
200 return Err(BasinDeletionPendingError { basin }.into());
201 }
202
203 meta.config = meta.config.reconfigure(reconfig);
204
205 txn.put(&meta_key, kv::basin_meta::ser_value(&meta))?;
206
207 static WRITE_OPTS: WriteOptions = WriteOptions {
208 await_durable: true,
209 };
210 txn.commit_with_options(&WRITE_OPTS).await?;
211
212 Ok(meta.config)
213 }
214
215 pub async fn delete_basin(&self, basin: BasinName) -> Result<(), DeleteBasinError> {
216 let txn = self.db.begin(IsolationLevel::SerializableSnapshot).await?;
217 let meta_key = kv::basin_meta::ser_key(&basin);
218 let Some(mut meta) = db_txn_get(&txn, &meta_key, kv::basin_meta::deser_value).await? else {
219 return Err(BasinNotFoundError { basin }.into());
220 };
221 if meta.deleted_at.is_none() {
222 meta.deleted_at = Some(OffsetDateTime::now_utc());
223 txn.put(&meta_key, kv::basin_meta::ser_value(&meta))?;
224 txn.put(
225 kv::basin_deletion_pending::ser_key(&basin),
226 kv::basin_deletion_pending::ser_value(&StreamNameStartAfter::default()),
227 )?;
228 static WRITE_OPTS: WriteOptions = WriteOptions {
229 await_durable: true,
230 };
231 txn.commit_with_options(&WRITE_OPTS).await?;
232 self.bgtask_trigger(BgtaskTrigger::BasinDeletion);
233 }
234 Ok(())
235 }
236}
237
238fn creation_idempotency_key(req_token: &RequestToken, config: &BasinConfig) -> Bash {
239 Bash::length_prefixed(&[
240 req_token.as_bytes(),
241 &serde_json::to_vec(&s2_api::v1::config::BasinConfig::from(config.clone()))
242 .expect("serializable"),
243 ])
244}