Skip to main content

citadel_sync/
patch.rs

1use crate::crdt::{CrdtMeta, EntryKind, CRDT_HEADER_SIZE, CRDT_META_SIZE};
2use crate::diff::DiffResult;
3use crate::node_id::NodeId;
4
5const PATCH_MAGIC: u32 = 0x53594E43; // "SYNC"
6const PATCH_VERSION: u8 = 1;
7
8const FLAG_HAS_CRDT: u8 = 0x01;
9
10/// A single entry in a sync patch.
11#[derive(Debug, Clone, PartialEq, Eq)]
12pub struct PatchEntry {
13    pub key: Vec<u8>,
14    pub value: Vec<u8>,
15    pub kind: EntryKind,
16    pub crdt_meta: Option<CrdtMeta>,
17}
18
19/// A serializable sync patch containing entries to apply to a target database.
20#[derive(Debug, Clone)]
21pub struct SyncPatch {
22    pub source_node: NodeId,
23    pub entries: Vec<PatchEntry>,
24    pub crdt_aware: bool,
25}
26
27/// Errors from patch serialization/deserialization.
28#[derive(Debug, thiserror::Error)]
29pub enum PatchError {
30    #[error("invalid patch magic: expected {expected:#010x}, got {actual:#010x}")]
31    InvalidMagic { expected: u32, actual: u32 },
32
33    #[error("unsupported patch version: {0}")]
34    UnsupportedVersion(u8),
35
36    #[error("patch data truncated: expected at least {expected} bytes, got {actual}")]
37    Truncated { expected: usize, actual: usize },
38
39    #[error("invalid entry kind: {0}")]
40    InvalidEntryKind(u8),
41}
42
43impl SyncPatch {
44    /// Build a SyncPatch from a DiffResult.
45    ///
46    /// If `crdt_aware` is true, values are expected to contain CRDT headers
47    /// and entries will carry CrdtMeta extracted from the value prefix.
48    pub fn from_diff(source_node: NodeId, diff: &DiffResult, crdt_aware: bool) -> Self {
49        let entries = diff
50            .entries
51            .iter()
52            .map(|e| {
53                if crdt_aware && e.value.len() >= CRDT_HEADER_SIZE {
54                    if let Ok(decoded) = crate::crdt::decode_lww_value(&e.value) {
55                        return PatchEntry {
56                            key: e.key.clone(),
57                            value: e.value.clone(),
58                            kind: decoded.kind,
59                            crdt_meta: Some(decoded.meta),
60                        };
61                    }
62                }
63                PatchEntry {
64                    key: e.key.clone(),
65                    value: e.value.clone(),
66                    kind: EntryKind::Put,
67                    crdt_meta: None,
68                }
69            })
70            .collect();
71
72        SyncPatch {
73            source_node,
74            entries,
75            crdt_aware,
76        }
77    }
78
79    /// Create an empty patch.
80    pub fn empty(source_node: NodeId) -> Self {
81        SyncPatch {
82            source_node,
83            entries: Vec::new(),
84            crdt_aware: false,
85        }
86    }
87
88    pub fn len(&self) -> usize {
89        self.entries.len()
90    }
91
92    pub fn is_empty(&self) -> bool {
93        self.entries.is_empty()
94    }
95
96    /// Serialize to binary wire format.
97    ///
98    /// Format:
99    /// ```text
100    /// [magic: u32 LE][version: u8][flags: u8][source_node: 8B][entry_count: u32 LE]
101    /// Per entry:
102    ///   [key_len: u16 LE][value_len: u32 LE][kind: u8]
103    ///   [crdt_meta: 20B]  (if FLAG_HAS_CRDT)
104    ///   [key: key_len bytes][value: value_len bytes]
105    /// ```
106    pub fn serialize(&self) -> Vec<u8> {
107        let flags = if self.crdt_aware { FLAG_HAS_CRDT } else { 0 };
108
109        let header_size = 4 + 1 + 1 + 8 + 4; // 18
110        let per_entry_overhead = 2 + 4 + 1 + if self.crdt_aware { CRDT_META_SIZE } else { 0 };
111        let data_size: usize = self
112            .entries
113            .iter()
114            .map(|e| per_entry_overhead + e.key.len() + e.value.len())
115            .sum();
116
117        let mut buf = Vec::with_capacity(header_size + data_size);
118
119        buf.extend_from_slice(&PATCH_MAGIC.to_le_bytes());
120        buf.push(PATCH_VERSION);
121        buf.push(flags);
122        buf.extend_from_slice(&self.source_node.to_bytes());
123        buf.extend_from_slice(&(self.entries.len() as u32).to_le_bytes());
124
125        for entry in &self.entries {
126            buf.extend_from_slice(&(entry.key.len() as u16).to_le_bytes());
127            buf.extend_from_slice(&(entry.value.len() as u32).to_le_bytes());
128            buf.push(entry.kind as u8);
129            if self.crdt_aware {
130                if let Some(ref meta) = entry.crdt_meta {
131                    buf.extend_from_slice(&meta.to_bytes());
132                } else {
133                    buf.extend_from_slice(&[0u8; CRDT_META_SIZE]);
134                }
135            }
136            buf.extend_from_slice(&entry.key);
137            buf.extend_from_slice(&entry.value);
138        }
139
140        buf
141    }
142
143    /// Deserialize from binary wire format.
144    pub fn deserialize(data: &[u8]) -> Result<Self, PatchError> {
145        let header_size = 4 + 1 + 1 + 8 + 4; // 18 bytes
146        if data.len() < header_size {
147            return Err(PatchError::Truncated {
148                expected: header_size,
149                actual: data.len(),
150            });
151        }
152
153        let magic = u32::from_le_bytes(data[0..4].try_into().unwrap());
154        if magic != PATCH_MAGIC {
155            return Err(PatchError::InvalidMagic {
156                expected: PATCH_MAGIC,
157                actual: magic,
158            });
159        }
160
161        let version = data[4];
162        if version != PATCH_VERSION {
163            return Err(PatchError::UnsupportedVersion(version));
164        }
165
166        let flags = data[5];
167        let crdt_aware = (flags & FLAG_HAS_CRDT) != 0;
168        let source_node = NodeId::from_bytes(data[6..14].try_into().unwrap());
169        let entry_count = u32::from_le_bytes(data[14..18].try_into().unwrap()) as usize;
170
171        let mut entries = Vec::with_capacity(entry_count);
172        let mut pos = header_size;
173
174        for _ in 0..entry_count {
175            // key_len (2) + value_len (4) + kind (1) = 7
176            let entry_header = 7 + if crdt_aware { CRDT_META_SIZE } else { 0 };
177            if pos + entry_header > data.len() {
178                return Err(PatchError::Truncated {
179                    expected: pos + entry_header,
180                    actual: data.len(),
181                });
182            }
183
184            let key_len = u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap()) as usize;
185            let value_len = u32::from_le_bytes(data[pos + 2..pos + 6].try_into().unwrap()) as usize;
186            let kind_byte = data[pos + 6];
187            let kind =
188                EntryKind::from_u8(kind_byte).ok_or(PatchError::InvalidEntryKind(kind_byte))?;
189            pos += 7;
190
191            let crdt_meta = if crdt_aware {
192                let meta_bytes: &[u8; CRDT_META_SIZE] =
193                    data[pos..pos + CRDT_META_SIZE].try_into().unwrap();
194                pos += CRDT_META_SIZE;
195                Some(CrdtMeta::from_bytes(meta_bytes))
196            } else {
197                None
198            };
199
200            if pos + key_len + value_len > data.len() {
201                return Err(PatchError::Truncated {
202                    expected: pos + key_len + value_len,
203                    actual: data.len(),
204                });
205            }
206
207            let key = data[pos..pos + key_len].to_vec();
208            pos += key_len;
209            let value = data[pos..pos + value_len].to_vec();
210            pos += value_len;
211
212            entries.push(PatchEntry {
213                key,
214                value,
215                kind,
216                crdt_meta,
217            });
218        }
219
220        Ok(SyncPatch {
221            source_node,
222            entries,
223            crdt_aware,
224        })
225    }
226}
227
228#[cfg(test)]
229#[path = "patch_tests.rs"]
230mod tests;