1use citadel_core::Result;
2use citadel_txn::manager::TxnManager;
3use citadel_txn::write_txn::WriteTxn;
4
5use crate::crdt::{decode_lww_value, lww_merge, EntryKind, MergeResult};
6use crate::patch::SyncPatch;
7
8#[derive(Debug, Clone, PartialEq, Eq)]
10pub struct ApplyResult {
11 pub entries_applied: u64,
13 pub entries_skipped: u64,
15 pub entries_equal: u64,
17}
18
19impl ApplyResult {
20 pub fn empty() -> Self {
21 Self {
22 entries_applied: 0,
23 entries_skipped: 0,
24 entries_equal: 0,
25 }
26 }
27}
28
29pub fn apply_patch(manager: &TxnManager, patch: &SyncPatch) -> Result<ApplyResult> {
35 if patch.is_empty() {
36 return Ok(ApplyResult::empty());
37 }
38
39 let mut wtx = manager.begin_write()?;
40 let result = apply_patch_to_txn(&mut wtx, patch)?;
41 wtx.commit()?;
42 Ok(result)
43}
44
45pub fn apply_patch_to_txn(wtx: &mut WriteTxn<'_>, patch: &SyncPatch) -> Result<ApplyResult> {
49 let mut result = ApplyResult::empty();
50
51 for entry in &patch.entries {
52 if patch.crdt_aware {
53 if let Some(ref remote_meta) = entry.crdt_meta {
54 let existing = wtx.get(&entry.key)?;
55 if let Some(local_data) = existing {
56 if let Ok(local_decoded) = decode_lww_value(&local_data) {
57 match lww_merge(&local_decoded.meta, remote_meta) {
58 MergeResult::Local => {
59 result.entries_skipped += 1;
60 continue;
61 }
62 MergeResult::Equal => {
63 result.entries_equal += 1;
64 continue;
65 }
66 MergeResult::Remote => {
67 }
69 }
70 }
71 }
73 }
75 }
76
77 match entry.kind {
79 EntryKind::Put => {
80 wtx.insert(&entry.key, &entry.value)?;
81 }
82 EntryKind::Tombstone => {
83 wtx.insert(&entry.key, &entry.value)?;
86 }
87 }
88 result.entries_applied += 1;
89 }
90
91 Ok(result)
92}
93
94pub fn apply_patch_to_table(
96 manager: &TxnManager,
97 table_name: &[u8],
98 patch: &SyncPatch,
99) -> Result<ApplyResult> {
100 if patch.is_empty() {
101 return Ok(ApplyResult::empty());
102 }
103
104 let mut wtx = manager.begin_write()?;
105 match wtx.create_table(table_name) {
106 Ok(()) => {}
107 Err(citadel_core::Error::TableAlreadyExists(_)) => {}
108 Err(e) => return Err(e),
109 }
110 let result = apply_patch_to_table_txn(&mut wtx, table_name, patch)?;
111 wtx.commit()?;
112 Ok(result)
113}
114
115pub fn apply_patch_to_table_txn(
117 wtx: &mut WriteTxn<'_>,
118 table_name: &[u8],
119 patch: &SyncPatch,
120) -> Result<ApplyResult> {
121 let mut result = ApplyResult::empty();
122
123 for entry in &patch.entries {
124 if patch.crdt_aware {
125 if let Some(ref remote_meta) = entry.crdt_meta {
126 let existing = wtx.table_get(table_name, &entry.key)?;
127 if let Some(local_data) = existing {
128 if let Ok(local_decoded) = decode_lww_value(&local_data) {
129 match lww_merge(&local_decoded.meta, remote_meta) {
130 MergeResult::Local => {
131 result.entries_skipped += 1;
132 continue;
133 }
134 MergeResult::Equal => {
135 result.entries_equal += 1;
136 continue;
137 }
138 MergeResult::Remote => {}
139 }
140 }
141 }
142 }
143 }
144
145 match entry.kind {
146 EntryKind::Put | EntryKind::Tombstone => {
147 wtx.table_insert(table_name, &entry.key, &entry.value)?;
148 }
149 }
150 result.entries_applied += 1;
151 }
152
153 Ok(result)
154}
155
156#[cfg(test)]
157#[path = "apply_tests.rs"]
158mod tests;