// crdt_lite/lib.rs

1//! # crdt-lite
2//!
3//! A lightweight, column-based CRDT (Conflict-free Replicated Data Type) implementation in Rust.
4//!
5//! This library provides a generic CRDT with last-write-wins semantics, supporting:
6//! - Generic key and value types
7//! - Logical clock for causality tracking
8//! - Tombstone-based deletion
9//! - Parent-child CRDT hierarchies
10//! - Custom merge rules and comparators
11//! - Change compression
12//! - Optional serialization support via Serde
13//!
14//! ## Features
15//!
16//! This crate provides optional serialization support through feature flags:
17//!
18//! - `serde` - Enables Serde support for all CRDT types
19//! - `json` - Enables JSON serialization (includes `serde` + `serde_json`)
20//! - `binary` - Enables binary serialization (includes `serde` + `bincode`)
21//!
22//! ### Usage Example with Serialization
23//!
24//! ```toml
25//! [dependencies]
26//! crdt-lite = { version = "0.1", features = ["json", "binary"] }
27//! ```
28//!
29//! ```rust,ignore
30//! use crdt_lite::CRDT;
31//!
32//! // Create and populate a CRDT
33//! let mut crdt: CRDT<String, String> = CRDT::new(1, None);
34//! let fields = vec![("name".to_string(), "Alice".to_string())];
35//! crdt.insert_or_update(&"user1".to_string(), fields);
36//!
37//! // Serialize to JSON (requires "json" feature)
38//! let json = crdt.to_json().unwrap();
39//!
40//! // Deserialize from JSON
41//! let restored: CRDT<String, String> = CRDT::from_json(&json).unwrap();
42//!
43//! // Serialize to binary (requires "binary" feature)
44//! let bytes = crdt.to_bytes().unwrap();
45//! let restored: CRDT<String, String> = CRDT::from_bytes(&bytes).unwrap();
46//!
47//! // Or use generic serde with any format (requires "serde" feature)
48//! let json = serde_json::to_string(&crdt).unwrap();
49//! let restored: CRDT<String, String> = serde_json::from_str(&json).unwrap();
50//! ```
51//!
52//! **Note on Parent Relationships**: Parent-child CRDT hierarchies are not serialized.
53//! After deserialization, the `parent` field will always be `None`. Applications must
54//! rebuild parent-child relationships if needed.
55//!
56//! ## Security and Resource Management
57//!
58//! ### Logical Clock Overflow
59//! The logical clock uses `u64` for version numbers. While overflow is theoretically possible
60//! after 2^64 operations (extremely unlikely in practice), applications with extreme longevity
61//! should be aware of this limitation.
62//!
63//! ### DoS Protection and Tombstone Management
64//! Tombstones accumulate indefinitely unless manually compacted. To prevent memory exhaustion:
65//! - Call `compact_tombstones()` periodically after all nodes have acknowledged a version
66//! - Implement application-level rate limiting for operations
67//! - Consider setting resource limits on the number of records and tombstones
68//!
69//! **Important**: Only compact tombstones when ALL participating nodes have acknowledged
70//! the minimum version. Compacting too early may cause deleted records to reappear on
71//! nodes that haven't received the deletion yet.
72
73use std::cmp::Ordering;
74use std::collections::HashMap;
75use std::hash::Hash;
76use std::sync::Arc;
77
/// Type alias for node IDs
pub type NodeId = u64;

/// Type alias for column keys (field names)
pub type ColumnKey = String;

/// Column version used for tombstone changes.
/// Using u64::MAX ensures tombstones are treated as having the highest possible version,
/// so a record deletion always wins over any concurrent column update.
const TOMBSTONE_COL_VERSION: u64 = u64::MAX;
87
/// Represents a single change in the CRDT.
///
/// A change can represent:
/// - An insertion or update of a column value (when `col_name` is `Some`)
/// - A deletion of a specific column (when `col_name` is `Some` and `value` is `None`)
/// - A deletion of an entire record (when `col_name` is `None`)
#[derive(Debug, Clone, PartialEq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct Change<K, V> {
  /// Identifier of the record this change applies to
  pub record_id: K,
  /// `None` represents tombstone of the record
  pub col_name: Option<ColumnKey>,
  /// `None` represents deletion of the column (not the record)
  pub value: Option<V>,
  /// Per-column version counter (bumped on each local update of the column)
  pub col_version: u64,
  /// Logical clock value at the node that produced this change
  pub db_version: u64,
  /// Node that produced this change
  pub node_id: NodeId,
  /// Local db_version when the change was created (useful for `get_changes_since`)
  pub local_db_version: u64,
  /// Optional flags to indicate the type of change (ephemeral, not stored)
  pub flags: u32,
}
110
111impl<K: Eq, V: Eq> Eq for Change<K, V> {}
112
113impl<K, V> Change<K, V> {
114  /// Creates a new Change with all parameters
115  #[allow(clippy::too_many_arguments)]
116  pub fn new(
117    record_id: K,
118    col_name: Option<ColumnKey>,
119    value: Option<V>,
120    col_version: u64,
121    db_version: u64,
122    node_id: NodeId,
123    local_db_version: u64,
124    flags: u32,
125  ) -> Self {
126    Self {
127      record_id,
128      col_name,
129      value,
130      col_version,
131      db_version,
132      node_id,
133      local_db_version,
134      flags,
135    }
136  }
137}
138
/// Represents version information for a column.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct ColumnVersion {
  /// Per-column update counter (starts at 1 on first write, bumped on each local update)
  pub col_version: u64,
  /// Logical clock value at the node that produced the latest write
  pub db_version: u64,
  /// Node that produced the latest write to this column
  pub node_id: NodeId,
  /// Local db_version when the change was created
  pub local_db_version: u64,
}

impl ColumnVersion {
  /// Plain field-for-field constructor; no validation is performed.
  pub fn new(col_version: u64, db_version: u64, node_id: NodeId, local_db_version: u64) -> Self {
    Self {
      col_version,
      db_version,
      node_id,
      local_db_version,
    }
  }
}
160
161/// Minimal version information for tombstones.
162///
163/// Stores essential data: db_version for conflict resolution, node_id for sync exclusion,
164/// and local_db_version for sync.
165#[derive(Debug, Clone, Copy, PartialEq, Eq)]
166#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
167pub struct TombstoneInfo {
168  pub db_version: u64,
169  pub node_id: NodeId,
170  pub local_db_version: u64,
171}
172
173impl TombstoneInfo {
174  pub fn new(db_version: u64, node_id: NodeId, local_db_version: u64) -> Self {
175    Self {
176      db_version,
177      node_id,
178      local_db_version,
179    }
180  }
181
182  /// Helper to create a ColumnVersion for comparison with regular columns
183  pub fn as_column_version(&self) -> ColumnVersion {
184    ColumnVersion::new(
185      TOMBSTONE_COL_VERSION,
186      self.db_version,
187      self.node_id,
188      self.local_db_version,
189    )
190  }
191}
192
/// Represents a logical clock for maintaining causality.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct LogicalClock {
  time: u64,
}

impl LogicalClock {
  /// Creates a new logical clock starting at 0
  pub fn new() -> Self {
    LogicalClock { time: 0 }
  }

  /// Increments the clock for a local event and returns the new time
  pub fn tick(&mut self) -> u64 {
    self.time += 1;
    self.time
  }

  /// Updates the clock based on a received time and returns the new time.
  ///
  /// The result is `max(local, received) + 1` — the classic Lamport merge.
  pub fn update(&mut self, received_time: u64) -> u64 {
    self.time = self.time.max(received_time) + 1;
    self.time
  }

  /// Sets the logical clock to a specific time
  pub fn set_time(&mut self, time: u64) {
    self.time = time;
  }

  /// Retrieves the current time
  pub fn current_time(&self) -> u64 {
    self.time
  }
}

impl Default for LogicalClock {
  fn default() -> Self {
    LogicalClock::new()
  }
}
235
236/// Storage for tombstones (deleted records).
237///
238/// Uses a HashMap for efficient lookups and supports compaction.
239#[derive(Debug, Clone)]
240#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
241pub struct TombstoneStorage<K: Hash + Eq> {
242  entries: HashMap<K, TombstoneInfo>,
243}
244
245impl<K: Hash + Eq> TombstoneStorage<K> {
246  pub fn new() -> Self {
247    Self {
248      entries: HashMap::new(),
249    }
250  }
251
252  pub fn insert_or_assign(&mut self, key: K, info: TombstoneInfo) {
253    self.entries.insert(key, info);
254  }
255
256  pub fn find(&self, key: &K) -> Option<TombstoneInfo> {
257    self.entries.get(key).copied()
258  }
259
260  pub fn erase(&mut self, key: &K) -> bool {
261    self.entries.remove(key).is_some()
262  }
263
264  pub fn clear(&mut self) {
265    self.entries.clear();
266  }
267
268  pub fn iter(&self) -> impl Iterator<Item = (&K, &TombstoneInfo)> {
269    self.entries.iter()
270  }
271
272  pub fn len(&self) -> usize {
273    self.entries.len()
274  }
275
276  pub fn is_empty(&self) -> bool {
277    self.entries.is_empty()
278  }
279
280  /// Compact tombstones older than the specified version.
281  ///
282  /// Returns the number of tombstones removed.
283  pub fn compact(&mut self, min_acknowledged_version: u64) -> usize {
284    let initial_len = self.entries.len();
285    self
286      .entries
287      .retain(|_, info| info.db_version >= min_acknowledged_version);
288    initial_len - self.entries.len()
289  }
290}
291
292impl<K: Hash + Eq> Default for TombstoneStorage<K> {
293  fn default() -> Self {
294    Self::new()
295  }
296}
297
/// Represents a record in the CRDT.
#[derive(Debug, Clone)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct Record<V> {
  /// Current column values for this record
  pub fields: HashMap<ColumnKey, V>,
  /// Version metadata per column (may include columns whose value was removed by a merge)
  pub column_versions: HashMap<ColumnKey, ColumnVersion>,
  /// Track version boundaries for efficient filtering.
  /// Sentinels `u64::MAX` / `0` mean "no versions recorded yet" (see `Record::new`).
  pub lowest_local_db_version: u64,
  pub highest_local_db_version: u64,
}
308
309impl<V> Record<V> {
310  pub fn new() -> Self {
311    Self {
312      fields: HashMap::new(),
313      column_versions: HashMap::new(),
314      lowest_local_db_version: u64::MAX,
315      highest_local_db_version: 0,
316    }
317  }
318
319  /// Creates a record from existing fields and column versions
320  pub fn from_parts(
321    fields: HashMap<ColumnKey, V>,
322    column_versions: HashMap<ColumnKey, ColumnVersion>,
323  ) -> Self {
324    let mut lowest = u64::MAX;
325    let mut highest = 0;
326
327    for ver in column_versions.values() {
328      if ver.local_db_version < lowest {
329        lowest = ver.local_db_version;
330      }
331      if ver.local_db_version > highest {
332        highest = ver.local_db_version;
333      }
334    }
335
336    Self {
337      fields,
338      column_versions,
339      lowest_local_db_version: lowest,
340      highest_local_db_version: highest,
341    }
342  }
343}
344
impl<V: PartialEq> PartialEq for Record<V> {
  /// Two records are equal when their field values match.
  fn eq(&self, other: &Self) -> bool {
    // Compare only fields, not column_versions (those will differ per node)
    self.fields == other.fields
  }
}

impl<V> Default for Record<V> {
  /// Same as `Record::new`: empty record with sentinel version boundaries.
  fn default() -> Self {
    Self::new()
  }
}
357
/// Trait for merge rules that determine conflict resolution.
///
/// Implementations should return `true` if the remote change should be accepted,
/// `false` otherwise.
pub trait MergeRule<K, V> {
  /// Determines whether to accept a remote change over a local one.
  ///
  /// The `local_*` / `remote_*` triples describe the competing versions:
  /// column version, db version, and originating node id.
  fn should_accept(
    &self,
    local_col: u64,
    local_db: u64,
    local_node: NodeId,
    remote_col: u64,
    remote_db: u64,
    remote_node: NodeId,
  ) -> bool;

  /// Convenience method for Change objects: forwards the version triples of
  /// `local` and `remote` to `should_accept`.
  fn should_accept_change(&self, local: &Change<K, V>, remote: &Change<K, V>) -> bool {
    self.should_accept(
      local.col_version,
      local.db_version,
      local.node_id,
      remote.col_version,
      remote.db_version,
      remote.node_id,
    )
  }
}
386
387/// Default merge rule implementing last-write-wins semantics.
388///
389/// Comparison priority:
390/// 1. Column version (higher wins)
391/// 2. DB version (higher wins)
392/// 3. Node ID (higher wins as tiebreaker)
393#[derive(Debug, Clone, Copy, Default)]
394pub struct DefaultMergeRule;
395
396impl<K, V> MergeRule<K, V> for DefaultMergeRule {
397  fn should_accept(
398    &self,
399    local_col: u64,
400    local_db: u64,
401    local_node: NodeId,
402    remote_col: u64,
403    remote_db: u64,
404    remote_node: NodeId,
405  ) -> bool {
406    match remote_col.cmp(&local_col) {
407      Ordering::Greater => true,
408      Ordering::Less => false,
409      Ordering::Equal => match remote_db.cmp(&local_db) {
410        Ordering::Greater => true,
411        Ordering::Less => false,
412        Ordering::Equal => remote_node > local_node,
413      },
414    }
415  }
416}
417
/// Trait for change comparators used in sorting and compression.
pub trait ChangeComparator<K, V> {
  /// Returns the ordering of `a` relative to `b` (expected to be a total order).
  fn compare(&self, a: &Change<K, V>, b: &Change<K, V>) -> Ordering;
}
422
423/// Default change comparator.
424///
425/// Sorts by:
426/// 1. Record ID (ascending)
427/// 2. Column name presence (deletions/tombstones last)
428/// 3. Column name (ascending)
429/// 4. Column version (descending - most recent first)
430/// 5. DB version (descending)
431/// 6. Node ID (descending)
432#[derive(Debug, Clone, Copy, Default)]
433pub struct DefaultChangeComparator;
434
435impl<K: Ord, V> ChangeComparator<K, V> for DefaultChangeComparator {
436  fn compare(&self, a: &Change<K, V>, b: &Change<K, V>) -> Ordering {
437    // Compare record IDs
438    match a.record_id.cmp(&b.record_id) {
439      Ordering::Equal => {}
440      ord => return ord,
441    }
442
443    // Deletions (None) come last for each record
444    match (a.col_name.as_ref(), b.col_name.as_ref()) {
445      (None, None) => {}
446      (None, Some(_)) => return Ordering::Greater,
447      (Some(_), None) => return Ordering::Less,
448      (Some(a_col), Some(b_col)) => match a_col.cmp(b_col) {
449        Ordering::Equal => {}
450        ord => return ord,
451      },
452    }
453
454    // Compare versions (descending - most recent first)
455    match b.col_version.cmp(&a.col_version) {
456      Ordering::Equal => {}
457      ord => return ord,
458    }
459
460    match b.db_version.cmp(&a.db_version) {
461      Ordering::Equal => {}
462      ord => return ord,
463    }
464
465    b.node_id.cmp(&a.node_id)
466  }
467}
468
/// Main CRDT structure, generic over key (K) and value (V) types.
///
/// This implements a column-based CRDT with last-write-wins semantics.
///
/// Note: the `parent` link is skipped during (de)serialization and comes back
/// as `None`; callers must re-establish hierarchies themselves.
#[derive(Debug)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "serde", serde(bound(serialize = "K: serde::Serialize, V: serde::Serialize")))]
#[cfg_attr(feature = "serde", serde(bound(deserialize = "K: serde::de::DeserializeOwned + Hash + Eq + Clone, V: serde::de::DeserializeOwned + Clone")))]
pub struct CRDT<K: Hash + Eq + Clone, V: Clone> {
  /// Identifier of this replica; used as the final tiebreaker by the default merge rule
  node_id: NodeId,
  /// Lamport-style clock that assigns `db_version` values to local operations
  clock: LogicalClock,
  /// Live records keyed by record id
  data: HashMap<K, Record<V>>,
  /// Deleted records (see module docs on compaction)
  tombstones: TombstoneStorage<K>,
  /// Optional parent CRDT; never serialized (always `None` after deserialization)
  #[cfg_attr(feature = "serde", serde(skip, default))]
  parent: Option<Arc<CRDT<K, V>>>,
  /// Parent clock time captured at construction; currently unused at runtime
  #[allow(dead_code)]
  base_version: u64,
}
486
487impl<K: Hash + Eq + Clone, V: Clone> CRDT<K, V> {
488  /// Creates a new empty CRDT.
489  ///
490  /// # Arguments
491  ///
492  /// * `node_id` - Unique identifier for this CRDT node
493  /// * `parent` - Optional parent CRDT for hierarchical structures
494  pub fn new(node_id: NodeId, parent: Option<Arc<CRDT<K, V>>>) -> Self {
495    let (clock, base_version) = if let Some(ref p) = parent {
496      let parent_clock = p.clock;
497      let base = parent_clock.current_time();
498      (parent_clock, base)
499    } else {
500      (LogicalClock::new(), 0)
501    };
502
503    Self {
504      node_id,
505      clock,
506      data: HashMap::new(),
507      tombstones: TombstoneStorage::new(),
508      parent,
509      base_version,
510    }
511  }
512
513  /// Creates a CRDT from a list of changes (e.g., loaded from disk).
514  ///
515  /// # Arguments
516  ///
517  /// * `node_id` - The unique identifier for this CRDT node
518  /// * `changes` - A list of changes to apply to reconstruct the CRDT state
519  pub fn from_changes(node_id: NodeId, changes: Vec<Change<K, V>>) -> Self {
520    let mut crdt = Self::new(node_id, None);
521    crdt.apply_changes(changes);
522    crdt
523  }
524
525  /// Resets the CRDT to a state as if it was constructed with the given changes.
526  ///
527  /// # Arguments
528  ///
529  /// * `changes` - A list of changes to apply to reconstruct the CRDT state
530  pub fn reset(&mut self, changes: Vec<Change<K, V>>) {
531    self.data.clear();
532    self.tombstones.clear();
533    self.clock = LogicalClock::new();
534    self.apply_changes(changes);
535  }
536
537  /// Applies a list of changes to reconstruct the CRDT state.
538  fn apply_changes(&mut self, changes: Vec<Change<K, V>>) {
539    // Determine the maximum db_version from the changes
540    let max_db_version = changes
541      .iter()
542      .map(|c| c.db_version.max(c.local_db_version))
543      .max()
544      .unwrap_or(0);
545
546    // Set the logical clock to the maximum db_version
547    self.clock.set_time(max_db_version);
548
549    // Apply each change to reconstruct the CRDT state
550    for change in changes {
551      let record_id = change.record_id.clone();
552      let col_name = change.col_name.clone();
553      let remote_col_version = change.col_version;
554      let remote_db_version = change.db_version;
555      let remote_node_id = change.node_id;
556      let remote_local_db_version = change.local_db_version;
557      let remote_value = change.value;
558
559      if col_name.is_none() {
560        // Handle deletion
561        self.data.remove(&record_id);
562
563        // Store deletion information in tombstones
564        self.tombstones.insert_or_assign(
565          record_id,
566          TombstoneInfo::new(remote_db_version, remote_node_id, remote_local_db_version),
567        );
568      } else if let Some(col_key) = col_name {
569        // Handle insertion or update
570        if !self.is_record_tombstoned(&record_id, false) {
571          let record = self.get_or_create_record_unchecked(&record_id, false);
572
573          // Insert or update the field value
574          if let Some(value) = remote_value {
575            record.fields.insert(col_key.clone(), value);
576          }
577
578          // Update the column version info
579          let col_ver = ColumnVersion::new(
580            remote_col_version,
581            remote_db_version,
582            remote_node_id,
583            remote_local_db_version,
584          );
585          record.column_versions.insert(col_key, col_ver);
586
587          // Update version boundaries
588          if remote_local_db_version < record.lowest_local_db_version {
589            record.lowest_local_db_version = remote_local_db_version;
590          }
591          if remote_local_db_version > record.highest_local_db_version {
592            record.highest_local_db_version = remote_local_db_version;
593          }
594        }
595      }
596    }
597  }
598
  /// Inserts a new record or updates an existing record in the CRDT.
  ///
  /// Equivalent to `insert_or_update_with_flags` with `flags = 0`.
  ///
  /// # Arguments
  ///
  /// * `record_id` - The unique identifier for the record
  /// * `fields` - An iterator of (column_name, value) pairs
  ///
  /// # Returns
  ///
  /// A vector of changes created by this operation
  #[must_use = "changes should be propagated to other nodes"]
  pub fn insert_or_update<I>(&mut self, record_id: &K, fields: I) -> Vec<Change<K, V>>
  where
    I: IntoIterator<Item = (ColumnKey, V)>,
  {
    self.insert_or_update_with_flags(record_id, 0, fields)
  }
616
617  /// Inserts a new record or updates an existing record with flags.
618  ///
619  /// # Arguments
620  ///
621  /// * `record_id` - The unique identifier for the record
622  /// * `flags` - Flags to indicate the type of change
623  /// * `fields` - An iterator of (column_name, value) pairs
624  ///
625  /// # Returns
626  ///
627  /// A vector of changes created by this operation
628  #[must_use = "changes should be propagated to other nodes"]
629  pub fn insert_or_update_with_flags<I>(
630    &mut self,
631    record_id: &K,
632    flags: u32,
633    fields: I,
634  ) -> Vec<Change<K, V>>
635  where
636    I: IntoIterator<Item = (ColumnKey, V)>,
637  {
638    let db_version = self.clock.tick();
639
640    // Check if the record is tombstoned
641    if self.is_record_tombstoned(record_id, false) {
642      return Vec::new();
643    }
644
645    let mut changes = Vec::new();
646    let node_id = self.node_id; // Store node_id before mutable borrow
647    let record = self.get_or_create_record_unchecked(record_id, false);
648
649    for (col_name, value) in fields {
650      let col_version = if let Some(col_info) = record.column_versions.get_mut(&col_name) {
651        col_info.col_version += 1;
652        col_info.db_version = db_version;
653        col_info.node_id = node_id;
654        col_info.local_db_version = db_version;
655        col_info.col_version
656      } else {
657        record.column_versions.insert(
658          col_name.clone(),
659          ColumnVersion::new(1, db_version, node_id, db_version),
660        );
661        1
662      };
663
664      // Update record version boundaries
665      if db_version < record.lowest_local_db_version {
666        record.lowest_local_db_version = db_version;
667      }
668      if db_version > record.highest_local_db_version {
669        record.highest_local_db_version = db_version;
670      }
671
672      record.fields.insert(col_name.clone(), value.clone());
673      changes.push(Change::new(
674        record_id.clone(),
675        Some(col_name),
676        Some(value),
677        col_version,
678        db_version,
679        node_id,
680        db_version,
681        flags,
682      ));
683    }
684
685    changes
686  }
687
  /// Deletes a record by marking it as tombstoned.
  ///
  /// Equivalent to `delete_record_with_flags` with `flags = 0`.
  ///
  /// # Arguments
  ///
  /// * `record_id` - The unique identifier for the record
  ///
  /// # Returns
  ///
  /// An optional Change representing the deletion (`None` if the record was
  /// already tombstoned)
  #[must_use = "changes should be propagated to other nodes"]
  pub fn delete_record(&mut self, record_id: &K) -> Option<Change<K, V>> {
    self.delete_record_with_flags(record_id, 0)
  }
701
702  /// Deletes a record with flags.
703  ///
704  /// # Arguments
705  ///
706  /// * `record_id` - The unique identifier for the record
707  /// * `flags` - Flags to indicate the type of change
708  ///
709  /// # Returns
710  ///
711  /// An optional Change representing the deletion
712  #[must_use = "changes should be propagated to other nodes"]
713  pub fn delete_record_with_flags(&mut self, record_id: &K, flags: u32) -> Option<Change<K, V>> {
714    if self.is_record_tombstoned(record_id, false) {
715      return None;
716    }
717
718    let db_version = self.clock.tick();
719
720    // Mark as tombstone and remove data
721    self.data.remove(record_id);
722
723    // Store deletion information in tombstones
724    self.tombstones.insert_or_assign(
725      record_id.clone(),
726      TombstoneInfo::new(db_version, self.node_id, db_version),
727    );
728
729    Some(Change::new(
730      record_id.clone(),
731      None,
732      None,
733      TOMBSTONE_COL_VERSION,
734      db_version,
735      self.node_id,
736      db_version,
737      flags,
738    ))
739  }
740
  /// Merges incoming changes into the CRDT.
  ///
  /// Thin wrapper around the internal implementation with
  /// `ignore_parent = false`.
  ///
  /// # Arguments
  ///
  /// * `changes` - Vector of changes to merge
  /// * `merge_rule` - The merge rule to use for conflict resolution
  ///
  /// # Returns
  ///
  /// Vector of accepted changes (if requested)
  pub fn merge_changes<R: MergeRule<K, V>>(
    &mut self,
    changes: Vec<Change<K, V>>,
    merge_rule: &R,
  ) -> Vec<Change<K, V>> {
    self.merge_changes_impl(changes, false, merge_rule)
  }
758
  /// Core merge routine shared by the public merge entry points.
  ///
  /// For every incoming change this:
  /// 1. advances the local clock with the remote `db_version`,
  /// 2. skips changes targeting tombstoned records,
  /// 3. looks up the competing local version (tombstone or column),
  /// 4. asks `merge_rule` whether the remote change wins, and
  /// 5. applies it (column upsert/removal, or record deletion + tombstone).
  ///
  /// Returns the subset of changes that were accepted, rewritten with the
  /// fresh `local_db_version` assigned by this node's clock.
  fn merge_changes_impl<R: MergeRule<K, V>>(
    &mut self,
    changes: Vec<Change<K, V>>,
    ignore_parent: bool,
    merge_rule: &R,
  ) -> Vec<Change<K, V>> {
    let mut accepted_changes = Vec::new();

    if changes.is_empty() {
      return accepted_changes;
    }

    for change in changes {
      // Move values from change to avoid unnecessary clones
      let Change {
        record_id,
        col_name,
        value: remote_value,
        col_version: remote_col_version,
        db_version: remote_db_version,
        node_id: remote_node_id,
        flags,
        ..
      } = change;

      // Always update the logical clock to maintain causal consistency
      let new_local_db_version = self.clock.update(remote_db_version);

      // Skip all changes for tombstoned records
      if self.is_record_tombstoned(&record_id, ignore_parent) {
        continue;
      }

      // Retrieve local column version information
      let local_col_info = if col_name.is_none() {
        // For deletions, check tombstones
        self
          .tombstones
          .find(&record_id)
          .map(|info| info.as_column_version())
      } else if let Some(ref col) = col_name {
        // For column updates, check the record
        self
          .get_record_ptr(&record_id, ignore_parent)
          .and_then(|record| record.column_versions.get(col).copied())
      } else {
        // Unreachable: `col_name` is either `None` or `Some`, both handled above.
        None
      };

      // Determine whether to accept the remote change.
      // No local version means the change is brand new here: accept it.
      let should_accept = if let Some(local_info) = local_col_info {
        merge_rule.should_accept(
          local_info.col_version,
          local_info.db_version,
          local_info.node_id,
          remote_col_version,
          remote_db_version,
          remote_node_id,
        )
      } else {
        true
      };

      if should_accept {
        if let Some(col_key) = col_name {
          // Handle insertion or update
          let record = self.get_or_create_record_unchecked(&record_id, ignore_parent);

          // Update field value
          // (cloned because `remote_value` is reused for the echoed change below)
          if let Some(value) = remote_value.clone() {
            record.fields.insert(col_key.clone(), value);
          } else {
            // If remote_value is None, remove the field
            record.fields.remove(&col_key);
          }

          // Update the column version info and record version boundaries
          record.column_versions.insert(
            col_key.clone(),
            ColumnVersion::new(
              remote_col_version,
              remote_db_version,
              remote_node_id,
              new_local_db_version,
            ),
          );

          // Update version boundaries
          if new_local_db_version < record.lowest_local_db_version {
            record.lowest_local_db_version = new_local_db_version;
          }
          if new_local_db_version > record.highest_local_db_version {
            record.highest_local_db_version = new_local_db_version;
          }

          accepted_changes.push(Change::new(
            record_id,
            Some(col_key),
            remote_value,
            remote_col_version,
            remote_db_version,
            remote_node_id,
            new_local_db_version,
            flags,
          ));
        } else {
          // Handle deletion
          self.data.remove(&record_id);

          // Store deletion information in tombstones
          self.tombstones.insert_or_assign(
            record_id.clone(),
            TombstoneInfo::new(remote_db_version, remote_node_id, new_local_db_version),
          );

          accepted_changes.push(Change::new(
            record_id,
            None,
            None,
            remote_col_version,
            remote_db_version,
            remote_node_id,
            new_local_db_version,
            flags,
          ));
        }
      }
    }

    accepted_changes
  }
890
891  /// Retrieves all changes since a given `last_db_version`.
892  ///
893  /// # Arguments
894  ///
895  /// * `last_db_version` - The database version to retrieve changes since
896  ///
897  /// # Returns
898  ///
899  /// A vector of changes
900  #[must_use]
901  pub fn get_changes_since(&self, last_db_version: u64) -> Vec<Change<K, V>>
902  where
903    K: Ord,
904  {
905    self.get_changes_since_excluding(last_db_version, &std::collections::HashSet::new())
906  }
907
  /// Retrieves all changes since a given `last_db_version`, excluding specific nodes.
  ///
  /// Collects, in order: parent changes (if any), column changes from live
  /// records, and deletion changes from tombstones. When a parent exists, the
  /// combined list is compressed to drop redundant entries.
  pub fn get_changes_since_excluding(
    &self,
    last_db_version: u64,
    excluding: &std::collections::HashSet<NodeId>,
  ) -> Vec<Change<K, V>>
  where
    K: Ord,
  {
    let mut changes = Vec::new();

    // Get changes from parent
    if let Some(ref parent) = self.parent {
      let parent_changes = parent.get_changes_since_excluding(last_db_version, excluding);
      changes.extend(parent_changes);
    }

    // Get changes from regular records
    for (record_id, record) in &self.data {
      // Skip records that haven't changed since last_db_version
      // (cheap filter using the per-record high-water mark)
      if record.highest_local_db_version <= last_db_version {
        continue;
      }

      for (col_name, clock_info) in &record.column_versions {
        if clock_info.local_db_version > last_db_version && !excluding.contains(&clock_info.node_id)
        {
          // `value` may be None when the column's value was removed during a merge
          let value = record.fields.get(col_name).cloned();

          changes.push(Change::new(
            record_id.clone(),
            Some(col_name.clone()),
            value,
            clock_info.col_version,
            clock_info.db_version,
            clock_info.node_id,
            clock_info.local_db_version,
            0,
          ));
        }
      }
    }

    // Get deletion changes from tombstones
    for (record_id, tombstone_info) in self.tombstones.iter() {
      if tombstone_info.local_db_version > last_db_version
        && !excluding.contains(&tombstone_info.node_id)
      {
        changes.push(Change::new(
          record_id.clone(),
          None,
          None,
          TOMBSTONE_COL_VERSION,
          tombstone_info.db_version,
          tombstone_info.node_id,
          tombstone_info.local_db_version,
          0,
        ));
      }
    }

    if self.parent.is_some() {
      // Compress changes to remove redundant operations
      Self::compress_changes(&mut changes);
    }

    changes
  }
976
  /// Compresses a vector of changes in-place by removing redundant changes.
  ///
  /// Changes are sorted and then compressed using a two-pointer technique.
  /// After sorting, the most recent change for each (record, column) pair
  /// comes first, and record tombstones sort after a record's column changes;
  /// a tombstone therefore supersedes all field updates for its record.
  ///
  /// # Performance
  ///
  /// This method uses `sort_unstable_by` which provides O(n log n) average time complexity
  /// but does not preserve the relative order of equal elements. Since the comparator
  /// provides a total ordering, stability is not required.
  pub fn compress_changes(changes: &mut Vec<Change<K, V>>)
  where
    K: Ord,
  {
    if changes.is_empty() {
      return;
    }

    // Sort changes using the DefaultChangeComparator
    // Use sort_unstable for better performance since we don't need stable sorting
    let comparator = DefaultChangeComparator;
    changes.sort_unstable_by(|a, b| comparator.compare(a, b));

    // Use two-pointer technique to compress in-place:
    // `write` points at the last kept change, `read` scans forward.
    let mut write = 0;
    for read in 1..changes.len() {
      if changes[read].record_id != changes[write].record_id {
        // New record, always keep it
        write += 1;
        if write != read {
          changes[write] = changes[read].clone();
        }
      } else if changes[read].col_name.is_none() && changes[write].col_name.is_some() {
        // Current read is a deletion, backtrack to first change for this record
        // and replace it with the deletion, effectively discarding all field updates
        let mut first_pos = write;
        while first_pos > 0 && changes[first_pos - 1].record_id == changes[read].record_id {
          first_pos -= 1;
        }
        changes[first_pos] = changes[read].clone();
        write = first_pos;
      } else if changes[read].col_name != changes[write].col_name
        && changes[write].col_name.is_some()
      {
        // New column for the same record
        write += 1;
        if write != read {
          changes[write] = changes[read].clone();
        }
      }
      // Else: same record and column, keep the existing one (most recent due to sorting)
    }

    // Everything past `write` is redundant.
    changes.truncate(write + 1);
  }
1031
1032  /// Retrieves a reference to a record if it exists.
1033  pub fn get_record(&self, record_id: &K) -> Option<&Record<V>> {
1034    self.get_record_ptr(record_id, false)
1035  }
1036
1037  /// Checks if a record is tombstoned.
1038  pub fn is_tombstoned(&self, record_id: &K) -> bool {
1039    self.is_record_tombstoned(record_id, false)
1040  }
1041
1042  /// Gets tombstone information for a record.
1043  pub fn get_tombstone(&self, record_id: &K) -> Option<TombstoneInfo> {
1044    if let Some(info) = self.tombstones.find(record_id) {
1045      return Some(info);
1046    }
1047
1048    if let Some(ref parent) = self.parent {
1049      return parent.get_tombstone(record_id);
1050    }
1051
1052    None
1053  }
1054
  /// Removes tombstones older than the specified version.
  ///
  /// Only this CRDT's local tombstone storage is compacted; tombstones held
  /// by a parent CRDT (if any) are untouched.
  ///
  /// # Safety and DoS Mitigation
  ///
  /// **IMPORTANT**: Only call this method when ALL participating nodes have acknowledged
  /// the `min_acknowledged_version`. Compacting too early may cause deleted records to
  /// reappear on nodes that haven't received the deletion yet.
  ///
  /// To prevent DoS via tombstone accumulation:
  /// - Call this method periodically as part of your sync protocol
  /// - Track which versions have been acknowledged by all nodes
  /// - Consider implementing a tombstone limit and rejecting operations when exceeded
  ///
  /// # Arguments
  ///
  /// * `min_acknowledged_version` - Tombstones with db_version < this value will be removed
  ///
  /// # Returns
  ///
  /// The number of tombstones removed
  pub fn compact_tombstones(&mut self, min_acknowledged_version: u64) -> usize {
    self.tombstones.compact(min_acknowledged_version)
  }
1078
1079  /// Gets the number of tombstones currently stored.
1080  pub fn tombstone_count(&self) -> usize {
1081    self.tombstones.len()
1082  }
1083
1084  /// Gets the current logical clock.
1085  pub fn get_clock(&self) -> &LogicalClock {
1086    &self.clock
1087  }
1088
1089  /// Gets a reference to the internal data map.
1090  pub fn get_data(&self) -> &HashMap<K, Record<V>> {
1091    &self.data
1092  }
1093
1094  /// Serializes the CRDT to a JSON string.
1095  ///
1096  /// Note: The parent relationship is not serialized and must be rebuilt after deserialization.
1097  ///
1098  /// # Errors
1099  ///
1100  /// Returns an error if serialization fails.
1101  #[cfg(feature = "json")]
1102  pub fn to_json(&self) -> Result<String, serde_json::Error>
1103  where
1104    K: serde::Serialize,
1105    V: serde::Serialize,
1106  {
1107    serde_json::to_string(self)
1108  }
1109
1110  /// Deserializes a CRDT from a JSON string.
1111  ///
1112  /// Note: The parent relationship is not deserialized and will be `None`.
1113  /// Applications must rebuild parent-child relationships if needed.
1114  ///
1115  /// # Errors
1116  ///
1117  /// Returns an error if deserialization fails.
1118  #[cfg(feature = "json")]
1119  pub fn from_json(json: &str) -> Result<Self, serde_json::Error>
1120  where
1121    K: serde::de::DeserializeOwned + Hash + Eq + Clone,
1122    V: serde::de::DeserializeOwned + Clone,
1123  {
1124    serde_json::from_str(json)
1125  }
1126
1127  /// Serializes the CRDT to bytes using bincode.
1128  ///
1129  /// Note: The parent relationship is not serialized and must be rebuilt after deserialization.
1130  ///
1131  /// # Errors
1132  ///
1133  /// Returns an error if serialization fails.
1134  #[cfg(feature = "binary")]
1135  pub fn to_bytes(&self) -> Result<Vec<u8>, bincode::Error>
1136  where
1137    K: serde::Serialize,
1138    V: serde::Serialize,
1139  {
1140    bincode::serialize(self)
1141  }
1142
1143  /// Deserializes a CRDT from bytes using bincode.
1144  ///
1145  /// Note: The parent relationship is not deserialized and will be `None`.
1146  /// Applications must rebuild parent-child relationships if needed.
1147  ///
1148  /// # Errors
1149  ///
1150  /// Returns an error if deserialization fails.
1151  #[cfg(feature = "binary")]
1152  pub fn from_bytes(bytes: &[u8]) -> Result<Self, bincode::Error>
1153  where
1154    K: serde::de::DeserializeOwned + Hash + Eq + Clone,
1155    V: serde::de::DeserializeOwned + Clone,
1156  {
1157    bincode::deserialize(bytes)
1158  }
1159
1160  // Helper methods
1161
1162  fn is_record_tombstoned(&self, record_id: &K, ignore_parent: bool) -> bool {
1163    if self.tombstones.find(record_id).is_some() {
1164      return true;
1165    }
1166
1167    if !ignore_parent {
1168      if let Some(ref parent) = self.parent {
1169        return parent.is_record_tombstoned(record_id, false);
1170      }
1171    }
1172
1173    false
1174  }
1175
1176  fn get_or_create_record_unchecked(
1177    &mut self,
1178    record_id: &K,
1179    ignore_parent: bool,
1180  ) -> &mut Record<V> {
1181    use std::collections::hash_map::Entry;
1182
1183    match self.data.entry(record_id.clone()) {
1184      Entry::Occupied(e) => e.into_mut(),
1185      Entry::Vacant(e) => {
1186        let record = if !ignore_parent {
1187          self
1188            .parent
1189            .as_ref()
1190            .and_then(|p| p.get_record_ptr(record_id, false))
1191            .cloned()
1192            .unwrap_or_else(Record::new)
1193        } else {
1194          Record::new()
1195        };
1196        e.insert(record)
1197      }
1198    }
1199  }
1200
1201  fn get_record_ptr(&self, record_id: &K, ignore_parent: bool) -> Option<&Record<V>> {
1202    if let Some(record) = self.data.get(record_id) {
1203      return Some(record);
1204    }
1205
1206    if !ignore_parent {
1207      if let Some(ref parent) = self.parent {
1208        return parent.get_record_ptr(record_id, false);
1209      }
1210    }
1211
1212    None
1213  }
1214}
1215
#[cfg(test)]
mod tests {
  use super::*;

  /// The clock starts at zero; `tick` advances by one; `update(5)` from
  /// time 1 lands at 6, i.e. strictly past both local and remote times.
  #[test]
  fn test_logical_clock() {
    let mut clock = LogicalClock::new();
    assert_eq!(clock.current_time(), 0);

    let t1 = clock.tick();
    assert_eq!(t1, 1);
    assert_eq!(clock.current_time(), 1);

    let t2 = clock.update(5);
    assert_eq!(t2, 6);
    assert_eq!(clock.current_time(), 6);
  }

  /// Basic tombstone storage round-trip: insert, find hit/miss, then
  /// compact(15) removes the entry (its db_version 10 is below 15).
  #[test]
  fn test_tombstone_storage() {
    let mut storage = TombstoneStorage::new();
    // Arguments presumably (db_version, node_id, local_db_version) —
    // TODO(review): confirm against TombstoneInfo::new.
    let info = TombstoneInfo::new(10, 1, 10);

    storage.insert_or_assign("key1".to_string(), info);
    assert_eq!(storage.len(), 1);

    assert_eq!(storage.find(&"key1".to_string()), Some(info));
    assert_eq!(storage.find(&"key2".to_string()), None);

    let removed = storage.compact(15);
    assert_eq!(removed, 1);
    assert_eq!(storage.len(), 0);
  }

  /// Inserting a record with two fields yields one change per field and
  /// stores both field values on the record.
  #[test]
  fn test_basic_insert() {
    let mut crdt: CRDT<String, String> = CRDT::new(1, None);

    let fields = vec![
      ("name".to_string(), "Alice".to_string()),
      ("age".to_string(), "30".to_string()),
    ];

    let changes = crdt.insert_or_update(&"user1".to_string(), fields);

    // One change per field written.
    assert_eq!(changes.len(), 2);
    assert_eq!(crdt.get_data().len(), 1);

    let record = crdt.get_record(&"user1".to_string()).unwrap();
    assert_eq!(record.fields.get("name").unwrap(), "Alice");
    assert_eq!(record.fields.get("age").unwrap(), "30");
  }

  /// Deleting a record produces a change, marks the record tombstoned,
  /// and removes it from the data map.
  #[test]
  fn test_delete_record() {
    let mut crdt: CRDT<String, String> = CRDT::new(1, None);

    let fields = vec![("name".to_string(), "Bob".to_string())];
    let _ = crdt.insert_or_update(&"user2".to_string(), fields);

    let delete_change = crdt.delete_record(&"user2".to_string());
    assert!(delete_change.is_some());
    assert!(crdt.is_tombstoned(&"user2".to_string()));
    assert_eq!(crdt.get_data().len(), 0);
  }

  /// Two nodes write the same field concurrently, then exchange changes.
  /// Both converge to the same state; the tie goes to the higher node id.
  #[test]
  fn test_merge_changes() {
    let mut crdt1: CRDT<String, String> = CRDT::new(1, None);
    let mut crdt2: CRDT<String, String> = CRDT::new(2, None);

    let fields1 = vec![("tag".to_string(), "Node1".to_string())];
    let changes1 = crdt1.insert_or_update(&"record1".to_string(), fields1);

    let fields2 = vec![("tag".to_string(), "Node2".to_string())];
    let changes2 = crdt2.insert_or_update(&"record1".to_string(), fields2);

    let merge_rule = DefaultMergeRule;
    crdt1.merge_changes(changes2, &merge_rule);
    crdt2.merge_changes(changes1, &merge_rule);

    // Node2 has higher node_id, so its value should win
    assert_eq!(
      crdt1
        .get_record(&"record1".to_string())
        .unwrap()
        .fields
        .get("tag")
        .unwrap(),
      "Node2"
    );
    // Convergence: both replicas end with identical data.
    assert_eq!(crdt1.get_data(), crdt2.get_data());
  }

  /// A `Change` survives a JSON and/or bincode round-trip unchanged.
  #[test]
  #[cfg(feature = "serde")]
  fn test_change_serialization() {
    // `unused_variables` allowed because neither inner feature may be enabled.
    #[allow(unused_variables)]
    let change = Change::new(
      "record1".to_string(),
      Some("name".to_string()),
      Some("Alice".to_string()),
      1,
      10,
      1,
      10,
      0,
    );

    // Test JSON serialization
    #[cfg(feature = "json")]
    {
      let json = serde_json::to_string(&change).unwrap();
      let deserialized: Change<String, String> = serde_json::from_str(&json).unwrap();
      assert_eq!(change, deserialized);
    }

    // Test binary serialization
    #[cfg(feature = "binary")]
    {
      let bytes = bincode::serialize(&change).unwrap();
      let deserialized: Change<String, String> = bincode::deserialize(&bytes).unwrap();
      assert_eq!(change, deserialized);
    }
  }

  /// A `Record` (fields plus per-column version metadata) survives a JSON
  /// and/or bincode round-trip unchanged.
  #[test]
  #[cfg(feature = "serde")]
  fn test_record_serialization() {
    let mut fields = HashMap::new();
    fields.insert("name".to_string(), "Bob".to_string());
    fields.insert("age".to_string(), "25".to_string());

    let mut column_versions = HashMap::new();
    column_versions.insert("name".to_string(), ColumnVersion::new(1, 10, 1, 10));
    column_versions.insert("age".to_string(), ColumnVersion::new(1, 11, 1, 11));

    // `unused_variables` allowed because neither inner feature may be enabled.
    #[allow(unused_variables)]
    let record = Record::from_parts(fields, column_versions);

    // Test JSON serialization
    #[cfg(feature = "json")]
    {
      let json = serde_json::to_string(&record).unwrap();
      let deserialized: Record<String> = serde_json::from_str(&json).unwrap();
      assert_eq!(record, deserialized);
    }

    // Test binary serialization
    #[cfg(feature = "binary")]
    {
      let bytes = bincode::serialize(&record).unwrap();
      let deserialized: Record<String> = bincode::deserialize(&bytes).unwrap();
      assert_eq!(record, deserialized);
    }
  }

  /// Full-CRDT JSON round-trip: records, tombstones, and clock are
  /// preserved; the parent link is dropped by design.
  #[test]
  #[cfg(feature = "json")]
  fn test_crdt_json_serialization() {
    let mut crdt: CRDT<String, String> = CRDT::new(1, None);

    // Add some data
    let fields = vec![
      ("name".to_string(), "Alice".to_string()),
      ("age".to_string(), "30".to_string()),
    ];
    let _ = crdt.insert_or_update(&"user1".to_string(), fields);

    let fields2 = vec![("name".to_string(), "Bob".to_string())];
    let _ = crdt.insert_or_update(&"user2".to_string(), fields2);

    // Delete one record
    let _ = crdt.delete_record(&"user2".to_string());

    // Serialize to JSON
    let json = crdt.to_json().unwrap();

    // Deserialize
    let deserialized: CRDT<String, String> = CRDT::from_json(&json).unwrap();

    // Verify data is preserved
    assert_eq!(crdt.get_data().len(), deserialized.get_data().len());
    assert_eq!(
      crdt.get_record(&"user1".to_string()).unwrap().fields,
      deserialized.get_record(&"user1".to_string()).unwrap().fields
    );

    // Verify tombstones are preserved
    assert_eq!(crdt.tombstone_count(), deserialized.tombstone_count());
    assert!(deserialized.is_tombstoned(&"user2".to_string()));

    // Verify clock is preserved
    assert_eq!(
      crdt.get_clock().current_time(),
      deserialized.get_clock().current_time()
    );

    // Verify parent is None after deserialization
    let has_parent = deserialized.parent.is_some();
    assert!(!has_parent);
  }

  /// Full-CRDT binary (bincode) round-trip: records and clock preserved.
  #[test]
  #[cfg(feature = "binary")]
  fn test_crdt_binary_serialization() {
    let mut crdt: CRDT<String, String> = CRDT::new(1, None);

    // Add some data
    let fields = vec![
      ("name".to_string(), "Alice".to_string()),
      ("age".to_string(), "30".to_string()),
    ];
    let _ = crdt.insert_or_update(&"user1".to_string(), fields);

    // Serialize to bytes
    let bytes = crdt.to_bytes().unwrap();

    // Deserialize
    let deserialized: CRDT<String, String> = CRDT::from_bytes(&bytes).unwrap();

    // Verify data is preserved
    assert_eq!(crdt.get_data().len(), deserialized.get_data().len());
    assert_eq!(
      crdt.get_record(&"user1".to_string()).unwrap().fields,
      deserialized.get_record(&"user1".to_string()).unwrap().fields
    );

    // Verify clock is preserved
    assert_eq!(
      crdt.get_clock().current_time(),
      deserialized.get_clock().current_time()
    );
  }

  /// Serializing a child CRDT must not embed the parent: after the round
  /// trip, the parent link is `None` and parent records are unreachable.
  #[test]
  #[cfg(feature = "serde")]
  fn test_parent_not_serialized() {
    // Create a parent CRDT
    let mut parent: CRDT<String, String> = CRDT::new(1, None);
    let fields = vec![("parent_field".to_string(), "parent_value".to_string())];
    let _ = parent.insert_or_update(&"parent_record".to_string(), fields);

    // Create a child CRDT with parent
    let parent_arc = Arc::new(parent);
    let mut child = CRDT::new(2, Some(parent_arc.clone()));
    let child_fields = vec![("child_field".to_string(), "child_value".to_string())];
    let _ = child.insert_or_update(&"child_record".to_string(), child_fields);

    // Serialize and deserialize the child
    #[cfg(feature = "json")]
    {
      let json = serde_json::to_string(&child).unwrap();
      let deserialized: CRDT<String, String> = serde_json::from_str(&json).unwrap();

      // Verify parent is None
      assert!(deserialized.parent.is_none());

      // Verify child's own data is preserved
      assert!(deserialized.get_record(&"child_record".to_string()).is_some());

      // Verify parent's data is NOT in deserialized child
      assert!(deserialized.get_record(&"parent_record".to_string()).is_none());
    }
  }
}