1#![cfg_attr(not(feature = "std"), no_std)]
85
86#[cfg(not(any(feature = "std", feature = "alloc")))]
88compile_error!("Either 'std' (default) or 'alloc' feature must be enabled. For no_std environments, use: cargo build --no-default-features --features alloc");
89
90#[cfg(not(feature = "std"))]
91extern crate alloc;
92
93#[cfg(feature = "std")]
94use std::{
95 cmp::Ordering,
96 collections::{HashMap, HashSet},
97 hash::Hash,
98 sync::Arc,
99};
100
101#[cfg(not(feature = "std"))]
102use alloc::{
103 string::String,
104 sync::Arc,
105 vec::Vec,
106};
107#[cfg(not(feature = "std"))]
108use core::{cmp::Ordering, hash::Hash};
109#[cfg(all(not(feature = "std"), feature = "alloc"))]
110use hashbrown::{HashMap, HashSet};
111
/// Identifier of a node (replica/peer) in the distributed system.
pub type NodeId = u64;

/// Default key type for column names in convenience usages.
pub type ColumnKey = String;

/// Sentinel `col_version` marking a record-level tombstone (deletion);
/// `u64::MAX` guarantees a tombstone outranks any real column version.
const TOMBSTONE_COL_VERSION: u64 = u64::MAX;
121
/// A single replicated mutation: a column update when `col_name` is `Some`,
/// or a record-level deletion (tombstone) when `col_name` is `None`.
#[derive(Debug, Clone, PartialEq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "serde", serde(bound(serialize = "K: serde::Serialize, C: serde::Serialize, V: serde::Serialize")))]
#[cfg_attr(feature = "serde", serde(bound(deserialize = "K: serde::de::DeserializeOwned, C: serde::de::DeserializeOwned, V: serde::de::DeserializeOwned")))]
pub struct Change<K, C, V> {
    /// Key of the record this change applies to.
    pub record_id: K,
    /// Column being changed; `None` marks a record-level deletion.
    pub col_name: Option<C>,
    /// New value; `None` clears the column (or accompanies a deletion).
    pub value: Option<V>,
    /// Per-column edit counter (`TOMBSTONE_COL_VERSION` for deletions).
    pub col_version: u64,
    /// Logical clock value at the node that originated the change.
    pub db_version: u64,
    /// Node that originated the change.
    pub node_id: NodeId,
    /// Logical clock value at which this node applied the change locally.
    pub local_db_version: u64,
    /// Application-defined flags carried alongside the change.
    pub flags: u32,
}

// Equality is field-wise, so `Eq` follows whenever all components are `Eq`.
impl<K: Eq, C: Eq, V: Eq> Eq for Change<K, C, V> {}
148
149impl<K, C, V> Change<K, C, V> {
150 #[allow(clippy::too_many_arguments)]
152 pub fn new(
153 record_id: K,
154 col_name: Option<C>,
155 value: Option<V>,
156 col_version: u64,
157 db_version: u64,
158 node_id: NodeId,
159 local_db_version: u64,
160 flags: u32,
161 ) -> Self {
162 Self {
163 record_id,
164 col_name,
165 value,
166 col_version,
167 db_version,
168 node_id,
169 local_db_version,
170 flags,
171 }
172 }
173}
174
/// Version metadata tracked for one column of a record.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct ColumnVersion {
    /// Per-column edit counter, bumped on every write to the column.
    pub col_version: u64,
    /// Logical clock value at the originating node when the column changed.
    pub db_version: u64,
    /// Node that produced the current column value.
    pub node_id: NodeId,
    /// Logical clock value at which this node applied the change locally.
    pub local_db_version: u64,
}
185
186impl ColumnVersion {
187 pub fn new(col_version: u64, db_version: u64, node_id: NodeId, local_db_version: u64) -> Self {
188 Self {
189 col_version,
190 db_version,
191 node_id,
192 local_db_version,
193 }
194 }
195}
196
/// Metadata remembered for a deleted record (who deleted it, and when).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct TombstoneInfo {
    /// Logical clock value at the deleting node.
    pub db_version: u64,
    /// Node that performed the deletion.
    pub node_id: NodeId,
    /// Logical clock value at which this node applied the deletion locally.
    pub local_db_version: u64,
}
208
209impl TombstoneInfo {
210 pub fn new(db_version: u64, node_id: NodeId, local_db_version: u64) -> Self {
211 Self {
212 db_version,
213 node_id,
214 local_db_version,
215 }
216 }
217
218 pub fn as_column_version(&self) -> ColumnVersion {
220 ColumnVersion::new(
221 TOMBSTONE_COL_VERSION,
222 self.db_version,
223 self.node_id,
224 self.local_db_version,
225 )
226 }
227}
228
/// Lamport-style logical clock used to stamp local and merged changes.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct LogicalClock {
    time: u64,
}

impl LogicalClock {
    /// Starts a clock at time zero.
    pub fn new() -> Self {
        LogicalClock { time: 0 }
    }

    /// Advances the clock by one step and returns the new time.
    pub fn tick(&mut self) -> u64 {
        let next = self.time + 1;
        self.time = next;
        next
    }

    /// Merges a remote timestamp: the clock jumps past whichever of the two
    /// times is larger, guaranteeing the result exceeds both.
    pub fn update(&mut self, received_time: u64) -> u64 {
        let merged = if received_time > self.time {
            received_time
        } else {
            self.time
        };
        self.time = merged + 1;
        self.time
    }

    /// Forces the clock to an absolute time (used when loading snapshots).
    pub fn set_time(&mut self, time: u64) {
        self.time = time;
    }

    /// Reads the current time without advancing.
    pub fn current_time(&self) -> u64 {
        self.time
    }
}

impl Default for LogicalClock {
    fn default() -> Self {
        LogicalClock { time: 0 }
    }
}
271
/// Store of deleted record keys with their deletion metadata. Tombstones let
/// a node reject late-arriving updates for deleted records; they can be
/// compacted away once all peers have acknowledged the deletion.
#[derive(Debug, Clone)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct TombstoneStorage<K: Hash + Eq> {
    // One entry per deleted record key.
    entries: HashMap<K, TombstoneInfo>,
}
280
281impl<K: Hash + Eq> TombstoneStorage<K> {
282 pub fn new() -> Self {
283 Self {
284 entries: HashMap::new(),
285 }
286 }
287
288 pub fn insert_or_assign(&mut self, key: K, info: TombstoneInfo) {
289 self.entries.insert(key, info);
290 }
291
292 pub fn find(&self, key: &K) -> Option<TombstoneInfo> {
293 self.entries.get(key).copied()
294 }
295
296 pub fn erase(&mut self, key: &K) -> bool {
297 self.entries.remove(key).is_some()
298 }
299
300 pub fn clear(&mut self) {
301 self.entries.clear();
302 }
303
304 pub fn iter(&self) -> impl Iterator<Item = (&K, &TombstoneInfo)> {
305 self.entries.iter()
306 }
307
308 pub fn len(&self) -> usize {
309 self.entries.len()
310 }
311
312 pub fn is_empty(&self) -> bool {
313 self.entries.is_empty()
314 }
315
316 pub fn compact(&mut self, min_acknowledged_version: u64) -> usize {
320 let initial_len = self.entries.len();
321 self
322 .entries
323 .retain(|_, info| info.db_version >= min_acknowledged_version);
324 initial_len - self.entries.len()
325 }
326}
327
328impl<K: Hash + Eq> Default for TombstoneStorage<K> {
329 fn default() -> Self {
330 Self::new()
331 }
332}
333
/// A replicated record: current field values plus per-column version
/// metadata used for conflict resolution.
#[derive(Debug, Clone)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "serde", serde(bound(serialize = "C: serde::Serialize + Hash + Eq, V: serde::Serialize")))]
#[cfg_attr(feature = "serde", serde(bound(deserialize = "C: serde::de::DeserializeOwned + Hash + Eq, V: serde::de::DeserializeOwned")))]
pub struct Record<C, V> {
    /// Current value of each column.
    pub fields: HashMap<C, V>,
    /// Version metadata per column; may hold entries for cleared fields.
    pub column_versions: HashMap<C, ColumnVersion>,
    /// Smallest `local_db_version` across columns (`u64::MAX` when empty).
    pub lowest_local_db_version: u64,
    /// Largest `local_db_version` across columns (0 when empty).
    pub highest_local_db_version: u64,
}
346
347impl<C: Hash + Eq, V> Record<C, V> {
348 pub fn new() -> Self {
349 Self {
350 fields: HashMap::new(),
351 column_versions: HashMap::new(),
352 lowest_local_db_version: u64::MAX,
353 highest_local_db_version: 0,
354 }
355 }
356
357 pub fn from_parts(
359 fields: HashMap<C, V>,
360 column_versions: HashMap<C, ColumnVersion>,
361 ) -> Self {
362 let mut lowest = u64::MAX;
363 let mut highest = 0;
364
365 for ver in column_versions.values() {
366 if ver.local_db_version < lowest {
367 lowest = ver.local_db_version;
368 }
369 if ver.local_db_version > highest {
370 highest = ver.local_db_version;
371 }
372 }
373
374 Self {
375 fields,
376 column_versions,
377 lowest_local_db_version: lowest,
378 highest_local_db_version: highest,
379 }
380 }
381}
382
383impl<C: Hash + Eq + PartialEq, V: PartialEq> PartialEq for Record<C, V> {
384 fn eq(&self, other: &Self) -> bool {
385 self.fields == other.fields
387 }
388}
389
390impl<C: Hash + Eq, V> Default for Record<C, V> {
391 fn default() -> Self {
392 Self::new()
393 }
394}
395
/// Conflict-resolution policy: decides whether a remote change should
/// override the local state of the same record/column.
pub trait MergeRule<K, C, V> {
    /// Returns `true` when the remote (col_version, db_version, node_id)
    /// triple should win over the local one.
    fn should_accept(
        &self,
        local_col: u64,
        local_db: u64,
        local_node: NodeId,
        remote_col: u64,
        remote_db: u64,
        remote_node: NodeId,
    ) -> bool;

    /// Convenience wrapper comparing two full `Change`s via `should_accept`.
    fn should_accept_change(&self, local: &Change<K, C, V>, remote: &Change<K, C, V>) -> bool {
        self.should_accept(
            local.col_version,
            local.db_version,
            local.node_id,
            remote.col_version,
            remote.db_version,
            remote.node_id,
        )
    }
}
424
425#[derive(Debug, Clone, Copy, Default)]
432pub struct DefaultMergeRule;
433
434impl<K, C, V> MergeRule<K, C, V> for DefaultMergeRule {
435 fn should_accept(
436 &self,
437 local_col: u64,
438 local_db: u64,
439 local_node: NodeId,
440 remote_col: u64,
441 remote_db: u64,
442 remote_node: NodeId,
443 ) -> bool {
444 match remote_col.cmp(&local_col) {
445 Ordering::Greater => true,
446 Ordering::Less => false,
447 Ordering::Equal => match remote_db.cmp(&local_db) {
448 Ordering::Greater => true,
449 Ordering::Less => false,
450 Ordering::Equal => remote_node > local_node,
451 },
452 }
453 }
454}
455
/// Total ordering over changes, used when sorting change lists for
/// compression/deduplication.
pub trait ChangeComparator<K, C, V> {
    /// Compares two changes; see `DefaultChangeComparator` for the default order.
    fn compare(&self, a: &Change<K, C, V>, b: &Change<K, C, V>) -> Ordering;
}

/// Default order: record id ascending; within a record, column changes before
/// the record tombstone (`col_name = None` sorts last), columns ascending;
/// within a column, versions/node id descending so the newest change is first.
#[derive(Debug, Clone, Copy, Default)]
pub struct DefaultChangeComparator;
472
473impl<K: Ord, C: Ord, V> ChangeComparator<K, C, V> for DefaultChangeComparator {
474 fn compare(&self, a: &Change<K, C, V>, b: &Change<K, C, V>) -> Ordering {
475 match a.record_id.cmp(&b.record_id) {
477 Ordering::Equal => {}
478 ord => return ord,
479 }
480
481 match (a.col_name.as_ref(), b.col_name.as_ref()) {
483 (None, None) => {}
484 (None, Some(_)) => return Ordering::Greater,
485 (Some(_), None) => return Ordering::Less,
486 (Some(a_col), Some(b_col)) => match a_col.cmp(b_col) {
487 Ordering::Equal => {}
488 ord => return ord,
489 },
490 }
491
492 match b.col_version.cmp(&a.col_version) {
494 Ordering::Equal => {}
495 ord => return ord,
496 }
497
498 match b.db_version.cmp(&a.db_version) {
499 Ordering::Equal => {}
500 ord => return ord,
501 }
502
503 b.node_id.cmp(&a.node_id)
504 }
505}
506
/// Column-level last-writer-wins CRDT keyed by record id.
///
/// Supports an optional read-only `parent` layer: lookups fall back to the
/// parent, and the parent's record is cloned into this layer on first write
/// (copy-on-write overlay). The parent link is never serialized.
#[derive(Debug)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "serde", serde(bound(serialize = "K: serde::Serialize, C: serde::Serialize, V: serde::Serialize")))]
#[cfg_attr(feature = "serde", serde(bound(deserialize = "K: serde::de::DeserializeOwned + Hash + Eq + Clone, C: serde::de::DeserializeOwned + Hash + Eq + Clone, V: serde::de::DeserializeOwned + Clone")))]
pub struct CRDT<K: Hash + Eq + Clone, C: Hash + Eq + Clone, V: Clone> {
    // This replica's id; used as the final merge tie-break.
    node_id: NodeId,
    // Lamport clock generating db_version / local_db_version stamps.
    clock: LogicalClock,
    // Live records of this layer (parent records are looked up on miss).
    data: HashMap<K, Record<C, V>>,
    // Deletion metadata for records removed in this layer.
    tombstones: TombstoneStorage<K>,
    // Optional base layer; skipped by serde and restored as `None`.
    #[cfg_attr(feature = "serde", serde(skip, default))]
    parent: Option<Arc<CRDT<K, C, V>>>,
    // Parent clock time captured at construction (0 without a parent).
    #[allow(dead_code)]
    base_version: u64,
}
524
525impl<K: Hash + Eq + Clone, C: Hash + Eq + Clone, V: Clone> CRDT<K, C, V> {
526 pub fn new(node_id: NodeId, parent: Option<Arc<CRDT<K, C, V>>>) -> Self {
533 let (clock, base_version) = if let Some(ref p) = parent {
534 let parent_clock = p.clock;
535 let base = parent_clock.current_time();
536 (parent_clock, base)
537 } else {
538 (LogicalClock::new(), 0)
539 };
540
541 Self {
542 node_id,
543 clock,
544 data: HashMap::new(),
545 tombstones: TombstoneStorage::new(),
546 parent,
547 base_version,
548 }
549 }
550
551 pub fn from_changes(node_id: NodeId, changes: Vec<Change<K, C, V>>) -> Self {
558 let mut crdt = Self::new(node_id, None);
559 crdt.apply_changes(changes);
560 crdt
561 }
562
563 pub fn reset(&mut self, changes: Vec<Change<K, C, V>>) {
569 self.data.clear();
570 self.tombstones.clear();
571 self.clock = LogicalClock::new();
572 self.apply_changes(changes);
573 }
574
575 fn apply_changes(&mut self, changes: Vec<Change<K, C, V>>) {
577 let max_db_version = changes
579 .iter()
580 .map(|c| c.db_version.max(c.local_db_version))
581 .max()
582 .unwrap_or(0);
583
584 self.clock.set_time(max_db_version);
586
587 for change in changes {
589 let record_id = change.record_id.clone();
590 let col_name = change.col_name.clone();
591 let remote_col_version = change.col_version;
592 let remote_db_version = change.db_version;
593 let remote_node_id = change.node_id;
594 let remote_local_db_version = change.local_db_version;
595 let remote_value = change.value;
596
597 if col_name.is_none() {
598 self.data.remove(&record_id);
600
601 self.tombstones.insert_or_assign(
603 record_id,
604 TombstoneInfo::new(remote_db_version, remote_node_id, remote_local_db_version),
605 );
606 } else if let Some(col_key) = col_name {
607 if !self.is_record_tombstoned(&record_id, false) {
609 let record = self.get_or_create_record_unchecked(&record_id, false);
610
611 if let Some(value) = remote_value {
613 record.fields.insert(col_key.clone(), value);
614 }
615
616 let col_ver = ColumnVersion::new(
618 remote_col_version,
619 remote_db_version,
620 remote_node_id,
621 remote_local_db_version,
622 );
623 record.column_versions.insert(col_key, col_ver);
624
625 if remote_local_db_version < record.lowest_local_db_version {
627 record.lowest_local_db_version = remote_local_db_version;
628 }
629 if remote_local_db_version > record.highest_local_db_version {
630 record.highest_local_db_version = remote_local_db_version;
631 }
632 }
633 }
634 }
635 }
636
637 #[must_use = "changes should be propagated to other nodes"]
648 pub fn insert_or_update<I>(&mut self, record_id: &K, fields: I) -> Vec<Change<K, C, V>>
649 where
650 I: IntoIterator<Item = (C, V)>,
651 {
652 self.insert_or_update_with_flags(record_id, 0, fields)
653 }
654
655 #[must_use = "changes should be propagated to other nodes"]
667 pub fn insert_or_update_with_flags<I>(
668 &mut self,
669 record_id: &K,
670 flags: u32,
671 fields: I,
672 ) -> Vec<Change<K, C, V>>
673 where
674 I: IntoIterator<Item = (C, V)>,
675 {
676 let db_version = self.clock.tick();
677
678 if self.is_record_tombstoned(record_id, false) {
680 return Vec::new();
681 }
682
683 let mut changes = Vec::new();
684 let node_id = self.node_id; let record = self.get_or_create_record_unchecked(record_id, false);
686
687 for (col_name, value) in fields {
688 let col_version = if let Some(col_info) = record.column_versions.get_mut(&col_name) {
689 col_info.col_version += 1;
690 col_info.db_version = db_version;
691 col_info.node_id = node_id;
692 col_info.local_db_version = db_version;
693 col_info.col_version
694 } else {
695 record.column_versions.insert(
696 col_name.clone(),
697 ColumnVersion::new(1, db_version, node_id, db_version),
698 );
699 1
700 };
701
702 if db_version < record.lowest_local_db_version {
704 record.lowest_local_db_version = db_version;
705 }
706 if db_version > record.highest_local_db_version {
707 record.highest_local_db_version = db_version;
708 }
709
710 record.fields.insert(col_name.clone(), value.clone());
711 changes.push(Change::new(
712 record_id.clone(),
713 Some(col_name),
714 Some(value),
715 col_version,
716 db_version,
717 node_id,
718 db_version,
719 flags,
720 ));
721 }
722
723 changes
724 }
725
726 #[must_use = "changes should be propagated to other nodes"]
736 pub fn delete_record(&mut self, record_id: &K) -> Option<Change<K, C, V>> {
737 self.delete_record_with_flags(record_id, 0)
738 }
739
740 #[must_use = "changes should be propagated to other nodes"]
751 pub fn delete_record_with_flags(&mut self, record_id: &K, flags: u32) -> Option<Change<K, C, V>> {
752 if self.is_record_tombstoned(record_id, false) {
753 return None;
754 }
755
756 let db_version = self.clock.tick();
757
758 self.data.remove(record_id);
760
761 self.tombstones.insert_or_assign(
763 record_id.clone(),
764 TombstoneInfo::new(db_version, self.node_id, db_version),
765 );
766
767 Some(Change::new(
768 record_id.clone(),
769 None,
770 None,
771 TOMBSTONE_COL_VERSION,
772 db_version,
773 self.node_id,
774 db_version,
775 flags,
776 ))
777 }
778
779 #[must_use = "changes should be propagated to other nodes"]
793 pub fn delete_field(&mut self, record_id: &K, field_name: &C) -> Option<Change<K, C, V>> {
794 self.delete_field_with_flags(record_id, field_name, 0)
795 }
796
797 #[must_use = "changes should be propagated to other nodes"]
812 pub fn delete_field_with_flags(
813 &mut self,
814 record_id: &K,
815 field_name: &C,
816 flags: u32,
817 ) -> Option<Change<K, C, V>> {
818 if self.is_record_tombstoned(record_id, false) {
820 return None;
821 }
822
823 let record = self.data.get_mut(record_id)?;
825
826 if !record.fields.contains_key(field_name) {
828 return None;
829 }
830
831 let db_version = self.clock.tick();
832
833 let col_version = if let Some(col_info) = record.column_versions.get_mut(field_name) {
835 col_info.col_version += 1;
836 col_info.db_version = db_version;
837 col_info.node_id = self.node_id;
838 col_info.local_db_version = db_version;
839 col_info.col_version
840 } else {
841 record.column_versions.insert(
843 field_name.clone(),
844 ColumnVersion::new(1, db_version, self.node_id, db_version),
845 );
846 1
847 };
848
849 if db_version < record.lowest_local_db_version {
851 record.lowest_local_db_version = db_version;
852 }
853 if db_version > record.highest_local_db_version {
854 record.highest_local_db_version = db_version;
855 }
856
857 record.fields.remove(field_name);
859
860 Some(Change::new(
861 record_id.clone(),
862 Some(field_name.clone()),
863 None, col_version,
865 db_version,
866 self.node_id,
867 db_version,
868 flags,
869 ))
870 }
871
872 pub fn merge_changes<R: MergeRule<K, C, V>>(
883 &mut self,
884 changes: Vec<Change<K, C, V>>,
885 merge_rule: &R,
886 ) -> Vec<Change<K, C, V>> {
887 self.merge_changes_impl(changes, false, merge_rule)
888 }
889
890 fn merge_changes_impl<R: MergeRule<K, C, V>>(
891 &mut self,
892 changes: Vec<Change<K, C, V>>,
893 ignore_parent: bool,
894 merge_rule: &R,
895 ) -> Vec<Change<K, C, V>> {
896 let mut accepted_changes = Vec::new();
897
898 if changes.is_empty() {
899 return accepted_changes;
900 }
901
902 for change in changes {
903 let Change {
905 record_id,
906 col_name,
907 value: remote_value,
908 col_version: remote_col_version,
909 db_version: remote_db_version,
910 node_id: remote_node_id,
911 flags,
912 ..
913 } = change;
914
915 let new_local_db_version = self.clock.update(remote_db_version);
917
918 if self.is_record_tombstoned(&record_id, ignore_parent) {
920 continue;
921 }
922
923 let local_col_info = if col_name.is_none() {
925 self
927 .tombstones
928 .find(&record_id)
929 .map(|info| info.as_column_version())
930 } else if let Some(ref col) = col_name {
931 self
933 .get_record_ptr(&record_id, ignore_parent)
934 .and_then(|record| record.column_versions.get(col).copied())
935 } else {
936 None
937 };
938
939 let should_accept = if let Some(local_info) = local_col_info {
941 merge_rule.should_accept(
942 local_info.col_version,
943 local_info.db_version,
944 local_info.node_id,
945 remote_col_version,
946 remote_db_version,
947 remote_node_id,
948 )
949 } else {
950 true
951 };
952
953 if should_accept {
954 if let Some(col_key) = col_name {
955 let record = self.get_or_create_record_unchecked(&record_id, ignore_parent);
957
958 if let Some(value) = remote_value.clone() {
960 record.fields.insert(col_key.clone(), value);
961 } else {
962 record.fields.remove(&col_key);
964 }
965
966 record.column_versions.insert(
968 col_key.clone(),
969 ColumnVersion::new(
970 remote_col_version,
971 remote_db_version,
972 remote_node_id,
973 new_local_db_version,
974 ),
975 );
976
977 if new_local_db_version < record.lowest_local_db_version {
979 record.lowest_local_db_version = new_local_db_version;
980 }
981 if new_local_db_version > record.highest_local_db_version {
982 record.highest_local_db_version = new_local_db_version;
983 }
984
985 accepted_changes.push(Change::new(
986 record_id,
987 Some(col_key),
988 remote_value,
989 remote_col_version,
990 remote_db_version,
991 remote_node_id,
992 new_local_db_version,
993 flags,
994 ));
995 } else {
996 self.data.remove(&record_id);
998
999 self.tombstones.insert_or_assign(
1001 record_id.clone(),
1002 TombstoneInfo::new(remote_db_version, remote_node_id, new_local_db_version),
1003 );
1004
1005 accepted_changes.push(Change::new(
1006 record_id,
1007 None,
1008 None,
1009 remote_col_version,
1010 remote_db_version,
1011 remote_node_id,
1012 new_local_db_version,
1013 flags,
1014 ));
1015 }
1016 }
1017 }
1018
1019 accepted_changes
1020 }
1021
1022 #[must_use]
1032 pub fn get_changes_since(&self, last_db_version: u64) -> Vec<Change<K, C, V>>
1033 where
1034 K: Ord,
1035 C: Ord,
1036 {
1037 self.get_changes_since_excluding(last_db_version, &HashSet::new())
1038 }
1039
1040 pub fn get_changes_since_excluding(
1042 &self,
1043 last_db_version: u64,
1044 excluding: &HashSet<NodeId>,
1045 ) -> Vec<Change<K, C, V>>
1046 where
1047 K: Ord,
1048 C: Ord,
1049 {
1050 let mut changes = Vec::new();
1051
1052 if let Some(ref parent) = self.parent {
1054 let parent_changes = parent.get_changes_since_excluding(last_db_version, excluding);
1055 changes.extend(parent_changes);
1056 }
1057
1058 for (record_id, record) in &self.data {
1060 if record.highest_local_db_version <= last_db_version {
1062 continue;
1063 }
1064
1065 for (col_name, clock_info) in &record.column_versions {
1066 if clock_info.local_db_version > last_db_version && !excluding.contains(&clock_info.node_id)
1067 {
1068 let value = record.fields.get(col_name).cloned();
1069
1070 changes.push(Change::new(
1071 record_id.clone(),
1072 Some(col_name.clone()),
1073 value,
1074 clock_info.col_version,
1075 clock_info.db_version,
1076 clock_info.node_id,
1077 clock_info.local_db_version,
1078 0,
1079 ));
1080 }
1081 }
1082 }
1083
1084 for (record_id, tombstone_info) in self.tombstones.iter() {
1086 if tombstone_info.local_db_version > last_db_version
1087 && !excluding.contains(&tombstone_info.node_id)
1088 {
1089 changes.push(Change::new(
1090 record_id.clone(),
1091 None,
1092 None,
1093 TOMBSTONE_COL_VERSION,
1094 tombstone_info.db_version,
1095 tombstone_info.node_id,
1096 tombstone_info.local_db_version,
1097 0,
1098 ));
1099 }
1100 }
1101
1102 if self.parent.is_some() {
1103 Self::compress_changes(&mut changes);
1105 }
1106
1107 changes
1108 }
1109
1110 pub fn compress_changes(changes: &mut Vec<Change<K, C, V>>)
1120 where
1121 K: Ord,
1122 C: Ord,
1123 {
1124 if changes.is_empty() {
1125 return;
1126 }
1127
1128 let comparator = DefaultChangeComparator;
1131 changes.sort_unstable_by(|a, b| comparator.compare(a, b));
1132
1133 let mut write = 0;
1135 for read in 1..changes.len() {
1136 if changes[read].record_id != changes[write].record_id {
1137 write += 1;
1139 if write != read {
1140 changes[write] = changes[read].clone();
1141 }
1142 } else if changes[read].col_name.is_none() && changes[write].col_name.is_some() {
1143 let mut first_pos = write;
1146 while first_pos > 0 && changes[first_pos - 1].record_id == changes[read].record_id {
1147 first_pos -= 1;
1148 }
1149 changes[first_pos] = changes[read].clone();
1150 write = first_pos;
1151 } else if changes[read].col_name != changes[write].col_name
1152 && changes[write].col_name.is_some()
1153 {
1154 write += 1;
1156 if write != read {
1157 changes[write] = changes[read].clone();
1158 }
1159 }
1160 }
1162
1163 changes.truncate(write + 1);
1164 }
1165
1166 pub fn get_record(&self, record_id: &K) -> Option<&Record<C, V>> {
1168 self.get_record_ptr(record_id, false)
1169 }
1170
1171 pub fn is_tombstoned(&self, record_id: &K) -> bool {
1173 self.is_record_tombstoned(record_id, false)
1174 }
1175
1176 pub fn get_tombstone(&self, record_id: &K) -> Option<TombstoneInfo> {
1178 if let Some(info) = self.tombstones.find(record_id) {
1179 return Some(info);
1180 }
1181
1182 if let Some(ref parent) = self.parent {
1183 return parent.get_tombstone(record_id);
1184 }
1185
1186 None
1187 }
1188
1189 pub fn compact_tombstones(&mut self, min_acknowledged_version: u64) -> usize {
1210 self.tombstones.compact(min_acknowledged_version)
1211 }
1212
1213 pub fn tombstone_count(&self) -> usize {
1215 self.tombstones.len()
1216 }
1217
1218 pub fn get_clock(&self) -> &LogicalClock {
1220 &self.clock
1221 }
1222
1223 pub fn get_data(&self) -> &HashMap<K, Record<C, V>> {
1225 &self.data
1226 }
1227
1228 #[cfg(feature = "json")]
1236 pub fn to_json(&self) -> Result<String, serde_json::Error>
1237 where
1238 K: serde::Serialize,
1239 C: serde::Serialize,
1240 V: serde::Serialize,
1241 {
1242 serde_json::to_string(self)
1243 }
1244
1245 #[cfg(feature = "json")]
1254 pub fn from_json(json: &str) -> Result<Self, serde_json::Error>
1255 where
1256 K: serde::de::DeserializeOwned + Hash + Eq + Clone,
1257 C: serde::de::DeserializeOwned + Hash + Eq + Clone,
1258 V: serde::de::DeserializeOwned + Clone,
1259 {
1260 serde_json::from_str(json)
1261 }
1262
1263 #[cfg(feature = "binary")]
1271 pub fn to_bytes(&self) -> Result<Vec<u8>, bincode::error::EncodeError>
1272 where
1273 K: serde::Serialize,
1274 C: serde::Serialize,
1275 V: serde::Serialize,
1276 {
1277 bincode::serde::encode_to_vec(self, bincode::config::standard())
1278 }
1279
1280 #[cfg(feature = "binary")]
1289 pub fn from_bytes(bytes: &[u8]) -> Result<Self, bincode::error::DecodeError>
1290 where
1291 K: serde::de::DeserializeOwned + Hash + Eq + Clone,
1292 C: serde::de::DeserializeOwned + Hash + Eq + Clone,
1293 V: serde::de::DeserializeOwned + Clone,
1294 {
1295 let (result, _len) = bincode::serde::decode_from_slice(bytes, bincode::config::standard())?;
1296 Ok(result)
1297 }
1298
1299 fn is_record_tombstoned(&self, record_id: &K, ignore_parent: bool) -> bool {
1302 if self.tombstones.find(record_id).is_some() {
1303 return true;
1304 }
1305
1306 if !ignore_parent {
1307 if let Some(ref parent) = self.parent {
1308 return parent.is_record_tombstoned(record_id, false);
1309 }
1310 }
1311
1312 false
1313 }
1314
1315 fn get_or_create_record_unchecked(
1316 &mut self,
1317 record_id: &K,
1318 ignore_parent: bool,
1319 ) -> &mut Record<C, V> {
1320 #[cfg(feature = "std")]
1321 use std::collections::hash_map::Entry;
1322 #[cfg(all(not(feature = "std"), feature = "alloc"))]
1323 use hashbrown::hash_map::Entry;
1324
1325 match self.data.entry(record_id.clone()) {
1326 Entry::Occupied(e) => e.into_mut(),
1327 Entry::Vacant(e) => {
1328 let record = if !ignore_parent {
1329 self
1330 .parent
1331 .as_ref()
1332 .and_then(|p| p.get_record_ptr(record_id, false))
1333 .cloned()
1334 .unwrap_or_else(Record::new)
1335 } else {
1336 Record::new()
1337 };
1338 e.insert(record)
1339 }
1340 }
1341 }
1342
1343 fn get_record_ptr(&self, record_id: &K, ignore_parent: bool) -> Option<&Record<C, V>> {
1344 if let Some(record) = self.data.get(record_id) {
1345 return Some(record);
1346 }
1347
1348 if !ignore_parent {
1349 if let Some(ref parent) = self.parent {
1350 return parent.get_record_ptr(record_id, false);
1351 }
1352 }
1353
1354 None
1355 }
1356}
1357
#[cfg(test)]
mod tests {
    use super::*;

    // Clock starts at zero, ticks monotonically, and update() jumps past
    // the received timestamp: max(local, received) + 1.
    #[test]
    fn test_logical_clock() {
        let mut clock = LogicalClock::new();
        assert_eq!(clock.current_time(), 0);

        let t1 = clock.tick();
        assert_eq!(t1, 1);
        assert_eq!(clock.current_time(), 1);

        // local time is 1, received 5 -> max(1, 5) + 1 == 6.
        let t2 = clock.update(5);
        assert_eq!(t2, 6);
        assert_eq!(clock.current_time(), 6);
    }

    // Insert/find/compact round-trip; compact(15) removes the entry with
    // db_version 10 because 10 < 15.
    #[test]
    fn test_tombstone_storage() {
        let mut storage = TombstoneStorage::new();
        let info = TombstoneInfo::new(10, 1, 10);

        storage.insert_or_assign("key1".to_string(), info);
        assert_eq!(storage.len(), 1);

        assert_eq!(storage.find(&"key1".to_string()), Some(info));
        assert_eq!(storage.find(&"key2".to_string()), None);

        let removed = storage.compact(15);
        assert_eq!(removed, 1);
        assert_eq!(storage.len(), 0);
    }

    // A two-field insert yields one change per field and a readable record.
    #[test]
    fn test_basic_insert() {
        let mut crdt: CRDT<String, String, String> = CRDT::new(1, None);

        let fields = vec![
            ("name".to_string(), "Alice".to_string()),
            ("age".to_string(), "30".to_string()),
        ];

        let changes = crdt.insert_or_update(&"user1".to_string(), fields);

        assert_eq!(changes.len(), 2);
        assert_eq!(crdt.get_data().len(), 1);

        let record = crdt.get_record(&"user1".to_string()).unwrap();
        assert_eq!(record.fields.get("name").unwrap(), "Alice");
        assert_eq!(record.fields.get("age").unwrap(), "30");
    }

    // Deleting a record removes its data and leaves a tombstone behind.
    #[test]
    fn test_delete_record() {
        let mut crdt: CRDT<String, String, String> = CRDT::new(1, None);

        let fields = vec![("name".to_string(), "Bob".to_string())];
        let _ = crdt.insert_or_update(&"user2".to_string(), fields);

        let delete_change = crdt.delete_record(&"user2".to_string());
        assert!(delete_change.is_some());
        assert!(crdt.is_tombstoned(&"user2".to_string()));
        assert_eq!(crdt.get_data().len(), 0);
    }

    // Cross-merge between two nodes converges. Both writes have equal
    // col_version/db_version, so the DefaultMergeRule tie-break on node id
    // makes node 2's value win on both replicas.
    #[test]
    fn test_merge_changes() {
        let mut crdt1: CRDT<String, String, String> = CRDT::new(1, None);
        let mut crdt2: CRDT<String, String, String> = CRDT::new(2, None);

        let fields1 = vec![("tag".to_string(), "Node1".to_string())];
        let changes1 = crdt1.insert_or_update(&"record1".to_string(), fields1);

        let fields2 = vec![("tag".to_string(), "Node2".to_string())];
        let changes2 = crdt2.insert_or_update(&"record1".to_string(), fields2);

        let merge_rule = DefaultMergeRule;
        crdt1.merge_changes(changes2, &merge_rule);
        crdt2.merge_changes(changes1, &merge_rule);

        assert_eq!(
            crdt1
                .get_record(&"record1".to_string())
                .unwrap()
                .fields
                .get("tag")
                .unwrap(),
            "Node2"
        );
        assert_eq!(crdt1.get_data(), crdt2.get_data());
    }

    // A Change survives a JSON and/or bincode round-trip unchanged.
    #[test]
    #[cfg(feature = "serde")]
    fn test_change_serialization() {
        #[allow(unused_variables)]
        let change = Change::new(
            "record1".to_string(),
            Some("name".to_string()),
            Some("Alice".to_string()),
            1,
            10,
            1,
            10,
            0,
        );

        #[cfg(feature = "json")]
        {
            let json = serde_json::to_string(&change).unwrap();
            let deserialized: Change<String, String, String> = serde_json::from_str(&json).unwrap();
            assert_eq!(change, deserialized);
        }

        #[cfg(feature = "binary")]
        {
            let bytes = bincode::serde::encode_to_vec(&change, bincode::config::standard()).unwrap();
            let (deserialized, _): (Change<String, String, String>, _) =
                bincode::serde::decode_from_slice(&bytes, bincode::config::standard()).unwrap();
            assert_eq!(change, deserialized);
        }
    }

    // A Record built via from_parts survives serialization round-trips.
    // (Record equality compares fields only.)
    #[test]
    #[cfg(feature = "serde")]
    fn test_record_serialization() {
        let mut fields = HashMap::new();
        fields.insert("name".to_string(), "Bob".to_string());
        fields.insert("age".to_string(), "25".to_string());

        let mut column_versions = HashMap::new();
        column_versions.insert("name".to_string(), ColumnVersion::new(1, 10, 1, 10));
        column_versions.insert("age".to_string(), ColumnVersion::new(1, 11, 1, 11));

        #[allow(unused_variables)]
        let record = Record::from_parts(fields, column_versions);

        #[cfg(feature = "json")]
        {
            let json = serde_json::to_string(&record).unwrap();
            let deserialized: Record<String, String> = serde_json::from_str(&json).unwrap();
            assert_eq!(record, deserialized);
        }

        #[cfg(feature = "binary")]
        {
            let bytes = bincode::serde::encode_to_vec(&record, bincode::config::standard()).unwrap();
            let (deserialized, _): (Record<String, String>, _) =
                bincode::serde::decode_from_slice(&bytes, bincode::config::standard()).unwrap();
            assert_eq!(record, deserialized);
        }
    }

    // Full-CRDT JSON round-trip: records, tombstones, and clock all survive;
    // the parent link is (by design) not serialized.
    #[test]
    #[cfg(feature = "json")]
    fn test_crdt_json_serialization() {
        let mut crdt: CRDT<String, String, String> = CRDT::new(1, None);

        let fields = vec![
            ("name".to_string(), "Alice".to_string()),
            ("age".to_string(), "30".to_string()),
        ];
        let _ = crdt.insert_or_update(&"user1".to_string(), fields);

        let fields2 = vec![("name".to_string(), "Bob".to_string())];
        let _ = crdt.insert_or_update(&"user2".to_string(), fields2);

        let _ = crdt.delete_record(&"user2".to_string());

        let json = crdt.to_json().unwrap();

        let deserialized: CRDT<String, String, String> = CRDT::from_json(&json).unwrap();

        assert_eq!(crdt.get_data().len(), deserialized.get_data().len());
        assert_eq!(
            crdt.get_record(&"user1".to_string()).unwrap().fields,
            deserialized.get_record(&"user1".to_string()).unwrap().fields
        );

        assert_eq!(crdt.tombstone_count(), deserialized.tombstone_count());
        assert!(deserialized.is_tombstoned(&"user2".to_string()));

        assert_eq!(
            crdt.get_clock().current_time(),
            deserialized.get_clock().current_time()
        );

        // serde(skip) restores the parent link as None.
        let has_parent = deserialized.parent.is_some();
        assert!(!has_parent);
    }

    // Full-CRDT bincode round-trip preserves records and the clock.
    #[test]
    #[cfg(feature = "binary")]
    fn test_crdt_binary_serialization() {
        let mut crdt: CRDT<String, String, String> = CRDT::new(1, None);

        let fields = vec![
            ("name".to_string(), "Alice".to_string()),
            ("age".to_string(), "30".to_string()),
        ];
        let _ = crdt.insert_or_update(&"user1".to_string(), fields);

        let bytes = crdt.to_bytes().unwrap();

        let deserialized: CRDT<String, String, String> = CRDT::from_bytes(&bytes).unwrap();

        assert_eq!(crdt.get_data().len(), deserialized.get_data().len());
        assert_eq!(
            crdt.get_record(&"user1".to_string()).unwrap().fields,
            deserialized.get_record(&"user1".to_string()).unwrap().fields
        );

        assert_eq!(
            crdt.get_clock().current_time(),
            deserialized.get_clock().current_time()
        );
    }

    // Serializing a child CRDT must not drag the parent's data along:
    // after a round-trip the parent link is None and only child records exist.
    #[test]
    #[cfg(feature = "serde")]
    fn test_parent_not_serialized() {
        let mut parent: CRDT<String, String, String> = CRDT::new(1, None);
        let fields = vec![("parent_field".to_string(), "parent_value".to_string())];
        let _ = parent.insert_or_update(&"parent_record".to_string(), fields);

        let parent_arc = Arc::new(parent);
        let mut child = CRDT::new(2, Some(parent_arc.clone()));
        let child_fields = vec![("child_field".to_string(), "child_value".to_string())];
        let _ = child.insert_or_update(&"child_record".to_string(), child_fields);

        #[cfg(feature = "json")]
        {
            let json = serde_json::to_string(&child).unwrap();
            let deserialized: CRDT<String, String, String> = serde_json::from_str(&json).unwrap();

            assert!(deserialized.parent.is_none());

            assert!(deserialized.get_record(&"child_record".to_string()).is_some());

            assert!(deserialized.get_record(&"parent_record".to_string()).is_none());
        }
    }
}
1625}