1use std::cmp::Ordering;
74use std::collections::HashMap;
75use std::hash::Hash;
76use std::sync::Arc;
77
/// Identifier of a node (replica) that produces and merges changes.
pub type NodeId = u64;

/// Name of a column (field) inside a record.
pub type ColumnKey = String;

/// Column version stamped on record-deletion (tombstone) changes.
///
/// It is `u64::MAX`, so no column version can compare greater than it.
const TOMBSTONE_COL_VERSION: u64 = u64::MAX;
87
/// One replicated change: a write to a single column of a record, or the
/// deletion of a whole record.
///
/// A change whose `col_name` is `None` is treated as a record deletion
/// (tombstone) by the code in this file.
#[derive(Debug, Clone, PartialEq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct Change<K, V> {
  /// Key of the record the change applies to.
  pub record_id: K,
  /// Column written by this change; `None` marks deletion of the record.
  pub col_name: Option<ColumnKey>,
  /// Value carried by the change; deletions produced in this file carry `None`.
  pub value: Option<V>,
  /// Per-column version counter; compared first by `DefaultMergeRule`.
  pub col_version: u64,
  /// Logical-clock value of the originating node when the change was made.
  pub db_version: u64,
  /// Node that originally produced the change.
  pub node_id: NodeId,
  /// Logical-clock value assigned by the node that stored the change.
  pub local_db_version: u64,
  /// Caller-supplied flag bits; carried along but not interpreted in this file.
  pub flags: u32,
}

// `Change` is `Eq` whenever both the key and the value type are.
impl<K: Eq, V: Eq> Eq for Change<K, V> {}

impl<K, V> Change<K, V> {
  /// Builds a change from its individual parts; every argument is stored
  /// unchanged in the field of the same name.
  #[allow(clippy::too_many_arguments)]
  pub fn new(
    record_id: K,
    col_name: Option<ColumnKey>,
    value: Option<V>,
    col_version: u64,
    db_version: u64,
    node_id: NodeId,
    local_db_version: u64,
    flags: u32,
  ) -> Self {
    Self {
      record_id,
      col_name,
      value,
      col_version,
      db_version,
      node_id,
      local_db_version,
      flags,
    }
  }
}
138
/// Version metadata kept for one column of a record.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct ColumnVersion {
  /// Per-column version counter (starts at 1 for a locally created column).
  pub col_version: u64,
  /// Logical-clock value of the node that wrote the current value.
  pub db_version: u64,
  /// Node that wrote the current value.
  pub node_id: NodeId,
  /// Logical-clock value of this node when the value was stored locally.
  pub local_db_version: u64,
}

impl ColumnVersion {
  /// Builds a `ColumnVersion` from its four components, stored unchanged.
  pub fn new(col_version: u64, db_version: u64, node_id: NodeId, local_db_version: u64) -> Self {
    Self {
      col_version,
      db_version,
      node_id,
      local_db_version,
    }
  }
}
160
/// Version metadata kept for a deleted record.
///
/// Unlike [`ColumnVersion`] it stores no column version; see
/// [`TombstoneInfo::as_column_version`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct TombstoneInfo {
  /// Logical-clock value of the node that deleted the record.
  pub db_version: u64,
  /// Node that deleted the record.
  pub node_id: NodeId,
  /// Logical-clock value of this node when the deletion was stored locally.
  pub local_db_version: u64,
}

impl TombstoneInfo {
  /// Builds a `TombstoneInfo` from its three components, stored unchanged.
  pub fn new(db_version: u64, node_id: NodeId, local_db_version: u64) -> Self {
    Self {
      db_version,
      node_id,
      local_db_version,
    }
  }

  /// Returns the equivalent [`ColumnVersion`], using `TOMBSTONE_COL_VERSION`
  /// as the column version so it can be fed to a merge rule.
  pub fn as_column_version(&self) -> ColumnVersion {
    ColumnVersion::new(
      TOMBSTONE_COL_VERSION,
      self.db_version,
      self.node_id,
      self.local_db_version,
    )
  }
}
192
/// Logical (Lamport-style) clock used to stamp changes with a version.
///
/// The counter never wraps: once it reaches `u64::MAX` it stays there
/// instead of overflowing.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct LogicalClock {
  time: u64,
}

impl LogicalClock {
  /// Creates a clock whose current time is 0.
  pub fn new() -> Self {
    Self { time: 0 }
  }

  /// Advances the clock by one for a local event and returns the new time.
  pub fn tick(&mut self) -> u64 {
    // Saturate rather than overflow: `+= 1` at `u64::MAX` would panic in
    // debug builds and silently restart the clock at 0 in release builds.
    self.time = self.time.saturating_add(1);
    self.time
  }

  /// Merges in a time received from another node and returns the new time.
  ///
  /// The clock moves to one past the larger of its own time and
  /// `received_time`.
  pub fn update(&mut self, received_time: u64) -> u64 {
    // `received_time` comes from remote changes, so `u64::MAX` must not
    // be able to overflow the counter.
    self.time = self.time.max(received_time).saturating_add(1);
    self.time
  }

  /// Overwrites the current time with `time` (may move the clock backwards).
  pub fn set_time(&mut self, time: u64) {
    self.time = time;
  }

  /// Returns the current time without advancing the clock.
  pub fn current_time(&self) -> u64 {
    self.time
  }
}

impl Default for LogicalClock {
  /// Same as [`LogicalClock::new`].
  fn default() -> Self {
    Self::new()
  }
}
235
/// Map from record key to the [`TombstoneInfo`] of its deletion.
#[derive(Debug, Clone)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct TombstoneStorage<K: Hash + Eq> {
  entries: HashMap<K, TombstoneInfo>,
}

impl<K: Hash + Eq> TombstoneStorage<K> {
  /// Creates an empty storage.
  pub fn new() -> Self {
    Self {
      entries: HashMap::new(),
    }
  }

  /// Stores `info` for `key`, replacing any tombstone already recorded for it.
  pub fn insert_or_assign(&mut self, key: K, info: TombstoneInfo) {
    self.entries.insert(key, info);
  }

  /// Returns a copy of the tombstone recorded for `key`, if any.
  pub fn find(&self, key: &K) -> Option<TombstoneInfo> {
    self.entries.get(key).copied()
  }

  /// Removes the tombstone for `key`; returns `true` if one was present.
  pub fn erase(&mut self, key: &K) -> bool {
    self.entries.remove(key).is_some()
  }

  /// Removes every tombstone.
  pub fn clear(&mut self) {
    self.entries.clear();
  }

  /// Iterates over all `(key, tombstone)` pairs in unspecified (hash-map) order.
  pub fn iter(&self) -> impl Iterator<Item = (&K, &TombstoneInfo)> {
    self.entries.iter()
  }

  /// Number of tombstones stored.
  pub fn len(&self) -> usize {
    self.entries.len()
  }

  /// `true` when no tombstone is stored.
  pub fn is_empty(&self) -> bool {
    self.entries.is_empty()
  }

  /// Drops every tombstone whose `db_version` is strictly below
  /// `min_acknowledged_version` and returns how many were removed.
  pub fn compact(&mut self, min_acknowledged_version: u64) -> usize {
    let initial_len = self.entries.len();
    self
      .entries
      .retain(|_, info| info.db_version >= min_acknowledged_version);
    initial_len - self.entries.len()
  }
}

impl<K: Hash + Eq> Default for TombstoneStorage<K> {
  /// Same as [`TombstoneStorage::new`].
  fn default() -> Self {
    Self::new()
  }
}
297
298#[derive(Debug, Clone)]
300#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
301pub struct Record<V> {
302 pub fields: HashMap<ColumnKey, V>,
303 pub column_versions: HashMap<ColumnKey, ColumnVersion>,
304 pub lowest_local_db_version: u64,
306 pub highest_local_db_version: u64,
307}
308
309impl<V> Record<V> {
310 pub fn new() -> Self {
311 Self {
312 fields: HashMap::new(),
313 column_versions: HashMap::new(),
314 lowest_local_db_version: u64::MAX,
315 highest_local_db_version: 0,
316 }
317 }
318
319 pub fn from_parts(
321 fields: HashMap<ColumnKey, V>,
322 column_versions: HashMap<ColumnKey, ColumnVersion>,
323 ) -> Self {
324 let mut lowest = u64::MAX;
325 let mut highest = 0;
326
327 for ver in column_versions.values() {
328 if ver.local_db_version < lowest {
329 lowest = ver.local_db_version;
330 }
331 if ver.local_db_version > highest {
332 highest = ver.local_db_version;
333 }
334 }
335
336 Self {
337 fields,
338 column_versions,
339 lowest_local_db_version: lowest,
340 highest_local_db_version: highest,
341 }
342 }
343}
344
345impl<V: PartialEq> PartialEq for Record<V> {
346 fn eq(&self, other: &Self) -> bool {
347 self.fields == other.fields
349 }
350}
351
352impl<V> Default for Record<V> {
353 fn default() -> Self {
354 Self::new()
355 }
356}
357
/// Conflict-resolution policy: decides whether a remote change replaces the
/// local state it conflicts with.
pub trait MergeRule<K, V> {
  /// Returns `true` when the remote version triple
  /// (`remote_col`, `remote_db`, `remote_node`) should replace the local one
  /// (`local_col`, `local_db`, `local_node`).
  fn should_accept(
    &self,
    local_col: u64,
    local_db: u64,
    local_node: NodeId,
    remote_col: u64,
    remote_db: u64,
    remote_node: NodeId,
  ) -> bool;

  /// Convenience form of [`MergeRule::should_accept`] that reads the version
  /// triples from two [`Change`] values.
  fn should_accept_change(&self, local: &Change<K, V>, remote: &Change<K, V>) -> bool {
    self.should_accept(
      local.col_version,
      local.db_version,
      local.node_id,
      remote.col_version,
      remote.db_version,
      remote.node_id,
    )
  }
}
386
387#[derive(Debug, Clone, Copy, Default)]
394pub struct DefaultMergeRule;
395
396impl<K, V> MergeRule<K, V> for DefaultMergeRule {
397 fn should_accept(
398 &self,
399 local_col: u64,
400 local_db: u64,
401 local_node: NodeId,
402 remote_col: u64,
403 remote_db: u64,
404 remote_node: NodeId,
405 ) -> bool {
406 match remote_col.cmp(&local_col) {
407 Ordering::Greater => true,
408 Ordering::Less => false,
409 Ordering::Equal => match remote_db.cmp(&local_db) {
410 Ordering::Greater => true,
411 Ordering::Less => false,
412 Ordering::Equal => remote_node > local_node,
413 },
414 }
415 }
416}
417
/// Total ordering over changes, used to sort a batch of changes.
pub trait ChangeComparator<K, V> {
  /// Returns how `a` orders relative to `b`.
  fn compare(&self, a: &Change<K, V>, b: &Change<K, V>) -> Ordering;
}
422
423#[derive(Debug, Clone, Copy, Default)]
433pub struct DefaultChangeComparator;
434
435impl<K: Ord, V> ChangeComparator<K, V> for DefaultChangeComparator {
436 fn compare(&self, a: &Change<K, V>, b: &Change<K, V>) -> Ordering {
437 match a.record_id.cmp(&b.record_id) {
439 Ordering::Equal => {}
440 ord => return ord,
441 }
442
443 match (a.col_name.as_ref(), b.col_name.as_ref()) {
445 (None, None) => {}
446 (None, Some(_)) => return Ordering::Greater,
447 (Some(_), None) => return Ordering::Less,
448 (Some(a_col), Some(b_col)) => match a_col.cmp(b_col) {
449 Ordering::Equal => {}
450 ord => return ord,
451 },
452 }
453
454 match b.col_version.cmp(&a.col_version) {
456 Ordering::Equal => {}
457 ord => return ord,
458 }
459
460 match b.db_version.cmp(&a.db_version) {
461 Ordering::Equal => {}
462 ord => return ord,
463 }
464
465 b.node_id.cmp(&a.node_id)
466 }
467}
468
/// A replicated key → record store with per-column last-writer-wins merging
/// and record-level tombstones.
///
/// An instance may be layered over a shared, read-only `parent`: lookups that
/// miss locally fall back to the parent. The parent is skipped during
/// (de)serialization, so a deserialized instance has no parent.
#[derive(Debug)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "serde", serde(bound(serialize = "K: serde::Serialize, V: serde::Serialize")))]
#[cfg_attr(feature = "serde", serde(bound(deserialize = "K: serde::de::DeserializeOwned + Hash + Eq + Clone, V: serde::de::DeserializeOwned + Clone")))]
pub struct CRDT<K: Hash + Eq + Clone, V: Clone> {
  /// Id stamped on changes produced by this instance.
  node_id: NodeId,
  /// Logical clock that supplies `db_version` / `local_db_version` values.
  clock: LogicalClock,
  /// Records stored in this instance (parent records are not included).
  data: HashMap<K, Record<V>>,
  /// Deleted records known to this instance (parent tombstones are not included).
  tombstones: TombstoneStorage<K>,
  /// Optional parent consulted when a lookup misses locally; never serialized.
  #[cfg_attr(feature = "serde", serde(skip, default))]
  parent: Option<Arc<CRDT<K, V>>>,
  /// Parent's clock time when this instance was created (0 without a parent).
  /// Not read anywhere in this file, hence the `dead_code` allowance.
  #[allow(dead_code)]
  base_version: u64,
}
486
487impl<K: Hash + Eq + Clone, V: Clone> CRDT<K, V> {
488 pub fn new(node_id: NodeId, parent: Option<Arc<CRDT<K, V>>>) -> Self {
495 let (clock, base_version) = if let Some(ref p) = parent {
496 let parent_clock = p.clock;
497 let base = parent_clock.current_time();
498 (parent_clock, base)
499 } else {
500 (LogicalClock::new(), 0)
501 };
502
503 Self {
504 node_id,
505 clock,
506 data: HashMap::new(),
507 tombstones: TombstoneStorage::new(),
508 parent,
509 base_version,
510 }
511 }
512
  /// Creates a parent-less instance for `node_id` and loads `changes` into it.
  ///
  /// The changes are applied as-is, in order, without consulting a merge rule
  /// (see `apply_changes`).
  pub fn from_changes(node_id: NodeId, changes: Vec<Change<K, V>>) -> Self {
    let mut crdt = Self::new(node_id, None);
    crdt.apply_changes(changes);
    crdt
  }
524
  /// Discards all local records and tombstones, restarts the clock, and
  /// reloads the state from `changes`.
  ///
  /// `node_id` and `parent` are left untouched.
  pub fn reset(&mut self, changes: Vec<Change<K, V>>) {
    self.data.clear();
    self.tombstones.clear();
    self.clock = LogicalClock::new();
    self.apply_changes(changes);
  }
536
537 fn apply_changes(&mut self, changes: Vec<Change<K, V>>) {
539 let max_db_version = changes
541 .iter()
542 .map(|c| c.db_version.max(c.local_db_version))
543 .max()
544 .unwrap_or(0);
545
546 self.clock.set_time(max_db_version);
548
549 for change in changes {
551 let record_id = change.record_id.clone();
552 let col_name = change.col_name.clone();
553 let remote_col_version = change.col_version;
554 let remote_db_version = change.db_version;
555 let remote_node_id = change.node_id;
556 let remote_local_db_version = change.local_db_version;
557 let remote_value = change.value;
558
559 if col_name.is_none() {
560 self.data.remove(&record_id);
562
563 self.tombstones.insert_or_assign(
565 record_id,
566 TombstoneInfo::new(remote_db_version, remote_node_id, remote_local_db_version),
567 );
568 } else if let Some(col_key) = col_name {
569 if !self.is_record_tombstoned(&record_id, false) {
571 let record = self.get_or_create_record_unchecked(&record_id, false);
572
573 if let Some(value) = remote_value {
575 record.fields.insert(col_key.clone(), value);
576 }
577
578 let col_ver = ColumnVersion::new(
580 remote_col_version,
581 remote_db_version,
582 remote_node_id,
583 remote_local_db_version,
584 );
585 record.column_versions.insert(col_key, col_ver);
586
587 if remote_local_db_version < record.lowest_local_db_version {
589 record.lowest_local_db_version = remote_local_db_version;
590 }
591 if remote_local_db_version > record.highest_local_db_version {
592 record.highest_local_db_version = remote_local_db_version;
593 }
594 }
595 }
596 }
597 }
598
  /// Writes `fields` into the record `record_id`, creating it if needed, and
  /// returns one change per written column.
  ///
  /// Equivalent to `insert_or_update_with_flags` with `flags == 0`.
  #[must_use = "changes should be propagated to other nodes"]
  pub fn insert_or_update<I>(&mut self, record_id: &K, fields: I) -> Vec<Change<K, V>>
  where
    I: IntoIterator<Item = (ColumnKey, V)>,
  {
    self.insert_or_update_with_flags(record_id, 0, fields)
  }
616
617 #[must_use = "changes should be propagated to other nodes"]
629 pub fn insert_or_update_with_flags<I>(
630 &mut self,
631 record_id: &K,
632 flags: u32,
633 fields: I,
634 ) -> Vec<Change<K, V>>
635 where
636 I: IntoIterator<Item = (ColumnKey, V)>,
637 {
638 let db_version = self.clock.tick();
639
640 if self.is_record_tombstoned(record_id, false) {
642 return Vec::new();
643 }
644
645 let mut changes = Vec::new();
646 let node_id = self.node_id; let record = self.get_or_create_record_unchecked(record_id, false);
648
649 for (col_name, value) in fields {
650 let col_version = if let Some(col_info) = record.column_versions.get_mut(&col_name) {
651 col_info.col_version += 1;
652 col_info.db_version = db_version;
653 col_info.node_id = node_id;
654 col_info.local_db_version = db_version;
655 col_info.col_version
656 } else {
657 record.column_versions.insert(
658 col_name.clone(),
659 ColumnVersion::new(1, db_version, node_id, db_version),
660 );
661 1
662 };
663
664 if db_version < record.lowest_local_db_version {
666 record.lowest_local_db_version = db_version;
667 }
668 if db_version > record.highest_local_db_version {
669 record.highest_local_db_version = db_version;
670 }
671
672 record.fields.insert(col_name.clone(), value.clone());
673 changes.push(Change::new(
674 record_id.clone(),
675 Some(col_name),
676 Some(value),
677 col_version,
678 db_version,
679 node_id,
680 db_version,
681 flags,
682 ));
683 }
684
685 changes
686 }
687
  /// Deletes the record `record_id` and returns the resulting tombstone
  /// change, or `None` if the record is already tombstoned.
  ///
  /// Equivalent to `delete_record_with_flags` with `flags == 0`.
  #[must_use = "changes should be propagated to other nodes"]
  pub fn delete_record(&mut self, record_id: &K) -> Option<Change<K, V>> {
    self.delete_record_with_flags(record_id, 0)
  }
701
702 #[must_use = "changes should be propagated to other nodes"]
713 pub fn delete_record_with_flags(&mut self, record_id: &K, flags: u32) -> Option<Change<K, V>> {
714 if self.is_record_tombstoned(record_id, false) {
715 return None;
716 }
717
718 let db_version = self.clock.tick();
719
720 self.data.remove(record_id);
722
723 self.tombstones.insert_or_assign(
725 record_id.clone(),
726 TombstoneInfo::new(db_version, self.node_id, db_version),
727 );
728
729 Some(Change::new(
730 record_id.clone(),
731 None,
732 None,
733 TOMBSTONE_COL_VERSION,
734 db_version,
735 self.node_id,
736 db_version,
737 flags,
738 ))
739 }
740
  /// Merges remote `changes` into this instance, using `merge_rule` to
  /// resolve conflicts, and returns the changes that were accepted.
  ///
  /// Parent state is taken into account (`ignore_parent` is `false`).
  pub fn merge_changes<R: MergeRule<K, V>>(
    &mut self,
    changes: Vec<Change<K, V>>,
    merge_rule: &R,
  ) -> Vec<Change<K, V>> {
    self.merge_changes_impl(changes, false, merge_rule)
  }
758
  /// Merges remote `changes` one by one and returns those that were accepted.
  ///
  /// For every incoming change the clock is first advanced past the change's
  /// `db_version`; the resulting time becomes the `local_db_version` of the
  /// change if it is accepted. Changes to tombstoned records are dropped.
  /// Otherwise the change is accepted when there is no local version for the
  /// column, or when `merge_rule` prefers the remote version.
  ///
  /// When `ignore_parent` is `true`, tombstone checks and record lookups use
  /// only this instance's own state.
  ///
  /// Each returned change keeps the remote version triple and flags but
  /// carries the `local_db_version` assigned here.
  fn merge_changes_impl<R: MergeRule<K, V>>(
    &mut self,
    changes: Vec<Change<K, V>>,
    ignore_parent: bool,
    merge_rule: &R,
  ) -> Vec<Change<K, V>> {
    let mut accepted_changes = Vec::new();

    if changes.is_empty() {
      return accepted_changes;
    }

    for change in changes {
      // The incoming `local_db_version` is ignored (`..`): it belongs to the
      // sender and is replaced by a value from our own clock.
      let Change {
        record_id,
        col_name,
        value: remote_value,
        col_version: remote_col_version,
        db_version: remote_db_version,
        node_id: remote_node_id,
        flags,
        ..
      } = change;

      // Advance the clock for every received change, even one that ends up
      // being dropped below.
      let new_local_db_version = self.clock.update(remote_db_version);

      // Deleted records accept nothing further.
      if self.is_record_tombstoned(&record_id, ignore_parent) {
        continue;
      }

      // Look up the local version the remote change competes with.
      let local_col_info = if col_name.is_none() {
        // NOTE(review): a local tombstone would already have hit the
        // `continue` above, so this lookup appears to always yield `None`,
        // meaning deletions of non-tombstoned records are always accepted.
        self
          .tombstones
          .find(&record_id)
          .map(|info| info.as_column_version())
      } else if let Some(ref col) = col_name {
        self
          .get_record_ptr(&record_id, ignore_parent)
          .and_then(|record| record.column_versions.get(col).copied())
      } else {
        // Unreachable: `col_name` is either `None` or `Some`.
        None
      };

      // No local version means there is nothing to conflict with.
      let should_accept = if let Some(local_info) = local_col_info {
        merge_rule.should_accept(
          local_info.col_version,
          local_info.db_version,
          local_info.node_id,
          remote_col_version,
          remote_db_version,
          remote_node_id,
        )
      } else {
        true
      };

      if should_accept {
        if let Some(col_key) = col_name {
          // Column write.
          let record = self.get_or_create_record_unchecked(&record_id, ignore_parent);

          // The value is cloned because it is also moved into the
          // accepted change below.
          if let Some(value) = remote_value.clone() {
            record.fields.insert(col_key.clone(), value);
          } else {
            // A value-less column change removes the field.
            record.fields.remove(&col_key);
          }

          // Keep the remote version triple, stamped with our local version.
          record.column_versions.insert(
            col_key.clone(),
            ColumnVersion::new(
              remote_col_version,
              remote_db_version,
              remote_node_id,
              new_local_db_version,
            ),
          );

          // Maintain the record's range of local versions.
          if new_local_db_version < record.lowest_local_db_version {
            record.lowest_local_db_version = new_local_db_version;
          }
          if new_local_db_version > record.highest_local_db_version {
            record.highest_local_db_version = new_local_db_version;
          }

          accepted_changes.push(Change::new(
            record_id,
            Some(col_key),
            remote_value,
            remote_col_version,
            remote_db_version,
            remote_node_id,
            new_local_db_version,
            flags,
          ));
        } else {
          // Record deletion: drop local data and remember the tombstone.
          self.data.remove(&record_id);

          self.tombstones.insert_or_assign(
            record_id.clone(),
            TombstoneInfo::new(remote_db_version, remote_node_id, new_local_db_version),
          );

          accepted_changes.push(Change::new(
            record_id,
            None,
            None,
            remote_col_version,
            remote_db_version,
            remote_node_id,
            new_local_db_version,
            flags,
          ));
        }
      }
    }

    accepted_changes
  }
890
  /// Returns every change stored with a `local_db_version` greater than
  /// `last_db_version`, from any node.
  ///
  /// Equivalent to `get_changes_since_excluding` with an empty exclusion set.
  #[must_use]
  pub fn get_changes_since(&self, last_db_version: u64) -> Vec<Change<K, V>>
  where
    K: Ord,
  {
    self.get_changes_since_excluding(last_db_version, &std::collections::HashSet::new())
  }
907
908 pub fn get_changes_since_excluding(
910 &self,
911 last_db_version: u64,
912 excluding: &std::collections::HashSet<NodeId>,
913 ) -> Vec<Change<K, V>>
914 where
915 K: Ord,
916 {
917 let mut changes = Vec::new();
918
919 if let Some(ref parent) = self.parent {
921 let parent_changes = parent.get_changes_since_excluding(last_db_version, excluding);
922 changes.extend(parent_changes);
923 }
924
925 for (record_id, record) in &self.data {
927 if record.highest_local_db_version <= last_db_version {
929 continue;
930 }
931
932 for (col_name, clock_info) in &record.column_versions {
933 if clock_info.local_db_version > last_db_version && !excluding.contains(&clock_info.node_id)
934 {
935 let value = record.fields.get(col_name).cloned();
936
937 changes.push(Change::new(
938 record_id.clone(),
939 Some(col_name.clone()),
940 value,
941 clock_info.col_version,
942 clock_info.db_version,
943 clock_info.node_id,
944 clock_info.local_db_version,
945 0,
946 ));
947 }
948 }
949 }
950
951 for (record_id, tombstone_info) in self.tombstones.iter() {
953 if tombstone_info.local_db_version > last_db_version
954 && !excluding.contains(&tombstone_info.node_id)
955 {
956 changes.push(Change::new(
957 record_id.clone(),
958 None,
959 None,
960 TOMBSTONE_COL_VERSION,
961 tombstone_info.db_version,
962 tombstone_info.node_id,
963 tombstone_info.local_db_version,
964 0,
965 ));
966 }
967 }
968
969 if self.parent.is_some() {
970 Self::compress_changes(&mut changes);
972 }
973
974 changes
975 }
976
977 pub fn compress_changes(changes: &mut Vec<Change<K, V>>)
987 where
988 K: Ord,
989 {
990 if changes.is_empty() {
991 return;
992 }
993
994 let comparator = DefaultChangeComparator;
997 changes.sort_unstable_by(|a, b| comparator.compare(a, b));
998
999 let mut write = 0;
1001 for read in 1..changes.len() {
1002 if changes[read].record_id != changes[write].record_id {
1003 write += 1;
1005 if write != read {
1006 changes[write] = changes[read].clone();
1007 }
1008 } else if changes[read].col_name.is_none() && changes[write].col_name.is_some() {
1009 let mut first_pos = write;
1012 while first_pos > 0 && changes[first_pos - 1].record_id == changes[read].record_id {
1013 first_pos -= 1;
1014 }
1015 changes[first_pos] = changes[read].clone();
1016 write = first_pos;
1017 } else if changes[read].col_name != changes[write].col_name
1018 && changes[write].col_name.is_some()
1019 {
1020 write += 1;
1022 if write != read {
1023 changes[write] = changes[read].clone();
1024 }
1025 }
1026 }
1028
1029 changes.truncate(write + 1);
1030 }
1031
  /// Returns the record stored under `record_id`, looking in this instance
  /// first and then in the parent chain.
  pub fn get_record(&self, record_id: &K) -> Option<&Record<V>> {
    self.get_record_ptr(record_id, false)
  }
1036
  /// Returns `true` if `record_id` has been deleted, according to this
  /// instance or its parent chain.
  pub fn is_tombstoned(&self, record_id: &K) -> bool {
    self.is_record_tombstoned(record_id, false)
  }
1041
1042 pub fn get_tombstone(&self, record_id: &K) -> Option<TombstoneInfo> {
1044 if let Some(info) = self.tombstones.find(record_id) {
1045 return Some(info);
1046 }
1047
1048 if let Some(ref parent) = self.parent {
1049 return parent.get_tombstone(record_id);
1050 }
1051
1052 None
1053 }
1054
  /// Drops this instance's tombstones whose `db_version` is strictly below
  /// `min_acknowledged_version` and returns how many were removed.
  ///
  /// Once a tombstone is dropped, this instance no longer treats the record
  /// as deleted, so a later change for that record id would be accepted
  /// again. Parent tombstones are not affected.
  pub fn compact_tombstones(&mut self, min_acknowledged_version: u64) -> usize {
    self.tombstones.compact(min_acknowledged_version)
  }
1078
  /// Number of tombstones stored in this instance (parent tombstones are not
  /// counted).
  pub fn tombstone_count(&self) -> usize {
    self.tombstones.len()
  }
1083
  /// Read-only access to this instance's logical clock.
  pub fn get_clock(&self) -> &LogicalClock {
    &self.clock
  }
1088
  /// Read-only access to the records stored in this instance (records that
  /// exist only in the parent are not included).
  pub fn get_data(&self) -> &HashMap<K, Record<V>> {
    &self.data
  }
1093
  /// Serializes this instance to a JSON string (requires the `json` feature).
  ///
  /// The parent is not part of the output (the `parent` field is marked
  /// `serde(skip)`).
  ///
  /// # Errors
  ///
  /// Returns the `serde_json` error if serialization fails.
  #[cfg(feature = "json")]
  pub fn to_json(&self) -> Result<String, serde_json::Error>
  where
    K: serde::Serialize,
    V: serde::Serialize,
  {
    serde_json::to_string(self)
  }
1109
  /// Deserializes an instance from a JSON string (requires the `json`
  /// feature). The resulting instance has no parent.
  ///
  /// # Errors
  ///
  /// Returns the `serde_json` error if `json` cannot be parsed into a `CRDT`.
  #[cfg(feature = "json")]
  pub fn from_json(json: &str) -> Result<Self, serde_json::Error>
  where
    K: serde::de::DeserializeOwned + Hash + Eq + Clone,
    V: serde::de::DeserializeOwned + Clone,
  {
    serde_json::from_str(json)
  }
1126
  /// Serializes this instance to bytes with `bincode` (requires the `binary`
  /// feature).
  ///
  /// The parent is not part of the output (the `parent` field is marked
  /// `serde(skip)`).
  ///
  /// # Errors
  ///
  /// Returns the `bincode` error if serialization fails.
  #[cfg(feature = "binary")]
  pub fn to_bytes(&self) -> Result<Vec<u8>, bincode::Error>
  where
    K: serde::Serialize,
    V: serde::Serialize,
  {
    bincode::serialize(self)
  }
1142
  /// Deserializes an instance from `bincode` bytes (requires the `binary`
  /// feature). The resulting instance has no parent.
  ///
  /// # Errors
  ///
  /// Returns the `bincode` error if `bytes` cannot be decoded into a `CRDT`.
  #[cfg(feature = "binary")]
  pub fn from_bytes(bytes: &[u8]) -> Result<Self, bincode::Error>
  where
    K: serde::de::DeserializeOwned + Hash + Eq + Clone,
    V: serde::de::DeserializeOwned + Clone,
  {
    bincode::deserialize(bytes)
  }
1159
1160 fn is_record_tombstoned(&self, record_id: &K, ignore_parent: bool) -> bool {
1163 if self.tombstones.find(record_id).is_some() {
1164 return true;
1165 }
1166
1167 if !ignore_parent {
1168 if let Some(ref parent) = self.parent {
1169 return parent.is_record_tombstoned(record_id, false);
1170 }
1171 }
1172
1173 false
1174 }
1175
1176 fn get_or_create_record_unchecked(
1177 &mut self,
1178 record_id: &K,
1179 ignore_parent: bool,
1180 ) -> &mut Record<V> {
1181 use std::collections::hash_map::Entry;
1182
1183 match self.data.entry(record_id.clone()) {
1184 Entry::Occupied(e) => e.into_mut(),
1185 Entry::Vacant(e) => {
1186 let record = if !ignore_parent {
1187 self
1188 .parent
1189 .as_ref()
1190 .and_then(|p| p.get_record_ptr(record_id, false))
1191 .cloned()
1192 .unwrap_or_else(Record::new)
1193 } else {
1194 Record::new()
1195 };
1196 e.insert(record)
1197 }
1198 }
1199 }
1200
1201 fn get_record_ptr(&self, record_id: &K, ignore_parent: bool) -> Option<&Record<V>> {
1202 if let Some(record) = self.data.get(record_id) {
1203 return Some(record);
1204 }
1205
1206 if !ignore_parent {
1207 if let Some(ref parent) = self.parent {
1208 return parent.get_record_ptr(record_id, false);
1209 }
1210 }
1211
1212 None
1213 }
1214}
1215
// Unit tests for the clock, tombstone storage, basic CRDT operations and the
// optional serde / JSON / bincode round-trips.
#[cfg(test)]
mod tests {
  use super::*;

  // `tick` advances by one; `update` jumps to one past the received time.
  #[test]
  fn test_logical_clock() {
    let mut clock = LogicalClock::new();
    assert_eq!(clock.current_time(), 0);

    let t1 = clock.tick();
    assert_eq!(t1, 1);
    assert_eq!(clock.current_time(), 1);

    let t2 = clock.update(5);
    assert_eq!(t2, 6);
    assert_eq!(clock.current_time(), 6);
  }

  // Insert/find, then compaction removes a tombstone older than the cut-off.
  #[test]
  fn test_tombstone_storage() {
    let mut storage = TombstoneStorage::new();
    let info = TombstoneInfo::new(10, 1, 10);

    storage.insert_or_assign("key1".to_string(), info);
    assert_eq!(storage.len(), 1);

    assert_eq!(storage.find(&"key1".to_string()), Some(info));
    assert_eq!(storage.find(&"key2".to_string()), None);

    // db_version 10 < 15, so the tombstone is dropped.
    let removed = storage.compact(15);
    assert_eq!(removed, 1);
    assert_eq!(storage.len(), 0);
  }

  // Inserting two fields yields two changes and one stored record.
  #[test]
  fn test_basic_insert() {
    let mut crdt: CRDT<String, String> = CRDT::new(1, None);

    let fields = vec![
      ("name".to_string(), "Alice".to_string()),
      ("age".to_string(), "30".to_string()),
    ];

    let changes = crdt.insert_or_update(&"user1".to_string(), fields);

    assert_eq!(changes.len(), 2);
    assert_eq!(crdt.get_data().len(), 1);

    let record = crdt.get_record(&"user1".to_string()).unwrap();
    assert_eq!(record.fields.get("name").unwrap(), "Alice");
    assert_eq!(record.fields.get("age").unwrap(), "30");
  }

  // Deleting a record tombstones it and removes its data.
  #[test]
  fn test_delete_record() {
    let mut crdt: CRDT<String, String> = CRDT::new(1, None);

    let fields = vec![("name".to_string(), "Bob".to_string())];
    let _ = crdt.insert_or_update(&"user2".to_string(), fields);

    let delete_change = crdt.delete_record(&"user2".to_string());
    assert!(delete_change.is_some());
    assert!(crdt.is_tombstoned(&"user2".to_string()));
    assert_eq!(crdt.get_data().len(), 0);
  }

  // Two nodes write the same column concurrently and exchange changes; both
  // must converge on the same value.
  #[test]
  fn test_merge_changes() {
    let mut crdt1: CRDT<String, String> = CRDT::new(1, None);
    let mut crdt2: CRDT<String, String> = CRDT::new(2, None);

    let fields1 = vec![("tag".to_string(), "Node1".to_string())];
    let changes1 = crdt1.insert_or_update(&"record1".to_string(), fields1);

    let fields2 = vec![("tag".to_string(), "Node2".to_string())];
    let changes2 = crdt2.insert_or_update(&"record1".to_string(), fields2);

    let merge_rule = DefaultMergeRule;
    crdt1.merge_changes(changes2, &merge_rule);
    crdt2.merge_changes(changes1, &merge_rule);

    // Both writes have col_version 1 and db_version 1, so the higher node id
    // (2) wins the tie.
    assert_eq!(
      crdt1
        .get_record(&"record1".to_string())
        .unwrap()
        .fields
        .get("tag")
        .unwrap(),
      "Node2"
    );
    assert_eq!(crdt1.get_data(), crdt2.get_data());
  }

  // A `Change` survives a round-trip through each enabled serde format.
  #[test]
  #[cfg(feature = "serde")]
  fn test_change_serialization() {
    // Unused when neither `json` nor `binary` is enabled.
    #[allow(unused_variables)]
    let change = Change::new(
      "record1".to_string(),
      Some("name".to_string()),
      Some("Alice".to_string()),
      1,
      10,
      1,
      10,
      0,
    );

    #[cfg(feature = "json")]
    {
      let json = serde_json::to_string(&change).unwrap();
      let deserialized: Change<String, String> = serde_json::from_str(&json).unwrap();
      assert_eq!(change, deserialized);
    }

    #[cfg(feature = "binary")]
    {
      let bytes = bincode::serialize(&change).unwrap();
      let deserialized: Change<String, String> = bincode::deserialize(&bytes).unwrap();
      assert_eq!(change, deserialized);
    }
  }

  // A `Record` survives a round-trip through each enabled serde format
  // (record equality compares `fields` only).
  #[test]
  #[cfg(feature = "serde")]
  fn test_record_serialization() {
    let mut fields = HashMap::new();
    fields.insert("name".to_string(), "Bob".to_string());
    fields.insert("age".to_string(), "25".to_string());

    let mut column_versions = HashMap::new();
    column_versions.insert("name".to_string(), ColumnVersion::new(1, 10, 1, 10));
    column_versions.insert("age".to_string(), ColumnVersion::new(1, 11, 1, 11));

    // Unused when neither `json` nor `binary` is enabled.
    #[allow(unused_variables)]
    let record = Record::from_parts(fields, column_versions);

    #[cfg(feature = "json")]
    {
      let json = serde_json::to_string(&record).unwrap();
      let deserialized: Record<String> = serde_json::from_str(&json).unwrap();
      assert_eq!(record, deserialized);
    }

    #[cfg(feature = "binary")]
    {
      let bytes = bincode::serialize(&record).unwrap();
      let deserialized: Record<String> = bincode::deserialize(&bytes).unwrap();
      assert_eq!(record, deserialized);
    }
  }

  // A whole CRDT (records, tombstones, clock) survives a JSON round-trip.
  #[test]
  #[cfg(feature = "json")]
  fn test_crdt_json_serialization() {
    let mut crdt: CRDT<String, String> = CRDT::new(1, None);

    let fields = vec![
      ("name".to_string(), "Alice".to_string()),
      ("age".to_string(), "30".to_string()),
    ];
    let _ = crdt.insert_or_update(&"user1".to_string(), fields);

    let fields2 = vec![("name".to_string(), "Bob".to_string())];
    let _ = crdt.insert_or_update(&"user2".to_string(), fields2);

    // Delete one record so the tombstone set is non-empty.
    let _ = crdt.delete_record(&"user2".to_string());

    let json = crdt.to_json().unwrap();

    let deserialized: CRDT<String, String> = CRDT::from_json(&json).unwrap();

    // Records are preserved.
    assert_eq!(crdt.get_data().len(), deserialized.get_data().len());
    assert_eq!(
      crdt.get_record(&"user1".to_string()).unwrap().fields,
      deserialized.get_record(&"user1".to_string()).unwrap().fields
    );

    // Tombstones are preserved.
    assert_eq!(crdt.tombstone_count(), deserialized.tombstone_count());
    assert!(deserialized.is_tombstoned(&"user2".to_string()));

    // The clock is preserved.
    assert_eq!(
      crdt.get_clock().current_time(),
      deserialized.get_clock().current_time()
    );

    // The parent is never deserialized.
    let has_parent = deserialized.parent.is_some();
    assert!(!has_parent);
  }

  // A whole CRDT (records, clock) survives a bincode round-trip.
  #[test]
  #[cfg(feature = "binary")]
  fn test_crdt_binary_serialization() {
    let mut crdt: CRDT<String, String> = CRDT::new(1, None);

    let fields = vec![
      ("name".to_string(), "Alice".to_string()),
      ("age".to_string(), "30".to_string()),
    ];
    let _ = crdt.insert_or_update(&"user1".to_string(), fields);

    let bytes = crdt.to_bytes().unwrap();

    let deserialized: CRDT<String, String> = CRDT::from_bytes(&bytes).unwrap();

    // Records are preserved.
    assert_eq!(crdt.get_data().len(), deserialized.get_data().len());
    assert_eq!(
      crdt.get_record(&"user1".to_string()).unwrap().fields,
      deserialized.get_record(&"user1".to_string()).unwrap().fields
    );

    // The clock is preserved.
    assert_eq!(
      crdt.get_clock().current_time(),
      deserialized.get_clock().current_time()
    );
  }

  // Serializing a child CRDT must not drag its parent (or the parent's
  // records) along.
  #[test]
  #[cfg(feature = "serde")]
  fn test_parent_not_serialized() {
    let mut parent: CRDT<String, String> = CRDT::new(1, None);
    let fields = vec![("parent_field".to_string(), "parent_value".to_string())];
    let _ = parent.insert_or_update(&"parent_record".to_string(), fields);

    let parent_arc = Arc::new(parent);
    let mut child = CRDT::new(2, Some(parent_arc.clone()));
    let child_fields = vec![("child_field".to_string(), "child_value".to_string())];
    let _ = child.insert_or_update(&"child_record".to_string(), child_fields);

    #[cfg(feature = "json")]
    {
      let json = serde_json::to_string(&child).unwrap();
      let deserialized: CRDT<String, String> = serde_json::from_str(&json).unwrap();

      // The parent link is gone after the round-trip.
      assert!(deserialized.parent.is_none());

      // The child's own record is still there.
      assert!(deserialized.get_record(&"child_record".to_string()).is_some());

      // The parent's record is no longer reachable.
      assert!(deserialized.get_record(&"parent_record".to_string()).is_none());
    }
  }
}