1use citadel_core::Result;
2use citadel_txn::manager::TxnManager;
3use citadel_txn::write_txn::WriteTxn;
4
5use crate::crdt::{decode_lww_value, lww_merge, EntryKind, MergeResult};
6use crate::patch::SyncPatch;
7
8#[derive(Debug, Clone, PartialEq, Eq)]
10pub struct ApplyResult {
11 pub entries_applied: u64,
13 pub entries_skipped: u64,
15 pub entries_equal: u64,
17}
18
19impl ApplyResult {
20 pub fn empty() -> Self {
21 Self {
22 entries_applied: 0,
23 entries_skipped: 0,
24 entries_equal: 0,
25 }
26 }
27}
28
29pub fn apply_patch(
35 manager: &TxnManager,
36 patch: &SyncPatch,
37) -> Result<ApplyResult> {
38 if patch.is_empty() {
39 return Ok(ApplyResult::empty());
40 }
41
42 let mut wtx = manager.begin_write()?;
43 let result = apply_patch_to_txn(&mut wtx, patch)?;
44 wtx.commit()?;
45 Ok(result)
46}
47
48pub fn apply_patch_to_txn(
52 wtx: &mut WriteTxn<'_>,
53 patch: &SyncPatch,
54) -> Result<ApplyResult> {
55 let mut result = ApplyResult::empty();
56
57 for entry in &patch.entries {
58 if patch.crdt_aware {
59 if let Some(ref remote_meta) = entry.crdt_meta {
60 let existing = wtx.get(&entry.key)?;
62 if let Some(local_data) = existing {
63 if let Ok(local_decoded) = decode_lww_value(&local_data) {
64 match lww_merge(&local_decoded.meta, remote_meta) {
65 MergeResult::Local => {
66 result.entries_skipped += 1;
67 continue;
68 }
69 MergeResult::Equal => {
70 result.entries_equal += 1;
71 continue;
72 }
73 MergeResult::Remote => {
74 }
76 }
77 }
78 }
80 }
82 }
83
84 match entry.kind {
86 EntryKind::Put => {
87 wtx.insert(&entry.key, &entry.value)?;
88 }
89 EntryKind::Tombstone => {
90 wtx.insert(&entry.key, &entry.value)?;
93 }
94 }
95 result.entries_applied += 1;
96 }
97
98 Ok(result)
99}
100
101pub fn apply_patch_to_table(
103 manager: &TxnManager,
104 table_name: &[u8],
105 patch: &SyncPatch,
106) -> Result<ApplyResult> {
107 if patch.is_empty() {
108 return Ok(ApplyResult::empty());
109 }
110
111 let mut wtx = manager.begin_write()?;
112 match wtx.create_table(table_name) {
113 Ok(()) => {}
114 Err(citadel_core::Error::TableAlreadyExists(_)) => {}
115 Err(e) => return Err(e),
116 }
117 let result = apply_patch_to_table_txn(&mut wtx, table_name, patch)?;
118 wtx.commit()?;
119 Ok(result)
120}
121
122pub fn apply_patch_to_table_txn(
124 wtx: &mut WriteTxn<'_>,
125 table_name: &[u8],
126 patch: &SyncPatch,
127) -> Result<ApplyResult> {
128 let mut result = ApplyResult::empty();
129
130 for entry in &patch.entries {
131 if patch.crdt_aware {
132 if let Some(ref remote_meta) = entry.crdt_meta {
133 let existing = wtx.table_get(table_name, &entry.key)?;
134 if let Some(local_data) = existing {
135 if let Ok(local_decoded) = decode_lww_value(&local_data) {
136 match lww_merge(&local_decoded.meta, remote_meta) {
137 MergeResult::Local => {
138 result.entries_skipped += 1;
139 continue;
140 }
141 MergeResult::Equal => {
142 result.entries_equal += 1;
143 continue;
144 }
145 MergeResult::Remote => {}
146 }
147 }
148 }
149 }
150 }
151
152 match entry.kind {
153 EntryKind::Put | EntryKind::Tombstone => {
154 wtx.table_insert(table_name, &entry.key, &entry.value)?;
155 }
156 }
157 result.entries_applied += 1;
158 }
159
160 Ok(result)
161}
162
163#[cfg(test)]
164mod tests {
165 use super::*;
166 use crate::crdt::{CrdtMeta, EntryKind, encode_lww_value};
167 use crate::hlc::HlcTimestamp;
168 use crate::node_id::NodeId;
169 use crate::patch::PatchEntry;
170
171 use citadel_core::constants::{DEK_SIZE, MAC_KEY_SIZE, MAC_SIZE};
172 use citadel_io::sync_io::SyncPageIO;
173
174 const SECOND: i64 = 1_000_000_000;
175
176 fn meta(wall_ns: i64, logical: i32, node: u64) -> CrdtMeta {
177 CrdtMeta::new(HlcTimestamp::new(wall_ns, logical), NodeId::from_u64(node))
178 }
179
180 fn test_manager(path: &std::path::Path) -> TxnManager {
181 let file = std::fs::OpenOptions::new()
182 .read(true)
183 .write(true)
184 .create(true)
185 .truncate(true)
186 .open(path)
187 .unwrap();
188 let io = Box::new(SyncPageIO::new(file));
189 let dek = [0x42u8; DEK_SIZE];
190 let mac_key = [0x43u8; MAC_KEY_SIZE];
191 let dek_id = [0x44u8; MAC_SIZE];
192 TxnManager::create(io, dek, mac_key, 1, 0x1234, dek_id, 256).unwrap()
193 }
194
195 #[test]
196 fn apply_empty_patch() {
197 let dir = tempfile::tempdir().unwrap();
198 let mgr = test_manager(&dir.path().join("test.db"));
199 let patch = SyncPatch::empty(NodeId::from_u64(1));
200 let result = apply_patch(&mgr, &patch).unwrap();
201 assert_eq!(result, ApplyResult::empty());
202 }
203
204 #[test]
205 fn apply_non_crdt_unconditional() {
206 let dir = tempfile::tempdir().unwrap();
207 let mgr = test_manager(&dir.path().join("test.db"));
208
209 let mut wtx = mgr.begin_write().unwrap();
210 wtx.insert(b"key1", b"old-value").unwrap();
211 wtx.commit().unwrap();
212
213 let patch = SyncPatch {
214 source_node: NodeId::from_u64(1),
215 entries: vec![
216 PatchEntry {
217 key: b"key1".to_vec(),
218 value: b"new-value".to_vec(),
219 kind: EntryKind::Put,
220 crdt_meta: None,
221 },
222 PatchEntry {
223 key: b"key2".to_vec(),
224 value: b"brand-new".to_vec(),
225 kind: EntryKind::Put,
226 crdt_meta: None,
227 },
228 ],
229 crdt_aware: false,
230 };
231
232 let result = apply_patch(&mgr, &patch).unwrap();
233 assert_eq!(result.entries_applied, 2);
234
235 let mut rtx = mgr.begin_read();
236 assert_eq!(rtx.get(b"key1").unwrap().unwrap(), b"new-value");
237 assert_eq!(rtx.get(b"key2").unwrap().unwrap(), b"brand-new");
238 }
239
240 #[test]
241 fn apply_crdt_remote_wins() {
242 let dir = tempfile::tempdir().unwrap();
243 let mgr = test_manager(&dir.path().join("test.db"));
244
245 let local_meta = meta(1000 * SECOND, 0, 1);
246 let remote_meta = meta(2000 * SECOND, 0, 2);
247
248 let local_val = encode_lww_value(&local_meta, EntryKind::Put, b"local");
249 let mut wtx = mgr.begin_write().unwrap();
250 wtx.insert(b"key1", &local_val).unwrap();
251 wtx.commit().unwrap();
252
253 let remote_val = encode_lww_value(&remote_meta, EntryKind::Put, b"remote");
254 let patch = SyncPatch {
255 source_node: NodeId::from_u64(2),
256 entries: vec![PatchEntry {
257 key: b"key1".to_vec(),
258 value: remote_val.clone(),
259 kind: EntryKind::Put,
260 crdt_meta: Some(remote_meta),
261 }],
262 crdt_aware: true,
263 };
264
265 let result = apply_patch(&mgr, &patch).unwrap();
266 assert_eq!(result.entries_applied, 1);
267 assert_eq!(result.entries_skipped, 0);
268
269 let mut rtx = mgr.begin_read();
270 assert_eq!(rtx.get(b"key1").unwrap().unwrap(), remote_val);
271 }
272
273 #[test]
274 fn apply_crdt_local_wins() {
275 let dir = tempfile::tempdir().unwrap();
276 let mgr = test_manager(&dir.path().join("test.db"));
277
278 let local_meta = meta(2000 * SECOND, 0, 1);
279 let remote_meta = meta(1000 * SECOND, 0, 2);
280
281 let local_val = encode_lww_value(&local_meta, EntryKind::Put, b"local");
282 let mut wtx = mgr.begin_write().unwrap();
283 wtx.insert(b"key1", &local_val).unwrap();
284 wtx.commit().unwrap();
285
286 let remote_val = encode_lww_value(&remote_meta, EntryKind::Put, b"remote");
287 let patch = SyncPatch {
288 source_node: NodeId::from_u64(2),
289 entries: vec![PatchEntry {
290 key: b"key1".to_vec(),
291 value: remote_val,
292 kind: EntryKind::Put,
293 crdt_meta: Some(remote_meta),
294 }],
295 crdt_aware: true,
296 };
297
298 let result = apply_patch(&mgr, &patch).unwrap();
299 assert_eq!(result.entries_applied, 0);
300 assert_eq!(result.entries_skipped, 1);
301
302 let mut rtx = mgr.begin_read();
303 assert_eq!(rtx.get(b"key1").unwrap().unwrap(), local_val);
304 }
305
306 #[test]
307 fn apply_crdt_equal() {
308 let dir = tempfile::tempdir().unwrap();
309 let mgr = test_manager(&dir.path().join("test.db"));
310
311 let m = meta(1000 * SECOND, 5, 42);
312 let val = encode_lww_value(&m, EntryKind::Put, b"same");
313
314 let mut wtx = mgr.begin_write().unwrap();
315 wtx.insert(b"key1", &val).unwrap();
316 wtx.commit().unwrap();
317
318 let patch = SyncPatch {
319 source_node: NodeId::from_u64(42),
320 entries: vec![PatchEntry {
321 key: b"key1".to_vec(),
322 value: val.clone(),
323 kind: EntryKind::Put,
324 crdt_meta: Some(m),
325 }],
326 crdt_aware: true,
327 };
328
329 let result = apply_patch(&mgr, &patch).unwrap();
330 assert_eq!(result.entries_equal, 1);
331 assert_eq!(result.entries_applied, 0);
332 }
333
334 #[test]
335 fn apply_crdt_new_key() {
336 let dir = tempfile::tempdir().unwrap();
337 let mgr = test_manager(&dir.path().join("test.db"));
338
339 let m = meta(1000 * SECOND, 0, 1);
340 let val = encode_lww_value(&m, EntryKind::Put, b"new");
341
342 let patch = SyncPatch {
343 source_node: NodeId::from_u64(1),
344 entries: vec![PatchEntry {
345 key: b"new-key".to_vec(),
346 value: val.clone(),
347 kind: EntryKind::Put,
348 crdt_meta: Some(m),
349 }],
350 crdt_aware: true,
351 };
352
353 let result = apply_patch(&mgr, &patch).unwrap();
354 assert_eq!(result.entries_applied, 1);
355
356 let mut rtx = mgr.begin_read();
357 assert_eq!(rtx.get(b"new-key").unwrap().unwrap(), val);
358 }
359
360 #[test]
361 fn apply_crdt_tombstone() {
362 let dir = tempfile::tempdir().unwrap();
363 let mgr = test_manager(&dir.path().join("test.db"));
364
365 let local_meta = meta(1000 * SECOND, 0, 1);
366 let local_val = encode_lww_value(&local_meta, EntryKind::Put, b"alive");
367 let mut wtx = mgr.begin_write().unwrap();
368 wtx.insert(b"key1", &local_val).unwrap();
369 wtx.commit().unwrap();
370
371 let remote_meta = meta(2000 * SECOND, 0, 2);
372 let tombstone_val = encode_lww_value(&remote_meta, EntryKind::Tombstone, b"");
373
374 let patch = SyncPatch {
375 source_node: NodeId::from_u64(2),
376 entries: vec![PatchEntry {
377 key: b"key1".to_vec(),
378 value: tombstone_val.clone(),
379 kind: EntryKind::Tombstone,
380 crdt_meta: Some(remote_meta),
381 }],
382 crdt_aware: true,
383 };
384
385 let result = apply_patch(&mgr, &patch).unwrap();
386 assert_eq!(result.entries_applied, 1);
387
388 let mut rtx = mgr.begin_read();
389 let stored = rtx.get(b"key1").unwrap().unwrap();
390 let decoded = decode_lww_value(&stored).unwrap();
391 assert_eq!(decoded.kind, EntryKind::Tombstone);
392 }
393
394 #[test]
395 fn apply_to_txn() {
396 let dir = tempfile::tempdir().unwrap();
397 let mgr = test_manager(&dir.path().join("test.db"));
398
399 let patch = SyncPatch {
400 source_node: NodeId::from_u64(1),
401 entries: vec![PatchEntry {
402 key: b"k".to_vec(),
403 value: b"v".to_vec(),
404 kind: EntryKind::Put,
405 crdt_meta: None,
406 }],
407 crdt_aware: false,
408 };
409
410 let mut wtx = mgr.begin_write().unwrap();
411 let result = apply_patch_to_txn(&mut wtx, &patch).unwrap();
412 assert_eq!(result.entries_applied, 1);
413 wtx.commit().unwrap();
414
415 let mut rtx = mgr.begin_read();
416 assert_eq!(rtx.get(b"k").unwrap().unwrap(), b"v");
417 }
418}