1use s2_common::{
2 bash::Bash,
3 types::{
4 basin::{BasinInfo, BasinName, ListBasinsRequest},
5 config::{BasinConfig, BasinReconfiguration},
6 resources::{CreateMode, ListItemsRequestParts, Page, RequestToken},
7 stream::StreamNameStartAfter,
8 },
9};
10use slatedb::{
11 IsolationLevel, IterationOrder,
12 config::{DurabilityLevel, ScanOptions, WriteOptions},
13};
14use time::OffsetDateTime;
15
16use super::{Backend, CreatedOrReconfigured, bgtasks::BgtaskTrigger, store::db_txn_get};
17use crate::backend::{
18 error::{
19 BasinAlreadyExistsError, BasinDeletionPendingError, BasinNotFoundError, CreateBasinError,
20 DeleteBasinError, GetBasinConfigError, ListBasinsError, ReconfigureBasinError,
21 },
22 kv,
23};
24
25impl Backend {
26 pub async fn list_basins(
27 &self,
28 request: ListBasinsRequest,
29 ) -> Result<Page<BasinInfo>, ListBasinsError> {
30 let ListItemsRequestParts {
31 prefix,
32 start_after,
33 limit,
34 } = request.into();
35
36 let key_range = kv::basin_meta::ser_key_range(&prefix, &start_after);
37 if key_range.is_empty() {
38 return Ok(Page::new_empty());
39 }
40
41 static SCAN_OPTS: ScanOptions = ScanOptions {
42 durability_filter: DurabilityLevel::Remote,
43 dirty: false,
44 read_ahead_bytes: 1,
45 cache_blocks: false,
46 max_fetch_tasks: 1,
47 order: IterationOrder::Ascending,
48 };
49 let mut it = self.db.scan_with_options(key_range, &SCAN_OPTS).await?;
50
51 let mut basins = Vec::with_capacity(limit.as_usize());
52 let mut has_more = false;
53 while let Some(kv) = it.next().await? {
54 let basin = kv::basin_meta::deser_key(kv.key)?;
55 assert!(basin.as_ref() > start_after.as_ref());
56 assert!(basin.as_ref() >= prefix.as_ref());
57 if basins.len() == limit.as_usize() {
58 has_more = true;
59 break;
60 }
61 let meta = kv::basin_meta::deser_value(kv.value)?;
62 basins.push(BasinInfo {
63 name: basin,
64 scope: None,
65 created_at: meta.created_at,
66 deleted_at: meta.deleted_at,
67 });
68 }
69 Ok(Page::new(basins, has_more))
70 }
71
72 pub async fn create_basin(
73 &self,
74 basin: BasinName,
75 config: impl Into<BasinReconfiguration>,
76 mode: CreateMode,
77 ) -> Result<CreatedOrReconfigured<BasinInfo>, CreateBasinError> {
78 let config = config.into();
79 let meta_key = kv::basin_meta::ser_key(&basin);
80
81 let txn = self.db.begin(IsolationLevel::SerializableSnapshot).await?;
82
83 let new_creation_idempotency_key = match &mode {
84 CreateMode::CreateOnly(Some(req_token)) => {
85 let resolved = BasinConfig::default().reconfigure(config.clone());
86 Some(creation_idempotency_key(req_token, &resolved))
87 }
88 _ => None,
89 };
90
91 let mut existing_meta_opt = None;
92 if let Some(existing_meta) =
93 db_txn_get(&txn, &meta_key, kv::basin_meta::deser_value).await?
94 {
95 if existing_meta.deleted_at.is_some() {
96 return Err(BasinDeletionPendingError { basin }.into());
97 }
98 match mode {
99 CreateMode::CreateOnly(_) => {
100 return if new_creation_idempotency_key.is_some()
101 && existing_meta.creation_idempotency_key == new_creation_idempotency_key
102 {
103 Ok(CreatedOrReconfigured::Created(BasinInfo {
104 name: basin,
105 scope: None,
106 created_at: existing_meta.created_at,
107 deleted_at: None,
108 }))
109 } else {
110 Err(BasinAlreadyExistsError { basin }.into())
111 };
112 }
113 CreateMode::CreateOrReconfigure => {
114 existing_meta_opt = Some(existing_meta);
115 }
116 }
117 }
118
119 let is_reconfigure = existing_meta_opt.is_some();
120 let (resolved, created_at, creation_idempotency_key) = match existing_meta_opt {
121 Some(existing) => (
122 existing.config.reconfigure(config),
123 existing.created_at,
124 existing.creation_idempotency_key,
125 ),
126 None => (
127 BasinConfig::default().reconfigure(config),
128 OffsetDateTime::now_utc(),
129 new_creation_idempotency_key,
130 ),
131 };
132
133 let meta = kv::basin_meta::BasinMeta {
134 config: resolved,
135 created_at,
136 deleted_at: None,
137 creation_idempotency_key,
138 };
139
140 txn.put(&meta_key, kv::basin_meta::ser_value(&meta))?;
141
142 static WRITE_OPTS: WriteOptions = WriteOptions {
143 await_durable: true,
144 };
145 txn.commit_with_options(&WRITE_OPTS).await?;
146
147 let info = BasinInfo {
148 name: basin,
149 scope: None,
150 created_at,
151 deleted_at: None,
152 };
153
154 Ok(if is_reconfigure {
155 CreatedOrReconfigured::Reconfigured(info)
156 } else {
157 CreatedOrReconfigured::Created(info)
158 })
159 }
160
161 pub async fn get_basin_config(
162 &self,
163 basin: BasinName,
164 ) -> Result<BasinConfig, GetBasinConfigError> {
165 let Some(meta) = self
166 .db_get(kv::basin_meta::ser_key(&basin), kv::basin_meta::deser_value)
167 .await?
168 else {
169 return Err(BasinNotFoundError { basin }.into());
170 };
171 Ok(meta.config)
172 }
173
174 pub async fn reconfigure_basin(
175 &self,
176 basin: BasinName,
177 reconfig: BasinReconfiguration,
178 ) -> Result<BasinConfig, ReconfigureBasinError> {
179 let meta_key = kv::basin_meta::ser_key(&basin);
180
181 let txn = self.db.begin(IsolationLevel::SerializableSnapshot).await?;
182
183 let Some(mut meta) = db_txn_get(&txn, &meta_key, kv::basin_meta::deser_value).await? else {
184 return Err(BasinNotFoundError { basin }.into());
185 };
186
187 if meta.deleted_at.is_some() {
188 return Err(BasinDeletionPendingError { basin }.into());
189 }
190
191 meta.config = meta.config.reconfigure(reconfig);
192
193 txn.put(&meta_key, kv::basin_meta::ser_value(&meta))?;
194
195 static WRITE_OPTS: WriteOptions = WriteOptions {
196 await_durable: true,
197 };
198 txn.commit_with_options(&WRITE_OPTS).await?;
199
200 Ok(meta.config)
201 }
202
203 pub async fn delete_basin(&self, basin: BasinName) -> Result<(), DeleteBasinError> {
204 let txn = self.db.begin(IsolationLevel::SerializableSnapshot).await?;
205 let meta_key = kv::basin_meta::ser_key(&basin);
206 let Some(mut meta) = db_txn_get(&txn, &meta_key, kv::basin_meta::deser_value).await? else {
207 return Err(BasinNotFoundError { basin }.into());
208 };
209 if meta.deleted_at.is_none() {
210 meta.deleted_at = Some(OffsetDateTime::now_utc());
211 txn.put(&meta_key, kv::basin_meta::ser_value(&meta))?;
212 txn.put(
213 kv::basin_deletion_pending::ser_key(&basin),
214 kv::basin_deletion_pending::ser_value(&StreamNameStartAfter::default()),
215 )?;
216 static WRITE_OPTS: WriteOptions = WriteOptions {
217 await_durable: true,
218 };
219 txn.commit_with_options(&WRITE_OPTS).await?;
220 self.bgtask_trigger(BgtaskTrigger::BasinDeletion);
221 }
222 Ok(())
223 }
224}
225
226fn creation_idempotency_key(req_token: &RequestToken, config: &BasinConfig) -> Bash {
227 Bash::length_prefixed(&[
228 req_token.as_bytes(),
229 &serde_json::to_vec(&s2_api::v1::config::BasinConfig::from(config.clone()))
230 .expect("serializable"),
231 ])
232}