s2-lite 0.34.0

Lightweight server implementation of S2, the durable streams API, backed by object storage
Documentation
use s2_common::{
    bash::Bash,
    record::StreamPosition,
    types::{
        basin::BasinName,
        config::{OptionalStreamConfig, StreamConfig, StreamReconfiguration},
        resources::{ListItemsRequestParts, Page, ProvisionMode, ProvisionResult, RequestToken},
        stream::{ListStreamsRequest, StreamInfo, StreamName},
    },
};
use slatedb::{
    IsolationLevel,
    config::{DurabilityLevel, ScanOptions},
};
use time::OffsetDateTime;
use tracing::instrument;

use super::{
    Backend,
    store::db_txn_get,
    streamer::{TerminalTrimCondition, TerminalTrimOutcome, doe_arm_delay},
};
use crate::{
    backend::{
        error::{
            BasinDeletionPendingError, BasinNotFoundError, DeleteStreamError, GetStreamConfigError,
            ListStreamsError, ProvisionStreamError, ReconfigureStreamError, StorageError,
            StreamAlreadyExistsError, StreamDeletionPendingError, StreamNotFoundError,
            StreamerError,
        },
        kv,
    },
    stream_id::StreamId,
};

impl Backend {
    pub async fn list_streams(
        &self,
        basin: BasinName,
        request: ListStreamsRequest,
    ) -> Result<Page<StreamInfo>, ListStreamsError> {
        let ListItemsRequestParts {
            prefix,
            start_after,
            limit,
        } = request.into();

        let key_range = kv::stream_meta::ser_key_range(&basin, &prefix, &start_after);
        if key_range.is_empty() {
            return Ok(Page::new_empty());
        }

        let scan_opts = ScanOptions {
            durability_filter: DurabilityLevel::Remote,
            ..Default::default()
        };
        let mut it = self.db.scan_with_options(key_range, &scan_opts).await?;

        let mut streams = Vec::with_capacity(limit.as_usize());
        let mut has_more = false;
        while let Some(kv) = it.next().await? {
            let (deser_basin, stream) = kv::stream_meta::deser_key(kv.key)?;
            assert_eq!(deser_basin.as_ref(), basin.as_ref());
            assert!(stream.as_ref() > start_after.as_ref());
            assert!(stream.as_ref() >= prefix.as_ref());
            if streams.len() == limit.as_usize() {
                has_more = true;
                break;
            }
            let meta = kv::stream_meta::deser_value(kv.value)?;
            streams.push(StreamInfo {
                name: stream,
                created_at: meta.created_at,
                deleted_at: meta.deleted_at,
                cipher: meta.cipher,
            });
        }
        Ok(Page::new(streams, has_more))
    }

    pub async fn provision_stream(
        &self,
        basin: BasinName,
        stream: StreamName,
        config: OptionalStreamConfig,
        mode: ProvisionMode,
    ) -> Result<ProvisionResult<StreamInfo>, ProvisionStreamError> {
        let txn = self.db.begin(IsolationLevel::SerializableSnapshot).await?;

        let Some(basin_meta) = db_txn_get(
            &txn,
            kv::basin_meta::ser_key(&basin),
            kv::basin_meta::deser_value,
        )
        .await?
        else {
            return Err(BasinNotFoundError { basin }.into());
        };

        if basin_meta.deleted_at.is_some() {
            return Err(BasinDeletionPendingError { basin }.into());
        }

        let stream_meta_key = kv::stream_meta::ser_key(&basin, &stream);

        let existing_meta =
            db_txn_get(&txn, &stream_meta_key, kv::stream_meta::deser_value).await?;
        if let Some(existing_meta) = &existing_meta
            && existing_meta.deleted_at.is_some()
        {
            return Err(ProvisionStreamError::StreamDeletionPending(
                StreamDeletionPendingError,
            ));
        }

        let basin_defaults = basin_meta.config.default_stream_config;
        let (outcome, prior_doe_min_age) = match (existing_meta, mode) {
            (Some(existing), ProvisionMode::CreateOnly { request_token }) => {
                let new_creation_idempotency_key = request_token
                    .as_ref()
                    .map(|req_token| creation_idempotency_key(req_token, &config));
                return if new_creation_idempotency_key.is_some()
                    && existing.creation_idempotency_key == new_creation_idempotency_key
                {
                    Ok(ProvisionResult::Noop(StreamInfo {
                        name: stream,
                        created_at: existing.created_at,
                        deleted_at: None,
                        cipher: existing.cipher,
                    }))
                } else {
                    Err(StreamAlreadyExistsError { basin, stream }.into())
                };
            }
            (Some(existing), ProvisionMode::Ensure) => {
                let desired_config = config.merge(basin_defaults);
                let config_unchanged = existing.config == desired_config;
                let meta = kv::stream_meta::StreamMeta {
                    config: desired_config,
                    cipher: existing.cipher,
                    created_at: existing.created_at,
                    deleted_at: None,
                    creation_idempotency_key: existing.creation_idempotency_key,
                };
                (
                    if config_unchanged {
                        ProvisionResult::Noop(meta)
                    } else {
                        ProvisionResult::Updated(meta)
                    },
                    existing.config.delete_on_empty.min_age(),
                )
            }
            (None, ProvisionMode::CreateOnly { request_token }) => {
                let new_creation_idempotency_key = request_token
                    .as_ref()
                    .map(|req_token| creation_idempotency_key(req_token, &config));
                (
                    ProvisionResult::Created(kv::stream_meta::StreamMeta {
                        config: config.merge(basin_defaults),
                        cipher: basin_meta.config.stream_cipher,
                        created_at: OffsetDateTime::now_utc(),
                        deleted_at: None,
                        creation_idempotency_key: new_creation_idempotency_key,
                    }),
                    None,
                )
            }
            (None, ProvisionMode::Ensure) => (
                ProvisionResult::Created(kv::stream_meta::StreamMeta {
                    config: config.merge(basin_defaults),
                    cipher: basin_meta.config.stream_cipher,
                    created_at: OffsetDateTime::now_utc(),
                    deleted_at: None,
                    creation_idempotency_key: None,
                }),
                None,
            ),
        };

        if !matches!(&outcome, ProvisionResult::Noop(_)) {
            let meta = outcome.inner();

            txn.put(&stream_meta_key, kv::stream_meta::ser_value(meta))?;

            let stream_id = StreamId::new(&basin, &stream);

            if matches!(&outcome, ProvisionResult::Created(_)) {
                txn.put(
                    kv::stream_id_mapping::ser_key(stream_id),
                    kv::stream_id_mapping::ser_value(&basin, &stream),
                )?;
                txn.put(
                    kv::stream_tail_position::ser_key(stream_id),
                    kv::stream_tail_position::ser_value(StreamPosition::MIN),
                )?;
            }

            if let Some(min_age) = meta.config.delete_on_empty.min_age()
                && (matches!(&outcome, ProvisionResult::Created(_)) || prior_doe_min_age.is_none())
            {
                txn.put(
                    kv::stream_doe_deadline::ser_key(
                        kv::timestamp::TimestampSecs::after(doe_arm_delay(
                            meta.config.retention_policy.age().unwrap_or_default(),
                            min_age,
                        )),
                        stream_id,
                    ),
                    kv::stream_doe_deadline::ser_value(min_age),
                )?;
            }

            txn.commit().await?;
        }

        if let ProvisionResult::Updated(meta) = &outcome
            && let Some(client) = self.streamer_client_if_active(&basin, &stream)
        {
            client.advise_reconfig(meta.config.clone());
        }

        Ok(outcome.map(|meta| StreamInfo {
            name: stream,
            created_at: meta.created_at,
            deleted_at: None,
            cipher: meta.cipher,
        }))
    }

    pub(super) async fn stream_id_mapping(
        &self,
        stream_id: StreamId,
    ) -> Result<Option<(BasinName, StreamName)>, StorageError> {
        self.db_get(
            kv::stream_id_mapping::ser_key(stream_id),
            kv::stream_id_mapping::deser_value,
        )
        .await
    }

    pub async fn get_stream_config(
        &self,
        basin: BasinName,
        stream: StreamName,
    ) -> Result<StreamConfig, GetStreamConfigError> {
        let meta = self
            .db_get(
                kv::stream_meta::ser_key(&basin, &stream),
                kv::stream_meta::deser_value,
            )
            .await?
            .ok_or_else(|| StreamNotFoundError {
                basin: basin.clone(),
                stream: stream.clone(),
            })?;
        if meta.deleted_at.is_some() {
            return Err(StreamDeletionPendingError.into());
        }
        Ok(meta.config)
    }

    pub async fn reconfigure_stream(
        &self,
        basin: BasinName,
        stream: StreamName,
        reconfig: StreamReconfiguration,
    ) -> Result<StreamConfig, ReconfigureStreamError> {
        let txn = self.db.begin(IsolationLevel::SerializableSnapshot).await?;

        let meta_key = kv::stream_meta::ser_key(&basin, &stream);
        let (basin_meta, meta) = tokio::try_join!(
            db_txn_get(
                &txn,
                kv::basin_meta::ser_key(&basin),
                kv::basin_meta::deser_value,
            ),
            db_txn_get(&txn, &meta_key, kv::stream_meta::deser_value),
        )?;

        let basin_meta = basin_meta.ok_or_else(|| BasinNotFoundError {
            basin: basin.clone(),
        })?;
        if basin_meta.deleted_at.is_some() {
            return Err(BasinDeletionPendingError { basin }.into());
        }

        let mut meta = meta.ok_or_else(|| StreamNotFoundError {
            basin: basin.clone(),
            stream: stream.clone(),
        })?;

        if meta.deleted_at.is_some() {
            return Err(StreamDeletionPendingError.into());
        }

        let prior_doe_min_age = meta.config.delete_on_empty.min_age();

        meta.config = OptionalStreamConfig::from(meta.config)
            .reconfigure(reconfig)
            .merge(basin_meta.config.default_stream_config);

        txn.put(&meta_key, kv::stream_meta::ser_value(&meta))?;

        let stream_id = StreamId::new(&basin, &stream);
        if let Some(min_age) = meta.config.delete_on_empty.min_age()
            && prior_doe_min_age.is_none()
        {
            txn.put(
                kv::stream_doe_deadline::ser_key(
                    kv::timestamp::TimestampSecs::after(doe_arm_delay(
                        meta.config.retention_policy.age().unwrap_or_default(),
                        min_age,
                    )),
                    stream_id,
                ),
                kv::stream_doe_deadline::ser_value(min_age),
            )?;
        }

        txn.commit().await?;

        if let Some(client) = self.streamer_client_if_active(&basin, &stream) {
            client.advise_reconfig(meta.config.clone());
        }

        Ok(meta.config)
    }

    #[instrument(ret, err, skip(self))]
    pub async fn delete_stream(
        &self,
        basin: BasinName,
        stream: StreamName,
    ) -> Result<(), DeleteStreamError> {
        self.delete_stream_with_condition(basin, stream, TerminalTrimCondition::Always)
            .await
    }

    pub(super) async fn delete_stream_with_condition(
        &self,
        basin: BasinName,
        stream: StreamName,
        condition: TerminalTrimCondition,
    ) -> Result<(), DeleteStreamError> {
        let outcome = match self.streamer_client_guarded(&basin, &stream).await {
            Ok(client) => client.terminal_trim(condition).await?,
            Err(StreamerError::Storage(e)) => {
                return Err(DeleteStreamError::Storage(e));
            }
            Err(StreamerError::StreamNotFound(e)) => {
                return Err(DeleteStreamError::StreamNotFound(e));
            }
            Err(StreamerError::StreamDeletionPending(_)) => TerminalTrimOutcome::DeletionPending,
        };
        match outcome {
            TerminalTrimOutcome::DeletionPending => self.mark_stream_deleted(basin, stream).await,
            TerminalTrimOutcome::Ineligible => Ok(()),
        }
    }

    async fn mark_stream_deleted(
        &self,
        basin: BasinName,
        stream: StreamName,
    ) -> Result<(), DeleteStreamError> {
        let txn = self.db.begin(IsolationLevel::SerializableSnapshot).await?;
        let meta_key = kv::stream_meta::ser_key(&basin, &stream);
        let mut meta = db_txn_get(&txn, &meta_key, kv::stream_meta::deser_value)
            .await?
            .ok_or_else(|| StreamNotFoundError {
                basin,
                stream: stream.clone(),
            })?;
        if meta.deleted_at.is_none() {
            meta.deleted_at = Some(OffsetDateTime::now_utc());
            txn.put(&meta_key, kv::stream_meta::ser_value(&meta))?;
            txn.commit().await?;
        }
        Ok(())
    }
}

fn creation_idempotency_key(req_token: &RequestToken, config: &OptionalStreamConfig) -> Bash {
    Bash::length_prefixed(&[
        req_token.as_bytes(),
        &s2_api::v1::config::StreamConfig::to_opt(config.clone())
            .as_ref()
            .map(|v| serde_json::to_vec(v).expect("serializable"))
            .unwrap_or_default(),
    ])
}