Skip to main content

s2_lite/backend/
basins.rs

1use s2_common::{
2    bash::Bash,
3    types::{
4        basin::{BasinInfo, BasinName, ListBasinsRequest},
5        config::{BasinConfig, BasinReconfiguration},
6        resources::{ListItemsRequestParts, Page, ProvisionMode, ProvisionResult, RequestToken},
7        stream::StreamNameStartAfter,
8    },
9};
10use slatedb::{
11    IsolationLevel, IterationOrder,
12    config::{DurabilityLevel, ScanOptions, WriteOptions},
13};
14use time::OffsetDateTime;
15
16use super::{Backend, bgtasks::BgtaskTrigger, store::db_txn_get};
17use crate::backend::{
18    error::{
19        BasinAlreadyExistsError, BasinDeletionPendingError, BasinNotFoundError, DeleteBasinError,
20        GetBasinConfigError, ListBasinsError, ProvisionBasinError, ReconfigureBasinError,
21    },
22    kv,
23};
24
25impl Backend {
26    pub async fn list_basins(
27        &self,
28        request: ListBasinsRequest,
29    ) -> Result<Page<BasinInfo>, ListBasinsError> {
30        let ListItemsRequestParts {
31            prefix,
32            start_after,
33            limit,
34        } = request.into();
35
36        let key_range = kv::basin_meta::ser_key_range(&prefix, &start_after);
37        if key_range.is_empty() {
38            return Ok(Page::new_empty());
39        }
40
41        static SCAN_OPTS: ScanOptions = ScanOptions {
42            durability_filter: DurabilityLevel::Remote,
43            dirty: false,
44            read_ahead_bytes: 1,
45            cache_blocks: false,
46            max_fetch_tasks: 1,
47            order: IterationOrder::Ascending,
48        };
49        let mut it = self.db.scan_with_options(key_range, &SCAN_OPTS).await?;
50
51        let mut basins = Vec::with_capacity(limit.as_usize());
52        let mut has_more = false;
53        while let Some(kv) = it.next().await? {
54            let basin = kv::basin_meta::deser_key(kv.key)?;
55            assert!(basin.as_ref() > start_after.as_ref());
56            assert!(basin.as_ref() >= prefix.as_ref());
57            if basins.len() == limit.as_usize() {
58                has_more = true;
59                break;
60            }
61            let meta = kv::basin_meta::deser_value(kv.value)?;
62            basins.push(BasinInfo {
63                name: basin,
64                scope: None,
65                created_at: meta.created_at,
66                deleted_at: meta.deleted_at,
67            });
68        }
69        Ok(Page::new(basins, has_more))
70    }
71
72    pub async fn provision_basin(
73        &self,
74        basin: BasinName,
75        config: BasinConfig,
76        mode: ProvisionMode,
77    ) -> Result<ProvisionResult<BasinInfo>, ProvisionBasinError> {
78        let meta_key = kv::basin_meta::ser_key(&basin);
79
80        let txn = self.db.begin(IsolationLevel::SerializableSnapshot).await?;
81
82        let existing_meta = db_txn_get(&txn, &meta_key, kv::basin_meta::deser_value).await?;
83        if let Some(existing_meta) = &existing_meta
84            && existing_meta.deleted_at.is_some()
85        {
86            return Err(BasinDeletionPendingError { basin }.into());
87        }
88
89        let outcome = match (existing_meta, mode) {
90            (Some(existing), ProvisionMode::CreateOnly { request_token }) => {
91                let new_creation_idempotency_key = request_token
92                    .as_ref()
93                    .map(|req_token| creation_idempotency_key(req_token, &config));
94                return if new_creation_idempotency_key.is_some()
95                    && existing.creation_idempotency_key == new_creation_idempotency_key
96                {
97                    Ok(ProvisionResult::Noop(BasinInfo {
98                        name: basin,
99                        scope: None,
100                        created_at: existing.created_at,
101                        deleted_at: None,
102                    }))
103                } else {
104                    Err(BasinAlreadyExistsError { basin }.into())
105                };
106            }
107            (Some(existing), ProvisionMode::Ensure) => {
108                let meta = kv::basin_meta::BasinMeta {
109                    config,
110                    created_at: existing.created_at,
111                    deleted_at: None,
112                    creation_idempotency_key: existing.creation_idempotency_key,
113                };
114                if existing.config == meta.config {
115                    ProvisionResult::Noop(meta)
116                } else {
117                    ProvisionResult::Updated(meta)
118                }
119            }
120            (None, ProvisionMode::CreateOnly { request_token }) => {
121                let new_creation_idempotency_key = request_token
122                    .as_ref()
123                    .map(|req_token| creation_idempotency_key(req_token, &config));
124                ProvisionResult::Created(kv::basin_meta::BasinMeta {
125                    config,
126                    created_at: OffsetDateTime::now_utc(),
127                    deleted_at: None,
128                    creation_idempotency_key: new_creation_idempotency_key,
129                })
130            }
131            (None, ProvisionMode::Ensure) => ProvisionResult::Created(kv::basin_meta::BasinMeta {
132                config,
133                created_at: OffsetDateTime::now_utc(),
134                deleted_at: None,
135                creation_idempotency_key: None,
136            }),
137        };
138
139        if !matches!(&outcome, ProvisionResult::Noop(_)) {
140            let meta = outcome.inner();
141            txn.put(&meta_key, kv::basin_meta::ser_value(meta))?;
142
143            static WRITE_OPTS: WriteOptions = WriteOptions {
144                await_durable: true,
145            };
146            txn.commit_with_options(&WRITE_OPTS).await?;
147        }
148
149        Ok(outcome.map(|meta| BasinInfo {
150            name: basin,
151            scope: None,
152            created_at: meta.created_at,
153            deleted_at: None,
154        }))
155    }
156
157    pub async fn get_basin_config(
158        &self,
159        basin: BasinName,
160    ) -> Result<BasinConfig, GetBasinConfigError> {
161        let Some(meta) = self
162            .db_get(kv::basin_meta::ser_key(&basin), kv::basin_meta::deser_value)
163            .await?
164        else {
165            return Err(BasinNotFoundError { basin }.into());
166        };
167        Ok(meta.config)
168    }
169
170    pub async fn reconfigure_basin(
171        &self,
172        basin: BasinName,
173        reconfig: BasinReconfiguration,
174    ) -> Result<BasinConfig, ReconfigureBasinError> {
175        let meta_key = kv::basin_meta::ser_key(&basin);
176
177        let txn = self.db.begin(IsolationLevel::SerializableSnapshot).await?;
178
179        let Some(mut meta) = db_txn_get(&txn, &meta_key, kv::basin_meta::deser_value).await? else {
180            return Err(BasinNotFoundError { basin }.into());
181        };
182
183        if meta.deleted_at.is_some() {
184            return Err(BasinDeletionPendingError { basin }.into());
185        }
186
187        meta.config = meta.config.reconfigure(reconfig);
188
189        txn.put(&meta_key, kv::basin_meta::ser_value(&meta))?;
190
191        static WRITE_OPTS: WriteOptions = WriteOptions {
192            await_durable: true,
193        };
194        txn.commit_with_options(&WRITE_OPTS).await?;
195
196        Ok(meta.config)
197    }
198
199    pub async fn delete_basin(&self, basin: BasinName) -> Result<(), DeleteBasinError> {
200        let txn = self.db.begin(IsolationLevel::SerializableSnapshot).await?;
201        let meta_key = kv::basin_meta::ser_key(&basin);
202        let Some(mut meta) = db_txn_get(&txn, &meta_key, kv::basin_meta::deser_value).await? else {
203            return Err(BasinNotFoundError { basin }.into());
204        };
205        if meta.deleted_at.is_none() {
206            meta.deleted_at = Some(OffsetDateTime::now_utc());
207            txn.put(&meta_key, kv::basin_meta::ser_value(&meta))?;
208            txn.put(
209                kv::basin_deletion_pending::ser_key(&basin),
210                kv::basin_deletion_pending::ser_value(&StreamNameStartAfter::default()),
211            )?;
212            static WRITE_OPTS: WriteOptions = WriteOptions {
213                await_durable: true,
214            };
215            txn.commit_with_options(&WRITE_OPTS).await?;
216            self.bgtask_trigger(BgtaskTrigger::BasinDeletion);
217        }
218        Ok(())
219    }
220}
221
222fn creation_idempotency_key(req_token: &RequestToken, config: &BasinConfig) -> Bash {
223    Bash::length_prefixed(&[
224        req_token.as_bytes(),
225        &serde_json::to_vec(&s2_api::v1::config::BasinConfig::from(config.clone()))
226            .expect("serializable"),
227    ])
228}