// ubiquisync_core/codec/reader.rs

1use crate::{codec::error::CodecError, uuid::Uuid};
2use std::collections::HashMap;
3use std::io::BufRead;
4
5/// Decodes one log entry's body — the read counterpart to
6/// [`EntryBufferWriter`](super::writer::EntryBufferWriter). Feeds bytes through
7/// the rolling hash and resolves dictionary-compressed UUIDs.
8pub struct EntryBufferReader<'a, R> {
9    reader: HashReader<'a, R>,
10    uuid_dict: &'a mut HashMap<u32, Uuid>,
11}
12
13impl<'a, R: BufRead> EntryBufferReader<'a, R> {
14    /// Wrap `reader`, sharing `uuid_dict` to resolve UUID dictionary references
15    /// across the entries of one segment.
16    pub fn new(reader: &'a mut Reader<R>, uuid_dict: &'a mut HashMap<u32, Uuid>) -> Self {
17        Self {
18            reader: HashReader::new(reader),
19            uuid_dict,
20        }
21    }
22
23    /// Read a single byte.
24    pub fn read_byte(&mut self) -> Result<u8, CodecError> {
25        self.reader.read_exact(1, true).map(|b| b[0])
26    }
27
28    /// Read exactly `len` raw bytes.
29    pub fn read_bytes(&mut self, len: usize) -> Result<Vec<u8>, CodecError> {
30        self.reader.read_exact(len, true)
31    }
32
33    /// Read an unsigned varint (7 data bits per byte, little-endian).
34    pub fn read_varint(&mut self) -> Result<u64, CodecError> {
35        self.reader.read_varint(true)
36    }
37
38    /// Read a length-prefixed byte string (varint length, then the bytes).
39    pub fn read_blob(&mut self) -> Result<Vec<u8>, CodecError> {
40        let len = self.read_varint()?;
41        // try_into, not `as`: on 32-bit targets (e.g. wasm32) `as usize` would
42        // truncate a bogus 64-bit length and mis-decode instead of rejecting.
43        let len: usize = len.try_into().map_err(|_| CodecError::LengthTooLarge(len))?;
44        self.reader.read_exact(len, true)
45    }
46
47    /// Read a little-endian `u16`.
48    pub fn read_u16_le(&mut self) -> Result<u16, CodecError> {
49        self.reader
50            .read_exact(2, true)
51            .map(|b| u16::from_le_bytes([b[0], b[1]]))
52    }
53
54    /// Read a zigzag-encoded signed varint.
55    pub fn read_zigzag(&mut self) -> Result<i64, CodecError> {
56        let encoded = self.read_varint()?;
57        Ok(((encoded >> 1) as i64) ^ -((encoded & 1) as i64))
58    }
59
60    /// Read a delta-encoded timestamp. The wire stores the difference
61    /// from `last`; the canonical hash sees the full absolute timestamp
62    /// as fixed 8-byte little-endian (matching the writer).
63    pub fn read_delta(&mut self, last: u64) -> Result<u64, CodecError> {
64        let delta = self.reader.read_varint(false)?;
65        // A corrupted/hostile delta must not wrap u64: a wrapped value would
66        // still hash consistently for the wrong timestamp and pass the check.
67        let current = last
68            .checked_add(delta)
69            .ok_or(CodecError::TimestampOverflow)?;
70        self.reader._update_hash(&current.to_le_bytes());
71        Ok(current)
72    }
73
74    /// Read a UUID, resolving dictionary compression: a `0` sentinel means the
75    /// 16 bytes follow inline (and are registered for later reuse); any other
76    /// value is a 1-based index into the segment's UUID dictionary.
77    pub fn read_uuid(&mut self) -> Result<Uuid, CodecError> {
78        // Do not hash raw bytes from the buffer because the hash will be the actual UUID!
79        let x = self.reader.read_varint(false)?;
80        let uuid: Uuid = if x == 0 {
81            // First instance of this UUID, read it in full and then save to dict.
82            let uuid = self.reader.read_exact(16, false)?;
83            let uuid: Uuid = uuid.try_into().map_err(|_| CodecError::UnexpectedEof)?;
84            let idx = self.uuid_dict.len() + 1; // 1-based index
85            self.uuid_dict.insert(idx as u32, uuid);
86            uuid
87        } else {
88            // Known UUID — look up by dict index. Reject a reference that
89            // doesn't fit u32 before converting, so an out-of-range value
90            // can't wrap and resolve to an unrelated dictionary entry.
91            let idx: u32 = x.try_into().map_err(|_| CodecError::UnresolvedUuid(x))?;
92            *self
93                .uuid_dict
94                .get(&idx)
95                .ok_or(CodecError::UnresolvedUuid(x))?
96        };
97        self.reader._update_hash(&uuid);
98        Ok(uuid)
99    }
100
101    /// Verify the trailing 4-byte hash check against the canonical
102    /// content hash. Returns the full hash on success.
103    pub fn finalize(mut self) -> Result<blake3::Hash, CodecError> {
104        self.reader.finalize()
105    }
106}
107
108struct HashReader<'a, R> {
109    reader: &'a mut Reader<R>,
110    hasher: blake3::Hasher,
111}
112
113impl<'a, R: BufRead> HashReader<'a, R> {
114    fn new(reader: &'a mut Reader<R>) -> Self {
115        Self {
116            reader,
117            hasher: blake3::Hasher::new(),
118        }
119    }
120
121    fn read_varint(&mut self, hash: bool) -> Result<u64, CodecError> {
122        let (result, bytes, len) = self.reader.read_varint()?;
123        if hash {
124            self.hasher.update(&bytes[..len]);
125        }
126        Ok(result)
127    }
128
129    fn read_exact(&mut self, len: usize, hash: bool) -> Result<Vec<u8>, CodecError> {
130        let bytes = self.reader.read_vec(len)?;
131        if hash {
132            self.hasher.update(&bytes);
133        }
134        Ok(bytes)
135    }
136
137    fn _update_hash(&mut self, bytes: &[u8]) {
138        self.hasher.update(bytes);
139    }
140
141    fn finalize(&mut self) -> Result<blake3::Hash, CodecError> {
142        let hash = self.hasher.finalize();
143        let mut buf: [u8; 4] = [0; 4];
144        self.reader.read_exact(&mut buf)?;
145        let expected = &hash.as_bytes()[..4];
146        if buf != expected {
147            return Err(CodecError::HashMismatch {
148                expected: u32::from_le_bytes(expected.try_into().unwrap()),
149                got: u32::from_le_bytes(buf),
150            });
151        }
152        Ok(hash)
153    }
154}
155
156/// A thin [`BufRead`] wrapper that the codec reads a segment from, tracking
157/// position only enough to answer [`is_eof`](Self::is_eof).
158pub struct Reader<R> {
159    reader: R,
160}
161
162impl<R: BufRead> Reader<R> {
163    /// Wrap an underlying [`BufRead`] source.
164    pub fn new(reader: R) -> Self {
165        Self { reader }
166    }
167
168    /// Whether the underlying source has no more bytes.
169    pub fn is_eof(&mut self) -> Result<bool, CodecError> {
170        Ok(self.reader.fill_buf()?.is_empty())
171    }
172
173    /// Returns the decoded value plus the raw on-wire bytes (the caller hashes
174    /// them). A u64 varint is at most 10 bytes, so they go in a fixed stack
175    /// buffer — `len` is how many are valid. No allocation.
176    pub(super) fn read_varint(&mut self) -> Result<(u64, [u8; 10], usize), CodecError> {
177        let mut bytes = [0u8; 10];
178        let mut len = 0;
179        let mut result = 0u64;
180        let mut shift = 0;
181        loop {
182            let byte = self.read_byte()?.ok_or(CodecError::UnexpectedEof)?;
183            bytes[len] = byte;
184            len += 1;
185            // On the 10th byte (shift=63), only bit 0 is valid — higher
186            // bits or a continuation flag would overflow u64. This also caps
187            // the loop at 10 bytes, so `bytes[len]` never indexes past the end.
188            if shift == 63 && byte > 1 {
189                return Err(CodecError::VarIntOverflow);
190            }
191            result |= ((byte & 0x7F) as u64) << shift;
192            if byte & 0x80 == 0 {
193                return Ok((result, bytes, len));
194            }
195            shift += 7;
196        }
197    }
198
199    pub(super) fn read_byte(&mut self) -> Result<Option<u8>, CodecError> {
200        let mut buf: [u8; 1] = [0; 1];
201        let read = self.reader.read(&mut buf)?;
202        if read == 0 {
203            return Ok(None);
204        }
205        Ok(Some(buf[0]))
206    }
207
208    pub(super) fn read_exact(&mut self, buf: &mut [u8]) -> Result<(), CodecError> {
209        self.reader.read_exact(buf).map_err(|e| match e.kind() {
210            // A short read is truncation — surface the dedicated EOF error for
211            // consistency with the rest of the decoder, not a generic Io.
212            std::io::ErrorKind::UnexpectedEof => CodecError::UnexpectedEof,
213            _ => CodecError::Io(e),
214        })
215    }
216
217    pub(super) fn read_vec(&mut self, len: usize) -> Result<Vec<u8>, CodecError> {
218        // Grow the buffer with the bytes actually delivered rather than
219        // pre-allocating an on-wire length we haven't validated — a corrupt or
220        // hostile blob length must not OOM the process before we hit EOF.
221        let mut buf = Vec::new();
222        let mut remaining = len;
223        let mut chunk = [0u8; 8192];
224        while remaining > 0 {
225            let want = remaining.min(chunk.len());
226            let n = self.reader.read(&mut chunk[..want])?;
227            if n == 0 {
228                return Err(CodecError::UnexpectedEof);
229            }
230            buf.extend_from_slice(&chunk[..n]);
231            remaining -= n;
232        }
233        Ok(buf)
234    }
235}
236
#[cfg(test)]
mod tests {
    use super::*;
    use crate::codec::writer::EntryBufferWriter;

    const UUID_A: Uuid = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16];
    const UUID_B: Uuid = [
        0xA0, 0xA1, 0xA2, 0xA3, 0xA4, 0xA5, 0xA6, 0xA7, 0xA8, 0xA9, 0xAA, 0xAB, 0xAC, 0xAD, 0xAE,
        0xAF,
    ];
    const UUID_C: Uuid = [0xFF; 16];

    /// Goal: Verify that all primitive types round-trip correctly through
    /// the writer and reader, including UUID dictionary dedup and blake3
    /// hash verification.
    ///
    /// Given: Two entries written sequentially sharing a UUID dictionary,
    ///        each containing every primitive type (byte, varint, blob,
    ///        u16_le, zigzag, uuid), with 3 distinct UUIDs where UUID_A
    ///        appears in both entries to exercise dict dedup.
    /// When:  Both entries are read back with a fresh reader-side UUID dict.
    /// Then:  All values match, hashes match, the second occurrence of
    ///        UUID_A is decoded from the dict (not inline), and the input is
    ///        consumed exactly.
    #[test]
    fn roundtrip_two_entries_all_types() {
        let mut write_uuid_dict: HashMap<Uuid, u32> = HashMap::new();
        let blob_data = b"hello ubiquisync";

        // Simulate HLC timestamps: first entry has two timestamps (e.g. created_at, updated_at),
        // second entry has one. Gaps: 1 second, sub-ms counter bump, 30 seconds.
        let ts1: u64 = 1_700_000_000_000 << 16; // ~2023, counter=0
        let ts2: u64 = ts1 + 1; // same ms, counter=1
        let ts3: u64 = ts1 + (30_000 << 16); // 30 seconds later

        // ── Write entry 1: all types including 2 delta timestamps ──
        let mut w1 = EntryBufferWriter::new(&mut write_uuid_dict);
        w1.write_byte(0x42);
        w1.write_varint(123456789);
        w1.write_blob(blob_data);
        w1.write_u16_le(0xBEEF);
        w1.write_zigzag(-99);
        w1.write_delta(ts1, 0).unwrap(); // first timestamp, last=0
        w1.write_delta(ts2, ts1).unwrap(); // counter bump, delta=1
        w1.write_uuid(&UUID_A);
        w1.write_uuid(&UUID_B);
        let (buf1, hash1) = w1.finalize();

        // ── Write entry 2: UUID_A dict hit, 1 delta timestamp, edge cases ──
        let mut w2 = EntryBufferWriter::new(&mut write_uuid_dict);
        w2.write_byte(0x00);
        w2.write_varint(0); // edge case: zero
        w2.write_blob(b""); // edge case: empty blob
        w2.write_u16_le(0x0000);
        w2.write_zigzag(i64::MIN);
        w2.write_delta(ts3, ts2).unwrap(); // 30 second gap
        w2.write_uuid(&UUID_A); // dict hit — should be smaller on wire
        w2.write_uuid(&UUID_C);
        let (buf2, hash2) = w2.finalize();

        // Entry 2's UUID_A should be a dict reference (varint 1 = 1 byte),
        // not inline (1 + 16 = 17 bytes). Sanity check that buf2 is smaller.
        assert!(
            buf2.len() < buf1.len(),
            "entry 2 should be smaller due to UUID_A dict hit"
        );

        // Hashes should differ — different content.
        assert_ne!(hash1, hash2);

        // ── Read both entries back ──────────────────────────────────────────
        let mut combined = Vec::new();
        combined.extend_from_slice(&buf1);
        combined.extend_from_slice(&buf2);

        let mut reader = Reader::new(combined.as_slice());
        let mut read_uuid_dict: HashMap<u32, Uuid> = HashMap::new();

        // ── Read entry 1 ──
        {
            let mut r1 = EntryBufferReader::new(&mut reader, &mut read_uuid_dict);
            assert_eq!(r1.read_byte().unwrap(), 0x42);
            assert_eq!(r1.read_varint().unwrap(), 123456789);
            assert_eq!(r1.read_blob().unwrap(), blob_data);
            assert_eq!(r1.read_u16_le().unwrap(), 0xBEEF);
            assert_eq!(r1.read_zigzag().unwrap(), -99);
            assert_eq!(r1.read_delta(0).unwrap(), ts1); // first timestamp
            assert_eq!(r1.read_delta(ts1).unwrap(), ts2); // counter bump
            assert_eq!(r1.read_uuid().unwrap(), UUID_A);
            assert_eq!(r1.read_uuid().unwrap(), UUID_B);
            let read_hash1 = r1.finalize().unwrap();
            assert_eq!(read_hash1, hash1);
        }

        // ── Read entry 2 ──
        {
            let mut r2 = EntryBufferReader::new(&mut reader, &mut read_uuid_dict);
            assert_eq!(r2.read_byte().unwrap(), 0x00);
            assert_eq!(r2.read_varint().unwrap(), 0);
            assert_eq!(r2.read_blob().unwrap(), b"" as &[u8]);
            assert_eq!(r2.read_u16_le().unwrap(), 0x0000);
            assert_eq!(r2.read_zigzag().unwrap(), i64::MIN);
            assert_eq!(r2.read_delta(ts2).unwrap(), ts3); // 30 second gap
            assert_eq!(r2.read_uuid().unwrap(), UUID_A); // from dict
            assert_eq!(r2.read_uuid().unwrap(), UUID_C);
            let read_hash2 = r2.finalize().unwrap();
            assert_eq!(read_hash2, hash2);
        }

        // Reader-side dict should have all 3 UUIDs.
        assert_eq!(read_uuid_dict.len(), 3);

        // Both entries together consumed the input exactly.
        assert!(reader.is_eof().unwrap());
    }

    /// Goal: a varint with a long run of continuation bytes (more than a u64
    /// can hold) errors with `VarIntOverflow` rather than spinning past the
    /// 10-byte maximum or overflowing the shift.
    ///
    /// Given: ten `0x80` bytes — every byte keeps the continuation bit set, so
    ///        the value never terminates within u64's range.
    /// When:  reading a varint.
    /// Then:  the 10th byte (shift 63) is rejected as `VarIntOverflow`.
    #[test]
    fn read_varint_rejects_overflow() {
        let data = [0x80u8; 10];
        let mut reader = Reader::new(data.as_slice());
        assert!(matches!(
            reader.read_varint(),
            Err(CodecError::VarIntOverflow)
        ));
    }

    /// Goal: a dictionary reference with no matching entry is rejected rather
    /// than resolving to some other UUID.
    ///
    /// Given: a single varint `5` and an empty reader-side dictionary.
    /// When:  reading a UUID.
    /// Then:  `UnresolvedUuid(5)`.
    #[test]
    fn read_uuid_rejects_unknown_reference() {
        let data = [0x05u8];
        let mut reader = Reader::new(data.as_slice());
        let mut dict: HashMap<u32, Uuid> = HashMap::new();
        let mut r = EntryBufferReader::new(&mut reader, &mut dict);
        assert!(matches!(r.read_uuid(), Err(CodecError::UnresolvedUuid(5))));
    }

    /// Goal: a dictionary reference above `u32::MAX` must not be truncated
    /// onto a valid index.
    ///
    /// Given: a dict holding UUID_A at index 1, and the varint `2^32 + 1`
    ///        (which would truncate to 1 under an `as u32` cast).
    /// When:  reading a UUID.
    /// Then:  `UnresolvedUuid(2^32 + 1)` — not UUID_A.
    #[test]
    fn read_uuid_rejects_reference_beyond_u32() {
        // 2^32 + 1 as a varint: low group 1 (+continuation), three empty
        // groups, then bit 32 lands as 0x10 in the fifth byte.
        let data = [0x81u8, 0x80, 0x80, 0x80, 0x10];
        let mut reader = Reader::new(data.as_slice());
        let mut dict: HashMap<u32, Uuid> = HashMap::new();
        dict.insert(1, UUID_A);
        let mut r = EntryBufferReader::new(&mut reader, &mut dict);
        assert!(matches!(
            r.read_uuid(),
            Err(CodecError::UnresolvedUuid(4_294_967_297))
        ));
    }

    /// Goal: a delta that would wrap the timestamp past `u64::MAX` is rejected.
    ///
    /// Given: the varint delta `1` and `last = u64::MAX`.
    /// When:  reading a delta timestamp.
    /// Then:  `TimestampOverflow`.
    #[test]
    fn read_delta_rejects_overflow() {
        let data = [0x01u8];
        let mut reader = Reader::new(data.as_slice());
        let mut dict: HashMap<u32, Uuid> = HashMap::new();
        let mut r = EntryBufferReader::new(&mut reader, &mut dict);
        assert!(matches!(
            r.read_delta(u64::MAX),
            Err(CodecError::TimestampOverflow)
        ));
    }

    /// Goal: a blob whose declared length exceeds the remaining input is
    /// reported as truncation.
    ///
    /// Given: a length prefix of 5 followed by only 2 bytes.
    /// When:  reading a blob.
    /// Then:  `UnexpectedEof`.
    #[test]
    fn read_blob_truncated_is_eof() {
        let data = [0x05u8, 0xAA, 0xBB];
        let mut reader = Reader::new(data.as_slice());
        let mut dict: HashMap<u32, Uuid> = HashMap::new();
        let mut r = EntryBufferReader::new(&mut reader, &mut dict);
        assert!(matches!(r.read_blob(), Err(CodecError::UnexpectedEof)));
    }

    /// Goal: a corrupted trailing check is detected.
    ///
    /// Given: a one-byte entry whose last check byte has been flipped.
    /// When:  the entry is read and finalized.
    /// Then:  the content decodes, but `finalize` reports `HashMismatch`.
    #[test]
    fn finalize_detects_corrupted_check() {
        let mut write_uuid_dict: HashMap<Uuid, u32> = HashMap::new();
        let mut w = EntryBufferWriter::new(&mut write_uuid_dict);
        w.write_byte(0x42);
        let (buf, _) = w.finalize();

        let mut data = Vec::new();
        data.extend_from_slice(&buf);
        let last = data.len() - 1;
        data[last] ^= 0xFF;

        let mut reader = Reader::new(data.as_slice());
        let mut dict: HashMap<u32, Uuid> = HashMap::new();
        let mut r = EntryBufferReader::new(&mut reader, &mut dict);
        assert_eq!(r.read_byte().unwrap(), 0x42);
        assert!(matches!(r.finalize(), Err(CodecError::HashMismatch { .. })));
    }

    /// Goal: a missing trailing check is reported as truncation, not as a
    /// generic I/O error.
    ///
    /// Given: a one-byte entry with the last byte of its check cut off.
    /// When:  the entry is read and finalized.
    /// Then:  `finalize` reports `UnexpectedEof`.
    #[test]
    fn finalize_reports_truncated_check_as_eof() {
        let mut write_uuid_dict: HashMap<Uuid, u32> = HashMap::new();
        let mut w = EntryBufferWriter::new(&mut write_uuid_dict);
        w.write_byte(0x42);
        let (buf, _) = w.finalize();

        let mut data = Vec::new();
        data.extend_from_slice(&buf);
        data.truncate(data.len() - 1);

        let mut reader = Reader::new(data.as_slice());
        let mut dict: HashMap<u32, Uuid> = HashMap::new();
        let mut r = EntryBufferReader::new(&mut reader, &mut dict);
        assert_eq!(r.read_byte().unwrap(), 0x42);
        assert!(matches!(r.finalize(), Err(CodecError::UnexpectedEof)));
    }
}