1use s2_common::{
2 bash::Bash,
3 types::{
4 basin::{BasinInfo, BasinName, ListBasinsRequest},
5 config::{BasinConfig, BasinReconfiguration},
6 resources::{ListItemsRequestParts, Page, ProvisionMode, ProvisionResult, RequestToken},
7 stream::StreamNameStartAfter,
8 },
9};
10use slatedb::{
11 IsolationLevel, IterationOrder,
12 config::{DurabilityLevel, ScanOptions, WriteOptions},
13};
14use time::OffsetDateTime;
15
16use super::{Backend, bgtasks::BgtaskTrigger, store::db_txn_get};
17use crate::backend::{
18 error::{
19 BasinAlreadyExistsError, BasinDeletionPendingError, BasinNotFoundError, DeleteBasinError,
20 GetBasinConfigError, ListBasinsError, ProvisionBasinError, ReconfigureBasinError,
21 },
22 kv,
23};
24
25impl Backend {
26 pub async fn list_basins(
27 &self,
28 request: ListBasinsRequest,
29 ) -> Result<Page<BasinInfo>, ListBasinsError> {
30 let ListItemsRequestParts {
31 prefix,
32 start_after,
33 limit,
34 } = request.into();
35
36 let key_range = kv::basin_meta::ser_key_range(&prefix, &start_after);
37 if key_range.is_empty() {
38 return Ok(Page::new_empty());
39 }
40
41 static SCAN_OPTS: ScanOptions = ScanOptions {
42 durability_filter: DurabilityLevel::Remote,
43 dirty: false,
44 read_ahead_bytes: 1,
45 cache_blocks: false,
46 max_fetch_tasks: 1,
47 order: IterationOrder::Ascending,
48 };
49 let mut it = self.db.scan_with_options(key_range, &SCAN_OPTS).await?;
50
51 let mut basins = Vec::with_capacity(limit.as_usize());
52 let mut has_more = false;
53 while let Some(kv) = it.next().await? {
54 let basin = kv::basin_meta::deser_key(kv.key)?;
55 assert!(basin.as_ref() > start_after.as_ref());
56 assert!(basin.as_ref() >= prefix.as_ref());
57 if basins.len() == limit.as_usize() {
58 has_more = true;
59 break;
60 }
61 let meta = kv::basin_meta::deser_value(kv.value)?;
62 basins.push(BasinInfo {
63 name: basin,
64 scope: None,
65 created_at: meta.created_at,
66 deleted_at: meta.deleted_at,
67 });
68 }
69 Ok(Page::new(basins, has_more))
70 }
71
72 pub async fn provision_basin(
73 &self,
74 basin: BasinName,
75 config: BasinConfig,
76 mode: ProvisionMode,
77 ) -> Result<ProvisionResult<BasinInfo>, ProvisionBasinError> {
78 let meta_key = kv::basin_meta::ser_key(&basin);
79
80 let txn = self.db.begin(IsolationLevel::SerializableSnapshot).await?;
81
82 let existing_meta = db_txn_get(&txn, &meta_key, kv::basin_meta::deser_value).await?;
83 if let Some(existing_meta) = &existing_meta
84 && existing_meta.deleted_at.is_some()
85 {
86 return Err(BasinDeletionPendingError { basin }.into());
87 }
88
89 let outcome = match (existing_meta, mode) {
90 (Some(existing), ProvisionMode::CreateOnly { request_token }) => {
91 let new_creation_idempotency_key = request_token
92 .as_ref()
93 .map(|req_token| creation_idempotency_key(req_token, &config));
94 return if new_creation_idempotency_key.is_some()
95 && existing.creation_idempotency_key == new_creation_idempotency_key
96 {
97 Ok(ProvisionResult::Noop(BasinInfo {
98 name: basin,
99 scope: None,
100 created_at: existing.created_at,
101 deleted_at: None,
102 }))
103 } else {
104 Err(BasinAlreadyExistsError { basin }.into())
105 };
106 }
107 (Some(existing), ProvisionMode::Ensure) => {
108 let meta = kv::basin_meta::BasinMeta {
109 config,
110 created_at: existing.created_at,
111 deleted_at: None,
112 creation_idempotency_key: existing.creation_idempotency_key,
113 };
114 if existing.config == meta.config {
115 ProvisionResult::Noop(meta)
116 } else {
117 ProvisionResult::Updated(meta)
118 }
119 }
120 (None, ProvisionMode::CreateOnly { request_token }) => {
121 let new_creation_idempotency_key = request_token
122 .as_ref()
123 .map(|req_token| creation_idempotency_key(req_token, &config));
124 ProvisionResult::Created(kv::basin_meta::BasinMeta {
125 config,
126 created_at: OffsetDateTime::now_utc(),
127 deleted_at: None,
128 creation_idempotency_key: new_creation_idempotency_key,
129 })
130 }
131 (None, ProvisionMode::Ensure) => ProvisionResult::Created(kv::basin_meta::BasinMeta {
132 config,
133 created_at: OffsetDateTime::now_utc(),
134 deleted_at: None,
135 creation_idempotency_key: None,
136 }),
137 };
138
139 if !matches!(&outcome, ProvisionResult::Noop(_)) {
140 let meta = outcome.inner();
141 txn.put(&meta_key, kv::basin_meta::ser_value(meta))?;
142
143 static WRITE_OPTS: WriteOptions = WriteOptions {
144 await_durable: true,
145 };
146 txn.commit_with_options(&WRITE_OPTS).await?;
147 }
148
149 Ok(outcome.map(|meta| BasinInfo {
150 name: basin,
151 scope: None,
152 created_at: meta.created_at,
153 deleted_at: None,
154 }))
155 }
156
157 pub async fn get_basin_config(
158 &self,
159 basin: BasinName,
160 ) -> Result<BasinConfig, GetBasinConfigError> {
161 let Some(meta) = self
162 .db_get(kv::basin_meta::ser_key(&basin), kv::basin_meta::deser_value)
163 .await?
164 else {
165 return Err(BasinNotFoundError { basin }.into());
166 };
167 Ok(meta.config)
168 }
169
170 pub async fn reconfigure_basin(
171 &self,
172 basin: BasinName,
173 reconfig: BasinReconfiguration,
174 ) -> Result<BasinConfig, ReconfigureBasinError> {
175 let meta_key = kv::basin_meta::ser_key(&basin);
176
177 let txn = self.db.begin(IsolationLevel::SerializableSnapshot).await?;
178
179 let Some(mut meta) = db_txn_get(&txn, &meta_key, kv::basin_meta::deser_value).await? else {
180 return Err(BasinNotFoundError { basin }.into());
181 };
182
183 if meta.deleted_at.is_some() {
184 return Err(BasinDeletionPendingError { basin }.into());
185 }
186
187 meta.config = meta.config.reconfigure(reconfig);
188
189 txn.put(&meta_key, kv::basin_meta::ser_value(&meta))?;
190
191 static WRITE_OPTS: WriteOptions = WriteOptions {
192 await_durable: true,
193 };
194 txn.commit_with_options(&WRITE_OPTS).await?;
195
196 Ok(meta.config)
197 }
198
199 pub async fn delete_basin(&self, basin: BasinName) -> Result<(), DeleteBasinError> {
200 let txn = self.db.begin(IsolationLevel::SerializableSnapshot).await?;
201 let meta_key = kv::basin_meta::ser_key(&basin);
202 let Some(mut meta) = db_txn_get(&txn, &meta_key, kv::basin_meta::deser_value).await? else {
203 return Err(BasinNotFoundError { basin }.into());
204 };
205 if meta.deleted_at.is_none() {
206 meta.deleted_at = Some(OffsetDateTime::now_utc());
207 txn.put(&meta_key, kv::basin_meta::ser_value(&meta))?;
208 txn.put(
209 kv::basin_deletion_pending::ser_key(&basin),
210 kv::basin_deletion_pending::ser_value(&StreamNameStartAfter::default()),
211 )?;
212 static WRITE_OPTS: WriteOptions = WriteOptions {
213 await_durable: true,
214 };
215 txn.commit_with_options(&WRITE_OPTS).await?;
216 self.bgtask_trigger(BgtaskTrigger::BasinDeletion);
217 }
218 Ok(())
219 }
220}
221
222fn creation_idempotency_key(req_token: &RequestToken, config: &BasinConfig) -> Bash {
223 Bash::length_prefixed(&[
224 req_token.as_bytes(),
225 &serde_json::to_vec(&s2_api::v1::config::BasinConfig::from(config.clone()))
226 .expect("serializable"),
227 ])
228}