1use crate::{codec::error::CodecError, uuid::Uuid};
2use std::collections::HashMap;
3use std::io::BufRead;
4
/// Decoder for one hash-protected buffer of encoded fields.
///
/// Every `read_*` method pulls bytes from the wrapped reader and feeds data
/// into a running hash; `finalize` then checks that hash against the
/// checksum stored after the fields.
pub struct EntryBufferReader<'a, R> {
    /// Hashing wrapper around the borrowed byte source.
    reader: HashReader<'a, R>,
    /// Caller-owned dictionary mapping a `u32` index to a previously seen
    /// UUID; `read_uuid` both consults and extends it.
    uuid_dict: &'a mut HashMap<u32, Uuid>,
}
12
13impl<'a, R: BufRead> EntryBufferReader<'a, R> {
14 pub fn new(reader: &'a mut Reader<R>, uuid_dict: &'a mut HashMap<u32, Uuid>) -> Self {
17 Self {
18 reader: HashReader::new(reader),
19 uuid_dict,
20 }
21 }
22
23 pub fn read_byte(&mut self) -> Result<u8, CodecError> {
25 self.reader.read_exact(1, true).map(|b| b[0])
26 }
27
28 pub fn read_bytes(&mut self, len: usize) -> Result<Vec<u8>, CodecError> {
30 self.reader.read_exact(len, true)
31 }
32
33 pub fn read_varint(&mut self) -> Result<u64, CodecError> {
35 self.reader.read_varint(true)
36 }
37
38 pub fn read_blob(&mut self) -> Result<Vec<u8>, CodecError> {
40 let len = self.read_varint()?;
41 let len: usize = len.try_into().map_err(|_| CodecError::LengthTooLarge(len))?;
44 self.reader.read_exact(len, true)
45 }
46
47 pub fn read_u16_le(&mut self) -> Result<u16, CodecError> {
49 self.reader
50 .read_exact(2, true)
51 .map(|b| u16::from_le_bytes([b[0], b[1]]))
52 }
53
54 pub fn read_zigzag(&mut self) -> Result<i64, CodecError> {
56 let encoded = self.read_varint()?;
57 Ok(((encoded >> 1) as i64) ^ -((encoded & 1) as i64))
58 }
59
60 pub fn read_delta(&mut self, last: u64) -> Result<u64, CodecError> {
64 let delta = self.reader.read_varint(false)?;
65 let current = last
68 .checked_add(delta)
69 .ok_or(CodecError::TimestampOverflow)?;
70 self.reader._update_hash(¤t.to_le_bytes());
71 Ok(current)
72 }
73
74 pub fn read_uuid(&mut self) -> Result<Uuid, CodecError> {
78 let x = self.reader.read_varint(false)?;
80 let uuid: Uuid = if x == 0 {
81 let uuid = self.reader.read_exact(16, false)?;
83 let uuid: Uuid = uuid.try_into().map_err(|_| CodecError::UnexpectedEof)?;
84 let idx = self.uuid_dict.len() + 1; self.uuid_dict.insert(idx as u32, uuid);
86 uuid
87 } else {
88 let idx: u32 = x.try_into().map_err(|_| CodecError::UnresolvedUuid(x))?;
92 *self
93 .uuid_dict
94 .get(&idx)
95 .ok_or(CodecError::UnresolvedUuid(x))?
96 };
97 self.reader._update_hash(&uuid);
98 Ok(uuid)
99 }
100
101 pub fn finalize(mut self) -> Result<blake3::Hash, CodecError> {
104 self.reader.finalize()
105 }
106}
107
/// Pairs a borrowed `Reader` with a BLAKE3 hasher so that data read through
/// it can optionally be folded into a running hash.
struct HashReader<'a, R> {
    /// Underlying byte source, borrowed from the caller.
    reader: &'a mut Reader<R>,
    /// Running hash over everything passed to `update` so far.
    hasher: blake3::Hasher,
}
112
113impl<'a, R: BufRead> HashReader<'a, R> {
114 fn new(reader: &'a mut Reader<R>) -> Self {
115 Self {
116 reader,
117 hasher: blake3::Hasher::new(),
118 }
119 }
120
121 fn read_varint(&mut self, hash: bool) -> Result<u64, CodecError> {
122 let (result, bytes, len) = self.reader.read_varint()?;
123 if hash {
124 self.hasher.update(&bytes[..len]);
125 }
126 Ok(result)
127 }
128
129 fn read_exact(&mut self, len: usize, hash: bool) -> Result<Vec<u8>, CodecError> {
130 let bytes = self.reader.read_vec(len)?;
131 if hash {
132 self.hasher.update(&bytes);
133 }
134 Ok(bytes)
135 }
136
137 fn _update_hash(&mut self, bytes: &[u8]) {
138 self.hasher.update(bytes);
139 }
140
141 fn finalize(&mut self) -> Result<blake3::Hash, CodecError> {
142 let hash = self.hasher.finalize();
143 let mut buf: [u8; 4] = [0; 4];
144 self.reader.read_exact(&mut buf)?;
145 let expected = &hash.as_bytes()[..4];
146 if buf != expected {
147 return Err(CodecError::HashMismatch {
148 expected: u32::from_le_bytes(expected.try_into().unwrap()),
149 got: u32::from_le_bytes(buf),
150 });
151 }
152 Ok(hash)
153 }
154}
155
/// Wrapper around an input source `R` that provides the low-level decoding
/// primitives (single bytes, varints, fixed-size and variable-size reads).
///
/// The decoding methods are available when `R` implements `BufRead`.
pub struct Reader<R> {
    /// The wrapped input source.
    reader: R,
}
161
162impl<R: BufRead> Reader<R> {
163 pub fn new(reader: R) -> Self {
165 Self { reader }
166 }
167
168 pub fn is_eof(&mut self) -> Result<bool, CodecError> {
170 Ok(self.reader.fill_buf()?.is_empty())
171 }
172
173 pub(super) fn read_varint(&mut self) -> Result<(u64, [u8; 10], usize), CodecError> {
177 let mut bytes = [0u8; 10];
178 let mut len = 0;
179 let mut result = 0u64;
180 let mut shift = 0;
181 loop {
182 let byte = self.read_byte()?.ok_or(CodecError::UnexpectedEof)?;
183 bytes[len] = byte;
184 len += 1;
185 if shift == 63 && byte > 1 {
189 return Err(CodecError::VarIntOverflow);
190 }
191 result |= ((byte & 0x7F) as u64) << shift;
192 if byte & 0x80 == 0 {
193 return Ok((result, bytes, len));
194 }
195 shift += 7;
196 }
197 }
198
199 pub(super) fn read_byte(&mut self) -> Result<Option<u8>, CodecError> {
200 let mut buf: [u8; 1] = [0; 1];
201 let read = self.reader.read(&mut buf)?;
202 if read == 0 {
203 return Ok(None);
204 }
205 Ok(Some(buf[0]))
206 }
207
208 pub(super) fn read_exact(&mut self, buf: &mut [u8]) -> Result<(), CodecError> {
209 self.reader.read_exact(buf).map_err(|e| match e.kind() {
210 std::io::ErrorKind::UnexpectedEof => CodecError::UnexpectedEof,
213 _ => CodecError::Io(e),
214 })
215 }
216
217 pub(super) fn read_vec(&mut self, len: usize) -> Result<Vec<u8>, CodecError> {
218 let mut buf = Vec::new();
222 let mut remaining = len;
223 let mut chunk = [0u8; 8192];
224 while remaining > 0 {
225 let want = remaining.min(chunk.len());
226 let n = self.reader.read(&mut chunk[..want])?;
227 if n == 0 {
228 return Err(CodecError::UnexpectedEof);
229 }
230 buf.extend_from_slice(&chunk[..n]);
231 remaining -= n;
232 }
233 Ok(buf)
234 }
235}
236
#[cfg(test)]
mod tests {
    use super::*;
    use crate::codec::writer::EntryBufferWriter;

    const UUID_A: Uuid = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16];
    const UUID_B: Uuid = [
        0xA0, 0xA1, 0xA2, 0xA3, 0xA4, 0xA5, 0xA6, 0xA7, 0xA8, 0xA9, 0xAA, 0xAB, 0xAC, 0xAD, 0xAE,
        0xAF,
    ];
    const UUID_C: Uuid = [0xFF; 16];

    /// Encodes two entries that exercise every field type, concatenates them
    /// and decodes both from a single `Reader`, checking values and hashes.
    #[test]
    fn roundtrip_two_entries_all_types() {
        let payload = b"hello ubiquisync";

        let first_ts: u64 = 1_700_000_000_000 << 16;
        let second_ts: u64 = first_ts + 1;
        let third_ts: u64 = first_ts + (30_000 << 16);

        // Encoder side: UUID -> index dictionary shared by both entries.
        let mut encode_dict: HashMap<Uuid, u32> = HashMap::new();

        let mut first_writer = EntryBufferWriter::new(&mut encode_dict);
        first_writer.write_byte(0x42);
        first_writer.write_varint(123456789);
        first_writer.write_blob(payload);
        first_writer.write_u16_le(0xBEEF);
        first_writer.write_zigzag(-99);
        first_writer.write_delta(first_ts, 0).unwrap();
        first_writer.write_delta(second_ts, first_ts).unwrap();
        first_writer.write_uuid(&UUID_A);
        first_writer.write_uuid(&UUID_B);
        let (first_bytes, first_hash) = first_writer.finalize();

        // Second entry uses boundary values and repeats UUID_A.
        let mut second_writer = EntryBufferWriter::new(&mut encode_dict);
        second_writer.write_byte(0x00);
        second_writer.write_varint(0);
        second_writer.write_blob(b"");
        second_writer.write_u16_le(0x0000);
        second_writer.write_zigzag(i64::MIN);
        second_writer.write_delta(third_ts, second_ts).unwrap();
        second_writer.write_uuid(&UUID_A);
        second_writer.write_uuid(&UUID_C);
        let (second_bytes, second_hash) = second_writer.finalize();

        assert!(
            second_bytes.len() < first_bytes.len(),
            "entry 2 should be smaller due to UUID_A dict hit"
        );

        assert_ne!(first_hash, second_hash);

        // Decoder side: both entries back to back in one stream.
        let mut stream = Vec::new();
        stream.extend_from_slice(&first_bytes);
        stream.extend_from_slice(&second_bytes);

        let mut reader = Reader::new(stream.as_slice());
        let mut decode_dict: HashMap<u32, Uuid> = HashMap::new();

        {
            let mut first_reader = EntryBufferReader::new(&mut reader, &mut decode_dict);
            assert_eq!(first_reader.read_byte().unwrap(), 0x42);
            assert_eq!(first_reader.read_varint().unwrap(), 123456789);
            assert_eq!(first_reader.read_blob().unwrap(), payload);
            assert_eq!(first_reader.read_u16_le().unwrap(), 0xBEEF);
            assert_eq!(first_reader.read_zigzag().unwrap(), -99);
            assert_eq!(first_reader.read_delta(0).unwrap(), first_ts);
            assert_eq!(first_reader.read_delta(first_ts).unwrap(), second_ts);
            assert_eq!(first_reader.read_uuid().unwrap(), UUID_A);
            assert_eq!(first_reader.read_uuid().unwrap(), UUID_B);
            let decoded_hash = first_reader.finalize().unwrap();
            assert_eq!(decoded_hash, first_hash);
        }

        {
            let mut second_reader = EntryBufferReader::new(&mut reader, &mut decode_dict);
            assert_eq!(second_reader.read_byte().unwrap(), 0x00);
            assert_eq!(second_reader.read_varint().unwrap(), 0);
            assert_eq!(second_reader.read_blob().unwrap(), b"" as &[u8]);
            assert_eq!(second_reader.read_u16_le().unwrap(), 0x0000);
            assert_eq!(second_reader.read_zigzag().unwrap(), i64::MIN);
            assert_eq!(second_reader.read_delta(second_ts).unwrap(), third_ts);
            assert_eq!(second_reader.read_uuid().unwrap(), UUID_A);
            assert_eq!(second_reader.read_uuid().unwrap(), UUID_C);
            let decoded_hash = second_reader.finalize().unwrap();
            assert_eq!(decoded_hash, second_hash);
        }

        // Three distinct UUIDs were seen across the two entries.
        assert_eq!(decode_dict.len(), 3);
    }

    /// Ten bytes that all carry the continuation bit cannot encode a `u64`.
    #[test]
    fn read_varint_rejects_overflow() {
        let oversized = [0x80u8; 10];
        let mut reader = Reader::new(&oversized[..]);
        let outcome = reader.read_varint();
        assert!(matches!(outcome, Err(CodecError::VarIntOverflow)));
    }
}