1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
use crate::{codec::error::CodecError, uuid::Uuid};
use std::collections::HashMap;
use std::io::BufRead;
/// Decodes one log entry's body — the read counterpart to
/// [`EntryBufferWriter`](super::writer::EntryBufferWriter). Feeds bytes through
/// the rolling hash and resolves dictionary-compressed UUIDs.
pub struct EntryBufferReader<'a, R> {
reader: HashReader<'a, R>,
uuid_dict: &'a mut HashMap<u32, Uuid>,
}
impl<'a, R: BufRead> EntryBufferReader<'a, R> {
/// Wrap `reader`, sharing `uuid_dict` to resolve UUID dictionary references
/// across the entries of one segment.
pub fn new(reader: &'a mut Reader<R>, uuid_dict: &'a mut HashMap<u32, Uuid>) -> Self {
Self {
reader: HashReader::new(reader),
uuid_dict,
}
}
/// Read a single byte.
pub fn read_byte(&mut self) -> Result<u8, CodecError> {
self.reader.read_exact(1, true).map(|b| b[0])
}
/// Read exactly `len` raw bytes.
pub fn read_bytes(&mut self, len: usize) -> Result<Vec<u8>, CodecError> {
self.reader.read_exact(len, true)
}
/// Read an unsigned varint (7 data bits per byte, little-endian).
pub fn read_varint(&mut self) -> Result<u64, CodecError> {
self.reader.read_varint(true)
}
/// Read a length-prefixed byte string (varint length, then the bytes).
pub fn read_blob(&mut self) -> Result<Vec<u8>, CodecError> {
let len = self.read_varint()?;
// try_into, not `as`: on 32-bit targets (e.g. wasm32) `as usize` would
// truncate a bogus 64-bit length and mis-decode instead of rejecting.
let len: usize = len.try_into().map_err(|_| CodecError::LengthTooLarge(len))?;
self.reader.read_exact(len, true)
}
/// Read a little-endian `u16`.
pub fn read_u16_le(&mut self) -> Result<u16, CodecError> {
self.reader
.read_exact(2, true)
.map(|b| u16::from_le_bytes([b[0], b[1]]))
}
/// Read a zigzag-encoded signed varint.
pub fn read_zigzag(&mut self) -> Result<i64, CodecError> {
let encoded = self.read_varint()?;
Ok(((encoded >> 1) as i64) ^ -((encoded & 1) as i64))
}
/// Read a delta-encoded timestamp. The wire stores the difference
/// from `last`; the canonical hash sees the full absolute timestamp
/// as fixed 8-byte little-endian (matching the writer).
pub fn read_delta(&mut self, last: u64) -> Result<u64, CodecError> {
let delta = self.reader.read_varint(false)?;
// A corrupted/hostile delta must not wrap u64: a wrapped value would
// still hash consistently for the wrong timestamp and pass the check.
let current = last
.checked_add(delta)
.ok_or(CodecError::TimestampOverflow)?;
self.reader._update_hash(¤t.to_le_bytes());
Ok(current)
}
/// Read a UUID, resolving dictionary compression: a `0` sentinel means the
/// 16 bytes follow inline (and are registered for later reuse); any other
/// value is a 1-based index into the segment's UUID dictionary.
pub fn read_uuid(&mut self) -> Result<Uuid, CodecError> {
// Do not hash raw bytes from the buffer because the hash will be the actual UUID!
let x = self.reader.read_varint(false)?;
let uuid: Uuid = if x == 0 {
// First instance of this UUID, read it in full and then save to dict.
let uuid = self.reader.read_exact(16, false)?;
let uuid: Uuid = uuid.try_into().map_err(|_| CodecError::UnexpectedEof)?;
let idx = self.uuid_dict.len() + 1; // 1-based index
self.uuid_dict.insert(idx as u32, uuid);
uuid
} else {
// Known UUID — look up by dict index. Reject a reference that
// doesn't fit u32 before converting, so an out-of-range value
// can't wrap and resolve to an unrelated dictionary entry.
let idx: u32 = x.try_into().map_err(|_| CodecError::UnresolvedUuid(x))?;
*self
.uuid_dict
.get(&idx)
.ok_or(CodecError::UnresolvedUuid(x))?
};
self.reader._update_hash(&uuid);
Ok(uuid)
}
/// Verify the trailing 4-byte hash check against the canonical
/// content hash. Returns the full hash on success.
pub fn finalize(mut self) -> Result<blake3::Hash, CodecError> {
self.reader.finalize()
}
}
/// Pairs a [`Reader`] with a blake3 hasher so callers can choose, per read,
/// whether the bytes taken off the wire contribute to the entry hash.
struct HashReader<'a, R> {
    /// Underlying byte source, borrowed for the duration of one entry.
    reader: &'a mut Reader<R>,
    /// Accumulates the entry's content hash.
    hasher: blake3::Hasher,
}

impl<'a, R: BufRead> HashReader<'a, R> {
    /// Start a fresh hash over bytes pulled from `reader`.
    fn new(reader: &'a mut Reader<R>) -> Self {
        let hasher = blake3::Hasher::new();
        Self { reader, hasher }
    }

    /// Decode a varint; when `hash` is set, its raw wire bytes are hashed.
    fn read_varint(&mut self, hash: bool) -> Result<u64, CodecError> {
        let (value, raw, used) = self.reader.read_varint()?;
        if hash {
            // Only the first `used` bytes of the fixed buffer are meaningful.
            self.hasher.update(&raw[..used]);
        }
        Ok(value)
    }

    /// Read exactly `len` bytes; when `hash` is set, they are hashed.
    fn read_exact(&mut self, len: usize, hash: bool) -> Result<Vec<u8>, CodecError> {
        let payload = self.reader.read_vec(len)?;
        if hash {
            self.hasher.update(&payload);
        }
        Ok(payload)
    }

    /// Feed `bytes` into the hash without reading anything from the source.
    fn _update_hash(&mut self, bytes: &[u8]) {
        self.hasher.update(bytes);
    }

    /// Compute the hash so far, then read the 4-byte trailer and compare it
    /// with the first four bytes of that hash.
    fn finalize(&mut self) -> Result<blake3::Hash, CodecError> {
        let digest = self.hasher.finalize();

        let mut trailer = [0u8; 4];
        self.reader.read_exact(&mut trailer)?;

        let mut prefix = [0u8; 4];
        prefix.copy_from_slice(&digest.as_bytes()[..4]);

        if trailer == prefix {
            Ok(digest)
        } else {
            Err(CodecError::HashMismatch {
                expected: u32::from_le_bytes(prefix),
                got: u32::from_le_bytes(trailer),
            })
        }
    }
}
/// A thin [`BufRead`] wrapper that the codec reads a segment from. It keeps no
/// position of its own; [`is_eof`](Self::is_eof) is answered by peeking at the
/// underlying buffer.
pub struct Reader<R> {
    /// The wrapped byte source.
    reader: R,
}

impl<R: BufRead> Reader<R> {
    /// Wrap an underlying [`BufRead`] source.
    pub fn new(reader: R) -> Self {
        Self { reader }
    }

    /// Whether the underlying source has no more bytes.
    ///
    /// # Errors
    /// Any I/O error from the source other than `Interrupted`, which is
    /// retried.
    pub fn is_eof(&mut self) -> Result<bool, CodecError> {
        loop {
            match self.reader.fill_buf() {
                Ok(buffered) => return Ok(buffered.is_empty()),
                // An interrupted call is non-fatal; just ask again.
                Err(e) if e.kind() == std::io::ErrorKind::Interrupted => continue,
                Err(e) => return Err(CodecError::from(e)),
            }
        }
    }

    /// Returns the decoded value plus the raw on-wire bytes (the caller hashes
    /// them). A u64 varint is at most 10 bytes, so they go in a fixed stack
    /// buffer — `len` is how many are valid. No allocation.
    ///
    /// # Errors
    /// `CodecError::UnexpectedEof` if the source ends mid-varint,
    /// `CodecError::VarIntOverflow` if the encoding exceeds 64 bits.
    pub(super) fn read_varint(&mut self) -> Result<(u64, [u8; 10], usize), CodecError> {
        let mut bytes = [0u8; 10];
        let mut len = 0;
        let mut result = 0u64;
        let mut shift = 0;
        loop {
            let byte = self.read_byte()?.ok_or(CodecError::UnexpectedEof)?;
            bytes[len] = byte;
            len += 1;
            // On the 10th byte (shift=63), only bit 0 is valid — higher
            // bits or a continuation flag would overflow u64. This also caps
            // the loop at 10 bytes, so `bytes[len]` never indexes past the end.
            if shift == 63 && byte > 1 {
                return Err(CodecError::VarIntOverflow);
            }
            result |= ((byte & 0x7F) as u64) << shift;
            if byte & 0x80 == 0 {
                return Ok((result, bytes, len));
            }
            shift += 7;
        }
    }

    /// Read one byte, or `None` at end of input.
    ///
    /// # Errors
    /// Any I/O error from the source other than `Interrupted`, which is
    /// retried.
    pub(super) fn read_byte(&mut self) -> Result<Option<u8>, CodecError> {
        let mut buf = [0u8; 1];
        loop {
            match self.reader.read(&mut buf) {
                Ok(0) => return Ok(None),
                Ok(_) => return Ok(Some(buf[0])),
                // `Read::read` documents `Interrupted` as retryable; treating
                // it as fatal would fail a decode on a harmless EINTR.
                Err(e) if e.kind() == std::io::ErrorKind::Interrupted => continue,
                Err(e) => return Err(CodecError::from(e)),
            }
        }
    }

    /// Fill `buf` completely from the source.
    ///
    /// # Errors
    /// `CodecError::UnexpectedEof` on a short read, `CodecError::Io` for any
    /// other I/O failure.
    pub(super) fn read_exact(&mut self, buf: &mut [u8]) -> Result<(), CodecError> {
        self.reader.read_exact(buf).map_err(|e| match e.kind() {
            // A short read is truncation — surface the dedicated EOF error for
            // consistency with the rest of the decoder, not a generic Io.
            std::io::ErrorKind::UnexpectedEof => CodecError::UnexpectedEof,
            _ => CodecError::Io(e),
        })
    }

    /// Read exactly `len` bytes into a freshly allocated vector.
    ///
    /// # Errors
    /// `CodecError::UnexpectedEof` if the source ends before `len` bytes were
    /// delivered; any I/O error other than `Interrupted`, which is retried.
    pub(super) fn read_vec(&mut self, len: usize) -> Result<Vec<u8>, CodecError> {
        // Grow the buffer with the bytes actually delivered rather than
        // pre-allocating an on-wire length we haven't validated — a corrupt or
        // hostile blob length must not OOM the process before we hit EOF.
        let mut buf = Vec::new();
        let mut remaining = len;
        let mut chunk = [0u8; 8192];
        while remaining > 0 {
            let want = remaining.min(chunk.len());
            let n = match self.reader.read(&mut chunk[..want]) {
                Ok(0) => return Err(CodecError::UnexpectedEof),
                Ok(n) => n,
                // Non-fatal by the `Read::read` contract: retry the same read.
                Err(e) if e.kind() == std::io::ErrorKind::Interrupted => continue,
                Err(e) => return Err(CodecError::from(e)),
            };
            buf.extend_from_slice(&chunk[..n]);
            remaining -= n;
        }
        Ok(buf)
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::codec::writer::EntryBufferWriter;
// Three distinct UUID fixtures; UUID_A is written in both entries below.
const UUID_A: Uuid = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16];
const UUID_B: Uuid = [
0xA0, 0xA1, 0xA2, 0xA3, 0xA4, 0xA5, 0xA6, 0xA7, 0xA8, 0xA9, 0xAA, 0xAB, 0xAC, 0xAD, 0xAE,
0xAF,
];
const UUID_C: Uuid = [0xFF; 16];
/// Goal: Verify that all primitive types round-trip correctly through
/// the writer and reader, including UUID dictionary dedup and blake3
/// hash verification.
///
/// Given: Two entries written sequentially sharing a UUID dictionary,
/// each containing every primitive type (byte, varint, blob,
/// u16_le, zigzag, delta, uuid), with 3 distinct UUIDs where UUID_A
/// appears in both entries to exercise dict dedup.
/// When: Both entries are read back with a fresh reader-side UUID dict.
/// Then: All values match, hashes match, and the second occurrence of
/// UUID_A is decoded from the dict (not inline).
#[test]
fn roundtrip_two_entries_all_types() {
let mut write_uuid_dict: HashMap<Uuid, u32> = HashMap::new();
let blob_data = b"hello ubiquisync";
// Simulate HLC timestamps: first entry has two timestamps (e.g. created_at, updated_at),
// second entry has one. Deltas written: the full first timestamp (last=0),
// a counter bump of 1, then a gap of roughly 30 seconds.
let ts1: u64 = 1_700_000_000_000 << 16; // ~2023, counter=0
let ts2: u64 = ts1 + 1; // same ms, counter=1
let ts3: u64 = ts1 + (30_000 << 16); // 30 seconds later
// ── Write entry 1: all types including 2 delta timestamps ──
let mut w1 = EntryBufferWriter::new(&mut write_uuid_dict);
w1.write_byte(0x42);
w1.write_varint(123456789);
w1.write_blob(blob_data);
w1.write_u16_le(0xBEEF);
w1.write_zigzag(-99);
w1.write_delta(ts1, 0).unwrap(); // first timestamp, last=0
w1.write_delta(ts2, ts1).unwrap(); // counter bump, delta=1
w1.write_uuid(&UUID_A);
w1.write_uuid(&UUID_B);
let (buf1, hash1) = w1.finalize();
// ── Write entry 2: UUID_A dict hit, 1 delta timestamp, edge cases ──
let mut w2 = EntryBufferWriter::new(&mut write_uuid_dict);
w2.write_byte(0x00);
w2.write_varint(0); // edge case: zero
w2.write_blob(b""); // edge case: empty blob
w2.write_u16_le(0x0000);
w2.write_zigzag(i64::MIN);
w2.write_delta(ts3, ts2).unwrap(); // 30 second gap
w2.write_uuid(&UUID_A); // dict hit — should be smaller on wire
w2.write_uuid(&UUID_C);
let (buf2, hash2) = w2.finalize();
// Entry 2's UUID_A should be a dict reference (varint 1 = 1 byte),
// not inline (1 + 16 = 17 bytes). Sanity check that buf2 is smaller.
// NOTE: entry 2 also carries an empty blob and one fewer timestamp, so
// this is only a coarse check; the dict path is really proven by the
// read-back of UUID_A from entry 2 below.
assert!(
buf2.len() < buf1.len(),
"entry 2 should be smaller due to UUID_A dict hit"
);
// Hashes should differ — different content.
assert_ne!(hash1, hash2);
// ── Read both entries back ──────────────────────────────────────────
let mut combined = Vec::new();
combined.extend_from_slice(&buf1);
combined.extend_from_slice(&buf2);
let mut reader = Reader::new(combined.as_slice());
let mut read_uuid_dict: HashMap<u32, Uuid> = HashMap::new();
// ── Read entry 1 ──
// Scoped so the borrows of `reader` and `read_uuid_dict` end before entry 2.
{
let mut r1 = EntryBufferReader::new(&mut reader, &mut read_uuid_dict);
assert_eq!(r1.read_byte().unwrap(), 0x42);
assert_eq!(r1.read_varint().unwrap(), 123456789);
assert_eq!(r1.read_blob().unwrap(), blob_data);
assert_eq!(r1.read_u16_le().unwrap(), 0xBEEF);
assert_eq!(r1.read_zigzag().unwrap(), -99);
assert_eq!(r1.read_delta(0).unwrap(), ts1); // first timestamp
assert_eq!(r1.read_delta(ts1).unwrap(), ts2); // counter bump
assert_eq!(r1.read_uuid().unwrap(), UUID_A);
assert_eq!(r1.read_uuid().unwrap(), UUID_B);
let read_hash1 = r1.finalize().unwrap();
assert_eq!(read_hash1, hash1);
}
// ── Read entry 2 ──
{
let mut r2 = EntryBufferReader::new(&mut reader, &mut read_uuid_dict);
assert_eq!(r2.read_byte().unwrap(), 0x00);
assert_eq!(r2.read_varint().unwrap(), 0);
assert_eq!(r2.read_blob().unwrap(), b"" as &[u8]);
assert_eq!(r2.read_u16_le().unwrap(), 0x0000);
assert_eq!(r2.read_zigzag().unwrap(), i64::MIN);
assert_eq!(r2.read_delta(ts2).unwrap(), ts3); // 30 second gap
assert_eq!(r2.read_uuid().unwrap(), UUID_A); // from dict
assert_eq!(r2.read_uuid().unwrap(), UUID_C);
let read_hash2 = r2.finalize().unwrap();
assert_eq!(read_hash2, hash2);
}
// Reader-side dict should have all 3 UUIDs.
assert_eq!(read_uuid_dict.len(), 3);
}
/// Goal: a varint with a long run of continuation bytes (more than a u64
/// can hold) errors with `VarIntOverflow` rather than spinning past the
/// 10-byte maximum or overflowing the shift.
///
/// Given: ten `0x80` bytes — every byte keeps the continuation bit set, so
/// the value never terminates within u64's range.
/// When: reading a varint.
/// Then: the 10th byte (shift 63) is rejected as `VarIntOverflow`.
#[test]
fn read_varint_rejects_overflow() {
let data = [0x80u8; 10];
let mut reader = Reader::new(data.as_slice());
assert!(matches!(
reader.read_varint(),
Err(CodecError::VarIntOverflow)
));
}
}