Skip to main content

s2_lite/backend/
basins.rs

1use s2_common::{
2    bash::Bash,
3    types::{
4        basin::{BasinInfo, BasinName, BasinState, ListBasinsRequest},
5        config::{BasinConfig, BasinReconfiguration},
6        resources::{CreateMode, ListItemsRequestParts, Page, RequestToken},
7        stream::StreamNameStartAfter,
8    },
9};
10use slatedb::{
11    IsolationLevel,
12    config::{DurabilityLevel, ScanOptions, WriteOptions},
13};
14use time::OffsetDateTime;
15
16use super::{Backend, CreatedOrReconfigured, bgtasks::BgtaskTrigger, store::db_txn_get};
17use crate::backend::{
18    error::{
19        BasinAlreadyExistsError, BasinDeletionPendingError, BasinNotFoundError, CreateBasinError,
20        DeleteBasinError, GetBasinConfigError, ListBasinsError, ReconfigureBasinError,
21    },
22    kv,
23};
24
25impl Backend {
26    pub async fn list_basins(
27        &self,
28        request: ListBasinsRequest,
29    ) -> Result<Page<BasinInfo>, ListBasinsError> {
30        let ListItemsRequestParts {
31            prefix,
32            start_after,
33            limit,
34        } = request.into();
35
36        let key_range = kv::basin_meta::ser_key_range(&prefix, &start_after);
37        if key_range.is_empty() {
38            return Ok(Page::new_empty());
39        }
40
41        static SCAN_OPTS: ScanOptions = ScanOptions {
42            durability_filter: DurabilityLevel::Remote,
43            dirty: false,
44            read_ahead_bytes: 1,
45            cache_blocks: false,
46            max_fetch_tasks: 1,
47        };
48        let mut it = self.db.scan_with_options(key_range, &SCAN_OPTS).await?;
49
50        let mut basins = Vec::with_capacity(limit.as_usize());
51        let mut has_more = false;
52        while let Some(kv) = it.next().await? {
53            let basin = kv::basin_meta::deser_key(kv.key)?;
54            assert!(basin.as_ref() > start_after.as_ref());
55            assert!(basin.as_ref() >= prefix.as_ref());
56            if basins.len() == limit.as_usize() {
57                has_more = true;
58                break;
59            }
60            let meta = kv::basin_meta::deser_value(kv.value)?;
61            let state = if meta.deleted_at.is_some() {
62                BasinState::Deleting
63            } else {
64                BasinState::Active
65            };
66            basins.push(BasinInfo {
67                name: basin,
68                scope: None,
69                state,
70            });
71        }
72        Ok(Page::new(basins, has_more))
73    }
74
75    pub async fn create_basin(
76        &self,
77        basin: BasinName,
78        config: impl Into<BasinReconfiguration>,
79        mode: CreateMode,
80    ) -> Result<CreatedOrReconfigured<BasinInfo>, CreateBasinError> {
81        let config = config.into();
82        let meta_key = kv::basin_meta::ser_key(&basin);
83
84        let txn = self.db.begin(IsolationLevel::SerializableSnapshot).await?;
85
86        let new_creation_idempotency_key = match &mode {
87            CreateMode::CreateOnly(Some(req_token)) => {
88                let resolved = BasinConfig::default().reconfigure(config.clone());
89                Some(creation_idempotency_key(req_token, &resolved))
90            }
91            _ => None,
92        };
93
94        let mut existing_meta_opt = None;
95        if let Some(existing_meta) =
96            db_txn_get(&txn, &meta_key, kv::basin_meta::deser_value).await?
97        {
98            if existing_meta.deleted_at.is_some() {
99                return Err(BasinDeletionPendingError { basin }.into());
100            }
101            match mode {
102                CreateMode::CreateOnly(_) => {
103                    return if new_creation_idempotency_key.is_some()
104                        && existing_meta.creation_idempotency_key == new_creation_idempotency_key
105                    {
106                        Ok(CreatedOrReconfigured::Created(BasinInfo {
107                            name: basin,
108                            scope: None,
109                            state: BasinState::Active,
110                        }))
111                    } else {
112                        Err(BasinAlreadyExistsError { basin }.into())
113                    };
114                }
115                CreateMode::CreateOrReconfigure => {
116                    existing_meta_opt = Some(existing_meta);
117                }
118            }
119        }
120
121        let is_reconfigure = existing_meta_opt.is_some();
122        let (resolved, created_at, creation_idempotency_key) = match existing_meta_opt {
123            Some(existing) => (
124                existing.config.reconfigure(config),
125                existing.created_at,
126                existing.creation_idempotency_key,
127            ),
128            None => (
129                BasinConfig::default().reconfigure(config),
130                OffsetDateTime::now_utc(),
131                new_creation_idempotency_key,
132            ),
133        };
134
135        let meta = kv::basin_meta::BasinMeta {
136            config: resolved,
137            created_at,
138            deleted_at: None,
139            creation_idempotency_key,
140        };
141
142        txn.put(&meta_key, kv::basin_meta::ser_value(&meta))?;
143
144        static WRITE_OPTS: WriteOptions = WriteOptions {
145            await_durable: true,
146        };
147        txn.commit_with_options(&WRITE_OPTS).await?;
148
149        let info = BasinInfo {
150            name: basin,
151            scope: None,
152            state: BasinState::Active,
153        };
154
155        Ok(if is_reconfigure {
156            CreatedOrReconfigured::Reconfigured(info)
157        } else {
158            CreatedOrReconfigured::Created(info)
159        })
160    }
161
162    pub async fn get_basin_config(
163        &self,
164        basin: BasinName,
165    ) -> Result<BasinConfig, GetBasinConfigError> {
166        let Some(meta) = self
167            .db_get(kv::basin_meta::ser_key(&basin), kv::basin_meta::deser_value)
168            .await?
169        else {
170            return Err(BasinNotFoundError { basin }.into());
171        };
172        Ok(meta.config)
173    }
174
175    pub async fn reconfigure_basin(
176        &self,
177        basin: BasinName,
178        reconfig: BasinReconfiguration,
179    ) -> Result<BasinConfig, ReconfigureBasinError> {
180        let meta_key = kv::basin_meta::ser_key(&basin);
181
182        let txn = self.db.begin(IsolationLevel::SerializableSnapshot).await?;
183
184        let Some(mut meta) = db_txn_get(&txn, &meta_key, kv::basin_meta::deser_value).await? else {
185            return Err(BasinNotFoundError { basin }.into());
186        };
187
188        if meta.deleted_at.is_some() {
189            return Err(BasinDeletionPendingError { basin }.into());
190        }
191
192        meta.config = meta.config.reconfigure(reconfig);
193
194        txn.put(&meta_key, kv::basin_meta::ser_value(&meta))?;
195
196        static WRITE_OPTS: WriteOptions = WriteOptions {
197            await_durable: true,
198        };
199        txn.commit_with_options(&WRITE_OPTS).await?;
200
201        Ok(meta.config)
202    }
203
204    pub async fn delete_basin(&self, basin: BasinName) -> Result<(), DeleteBasinError> {
205        let txn = self.db.begin(IsolationLevel::SerializableSnapshot).await?;
206        let meta_key = kv::basin_meta::ser_key(&basin);
207        let Some(mut meta) = db_txn_get(&txn, &meta_key, kv::basin_meta::deser_value).await? else {
208            return Err(BasinNotFoundError { basin }.into());
209        };
210        if meta.deleted_at.is_none() {
211            meta.deleted_at = Some(OffsetDateTime::now_utc());
212            txn.put(&meta_key, kv::basin_meta::ser_value(&meta))?;
213            txn.put(
214                kv::basin_deletion_pending::ser_key(&basin),
215                kv::basin_deletion_pending::ser_value(&StreamNameStartAfter::default()),
216            )?;
217            static WRITE_OPTS: WriteOptions = WriteOptions {
218                await_durable: true,
219            };
220            txn.commit_with_options(&WRITE_OPTS).await?;
221            self.bgtask_trigger(BgtaskTrigger::BasinDeletion);
222        }
223        Ok(())
224    }
225}
226
227fn creation_idempotency_key(req_token: &RequestToken, config: &BasinConfig) -> Bash {
228    Bash::length_prefixed(&[
229        req_token.as_bytes(),
230        &serde_json::to_vec(&s2_api::v1::config::BasinConfig::from(config.clone()))
231            .expect("serializable"),
232    ])
233}