Skip to main content

s2_lite/backend/
basins.rs

1use s2_common::{
2    bash::Bash,
3    types::{
4        basin::{BasinInfo, BasinName, CreateBasinIntent, ListBasinsRequest},
5        config::{BasinConfig, BasinReconfiguration},
6        resources::{ListItemsRequestParts, Page, RequestToken},
7        stream::StreamNameStartAfter,
8    },
9};
10use slatedb::{
11    IsolationLevel, IterationOrder,
12    config::{DurabilityLevel, ScanOptions, WriteOptions},
13};
14use time::OffsetDateTime;
15
16use super::{Backend, CreatedOrReconfigured, bgtasks::BgtaskTrigger, store::db_txn_get};
17use crate::backend::{
18    error::{
19        BasinAlreadyExistsError, BasinDeletionPendingError, BasinNotFoundError, CreateBasinError,
20        DeleteBasinError, GetBasinConfigError, ListBasinsError, ReconfigureBasinError,
21    },
22    kv,
23};
24
25impl Backend {
26    pub async fn list_basins(
27        &self,
28        request: ListBasinsRequest,
29    ) -> Result<Page<BasinInfo>, ListBasinsError> {
30        let ListItemsRequestParts {
31            prefix,
32            start_after,
33            limit,
34        } = request.into();
35
36        let key_range = kv::basin_meta::ser_key_range(&prefix, &start_after);
37        if key_range.is_empty() {
38            return Ok(Page::new_empty());
39        }
40
41        static SCAN_OPTS: ScanOptions = ScanOptions {
42            durability_filter: DurabilityLevel::Remote,
43            dirty: false,
44            read_ahead_bytes: 1,
45            cache_blocks: false,
46            max_fetch_tasks: 1,
47            order: IterationOrder::Ascending,
48        };
49        let mut it = self.db.scan_with_options(key_range, &SCAN_OPTS).await?;
50
51        let mut basins = Vec::with_capacity(limit.as_usize());
52        let mut has_more = false;
53        while let Some(kv) = it.next().await? {
54            let basin = kv::basin_meta::deser_key(kv.key)?;
55            assert!(basin.as_ref() > start_after.as_ref());
56            assert!(basin.as_ref() >= prefix.as_ref());
57            if basins.len() == limit.as_usize() {
58                has_more = true;
59                break;
60            }
61            let meta = kv::basin_meta::deser_value(kv.value)?;
62            basins.push(BasinInfo {
63                name: basin,
64                scope: None,
65                created_at: meta.created_at,
66                deleted_at: meta.deleted_at,
67            });
68        }
69        Ok(Page::new(basins, has_more))
70    }
71
72    pub async fn create_basin(
73        &self,
74        basin: BasinName,
75        intent: CreateBasinIntent,
76    ) -> Result<CreatedOrReconfigured<BasinInfo>, CreateBasinError> {
77        let meta_key = kv::basin_meta::ser_key(&basin);
78
79        let txn = self.db.begin(IsolationLevel::SerializableSnapshot).await?;
80
81        let existing_meta = db_txn_get(&txn, &meta_key, kv::basin_meta::deser_value).await?;
82        if let Some(existing_meta) = &existing_meta
83            && existing_meta.deleted_at.is_some()
84        {
85            return Err(BasinDeletionPendingError { basin }.into());
86        }
87
88        let (meta, is_reconfigure) = match (existing_meta, intent) {
89            (
90                Some(existing),
91                CreateBasinIntent::CreateOnly {
92                    config,
93                    request_token,
94                },
95            ) => {
96                let new_creation_idempotency_key = request_token
97                    .as_ref()
98                    .map(|req_token| creation_idempotency_key(req_token, &config));
99                return if new_creation_idempotency_key.is_some()
100                    && existing.creation_idempotency_key == new_creation_idempotency_key
101                {
102                    Ok(CreatedOrReconfigured::Created(BasinInfo {
103                        name: basin,
104                        scope: None,
105                        created_at: existing.created_at,
106                        deleted_at: None,
107                    }))
108                } else {
109                    Err(BasinAlreadyExistsError { basin }.into())
110                };
111            }
112            (Some(existing), CreateBasinIntent::CreateOrReconfigure { reconfiguration }) => (
113                kv::basin_meta::BasinMeta {
114                    config: existing.config.reconfigure(reconfiguration),
115                    created_at: existing.created_at,
116                    deleted_at: None,
117                    creation_idempotency_key: existing.creation_idempotency_key,
118                },
119                true,
120            ),
121            (
122                None,
123                CreateBasinIntent::CreateOnly {
124                    config,
125                    request_token,
126                },
127            ) => {
128                let new_creation_idempotency_key = request_token
129                    .as_ref()
130                    .map(|req_token| creation_idempotency_key(req_token, &config));
131                (
132                    kv::basin_meta::BasinMeta {
133                        config,
134                        created_at: OffsetDateTime::now_utc(),
135                        deleted_at: None,
136                        creation_idempotency_key: new_creation_idempotency_key,
137                    },
138                    false,
139                )
140            }
141            (None, CreateBasinIntent::CreateOrReconfigure { reconfiguration }) => (
142                kv::basin_meta::BasinMeta {
143                    config: BasinConfig::default().reconfigure(reconfiguration),
144                    created_at: OffsetDateTime::now_utc(),
145                    deleted_at: None,
146                    creation_idempotency_key: None,
147                },
148                false,
149            ),
150        };
151
152        txn.put(&meta_key, kv::basin_meta::ser_value(&meta))?;
153
154        static WRITE_OPTS: WriteOptions = WriteOptions {
155            await_durable: true,
156        };
157        txn.commit_with_options(&WRITE_OPTS).await?;
158
159        let info = BasinInfo {
160            name: basin,
161            scope: None,
162            created_at: meta.created_at,
163            deleted_at: None,
164        };
165
166        Ok(if is_reconfigure {
167            CreatedOrReconfigured::Reconfigured(info)
168        } else {
169            CreatedOrReconfigured::Created(info)
170        })
171    }
172
173    pub async fn get_basin_config(
174        &self,
175        basin: BasinName,
176    ) -> Result<BasinConfig, GetBasinConfigError> {
177        let Some(meta) = self
178            .db_get(kv::basin_meta::ser_key(&basin), kv::basin_meta::deser_value)
179            .await?
180        else {
181            return Err(BasinNotFoundError { basin }.into());
182        };
183        Ok(meta.config)
184    }
185
186    pub async fn reconfigure_basin(
187        &self,
188        basin: BasinName,
189        reconfig: BasinReconfiguration,
190    ) -> Result<BasinConfig, ReconfigureBasinError> {
191        let meta_key = kv::basin_meta::ser_key(&basin);
192
193        let txn = self.db.begin(IsolationLevel::SerializableSnapshot).await?;
194
195        let Some(mut meta) = db_txn_get(&txn, &meta_key, kv::basin_meta::deser_value).await? else {
196            return Err(BasinNotFoundError { basin }.into());
197        };
198
199        if meta.deleted_at.is_some() {
200            return Err(BasinDeletionPendingError { basin }.into());
201        }
202
203        meta.config = meta.config.reconfigure(reconfig);
204
205        txn.put(&meta_key, kv::basin_meta::ser_value(&meta))?;
206
207        static WRITE_OPTS: WriteOptions = WriteOptions {
208            await_durable: true,
209        };
210        txn.commit_with_options(&WRITE_OPTS).await?;
211
212        Ok(meta.config)
213    }
214
215    pub async fn delete_basin(&self, basin: BasinName) -> Result<(), DeleteBasinError> {
216        let txn = self.db.begin(IsolationLevel::SerializableSnapshot).await?;
217        let meta_key = kv::basin_meta::ser_key(&basin);
218        let Some(mut meta) = db_txn_get(&txn, &meta_key, kv::basin_meta::deser_value).await? else {
219            return Err(BasinNotFoundError { basin }.into());
220        };
221        if meta.deleted_at.is_none() {
222            meta.deleted_at = Some(OffsetDateTime::now_utc());
223            txn.put(&meta_key, kv::basin_meta::ser_value(&meta))?;
224            txn.put(
225                kv::basin_deletion_pending::ser_key(&basin),
226                kv::basin_deletion_pending::ser_value(&StreamNameStartAfter::default()),
227            )?;
228            static WRITE_OPTS: WriteOptions = WriteOptions {
229                await_durable: true,
230            };
231            txn.commit_with_options(&WRITE_OPTS).await?;
232            self.bgtask_trigger(BgtaskTrigger::BasinDeletion);
233        }
234        Ok(())
235    }
236}
237
238fn creation_idempotency_key(req_token: &RequestToken, config: &BasinConfig) -> Bash {
239    Bash::length_prefixed(&[
240        req_token.as_bytes(),
241        &serde_json::to_vec(&s2_api::v1::config::BasinConfig::from(config.clone()))
242            .expect("serializable"),
243    ])
244}