Skip to main content

s2_storage/record/
iterator.rs

1use std::iter::FusedIterator;
2
3use s2_common::record::Metered;
4
5use super::{
6    StoredRecordDecodeError, StoredSequencedBytes, StoredSequencedRecord, decode_stored_record,
7};
8
/// Iterator adapter wrapping a source iterator of stored record bytes.
///
/// The wrapped iterator is kept as-is; the `Iterator` implementation in this
/// module decodes each source item into a sequenced record on demand, so
/// nothing happens until the adapter is advanced.
#[derive(Debug, Clone)]
#[must_use = "iterators are lazy and do nothing unless consumed"]
pub struct StoredRecordIterator<I> {
    /// Source iterator that items are pulled from.
    inner: I,
}

impl<I> StoredRecordIterator<I> {
    /// Creates an adapter around `inner`.
    ///
    /// No bounds are placed on `I` here; they are imposed only by the trait
    /// implementations that actually need them.
    pub fn new(inner: I) -> Self {
        Self { inner }
    }
}
18
19impl<I, E> Iterator for StoredRecordIterator<I>
20where
21    I: Iterator<Item = Result<StoredSequencedBytes, E>>,
22    E: std::fmt::Debug + Into<StoredRecordDecodeError>,
23{
24    type Item = Result<Metered<StoredSequencedRecord>, StoredRecordDecodeError>;
25
26    fn next(&mut self) -> Option<Self::Item> {
27        self.inner.next().map(|result| {
28            let (position, bytes) = result.map_err(Into::into)?.into_parts();
29            let record = decode_stored_record(bytes)?;
30            Ok(record.sequenced(position))
31        })
32    }
33}
34
35impl<I, E> FusedIterator for StoredRecordIterator<I>
36where
37    I: FusedIterator<Item = Result<StoredSequencedBytes, E>>,
38    E: std::fmt::Debug + Into<StoredRecordDecodeError>,
39{
40}
41
#[cfg(test)]
mod tests {
    use bytes::{BufMut, Bytes, BytesMut};
    use s2_common::record::{
        EnvelopeRecord, Metered, MeteredExt, MeteredSize, Record, SeqNum, Sequenced,
        StreamPosition, Timestamp,
    };

    use super::*;
    use crate::record::{
        EncryptedRecord, StoredRecord, StoredSequencedBytes, StoredSequencedRecord,
        encode_stored_record,
    };

    /// Item type produced by the source iterators used in these tests.
    type SourceItem = Result<StoredSequencedBytes, StoredRecordDecodeError>;

    /// Builds a sequenced plaintext envelope record with no headers and the given body.
    fn test_stored_plaintext_record(
        seq_num: SeqNum,
        timestamp: Timestamp,
        body: &'static [u8],
    ) -> Metered<StoredSequencedRecord> {
        let envelope = EnvelopeRecord::try_from_parts(vec![], Bytes::from_static(body)).unwrap();
        let position = StreamPosition { seq_num, timestamp };

        StoredRecord::Plaintext(Record::Envelope(envelope))
            .metered()
            .sequenced(position)
    }

    /// Builds a sequenced encrypted record from a hand-assembled byte buffer.
    ///
    /// The metered size is computed from a plaintext envelope holding
    /// `b"secret payload"`, independently of the encoded bytes.
    fn test_stored_encrypted_record(
        seq_num: SeqNum,
        timestamp: Timestamp,
    ) -> Metered<StoredSequencedRecord> {
        let plaintext = Record::Envelope(
            EnvelopeRecord::try_from_parts(vec![], Bytes::from_static(b"secret payload")).unwrap(),
        );
        let metered_size = plaintext.metered_size();

        // NOTE(review): the layout (1 leading byte, 12 filler bytes, payload,
        // 16 filler bytes) presumably mirrors what `EncryptedRecord::try_from`
        // accepts — confirm against its format definition.
        let mut buf = BytesMut::with_capacity(1 + 12 + 10 + 16);
        buf.put_u8(0x02);
        buf.put_bytes(0xAB, 12);
        buf.put_slice(b"ciphertext");
        buf.put_bytes(0xCD, 16);
        let record = EncryptedRecord::try_from(buf.freeze()).unwrap();

        let position = StreamPosition { seq_num, timestamp };
        StoredRecord::Encrypted {
            metered_size,
            record,
        }
        .metered()
        .sequenced(position)
    }

    /// Encodes each record and yields it as a successful source item at its
    /// original position.
    fn to_stored_bytes_iter(
        records: Vec<Metered<StoredSequencedRecord>>,
    ) -> impl Iterator<Item = Result<StoredSequencedBytes, StoredRecordDecodeError>> {
        records.into_iter().map(|sequenced| {
            let (position, record) = sequenced.into_parts();
            Ok(Sequenced::new(
                position,
                encode_stored_record(record.as_ref()),
            ))
        })
    }

    /// Encodes `records`, runs them through the iterator under test, and
    /// returns the decoded output, panicking on any error.
    fn decode_all(
        records: Vec<Metered<StoredSequencedRecord>>,
    ) -> Vec<Metered<StoredSequencedRecord>> {
        StoredRecordIterator::new(to_stored_bytes_iter(records))
            .collect::<Result<Vec<_>, _>>()
            .unwrap()
    }

    /// Pulls the next item and returns its error, panicking if the iterator
    /// is exhausted or the item is `Ok`.
    fn first_error(
        iter: &mut impl Iterator<
            Item = Result<Metered<StoredSequencedRecord>, StoredRecordDecodeError>,
        >,
    ) -> StoredRecordDecodeError {
        iter.next()
            .expect("error expected")
            .expect_err("expected error")
    }

    #[test]
    fn stored_iterator_decodes_plaintext_records() {
        let expected = vec![
            test_stored_plaintext_record(1, 10, b"p0"),
            test_stored_plaintext_record(2, 11, b"p1"),
        ];

        assert_eq!(decode_all(expected.clone()), expected);
    }

    #[test]
    fn stored_iterator_preserves_encrypted_records() {
        let expected = vec![test_stored_encrypted_record(1, 10)];

        assert_eq!(decode_all(expected.clone()), expected);
    }

    #[test]
    fn stored_iterator_surfaces_decode_errors() {
        // An empty buffer cannot be decoded.
        let position = StreamPosition {
            seq_num: 1,
            timestamp: 10,
        };
        let source: SourceItem = Ok(Sequenced::new(position, Bytes::new()));
        let mut iter = StoredRecordIterator::new(std::iter::once(source));

        let error = first_error(&mut iter);
        assert!(matches!(
            error,
            StoredRecordDecodeError::Truncated("MagicByte")
        ));
        assert!(iter.next().is_none());
    }

    #[test]
    fn stored_iterator_preserves_source_errors() {
        let source: SourceItem = Err(StoredRecordDecodeError::InvalidValue("test", "boom"));
        let mut iter = StoredRecordIterator::new(std::iter::once(source));

        let error = first_error(&mut iter);
        assert!(matches!(
            error,
            StoredRecordDecodeError::InvalidValue("test", "boom")
        ));
    }
}