Skip to main content

ubiquisync_core/codec/
decoder.rs

1use std::collections::HashMap;
2use std::io::BufRead;
3use std::marker::PhantomData;
4
5use crate::codec::{
6    consts::{FLAG_DEVICE, FLAG_SERVER, TAG_EXPUNGED},
7    error::CodecError,
8    op::Op,
9    reader::{EntryBufferReader, Reader},
10};
11use crate::hlc::Timestamp;
12use crate::log_entry::LogEntry;
13use crate::uuid::Uuid;
14
15/// Streaming decoder for one segment: reads the header on construction, then
16/// yields entries one at a time, carrying the cross-entry state (timestamp
17/// base, UUID dictionary) that delta- and dictionary-decoding need.
18pub struct Decoder<E, R> {
19    buf: Reader<R>,
20    last_timestamp: u64,
21    uuids: HashMap<u32, Uuid>,
22    server_mode: bool,
23    _phantom: PhantomData<E>,
24}
25
26/// The result of decoding a whole segment, including the end state needed to
27/// append more entries to it.
28pub struct DecodedLogs<E> {
29    /// The decoded entries, in segment order.
30    pub entries: Vec<DecodedEntry<E>>,
31    /// The UUID dictionary built while decoding (`uuid → id`), for an encoder
32    /// that continues the segment.
33    pub uuid_dict: HashMap<Uuid, u32>,
34    /// The last absolute timestamp seen, the base for further delta encoding.
35    pub last_timestamp: u64,
36    /// Whether the segment was written in server mode (entries carry user ids).
37    pub server_mode: bool,
38}
39
40/// One decoded entry: a live log entry or an expunged-entry marker.
41#[derive(Clone)]
42pub enum DecodedEntry<E> {
43    /// A normal log entry.
44    LogEntry(LogEntry<E>),
45    /// A tombstone naming the hash of the entry that was expunged.
46    Expunged(blake3::Hash),
47}
48
49impl<E: Op, R: BufRead> Decoder<E, R> {
50    /// `magic` is the app-supplied segment identity the encoder wrote (see
51    /// [`Encoder::new`](crate::codec::Encoder::new)). A segment whose leading
52    /// bytes don't match is rejected as foreign with [`CodecError::BadMagic`].
53    pub fn new(read: R, magic: &[u8]) -> Result<Option<Self>, CodecError> {
54        if magic.is_empty() {
55            return Err(CodecError::BadMagic);
56        }
57        let mut reader = Reader::new(read);
58        if reader.is_eof()? {
59            return Ok(None);
60        }
61        let mut got = vec![0u8; magic.len()];
62        reader.read_exact(&mut got)?;
63        if got.as_slice() != magic {
64            return Err(CodecError::BadMagic);
65        }
66        let flags = reader.read_byte()?.ok_or(CodecError::UnexpectedEof)?;
67        // Strict: a flag byte that isn't exactly a known mode signals a format
68        // we don't understand — reject it rather than masking and silently
69        // treating unknown values as device mode.
70        let server_mode = match flags {
71            FLAG_DEVICE => false,
72            FLAG_SERVER => true,
73            other => return Err(CodecError::UnknownSegmentFlags(other)),
74        };
75        Ok(Some(Self {
76            buf: reader,
77            last_timestamp: 0,
78            uuids: HashMap::default(),
79            server_mode,
80            _phantom: PhantomData,
81        }))
82    }
83
84    /// Decode the next entry, or `None` at end of segment. On error, the
85    /// decoder's cross-entry state is rolled back so a partial failure doesn't
86    /// corrupt subsequent decoding.
87    pub fn decode_entry(&mut self) -> Result<Option<DecodedEntry<E>>, CodecError> {
88        let dict_len_before = self.uuids.len();
89        let result = self.try_decode_entry();
90        if result.is_err() {
91            // Roll back UUID definitions registered by the failed entry. IDs are
92            // assigned sequentially from 1, so any id past the pre-call count
93            // came from this entry; dropping them keeps the dictionary (handed
94            // back by decode_all for encoder reuse) consistent with only the
95            // entries that decoded successfully.
96            self.uuids.retain(|id, _| (*id as usize) <= dict_len_before);
97        }
98        result
99    }
100
101    fn try_decode_entry(&mut self) -> Result<Option<DecodedEntry<E>>, CodecError> {
102        if self.buf.is_eof()? {
103            return Ok(None);
104        }
105        let mut reader = EntryBufferReader::new(&mut self.buf, &mut self.uuids);
106        let tag = reader.read_byte()?;
107        if tag == TAG_EXPUNGED {
108            // Expunged entries are just TAG + 32-byte blake3 hash of the
109            // original entry. No integrity-check suffix, no timestamp delta, no
110            // finalize() — the hash itself is the integrity mechanism.
111            // last_timestamp is intentionally not updated; segment
112            // rewriting recalculates deltas around expunged gaps.
113            let hash_bytes = reader.read_bytes(32)?;
114            let hash =
115                blake3::Hash::from_slice(&hash_bytes).map_err(|_| CodecError::CorruptedLogFile)?;
116            return Ok(Some(DecodedEntry::Expunged(hash)));
117        }
118        let e = E::decode(tag, &mut reader)?;
119        let timestamp = reader.read_delta(self.last_timestamp)?;
120        let server_user_id = if self.server_mode {
121            Some(reader.read_uuid()?)
122        } else {
123            None
124        };
125        reader.finalize()?;
126        // Commit cross-entry state only after the integrity check passes.
127        self.last_timestamp = timestamp;
128        Ok(Some(DecodedEntry::LogEntry(LogEntry {
129            server_user_id,
130            timestamp: Timestamp::from_raw(timestamp),
131            op: e,
132        })))
133    }
134
135    /// Decode an entire segment in one call. Returns whatever entries decoded
136    /// successfully *plus* the first error, if any — entries before a failure
137    /// are still returned. `None` logs means the input held no segment (empty)
138    /// or the header itself was rejected (with the error).
139    pub fn decode_all(buf: R, magic: &[u8]) -> (Option<DecodedLogs<E>>, Option<CodecError>) {
140        match Self::new(buf, magic) {
141            Ok(Some(mut decoder)) => {
142                let mut entries = Vec::new();
143                let mut err = None;
144                loop {
145                    match decoder.decode_entry() {
146                        Ok(Some(entry)) => entries.push(entry),
147                        Ok(None) => break,
148                        Err(e) => {
149                            err = Some(e);
150                            break;
151                        }
152                    }
153                }
154
155                // Invert the UUID dict (id→uuid → uuid→id) for encoder reuse.
156                let mut uuid_dict = HashMap::new();
157                for (id, uuid) in decoder.uuids.into_iter() {
158                    uuid_dict.insert(uuid, id);
159                }
160
161                (
162                    Some(DecodedLogs {
163                        entries,
164                        uuid_dict,
165                        last_timestamp: decoder.last_timestamp,
166                        server_mode: decoder.server_mode,
167                    }),
168                    err,
169                )
170            }
171            Ok(None) => (None, None),
172            Err(e) => (None, Some(e)),
173        }
174    }
175}