ubiquisync_core/codec/decoder.rs
1use std::collections::HashMap;
2use std::io::BufRead;
3use std::marker::PhantomData;
4
5use crate::codec::{
6 consts::{FLAG_DEVICE, FLAG_SERVER, TAG_EXPUNGED},
7 error::CodecError,
8 op::Op,
9 reader::{EntryBufferReader, Reader},
10};
11use crate::hlc::Timestamp;
12use crate::log_entry::LogEntry;
13use crate::uuid::Uuid;
14
15/// Streaming decoder for one segment: reads the header on construction, then
16/// yields entries one at a time, carrying the cross-entry state (timestamp
17/// base, UUID dictionary) that delta- and dictionary-decoding need.
18pub struct Decoder<E, R> {
19 buf: Reader<R>,
20 last_timestamp: u64,
21 uuids: HashMap<u32, Uuid>,
22 server_mode: bool,
23 _phantom: PhantomData<E>,
24}
25
26/// The result of decoding a whole segment, including the end state needed to
27/// append more entries to it.
28pub struct DecodedLogs<E> {
29 /// The decoded entries, in segment order.
30 pub entries: Vec<DecodedEntry<E>>,
31 /// The UUID dictionary built while decoding (`uuid → id`), for an encoder
32 /// that continues the segment.
33 pub uuid_dict: HashMap<Uuid, u32>,
34 /// The last absolute timestamp seen, the base for further delta encoding.
35 pub last_timestamp: u64,
36 /// Whether the segment was written in server mode (entries carry user ids).
37 pub server_mode: bool,
38}
39
40/// One decoded entry: a live log entry or an expunged-entry marker.
41#[derive(Clone)]
42pub enum DecodedEntry<E> {
43 /// A normal log entry.
44 LogEntry(LogEntry<E>),
45 /// A tombstone naming the hash of the entry that was expunged.
46 Expunged(blake3::Hash),
47}
48
49impl<E: Op, R: BufRead> Decoder<E, R> {
50 /// `magic` is the app-supplied segment identity the encoder wrote (see
51 /// [`Encoder::new`](crate::codec::Encoder::new)). A segment whose leading
52 /// bytes don't match is rejected as foreign with [`CodecError::BadMagic`].
53 pub fn new(read: R, magic: &[u8]) -> Result<Option<Self>, CodecError> {
54 if magic.is_empty() {
55 return Err(CodecError::BadMagic);
56 }
57 let mut reader = Reader::new(read);
58 if reader.is_eof()? {
59 return Ok(None);
60 }
61 let mut got = vec![0u8; magic.len()];
62 reader.read_exact(&mut got)?;
63 if got.as_slice() != magic {
64 return Err(CodecError::BadMagic);
65 }
66 let flags = reader.read_byte()?.ok_or(CodecError::UnexpectedEof)?;
67 // Strict: a flag byte that isn't exactly a known mode signals a format
68 // we don't understand — reject it rather than masking and silently
69 // treating unknown values as device mode.
70 let server_mode = match flags {
71 FLAG_DEVICE => false,
72 FLAG_SERVER => true,
73 other => return Err(CodecError::UnknownSegmentFlags(other)),
74 };
75 Ok(Some(Self {
76 buf: reader,
77 last_timestamp: 0,
78 uuids: HashMap::default(),
79 server_mode,
80 _phantom: PhantomData,
81 }))
82 }
83
84 /// Decode the next entry, or `None` at end of segment. On error, the
85 /// decoder's cross-entry state is rolled back so a partial failure doesn't
86 /// corrupt subsequent decoding.
87 pub fn decode_entry(&mut self) -> Result<Option<DecodedEntry<E>>, CodecError> {
88 let dict_len_before = self.uuids.len();
89 let result = self.try_decode_entry();
90 if result.is_err() {
91 // Roll back UUID definitions registered by the failed entry. IDs are
92 // assigned sequentially from 1, so any id past the pre-call count
93 // came from this entry; dropping them keeps the dictionary (handed
94 // back by decode_all for encoder reuse) consistent with only the
95 // entries that decoded successfully.
96 self.uuids.retain(|id, _| (*id as usize) <= dict_len_before);
97 }
98 result
99 }
100
101 fn try_decode_entry(&mut self) -> Result<Option<DecodedEntry<E>>, CodecError> {
102 if self.buf.is_eof()? {
103 return Ok(None);
104 }
105 let mut reader = EntryBufferReader::new(&mut self.buf, &mut self.uuids);
106 let tag = reader.read_byte()?;
107 if tag == TAG_EXPUNGED {
108 // Expunged entries are just TAG + 32-byte blake3 hash of the
109 // original entry. No integrity-check suffix, no timestamp delta, no
110 // finalize() — the hash itself is the integrity mechanism.
111 // last_timestamp is intentionally not updated; segment
112 // rewriting recalculates deltas around expunged gaps.
113 let hash_bytes = reader.read_bytes(32)?;
114 let hash =
115 blake3::Hash::from_slice(&hash_bytes).map_err(|_| CodecError::CorruptedLogFile)?;
116 return Ok(Some(DecodedEntry::Expunged(hash)));
117 }
118 let e = E::decode(tag, &mut reader)?;
119 let timestamp = reader.read_delta(self.last_timestamp)?;
120 let server_user_id = if self.server_mode {
121 Some(reader.read_uuid()?)
122 } else {
123 None
124 };
125 reader.finalize()?;
126 // Commit cross-entry state only after the integrity check passes.
127 self.last_timestamp = timestamp;
128 Ok(Some(DecodedEntry::LogEntry(LogEntry {
129 server_user_id,
130 timestamp: Timestamp::from_raw(timestamp),
131 op: e,
132 })))
133 }
134
135 /// Decode an entire segment in one call. Returns whatever entries decoded
136 /// successfully *plus* the first error, if any — entries before a failure
137 /// are still returned. `None` logs means the input held no segment (empty)
138 /// or the header itself was rejected (with the error).
139 pub fn decode_all(buf: R, magic: &[u8]) -> (Option<DecodedLogs<E>>, Option<CodecError>) {
140 match Self::new(buf, magic) {
141 Ok(Some(mut decoder)) => {
142 let mut entries = Vec::new();
143 let mut err = None;
144 loop {
145 match decoder.decode_entry() {
146 Ok(Some(entry)) => entries.push(entry),
147 Ok(None) => break,
148 Err(e) => {
149 err = Some(e);
150 break;
151 }
152 }
153 }
154
155 // Invert the UUID dict (id→uuid → uuid→id) for encoder reuse.
156 let mut uuid_dict = HashMap::new();
157 for (id, uuid) in decoder.uuids.into_iter() {
158 uuid_dict.insert(uuid, id);
159 }
160
161 (
162 Some(DecodedLogs {
163 entries,
164 uuid_dict,
165 last_timestamp: decoder.last_timestamp,
166 server_mode: decoder.server_mode,
167 }),
168 err,
169 )
170 }
171 Ok(None) => (None, None),
172 Err(e) => (None, Some(e)),
173 }
174 }
175}