s2-common 0.39.1

Common stuff for client and servers for S2, the durable streams API
Documentation
mod command;
mod envelope;
mod fencing;
mod metering;

use bytes::Bytes;
pub use command::{CommandOp, CommandPayloadError, CommandRecord};
pub use envelope::{EnvelopeRecord, HeaderValidationError};
pub use fencing::{FencingToken, FencingTokenTooLongError, MAX_FENCING_TOKEN_LENGTH};
pub use metering::{Metered, MeteredExt, MeteredSize};

use crate::deep_size::DeepSize;

pub type SeqNum = u64;
pub type NonZeroSeqNum = std::num::NonZeroU64;
pub type Timestamp = u64;

#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub struct StreamPosition {
    pub seq_num: SeqNum,
    pub timestamp: Timestamp,
}

impl StreamPosition {
    pub const MIN: StreamPosition = StreamPosition {
        seq_num: SeqNum::MIN,
        timestamp: Timestamp::MIN,
    };
}

impl std::fmt::Display for StreamPosition {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{} @ {}", self.seq_num, self.timestamp)
    }
}

impl DeepSize for StreamPosition {
    fn deep_size(&self) -> usize {
        self.seq_num.deep_size() + self.timestamp.deep_size()
    }
}

#[derive(Debug, PartialEq, thiserror::Error)]
pub enum RecordPartsError {
    #[error("unknown command")]
    UnknownCommand,
    #[error("invalid `{0}` command: {1}")]
    CommandPayload(CommandOp, CommandPayloadError),
    #[error("invalid header: {0}")]
    Header(#[from] HeaderValidationError),
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Header {
    pub name: Bytes,
    pub value: Bytes,
}

impl DeepSize for Header {
    fn deep_size(&self) -> usize {
        self.name.len() + self.value.len()
    }
}

impl MeteredSize for Record {
    fn metered_size(&self) -> usize {
        match self {
            Self::Command(command) => command.metered_size(),
            Self::Envelope(envelope) => envelope.metered_size(),
        }
    }
}

#[derive(Debug, PartialEq, Eq, Clone)]
pub enum Record {
    Command(CommandRecord),
    Envelope(EnvelopeRecord),
}

impl DeepSize for Record {
    fn deep_size(&self) -> usize {
        match self {
            Self::Command(c) => c.deep_size(),
            Self::Envelope(e) => e.deep_size(),
        }
    }
}

impl Record {
    pub fn try_from_parts(headers: Vec<Header>, body: Bytes) -> Result<Self, RecordPartsError> {
        if headers.len() == 1 {
            let header = &headers[0];
            if header.name.is_empty() {
                let op = CommandOp::from_id(header.value.as_ref())
                    .ok_or(RecordPartsError::UnknownCommand)?;
                let command_record = CommandRecord::try_from_parts(op, body.as_ref())
                    .map_err(|e| RecordPartsError::CommandPayload(op, e))?;
                return Ok(Self::Command(command_record));
            }
        }
        let envelope = EnvelopeRecord::try_from_parts(headers, body)?;
        Ok(Self::Envelope(envelope))
    }

    pub fn sequenced(self, position: StreamPosition) -> SequencedRecord {
        Sequenced::new(position, self)
    }

    pub fn into_parts(self) -> (Vec<Header>, Bytes) {
        match self {
            Record::Envelope(e) => e.into_parts(),
            Record::Command(c) => {
                let op = c.op();
                let header = Header {
                    name: Bytes::new(),
                    value: Bytes::from_static(op.to_id()),
                };
                (vec![header], c.payload())
            }
        }
    }
}

#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Sequenced<T> {
    position: StreamPosition,
    inner: T,
}

impl<T> Sequenced<T> {
    pub const fn new(position: StreamPosition, inner: T) -> Self {
        Self { position, inner }
    }

    pub const fn position(&self) -> &StreamPosition {
        &self.position
    }

    pub fn inner(&self) -> &T {
        &self.inner
    }

    pub fn as_ref(&self) -> Sequenced<&T> {
        Sequenced::new(self.position, &self.inner)
    }

    pub fn parts(&self) -> (StreamPosition, &T) {
        (self.position, &self.inner)
    }

    pub fn into_parts(self) -> (StreamPosition, T) {
        (self.position, self.inner)
    }
}

pub type SequencedRecord = Sequenced<Record>;

impl<T> MeteredSize for Sequenced<T>
where
    T: MeteredSize,
{
    fn metered_size(&self) -> usize {
        self.inner.metered_size()
    }
}

impl<T> DeepSize for Sequenced<T>
where
    T: DeepSize,
{
    fn deep_size(&self) -> usize {
        self.position.deep_size() + self.inner.deep_size()
    }
}

impl<T> Metered<T>
where
    T: MeteredSize,
{
    pub fn sequenced(self, position: StreamPosition) -> Metered<Sequenced<T>> {
        Metered::with_size(
            self.metered_size(),
            Sequenced::new(position, self.into_inner()),
        )
    }
}

impl<T> Metered<Sequenced<T>> {
    pub fn parts(&self) -> (StreamPosition, Metered<&T>) {
        let size = self.metered_size();
        let (position, inner) = self.as_ref().into_inner().parts();
        (position, Metered::with_size(size, inner))
    }

    pub fn into_parts(self) -> (StreamPosition, Metered<T>) {
        let size = self.metered_size();
        let (position, inner) = self.into_inner().into_parts();
        (position, Metered::with_size(size, inner))
    }
}

#[cfg(test)]
mod test {
    use rstest::rstest;

    use super::*;

    fn semantic_metered_size(record: &Record) -> usize {
        let (headers, body) = record.clone().into_parts();
        8 + (2 * headers.len())
            + headers
                .iter()
                .map(|header| header.name.len() + header.value.len())
                .sum::<usize>()
            + body.len()
    }

    #[test]
    fn empty_header_name_solo() {
        let headers = vec![Header {
            name: Bytes::new(),
            value: Bytes::from("hi"),
        }];
        let body = Bytes::from("hello");
        assert_eq!(
            Record::try_from_parts(headers, body),
            Err(RecordPartsError::UnknownCommand)
        );
    }

    #[test]
    fn empty_header_name_among_others() {
        let headers = vec![
            Header {
                name: Bytes::from("boku"),
                value: Bytes::from("hi"),
            },
            Header {
                name: Bytes::new(),
                value: Bytes::from("hi"),
            },
        ];
        let body = Bytes::from("hello");
        assert_eq!(
            Record::try_from_parts(headers, body),
            Err(RecordPartsError::Header(HeaderValidationError::NameEmpty))
        );
    }

    fn command_parts(op: &'static [u8], payload: &'static [u8]) -> (Vec<Header>, Bytes) {
        let headers = vec![Header {
            name: Bytes::new(),
            value: Bytes::from_static(op),
        }];
        let body = Bytes::from_static(payload);
        (headers, body)
    }

    fn assert_valid_command_record(op: &'static [u8], payload: &'static [u8]) {
        let (headers, body) = command_parts(op, payload);
        let record = Record::try_from_parts(headers.clone(), body.clone()).unwrap();
        let record_metered = record.metered_size();
        match &record {
            Record::Command(cmd) => {
                assert_eq!(cmd.op().to_id(), op);
                assert_eq!(cmd.payload().as_ref(), payload);
            }
            other => panic!("Command expected, got {other:?}"),
        }
        assert_eq!(record_metered, semantic_metered_size(&record));
        let sequenced_record = record.clone().sequenced(StreamPosition {
            seq_num: 42,
            timestamp: 100_000,
        });
        let sequenced_metered = sequenced_record.metered_size();
        assert_eq!(record_metered, sequenced_metered);
        assert_eq!(
            sequenced_record.position,
            StreamPosition {
                seq_num: 42,
                timestamp: 100_000,
            }
        );
        assert_eq!(
            sequenced_record.inner,
            Record::try_from_parts(headers, body).unwrap()
        );
    }

    #[rstest]
    #[case::fence_empty(b"fence", b"")]
    #[case::fence_uuid(b"fence", b"my-special-uuid")]
    #[case::trim_0(b"trim", b"\x00\x00\x00\x00\x00\x00\x00\x00")]
    fn valid_command_records(#[case] op: &'static [u8], #[case] payload: &'static [u8]) {
        assert_valid_command_record(op, payload);
    }

    #[rstest]
    #[case::fence_too_long(
        b"fence",
        b"toolongtoolongtoolongtoolongtoolongtoolongtoolong",
        RecordPartsError::CommandPayload(
            CommandOp::Fence,
            CommandPayloadError::FencingTokenTooLong(FencingTokenTooLongError(49)),
        )
    )]
    #[case::trim_empty(
        b"trim",
        b"",
        RecordPartsError::CommandPayload(CommandOp::Trim, CommandPayloadError::TrimPointSize(0),)
    )]
    #[case::trim_overflow(
        b"trim",
        b"\x00\x00\x00\x00\x00\x00\x00\x00\x00",
        RecordPartsError::CommandPayload(CommandOp::Trim, CommandPayloadError::TrimPointSize(9),)
    )]
    fn invalid_command_records(
        #[case] op: &'static [u8],
        #[case] payload: &'static [u8],
        #[case] expected: RecordPartsError,
    ) {
        let (headers, body) = command_parts(op, payload);
        assert_eq!(Record::try_from_parts(headers, body), Err(expected));
    }
}