1#![cfg_attr(not(feature = "std"), no_std)]
87
88#[cfg(any(feature = "persist", feature = "persist-msgpack", feature = "persist-compressed"))]
90pub mod persist;
91
92#[cfg(not(any(feature = "std", feature = "alloc")))]
94compile_error!("Either 'std' (default) or 'alloc' feature must be enabled. For no_std environments, use: cargo build --no-default-features --features alloc");
95
96#[cfg(not(feature = "std"))]
97extern crate alloc;
98
99#[cfg(feature = "std")]
100use std::{
101 cmp::Ordering,
102 collections::{HashMap, HashSet},
103 hash::Hash,
104 sync::Arc,
105};
106
107#[cfg(not(feature = "std"))]
108use alloc::{
109 string::String,
110 sync::Arc,
111 vec::Vec,
112};
113#[cfg(not(feature = "std"))]
114use core::{cmp::Ordering, hash::Hash};
115#[cfg(all(not(feature = "std"), feature = "alloc"))]
116use hashbrown::{HashMap, HashSet};
117
// Backing map for record storage, chosen by feature flags:
// `sorted-keys` selects a BTreeMap for deterministic iteration order;
// otherwise a hash map is used (std's HashMap, or hashbrown under no_std+alloc).
#[cfg(all(feature = "sorted-keys", feature = "std"))]
type DataMap<K, V> = std::collections::BTreeMap<K, V>;
#[cfg(all(feature = "sorted-keys", not(feature = "std"), feature = "alloc"))]
type DataMap<K, V> = alloc::collections::BTreeMap<K, V>;
#[cfg(all(not(feature = "sorted-keys"), feature = "std"))]
type DataMap<K, V> = HashMap<K, V>;
#[cfg(all(not(feature = "sorted-keys"), not(feature = "std"), feature = "alloc"))]
type DataMap<K, V> = HashMap<K, V>;

// Entry type matching `DataMap`, used by the get-or-create path.
// The hashbrown variant needs the hasher type parameter spelled out.
#[cfg(all(feature = "sorted-keys", feature = "std"))]
type DataMapEntry<'a, K, V> = std::collections::btree_map::Entry<'a, K, V>;
#[cfg(all(feature = "sorted-keys", not(feature = "std"), feature = "alloc"))]
type DataMapEntry<'a, K, V> = alloc::collections::btree_map::Entry<'a, K, V>;
#[cfg(all(not(feature = "sorted-keys"), feature = "std"))]
type DataMapEntry<'a, K, V> = std::collections::hash_map::Entry<'a, K, V>;
#[cfg(all(not(feature = "sorted-keys"), not(feature = "std"), feature = "alloc"))]
type DataMapEntry<'a, K, V> = hashbrown::hash_map::Entry<'a, K, V, hashbrown::DefaultHashBuilder>;

/// Identifier of a participating node; widened to `u128` behind a feature flag.
#[cfg(feature = "node-id-u128")]
pub type NodeId = u128;

/// Identifier of a participating node (default `u64` width).
#[cfg(not(feature = "node-id-u128"))]
pub type NodeId = u64;

/// Convenience alias for string column keys.
pub type ColumnKey = String;

/// Sentinel `col_version` marking a record-level tombstone (deletion).
/// `u64::MAX` guarantees it sorts above any real column version.
const TOMBSTONE_COL_VERSION: u64 = u64::MAX;
156
/// A single replicated mutation: either a column write (`col_name: Some`)
/// or a record-level deletion / tombstone (`col_name: None`).
#[derive(Debug, Clone, PartialEq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "serde", serde(bound(serialize = "K: serde::Serialize, C: serde::Serialize, V: serde::Serialize")))]
#[cfg_attr(feature = "serde", serde(bound(deserialize = "K: serde::de::DeserializeOwned, C: serde::de::DeserializeOwned, V: serde::de::DeserializeOwned")))]
pub struct Change<K, C, V> {
    /// Key of the affected record.
    pub record_id: K,
    /// Column affected; `None` marks a record tombstone.
    pub col_name: Option<C>,
    /// New value; `None` for deletions (field or record).
    pub value: Option<V>,
    /// Per-column edit counter (`u64::MAX` is the tombstone sentinel).
    pub col_version: u64,
    /// Logical time at the originating node when the change was made.
    pub db_version: u64,
    /// Node that produced the change.
    pub node_id: NodeId,
    /// Logical time at which the holding node recorded the change locally.
    pub local_db_version: u64,
    /// Application-defined flags carried through merging and propagation.
    pub flags: u32,
}
181
182impl<K: Eq, C: Eq, V: Eq> Eq for Change<K, C, V> {}
183
184impl<K, C, V> Change<K, C, V> {
185 #[allow(clippy::too_many_arguments)]
187 pub fn new(
188 record_id: K,
189 col_name: Option<C>,
190 value: Option<V>,
191 col_version: u64,
192 db_version: u64,
193 node_id: NodeId,
194 local_db_version: u64,
195 flags: u32,
196 ) -> Self {
197 Self {
198 record_id,
199 col_name,
200 value,
201 col_version,
202 db_version,
203 node_id,
204 local_db_version,
205 flags,
206 }
207 }
208}
209
/// Version metadata tracked for one column of one record; used by merge
/// rules to decide whether a remote change supersedes the local state.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct ColumnVersion {
    /// Per-column edit counter (starts at 1 on first write).
    pub col_version: u64,
    /// Logical time at the node that originated the latest write.
    pub db_version: u64,
    /// Node that originated the latest write.
    pub node_id: NodeId,
    /// Logical time at which this node recorded the write locally.
    pub local_db_version: u64,
}
220
221impl ColumnVersion {
222 pub fn new(col_version: u64, db_version: u64, node_id: NodeId, local_db_version: u64) -> Self {
223 Self {
224 col_version,
225 db_version,
226 node_id,
227 local_db_version,
228 }
229 }
230}
231
/// Metadata recorded when a whole record is deleted. Like a
/// [`ColumnVersion`] but without `col_version`, which is implicitly the
/// tombstone sentinel.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct TombstoneInfo {
    /// Logical time at the node that performed the deletion.
    pub db_version: u64,
    /// Node that performed the deletion.
    pub node_id: NodeId,
    /// Logical time at which this node recorded the deletion locally.
    pub local_db_version: u64,
}
243
244impl TombstoneInfo {
245 pub fn new(db_version: u64, node_id: NodeId, local_db_version: u64) -> Self {
246 Self {
247 db_version,
248 node_id,
249 local_db_version,
250 }
251 }
252
253 pub fn as_column_version(&self) -> ColumnVersion {
255 ColumnVersion::new(
256 TOMBSTONE_COL_VERSION,
257 self.db_version,
258 self.node_id,
259 self.local_db_version,
260 )
261 }
262}
263
/// Lamport-style logical clock used to totally order events across nodes.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct LogicalClock {
    // Monotonically non-decreasing logical time.
    time: u64,
}

impl LogicalClock {
    /// Starts a clock at time zero.
    pub fn new() -> Self {
        LogicalClock { time: 0 }
    }

    /// Advances the clock for a local event and returns the new time.
    pub fn tick(&mut self) -> u64 {
        self.time += 1;
        self.time
    }

    /// Merges a remote timestamp: jump to `max(local, received)`, then tick.
    /// Returns the new time, which is strictly greater than both inputs.
    pub fn update(&mut self, received_time: u64) -> u64 {
        self.time = self.time.max(received_time) + 1;
        self.time
    }

    /// Overwrites the current time (used when rebuilding from a snapshot).
    pub fn set_time(&mut self, time: u64) {
        self.time = time;
    }

    /// Returns the current time without advancing the clock.
    pub fn current_time(&self) -> u64 {
        self.time
    }
}

impl Default for LogicalClock {
    /// Equivalent to [`LogicalClock::new`].
    fn default() -> Self {
        LogicalClock::new()
    }
}
306
/// Storage for deleted-record markers, keyed by record id.
#[derive(Debug, Clone)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct TombstoneStorage<K: Hash + Eq> {
    // One tombstone per deleted record; re-deletion overwrites in place.
    entries: HashMap<K, TombstoneInfo>,
}
315
316impl<K: Hash + Eq> TombstoneStorage<K> {
317 pub fn new() -> Self {
318 Self {
319 entries: HashMap::new(),
320 }
321 }
322
323 pub fn insert_or_assign(&mut self, key: K, info: TombstoneInfo) {
324 self.entries.insert(key, info);
325 }
326
327 pub fn find(&self, key: &K) -> Option<TombstoneInfo> {
328 self.entries.get(key).copied()
329 }
330
331 pub fn erase(&mut self, key: &K) -> bool {
332 self.entries.remove(key).is_some()
333 }
334
335 pub fn clear(&mut self) {
336 self.entries.clear();
337 }
338
339 pub fn iter(&self) -> impl Iterator<Item = (&K, &TombstoneInfo)> {
340 self.entries.iter()
341 }
342
343 pub fn len(&self) -> usize {
344 self.entries.len()
345 }
346
347 pub fn is_empty(&self) -> bool {
348 self.entries.is_empty()
349 }
350
351 pub fn compact(&mut self, min_acknowledged_version: u64) -> usize {
355 let initial_len = self.entries.len();
356 self
357 .entries
358 .retain(|_, info| info.db_version >= min_acknowledged_version);
359 initial_len - self.entries.len()
360 }
361}
362
// `Default` simply delegates to `new` (empty store).
impl<K: Hash + Eq> Default for TombstoneStorage<K> {
    fn default() -> Self {
        Self::new()
    }
}
368
/// One record's current field values plus per-column version metadata.
#[derive(Debug, Clone)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "serde", serde(bound(serialize = "C: serde::Serialize + Hash + Eq, V: serde::Serialize")))]
#[cfg_attr(feature = "serde", serde(bound(deserialize = "C: serde::de::DeserializeOwned + Hash + Eq, V: serde::de::DeserializeOwned")))]
pub struct Record<C, V> {
    /// Current value of each present column.
    pub fields: HashMap<C, V>,
    /// Version metadata per column. A column may appear here but not in
    /// `fields` (field deleted but its version history retained).
    pub column_versions: HashMap<C, ColumnVersion>,
    /// Cached minimum of all `local_db_version`s (u64::MAX when empty).
    pub lowest_local_db_version: u64,
    /// Cached maximum of all `local_db_version`s (0 when empty); used to
    /// skip unchanged records when exporting deltas.
    pub highest_local_db_version: u64,
}
381
382impl<C: Hash + Eq, V> Record<C, V> {
383 pub fn new() -> Self {
384 Self {
385 fields: HashMap::new(),
386 column_versions: HashMap::new(),
387 lowest_local_db_version: u64::MAX,
388 highest_local_db_version: 0,
389 }
390 }
391
392 pub fn from_parts(
394 fields: HashMap<C, V>,
395 column_versions: HashMap<C, ColumnVersion>,
396 ) -> Self {
397 let mut lowest = u64::MAX;
398 let mut highest = 0;
399
400 for ver in column_versions.values() {
401 if ver.local_db_version < lowest {
402 lowest = ver.local_db_version;
403 }
404 if ver.local_db_version > highest {
405 highest = ver.local_db_version;
406 }
407 }
408
409 Self {
410 fields,
411 column_versions,
412 lowest_local_db_version: lowest,
413 highest_local_db_version: highest,
414 }
415 }
416}
417
// Equality compares only `fields`, ignoring version metadata — presumably
// intentional so replicas that converged on the same values compare equal
// even if they took different merge paths. NOTE(review): confirm this is
// the intended contract before relying on it for anything but tests.
impl<C: Hash + Eq + PartialEq, V: PartialEq> PartialEq for Record<C, V> {
    fn eq(&self, other: &Self) -> bool {
        self.fields == other.fields
    }
}
424
// `Default` simply delegates to `new` (empty record).
impl<C: Hash + Eq, V> Default for Record<C, V> {
    fn default() -> Self {
        Self::new()
    }
}
430
/// Policy deciding whether a remote change supersedes the local state for
/// a column (or tombstone). Implementations must be deterministic and
/// order-independent so all replicas converge.
pub trait MergeRule<K, C, V> {
    /// Returns `true` if the remote (col_version, db_version, node_id)
    /// should replace the local one.
    fn should_accept(
        &self,
        local_col: u64,
        local_db: u64,
        local_node: NodeId,
        remote_col: u64,
        remote_db: u64,
        remote_node: NodeId,
    ) -> bool;

    /// Convenience wrapper comparing two full [`Change`]s by their version
    /// triples (values and flags are not consulted).
    fn should_accept_change(&self, local: &Change<K, C, V>, remote: &Change<K, C, V>) -> bool {
        self.should_accept(
            local.col_version,
            local.db_version,
            local.node_id,
            remote.col_version,
            remote.db_version,
            remote.node_id,
        )
    }
}
459
/// Stateless last-writer-wins merge rule; see its [`MergeRule`] impl.
#[derive(Debug, Clone, Copy, Default)]
pub struct DefaultMergeRule;
468
469impl<K, C, V> MergeRule<K, C, V> for DefaultMergeRule {
470 fn should_accept(
471 &self,
472 local_col: u64,
473 local_db: u64,
474 local_node: NodeId,
475 remote_col: u64,
476 remote_db: u64,
477 remote_node: NodeId,
478 ) -> bool {
479 match remote_col.cmp(&local_col) {
480 Ordering::Greater => true,
481 Ordering::Less => false,
482 Ordering::Equal => match remote_db.cmp(&local_db) {
483 Ordering::Greater => true,
484 Ordering::Less => false,
485 Ordering::Equal => remote_node > local_node,
486 },
487 }
488 }
489}
490
/// Total ordering over [`Change`]s, used when sorting change lists
/// (e.g. before compression/deduplication).
pub trait ChangeComparator<K, C, V> {
    /// Compares two changes; must define a total order.
    fn compare(&self, a: &Change<K, C, V>, b: &Change<K, C, V>) -> Ordering;
}
495
/// Stateless comparator grouping changes by record and column with the
/// "winning" (highest-version) change first; see its impl for the exact order.
#[derive(Debug, Clone, Copy, Default)]
pub struct DefaultChangeComparator;
507
508impl<K: Ord, C: Ord, V> ChangeComparator<K, C, V> for DefaultChangeComparator {
509 fn compare(&self, a: &Change<K, C, V>, b: &Change<K, C, V>) -> Ordering {
510 match a.record_id.cmp(&b.record_id) {
512 Ordering::Equal => {}
513 ord => return ord,
514 }
515
516 match (a.col_name.as_ref(), b.col_name.as_ref()) {
518 (None, None) => {}
519 (None, Some(_)) => return Ordering::Greater,
520 (Some(_), None) => return Ordering::Less,
521 (Some(a_col), Some(b_col)) => match a_col.cmp(b_col) {
522 Ordering::Equal => {}
523 ord => return ord,
524 },
525 }
526
527 match b.col_version.cmp(&a.col_version) {
529 Ordering::Equal => {}
530 ord => return ord,
531 }
532
533 match b.db_version.cmp(&a.db_version) {
534 Ordering::Equal => {}
535 ord => return ord,
536 }
537
538 b.node_id.cmp(&a.node_id)
539 }
540}
541
/// Conflict-free replicated key → (column → value) store.
///
/// Supports layering on an optional read-only `parent` (fork model):
/// lookups fall through to the parent while writes land in this layer.
#[derive(Debug)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "serde", serde(bound(serialize = "K: serde::Serialize, C: serde::Serialize, V: serde::Serialize")))]
#[cfg_attr(feature = "serde", serde(bound(deserialize = "K: serde::de::DeserializeOwned + Ord + Hash + Eq + Clone, C: serde::de::DeserializeOwned + Hash + Eq + Clone, V: serde::de::DeserializeOwned + Clone")))]
pub struct CRDT<K: Ord + Hash + Eq + Clone, C: Hash + Eq + Clone, V: Clone> {
    // This replica's identifier; final tie-breaker during merges.
    node_id: NodeId,
    // Logical clock generating db/local versions.
    clock: LogicalClock,
    // Live records in this layer.
    data: DataMap<K, Record<C, V>>,
    // Deleted-record markers for this layer.
    tombstones: TombstoneStorage<K>,
    // Optional read-only base layer; intentionally not serialized.
    #[cfg_attr(feature = "serde", serde(skip, default))]
    parent: Option<Arc<CRDT<K, C, V>>>,
    // Parent clock time at fork; kept for future use.
    #[allow(dead_code)]
    base_version: u64,
}
573
impl<K: Ord + Hash + Eq + Clone, C: Hash + Eq + Clone, V: Clone> CRDT<K, C, V> {
    /// Creates a CRDT for `node_id`, optionally layered on a read-only
    /// `parent` whose state acts as the base layer.
    ///
    /// With a parent, the clock is copied from it and `base_version`
    /// records the parent's time at fork; otherwise both start at zero.
    pub fn new(node_id: NodeId, parent: Option<Arc<CRDT<K, C, V>>>) -> Self {
        let (clock, base_version) = if let Some(ref p) = parent {
            let parent_clock = p.clock;
            let base = parent_clock.current_time();
            (parent_clock, base)
        } else {
            (LogicalClock::new(), 0)
        };

        Self {
            node_id,
            clock,
            data: DataMap::new(),
            tombstones: TombstoneStorage::new(),
            parent,
            base_version,
        }
    }

    /// Reconstructs a CRDT (no parent) from a previously exported change
    /// list, e.g. the output of [`CRDT::get_changes_since`]`(0)`.
    pub fn from_changes(node_id: NodeId, changes: Vec<Change<K, C, V>>) -> Self {
        let mut crdt = Self::new(node_id, None);
        crdt.apply_changes(changes);
        crdt
    }

    /// Discards all local state (data, tombstones, clock) and rebuilds it
    /// from `changes`.
    pub fn reset(&mut self, changes: Vec<Change<K, C, V>>) {
        self.data.clear();
        self.tombstones.clear();
        self.clock = LogicalClock::new();
        self.apply_changes(changes);
    }

    /// Applies a change list verbatim, without merge-rule arbitration —
    /// used by `from_changes`/`reset` where the list is trusted to be a
    /// complete snapshot rather than a concurrent delta.
    fn apply_changes(&mut self, changes: Vec<Change<K, C, V>>) {
        // Fast-forward the clock past every version in the snapshot so
        // subsequent local edits are ordered after it.
        let max_db_version = changes
            .iter()
            .map(|c| c.db_version.max(c.local_db_version))
            .max()
            .unwrap_or(0);

        self.clock.set_time(max_db_version);

        for change in changes {
            let record_id = change.record_id.clone();
            let col_name = change.col_name.clone();
            let remote_col_version = change.col_version;
            let remote_db_version = change.db_version;
            let remote_node_id = change.node_id;
            let remote_local_db_version = change.local_db_version;
            let remote_value = change.value;

            if col_name.is_none() {
                // Record tombstone: drop any live data and remember the
                // deletion (re-deletion overwrites the stored info).
                self.data.remove(&record_id);

                self.tombstones.insert_or_assign(
                    record_id,
                    TombstoneInfo::new(remote_db_version, remote_node_id, remote_local_db_version),
                );
            } else if let Some(col_key) = col_name {
                // Column change: ignored when the record is already
                // tombstoned — deletions win during a bulk load.
                if !self.is_record_tombstoned(&record_id, false) {
                    let record = self.get_or_create_record_unchecked(&record_id, false);

                    // A `None` value with a column name is a field
                    // deletion: only the version metadata is recorded.
                    if let Some(value) = remote_value {
                        record.fields.insert(col_key.clone(), value);
                    }

                    let col_ver = ColumnVersion::new(
                        remote_col_version,
                        remote_db_version,
                        remote_node_id,
                        remote_local_db_version,
                    );
                    record.column_versions.insert(col_key, col_ver);

                    // Maintain the cached per-record version bounds.
                    if remote_local_db_version < record.lowest_local_db_version {
                        record.lowest_local_db_version = remote_local_db_version;
                    }
                    if remote_local_db_version > record.highest_local_db_version {
                        record.highest_local_db_version = remote_local_db_version;
                    }
                }
            }
        }
    }

    /// Inserts or updates columns on a record with zero flags.
    ///
    /// Returns the changes to broadcast to peers; empty if the record is
    /// tombstoned (deleted records cannot be resurrected locally).
    #[must_use = "changes should be propagated to other nodes"]
    pub fn insert_or_update<I>(&mut self, record_id: &K, fields: I) -> Vec<Change<K, C, V>>
    where
        I: IntoIterator<Item = (C, V)>,
    {
        self.insert_or_update_with_flags(record_id, 0, fields)
    }

    /// Inserts or updates columns on a record, tagging each emitted change
    /// with application-defined `flags`.
    ///
    /// All touched columns share a single freshly ticked `db_version`;
    /// each column's `col_version` is bumped (or started at 1).
    ///
    /// NOTE(review): the clock ticks *before* the tombstone check, so a
    /// rejected update still consumes a version number — confirm intended.
    #[must_use = "changes should be propagated to other nodes"]
    pub fn insert_or_update_with_flags<I>(
        &mut self,
        record_id: &K,
        flags: u32,
        fields: I,
    ) -> Vec<Change<K, C, V>>
    where
        I: IntoIterator<Item = (C, V)>,
    {
        let db_version = self.clock.tick();

        // Tombstoned records (here or in the parent) reject local writes.
        if self.is_record_tombstoned(record_id, false) {
            return Vec::new();
        }

        let mut changes = Vec::new();
        // Copy node_id out first: `record` holds a mutable borrow of self.
        let node_id = self.node_id;
        let record = self.get_or_create_record_unchecked(record_id, false);

        for (col_name, value) in fields {
            // Bump the existing column version in place, or start it at 1.
            let col_version = if let Some(col_info) = record.column_versions.get_mut(&col_name) {
                col_info.col_version += 1;
                col_info.db_version = db_version;
                col_info.node_id = node_id;
                col_info.local_db_version = db_version;
                col_info.col_version
            } else {
                record.column_versions.insert(
                    col_name.clone(),
                    ColumnVersion::new(1, db_version, node_id, db_version),
                );
                1
            };

            // Maintain the cached per-record version bounds.
            if db_version < record.lowest_local_db_version {
                record.lowest_local_db_version = db_version;
            }
            if db_version > record.highest_local_db_version {
                record.highest_local_db_version = db_version;
            }

            record.fields.insert(col_name.clone(), value.clone());
            changes.push(Change::new(
                record_id.clone(),
                Some(col_name),
                Some(value),
                col_version,
                db_version,
                node_id,
                db_version,
                flags,
            ));
        }

        changes
    }

    /// Deletes a whole record with zero flags. Returns the tombstone
    /// change to broadcast, or `None` if the record was already deleted.
    #[must_use = "changes should be propagated to other nodes"]
    pub fn delete_record(&mut self, record_id: &K) -> Option<Change<K, C, V>> {
        self.delete_record_with_flags(record_id, 0)
    }

    /// Deletes a whole record, tagging the emitted tombstone change with
    /// `flags`. Idempotent: returns `None` if already tombstoned.
    #[must_use = "changes should be propagated to other nodes"]
    pub fn delete_record_with_flags(&mut self, record_id: &K, flags: u32) -> Option<Change<K, C, V>> {
        if self.is_record_tombstoned(record_id, false) {
            return None;
        }

        let db_version = self.clock.tick();

        // Drop the live data and record the tombstone.
        self.data.remove(record_id);

        self.tombstones.insert_or_assign(
            record_id.clone(),
            TombstoneInfo::new(db_version, self.node_id, db_version),
        );

        // `col_name: None` + sentinel col_version marks a record tombstone.
        Some(Change::new(
            record_id.clone(),
            None,
            None,
            TOMBSTONE_COL_VERSION,
            db_version,
            self.node_id,
            db_version,
            flags,
        ))
    }

    /// Deletes a single field with zero flags. Returns the change to
    /// broadcast, or `None` if the record is tombstoned / missing or the
    /// field is absent.
    #[must_use = "changes should be propagated to other nodes"]
    pub fn delete_field(&mut self, record_id: &K, field_name: &C) -> Option<Change<K, C, V>> {
        self.delete_field_with_flags(record_id, field_name, 0)
    }

    /// Deletes a single field, tagging the emitted change with `flags`.
    ///
    /// The field's version metadata is kept (and bumped) so the deletion
    /// can win against concurrent writes; only the value is removed.
    /// Note: unlike the insert path, the record lookup is local-layer only
    /// (`self.data.get_mut`), so parent-only records are not affected.
    #[must_use = "changes should be propagated to other nodes"]
    pub fn delete_field_with_flags(
        &mut self,
        record_id: &K,
        field_name: &C,
        flags: u32,
    ) -> Option<Change<K, C, V>> {
        if self.is_record_tombstoned(record_id, false) {
            return None;
        }

        let record = self.data.get_mut(record_id)?;

        // Nothing to delete: avoid ticking the clock for a no-op.
        if !record.fields.contains_key(field_name) {
            return None;
        }

        let db_version = self.clock.tick();

        // Bump the column version exactly like an update would.
        let col_version = if let Some(col_info) = record.column_versions.get_mut(field_name) {
            col_info.col_version += 1;
            col_info.db_version = db_version;
            col_info.node_id = self.node_id;
            col_info.local_db_version = db_version;
            col_info.col_version
        } else {
            record.column_versions.insert(
                field_name.clone(),
                ColumnVersion::new(1, db_version, self.node_id, db_version),
            );
            1
        };

        // Maintain the cached per-record version bounds.
        if db_version < record.lowest_local_db_version {
            record.lowest_local_db_version = db_version;
        }
        if db_version > record.highest_local_db_version {
            record.highest_local_db_version = db_version;
        }

        record.fields.remove(field_name);

        // `value: None` with a column name marks a field deletion.
        Some(Change::new(
            record_id.clone(),
            Some(field_name.clone()),
            None,
            col_version,
            db_version,
            self.node_id,
            db_version,
            flags,
        ))
    }

    /// Merges remote changes, arbitrating conflicts with `merge_rule`.
    ///
    /// Returns the subset of changes that were accepted, re-stamped with
    /// this node's new local db version, suitable for further propagation.
    pub fn merge_changes<R: MergeRule<K, C, V>>(
        &mut self,
        changes: Vec<Change<K, C, V>>,
        merge_rule: &R,
    ) -> Vec<Change<K, C, V>> {
        self.merge_changes_impl(changes, false, merge_rule)
    }

    /// Merge core. `ignore_parent` restricts tombstone/record lookups to
    /// this layer only (skipping the parent fallback).
    fn merge_changes_impl<R: MergeRule<K, C, V>>(
        &mut self,
        changes: Vec<Change<K, C, V>>,
        ignore_parent: bool,
        merge_rule: &R,
    ) -> Vec<Change<K, C, V>> {
        let mut accepted_changes = Vec::new();

        if changes.is_empty() {
            return accepted_changes;
        }

        for change in changes {
            let Change {
                record_id,
                col_name,
                value: remote_value,
                col_version: remote_col_version,
                db_version: remote_db_version,
                node_id: remote_node_id,
                flags,
                ..
            } = change;

            // Lamport receive rule: advance the clock for every observed
            // change, even ones that end up rejected below.
            let new_local_db_version = self.clock.update(remote_db_version);

            // Tombstoned records ignore all further column changes.
            if self.is_record_tombstoned(&record_id, ignore_parent) {
                continue;
            }

            // Find the local version this change competes against:
            // a tombstone (viewed as a sentinel column version) for
            // record deletions, or the column's stored version otherwise.
            let local_col_info = if col_name.is_none() {
                self
                    .tombstones
                    .find(&record_id)
                    .map(|info| info.as_column_version())
            } else if let Some(ref col) = col_name {
                self
                    .get_record_ptr(&record_id, ignore_parent)
                    .and_then(|record| record.column_versions.get(col).copied())
            } else {
                // Unreachable: the two arms above cover Option exhaustively.
                None
            };

            // No local version ⇒ first sighting ⇒ accept unconditionally.
            let should_accept = if let Some(local_info) = local_col_info {
                merge_rule.should_accept(
                    local_info.col_version,
                    local_info.db_version,
                    local_info.node_id,
                    remote_col_version,
                    remote_db_version,
                    remote_node_id,
                )
            } else {
                true
            };

            if should_accept {
                if let Some(col_key) = col_name {
                    // Accepted column change (write or field deletion).
                    let record = self.get_or_create_record_unchecked(&record_id, ignore_parent);

                    if let Some(value) = remote_value.clone() {
                        record.fields.insert(col_key.clone(), value);
                    } else {
                        // `None` value = field deletion; version kept below.
                        record.fields.remove(&col_key);
                    }

                    record.column_versions.insert(
                        col_key.clone(),
                        ColumnVersion::new(
                            remote_col_version,
                            remote_db_version,
                            remote_node_id,
                            new_local_db_version,
                        ),
                    );

                    // Maintain the cached per-record version bounds.
                    if new_local_db_version < record.lowest_local_db_version {
                        record.lowest_local_db_version = new_local_db_version;
                    }
                    if new_local_db_version > record.highest_local_db_version {
                        record.highest_local_db_version = new_local_db_version;
                    }

                    accepted_changes.push(Change::new(
                        record_id,
                        Some(col_key),
                        remote_value,
                        remote_col_version,
                        remote_db_version,
                        remote_node_id,
                        new_local_db_version,
                        flags,
                    ));
                } else {
                    // Accepted record tombstone: drop data, keep marker.
                    self.data.remove(&record_id);

                    self.tombstones.insert_or_assign(
                        record_id.clone(),
                        TombstoneInfo::new(remote_db_version, remote_node_id, new_local_db_version),
                    );

                    accepted_changes.push(Change::new(
                        record_id,
                        None,
                        None,
                        remote_col_version,
                        remote_db_version,
                        remote_node_id,
                        new_local_db_version,
                    flags,
                    ));
                }
            }
        }

        accepted_changes
    }

    /// Exports every change recorded after `last_db_version` (by local
    /// db version), from this layer and its parent chain.
    #[must_use]
    pub fn get_changes_since(&self, last_db_version: u64) -> Vec<Change<K, C, V>>
    where
        K: Ord,
        C: Ord,
    {
        self.get_changes_since_excluding(last_db_version, &HashSet::new())
    }

    /// Like [`CRDT::get_changes_since`], but omits changes originated by
    /// any node in `excluding` (e.g. the requesting peer itself).
    pub fn get_changes_since_excluding(
        &self,
        last_db_version: u64,
        excluding: &HashSet<NodeId>,
    ) -> Vec<Change<K, C, V>>
    where
        K: Ord,
        C: Ord,
    {
        let mut changes = Vec::new();

        // Parent changes first; compressed together with ours below.
        if let Some(ref parent) = self.parent {
            let parent_changes = parent.get_changes_since_excluding(last_db_version, excluding);
            changes.extend(parent_changes);
        }

        for (record_id, record) in &self.data {
            // Cheap per-record cutoff via the cached upper bound.
            if record.highest_local_db_version <= last_db_version {
                continue;
            }

            for (col_name, clock_info) in &record.column_versions {
                if clock_info.local_db_version > last_db_version && !excluding.contains(&clock_info.node_id)
                {
                    // `None` value here means the field was deleted but
                    // its version metadata survives.
                    let value = record.fields.get(col_name).cloned();

                    changes.push(Change::new(
                        record_id.clone(),
                        Some(col_name.clone()),
                        value,
                        clock_info.col_version,
                        clock_info.db_version,
                        clock_info.node_id,
                        clock_info.local_db_version,
                        0,
                    ));
                }
            }
        }

        // Tombstones are exported as record-level deletion changes.
        for (record_id, tombstone_info) in self.tombstones.iter() {
            if tombstone_info.local_db_version > last_db_version
                && !excluding.contains(&tombstone_info.node_id)
            {
                changes.push(Change::new(
                    record_id.clone(),
                    None,
                    None,
                    TOMBSTONE_COL_VERSION,
                    tombstone_info.db_version,
                    tombstone_info.node_id,
                    tombstone_info.local_db_version,
                    0,
                ));
            }
        }

        // Parent and child layers can both report the same record/column;
        // compress so only the winning change per column remains.
        if self.parent.is_some() {
            Self::compress_changes(&mut changes);
        }

        changes
    }

    /// Deduplicates a change list in place, keeping one change per
    /// record/column (the winner under [`DefaultChangeComparator`] order)
    /// and letting a record tombstone supersede that record's column
    /// changes.
    pub fn compress_changes(changes: &mut Vec<Change<K, C, V>>)
    where
        K: Ord,
        C: Ord,
    {
        if changes.is_empty() {
            return;
        }

        // Sort so duplicates are adjacent and winners come first.
        let comparator = DefaultChangeComparator;
        changes.sort_unstable_by(|a, b| comparator.compare(a, b));

        // Two-pointer in-place compaction over the sorted list.
        let mut write = 0;
        for read in 1..changes.len() {
            if changes[read].record_id != changes[write].record_id {
                // New record: keep its first (winning) change.
                write += 1;
                if write != read {
                    changes[write] = changes[read].clone();
                }
            } else if changes[read].col_name.is_none() && changes[write].col_name.is_some() {
                // Record tombstone: replace every already-kept change for
                // this record (they start at `first_pos`) with it.
                let mut first_pos = write;
                while first_pos > 0 && changes[first_pos - 1].record_id == changes[read].record_id {
                    first_pos -= 1;
                }
                changes[first_pos] = changes[read].clone();
                write = first_pos;
            } else if changes[read].col_name != changes[write].col_name
                && changes[write].col_name.is_some()
            {
                // New column of the same record: keep its winning change.
                // (If `write` already holds a tombstone, column changes
                // for this record are dropped instead.)
                write += 1;
                if write != read {
                    changes[write] = changes[read].clone();
                }
            }
        }

        changes.truncate(write + 1);
    }

    /// Looks up a record, falling through to the parent layer if absent.
    pub fn get_record(&self, record_id: &K) -> Option<&Record<C, V>> {
        self.get_record_ptr(record_id, false)
    }

    /// Whether the record is tombstoned in this layer or any ancestor.
    pub fn is_tombstoned(&self, record_id: &K) -> bool {
        self.is_record_tombstoned(record_id, false)
    }

    /// Returns the tombstone info for a record, consulting the parent
    /// chain if this layer has none.
    pub fn get_tombstone(&self, record_id: &K) -> Option<TombstoneInfo> {
        if let Some(info) = self.tombstones.find(record_id) {
            return Some(info);
        }

        if let Some(ref parent) = self.parent {
            return parent.get_tombstone(record_id);
        }

        None
    }

    /// Drops tombstones older than `min_acknowledged_version` (i.e. ones
    /// all peers have acknowledged). Only this layer is compacted, not the
    /// parent. Returns the number removed.
    pub fn compact_tombstones(&mut self, min_acknowledged_version: u64) -> usize {
        self.tombstones.compact(min_acknowledged_version)
    }

    /// Number of tombstones held by this layer (excludes the parent).
    pub fn tombstone_count(&self) -> usize {
        self.tombstones.len()
    }

    /// Read-only access to the logical clock.
    pub fn get_clock(&self) -> &LogicalClock {
        &self.clock
    }

    /// Read-only access to this layer's records (excludes the parent).
    pub fn get_data(&self) -> &DataMap<K, Record<C, V>> {
        &self.data
    }

    /// Serializes this layer to a JSON string.
    ///
    /// # Errors
    /// Propagates any `serde_json` serialization error.
    #[cfg(feature = "json")]
    pub fn to_json(&self) -> Result<String, serde_json::Error>
    where
        K: serde::Serialize,
        C: serde::Serialize,
        V: serde::Serialize,
    {
        serde_json::to_string(self)
    }

    /// Deserializes a CRDT from JSON. The parent link is not serialized,
    /// so the result is always parentless.
    ///
    /// # Errors
    /// Propagates any `serde_json` deserialization error.
    #[cfg(feature = "json")]
    pub fn from_json(json: &str) -> Result<Self, serde_json::Error>
    where
        K: serde::de::DeserializeOwned + Hash + Eq + Clone,
        C: serde::de::DeserializeOwned + Hash + Eq + Clone,
        V: serde::de::DeserializeOwned + Clone,
    {
        serde_json::from_str(json)
    }

    /// Serializes this layer to bincode bytes (standard config).
    ///
    /// # Errors
    /// Propagates any bincode encoding error.
    #[cfg(feature = "binary")]
    pub fn to_bytes(&self) -> Result<Vec<u8>, bincode::error::EncodeError>
    where
        K: serde::Serialize,
        C: serde::Serialize,
        V: serde::Serialize,
    {
        bincode::serde::encode_to_vec(self, bincode::config::standard())
    }

    /// Deserializes a CRDT from bincode bytes (standard config); the
    /// result is always parentless.
    ///
    /// # Errors
    /// Propagates any bincode decoding error.
    #[cfg(feature = "binary")]
    pub fn from_bytes(bytes: &[u8]) -> Result<Self, bincode::error::DecodeError>
    where
        K: serde::de::DeserializeOwned + Hash + Eq + Clone,
        C: serde::de::DeserializeOwned + Hash + Eq + Clone,
        V: serde::de::DeserializeOwned + Clone,
    {
        let (result, _len) = bincode::serde::decode_from_slice(bytes, bincode::config::standard())?;
        Ok(result)
    }

    /// Serializes this layer to MessagePack bytes.
    ///
    /// # Errors
    /// Propagates any `rmp_serde` encoding error.
    #[cfg(feature = "msgpack")]
    pub fn to_msgpack_bytes(&self) -> Result<Vec<u8>, rmp_serde::encode::Error>
    where
        K: serde::Serialize,
        C: serde::Serialize,
        V: serde::Serialize,
    {
        rmp_serde::to_vec(self)
    }

    /// Deserializes a CRDT from MessagePack bytes; the result is always
    /// parentless.
    ///
    /// # Errors
    /// Propagates any `rmp_serde` decoding error.
    #[cfg(feature = "msgpack")]
    pub fn from_msgpack_bytes(bytes: &[u8]) -> Result<Self, rmp_serde::decode::Error>
    where
        K: serde::de::DeserializeOwned + Hash + Eq + Clone,
        C: serde::de::DeserializeOwned + Hash + Eq + Clone,
        V: serde::de::DeserializeOwned + Clone,
    {
        rmp_serde::from_slice(bytes)
    }

    /// Snapshots records and tombstones in this layer whose local db
    /// version is newer than `since_version` (parent not consulted).
    ///
    /// NOTE(review): this body is byte-identical to the no_std variant
    /// below; the pair could be unified since `HashMap` aliases both.
    #[cfg(feature = "std")]
    pub fn get_changed_since(&self, since_version: u64) -> (
        DataMap<K, Record<C, V>>,
        HashMap<K, TombstoneInfo>,
    ) {
        let records = self.data
            .iter()
            .filter(|(_, record)| record.highest_local_db_version > since_version)
            .map(|(k, v)| (k.clone(), v.clone()))
            .collect();

        let tombstones = self.tombstones
            .iter()
            .filter(|(_, info)| info.local_db_version > since_version)
            .map(|(k, v)| (k.clone(), *v))
            .collect();

        (records, tombstones)
    }

    /// no_std twin of the method above (hashbrown `HashMap`); the logic
    /// is intentionally identical.
    #[cfg(not(feature = "std"))]
    pub fn get_changed_since(&self, since_version: u64) -> (
        DataMap<K, Record<C, V>>,
        HashMap<K, TombstoneInfo>,
    ) {
        let records = self.data
            .iter()
            .filter(|(_, record)| record.highest_local_db_version > since_version)
            .map(|(k, v)| (k.clone(), v.clone()))
            .collect();

        let tombstones = self.tombstones
            .iter()
            .filter(|(_, info)| info.local_db_version > since_version)
            .map(|(k, v)| (k.clone(), *v))
            .collect();

        (records, tombstones)
    }

    /// Whether the record is tombstoned here — or, unless `ignore_parent`,
    /// anywhere up the parent chain.
    fn is_record_tombstoned(&self, record_id: &K, ignore_parent: bool) -> bool {
        if self.tombstones.find(record_id).is_some() {
            return true;
        }

        if !ignore_parent {
            if let Some(ref parent) = self.parent {
                return parent.is_record_tombstoned(record_id, false);
            }
        }

        false
    }

    /// Returns a mutable record in this layer, creating it if absent.
    /// When creating and `ignore_parent` is false, the parent's copy (if
    /// any) is cloned in as the starting state (copy-on-write fork).
    /// "Unchecked" = does not consult tombstones; callers check first.
    fn get_or_create_record_unchecked(
        &mut self,
        record_id: &K,
        ignore_parent: bool,
    ) -> &mut Record<C, V> {
        match self.data.entry(record_id.clone()) {
            DataMapEntry::Occupied(e) => e.into_mut(),
            DataMapEntry::Vacant(e) => {
                let record = if !ignore_parent {
                    self
                        .parent
                        .as_ref()
                        .and_then(|p| p.get_record_ptr(record_id, false))
                        .cloned()
                        .unwrap_or_else(Record::new)
                } else {
                    Record::new()
                };
                e.insert(record)
            }
        }
    }

    /// Read-only record lookup; falls through to the parent chain unless
    /// `ignore_parent` is set.
    fn get_record_ptr(&self, record_id: &K, ignore_parent: bool) -> Option<&Record<C, V>> {
        if let Some(record) = self.data.get(record_id) {
            return Some(record);
        }

        if !ignore_parent {
            if let Some(ref parent) = self.parent {
                return parent.get_record_ptr(record_id, false);
            }
        }

        None
    }
}
1505
// Extra APIs available only with the BTreeMap backend (`sorted-keys`).
#[cfg(feature = "sorted-keys")]
impl<K: Ord + Hash + Eq + Clone, C: Hash + Eq + Clone, V: Clone> CRDT<K, C, V> {
    /// Iterates over this layer's records whose keys fall within `range`,
    /// in ascending key order. The parent layer is not consulted.
    pub fn range<R>(&self, range: R) -> impl Iterator<Item = (&K, &Record<C, V>)>
    where
        R: core::ops::RangeBounds<K>,
    {
        self.data.range(range)
    }
}
1546
1547#[cfg(test)]
1548mod tests {
1549 use super::*;
1550
1551 #[cfg(not(feature = "std"))]
1552 use alloc::{string::ToString, vec};
1553
1554 #[test]
1555 fn test_logical_clock() {
1556 let mut clock = LogicalClock::new();
1557 assert_eq!(clock.current_time(), 0);
1558
1559 let t1 = clock.tick();
1560 assert_eq!(t1, 1);
1561 assert_eq!(clock.current_time(), 1);
1562
1563 let t2 = clock.update(5);
1564 assert_eq!(t2, 6);
1565 assert_eq!(clock.current_time(), 6);
1566 }
1567
1568 #[test]
1569 fn test_tombstone_storage() {
1570 let mut storage = TombstoneStorage::new();
1571 let info = TombstoneInfo::new(10, 1, 10);
1572
1573 storage.insert_or_assign("key1".to_string(), info);
1574 assert_eq!(storage.len(), 1);
1575
1576 assert_eq!(storage.find(&"key1".to_string()), Some(info));
1577 assert_eq!(storage.find(&"key2".to_string()), None);
1578
1579 let removed = storage.compact(15);
1580 assert_eq!(removed, 1);
1581 assert_eq!(storage.len(), 0);
1582 }
1583
1584 #[test]
1585 fn test_basic_insert() {
1586 let mut crdt: CRDT<String, String, String> = CRDT::new(1, None);
1587
1588 let fields = vec![
1589 ("name".to_string(), "Alice".to_string()),
1590 ("age".to_string(), "30".to_string()),
1591 ];
1592
1593 let changes = crdt.insert_or_update(&"user1".to_string(), fields);
1594
1595 assert_eq!(changes.len(), 2);
1596 assert_eq!(crdt.get_data().len(), 1);
1597
1598 let record = crdt.get_record(&"user1".to_string()).unwrap();
1599 assert_eq!(record.fields.get("name").unwrap(), "Alice");
1600 assert_eq!(record.fields.get("age").unwrap(), "30");
1601 }
1602
1603 #[test]
1604 fn test_delete_record() {
1605 let mut crdt: CRDT<String, String, String> = CRDT::new(1, None);
1606
1607 let fields = vec![("name".to_string(), "Bob".to_string())];
1608 let _ = crdt.insert_or_update(&"user2".to_string(), fields);
1609
1610 let delete_change = crdt.delete_record(&"user2".to_string());
1611 assert!(delete_change.is_some());
1612 assert!(crdt.is_tombstoned(&"user2".to_string()));
1613 assert_eq!(crdt.get_data().len(), 0);
1614 }
1615
1616 #[test]
1617 fn test_merge_changes() {
1618 let mut crdt1: CRDT<String, String, String> = CRDT::new(1, None);
1619 let mut crdt2: CRDT<String, String, String> = CRDT::new(2, None);
1620
1621 let fields1 = vec![("tag".to_string(), "Node1".to_string())];
1622 let changes1 = crdt1.insert_or_update(&"record1".to_string(), fields1);
1623
1624 let fields2 = vec![("tag".to_string(), "Node2".to_string())];
1625 let changes2 = crdt2.insert_or_update(&"record1".to_string(), fields2);
1626
1627 let merge_rule = DefaultMergeRule;
1628 crdt1.merge_changes(changes2, &merge_rule);
1629 crdt2.merge_changes(changes1, &merge_rule);
1630
1631 assert_eq!(
1633 crdt1
1634 .get_record(&"record1".to_string())
1635 .unwrap()
1636 .fields
1637 .get("tag")
1638 .unwrap(),
1639 "Node2"
1640 );
1641 assert_eq!(crdt1.get_data(), crdt2.get_data());
1642 }
1643
1644 #[test]
1645 #[cfg(feature = "serde")]
1646 fn test_change_serialization() {
1647 #[allow(unused_variables)]
1648 let change = Change::new(
1649 "record1".to_string(),
1650 Some("name".to_string()),
1651 Some("Alice".to_string()),
1652 1,
1653 10,
1654 1,
1655 10,
1656 0,
1657 );
1658
1659 #[cfg(feature = "json")]
1661 {
1662 let json = serde_json::to_string(&change).unwrap();
1663 let deserialized: Change<String, String, String> = serde_json::from_str(&json).unwrap();
1664 assert_eq!(change, deserialized);
1665 }
1666
1667 #[cfg(feature = "binary")]
1669 {
1670 let bytes = bincode::serde::encode_to_vec(&change, bincode::config::standard()).unwrap();
1671 let (deserialized, _): (Change<String, String, String>, _) =
1672 bincode::serde::decode_from_slice(&bytes, bincode::config::standard()).unwrap();
1673 assert_eq!(change, deserialized);
1674 }
1675 }
1676
1677 #[test]
1678 #[cfg(feature = "serde")]
1679 fn test_record_serialization() {
1680 let mut fields = HashMap::new();
1681 fields.insert("name".to_string(), "Bob".to_string());
1682 fields.insert("age".to_string(), "25".to_string());
1683
1684 let mut column_versions = HashMap::new();
1685 column_versions.insert("name".to_string(), ColumnVersion::new(1, 10, 1, 10));
1686 column_versions.insert("age".to_string(), ColumnVersion::new(1, 11, 1, 11));
1687
1688 #[allow(unused_variables)]
1689 let record = Record::from_parts(fields, column_versions);
1690
1691 #[cfg(feature = "json")]
1693 {
1694 let json = serde_json::to_string(&record).unwrap();
1695 let deserialized: Record<String, String> = serde_json::from_str(&json).unwrap();
1696 assert_eq!(record, deserialized);
1697 }
1698
1699 #[cfg(feature = "binary")]
1701 {
1702 let bytes = bincode::serde::encode_to_vec(&record, bincode::config::standard()).unwrap();
1703 let (deserialized, _): (Record<String, String>, _) =
1704 bincode::serde::decode_from_slice(&bytes, bincode::config::standard()).unwrap();
1705 assert_eq!(record, deserialized);
1706 }
1707 }
1708
1709 #[test]
1710 #[cfg(feature = "json")]
1711 fn test_crdt_json_serialization() {
1712 let mut crdt: CRDT<String, String, String> = CRDT::new(1, None);
1713
1714 let fields = vec![
1716 ("name".to_string(), "Alice".to_string()),
1717 ("age".to_string(), "30".to_string()),
1718 ];
1719 let _ = crdt.insert_or_update(&"user1".to_string(), fields);
1720
1721 let fields2 = vec![("name".to_string(), "Bob".to_string())];
1722 let _ = crdt.insert_or_update(&"user2".to_string(), fields2);
1723
1724 let _ = crdt.delete_record(&"user2".to_string());
1726
1727 let json = crdt.to_json().unwrap();
1729
1730 let deserialized: CRDT<String, String, String> = CRDT::from_json(&json).unwrap();
1732
1733 assert_eq!(crdt.get_data().len(), deserialized.get_data().len());
1735 assert_eq!(
1736 crdt.get_record(&"user1".to_string()).unwrap().fields,
1737 deserialized.get_record(&"user1".to_string()).unwrap().fields
1738 );
1739
1740 assert_eq!(crdt.tombstone_count(), deserialized.tombstone_count());
1742 assert!(deserialized.is_tombstoned(&"user2".to_string()));
1743
1744 assert_eq!(
1746 crdt.get_clock().current_time(),
1747 deserialized.get_clock().current_time()
1748 );
1749
1750 let has_parent = deserialized.parent.is_some();
1752 assert!(!has_parent);
1753 }
1754
1755 #[test]
1756 #[cfg(feature = "binary")]
1757 fn test_crdt_binary_serialization() {
1758 let mut crdt: CRDT<String, String, String> = CRDT::new(1, None);
1759
1760 let fields = vec![
1762 ("name".to_string(), "Alice".to_string()),
1763 ("age".to_string(), "30".to_string()),
1764 ];
1765 let _ = crdt.insert_or_update(&"user1".to_string(), fields);
1766
1767 let bytes = crdt.to_bytes().unwrap();
1769
1770 let deserialized: CRDT<String, String, String> = CRDT::from_bytes(&bytes).unwrap();
1772
1773 assert_eq!(crdt.get_data().len(), deserialized.get_data().len());
1775 assert_eq!(
1776 crdt.get_record(&"user1".to_string()).unwrap().fields,
1777 deserialized.get_record(&"user1".to_string()).unwrap().fields
1778 );
1779
1780 assert_eq!(
1782 crdt.get_clock().current_time(),
1783 deserialized.get_clock().current_time()
1784 );
1785 }
1786
1787 #[test]
1788 #[cfg(feature = "serde")]
1789 fn test_parent_not_serialized() {
1790 let mut parent: CRDT<String, String, String> = CRDT::new(1, None);
1792 let fields = vec![("parent_field".to_string(), "parent_value".to_string())];
1793 let _ = parent.insert_or_update(&"parent_record".to_string(), fields);
1794
1795 let parent_arc = Arc::new(parent);
1797 let mut child = CRDT::new(2, Some(parent_arc.clone()));
1798 let child_fields = vec![("child_field".to_string(), "child_value".to_string())];
1799 let _ = child.insert_or_update(&"child_record".to_string(), child_fields);
1800
1801 #[cfg(feature = "json")]
1803 {
1804 let json = serde_json::to_string(&child).unwrap();
1805 let deserialized: CRDT<String, String, String> = serde_json::from_str(&json).unwrap();
1806
1807 assert!(deserialized.parent.is_none());
1809
1810 assert!(deserialized.get_record(&"child_record".to_string()).is_some());
1812
1813 assert!(deserialized.get_record(&"parent_record".to_string()).is_none());
1815 }
1816 }
1817
1818 #[test]
1819 #[cfg(feature = "sorted-keys")]
1820 fn test_sorted_keys_range_queries() {
1821 let mut crdt: CRDT<String, String, String> = CRDT::new(1, None);
1822
1823 let _ = crdt.insert_or_update(
1825 &String::from("session-abc-001"),
1826 vec![(String::from("data"), String::from("first"))],
1827 );
1828 let _ = crdt.insert_or_update(
1829 &String::from("session-abc-002"),
1830 vec![(String::from("data"), String::from("second"))],
1831 );
1832 let _ = crdt.insert_or_update(
1833 &String::from("session-abc-003"),
1834 vec![(String::from("data"), String::from("third"))],
1835 );
1836 let _ = crdt.insert_or_update(
1837 &String::from("session-xyz-001"),
1838 vec![(String::from("data"), String::from("other"))],
1839 );
1840 let _ = crdt.insert_or_update(
1841 &String::from("user-001"),
1842 vec![(String::from("name"), String::from("Alice"))],
1843 );
1844
1845 let session_abc_records: Vec<_> = crdt
1847 .range(String::from("session-abc-")..String::from("session-abd-"))
1848 .collect();
1849
1850 assert_eq!(session_abc_records.len(), 3);
1851 assert!(session_abc_records
1852 .iter()
1853 .all(|(k, _)| k.starts_with("session-abc-")));
1854
1855 let all_keys: Vec<String> = crdt.get_data().keys().cloned().collect();
1857 let mut sorted_keys = all_keys.clone();
1858 sorted_keys.sort();
1859 assert_eq!(all_keys, sorted_keys, "Keys should be in sorted order");
1860
1861 let range_from_user: Vec<_> = crdt.range(String::from("user-")..).collect();
1863 assert_eq!(range_from_user.len(), 1);
1864 assert_eq!(range_from_user[0].0, "user-001");
1865 }
1866}