s2-common 0.31.1

Common stuff for client and servers for S2, the durable streams API
Documentation
use std::iter::FusedIterator;

use super::{
    Metered, RecordDecodeError, StoredRecord, StoredSequencedBytes, StoredSequencedRecord,
};

pub struct StoredRecordIterator<I> {
    inner: I,
}

impl<I> StoredRecordIterator<I> {
    pub fn new(inner: I) -> Self {
        Self { inner }
    }
}

impl<I, E> Iterator for StoredRecordIterator<I>
where
    I: Iterator<Item = Result<StoredSequencedBytes, E>>,
    E: std::fmt::Debug + Into<RecordDecodeError>,
{
    type Item = Result<Metered<StoredSequencedRecord>, RecordDecodeError>;

    fn next(&mut self) -> Option<Self::Item> {
        self.inner.next().map(|result| {
            let (position, bytes) = result.map_err(Into::into)?.into_parts();
            let record: Metered<StoredRecord> = bytes.try_into()?;
            Ok(record.sequenced(position))
        })
    }
}

impl<I, E> FusedIterator for StoredRecordIterator<I>
where
    I: FusedIterator<Item = Result<StoredSequencedBytes, E>>,
    E: std::fmt::Debug + Into<RecordDecodeError>,
{
}

#[cfg(test)]
mod tests {
    use bytes::{BufMut, Bytes, BytesMut};

    use super::*;
    use crate::record::{
        Encodable, EncryptedRecord, EnvelopeRecord, Metered, MeteredExt, MeteredSize, Record,
        SeqNum, Sequenced, StoredRecord, StoredSequencedBytes, StoredSequencedRecord,
        StreamPosition, Timestamp,
    };

    fn test_stored_plaintext_record(
        seq_num: SeqNum,
        timestamp: Timestamp,
        body: &'static [u8],
    ) -> Metered<StoredSequencedRecord> {
        StoredRecord::Plaintext(Record::Envelope(
            EnvelopeRecord::try_from_parts(vec![], Bytes::from_static(body)).unwrap(),
        ))
        .metered()
        .sequenced(StreamPosition { seq_num, timestamp })
    }

    fn test_stored_encrypted_record(
        seq_num: SeqNum,
        timestamp: Timestamp,
    ) -> Metered<StoredSequencedRecord> {
        let metered_size = Record::Envelope(
            EnvelopeRecord::try_from_parts(vec![], Bytes::from_static(b"secret payload")).unwrap(),
        )
        .metered_size();

        let mut encoded = BytesMut::with_capacity(1 + 12 + 10 + 16);
        encoded.put_u8(0x02);
        encoded.put_bytes(0xAB, 12);
        encoded.put_slice(b"ciphertext");
        encoded.put_bytes(0xCD, 16);
        let record = EncryptedRecord::try_from(encoded.freeze()).unwrap();

        StoredRecord::Encrypted {
            metered_size,
            record,
        }
        .metered()
        .sequenced(StreamPosition { seq_num, timestamp })
    }

    fn to_stored_bytes_iter(
        records: Vec<Metered<StoredSequencedRecord>>,
    ) -> impl Iterator<Item = Result<StoredSequencedBytes, RecordDecodeError>> {
        records
            .into_iter()
            .map(|record| {
                let (position, record) = record.into_parts();
                Sequenced::new(position, record.as_ref().to_bytes())
            })
            .map(Ok)
    }

    #[test]
    fn stored_iterator_decodes_plaintext_records() {
        let expected = vec![
            test_stored_plaintext_record(1, 10, b"p0"),
            test_stored_plaintext_record(2, 11, b"p1"),
        ];
        let actual = StoredRecordIterator::new(to_stored_bytes_iter(expected.clone()))
            .collect::<Result<Vec<_>, _>>()
            .unwrap();

        assert_eq!(actual, expected);
    }

    #[test]
    fn stored_iterator_preserves_encrypted_records() {
        let expected = vec![test_stored_encrypted_record(1, 10)];

        let actual = StoredRecordIterator::new(to_stored_bytes_iter(expected.clone()))
            .collect::<Result<Vec<_>, _>>()
            .unwrap();

        assert_eq!(actual, expected);
    }

    #[test]
    fn stored_iterator_surfaces_decode_errors() {
        let invalid_data = Sequenced::new(
            StreamPosition {
                seq_num: 1,
                timestamp: 10,
            },
            Bytes::new(),
        );
        let mut iter = StoredRecordIterator::new(std::iter::once::<
            Result<StoredSequencedBytes, RecordDecodeError>,
        >(Ok(invalid_data)));

        let error = iter
            .next()
            .expect("error expected")
            .expect_err("expected error");
        assert!(matches!(error, RecordDecodeError::Truncated("MagicByte")));
        assert!(iter.next().is_none());
    }

    #[test]
    fn stored_iterator_preserves_source_errors() {
        let mut iter = StoredRecordIterator::new(std::iter::once::<
            Result<StoredSequencedBytes, RecordDecodeError>,
        >(Err(RecordDecodeError::InvalidValue(
            "test", "boom",
        ))));

        let error = iter
            .next()
            .expect("error expected")
            .expect_err("expected error");
        assert!(matches!(
            error,
            RecordDecodeError::InvalidValue("test", "boom")
        ));
    }
}