s2-lite 0.31.0

Lightweight server implementation of S2, the durable streams API, backed by object storage
Documentation
use s2_common::{
    bash::Bash,
    types::{
        basin::{BasinInfo, BasinName, CreateBasinIntent, ListBasinsRequest},
        config::{BasinConfig, BasinReconfiguration},
        resources::{ListItemsRequestParts, Page, RequestToken},
        stream::StreamNameStartAfter,
    },
};
use slatedb::{
    IsolationLevel, IterationOrder,
    config::{DurabilityLevel, ScanOptions, WriteOptions},
};
use time::OffsetDateTime;

use super::{Backend, CreatedOrReconfigured, bgtasks::BgtaskTrigger, store::db_txn_get};
use crate::backend::{
    error::{
        BasinAlreadyExistsError, BasinDeletionPendingError, BasinNotFoundError, CreateBasinError,
        DeleteBasinError, GetBasinConfigError, ListBasinsError, ReconfigureBasinError,
    },
    kv,
};

impl Backend {
    pub async fn list_basins(
        &self,
        request: ListBasinsRequest,
    ) -> Result<Page<BasinInfo>, ListBasinsError> {
        let ListItemsRequestParts {
            prefix,
            start_after,
            limit,
        } = request.into();

        let key_range = kv::basin_meta::ser_key_range(&prefix, &start_after);
        if key_range.is_empty() {
            return Ok(Page::new_empty());
        }

        static SCAN_OPTS: ScanOptions = ScanOptions {
            durability_filter: DurabilityLevel::Remote,
            dirty: false,
            read_ahead_bytes: 1,
            cache_blocks: false,
            max_fetch_tasks: 1,
            order: IterationOrder::Ascending,
        };
        let mut it = self.db.scan_with_options(key_range, &SCAN_OPTS).await?;

        let mut basins = Vec::with_capacity(limit.as_usize());
        let mut has_more = false;
        while let Some(kv) = it.next().await? {
            let basin = kv::basin_meta::deser_key(kv.key)?;
            assert!(basin.as_ref() > start_after.as_ref());
            assert!(basin.as_ref() >= prefix.as_ref());
            if basins.len() == limit.as_usize() {
                has_more = true;
                break;
            }
            let meta = kv::basin_meta::deser_value(kv.value)?;
            basins.push(BasinInfo {
                name: basin,
                scope: None,
                created_at: meta.created_at,
                deleted_at: meta.deleted_at,
            });
        }
        Ok(Page::new(basins, has_more))
    }

    pub async fn create_basin(
        &self,
        basin: BasinName,
        intent: CreateBasinIntent,
    ) -> Result<CreatedOrReconfigured<BasinInfo>, CreateBasinError> {
        let meta_key = kv::basin_meta::ser_key(&basin);

        let txn = self.db.begin(IsolationLevel::SerializableSnapshot).await?;

        let existing_meta = db_txn_get(&txn, &meta_key, kv::basin_meta::deser_value).await?;
        if let Some(existing_meta) = &existing_meta
            && existing_meta.deleted_at.is_some()
        {
            return Err(BasinDeletionPendingError { basin }.into());
        }

        let (meta, is_reconfigure) = match (existing_meta, intent) {
            (
                Some(existing),
                CreateBasinIntent::CreateOnly {
                    config,
                    request_token,
                },
            ) => {
                let new_creation_idempotency_key = request_token
                    .as_ref()
                    .map(|req_token| creation_idempotency_key(req_token, &config));
                return if new_creation_idempotency_key.is_some()
                    && existing.creation_idempotency_key == new_creation_idempotency_key
                {
                    Ok(CreatedOrReconfigured::Created(BasinInfo {
                        name: basin,
                        scope: None,
                        created_at: existing.created_at,
                        deleted_at: None,
                    }))
                } else {
                    Err(BasinAlreadyExistsError { basin }.into())
                };
            }
            (Some(existing), CreateBasinIntent::CreateOrReconfigure { reconfiguration }) => (
                kv::basin_meta::BasinMeta {
                    config: existing.config.reconfigure(reconfiguration),
                    created_at: existing.created_at,
                    deleted_at: None,
                    creation_idempotency_key: existing.creation_idempotency_key,
                },
                true,
            ),
            (
                None,
                CreateBasinIntent::CreateOnly {
                    config,
                    request_token,
                },
            ) => {
                let new_creation_idempotency_key = request_token
                    .as_ref()
                    .map(|req_token| creation_idempotency_key(req_token, &config));
                (
                    kv::basin_meta::BasinMeta {
                        config,
                        created_at: OffsetDateTime::now_utc(),
                        deleted_at: None,
                        creation_idempotency_key: new_creation_idempotency_key,
                    },
                    false,
                )
            }
            (None, CreateBasinIntent::CreateOrReconfigure { reconfiguration }) => (
                kv::basin_meta::BasinMeta {
                    config: BasinConfig::default().reconfigure(reconfiguration),
                    created_at: OffsetDateTime::now_utc(),
                    deleted_at: None,
                    creation_idempotency_key: None,
                },
                false,
            ),
        };

        txn.put(&meta_key, kv::basin_meta::ser_value(&meta))?;

        static WRITE_OPTS: WriteOptions = WriteOptions {
            await_durable: true,
        };
        txn.commit_with_options(&WRITE_OPTS).await?;

        let info = BasinInfo {
            name: basin,
            scope: None,
            created_at: meta.created_at,
            deleted_at: None,
        };

        Ok(if is_reconfigure {
            CreatedOrReconfigured::Reconfigured(info)
        } else {
            CreatedOrReconfigured::Created(info)
        })
    }

    pub async fn get_basin_config(
        &self,
        basin: BasinName,
    ) -> Result<BasinConfig, GetBasinConfigError> {
        let Some(meta) = self
            .db_get(kv::basin_meta::ser_key(&basin), kv::basin_meta::deser_value)
            .await?
        else {
            return Err(BasinNotFoundError { basin }.into());
        };
        Ok(meta.config)
    }

    pub async fn reconfigure_basin(
        &self,
        basin: BasinName,
        reconfig: BasinReconfiguration,
    ) -> Result<BasinConfig, ReconfigureBasinError> {
        let meta_key = kv::basin_meta::ser_key(&basin);

        let txn = self.db.begin(IsolationLevel::SerializableSnapshot).await?;

        let Some(mut meta) = db_txn_get(&txn, &meta_key, kv::basin_meta::deser_value).await? else {
            return Err(BasinNotFoundError { basin }.into());
        };

        if meta.deleted_at.is_some() {
            return Err(BasinDeletionPendingError { basin }.into());
        }

        meta.config = meta.config.reconfigure(reconfig);

        txn.put(&meta_key, kv::basin_meta::ser_value(&meta))?;

        static WRITE_OPTS: WriteOptions = WriteOptions {
            await_durable: true,
        };
        txn.commit_with_options(&WRITE_OPTS).await?;

        Ok(meta.config)
    }

    pub async fn delete_basin(&self, basin: BasinName) -> Result<(), DeleteBasinError> {
        let txn = self.db.begin(IsolationLevel::SerializableSnapshot).await?;
        let meta_key = kv::basin_meta::ser_key(&basin);
        let Some(mut meta) = db_txn_get(&txn, &meta_key, kv::basin_meta::deser_value).await? else {
            return Err(BasinNotFoundError { basin }.into());
        };
        if meta.deleted_at.is_none() {
            meta.deleted_at = Some(OffsetDateTime::now_utc());
            txn.put(&meta_key, kv::basin_meta::ser_value(&meta))?;
            txn.put(
                kv::basin_deletion_pending::ser_key(&basin),
                kv::basin_deletion_pending::ser_value(&StreamNameStartAfter::default()),
            )?;
            static WRITE_OPTS: WriteOptions = WriteOptions {
                await_durable: true,
            };
            txn.commit_with_options(&WRITE_OPTS).await?;
            self.bgtask_trigger(BgtaskTrigger::BasinDeletion);
        }
        Ok(())
    }
}

fn creation_idempotency_key(req_token: &RequestToken, config: &BasinConfig) -> Bash {
    Bash::length_prefixed(&[
        req_token.as_bytes(),
        &serde_json::to_vec(&s2_api::v1::config::BasinConfig::from(config.clone()))
            .expect("serializable"),
    ])
}