1use s2_common::{
2 basin::{BasinInfo, BasinName, ListBasinsRequest},
3 config::{BasinConfig, BasinReconfiguration},
4 resources::{Page, ProvisionMode, ProvisionResult, RequestToken},
5 stream::StreamNameStartAfter,
6};
7use s2_storage::bash::Bash;
8use slatedb::{
9 IsolationLevel,
10 config::{DurabilityLevel, ScanOptions},
11};
12use time::OffsetDateTime;
13
14use super::{Backend, bgtasks::BgtaskTrigger, store::db_txn_get};
15use crate::backend::{
16 error::{
17 BasinAlreadyExistsError, BasinDeletionPendingError, BasinNotFoundError, DeleteBasinError,
18 GetBasinConfigError, ListBasinsError, ProvisionBasinError, ReconfigureBasinError,
19 },
20 kv,
21};
22
23impl Backend {
24 pub async fn list_basins(
25 &self,
26 request: ListBasinsRequest,
27 ) -> Result<Page<BasinInfo>, ListBasinsError> {
28 let ListBasinsRequest {
29 prefix,
30 start_after,
31 limit,
32 } = request;
33
34 let key_range = kv::basin_meta::ser_key_range(&prefix, &start_after);
35 if key_range.is_empty() {
36 return Ok(Page::new_empty());
37 }
38
39 let scan_opts = ScanOptions {
40 durability_filter: DurabilityLevel::Remote,
41 ..Default::default()
42 };
43 let mut it = self.db.scan_with_options(key_range, &scan_opts).await?;
44
45 let mut basins = Vec::with_capacity(limit.as_usize());
46 let mut has_more = false;
47 while let Some(kv) = it.next().await? {
48 let basin = kv::basin_meta::deser_key(kv.key)?;
49 assert!(basin.as_ref() > start_after.as_ref());
50 assert!(basin.as_ref() >= prefix.as_ref());
51 if basins.len() == limit.as_usize() {
52 has_more = true;
53 break;
54 }
55 let meta = kv::basin_meta::deser_value(kv.value)?;
56 basins.push(BasinInfo {
57 name: basin,
58 location: None,
59 created_at: meta.created_at,
60 deleted_at: meta.deleted_at,
61 });
62 }
63 Ok(Page::new(basins, has_more))
64 }
65
66 pub async fn provision_basin(
67 &self,
68 basin: BasinName,
69 config: BasinConfig,
70 mode: ProvisionMode,
71 ) -> Result<ProvisionResult<BasinInfo>, ProvisionBasinError> {
72 let meta_key = kv::basin_meta::ser_key(&basin);
73
74 let txn = self.db.begin(IsolationLevel::SerializableSnapshot).await?;
75
76 let existing_meta = db_txn_get(&txn, &meta_key, kv::basin_meta::deser_value).await?;
77 if let Some(existing_meta) = &existing_meta
78 && existing_meta.deleted_at.is_some()
79 {
80 return Err(BasinDeletionPendingError { basin }.into());
81 }
82
83 let outcome = match (existing_meta, mode) {
84 (Some(existing), ProvisionMode::CreateOnly { request_token }) => {
85 let new_creation_idempotency_key = request_token
86 .as_ref()
87 .map(|req_token| creation_idempotency_key(req_token, &config));
88 return if new_creation_idempotency_key.is_some()
89 && existing.creation_idempotency_key == new_creation_idempotency_key
90 {
91 Ok(ProvisionResult::Noop(BasinInfo {
92 name: basin,
93 location: None,
94 created_at: existing.created_at,
95 deleted_at: None,
96 }))
97 } else {
98 Err(BasinAlreadyExistsError { basin }.into())
99 };
100 }
101 (Some(existing), ProvisionMode::Ensure) => {
102 let meta = kv::basin_meta::BasinMeta {
103 config,
104 created_at: existing.created_at,
105 deleted_at: None,
106 creation_idempotency_key: existing.creation_idempotency_key,
107 };
108 if existing.config == meta.config {
109 ProvisionResult::Noop(meta)
110 } else {
111 ProvisionResult::Updated(meta)
112 }
113 }
114 (None, ProvisionMode::CreateOnly { request_token }) => {
115 let new_creation_idempotency_key = request_token
116 .as_ref()
117 .map(|req_token| creation_idempotency_key(req_token, &config));
118 ProvisionResult::Created(kv::basin_meta::BasinMeta {
119 config,
120 created_at: OffsetDateTime::now_utc(),
121 deleted_at: None,
122 creation_idempotency_key: new_creation_idempotency_key,
123 })
124 }
125 (None, ProvisionMode::Ensure) => ProvisionResult::Created(kv::basin_meta::BasinMeta {
126 config,
127 created_at: OffsetDateTime::now_utc(),
128 deleted_at: None,
129 creation_idempotency_key: None,
130 }),
131 };
132
133 if !matches!(&outcome, ProvisionResult::Noop(_)) {
134 let meta = outcome.inner();
135 txn.put(&meta_key, kv::basin_meta::ser_value(meta))?;
136
137 txn.commit().await?;
138 }
139
140 Ok(outcome.map(|meta| BasinInfo {
141 name: basin,
142 location: None,
143 created_at: meta.created_at,
144 deleted_at: None,
145 }))
146 }
147
148 pub async fn get_basin_config(
149 &self,
150 basin: BasinName,
151 ) -> Result<BasinConfig, GetBasinConfigError> {
152 let Some(meta) = self
153 .db_get(kv::basin_meta::ser_key(&basin), kv::basin_meta::deser_value)
154 .await?
155 else {
156 return Err(BasinNotFoundError { basin }.into());
157 };
158 Ok(meta.config)
159 }
160
161 pub async fn reconfigure_basin(
162 &self,
163 basin: BasinName,
164 reconfig: BasinReconfiguration,
165 ) -> Result<BasinConfig, ReconfigureBasinError> {
166 let meta_key = kv::basin_meta::ser_key(&basin);
167
168 let txn = self.db.begin(IsolationLevel::SerializableSnapshot).await?;
169
170 let Some(mut meta) = db_txn_get(&txn, &meta_key, kv::basin_meta::deser_value).await? else {
171 return Err(BasinNotFoundError { basin }.into());
172 };
173
174 if meta.deleted_at.is_some() {
175 return Err(BasinDeletionPendingError { basin }.into());
176 }
177
178 meta.config = meta.config.reconfigure(reconfig);
179
180 txn.put(&meta_key, kv::basin_meta::ser_value(&meta))?;
181
182 txn.commit().await?;
183
184 Ok(meta.config)
185 }
186
187 pub async fn delete_basin(&self, basin: BasinName) -> Result<(), DeleteBasinError> {
188 let txn = self.db.begin(IsolationLevel::SerializableSnapshot).await?;
189 let meta_key = kv::basin_meta::ser_key(&basin);
190 let Some(mut meta) = db_txn_get(&txn, &meta_key, kv::basin_meta::deser_value).await? else {
191 return Err(BasinNotFoundError { basin }.into());
192 };
193 if meta.deleted_at.is_none() {
194 meta.deleted_at = Some(OffsetDateTime::now_utc());
195 txn.put(&meta_key, kv::basin_meta::ser_value(&meta))?;
196 txn.put(
197 kv::basin_deletion_pending::ser_key(&basin),
198 kv::basin_deletion_pending::ser_value(&StreamNameStartAfter::default()),
199 )?;
200 txn.commit().await?;
201 self.bgtask_trigger(BgtaskTrigger::BasinDeletion);
202 }
203 Ok(())
204 }
205}
206
207fn creation_idempotency_key(req_token: &RequestToken, config: &BasinConfig) -> Bash {
208 Bash::length_prefixed(&[
209 req_token.as_bytes(),
210 &serde_json::to_vec(&s2_api::v1::config::BasinConfig::from(config.clone()))
211 .expect("serializable"),
212 ])
213}