1#![cfg_attr(not(feature = "std"), no_std)]
87
88#[cfg(feature = "persist")]
90pub mod persist;
91
92#[cfg(not(any(feature = "std", feature = "alloc")))]
94compile_error!("Either 'std' (default) or 'alloc' feature must be enabled. For no_std environments, use: cargo build --no-default-features --features alloc");
95
96#[cfg(not(feature = "std"))]
97extern crate alloc;
98
99#[cfg(feature = "std")]
100use std::{
101 cmp::Ordering,
102 collections::{HashMap, HashSet},
103 hash::Hash,
104 sync::Arc,
105};
106
107#[cfg(not(feature = "std"))]
108use alloc::{
109 string::String,
110 sync::Arc,
111 vec::Vec,
112};
113#[cfg(not(feature = "std"))]
114use core::{cmp::Ordering, hash::Hash};
115#[cfg(all(not(feature = "std"), feature = "alloc"))]
116use hashbrown::{HashMap, HashSet};
117
/// Identifier of a replica/node. With the `node-id-u128` feature this is a
/// 128-bit integer (e.g. for UUID-derived ids).
#[cfg(feature = "node-id-u128")]
pub type NodeId = u128;

/// Identifier of a replica/node (64-bit by default).
#[cfg(not(feature = "node-id-u128"))]
pub type NodeId = u64;

/// Convenience alias for string-keyed columns.
pub type ColumnKey = String;

// Sentinel column version used for whole-record tombstones. `u64::MAX` means a
// tombstone compares greater-or-equal to any ordinary column version.
const TOMBSTONE_COL_VERSION: u64 = u64::MAX;
136
/// One replicated mutation: a column write, a column delete, or a whole-record
/// delete (tombstone).
///
/// Encoding: `col_name == None` is a record tombstone; `col_name == Some(..)`
/// with `value == None` is a column deletion; both `Some` is a column write.
#[derive(Debug, Clone, PartialEq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "serde", serde(bound(serialize = "K: serde::Serialize, C: serde::Serialize, V: serde::Serialize")))]
#[cfg_attr(feature = "serde", serde(bound(deserialize = "K: serde::de::DeserializeOwned, C: serde::de::DeserializeOwned, V: serde::de::DeserializeOwned")))]
pub struct Change<K, C, V> {
    /// Key of the record this change applies to.
    pub record_id: K,
    /// Column being changed; `None` marks a whole-record deletion.
    pub col_name: Option<C>,
    /// New value; `None` together with `Some(col_name)` means column deletion.
    pub value: Option<V>,
    /// Per-column write counter (`TOMBSTONE_COL_VERSION` for record deletes).
    pub col_version: u64,
    /// Logical-clock time at the node that originated the change.
    pub db_version: u64,
    /// Node that originated the change.
    pub node_id: NodeId,
    /// Logical-clock time at which the *local* node applied the change.
    pub local_db_version: u64,
    /// Application-defined flags; the CRDT copies them through unchanged.
    pub flags: u32,
}

// `Eq` only when all payload types are `Eq` (mirrors the derived `PartialEq`).
impl<K: Eq, C: Eq, V: Eq> Eq for Change<K, C, V> {}

impl<K, C, V> Change<K, C, V> {
    /// Constructs a `Change` from its raw components.
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        record_id: K,
        col_name: Option<C>,
        value: Option<V>,
        col_version: u64,
        db_version: u64,
        node_id: NodeId,
        local_db_version: u64,
        flags: u32,
    ) -> Self {
        Self {
            record_id,
            col_name,
            value,
            col_version,
            db_version,
            node_id,
            local_db_version,
            flags,
        }
    }
}
189
/// Version metadata tracked for a single column of a record.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct ColumnVersion {
    /// Monotonic counter of writes to this column.
    pub col_version: u64,
    /// Logical time at the node that originated the last write.
    pub db_version: u64,
    /// Node that originated the last write.
    pub node_id: NodeId,
    /// Logical time at which the local node applied the last write.
    pub local_db_version: u64,
}

impl ColumnVersion {
    /// Bundles the four version components into a `ColumnVersion`.
    pub fn new(col_version: u64, db_version: u64, node_id: NodeId, local_db_version: u64) -> Self {
        Self {
            col_version,
            db_version,
            node_id,
            local_db_version,
        }
    }
}
211
/// Deletion marker for a whole record.
///
/// Shaped like [`ColumnVersion`] minus the per-column counter: tombstones
/// always use the fixed `TOMBSTONE_COL_VERSION` counter value.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct TombstoneInfo {
    /// Logical time at the node that performed the delete.
    pub db_version: u64,
    /// Node that performed the delete.
    pub node_id: NodeId,
    /// Logical time at which the local node applied the delete.
    pub local_db_version: u64,
}

impl TombstoneInfo {
    /// Constructs a tombstone from its version components.
    pub fn new(db_version: u64, node_id: NodeId, local_db_version: u64) -> Self {
        Self {
            db_version,
            node_id,
            local_db_version,
        }
    }

    /// Views the tombstone as a `ColumnVersion` with
    /// `col_version == TOMBSTONE_COL_VERSION`, so merge rules can compare a
    /// remote change against an existing delete.
    pub fn as_column_version(&self) -> ColumnVersion {
        ColumnVersion::new(
            TOMBSTONE_COL_VERSION,
            self.db_version,
            self.node_id,
            self.local_db_version,
        )
    }
}
243
/// Lamport-style logical clock: a monotonically non-decreasing `u64` counter.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct LogicalClock {
    time: u64,
}

impl LogicalClock {
    /// Creates a clock starting at time 0.
    pub fn new() -> Self {
        Self { time: 0 }
    }

    /// Advances the clock one step and returns the new time.
    pub fn tick(&mut self) -> u64 {
        let next = self.time + 1;
        self.time = next;
        next
    }

    /// Merges an observed remote time (taking the maximum of local and
    /// remote), then ticks. Returns the new time.
    pub fn update(&mut self, received_time: u64) -> u64 {
        if received_time > self.time {
            self.time = received_time;
        }
        self.tick()
    }

    /// Overwrites the current time, e.g. when rebuilding state from a
    /// persisted change set.
    pub fn set_time(&mut self, time: u64) {
        self.time = time;
    }

    /// Returns the current time without advancing the clock.
    pub fn current_time(&self) -> u64 {
        self.time
    }
}

impl Default for LogicalClock {
    fn default() -> Self {
        Self::new()
    }
}
286
287#[derive(Debug, Clone)]
291#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
292pub struct TombstoneStorage<K: Hash + Eq> {
293 entries: HashMap<K, TombstoneInfo>,
294}
295
296impl<K: Hash + Eq> TombstoneStorage<K> {
297 pub fn new() -> Self {
298 Self {
299 entries: HashMap::new(),
300 }
301 }
302
303 pub fn insert_or_assign(&mut self, key: K, info: TombstoneInfo) {
304 self.entries.insert(key, info);
305 }
306
307 pub fn find(&self, key: &K) -> Option<TombstoneInfo> {
308 self.entries.get(key).copied()
309 }
310
311 pub fn erase(&mut self, key: &K) -> bool {
312 self.entries.remove(key).is_some()
313 }
314
315 pub fn clear(&mut self) {
316 self.entries.clear();
317 }
318
319 pub fn iter(&self) -> impl Iterator<Item = (&K, &TombstoneInfo)> {
320 self.entries.iter()
321 }
322
323 pub fn len(&self) -> usize {
324 self.entries.len()
325 }
326
327 pub fn is_empty(&self) -> bool {
328 self.entries.is_empty()
329 }
330
331 pub fn compact(&mut self, min_acknowledged_version: u64) -> usize {
335 let initial_len = self.entries.len();
336 self
337 .entries
338 .retain(|_, info| info.db_version >= min_acknowledged_version);
339 initial_len - self.entries.len()
340 }
341}
342
343impl<K: Hash + Eq> Default for TombstoneStorage<K> {
344 fn default() -> Self {
345 Self::new()
346 }
347}
348
349#[derive(Debug, Clone)]
351#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
352#[cfg_attr(feature = "serde", serde(bound(serialize = "C: serde::Serialize + Hash + Eq, V: serde::Serialize")))]
353#[cfg_attr(feature = "serde", serde(bound(deserialize = "C: serde::de::DeserializeOwned + Hash + Eq, V: serde::de::DeserializeOwned")))]
354pub struct Record<C, V> {
355 pub fields: HashMap<C, V>,
356 pub column_versions: HashMap<C, ColumnVersion>,
357 pub lowest_local_db_version: u64,
359 pub highest_local_db_version: u64,
360}
361
362impl<C: Hash + Eq, V> Record<C, V> {
363 pub fn new() -> Self {
364 Self {
365 fields: HashMap::new(),
366 column_versions: HashMap::new(),
367 lowest_local_db_version: u64::MAX,
368 highest_local_db_version: 0,
369 }
370 }
371
372 pub fn from_parts(
374 fields: HashMap<C, V>,
375 column_versions: HashMap<C, ColumnVersion>,
376 ) -> Self {
377 let mut lowest = u64::MAX;
378 let mut highest = 0;
379
380 for ver in column_versions.values() {
381 if ver.local_db_version < lowest {
382 lowest = ver.local_db_version;
383 }
384 if ver.local_db_version > highest {
385 highest = ver.local_db_version;
386 }
387 }
388
389 Self {
390 fields,
391 column_versions,
392 lowest_local_db_version: lowest,
393 highest_local_db_version: highest,
394 }
395 }
396}
397
398impl<C: Hash + Eq + PartialEq, V: PartialEq> PartialEq for Record<C, V> {
399 fn eq(&self, other: &Self) -> bool {
400 self.fields == other.fields
402 }
403}
404
405impl<C: Hash + Eq, V> Default for Record<C, V> {
406 fn default() -> Self {
407 Self::new()
408 }
409}
410
/// Conflict-resolution policy consulted when a remote change collides with
/// existing local version metadata for the same record/column.
pub trait MergeRule<K, C, V> {
    /// Returns `true` if the remote `(col_version, db_version, node_id)`
    /// triple should replace the local one.
    fn should_accept(
        &self,
        local_col: u64,
        local_db: u64,
        local_node: NodeId,
        remote_col: u64,
        remote_db: u64,
        remote_node: NodeId,
    ) -> bool;

    /// Convenience wrapper that extracts the version triples from two
    /// `Change`s and delegates to [`MergeRule::should_accept`].
    fn should_accept_change(&self, local: &Change<K, C, V>, remote: &Change<K, C, V>) -> bool {
        self.should_accept(
            local.col_version,
            local.db_version,
            local.node_id,
            remote.col_version,
            remote.db_version,
            remote.node_id,
        )
    }
}
439
440#[derive(Debug, Clone, Copy, Default)]
447pub struct DefaultMergeRule;
448
449impl<K, C, V> MergeRule<K, C, V> for DefaultMergeRule {
450 fn should_accept(
451 &self,
452 local_col: u64,
453 local_db: u64,
454 local_node: NodeId,
455 remote_col: u64,
456 remote_db: u64,
457 remote_node: NodeId,
458 ) -> bool {
459 match remote_col.cmp(&local_col) {
460 Ordering::Greater => true,
461 Ordering::Less => false,
462 Ordering::Equal => match remote_db.cmp(&local_db) {
463 Ordering::Greater => true,
464 Ordering::Less => false,
465 Ordering::Equal => remote_node > local_node,
466 },
467 }
468 }
469}
470
/// Total order over `Change`s, used to sort a change set before compression.
pub trait ChangeComparator<K, C, V> {
    /// Compares two changes; implementations define the sort key.
    fn compare(&self, a: &Change<K, C, V>, b: &Change<K, C, V>) -> Ordering;
}

/// Stock ordering: by record id, then column name (record tombstones, i.e.
/// `col_name == None`, sort after all column entries of the same record),
/// then by col/db/node version fields descending so the newest change for a
/// given (record, column) comes first.
#[derive(Debug, Clone, Copy, Default)]
pub struct DefaultChangeComparator;
487
488impl<K: Ord, C: Ord, V> ChangeComparator<K, C, V> for DefaultChangeComparator {
489 fn compare(&self, a: &Change<K, C, V>, b: &Change<K, C, V>) -> Ordering {
490 match a.record_id.cmp(&b.record_id) {
492 Ordering::Equal => {}
493 ord => return ord,
494 }
495
496 match (a.col_name.as_ref(), b.col_name.as_ref()) {
498 (None, None) => {}
499 (None, Some(_)) => return Ordering::Greater,
500 (Some(_), None) => return Ordering::Less,
501 (Some(a_col), Some(b_col)) => match a_col.cmp(b_col) {
502 Ordering::Equal => {}
503 ord => return ord,
504 },
505 }
506
507 match b.col_version.cmp(&a.col_version) {
509 Ordering::Equal => {}
510 ord => return ord,
511 }
512
513 match b.db_version.cmp(&a.db_version) {
514 Ordering::Equal => {}
515 ord => return ord,
516 }
517
518 b.node_id.cmp(&a.node_id)
519 }
520}
521
/// Column-level CRDT replica.
///
/// Holds this node's logical clock, live records, and tombstones. An optional
/// read-only `parent` layer enables copy-on-write overlays; the parent link is
/// intentionally skipped during serialization and restored as `None`.
#[derive(Debug)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "serde", serde(bound(serialize = "K: serde::Serialize, C: serde::Serialize, V: serde::Serialize")))]
#[cfg_attr(feature = "serde", serde(bound(deserialize = "K: serde::de::DeserializeOwned + Hash + Eq + Clone, C: serde::de::DeserializeOwned + Hash + Eq + Clone, V: serde::de::DeserializeOwned + Clone")))]
pub struct CRDT<K: Hash + Eq + Clone, C: Hash + Eq + Clone, V: Clone> {
    // This replica's identifier, stamped onto every locally-originated change.
    node_id: NodeId,
    // Logical clock driving db_version / local_db_version assignment.
    clock: LogicalClock,
    // Live records by key.
    data: HashMap<K, Record<C, V>>,
    // Deleted record keys with deletion metadata.
    tombstones: TombstoneStorage<K>,
    // Optional read-only base layer; not serialized (restored as `None`).
    #[cfg_attr(feature = "serde", serde(skip, default))]
    parent: Option<Arc<CRDT<K, C, V>>>,
    // Parent clock time at the moment this overlay was created.
    #[allow(dead_code)]
    base_version: u64,
}
539
impl<K: Hash + Eq + Clone, C: Hash + Eq + Clone, V: Clone> CRDT<K, C, V> {
    /// Creates a CRDT for `node_id`.
    ///
    /// With a `parent`, the new instance is a copy-on-write overlay: it copies
    /// the parent's clock, remembers the parent's current time as
    /// `base_version`, and (via the lookup helpers below) falls through to the
    /// parent for records not yet written locally.
    pub fn new(node_id: NodeId, parent: Option<Arc<CRDT<K, C, V>>>) -> Self {
        let (clock, base_version) = if let Some(ref p) = parent {
            // LogicalClock is Copy: the child starts from the parent's time.
            let parent_clock = p.clock;
            let base = parent_clock.current_time();
            (parent_clock, base)
        } else {
            (LogicalClock::new(), 0)
        };

        Self {
            node_id,
            clock,
            data: HashMap::new(),
            tombstones: TombstoneStorage::new(),
            parent,
            base_version,
        }
    }

    /// Rebuilds a standalone replica from a previously captured change list
    /// (trusted load — no merge rule is consulted).
    pub fn from_changes(node_id: NodeId, changes: Vec<Change<K, C, V>>) -> Self {
        let mut crdt = Self::new(node_id, None);
        crdt.apply_changes(changes);
        crdt
    }

    /// Clears all local state (data, tombstones, clock) and reloads it from
    /// `changes`, as in [`CRDT::from_changes`].
    pub fn reset(&mut self, changes: Vec<Change<K, C, V>>) {
        self.data.clear();
        self.tombstones.clear();
        self.clock = LogicalClock::new();
        self.apply_changes(changes);
    }

    /// Loads `changes` without conflict resolution: entries later in the list
    /// simply overwrite earlier ones, and changes for already-tombstoned
    /// records are dropped. The clock is fast-forwarded to the maximum
    /// version observed before applying.
    fn apply_changes(&mut self, changes: Vec<Change<K, C, V>>) {
        let max_db_version = changes
            .iter()
            .map(|c| c.db_version.max(c.local_db_version))
            .max()
            .unwrap_or(0);

        self.clock.set_time(max_db_version);

        for change in changes {
            let record_id = change.record_id.clone();
            let col_name = change.col_name.clone();
            let remote_col_version = change.col_version;
            let remote_db_version = change.db_version;
            let remote_node_id = change.node_id;
            let remote_local_db_version = change.local_db_version;
            let remote_value = change.value;

            if col_name.is_none() {
                // Record tombstone: drop live data and remember the deletion.
                self.data.remove(&record_id);

                self.tombstones.insert_or_assign(
                    record_id,
                    TombstoneInfo::new(remote_db_version, remote_node_id, remote_local_db_version),
                );
            } else if let Some(col_key) = col_name {
                if !self.is_record_tombstoned(&record_id, false) {
                    let record = self.get_or_create_record_unchecked(&record_id, false);

                    // `None` value with a column name means the column was
                    // deleted — only the version metadata is recorded then.
                    if let Some(value) = remote_value {
                        record.fields.insert(col_key.clone(), value);
                    }

                    let col_ver = ColumnVersion::new(
                        remote_col_version,
                        remote_db_version,
                        remote_node_id,
                        remote_local_db_version,
                    );
                    record.column_versions.insert(col_key, col_ver);

                    // Keep the cached local-version bounds up to date.
                    if remote_local_db_version < record.lowest_local_db_version {
                        record.lowest_local_db_version = remote_local_db_version;
                    }
                    if remote_local_db_version > record.highest_local_db_version {
                        record.highest_local_db_version = remote_local_db_version;
                    }
                }
            }
        }
    }

    /// Inserts or updates `fields` on `record_id` with `flags == 0`.
    /// Returns the changes to propagate to peers.
    #[must_use = "changes should be propagated to other nodes"]
    pub fn insert_or_update<I>(&mut self, record_id: &K, fields: I) -> Vec<Change<K, C, V>>
    where
        I: IntoIterator<Item = (C, V)>,
    {
        self.insert_or_update_with_flags(record_id, 0, fields)
    }

    /// Inserts or updates `fields` on `record_id`, stamping each emitted
    /// change with `flags`. Returns an empty vec (and writes nothing) if the
    /// record is tombstoned locally or in a parent layer.
    ///
    /// NOTE(review): the clock is ticked *before* the tombstone check, so a
    /// rejected write still consumes one clock step — unlike
    /// `delete_field_with_flags`, which checks first. Confirm whether this
    /// asymmetry is intentional.
    #[must_use = "changes should be propagated to other nodes"]
    pub fn insert_or_update_with_flags<I>(
        &mut self,
        record_id: &K,
        flags: u32,
        fields: I,
    ) -> Vec<Change<K, C, V>>
    where
        I: IntoIterator<Item = (C, V)>,
    {
        let db_version = self.clock.tick();

        if self.is_record_tombstoned(record_id, false) {
            return Vec::new();
        }

        let mut changes = Vec::new();
        // Copy node_id out first so borrowing `self.data` mutably is fine.
        let node_id = self.node_id;
        let record = self.get_or_create_record_unchecked(record_id, false);

        for (col_name, value) in fields {
            // Bump the column's counter, or start it at 1 for a new column.
            let col_version = if let Some(col_info) = record.column_versions.get_mut(&col_name) {
                col_info.col_version += 1;
                col_info.db_version = db_version;
                col_info.node_id = node_id;
                col_info.local_db_version = db_version;
                col_info.col_version
            } else {
                record.column_versions.insert(
                    col_name.clone(),
                    ColumnVersion::new(1, db_version, node_id, db_version),
                );
                1
            };

            if db_version < record.lowest_local_db_version {
                record.lowest_local_db_version = db_version;
            }
            if db_version > record.highest_local_db_version {
                record.highest_local_db_version = db_version;
            }

            record.fields.insert(col_name.clone(), value.clone());
            changes.push(Change::new(
                record_id.clone(),
                Some(col_name),
                Some(value),
                col_version,
                db_version,
                node_id,
                db_version,
                flags,
            ));
        }

        changes
    }

    /// Deletes a whole record (flags 0). Returns the tombstone change to
    /// propagate, or `None` if the record was already tombstoned.
    #[must_use = "changes should be propagated to other nodes"]
    pub fn delete_record(&mut self, record_id: &K) -> Option<Change<K, C, V>> {
        self.delete_record_with_flags(record_id, 0)
    }

    /// Deletes a whole record, stamping the tombstone change with `flags`.
    /// Returns `None` (without ticking the clock) if already tombstoned.
    #[must_use = "changes should be propagated to other nodes"]
    pub fn delete_record_with_flags(&mut self, record_id: &K, flags: u32) -> Option<Change<K, C, V>> {
        if self.is_record_tombstoned(record_id, false) {
            return None;
        }

        let db_version = self.clock.tick();

        // Remove live data and record the deletion.
        self.data.remove(record_id);

        self.tombstones.insert_or_assign(
            record_id.clone(),
            TombstoneInfo::new(db_version, self.node_id, db_version),
        );

        Some(Change::new(
            record_id.clone(),
            None,
            None,
            TOMBSTONE_COL_VERSION,
            db_version,
            self.node_id,
            db_version,
            flags,
        ))
    }

    /// Deletes one field of a record (flags 0). Returns the change to
    /// propagate, or `None` if the record/field does not exist here.
    #[must_use = "changes should be propagated to other nodes"]
    pub fn delete_field(&mut self, record_id: &K, field_name: &C) -> Option<Change<K, C, V>> {
        self.delete_field_with_flags(record_id, field_name, 0)
    }

    /// Deletes one field of a record, stamping the change with `flags`.
    ///
    /// Returns `None` when the record is tombstoned, not present locally, or
    /// does not contain `field_name`. The column's version entry survives the
    /// deletion (with a bumped counter) so the delete can win merges.
    #[must_use = "changes should be propagated to other nodes"]
    pub fn delete_field_with_flags(
        &mut self,
        record_id: &K,
        field_name: &C,
        flags: u32,
    ) -> Option<Change<K, C, V>> {
        if self.is_record_tombstoned(record_id, false) {
            return None;
        }

        // NOTE: looks only at local data — a record that exists solely in the
        // parent layer is not visible here.
        let record = self.data.get_mut(record_id)?;

        if !record.fields.contains_key(field_name) {
            return None;
        }

        let db_version = self.clock.tick();

        let col_version = if let Some(col_info) = record.column_versions.get_mut(field_name) {
            col_info.col_version += 1;
            col_info.db_version = db_version;
            col_info.node_id = self.node_id;
            col_info.local_db_version = db_version;
            col_info.col_version
        } else {
            record.column_versions.insert(
                field_name.clone(),
                ColumnVersion::new(1, db_version, self.node_id, db_version),
            );
            1
        };

        if db_version < record.lowest_local_db_version {
            record.lowest_local_db_version = db_version;
        }
        if db_version > record.highest_local_db_version {
            record.highest_local_db_version = db_version;
        }

        record.fields.remove(field_name);

        Some(Change::new(
            record_id.clone(),
            Some(field_name.clone()),
            None,
            col_version,
            db_version,
            self.node_id,
            db_version,
            flags,
        ))
    }

    /// Merges remote `changes`, resolving conflicts with `merge_rule`.
    ///
    /// Returns the accepted subset, re-stamped with this node's
    /// `local_db_version`, suitable for forwarding or persisting.
    pub fn merge_changes<R: MergeRule<K, C, V>>(
        &mut self,
        changes: Vec<Change<K, C, V>>,
        merge_rule: &R,
    ) -> Vec<Change<K, C, V>> {
        self.merge_changes_impl(changes, false, merge_rule)
    }

    /// Merge worker. `ignore_parent` limits tombstone/record lookups to this
    /// layer only (skipping the parent chain).
    fn merge_changes_impl<R: MergeRule<K, C, V>>(
        &mut self,
        changes: Vec<Change<K, C, V>>,
        ignore_parent: bool,
        merge_rule: &R,
    ) -> Vec<Change<K, C, V>> {
        let mut accepted_changes = Vec::new();

        if changes.is_empty() {
            return accepted_changes;
        }

        for change in changes {
            let Change {
                record_id,
                col_name,
                value: remote_value,
                col_version: remote_col_version,
                db_version: remote_db_version,
                node_id: remote_node_id,
                flags,
                ..
            } = change;

            // Advance the clock past the remote time for every change seen,
            // even ones that end up rejected below.
            let new_local_db_version = self.clock.update(remote_db_version);

            // Tombstoned records never resurrect: drop incoming changes.
            if self.is_record_tombstoned(&record_id, ignore_parent) {
                continue;
            }

            // Local version to compare against: the existing tombstone for a
            // record delete, or the column's stored version for a field change.
            let local_col_info = if col_name.is_none() {
                self
                    .tombstones
                    .find(&record_id)
                    .map(|info| info.as_column_version())
            } else if let Some(ref col) = col_name {
                self
                    .get_record_ptr(&record_id, ignore_parent)
                    .and_then(|record| record.column_versions.get(col).copied())
            } else {
                None
            };

            // No local version ⇒ first sighting ⇒ accept unconditionally.
            let should_accept = if let Some(local_info) = local_col_info {
                merge_rule.should_accept(
                    local_info.col_version,
                    local_info.db_version,
                    local_info.node_id,
                    remote_col_version,
                    remote_db_version,
                    remote_node_id,
                )
            } else {
                true
            };

            if should_accept {
                if let Some(col_key) = col_name {
                    // Field write or field delete.
                    let record = self.get_or_create_record_unchecked(&record_id, ignore_parent);

                    if let Some(value) = remote_value.clone() {
                        record.fields.insert(col_key.clone(), value);
                    } else {
                        // `None` value = field deletion.
                        record.fields.remove(&col_key);
                    }

                    record.column_versions.insert(
                        col_key.clone(),
                        ColumnVersion::new(
                            remote_col_version,
                            remote_db_version,
                            remote_node_id,
                            new_local_db_version,
                        ),
                    );

                    if new_local_db_version < record.lowest_local_db_version {
                        record.lowest_local_db_version = new_local_db_version;
                    }
                    if new_local_db_version > record.highest_local_db_version {
                        record.highest_local_db_version = new_local_db_version;
                    }

                    accepted_changes.push(Change::new(
                        record_id,
                        Some(col_key),
                        remote_value,
                        remote_col_version,
                        remote_db_version,
                        remote_node_id,
                        new_local_db_version,
                        flags,
                    ));
                } else {
                    // Record deletion.
                    self.data.remove(&record_id);

                    self.tombstones.insert_or_assign(
                        record_id.clone(),
                        TombstoneInfo::new(remote_db_version, remote_node_id, new_local_db_version),
                    );

                    accepted_changes.push(Change::new(
                        record_id,
                        None,
                        None,
                        remote_col_version,
                        remote_db_version,
                        remote_node_id,
                        new_local_db_version,
                        flags,
                    ));
                }
            }
        }

        accepted_changes
    }

    /// All changes applied locally after `last_db_version` (delta sync).
    #[must_use]
    pub fn get_changes_since(&self, last_db_version: u64) -> Vec<Change<K, C, V>>
    where
        K: Ord,
        C: Ord,
    {
        self.get_changes_since_excluding(last_db_version, &HashSet::new())
    }

    /// All changes with `local_db_version > last_db_version`, skipping those
    /// that originated at nodes listed in `excluding`. Parent layers are
    /// traversed too; overlapping parent/child entries are then deduplicated
    /// with [`CRDT::compress_changes`].
    pub fn get_changes_since_excluding(
        &self,
        last_db_version: u64,
        excluding: &HashSet<NodeId>,
    ) -> Vec<Change<K, C, V>>
    where
        K: Ord,
        C: Ord,
    {
        let mut changes = Vec::new();

        if let Some(ref parent) = self.parent {
            let parent_changes = parent.get_changes_since_excluding(last_db_version, excluding);
            changes.extend(parent_changes);
        }

        for (record_id, record) in &self.data {
            // Cached upper bound lets whole records be skipped cheaply.
            if record.highest_local_db_version <= last_db_version {
                continue;
            }

            for (col_name, clock_info) in &record.column_versions {
                if clock_info.local_db_version > last_db_version && !excluding.contains(&clock_info.node_id)
                {
                    // `None` here means the field was deleted (version kept).
                    let value = record.fields.get(col_name).cloned();

                    changes.push(Change::new(
                        record_id.clone(),
                        Some(col_name.clone()),
                        value,
                        clock_info.col_version,
                        clock_info.db_version,
                        clock_info.node_id,
                        clock_info.local_db_version,
                        0,
                    ));
                }
            }
        }

        // Record deletions are reported as tombstone changes.
        for (record_id, tombstone_info) in self.tombstones.iter() {
            if tombstone_info.local_db_version > last_db_version
                && !excluding.contains(&tombstone_info.node_id)
            {
                changes.push(Change::new(
                    record_id.clone(),
                    None,
                    None,
                    TOMBSTONE_COL_VERSION,
                    tombstone_info.db_version,
                    tombstone_info.node_id,
                    tombstone_info.local_db_version,
                    0,
                ));
            }
        }

        // Only layered lookups can produce duplicates (parent + child entries
        // for the same record/column), so compress only then.
        if self.parent.is_some() {
            Self::compress_changes(&mut changes);
        }

        changes
    }

    /// Sorts `changes` (see `DefaultChangeComparator`) and removes redundant
    /// entries in place: keeps only the first (newest) change per
    /// (record, column), and when a record tombstone is present, collapses
    /// all of that record's entries into the tombstone alone.
    pub fn compress_changes(changes: &mut Vec<Change<K, C, V>>)
    where
        K: Ord,
        C: Ord,
    {
        if changes.is_empty() {
            return;
        }

        let comparator = DefaultChangeComparator;
        changes.sort_unstable_by(|a, b| comparator.compare(a, b));

        // Two-pointer in-place dedup over the sorted vec; `write` always
        // points at the last kept element.
        let mut write = 0;
        for read in 1..changes.len() {
            if changes[read].record_id != changes[write].record_id {
                // New record: keep.
                write += 1;
                if write != read {
                    changes[write] = changes[read].clone();
                }
            } else if changes[read].col_name.is_none() && changes[write].col_name.is_some() {
                // Record tombstone (sorted last within its record): rewind to
                // the record's first entry and replace the whole run with it.
                let mut first_pos = write;
                while first_pos > 0 && changes[first_pos - 1].record_id == changes[read].record_id {
                    first_pos -= 1;
                }
                changes[first_pos] = changes[read].clone();
                write = first_pos;
            } else if changes[read].col_name != changes[write].col_name
                && changes[write].col_name.is_some()
            {
                // New column of the same record: keep. (Same column ⇒ an
                // older duplicate, silently skipped.)
                write += 1;
                if write != read {
                    changes[write] = changes[read].clone();
                }
            }
        }

        changes.truncate(write + 1);
    }

    /// Looks up a record, falling back to parent layers.
    pub fn get_record(&self, record_id: &K) -> Option<&Record<C, V>> {
        self.get_record_ptr(record_id, false)
    }

    /// `true` if the record is tombstoned here or in any parent layer.
    pub fn is_tombstoned(&self, record_id: &K) -> bool {
        self.is_record_tombstoned(record_id, false)
    }

    /// Returns the tombstone for `record_id`, searching parent layers too.
    pub fn get_tombstone(&self, record_id: &K) -> Option<TombstoneInfo> {
        if let Some(info) = self.tombstones.find(record_id) {
            return Some(info);
        }

        if let Some(ref parent) = self.parent {
            return parent.get_tombstone(record_id);
        }

        None
    }

    /// Drops tombstones with `db_version < min_acknowledged_version` from
    /// this layer (parents are untouched); returns how many were removed.
    /// Compacting below a version some peer has not yet seen risks
    /// resurrecting deleted records on a later merge.
    pub fn compact_tombstones(&mut self, min_acknowledged_version: u64) -> usize {
        self.tombstones.compact(min_acknowledged_version)
    }

    /// Number of tombstones held by this layer.
    pub fn tombstone_count(&self) -> usize {
        self.tombstones.len()
    }

    /// Read access to the logical clock.
    pub fn get_clock(&self) -> &LogicalClock {
        &self.clock
    }

    /// Read access to this layer's live records (parents not included).
    pub fn get_data(&self) -> &HashMap<K, Record<C, V>> {
        &self.data
    }

    /// Serializes this replica to a JSON string (parent link is skipped).
    #[cfg(feature = "json")]
    pub fn to_json(&self) -> Result<String, serde_json::Error>
    where
        K: serde::Serialize,
        C: serde::Serialize,
        V: serde::Serialize,
    {
        serde_json::to_string(self)
    }

    /// Deserializes a replica from JSON (`parent` comes back as `None`).
    #[cfg(feature = "json")]
    pub fn from_json(json: &str) -> Result<Self, serde_json::Error>
    where
        K: serde::de::DeserializeOwned + Hash + Eq + Clone,
        C: serde::de::DeserializeOwned + Hash + Eq + Clone,
        V: serde::de::DeserializeOwned + Clone,
    {
        serde_json::from_str(json)
    }

    /// Serializes this replica with bincode (standard config).
    #[cfg(feature = "binary")]
    pub fn to_bytes(&self) -> Result<Vec<u8>, bincode::error::EncodeError>
    where
        K: serde::Serialize,
        C: serde::Serialize,
        V: serde::Serialize,
    {
        bincode::serde::encode_to_vec(self, bincode::config::standard())
    }

    /// Deserializes a replica from bincode bytes (`parent` comes back as
    /// `None`).
    #[cfg(feature = "binary")]
    pub fn from_bytes(bytes: &[u8]) -> Result<Self, bincode::error::DecodeError>
    where
        K: serde::de::DeserializeOwned + Hash + Eq + Clone,
        C: serde::de::DeserializeOwned + Hash + Eq + Clone,
        V: serde::de::DeserializeOwned + Clone,
    {
        let (result, _len) = bincode::serde::decode_from_slice(bytes, bincode::config::standard())?;
        Ok(result)
    }

    /// `true` if tombstoned in this layer, or (unless `ignore_parent`) in any
    /// ancestor layer.
    fn is_record_tombstoned(&self, record_id: &K, ignore_parent: bool) -> bool {
        if self.tombstones.find(record_id).is_some() {
            return true;
        }

        if !ignore_parent {
            if let Some(ref parent) = self.parent {
                return parent.is_record_tombstoned(record_id, false);
            }
        }

        false
    }

    /// Fetches a mutable record, creating it if absent. "Unchecked": does NOT
    /// consult tombstones — callers must do so first. On first local write the
    /// parent's copy of the record (if any, and unless `ignore_parent`) is
    /// cloned into this layer (copy-on-write).
    fn get_or_create_record_unchecked(
        &mut self,
        record_id: &K,
        ignore_parent: bool,
    ) -> &mut Record<C, V> {
        // Entry API comes from std or hashbrown depending on features.
        #[cfg(feature = "std")]
        use std::collections::hash_map::Entry;
        #[cfg(all(not(feature = "std"), feature = "alloc"))]
        use hashbrown::hash_map::Entry;

        match self.data.entry(record_id.clone()) {
            Entry::Occupied(e) => e.into_mut(),
            Entry::Vacant(e) => {
                let record = if !ignore_parent {
                    self
                        .parent
                        .as_ref()
                        .and_then(|p| p.get_record_ptr(record_id, false))
                        .cloned()
                        .unwrap_or_else(Record::new)
                } else {
                    Record::new()
                };
                e.insert(record)
            }
        }
    }

    /// Read-only record lookup, optionally falling back to parent layers.
    fn get_record_ptr(&self, record_id: &K, ignore_parent: bool) -> Option<&Record<C, V>> {
        if let Some(record) = self.data.get(record_id) {
            return Some(record);
        }

        if !ignore_parent {
            if let Some(ref parent) = self.parent {
                return parent.get_record_ptr(record_id, false);
            }
        }

        None
    }
}
1372
// Unit tests covering the clock, tombstone store, basic CRDT operations,
// merge convergence, and the feature-gated serialization round-trips.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_logical_clock() {
        let mut clock = LogicalClock::new();
        assert_eq!(clock.current_time(), 0);

        let t1 = clock.tick();
        assert_eq!(t1, 1);
        assert_eq!(clock.current_time(), 1);

        // update() takes max(local, remote) then ticks: max(1, 5) + 1 == 6.
        let t2 = clock.update(5);
        assert_eq!(t2, 6);
        assert_eq!(clock.current_time(), 6);
    }

    #[test]
    fn test_tombstone_storage() {
        let mut storage = TombstoneStorage::new();
        let info = TombstoneInfo::new(10, 1, 10);

        storage.insert_or_assign("key1".to_string(), info);
        assert_eq!(storage.len(), 1);

        assert_eq!(storage.find(&"key1".to_string()), Some(info));
        assert_eq!(storage.find(&"key2".to_string()), None);

        // db_version 10 < 15, so compacting removes the entry.
        let removed = storage.compact(15);
        assert_eq!(removed, 1);
        assert_eq!(storage.len(), 0);
    }

    #[test]
    fn test_basic_insert() {
        let mut crdt: CRDT<String, String, String> = CRDT::new(1, None);

        let fields = vec![
            ("name".to_string(), "Alice".to_string()),
            ("age".to_string(), "30".to_string()),
        ];

        let changes = crdt.insert_or_update(&"user1".to_string(), fields);

        // One change per field written.
        assert_eq!(changes.len(), 2);
        assert_eq!(crdt.get_data().len(), 1);

        let record = crdt.get_record(&"user1".to_string()).unwrap();
        assert_eq!(record.fields.get("name").unwrap(), "Alice");
        assert_eq!(record.fields.get("age").unwrap(), "30");
    }

    #[test]
    fn test_delete_record() {
        let mut crdt: CRDT<String, String, String> = CRDT::new(1, None);

        let fields = vec![("name".to_string(), "Bob".to_string())];
        let _ = crdt.insert_or_update(&"user2".to_string(), fields);

        let delete_change = crdt.delete_record(&"user2".to_string());
        assert!(delete_change.is_some());
        assert!(crdt.is_tombstoned(&"user2".to_string()));
        assert_eq!(crdt.get_data().len(), 0);
    }

    #[test]
    fn test_merge_changes() {
        let mut crdt1: CRDT<String, String, String> = CRDT::new(1, None);
        let mut crdt2: CRDT<String, String, String> = CRDT::new(2, None);

        // Concurrent writes to the same record/column from two nodes.
        let fields1 = vec![("tag".to_string(), "Node1".to_string())];
        let changes1 = crdt1.insert_or_update(&"record1".to_string(), fields1);

        let fields2 = vec![("tag".to_string(), "Node2".to_string())];
        let changes2 = crdt2.insert_or_update(&"record1".to_string(), fields2);

        let merge_rule = DefaultMergeRule;
        crdt1.merge_changes(changes2, &merge_rule);
        crdt2.merge_changes(changes1, &merge_rule);

        // Equal col/db versions: the higher node id (2) wins the tie-break,
        // and both replicas converge to the same state.
        assert_eq!(
            crdt1
                .get_record(&"record1".to_string())
                .unwrap()
                .fields
                .get("tag")
                .unwrap(),
            "Node2"
        );
        assert_eq!(crdt1.get_data(), crdt2.get_data());
    }

    #[test]
    #[cfg(feature = "serde")]
    fn test_change_serialization() {
        #[allow(unused_variables)]
        let change = Change::new(
            "record1".to_string(),
            Some("name".to_string()),
            Some("Alice".to_string()),
            1,
            10,
            1,
            10,
            0,
        );

        // JSON round-trip.
        #[cfg(feature = "json")]
        {
            let json = serde_json::to_string(&change).unwrap();
            let deserialized: Change<String, String, String> = serde_json::from_str(&json).unwrap();
            assert_eq!(change, deserialized);
        }

        // bincode round-trip.
        #[cfg(feature = "binary")]
        {
            let bytes = bincode::serde::encode_to_vec(&change, bincode::config::standard()).unwrap();
            let (deserialized, _): (Change<String, String, String>, _) =
                bincode::serde::decode_from_slice(&bytes, bincode::config::standard()).unwrap();
            assert_eq!(change, deserialized);
        }
    }

    #[test]
    #[cfg(feature = "serde")]
    fn test_record_serialization() {
        let mut fields = HashMap::new();
        fields.insert("name".to_string(), "Bob".to_string());
        fields.insert("age".to_string(), "25".to_string());

        let mut column_versions = HashMap::new();
        column_versions.insert("name".to_string(), ColumnVersion::new(1, 10, 1, 10));
        column_versions.insert("age".to_string(), ColumnVersion::new(1, 11, 1, 11));

        #[allow(unused_variables)]
        let record = Record::from_parts(fields, column_versions);

        // Note: Record's PartialEq compares only `fields`.
        #[cfg(feature = "json")]
        {
            let json = serde_json::to_string(&record).unwrap();
            let deserialized: Record<String, String> = serde_json::from_str(&json).unwrap();
            assert_eq!(record, deserialized);
        }

        #[cfg(feature = "binary")]
        {
            let bytes = bincode::serde::encode_to_vec(&record, bincode::config::standard()).unwrap();
            let (deserialized, _): (Record<String, String>, _) =
                bincode::serde::decode_from_slice(&bytes, bincode::config::standard()).unwrap();
            assert_eq!(record, deserialized);
        }
    }

    #[test]
    #[cfg(feature = "json")]
    fn test_crdt_json_serialization() {
        let mut crdt: CRDT<String, String, String> = CRDT::new(1, None);

        let fields = vec![
            ("name".to_string(), "Alice".to_string()),
            ("age".to_string(), "30".to_string()),
        ];
        let _ = crdt.insert_or_update(&"user1".to_string(), fields);

        let fields2 = vec![("name".to_string(), "Bob".to_string())];
        let _ = crdt.insert_or_update(&"user2".to_string(), fields2);

        let _ = crdt.delete_record(&"user2".to_string());

        let json = crdt.to_json().unwrap();

        let deserialized: CRDT<String, String, String> = CRDT::from_json(&json).unwrap();

        // Data, tombstones, and clock time all survive the round-trip.
        assert_eq!(crdt.get_data().len(), deserialized.get_data().len());
        assert_eq!(
            crdt.get_record(&"user1".to_string()).unwrap().fields,
            deserialized.get_record(&"user1".to_string()).unwrap().fields
        );

        assert_eq!(crdt.tombstone_count(), deserialized.tombstone_count());
        assert!(deserialized.is_tombstoned(&"user2".to_string()));

        assert_eq!(
            crdt.get_clock().current_time(),
            deserialized.get_clock().current_time()
        );

        // The parent link is serde(skip), so it must come back as None.
        let has_parent = deserialized.parent.is_some();
        assert!(!has_parent);
    }

    #[test]
    #[cfg(feature = "binary")]
    fn test_crdt_binary_serialization() {
        let mut crdt: CRDT<String, String, String> = CRDT::new(1, None);

        let fields = vec![
            ("name".to_string(), "Alice".to_string()),
            ("age".to_string(), "30".to_string()),
        ];
        let _ = crdt.insert_or_update(&"user1".to_string(), fields);

        let bytes = crdt.to_bytes().unwrap();

        let deserialized: CRDT<String, String, String> = CRDT::from_bytes(&bytes).unwrap();

        assert_eq!(crdt.get_data().len(), deserialized.get_data().len());
        assert_eq!(
            crdt.get_record(&"user1".to_string()).unwrap().fields,
            deserialized.get_record(&"user1".to_string()).unwrap().fields
        );

        assert_eq!(
            crdt.get_clock().current_time(),
            deserialized.get_clock().current_time()
        );
    }

    #[test]
    #[cfg(feature = "serde")]
    fn test_parent_not_serialized() {
        let mut parent: CRDT<String, String, String> = CRDT::new(1, None);
        let fields = vec![("parent_field".to_string(), "parent_value".to_string())];
        let _ = parent.insert_or_update(&"parent_record".to_string(), fields);

        let parent_arc = Arc::new(parent);
        let mut child = CRDT::new(2, Some(parent_arc.clone()));
        let child_fields = vec![("child_field".to_string(), "child_value".to_string())];
        let _ = child.insert_or_update(&"child_record".to_string(), child_fields);

        #[cfg(feature = "json")]
        {
            let json = serde_json::to_string(&child).unwrap();
            let deserialized: CRDT<String, String, String> = serde_json::from_str(&json).unwrap();

            // Parent link is dropped entirely...
            assert!(deserialized.parent.is_none());

            // ...so only the child's own data survives; parent-held records
            // are no longer reachable after deserialization.
            assert!(deserialized.get_record(&"child_record".to_string()).is_some());

            assert!(deserialized.get_record(&"parent_record".to_string()).is_none());
        }
    }
}