Skip to main content

citadel_sync/
apply.rs

1use citadel_core::Result;
2use citadel_txn::manager::TxnManager;
3use citadel_txn::write_txn::WriteTxn;
4
5use crate::crdt::{decode_lww_value, lww_merge, EntryKind, MergeResult};
6use crate::patch::SyncPatch;
7
/// Outcome counters for applying a sync patch to a database.
///
/// Each patch entry that is processed increments exactly one of the three
/// counters. `Default` produces the all-zero value, the same value returned by
/// [`ApplyResult::empty`].
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ApplyResult {
    /// Entries written (remote won or key was new).
    pub entries_applied: u64,
    /// Entries skipped (local won LWW comparison).
    pub entries_skipped: u64,
    /// Entries where local and remote are identical (no-op).
    pub entries_equal: u64,
}

impl ApplyResult {
    /// Returns a result with every counter set to zero.
    ///
    /// Equivalent to `ApplyResult::default()`; kept as a named constructor so
    /// existing callers continue to work.
    pub fn empty() -> Self {
        Self::default()
    }
}
28
29/// Apply a sync patch to a database via TxnManager.
30///
31/// Opens a write transaction, applies entries, and commits.
32/// For CRDT-aware patches: reads existing values and uses LWW merge.
33/// For non-CRDT patches: unconditionally writes all entries.
34pub fn apply_patch(manager: &TxnManager, patch: &SyncPatch) -> Result<ApplyResult> {
35    if patch.is_empty() {
36        return Ok(ApplyResult::empty());
37    }
38
39    let mut wtx = manager.begin_write()?;
40    let result = apply_patch_to_txn(&mut wtx, patch)?;
41    wtx.commit()?;
42    Ok(result)
43}
44
45/// Apply a sync patch within an existing write transaction.
46///
47/// The caller is responsible for committing or aborting the transaction.
48pub fn apply_patch_to_txn(wtx: &mut WriteTxn<'_>, patch: &SyncPatch) -> Result<ApplyResult> {
49    let mut result = ApplyResult::empty();
50
51    for entry in &patch.entries {
52        if patch.crdt_aware {
53            if let Some(ref remote_meta) = entry.crdt_meta {
54                // Check if key exists locally with CRDT metadata
55                let existing = wtx.get(&entry.key)?;
56                if let Some(local_data) = existing {
57                    if let Ok(local_decoded) = decode_lww_value(&local_data) {
58                        match lww_merge(&local_decoded.meta, remote_meta) {
59                            MergeResult::Local => {
60                                result.entries_skipped += 1;
61                                continue;
62                            }
63                            MergeResult::Equal => {
64                                result.entries_equal += 1;
65                                continue;
66                            }
67                            MergeResult::Remote => {
68                                // Remote wins — fall through to write
69                            }
70                        }
71                    }
72                    // Local value doesn't have valid CRDT header — remote wins
73                }
74                // Key doesn't exist locally — remote wins
75            }
76        }
77
78        // Write the entry (either non-CRDT unconditional, or CRDT remote-wins)
79        match entry.kind {
80            EntryKind::Put => {
81                wtx.insert(&entry.key, &entry.value)?;
82            }
83            EntryKind::Tombstone => {
84                // Tombstone: write the CRDT header as the value so it participates
85                // in future LWW merges. The key remains with a tombstone marker.
86                wtx.insert(&entry.key, &entry.value)?;
87            }
88        }
89        result.entries_applied += 1;
90    }
91
92    Ok(result)
93}
94
95/// Apply a sync patch to a named table, creating it if needed.
96pub fn apply_patch_to_table(
97    manager: &TxnManager,
98    table_name: &[u8],
99    patch: &SyncPatch,
100) -> Result<ApplyResult> {
101    if patch.is_empty() {
102        return Ok(ApplyResult::empty());
103    }
104
105    let mut wtx = manager.begin_write()?;
106    match wtx.create_table(table_name) {
107        Ok(()) => {}
108        Err(citadel_core::Error::TableAlreadyExists(_)) => {}
109        Err(e) => return Err(e),
110    }
111    let result = apply_patch_to_table_txn(&mut wtx, table_name, patch)?;
112    wtx.commit()?;
113    Ok(result)
114}
115
116/// Apply a sync patch to a named table within an existing write transaction.
117pub fn apply_patch_to_table_txn(
118    wtx: &mut WriteTxn<'_>,
119    table_name: &[u8],
120    patch: &SyncPatch,
121) -> Result<ApplyResult> {
122    let mut result = ApplyResult::empty();
123
124    for entry in &patch.entries {
125        if patch.crdt_aware {
126            if let Some(ref remote_meta) = entry.crdt_meta {
127                let existing = wtx.table_get(table_name, &entry.key)?;
128                if let Some(local_data) = existing {
129                    if let Ok(local_decoded) = decode_lww_value(&local_data) {
130                        match lww_merge(&local_decoded.meta, remote_meta) {
131                            MergeResult::Local => {
132                                result.entries_skipped += 1;
133                                continue;
134                            }
135                            MergeResult::Equal => {
136                                result.entries_equal += 1;
137                                continue;
138                            }
139                            MergeResult::Remote => {}
140                        }
141                    }
142                }
143            }
144        }
145
146        match entry.kind {
147            EntryKind::Put | EntryKind::Tombstone => {
148                wtx.table_insert(table_name, &entry.key, &entry.value)?;
149            }
150        }
151        result.entries_applied += 1;
152    }
153
154    Ok(result)
155}
156
#[cfg(test)]
mod tests {
    use super::*;
    use crate::crdt::{encode_lww_value, CrdtMeta, EntryKind};
    use crate::hlc::HlcTimestamp;
    use crate::node_id::NodeId;
    use crate::patch::PatchEntry;

    use citadel_core::constants::{DEK_SIZE, MAC_KEY_SIZE, MAC_SIZE};
    use citadel_io::sync_io::SyncPageIO;

    /// Scale factor used to build the `wall_ns` values passed to `meta`.
    const SECOND: i64 = 1_000_000_000;

    /// Build CRDT metadata from a wall-clock value, logical counter and node id.
    fn meta(wall_ns: i64, logical: i32, node: u64) -> CrdtMeta {
        let timestamp = HlcTimestamp::new(wall_ns, logical);
        CrdtMeta::new(timestamp, NodeId::from_u64(node))
    }

    /// Create a fresh database file at `path` (truncating any existing file)
    /// and return a manager for it.
    fn test_manager(path: &std::path::Path) -> TxnManager {
        let mut options = std::fs::OpenOptions::new();
        options.read(true).write(true).create(true).truncate(true);
        let db_file = options.open(path).unwrap();

        let page_io = Box::new(SyncPageIO::new(db_file));
        let dek = [0x42u8; DEK_SIZE];
        let mac_key = [0x43u8; MAC_KEY_SIZE];
        let dek_id = [0x44u8; MAC_SIZE];
        TxnManager::create(page_io, dek, mac_key, 1, 0x1234, dek_id, 256).unwrap()
    }

    /// Build a single patch entry.
    fn entry(
        key: &[u8],
        value: Vec<u8>,
        kind: EntryKind,
        crdt_meta: Option<CrdtMeta>,
    ) -> PatchEntry {
        PatchEntry {
            key: key.to_vec(),
            value,
            kind,
            crdt_meta,
        }
    }

    /// Build a patch originating from node `node`.
    fn make_patch(node: u64, crdt_aware: bool, entries: Vec<PatchEntry>) -> SyncPatch {
        SyncPatch {
            source_node: NodeId::from_u64(node),
            entries,
            crdt_aware,
        }
    }

    /// Commit one key/value pair so the test starts with local state.
    fn seed(manager: &TxnManager, key: &[u8], value: &[u8]) {
        let mut wtx = manager.begin_write().unwrap();
        wtx.insert(key, value).unwrap();
        wtx.commit().unwrap();
    }

    #[test]
    fn apply_empty_patch() {
        let tmp = tempfile::tempdir().unwrap();
        let manager = test_manager(&tmp.path().join("test.db"));

        let patch = SyncPatch::empty(NodeId::from_u64(1));
        let outcome = apply_patch(&manager, &patch).unwrap();

        assert_eq!(outcome, ApplyResult::empty());
    }

    #[test]
    fn apply_non_crdt_unconditional() {
        let tmp = tempfile::tempdir().unwrap();
        let manager = test_manager(&tmp.path().join("test.db"));

        seed(&manager, b"key1", b"old-value");

        // One entry overwrites existing data, the other introduces a new key.
        let patch = make_patch(
            1,
            false,
            vec![
                entry(b"key1", b"new-value".to_vec(), EntryKind::Put, None),
                entry(b"key2", b"brand-new".to_vec(), EntryKind::Put, None),
            ],
        );

        let outcome = apply_patch(&manager, &patch).unwrap();
        assert_eq!(outcome.entries_applied, 2);

        let mut rtx = manager.begin_read();
        assert_eq!(rtx.get(b"key1").unwrap().unwrap(), b"new-value");
        assert_eq!(rtx.get(b"key2").unwrap().unwrap(), b"brand-new");
    }

    #[test]
    fn apply_crdt_remote_wins() {
        let tmp = tempfile::tempdir().unwrap();
        let manager = test_manager(&tmp.path().join("test.db"));

        // Remote timestamp is later than the local one.
        let local_meta = meta(1000 * SECOND, 0, 1);
        let remote_meta = meta(2000 * SECOND, 0, 2);

        let local_val = encode_lww_value(&local_meta, EntryKind::Put, b"local");
        seed(&manager, b"key1", &local_val);

        let remote_val = encode_lww_value(&remote_meta, EntryKind::Put, b"remote");
        let patch = make_patch(
            2,
            true,
            vec![entry(
                b"key1",
                remote_val.clone(),
                EntryKind::Put,
                Some(remote_meta),
            )],
        );

        let outcome = apply_patch(&manager, &patch).unwrap();
        assert_eq!(outcome.entries_applied, 1);
        assert_eq!(outcome.entries_skipped, 0);

        let mut rtx = manager.begin_read();
        assert_eq!(rtx.get(b"key1").unwrap().unwrap(), remote_val);
    }

    #[test]
    fn apply_crdt_local_wins() {
        let tmp = tempfile::tempdir().unwrap();
        let manager = test_manager(&tmp.path().join("test.db"));

        // Local timestamp is later than the remote one.
        let local_meta = meta(2000 * SECOND, 0, 1);
        let remote_meta = meta(1000 * SECOND, 0, 2);

        let local_val = encode_lww_value(&local_meta, EntryKind::Put, b"local");
        seed(&manager, b"key1", &local_val);

        let remote_val = encode_lww_value(&remote_meta, EntryKind::Put, b"remote");
        let patch = make_patch(
            2,
            true,
            vec![entry(b"key1", remote_val, EntryKind::Put, Some(remote_meta))],
        );

        let outcome = apply_patch(&manager, &patch).unwrap();
        assert_eq!(outcome.entries_applied, 0);
        assert_eq!(outcome.entries_skipped, 1);

        let mut rtx = manager.begin_read();
        assert_eq!(rtx.get(b"key1").unwrap().unwrap(), local_val);
    }

    #[test]
    fn apply_crdt_equal() {
        let tmp = tempfile::tempdir().unwrap();
        let manager = test_manager(&tmp.path().join("test.db"));

        // The same metadata and value are stored locally and sent remotely.
        let shared_meta = meta(1000 * SECOND, 5, 42);
        let shared_val = encode_lww_value(&shared_meta, EntryKind::Put, b"same");
        seed(&manager, b"key1", &shared_val);

        let patch = make_patch(
            42,
            true,
            vec![entry(
                b"key1",
                shared_val.clone(),
                EntryKind::Put,
                Some(shared_meta),
            )],
        );

        let outcome = apply_patch(&manager, &patch).unwrap();
        assert_eq!(outcome.entries_equal, 1);
        assert_eq!(outcome.entries_applied, 0);
    }

    #[test]
    fn apply_crdt_new_key() {
        let tmp = tempfile::tempdir().unwrap();
        let manager = test_manager(&tmp.path().join("test.db"));

        let new_meta = meta(1000 * SECOND, 0, 1);
        let new_val = encode_lww_value(&new_meta, EntryKind::Put, b"new");

        // No local state for this key: the entry must simply be written.
        let patch = make_patch(
            1,
            true,
            vec![entry(
                b"new-key",
                new_val.clone(),
                EntryKind::Put,
                Some(new_meta),
            )],
        );

        let outcome = apply_patch(&manager, &patch).unwrap();
        assert_eq!(outcome.entries_applied, 1);

        let mut rtx = manager.begin_read();
        assert_eq!(rtx.get(b"new-key").unwrap().unwrap(), new_val);
    }

    #[test]
    fn apply_crdt_tombstone() {
        let tmp = tempfile::tempdir().unwrap();
        let manager = test_manager(&tmp.path().join("test.db"));

        let local_meta = meta(1000 * SECOND, 0, 1);
        let local_val = encode_lww_value(&local_meta, EntryKind::Put, b"alive");
        seed(&manager, b"key1", &local_val);

        // A later remote tombstone should replace the live local value.
        let remote_meta = meta(2000 * SECOND, 0, 2);
        let tombstone_val = encode_lww_value(&remote_meta, EntryKind::Tombstone, b"");
        let patch = make_patch(
            2,
            true,
            vec![entry(
                b"key1",
                tombstone_val.clone(),
                EntryKind::Tombstone,
                Some(remote_meta),
            )],
        );

        let outcome = apply_patch(&manager, &patch).unwrap();
        assert_eq!(outcome.entries_applied, 1);

        let mut rtx = manager.begin_read();
        let stored = rtx.get(b"key1").unwrap().unwrap();
        let decoded = decode_lww_value(&stored).unwrap();
        assert_eq!(decoded.kind, EntryKind::Tombstone);
    }

    #[test]
    fn apply_to_txn() {
        let tmp = tempfile::tempdir().unwrap();
        let manager = test_manager(&tmp.path().join("test.db"));

        let patch = make_patch(
            1,
            false,
            vec![entry(b"k", b"v".to_vec(), EntryKind::Put, None)],
        );

        // The caller owns the transaction here, so it also commits it.
        let mut wtx = manager.begin_write().unwrap();
        let outcome = apply_patch_to_txn(&mut wtx, &patch).unwrap();
        assert_eq!(outcome.entries_applied, 1);
        wtx.commit().unwrap();

        let mut rtx = manager.begin_read();
        assert_eq!(rtx.get(b"k").unwrap().unwrap(), b"v");
    }
}