Skip to main content

s2_lite/backend/
basins.rs

1use s2_common::{
2    bash::Bash,
3    types::{
4        basin::{BasinInfo, BasinName, ListBasinsRequest},
5        config::{BasinConfig, BasinReconfiguration},
6        resources::{ListItemsRequestParts, Page, ProvisionMode, ProvisionResult, RequestToken},
7        stream::StreamNameStartAfter,
8    },
9};
10use slatedb::{
11    IsolationLevel,
12    config::{DurabilityLevel, ScanOptions},
13};
14use time::OffsetDateTime;
15
16use super::{Backend, bgtasks::BgtaskTrigger, store::db_txn_get};
17use crate::backend::{
18    error::{
19        BasinAlreadyExistsError, BasinDeletionPendingError, BasinNotFoundError, DeleteBasinError,
20        GetBasinConfigError, ListBasinsError, ProvisionBasinError, ReconfigureBasinError,
21    },
22    kv,
23};
24
25impl Backend {
26    pub async fn list_basins(
27        &self,
28        request: ListBasinsRequest,
29    ) -> Result<Page<BasinInfo>, ListBasinsError> {
30        let ListItemsRequestParts {
31            prefix,
32            start_after,
33            limit,
34        } = request.into();
35
36        let key_range = kv::basin_meta::ser_key_range(&prefix, &start_after);
37        if key_range.is_empty() {
38            return Ok(Page::new_empty());
39        }
40
41        let scan_opts = ScanOptions {
42            durability_filter: DurabilityLevel::Remote,
43            ..Default::default()
44        };
45        let mut it = self.db.scan_with_options(key_range, &scan_opts).await?;
46
47        let mut basins = Vec::with_capacity(limit.as_usize());
48        let mut has_more = false;
49        while let Some(kv) = it.next().await? {
50            let basin = kv::basin_meta::deser_key(kv.key)?;
51            assert!(basin.as_ref() > start_after.as_ref());
52            assert!(basin.as_ref() >= prefix.as_ref());
53            if basins.len() == limit.as_usize() {
54                has_more = true;
55                break;
56            }
57            let meta = kv::basin_meta::deser_value(kv.value)?;
58            basins.push(BasinInfo {
59                name: basin,
60                location: None,
61                created_at: meta.created_at,
62                deleted_at: meta.deleted_at,
63            });
64        }
65        Ok(Page::new(basins, has_more))
66    }
67
68    pub async fn provision_basin(
69        &self,
70        basin: BasinName,
71        config: BasinConfig,
72        mode: ProvisionMode,
73    ) -> Result<ProvisionResult<BasinInfo>, ProvisionBasinError> {
74        let meta_key = kv::basin_meta::ser_key(&basin);
75
76        let txn = self.db.begin(IsolationLevel::SerializableSnapshot).await?;
77
78        let existing_meta = db_txn_get(&txn, &meta_key, kv::basin_meta::deser_value).await?;
79        if let Some(existing_meta) = &existing_meta
80            && existing_meta.deleted_at.is_some()
81        {
82            return Err(BasinDeletionPendingError { basin }.into());
83        }
84
85        let outcome = match (existing_meta, mode) {
86            (Some(existing), ProvisionMode::CreateOnly { request_token }) => {
87                let new_creation_idempotency_key = request_token
88                    .as_ref()
89                    .map(|req_token| creation_idempotency_key(req_token, &config));
90                return if new_creation_idempotency_key.is_some()
91                    && existing.creation_idempotency_key == new_creation_idempotency_key
92                {
93                    Ok(ProvisionResult::Noop(BasinInfo {
94                        name: basin,
95                        location: None,
96                        created_at: existing.created_at,
97                        deleted_at: None,
98                    }))
99                } else {
100                    Err(BasinAlreadyExistsError { basin }.into())
101                };
102            }
103            (Some(existing), ProvisionMode::Ensure) => {
104                let meta = kv::basin_meta::BasinMeta {
105                    config,
106                    created_at: existing.created_at,
107                    deleted_at: None,
108                    creation_idempotency_key: existing.creation_idempotency_key,
109                };
110                if existing.config == meta.config {
111                    ProvisionResult::Noop(meta)
112                } else {
113                    ProvisionResult::Updated(meta)
114                }
115            }
116            (None, ProvisionMode::CreateOnly { request_token }) => {
117                let new_creation_idempotency_key = request_token
118                    .as_ref()
119                    .map(|req_token| creation_idempotency_key(req_token, &config));
120                ProvisionResult::Created(kv::basin_meta::BasinMeta {
121                    config,
122                    created_at: OffsetDateTime::now_utc(),
123                    deleted_at: None,
124                    creation_idempotency_key: new_creation_idempotency_key,
125                })
126            }
127            (None, ProvisionMode::Ensure) => ProvisionResult::Created(kv::basin_meta::BasinMeta {
128                config,
129                created_at: OffsetDateTime::now_utc(),
130                deleted_at: None,
131                creation_idempotency_key: None,
132            }),
133        };
134
135        if !matches!(&outcome, ProvisionResult::Noop(_)) {
136            let meta = outcome.inner();
137            txn.put(&meta_key, kv::basin_meta::ser_value(meta))?;
138
139            txn.commit().await?;
140        }
141
142        Ok(outcome.map(|meta| BasinInfo {
143            name: basin,
144            location: None,
145            created_at: meta.created_at,
146            deleted_at: None,
147        }))
148    }
149
150    pub async fn get_basin_config(
151        &self,
152        basin: BasinName,
153    ) -> Result<BasinConfig, GetBasinConfigError> {
154        let Some(meta) = self
155            .db_get(kv::basin_meta::ser_key(&basin), kv::basin_meta::deser_value)
156            .await?
157        else {
158            return Err(BasinNotFoundError { basin }.into());
159        };
160        Ok(meta.config)
161    }
162
163    pub async fn reconfigure_basin(
164        &self,
165        basin: BasinName,
166        reconfig: BasinReconfiguration,
167    ) -> Result<BasinConfig, ReconfigureBasinError> {
168        let meta_key = kv::basin_meta::ser_key(&basin);
169
170        let txn = self.db.begin(IsolationLevel::SerializableSnapshot).await?;
171
172        let Some(mut meta) = db_txn_get(&txn, &meta_key, kv::basin_meta::deser_value).await? else {
173            return Err(BasinNotFoundError { basin }.into());
174        };
175
176        if meta.deleted_at.is_some() {
177            return Err(BasinDeletionPendingError { basin }.into());
178        }
179
180        meta.config = meta.config.reconfigure(reconfig);
181
182        txn.put(&meta_key, kv::basin_meta::ser_value(&meta))?;
183
184        txn.commit().await?;
185
186        Ok(meta.config)
187    }
188
189    pub async fn delete_basin(&self, basin: BasinName) -> Result<(), DeleteBasinError> {
190        let txn = self.db.begin(IsolationLevel::SerializableSnapshot).await?;
191        let meta_key = kv::basin_meta::ser_key(&basin);
192        let Some(mut meta) = db_txn_get(&txn, &meta_key, kv::basin_meta::deser_value).await? else {
193            return Err(BasinNotFoundError { basin }.into());
194        };
195        if meta.deleted_at.is_none() {
196            meta.deleted_at = Some(OffsetDateTime::now_utc());
197            txn.put(&meta_key, kv::basin_meta::ser_value(&meta))?;
198            txn.put(
199                kv::basin_deletion_pending::ser_key(&basin),
200                kv::basin_deletion_pending::ser_value(&StreamNameStartAfter::default()),
201            )?;
202            txn.commit().await?;
203            self.bgtask_trigger(BgtaskTrigger::BasinDeletion);
204        }
205        Ok(())
206    }
207}
208
209fn creation_idempotency_key(req_token: &RequestToken, config: &BasinConfig) -> Bash {
210    Bash::length_prefixed(&[
211        req_token.as_bytes(),
212        &serde_json::to_vec(&s2_api::v1::config::BasinConfig::from(config.clone()))
213            .expect("serializable"),
214    ])
215}