Skip to main content

citadel_sync/
crdt.rs

1use crate::hlc::HlcTimestamp;
2use crate::node_id::NodeId;
3
4/// Per-entry CRDT metadata for LWW (Last-Writer-Wins) conflict resolution.
5///
6/// 20 bytes on wire: entry_kind (1B) + padding (3B) + HLC timestamp (12B) + NodeId (8B).
7///
8/// Conflict resolution: higher timestamp wins, NodeId tiebreaker.
9/// This forms a join-semilattice with a total order, guaranteeing:
10/// - Commutativity: merge(a, b) == merge(b, a)
11/// - Associativity: merge(merge(a, b), c) == merge(a, merge(b, c))
12/// - Idempotency: merge(a, a) == a
13#[derive(Clone, Copy, PartialEq, Eq, Hash)]
14pub struct CrdtMeta {
15    pub timestamp: HlcTimestamp,
16    pub node_id: NodeId,
17}
18
/// Wire size of `CrdtMeta`: HLC timestamp (12 bytes) followed by NodeId (8 bytes).
pub const CRDT_META_SIZE: usize = 12 + 8;

/// Wire size of the header that prefixes every CRDT-encoded value:
/// entry kind (1 byte) + padding (3 bytes) + `CrdtMeta` (20 bytes) = 24 bytes.
pub const CRDT_HEADER_SIZE: usize = 1 + 3 + CRDT_META_SIZE;
24
/// Type of CRDT entry, stored as the first byte of an encoded value header.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[repr(u8)]
pub enum EntryKind {
    /// Key-value write (user value follows the header).
    Put = 0,
    /// Logical delete (tombstone). No user value follows.
    Tombstone = 1,
}

impl EntryKind {
    /// Parses an entry-kind tag byte.
    ///
    /// Returns `None` for any byte that is not a known discriminant.
    pub const fn from_u8(v: u8) -> Option<Self> {
        match v {
            0 => Some(Self::Put),
            1 => Some(Self::Tombstone),
            _ => None,
        }
    }
}

/// Fallible conversion from a tag byte; the error is the unrecognised byte.
impl std::convert::TryFrom<u8> for EntryKind {
    type Error = u8;

    fn try_from(v: u8) -> Result<Self, Self::Error> {
        Self::from_u8(v).ok_or(v)
    }
}

/// Converts a kind to its on-wire tag byte (its `repr(u8)` discriminant).
impl From<EntryKind> for u8 {
    fn from(kind: EntryKind) -> Self {
        kind as u8
    }
}
44
45impl CrdtMeta {
46    #[inline]
47    pub fn new(timestamp: HlcTimestamp, node_id: NodeId) -> Self {
48        Self { timestamp, node_id }
49    }
50
51    pub fn to_bytes(&self) -> [u8; CRDT_META_SIZE] {
52        let mut buf = [0u8; CRDT_META_SIZE];
53        let ts_bytes = self.timestamp.to_bytes();
54        let nid_bytes = self.node_id.to_bytes();
55        buf[0..12].copy_from_slice(&ts_bytes);
56        buf[12..20].copy_from_slice(&nid_bytes);
57        buf
58    }
59
60    pub fn from_bytes(b: &[u8; CRDT_META_SIZE]) -> Self {
61        let ts = HlcTimestamp::from_bytes(b[0..12].try_into().unwrap());
62        let nid = NodeId::from_bytes(b[12..20].try_into().unwrap());
63        Self {
64            timestamp: ts,
65            node_id: nid,
66        }
67    }
68
69    /// Higher timestamp wins, NodeId tiebreaker.
70    #[inline]
71    pub fn lww_cmp(&self, other: &Self) -> std::cmp::Ordering {
72        self.timestamp
73            .cmp(&other.timestamp)
74            .then(self.node_id.cmp(&other.node_id))
75    }
76
77    #[inline]
78    pub fn wins_over(&self, other: &Self) -> bool {
79        self.lww_cmp(other) == std::cmp::Ordering::Greater
80    }
81}
82
83impl std::fmt::Debug for CrdtMeta {
84    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
85        write!(f, "CrdtMeta({:?}, {:?})", self.timestamp, self.node_id)
86    }
87}
88
89/// Encode a user value with CRDT header.
90///
91/// Format: `[entry_kind: u8][_pad: 3B][HLC: 12B][NodeId: 8B][user_value...]`
92///
93/// Total header: 24 bytes. User value follows immediately after.
94pub fn encode_lww_value(meta: &CrdtMeta, kind: EntryKind, user_value: &[u8]) -> Vec<u8> {
95    let user_len = if kind == EntryKind::Tombstone {
96        0
97    } else {
98        user_value.len()
99    };
100    let mut buf = Vec::with_capacity(CRDT_HEADER_SIZE + user_len);
101    buf.push(kind as u8);
102    buf.extend_from_slice(&[0u8; 3]); // padding
103    buf.extend_from_slice(&meta.to_bytes());
104    if kind == EntryKind::Put {
105        buf.extend_from_slice(user_value);
106    }
107    buf
108}
109
110#[derive(Debug)]
111pub struct DecodedValue<'a> {
112    pub meta: CrdtMeta,
113    pub kind: EntryKind,
114    pub user_value: &'a [u8],
115}
116
117pub fn decode_lww_value(data: &[u8]) -> Result<DecodedValue<'_>, DecodeError> {
118    if data.len() < CRDT_HEADER_SIZE {
119        return Err(DecodeError::TooShort {
120            expected: CRDT_HEADER_SIZE,
121            actual: data.len(),
122        });
123    }
124
125    let kind = EntryKind::from_u8(data[0]).ok_or(DecodeError::InvalidEntryKind(data[0]))?;
126    // bytes 1..4 are padding (ignored on read)
127    let meta_bytes: &[u8; CRDT_META_SIZE] = data[4..24].try_into().unwrap();
128    let meta = CrdtMeta::from_bytes(meta_bytes);
129
130    let user_value = if kind == EntryKind::Tombstone {
131        &data[CRDT_HEADER_SIZE..CRDT_HEADER_SIZE] // empty slice
132    } else {
133        &data[CRDT_HEADER_SIZE..]
134    };
135
136    Ok(DecodedValue {
137        meta,
138        kind,
139        user_value,
140    })
141}
142
143#[derive(Debug, thiserror::Error)]
144pub enum DecodeError {
145    #[error("CRDT value too short: expected at least {expected} bytes, got {actual}")]
146    TooShort { expected: usize, actual: usize },
147
148    #[error("invalid CRDT entry kind: {0}")]
149    InvalidEntryKind(u8),
150}
151
152/// Merge two CRDT entries for the same key.
153///
154/// Returns which side wins using LWW resolution:
155/// higher timestamp wins, NodeId tiebreaker.
156/// The entry kind (Put vs Tombstone) does NOT affect the merge -
157/// a tombstone with a higher timestamp defeats a put with a lower one.
158#[derive(Debug, Clone, Copy, PartialEq, Eq)]
159pub enum MergeResult {
160    /// Keep the local entry.
161    Local,
162    /// Take the remote entry.
163    Remote,
164    /// Both entries are identical.
165    Equal,
166}
167
168pub fn lww_merge(local: &CrdtMeta, remote: &CrdtMeta) -> MergeResult {
169    match local.lww_cmp(remote) {
170        std::cmp::Ordering::Greater => MergeResult::Local,
171        std::cmp::Ordering::Less => MergeResult::Remote,
172        std::cmp::Ordering::Equal => MergeResult::Equal,
173    }
174}
175
// Unit tests for this module are loaded from the sibling file
// `crdt_tests.rs` and compiled only under `cfg(test)`.
#[cfg(test)]
#[path = "crdt_tests.rs"]
mod tests;