s2-api 0.27.17

API types for S2, the durable streams API
Documentation
use compact_str::ToCompactString;
use s2_common::{record, types};

include!("s2.v1.rs");

impl From<record::StreamPosition> for StreamPosition {
    fn from(record::StreamPosition { seq_num, timestamp }: record::StreamPosition) -> Self {
        Self { seq_num, timestamp }
    }
}

impl From<Header> for record::Header {
    fn from(Header { name, value }: Header) -> Self {
        Self { name, value }
    }
}

impl From<record::Header> for Header {
    fn from(record::Header { name, value }: record::Header) -> Self {
        Self { name, value }
    }
}

impl TryFrom<AppendRecord> for types::stream::AppendRecord {
    type Error = types::ValidationError;

    fn try_from(
        AppendRecord {
            timestamp,
            headers,
            body,
        }: AppendRecord,
    ) -> Result<Self, Self::Error> {
        Ok(Self::try_from(types::stream::AppendRecordParts {
            timestamp,
            record: record::Record::try_from_parts(
                headers.into_iter().map(Into::into).collect(),
                body,
            )
            .map_err(|e| e.to_string())?
            .into(),
        })?)
    }
}

impl TryFrom<AppendInput> for types::stream::AppendInput {
    type Error = types::ValidationError;

    fn try_from(
        AppendInput {
            records,
            match_seq_num,
            fencing_token,
        }: AppendInput,
    ) -> Result<Self, Self::Error> {
        let records = records
            .into_iter()
            .map(types::stream::AppendRecord::try_from)
            .collect::<Result<Vec<_>, _>>()?;

        Ok(Self {
            records: types::stream::AppendRecordBatch::try_from(records)?,
            match_seq_num,
            fencing_token: fencing_token
                .as_deref()
                .map(|s| s.to_compact_string().try_into())
                .transpose()?,
        })
    }
}

impl From<types::stream::AppendAck> for AppendAck {
    fn from(types::stream::AppendAck { start, end, tail }: types::stream::AppendAck) -> Self {
        Self {
            start: Some(start.into()),
            end: Some(end.into()),
            tail: Some(tail.into()),
        }
    }
}

impl From<record::SequencedRecord> for SequencedRecord {
    fn from(record: record::SequencedRecord) -> Self {
        let (record::StreamPosition { seq_num, timestamp }, record) = record.into_parts();
        let (headers, body) = record.into_parts();
        Self {
            seq_num,
            timestamp,
            headers: headers.into_iter().map(Into::into).collect(),
            body,
        }
    }
}

impl From<types::stream::ReadBatch> for ReadBatch {
    fn from(batch: types::stream::ReadBatch) -> Self {
        Self {
            records: batch.records.into_iter().map(Into::into).collect(),
            tail: batch.tail.map(Into::into),
        }
    }
}