1use crate::hlc::HlcTimestamp;
2use crate::node_id::NodeId;
3
4#[derive(Clone, Copy, PartialEq, Eq, Hash)]
14pub struct CrdtMeta {
15 pub timestamp: HlcTimestamp,
16 pub node_id: NodeId,
17}
18
/// Length in bytes of a serialized [`CrdtMeta`]: a 12-byte timestamp followed
/// by an 8-byte node id.
pub const CRDT_META_SIZE: usize = 12 + 8;

/// Length in bytes of everything that precedes the user payload in an encoded
/// value: one entry-kind byte, three zero bytes, then the serialized
/// [`CrdtMeta`].
pub const CRDT_HEADER_SIZE: usize = 1 + 3 + CRDT_META_SIZE;
24
/// Tag stored in the first byte of an encoded value.
///
/// The explicit discriminants are the on-wire byte values.
#[derive(Debug, Copy, Clone, Eq, PartialEq, Hash)]
#[repr(u8)]
pub enum EntryKind {
    /// Entry that carries a user value (wire byte `0`).
    Put = 0,
    /// Payload-less marker entry (wire byte `1`); the encoder writes no user
    /// bytes for it.
    Tombstone = 1,
}

impl EntryKind {
    /// Maps a wire byte back to its variant.
    ///
    /// Returns `None` for any byte that is not the discriminant of a known
    /// variant.
    pub fn from_u8(v: u8) -> Option<Self> {
        // Search by discriminant so the mapping cannot drift from the
        // values declared on the enum.
        [Self::Put, Self::Tombstone]
            .iter()
            .copied()
            .find(|kind| *kind as u8 == v)
    }
}
44
45impl CrdtMeta {
46 #[inline]
47 pub fn new(timestamp: HlcTimestamp, node_id: NodeId) -> Self {
48 Self { timestamp, node_id }
49 }
50
51 pub fn to_bytes(&self) -> [u8; CRDT_META_SIZE] {
52 let mut buf = [0u8; CRDT_META_SIZE];
53 let ts_bytes = self.timestamp.to_bytes();
54 let nid_bytes = self.node_id.to_bytes();
55 buf[0..12].copy_from_slice(&ts_bytes);
56 buf[12..20].copy_from_slice(&nid_bytes);
57 buf
58 }
59
60 pub fn from_bytes(b: &[u8; CRDT_META_SIZE]) -> Self {
61 let ts = HlcTimestamp::from_bytes(b[0..12].try_into().unwrap());
62 let nid = NodeId::from_bytes(b[12..20].try_into().unwrap());
63 Self {
64 timestamp: ts,
65 node_id: nid,
66 }
67 }
68
69 #[inline]
71 pub fn lww_cmp(&self, other: &Self) -> std::cmp::Ordering {
72 self.timestamp
73 .cmp(&other.timestamp)
74 .then(self.node_id.cmp(&other.node_id))
75 }
76
77 #[inline]
78 pub fn wins_over(&self, other: &Self) -> bool {
79 self.lww_cmp(other) == std::cmp::Ordering::Greater
80 }
81}
82
83impl std::fmt::Debug for CrdtMeta {
84 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
85 write!(f, "CrdtMeta({:?}, {:?})", self.timestamp, self.node_id)
86 }
87}
88
89pub fn encode_lww_value(meta: &CrdtMeta, kind: EntryKind, user_value: &[u8]) -> Vec<u8> {
95 let user_len = if kind == EntryKind::Tombstone {
96 0
97 } else {
98 user_value.len()
99 };
100 let mut buf = Vec::with_capacity(CRDT_HEADER_SIZE + user_len);
101 buf.push(kind as u8);
102 buf.extend_from_slice(&[0u8; 3]); buf.extend_from_slice(&meta.to_bytes());
104 if kind == EntryKind::Put {
105 buf.extend_from_slice(user_value);
106 }
107 buf
108}
109
110#[derive(Debug)]
111pub struct DecodedValue<'a> {
112 pub meta: CrdtMeta,
113 pub kind: EntryKind,
114 pub user_value: &'a [u8],
115}
116
117pub fn decode_lww_value(data: &[u8]) -> Result<DecodedValue<'_>, DecodeError> {
118 if data.len() < CRDT_HEADER_SIZE {
119 return Err(DecodeError::TooShort {
120 expected: CRDT_HEADER_SIZE,
121 actual: data.len(),
122 });
123 }
124
125 let kind = EntryKind::from_u8(data[0]).ok_or(DecodeError::InvalidEntryKind(data[0]))?;
126 let meta_bytes: &[u8; CRDT_META_SIZE] = data[4..24].try_into().unwrap();
128 let meta = CrdtMeta::from_bytes(meta_bytes);
129
130 let user_value = if kind == EntryKind::Tombstone {
131 &data[CRDT_HEADER_SIZE..CRDT_HEADER_SIZE] } else {
133 &data[CRDT_HEADER_SIZE..]
134 };
135
136 Ok(DecodedValue {
137 meta,
138 kind,
139 user_value,
140 })
141}
142
143#[derive(Debug, thiserror::Error)]
144pub enum DecodeError {
145 #[error("CRDT value too short: expected at least {expected} bytes, got {actual}")]
146 TooShort { expected: usize, actual: usize },
147
148 #[error("invalid CRDT entry kind: {0}")]
149 InvalidEntryKind(u8),
150}
151
/// Outcome of comparing local metadata against remote metadata.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum MergeResult {
    /// The local metadata compares strictly greater.
    Local,
    /// The remote metadata compares strictly greater.
    Remote,
    /// Both sides compare equal.
    Equal,
}
167
168pub fn lww_merge(local: &CrdtMeta, remote: &CrdtMeta) -> MergeResult {
169 match local.lww_cmp(remote) {
170 std::cmp::Ordering::Greater => MergeResult::Local,
171 std::cmp::Ordering::Less => MergeResult::Remote,
172 std::cmp::Ordering::Equal => MergeResult::Equal,
173 }
174}
175
// Unit tests are kept in the sibling file `crdt_tests.rs` and are compiled
// only for test builds.
#[cfg(test)]
#[path = "crdt_tests.rs"]
mod tests;