1use s2_common::{
2 bash::Bash,
3 types::{
4 basin::{BasinInfo, BasinName, ListBasinsRequest},
5 config::{BasinConfig, BasinReconfiguration},
6 resources::{ListItemsRequestParts, Page, ProvisionMode, ProvisionResult, RequestToken},
7 stream::StreamNameStartAfter,
8 },
9};
10use slatedb::{
11 IsolationLevel,
12 config::{DurabilityLevel, ScanOptions},
13};
14use time::OffsetDateTime;
15
16use super::{Backend, bgtasks::BgtaskTrigger, store::db_txn_get};
17use crate::backend::{
18 error::{
19 BasinAlreadyExistsError, BasinDeletionPendingError, BasinNotFoundError, DeleteBasinError,
20 GetBasinConfigError, ListBasinsError, ProvisionBasinError, ReconfigureBasinError,
21 },
22 kv,
23};
24
25impl Backend {
26 pub async fn list_basins(
27 &self,
28 request: ListBasinsRequest,
29 ) -> Result<Page<BasinInfo>, ListBasinsError> {
30 let ListItemsRequestParts {
31 prefix,
32 start_after,
33 limit,
34 } = request.into();
35
36 let key_range = kv::basin_meta::ser_key_range(&prefix, &start_after);
37 if key_range.is_empty() {
38 return Ok(Page::new_empty());
39 }
40
41 let scan_opts = ScanOptions {
42 durability_filter: DurabilityLevel::Remote,
43 ..Default::default()
44 };
45 let mut it = self.db.scan_with_options(key_range, &scan_opts).await?;
46
47 let mut basins = Vec::with_capacity(limit.as_usize());
48 let mut has_more = false;
49 while let Some(kv) = it.next().await? {
50 let basin = kv::basin_meta::deser_key(kv.key)?;
51 assert!(basin.as_ref() > start_after.as_ref());
52 assert!(basin.as_ref() >= prefix.as_ref());
53 if basins.len() == limit.as_usize() {
54 has_more = true;
55 break;
56 }
57 let meta = kv::basin_meta::deser_value(kv.value)?;
58 basins.push(BasinInfo {
59 name: basin,
60 location: None,
61 created_at: meta.created_at,
62 deleted_at: meta.deleted_at,
63 });
64 }
65 Ok(Page::new(basins, has_more))
66 }
67
68 pub async fn provision_basin(
69 &self,
70 basin: BasinName,
71 config: BasinConfig,
72 mode: ProvisionMode,
73 ) -> Result<ProvisionResult<BasinInfo>, ProvisionBasinError> {
74 let meta_key = kv::basin_meta::ser_key(&basin);
75
76 let txn = self.db.begin(IsolationLevel::SerializableSnapshot).await?;
77
78 let existing_meta = db_txn_get(&txn, &meta_key, kv::basin_meta::deser_value).await?;
79 if let Some(existing_meta) = &existing_meta
80 && existing_meta.deleted_at.is_some()
81 {
82 return Err(BasinDeletionPendingError { basin }.into());
83 }
84
85 let outcome = match (existing_meta, mode) {
86 (Some(existing), ProvisionMode::CreateOnly { request_token }) => {
87 let new_creation_idempotency_key = request_token
88 .as_ref()
89 .map(|req_token| creation_idempotency_key(req_token, &config));
90 return if new_creation_idempotency_key.is_some()
91 && existing.creation_idempotency_key == new_creation_idempotency_key
92 {
93 Ok(ProvisionResult::Noop(BasinInfo {
94 name: basin,
95 location: None,
96 created_at: existing.created_at,
97 deleted_at: None,
98 }))
99 } else {
100 Err(BasinAlreadyExistsError { basin }.into())
101 };
102 }
103 (Some(existing), ProvisionMode::Ensure) => {
104 let meta = kv::basin_meta::BasinMeta {
105 config,
106 created_at: existing.created_at,
107 deleted_at: None,
108 creation_idempotency_key: existing.creation_idempotency_key,
109 };
110 if existing.config == meta.config {
111 ProvisionResult::Noop(meta)
112 } else {
113 ProvisionResult::Updated(meta)
114 }
115 }
116 (None, ProvisionMode::CreateOnly { request_token }) => {
117 let new_creation_idempotency_key = request_token
118 .as_ref()
119 .map(|req_token| creation_idempotency_key(req_token, &config));
120 ProvisionResult::Created(kv::basin_meta::BasinMeta {
121 config,
122 created_at: OffsetDateTime::now_utc(),
123 deleted_at: None,
124 creation_idempotency_key: new_creation_idempotency_key,
125 })
126 }
127 (None, ProvisionMode::Ensure) => ProvisionResult::Created(kv::basin_meta::BasinMeta {
128 config,
129 created_at: OffsetDateTime::now_utc(),
130 deleted_at: None,
131 creation_idempotency_key: None,
132 }),
133 };
134
135 if !matches!(&outcome, ProvisionResult::Noop(_)) {
136 let meta = outcome.inner();
137 txn.put(&meta_key, kv::basin_meta::ser_value(meta))?;
138
139 txn.commit().await?;
140 }
141
142 Ok(outcome.map(|meta| BasinInfo {
143 name: basin,
144 location: None,
145 created_at: meta.created_at,
146 deleted_at: None,
147 }))
148 }
149
150 pub async fn get_basin_config(
151 &self,
152 basin: BasinName,
153 ) -> Result<BasinConfig, GetBasinConfigError> {
154 let Some(meta) = self
155 .db_get(kv::basin_meta::ser_key(&basin), kv::basin_meta::deser_value)
156 .await?
157 else {
158 return Err(BasinNotFoundError { basin }.into());
159 };
160 Ok(meta.config)
161 }
162
163 pub async fn reconfigure_basin(
164 &self,
165 basin: BasinName,
166 reconfig: BasinReconfiguration,
167 ) -> Result<BasinConfig, ReconfigureBasinError> {
168 let meta_key = kv::basin_meta::ser_key(&basin);
169
170 let txn = self.db.begin(IsolationLevel::SerializableSnapshot).await?;
171
172 let Some(mut meta) = db_txn_get(&txn, &meta_key, kv::basin_meta::deser_value).await? else {
173 return Err(BasinNotFoundError { basin }.into());
174 };
175
176 if meta.deleted_at.is_some() {
177 return Err(BasinDeletionPendingError { basin }.into());
178 }
179
180 meta.config = meta.config.reconfigure(reconfig);
181
182 txn.put(&meta_key, kv::basin_meta::ser_value(&meta))?;
183
184 txn.commit().await?;
185
186 Ok(meta.config)
187 }
188
189 pub async fn delete_basin(&self, basin: BasinName) -> Result<(), DeleteBasinError> {
190 let txn = self.db.begin(IsolationLevel::SerializableSnapshot).await?;
191 let meta_key = kv::basin_meta::ser_key(&basin);
192 let Some(mut meta) = db_txn_get(&txn, &meta_key, kv::basin_meta::deser_value).await? else {
193 return Err(BasinNotFoundError { basin }.into());
194 };
195 if meta.deleted_at.is_none() {
196 meta.deleted_at = Some(OffsetDateTime::now_utc());
197 txn.put(&meta_key, kv::basin_meta::ser_value(&meta))?;
198 txn.put(
199 kv::basin_deletion_pending::ser_key(&basin),
200 kv::basin_deletion_pending::ser_value(&StreamNameStartAfter::default()),
201 )?;
202 txn.commit().await?;
203 self.bgtask_trigger(BgtaskTrigger::BasinDeletion);
204 }
205 Ok(())
206 }
207}
208
209fn creation_idempotency_key(req_token: &RequestToken, config: &BasinConfig) -> Bash {
210 Bash::length_prefixed(&[
211 req_token.as_bytes(),
212 &serde_json::to_vec(&s2_api::v1::config::BasinConfig::from(config.clone()))
213 .expect("serializable"),
214 ])
215}