crdt_lite/
lib.rs

1//! # crdt-lite
2//!
3//! A lightweight, column-based CRDT (Conflict-free Replicated Data Type) implementation in Rust.
4//!
5//! This library provides a generic CRDT with last-write-wins semantics, supporting:
6//! - Generic key and value types
7//! - Logical clock for causality tracking
8//! - Tombstone-based deletion
9//! - Parent-child CRDT hierarchies
10//! - Custom merge rules and comparators
11//! - Change compression
12//! - Optional serialization support via Serde
13//!
14//! ## Features
15//!
16//! This crate provides optional features through feature flags:
17//!
18//! - `std` - Enables standard library support (enabled by default)
19//! - `alloc` - Enables allocator support for no_std environments (pulls in hashbrown for HashMap)
20//! - `serde` - Enables Serde support for all CRDT types
21//! - `json` - Enables JSON serialization (includes `serde` + `serde_json`)
22//! - `binary` - Enables binary serialization (includes `serde` + `bincode`)
23//! - `persist` - Enables persistence layer with WAL and hooks (includes `binary` + `std`)
24//! - `node-id-u128` - Uses u128 for NodeId instead of u64 (useful for UUID-based node IDs)
25//!
26//! ### no_std Support
27//!
28//! For no_std environments, disable default features and enable the `alloc` feature:
29//!
30//! ```toml
31//! [dependencies]
32//! crdt-lite = { version = "0.2", default-features = false, features = ["alloc"] }
33//! ```
34//!
35//! ### Usage Example with Serialization
36//!
37//! ```toml
38//! [dependencies]
//! crdt-lite = { version = "0.2", features = ["json", "binary"] }
40//! ```
41//!
42//! ```rust,ignore
43//! use crdt_lite::CRDT;
44//!
45//! // Create and populate a CRDT
46//! let mut crdt: CRDT<String, String> = CRDT::new(1, None);
47//! let fields = vec![("name".to_string(), "Alice".to_string())];
48//! crdt.insert_or_update(&"user1".to_string(), fields);
49//!
50//! // Serialize to JSON (requires "json" feature)
51//! let json = crdt.to_json().unwrap();
52//!
53//! // Deserialize from JSON
54//! let restored: CRDT<String, String> = CRDT::from_json(&json).unwrap();
55//!
56//! // Serialize to binary (requires "binary" feature)
57//! let bytes = crdt.to_bytes().unwrap();
58//! let restored: CRDT<String, String> = CRDT::from_bytes(&bytes).unwrap();
59//!
60//! // Or use generic serde with any format (requires "serde" feature)
61//! let json = serde_json::to_string(&crdt).unwrap();
62//! let restored: CRDT<String, String> = serde_json::from_str(&json).unwrap();
63//! ```
64//!
65//! **Note on Parent Relationships**: Parent-child CRDT hierarchies are not serialized.
66//! After deserialization, the `parent` field will always be `None`. Applications must
67//! rebuild parent-child relationships if needed.
68//!
69//! ## Security and Resource Management
70//!
71//! ### Logical Clock Overflow
72//! The logical clock uses `u64` for version numbers. While overflow is theoretically possible
73//! after 2^64 operations (extremely unlikely in practice), applications with extreme longevity
74//! should be aware of this limitation.
75//!
76//! ### DoS Protection and Tombstone Management
77//! Tombstones accumulate indefinitely unless manually compacted. To prevent memory exhaustion:
78//! - Call `compact_tombstones()` periodically after all nodes have acknowledged a version
79//! - Implement application-level rate limiting for operations
80//! - Consider setting resource limits on the number of records and tombstones
81//!
82//! **Important**: Only compact tombstones when ALL participating nodes have acknowledged
83//! the minimum version. Compacting too early may cause deleted records to reappear on
84//! nodes that haven't received the deletion yet.
85
86#![cfg_attr(not(feature = "std"), no_std)]
87
88// Persistence module (requires "persist" feature)
89#[cfg(feature = "persist")]
90pub mod persist;
91
92// Ensure valid feature combination: either std or alloc must be enabled
93#[cfg(not(any(feature = "std", feature = "alloc")))]
94compile_error!("Either 'std' (default) or 'alloc' feature must be enabled. For no_std environments, use: cargo build --no-default-features --features alloc");
95
96#[cfg(not(feature = "std"))]
97extern crate alloc;
98
99#[cfg(feature = "std")]
100use std::{
101  cmp::Ordering,
102  collections::{HashMap, HashSet},
103  hash::Hash,
104  sync::Arc,
105};
106
107#[cfg(not(feature = "std"))]
108use alloc::{
109  string::String,
110  sync::Arc,
111  vec::Vec,
112};
113#[cfg(not(feature = "std"))]
114use core::{cmp::Ordering, hash::Hash};
115#[cfg(all(not(feature = "std"), feature = "alloc"))]
116use hashbrown::{HashMap, HashSet};
117
/// Type alias for node IDs
///
/// By default, NodeId is u64. Use the `node-id-u128` feature to enable u128 for UUID-based IDs:
/// ```toml
/// crdt-lite = { version = "0.4", features = ["node-id-u128"] }
/// ```
#[cfg(feature = "node-id-u128")]
pub type NodeId = u128;

/// Default node identifier type (64-bit) when `node-id-u128` is disabled.
#[cfg(not(feature = "node-id-u128"))]
pub type NodeId = u64;

/// Type alias for column keys (field names)
pub type ColumnKey = String;

/// Column version used for tombstone changes.
/// Using u64::MAX ensures tombstones are treated as having the highest possible
/// column version, so a record deletion wins over any concurrent column update.
const TOMBSTONE_COL_VERSION: u64 = u64::MAX;
136
/// Represents a single change in the CRDT.
///
/// A change can represent:
/// - An insertion or update of a column value (when `col_name` is `Some`)
/// - A deletion of a specific column (when `col_name` is `Some` and `value` is `None`)
/// - A deletion of an entire record (when `col_name` is `None`)
#[derive(Debug, Clone, PartialEq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "serde", serde(bound(serialize = "K: serde::Serialize, C: serde::Serialize, V: serde::Serialize")))]
#[cfg_attr(feature = "serde", serde(bound(deserialize = "K: serde::de::DeserializeOwned, C: serde::de::DeserializeOwned, V: serde::de::DeserializeOwned")))]
pub struct Change<K, C, V> {
  /// Identifier of the record this change applies to.
  pub record_id: K,
  /// `None` represents tombstone of the record
  pub col_name: Option<C>,
  /// `None` represents deletion of the column (not the record)
  pub value: Option<V>,
  /// Per-column edit counter at the originating node.
  pub col_version: u64,
  /// Originating node's logical clock value when the change was produced.
  pub db_version: u64,
  /// Identifier of the node that produced this change.
  pub node_id: NodeId,
  /// Local db_version when the change was created (useful for `get_changes_since`)
  pub local_db_version: u64,
  /// Optional flags to indicate the type of change (ephemeral, not stored)
  pub flags: u32,
}

/// Full equality is available whenever every generic component supports it.
impl<K: Eq, C: Eq, V: Eq> Eq for Change<K, C, V> {}
163
164impl<K, C, V> Change<K, C, V> {
165  /// Creates a new Change with all parameters
166  #[allow(clippy::too_many_arguments)]
167  pub fn new(
168    record_id: K,
169    col_name: Option<C>,
170    value: Option<V>,
171    col_version: u64,
172    db_version: u64,
173    node_id: NodeId,
174    local_db_version: u64,
175    flags: u32,
176  ) -> Self {
177    Self {
178      record_id,
179      col_name,
180      value,
181      col_version,
182      db_version,
183      node_id,
184      local_db_version,
185      flags,
186    }
187  }
188}
189
/// Represents version information for a column.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct ColumnVersion {
  /// Per-column edit counter (created at 1, incremented on each local write).
  pub col_version: u64,
  /// Originating node's logical clock value when the change was produced.
  pub db_version: u64,
  /// Identifier of the node that produced the change.
  pub node_id: NodeId,
  /// Local db_version when the change was created
  pub local_db_version: u64,
}
200
201impl ColumnVersion {
202  pub fn new(col_version: u64, db_version: u64, node_id: NodeId, local_db_version: u64) -> Self {
203    Self {
204      col_version,
205      db_version,
206      node_id,
207      local_db_version,
208    }
209  }
210}
211
/// Minimal version information for tombstones.
///
/// Stores essential data: db_version for conflict resolution, node_id for sync exclusion,
/// and local_db_version for sync.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct TombstoneInfo {
  /// Originating node's logical clock value when the record was deleted.
  pub db_version: u64,
  /// Identifier of the node that performed the deletion.
  pub node_id: NodeId,
  /// Local db_version when the deletion was applied on this node.
  pub local_db_version: u64,
}
223
224impl TombstoneInfo {
225  pub fn new(db_version: u64, node_id: NodeId, local_db_version: u64) -> Self {
226    Self {
227      db_version,
228      node_id,
229      local_db_version,
230    }
231  }
232
233  /// Helper to create a ColumnVersion for comparison with regular columns
234  pub fn as_column_version(&self) -> ColumnVersion {
235    ColumnVersion::new(
236      TOMBSTONE_COL_VERSION,
237      self.db_version,
238      self.node_id,
239      self.local_db_version,
240    )
241  }
242}
243
/// Represents a logical clock for maintaining causality.
///
/// The clock is monotonically non-decreasing and saturates at `u64::MAX`
/// instead of overflowing. The previous unchecked `+= 1` could panic in debug
/// builds (or silently wrap — breaking monotonicity — in release builds) if a
/// remote peer sent a `db_version` of `u64::MAX` into [`LogicalClock::update`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct LogicalClock {
  // Current Lamport time; only mutated through the methods below.
  time: u64,
}

impl LogicalClock {
  /// Creates a new logical clock starting at 0
  pub fn new() -> Self {
    Self { time: 0 }
  }

  /// Increments the clock for a local event and returns the new time.
  ///
  /// Saturates at `u64::MAX` rather than overflowing.
  pub fn tick(&mut self) -> u64 {
    self.time = self.time.saturating_add(1);
    self.time
  }

  /// Updates the clock based on a received time and returns the new time.
  ///
  /// Standard Lamport receive rule: take the max of the local and received
  /// time, then advance by one. The advance saturates, so an adversarial
  /// `u64::MAX` received time cannot trigger an overflow panic.
  pub fn update(&mut self, received_time: u64) -> u64 {
    self.time = self.time.max(received_time).saturating_add(1);
    self.time
  }

  /// Sets the logical clock to a specific time
  pub fn set_time(&mut self, time: u64) {
    self.time = time;
  }

  /// Retrieves the current time
  pub fn current_time(&self) -> u64 {
    self.time
  }
}

/// Defaults to a clock at time 0 (same as [`LogicalClock::new`]).
impl Default for LogicalClock {
  fn default() -> Self {
    Self::new()
  }
}
286
/// Storage for tombstones (deleted records).
///
/// Uses a HashMap for efficient lookups and supports compaction.
#[derive(Debug, Clone)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct TombstoneStorage<K: Hash + Eq> {
  // Deleted record id -> minimal version info needed for merge decisions.
  entries: HashMap<K, TombstoneInfo>,
}
295
296impl<K: Hash + Eq> TombstoneStorage<K> {
297  pub fn new() -> Self {
298    Self {
299      entries: HashMap::new(),
300    }
301  }
302
303  pub fn insert_or_assign(&mut self, key: K, info: TombstoneInfo) {
304    self.entries.insert(key, info);
305  }
306
307  pub fn find(&self, key: &K) -> Option<TombstoneInfo> {
308    self.entries.get(key).copied()
309  }
310
311  pub fn erase(&mut self, key: &K) -> bool {
312    self.entries.remove(key).is_some()
313  }
314
315  pub fn clear(&mut self) {
316    self.entries.clear();
317  }
318
319  pub fn iter(&self) -> impl Iterator<Item = (&K, &TombstoneInfo)> {
320    self.entries.iter()
321  }
322
323  pub fn len(&self) -> usize {
324    self.entries.len()
325  }
326
327  pub fn is_empty(&self) -> bool {
328    self.entries.is_empty()
329  }
330
331  /// Compact tombstones older than the specified version.
332  ///
333  /// Returns the number of tombstones removed.
334  pub fn compact(&mut self, min_acknowledged_version: u64) -> usize {
335    let initial_len = self.entries.len();
336    self
337      .entries
338      .retain(|_, info| info.db_version >= min_acknowledged_version);
339    initial_len - self.entries.len()
340  }
341}
342
343impl<K: Hash + Eq> Default for TombstoneStorage<K> {
344  fn default() -> Self {
345    Self::new()
346  }
347}
348
/// Represents a record in the CRDT.
#[derive(Debug, Clone)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "serde", serde(bound(serialize = "C: serde::Serialize + Hash + Eq, V: serde::Serialize")))]
#[cfg_attr(feature = "serde", serde(bound(deserialize = "C: serde::de::DeserializeOwned + Hash + Eq, V: serde::de::DeserializeOwned")))]
pub struct Record<C, V> {
  /// Current column values; a column absent here but present in
  /// `column_versions` is a field-level tombstone.
  pub fields: HashMap<C, V>,
  /// Per-column version metadata used for conflict resolution.
  pub column_versions: HashMap<C, ColumnVersion>,
  /// Track version boundaries for efficient filtering
  /// (initialized inverted: lowest = u64::MAX, highest = 0 while empty).
  pub lowest_local_db_version: u64,
  pub highest_local_db_version: u64,
}
361
362impl<C: Hash + Eq, V> Record<C, V> {
363  pub fn new() -> Self {
364    Self {
365      fields: HashMap::new(),
366      column_versions: HashMap::new(),
367      lowest_local_db_version: u64::MAX,
368      highest_local_db_version: 0,
369    }
370  }
371
372  /// Creates a record from existing fields and column versions
373  pub fn from_parts(
374    fields: HashMap<C, V>,
375    column_versions: HashMap<C, ColumnVersion>,
376  ) -> Self {
377    let mut lowest = u64::MAX;
378    let mut highest = 0;
379
380    for ver in column_versions.values() {
381      if ver.local_db_version < lowest {
382        lowest = ver.local_db_version;
383      }
384      if ver.local_db_version > highest {
385        highest = ver.local_db_version;
386      }
387    }
388
389    Self {
390      fields,
391      column_versions,
392      lowest_local_db_version: lowest,
393      highest_local_db_version: highest,
394    }
395  }
396}
397
impl<C: Hash + Eq + PartialEq, V: PartialEq> PartialEq for Record<C, V> {
  /// Two records are equal when their field values match.
  fn eq(&self, other: &Self) -> bool {
    // Compare only fields, not column_versions (those will differ per node)
    self.fields == other.fields
  }
}

/// Defaults to an empty record (same as [`Record::new`]).
impl<C: Hash + Eq, V> Default for Record<C, V> {
  fn default() -> Self {
    Self::new()
  }
}
410
/// Trait for merge rules that determine conflict resolution.
///
/// Implementations should return `true` if the remote change should be accepted,
/// `false` otherwise.
pub trait MergeRule<K, C, V> {
  /// Determines whether to accept a remote change over a local one.
  ///
  /// Each side is described by its per-column version, the originating
  /// node's db_version, and the originating node's id.
  fn should_accept(
    &self,
    local_col: u64,
    local_db: u64,
    local_node: NodeId,
    remote_col: u64,
    remote_db: u64,
    remote_node: NodeId,
  ) -> bool;

  /// Convenience method for Change objects: forwards the version fields of
  /// both changes to [`MergeRule::should_accept`].
  fn should_accept_change(&self, local: &Change<K, C, V>, remote: &Change<K, C, V>) -> bool {
    self.should_accept(
      local.col_version,
      local.db_version,
      local.node_id,
      remote.col_version,
      remote.db_version,
      remote.node_id,
    )
  }
}
439
440/// Default merge rule implementing last-write-wins semantics.
441///
442/// Comparison priority:
443/// 1. Column version (higher wins)
444/// 2. DB version (higher wins)
445/// 3. Node ID (higher wins as tiebreaker)
446#[derive(Debug, Clone, Copy, Default)]
447pub struct DefaultMergeRule;
448
449impl<K, C, V> MergeRule<K, C, V> for DefaultMergeRule {
450  fn should_accept(
451    &self,
452    local_col: u64,
453    local_db: u64,
454    local_node: NodeId,
455    remote_col: u64,
456    remote_db: u64,
457    remote_node: NodeId,
458  ) -> bool {
459    match remote_col.cmp(&local_col) {
460      Ordering::Greater => true,
461      Ordering::Less => false,
462      Ordering::Equal => match remote_db.cmp(&local_db) {
463        Ordering::Greater => true,
464        Ordering::Less => false,
465        Ordering::Equal => remote_node > local_node,
466      },
467    }
468  }
469}
470
/// Trait for change comparators used in sorting and compression.
pub trait ChangeComparator<K, C, V> {
  /// Returns the ordering of `a` relative to `b`.
  fn compare(&self, a: &Change<K, C, V>, b: &Change<K, C, V>) -> Ordering;
}
475
476/// Default change comparator.
477///
478/// Sorts by:
479/// 1. Record ID (ascending)
480/// 2. Column name presence (deletions/tombstones last)
481/// 3. Column name (ascending)
482/// 4. Column version (descending - most recent first)
483/// 5. DB version (descending)
484/// 6. Node ID (descending)
485#[derive(Debug, Clone, Copy, Default)]
486pub struct DefaultChangeComparator;
487
488impl<K: Ord, C: Ord, V> ChangeComparator<K, C, V> for DefaultChangeComparator {
489  fn compare(&self, a: &Change<K, C, V>, b: &Change<K, C, V>) -> Ordering {
490    // Compare record IDs
491    match a.record_id.cmp(&b.record_id) {
492      Ordering::Equal => {}
493      ord => return ord,
494    }
495
496    // Deletions (None) come last for each record
497    match (a.col_name.as_ref(), b.col_name.as_ref()) {
498      (None, None) => {}
499      (None, Some(_)) => return Ordering::Greater,
500      (Some(_), None) => return Ordering::Less,
501      (Some(a_col), Some(b_col)) => match a_col.cmp(b_col) {
502        Ordering::Equal => {}
503        ord => return ord,
504      },
505    }
506
507    // Compare versions (descending - most recent first)
508    match b.col_version.cmp(&a.col_version) {
509      Ordering::Equal => {}
510      ord => return ord,
511    }
512
513    match b.db_version.cmp(&a.db_version) {
514      Ordering::Equal => {}
515      ord => return ord,
516    }
517
518    b.node_id.cmp(&a.node_id)
519  }
520}
521
/// Main CRDT structure, generic over key (K), column (C), and value (V) types.
///
/// This implements a column-based CRDT with last-write-wins semantics.
#[derive(Debug)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "serde", serde(bound(serialize = "K: serde::Serialize, C: serde::Serialize, V: serde::Serialize")))]
#[cfg_attr(feature = "serde", serde(bound(deserialize = "K: serde::de::DeserializeOwned + Hash + Eq + Clone, C: serde::de::DeserializeOwned + Hash + Eq + Clone, V: serde::de::DeserializeOwned + Clone")))]
pub struct CRDT<K: Hash + Eq + Clone, C: Hash + Eq + Clone, V: Clone> {
  // Unique identifier of this replica.
  node_id: NodeId,
  // Logical clock driving db_version / local_db_version values.
  clock: LogicalClock,
  // Live records keyed by record id.
  data: HashMap<K, Record<C, V>>,
  // Deleted records; consulted before every write and merge.
  tombstones: TombstoneStorage<K>,
  // Optional parent CRDT; intentionally not serialized, so it is always
  // `None` after deserialization (see module docs).
  #[cfg_attr(feature = "serde", serde(skip, default))]
  parent: Option<Arc<CRDT<K, C, V>>>,
  // Parent's clock value captured at construction time.
  #[allow(dead_code)]
  base_version: u64,
}
539
540impl<K: Hash + Eq + Clone, C: Hash + Eq + Clone, V: Clone> CRDT<K, C, V> {
541  /// Creates a new empty CRDT.
542  ///
543  /// # Arguments
544  ///
545  /// * `node_id` - Unique identifier for this CRDT node
546  /// * `parent` - Optional parent CRDT for hierarchical structures
547  pub fn new(node_id: NodeId, parent: Option<Arc<CRDT<K, C, V>>>) -> Self {
548    let (clock, base_version) = if let Some(ref p) = parent {
549      let parent_clock = p.clock;
550      let base = parent_clock.current_time();
551      (parent_clock, base)
552    } else {
553      (LogicalClock::new(), 0)
554    };
555
556    Self {
557      node_id,
558      clock,
559      data: HashMap::new(),
560      tombstones: TombstoneStorage::new(),
561      parent,
562      base_version,
563    }
564  }
565
  /// Creates a CRDT from a list of changes (e.g., loaded from disk).
  ///
  /// The resulting CRDT has no parent; its clock is advanced to the highest
  /// version present in `changes` (see `apply_changes`).
  ///
  /// # Arguments
  ///
  /// * `node_id` - The unique identifier for this CRDT node
  /// * `changes` - A list of changes to apply to reconstruct the CRDT state
  pub fn from_changes(node_id: NodeId, changes: Vec<Change<K, C, V>>) -> Self {
    let mut crdt = Self::new(node_id, None);
    crdt.apply_changes(changes);
    crdt
  }
577
  /// Resets the CRDT to a state as if it was constructed with the given changes.
  ///
  /// Clears all records and tombstones and restarts the logical clock before
  /// replaying `changes`. The node id, parent link, and `base_version` are
  /// left untouched.
  ///
  /// # Arguments
  ///
  /// * `changes` - A list of changes to apply to reconstruct the CRDT state
  pub fn reset(&mut self, changes: Vec<Change<K, C, V>>) {
    self.data.clear();
    self.tombstones.clear();
    self.clock = LogicalClock::new();
    self.apply_changes(changes);
  }
589
590  /// Applies a list of changes to reconstruct the CRDT state.
591  fn apply_changes(&mut self, changes: Vec<Change<K, C, V>>) {
592    // Determine the maximum db_version from the changes
593    let max_db_version = changes
594      .iter()
595      .map(|c| c.db_version.max(c.local_db_version))
596      .max()
597      .unwrap_or(0);
598
599    // Set the logical clock to the maximum db_version
600    self.clock.set_time(max_db_version);
601
602    // Apply each change to reconstruct the CRDT state
603    for change in changes {
604      let record_id = change.record_id.clone();
605      let col_name = change.col_name.clone();
606      let remote_col_version = change.col_version;
607      let remote_db_version = change.db_version;
608      let remote_node_id = change.node_id;
609      let remote_local_db_version = change.local_db_version;
610      let remote_value = change.value;
611
612      if col_name.is_none() {
613        // Handle deletion
614        self.data.remove(&record_id);
615
616        // Store deletion information in tombstones
617        self.tombstones.insert_or_assign(
618          record_id,
619          TombstoneInfo::new(remote_db_version, remote_node_id, remote_local_db_version),
620        );
621      } else if let Some(col_key) = col_name {
622        // Handle insertion or update
623        if !self.is_record_tombstoned(&record_id, false) {
624          let record = self.get_or_create_record_unchecked(&record_id, false);
625
626          // Insert or update the field value
627          if let Some(value) = remote_value {
628            record.fields.insert(col_key.clone(), value);
629          }
630
631          // Update the column version info
632          let col_ver = ColumnVersion::new(
633            remote_col_version,
634            remote_db_version,
635            remote_node_id,
636            remote_local_db_version,
637          );
638          record.column_versions.insert(col_key, col_ver);
639
640          // Update version boundaries
641          if remote_local_db_version < record.lowest_local_db_version {
642            record.lowest_local_db_version = remote_local_db_version;
643          }
644          if remote_local_db_version > record.highest_local_db_version {
645            record.highest_local_db_version = remote_local_db_version;
646          }
647        }
648      }
649    }
650  }
651
  /// Inserts a new record or updates an existing record in the CRDT.
  ///
  /// Equivalent to `insert_or_update_with_flags` with `flags = 0`.
  ///
  /// # Arguments
  ///
  /// * `record_id` - The unique identifier for the record
  /// * `fields` - An iterator of (column_name, value) pairs
  ///
  /// # Returns
  ///
  /// A vector of changes created by this operation
  #[must_use = "changes should be propagated to other nodes"]
  pub fn insert_or_update<I>(&mut self, record_id: &K, fields: I) -> Vec<Change<K, C, V>>
  where
    I: IntoIterator<Item = (C, V)>,
  {
    self.insert_or_update_with_flags(record_id, 0, fields)
  }
669
670  /// Inserts a new record or updates an existing record with flags.
671  ///
672  /// # Arguments
673  ///
674  /// * `record_id` - The unique identifier for the record
675  /// * `flags` - Flags to indicate the type of change
676  /// * `fields` - An iterator of (column_name, value) pairs
677  ///
678  /// # Returns
679  ///
680  /// A vector of changes created by this operation
681  #[must_use = "changes should be propagated to other nodes"]
682  pub fn insert_or_update_with_flags<I>(
683    &mut self,
684    record_id: &K,
685    flags: u32,
686    fields: I,
687  ) -> Vec<Change<K, C, V>>
688  where
689    I: IntoIterator<Item = (C, V)>,
690  {
691    let db_version = self.clock.tick();
692
693    // Check if the record is tombstoned
694    if self.is_record_tombstoned(record_id, false) {
695      return Vec::new();
696    }
697
698    let mut changes = Vec::new();
699    let node_id = self.node_id; // Store node_id before mutable borrow
700    let record = self.get_or_create_record_unchecked(record_id, false);
701
702    for (col_name, value) in fields {
703      let col_version = if let Some(col_info) = record.column_versions.get_mut(&col_name) {
704        col_info.col_version += 1;
705        col_info.db_version = db_version;
706        col_info.node_id = node_id;
707        col_info.local_db_version = db_version;
708        col_info.col_version
709      } else {
710        record.column_versions.insert(
711          col_name.clone(),
712          ColumnVersion::new(1, db_version, node_id, db_version),
713        );
714        1
715      };
716
717      // Update record version boundaries
718      if db_version < record.lowest_local_db_version {
719        record.lowest_local_db_version = db_version;
720      }
721      if db_version > record.highest_local_db_version {
722        record.highest_local_db_version = db_version;
723      }
724
725      record.fields.insert(col_name.clone(), value.clone());
726      changes.push(Change::new(
727        record_id.clone(),
728        Some(col_name),
729        Some(value),
730        col_version,
731        db_version,
732        node_id,
733        db_version,
734        flags,
735      ));
736    }
737
738    changes
739  }
740
  /// Deletes a record by marking it as tombstoned.
  ///
  /// Equivalent to `delete_record_with_flags` with `flags = 0`.
  ///
  /// # Arguments
  ///
  /// * `record_id` - The unique identifier for the record
  ///
  /// # Returns
  ///
  /// An optional Change representing the deletion; `None` if the record is
  /// already tombstoned
  #[must_use = "changes should be propagated to other nodes"]
  pub fn delete_record(&mut self, record_id: &K) -> Option<Change<K, C, V>> {
    self.delete_record_with_flags(record_id, 0)
  }
754
755  /// Deletes a record with flags.
756  ///
757  /// # Arguments
758  ///
759  /// * `record_id` - The unique identifier for the record
760  /// * `flags` - Flags to indicate the type of change
761  ///
762  /// # Returns
763  ///
764  /// An optional Change representing the deletion
765  #[must_use = "changes should be propagated to other nodes"]
766  pub fn delete_record_with_flags(&mut self, record_id: &K, flags: u32) -> Option<Change<K, C, V>> {
767    if self.is_record_tombstoned(record_id, false) {
768      return None;
769    }
770
771    let db_version = self.clock.tick();
772
773    // Mark as tombstone and remove data
774    self.data.remove(record_id);
775
776    // Store deletion information in tombstones
777    self.tombstones.insert_or_assign(
778      record_id.clone(),
779      TombstoneInfo::new(db_version, self.node_id, db_version),
780    );
781
782    Some(Change::new(
783      record_id.clone(),
784      None,
785      None,
786      TOMBSTONE_COL_VERSION,
787      db_version,
788      self.node_id,
789      db_version,
790      flags,
791    ))
792  }
793
  /// Deletes a specific field from a record.
  ///
  /// Equivalent to `delete_field_with_flags` with `flags = 0`.
  ///
  /// # Arguments
  ///
  /// * `record_id` - The unique identifier for the record
  /// * `field_name` - The name of the field to delete
  ///
  /// # Returns
  ///
  /// An optional Change representing the field deletion. Returns None if:
  /// - The record is tombstoned
  /// - The record doesn't exist
  /// - The field doesn't exist in the record
  #[must_use = "changes should be propagated to other nodes"]
  pub fn delete_field(&mut self, record_id: &K, field_name: &C) -> Option<Change<K, C, V>> {
    self.delete_field_with_flags(record_id, field_name, 0)
  }
811
812  /// Deletes a specific field from a record with flags.
813  ///
814  /// # Arguments
815  ///
816  /// * `record_id` - The unique identifier for the record
817  /// * `field_name` - The name of the field to delete
818  /// * `flags` - Flags to indicate the type of change
819  ///
820  /// # Returns
821  ///
822  /// An optional Change representing the field deletion. Returns None if:
823  /// - The record is tombstoned
824  /// - The record doesn't exist
825  /// - The field doesn't exist in the record
826  #[must_use = "changes should be propagated to other nodes"]
827  pub fn delete_field_with_flags(
828    &mut self,
829    record_id: &K,
830    field_name: &C,
831    flags: u32,
832  ) -> Option<Change<K, C, V>> {
833    // Check if the record is tombstoned
834    if self.is_record_tombstoned(record_id, false) {
835      return None;
836    }
837
838    // Get the record (return None if it doesn't exist)
839    let record = self.data.get_mut(record_id)?;
840
841    // Check if the field exists
842    if !record.fields.contains_key(field_name) {
843      return None;
844    }
845
846    let db_version = self.clock.tick();
847
848    // Get or create column version and increment it
849    let col_version = if let Some(col_info) = record.column_versions.get_mut(field_name) {
850      col_info.col_version += 1;
851      col_info.db_version = db_version;
852      col_info.node_id = self.node_id;
853      col_info.local_db_version = db_version;
854      col_info.col_version
855    } else {
856      // This shouldn't happen if the field exists, but handle it gracefully
857      record.column_versions.insert(
858        field_name.clone(),
859        ColumnVersion::new(1, db_version, self.node_id, db_version),
860      );
861      1
862    };
863
864    // Update record version boundaries
865    if db_version < record.lowest_local_db_version {
866      record.lowest_local_db_version = db_version;
867    }
868    if db_version > record.highest_local_db_version {
869      record.highest_local_db_version = db_version;
870    }
871
872    // Remove the field from the record (but keep the ColumnVersion as field tombstone)
873    record.fields.remove(field_name);
874
875    Some(Change::new(
876      record_id.clone(),
877      Some(field_name.clone()),
878      None, // None value indicates field deletion
879      col_version,
880      db_version,
881      self.node_id,
882      db_version,
883      flags,
884    ))
885  }
886
  /// Merges incoming changes into the CRDT.
  ///
  /// # Arguments
  ///
  /// * `changes` - Vector of changes to merge
  /// * `merge_rule` - The merge rule to use for conflict resolution
  ///
  /// # Returns
  ///
  /// Vector of the changes that were accepted by `merge_rule` and applied
  pub fn merge_changes<R: MergeRule<K, C, V>>(
    &mut self,
    changes: Vec<Change<K, C, V>>,
    merge_rule: &R,
  ) -> Vec<Change<K, C, V>> {
    self.merge_changes_impl(changes, false, merge_rule)
  }
904
905  fn merge_changes_impl<R: MergeRule<K, C, V>>(
906    &mut self,
907    changes: Vec<Change<K, C, V>>,
908    ignore_parent: bool,
909    merge_rule: &R,
910  ) -> Vec<Change<K, C, V>> {
911    let mut accepted_changes = Vec::new();
912
913    if changes.is_empty() {
914      return accepted_changes;
915    }
916
917    for change in changes {
918      // Move values from change to avoid unnecessary clones
919      let Change {
920        record_id,
921        col_name,
922        value: remote_value,
923        col_version: remote_col_version,
924        db_version: remote_db_version,
925        node_id: remote_node_id,
926        flags,
927        ..
928      } = change;
929
930      // Always update the logical clock to maintain causal consistency
931      let new_local_db_version = self.clock.update(remote_db_version);
932
933      // Skip all changes for tombstoned records
934      if self.is_record_tombstoned(&record_id, ignore_parent) {
935        continue;
936      }
937
938      // Retrieve local column version information
939      let local_col_info = if col_name.is_none() {
940        // For deletions, check tombstones
941        self
942          .tombstones
943          .find(&record_id)
944          .map(|info| info.as_column_version())
945      } else if let Some(ref col) = col_name {
946        // For column updates, check the record
947        self
948          .get_record_ptr(&record_id, ignore_parent)
949          .and_then(|record| record.column_versions.get(col).copied())
950      } else {
951        None
952      };
953
954      // Determine whether to accept the remote change
955      let should_accept = if let Some(local_info) = local_col_info {
956        merge_rule.should_accept(
957          local_info.col_version,
958          local_info.db_version,
959          local_info.node_id,
960          remote_col_version,
961          remote_db_version,
962          remote_node_id,
963        )
964      } else {
965        true
966      };
967
968      if should_accept {
969        if let Some(col_key) = col_name {
970          // Handle insertion or update
971          let record = self.get_or_create_record_unchecked(&record_id, ignore_parent);
972
973          // Update field value
974          if let Some(value) = remote_value.clone() {
975            record.fields.insert(col_key.clone(), value);
976          } else {
977            // If remote_value is None, remove the field
978            record.fields.remove(&col_key);
979          }
980
981          // Update the column version info and record version boundaries
982          record.column_versions.insert(
983            col_key.clone(),
984            ColumnVersion::new(
985              remote_col_version,
986              remote_db_version,
987              remote_node_id,
988              new_local_db_version,
989            ),
990          );
991
992          // Update version boundaries
993          if new_local_db_version < record.lowest_local_db_version {
994            record.lowest_local_db_version = new_local_db_version;
995          }
996          if new_local_db_version > record.highest_local_db_version {
997            record.highest_local_db_version = new_local_db_version;
998          }
999
1000          accepted_changes.push(Change::new(
1001            record_id,
1002            Some(col_key),
1003            remote_value,
1004            remote_col_version,
1005            remote_db_version,
1006            remote_node_id,
1007            new_local_db_version,
1008            flags,
1009          ));
1010        } else {
1011          // Handle deletion
1012          self.data.remove(&record_id);
1013
1014          // Store deletion information in tombstones
1015          self.tombstones.insert_or_assign(
1016            record_id.clone(),
1017            TombstoneInfo::new(remote_db_version, remote_node_id, new_local_db_version),
1018          );
1019
1020          accepted_changes.push(Change::new(
1021            record_id,
1022            None,
1023            None,
1024            remote_col_version,
1025            remote_db_version,
1026            remote_node_id,
1027            new_local_db_version,
1028            flags,
1029          ));
1030        }
1031      }
1032    }
1033
1034    accepted_changes
1035  }
1036
  /// Retrieves all changes since a given `last_db_version`.
  ///
  /// Convenience wrapper around [`Self::get_changes_since_excluding`] with an
  /// empty exclusion set, so changes from every node are included.
  ///
  /// # Arguments
  ///
  /// * `last_db_version` - The database version to retrieve changes since
  ///
  /// # Returns
  ///
  /// A vector of changes
  #[must_use]
  pub fn get_changes_since(&self, last_db_version: u64) -> Vec<Change<K, C, V>>
  where
    K: Ord,
    C: Ord,
  {
    self.get_changes_since_excluding(last_db_version, &HashSet::new())
  }
1054
1055  /// Retrieves all changes since a given `last_db_version`, excluding specific nodes.
1056  pub fn get_changes_since_excluding(
1057    &self,
1058    last_db_version: u64,
1059    excluding: &HashSet<NodeId>,
1060  ) -> Vec<Change<K, C, V>>
1061  where
1062    K: Ord,
1063    C: Ord,
1064  {
1065    let mut changes = Vec::new();
1066
1067    // Get changes from parent
1068    if let Some(ref parent) = self.parent {
1069      let parent_changes = parent.get_changes_since_excluding(last_db_version, excluding);
1070      changes.extend(parent_changes);
1071    }
1072
1073    // Get changes from regular records
1074    for (record_id, record) in &self.data {
1075      // Skip records that haven't changed since last_db_version
1076      if record.highest_local_db_version <= last_db_version {
1077        continue;
1078      }
1079
1080      for (col_name, clock_info) in &record.column_versions {
1081        if clock_info.local_db_version > last_db_version && !excluding.contains(&clock_info.node_id)
1082        {
1083          let value = record.fields.get(col_name).cloned();
1084
1085          changes.push(Change::new(
1086            record_id.clone(),
1087            Some(col_name.clone()),
1088            value,
1089            clock_info.col_version,
1090            clock_info.db_version,
1091            clock_info.node_id,
1092            clock_info.local_db_version,
1093            0,
1094          ));
1095        }
1096      }
1097    }
1098
1099    // Get deletion changes from tombstones
1100    for (record_id, tombstone_info) in self.tombstones.iter() {
1101      if tombstone_info.local_db_version > last_db_version
1102        && !excluding.contains(&tombstone_info.node_id)
1103      {
1104        changes.push(Change::new(
1105          record_id.clone(),
1106          None,
1107          None,
1108          TOMBSTONE_COL_VERSION,
1109          tombstone_info.db_version,
1110          tombstone_info.node_id,
1111          tombstone_info.local_db_version,
1112          0,
1113        ));
1114      }
1115    }
1116
1117    if self.parent.is_some() {
1118      // Compress changes to remove redundant operations
1119      Self::compress_changes(&mut changes);
1120    }
1121
1122    changes
1123  }
1124
1125  /// Compresses a vector of changes in-place by removing redundant changes.
1126  ///
1127  /// Changes are sorted and then compressed using a two-pointer technique.
1128  ///
1129  /// # Performance
1130  ///
1131  /// This method uses `sort_unstable_by` which provides O(n log n) average time complexity
1132  /// but does not preserve the relative order of equal elements. Since the comparator
1133  /// provides a total ordering, stability is not required.
1134  pub fn compress_changes(changes: &mut Vec<Change<K, C, V>>)
1135  where
1136    K: Ord,
1137    C: Ord,
1138  {
1139    if changes.is_empty() {
1140      return;
1141    }
1142
1143    // Sort changes using the DefaultChangeComparator
1144    // Use sort_unstable for better performance since we don't need stable sorting
1145    let comparator = DefaultChangeComparator;
1146    changes.sort_unstable_by(|a, b| comparator.compare(a, b));
1147
1148    // Use two-pointer technique to compress in-place
1149    let mut write = 0;
1150    for read in 1..changes.len() {
1151      if changes[read].record_id != changes[write].record_id {
1152        // New record, always keep it
1153        write += 1;
1154        if write != read {
1155          changes[write] = changes[read].clone();
1156        }
1157      } else if changes[read].col_name.is_none() && changes[write].col_name.is_some() {
1158        // Current read is a deletion, backtrack to first change for this record
1159        // and replace it with the deletion, effectively discarding all field updates
1160        let mut first_pos = write;
1161        while first_pos > 0 && changes[first_pos - 1].record_id == changes[read].record_id {
1162          first_pos -= 1;
1163        }
1164        changes[first_pos] = changes[read].clone();
1165        write = first_pos;
1166      } else if changes[read].col_name != changes[write].col_name
1167        && changes[write].col_name.is_some()
1168      {
1169        // New column for the same record
1170        write += 1;
1171        if write != read {
1172          changes[write] = changes[read].clone();
1173        }
1174      }
1175      // Else: same record and column, keep the existing one (most recent due to sorting)
1176    }
1177
1178    changes.truncate(write + 1);
1179  }
1180
  /// Retrieves a reference to a record if it exists.
  ///
  /// Falls back to the parent CRDT chain when the record is not present in
  /// this CRDT's own data.
  pub fn get_record(&self, record_id: &K) -> Option<&Record<C, V>> {
    self.get_record_ptr(record_id, false)
  }
1185
  /// Checks if a record is tombstoned.
  ///
  /// Consults this CRDT's tombstones and, if none is found locally, the
  /// parent CRDT chain.
  pub fn is_tombstoned(&self, record_id: &K) -> bool {
    self.is_record_tombstoned(record_id, false)
  }
1190
1191  /// Gets tombstone information for a record.
1192  pub fn get_tombstone(&self, record_id: &K) -> Option<TombstoneInfo> {
1193    if let Some(info) = self.tombstones.find(record_id) {
1194      return Some(info);
1195    }
1196
1197    if let Some(ref parent) = self.parent {
1198      return parent.get_tombstone(record_id);
1199    }
1200
1201    None
1202  }
1203
  /// Removes tombstones older than the specified version.
  ///
  /// Only this CRDT's own tombstone store is compacted; a parent CRDT's
  /// tombstones are left untouched.
  ///
  /// # Safety and DoS Mitigation
  ///
  /// **IMPORTANT**: Only call this method when ALL participating nodes have acknowledged
  /// the `min_acknowledged_version`. Compacting too early may cause deleted records to
  /// reappear on nodes that haven't received the deletion yet.
  ///
  /// To prevent DoS via tombstone accumulation:
  /// - Call this method periodically as part of your sync protocol
  /// - Track which versions have been acknowledged by all nodes
  /// - Consider implementing a tombstone limit and rejecting operations when exceeded
  ///
  /// # Arguments
  ///
  /// * `min_acknowledged_version` - Tombstones with db_version < this value will be removed
  ///
  /// # Returns
  ///
  /// The number of tombstones removed
  pub fn compact_tombstones(&mut self, min_acknowledged_version: u64) -> usize {
    self.tombstones.compact(min_acknowledged_version)
  }
1227
  /// Gets the number of tombstones currently stored.
  ///
  /// Counts only this CRDT's own tombstones; parent tombstones are excluded.
  pub fn tombstone_count(&self) -> usize {
    self.tombstones.len()
  }
1232
  /// Gets the current logical clock.
  ///
  /// Returns a shared reference; the clock itself can only be advanced through
  /// the CRDT's mutating operations.
  pub fn get_clock(&self) -> &LogicalClock {
    &self.clock
  }
1237
  /// Gets a reference to the internal data map.
  ///
  /// Contains only this CRDT's own records; records visible through a parent
  /// CRDT are not included.
  pub fn get_data(&self) -> &HashMap<K, Record<C, V>> {
    &self.data
  }
1242
  /// Serializes the CRDT to a JSON string.
  ///
  /// Note: The parent relationship is not serialized and must be rebuilt after deserialization.
  ///
  /// Requires the `json` feature.
  ///
  /// # Errors
  ///
  /// Returns an error if serialization fails.
  #[cfg(feature = "json")]
  pub fn to_json(&self) -> Result<String, serde_json::Error>
  where
    K: serde::Serialize,
    C: serde::Serialize,
    V: serde::Serialize,
  {
    serde_json::to_string(self)
  }
1259
  /// Deserializes a CRDT from a JSON string.
  ///
  /// Note: The parent relationship is not deserialized and will be `None`.
  /// Applications must rebuild parent-child relationships if needed.
  ///
  /// Requires the `json` feature.
  ///
  /// # Errors
  ///
  /// Returns an error if deserialization fails.
  #[cfg(feature = "json")]
  pub fn from_json(json: &str) -> Result<Self, serde_json::Error>
  where
    K: serde::de::DeserializeOwned + Hash + Eq + Clone,
    C: serde::de::DeserializeOwned + Hash + Eq + Clone,
    V: serde::de::DeserializeOwned + Clone,
  {
    serde_json::from_str(json)
  }
1277
  /// Serializes the CRDT to bytes using bincode's standard configuration.
  ///
  /// Note: The parent relationship is not serialized and must be rebuilt after deserialization.
  ///
  /// Requires the `binary` feature. The output must be decoded with the same
  /// bincode configuration (see [`Self::from_bytes`]).
  ///
  /// # Errors
  ///
  /// Returns an error if serialization fails.
  #[cfg(feature = "binary")]
  pub fn to_bytes(&self) -> Result<Vec<u8>, bincode::error::EncodeError>
  where
    K: serde::Serialize,
    C: serde::Serialize,
    V: serde::Serialize,
  {
    bincode::serde::encode_to_vec(self, bincode::config::standard())
  }
1294
1295  /// Deserializes a CRDT from bytes using bincode.
1296  ///
1297  /// Note: The parent relationship is not deserialized and will be `None`.
1298  /// Applications must rebuild parent-child relationships if needed.
1299  ///
1300  /// # Errors
1301  ///
1302  /// Returns an error if deserialization fails.
1303  #[cfg(feature = "binary")]
1304  pub fn from_bytes(bytes: &[u8]) -> Result<Self, bincode::error::DecodeError>
1305  where
1306    K: serde::de::DeserializeOwned + Hash + Eq + Clone,
1307    C: serde::de::DeserializeOwned + Hash + Eq + Clone,
1308    V: serde::de::DeserializeOwned + Clone,
1309  {
1310    let (result, _len) = bincode::serde::decode_from_slice(bytes, bincode::config::standard())?;
1311    Ok(result)
1312  }
1313
1314  // Helper methods
1315
1316  fn is_record_tombstoned(&self, record_id: &K, ignore_parent: bool) -> bool {
1317    if self.tombstones.find(record_id).is_some() {
1318      return true;
1319    }
1320
1321    if !ignore_parent {
1322      if let Some(ref parent) = self.parent {
1323        return parent.is_record_tombstoned(record_id, false);
1324      }
1325    }
1326
1327    false
1328  }
1329
  /// Returns a mutable reference to the record for `record_id`, creating it
  /// if absent.
  ///
  /// When a new record must be created and `ignore_parent` is false, the
  /// parent's copy of the record (if any) is cloned as the starting state;
  /// otherwise a fresh empty record is used.
  ///
  /// NOTE(review): "unchecked" appears to mean no tombstone check happens
  /// here — the merge path verifies the record is not tombstoned before
  /// calling; confirm callers uphold that.
  fn get_or_create_record_unchecked(
    &mut self,
    record_id: &K,
    ignore_parent: bool,
  ) -> &mut Record<C, V> {
    // `Entry` lives in a different module depending on whether the std
    // HashMap or hashbrown (no_std + alloc) is in use.
    #[cfg(feature = "std")]
    use std::collections::hash_map::Entry;
    #[cfg(all(not(feature = "std"), feature = "alloc"))]
    use hashbrown::hash_map::Entry;

    match self.data.entry(record_id.clone()) {
      Entry::Occupied(e) => e.into_mut(),
      Entry::Vacant(e) => {
        // Seed the new record from the parent's version when allowed, so a
        // child CRDT starts from the parent's state rather than empty.
        let record = if !ignore_parent {
          self
            .parent
            .as_ref()
            .and_then(|p| p.get_record_ptr(record_id, false))
            .cloned()
            .unwrap_or_else(Record::new)
        } else {
          Record::new()
        };
        e.insert(record)
      }
    }
  }
1357
1358  fn get_record_ptr(&self, record_id: &K, ignore_parent: bool) -> Option<&Record<C, V>> {
1359    if let Some(record) = self.data.get(record_id) {
1360      return Some(record);
1361    }
1362
1363    if !ignore_parent {
1364      if let Some(ref parent) = self.parent {
1365        return parent.get_record_ptr(record_id, false);
1366      }
1367    }
1368
1369    None
1370  }
1371}
1372
1373#[cfg(test)]
1374mod tests {
1375  use super::*;
1376
1377  #[test]
1378  fn test_logical_clock() {
1379    let mut clock = LogicalClock::new();
1380    assert_eq!(clock.current_time(), 0);
1381
1382    let t1 = clock.tick();
1383    assert_eq!(t1, 1);
1384    assert_eq!(clock.current_time(), 1);
1385
1386    let t2 = clock.update(5);
1387    assert_eq!(t2, 6);
1388    assert_eq!(clock.current_time(), 6);
1389  }
1390
  #[test]
  fn test_tombstone_storage() {
    // Exercises insert, lookup, and compaction of the tombstone store.
    let mut storage = TombstoneStorage::new();
    let info = TombstoneInfo::new(10, 1, 10);

    storage.insert_or_assign("key1".to_string(), info);
    assert_eq!(storage.len(), 1);

    // Lookup returns the stored info by value; unknown keys yield None.
    assert_eq!(storage.find(&"key1".to_string()), Some(info));
    assert_eq!(storage.find(&"key2".to_string()), None);

    // Compacting with a threshold above the tombstone's version removes it.
    let removed = storage.compact(15);
    assert_eq!(removed, 1);
    assert_eq!(storage.len(), 0);
  }
1406
  #[test]
  fn test_basic_insert() {
    // Inserting a record with two fields should produce one change per field
    // and make both values readable back from the record.
    let mut crdt: CRDT<String, String, String> = CRDT::new(1, None);

    let fields = vec![
      ("name".to_string(), "Alice".to_string()),
      ("age".to_string(), "30".to_string()),
    ];

    let changes = crdt.insert_or_update(&"user1".to_string(), fields);

    assert_eq!(changes.len(), 2);
    assert_eq!(crdt.get_data().len(), 1);

    let record = crdt.get_record(&"user1".to_string()).unwrap();
    assert_eq!(record.fields.get("name").unwrap(), "Alice");
    assert_eq!(record.fields.get("age").unwrap(), "30");
  }
1425
  #[test]
  fn test_delete_record() {
    // Deleting a record should yield a change, leave a tombstone, and remove
    // the record from the live data map.
    let mut crdt: CRDT<String, String, String> = CRDT::new(1, None);

    let fields = vec![("name".to_string(), "Bob".to_string())];
    let _ = crdt.insert_or_update(&"user2".to_string(), fields);

    let delete_change = crdt.delete_record(&"user2".to_string());
    assert!(delete_change.is_some());
    assert!(crdt.is_tombstoned(&"user2".to_string()));
    assert_eq!(crdt.get_data().len(), 0);
  }
1438
  #[test]
  fn test_merge_changes() {
    // Two nodes write the same column concurrently; after a bidirectional
    // merge both replicas must converge to the same winning value.
    let mut crdt1: CRDT<String, String, String> = CRDT::new(1, None);
    let mut crdt2: CRDT<String, String, String> = CRDT::new(2, None);

    let fields1 = vec![("tag".to_string(), "Node1".to_string())];
    let changes1 = crdt1.insert_or_update(&"record1".to_string(), fields1);

    let fields2 = vec![("tag".to_string(), "Node2".to_string())];
    let changes2 = crdt2.insert_or_update(&"record1".to_string(), fields2);

    let merge_rule = DefaultMergeRule;
    crdt1.merge_changes(changes2, &merge_rule);
    crdt2.merge_changes(changes1, &merge_rule);

    // Node2 has higher node_id, so its value should win
    assert_eq!(
      crdt1
        .get_record(&"record1".to_string())
        .unwrap()
        .fields
        .get("tag")
        .unwrap(),
      "Node2"
    );
    assert_eq!(crdt1.get_data(), crdt2.get_data());
  }
1466
  #[test]
  #[cfg(feature = "serde")]
  fn test_change_serialization() {
    // A Change must round-trip losslessly through both JSON and bincode.
    // The allow() covers builds with `serde` but neither `json` nor `binary`,
    // where `change` would otherwise be unused.
    #[allow(unused_variables)]
    let change = Change::new(
      "record1".to_string(),
      Some("name".to_string()),
      Some("Alice".to_string()),
      1,
      10,
      1,
      10,
      0,
    );

    // Test JSON serialization
    #[cfg(feature = "json")]
    {
      let json = serde_json::to_string(&change).unwrap();
      let deserialized: Change<String, String, String> = serde_json::from_str(&json).unwrap();
      assert_eq!(change, deserialized);
    }

    // Test binary serialization
    #[cfg(feature = "binary")]
    {
      let bytes = bincode::serde::encode_to_vec(&change, bincode::config::standard()).unwrap();
      let (deserialized, _): (Change<String, String, String>, _) =
        bincode::serde::decode_from_slice(&bytes, bincode::config::standard()).unwrap();
      assert_eq!(change, deserialized);
    }
  }
1499
  #[test]
  #[cfg(feature = "serde")]
  fn test_record_serialization() {
    // A Record (fields + per-column version metadata) must round-trip
    // losslessly through both JSON and bincode.
    let mut fields = HashMap::new();
    fields.insert("name".to_string(), "Bob".to_string());
    fields.insert("age".to_string(), "25".to_string());

    let mut column_versions = HashMap::new();
    column_versions.insert("name".to_string(), ColumnVersion::new(1, 10, 1, 10));
    column_versions.insert("age".to_string(), ColumnVersion::new(1, 11, 1, 11));

    // allow() covers builds with `serde` but neither `json` nor `binary`.
    #[allow(unused_variables)]
    let record = Record::from_parts(fields, column_versions);

    // Test JSON serialization
    #[cfg(feature = "json")]
    {
      let json = serde_json::to_string(&record).unwrap();
      let deserialized: Record<String, String> = serde_json::from_str(&json).unwrap();
      assert_eq!(record, deserialized);
    }

    // Test binary serialization
    #[cfg(feature = "binary")]
    {
      let bytes = bincode::serde::encode_to_vec(&record, bincode::config::standard()).unwrap();
      let (deserialized, _): (Record<String, String>, _) =
        bincode::serde::decode_from_slice(&bytes, bincode::config::standard()).unwrap();
      assert_eq!(record, deserialized);
    }
  }
1531
  #[test]
  #[cfg(feature = "json")]
  fn test_crdt_json_serialization() {
    // A whole CRDT (records, tombstones, clock) must round-trip through JSON;
    // only the parent link is expected to be lost.
    let mut crdt: CRDT<String, String, String> = CRDT::new(1, None);

    // Add some data
    let fields = vec![
      ("name".to_string(), "Alice".to_string()),
      ("age".to_string(), "30".to_string()),
    ];
    let _ = crdt.insert_or_update(&"user1".to_string(), fields);

    let fields2 = vec![("name".to_string(), "Bob".to_string())];
    let _ = crdt.insert_or_update(&"user2".to_string(), fields2);

    // Delete one record
    let _ = crdt.delete_record(&"user2".to_string());

    // Serialize to JSON
    let json = crdt.to_json().unwrap();

    // Deserialize
    let deserialized: CRDT<String, String, String> = CRDT::from_json(&json).unwrap();

    // Verify data is preserved
    assert_eq!(crdt.get_data().len(), deserialized.get_data().len());
    assert_eq!(
      crdt.get_record(&"user1".to_string()).unwrap().fields,
      deserialized.get_record(&"user1".to_string()).unwrap().fields
    );

    // Verify tombstones are preserved
    assert_eq!(crdt.tombstone_count(), deserialized.tombstone_count());
    assert!(deserialized.is_tombstoned(&"user2".to_string()));

    // Verify clock is preserved
    assert_eq!(
      crdt.get_clock().current_time(),
      deserialized.get_clock().current_time()
    );

    // Verify parent is None after deserialization
    let has_parent = deserialized.parent.is_some();
    assert!(!has_parent);
  }
1577
  #[test]
  #[cfg(feature = "binary")]
  fn test_crdt_binary_serialization() {
    // A whole CRDT must round-trip through the bincode byte format with its
    // records and logical clock intact.
    let mut crdt: CRDT<String, String, String> = CRDT::new(1, None);

    // Add some data
    let fields = vec![
      ("name".to_string(), "Alice".to_string()),
      ("age".to_string(), "30".to_string()),
    ];
    let _ = crdt.insert_or_update(&"user1".to_string(), fields);

    // Serialize to bytes
    let bytes = crdt.to_bytes().unwrap();

    // Deserialize
    let deserialized: CRDT<String, String, String> = CRDT::from_bytes(&bytes).unwrap();

    // Verify data is preserved
    assert_eq!(crdt.get_data().len(), deserialized.get_data().len());
    assert_eq!(
      crdt.get_record(&"user1".to_string()).unwrap().fields,
      deserialized.get_record(&"user1".to_string()).unwrap().fields
    );

    // Verify clock is preserved
    assert_eq!(
      crdt.get_clock().current_time(),
      deserialized.get_clock().current_time()
    );
  }
1609
  #[test]
  #[cfg(feature = "serde")]
  fn test_parent_not_serialized() {
    // Serializing a child CRDT must capture only the child's own state:
    // the parent link and the parent's records must not leak into the output.
    // Create a parent CRDT
    let mut parent: CRDT<String, String, String> = CRDT::new(1, None);
    let fields = vec![("parent_field".to_string(), "parent_value".to_string())];
    let _ = parent.insert_or_update(&"parent_record".to_string(), fields);

    // Create a child CRDT with parent
    let parent_arc = Arc::new(parent);
    let mut child = CRDT::new(2, Some(parent_arc.clone()));
    let child_fields = vec![("child_field".to_string(), "child_value".to_string())];
    let _ = child.insert_or_update(&"child_record".to_string(), child_fields);

    // Serialize and deserialize the child
    #[cfg(feature = "json")]
    {
      let json = serde_json::to_string(&child).unwrap();
      let deserialized: CRDT<String, String, String> = serde_json::from_str(&json).unwrap();

      // Verify parent is None
      assert!(deserialized.parent.is_none());

      // Verify child's own data is preserved
      assert!(deserialized.get_record(&"child_record".to_string()).is_some());

      // Verify parent's data is NOT in deserialized child
      assert!(deserialized.get_record(&"parent_record".to_string()).is_none());
    }
  }
1640}