1use citadel_core::Result;
2use citadel_txn::manager::TxnManager;
3use citadel_txn::write_txn::WriteTxn;
4
5use crate::crdt::{decode_lww_value, lww_merge, EntryKind, MergeResult};
6use crate::patch::SyncPatch;
7
/// Per-entry outcome counters produced by applying a [`SyncPatch`].
///
/// All counters start at zero; see [`ApplyResult::empty`] or `Default`.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ApplyResult {
    /// Number of entries that were written to the store.
    pub entries_applied: u64,
    /// Number of entries left unwritten because the LWW merge favoured the
    /// locally stored value.
    pub entries_skipped: u64,
    /// Number of entries left unwritten because the LWW merge reported the
    /// local and remote metadata as equal.
    pub entries_equal: u64,
}

impl ApplyResult {
    /// Returns a result with every counter set to zero.
    ///
    /// Equivalent to `ApplyResult::default()`.
    pub fn empty() -> Self {
        Self::default()
    }
}
28
29pub fn apply_patch(manager: &TxnManager, patch: &SyncPatch) -> Result<ApplyResult> {
35 if patch.is_empty() {
36 return Ok(ApplyResult::empty());
37 }
38
39 let mut wtx = manager.begin_write()?;
40 let result = apply_patch_to_txn(&mut wtx, patch)?;
41 wtx.commit()?;
42 Ok(result)
43}
44
45pub fn apply_patch_to_txn(wtx: &mut WriteTxn<'_>, patch: &SyncPatch) -> Result<ApplyResult> {
49 let mut result = ApplyResult::empty();
50
51 for entry in &patch.entries {
52 if patch.crdt_aware {
53 if let Some(ref remote_meta) = entry.crdt_meta {
54 let existing = wtx.get(&entry.key)?;
56 if let Some(local_data) = existing {
57 if let Ok(local_decoded) = decode_lww_value(&local_data) {
58 match lww_merge(&local_decoded.meta, remote_meta) {
59 MergeResult::Local => {
60 result.entries_skipped += 1;
61 continue;
62 }
63 MergeResult::Equal => {
64 result.entries_equal += 1;
65 continue;
66 }
67 MergeResult::Remote => {
68 }
70 }
71 }
72 }
74 }
76 }
77
78 match entry.kind {
80 EntryKind::Put => {
81 wtx.insert(&entry.key, &entry.value)?;
82 }
83 EntryKind::Tombstone => {
84 wtx.insert(&entry.key, &entry.value)?;
87 }
88 }
89 result.entries_applied += 1;
90 }
91
92 Ok(result)
93}
94
95pub fn apply_patch_to_table(
97 manager: &TxnManager,
98 table_name: &[u8],
99 patch: &SyncPatch,
100) -> Result<ApplyResult> {
101 if patch.is_empty() {
102 return Ok(ApplyResult::empty());
103 }
104
105 let mut wtx = manager.begin_write()?;
106 match wtx.create_table(table_name) {
107 Ok(()) => {}
108 Err(citadel_core::Error::TableAlreadyExists(_)) => {}
109 Err(e) => return Err(e),
110 }
111 let result = apply_patch_to_table_txn(&mut wtx, table_name, patch)?;
112 wtx.commit()?;
113 Ok(result)
114}
115
116pub fn apply_patch_to_table_txn(
118 wtx: &mut WriteTxn<'_>,
119 table_name: &[u8],
120 patch: &SyncPatch,
121) -> Result<ApplyResult> {
122 let mut result = ApplyResult::empty();
123
124 for entry in &patch.entries {
125 if patch.crdt_aware {
126 if let Some(ref remote_meta) = entry.crdt_meta {
127 let existing = wtx.table_get(table_name, &entry.key)?;
128 if let Some(local_data) = existing {
129 if let Ok(local_decoded) = decode_lww_value(&local_data) {
130 match lww_merge(&local_decoded.meta, remote_meta) {
131 MergeResult::Local => {
132 result.entries_skipped += 1;
133 continue;
134 }
135 MergeResult::Equal => {
136 result.entries_equal += 1;
137 continue;
138 }
139 MergeResult::Remote => {}
140 }
141 }
142 }
143 }
144 }
145
146 match entry.kind {
147 EntryKind::Put | EntryKind::Tombstone => {
148 wtx.table_insert(table_name, &entry.key, &entry.value)?;
149 }
150 }
151 result.entries_applied += 1;
152 }
153
154 Ok(result)
155}
156
#[cfg(test)]
mod tests {
    use super::*;
    use crate::crdt::{encode_lww_value, CrdtMeta, EntryKind};
    use crate::hlc::HlcTimestamp;
    use crate::node_id::NodeId;
    use crate::patch::PatchEntry;

    use citadel_core::constants::{DEK_SIZE, MAC_KEY_SIZE, MAC_SIZE};
    use citadel_io::sync_io::SyncPageIO;

    /// Multiplier used to express wall-clock values in whole seconds.
    const SECOND: i64 = 1_000_000_000;

    /// Builds CRDT metadata from a wall-clock value, a logical counter and a node id.
    fn meta(wall_ns: i64, logical: i32, node: u64) -> CrdtMeta {
        let timestamp = HlcTimestamp::new(wall_ns, logical);
        CrdtMeta::new(timestamp, NodeId::from_u64(node))
    }

    /// Creates a fresh database file at `path` and a transaction manager over it.
    fn test_manager(path: &std::path::Path) -> TxnManager {
        let mut options = std::fs::OpenOptions::new();
        options.read(true).write(true).create(true).truncate(true);
        let file = options.open(path).unwrap();

        let io = Box::new(SyncPageIO::new(file));
        let dek = [0x42u8; DEK_SIZE];
        let mac_key = [0x43u8; MAC_KEY_SIZE];
        let dek_id = [0x44u8; MAC_SIZE];
        TxnManager::create(io, dek, mac_key, 1, 0x1234, dek_id, 256).unwrap()
    }

    /// Commits a single `key -> value` pair so a test starts with local state.
    fn seed(mgr: &TxnManager, key: &[u8], value: &[u8]) {
        let mut wtx = mgr.begin_write().unwrap();
        wtx.insert(key, value).unwrap();
        wtx.commit().unwrap();
    }

    /// Builds one patch entry, copying `key` and `value`.
    fn entry(
        key: &[u8],
        value: &[u8],
        kind: EntryKind,
        crdt_meta: Option<CrdtMeta>,
    ) -> PatchEntry {
        PatchEntry {
            key: key.to_vec(),
            value: value.to_vec(),
            kind,
            crdt_meta,
        }
    }

    /// Builds a patch originating from node `source` with the given entries.
    fn patch_from(source: u64, crdt_aware: bool, entries: Vec<PatchEntry>) -> SyncPatch {
        SyncPatch {
            source_node: NodeId::from_u64(source),
            entries,
            crdt_aware,
        }
    }

    #[test]
    fn apply_empty_patch() {
        let dir = tempfile::tempdir().unwrap();
        let mgr = test_manager(&dir.path().join("test.db"));

        let patch = SyncPatch::empty(NodeId::from_u64(1));

        let result = apply_patch(&mgr, &patch).unwrap();
        assert_eq!(result, ApplyResult::empty());
    }

    #[test]
    fn apply_non_crdt_unconditional() {
        let dir = tempfile::tempdir().unwrap();
        let mgr = test_manager(&dir.path().join("test.db"));

        seed(&mgr, b"key1", b"old-value");

        let patch = patch_from(
            1,
            false,
            vec![
                entry(b"key1", b"new-value", EntryKind::Put, None),
                entry(b"key2", b"brand-new", EntryKind::Put, None),
            ],
        );

        let result = apply_patch(&mgr, &patch).unwrap();
        assert_eq!(result.entries_applied, 2);

        let mut rtx = mgr.begin_read();
        assert_eq!(rtx.get(b"key1").unwrap().unwrap(), b"new-value");
        assert_eq!(rtx.get(b"key2").unwrap().unwrap(), b"brand-new");
    }

    #[test]
    fn apply_crdt_remote_wins() {
        let dir = tempfile::tempdir().unwrap();
        let mgr = test_manager(&dir.path().join("test.db"));

        // Local value is older than the incoming one.
        let local_meta = meta(1000 * SECOND, 0, 1);
        let remote_meta = meta(2000 * SECOND, 0, 2);

        let local_val = encode_lww_value(&local_meta, EntryKind::Put, b"local");
        seed(&mgr, b"key1", &local_val);

        let remote_val = encode_lww_value(&remote_meta, EntryKind::Put, b"remote");
        let patch = patch_from(
            2,
            true,
            vec![entry(b"key1", &remote_val, EntryKind::Put, Some(remote_meta))],
        );

        let result = apply_patch(&mgr, &patch).unwrap();
        assert_eq!(result.entries_applied, 1);
        assert_eq!(result.entries_skipped, 0);

        let mut rtx = mgr.begin_read();
        assert_eq!(rtx.get(b"key1").unwrap().unwrap(), remote_val);
    }

    #[test]
    fn apply_crdt_local_wins() {
        let dir = tempfile::tempdir().unwrap();
        let mgr = test_manager(&dir.path().join("test.db"));

        // Local value is newer than the incoming one.
        let local_meta = meta(2000 * SECOND, 0, 1);
        let remote_meta = meta(1000 * SECOND, 0, 2);

        let local_val = encode_lww_value(&local_meta, EntryKind::Put, b"local");
        seed(&mgr, b"key1", &local_val);

        let remote_val = encode_lww_value(&remote_meta, EntryKind::Put, b"remote");
        let patch = patch_from(
            2,
            true,
            vec![entry(b"key1", &remote_val, EntryKind::Put, Some(remote_meta))],
        );

        let result = apply_patch(&mgr, &patch).unwrap();
        assert_eq!(result.entries_applied, 0);
        assert_eq!(result.entries_skipped, 1);

        let mut rtx = mgr.begin_read();
        assert_eq!(rtx.get(b"key1").unwrap().unwrap(), local_val);
    }

    #[test]
    fn apply_crdt_equal() {
        let dir = tempfile::tempdir().unwrap();
        let mgr = test_manager(&dir.path().join("test.db"));

        // The stored value and the incoming entry share the same metadata.
        let m = meta(1000 * SECOND, 5, 42);
        let val = encode_lww_value(&m, EntryKind::Put, b"same");
        seed(&mgr, b"key1", &val);

        let patch = patch_from(
            42,
            true,
            vec![entry(b"key1", &val, EntryKind::Put, Some(m))],
        );

        let result = apply_patch(&mgr, &patch).unwrap();
        assert_eq!(result.entries_equal, 1);
        assert_eq!(result.entries_applied, 0);
    }

    #[test]
    fn apply_crdt_new_key() {
        let dir = tempfile::tempdir().unwrap();
        let mgr = test_manager(&dir.path().join("test.db"));

        let m = meta(1000 * SECOND, 0, 1);
        let val = encode_lww_value(&m, EntryKind::Put, b"new");

        // No local value exists for this key, so the entry is simply written.
        let patch = patch_from(
            1,
            true,
            vec![entry(b"new-key", &val, EntryKind::Put, Some(m))],
        );

        let result = apply_patch(&mgr, &patch).unwrap();
        assert_eq!(result.entries_applied, 1);

        let mut rtx = mgr.begin_read();
        assert_eq!(rtx.get(b"new-key").unwrap().unwrap(), val);
    }

    #[test]
    fn apply_crdt_tombstone() {
        let dir = tempfile::tempdir().unwrap();
        let mgr = test_manager(&dir.path().join("test.db"));

        let local_meta = meta(1000 * SECOND, 0, 1);
        let local_val = encode_lww_value(&local_meta, EntryKind::Put, b"alive");
        seed(&mgr, b"key1", &local_val);

        let remote_meta = meta(2000 * SECOND, 0, 2);
        let tombstone_val = encode_lww_value(&remote_meta, EntryKind::Tombstone, b"");

        let patch = patch_from(
            2,
            true,
            vec![entry(
                b"key1",
                &tombstone_val,
                EntryKind::Tombstone,
                Some(remote_meta),
            )],
        );

        let result = apply_patch(&mgr, &patch).unwrap();
        assert_eq!(result.entries_applied, 1);

        // The tombstone is stored as a value and decodes back as a tombstone.
        let mut rtx = mgr.begin_read();
        let stored = rtx.get(b"key1").unwrap().unwrap();
        let decoded = decode_lww_value(&stored).unwrap();
        assert_eq!(decoded.kind, EntryKind::Tombstone);
    }

    #[test]
    fn apply_to_txn() {
        let dir = tempfile::tempdir().unwrap();
        let mgr = test_manager(&dir.path().join("test.db"));

        let patch = patch_from(1, false, vec![entry(b"k", b"v", EntryKind::Put, None)]);

        // The caller owns the transaction and commits it itself.
        let mut wtx = mgr.begin_write().unwrap();
        let result = apply_patch_to_txn(&mut wtx, &patch).unwrap();
        assert_eq!(result.entries_applied, 1);
        wtx.commit().unwrap();

        let mut rtx = mgr.begin_read();
        assert_eq!(rtx.get(b"k").unwrap().unwrap(), b"v");
    }
}