1#![cfg_attr(not(feature = "std"), no_std)]
86
87#[cfg(not(any(feature = "std", feature = "alloc")))]
89compile_error!("Either 'std' (default) or 'alloc' feature must be enabled. For no_std environments, use: cargo build --no-default-features --features alloc");
90
91#[cfg(not(feature = "std"))]
92extern crate alloc;
93
94#[cfg(feature = "std")]
95use std::{
96 cmp::Ordering,
97 collections::{HashMap, HashSet},
98 hash::Hash,
99 sync::Arc,
100};
101
102#[cfg(not(feature = "std"))]
103use alloc::{
104 string::String,
105 sync::Arc,
106 vec::Vec,
107};
108#[cfg(not(feature = "std"))]
109use core::{cmp::Ordering, hash::Hash};
110#[cfg(all(not(feature = "std"), feature = "alloc"))]
111use hashbrown::{HashMap, HashSet};
112
/// Identifier of a node (replica). 128-bit with the `node-id-u128`
/// feature, 64-bit otherwise.
#[cfg(feature = "node-id-u128")]
pub type NodeId = u128;

#[cfg(not(feature = "node-id-u128"))]
pub type NodeId = u64;

/// Convenience alias for the common string-keyed column case.
pub type ColumnKey = String;

/// Sentinel `col_version` used for whole-record tombstones (deletions);
/// `u64::MAX` guarantees a tombstone out-ranks any column write in merges.
const TOMBSTONE_COL_VERSION: u64 = u64::MAX;
131
/// A single replicated change: one column write of one record, or a
/// whole-record deletion when `col_name` is `None`.
#[derive(Debug, Clone, PartialEq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "serde", serde(bound(serialize = "K: serde::Serialize, C: serde::Serialize, V: serde::Serialize")))]
#[cfg_attr(feature = "serde", serde(bound(deserialize = "K: serde::de::DeserializeOwned, C: serde::de::DeserializeOwned, V: serde::de::DeserializeOwned")))]
pub struct Change<K, C, V> {
    /// Key of the record this change applies to.
    pub record_id: K,
    /// Affected column; `None` marks a whole-record deletion (tombstone).
    pub col_name: Option<C>,
    /// New value; `None` means the column was deleted (or this is a tombstone).
    pub value: Option<V>,
    /// Per-column write counter at the originating node.
    pub col_version: u64,
    /// Logical-clock time at the node that produced the change.
    pub db_version: u64,
    /// Node that produced the change.
    pub node_id: NodeId,
    /// Logical-clock time at which the change was applied on this node.
    pub local_db_version: u64,
    /// Application-defined flags, carried through the CRDT unchanged.
    pub flags: u32,
}
156
// Manual `Eq` marker matching the derived `PartialEq`; adds no behavior.
impl<K: Eq, C: Eq, V: Eq> Eq for Change<K, C, V> {}
158
159impl<K, C, V> Change<K, C, V> {
160 #[allow(clippy::too_many_arguments)]
162 pub fn new(
163 record_id: K,
164 col_name: Option<C>,
165 value: Option<V>,
166 col_version: u64,
167 db_version: u64,
168 node_id: NodeId,
169 local_db_version: u64,
170 flags: u32,
171 ) -> Self {
172 Self {
173 record_id,
174 col_name,
175 value,
176 col_version,
177 db_version,
178 node_id,
179 local_db_version,
180 flags,
181 }
182 }
183}
184
/// Version metadata tracked for one column of one record.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct ColumnVersion {
    /// Number of times the column has been written.
    pub col_version: u64,
    /// Logical-clock time at the node that last wrote the column.
    pub db_version: u64,
    /// Node that last wrote the column.
    pub node_id: NodeId,
    /// Local logical-clock time when the write was applied here.
    pub local_db_version: u64,
}
195
196impl ColumnVersion {
197 pub fn new(col_version: u64, db_version: u64, node_id: NodeId, local_db_version: u64) -> Self {
198 Self {
199 col_version,
200 db_version,
201 node_id,
202 local_db_version,
203 }
204 }
205}
206
/// Metadata recording the deletion of a whole record.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct TombstoneInfo {
    /// Logical-clock time at the node that performed the deletion.
    pub db_version: u64,
    /// Node that performed the deletion.
    pub node_id: NodeId,
    /// Local logical-clock time when the deletion was applied here.
    pub local_db_version: u64,
}
218
219impl TombstoneInfo {
220 pub fn new(db_version: u64, node_id: NodeId, local_db_version: u64) -> Self {
221 Self {
222 db_version,
223 node_id,
224 local_db_version,
225 }
226 }
227
228 pub fn as_column_version(&self) -> ColumnVersion {
230 ColumnVersion::new(
231 TOMBSTONE_COL_VERSION,
232 self.db_version,
233 self.node_id,
234 self.local_db_version,
235 )
236 }
237}
238
/// Lamport-style logical clock used to order events across nodes.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct LogicalClock {
    time: u64,
}

impl LogicalClock {
    /// Creates a clock starting at time 0.
    pub fn new() -> Self {
        LogicalClock { time: 0 }
    }

    /// Advances the clock by one step and returns the new time.
    pub fn tick(&mut self) -> u64 {
        self.time += 1;
        self.time
    }

    /// Merges a remote timestamp: the clock jumps to at least the received
    /// time, then advances one step. Returns the new time.
    pub fn update(&mut self, received_time: u64) -> u64 {
        if received_time > self.time {
            self.time = received_time;
        }
        self.time += 1;
        self.time
    }

    /// Overwrites the clock with an absolute time (used when rebuilding
    /// state from a change log).
    pub fn set_time(&mut self, time: u64) {
        self.time = time;
    }

    /// Returns the current time without advancing the clock.
    pub fn current_time(&self) -> u64 {
        self.time
    }
}

impl Default for LogicalClock {
    /// Same as [`LogicalClock::new`]: a clock at time 0.
    fn default() -> Self {
        Self::new()
    }
}
281
/// Storage for record tombstones, keyed by record id.
#[derive(Debug, Clone)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct TombstoneStorage<K: Hash + Eq> {
    // Deletion metadata per deleted record key.
    entries: HashMap<K, TombstoneInfo>,
}
290
291impl<K: Hash + Eq> TombstoneStorage<K> {
292 pub fn new() -> Self {
293 Self {
294 entries: HashMap::new(),
295 }
296 }
297
298 pub fn insert_or_assign(&mut self, key: K, info: TombstoneInfo) {
299 self.entries.insert(key, info);
300 }
301
302 pub fn find(&self, key: &K) -> Option<TombstoneInfo> {
303 self.entries.get(key).copied()
304 }
305
306 pub fn erase(&mut self, key: &K) -> bool {
307 self.entries.remove(key).is_some()
308 }
309
310 pub fn clear(&mut self) {
311 self.entries.clear();
312 }
313
314 pub fn iter(&self) -> impl Iterator<Item = (&K, &TombstoneInfo)> {
315 self.entries.iter()
316 }
317
318 pub fn len(&self) -> usize {
319 self.entries.len()
320 }
321
322 pub fn is_empty(&self) -> bool {
323 self.entries.is_empty()
324 }
325
326 pub fn compact(&mut self, min_acknowledged_version: u64) -> usize {
330 let initial_len = self.entries.len();
331 self
332 .entries
333 .retain(|_, info| info.db_version >= min_acknowledged_version);
334 initial_len - self.entries.len()
335 }
336}
337
// An empty store is the natural default.
impl<K: Hash + Eq> Default for TombstoneStorage<K> {
    fn default() -> Self {
        Self::new()
    }
}
343
/// Materialized state of one record: current field values plus per-column
/// version metadata.
#[derive(Debug, Clone)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "serde", serde(bound(serialize = "C: serde::Serialize + Hash + Eq, V: serde::Serialize")))]
#[cfg_attr(feature = "serde", serde(bound(deserialize = "C: serde::de::DeserializeOwned + Hash + Eq, V: serde::de::DeserializeOwned")))]
pub struct Record<C, V> {
    /// Current value of each column.
    pub fields: HashMap<C, V>,
    /// Version metadata per column (kept even for deleted columns).
    pub column_versions: HashMap<C, ColumnVersion>,
    /// Smallest `local_db_version` among the columns (`u64::MAX` when the
    /// record has no versioned columns yet).
    pub lowest_local_db_version: u64,
    /// Largest `local_db_version` among the columns (0 when empty).
    pub highest_local_db_version: u64,
}
356
357impl<C: Hash + Eq, V> Record<C, V> {
358 pub fn new() -> Self {
359 Self {
360 fields: HashMap::new(),
361 column_versions: HashMap::new(),
362 lowest_local_db_version: u64::MAX,
363 highest_local_db_version: 0,
364 }
365 }
366
367 pub fn from_parts(
369 fields: HashMap<C, V>,
370 column_versions: HashMap<C, ColumnVersion>,
371 ) -> Self {
372 let mut lowest = u64::MAX;
373 let mut highest = 0;
374
375 for ver in column_versions.values() {
376 if ver.local_db_version < lowest {
377 lowest = ver.local_db_version;
378 }
379 if ver.local_db_version > highest {
380 highest = ver.local_db_version;
381 }
382 }
383
384 Self {
385 fields,
386 column_versions,
387 lowest_local_db_version: lowest,
388 highest_local_db_version: highest,
389 }
390 }
391}
392
// Record equality intentionally compares only the field values; version
// metadata (column_versions and the local version bounds) is excluded, so
// replicas that converged to the same data compare equal even though their
// local bookkeeping differs.
impl<C: Hash + Eq + PartialEq, V: PartialEq> PartialEq for Record<C, V> {
    fn eq(&self, other: &Self) -> bool {
        self.fields == other.fields
    }
}
399
// An empty record is the natural default.
impl<C: Hash + Eq, V> Default for Record<C, V> {
    fn default() -> Self {
        Self::new()
    }
}
405
/// Strategy deciding whether a remote column write supersedes the local one.
pub trait MergeRule<K, C, V> {
    /// Returns `true` if the remote `(col_version, db_version, node_id)`
    /// triple should overwrite the local one.
    fn should_accept(
        &self,
        local_col: u64,
        local_db: u64,
        local_node: NodeId,
        remote_col: u64,
        remote_db: u64,
        remote_node: NodeId,
    ) -> bool;

    /// Convenience wrapper comparing two full [`Change`]s by their version
    /// triples.
    fn should_accept_change(&self, local: &Change<K, C, V>, remote: &Change<K, C, V>) -> bool {
        self.should_accept(
            local.col_version,
            local.db_version,
            local.node_id,
            remote.col_version,
            remote.db_version,
            remote.node_id,
        )
    }
}
434
/// Default last-writer-wins merge rule: compares column version first, then
/// db version, then node id as the final tie-breaker.
#[derive(Debug, Clone, Copy, Default)]
pub struct DefaultMergeRule;
443
444impl<K, C, V> MergeRule<K, C, V> for DefaultMergeRule {
445 fn should_accept(
446 &self,
447 local_col: u64,
448 local_db: u64,
449 local_node: NodeId,
450 remote_col: u64,
451 remote_db: u64,
452 remote_node: NodeId,
453 ) -> bool {
454 match remote_col.cmp(&local_col) {
455 Ordering::Greater => true,
456 Ordering::Less => false,
457 Ordering::Equal => match remote_db.cmp(&local_db) {
458 Ordering::Greater => true,
459 Ordering::Less => false,
460 Ordering::Equal => remote_node > local_node,
461 },
462 }
463 }
464}
465
/// Total ordering over [`Change`]s, used by change-set compression.
pub trait ChangeComparator<K, C, V> {
    /// Compares two changes; see implementors for the ordering contract.
    fn compare(&self, a: &Change<K, C, V>, b: &Change<K, C, V>) -> Ordering;
}
470
/// Default ordering for change compression: groups changes by record, places
/// a record's tombstone (`col_name == None`) after its column changes, and
/// sorts newer versions first within a column.
#[derive(Debug, Clone, Copy, Default)]
pub struct DefaultChangeComparator;
482
483impl<K: Ord, C: Ord, V> ChangeComparator<K, C, V> for DefaultChangeComparator {
484 fn compare(&self, a: &Change<K, C, V>, b: &Change<K, C, V>) -> Ordering {
485 match a.record_id.cmp(&b.record_id) {
487 Ordering::Equal => {}
488 ord => return ord,
489 }
490
491 match (a.col_name.as_ref(), b.col_name.as_ref()) {
493 (None, None) => {}
494 (None, Some(_)) => return Ordering::Greater,
495 (Some(_), None) => return Ordering::Less,
496 (Some(a_col), Some(b_col)) => match a_col.cmp(b_col) {
497 Ordering::Equal => {}
498 ord => return ord,
499 },
500 }
501
502 match b.col_version.cmp(&a.col_version) {
504 Ordering::Equal => {}
505 ord => return ord,
506 }
507
508 match b.db_version.cmp(&a.db_version) {
509 Ordering::Equal => {}
510 ord => return ord,
511 }
512
513 b.node_id.cmp(&a.node_id)
514 }
515}
516
/// A column-level last-writer-wins CRDT: a map of records whose individual
/// columns carry version metadata, plus tombstones for deleted records.
///
/// An optional `parent` provides a read-only base layer that this instance
/// overlays; the parent link is skipped during (de)serialization.
#[derive(Debug)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "serde", serde(bound(serialize = "K: serde::Serialize, C: serde::Serialize, V: serde::Serialize")))]
#[cfg_attr(feature = "serde", serde(bound(deserialize = "K: serde::de::DeserializeOwned + Hash + Eq + Clone, C: serde::de::DeserializeOwned + Hash + Eq + Clone, V: serde::de::DeserializeOwned + Clone")))]
pub struct CRDT<K: Hash + Eq + Clone, C: Hash + Eq + Clone, V: Clone> {
    // Identifier of this replica.
    node_id: NodeId,
    // Logical clock producing db_version values for local operations.
    clock: LogicalClock,
    // Live records at this layer, keyed by record id.
    data: HashMap<K, Record<C, V>>,
    // Tombstones for records deleted at this layer.
    tombstones: TombstoneStorage<K>,
    // Optional read-only base layer; never serialized.
    #[cfg_attr(feature = "serde", serde(skip, default))]
    parent: Option<Arc<CRDT<K, C, V>>>,
    // Parent clock time captured when this child layer was created.
    #[allow(dead_code)]
    base_version: u64,
}
534
impl<K: Hash + Eq + Clone, C: Hash + Eq + Clone, V: Clone> CRDT<K, C, V> {
    /// Creates a CRDT for `node_id`, optionally layered on a read-only
    /// `parent`.
    ///
    /// With a parent, the child's clock starts as a copy of the parent's and
    /// `base_version` records the parent time at creation; without one, the
    /// clock starts at zero.
    pub fn new(node_id: NodeId, parent: Option<Arc<CRDT<K, C, V>>>) -> Self {
        let (clock, base_version) = if let Some(ref p) = parent {
            let parent_clock = p.clock;
            let base = parent_clock.current_time();
            (parent_clock, base)
        } else {
            (LogicalClock::new(), 0)
        };

        Self {
            node_id,
            clock,
            data: HashMap::new(),
            tombstones: TombstoneStorage::new(),
            parent,
            base_version,
        }
    }

    /// Rebuilds a parentless CRDT from a previously captured change set.
    pub fn from_changes(node_id: NodeId, changes: Vec<Change<K, C, V>>) -> Self {
        let mut crdt = Self::new(node_id, None);
        crdt.apply_changes(changes);
        crdt
    }

    /// Discards all local state (data, tombstones, clock) and re-applies
    /// `changes` from scratch.
    pub fn reset(&mut self, changes: Vec<Change<K, C, V>>) {
        self.data.clear();
        self.tombstones.clear();
        self.clock = LogicalClock::new();
        self.apply_changes(changes);
    }

    /// Applies a change set verbatim — no merge-rule arbitration — and
    /// fast-forwards the clock past every version in the set. Used by
    /// [`CRDT::from_changes`] and [`CRDT::reset`].
    fn apply_changes(&mut self, changes: Vec<Change<K, C, V>>) {
        // Advance the clock so subsequent local operations sort after
        // everything being replayed.
        let max_db_version = changes
            .iter()
            .map(|c| c.db_version.max(c.local_db_version))
            .max()
            .unwrap_or(0);

        self.clock.set_time(max_db_version);

        for change in changes {
            let record_id = change.record_id.clone();
            let col_name = change.col_name.clone();
            let remote_col_version = change.col_version;
            let remote_db_version = change.db_version;
            let remote_node_id = change.node_id;
            let remote_local_db_version = change.local_db_version;
            let remote_value = change.value;

            if col_name.is_none() {
                // Record deletion: drop the data and remember the tombstone.
                self.data.remove(&record_id);

                self.tombstones.insert_or_assign(
                    record_id,
                    TombstoneInfo::new(remote_db_version, remote_node_id, remote_local_db_version),
                );
            } else if let Some(col_key) = col_name {
                // Column write: ignored when the record is already tombstoned
                // (here or in the parent layer).
                if !self.is_record_tombstoned(&record_id, false) {
                    let record = self.get_or_create_record_unchecked(&record_id, false);

                    // A `None` value means the column was deleted; only the
                    // version metadata is retained in that case.
                    if let Some(value) = remote_value {
                        record.fields.insert(col_key.clone(), value);
                    }

                    let col_ver = ColumnVersion::new(
                        remote_col_version,
                        remote_db_version,
                        remote_node_id,
                        remote_local_db_version,
                    );
                    record.column_versions.insert(col_key, col_ver);

                    // Keep the record's local-version bounds current.
                    if remote_local_db_version < record.lowest_local_db_version {
                        record.lowest_local_db_version = remote_local_db_version;
                    }
                    if remote_local_db_version > record.highest_local_db_version {
                        record.highest_local_db_version = remote_local_db_version;
                    }
                }
            }
        }
    }

    /// Inserts or updates columns of a record with `flags == 0`; see
    /// [`CRDT::insert_or_update_with_flags`].
    #[must_use = "changes should be propagated to other nodes"]
    pub fn insert_or_update<I>(&mut self, record_id: &K, fields: I) -> Vec<Change<K, C, V>>
    where
        I: IntoIterator<Item = (C, V)>,
    {
        self.insert_or_update_with_flags(record_id, 0, fields)
    }

    /// Inserts or updates columns of a record, returning one [`Change`] per
    /// written column for propagation to other nodes.
    ///
    /// Returns an empty vector (writing nothing) when the record is
    /// tombstoned. All columns written in one call share a single clock tick.
    #[must_use = "changes should be propagated to other nodes"]
    pub fn insert_or_update_with_flags<I>(
        &mut self,
        record_id: &K,
        flags: u32,
        fields: I,
    ) -> Vec<Change<K, C, V>>
    where
        I: IntoIterator<Item = (C, V)>,
    {
        // NOTE: the clock ticks before the tombstone check, so a rejected
        // write still advances local time.
        let db_version = self.clock.tick();

        if self.is_record_tombstoned(record_id, false) {
            return Vec::new();
        }

        let mut changes = Vec::new();
        // Copy node_id first: `record` mutably borrows `self` below.
        let node_id = self.node_id;
        let record = self.get_or_create_record_unchecked(record_id, false);

        for (col_name, value) in fields {
            // Bump the per-column counter, or start it at 1 for a new column.
            let col_version = if let Some(col_info) = record.column_versions.get_mut(&col_name) {
                col_info.col_version += 1;
                col_info.db_version = db_version;
                col_info.node_id = node_id;
                col_info.local_db_version = db_version;
                col_info.col_version
            } else {
                record.column_versions.insert(
                    col_name.clone(),
                    ColumnVersion::new(1, db_version, node_id, db_version),
                );
                1
            };

            if db_version < record.lowest_local_db_version {
                record.lowest_local_db_version = db_version;
            }
            if db_version > record.highest_local_db_version {
                record.highest_local_db_version = db_version;
            }

            record.fields.insert(col_name.clone(), value.clone());
            changes.push(Change::new(
                record_id.clone(),
                Some(col_name),
                Some(value),
                col_version,
                db_version,
                node_id,
                db_version,
                flags,
            ));
        }

        changes
    }

    /// Deletes a whole record with `flags == 0`; see
    /// [`CRDT::delete_record_with_flags`].
    #[must_use = "changes should be propagated to other nodes"]
    pub fn delete_record(&mut self, record_id: &K) -> Option<Change<K, C, V>> {
        self.delete_record_with_flags(record_id, 0)
    }

    /// Deletes a whole record, producing a tombstone [`Change`] to propagate.
    ///
    /// Returns `None` (and does not tick the clock) if the record is already
    /// tombstoned.
    #[must_use = "changes should be propagated to other nodes"]
    pub fn delete_record_with_flags(&mut self, record_id: &K, flags: u32) -> Option<Change<K, C, V>> {
        if self.is_record_tombstoned(record_id, false) {
            return None;
        }

        let db_version = self.clock.tick();

        // Drop the record's data; the tombstone alone represents the deletion.
        self.data.remove(record_id);

        self.tombstones.insert_or_assign(
            record_id.clone(),
            TombstoneInfo::new(db_version, self.node_id, db_version),
        );

        Some(Change::new(
            record_id.clone(),
            None,
            None,
            TOMBSTONE_COL_VERSION,
            db_version,
            self.node_id,
            db_version,
            flags,
        ))
    }

    /// Deletes a single column with `flags == 0`; see
    /// [`CRDT::delete_field_with_flags`].
    #[must_use = "changes should be propagated to other nodes"]
    pub fn delete_field(&mut self, record_id: &K, field_name: &C) -> Option<Change<K, C, V>> {
        self.delete_field_with_flags(record_id, field_name, 0)
    }

    /// Deletes a single column of a record, producing a column-deletion
    /// [`Change`] (value `None`) to propagate.
    ///
    /// Returns `None` if the record is tombstoned, absent, or the column does
    /// not exist; the clock only ticks once those checks pass.
    #[must_use = "changes should be propagated to other nodes"]
    pub fn delete_field_with_flags(
        &mut self,
        record_id: &K,
        field_name: &C,
        flags: u32,
    ) -> Option<Change<K, C, V>> {
        if self.is_record_tombstoned(record_id, false) {
            return None;
        }

        let record = self.data.get_mut(record_id)?;

        if !record.fields.contains_key(field_name) {
            return None;
        }

        let db_version = self.clock.tick();

        // Bump the column's version; the metadata survives the deletion so a
        // stale remote write cannot resurrect the column.
        let col_version = if let Some(col_info) = record.column_versions.get_mut(field_name) {
            col_info.col_version += 1;
            col_info.db_version = db_version;
            col_info.node_id = self.node_id;
            col_info.local_db_version = db_version;
            col_info.col_version
        } else {
            // Defensive: the field existed without version metadata.
            record.column_versions.insert(
                field_name.clone(),
                ColumnVersion::new(1, db_version, self.node_id, db_version),
            );
            1
        };

        if db_version < record.lowest_local_db_version {
            record.lowest_local_db_version = db_version;
        }
        if db_version > record.highest_local_db_version {
            record.highest_local_db_version = db_version;
        }

        record.fields.remove(field_name);

        Some(Change::new(
            record_id.clone(),
            Some(field_name.clone()),
            None, // value: None marks a column deletion
            col_version,
            db_version,
            self.node_id,
            db_version,
            flags,
        ))
    }

    /// Merges remote changes using `merge_rule`, returning the subset that
    /// was accepted, with each accepted change's `local_db_version` rewritten
    /// to this node's clock.
    pub fn merge_changes<R: MergeRule<K, C, V>>(
        &mut self,
        changes: Vec<Change<K, C, V>>,
        merge_rule: &R,
    ) -> Vec<Change<K, C, V>> {
        self.merge_changes_impl(changes, false, merge_rule)
    }

    /// Shared merge implementation. When `ignore_parent` is true, tombstone
    /// and record lookups consult only this layer, not the parent chain.
    fn merge_changes_impl<R: MergeRule<K, C, V>>(
        &mut self,
        changes: Vec<Change<K, C, V>>,
        ignore_parent: bool,
        merge_rule: &R,
    ) -> Vec<Change<K, C, V>> {
        let mut accepted_changes = Vec::new();

        if changes.is_empty() {
            return accepted_changes;
        }

        for change in changes {
            let Change {
                record_id,
                col_name,
                value: remote_value,
                col_version: remote_col_version,
                db_version: remote_db_version,
                node_id: remote_node_id,
                flags,
                ..
            } = change;

            // Lamport receive: advance the clock past the remote timestamp.
            // Note this happens even for changes that end up rejected.
            let new_local_db_version = self.clock.update(remote_db_version);

            // Changes for tombstoned records are always discarded.
            if self.is_record_tombstoned(&record_id, ignore_parent) {
                continue;
            }

            // Find what this change competes against locally: the tombstone
            // (as a sentinel ColumnVersion) for a record deletion, or the
            // column's version metadata for a column write.
            let local_col_info = if col_name.is_none() {
                self
                    .tombstones
                    .find(&record_id)
                    .map(|info| info.as_column_version())
            } else if let Some(ref col) = col_name {
                self
                    .get_record_ptr(&record_id, ignore_parent)
                    .and_then(|record| record.column_versions.get(col).copied())
            } else {
                None
            };

            // No local competitor means the data is new here: accept.
            let should_accept = if let Some(local_info) = local_col_info {
                merge_rule.should_accept(
                    local_info.col_version,
                    local_info.db_version,
                    local_info.node_id,
                    remote_col_version,
                    remote_db_version,
                    remote_node_id,
                )
            } else {
                true
            };

            if should_accept {
                if let Some(col_key) = col_name {
                    // Accepted column write (or column deletion when the
                    // remote value is None).
                    let record = self.get_or_create_record_unchecked(&record_id, ignore_parent);

                    if let Some(value) = remote_value.clone() {
                        record.fields.insert(col_key.clone(), value);
                    } else {
                        record.fields.remove(&col_key);
                    }

                    record.column_versions.insert(
                        col_key.clone(),
                        ColumnVersion::new(
                            remote_col_version,
                            remote_db_version,
                            remote_node_id,
                            new_local_db_version,
                        ),
                    );

                    if new_local_db_version < record.lowest_local_db_version {
                        record.lowest_local_db_version = new_local_db_version;
                    }
                    if new_local_db_version > record.highest_local_db_version {
                        record.highest_local_db_version = new_local_db_version;
                    }

                    accepted_changes.push(Change::new(
                        record_id,
                        Some(col_key),
                        remote_value,
                        remote_col_version,
                        remote_db_version,
                        remote_node_id,
                        new_local_db_version,
                        flags,
                    ));
                } else {
                    // Accepted record deletion: drop data, store tombstone.
                    self.data.remove(&record_id);

                    self.tombstones.insert_or_assign(
                        record_id.clone(),
                        TombstoneInfo::new(remote_db_version, remote_node_id, new_local_db_version),
                    );

                    accepted_changes.push(Change::new(
                        record_id,
                        None,
                        None,
                        remote_col_version,
                        remote_db_version,
                        remote_node_id,
                        new_local_db_version,
                        flags,
                    ));
                }
            }
        }

        accepted_changes
    }

    /// Returns every change applied after `last_db_version` (by local clock
    /// time); see [`CRDT::get_changes_since_excluding`].
    #[must_use]
    pub fn get_changes_since(&self, last_db_version: u64) -> Vec<Change<K, C, V>>
    where
        K: Ord,
        C: Ord,
    {
        self.get_changes_since_excluding(last_db_version, &HashSet::new())
    }

    /// Returns every change with `local_db_version > last_db_version`,
    /// skipping changes that originate from nodes in `excluding`. Includes
    /// parent-layer changes; when a parent exists the combined set is
    /// compressed before returning.
    pub fn get_changes_since_excluding(
        &self,
        last_db_version: u64,
        excluding: &HashSet<NodeId>,
    ) -> Vec<Change<K, C, V>>
    where
        K: Ord,
        C: Ord,
    {
        let mut changes = Vec::new();

        // Parent changes first, then this layer's.
        if let Some(ref parent) = self.parent {
            let parent_changes = parent.get_changes_since_excluding(last_db_version, excluding);
            changes.extend(parent_changes);
        }

        for (record_id, record) in &self.data {
            // Cheap whole-record skip using the cached upper bound.
            if record.highest_local_db_version <= last_db_version {
                continue;
            }

            for (col_name, clock_info) in &record.column_versions {
                if clock_info.local_db_version > last_db_version && !excluding.contains(&clock_info.node_id)
                {
                    // `None` here means the column was deleted but its
                    // version metadata survives — emitted as a deletion.
                    let value = record.fields.get(col_name).cloned();

                    changes.push(Change::new(
                        record_id.clone(),
                        Some(col_name.clone()),
                        value,
                        clock_info.col_version,
                        clock_info.db_version,
                        clock_info.node_id,
                        clock_info.local_db_version,
                        0,
                    ));
                }
            }
        }

        // Record deletions are reported as tombstone changes.
        for (record_id, tombstone_info) in self.tombstones.iter() {
            if tombstone_info.local_db_version > last_db_version
                && !excluding.contains(&tombstone_info.node_id)
            {
                changes.push(Change::new(
                    record_id.clone(),
                    None,
                    None,
                    TOMBSTONE_COL_VERSION,
                    tombstone_info.db_version,
                    tombstone_info.node_id,
                    tombstone_info.local_db_version,
                    0,
                ));
            }
        }

        // Parent and child layers can both report the same record/column;
        // compress to one change each.
        if self.parent.is_some() {
            Self::compress_changes(&mut changes);
        }

        changes
    }

    /// Compresses a change set in place: keeps only the newest change per
    /// (record, column) and, when a record deletion is present, collapses all
    /// of that record's changes into the single tombstone.
    ///
    /// Relies on [`DefaultChangeComparator`] ordering: records grouped
    /// together, newest version of each column first, tombstones last within
    /// a record.
    pub fn compress_changes(changes: &mut Vec<Change<K, C, V>>)
    where
        K: Ord,
        C: Ord,
    {
        if changes.is_empty() {
            return;
        }

        let comparator = DefaultChangeComparator;
        changes.sort_unstable_by(|a, b| comparator.compare(a, b));

        // Two-pointer in-place dedup over the sorted slice.
        let mut write = 0;
        for read in 1..changes.len() {
            if changes[read].record_id != changes[write].record_id {
                // New record: keep this change.
                write += 1;
                if write != read {
                    changes[write] = changes[read].clone();
                }
            } else if changes[read].col_name.is_none() && changes[write].col_name.is_some() {
                // Tombstone for the current record: rewind to the record's
                // first entry and replace everything with the tombstone.
                let mut first_pos = write;
                while first_pos > 0 && changes[first_pos - 1].record_id == changes[read].record_id {
                    first_pos -= 1;
                }
                changes[first_pos] = changes[read].clone();
                write = first_pos;
            } else if changes[read].col_name != changes[write].col_name
                && changes[write].col_name.is_some()
            {
                // New column of the same record: keep it. (Same-column
                // duplicates are skipped — the first, newest one wins.)
                write += 1;
                if write != read {
                    changes[write] = changes[read].clone();
                }
            }
        }

        changes.truncate(write + 1);
    }

    /// Looks up a record in this layer, falling back to the parent chain.
    pub fn get_record(&self, record_id: &K) -> Option<&Record<C, V>> {
        self.get_record_ptr(record_id, false)
    }

    /// Whether the record is tombstoned in this layer or any parent.
    pub fn is_tombstoned(&self, record_id: &K) -> bool {
        self.is_record_tombstoned(record_id, false)
    }

    /// Returns the tombstone for a record, consulting this layer first and
    /// then the parent chain.
    pub fn get_tombstone(&self, record_id: &K) -> Option<TombstoneInfo> {
        if let Some(info) = self.tombstones.find(record_id) {
            return Some(info);
        }

        if let Some(ref parent) = self.parent {
            return parent.get_tombstone(record_id);
        }

        None
    }

    /// Discards tombstones older than `min_acknowledged_version` (only in
    /// this layer) and returns how many were removed.
    pub fn compact_tombstones(&mut self, min_acknowledged_version: u64) -> usize {
        self.tombstones.compact(min_acknowledged_version)
    }

    /// Number of tombstones held by this layer (parent excluded).
    pub fn tombstone_count(&self) -> usize {
        self.tombstones.len()
    }

    /// Read-only access to the logical clock.
    pub fn get_clock(&self) -> &LogicalClock {
        &self.clock
    }

    /// Read-only access to this layer's records (parent excluded).
    pub fn get_data(&self) -> &HashMap<K, Record<C, V>> {
        &self.data
    }

    /// Serializes this CRDT (without its parent link) to a JSON string.
    #[cfg(feature = "json")]
    pub fn to_json(&self) -> Result<String, serde_json::Error>
    where
        K: serde::Serialize,
        C: serde::Serialize,
        V: serde::Serialize,
    {
        serde_json::to_string(self)
    }

    /// Deserializes a CRDT from JSON; the result has no parent.
    #[cfg(feature = "json")]
    pub fn from_json(json: &str) -> Result<Self, serde_json::Error>
    where
        K: serde::de::DeserializeOwned + Hash + Eq + Clone,
        C: serde::de::DeserializeOwned + Hash + Eq + Clone,
        V: serde::de::DeserializeOwned + Clone,
    {
        serde_json::from_str(json)
    }

    /// Serializes this CRDT (without its parent link) to bincode bytes.
    #[cfg(feature = "binary")]
    pub fn to_bytes(&self) -> Result<Vec<u8>, bincode::error::EncodeError>
    where
        K: serde::Serialize,
        C: serde::Serialize,
        V: serde::Serialize,
    {
        bincode::serde::encode_to_vec(self, bincode::config::standard())
    }

    /// Deserializes a CRDT from bincode bytes; the result has no parent.
    #[cfg(feature = "binary")]
    pub fn from_bytes(bytes: &[u8]) -> Result<Self, bincode::error::DecodeError>
    where
        K: serde::de::DeserializeOwned + Hash + Eq + Clone,
        C: serde::de::DeserializeOwned + Hash + Eq + Clone,
        V: serde::de::DeserializeOwned + Clone,
    {
        let (result, _len) = bincode::serde::decode_from_slice(bytes, bincode::config::standard())?;
        Ok(result)
    }

    /// Whether the record is tombstoned in this layer, or — unless
    /// `ignore_parent` — anywhere up the parent chain.
    fn is_record_tombstoned(&self, record_id: &K, ignore_parent: bool) -> bool {
        if self.tombstones.find(record_id).is_some() {
            return true;
        }

        if !ignore_parent {
            if let Some(ref parent) = self.parent {
                return parent.is_record_tombstoned(record_id, false);
            }
        }

        false
    }

    /// Returns a mutable reference to the record, creating it if absent.
    /// A newly created record is seeded from the parent's copy (unless
    /// `ignore_parent`). "Unchecked": does not consult tombstones — callers
    /// are expected to have done so.
    fn get_or_create_record_unchecked(
        &mut self,
        record_id: &K,
        ignore_parent: bool,
    ) -> &mut Record<C, V> {
        #[cfg(feature = "std")]
        use std::collections::hash_map::Entry;
        #[cfg(all(not(feature = "std"), feature = "alloc"))]
        use hashbrown::hash_map::Entry;

        match self.data.entry(record_id.clone()) {
            Entry::Occupied(e) => e.into_mut(),
            Entry::Vacant(e) => {
                let record = if !ignore_parent {
                    self
                        .parent
                        .as_ref()
                        .and_then(|p| p.get_record_ptr(record_id, false))
                        .cloned()
                        .unwrap_or_else(Record::new)
                } else {
                    Record::new()
                };
                e.insert(record)
            }
        }
    }

    /// Looks up a record in this layer, falling back (unless `ignore_parent`)
    /// to the parent chain.
    fn get_record_ptr(&self, record_id: &K, ignore_parent: bool) -> Option<&Record<C, V>> {
        if let Some(record) = self.data.get(record_id) {
            return Some(record);
        }

        if !ignore_parent {
            if let Some(ref parent) = self.parent {
                return parent.get_record_ptr(record_id, false);
            }
        }

        None
    }
}
1367
#[cfg(test)]
mod tests {
    use super::*;

    // Clock starts at zero, ticks by one, and update() jumps past the
    // received time then ticks.
    #[test]
    fn test_logical_clock() {
        let mut clock = LogicalClock::new();
        assert_eq!(clock.current_time(), 0);

        let t1 = clock.tick();
        assert_eq!(t1, 1);
        assert_eq!(clock.current_time(), 1);

        let t2 = clock.update(5);
        assert_eq!(t2, 6);
        assert_eq!(clock.current_time(), 6);
    }

    // Insert, lookup, and compaction of tombstones (db_version < threshold
    // is dropped).
    #[test]
    fn test_tombstone_storage() {
        let mut storage = TombstoneStorage::new();
        let info = TombstoneInfo::new(10, 1, 10);

        storage.insert_or_assign("key1".to_string(), info);
        assert_eq!(storage.len(), 1);

        assert_eq!(storage.find(&"key1".to_string()), Some(info));
        assert_eq!(storage.find(&"key2".to_string()), None);

        let removed = storage.compact(15);
        assert_eq!(removed, 1);
        assert_eq!(storage.len(), 0);
    }

    // A multi-column insert yields one change per column and stores all values.
    #[test]
    fn test_basic_insert() {
        let mut crdt: CRDT<String, String, String> = CRDT::new(1, None);

        let fields = vec![
            ("name".to_string(), "Alice".to_string()),
            ("age".to_string(), "30".to_string()),
        ];

        let changes = crdt.insert_or_update(&"user1".to_string(), fields);

        assert_eq!(changes.len(), 2);
        assert_eq!(crdt.get_data().len(), 1);

        let record = crdt.get_record(&"user1".to_string()).unwrap();
        assert_eq!(record.fields.get("name").unwrap(), "Alice");
        assert_eq!(record.fields.get("age").unwrap(), "30");
    }

    // Deleting a record removes its data and leaves a tombstone.
    #[test]
    fn test_delete_record() {
        let mut crdt: CRDT<String, String, String> = CRDT::new(1, None);

        let fields = vec![("name".to_string(), "Bob".to_string())];
        let _ = crdt.insert_or_update(&"user2".to_string(), fields);

        let delete_change = crdt.delete_record(&"user2".to_string());
        assert!(delete_change.is_some());
        assert!(crdt.is_tombstoned(&"user2".to_string()));
        assert_eq!(crdt.get_data().len(), 0);
    }

    // Concurrent writes converge: with equal col/db versions the higher
    // node id (node 2) wins on both replicas.
    #[test]
    fn test_merge_changes() {
        let mut crdt1: CRDT<String, String, String> = CRDT::new(1, None);
        let mut crdt2: CRDT<String, String, String> = CRDT::new(2, None);

        let fields1 = vec![("tag".to_string(), "Node1".to_string())];
        let changes1 = crdt1.insert_or_update(&"record1".to_string(), fields1);

        let fields2 = vec![("tag".to_string(), "Node2".to_string())];
        let changes2 = crdt2.insert_or_update(&"record1".to_string(), fields2);

        let merge_rule = DefaultMergeRule;
        crdt1.merge_changes(changes2, &merge_rule);
        crdt2.merge_changes(changes1, &merge_rule);

        assert_eq!(
            crdt1
                .get_record(&"record1".to_string())
                .unwrap()
                .fields
                .get("tag")
                .unwrap(),
            "Node2"
        );
        assert_eq!(crdt1.get_data(), crdt2.get_data());
    }

    // A Change round-trips through JSON and bincode unchanged.
    #[test]
    #[cfg(feature = "serde")]
    fn test_change_serialization() {
        #[allow(unused_variables)]
        let change = Change::new(
            "record1".to_string(),
            Some("name".to_string()),
            Some("Alice".to_string()),
            1,
            10,
            1,
            10,
            0,
        );

        #[cfg(feature = "json")]
        {
            let json = serde_json::to_string(&change).unwrap();
            let deserialized: Change<String, String, String> = serde_json::from_str(&json).unwrap();
            assert_eq!(change, deserialized);
        }

        #[cfg(feature = "binary")]
        {
            let bytes = bincode::serde::encode_to_vec(&change, bincode::config::standard()).unwrap();
            let (deserialized, _): (Change<String, String, String>, _) =
                bincode::serde::decode_from_slice(&bytes, bincode::config::standard()).unwrap();
            assert_eq!(change, deserialized);
        }
    }

    // A Record round-trips through JSON and bincode (equality compares
    // fields only).
    #[test]
    #[cfg(feature = "serde")]
    fn test_record_serialization() {
        let mut fields = HashMap::new();
        fields.insert("name".to_string(), "Bob".to_string());
        fields.insert("age".to_string(), "25".to_string());

        let mut column_versions = HashMap::new();
        column_versions.insert("name".to_string(), ColumnVersion::new(1, 10, 1, 10));
        column_versions.insert("age".to_string(), ColumnVersion::new(1, 11, 1, 11));

        #[allow(unused_variables)]
        let record = Record::from_parts(fields, column_versions);

        #[cfg(feature = "json")]
        {
            let json = serde_json::to_string(&record).unwrap();
            let deserialized: Record<String, String> = serde_json::from_str(&json).unwrap();
            assert_eq!(record, deserialized);
        }

        #[cfg(feature = "binary")]
        {
            let bytes = bincode::serde::encode_to_vec(&record, bincode::config::standard()).unwrap();
            let (deserialized, _): (Record<String, String>, _) =
                bincode::serde::decode_from_slice(&bytes, bincode::config::standard()).unwrap();
            assert_eq!(record, deserialized);
        }
    }

    // A whole CRDT (data, tombstones, clock) round-trips through JSON; the
    // parent link is not serialized.
    #[test]
    #[cfg(feature = "json")]
    fn test_crdt_json_serialization() {
        let mut crdt: CRDT<String, String, String> = CRDT::new(1, None);

        let fields = vec![
            ("name".to_string(), "Alice".to_string()),
            ("age".to_string(), "30".to_string()),
        ];
        let _ = crdt.insert_or_update(&"user1".to_string(), fields);

        let fields2 = vec![("name".to_string(), "Bob".to_string())];
        let _ = crdt.insert_or_update(&"user2".to_string(), fields2);

        let _ = crdt.delete_record(&"user2".to_string());

        let json = crdt.to_json().unwrap();

        let deserialized: CRDT<String, String, String> = CRDT::from_json(&json).unwrap();

        assert_eq!(crdt.get_data().len(), deserialized.get_data().len());
        assert_eq!(
            crdt.get_record(&"user1".to_string()).unwrap().fields,
            deserialized.get_record(&"user1".to_string()).unwrap().fields
        );

        assert_eq!(crdt.tombstone_count(), deserialized.tombstone_count());
        assert!(deserialized.is_tombstoned(&"user2".to_string()));

        assert_eq!(
            crdt.get_clock().current_time(),
            deserialized.get_clock().current_time()
        );

        let has_parent = deserialized.parent.is_some();
        assert!(!has_parent);
    }

    // A whole CRDT round-trips through the bincode binary format.
    #[test]
    #[cfg(feature = "binary")]
    fn test_crdt_binary_serialization() {
        let mut crdt: CRDT<String, String, String> = CRDT::new(1, None);

        let fields = vec![
            ("name".to_string(), "Alice".to_string()),
            ("age".to_string(), "30".to_string()),
        ];
        let _ = crdt.insert_or_update(&"user1".to_string(), fields);

        let bytes = crdt.to_bytes().unwrap();

        let deserialized: CRDT<String, String, String> = CRDT::from_bytes(&bytes).unwrap();

        assert_eq!(crdt.get_data().len(), deserialized.get_data().len());
        assert_eq!(
            crdt.get_record(&"user1".to_string()).unwrap().fields,
            deserialized.get_record(&"user1".to_string()).unwrap().fields
        );

        assert_eq!(
            crdt.get_clock().current_time(),
            deserialized.get_clock().current_time()
        );
    }

    // serde(skip) on `parent` means a serialized child neither keeps its
    // parent link nor the parent's records after deserialization.
    #[test]
    #[cfg(feature = "serde")]
    fn test_parent_not_serialized() {
        let mut parent: CRDT<String, String, String> = CRDT::new(1, None);
        let fields = vec![("parent_field".to_string(), "parent_value".to_string())];
        let _ = parent.insert_or_update(&"parent_record".to_string(), fields);

        let parent_arc = Arc::new(parent);
        let mut child = CRDT::new(2, Some(parent_arc.clone()));
        let child_fields = vec![("child_field".to_string(), "child_value".to_string())];
        let _ = child.insert_or_update(&"child_record".to_string(), child_fields);

        #[cfg(feature = "json")]
        {
            let json = serde_json::to_string(&child).unwrap();
            let deserialized: CRDT<String, String, String> = serde_json::from_str(&json).unwrap();

            assert!(deserialized.parent.is_none());

            assert!(deserialized.get_record(&"child_record".to_string()).is_some());

            assert!(deserialized.get_record(&"parent_record".to_string()).is_none());
        }
    }
}