1use crate::crdt::{CrdtMeta, EntryKind, CRDT_HEADER_SIZE, CRDT_META_SIZE};
2use crate::diff::DiffResult;
3use crate::node_id::NodeId;
4
5const PATCH_MAGIC: u32 = 0x53594E43; const PATCH_VERSION: u8 = 1;
7
8const FLAG_HAS_CRDT: u8 = 0x01;
9
10#[derive(Debug, Clone, PartialEq, Eq)]
12pub struct PatchEntry {
13 pub key: Vec<u8>,
14 pub value: Vec<u8>,
15 pub kind: EntryKind,
16 pub crdt_meta: Option<CrdtMeta>,
17}
18
19#[derive(Debug, Clone)]
21pub struct SyncPatch {
22 pub source_node: NodeId,
23 pub entries: Vec<PatchEntry>,
24 pub crdt_aware: bool,
25}
26
27#[derive(Debug, thiserror::Error)]
29pub enum PatchError {
30 #[error("invalid patch magic: expected {expected:#010x}, got {actual:#010x}")]
31 InvalidMagic { expected: u32, actual: u32 },
32
33 #[error("unsupported patch version: {0}")]
34 UnsupportedVersion(u8),
35
36 #[error("patch data truncated: expected at least {expected} bytes, got {actual}")]
37 Truncated { expected: usize, actual: usize },
38
39 #[error("invalid entry kind: {0}")]
40 InvalidEntryKind(u8),
41}
42
43impl SyncPatch {
44 pub fn from_diff(source_node: NodeId, diff: &DiffResult, crdt_aware: bool) -> Self {
49 let entries = diff
50 .entries
51 .iter()
52 .map(|e| {
53 if crdt_aware && e.value.len() >= CRDT_HEADER_SIZE {
54 if let Ok(decoded) = crate::crdt::decode_lww_value(&e.value) {
55 return PatchEntry {
56 key: e.key.clone(),
57 value: e.value.clone(),
58 kind: decoded.kind,
59 crdt_meta: Some(decoded.meta),
60 };
61 }
62 }
63 PatchEntry {
64 key: e.key.clone(),
65 value: e.value.clone(),
66 kind: EntryKind::Put,
67 crdt_meta: None,
68 }
69 })
70 .collect();
71
72 SyncPatch {
73 source_node,
74 entries,
75 crdt_aware,
76 }
77 }
78
79 pub fn empty(source_node: NodeId) -> Self {
81 SyncPatch {
82 source_node,
83 entries: Vec::new(),
84 crdt_aware: false,
85 }
86 }
87
88 pub fn len(&self) -> usize {
89 self.entries.len()
90 }
91
92 pub fn is_empty(&self) -> bool {
93 self.entries.is_empty()
94 }
95
96 pub fn serialize(&self) -> Vec<u8> {
107 let flags = if self.crdt_aware { FLAG_HAS_CRDT } else { 0 };
108
109 let header_size = 4 + 1 + 1 + 8 + 4; let per_entry_overhead = 2 + 4 + 1 + if self.crdt_aware { CRDT_META_SIZE } else { 0 };
111 let data_size: usize = self
112 .entries
113 .iter()
114 .map(|e| per_entry_overhead + e.key.len() + e.value.len())
115 .sum();
116
117 let mut buf = Vec::with_capacity(header_size + data_size);
118
119 buf.extend_from_slice(&PATCH_MAGIC.to_le_bytes());
120 buf.push(PATCH_VERSION);
121 buf.push(flags);
122 buf.extend_from_slice(&self.source_node.to_bytes());
123 buf.extend_from_slice(&(self.entries.len() as u32).to_le_bytes());
124
125 for entry in &self.entries {
126 buf.extend_from_slice(&(entry.key.len() as u16).to_le_bytes());
127 buf.extend_from_slice(&(entry.value.len() as u32).to_le_bytes());
128 buf.push(entry.kind as u8);
129 if self.crdt_aware {
130 if let Some(ref meta) = entry.crdt_meta {
131 buf.extend_from_slice(&meta.to_bytes());
132 } else {
133 buf.extend_from_slice(&[0u8; CRDT_META_SIZE]);
134 }
135 }
136 buf.extend_from_slice(&entry.key);
137 buf.extend_from_slice(&entry.value);
138 }
139
140 buf
141 }
142
143 pub fn deserialize(data: &[u8]) -> Result<Self, PatchError> {
145 let header_size = 4 + 1 + 1 + 8 + 4; if data.len() < header_size {
147 return Err(PatchError::Truncated {
148 expected: header_size,
149 actual: data.len(),
150 });
151 }
152
153 let magic = u32::from_le_bytes(data[0..4].try_into().unwrap());
154 if magic != PATCH_MAGIC {
155 return Err(PatchError::InvalidMagic {
156 expected: PATCH_MAGIC,
157 actual: magic,
158 });
159 }
160
161 let version = data[4];
162 if version != PATCH_VERSION {
163 return Err(PatchError::UnsupportedVersion(version));
164 }
165
166 let flags = data[5];
167 let crdt_aware = (flags & FLAG_HAS_CRDT) != 0;
168 let source_node = NodeId::from_bytes(data[6..14].try_into().unwrap());
169 let entry_count = u32::from_le_bytes(data[14..18].try_into().unwrap()) as usize;
170
171 let mut entries = Vec::with_capacity(entry_count);
172 let mut pos = header_size;
173
174 for _ in 0..entry_count {
175 let entry_header = 7 + if crdt_aware { CRDT_META_SIZE } else { 0 };
177 if pos + entry_header > data.len() {
178 return Err(PatchError::Truncated {
179 expected: pos + entry_header,
180 actual: data.len(),
181 });
182 }
183
184 let key_len = u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap()) as usize;
185 let value_len = u32::from_le_bytes(data[pos + 2..pos + 6].try_into().unwrap()) as usize;
186 let kind_byte = data[pos + 6];
187 let kind =
188 EntryKind::from_u8(kind_byte).ok_or(PatchError::InvalidEntryKind(kind_byte))?;
189 pos += 7;
190
191 let crdt_meta = if crdt_aware {
192 let meta_bytes: &[u8; CRDT_META_SIZE] =
193 data[pos..pos + CRDT_META_SIZE].try_into().unwrap();
194 pos += CRDT_META_SIZE;
195 Some(CrdtMeta::from_bytes(meta_bytes))
196 } else {
197 None
198 };
199
200 if pos + key_len + value_len > data.len() {
201 return Err(PatchError::Truncated {
202 expected: pos + key_len + value_len,
203 actual: data.len(),
204 });
205 }
206
207 let key = data[pos..pos + key_len].to_vec();
208 pos += key_len;
209 let value = data[pos..pos + value_len].to_vec();
210 pos += value_len;
211
212 entries.push(PatchEntry {
213 key,
214 value,
215 kind,
216 crdt_meta,
217 });
218 }
219
220 Ok(SyncPatch {
221 source_node,
222 entries,
223 crdt_aware,
224 })
225 }
226}
227
228#[cfg(test)]
229#[path = "patch_tests.rs"]
230mod tests;