s2_storage/record/
iterator.rs1use std::iter::FusedIterator;
2
3use s2_common::record::Metered;
4
5use super::{
6 StoredRecordDecodeError, StoredSequencedBytes, StoredSequencedRecord, decode_stored_record,
7};
8
/// Iterator adapter that wraps a source iterator of stored, sequenced bytes.
///
/// When `inner` yields `Result<StoredSequencedBytes, E>`, this type's
/// `Iterator` implementation decodes each item into a
/// `Metered<StoredSequencedRecord>`.
///
/// Like the standard-library adapters, it is lazy: nothing is pulled from
/// `inner` until the adapter itself is advanced.
#[derive(Debug, Clone)]
#[must_use = "iterators are lazy and do nothing unless consumed"]
pub struct StoredRecordIterator<I> {
    /// The wrapped source iterator.
    inner: I,
}
12
13impl<I> StoredRecordIterator<I> {
14 pub fn new(inner: I) -> Self {
15 Self { inner }
16 }
17}
18
19impl<I, E> Iterator for StoredRecordIterator<I>
20where
21 I: Iterator<Item = Result<StoredSequencedBytes, E>>,
22 E: std::fmt::Debug + Into<StoredRecordDecodeError>,
23{
24 type Item = Result<Metered<StoredSequencedRecord>, StoredRecordDecodeError>;
25
26 fn next(&mut self) -> Option<Self::Item> {
27 self.inner.next().map(|result| {
28 let (position, bytes) = result.map_err(Into::into)?.into_parts();
29 let record = decode_stored_record(bytes)?;
30 Ok(record.sequenced(position))
31 })
32 }
33}
34
35impl<I, E> FusedIterator for StoredRecordIterator<I>
36where
37 I: FusedIterator<Item = Result<StoredSequencedBytes, E>>,
38 E: std::fmt::Debug + Into<StoredRecordDecodeError>,
39{
40}
41
#[cfg(test)]
mod tests {
    use bytes::{BufMut, Bytes, BytesMut};
    use s2_common::record::{
        EnvelopeRecord, Metered, MeteredExt, MeteredSize, Record, SeqNum, Sequenced,
        StreamPosition, Timestamp,
    };

    use super::*;
    use crate::record::{
        EncryptedRecord, StoredRecord, StoredSequencedBytes, StoredSequencedRecord,
        encode_stored_record,
    };

    /// Builds a plaintext envelope record carrying `body`, placed at the given
    /// sequence number and timestamp.
    fn test_stored_plaintext_record(
        seq_num: SeqNum,
        timestamp: Timestamp,
        body: &'static [u8],
    ) -> Metered<StoredSequencedRecord> {
        let envelope = EnvelopeRecord::try_from_parts(vec![], Bytes::from_static(body)).unwrap();
        let position = StreamPosition { seq_num, timestamp };

        StoredRecord::Plaintext(Record::Envelope(envelope))
            .metered()
            .sequenced(position)
    }

    /// Builds an encrypted stored record at the given sequence number and timestamp.
    ///
    /// The encrypted payload is assembled by hand and its metered size is taken
    /// from a separate plaintext envelope.
    fn test_stored_encrypted_record(
        seq_num: SeqNum,
        timestamp: Timestamp,
    ) -> Metered<StoredSequencedRecord> {
        // Layout written below: one 0x02 byte, 12 filler bytes, the literal
        // "ciphertext", then 16 filler bytes.
        // NOTE(review): presumably tag byte / nonce / body / auth tag — confirm
        // against `EncryptedRecord`'s format.
        let mut raw = BytesMut::with_capacity(1 + 12 + 10 + 16);
        raw.put_u8(0x02);
        raw.put_bytes(0xAB, 12);
        raw.put_slice(b"ciphertext");
        raw.put_bytes(0xCD, 16);
        let record = EncryptedRecord::try_from(raw.freeze()).unwrap();

        let plaintext =
            EnvelopeRecord::try_from_parts(vec![], Bytes::from_static(b"secret payload")).unwrap();
        let metered_size = Record::Envelope(plaintext).metered_size();

        let stored = StoredRecord::Encrypted {
            metered_size,
            record,
        };
        stored
            .metered()
            .sequenced(StreamPosition { seq_num, timestamp })
    }

    /// Encodes each record back to stored bytes (keeping its position) and wraps
    /// it in `Ok`, producing an input suitable for `StoredRecordIterator`.
    fn to_stored_bytes_iter(
        records: Vec<Metered<StoredSequencedRecord>>,
    ) -> impl Iterator<Item = Result<StoredSequencedBytes, StoredRecordDecodeError>> {
        records.into_iter().map(|item| {
            let (position, record) = item.into_parts();
            let encoded = encode_stored_record(record.as_ref());
            Ok(Sequenced::new(position, encoded))
        })
    }

    #[test]
    fn stored_iterator_decodes_plaintext_records() {
        let expected = vec![
            test_stored_plaintext_record(1, 10, b"p0"),
            test_stored_plaintext_record(2, 11, b"p1"),
        ];

        // Encode, then decode through the iterator: the round trip must be lossless.
        let decoded: Result<Vec<_>, _> =
            StoredRecordIterator::new(to_stored_bytes_iter(expected.clone())).collect();

        assert_eq!(decoded.unwrap(), expected);
    }

    #[test]
    fn stored_iterator_preserves_encrypted_records() {
        let expected = vec![test_stored_encrypted_record(1, 10)];

        let decoded: Result<Vec<_>, _> =
            StoredRecordIterator::new(to_stored_bytes_iter(expected.clone())).collect();

        assert_eq!(decoded.unwrap(), expected);
    }

    #[test]
    fn stored_iterator_surfaces_decode_errors() {
        // An empty payload cannot be decoded; the failure must reach the caller.
        let position = StreamPosition {
            seq_num: 1,
            timestamp: 10,
        };
        let source: Result<StoredSequencedBytes, StoredRecordDecodeError> =
            Ok(Sequenced::new(position, Bytes::new()));
        let mut iter = StoredRecordIterator::new(std::iter::once(source));

        let first = iter.next().expect("error expected");
        let error = first.expect_err("expected error");

        assert!(matches!(
            error,
            StoredRecordDecodeError::Truncated("MagicByte")
        ));
        assert!(iter.next().is_none());
    }

    #[test]
    fn stored_iterator_preserves_source_errors() {
        // An error produced by the source itself must pass through unchanged.
        let source: Result<StoredSequencedBytes, StoredRecordDecodeError> =
            Err(StoredRecordDecodeError::InvalidValue("test", "boom"));
        let mut iter = StoredRecordIterator::new(std::iter::once(source));

        let first = iter.next().expect("error expected");
        let error = first.expect_err("expected error");

        assert!(matches!(
            error,
            StoredRecordDecodeError::InvalidValue("test", "boom")
        ));
    }
}