Skip to main content

s2_lite/backend/
basins.rs

1use s2_common::{
2    basin::{BasinInfo, BasinName, ListBasinsRequest},
3    config::{BasinConfig, BasinReconfiguration},
4    resources::{Page, ProvisionMode, ProvisionResult, RequestToken},
5    stream::StreamNameStartAfter,
6};
7use s2_storage::bash::Bash;
8use slatedb::{
9    IsolationLevel,
10    config::{DurabilityLevel, ScanOptions},
11};
12use time::OffsetDateTime;
13
14use super::{Backend, bgtasks::BgtaskTrigger, store::db_txn_get};
15use crate::backend::{
16    error::{
17        BasinAlreadyExistsError, BasinDeletionPendingError, BasinNotFoundError, DeleteBasinError,
18        GetBasinConfigError, ListBasinsError, ProvisionBasinError, ReconfigureBasinError,
19    },
20    kv,
21};
22
23impl Backend {
24    pub async fn list_basins(
25        &self,
26        request: ListBasinsRequest,
27    ) -> Result<Page<BasinInfo>, ListBasinsError> {
28        let ListBasinsRequest {
29            prefix,
30            start_after,
31            limit,
32        } = request;
33
34        let key_range = kv::basin_meta::ser_key_range(&prefix, &start_after);
35        if key_range.is_empty() {
36            return Ok(Page::new_empty());
37        }
38
39        let scan_opts = ScanOptions {
40            durability_filter: DurabilityLevel::Remote,
41            ..Default::default()
42        };
43        let mut it = self.db.scan_with_options(key_range, &scan_opts).await?;
44
45        let mut basins = Vec::with_capacity(limit.as_usize());
46        let mut has_more = false;
47        while let Some(kv) = it.next().await? {
48            let basin = kv::basin_meta::deser_key(kv.key)?;
49            assert!(basin.as_ref() > start_after.as_ref());
50            assert!(basin.as_ref() >= prefix.as_ref());
51            if basins.len() == limit.as_usize() {
52                has_more = true;
53                break;
54            }
55            let meta = kv::basin_meta::deser_value(kv.value)?;
56            basins.push(BasinInfo {
57                name: basin,
58                location: None,
59                created_at: meta.created_at,
60                deleted_at: meta.deleted_at,
61            });
62        }
63        Ok(Page::new(basins, has_more))
64    }
65
66    pub async fn provision_basin(
67        &self,
68        basin: BasinName,
69        config: BasinConfig,
70        mode: ProvisionMode,
71    ) -> Result<ProvisionResult<BasinInfo>, ProvisionBasinError> {
72        let meta_key = kv::basin_meta::ser_key(&basin);
73
74        let txn = self.db.begin(IsolationLevel::SerializableSnapshot).await?;
75
76        let existing_meta = db_txn_get(&txn, &meta_key, kv::basin_meta::deser_value).await?;
77        if let Some(existing_meta) = &existing_meta
78            && existing_meta.deleted_at.is_some()
79        {
80            return Err(BasinDeletionPendingError { basin }.into());
81        }
82
83        let outcome = match (existing_meta, mode) {
84            (Some(existing), ProvisionMode::CreateOnly { request_token }) => {
85                let new_creation_idempotency_key = request_token
86                    .as_ref()
87                    .map(|req_token| creation_idempotency_key(req_token, &config));
88                return if new_creation_idempotency_key.is_some()
89                    && existing.creation_idempotency_key == new_creation_idempotency_key
90                {
91                    Ok(ProvisionResult::Noop(BasinInfo {
92                        name: basin,
93                        location: None,
94                        created_at: existing.created_at,
95                        deleted_at: None,
96                    }))
97                } else {
98                    Err(BasinAlreadyExistsError { basin }.into())
99                };
100            }
101            (Some(existing), ProvisionMode::Ensure) => {
102                let meta = kv::basin_meta::BasinMeta {
103                    config,
104                    created_at: existing.created_at,
105                    deleted_at: None,
106                    creation_idempotency_key: existing.creation_idempotency_key,
107                };
108                if existing.config == meta.config {
109                    ProvisionResult::Noop(meta)
110                } else {
111                    ProvisionResult::Updated(meta)
112                }
113            }
114            (None, ProvisionMode::CreateOnly { request_token }) => {
115                let new_creation_idempotency_key = request_token
116                    .as_ref()
117                    .map(|req_token| creation_idempotency_key(req_token, &config));
118                ProvisionResult::Created(kv::basin_meta::BasinMeta {
119                    config,
120                    created_at: OffsetDateTime::now_utc(),
121                    deleted_at: None,
122                    creation_idempotency_key: new_creation_idempotency_key,
123                })
124            }
125            (None, ProvisionMode::Ensure) => ProvisionResult::Created(kv::basin_meta::BasinMeta {
126                config,
127                created_at: OffsetDateTime::now_utc(),
128                deleted_at: None,
129                creation_idempotency_key: None,
130            }),
131        };
132
133        if !matches!(&outcome, ProvisionResult::Noop(_)) {
134            let meta = outcome.inner();
135            txn.put(&meta_key, kv::basin_meta::ser_value(meta))?;
136
137            txn.commit().await?;
138        }
139
140        Ok(outcome.map(|meta| BasinInfo {
141            name: basin,
142            location: None,
143            created_at: meta.created_at,
144            deleted_at: None,
145        }))
146    }
147
148    pub async fn get_basin_config(
149        &self,
150        basin: BasinName,
151    ) -> Result<BasinConfig, GetBasinConfigError> {
152        let Some(meta) = self
153            .db_get(kv::basin_meta::ser_key(&basin), kv::basin_meta::deser_value)
154            .await?
155        else {
156            return Err(BasinNotFoundError { basin }.into());
157        };
158        Ok(meta.config)
159    }
160
161    pub async fn reconfigure_basin(
162        &self,
163        basin: BasinName,
164        reconfig: BasinReconfiguration,
165    ) -> Result<BasinConfig, ReconfigureBasinError> {
166        let meta_key = kv::basin_meta::ser_key(&basin);
167
168        let txn = self.db.begin(IsolationLevel::SerializableSnapshot).await?;
169
170        let Some(mut meta) = db_txn_get(&txn, &meta_key, kv::basin_meta::deser_value).await? else {
171            return Err(BasinNotFoundError { basin }.into());
172        };
173
174        if meta.deleted_at.is_some() {
175            return Err(BasinDeletionPendingError { basin }.into());
176        }
177
178        meta.config = meta.config.reconfigure(reconfig);
179
180        txn.put(&meta_key, kv::basin_meta::ser_value(&meta))?;
181
182        txn.commit().await?;
183
184        Ok(meta.config)
185    }
186
187    pub async fn delete_basin(&self, basin: BasinName) -> Result<(), DeleteBasinError> {
188        let txn = self.db.begin(IsolationLevel::SerializableSnapshot).await?;
189        let meta_key = kv::basin_meta::ser_key(&basin);
190        let Some(mut meta) = db_txn_get(&txn, &meta_key, kv::basin_meta::deser_value).await? else {
191            return Err(BasinNotFoundError { basin }.into());
192        };
193        if meta.deleted_at.is_none() {
194            meta.deleted_at = Some(OffsetDateTime::now_utc());
195            txn.put(&meta_key, kv::basin_meta::ser_value(&meta))?;
196            txn.put(
197                kv::basin_deletion_pending::ser_key(&basin),
198                kv::basin_deletion_pending::ser_value(&StreamNameStartAfter::default()),
199            )?;
200            txn.commit().await?;
201            self.bgtask_trigger(BgtaskTrigger::BasinDeletion);
202        }
203        Ok(())
204    }
205}
206
207fn creation_idempotency_key(req_token: &RequestToken, config: &BasinConfig) -> Bash {
208    Bash::length_prefixed(&[
209        req_token.as_bytes(),
210        &serde_json::to_vec(&s2_api::v1::config::BasinConfig::from(config.clone()))
211            .expect("serializable"),
212    ])
213}