1use s2_common::{
2 bash::Bash,
3 types::{
4 basin::{BasinInfo, BasinName, BasinState, ListBasinsRequest},
5 config::{BasinConfig, BasinReconfiguration},
6 resources::{CreateMode, ListItemsRequestParts, Page, RequestToken},
7 stream::StreamNameStartAfter,
8 },
9};
10use slatedb::{
11 IsolationLevel,
12 config::{DurabilityLevel, ScanOptions, WriteOptions},
13};
14use time::OffsetDateTime;
15
16use super::{Backend, CreatedOrReconfigured, bgtasks::BgtaskTrigger, store::db_txn_get};
17use crate::backend::{
18 error::{
19 BasinAlreadyExistsError, BasinDeletionPendingError, BasinNotFoundError, CreateBasinError,
20 DeleteBasinError, GetBasinConfigError, ListBasinsError, ReconfigureBasinError,
21 },
22 kv,
23};
24
25impl Backend {
26 pub async fn list_basins(
27 &self,
28 request: ListBasinsRequest,
29 ) -> Result<Page<BasinInfo>, ListBasinsError> {
30 let ListItemsRequestParts {
31 prefix,
32 start_after,
33 limit,
34 } = request.into();
35
36 let key_range = kv::basin_meta::ser_key_range(&prefix, &start_after);
37 if key_range.is_empty() {
38 return Ok(Page::new_empty());
39 }
40
41 static SCAN_OPTS: ScanOptions = ScanOptions {
42 durability_filter: DurabilityLevel::Remote,
43 dirty: false,
44 read_ahead_bytes: 1,
45 cache_blocks: false,
46 max_fetch_tasks: 1,
47 };
48 let mut it = self.db.scan_with_options(key_range, &SCAN_OPTS).await?;
49
50 let mut basins = Vec::with_capacity(limit.as_usize());
51 let mut has_more = false;
52 while let Some(kv) = it.next().await? {
53 let basin = kv::basin_meta::deser_key(kv.key)?;
54 assert!(basin.as_ref() > start_after.as_ref());
55 assert!(basin.as_ref() >= prefix.as_ref());
56 if basins.len() == limit.as_usize() {
57 has_more = true;
58 break;
59 }
60 let meta = kv::basin_meta::deser_value(kv.value)?;
61 let state = if meta.deleted_at.is_some() {
62 BasinState::Deleting
63 } else {
64 BasinState::Active
65 };
66 basins.push(BasinInfo {
67 name: basin,
68 scope: None,
69 state,
70 });
71 }
72 Ok(Page::new(basins, has_more))
73 }
74
75 pub async fn create_basin(
76 &self,
77 basin: BasinName,
78 config: impl Into<BasinReconfiguration>,
79 mode: CreateMode,
80 ) -> Result<CreatedOrReconfigured<BasinInfo>, CreateBasinError> {
81 let config = config.into();
82 let meta_key = kv::basin_meta::ser_key(&basin);
83
84 let txn = self.db.begin(IsolationLevel::SerializableSnapshot).await?;
85
86 let new_creation_idempotency_key = match &mode {
87 CreateMode::CreateOnly(Some(req_token)) => {
88 let resolved = BasinConfig::default().reconfigure(config.clone());
89 Some(creation_idempotency_key(req_token, &resolved))
90 }
91 _ => None,
92 };
93
94 let mut existing_meta_opt = None;
95 if let Some(existing_meta) =
96 db_txn_get(&txn, &meta_key, kv::basin_meta::deser_value).await?
97 {
98 if existing_meta.deleted_at.is_some() {
99 return Err(BasinDeletionPendingError { basin }.into());
100 }
101 match mode {
102 CreateMode::CreateOnly(_) => {
103 return if new_creation_idempotency_key.is_some()
104 && existing_meta.creation_idempotency_key == new_creation_idempotency_key
105 {
106 Ok(CreatedOrReconfigured::Created(BasinInfo {
107 name: basin,
108 scope: None,
109 state: BasinState::Active,
110 }))
111 } else {
112 Err(BasinAlreadyExistsError { basin }.into())
113 };
114 }
115 CreateMode::CreateOrReconfigure => {
116 existing_meta_opt = Some(existing_meta);
117 }
118 }
119 }
120
121 let is_reconfigure = existing_meta_opt.is_some();
122 let (resolved, created_at, creation_idempotency_key) = match existing_meta_opt {
123 Some(existing) => (
124 existing.config.reconfigure(config),
125 existing.created_at,
126 existing.creation_idempotency_key,
127 ),
128 None => (
129 BasinConfig::default().reconfigure(config),
130 OffsetDateTime::now_utc(),
131 new_creation_idempotency_key,
132 ),
133 };
134
135 let meta = kv::basin_meta::BasinMeta {
136 config: resolved,
137 created_at,
138 deleted_at: None,
139 creation_idempotency_key,
140 };
141
142 txn.put(&meta_key, kv::basin_meta::ser_value(&meta))?;
143
144 static WRITE_OPTS: WriteOptions = WriteOptions {
145 await_durable: true,
146 };
147 txn.commit_with_options(&WRITE_OPTS).await?;
148
149 let info = BasinInfo {
150 name: basin,
151 scope: None,
152 state: BasinState::Active,
153 };
154
155 Ok(if is_reconfigure {
156 CreatedOrReconfigured::Reconfigured(info)
157 } else {
158 CreatedOrReconfigured::Created(info)
159 })
160 }
161
162 pub async fn get_basin_config(
163 &self,
164 basin: BasinName,
165 ) -> Result<BasinConfig, GetBasinConfigError> {
166 let Some(meta) = self
167 .db_get(kv::basin_meta::ser_key(&basin), kv::basin_meta::deser_value)
168 .await?
169 else {
170 return Err(BasinNotFoundError { basin }.into());
171 };
172 Ok(meta.config)
173 }
174
175 pub async fn reconfigure_basin(
176 &self,
177 basin: BasinName,
178 reconfig: BasinReconfiguration,
179 ) -> Result<BasinConfig, ReconfigureBasinError> {
180 let meta_key = kv::basin_meta::ser_key(&basin);
181
182 let txn = self.db.begin(IsolationLevel::SerializableSnapshot).await?;
183
184 let Some(mut meta) = db_txn_get(&txn, &meta_key, kv::basin_meta::deser_value).await? else {
185 return Err(BasinNotFoundError { basin }.into());
186 };
187
188 if meta.deleted_at.is_some() {
189 return Err(BasinDeletionPendingError { basin }.into());
190 }
191
192 meta.config = meta.config.reconfigure(reconfig);
193
194 txn.put(&meta_key, kv::basin_meta::ser_value(&meta))?;
195
196 static WRITE_OPTS: WriteOptions = WriteOptions {
197 await_durable: true,
198 };
199 txn.commit_with_options(&WRITE_OPTS).await?;
200
201 Ok(meta.config)
202 }
203
204 pub async fn delete_basin(&self, basin: BasinName) -> Result<(), DeleteBasinError> {
205 let txn = self.db.begin(IsolationLevel::SerializableSnapshot).await?;
206 let meta_key = kv::basin_meta::ser_key(&basin);
207 let Some(mut meta) = db_txn_get(&txn, &meta_key, kv::basin_meta::deser_value).await? else {
208 return Err(BasinNotFoundError { basin }.into());
209 };
210 if meta.deleted_at.is_none() {
211 meta.deleted_at = Some(OffsetDateTime::now_utc());
212 txn.put(&meta_key, kv::basin_meta::ser_value(&meta))?;
213 txn.put(
214 kv::basin_deletion_pending::ser_key(&basin),
215 kv::basin_deletion_pending::ser_value(&StreamNameStartAfter::default()),
216 )?;
217 static WRITE_OPTS: WriteOptions = WriteOptions {
218 await_durable: true,
219 };
220 txn.commit_with_options(&WRITE_OPTS).await?;
221 self.bgtask_trigger(BgtaskTrigger::BasinDeletion);
222 }
223 Ok(())
224 }
225}
226
227fn creation_idempotency_key(req_token: &RequestToken, config: &BasinConfig) -> Bash {
228 Bash::length_prefixed(&[
229 req_token.as_bytes(),
230 &serde_json::to_vec(&s2_api::v1::config::BasinConfig::from(config.clone()))
231 .expect("serializable"),
232 ])
233}