Skip to main content

citadel_sync/
apply.rs

1use citadel_core::Result;
2use citadel_txn::manager::TxnManager;
3use citadel_txn::write_txn::WriteTxn;
4
5use crate::crdt::{decode_lww_value, lww_merge, EntryKind, MergeResult};
6use crate::patch::SyncPatch;
7
/// Result of applying a sync patch to a database.
///
/// The apply functions in this module bump exactly one of these counters for
/// every patch entry they process successfully.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ApplyResult {
    /// Entries written (remote won or key was new).
    pub entries_applied: u64,
    /// Entries skipped (local won LWW comparison).
    pub entries_skipped: u64,
    /// Entries where local and remote are identical (no-op).
    pub entries_equal: u64,
}

impl ApplyResult {
    /// Returns a result with every counter set to zero.
    ///
    /// Produces the same value as [`ApplyResult::default`], but is usable in
    /// `const` contexts.
    #[must_use]
    pub const fn empty() -> Self {
        Self {
            entries_applied: 0,
            entries_skipped: 0,
            entries_equal: 0,
        }
    }
}
28
29/// Apply a sync patch to a database via TxnManager.
30///
31/// Opens a write transaction, applies entries, and commits.
32/// For CRDT-aware patches: reads existing values and uses LWW merge.
33/// For non-CRDT patches: unconditionally writes all entries.
34pub fn apply_patch(manager: &TxnManager, patch: &SyncPatch) -> Result<ApplyResult> {
35    if patch.is_empty() {
36        return Ok(ApplyResult::empty());
37    }
38
39    let mut wtx = manager.begin_write()?;
40    let result = apply_patch_to_txn(&mut wtx, patch)?;
41    wtx.commit()?;
42    Ok(result)
43}
44
45/// Apply a sync patch within an existing write transaction.
46///
47/// The caller is responsible for committing or aborting the transaction.
48pub fn apply_patch_to_txn(wtx: &mut WriteTxn<'_>, patch: &SyncPatch) -> Result<ApplyResult> {
49    let mut result = ApplyResult::empty();
50
51    for entry in &patch.entries {
52        if patch.crdt_aware {
53            if let Some(ref remote_meta) = entry.crdt_meta {
54                let existing = wtx.get(&entry.key)?;
55                if let Some(local_data) = existing {
56                    if let Ok(local_decoded) = decode_lww_value(&local_data) {
57                        match lww_merge(&local_decoded.meta, remote_meta) {
58                            MergeResult::Local => {
59                                result.entries_skipped += 1;
60                                continue;
61                            }
62                            MergeResult::Equal => {
63                                result.entries_equal += 1;
64                                continue;
65                            }
66                            MergeResult::Remote => {
67                                // Remote wins - fall through to write
68                            }
69                        }
70                    }
71                    // Local value doesn't have valid CRDT header - remote wins
72                }
73                // Key doesn't exist locally - remote wins
74            }
75        }
76
77        // Write the entry (either non-CRDT unconditional, or CRDT remote-wins)
78        match entry.kind {
79            EntryKind::Put => {
80                wtx.insert(&entry.key, &entry.value)?;
81            }
82            EntryKind::Tombstone => {
83                // Tombstone: write the CRDT header as the value so it participates
84                // in future LWW merges. The key remains with a tombstone marker.
85                wtx.insert(&entry.key, &entry.value)?;
86            }
87        }
88        result.entries_applied += 1;
89    }
90
91    Ok(result)
92}
93
94/// Apply a sync patch to a named table, creating it if needed.
95pub fn apply_patch_to_table(
96    manager: &TxnManager,
97    table_name: &[u8],
98    patch: &SyncPatch,
99) -> Result<ApplyResult> {
100    if patch.is_empty() {
101        return Ok(ApplyResult::empty());
102    }
103
104    let mut wtx = manager.begin_write()?;
105    match wtx.create_table(table_name) {
106        Ok(()) => {}
107        Err(citadel_core::Error::TableAlreadyExists(_)) => {}
108        Err(e) => return Err(e),
109    }
110    let result = apply_patch_to_table_txn(&mut wtx, table_name, patch)?;
111    wtx.commit()?;
112    Ok(result)
113}
114
115/// Apply a sync patch to a named table within an existing write transaction.
116pub fn apply_patch_to_table_txn(
117    wtx: &mut WriteTxn<'_>,
118    table_name: &[u8],
119    patch: &SyncPatch,
120) -> Result<ApplyResult> {
121    let mut result = ApplyResult::empty();
122
123    for entry in &patch.entries {
124        if patch.crdt_aware {
125            if let Some(ref remote_meta) = entry.crdt_meta {
126                let existing = wtx.table_get(table_name, &entry.key)?;
127                if let Some(local_data) = existing {
128                    if let Ok(local_decoded) = decode_lww_value(&local_data) {
129                        match lww_merge(&local_decoded.meta, remote_meta) {
130                            MergeResult::Local => {
131                                result.entries_skipped += 1;
132                                continue;
133                            }
134                            MergeResult::Equal => {
135                                result.entries_equal += 1;
136                                continue;
137                            }
138                            MergeResult::Remote => {}
139                        }
140                    }
141                }
142            }
143        }
144
145        match entry.kind {
146            EntryKind::Put | EntryKind::Tombstone => {
147                wtx.table_insert(table_name, &entry.key, &entry.value)?;
148            }
149        }
150        result.entries_applied += 1;
151    }
152
153    Ok(result)
154}
155
#[cfg(test)]
mod tests {
    use super::*;
    use crate::crdt::{encode_lww_value, CrdtMeta, EntryKind};
    use crate::hlc::HlcTimestamp;
    use crate::node_id::NodeId;
    use crate::patch::PatchEntry;

    use citadel_core::constants::{DEK_SIZE, MAC_KEY_SIZE, MAC_SIZE};
    use citadel_io::mmap_io::MmapPageIO;

    /// Multiplier used to build readable `wall_ns` values for `meta`.
    const SECOND: i64 = 1_000_000_000;

    /// Builds CRDT metadata from an HLC (wall, logical) pair and a numeric node id.
    fn meta(wall_ns: i64, logical: i32, node: u64) -> CrdtMeta {
        let stamp = HlcTimestamp::new(wall_ns, logical);
        CrdtMeta::new(stamp, NodeId::from_u64(node))
    }

    /// Creates a fresh database file at `path` (truncating any existing file)
    /// and returns a manager over it, using fixed test key material.
    fn test_manager(path: &std::path::Path) -> TxnManager {
        let mut options = std::fs::OpenOptions::new();
        options.read(true).write(true).create(true).truncate(true);
        let file = options.open(path).unwrap();
        let io = Box::new(MmapPageIO::try_new(file).unwrap());
        TxnManager::create(
            io,
            [0x42u8; DEK_SIZE],
            [0x43u8; MAC_KEY_SIZE],
            1,
            0x1234,
            [0x44u8; MAC_SIZE],
            256,
        )
        .unwrap()
    }

    /// Commits a single key/value pair so a test starts with local state.
    fn seed_key(mgr: &TxnManager, key: &[u8], value: &[u8]) {
        let mut wtx = mgr.begin_write().unwrap();
        wtx.insert(key, value).unwrap();
        wtx.commit().unwrap();
    }

    /// Builds one patch entry.
    fn patch_entry(
        key: &[u8],
        value: Vec<u8>,
        kind: EntryKind,
        crdt_meta: Option<CrdtMeta>,
    ) -> PatchEntry {
        PatchEntry {
            key: key.to_vec(),
            value,
            kind,
            crdt_meta,
        }
    }

    /// Builds a patch originating from node `source` with the given entries.
    fn build_patch(source: u64, crdt_aware: bool, entries: Vec<PatchEntry>) -> SyncPatch {
        SyncPatch {
            source_node: NodeId::from_u64(source),
            entries,
            crdt_aware,
        }
    }

    /// An empty patch yields an all-zero result.
    #[test]
    fn apply_empty_patch() {
        let tmp = tempfile::tempdir().unwrap();
        let mgr = test_manager(&tmp.path().join("test.db"));

        let outcome = apply_patch(&mgr, &SyncPatch::empty(NodeId::from_u64(1))).unwrap();
        assert_eq!(outcome, ApplyResult::empty());
    }

    /// Non-CRDT patches overwrite existing keys and add new ones.
    #[test]
    fn apply_non_crdt_unconditional() {
        let tmp = tempfile::tempdir().unwrap();
        let mgr = test_manager(&tmp.path().join("test.db"));

        seed_key(&mgr, b"key1", b"old-value");

        let patch = build_patch(
            1,
            false,
            vec![
                patch_entry(b"key1", b"new-value".to_vec(), EntryKind::Put, None),
                patch_entry(b"key2", b"brand-new".to_vec(), EntryKind::Put, None),
            ],
        );

        let outcome = apply_patch(&mgr, &patch).unwrap();
        assert_eq!(outcome.entries_applied, 2);

        let mut rtx = mgr.begin_read();
        assert_eq!(rtx.get(b"key1").unwrap().unwrap(), b"new-value");
        assert_eq!(rtx.get(b"key2").unwrap().unwrap(), b"brand-new");
    }

    /// A remote entry with a later timestamp replaces the local value.
    #[test]
    fn apply_crdt_remote_wins() {
        let tmp = tempfile::tempdir().unwrap();
        let mgr = test_manager(&tmp.path().join("test.db"));

        let local_meta = meta(1000 * SECOND, 0, 1);
        let remote_meta = meta(2000 * SECOND, 0, 2);

        let local_val = encode_lww_value(&local_meta, EntryKind::Put, b"local");
        seed_key(&mgr, b"key1", &local_val);

        let remote_val = encode_lww_value(&remote_meta, EntryKind::Put, b"remote");
        let patch = build_patch(
            2,
            true,
            vec![patch_entry(
                b"key1",
                remote_val.clone(),
                EntryKind::Put,
                Some(remote_meta),
            )],
        );

        let outcome = apply_patch(&mgr, &patch).unwrap();
        assert_eq!(outcome.entries_applied, 1);
        assert_eq!(outcome.entries_skipped, 0);

        let mut rtx = mgr.begin_read();
        assert_eq!(rtx.get(b"key1").unwrap().unwrap(), remote_val);
    }

    /// A remote entry with an earlier timestamp is skipped and local data kept.
    #[test]
    fn apply_crdt_local_wins() {
        let tmp = tempfile::tempdir().unwrap();
        let mgr = test_manager(&tmp.path().join("test.db"));

        let local_meta = meta(2000 * SECOND, 0, 1);
        let remote_meta = meta(1000 * SECOND, 0, 2);

        let local_val = encode_lww_value(&local_meta, EntryKind::Put, b"local");
        seed_key(&mgr, b"key1", &local_val);

        let remote_val = encode_lww_value(&remote_meta, EntryKind::Put, b"remote");
        let patch = build_patch(
            2,
            true,
            vec![patch_entry(
                b"key1",
                remote_val,
                EntryKind::Put,
                Some(remote_meta),
            )],
        );

        let outcome = apply_patch(&mgr, &patch).unwrap();
        assert_eq!(outcome.entries_applied, 0);
        assert_eq!(outcome.entries_skipped, 1);

        let mut rtx = mgr.begin_read();
        assert_eq!(rtx.get(b"key1").unwrap().unwrap(), local_val);
    }

    /// Identical metadata on both sides is counted as equal, not applied.
    #[test]
    fn apply_crdt_equal() {
        let tmp = tempfile::tempdir().unwrap();
        let mgr = test_manager(&tmp.path().join("test.db"));

        let m = meta(1000 * SECOND, 5, 42);
        let val = encode_lww_value(&m, EntryKind::Put, b"same");
        seed_key(&mgr, b"key1", &val);

        let patch = build_patch(
            42,
            true,
            vec![patch_entry(b"key1", val, EntryKind::Put, Some(m))],
        );

        let outcome = apply_patch(&mgr, &patch).unwrap();
        assert_eq!(outcome.entries_equal, 1);
        assert_eq!(outcome.entries_applied, 0);
    }

    /// A CRDT entry for a key that does not exist locally is written.
    #[test]
    fn apply_crdt_new_key() {
        let tmp = tempfile::tempdir().unwrap();
        let mgr = test_manager(&tmp.path().join("test.db"));

        let m = meta(1000 * SECOND, 0, 1);
        let val = encode_lww_value(&m, EntryKind::Put, b"new");

        let patch = build_patch(
            1,
            true,
            vec![patch_entry(b"new-key", val.clone(), EntryKind::Put, Some(m))],
        );

        let outcome = apply_patch(&mgr, &patch).unwrap();
        assert_eq!(outcome.entries_applied, 1);

        let mut rtx = mgr.begin_read();
        assert_eq!(rtx.get(b"new-key").unwrap().unwrap(), val);
    }

    /// A newer remote tombstone replaces a live local value and decodes as a tombstone.
    #[test]
    fn apply_crdt_tombstone() {
        let tmp = tempfile::tempdir().unwrap();
        let mgr = test_manager(&tmp.path().join("test.db"));

        let local_meta = meta(1000 * SECOND, 0, 1);
        let local_val = encode_lww_value(&local_meta, EntryKind::Put, b"alive");
        seed_key(&mgr, b"key1", &local_val);

        let remote_meta = meta(2000 * SECOND, 0, 2);
        let tombstone_val = encode_lww_value(&remote_meta, EntryKind::Tombstone, b"");

        let patch = build_patch(
            2,
            true,
            vec![patch_entry(
                b"key1",
                tombstone_val,
                EntryKind::Tombstone,
                Some(remote_meta),
            )],
        );

        let outcome = apply_patch(&mgr, &patch).unwrap();
        assert_eq!(outcome.entries_applied, 1);

        let mut rtx = mgr.begin_read();
        let stored = rtx.get(b"key1").unwrap().unwrap();
        let decoded = decode_lww_value(&stored).unwrap();
        assert_eq!(decoded.kind, EntryKind::Tombstone);
    }

    /// Applying inside a caller-owned transaction takes effect once the caller commits.
    #[test]
    fn apply_to_txn() {
        let tmp = tempfile::tempdir().unwrap();
        let mgr = test_manager(&tmp.path().join("test.db"));

        let patch = build_patch(
            1,
            false,
            vec![patch_entry(b"k", b"v".to_vec(), EntryKind::Put, None)],
        );

        let mut wtx = mgr.begin_write().unwrap();
        let outcome = apply_patch_to_txn(&mut wtx, &patch).unwrap();
        assert_eq!(outcome.entries_applied, 1);
        wtx.commit().unwrap();

        let mut rtx = mgr.begin_read();
        assert_eq!(rtx.get(b"k").unwrap().unwrap(), b"v");
    }
}
411}