1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
use citadel_core::Result;
use citadel_txn::manager::TxnManager;
use citadel_txn::write_txn::WriteTxn;
use crate::crdt::{decode_lww_value, lww_merge, EntryKind, MergeResult};
use crate::patch::SyncPatch;
/// Result of applying a sync patch to a database.
///
/// The apply functions in this module count every processed patch entry in
/// exactly one of the three fields below.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ApplyResult {
    /// Entries written (remote won, key was new, or the write was unconditional).
    pub entries_applied: u64,
    /// Entries skipped (local won LWW comparison).
    pub entries_skipped: u64,
    /// Entries where local and remote are identical (no-op).
    pub entries_equal: u64,
}

impl ApplyResult {
    /// Returns a result with every counter set to zero.
    ///
    /// Equivalent to [`ApplyResult::default`]; kept as a named constructor
    /// for existing callers.
    #[must_use]
    pub fn empty() -> Self {
        Self::default()
    }
}
/// Apply a sync patch to a database through a [`TxnManager`].
///
/// An empty patch short-circuits to [`ApplyResult::empty`] without opening a
/// transaction. Otherwise a write transaction is begun, the entries are
/// applied via [`apply_patch_to_txn`], and the transaction is committed.
///
/// * CRDT-aware patches: existing values are read and merged with LWW.
/// * Non-CRDT patches: every entry is written unconditionally.
///
/// # Errors
///
/// Propagates any error from beginning the transaction, applying the
/// entries, or committing. On an apply error the transaction is returned
/// from without an explicit commit.
pub fn apply_patch(manager: &TxnManager, patch: &SyncPatch) -> Result<ApplyResult> {
    if patch.is_empty() {
        // Nothing to do: skip the write transaction entirely.
        Ok(ApplyResult::empty())
    } else {
        let mut txn = manager.begin_write()?;
        let outcome = apply_patch_to_txn(&mut txn, patch)?;
        txn.commit()?;
        Ok(outcome)
    }
}
/// Apply a sync patch within an existing write transaction.
///
/// The caller is responsible for committing or aborting the transaction.
///
/// For each entry a merge verdict is computed first:
///
/// * If the patch is CRDT-aware and the entry carries CRDT metadata, the
///   local value for the key is read. When it exists and decodes as an LWW
///   value, `lww_merge` decides between local, equal, and remote.
/// * In every other case (non-CRDT patch, entry without metadata, key absent
///   locally, or local value without a valid CRDT header) the remote wins.
///
/// Only a remote-wins verdict results in a write.
///
/// # Errors
///
/// Propagates errors from reading (`wtx.get`) and writing (`wtx.insert`).
pub fn apply_patch_to_txn(wtx: &mut WriteTxn<'_>, patch: &SyncPatch) -> Result<ApplyResult> {
    let mut tally = ApplyResult::empty();

    for entry in &patch.entries {
        let verdict = match (patch.crdt_aware, entry.crdt_meta.as_ref()) {
            (true, Some(remote_meta)) => match wtx.get(&entry.key)? {
                Some(local_data) => match decode_lww_value(&local_data) {
                    Ok(local_decoded) => lww_merge(&local_decoded.meta, remote_meta),
                    // Local value doesn't have a valid CRDT header - remote wins.
                    Err(_) => MergeResult::Remote,
                },
                // Key doesn't exist locally - remote wins.
                None => MergeResult::Remote,
            },
            // Non-CRDT patch, or entry without metadata: unconditional write.
            _ => MergeResult::Remote,
        };

        match verdict {
            MergeResult::Local => tally.entries_skipped += 1,
            MergeResult::Equal => tally.entries_equal += 1,
            MergeResult::Remote => {
                match entry.kind {
                    // A tombstone is stored just like a put: its value (the CRDT
                    // header) is written so the key keeps a tombstone marker and
                    // still participates in future LWW merges.
                    EntryKind::Put | EntryKind::Tombstone => {
                        wtx.insert(&entry.key, &entry.value)?;
                    }
                }
                tally.entries_applied += 1;
            }
        }
    }

    Ok(tally)
}
/// Apply a sync patch to a named table, creating it if needed.
pub fn apply_patch_to_table(
manager: &TxnManager,
table_name: &[u8],
patch: &SyncPatch,
) -> Result<ApplyResult> {
if patch.is_empty() {
return Ok(ApplyResult::empty());
}
let mut wtx = manager.begin_write()?;
match wtx.create_table(table_name) {
Ok(()) => {}
Err(citadel_core::Error::TableAlreadyExists(_)) => {}
Err(e) => return Err(e),
}
let result = apply_patch_to_table_txn(&mut wtx, table_name, patch)?;
wtx.commit()?;
Ok(result)
}
/// Apply a sync patch to a named table within an existing write transaction.
///
/// The transaction is neither committed nor aborted here; that is left to
/// the caller.
///
/// LWW arbitration is performed only when the patch is CRDT-aware and the
/// entry carries CRDT metadata. In that case the current table value is read
/// and, if it decodes as an LWW value, `lww_merge` picks the winner. A
/// missing or undecodable local value, or an entry that is not subject to
/// arbitration, is treated as remote-wins and written.
///
/// # Errors
///
/// Propagates errors from `wtx.table_get` and `wtx.table_insert`.
pub fn apply_patch_to_table_txn(
    wtx: &mut WriteTxn<'_>,
    table_name: &[u8],
    patch: &SyncPatch,
) -> Result<ApplyResult> {
    let mut counts = ApplyResult::empty();

    for entry in &patch.entries {
        // Metadata is only consulted for CRDT-aware patches.
        let remote_meta = if patch.crdt_aware {
            entry.crdt_meta.as_ref()
        } else {
            None
        };

        let decision = match remote_meta {
            None => MergeResult::Remote,
            Some(meta) => match wtx.table_get(table_name, &entry.key)? {
                // No local value: nothing to compare against.
                None => MergeResult::Remote,
                // An undecodable local value loses to the remote entry.
                Some(local_data) => decode_lww_value(&local_data)
                    .map_or(MergeResult::Remote, |local| lww_merge(&local.meta, meta)),
            },
        };

        match decision {
            MergeResult::Local => counts.entries_skipped += 1,
            MergeResult::Equal => counts.entries_equal += 1,
            MergeResult::Remote => {
                match entry.kind {
                    EntryKind::Put => {
                        wtx.table_insert(table_name, &entry.key, &entry.value)?;
                    }
                    // Tombstones are stored through the same insert path as puts.
                    EntryKind::Tombstone => {
                        wtx.table_insert(table_name, &entry.key, &entry.value)?;
                    }
                }
                counts.entries_applied += 1;
            }
        }
    }

    Ok(counts)
}
// Unit tests live in the sibling file `apply_tests.rs` and are compiled only
// for test builds.
#[cfg(test)]
#[path = "apply_tests.rs"]
mod tests;