1#![cfg_attr(not(feature = "std"), no_std)]
85
86#[cfg(not(any(feature = "std", feature = "alloc")))]
88compile_error!("Either 'std' (default) or 'alloc' feature must be enabled. For no_std environments, use: cargo build --no-default-features --features alloc");
89
90#[cfg(not(feature = "std"))]
91extern crate alloc;
92
93#[cfg(feature = "std")]
94use std::{
95 cmp::Ordering,
96 collections::{HashMap, HashSet},
97 hash::Hash,
98 sync::Arc,
99};
100
101#[cfg(not(feature = "std"))]
102use alloc::{
103 string::String,
104 sync::Arc,
105 vec::Vec,
106};
107#[cfg(not(feature = "std"))]
108use core::{cmp::Ordering, hash::Hash};
109#[cfg(all(not(feature = "std"), feature = "alloc"))]
110use hashbrown::{HashMap, HashSet};
111
/// Identifier of a replica (node/site) participating in synchronization.
pub type NodeId = u64;

/// Key type used to name a column within a record.
pub type ColumnKey = String;

/// Sentinel column version reserved for whole-record deletions (tombstones),
/// so a delete compares as newer than any ordinary column update.
const TOMBSTONE_COL_VERSION: u64 = u64::MAX;
/// A single replicated mutation: a column update when `col_name` is `Some`,
/// or a whole-record tombstone when `col_name` is `None`.
#[derive(Debug, Clone, PartialEq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "serde", serde(bound(serialize = "K: serde::Serialize, C: serde::Serialize, V: serde::Serialize")))]
#[cfg_attr(feature = "serde", serde(bound(deserialize = "K: serde::de::DeserializeOwned, C: serde::de::DeserializeOwned, V: serde::de::DeserializeOwned")))]
pub struct Change<K, C, V> {
    /// Record (row) the change applies to.
    pub record_id: K,
    /// Column being changed; `None` marks a record-level delete (tombstone).
    pub col_name: Option<C>,
    /// New column value; `None` clears the column (and accompanies tombstones).
    pub value: Option<V>,
    /// Per-column version counter (`u64::MAX` for tombstones).
    pub col_version: u64,
    /// Logical-clock time at the node that originated the change.
    pub db_version: u64,
    /// Node that produced the change.
    pub node_id: NodeId,
    /// Logical-clock time at the node that applied the change locally.
    pub local_db_version: u64,
    /// Application-defined flags carried alongside the change.
    pub flags: u32,
}
146
// Marker impl: promotes the derived field-by-field `PartialEq` to full `Eq`
// whenever all three type parameters are themselves `Eq`.
impl<K: Eq, C: Eq, V: Eq> Eq for Change<K, C, V> {}
148
149impl<K, C, V> Change<K, C, V> {
150 #[allow(clippy::too_many_arguments)]
152 pub fn new(
153 record_id: K,
154 col_name: Option<C>,
155 value: Option<V>,
156 col_version: u64,
157 db_version: u64,
158 node_id: NodeId,
159 local_db_version: u64,
160 flags: u32,
161 ) -> Self {
162 Self {
163 record_id,
164 col_name,
165 value,
166 col_version,
167 db_version,
168 node_id,
169 local_db_version,
170 flags,
171 }
172 }
173}
174
/// Version metadata tracked for a single column of a record.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct ColumnVersion {
    /// Number of times this column has been written (grows monotonically).
    pub col_version: u64,
    /// Logical-clock time at the originating node for the last write.
    pub db_version: u64,
    /// Node that performed the last write.
    pub node_id: NodeId,
    /// Logical-clock time at the local node when the write was applied here.
    pub local_db_version: u64,
}
185
186impl ColumnVersion {
187 pub fn new(col_version: u64, db_version: u64, node_id: NodeId, local_db_version: u64) -> Self {
188 Self {
189 col_version,
190 db_version,
191 node_id,
192 local_db_version,
193 }
194 }
195}
196
/// Metadata retained for a deleted record so the deletion can still win (or
/// lose) conflicts against late-arriving column updates.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct TombstoneInfo {
    /// Logical-clock time at the node that performed the delete.
    pub db_version: u64,
    /// Node that performed the delete.
    pub node_id: NodeId,
    /// Logical-clock time at the local node when the delete was applied here.
    pub local_db_version: u64,
}
208
209impl TombstoneInfo {
210 pub fn new(db_version: u64, node_id: NodeId, local_db_version: u64) -> Self {
211 Self {
212 db_version,
213 node_id,
214 local_db_version,
215 }
216 }
217
218 pub fn as_column_version(&self) -> ColumnVersion {
220 ColumnVersion::new(
221 TOMBSTONE_COL_VERSION,
222 self.db_version,
223 self.node_id,
224 self.local_db_version,
225 )
226 }
227}
228
/// A Lamport-style logical clock: a monotonically increasing counter used to
/// order CRDT operations across replicas.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct LogicalClock {
    // Current logical time; starts at 0.
    time: u64,
}

impl LogicalClock {
    /// Starts a clock at time zero.
    pub fn new() -> Self {
        LogicalClock { time: 0 }
    }

    /// Advances the clock by one step and returns the new time.
    pub fn tick(&mut self) -> u64 {
        self.time += 1;
        self.time
    }

    /// Lamport receive rule: jump to at least the observed remote timestamp,
    /// then advance one step. Returns the new time.
    pub fn update(&mut self, observed: u64) -> u64 {
        let base = self.time.max(observed);
        self.time = base + 1;
        self.time
    }

    /// Overwrites the current time directly (used when loading state wholesale).
    pub fn set_time(&mut self, t: u64) {
        self.time = t;
    }

    /// Reads the current time without advancing the clock.
    pub fn current_time(&self) -> u64 {
        self.time
    }
}

impl Default for LogicalClock {
    /// Equivalent to [`LogicalClock::new`]: a clock at time zero.
    fn default() -> Self {
        LogicalClock::new()
    }
}
271
/// Storage for deleted-record markers, keyed by record id.
#[derive(Debug, Clone)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct TombstoneStorage<K: Hash + Eq> {
    // One tombstone per deleted record; re-deletes overwrite the entry.
    entries: HashMap<K, TombstoneInfo>,
}
280
281impl<K: Hash + Eq> TombstoneStorage<K> {
282 pub fn new() -> Self {
283 Self {
284 entries: HashMap::new(),
285 }
286 }
287
288 pub fn insert_or_assign(&mut self, key: K, info: TombstoneInfo) {
289 self.entries.insert(key, info);
290 }
291
292 pub fn find(&self, key: &K) -> Option<TombstoneInfo> {
293 self.entries.get(key).copied()
294 }
295
296 pub fn erase(&mut self, key: &K) -> bool {
297 self.entries.remove(key).is_some()
298 }
299
300 pub fn clear(&mut self) {
301 self.entries.clear();
302 }
303
304 pub fn iter(&self) -> impl Iterator<Item = (&K, &TombstoneInfo)> {
305 self.entries.iter()
306 }
307
308 pub fn len(&self) -> usize {
309 self.entries.len()
310 }
311
312 pub fn is_empty(&self) -> bool {
313 self.entries.is_empty()
314 }
315
316 pub fn compact(&mut self, min_acknowledged_version: u64) -> usize {
320 let initial_len = self.entries.len();
321 self
322 .entries
323 .retain(|_, info| info.db_version >= min_acknowledged_version);
324 initial_len - self.entries.len()
325 }
326}
327
328impl<K: Hash + Eq> Default for TombstoneStorage<K> {
329 fn default() -> Self {
330 Self::new()
331 }
332}
333
/// Materialized state of a single record: current column values plus the
/// per-column version metadata used for conflict resolution.
#[derive(Debug, Clone)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "serde", serde(bound(serialize = "C: serde::Serialize, V: serde::Serialize")))]
#[cfg_attr(feature = "serde", serde(bound(deserialize = "C: serde::de::DeserializeOwned + Hash + Eq, V: serde::de::DeserializeOwned")))]
pub struct Record<C, V> {
    /// Current value of each column.
    pub fields: HashMap<C, V>,
    /// Version metadata per column; may track columns that have no value
    /// (e.g. a column cleared during merge).
    pub column_versions: HashMap<C, ColumnVersion>,
    /// Smallest `local_db_version` among the column versions
    /// (`u64::MAX` for the empty record — the empty range).
    pub lowest_local_db_version: u64,
    /// Largest `local_db_version` among the column versions (0 when none).
    pub highest_local_db_version: u64,
}
346
347impl<C: Hash + Eq, V> Record<C, V> {
348 pub fn new() -> Self {
349 Self {
350 fields: HashMap::new(),
351 column_versions: HashMap::new(),
352 lowest_local_db_version: u64::MAX,
353 highest_local_db_version: 0,
354 }
355 }
356
357 pub fn from_parts(
359 fields: HashMap<C, V>,
360 column_versions: HashMap<C, ColumnVersion>,
361 ) -> Self {
362 let mut lowest = u64::MAX;
363 let mut highest = 0;
364
365 for ver in column_versions.values() {
366 if ver.local_db_version < lowest {
367 lowest = ver.local_db_version;
368 }
369 if ver.local_db_version > highest {
370 highest = ver.local_db_version;
371 }
372 }
373
374 Self {
375 fields,
376 column_versions,
377 lowest_local_db_version: lowest,
378 highest_local_db_version: highest,
379 }
380 }
381}
382
// Equality deliberately compares only the field values: two records holding
// the same data are equal even when their version metadata (and thus their
// cached local_db_version bounds) differ — useful for checking that replicas
// converged to the same data regardless of merge history.
impl<C: Hash + Eq + PartialEq, V: PartialEq> PartialEq for Record<C, V> {
    fn eq(&self, other: &Self) -> bool {
        self.fields == other.fields
    }
}
389
390impl<C: Hash + Eq, V> Default for Record<C, V> {
391 fn default() -> Self {
392 Self::new()
393 }
394}
395
/// Pluggable conflict-resolution policy: decides whether a remote version
/// triple should overwrite the local one.
pub trait MergeRule<K, C, V> {
    /// Returns `true` when the remote `(col_version, db_version, node_id)`
    /// triple should win over the local triple.
    fn should_accept(
        &self,
        local_col: u64,
        local_db: u64,
        local_node: NodeId,
        remote_col: u64,
        remote_db: u64,
        remote_node: NodeId,
    ) -> bool;

    /// Convenience form comparing two full [`Change`]s by forwarding their
    /// version triples to [`MergeRule::should_accept`].
    fn should_accept_change(&self, local: &Change<K, C, V>, remote: &Change<K, C, V>) -> bool {
        self.should_accept(
            local.col_version,
            local.db_version,
            local.node_id,
            remote.col_version,
            remote.db_version,
            remote.node_id,
        )
    }
}
424
/// Default last-writer-wins rule: compares column version first, then
/// db version, with node id as the final deterministic tie-breaker.
#[derive(Debug, Clone, Copy, Default)]
pub struct DefaultMergeRule;
433
434impl<K, C, V> MergeRule<K, C, V> for DefaultMergeRule {
435 fn should_accept(
436 &self,
437 local_col: u64,
438 local_db: u64,
439 local_node: NodeId,
440 remote_col: u64,
441 remote_db: u64,
442 remote_node: NodeId,
443 ) -> bool {
444 match remote_col.cmp(&local_col) {
445 Ordering::Greater => true,
446 Ordering::Less => false,
447 Ordering::Equal => match remote_db.cmp(&local_db) {
448 Ordering::Greater => true,
449 Ordering::Less => false,
450 Ordering::Equal => remote_node > local_node,
451 },
452 }
453 }
454}
455
/// Pluggable total order over [`Change`]s, used when sorting change sets
/// (e.g. prior to compression).
pub trait ChangeComparator<K, C, V> {
    /// Returns the ordering of `a` relative to `b`.
    fn compare(&self, a: &Change<K, C, V>, b: &Change<K, C, V>) -> Ordering;
}
460
/// Default ordering for compression: record id ascending, then column
/// (record-level tombstones sort after all columns of the same record),
/// then version fields *descending* so the newest change of each
/// (record, column) pair comes first.
#[derive(Debug, Clone, Copy, Default)]
pub struct DefaultChangeComparator;
472
473impl<K: Ord, C: Ord, V> ChangeComparator<K, C, V> for DefaultChangeComparator {
474 fn compare(&self, a: &Change<K, C, V>, b: &Change<K, C, V>) -> Ordering {
475 match a.record_id.cmp(&b.record_id) {
477 Ordering::Equal => {}
478 ord => return ord,
479 }
480
481 match (a.col_name.as_ref(), b.col_name.as_ref()) {
483 (None, None) => {}
484 (None, Some(_)) => return Ordering::Greater,
485 (Some(_), None) => return Ordering::Less,
486 (Some(a_col), Some(b_col)) => match a_col.cmp(b_col) {
487 Ordering::Equal => {}
488 ord => return ord,
489 },
490 }
491
492 match b.col_version.cmp(&a.col_version) {
494 Ordering::Equal => {}
495 ord => return ord,
496 }
497
498 match b.db_version.cmp(&a.db_version) {
499 Ordering::Equal => {}
500 ord => return ord,
501 }
502
503 b.node_id.cmp(&a.node_id)
504 }
505}
506
/// A column-level CRDT table keyed by record id.
///
/// Conflicts are resolved per column via a [`MergeRule`]; deletions are
/// tracked as tombstones. An optional `parent` provides a read-only base
/// layer consulted on lookups but never serialized.
#[derive(Debug)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "serde", serde(bound(serialize = "K: serde::Serialize, C: serde::Serialize, V: serde::Serialize")))]
#[cfg_attr(feature = "serde", serde(bound(deserialize = "K: serde::de::DeserializeOwned + Hash + Eq + Clone, C: serde::de::DeserializeOwned + Hash + Eq + Clone, V: serde::de::DeserializeOwned + Clone")))]
pub struct CRDT<K: Hash + Eq + Clone, C: Hash + Eq + Clone, V: Clone> {
    /// This replica's id, stamped on every locally produced change.
    node_id: NodeId,
    /// Logical clock ordering all local and merged operations.
    clock: LogicalClock,
    /// Live records by id.
    data: HashMap<K, Record<C, V>>,
    /// Deleted-record markers.
    tombstones: TombstoneStorage<K>,
    /// Optional read-only base layer; skipped by serde so serialized state
    /// stands alone (restored as `None` on deserialization).
    #[cfg_attr(feature = "serde", serde(skip, default))]
    parent: Option<Arc<CRDT<K, C, V>>>,
    /// Parent clock time captured at construction; kept for future use.
    #[allow(dead_code)]
    base_version: u64,
}
524
impl<K: Hash + Eq + Clone, C: Hash + Eq + Clone, V: Clone> CRDT<K, C, V> {
    /// Creates a CRDT for `node_id`, optionally layered on a read-only
    /// `parent` snapshot. With a parent, the child's clock is copied from the
    /// parent and the parent's current time is remembered as `base_version`.
    pub fn new(node_id: NodeId, parent: Option<Arc<CRDT<K, C, V>>>) -> Self {
        let (clock, base_version) = if let Some(ref p) = parent {
            let parent_clock = p.clock;
            let base = parent_clock.current_time();
            (parent_clock, base)
        } else {
            (LogicalClock::new(), 0)
        };

        Self {
            node_id,
            clock,
            data: HashMap::new(),
            tombstones: TombstoneStorage::new(),
            parent,
            base_version,
        }
    }

    /// Reconstructs a parentless CRDT by replaying `changes` verbatim — no
    /// merge-rule filtering is applied (see `apply_changes`).
    pub fn from_changes(node_id: NodeId, changes: Vec<Change<K, C, V>>) -> Self {
        let mut crdt = Self::new(node_id, None);
        crdt.apply_changes(changes);
        crdt
    }

    /// Discards all local state (records, tombstones, clock) and replays
    /// `changes` from scratch, as with [`CRDT::from_changes`].
    pub fn reset(&mut self, changes: Vec<Change<K, C, V>>) {
        self.data.clear();
        self.tombstones.clear();
        self.clock = LogicalClock::new();
        self.apply_changes(changes);
    }

    /// Replays a trusted change log unconditionally — no conflict resolution.
    /// Only used when (re)building state (`from_changes` / `reset`).
    fn apply_changes(&mut self, changes: Vec<Change<K, C, V>>) {
        // Fast-forward the clock past every version mentioned in the log so
        // subsequent local ticks produce fresh versions.
        let max_db_version = changes
            .iter()
            .map(|c| c.db_version.max(c.local_db_version))
            .max()
            .unwrap_or(0);

        self.clock.set_time(max_db_version);

        for change in changes {
            let record_id = change.record_id.clone();
            let col_name = change.col_name.clone();
            let remote_col_version = change.col_version;
            let remote_db_version = change.db_version;
            let remote_node_id = change.node_id;
            let remote_local_db_version = change.local_db_version;
            let remote_value = change.value;

            if col_name.is_none() {
                // Record-level tombstone: drop the record and remember the delete.
                self.data.remove(&record_id);

                self.tombstones.insert_or_assign(
                    record_id,
                    TombstoneInfo::new(remote_db_version, remote_node_id, remote_local_db_version),
                );
            } else if let Some(col_key) = col_name {
                // Column update — skipped if the record was tombstoned earlier
                // in this replay (deletes win during reconstruction).
                if !self.is_record_tombstoned(&record_id, false) {
                    let record = self.get_or_create_record_unchecked(&record_id, false);

                    // Changes with `value: None` still record version metadata
                    // below, but leave the field absent.
                    if let Some(value) = remote_value {
                        record.fields.insert(col_key.clone(), value);
                    }

                    let col_ver = ColumnVersion::new(
                        remote_col_version,
                        remote_db_version,
                        remote_node_id,
                        remote_local_db_version,
                    );
                    record.column_versions.insert(col_key, col_ver);

                    // Keep the record's cached local-version range current.
                    if remote_local_db_version < record.lowest_local_db_version {
                        record.lowest_local_db_version = remote_local_db_version;
                    }
                    if remote_local_db_version > record.highest_local_db_version {
                        record.highest_local_db_version = remote_local_db_version;
                    }
                }
            }
        }
    }

    /// Inserts or updates columns of `record_id` with zero flags; returns the
    /// changes to propagate. See [`CRDT::insert_or_update_with_flags`].
    #[must_use = "changes should be propagated to other nodes"]
    pub fn insert_or_update<I>(&mut self, record_id: &K, fields: I) -> Vec<Change<K, C, V>>
    where
        I: IntoIterator<Item = (C, V)>,
    {
        self.insert_or_update_with_flags(record_id, 0, fields)
    }

    /// Inserts or updates columns of `record_id`, stamping each resulting
    /// [`Change`] with `flags`. Returns one change per supplied field, or an
    /// empty vec if the record is tombstoned (here or in a parent layer).
    ///
    /// Note: the clock ticks before the tombstone check, so even a rejected
    /// call advances local time by one.
    #[must_use = "changes should be propagated to other nodes"]
    pub fn insert_or_update_with_flags<I>(
        &mut self,
        record_id: &K,
        flags: u32,
        fields: I,
    ) -> Vec<Change<K, C, V>>
    where
        I: IntoIterator<Item = (C, V)>,
    {
        let db_version = self.clock.tick();

        // Deleted records cannot be resurrected by updates.
        if self.is_record_tombstoned(record_id, false) {
            return Vec::new();
        }

        let mut changes = Vec::new();
        // Copy node_id out first: `record` holds a mutable borrow of self.
        let node_id = self.node_id;
        let record = self.get_or_create_record_unchecked(record_id, false);

        for (col_name, value) in fields {
            // Bump the existing column counter, or start a new column at 1.
            let col_version = if let Some(col_info) = record.column_versions.get_mut(&col_name) {
                col_info.col_version += 1;
                col_info.db_version = db_version;
                col_info.node_id = node_id;
                col_info.local_db_version = db_version;
                col_info.col_version
            } else {
                record.column_versions.insert(
                    col_name.clone(),
                    ColumnVersion::new(1, db_version, node_id, db_version),
                );
                1
            };

            if db_version < record.lowest_local_db_version {
                record.lowest_local_db_version = db_version;
            }
            if db_version > record.highest_local_db_version {
                record.highest_local_db_version = db_version;
            }

            record.fields.insert(col_name.clone(), value.clone());
            // Local changes use db_version for both origin and local times.
            changes.push(Change::new(
                record_id.clone(),
                Some(col_name),
                Some(value),
                col_version,
                db_version,
                node_id,
                db_version,
                flags,
            ));
        }

        changes
    }

    /// Deletes `record_id` with zero flags; returns the tombstone change to
    /// propagate, or `None` if the record is already tombstoned.
    #[must_use = "changes should be propagated to other nodes"]
    pub fn delete_record(&mut self, record_id: &K) -> Option<Change<K, C, V>> {
        self.delete_record_with_flags(record_id, 0)
    }

    /// Deletes `record_id`, stamping the tombstone change with `flags`.
    /// Returns `None` (without advancing the clock) if already tombstoned.
    #[must_use = "changes should be propagated to other nodes"]
    pub fn delete_record_with_flags(&mut self, record_id: &K, flags: u32) -> Option<Change<K, C, V>> {
        if self.is_record_tombstoned(record_id, false) {
            return None;
        }

        let db_version = self.clock.tick();

        // Drop the live data and remember the delete.
        self.data.remove(record_id);

        self.tombstones.insert_or_assign(
            record_id.clone(),
            TombstoneInfo::new(db_version, self.node_id, db_version),
        );

        // Tombstone changes carry `col_name: None` and the sentinel version.
        Some(Change::new(
            record_id.clone(),
            None,
            None,
            TOMBSTONE_COL_VERSION,
            db_version,
            self.node_id,
            db_version,
            flags,
        ))
    }

    /// Merges remote `changes` using `merge_rule` to resolve conflicts.
    /// Returns the subset that was accepted (re-stamped with local times),
    /// suitable for forwarding to other peers.
    pub fn merge_changes<R: MergeRule<K, C, V>>(
        &mut self,
        changes: Vec<Change<K, C, V>>,
        merge_rule: &R,
    ) -> Vec<Change<K, C, V>> {
        self.merge_changes_impl(changes, false, merge_rule)
    }

    /// Core merge loop. `ignore_parent` restricts tombstone/record lookups to
    /// this layer only (skipping the parent chain).
    fn merge_changes_impl<R: MergeRule<K, C, V>>(
        &mut self,
        changes: Vec<Change<K, C, V>>,
        ignore_parent: bool,
        merge_rule: &R,
    ) -> Vec<Change<K, C, V>> {
        let mut accepted_changes = Vec::new();

        if changes.is_empty() {
            return accepted_changes;
        }

        for change in changes {
            let Change {
                record_id,
                col_name,
                value: remote_value,
                col_version: remote_col_version,
                db_version: remote_db_version,
                node_id: remote_node_id,
                flags,
                ..
            } = change;

            // Every observed change advances the clock, accepted or not.
            let new_local_db_version = self.clock.update(remote_db_version);

            // Updates (and re-deletes) to tombstoned records are dropped.
            if self.is_record_tombstoned(&record_id, ignore_parent) {
                continue;
            }

            // Find the local version to compare against: the tombstone entry
            // for record-level changes, or the matching column version.
            let local_col_info = if col_name.is_none() {
                self.tombstones
                    .find(&record_id)
                    .map(|info| info.as_column_version())
            } else if let Some(ref col) = col_name {
                self.get_record_ptr(&record_id, ignore_parent)
                    .and_then(|record| record.column_versions.get(col).copied())
            } else {
                None
            };

            // No local counterpart means the remote change wins by default.
            let should_accept = if let Some(local_info) = local_col_info {
                merge_rule.should_accept(
                    local_info.col_version,
                    local_info.db_version,
                    local_info.node_id,
                    remote_col_version,
                    remote_db_version,
                    remote_node_id,
                )
            } else {
                true
            };

            if should_accept {
                if let Some(col_key) = col_name {
                    let record = self.get_or_create_record_unchecked(&record_id, ignore_parent);

                    // `None` value clears the column but keeps its version.
                    if let Some(value) = remote_value.clone() {
                        record.fields.insert(col_key.clone(), value);
                    } else {
                        record.fields.remove(&col_key);
                    }

                    record.column_versions.insert(
                        col_key.clone(),
                        ColumnVersion::new(
                            remote_col_version,
                            remote_db_version,
                            remote_node_id,
                            new_local_db_version,
                        ),
                    );

                    if new_local_db_version < record.lowest_local_db_version {
                        record.lowest_local_db_version = new_local_db_version;
                    }
                    if new_local_db_version > record.highest_local_db_version {
                        record.highest_local_db_version = new_local_db_version;
                    }

                    // Re-emit with the local application time filled in.
                    accepted_changes.push(Change::new(
                        record_id,
                        Some(col_key),
                        remote_value,
                        remote_col_version,
                        remote_db_version,
                        remote_node_id,
                        new_local_db_version,
                        flags,
                    ));
                } else {
                    // Accepted record-level delete.
                    self.data.remove(&record_id);

                    self.tombstones.insert_or_assign(
                        record_id.clone(),
                        TombstoneInfo::new(remote_db_version, remote_node_id, new_local_db_version),
                    );

                    accepted_changes.push(Change::new(
                        record_id,
                        None,
                        None,
                        remote_col_version,
                        remote_db_version,
                        remote_node_id,
                        new_local_db_version,
                        flags,
                    ));
                }
            }
        }

        accepted_changes
    }

    /// Returns every change applied after `last_db_version` (by local time),
    /// from no node in particular. See [`CRDT::get_changes_since_excluding`].
    #[must_use]
    pub fn get_changes_since(&self, last_db_version: u64) -> Vec<Change<K, C, V>>
    where
        K: Ord,
        C: Ord,
    {
        self.get_changes_since_excluding(last_db_version, &HashSet::new())
    }

    /// Returns every change whose `local_db_version` is newer than
    /// `last_db_version`, skipping changes that originated at nodes listed in
    /// `excluding`. Includes the parent chain; when layered on a parent, the
    /// combined list is compressed to remove redundant entries.
    pub fn get_changes_since_excluding(
        &self,
        last_db_version: u64,
        excluding: &HashSet<NodeId>,
    ) -> Vec<Change<K, C, V>>
    where
        K: Ord,
        C: Ord,
    {
        let mut changes = Vec::new();

        // Parent layers first; duplicates are compressed away below.
        if let Some(ref parent) = self.parent {
            let parent_changes = parent.get_changes_since_excluding(last_db_version, excluding);
            changes.extend(parent_changes);
        }

        for (record_id, record) in &self.data {
            // Cheap whole-record skip via the cached high-water mark.
            if record.highest_local_db_version <= last_db_version {
                continue;
            }

            for (col_name, clock_info) in &record.column_versions {
                if clock_info.local_db_version > last_db_version && !excluding.contains(&clock_info.node_id)
                {
                    // Value may be absent if the column was cleared; the
                    // change then carries `value: None`.
                    let value = record.fields.get(col_name).cloned();

                    changes.push(Change::new(
                        record_id.clone(),
                        Some(col_name.clone()),
                        value,
                        clock_info.col_version,
                        clock_info.db_version,
                        clock_info.node_id,
                        clock_info.local_db_version,
                        0,
                    ));
                }
            }
        }

        // Deletions are emitted as tombstone changes.
        for (record_id, tombstone_info) in self.tombstones.iter() {
            if tombstone_info.local_db_version > last_db_version
                && !excluding.contains(&tombstone_info.node_id)
            {
                changes.push(Change::new(
                    record_id.clone(),
                    None,
                    None,
                    TOMBSTONE_COL_VERSION,
                    tombstone_info.db_version,
                    tombstone_info.node_id,
                    tombstone_info.local_db_version,
                    0,
                ));
            }
        }

        // Only layered CRDTs can produce duplicates (parent + own entries
        // for the same record/column), so compression is skipped otherwise.
        if self.parent.is_some() {
            Self::compress_changes(&mut changes);
        }

        changes
    }

    /// Deduplicates a change list in place, keeping for each (record, column)
    /// only the newest entry, and letting a record-level tombstone supersede
    /// all column changes of the same record.
    pub fn compress_changes(changes: &mut Vec<Change<K, C, V>>)
    where
        K: Ord,
        C: Ord,
    {
        if changes.is_empty() {
            return;
        }

        // Sort so equal (record, column) groups are adjacent with the newest
        // entry first, and tombstones last within each record — see
        // `DefaultChangeComparator`.
        let comparator = DefaultChangeComparator;
        changes.sort_unstable_by(|a, b| comparator.compare(a, b));

        // Two-pointer in-place dedup: `write` marks the last kept element.
        let mut write = 0;
        for read in 1..changes.len() {
            if changes[read].record_id != changes[write].record_id {
                // New record: always keep its first (newest) entry.
                write += 1;
                if write != read {
                    changes[write] = changes[read].clone();
                }
            } else if changes[read].col_name.is_none() && changes[write].col_name.is_some() {
                // Tombstone for a record we already emitted columns for:
                // rewind to the record's first slot and collapse the whole
                // record into the single tombstone entry.
                let mut first_pos = write;
                while first_pos > 0 && changes[first_pos - 1].record_id == changes[read].record_id {
                    first_pos -= 1;
                }
                changes[first_pos] = changes[read].clone();
                write = first_pos;
            } else if changes[read].col_name != changes[write].col_name
                && changes[write].col_name.is_some()
            {
                // New column of the same record (and not already collapsed
                // into a tombstone): keep its newest entry.
                write += 1;
                if write != read {
                    changes[write] = changes[read].clone();
                }
            }
            // Otherwise: duplicate of the kept entry (older version of the
            // same column, or a change superseded by a tombstone) — skip.
        }

        changes.truncate(write + 1);
    }

    /// Looks up a record by id, falling back to the parent chain.
    pub fn get_record(&self, record_id: &K) -> Option<&Record<C, V>> {
        self.get_record_ptr(record_id, false)
    }

    /// Returns `true` if the record is tombstoned here or in a parent layer.
    pub fn is_tombstoned(&self, record_id: &K) -> bool {
        self.is_record_tombstoned(record_id, false)
    }

    /// Returns the tombstone metadata for `record_id`, checking this layer
    /// first and then the parent chain.
    pub fn get_tombstone(&self, record_id: &K) -> Option<TombstoneInfo> {
        if let Some(info) = self.tombstones.find(record_id) {
            return Some(info);
        }

        if let Some(ref parent) = self.parent {
            return parent.get_tombstone(record_id);
        }

        None
    }

    /// Drops tombstones acknowledged by all peers (db_version strictly below
    /// `min_acknowledged_version`); returns how many were removed.
    /// Only this layer's tombstones are compacted, not the parent's.
    pub fn compact_tombstones(&mut self, min_acknowledged_version: u64) -> usize {
        self.tombstones.compact(min_acknowledged_version)
    }

    /// Number of tombstones held by this layer (parent excluded).
    pub fn tombstone_count(&self) -> usize {
        self.tombstones.len()
    }

    /// Read-only access to the logical clock.
    pub fn get_clock(&self) -> &LogicalClock {
        &self.clock
    }

    /// Read-only access to this layer's live records (parent excluded).
    pub fn get_data(&self) -> &HashMap<K, Record<C, V>> {
        &self.data
    }

    /// Serializes this CRDT to a JSON string (parent layer is skipped).
    #[cfg(feature = "json")]
    pub fn to_json(&self) -> Result<String, serde_json::Error>
    where
        K: serde::Serialize,
        C: serde::Serialize,
        V: serde::Serialize,
    {
        serde_json::to_string(self)
    }

    /// Deserializes a CRDT from JSON; the result always has `parent: None`.
    #[cfg(feature = "json")]
    pub fn from_json(json: &str) -> Result<Self, serde_json::Error>
    where
        K: serde::de::DeserializeOwned + Hash + Eq + Clone,
        C: serde::de::DeserializeOwned + Hash + Eq + Clone,
        V: serde::de::DeserializeOwned + Clone,
    {
        serde_json::from_str(json)
    }

    /// Serializes this CRDT to bincode bytes (parent layer is skipped).
    #[cfg(feature = "binary")]
    pub fn to_bytes(&self) -> Result<Vec<u8>, bincode::error::EncodeError>
    where
        K: serde::Serialize,
        C: serde::Serialize,
        V: serde::Serialize,
    {
        bincode::serde::encode_to_vec(self, bincode::config::standard())
    }

    /// Deserializes a CRDT from bincode bytes; `parent` is always `None`.
    #[cfg(feature = "binary")]
    pub fn from_bytes(bytes: &[u8]) -> Result<Self, bincode::error::DecodeError>
    where
        K: serde::de::DeserializeOwned + Hash + Eq + Clone,
        C: serde::de::DeserializeOwned + Hash + Eq + Clone,
        V: serde::de::DeserializeOwned + Clone,
    {
        let (result, _len) = bincode::serde::decode_from_slice(bytes, bincode::config::standard())?;
        Ok(result)
    }

    /// Checks the tombstone set of this layer and — unless `ignore_parent` —
    /// the whole parent chain.
    fn is_record_tombstoned(&self, record_id: &K, ignore_parent: bool) -> bool {
        if self.tombstones.find(record_id).is_some() {
            return true;
        }

        if !ignore_parent {
            if let Some(ref parent) = self.parent {
                return parent.is_record_tombstoned(record_id, false);
            }
        }

        false
    }

    /// Returns a mutable reference to the record, creating it if needed.
    /// "Unchecked": does NOT consult tombstones — callers must do that first.
    /// A newly created record is seeded from the parent's copy (copy-up)
    /// unless `ignore_parent` is set.
    fn get_or_create_record_unchecked(
        &mut self,
        record_id: &K,
        ignore_parent: bool,
    ) -> &mut Record<C, V> {
        // Entry type differs between std's HashMap and hashbrown's.
        #[cfg(feature = "std")]
        use std::collections::hash_map::Entry;
        #[cfg(all(not(feature = "std"), feature = "alloc"))]
        use hashbrown::hash_map::Entry;

        match self.data.entry(record_id.clone()) {
            Entry::Occupied(e) => e.into_mut(),
            Entry::Vacant(e) => {
                let record = if !ignore_parent {
                    self.parent
                        .as_ref()
                        .and_then(|p| p.get_record_ptr(record_id, false))
                        .cloned()
                        .unwrap_or_else(Record::new)
                } else {
                    Record::new()
                };
                e.insert(record)
            }
        }
    }

    /// Looks up a record in this layer, then — unless `ignore_parent` — in
    /// the parent chain.
    fn get_record_ptr(&self, record_id: &K, ignore_parent: bool) -> Option<&Record<C, V>> {
        if let Some(record) = self.data.get(record_id) {
            return Some(record);
        }

        if !ignore_parent {
            if let Some(ref parent) = self.parent {
                return parent.get_record_ptr(record_id, false);
            }
        }

        None
    }
}
1264
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_logical_clock() {
        let mut clock = LogicalClock::new();
        assert_eq!(clock.current_time(), 0);

        let t1 = clock.tick();
        assert_eq!(t1, 1);
        assert_eq!(clock.current_time(), 1);

        let t2 = clock.update(5);
        assert_eq!(t2, 6);
        assert_eq!(clock.current_time(), 6);
    }

    #[test]
    fn test_tombstone_storage() {
        let mut storage = TombstoneStorage::new();
        let info = TombstoneInfo::new(10, 1, 10);

        storage.insert_or_assign("key1".to_string(), info);
        assert_eq!(storage.len(), 1);

        assert_eq!(storage.find(&"key1".to_string()), Some(info));
        assert_eq!(storage.find(&"key2".to_string()), None);

        let removed = storage.compact(15);
        assert_eq!(removed, 1);
        assert_eq!(storage.len(), 0);
    }

    #[test]
    fn test_basic_insert() {
        let mut crdt: CRDT<String, String, String> = CRDT::new(1, None);

        let fields = vec![
            ("name".to_string(), "Alice".to_string()),
            ("age".to_string(), "30".to_string()),
        ];

        let changes = crdt.insert_or_update(&"user1".to_string(), fields);

        assert_eq!(changes.len(), 2);
        assert_eq!(crdt.get_data().len(), 1);

        let record = crdt.get_record(&"user1".to_string()).unwrap();
        assert_eq!(record.fields.get("name").unwrap(), "Alice");
        assert_eq!(record.fields.get("age").unwrap(), "30");
    }

    #[test]
    fn test_delete_record() {
        let mut crdt: CRDT<String, String, String> = CRDT::new(1, None);

        let fields = vec![("name".to_string(), "Bob".to_string())];
        let _ = crdt.insert_or_update(&"user2".to_string(), fields);

        let delete_change = crdt.delete_record(&"user2".to_string());
        assert!(delete_change.is_some());
        assert!(crdt.is_tombstoned(&"user2".to_string()));
        assert_eq!(crdt.get_data().len(), 0);
    }

    #[test]
    fn test_merge_changes() {
        let mut crdt1: CRDT<String, String, String> = CRDT::new(1, None);
        let mut crdt2: CRDT<String, String, String> = CRDT::new(2, None);

        let fields1 = vec![("tag".to_string(), "Node1".to_string())];
        let changes1 = crdt1.insert_or_update(&"record1".to_string(), fields1);

        let fields2 = vec![("tag".to_string(), "Node2".to_string())];
        let changes2 = crdt2.insert_or_update(&"record1".to_string(), fields2);

        let merge_rule = DefaultMergeRule;
        crdt1.merge_changes(changes2, &merge_rule);
        crdt2.merge_changes(changes1, &merge_rule);

        // With equal versions the higher node id (2) wins the tie-break, so
        // both replicas converge to Node2's value.
        assert_eq!(
            crdt1
                .get_record(&"record1".to_string())
                .unwrap()
                .fields
                .get("tag")
                .unwrap(),
            "Node2"
        );
        assert_eq!(crdt1.get_data(), crdt2.get_data());
    }

    #[test]
    #[cfg(feature = "serde")]
    fn test_change_serialization() {
        #[allow(unused_variables)]
        let change = Change::new(
            "record1".to_string(),
            Some("name".to_string()),
            Some("Alice".to_string()),
            1,
            10,
            1,
            10,
            0,
        );

        #[cfg(feature = "json")]
        {
            let json = serde_json::to_string(&change).unwrap();
            let deserialized: Change<String, String, String> = serde_json::from_str(&json).unwrap();
            assert_eq!(change, deserialized);
        }

        #[cfg(feature = "binary")]
        {
            let bytes = bincode::serde::encode_to_vec(&change, bincode::config::standard()).unwrap();
            let (deserialized, _): (Change<String, String, String>, _) =
                bincode::serde::decode_from_slice(&bytes, bincode::config::standard()).unwrap();
            assert_eq!(change, deserialized);
        }
    }

    #[test]
    #[cfg(feature = "serde")]
    fn test_record_serialization() {
        let mut fields = HashMap::new();
        fields.insert("name".to_string(), "Bob".to_string());
        fields.insert("age".to_string(), "25".to_string());

        let mut column_versions = HashMap::new();
        column_versions.insert("name".to_string(), ColumnVersion::new(1, 10, 1, 10));
        column_versions.insert("age".to_string(), ColumnVersion::new(1, 11, 1, 11));

        #[allow(unused_variables)]
        let record = Record::from_parts(fields, column_versions);

        // Record takes two type parameters: column key and value.
        #[cfg(feature = "json")]
        {
            let json = serde_json::to_string(&record).unwrap();
            let deserialized: Record<String, String> = serde_json::from_str(&json).unwrap();
            assert_eq!(record, deserialized);
        }

        #[cfg(feature = "binary")]
        {
            let bytes = bincode::serde::encode_to_vec(&record, bincode::config::standard()).unwrap();
            let (deserialized, _): (Record<String, String>, _) =
                bincode::serde::decode_from_slice(&bytes, bincode::config::standard()).unwrap();
            assert_eq!(record, deserialized);
        }
    }

    #[test]
    #[cfg(feature = "json")]
    fn test_crdt_json_serialization() {
        let mut crdt: CRDT<String, String, String> = CRDT::new(1, None);

        let fields = vec![
            ("name".to_string(), "Alice".to_string()),
            ("age".to_string(), "30".to_string()),
        ];
        let _ = crdt.insert_or_update(&"user1".to_string(), fields);

        let fields2 = vec![("name".to_string(), "Bob".to_string())];
        let _ = crdt.insert_or_update(&"user2".to_string(), fields2);

        let _ = crdt.delete_record(&"user2".to_string());

        let json = crdt.to_json().unwrap();

        let deserialized: CRDT<String, String, String> = CRDT::from_json(&json).unwrap();

        assert_eq!(crdt.get_data().len(), deserialized.get_data().len());
        assert_eq!(
            crdt.get_record(&"user1".to_string()).unwrap().fields,
            deserialized.get_record(&"user1".to_string()).unwrap().fields
        );

        assert_eq!(crdt.tombstone_count(), deserialized.tombstone_count());
        assert!(deserialized.is_tombstoned(&"user2".to_string()));

        assert_eq!(
            crdt.get_clock().current_time(),
            deserialized.get_clock().current_time()
        );

        // Parent is serde(skip): it must come back as None.
        let has_parent = deserialized.parent.is_some();
        assert!(!has_parent);
    }

    #[test]
    #[cfg(feature = "binary")]
    fn test_crdt_binary_serialization() {
        let mut crdt: CRDT<String, String, String> = CRDT::new(1, None);

        let fields = vec![
            ("name".to_string(), "Alice".to_string()),
            ("age".to_string(), "30".to_string()),
        ];
        let _ = crdt.insert_or_update(&"user1".to_string(), fields);

        let bytes = crdt.to_bytes().unwrap();

        let deserialized: CRDT<String, String, String> = CRDT::from_bytes(&bytes).unwrap();

        assert_eq!(crdt.get_data().len(), deserialized.get_data().len());
        assert_eq!(
            crdt.get_record(&"user1".to_string()).unwrap().fields,
            deserialized.get_record(&"user1".to_string()).unwrap().fields
        );

        assert_eq!(
            crdt.get_clock().current_time(),
            deserialized.get_clock().current_time()
        );
    }

    #[test]
    #[cfg(feature = "serde")]
    fn test_parent_not_serialized() {
        let mut parent: CRDT<String, String, String> = CRDT::new(1, None);
        let fields = vec![("parent_field".to_string(), "parent_value".to_string())];
        let _ = parent.insert_or_update(&"parent_record".to_string(), fields);

        let parent_arc = Arc::new(parent);
        let mut child = CRDT::new(2, Some(parent_arc.clone()));
        let child_fields = vec![("child_field".to_string(), "child_value".to_string())];
        let _ = child.insert_or_update(&"child_record".to_string(), child_fields);

        #[cfg(feature = "json")]
        {
            let json = serde_json::to_string(&child).unwrap();
            let deserialized: CRDT<String, String, String> = serde_json::from_str(&json).unwrap();

            assert!(deserialized.parent.is_none());

            assert!(deserialized.get_record(&"child_record".to_string()).is_some());

            // Parent data is reachable only through the (skipped) parent link,
            // so it must be absent after a round-trip.
            assert!(deserialized.get_record(&"parent_record".to_string()).is_none());
        }
    }
}