Skip to main content

citadeldb_sync/
apply.rs

1use citadel_core::Result;
2use citadel_txn::manager::TxnManager;
3use citadel_txn::write_txn::WriteTxn;
4
5use crate::crdt::{decode_lww_value, lww_merge, EntryKind, MergeResult};
6use crate::patch::SyncPatch;
7
8/// Result of applying a sync patch to a database.
9#[derive(Debug, Clone, PartialEq, Eq)]
10pub struct ApplyResult {
11    /// Entries written (remote won or key was new).
12    pub entries_applied: u64,
13    /// Entries skipped (local won LWW comparison).
14    pub entries_skipped: u64,
15    /// Entries where local and remote are identical (no-op).
16    pub entries_equal: u64,
17}
18
19impl ApplyResult {
20    pub fn empty() -> Self {
21        Self {
22            entries_applied: 0,
23            entries_skipped: 0,
24            entries_equal: 0,
25        }
26    }
27}
28
29/// Apply a sync patch to a database via TxnManager.
30///
31/// Opens a write transaction, applies entries, and commits.
32/// For CRDT-aware patches: reads existing values and uses LWW merge.
33/// For non-CRDT patches: unconditionally writes all entries.
34pub fn apply_patch(
35    manager: &TxnManager,
36    patch: &SyncPatch,
37) -> Result<ApplyResult> {
38    if patch.is_empty() {
39        return Ok(ApplyResult::empty());
40    }
41
42    let mut wtx = manager.begin_write()?;
43    let result = apply_patch_to_txn(&mut wtx, patch)?;
44    wtx.commit()?;
45    Ok(result)
46}
47
48/// Apply a sync patch within an existing write transaction.
49///
50/// The caller is responsible for committing or aborting the transaction.
51pub fn apply_patch_to_txn(
52    wtx: &mut WriteTxn<'_>,
53    patch: &SyncPatch,
54) -> Result<ApplyResult> {
55    let mut result = ApplyResult::empty();
56
57    for entry in &patch.entries {
58        if patch.crdt_aware {
59            if let Some(ref remote_meta) = entry.crdt_meta {
60                // Check if key exists locally with CRDT metadata
61                let existing = wtx.get(&entry.key)?;
62                if let Some(local_data) = existing {
63                    if let Ok(local_decoded) = decode_lww_value(&local_data) {
64                        match lww_merge(&local_decoded.meta, remote_meta) {
65                            MergeResult::Local => {
66                                result.entries_skipped += 1;
67                                continue;
68                            }
69                            MergeResult::Equal => {
70                                result.entries_equal += 1;
71                                continue;
72                            }
73                            MergeResult::Remote => {
74                                // Remote wins — fall through to write
75                            }
76                        }
77                    }
78                    // Local value doesn't have valid CRDT header — remote wins
79                }
80                // Key doesn't exist locally — remote wins
81            }
82        }
83
84        // Write the entry (either non-CRDT unconditional, or CRDT remote-wins)
85        match entry.kind {
86            EntryKind::Put => {
87                wtx.insert(&entry.key, &entry.value)?;
88            }
89            EntryKind::Tombstone => {
90                // Tombstone: write the CRDT header as the value so it participates
91                // in future LWW merges. The key remains with a tombstone marker.
92                wtx.insert(&entry.key, &entry.value)?;
93            }
94        }
95        result.entries_applied += 1;
96    }
97
98    Ok(result)
99}
100
101/// Apply a sync patch to a named table, creating it if needed.
102pub fn apply_patch_to_table(
103    manager: &TxnManager,
104    table_name: &[u8],
105    patch: &SyncPatch,
106) -> Result<ApplyResult> {
107    if patch.is_empty() {
108        return Ok(ApplyResult::empty());
109    }
110
111    let mut wtx = manager.begin_write()?;
112    match wtx.create_table(table_name) {
113        Ok(()) => {}
114        Err(citadel_core::Error::TableAlreadyExists(_)) => {}
115        Err(e) => return Err(e),
116    }
117    let result = apply_patch_to_table_txn(&mut wtx, table_name, patch)?;
118    wtx.commit()?;
119    Ok(result)
120}
121
122/// Apply a sync patch to a named table within an existing write transaction.
123pub fn apply_patch_to_table_txn(
124    wtx: &mut WriteTxn<'_>,
125    table_name: &[u8],
126    patch: &SyncPatch,
127) -> Result<ApplyResult> {
128    let mut result = ApplyResult::empty();
129
130    for entry in &patch.entries {
131        if patch.crdt_aware {
132            if let Some(ref remote_meta) = entry.crdt_meta {
133                let existing = wtx.table_get(table_name, &entry.key)?;
134                if let Some(local_data) = existing {
135                    if let Ok(local_decoded) = decode_lww_value(&local_data) {
136                        match lww_merge(&local_decoded.meta, remote_meta) {
137                            MergeResult::Local => {
138                                result.entries_skipped += 1;
139                                continue;
140                            }
141                            MergeResult::Equal => {
142                                result.entries_equal += 1;
143                                continue;
144                            }
145                            MergeResult::Remote => {}
146                        }
147                    }
148                }
149            }
150        }
151
152        match entry.kind {
153            EntryKind::Put | EntryKind::Tombstone => {
154                wtx.table_insert(table_name, &entry.key, &entry.value)?;
155            }
156        }
157        result.entries_applied += 1;
158    }
159
160    Ok(result)
161}
162
163#[cfg(test)]
164mod tests {
165    use super::*;
166    use crate::crdt::{CrdtMeta, EntryKind, encode_lww_value};
167    use crate::hlc::HlcTimestamp;
168    use crate::node_id::NodeId;
169    use crate::patch::PatchEntry;
170
171    use citadel_core::constants::{DEK_SIZE, MAC_KEY_SIZE, MAC_SIZE};
172    use citadel_io::sync_io::SyncPageIO;
173
174    const SECOND: i64 = 1_000_000_000;
175
176    fn meta(wall_ns: i64, logical: i32, node: u64) -> CrdtMeta {
177        CrdtMeta::new(HlcTimestamp::new(wall_ns, logical), NodeId::from_u64(node))
178    }
179
180    fn test_manager(path: &std::path::Path) -> TxnManager {
181        let file = std::fs::OpenOptions::new()
182            .read(true)
183            .write(true)
184            .create(true)
185            .truncate(true)
186            .open(path)
187            .unwrap();
188        let io = Box::new(SyncPageIO::new(file));
189        let dek = [0x42u8; DEK_SIZE];
190        let mac_key = [0x43u8; MAC_KEY_SIZE];
191        let dek_id = [0x44u8; MAC_SIZE];
192        TxnManager::create(io, dek, mac_key, 1, 0x1234, dek_id, 256).unwrap()
193    }
194
195    #[test]
196    fn apply_empty_patch() {
197        let dir = tempfile::tempdir().unwrap();
198        let mgr = test_manager(&dir.path().join("test.db"));
199        let patch = SyncPatch::empty(NodeId::from_u64(1));
200        let result = apply_patch(&mgr, &patch).unwrap();
201        assert_eq!(result, ApplyResult::empty());
202    }
203
204    #[test]
205    fn apply_non_crdt_unconditional() {
206        let dir = tempfile::tempdir().unwrap();
207        let mgr = test_manager(&dir.path().join("test.db"));
208
209        let mut wtx = mgr.begin_write().unwrap();
210        wtx.insert(b"key1", b"old-value").unwrap();
211        wtx.commit().unwrap();
212
213        let patch = SyncPatch {
214            source_node: NodeId::from_u64(1),
215            entries: vec![
216                PatchEntry {
217                    key: b"key1".to_vec(),
218                    value: b"new-value".to_vec(),
219                    kind: EntryKind::Put,
220                    crdt_meta: None,
221                },
222                PatchEntry {
223                    key: b"key2".to_vec(),
224                    value: b"brand-new".to_vec(),
225                    kind: EntryKind::Put,
226                    crdt_meta: None,
227                },
228            ],
229            crdt_aware: false,
230        };
231
232        let result = apply_patch(&mgr, &patch).unwrap();
233        assert_eq!(result.entries_applied, 2);
234
235        let mut rtx = mgr.begin_read();
236        assert_eq!(rtx.get(b"key1").unwrap().unwrap(), b"new-value");
237        assert_eq!(rtx.get(b"key2").unwrap().unwrap(), b"brand-new");
238    }
239
240    #[test]
241    fn apply_crdt_remote_wins() {
242        let dir = tempfile::tempdir().unwrap();
243        let mgr = test_manager(&dir.path().join("test.db"));
244
245        let local_meta = meta(1000 * SECOND, 0, 1);
246        let remote_meta = meta(2000 * SECOND, 0, 2);
247
248        let local_val = encode_lww_value(&local_meta, EntryKind::Put, b"local");
249        let mut wtx = mgr.begin_write().unwrap();
250        wtx.insert(b"key1", &local_val).unwrap();
251        wtx.commit().unwrap();
252
253        let remote_val = encode_lww_value(&remote_meta, EntryKind::Put, b"remote");
254        let patch = SyncPatch {
255            source_node: NodeId::from_u64(2),
256            entries: vec![PatchEntry {
257                key: b"key1".to_vec(),
258                value: remote_val.clone(),
259                kind: EntryKind::Put,
260                crdt_meta: Some(remote_meta),
261            }],
262            crdt_aware: true,
263        };
264
265        let result = apply_patch(&mgr, &patch).unwrap();
266        assert_eq!(result.entries_applied, 1);
267        assert_eq!(result.entries_skipped, 0);
268
269        let mut rtx = mgr.begin_read();
270        assert_eq!(rtx.get(b"key1").unwrap().unwrap(), remote_val);
271    }
272
273    #[test]
274    fn apply_crdt_local_wins() {
275        let dir = tempfile::tempdir().unwrap();
276        let mgr = test_manager(&dir.path().join("test.db"));
277
278        let local_meta = meta(2000 * SECOND, 0, 1);
279        let remote_meta = meta(1000 * SECOND, 0, 2);
280
281        let local_val = encode_lww_value(&local_meta, EntryKind::Put, b"local");
282        let mut wtx = mgr.begin_write().unwrap();
283        wtx.insert(b"key1", &local_val).unwrap();
284        wtx.commit().unwrap();
285
286        let remote_val = encode_lww_value(&remote_meta, EntryKind::Put, b"remote");
287        let patch = SyncPatch {
288            source_node: NodeId::from_u64(2),
289            entries: vec![PatchEntry {
290                key: b"key1".to_vec(),
291                value: remote_val,
292                kind: EntryKind::Put,
293                crdt_meta: Some(remote_meta),
294            }],
295            crdt_aware: true,
296        };
297
298        let result = apply_patch(&mgr, &patch).unwrap();
299        assert_eq!(result.entries_applied, 0);
300        assert_eq!(result.entries_skipped, 1);
301
302        let mut rtx = mgr.begin_read();
303        assert_eq!(rtx.get(b"key1").unwrap().unwrap(), local_val);
304    }
305
306    #[test]
307    fn apply_crdt_equal() {
308        let dir = tempfile::tempdir().unwrap();
309        let mgr = test_manager(&dir.path().join("test.db"));
310
311        let m = meta(1000 * SECOND, 5, 42);
312        let val = encode_lww_value(&m, EntryKind::Put, b"same");
313
314        let mut wtx = mgr.begin_write().unwrap();
315        wtx.insert(b"key1", &val).unwrap();
316        wtx.commit().unwrap();
317
318        let patch = SyncPatch {
319            source_node: NodeId::from_u64(42),
320            entries: vec![PatchEntry {
321                key: b"key1".to_vec(),
322                value: val.clone(),
323                kind: EntryKind::Put,
324                crdt_meta: Some(m),
325            }],
326            crdt_aware: true,
327        };
328
329        let result = apply_patch(&mgr, &patch).unwrap();
330        assert_eq!(result.entries_equal, 1);
331        assert_eq!(result.entries_applied, 0);
332    }
333
334    #[test]
335    fn apply_crdt_new_key() {
336        let dir = tempfile::tempdir().unwrap();
337        let mgr = test_manager(&dir.path().join("test.db"));
338
339        let m = meta(1000 * SECOND, 0, 1);
340        let val = encode_lww_value(&m, EntryKind::Put, b"new");
341
342        let patch = SyncPatch {
343            source_node: NodeId::from_u64(1),
344            entries: vec![PatchEntry {
345                key: b"new-key".to_vec(),
346                value: val.clone(),
347                kind: EntryKind::Put,
348                crdt_meta: Some(m),
349            }],
350            crdt_aware: true,
351        };
352
353        let result = apply_patch(&mgr, &patch).unwrap();
354        assert_eq!(result.entries_applied, 1);
355
356        let mut rtx = mgr.begin_read();
357        assert_eq!(rtx.get(b"new-key").unwrap().unwrap(), val);
358    }
359
360    #[test]
361    fn apply_crdt_tombstone() {
362        let dir = tempfile::tempdir().unwrap();
363        let mgr = test_manager(&dir.path().join("test.db"));
364
365        let local_meta = meta(1000 * SECOND, 0, 1);
366        let local_val = encode_lww_value(&local_meta, EntryKind::Put, b"alive");
367        let mut wtx = mgr.begin_write().unwrap();
368        wtx.insert(b"key1", &local_val).unwrap();
369        wtx.commit().unwrap();
370
371        let remote_meta = meta(2000 * SECOND, 0, 2);
372        let tombstone_val = encode_lww_value(&remote_meta, EntryKind::Tombstone, b"");
373
374        let patch = SyncPatch {
375            source_node: NodeId::from_u64(2),
376            entries: vec![PatchEntry {
377                key: b"key1".to_vec(),
378                value: tombstone_val.clone(),
379                kind: EntryKind::Tombstone,
380                crdt_meta: Some(remote_meta),
381            }],
382            crdt_aware: true,
383        };
384
385        let result = apply_patch(&mgr, &patch).unwrap();
386        assert_eq!(result.entries_applied, 1);
387
388        let mut rtx = mgr.begin_read();
389        let stored = rtx.get(b"key1").unwrap().unwrap();
390        let decoded = decode_lww_value(&stored).unwrap();
391        assert_eq!(decoded.kind, EntryKind::Tombstone);
392    }
393
394    #[test]
395    fn apply_to_txn() {
396        let dir = tempfile::tempdir().unwrap();
397        let mgr = test_manager(&dir.path().join("test.db"));
398
399        let patch = SyncPatch {
400            source_node: NodeId::from_u64(1),
401            entries: vec![PatchEntry {
402                key: b"k".to_vec(),
403                value: b"v".to_vec(),
404                kind: EntryKind::Put,
405                crdt_meta: None,
406            }],
407            crdt_aware: false,
408        };
409
410        let mut wtx = mgr.begin_write().unwrap();
411        let result = apply_patch_to_txn(&mut wtx, &patch).unwrap();
412        assert_eq!(result.entries_applied, 1);
413        wtx.commit().unwrap();
414
415        let mut rtx = mgr.begin_read();
416        assert_eq!(rtx.get(b"k").unwrap().unwrap(), b"v");
417    }
418}