1#![cfg_attr(not(feature = "std"), no_std)]
87
88#[cfg(any(feature = "persist", feature = "persist-msgpack", feature = "persist-compressed"))]
90pub mod persist;
91
92#[cfg(not(any(feature = "std", feature = "alloc")))]
94compile_error!("Either 'std' (default) or 'alloc' feature must be enabled. For no_std environments, use: cargo build --no-default-features --features alloc");
95
96#[cfg(not(feature = "std"))]
97extern crate alloc;
98
99#[cfg(feature = "std")]
100use std::{
101 cmp::Ordering,
102 collections::{HashMap, HashSet},
103 hash::Hash,
104 sync::Arc,
105};
106
107#[cfg(not(feature = "std"))]
108use alloc::{
109 string::String,
110 sync::Arc,
111 vec::Vec,
112};
113#[cfg(not(feature = "std"))]
114use core::{cmp::Ordering, hash::Hash};
115#[cfg(all(not(feature = "std"), feature = "alloc"))]
116use hashbrown::{HashMap, HashSet};
117
/// Identifier of a replica/node. With the `node-id-u128` feature this widens
/// to `u128` (large enough to hold a UUID, for example).
#[cfg(feature = "node-id-u128")]
pub type NodeId = u128;

/// Identifier of a replica/node (default `u64` width).
#[cfg(not(feature = "node-id-u128"))]
pub type NodeId = u64;

/// Convenience alias for string-keyed columns.
pub type ColumnKey = String;

// Sentinel column version used for record-level tombstones. `u64::MAX`
// compares greater than any real column version, so a delete out-ranks any
// live column in version comparisons.
const TOMBSTONE_COL_VERSION: u64 = u64::MAX;
136
/// A single replicated mutation: either a field write (`col_name = Some(..)`)
/// or a whole-record deletion (`col_name = None`, `value = None`).
#[derive(Debug, Clone, PartialEq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "serde", serde(bound(serialize = "K: serde::Serialize, C: serde::Serialize, V: serde::Serialize")))]
#[cfg_attr(feature = "serde", serde(bound(deserialize = "K: serde::de::DeserializeOwned, C: serde::de::DeserializeOwned, V: serde::de::DeserializeOwned")))]
pub struct Change<K, C, V> {
    /// Key of the record this change applies to.
    pub record_id: K,
    /// Column being changed; `None` marks a record-level tombstone (delete).
    pub col_name: Option<C>,
    /// New value; `None` for record deletions and field removals.
    pub value: Option<V>,
    /// Per-column version counter (record tombstones use `u64::MAX`).
    pub col_version: u64,
    /// Logical-clock time at the node that originated the change.
    pub db_version: u64,
    /// Node that originated the change.
    pub node_id: NodeId,
    /// Logical-clock time at which the change was applied locally.
    pub local_db_version: u64,
    /// Application-defined flag bits carried alongside the change.
    pub flags: u32,
}

// `Eq` whenever all type parameters are `Eq`; the derived `PartialEq` above
// compares every field.
impl<K: Eq, C: Eq, V: Eq> Eq for Change<K, C, V> {}

impl<K, C, V> Change<K, C, V> {
    /// Constructs a `Change` from its raw parts, field for field.
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        record_id: K,
        col_name: Option<C>,
        value: Option<V>,
        col_version: u64,
        db_version: u64,
        node_id: NodeId,
        local_db_version: u64,
        flags: u32,
    ) -> Self {
        Self {
            record_id,
            col_name,
            value,
            col_version,
            db_version,
            node_id,
            local_db_version,
            flags,
        }
    }
}
189
/// Version metadata tracked per column of a record: the column's own counter
/// plus the originating node's clock, node id, and the local apply time.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct ColumnVersion {
    /// Monotonic per-column counter; bumped on every write to the column.
    pub col_version: u64,
    /// Originating node's logical-clock time for the write.
    pub db_version: u64,
    /// Node that performed the write.
    pub node_id: NodeId,
    /// Local logical-clock time when the write was applied here.
    pub local_db_version: u64,
}

impl ColumnVersion {
    /// Builds a `ColumnVersion` from its raw parts.
    pub fn new(col_version: u64, db_version: u64, node_id: NodeId, local_db_version: u64) -> Self {
        Self {
            col_version,
            db_version,
            node_id,
            local_db_version,
        }
    }
}
211
/// Metadata kept for a deleted record: when and by whom it was deleted.
/// Unlike [`ColumnVersion`] there is no per-column counter — a tombstone
/// always carries the sentinel version (see `as_column_version`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct TombstoneInfo {
    /// Originating node's logical-clock time of the deletion.
    pub db_version: u64,
    /// Node that performed the deletion.
    pub node_id: NodeId,
    /// Local logical-clock time when the deletion was applied here.
    pub local_db_version: u64,
}

impl TombstoneInfo {
    /// Builds a `TombstoneInfo` from its raw parts.
    pub fn new(db_version: u64, node_id: NodeId, local_db_version: u64) -> Self {
        Self {
            db_version,
            node_id,
            local_db_version,
        }
    }

    /// Views the tombstone as a [`ColumnVersion`] whose `col_version` is the
    /// `u64::MAX` sentinel, so it can be fed to merge-rule comparisons
    /// against live columns (and out-ranks any real column version).
    pub fn as_column_version(&self) -> ColumnVersion {
        ColumnVersion::new(
            TOMBSTONE_COL_VERSION,
            self.db_version,
            self.node_id,
            self.local_db_version,
        )
    }
}
243
/// A Lamport-style logical clock: a monotonically advancing `u64` used to
/// order events without wall-clock time.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct LogicalClock {
    time: u64,
}

impl LogicalClock {
    /// Creates a clock starting at time `0`.
    pub fn new() -> Self {
        LogicalClock { time: 0 }
    }

    /// Advances the clock one step and returns the new time.
    pub fn tick(&mut self) -> u64 {
        self.time += 1;
        self.time
    }

    /// Merges in a timestamp received from a peer: jump to the larger of the
    /// two times, then tick once. Returns the new time.
    pub fn update(&mut self, received_time: u64) -> u64 {
        if received_time > self.time {
            self.time = received_time;
        }
        self.tick()
    }

    /// Overwrites the clock with an explicit time.
    pub fn set_time(&mut self, time: u64) {
        self.time = time;
    }

    /// Reads the current time without advancing the clock.
    pub fn current_time(&self) -> u64 {
        self.time
    }
}

impl Default for LogicalClock {
    fn default() -> Self {
        LogicalClock::new()
    }
}
286
287#[derive(Debug, Clone)]
291#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
292pub struct TombstoneStorage<K: Hash + Eq> {
293 entries: HashMap<K, TombstoneInfo>,
294}
295
296impl<K: Hash + Eq> TombstoneStorage<K> {
297 pub fn new() -> Self {
298 Self {
299 entries: HashMap::new(),
300 }
301 }
302
303 pub fn insert_or_assign(&mut self, key: K, info: TombstoneInfo) {
304 self.entries.insert(key, info);
305 }
306
307 pub fn find(&self, key: &K) -> Option<TombstoneInfo> {
308 self.entries.get(key).copied()
309 }
310
311 pub fn erase(&mut self, key: &K) -> bool {
312 self.entries.remove(key).is_some()
313 }
314
315 pub fn clear(&mut self) {
316 self.entries.clear();
317 }
318
319 pub fn iter(&self) -> impl Iterator<Item = (&K, &TombstoneInfo)> {
320 self.entries.iter()
321 }
322
323 pub fn len(&self) -> usize {
324 self.entries.len()
325 }
326
327 pub fn is_empty(&self) -> bool {
328 self.entries.is_empty()
329 }
330
331 pub fn compact(&mut self, min_acknowledged_version: u64) -> usize {
335 let initial_len = self.entries.len();
336 self
337 .entries
338 .retain(|_, info| info.db_version >= min_acknowledged_version);
339 initial_len - self.entries.len()
340 }
341}
342
343impl<K: Hash + Eq> Default for TombstoneStorage<K> {
344 fn default() -> Self {
345 Self::new()
346 }
347}
348
349#[derive(Debug, Clone)]
351#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
352#[cfg_attr(feature = "serde", serde(bound(serialize = "C: serde::Serialize + Hash + Eq, V: serde::Serialize")))]
353#[cfg_attr(feature = "serde", serde(bound(deserialize = "C: serde::de::DeserializeOwned + Hash + Eq, V: serde::de::DeserializeOwned")))]
354pub struct Record<C, V> {
355 pub fields: HashMap<C, V>,
356 pub column_versions: HashMap<C, ColumnVersion>,
357 pub lowest_local_db_version: u64,
359 pub highest_local_db_version: u64,
360}
361
362impl<C: Hash + Eq, V> Record<C, V> {
363 pub fn new() -> Self {
364 Self {
365 fields: HashMap::new(),
366 column_versions: HashMap::new(),
367 lowest_local_db_version: u64::MAX,
368 highest_local_db_version: 0,
369 }
370 }
371
372 pub fn from_parts(
374 fields: HashMap<C, V>,
375 column_versions: HashMap<C, ColumnVersion>,
376 ) -> Self {
377 let mut lowest = u64::MAX;
378 let mut highest = 0;
379
380 for ver in column_versions.values() {
381 if ver.local_db_version < lowest {
382 lowest = ver.local_db_version;
383 }
384 if ver.local_db_version > highest {
385 highest = ver.local_db_version;
386 }
387 }
388
389 Self {
390 fields,
391 column_versions,
392 lowest_local_db_version: lowest,
393 highest_local_db_version: highest,
394 }
395 }
396}
397
398impl<C: Hash + Eq + PartialEq, V: PartialEq> PartialEq for Record<C, V> {
399 fn eq(&self, other: &Self) -> bool {
400 self.fields == other.fields
402 }
403}
404
405impl<C: Hash + Eq, V> Default for Record<C, V> {
406 fn default() -> Self {
407 Self::new()
408 }
409}
410
/// Policy deciding whether a remote column write should replace the local
/// one during a merge.
pub trait MergeRule<K, C, V> {
    /// Returns `true` when the remote `(col_version, db_version, node_id)`
    /// triple should win over the local triple.
    fn should_accept(
        &self,
        local_col: u64,
        local_db: u64,
        local_node: NodeId,
        remote_col: u64,
        remote_db: u64,
        remote_node: NodeId,
    ) -> bool;

    /// Convenience wrapper: compares two full [`Change`]s by delegating
    /// their version triples to [`MergeRule::should_accept`].
    fn should_accept_change(&self, local: &Change<K, C, V>, remote: &Change<K, C, V>) -> bool {
        self.should_accept(
            local.col_version,
            local.db_version,
            local.node_id,
            remote.col_version,
            remote.db_version,
            remote.node_id,
        )
    }
}
439
440#[derive(Debug, Clone, Copy, Default)]
447pub struct DefaultMergeRule;
448
449impl<K, C, V> MergeRule<K, C, V> for DefaultMergeRule {
450 fn should_accept(
451 &self,
452 local_col: u64,
453 local_db: u64,
454 local_node: NodeId,
455 remote_col: u64,
456 remote_db: u64,
457 remote_node: NodeId,
458 ) -> bool {
459 match remote_col.cmp(&local_col) {
460 Ordering::Greater => true,
461 Ordering::Less => false,
462 Ordering::Equal => match remote_db.cmp(&local_db) {
463 Ordering::Greater => true,
464 Ordering::Less => false,
465 Ordering::Equal => remote_node > local_node,
466 },
467 }
468 }
469}
470
/// Total order over [`Change`]s, used to sort change lists before
/// compression/deduplication.
pub trait ChangeComparator<K, C, V> {
    /// Compares two changes.
    fn compare(&self, a: &Change<K, C, V>, b: &Change<K, C, V>) -> Ordering;
}

/// Default ordering: record id ascending; within a record, field changes
/// before the record tombstone (`col_name == None`); then versions and node
/// id descending so the newest change of a column sorts first.
#[derive(Debug, Clone, Copy, Default)]
pub struct DefaultChangeComparator;
487
488impl<K: Ord, C: Ord, V> ChangeComparator<K, C, V> for DefaultChangeComparator {
489 fn compare(&self, a: &Change<K, C, V>, b: &Change<K, C, V>) -> Ordering {
490 match a.record_id.cmp(&b.record_id) {
492 Ordering::Equal => {}
493 ord => return ord,
494 }
495
496 match (a.col_name.as_ref(), b.col_name.as_ref()) {
498 (None, None) => {}
499 (None, Some(_)) => return Ordering::Greater,
500 (Some(_), None) => return Ordering::Less,
501 (Some(a_col), Some(b_col)) => match a_col.cmp(b_col) {
502 Ordering::Equal => {}
503 ord => return ord,
504 },
505 }
506
507 match b.col_version.cmp(&a.col_version) {
509 Ordering::Equal => {}
510 ord => return ord,
511 }
512
513 match b.db_version.cmp(&a.db_version) {
514 Ordering::Equal => {}
515 ord => return ord,
516 }
517
518 b.node_id.cmp(&a.node_id)
519 }
520}
521
/// Column-level last-writer-wins CRDT: a map of records whose individual
/// columns carry version metadata, plus tombstones for deleted records and
/// an optional read-through `parent` layer.
#[derive(Debug)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "serde", serde(bound(serialize = "K: serde::Serialize, C: serde::Serialize, V: serde::Serialize")))]
#[cfg_attr(feature = "serde", serde(bound(deserialize = "K: serde::de::DeserializeOwned + Hash + Eq + Clone, C: serde::de::DeserializeOwned + Hash + Eq + Clone, V: serde::de::DeserializeOwned + Clone")))]
pub struct CRDT<K: Hash + Eq + Clone, C: Hash + Eq + Clone, V: Clone> {
    // This replica's identifier, stamped on locally originated changes.
    node_id: NodeId,
    // Lamport clock producing db_version / local_db_version values.
    clock: LogicalClock,
    // Live records.
    data: HashMap<K, Record<C, V>>,
    // Deletion markers for removed records.
    tombstones: TombstoneStorage<K>,
    // Optional base layer consulted on reads; deliberately not serialized.
    #[cfg_attr(feature = "serde", serde(skip, default))]
    parent: Option<Arc<CRDT<K, C, V>>>,
    // Parent's clock time at construction; currently unread (kept,
    // presumably, for future layering logic — TODO confirm).
    #[allow(dead_code)]
    base_version: u64,
}
539
540impl<K: Hash + Eq + Clone, C: Hash + Eq + Clone, V: Clone> CRDT<K, C, V> {
541 pub fn new(node_id: NodeId, parent: Option<Arc<CRDT<K, C, V>>>) -> Self {
548 let (clock, base_version) = if let Some(ref p) = parent {
549 let parent_clock = p.clock;
550 let base = parent_clock.current_time();
551 (parent_clock, base)
552 } else {
553 (LogicalClock::new(), 0)
554 };
555
556 Self {
557 node_id,
558 clock,
559 data: HashMap::new(),
560 tombstones: TombstoneStorage::new(),
561 parent,
562 base_version,
563 }
564 }
565
566 pub fn from_changes(node_id: NodeId, changes: Vec<Change<K, C, V>>) -> Self {
573 let mut crdt = Self::new(node_id, None);
574 crdt.apply_changes(changes);
575 crdt
576 }
577
578 pub fn reset(&mut self, changes: Vec<Change<K, C, V>>) {
584 self.data.clear();
585 self.tombstones.clear();
586 self.clock = LogicalClock::new();
587 self.apply_changes(changes);
588 }
589
    /// Replays a change log unconditionally (no merge-rule filtering) —
    /// used by `from_changes`/`reset` to rebuild state from scratch.
    ///
    /// The clock is set to the largest version seen in the change set.
    /// NOTE(review): `set_time` overwrites rather than maxes with the current
    /// time, so this can move the clock backwards if called on a clock that
    /// is already ahead; both callers start from a fresh clock — confirm
    /// before reusing elsewhere.
    fn apply_changes(&mut self, changes: Vec<Change<K, C, V>>) {
        let max_db_version = changes
            .iter()
            .map(|c| c.db_version.max(c.local_db_version))
            .max()
            .unwrap_or(0);

        self.clock.set_time(max_db_version);

        for change in changes {
            let record_id = change.record_id.clone();
            let col_name = change.col_name.clone();
            let remote_col_version = change.col_version;
            let remote_db_version = change.db_version;
            let remote_node_id = change.node_id;
            let remote_local_db_version = change.local_db_version;
            let remote_value = change.value;

            if col_name.is_none() {
                // Record tombstone: drop live data and remember the delete.
                self.data.remove(&record_id);

                self.tombstones.insert_or_assign(
                    record_id,
                    TombstoneInfo::new(remote_db_version, remote_node_id, remote_local_db_version),
                );
            } else if let Some(col_key) = col_name {
                // Field write: ignored if the record was already tombstoned
                // earlier in the replay (deletes win during reconstruction).
                if !self.is_record_tombstoned(&record_id, false) {
                    let record = self.get_or_create_record_unchecked(&record_id, false);

                    // `value == None` here is a field removal: only the
                    // version metadata below is recorded.
                    if let Some(value) = remote_value {
                        record.fields.insert(col_key.clone(), value);
                    }

                    let col_ver = ColumnVersion::new(
                        remote_col_version,
                        remote_db_version,
                        remote_node_id,
                        remote_local_db_version,
                    );
                    record.column_versions.insert(col_key, col_ver);

                    // Keep the record's min/max local-version caches current.
                    if remote_local_db_version < record.lowest_local_db_version {
                        record.lowest_local_db_version = remote_local_db_version;
                    }
                    if remote_local_db_version > record.highest_local_db_version {
                        record.highest_local_db_version = remote_local_db_version;
                    }
                }
            }
        }
    }
651
    /// Inserts or updates `fields` on `record_id` with zero flags.
    /// Returns one [`Change`] per field; see `insert_or_update_with_flags`.
    #[must_use = "changes should be propagated to other nodes"]
    pub fn insert_or_update<I>(&mut self, record_id: &K, fields: I) -> Vec<Change<K, C, V>>
    where
        I: IntoIterator<Item = (C, V)>,
    {
        self.insert_or_update_with_flags(record_id, 0, fields)
    }
669
670 #[must_use = "changes should be propagated to other nodes"]
682 pub fn insert_or_update_with_flags<I>(
683 &mut self,
684 record_id: &K,
685 flags: u32,
686 fields: I,
687 ) -> Vec<Change<K, C, V>>
688 where
689 I: IntoIterator<Item = (C, V)>,
690 {
691 let db_version = self.clock.tick();
692
693 if self.is_record_tombstoned(record_id, false) {
695 return Vec::new();
696 }
697
698 let mut changes = Vec::new();
699 let node_id = self.node_id; let record = self.get_or_create_record_unchecked(record_id, false);
701
702 for (col_name, value) in fields {
703 let col_version = if let Some(col_info) = record.column_versions.get_mut(&col_name) {
704 col_info.col_version += 1;
705 col_info.db_version = db_version;
706 col_info.node_id = node_id;
707 col_info.local_db_version = db_version;
708 col_info.col_version
709 } else {
710 record.column_versions.insert(
711 col_name.clone(),
712 ColumnVersion::new(1, db_version, node_id, db_version),
713 );
714 1
715 };
716
717 if db_version < record.lowest_local_db_version {
719 record.lowest_local_db_version = db_version;
720 }
721 if db_version > record.highest_local_db_version {
722 record.highest_local_db_version = db_version;
723 }
724
725 record.fields.insert(col_name.clone(), value.clone());
726 changes.push(Change::new(
727 record_id.clone(),
728 Some(col_name),
729 Some(value),
730 col_version,
731 db_version,
732 node_id,
733 db_version,
734 flags,
735 ));
736 }
737
738 changes
739 }
740
    /// Deletes `record_id` with zero flags; see `delete_record_with_flags`.
    #[must_use = "changes should be propagated to other nodes"]
    pub fn delete_record(&mut self, record_id: &K) -> Option<Change<K, C, V>> {
        self.delete_record_with_flags(record_id, 0)
    }
754
755 #[must_use = "changes should be propagated to other nodes"]
766 pub fn delete_record_with_flags(&mut self, record_id: &K, flags: u32) -> Option<Change<K, C, V>> {
767 if self.is_record_tombstoned(record_id, false) {
768 return None;
769 }
770
771 let db_version = self.clock.tick();
772
773 self.data.remove(record_id);
775
776 self.tombstones.insert_or_assign(
778 record_id.clone(),
779 TombstoneInfo::new(db_version, self.node_id, db_version),
780 );
781
782 Some(Change::new(
783 record_id.clone(),
784 None,
785 None,
786 TOMBSTONE_COL_VERSION,
787 db_version,
788 self.node_id,
789 db_version,
790 flags,
791 ))
792 }
793
    /// Removes a single field with zero flags; see `delete_field_with_flags`.
    #[must_use = "changes should be propagated to other nodes"]
    pub fn delete_field(&mut self, record_id: &K, field_name: &C) -> Option<Change<K, C, V>> {
        self.delete_field_with_flags(record_id, field_name, 0)
    }
811
812 #[must_use = "changes should be propagated to other nodes"]
827 pub fn delete_field_with_flags(
828 &mut self,
829 record_id: &K,
830 field_name: &C,
831 flags: u32,
832 ) -> Option<Change<K, C, V>> {
833 if self.is_record_tombstoned(record_id, false) {
835 return None;
836 }
837
838 let record = self.data.get_mut(record_id)?;
840
841 if !record.fields.contains_key(field_name) {
843 return None;
844 }
845
846 let db_version = self.clock.tick();
847
848 let col_version = if let Some(col_info) = record.column_versions.get_mut(field_name) {
850 col_info.col_version += 1;
851 col_info.db_version = db_version;
852 col_info.node_id = self.node_id;
853 col_info.local_db_version = db_version;
854 col_info.col_version
855 } else {
856 record.column_versions.insert(
858 field_name.clone(),
859 ColumnVersion::new(1, db_version, self.node_id, db_version),
860 );
861 1
862 };
863
864 if db_version < record.lowest_local_db_version {
866 record.lowest_local_db_version = db_version;
867 }
868 if db_version > record.highest_local_db_version {
869 record.highest_local_db_version = db_version;
870 }
871
872 record.fields.remove(field_name);
874
875 Some(Change::new(
876 record_id.clone(),
877 Some(field_name.clone()),
878 None, col_version,
880 db_version,
881 self.node_id,
882 db_version,
883 flags,
884 ))
885 }
886
    /// Merges remote `changes` into this CRDT under `merge_rule`, consulting
    /// the parent layer when resolving conflicts. Returns the subset of
    /// changes that were accepted (with their `local_db_version` restamped).
    pub fn merge_changes<R: MergeRule<K, C, V>>(
        &mut self,
        changes: Vec<Change<K, C, V>>,
        merge_rule: &R,
    ) -> Vec<Change<K, C, V>> {
        self.merge_changes_impl(changes, false, merge_rule)
    }
904
    /// Core merge loop. For each incoming change: advance the clock, skip
    /// changes to tombstoned records, look up the competing local version
    /// (the tombstone's sentinel version for deletes, the column's version
    /// for field writes), and apply the change if `merge_rule` accepts it —
    /// or unconditionally when there is no local version to compete with.
    /// `ignore_parent` limits tombstone/record lookups to this layer only.
    fn merge_changes_impl<R: MergeRule<K, C, V>>(
        &mut self,
        changes: Vec<Change<K, C, V>>,
        ignore_parent: bool,
        merge_rule: &R,
    ) -> Vec<Change<K, C, V>> {
        let mut accepted_changes = Vec::new();

        if changes.is_empty() {
            return accepted_changes;
        }

        for change in changes {
            let Change {
                record_id,
                col_name,
                value: remote_value,
                col_version: remote_col_version,
                db_version: remote_db_version,
                node_id: remote_node_id,
                flags,
                ..
            } = change;

            // Advance our clock past the remote time; this new local time is
            // stamped onto the change if it is accepted. Note the clock moves
            // even for changes that end up rejected.
            let new_local_db_version = self.clock.update(remote_db_version);

            // Deleted records never accept further changes.
            if self.is_record_tombstoned(&record_id, ignore_parent) {
                continue;
            }

            // Local competitor: for a delete, the tombstone (as a sentinel
            // column version); for a field write, that column's version.
            let local_col_info = if col_name.is_none() {
                self
                    .tombstones
                    .find(&record_id)
                    .map(|info| info.as_column_version())
            } else if let Some(ref col) = col_name {
                self
                    .get_record_ptr(&record_id, ignore_parent)
                    .and_then(|record| record.column_versions.get(col).copied())
            } else {
                None
            };

            // No local version means nothing to lose to: accept outright.
            let should_accept = if let Some(local_info) = local_col_info {
                merge_rule.should_accept(
                    local_info.col_version,
                    local_info.db_version,
                    local_info.node_id,
                    remote_col_version,
                    remote_db_version,
                    remote_node_id,
                )
            } else {
                true
            };

            if should_accept {
                if let Some(col_key) = col_name {
                    // Accepted field write (or field removal when value is
                    // None): update data, version metadata, and caches.
                    let record = self.get_or_create_record_unchecked(&record_id, ignore_parent);

                    if let Some(value) = remote_value.clone() {
                        record.fields.insert(col_key.clone(), value);
                    } else {
                        record.fields.remove(&col_key);
                    }

                    record.column_versions.insert(
                        col_key.clone(),
                        ColumnVersion::new(
                            remote_col_version,
                            remote_db_version,
                            remote_node_id,
                            new_local_db_version,
                        ),
                    );

                    if new_local_db_version < record.lowest_local_db_version {
                        record.lowest_local_db_version = new_local_db_version;
                    }
                    if new_local_db_version > record.highest_local_db_version {
                        record.highest_local_db_version = new_local_db_version;
                    }

                    accepted_changes.push(Change::new(
                        record_id,
                        Some(col_key),
                        remote_value,
                        remote_col_version,
                        remote_db_version,
                        remote_node_id,
                        new_local_db_version,
                        flags,
                    ));
                } else {
                    // Accepted record delete: drop the data and tombstone it.
                    self.data.remove(&record_id);

                    self.tombstones.insert_or_assign(
                        record_id.clone(),
                        TombstoneInfo::new(remote_db_version, remote_node_id, new_local_db_version),
                    );

                    accepted_changes.push(Change::new(
                        record_id,
                        None,
                        None,
                        remote_col_version,
                        remote_db_version,
                        remote_node_id,
                        new_local_db_version,
                        flags,
                    ));
                }
            }
        }

        accepted_changes
    }
1036
    /// Returns every change applied locally after `last_db_version`
    /// (excluding no nodes); see `get_changes_since_excluding`.
    #[must_use]
    pub fn get_changes_since(&self, last_db_version: u64) -> Vec<Change<K, C, V>>
    where
        K: Ord,
        C: Ord,
    {
        self.get_changes_since_excluding(last_db_version, &HashSet::new())
    }
1054
    /// Collects every change whose *local* apply time is after
    /// `last_db_version`, skipping changes originated by nodes in
    /// `excluding`. Parent-layer changes are included first; when a parent
    /// exists the combined list is compressed to drop shadowed duplicates.
    pub fn get_changes_since_excluding(
        &self,
        last_db_version: u64,
        excluding: &HashSet<NodeId>,
    ) -> Vec<Change<K, C, V>>
    where
        K: Ord,
        C: Ord,
    {
        let mut changes = Vec::new();

        // Recurse into the base layer first so parent changes precede ours.
        if let Some(ref parent) = self.parent {
            let parent_changes = parent.get_changes_since_excluding(last_db_version, excluding);
            changes.extend(parent_changes);
        }

        for (record_id, record) in &self.data {
            // Cached max lets us skip whole records untouched since the cut.
            if record.highest_local_db_version <= last_db_version {
                continue;
            }

            for (col_name, clock_info) in &record.column_versions {
                if clock_info.local_db_version > last_db_version && !excluding.contains(&clock_info.node_id)
                {
                    // `None` value here means the field was removed but its
                    // version entry remains — emitted as a field removal.
                    let value = record.fields.get(col_name).cloned();

                    changes.push(Change::new(
                        record_id.clone(),
                        Some(col_name.clone()),
                        value,
                        clock_info.col_version,
                        clock_info.db_version,
                        clock_info.node_id,
                        clock_info.local_db_version,
                        0,
                    ));
                }
            }
        }

        // Record deletions newer than the cut are emitted as tombstone changes.
        for (record_id, tombstone_info) in self.tombstones.iter() {
            if tombstone_info.local_db_version > last_db_version
                && !excluding.contains(&tombstone_info.node_id)
            {
                changes.push(Change::new(
                    record_id.clone(),
                    None,
                    None,
                    TOMBSTONE_COL_VERSION,
                    tombstone_info.db_version,
                    tombstone_info.node_id,
                    tombstone_info.local_db_version,
                    0,
                ));
            }
        }

        // Only layered CRDTs can produce duplicates (parent + child entries
        // for the same record/column), so compression is skipped otherwise.
        if self.parent.is_some() {
            Self::compress_changes(&mut changes);
        }

        changes
    }
1124
    /// Deduplicates a change list in place: sorts with
    /// [`DefaultChangeComparator`] (newest first within each record/column),
    /// then keeps one change per (record, column). A record tombstone
    /// encountered after field changes of the same record collapses them all
    /// down to just the tombstone.
    pub fn compress_changes(changes: &mut Vec<Change<K, C, V>>)
    where
        K: Ord,
        C: Ord,
    {
        if changes.is_empty() {
            return;
        }

        let comparator = DefaultChangeComparator;
        changes.sort_unstable_by(|a, b| comparator.compare(a, b));

        // Two-pointer in-place compaction: `write` marks the last kept slot.
        let mut write = 0;
        for read in 1..changes.len() {
            if changes[read].record_id != changes[write].record_id {
                // New record: keep this change.
                write += 1;
                if write != read {
                    changes[write] = changes[read].clone();
                }
            } else if changes[read].col_name.is_none() && changes[write].col_name.is_some() {
                // Record tombstone: rewind to the record's first kept change
                // and replace the whole run with the tombstone alone.
                let mut first_pos = write;
                while first_pos > 0 && changes[first_pos - 1].record_id == changes[read].record_id {
                    first_pos -= 1;
                }
                changes[first_pos] = changes[read].clone();
                write = first_pos;
            } else if changes[read].col_name != changes[write].col_name
                && changes[write].col_name.is_some()
            {
                // Different column of the same record: keep it. (Same column
                // falls through — the sort already put the newest first.)
                write += 1;
                if write != read {
                    changes[write] = changes[read].clone();
                }
            }
        }

        changes.truncate(write + 1);
    }
1180
    /// Returns the record for `record_id`, looking through the parent chain.
    pub fn get_record(&self, record_id: &K) -> Option<&Record<C, V>> {
        self.get_record_ptr(record_id, false)
    }

    /// True when `record_id` is tombstoned here or in any parent layer.
    pub fn is_tombstoned(&self, record_id: &K) -> bool {
        self.is_record_tombstoned(record_id, false)
    }

    /// Returns the tombstone for `record_id`, checking this layer first and
    /// then the parent chain.
    pub fn get_tombstone(&self, record_id: &K) -> Option<TombstoneInfo> {
        if let Some(info) = self.tombstones.find(record_id) {
            return Some(info);
        }

        if let Some(ref parent) = self.parent {
            return parent.get_tombstone(record_id);
        }

        None
    }

    /// Drops tombstones whose `db_version` is below `min_acknowledged_version`
    /// (this layer only); returns the number removed.
    pub fn compact_tombstones(&mut self, min_acknowledged_version: u64) -> usize {
        self.tombstones.compact(min_acknowledged_version)
    }

    /// Number of tombstones in this layer (parents not counted).
    pub fn tombstone_count(&self) -> usize {
        self.tombstones.len()
    }

    /// Read-only access to the logical clock.
    pub fn get_clock(&self) -> &LogicalClock {
        &self.clock
    }

    /// Read-only access to this layer's live records (parents not included).
    pub fn get_data(&self) -> &HashMap<K, Record<C, V>> {
        &self.data
    }
1242
    /// Serializes this CRDT to a JSON string (the `parent` link is skipped).
    ///
    /// # Errors
    /// Propagates any `serde_json` serialization error.
    #[cfg(feature = "json")]
    pub fn to_json(&self) -> Result<String, serde_json::Error>
    where
        K: serde::Serialize,
        C: serde::Serialize,
        V: serde::Serialize,
    {
        serde_json::to_string(self)
    }

    /// Deserializes a CRDT from a JSON string; the result has no parent.
    ///
    /// # Errors
    /// Propagates any `serde_json` deserialization error.
    #[cfg(feature = "json")]
    pub fn from_json(json: &str) -> Result<Self, serde_json::Error>
    where
        K: serde::de::DeserializeOwned + Hash + Eq + Clone,
        C: serde::de::DeserializeOwned + Hash + Eq + Clone,
        V: serde::de::DeserializeOwned + Clone,
    {
        serde_json::from_str(json)
    }

    /// Serializes this CRDT to bincode bytes (standard config).
    ///
    /// # Errors
    /// Propagates any bincode encoding error.
    #[cfg(feature = "binary")]
    pub fn to_bytes(&self) -> Result<Vec<u8>, bincode::error::EncodeError>
    where
        K: serde::Serialize,
        C: serde::Serialize,
        V: serde::Serialize,
    {
        bincode::serde::encode_to_vec(self, bincode::config::standard())
    }

    /// Deserializes a CRDT from bincode bytes (standard config).
    ///
    /// # Errors
    /// Propagates any bincode decoding error.
    #[cfg(feature = "binary")]
    pub fn from_bytes(bytes: &[u8]) -> Result<Self, bincode::error::DecodeError>
    where
        K: serde::de::DeserializeOwned + Hash + Eq + Clone,
        C: serde::de::DeserializeOwned + Hash + Eq + Clone,
        V: serde::de::DeserializeOwned + Clone,
    {
        let (result, _len) = bincode::serde::decode_from_slice(bytes, bincode::config::standard())?;
        Ok(result)
    }

    /// Serializes this CRDT to MessagePack bytes.
    ///
    /// # Errors
    /// Propagates any `rmp_serde` encoding error.
    #[cfg(feature = "msgpack")]
    pub fn to_msgpack_bytes(&self) -> Result<Vec<u8>, rmp_serde::encode::Error>
    where
        K: serde::Serialize,
        C: serde::Serialize,
        V: serde::Serialize,
    {
        rmp_serde::to_vec(self)
    }

    /// Deserializes a CRDT from MessagePack bytes.
    ///
    /// # Errors
    /// Propagates any `rmp_serde` decoding error.
    #[cfg(feature = "msgpack")]
    pub fn from_msgpack_bytes(bytes: &[u8]) -> Result<Self, rmp_serde::decode::Error>
    where
        K: serde::de::DeserializeOwned + Hash + Eq + Clone,
        C: serde::de::DeserializeOwned + Hash + Eq + Clone,
        V: serde::de::DeserializeOwned + Clone,
    {
        rmp_serde::from_slice(bytes)
    }
1354
    /// Returns clones of all records and tombstones in this layer whose
    /// local version is newer than `since_version` (parents not consulted).
    // NOTE(review): this body is duplicated below for no_std because the
    // return type names `hashbrown::HashMap` there; keep the two in sync.
    #[cfg(feature = "std")]
    pub fn get_changed_since(&self, since_version: u64) -> (
        HashMap<K, Record<C, V>>,
        HashMap<K, TombstoneInfo>,
    ) {
        let records = self.data
            .iter()
            .filter(|(_, record)| record.highest_local_db_version > since_version)
            .map(|(k, v)| (k.clone(), v.clone()))
            .collect();

        let tombstones = self.tombstones
            .iter()
            .filter(|(_, info)| info.local_db_version > since_version)
            .map(|(k, v)| (k.clone(), *v))
            .collect();

        (records, tombstones)
    }

    /// no_std twin of `get_changed_since`, returning `hashbrown` maps.
    #[cfg(not(feature = "std"))]
    pub fn get_changed_since(&self, since_version: u64) -> (
        hashbrown::HashMap<K, Record<C, V>>,
        hashbrown::HashMap<K, TombstoneInfo>,
    ) {
        let records = self.data
            .iter()
            .filter(|(_, record)| record.highest_local_db_version > since_version)
            .map(|(k, v)| (k.clone(), v.clone()))
            .collect();

        let tombstones = self.tombstones
            .iter()
            .filter(|(_, info)| info.local_db_version > since_version)
            .map(|(k, v)| (k.clone(), *v))
            .collect();

        (records, tombstones)
    }
1416
1417 fn is_record_tombstoned(&self, record_id: &K, ignore_parent: bool) -> bool {
1420 if self.tombstones.find(record_id).is_some() {
1421 return true;
1422 }
1423
1424 if !ignore_parent {
1425 if let Some(ref parent) = self.parent {
1426 return parent.is_record_tombstoned(record_id, false);
1427 }
1428 }
1429
1430 false
1431 }
1432
    /// Returns a mutable reference to the record for `record_id`, creating it
    /// if absent. A newly created record is seeded with a clone of the
    /// parent's copy (unless `ignore_parent`). "Unchecked": tombstones are
    /// NOT consulted here — callers must check them first.
    fn get_or_create_record_unchecked(
        &mut self,
        record_id: &K,
        ignore_parent: bool,
    ) -> &mut Record<C, V> {
        // The Entry type lives in a different module for std vs hashbrown.
        #[cfg(feature = "std")]
        use std::collections::hash_map::Entry;
        #[cfg(all(not(feature = "std"), feature = "alloc"))]
        use hashbrown::hash_map::Entry;

        match self.data.entry(record_id.clone()) {
            Entry::Occupied(e) => e.into_mut(),
            Entry::Vacant(e) => {
                let record = if !ignore_parent {
                    // Copy-on-write: pull the parent's version down into this
                    // layer so subsequent edits stay local.
                    self
                        .parent
                        .as_ref()
                        .and_then(|p| p.get_record_ptr(record_id, false))
                        .cloned()
                        .unwrap_or_else(Record::new)
                } else {
                    Record::new()
                };
                e.insert(record)
            }
        }
    }
1460
1461 fn get_record_ptr(&self, record_id: &K, ignore_parent: bool) -> Option<&Record<C, V>> {
1462 if let Some(record) = self.data.get(record_id) {
1463 return Some(record);
1464 }
1465
1466 if !ignore_parent {
1467 if let Some(ref parent) = self.parent {
1468 return parent.get_record_ptr(record_id, false);
1469 }
1470 }
1471
1472 None
1473 }
1474}
1475
1476#[cfg(test)]
1477mod tests {
1478 use super::*;
1479
    // Clock starts at 0; tick increments; update jumps past a peer time.
    #[test]
    fn test_logical_clock() {
        let mut clock = LogicalClock::new();
        assert_eq!(clock.current_time(), 0);

        let t1 = clock.tick();
        assert_eq!(t1, 1);
        assert_eq!(clock.current_time(), 1);

        let t2 = clock.update(5);
        assert_eq!(t2, 6);
        assert_eq!(clock.current_time(), 6);
    }

    // Insert/find round-trip, then compaction drops versions below the cut.
    #[test]
    fn test_tombstone_storage() {
        let mut storage = TombstoneStorage::new();
        let info = TombstoneInfo::new(10, 1, 10);

        storage.insert_or_assign("key1".to_string(), info);
        assert_eq!(storage.len(), 1);

        assert_eq!(storage.find(&"key1".to_string()), Some(info));
        assert_eq!(storage.find(&"key2".to_string()), None);

        let removed = storage.compact(15);
        assert_eq!(removed, 1);
        assert_eq!(storage.len(), 0);
    }

    // Inserting two fields yields two changes and a readable record.
    #[test]
    fn test_basic_insert() {
        let mut crdt: CRDT<String, String, String> = CRDT::new(1, None);

        let fields = vec![
            ("name".to_string(), "Alice".to_string()),
            ("age".to_string(), "30".to_string()),
        ];

        let changes = crdt.insert_or_update(&"user1".to_string(), fields);

        assert_eq!(changes.len(), 2);
        assert_eq!(crdt.get_data().len(), 1);

        let record = crdt.get_record(&"user1".to_string()).unwrap();
        assert_eq!(record.fields.get("name").unwrap(), "Alice");
        assert_eq!(record.fields.get("age").unwrap(), "30");
    }

    // Deleting a record tombstones it and removes the live data.
    #[test]
    fn test_delete_record() {
        let mut crdt: CRDT<String, String, String> = CRDT::new(1, None);

        let fields = vec![("name".to_string(), "Bob".to_string())];
        let _ = crdt.insert_or_update(&"user2".to_string(), fields);

        let delete_change = crdt.delete_record(&"user2".to_string());
        assert!(delete_change.is_some());
        assert!(crdt.is_tombstoned(&"user2".to_string()));
        assert_eq!(crdt.get_data().len(), 0);
    }

    // Two nodes write the same column; after a two-way merge both converge
    // on node 2's value (higher node id wins the version tie).
    #[test]
    fn test_merge_changes() {
        let mut crdt1: CRDT<String, String, String> = CRDT::new(1, None);
        let mut crdt2: CRDT<String, String, String> = CRDT::new(2, None);

        let fields1 = vec![("tag".to_string(), "Node1".to_string())];
        let changes1 = crdt1.insert_or_update(&"record1".to_string(), fields1);

        let fields2 = vec![("tag".to_string(), "Node2".to_string())];
        let changes2 = crdt2.insert_or_update(&"record1".to_string(), fields2);

        let merge_rule = DefaultMergeRule;
        crdt1.merge_changes(changes2, &merge_rule);
        crdt2.merge_changes(changes1, &merge_rule);

        assert_eq!(
            crdt1
                .get_record(&"record1".to_string())
                .unwrap()
                .fields
                .get("tag")
                .unwrap(),
            "Node2"
        );
        assert_eq!(crdt1.get_data(), crdt2.get_data());
    }
1569
    // A Change survives a JSON and/or bincode round-trip unchanged.
    #[test]
    #[cfg(feature = "serde")]
    fn test_change_serialization() {
        #[allow(unused_variables)]
        let change = Change::new(
            "record1".to_string(),
            Some("name".to_string()),
            Some("Alice".to_string()),
            1,
            10,
            1,
            10,
            0,
        );

        #[cfg(feature = "json")]
        {
            let json = serde_json::to_string(&change).unwrap();
            let deserialized: Change<String, String, String> = serde_json::from_str(&json).unwrap();
            assert_eq!(change, deserialized);
        }

        #[cfg(feature = "binary")]
        {
            let bytes = bincode::serde::encode_to_vec(&change, bincode::config::standard()).unwrap();
            let (deserialized, _): (Change<String, String, String>, _) =
                bincode::serde::decode_from_slice(&bytes, bincode::config::standard()).unwrap();
            assert_eq!(change, deserialized);
        }
    }

    // A Record (fields + version metadata) survives a serde round-trip.
    // Note Record equality compares only fields.
    #[test]
    #[cfg(feature = "serde")]
    fn test_record_serialization() {
        let mut fields = HashMap::new();
        fields.insert("name".to_string(), "Bob".to_string());
        fields.insert("age".to_string(), "25".to_string());

        let mut column_versions = HashMap::new();
        column_versions.insert("name".to_string(), ColumnVersion::new(1, 10, 1, 10));
        column_versions.insert("age".to_string(), ColumnVersion::new(1, 11, 1, 11));

        #[allow(unused_variables)]
        let record = Record::from_parts(fields, column_versions);

        #[cfg(feature = "json")]
        {
            let json = serde_json::to_string(&record).unwrap();
            let deserialized: Record<String, String> = serde_json::from_str(&json).unwrap();
            assert_eq!(record, deserialized);
        }

        #[cfg(feature = "binary")]
        {
            let bytes = bincode::serde::encode_to_vec(&record, bincode::config::standard()).unwrap();
            let (deserialized, _): (Record<String, String>, _) =
                bincode::serde::decode_from_slice(&bytes, bincode::config::standard()).unwrap();
            assert_eq!(record, deserialized);
        }
    }

    // Full-CRDT JSON round-trip preserves records, tombstones, and the clock.
    #[test]
    #[cfg(feature = "json")]
    fn test_crdt_json_serialization() {
        let mut crdt: CRDT<String, String, String> = CRDT::new(1, None);

        let fields = vec![
            ("name".to_string(), "Alice".to_string()),
            ("age".to_string(), "30".to_string()),
        ];
        let _ = crdt.insert_or_update(&"user1".to_string(), fields);

        let fields2 = vec![("name".to_string(), "Bob".to_string())];
        let _ = crdt.insert_or_update(&"user2".to_string(), fields2);

        let _ = crdt.delete_record(&"user2".to_string());

        let json = crdt.to_json().unwrap();

        let deserialized: CRDT<String, String, String> = CRDT::from_json(&json).unwrap();

        assert_eq!(crdt.get_data().len(), deserialized.get_data().len());
        assert_eq!(
            crdt.get_record(&"user1".to_string()).unwrap().fields,
            deserialized.get_record(&"user1".to_string()).unwrap().fields
        );

        assert_eq!(crdt.tombstone_count(), deserialized.tombstone_count());
        assert!(deserialized.is_tombstoned(&"user2".to_string()));

        assert_eq!(
            crdt.get_clock().current_time(),
            deserialized.get_clock().current_time()
        );

        // The parent link is serde(skip)'d, so it must come back as None.
        let has_parent = deserialized.parent.is_some();
        assert!(!has_parent);
    }

    // Full-CRDT bincode round-trip preserves records and the clock.
    #[test]
    #[cfg(feature = "binary")]
    fn test_crdt_binary_serialization() {
        let mut crdt: CRDT<String, String, String> = CRDT::new(1, None);

        let fields = vec![
            ("name".to_string(), "Alice".to_string()),
            ("age".to_string(), "30".to_string()),
        ];
        let _ = crdt.insert_or_update(&"user1".to_string(), fields);

        let bytes = crdt.to_bytes().unwrap();

        let deserialized: CRDT<String, String, String> = CRDT::from_bytes(&bytes).unwrap();

        assert_eq!(crdt.get_data().len(), deserialized.get_data().len());
        assert_eq!(
            crdt.get_record(&"user1".to_string()).unwrap().fields,
            deserialized.get_record(&"user1".to_string()).unwrap().fields
        );

        assert_eq!(
            crdt.get_clock().current_time(),
            deserialized.get_clock().current_time()
        );
    }

    // Serializing a child CRDT must drop the parent link and keep only the
    // child's own records.
    #[test]
    #[cfg(feature = "serde")]
    fn test_parent_not_serialized() {
        let mut parent: CRDT<String, String, String> = CRDT::new(1, None);
        let fields = vec![("parent_field".to_string(), "parent_value".to_string())];
        let _ = parent.insert_or_update(&"parent_record".to_string(), fields);

        let parent_arc = Arc::new(parent);
        let mut child = CRDT::new(2, Some(parent_arc.clone()));
        let child_fields = vec![("child_field".to_string(), "child_value".to_string())];
        let _ = child.insert_or_update(&"child_record".to_string(), child_fields);

        #[cfg(feature = "json")]
        {
            let json = serde_json::to_string(&child).unwrap();
            let deserialized: CRDT<String, String, String> = serde_json::from_str(&json).unwrap();

            assert!(deserialized.parent.is_none());

            assert!(deserialized.get_record(&"child_record".to_string()).is_some());

            assert!(deserialized.get_record(&"parent_record".to_string()).is_none());
        }
    }
1743}