ubiquisync_core/codec/encoder.rs
1use std::{collections::HashMap, io::Write};
2
3use crate::{
4 codec::{
5 consts::{FLAG_DEVICE, FLAG_SERVER},
6 error::CodecError,
7 op::Op,
8 writer::EntryBufferWriter,
9 },
10 hlc::Timestamp,
11 uuid::Uuid,
12};
13
14/// Streaming encoder for one segment: writes the header on construction, then
15/// appends entries, carrying the cross-entry state (timestamp base, UUID
16/// dictionary) that delta- and dictionary-encoding need.
17pub struct Encoder<E, W> {
18 sink: W,
19 last_timestamp: u64,
20 uuids: HashMap<Uuid, u32>,
21 server_mode: bool,
22 entry_index: usize,
23 size: usize,
24 _phantom: std::marker::PhantomData<E>,
25}
26
27impl<E: Op, W: Write> Encoder<E, W> {
28 /// Create a new encoder.
29 ///
30 /// `magic` is the segment's leading identity bytes — it is **not** defined
31 /// by this library. Each application must supply its own stable, app-unique
32 /// value (the same bytes its decoder expects) so that one app's segments are
33 /// never mistaken for another's when they share a sync location. Use a
34 /// distinct value per app, and prefer same-length magics across apps so one
35 /// cannot be a prefix of another.
36 pub fn new(mut sink: W, magic: &[u8], server_mode: bool) -> Result<Self, CodecError> {
37 // An empty magic gives zero app isolation — the decoder would compare
38 // zero bytes and accept any header.
39 if magic.is_empty() {
40 return Err(CodecError::BadMagic);
41 }
42 // Write segment header
43 sink.write_all(magic)?;
44 if server_mode {
45 sink.write_all(&[FLAG_SERVER])?;
46 } else {
47 sink.write_all(&[FLAG_DEVICE])?;
48 }
49 Ok(Self {
50 sink,
51 last_timestamp: 0,
52 uuids: HashMap::default(),
53 server_mode,
54 entry_index: 0,
55 size: 0,
56 _phantom: std::marker::PhantomData,
57 })
58 }
59
60 /// Mutable access to the underlying writer (e.g. for fsync).
61 pub fn sink_mut(&mut self) -> &mut W {
62 &mut self.sink
63 }
64
65 /// Number of entries written so far in this segment.
66 pub fn entry_index(&self) -> usize {
67 self.entry_index
68 }
69
70 /// Encode one log entry. Takes parts by ref/value rather than a full
71 /// `LogEntry<E>` so callers iterating `&[E]` can avoid cloning each op
72 /// just to satisfy a `&LogEntry<E>` argument.
73 ///
74 /// The UUID dictionary and last timestamp are persistent, cross-entry
75 /// state; they are committed only once the entry's bytes are written. A
76 /// failure partway through leaves the encoder exactly as it was, so a
77 /// partly-built entry can never leave behind a UUID definition (or an
78 /// advanced clock) that the flushed bytes don't account for.
79 pub fn encode_entry(
80 &mut self,
81 op: &E,
82 timestamp: Timestamp,
83 server_user_id: Option<Uuid>,
84 ) -> Result<usize, CodecError> {
85 let dict_len_before = self.uuids.len();
86 let result = self.try_encode_entry(op, timestamp, server_user_id);
87 if result.is_err() {
88 // Roll back UUID definitions registered before the failure. IDs are
89 // handed out sequentially from 1, so any id past the pre-call count
90 // belongs to this aborted entry.
91 self.uuids.retain(|_, id| (*id as usize) <= dict_len_before);
92 }
93 result
94 }
95
96 fn try_encode_entry(
97 &mut self,
98 op: &E,
99 timestamp: Timestamp,
100 server_user_id: Option<Uuid>,
101 ) -> Result<usize, CodecError> {
102 let mut writer = EntryBufferWriter::new(&mut self.uuids);
103 // Order must match decoder: op (tag + body) → timestamp → server_user_id
104 op.encode(&mut writer)?;
105 let raw_timestamp = timestamp.raw();
106 writer.write_delta(raw_timestamp, self.last_timestamp)?;
107 if self.server_mode {
108 match server_user_id {
109 Some(server_user_id) => writer.write_uuid(&server_user_id),
110 None => return Err(CodecError::MissingUserId),
111 }
112 }
113 let (bytes, _) = writer.finalize();
114 self.sink.write_all(&bytes)?;
115 // Commit cross-entry state only now that the bytes are written.
116 self.last_timestamp = raw_timestamp;
117 self.entry_index += 1;
118 self.size += bytes.len();
119 Ok(self.entry_index)
120 }
121
122 /// Total entry bytes written so far (the segment header is not counted),
123 /// for deciding when to roll over to a new segment.
124 pub fn size(&self) -> usize {
125 self.size
126 }
127}