Skip to main content

ubiquisync_core/codec/
encoder.rs

1use std::{collections::HashMap, io::Write};
2
3use crate::{
4    codec::{
5        consts::{FLAG_DEVICE, FLAG_SERVER},
6        error::CodecError,
7        op::Op,
8        writer::EntryBufferWriter,
9    },
10    hlc::Timestamp,
11    uuid::Uuid,
12};
13
14/// Streaming encoder for one segment: writes the header on construction, then
15/// appends entries, carrying the cross-entry state (timestamp base, UUID
16/// dictionary) that delta- and dictionary-encoding need.
17pub struct Encoder<E, W> {
18    sink: W,
19    last_timestamp: u64,
20    uuids: HashMap<Uuid, u32>,
21    server_mode: bool,
22    entry_index: usize,
23    size: usize,
24    _phantom: std::marker::PhantomData<E>,
25}
26
27impl<E: Op, W: Write> Encoder<E, W> {
28    /// Create a new encoder.
29    ///
30    /// `magic` is the segment's leading identity bytes — it is **not** defined
31    /// by this library. Each application must supply its own stable, app-unique
32    /// value (the same bytes its decoder expects) so that one app's segments are
33    /// never mistaken for another's when they share a sync location. Use a
34    /// distinct value per app, and prefer same-length magics across apps so one
35    /// cannot be a prefix of another.
36    pub fn new(mut sink: W, magic: &[u8], server_mode: bool) -> Result<Self, CodecError> {
37        // An empty magic gives zero app isolation — the decoder would compare
38        // zero bytes and accept any header.
39        if magic.is_empty() {
40            return Err(CodecError::BadMagic);
41        }
42        // Write segment header
43        sink.write_all(magic)?;
44        if server_mode {
45            sink.write_all(&[FLAG_SERVER])?;
46        } else {
47            sink.write_all(&[FLAG_DEVICE])?;
48        }
49        Ok(Self {
50            sink,
51            last_timestamp: 0,
52            uuids: HashMap::default(),
53            server_mode,
54            entry_index: 0,
55            size: 0,
56            _phantom: std::marker::PhantomData,
57        })
58    }
59
60    /// Mutable access to the underlying writer (e.g. for fsync).
61    pub fn sink_mut(&mut self) -> &mut W {
62        &mut self.sink
63    }
64
65    /// Number of entries written so far in this segment.
66    pub fn entry_index(&self) -> usize {
67        self.entry_index
68    }
69
70    /// Encode one log entry. Takes parts by ref/value rather than a full
71    /// `LogEntry<E>` so callers iterating `&[E]` can avoid cloning each op
72    /// just to satisfy a `&LogEntry<E>` argument.
73    ///
74    /// The UUID dictionary and last timestamp are persistent, cross-entry
75    /// state; they are committed only once the entry's bytes are written. A
76    /// failure partway through leaves the encoder exactly as it was, so a
77    /// partly-built entry can never leave behind a UUID definition (or an
78    /// advanced clock) that the flushed bytes don't account for.
79    pub fn encode_entry(
80        &mut self,
81        op: &E,
82        timestamp: Timestamp,
83        server_user_id: Option<Uuid>,
84    ) -> Result<usize, CodecError> {
85        let dict_len_before = self.uuids.len();
86        let result = self.try_encode_entry(op, timestamp, server_user_id);
87        if result.is_err() {
88            // Roll back UUID definitions registered before the failure. IDs are
89            // handed out sequentially from 1, so any id past the pre-call count
90            // belongs to this aborted entry.
91            self.uuids.retain(|_, id| (*id as usize) <= dict_len_before);
92        }
93        result
94    }
95
96    fn try_encode_entry(
97        &mut self,
98        op: &E,
99        timestamp: Timestamp,
100        server_user_id: Option<Uuid>,
101    ) -> Result<usize, CodecError> {
102        let mut writer = EntryBufferWriter::new(&mut self.uuids);
103        // Order must match decoder: op (tag + body) → timestamp → server_user_id
104        op.encode(&mut writer)?;
105        let raw_timestamp = timestamp.raw();
106        writer.write_delta(raw_timestamp, self.last_timestamp)?;
107        if self.server_mode {
108            match server_user_id {
109                Some(server_user_id) => writer.write_uuid(&server_user_id),
110                None => return Err(CodecError::MissingUserId),
111            }
112        }
113        let (bytes, _) = writer.finalize();
114        self.sink.write_all(&bytes)?;
115        // Commit cross-entry state only now that the bytes are written.
116        self.last_timestamp = raw_timestamp;
117        self.entry_index += 1;
118        self.size += bytes.len();
119        Ok(self.entry_index)
120    }
121
122    /// Total entry bytes written so far (the segment header is not counted),
123    /// for deciding when to roll over to a new segment.
124    pub fn size(&self) -> usize {
125        self.size
126    }
127}