s2-lite 0.33.0

Lightweight server implementation of S2, the durable streams API, backed by object storage
Documentation
use std::pin::Pin;

use futures::Stream;
use s2_common::{
    encryption::EncryptionSpec,
    record::StreamPosition,
    types::{
        basin::BasinName,
        stream::{AppendAck, AppendInput, StreamName},
    },
};
use s2_lite::backend::{
    Backend,
    error::{AppendError, CheckTailError},
};

mod read;
mod setup;

pub use read::*;
pub use setup::*;

pub async fn append(
    backend: &Backend,
    basin: BasinName,
    stream: StreamName,
    input: AppendInput,
    encryption: Option<&EncryptionSpec>,
) -> Result<AppendAck, AppendError> {
    backend
        .open_for_append(
            &basin,
            &stream,
            encryption.and_then(encryption_key_for_spec),
        )
        .await?
        .append(input)
        .await
}

pub async fn append_session<S>(
    backend: &Backend,
    basin: BasinName,
    stream: StreamName,
    encryption: Option<&EncryptionSpec>,
    inputs: S,
) -> Result<Pin<Box<dyn Stream<Item = Result<AppendAck, AppendError>>>>, AppendError>
where
    S: Stream<Item = AppendInput> + 'static,
{
    let session = backend
        .open_for_append(
            &basin,
            &stream,
            encryption.and_then(encryption_key_for_spec),
        )
        .await?
        .append_session(inputs);
    Ok(Box::pin(session))
}

pub async fn check_tail(
    backend: &Backend,
    basin: BasinName,
    stream: StreamName,
) -> Result<StreamPosition, CheckTailError> {
    backend
        .open_for_check_tail(&basin, &stream)
        .await?
        .check_tail()
        .await
}