Skip to main content

s2_lite/backend/
basins.rs

1use s2_common::{
2    bash::Bash,
3    types::{
4        basin::{BasinInfo, BasinName, ListBasinsRequest},
5        config::{BasinConfig, BasinReconfiguration},
6        resources::{CreateMode, ListItemsRequestParts, Page, RequestToken},
7        stream::StreamNameStartAfter,
8    },
9};
10use slatedb::{
11    IsolationLevel, IterationOrder,
12    config::{DurabilityLevel, ScanOptions, WriteOptions},
13};
14use time::OffsetDateTime;
15
16use super::{Backend, CreatedOrReconfigured, bgtasks::BgtaskTrigger, store::db_txn_get};
17use crate::backend::{
18    error::{
19        BasinAlreadyExistsError, BasinDeletionPendingError, BasinNotFoundError, CreateBasinError,
20        DeleteBasinError, GetBasinConfigError, ListBasinsError, ReconfigureBasinError,
21    },
22    kv,
23};
24
25impl Backend {
26    pub async fn list_basins(
27        &self,
28        request: ListBasinsRequest,
29    ) -> Result<Page<BasinInfo>, ListBasinsError> {
30        let ListItemsRequestParts {
31            prefix,
32            start_after,
33            limit,
34        } = request.into();
35
36        let key_range = kv::basin_meta::ser_key_range(&prefix, &start_after);
37        if key_range.is_empty() {
38            return Ok(Page::new_empty());
39        }
40
41        static SCAN_OPTS: ScanOptions = ScanOptions {
42            durability_filter: DurabilityLevel::Remote,
43            dirty: false,
44            read_ahead_bytes: 1,
45            cache_blocks: false,
46            max_fetch_tasks: 1,
47            order: IterationOrder::Ascending,
48        };
49        let mut it = self.db.scan_with_options(key_range, &SCAN_OPTS).await?;
50
51        let mut basins = Vec::with_capacity(limit.as_usize());
52        let mut has_more = false;
53        while let Some(kv) = it.next().await? {
54            let basin = kv::basin_meta::deser_key(kv.key)?;
55            assert!(basin.as_ref() > start_after.as_ref());
56            assert!(basin.as_ref() >= prefix.as_ref());
57            if basins.len() == limit.as_usize() {
58                has_more = true;
59                break;
60            }
61            let meta = kv::basin_meta::deser_value(kv.value)?;
62            basins.push(BasinInfo {
63                name: basin,
64                scope: None,
65                created_at: meta.created_at,
66                deleted_at: meta.deleted_at,
67            });
68        }
69        Ok(Page::new(basins, has_more))
70    }
71
72    pub async fn create_basin(
73        &self,
74        basin: BasinName,
75        config: impl Into<BasinReconfiguration>,
76        mode: CreateMode,
77    ) -> Result<CreatedOrReconfigured<BasinInfo>, CreateBasinError> {
78        let config = config.into();
79        let meta_key = kv::basin_meta::ser_key(&basin);
80
81        let txn = self.db.begin(IsolationLevel::SerializableSnapshot).await?;
82
83        let new_creation_idempotency_key = match &mode {
84            CreateMode::CreateOnly(Some(req_token)) => {
85                let resolved = BasinConfig::default().reconfigure(config.clone());
86                Some(creation_idempotency_key(req_token, &resolved))
87            }
88            _ => None,
89        };
90
91        let mut existing_meta_opt = None;
92        if let Some(existing_meta) =
93            db_txn_get(&txn, &meta_key, kv::basin_meta::deser_value).await?
94        {
95            if existing_meta.deleted_at.is_some() {
96                return Err(BasinDeletionPendingError { basin }.into());
97            }
98            match mode {
99                CreateMode::CreateOnly(_) => {
100                    return if new_creation_idempotency_key.is_some()
101                        && existing_meta.creation_idempotency_key == new_creation_idempotency_key
102                    {
103                        Ok(CreatedOrReconfigured::Created(BasinInfo {
104                            name: basin,
105                            scope: None,
106                            created_at: existing_meta.created_at,
107                            deleted_at: None,
108                        }))
109                    } else {
110                        Err(BasinAlreadyExistsError { basin }.into())
111                    };
112                }
113                CreateMode::CreateOrReconfigure => {
114                    existing_meta_opt = Some(existing_meta);
115                }
116            }
117        }
118
119        let is_reconfigure = existing_meta_opt.is_some();
120        let (resolved, created_at, creation_idempotency_key) = match existing_meta_opt {
121            Some(existing) => (
122                existing.config.reconfigure(config),
123                existing.created_at,
124                existing.creation_idempotency_key,
125            ),
126            None => (
127                BasinConfig::default().reconfigure(config),
128                OffsetDateTime::now_utc(),
129                new_creation_idempotency_key,
130            ),
131        };
132
133        let meta = kv::basin_meta::BasinMeta {
134            config: resolved,
135            created_at,
136            deleted_at: None,
137            creation_idempotency_key,
138        };
139
140        txn.put(&meta_key, kv::basin_meta::ser_value(&meta))?;
141
142        static WRITE_OPTS: WriteOptions = WriteOptions {
143            await_durable: true,
144        };
145        txn.commit_with_options(&WRITE_OPTS).await?;
146
147        let info = BasinInfo {
148            name: basin,
149            scope: None,
150            created_at,
151            deleted_at: None,
152        };
153
154        Ok(if is_reconfigure {
155            CreatedOrReconfigured::Reconfigured(info)
156        } else {
157            CreatedOrReconfigured::Created(info)
158        })
159    }
160
161    pub async fn get_basin_config(
162        &self,
163        basin: BasinName,
164    ) -> Result<BasinConfig, GetBasinConfigError> {
165        let Some(meta) = self
166            .db_get(kv::basin_meta::ser_key(&basin), kv::basin_meta::deser_value)
167            .await?
168        else {
169            return Err(BasinNotFoundError { basin }.into());
170        };
171        Ok(meta.config)
172    }
173
174    pub async fn reconfigure_basin(
175        &self,
176        basin: BasinName,
177        reconfig: BasinReconfiguration,
178    ) -> Result<BasinConfig, ReconfigureBasinError> {
179        let meta_key = kv::basin_meta::ser_key(&basin);
180
181        let txn = self.db.begin(IsolationLevel::SerializableSnapshot).await?;
182
183        let Some(mut meta) = db_txn_get(&txn, &meta_key, kv::basin_meta::deser_value).await? else {
184            return Err(BasinNotFoundError { basin }.into());
185        };
186
187        if meta.deleted_at.is_some() {
188            return Err(BasinDeletionPendingError { basin }.into());
189        }
190
191        meta.config = meta.config.reconfigure(reconfig);
192
193        txn.put(&meta_key, kv::basin_meta::ser_value(&meta))?;
194
195        static WRITE_OPTS: WriteOptions = WriteOptions {
196            await_durable: true,
197        };
198        txn.commit_with_options(&WRITE_OPTS).await?;
199
200        Ok(meta.config)
201    }
202
203    pub async fn delete_basin(&self, basin: BasinName) -> Result<(), DeleteBasinError> {
204        let txn = self.db.begin(IsolationLevel::SerializableSnapshot).await?;
205        let meta_key = kv::basin_meta::ser_key(&basin);
206        let Some(mut meta) = db_txn_get(&txn, &meta_key, kv::basin_meta::deser_value).await? else {
207            return Err(BasinNotFoundError { basin }.into());
208        };
209        if meta.deleted_at.is_none() {
210            meta.deleted_at = Some(OffsetDateTime::now_utc());
211            txn.put(&meta_key, kv::basin_meta::ser_value(&meta))?;
212            txn.put(
213                kv::basin_deletion_pending::ser_key(&basin),
214                kv::basin_deletion_pending::ser_value(&StreamNameStartAfter::default()),
215            )?;
216            static WRITE_OPTS: WriteOptions = WriteOptions {
217                await_durable: true,
218            };
219            txn.commit_with_options(&WRITE_OPTS).await?;
220            self.bgtask_trigger(BgtaskTrigger::BasinDeletion);
221        }
222        Ok(())
223    }
224}
225
226fn creation_idempotency_key(req_token: &RequestToken, config: &BasinConfig) -> Bash {
227    Bash::length_prefixed(&[
228        req_token.as_bytes(),
229        &serde_json::to_vec(&s2_api::v1::config::BasinConfig::from(config.clone()))
230            .expect("serializable"),
231    ])
232}