use crate::crdt::{CrdtMeta, EntryKind, CRDT_HEADER_SIZE, CRDT_META_SIZE};
use crate::diff::DiffResult;
use crate::node_id::NodeId;
const PATCH_MAGIC: u32 = 0x53594E43; const PATCH_VERSION: u8 = 1;
const FLAG_HAS_CRDT: u8 = 0x01;
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PatchEntry {
pub key: Vec<u8>,
pub value: Vec<u8>,
pub kind: EntryKind,
pub crdt_meta: Option<CrdtMeta>,
}
#[derive(Debug, Clone)]
pub struct SyncPatch {
pub source_node: NodeId,
pub entries: Vec<PatchEntry>,
pub crdt_aware: bool,
}
#[derive(Debug, thiserror::Error)]
pub enum PatchError {
#[error("invalid patch magic: expected {expected:#010x}, got {actual:#010x}")]
InvalidMagic { expected: u32, actual: u32 },
#[error("unsupported patch version: {0}")]
UnsupportedVersion(u8),
#[error("patch data truncated: expected at least {expected} bytes, got {actual}")]
Truncated { expected: usize, actual: usize },
#[error("invalid entry kind: {0}")]
InvalidEntryKind(u8),
}
impl SyncPatch {
pub fn from_diff(source_node: NodeId, diff: &DiffResult, crdt_aware: bool) -> Self {
let entries = diff
.entries
.iter()
.map(|e| {
if crdt_aware && e.value.len() >= CRDT_HEADER_SIZE {
if let Ok(decoded) = crate::crdt::decode_lww_value(&e.value) {
return PatchEntry {
key: e.key.clone(),
value: e.value.clone(),
kind: decoded.kind,
crdt_meta: Some(decoded.meta),
};
}
}
PatchEntry {
key: e.key.clone(),
value: e.value.clone(),
kind: EntryKind::Put,
crdt_meta: None,
}
})
.collect();
SyncPatch {
source_node,
entries,
crdt_aware,
}
}
pub fn empty(source_node: NodeId) -> Self {
SyncPatch {
source_node,
entries: Vec::new(),
crdt_aware: false,
}
}
pub fn len(&self) -> usize {
self.entries.len()
}
pub fn is_empty(&self) -> bool {
self.entries.is_empty()
}
pub fn serialize(&self) -> Vec<u8> {
let flags = if self.crdt_aware { FLAG_HAS_CRDT } else { 0 };
let header_size = 4 + 1 + 1 + 8 + 4; let per_entry_overhead = 2 + 4 + 1 + if self.crdt_aware { CRDT_META_SIZE } else { 0 };
let data_size: usize = self
.entries
.iter()
.map(|e| per_entry_overhead + e.key.len() + e.value.len())
.sum();
let mut buf = Vec::with_capacity(header_size + data_size);
buf.extend_from_slice(&PATCH_MAGIC.to_le_bytes());
buf.push(PATCH_VERSION);
buf.push(flags);
buf.extend_from_slice(&self.source_node.to_bytes());
buf.extend_from_slice(&(self.entries.len() as u32).to_le_bytes());
for entry in &self.entries {
buf.extend_from_slice(&(entry.key.len() as u16).to_le_bytes());
buf.extend_from_slice(&(entry.value.len() as u32).to_le_bytes());
buf.push(entry.kind as u8);
if self.crdt_aware {
if let Some(ref meta) = entry.crdt_meta {
buf.extend_from_slice(&meta.to_bytes());
} else {
buf.extend_from_slice(&[0u8; CRDT_META_SIZE]);
}
}
buf.extend_from_slice(&entry.key);
buf.extend_from_slice(&entry.value);
}
buf
}
pub fn deserialize(data: &[u8]) -> Result<Self, PatchError> {
let header_size = 4 + 1 + 1 + 8 + 4; if data.len() < header_size {
return Err(PatchError::Truncated {
expected: header_size,
actual: data.len(),
});
}
let magic = u32::from_le_bytes(data[0..4].try_into().unwrap());
if magic != PATCH_MAGIC {
return Err(PatchError::InvalidMagic {
expected: PATCH_MAGIC,
actual: magic,
});
}
let version = data[4];
if version != PATCH_VERSION {
return Err(PatchError::UnsupportedVersion(version));
}
let flags = data[5];
let crdt_aware = (flags & FLAG_HAS_CRDT) != 0;
let source_node = NodeId::from_bytes(data[6..14].try_into().unwrap());
let entry_count = u32::from_le_bytes(data[14..18].try_into().unwrap()) as usize;
let mut entries = Vec::with_capacity(entry_count);
let mut pos = header_size;
for _ in 0..entry_count {
let entry_header = 7 + if crdt_aware { CRDT_META_SIZE } else { 0 };
if pos + entry_header > data.len() {
return Err(PatchError::Truncated {
expected: pos + entry_header,
actual: data.len(),
});
}
let key_len = u16::from_le_bytes(data[pos..pos + 2].try_into().unwrap()) as usize;
let value_len = u32::from_le_bytes(data[pos + 2..pos + 6].try_into().unwrap()) as usize;
let kind_byte = data[pos + 6];
let kind =
EntryKind::from_u8(kind_byte).ok_or(PatchError::InvalidEntryKind(kind_byte))?;
pos += 7;
let crdt_meta = if crdt_aware {
let meta_bytes: &[u8; CRDT_META_SIZE] =
data[pos..pos + CRDT_META_SIZE].try_into().unwrap();
pos += CRDT_META_SIZE;
Some(CrdtMeta::from_bytes(meta_bytes))
} else {
None
};
if pos + key_len + value_len > data.len() {
return Err(PatchError::Truncated {
expected: pos + key_len + value_len,
actual: data.len(),
});
}
let key = data[pos..pos + key_len].to_vec();
pos += key_len;
let value = data[pos..pos + value_len].to_vec();
pos += value_len;
entries.push(PatchEntry {
key,
value,
kind,
crdt_meta,
});
}
Ok(SyncPatch {
source_node,
entries,
crdt_aware,
})
}
}
#[cfg(test)]
#[path = "patch_tests.rs"]
mod tests;