1use citadel_core::Result;
2use citadel_txn::manager::TxnManager;
3use citadel_txn::write_txn::WriteTxn;
4
5use crate::crdt::{decode_lww_value, lww_merge, EntryKind, MergeResult};
6use crate::patch::SyncPatch;
7
/// Counters summarising what happened to the entries of an applied patch.
///
/// The apply functions in this file bump exactly one counter per processed entry.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ApplyResult {
    /// Number of entries that were written to the store.
    pub entries_applied: u64,
    /// Number of entries left unwritten because the merge reported `MergeResult::Local`.
    pub entries_skipped: u64,
    /// Number of entries left unwritten because the merge reported `MergeResult::Equal`.
    pub entries_equal: u64,
}

impl ApplyResult {
    /// Returns a result with every counter set to zero.
    ///
    /// Equivalent to `ApplyResult::default()`, but usable in `const` contexts.
    #[must_use]
    pub const fn empty() -> Self {
        Self {
            entries_applied: 0,
            entries_skipped: 0,
            entries_equal: 0,
        }
    }
}
28
29pub fn apply_patch(manager: &TxnManager, patch: &SyncPatch) -> Result<ApplyResult> {
35 if patch.is_empty() {
36 return Ok(ApplyResult::empty());
37 }
38
39 let mut wtx = manager.begin_write()?;
40 let result = apply_patch_to_txn(&mut wtx, patch)?;
41 wtx.commit()?;
42 Ok(result)
43}
44
45pub fn apply_patch_to_txn(wtx: &mut WriteTxn<'_>, patch: &SyncPatch) -> Result<ApplyResult> {
49 let mut result = ApplyResult::empty();
50
51 for entry in &patch.entries {
52 if patch.crdt_aware {
53 if let Some(ref remote_meta) = entry.crdt_meta {
54 let existing = wtx.get(&entry.key)?;
55 if let Some(local_data) = existing {
56 if let Ok(local_decoded) = decode_lww_value(&local_data) {
57 match lww_merge(&local_decoded.meta, remote_meta) {
58 MergeResult::Local => {
59 result.entries_skipped += 1;
60 continue;
61 }
62 MergeResult::Equal => {
63 result.entries_equal += 1;
64 continue;
65 }
66 MergeResult::Remote => {
67 }
69 }
70 }
71 }
73 }
75 }
76
77 match entry.kind {
79 EntryKind::Put => {
80 wtx.insert(&entry.key, &entry.value)?;
81 }
82 EntryKind::Tombstone => {
83 wtx.insert(&entry.key, &entry.value)?;
86 }
87 }
88 result.entries_applied += 1;
89 }
90
91 Ok(result)
92}
93
94pub fn apply_patch_to_table(
96 manager: &TxnManager,
97 table_name: &[u8],
98 patch: &SyncPatch,
99) -> Result<ApplyResult> {
100 if patch.is_empty() {
101 return Ok(ApplyResult::empty());
102 }
103
104 let mut wtx = manager.begin_write()?;
105 match wtx.create_table(table_name) {
106 Ok(()) => {}
107 Err(citadel_core::Error::TableAlreadyExists(_)) => {}
108 Err(e) => return Err(e),
109 }
110 let result = apply_patch_to_table_txn(&mut wtx, table_name, patch)?;
111 wtx.commit()?;
112 Ok(result)
113}
114
115pub fn apply_patch_to_table_txn(
117 wtx: &mut WriteTxn<'_>,
118 table_name: &[u8],
119 patch: &SyncPatch,
120) -> Result<ApplyResult> {
121 let mut result = ApplyResult::empty();
122
123 for entry in &patch.entries {
124 if patch.crdt_aware {
125 if let Some(ref remote_meta) = entry.crdt_meta {
126 let existing = wtx.table_get(table_name, &entry.key)?;
127 if let Some(local_data) = existing {
128 if let Ok(local_decoded) = decode_lww_value(&local_data) {
129 match lww_merge(&local_decoded.meta, remote_meta) {
130 MergeResult::Local => {
131 result.entries_skipped += 1;
132 continue;
133 }
134 MergeResult::Equal => {
135 result.entries_equal += 1;
136 continue;
137 }
138 MergeResult::Remote => {}
139 }
140 }
141 }
142 }
143 }
144
145 match entry.kind {
146 EntryKind::Put | EntryKind::Tombstone => {
147 wtx.table_insert(table_name, &entry.key, &entry.value)?;
148 }
149 }
150 result.entries_applied += 1;
151 }
152
153 Ok(result)
154}
155
// Unit tests for `apply_patch` and `apply_patch_to_txn`, each run against a
// database file created in a temporary directory.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::crdt::{encode_lww_value, CrdtMeta, EntryKind};
    use crate::hlc::HlcTimestamp;
    use crate::node_id::NodeId;
    use crate::patch::PatchEntry;

    use citadel_core::constants::{DEK_SIZE, MAC_KEY_SIZE, MAC_SIZE};
    use citadel_io::mmap_io::MmapPageIO;

    // One second expressed in nanoseconds; multiplied into the `wall_ns` argument of `meta`.
    const SECOND: i64 = 1_000_000_000;

    /// Builds a `CrdtMeta` from a wall-clock value, a logical counter and a numeric node id.
    fn meta(wall_ns: i64, logical: i32, node: u64) -> CrdtMeta {
        CrdtMeta::new(HlcTimestamp::new(wall_ns, logical), NodeId::from_u64(node))
    }

    /// Creates a `TxnManager` over a freshly created (truncated) file at `path`,
    /// using fixed dummy byte patterns for the key material.
    fn test_manager(path: &std::path::Path) -> TxnManager {
        let file = std::fs::OpenOptions::new()
            .read(true)
            .write(true)
            .create(true)
            .truncate(true)
            .open(path)
            .unwrap();
        let io = Box::new(MmapPageIO::try_new(file).unwrap());
        let dek = [0x42u8; DEK_SIZE];
        let mac_key = [0x43u8; MAC_KEY_SIZE];
        let dek_id = [0x44u8; MAC_SIZE];
        // NOTE(review): the meaning of the literal arguments (1, 0x1234, 256) is defined by
        // `TxnManager::create` and is not visible from this file.
        TxnManager::create(io, dek, mac_key, 1, 0x1234, dek_id, 256).unwrap()
    }

    /// An empty patch yields an all-zero result.
    #[test]
    fn apply_empty_patch() {
        let dir = tempfile::tempdir().unwrap();
        let mgr = test_manager(&dir.path().join("test.db"));
        let patch = SyncPatch::empty(NodeId::from_u64(1));
        let result = apply_patch(&mgr, &patch).unwrap();
        assert_eq!(result, ApplyResult::empty());
    }

    /// A patch that is not CRDT-aware overwrites an existing key and inserts a new one.
    #[test]
    fn apply_non_crdt_unconditional() {
        let dir = tempfile::tempdir().unwrap();
        let mgr = test_manager(&dir.path().join("test.db"));

        // Seed an existing value that the patch is expected to replace.
        let mut wtx = mgr.begin_write().unwrap();
        wtx.insert(b"key1", b"old-value").unwrap();
        wtx.commit().unwrap();

        let patch = SyncPatch {
            source_node: NodeId::from_u64(1),
            entries: vec![
                PatchEntry {
                    key: b"key1".to_vec(),
                    value: b"new-value".to_vec(),
                    kind: EntryKind::Put,
                    crdt_meta: None,
                },
                PatchEntry {
                    key: b"key2".to_vec(),
                    value: b"brand-new".to_vec(),
                    kind: EntryKind::Put,
                    crdt_meta: None,
                },
            ],
            crdt_aware: false,
        };

        let result = apply_patch(&mgr, &patch).unwrap();
        assert_eq!(result.entries_applied, 2);

        let mut rtx = mgr.begin_read();
        assert_eq!(rtx.get(b"key1").unwrap().unwrap(), b"new-value");
        assert_eq!(rtx.get(b"key2").unwrap().unwrap(), b"brand-new");
    }

    /// The remote entry carries the larger wall-clock value, so it is applied and stored.
    #[test]
    fn apply_crdt_remote_wins() {
        let dir = tempfile::tempdir().unwrap();
        let mgr = test_manager(&dir.path().join("test.db"));

        let local_meta = meta(1000 * SECOND, 0, 1);
        let remote_meta = meta(2000 * SECOND, 0, 2);

        // Store the local LWW-encoded value first.
        let local_val = encode_lww_value(&local_meta, EntryKind::Put, b"local");
        let mut wtx = mgr.begin_write().unwrap();
        wtx.insert(b"key1", &local_val).unwrap();
        wtx.commit().unwrap();

        let remote_val = encode_lww_value(&remote_meta, EntryKind::Put, b"remote");
        let patch = SyncPatch {
            source_node: NodeId::from_u64(2),
            entries: vec![PatchEntry {
                key: b"key1".to_vec(),
                value: remote_val.clone(),
                kind: EntryKind::Put,
                crdt_meta: Some(remote_meta),
            }],
            crdt_aware: true,
        };

        let result = apply_patch(&mgr, &patch).unwrap();
        assert_eq!(result.entries_applied, 1);
        assert_eq!(result.entries_skipped, 0);

        let mut rtx = mgr.begin_read();
        assert_eq!(rtx.get(b"key1").unwrap().unwrap(), remote_val);
    }

    /// The local value carries the larger wall-clock value, so the remote entry is skipped
    /// and the stored bytes stay unchanged.
    #[test]
    fn apply_crdt_local_wins() {
        let dir = tempfile::tempdir().unwrap();
        let mgr = test_manager(&dir.path().join("test.db"));

        let local_meta = meta(2000 * SECOND, 0, 1);
        let remote_meta = meta(1000 * SECOND, 0, 2);

        let local_val = encode_lww_value(&local_meta, EntryKind::Put, b"local");
        let mut wtx = mgr.begin_write().unwrap();
        wtx.insert(b"key1", &local_val).unwrap();
        wtx.commit().unwrap();

        let remote_val = encode_lww_value(&remote_meta, EntryKind::Put, b"remote");
        let patch = SyncPatch {
            source_node: NodeId::from_u64(2),
            entries: vec![PatchEntry {
                key: b"key1".to_vec(),
                value: remote_val,
                kind: EntryKind::Put,
                crdt_meta: Some(remote_meta),
            }],
            crdt_aware: true,
        };

        let result = apply_patch(&mgr, &patch).unwrap();
        assert_eq!(result.entries_applied, 0);
        assert_eq!(result.entries_skipped, 1);

        let mut rtx = mgr.begin_read();
        assert_eq!(rtx.get(b"key1").unwrap().unwrap(), local_val);
    }

    /// Local and remote share the same metadata, so the entry is counted as equal
    /// and not as applied.
    #[test]
    fn apply_crdt_equal() {
        let dir = tempfile::tempdir().unwrap();
        let mgr = test_manager(&dir.path().join("test.db"));

        let m = meta(1000 * SECOND, 5, 42);
        let val = encode_lww_value(&m, EntryKind::Put, b"same");

        let mut wtx = mgr.begin_write().unwrap();
        wtx.insert(b"key1", &val).unwrap();
        wtx.commit().unwrap();

        let patch = SyncPatch {
            source_node: NodeId::from_u64(42),
            entries: vec![PatchEntry {
                key: b"key1".to_vec(),
                value: val.clone(),
                kind: EntryKind::Put,
                crdt_meta: Some(m),
            }],
            crdt_aware: true,
        };

        let result = apply_patch(&mgr, &patch).unwrap();
        assert_eq!(result.entries_equal, 1);
        assert_eq!(result.entries_applied, 0);
    }

    /// A CRDT entry for a key with no stored value is applied as-is.
    #[test]
    fn apply_crdt_new_key() {
        let dir = tempfile::tempdir().unwrap();
        let mgr = test_manager(&dir.path().join("test.db"));

        let m = meta(1000 * SECOND, 0, 1);
        let val = encode_lww_value(&m, EntryKind::Put, b"new");

        let patch = SyncPatch {
            source_node: NodeId::from_u64(1),
            entries: vec![PatchEntry {
                key: b"new-key".to_vec(),
                value: val.clone(),
                kind: EntryKind::Put,
                crdt_meta: Some(m),
            }],
            crdt_aware: true,
        };

        let result = apply_patch(&mgr, &patch).unwrap();
        assert_eq!(result.entries_applied, 1);

        let mut rtx = mgr.begin_read();
        assert_eq!(rtx.get(b"new-key").unwrap().unwrap(), val);
    }

    /// A remote tombstone with the larger wall-clock value replaces a live local value;
    /// the stored bytes then decode as a tombstone.
    #[test]
    fn apply_crdt_tombstone() {
        let dir = tempfile::tempdir().unwrap();
        let mgr = test_manager(&dir.path().join("test.db"));

        let local_meta = meta(1000 * SECOND, 0, 1);
        let local_val = encode_lww_value(&local_meta, EntryKind::Put, b"alive");
        let mut wtx = mgr.begin_write().unwrap();
        wtx.insert(b"key1", &local_val).unwrap();
        wtx.commit().unwrap();

        let remote_meta = meta(2000 * SECOND, 0, 2);
        let tombstone_val = encode_lww_value(&remote_meta, EntryKind::Tombstone, b"");

        let patch = SyncPatch {
            source_node: NodeId::from_u64(2),
            entries: vec![PatchEntry {
                key: b"key1".to_vec(),
                value: tombstone_val.clone(),
                kind: EntryKind::Tombstone,
                crdt_meta: Some(remote_meta),
            }],
            crdt_aware: true,
        };

        let result = apply_patch(&mgr, &patch).unwrap();
        assert_eq!(result.entries_applied, 1);

        // The key is still present; its stored value is the encoded tombstone.
        let mut rtx = mgr.begin_read();
        let stored = rtx.get(b"key1").unwrap().unwrap();
        let decoded = decode_lww_value(&stored).unwrap();
        assert_eq!(decoded.kind, EntryKind::Tombstone);
    }

    /// `apply_patch_to_txn` writes through a caller-owned transaction; the value is
    /// readable after the caller commits.
    #[test]
    fn apply_to_txn() {
        let dir = tempfile::tempdir().unwrap();
        let mgr = test_manager(&dir.path().join("test.db"));

        let patch = SyncPatch {
            source_node: NodeId::from_u64(1),
            entries: vec![PatchEntry {
                key: b"k".to_vec(),
                value: b"v".to_vec(),
                kind: EntryKind::Put,
                crdt_meta: None,
            }],
            crdt_aware: false,
        };

        let mut wtx = mgr.begin_write().unwrap();
        let result = apply_patch_to_txn(&mut wtx, &patch).unwrap();
        assert_eq!(result.entries_applied, 1);
        wtx.commit().unwrap();

        let mut rtx = mgr.begin_read();
        assert_eq!(rtx.get(b"k").unwrap().unwrap(), b"v");
    }
}