s2-storage 0.2.0

Storage-layer internals shared by S2 server implementations
Documentation
use std::iter::FusedIterator;

use s2_common::record::Metered;

use super::{
    StoredRecordDecodeError, StoredSequencedBytes, StoredSequencedRecord, decode_stored_record,
};

pub struct StoredRecordIterator<I> {
    inner: I,
}

impl<I> StoredRecordIterator<I> {
    pub fn new(inner: I) -> Self {
        Self { inner }
    }
}

impl<I, E> Iterator for StoredRecordIterator<I>
where
    I: Iterator<Item = Result<StoredSequencedBytes, E>>,
    E: std::fmt::Debug + Into<StoredRecordDecodeError>,
{
    type Item = Result<Metered<StoredSequencedRecord>, StoredRecordDecodeError>;

    fn next(&mut self) -> Option<Self::Item> {
        self.inner.next().map(|result| {
            let (position, bytes) = result.map_err(Into::into)?.into_parts();
            let record = decode_stored_record(bytes)?;
            Ok(record.sequenced(position))
        })
    }
}

impl<I, E> FusedIterator for StoredRecordIterator<I>
where
    I: FusedIterator<Item = Result<StoredSequencedBytes, E>>,
    E: std::fmt::Debug + Into<StoredRecordDecodeError>,
{
}

#[cfg(test)]
mod tests {
    use bytes::{BufMut, Bytes, BytesMut};
    use s2_common::record::{
        EnvelopeRecord, Metered, MeteredExt, MeteredSize, Record, SeqNum, Sequenced,
        StreamPosition, Timestamp,
    };

    use super::*;
    use crate::record::{
        EncryptedRecord, StoredRecord, StoredSequencedBytes, StoredSequencedRecord,
        encode_stored_record,
    };

    fn test_stored_plaintext_record(
        seq_num: SeqNum,
        timestamp: Timestamp,
        body: &'static [u8],
    ) -> Metered<StoredSequencedRecord> {
        StoredRecord::Plaintext(Record::Envelope(
            EnvelopeRecord::try_from_parts(vec![], Bytes::from_static(body)).unwrap(),
        ))
        .metered()
        .sequenced(StreamPosition { seq_num, timestamp })
    }

    fn test_stored_encrypted_record(
        seq_num: SeqNum,
        timestamp: Timestamp,
    ) -> Metered<StoredSequencedRecord> {
        let metered_size = Record::Envelope(
            EnvelopeRecord::try_from_parts(vec![], Bytes::from_static(b"secret payload")).unwrap(),
        )
        .metered_size();

        let mut encoded = BytesMut::with_capacity(1 + 12 + 10 + 16);
        encoded.put_u8(0x02);
        encoded.put_bytes(0xAB, 12);
        encoded.put_slice(b"ciphertext");
        encoded.put_bytes(0xCD, 16);
        let record = EncryptedRecord::try_from(encoded.freeze()).unwrap();

        StoredRecord::Encrypted {
            metered_size,
            record,
        }
        .metered()
        .sequenced(StreamPosition { seq_num, timestamp })
    }

    fn to_stored_bytes_iter(
        records: Vec<Metered<StoredSequencedRecord>>,
    ) -> impl Iterator<Item = Result<StoredSequencedBytes, StoredRecordDecodeError>> {
        records
            .into_iter()
            .map(|record| {
                let (position, record) = record.into_parts();
                Sequenced::new(position, encode_stored_record(record.as_ref()))
            })
            .map(Ok)
    }

    #[test]
    fn stored_iterator_decodes_plaintext_records() {
        let expected = vec![
            test_stored_plaintext_record(1, 10, b"p0"),
            test_stored_plaintext_record(2, 11, b"p1"),
        ];
        let actual = StoredRecordIterator::new(to_stored_bytes_iter(expected.clone()))
            .collect::<Result<Vec<_>, _>>()
            .unwrap();

        assert_eq!(actual, expected);
    }

    #[test]
    fn stored_iterator_preserves_encrypted_records() {
        let expected = vec![test_stored_encrypted_record(1, 10)];

        let actual = StoredRecordIterator::new(to_stored_bytes_iter(expected.clone()))
            .collect::<Result<Vec<_>, _>>()
            .unwrap();

        assert_eq!(actual, expected);
    }

    #[test]
    fn stored_iterator_surfaces_decode_errors() {
        let invalid_data = Sequenced::new(
            StreamPosition {
                seq_num: 1,
                timestamp: 10,
            },
            Bytes::new(),
        );
        let mut iter = StoredRecordIterator::new(std::iter::once::<
            Result<StoredSequencedBytes, StoredRecordDecodeError>,
        >(Ok(invalid_data)));

        let error = iter
            .next()
            .expect("error expected")
            .expect_err("expected error");
        assert!(matches!(
            error,
            StoredRecordDecodeError::Truncated("MagicByte")
        ));
        assert!(iter.next().is_none());
    }

    #[test]
    fn stored_iterator_preserves_source_errors() {
        let mut iter = StoredRecordIterator::new(std::iter::once::<
            Result<StoredSequencedBytes, StoredRecordDecodeError>,
        >(Err(
            StoredRecordDecodeError::InvalidValue("test", "boom"),
        )));

        let error = iter
            .next()
            .expect("error expected")
            .expect_err("expected error");
        assert!(matches!(
            error,
            StoredRecordDecodeError::InvalidValue("test", "boom")
        ));
    }
}