// crdt_lite/lib.rs

1//! # crdt-lite
2//!
3//! A lightweight, column-based CRDT (Conflict-free Replicated Data Type) implementation in Rust.
4//!
5//! This library provides a generic CRDT with last-write-wins semantics, supporting:
6//! - Generic key and value types
7//! - Logical clock for causality tracking
8//! - Tombstone-based deletion
9//! - Parent-child CRDT hierarchies
10//! - Custom merge rules and comparators
11//! - Change compression
12//! - Optional serialization support via Serde
13//!
14//! ## Features
15//!
16//! This crate provides optional features through feature flags:
17//!
18//! - `std` - Enables standard library support (enabled by default)
19//! - `alloc` - Enables allocator support for no_std environments (pulls in hashbrown for HashMap)
20//! - `serde` - Enables Serde support for all CRDT types
21//! - `json` - Enables JSON serialization (includes `serde` + `serde_json`)
22//! - `binary` - Enables binary serialization (includes `serde` + `bincode`)
23//!
24//! ### no_std Support
25//!
26//! For no_std environments, disable default features and enable the `alloc` feature:
27//!
28//! ```toml
29//! [dependencies]
30//! crdt-lite = { version = "0.2", default-features = false, features = ["alloc"] }
31//! ```
32//!
33//! ### Usage Example with Serialization
34//!
35//! ```toml
36//! [dependencies]
//! crdt-lite = { version = "0.2", features = ["json", "binary"] }
38//! ```
39//!
40//! ```rust,ignore
41//! use crdt_lite::CRDT;
42//!
43//! // Create and populate a CRDT
44//! let mut crdt: CRDT<String, String> = CRDT::new(1, None);
45//! let fields = vec![("name".to_string(), "Alice".to_string())];
46//! crdt.insert_or_update(&"user1".to_string(), fields);
47//!
48//! // Serialize to JSON (requires "json" feature)
49//! let json = crdt.to_json().unwrap();
50//!
51//! // Deserialize from JSON
52//! let restored: CRDT<String, String> = CRDT::from_json(&json).unwrap();
53//!
54//! // Serialize to binary (requires "binary" feature)
55//! let bytes = crdt.to_bytes().unwrap();
56//! let restored: CRDT<String, String> = CRDT::from_bytes(&bytes).unwrap();
57//!
58//! // Or use generic serde with any format (requires "serde" feature)
59//! let json = serde_json::to_string(&crdt).unwrap();
60//! let restored: CRDT<String, String> = serde_json::from_str(&json).unwrap();
61//! ```
62//!
63//! **Note on Parent Relationships**: Parent-child CRDT hierarchies are not serialized.
64//! After deserialization, the `parent` field will always be `None`. Applications must
65//! rebuild parent-child relationships if needed.
66//!
67//! ## Security and Resource Management
68//!
69//! ### Logical Clock Overflow
70//! The logical clock uses `u64` for version numbers. While overflow is theoretically possible
71//! after 2^64 operations (extremely unlikely in practice), applications with extreme longevity
72//! should be aware of this limitation.
73//!
74//! ### DoS Protection and Tombstone Management
75//! Tombstones accumulate indefinitely unless manually compacted. To prevent memory exhaustion:
76//! - Call `compact_tombstones()` periodically after all nodes have acknowledged a version
77//! - Implement application-level rate limiting for operations
78//! - Consider setting resource limits on the number of records and tombstones
79//!
80//! **Important**: Only compact tombstones when ALL participating nodes have acknowledged
81//! the minimum version. Compacting too early may cause deleted records to reappear on
82//! nodes that haven't received the deletion yet.
83
84#![cfg_attr(not(feature = "std"), no_std)]
85
86// Ensure valid feature combination: either std or alloc must be enabled
87#[cfg(not(any(feature = "std", feature = "alloc")))]
88compile_error!("Either 'std' (default) or 'alloc' feature must be enabled. For no_std environments, use: cargo build --no-default-features --features alloc");
89
90#[cfg(not(feature = "std"))]
91extern crate alloc;
92
93#[cfg(feature = "std")]
94use std::{
95  cmp::Ordering,
96  collections::{HashMap, HashSet},
97  hash::Hash,
98  sync::Arc,
99};
100
101#[cfg(not(feature = "std"))]
102use alloc::{
103  string::String,
104  sync::Arc,
105  vec::Vec,
106};
107#[cfg(not(feature = "std"))]
108use core::{cmp::Ordering, hash::Hash};
109#[cfg(all(not(feature = "std"), feature = "alloc"))]
110use hashbrown::{HashMap, HashSet};
111
/// Type alias for node IDs
pub type NodeId = u64;

/// Type alias for column keys (field names)
pub type ColumnKey = String;

/// Column version used for tombstone changes
/// Using u64::MAX ensures tombstones are treated as having the highest possible version
/// (the default merge rule compares col_version first, so a record deletion
/// outranks any concurrent column update)
const TOMBSTONE_COL_VERSION: u64 = u64::MAX;
121
/// Represents a single change in the CRDT.
///
/// A change can represent:
/// - An insertion or update of a column value (when `col_name` is `Some`)
/// - A deletion of a specific column (when `col_name` is `Some` and `value` is `None`)
/// - A deletion of an entire record (when `col_name` is `None`)
#[derive(Debug, Clone, PartialEq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "serde", serde(bound(serialize = "K: serde::Serialize, C: serde::Serialize, V: serde::Serialize")))]
#[cfg_attr(feature = "serde", serde(bound(deserialize = "K: serde::de::DeserializeOwned, C: serde::de::DeserializeOwned, V: serde::de::DeserializeOwned")))]
pub struct Change<K, C, V> {
  /// Identifier of the record this change applies to
  pub record_id: K,
  /// `None` represents tombstone of the record
  pub col_name: Option<C>,
  /// `None` represents deletion of the column (not the record)
  pub value: Option<V>,
  /// Per-column version counter (tombstone changes use `u64::MAX`)
  pub col_version: u64,
  /// Logical-clock time at the node that originated the change
  pub db_version: u64,
  /// Node that produced this change
  pub node_id: NodeId,
  /// Local db_version when the change was created (useful for `get_changes_since`)
  pub local_db_version: u64,
  /// Optional flags to indicate the type of change (ephemeral, not stored)
  pub flags: u32,
}
146
147impl<K: Eq, C: Eq, V: Eq> Eq for Change<K, C, V> {}
148
149impl<K, C, V> Change<K, C, V> {
150  /// Creates a new Change with all parameters
151  #[allow(clippy::too_many_arguments)]
152  pub fn new(
153    record_id: K,
154    col_name: Option<C>,
155    value: Option<V>,
156    col_version: u64,
157    db_version: u64,
158    node_id: NodeId,
159    local_db_version: u64,
160    flags: u32,
161  ) -> Self {
162    Self {
163      record_id,
164      col_name,
165      value,
166      col_version,
167      db_version,
168      node_id,
169      local_db_version,
170      flags,
171    }
172  }
173}
174
175/// Represents version information for a column.
176#[derive(Debug, Clone, Copy, PartialEq, Eq)]
177#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
178pub struct ColumnVersion {
179  pub col_version: u64,
180  pub db_version: u64,
181  pub node_id: NodeId,
182  /// Local db_version when the change was created
183  pub local_db_version: u64,
184}
185
186impl ColumnVersion {
187  pub fn new(col_version: u64, db_version: u64, node_id: NodeId, local_db_version: u64) -> Self {
188    Self {
189      col_version,
190      db_version,
191      node_id,
192      local_db_version,
193    }
194  }
195}
196
197/// Minimal version information for tombstones.
198///
199/// Stores essential data: db_version for conflict resolution, node_id for sync exclusion,
200/// and local_db_version for sync.
201#[derive(Debug, Clone, Copy, PartialEq, Eq)]
202#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
203pub struct TombstoneInfo {
204  pub db_version: u64,
205  pub node_id: NodeId,
206  pub local_db_version: u64,
207}
208
209impl TombstoneInfo {
210  pub fn new(db_version: u64, node_id: NodeId, local_db_version: u64) -> Self {
211    Self {
212      db_version,
213      node_id,
214      local_db_version,
215    }
216  }
217
218  /// Helper to create a ColumnVersion for comparison with regular columns
219  pub fn as_column_version(&self) -> ColumnVersion {
220    ColumnVersion::new(
221      TOMBSTONE_COL_VERSION,
222      self.db_version,
223      self.node_id,
224      self.local_db_version,
225    )
226  }
227}
228
/// Represents a logical clock for maintaining causality.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct LogicalClock {
  time: u64,
}

impl LogicalClock {
  /// Creates a new logical clock starting at 0.
  pub fn new() -> Self {
    LogicalClock { time: 0 }
  }

  /// Advances the clock for a local event and returns the new time.
  pub fn tick(&mut self) -> u64 {
    let next = self.time + 1;
    self.time = next;
    next
  }

  /// Merges a remotely observed time into the clock and returns the new time.
  ///
  /// The clock first catches up to `received_time` (when it is ahead of the
  /// local time) and then advances by one, so the result is strictly greater
  /// than both the previous local time and the received time.
  pub fn update(&mut self, received_time: u64) -> u64 {
    self.time = self.time.max(received_time) + 1;
    self.time
  }

  /// Forces the clock to a specific time.
  pub fn set_time(&mut self, time: u64) {
    self.time = time;
  }

  /// Returns the current time without advancing the clock.
  pub fn current_time(&self) -> u64 {
    self.time
  }
}

impl Default for LogicalClock {
  fn default() -> Self {
    Self::new()
  }
}
271
272/// Storage for tombstones (deleted records).
273///
274/// Uses a HashMap for efficient lookups and supports compaction.
275#[derive(Debug, Clone)]
276#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
277pub struct TombstoneStorage<K: Hash + Eq> {
278  entries: HashMap<K, TombstoneInfo>,
279}
280
281impl<K: Hash + Eq> TombstoneStorage<K> {
282  pub fn new() -> Self {
283    Self {
284      entries: HashMap::new(),
285    }
286  }
287
288  pub fn insert_or_assign(&mut self, key: K, info: TombstoneInfo) {
289    self.entries.insert(key, info);
290  }
291
292  pub fn find(&self, key: &K) -> Option<TombstoneInfo> {
293    self.entries.get(key).copied()
294  }
295
296  pub fn erase(&mut self, key: &K) -> bool {
297    self.entries.remove(key).is_some()
298  }
299
300  pub fn clear(&mut self) {
301    self.entries.clear();
302  }
303
304  pub fn iter(&self) -> impl Iterator<Item = (&K, &TombstoneInfo)> {
305    self.entries.iter()
306  }
307
308  pub fn len(&self) -> usize {
309    self.entries.len()
310  }
311
312  pub fn is_empty(&self) -> bool {
313    self.entries.is_empty()
314  }
315
316  /// Compact tombstones older than the specified version.
317  ///
318  /// Returns the number of tombstones removed.
319  pub fn compact(&mut self, min_acknowledged_version: u64) -> usize {
320    let initial_len = self.entries.len();
321    self
322      .entries
323      .retain(|_, info| info.db_version >= min_acknowledged_version);
324    initial_len - self.entries.len()
325  }
326}
327
328impl<K: Hash + Eq> Default for TombstoneStorage<K> {
329  fn default() -> Self {
330    Self::new()
331  }
332}
333
/// Represents a record in the CRDT.
#[derive(Debug, Clone)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "serde", serde(bound(serialize = "C: serde::Serialize, V: serde::Serialize")))]
#[cfg_attr(feature = "serde", serde(bound(deserialize = "C: serde::de::DeserializeOwned + Hash + Eq, V: serde::de::DeserializeOwned")))]
pub struct Record<C, V> {
  /// Current visible value of each column
  pub fields: HashMap<C, V>,
  /// Per-column version metadata used for conflict resolution
  pub column_versions: HashMap<C, ColumnVersion>,
  /// Track version boundaries for efficient filtering
  /// (sentinel values `u64::MAX` / `0` when no column has been versioned yet)
  pub lowest_local_db_version: u64,
  pub highest_local_db_version: u64,
}
346
347impl<C: Hash + Eq, V> Record<C, V> {
348  pub fn new() -> Self {
349    Self {
350      fields: HashMap::new(),
351      column_versions: HashMap::new(),
352      lowest_local_db_version: u64::MAX,
353      highest_local_db_version: 0,
354    }
355  }
356
357  /// Creates a record from existing fields and column versions
358  pub fn from_parts(
359    fields: HashMap<C, V>,
360    column_versions: HashMap<C, ColumnVersion>,
361  ) -> Self {
362    let mut lowest = u64::MAX;
363    let mut highest = 0;
364
365    for ver in column_versions.values() {
366      if ver.local_db_version < lowest {
367        lowest = ver.local_db_version;
368      }
369      if ver.local_db_version > highest {
370        highest = ver.local_db_version;
371      }
372    }
373
374    Self {
375      fields,
376      column_versions,
377      lowest_local_db_version: lowest,
378      highest_local_db_version: highest,
379    }
380  }
381}
382
/// Equality for records considers only the visible field data.
impl<C: Hash + Eq + PartialEq, V: PartialEq> PartialEq for Record<C, V> {
  fn eq(&self, other: &Self) -> bool {
    // Compare only fields, not column_versions (those will differ per node)
    self.fields == other.fields
  }
}

/// A default record is simply an empty one.
impl<C: Hash + Eq, V> Default for Record<C, V> {
  fn default() -> Self {
    Self::new()
  }
}
395
/// Trait for merge rules that determine conflict resolution.
///
/// Implementations should return `true` if the remote change should be accepted,
/// `false` otherwise.
pub trait MergeRule<K, C, V> {
  /// Determines whether to accept a remote change over a local one.
  ///
  /// The three local/remote pairs are the column version, the db version,
  /// and the originating node id.
  fn should_accept(
    &self,
    local_col: u64,
    local_db: u64,
    local_node: NodeId,
    remote_col: u64,
    remote_db: u64,
    remote_node: NodeId,
  ) -> bool;

  /// Convenience method comparing two [`Change`] objects directly.
  fn should_accept_change(&self, local: &Change<K, C, V>, remote: &Change<K, C, V>) -> bool {
    self.should_accept(
      local.col_version,
      local.db_version,
      local.node_id,
      remote.col_version,
      remote.db_version,
      remote.node_id,
    )
  }
}
424
425/// Default merge rule implementing last-write-wins semantics.
426///
427/// Comparison priority:
428/// 1. Column version (higher wins)
429/// 2. DB version (higher wins)
430/// 3. Node ID (higher wins as tiebreaker)
431#[derive(Debug, Clone, Copy, Default)]
432pub struct DefaultMergeRule;
433
434impl<K, C, V> MergeRule<K, C, V> for DefaultMergeRule {
435  fn should_accept(
436    &self,
437    local_col: u64,
438    local_db: u64,
439    local_node: NodeId,
440    remote_col: u64,
441    remote_db: u64,
442    remote_node: NodeId,
443  ) -> bool {
444    match remote_col.cmp(&local_col) {
445      Ordering::Greater => true,
446      Ordering::Less => false,
447      Ordering::Equal => match remote_db.cmp(&local_db) {
448        Ordering::Greater => true,
449        Ordering::Less => false,
450        Ordering::Equal => remote_node > local_node,
451      },
452    }
453  }
454}
455
/// Trait for change comparators used in sorting and compression.
pub trait ChangeComparator<K, C, V> {
  /// Returns the ordering of `a` relative to `b`.
  fn compare(&self, a: &Change<K, C, V>, b: &Change<K, C, V>) -> Ordering;
}
460
461/// Default change comparator.
462///
463/// Sorts by:
464/// 1. Record ID (ascending)
465/// 2. Column name presence (deletions/tombstones last)
466/// 3. Column name (ascending)
467/// 4. Column version (descending - most recent first)
468/// 5. DB version (descending)
469/// 6. Node ID (descending)
470#[derive(Debug, Clone, Copy, Default)]
471pub struct DefaultChangeComparator;
472
473impl<K: Ord, C: Ord, V> ChangeComparator<K, C, V> for DefaultChangeComparator {
474  fn compare(&self, a: &Change<K, C, V>, b: &Change<K, C, V>) -> Ordering {
475    // Compare record IDs
476    match a.record_id.cmp(&b.record_id) {
477      Ordering::Equal => {}
478      ord => return ord,
479    }
480
481    // Deletions (None) come last for each record
482    match (a.col_name.as_ref(), b.col_name.as_ref()) {
483      (None, None) => {}
484      (None, Some(_)) => return Ordering::Greater,
485      (Some(_), None) => return Ordering::Less,
486      (Some(a_col), Some(b_col)) => match a_col.cmp(b_col) {
487        Ordering::Equal => {}
488        ord => return ord,
489      },
490    }
491
492    // Compare versions (descending - most recent first)
493    match b.col_version.cmp(&a.col_version) {
494      Ordering::Equal => {}
495      ord => return ord,
496    }
497
498    match b.db_version.cmp(&a.db_version) {
499      Ordering::Equal => {}
500      ord => return ord,
501    }
502
503    b.node_id.cmp(&a.node_id)
504  }
505}
506
/// Main CRDT structure, generic over key (K), column (C), and value (V) types.
///
/// This implements a column-based CRDT with last-write-wins semantics.
#[derive(Debug)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "serde", serde(bound(serialize = "K: serde::Serialize, C: serde::Serialize, V: serde::Serialize")))]
#[cfg_attr(feature = "serde", serde(bound(deserialize = "K: serde::de::DeserializeOwned + Hash + Eq + Clone, C: serde::de::DeserializeOwned + Hash + Eq + Clone, V: serde::de::DeserializeOwned + Clone")))]
pub struct CRDT<K: Hash + Eq + Clone, C: Hash + Eq + Clone, V: Clone> {
  /// Unique identifier of this replica
  node_id: NodeId,
  /// Logical clock tracking causality for this node
  clock: LogicalClock,
  /// Live records keyed by record id
  data: HashMap<K, Record<C, V>>,
  /// Deleted records, kept so deletions can be synced and late writes ignored
  tombstones: TombstoneStorage<K>,
  /// Optional parent CRDT; not serialized — always `None` after deserialization
  #[cfg_attr(feature = "serde", serde(skip, default))]
  parent: Option<Arc<CRDT<K, C, V>>>,
  /// Parent's clock time at construction (currently unused at runtime)
  #[allow(dead_code)]
  base_version: u64,
}
524
525impl<K: Hash + Eq + Clone, C: Hash + Eq + Clone, V: Clone> CRDT<K, C, V> {
526  /// Creates a new empty CRDT.
527  ///
528  /// # Arguments
529  ///
530  /// * `node_id` - Unique identifier for this CRDT node
531  /// * `parent` - Optional parent CRDT for hierarchical structures
532  pub fn new(node_id: NodeId, parent: Option<Arc<CRDT<K, C, V>>>) -> Self {
533    let (clock, base_version) = if let Some(ref p) = parent {
534      let parent_clock = p.clock;
535      let base = parent_clock.current_time();
536      (parent_clock, base)
537    } else {
538      (LogicalClock::new(), 0)
539    };
540
541    Self {
542      node_id,
543      clock,
544      data: HashMap::new(),
545      tombstones: TombstoneStorage::new(),
546      parent,
547      base_version,
548    }
549  }
550
551  /// Creates a CRDT from a list of changes (e.g., loaded from disk).
552  ///
553  /// # Arguments
554  ///
555  /// * `node_id` - The unique identifier for this CRDT node
556  /// * `changes` - A list of changes to apply to reconstruct the CRDT state
557  pub fn from_changes(node_id: NodeId, changes: Vec<Change<K, C, V>>) -> Self {
558    let mut crdt = Self::new(node_id, None);
559    crdt.apply_changes(changes);
560    crdt
561  }
562
563  /// Resets the CRDT to a state as if it was constructed with the given changes.
564  ///
565  /// # Arguments
566  ///
567  /// * `changes` - A list of changes to apply to reconstruct the CRDT state
568  pub fn reset(&mut self, changes: Vec<Change<K, C, V>>) {
569    self.data.clear();
570    self.tombstones.clear();
571    self.clock = LogicalClock::new();
572    self.apply_changes(changes);
573  }
574
575  /// Applies a list of changes to reconstruct the CRDT state.
576  fn apply_changes(&mut self, changes: Vec<Change<K, C, V>>) {
577    // Determine the maximum db_version from the changes
578    let max_db_version = changes
579      .iter()
580      .map(|c| c.db_version.max(c.local_db_version))
581      .max()
582      .unwrap_or(0);
583
584    // Set the logical clock to the maximum db_version
585    self.clock.set_time(max_db_version);
586
587    // Apply each change to reconstruct the CRDT state
588    for change in changes {
589      let record_id = change.record_id.clone();
590      let col_name = change.col_name.clone();
591      let remote_col_version = change.col_version;
592      let remote_db_version = change.db_version;
593      let remote_node_id = change.node_id;
594      let remote_local_db_version = change.local_db_version;
595      let remote_value = change.value;
596
597      if col_name.is_none() {
598        // Handle deletion
599        self.data.remove(&record_id);
600
601        // Store deletion information in tombstones
602        self.tombstones.insert_or_assign(
603          record_id,
604          TombstoneInfo::new(remote_db_version, remote_node_id, remote_local_db_version),
605        );
606      } else if let Some(col_key) = col_name {
607        // Handle insertion or update
608        if !self.is_record_tombstoned(&record_id, false) {
609          let record = self.get_or_create_record_unchecked(&record_id, false);
610
611          // Insert or update the field value
612          if let Some(value) = remote_value {
613            record.fields.insert(col_key.clone(), value);
614          }
615
616          // Update the column version info
617          let col_ver = ColumnVersion::new(
618            remote_col_version,
619            remote_db_version,
620            remote_node_id,
621            remote_local_db_version,
622          );
623          record.column_versions.insert(col_key, col_ver);
624
625          // Update version boundaries
626          if remote_local_db_version < record.lowest_local_db_version {
627            record.lowest_local_db_version = remote_local_db_version;
628          }
629          if remote_local_db_version > record.highest_local_db_version {
630            record.highest_local_db_version = remote_local_db_version;
631          }
632        }
633      }
634    }
635  }
636
637  /// Inserts a new record or updates an existing record in the CRDT.
638  ///
639  /// # Arguments
640  ///
641  /// * `record_id` - The unique identifier for the record
642  /// * `fields` - An iterator of (column_name, value) pairs
643  ///
644  /// # Returns
645  ///
646  /// A vector of changes created by this operation
647  #[must_use = "changes should be propagated to other nodes"]
648  pub fn insert_or_update<I>(&mut self, record_id: &K, fields: I) -> Vec<Change<K, C, V>>
649  where
650    I: IntoIterator<Item = (C, V)>,
651  {
652    self.insert_or_update_with_flags(record_id, 0, fields)
653  }
654
655  /// Inserts a new record or updates an existing record with flags.
656  ///
657  /// # Arguments
658  ///
659  /// * `record_id` - The unique identifier for the record
660  /// * `flags` - Flags to indicate the type of change
661  /// * `fields` - An iterator of (column_name, value) pairs
662  ///
663  /// # Returns
664  ///
665  /// A vector of changes created by this operation
666  #[must_use = "changes should be propagated to other nodes"]
667  pub fn insert_or_update_with_flags<I>(
668    &mut self,
669    record_id: &K,
670    flags: u32,
671    fields: I,
672  ) -> Vec<Change<K, C, V>>
673  where
674    I: IntoIterator<Item = (C, V)>,
675  {
676    let db_version = self.clock.tick();
677
678    // Check if the record is tombstoned
679    if self.is_record_tombstoned(record_id, false) {
680      return Vec::new();
681    }
682
683    let mut changes = Vec::new();
684    let node_id = self.node_id; // Store node_id before mutable borrow
685    let record = self.get_or_create_record_unchecked(record_id, false);
686
687    for (col_name, value) in fields {
688      let col_version = if let Some(col_info) = record.column_versions.get_mut(&col_name) {
689        col_info.col_version += 1;
690        col_info.db_version = db_version;
691        col_info.node_id = node_id;
692        col_info.local_db_version = db_version;
693        col_info.col_version
694      } else {
695        record.column_versions.insert(
696          col_name.clone(),
697          ColumnVersion::new(1, db_version, node_id, db_version),
698        );
699        1
700      };
701
702      // Update record version boundaries
703      if db_version < record.lowest_local_db_version {
704        record.lowest_local_db_version = db_version;
705      }
706      if db_version > record.highest_local_db_version {
707        record.highest_local_db_version = db_version;
708      }
709
710      record.fields.insert(col_name.clone(), value.clone());
711      changes.push(Change::new(
712        record_id.clone(),
713        Some(col_name),
714        Some(value),
715        col_version,
716        db_version,
717        node_id,
718        db_version,
719        flags,
720      ));
721    }
722
723    changes
724  }
725
  /// Deletes a record by marking it as tombstoned.
  ///
  /// # Arguments
  ///
  /// * `record_id` - The unique identifier for the record
  ///
  /// # Returns
  ///
  /// An optional Change representing the deletion
  #[must_use = "changes should be propagated to other nodes"]
  pub fn delete_record(&mut self, record_id: &K) -> Option<Change<K, C, V>> {
    self.delete_record_with_flags(record_id, 0)
  }

  /// Deletes a record with flags.
  ///
  /// Deleting an already-tombstoned record is a no-op and returns `None`
  /// without advancing the logical clock.
  ///
  /// # Arguments
  ///
  /// * `record_id` - The unique identifier for the record
  /// * `flags` - Flags to indicate the type of change
  ///
  /// # Returns
  ///
  /// An optional Change representing the deletion
  #[must_use = "changes should be propagated to other nodes"]
  pub fn delete_record_with_flags(&mut self, record_id: &K, flags: u32) -> Option<Change<K, C, V>> {
    if self.is_record_tombstoned(record_id, false) {
      return None;
    }

    let db_version = self.clock.tick();

    // Mark as tombstone and remove data
    self.data.remove(record_id);

    // Store deletion information in tombstones
    self.tombstones.insert_or_assign(
      record_id.clone(),
      TombstoneInfo::new(db_version, self.node_id, db_version),
    );

    // The emitted change carries the maximal column version so the deletion
    // wins merges against any concurrent column update.
    Some(Change::new(
      record_id.clone(),
      None,
      None,
      TOMBSTONE_COL_VERSION,
      db_version,
      self.node_id,
      db_version,
      flags,
    ))
  }
778
  /// Merges incoming changes into the CRDT.
  ///
  /// # Arguments
  ///
  /// * `changes` - Vector of changes to merge
  /// * `merge_rule` - The merge rule to use for conflict resolution
  ///
  /// # Returns
  ///
  /// Vector of the changes that were accepted locally
  pub fn merge_changes<R: MergeRule<K, C, V>>(
    &mut self,
    changes: Vec<Change<K, C, V>>,
    merge_rule: &R,
  ) -> Vec<Change<K, C, V>> {
    self.merge_changes_impl(changes, false, merge_rule)
  }
796
797  fn merge_changes_impl<R: MergeRule<K, C, V>>(
798    &mut self,
799    changes: Vec<Change<K, C, V>>,
800    ignore_parent: bool,
801    merge_rule: &R,
802  ) -> Vec<Change<K, C, V>> {
803    let mut accepted_changes = Vec::new();
804
805    if changes.is_empty() {
806      return accepted_changes;
807    }
808
809    for change in changes {
810      // Move values from change to avoid unnecessary clones
811      let Change {
812        record_id,
813        col_name,
814        value: remote_value,
815        col_version: remote_col_version,
816        db_version: remote_db_version,
817        node_id: remote_node_id,
818        flags,
819        ..
820      } = change;
821
822      // Always update the logical clock to maintain causal consistency
823      let new_local_db_version = self.clock.update(remote_db_version);
824
825      // Skip all changes for tombstoned records
826      if self.is_record_tombstoned(&record_id, ignore_parent) {
827        continue;
828      }
829
830      // Retrieve local column version information
831      let local_col_info = if col_name.is_none() {
832        // For deletions, check tombstones
833        self
834          .tombstones
835          .find(&record_id)
836          .map(|info| info.as_column_version())
837      } else if let Some(ref col) = col_name {
838        // For column updates, check the record
839        self
840          .get_record_ptr(&record_id, ignore_parent)
841          .and_then(|record| record.column_versions.get(col).copied())
842      } else {
843        None
844      };
845
846      // Determine whether to accept the remote change
847      let should_accept = if let Some(local_info) = local_col_info {
848        merge_rule.should_accept(
849          local_info.col_version,
850          local_info.db_version,
851          local_info.node_id,
852          remote_col_version,
853          remote_db_version,
854          remote_node_id,
855        )
856      } else {
857        true
858      };
859
860      if should_accept {
861        if let Some(col_key) = col_name {
862          // Handle insertion or update
863          let record = self.get_or_create_record_unchecked(&record_id, ignore_parent);
864
865          // Update field value
866          if let Some(value) = remote_value.clone() {
867            record.fields.insert(col_key.clone(), value);
868          } else {
869            // If remote_value is None, remove the field
870            record.fields.remove(&col_key);
871          }
872
873          // Update the column version info and record version boundaries
874          record.column_versions.insert(
875            col_key.clone(),
876            ColumnVersion::new(
877              remote_col_version,
878              remote_db_version,
879              remote_node_id,
880              new_local_db_version,
881            ),
882          );
883
884          // Update version boundaries
885          if new_local_db_version < record.lowest_local_db_version {
886            record.lowest_local_db_version = new_local_db_version;
887          }
888          if new_local_db_version > record.highest_local_db_version {
889            record.highest_local_db_version = new_local_db_version;
890          }
891
892          accepted_changes.push(Change::new(
893            record_id,
894            Some(col_key),
895            remote_value,
896            remote_col_version,
897            remote_db_version,
898            remote_node_id,
899            new_local_db_version,
900            flags,
901          ));
902        } else {
903          // Handle deletion
904          self.data.remove(&record_id);
905
906          // Store deletion information in tombstones
907          self.tombstones.insert_or_assign(
908            record_id.clone(),
909            TombstoneInfo::new(remote_db_version, remote_node_id, new_local_db_version),
910          );
911
912          accepted_changes.push(Change::new(
913            record_id,
914            None,
915            None,
916            remote_col_version,
917            remote_db_version,
918            remote_node_id,
919            new_local_db_version,
920            flags,
921          ));
922        }
923      }
924    }
925
926    accepted_changes
927  }
928
  /// Retrieves all changes since a given `last_db_version`.
  ///
  /// Equivalent to [`Self::get_changes_since_excluding`] with an empty
  /// exclusion set.
  ///
  /// # Arguments
  ///
  /// * `last_db_version` - The database version to retrieve changes since
  ///
  /// # Returns
  ///
  /// A vector of changes
  #[must_use]
  pub fn get_changes_since(&self, last_db_version: u64) -> Vec<Change<K, C, V>>
  where
    K: Ord,
    C: Ord,
  {
    self.get_changes_since_excluding(last_db_version, &HashSet::new())
  }
946
  /// Retrieves all changes since a given `last_db_version`, excluding changes
  /// that originate from any node listed in `excluding`.
  ///
  /// Changes are gathered from the parent chain (if any), from live records
  /// whose columns were written after `last_db_version`, and from tombstones.
  /// Emitted changes always carry `flags == 0` (flags are ephemeral and not
  /// stored).
  pub fn get_changes_since_excluding(
    &self,
    last_db_version: u64,
    excluding: &HashSet<NodeId>,
  ) -> Vec<Change<K, C, V>>
  where
    K: Ord,
    C: Ord,
  {
    let mut changes = Vec::new();

    // Get changes from parent (recurses up the whole parent chain)
    if let Some(ref parent) = self.parent {
      let parent_changes = parent.get_changes_since_excluding(last_db_version, excluding);
      changes.extend(parent_changes);
    }

    // Get changes from regular records
    for (record_id, record) in &self.data {
      // Cheap pre-filter: skip records whose newest local version is not
      // past last_db_version (bounds are maintained on every write).
      if record.highest_local_db_version <= last_db_version {
        continue;
      }

      for (col_name, clock_info) in &record.column_versions {
        if clock_info.local_db_version > last_db_version && !excluding.contains(&clock_info.node_id)
        {
          // Value may be absent when the column was deleted; emit None then.
          let value = record.fields.get(col_name).cloned();

          changes.push(Change::new(
            record_id.clone(),
            Some(col_name.clone()),
            value,
            clock_info.col_version,
            clock_info.db_version,
            clock_info.node_id,
            clock_info.local_db_version,
            0,
          ));
        }
      }
    }

    // Get deletion changes from tombstones
    for (record_id, tombstone_info) in self.tombstones.iter() {
      if tombstone_info.local_db_version > last_db_version
        && !excluding.contains(&tombstone_info.node_id)
      {
        changes.push(Change::new(
          record_id.clone(),
          None,
          None,
          TOMBSTONE_COL_VERSION,
          tombstone_info.db_version,
          tombstone_info.node_id,
          tombstone_info.local_db_version,
          0,
        ));
      }
    }

    // NOTE(review): compression only runs when a parent contributed changes —
    // presumably because parent and child may emit overlapping entries for the
    // same record/column; confirm this is the intended condition.
    if self.parent.is_some() {
      // Compress changes to remove redundant operations
      Self::compress_changes(&mut changes);
    }

    changes
  }
1016
  /// Compresses a vector of changes in-place by removing redundant changes.
  ///
  /// Changes are sorted and then compressed using a two-pointer technique.
  /// After sorting, all changes for one record are adjacent, so a single
  /// forward pass suffices:
  /// - duplicate (record, column) pairs keep only the first-seen entry
  ///   (the most recent one, per the comparator's ordering)
  /// - a record deletion (`col_name == None`) discards every field update
  ///   kept so far for that record
  ///
  /// # Performance
  ///
  /// This method uses `sort_unstable_by` which provides O(n log n) average time complexity
  /// but does not preserve the relative order of equal elements. Since the comparator
  /// provides a total ordering, stability is not required.
  pub fn compress_changes(changes: &mut Vec<Change<K, C, V>>)
  where
    K: Ord,
    C: Ord,
  {
    // Nothing to do; also guards the `write + 1` truncate below.
    if changes.is_empty() {
      return;
    }

    // Sort changes using the DefaultChangeComparator
    // Use sort_unstable for better performance since we don't need stable sorting
    let comparator = DefaultChangeComparator;
    changes.sort_unstable_by(|a, b| comparator.compare(a, b));

    // Use two-pointer technique to compress in-place.
    // `write` is the index of the last kept change; `read` scans ahead.
    let mut write = 0;
    for read in 1..changes.len() {
      if changes[read].record_id != changes[write].record_id {
        // New record, always keep it
        write += 1;
        if write != read {
          changes[write] = changes[read].clone();
        }
      } else if changes[read].col_name.is_none() && changes[write].col_name.is_some() {
        // Current read is a deletion, backtrack to first change for this record
        // and replace it with the deletion, effectively discarding all field updates
        let mut first_pos = write;
        while first_pos > 0 && changes[first_pos - 1].record_id == changes[read].record_id {
          first_pos -= 1;
        }
        changes[first_pos] = changes[read].clone();
        write = first_pos;
      } else if changes[read].col_name != changes[write].col_name
        && changes[write].col_name.is_some()
      {
        // New column for the same record
        write += 1;
        if write != read {
          changes[write] = changes[read].clone();
        }
      }
      // Else: same record and column, keep the existing one (most recent due to sorting)
    }

    // Everything past `write` is redundant leftovers from the scan.
    changes.truncate(write + 1);
  }
1072
1073  /// Retrieves a reference to a record if it exists.
1074  pub fn get_record(&self, record_id: &K) -> Option<&Record<C, V>> {
1075    self.get_record_ptr(record_id, false)
1076  }
1077
1078  /// Checks if a record is tombstoned.
1079  pub fn is_tombstoned(&self, record_id: &K) -> bool {
1080    self.is_record_tombstoned(record_id, false)
1081  }
1082
1083  /// Gets tombstone information for a record.
1084  pub fn get_tombstone(&self, record_id: &K) -> Option<TombstoneInfo> {
1085    if let Some(info) = self.tombstones.find(record_id) {
1086      return Some(info);
1087    }
1088
1089    if let Some(ref parent) = self.parent {
1090      return parent.get_tombstone(record_id);
1091    }
1092
1093    None
1094  }
1095
  /// Removes tombstones older than the specified version.
  ///
  /// Only this CRDT's own tombstone storage is compacted; tombstones held by
  /// a parent CRDT are untouched.
  ///
  /// # Safety and DoS Mitigation
  ///
  /// **IMPORTANT**: Only call this method when ALL participating nodes have acknowledged
  /// the `min_acknowledged_version`. Compacting too early may cause deleted records to
  /// reappear on nodes that haven't received the deletion yet.
  ///
  /// To prevent DoS via tombstone accumulation:
  /// - Call this method periodically as part of your sync protocol
  /// - Track which versions have been acknowledged by all nodes
  /// - Consider implementing a tombstone limit and rejecting operations when exceeded
  ///
  /// # Arguments
  ///
  /// * `min_acknowledged_version` - Tombstones with db_version < this value will be removed
  ///
  /// # Returns
  ///
  /// The number of tombstones removed
  pub fn compact_tombstones(&mut self, min_acknowledged_version: u64) -> usize {
    self.tombstones.compact(min_acknowledged_version)
  }
1119
1120  /// Gets the number of tombstones currently stored.
1121  pub fn tombstone_count(&self) -> usize {
1122    self.tombstones.len()
1123  }
1124
1125  /// Gets the current logical clock.
1126  pub fn get_clock(&self) -> &LogicalClock {
1127    &self.clock
1128  }
1129
1130  /// Gets a reference to the internal data map.
1131  pub fn get_data(&self) -> &HashMap<K, Record<C, V>> {
1132    &self.data
1133  }
1134
  /// Serializes the CRDT to a JSON string.
  ///
  /// Note: The parent relationship is not serialized and must be rebuilt after deserialization.
  /// Requires the `json` feature.
  ///
  /// # Errors
  ///
  /// Returns an error if serialization fails.
  #[cfg(feature = "json")]
  pub fn to_json(&self) -> Result<String, serde_json::Error>
  where
    K: serde::Serialize,
    C: serde::Serialize,
    V: serde::Serialize,
  {
    serde_json::to_string(self)
  }
1151
  /// Deserializes a CRDT from a JSON string.
  ///
  /// Note: The parent relationship is not deserialized and will be `None`.
  /// Applications must rebuild parent-child relationships if needed.
  /// Requires the `json` feature.
  ///
  /// # Errors
  ///
  /// Returns an error if deserialization fails.
  #[cfg(feature = "json")]
  pub fn from_json(json: &str) -> Result<Self, serde_json::Error>
  where
    K: serde::de::DeserializeOwned + Hash + Eq + Clone,
    C: serde::de::DeserializeOwned + Hash + Eq + Clone,
    V: serde::de::DeserializeOwned + Clone,
  {
    serde_json::from_str(json)
  }
1169
  /// Serializes the CRDT to bytes using bincode's standard configuration.
  ///
  /// Note: The parent relationship is not serialized and must be rebuilt after deserialization.
  /// Requires the `binary` feature.
  ///
  /// # Errors
  ///
  /// Returns an error if serialization fails.
  #[cfg(feature = "binary")]
  pub fn to_bytes(&self) -> Result<Vec<u8>, bincode::error::EncodeError>
  where
    K: serde::Serialize,
    C: serde::Serialize,
    V: serde::Serialize,
  {
    bincode::serde::encode_to_vec(self, bincode::config::standard())
  }
1186
1187  /// Deserializes a CRDT from bytes using bincode.
1188  ///
1189  /// Note: The parent relationship is not deserialized and will be `None`.
1190  /// Applications must rebuild parent-child relationships if needed.
1191  ///
1192  /// # Errors
1193  ///
1194  /// Returns an error if deserialization fails.
1195  #[cfg(feature = "binary")]
1196  pub fn from_bytes(bytes: &[u8]) -> Result<Self, bincode::error::DecodeError>
1197  where
1198    K: serde::de::DeserializeOwned + Hash + Eq + Clone,
1199    C: serde::de::DeserializeOwned + Hash + Eq + Clone,
1200    V: serde::de::DeserializeOwned + Clone,
1201  {
1202    let (result, _len) = bincode::serde::decode_from_slice(bytes, bincode::config::standard())?;
1203    Ok(result)
1204  }
1205
1206  // Helper methods
1207
1208  fn is_record_tombstoned(&self, record_id: &K, ignore_parent: bool) -> bool {
1209    if self.tombstones.find(record_id).is_some() {
1210      return true;
1211    }
1212
1213    if !ignore_parent {
1214      if let Some(ref parent) = self.parent {
1215        return parent.is_record_tombstoned(record_id, false);
1216      }
1217    }
1218
1219    false
1220  }
1221
  /// Returns a mutable reference to the record for `record_id`, creating it
  /// when absent. With `ignore_parent == false`, a newly created record is
  /// seeded with a clone of the parent's record (if one exists); otherwise
  /// it starts empty. "Unchecked": tombstones are not consulted here.
  fn get_or_create_record_unchecked(
    &mut self,
    record_id: &K,
    ignore_parent: bool,
  ) -> &mut Record<C, V> {
    // `Entry` comes from std or hashbrown depending on the enabled features.
    #[cfg(feature = "std")]
    use std::collections::hash_map::Entry;
    #[cfg(all(not(feature = "std"), feature = "alloc"))]
    use hashbrown::hash_map::Entry;

    match self.data.entry(record_id.clone()) {
      Entry::Occupied(e) => e.into_mut(),
      Entry::Vacant(e) => {
        // Start from a clone of the parent's record so local edits never
        // mutate shared parent state.
        let record = if !ignore_parent {
          self
            .parent
            .as_ref()
            .and_then(|p| p.get_record_ptr(record_id, false))
            .cloned()
            .unwrap_or_else(Record::new)
        } else {
          Record::new()
        };
        e.insert(record)
      }
    }
  }
1249
1250  fn get_record_ptr(&self, record_id: &K, ignore_parent: bool) -> Option<&Record<C, V>> {
1251    if let Some(record) = self.data.get(record_id) {
1252      return Some(record);
1253    }
1254
1255    if !ignore_parent {
1256      if let Some(ref parent) = self.parent {
1257        return parent.get_record_ptr(record_id, false);
1258      }
1259    }
1260
1261    None
1262  }
1263}
1264
#[cfg(test)]
mod tests {
  use super::*;

  /// The clock starts at zero, `tick` advances by one, and `update`
  /// incorporates a remote timestamp and moves past it (5 -> 6).
  #[test]
  fn test_logical_clock() {
    let mut clock = LogicalClock::new();
    assert_eq!(clock.current_time(), 0);

    let t1 = clock.tick();
    assert_eq!(t1, 1);
    assert_eq!(clock.current_time(), 1);

    let t2 = clock.update(5);
    assert_eq!(t2, 6);
    assert_eq!(clock.current_time(), 6);
  }

  /// Insert, lookup, and compaction of tombstone entries.
  #[test]
  fn test_tombstone_storage() {
    let mut storage = TombstoneStorage::new();
    let info = TombstoneInfo::new(10, 1, 10);

    storage.insert_or_assign("key1".to_string(), info);
    assert_eq!(storage.len(), 1);

    // Lookup hits for the stored key and misses for an unknown key.
    assert_eq!(storage.find(&"key1".to_string()), Some(info));
    assert_eq!(storage.find(&"key2".to_string()), None);

    // Compacting past the tombstone's version removes it.
    let removed = storage.compact(15);
    assert_eq!(removed, 1);
    assert_eq!(storage.len(), 0);
  }

  /// Inserting two fields yields one change per field and one record.
  #[test]
  fn test_basic_insert() {
    let mut crdt: CRDT<String, String, String> = CRDT::new(1, None);

    let fields = vec![
      ("name".to_string(), "Alice".to_string()),
      ("age".to_string(), "30".to_string()),
    ];

    let changes = crdt.insert_or_update(&"user1".to_string(), fields);

    assert_eq!(changes.len(), 2);
    assert_eq!(crdt.get_data().len(), 1);

    let record = crdt.get_record(&"user1".to_string()).unwrap();
    assert_eq!(record.fields.get("name").unwrap(), "Alice");
    assert_eq!(record.fields.get("age").unwrap(), "30");
  }

  /// Deletion produces a change, leaves a tombstone, and removes the record
  /// from the live data map.
  #[test]
  fn test_delete_record() {
    let mut crdt: CRDT<String, String, String> = CRDT::new(1, None);

    let fields = vec![("name".to_string(), "Bob".to_string())];
    let _ = crdt.insert_or_update(&"user2".to_string(), fields);

    let delete_change = crdt.delete_record(&"user2".to_string());
    assert!(delete_change.is_some());
    assert!(crdt.is_tombstoned(&"user2".to_string()));
    assert_eq!(crdt.get_data().len(), 0);
  }

  /// Two nodes write the same column concurrently; after a bidirectional
  /// merge both replicas converge to the same state.
  #[test]
  fn test_merge_changes() {
    let mut crdt1: CRDT<String, String, String> = CRDT::new(1, None);
    let mut crdt2: CRDT<String, String, String> = CRDT::new(2, None);

    let fields1 = vec![("tag".to_string(), "Node1".to_string())];
    let changes1 = crdt1.insert_or_update(&"record1".to_string(), fields1);

    let fields2 = vec![("tag".to_string(), "Node2".to_string())];
    let changes2 = crdt2.insert_or_update(&"record1".to_string(), fields2);

    // Exchange changes in both directions.
    let merge_rule = DefaultMergeRule;
    crdt1.merge_changes(changes2, &merge_rule);
    crdt2.merge_changes(changes1, &merge_rule);

    // Node2 has higher node_id, so its value should win
    assert_eq!(
      crdt1
        .get_record(&"record1".to_string())
        .unwrap()
        .fields
        .get("tag")
        .unwrap(),
      "Node2"
    );
    assert_eq!(crdt1.get_data(), crdt2.get_data());
  }

  /// A `Change` round-trips through JSON and bincode without loss.
  #[test]
  #[cfg(feature = "serde")]
  fn test_change_serialization() {
    #[allow(unused_variables)]
    let change = Change::new(
      "record1".to_string(),
      Some("name".to_string()),
      Some("Alice".to_string()),
      1,
      10,
      1,
      10,
      0,
    );

    // Test JSON serialization
    #[cfg(feature = "json")]
    {
      let json = serde_json::to_string(&change).unwrap();
      let deserialized: Change<String, String, String> = serde_json::from_str(&json).unwrap();
      assert_eq!(change, deserialized);
    }

    // Test binary serialization
    #[cfg(feature = "binary")]
    {
      let bytes = bincode::serde::encode_to_vec(&change, bincode::config::standard()).unwrap();
      let (deserialized, _): (Change<String, String, String>, _) =
        bincode::serde::decode_from_slice(&bytes, bincode::config::standard()).unwrap();
      assert_eq!(change, deserialized);
    }
  }

  /// A `Record` (fields + column versions) round-trips through both formats.
  #[test]
  #[cfg(feature = "serde")]
  fn test_record_serialization() {
    let mut fields = HashMap::new();
    fields.insert("name".to_string(), "Bob".to_string());
    fields.insert("age".to_string(), "25".to_string());

    let mut column_versions = HashMap::new();
    column_versions.insert("name".to_string(), ColumnVersion::new(1, 10, 1, 10));
    column_versions.insert("age".to_string(), ColumnVersion::new(1, 11, 1, 11));

    #[allow(unused_variables)]
    let record = Record::from_parts(fields, column_versions);

    // Test JSON serialization
    #[cfg(feature = "json")]
    {
      let json = serde_json::to_string(&record).unwrap();
      let deserialized: Record<String> = serde_json::from_str(&json).unwrap();
      assert_eq!(record, deserialized);
    }

    // Test binary serialization
    #[cfg(feature = "binary")]
    {
      let bytes = bincode::serde::encode_to_vec(&record, bincode::config::standard()).unwrap();
      let (deserialized, _): (Record<String>, _) =
        bincode::serde::decode_from_slice(&bytes, bincode::config::standard()).unwrap();
      assert_eq!(record, deserialized);
    }
  }

  /// Full CRDT JSON round-trip: data, tombstones, and the logical clock
  /// survive; the (unserialized) parent link comes back as `None`.
  #[test]
  #[cfg(feature = "json")]
  fn test_crdt_json_serialization() {
    let mut crdt: CRDT<String, String, String> = CRDT::new(1, None);

    // Add some data
    let fields = vec![
      ("name".to_string(), "Alice".to_string()),
      ("age".to_string(), "30".to_string()),
    ];
    let _ = crdt.insert_or_update(&"user1".to_string(), fields);

    let fields2 = vec![("name".to_string(), "Bob".to_string())];
    let _ = crdt.insert_or_update(&"user2".to_string(), fields2);

    // Delete one record
    let _ = crdt.delete_record(&"user2".to_string());

    // Serialize to JSON
    let json = crdt.to_json().unwrap();

    // Deserialize
    let deserialized: CRDT<String, String, String> = CRDT::from_json(&json).unwrap();

    // Verify data is preserved
    assert_eq!(crdt.get_data().len(), deserialized.get_data().len());
    assert_eq!(
      crdt.get_record(&"user1".to_string()).unwrap().fields,
      deserialized.get_record(&"user1".to_string()).unwrap().fields
    );

    // Verify tombstones are preserved
    assert_eq!(crdt.tombstone_count(), deserialized.tombstone_count());
    assert!(deserialized.is_tombstoned(&"user2".to_string()));

    // Verify clock is preserved
    assert_eq!(
      crdt.get_clock().current_time(),
      deserialized.get_clock().current_time()
    );

    // Verify parent is None after deserialization
    let has_parent = deserialized.parent.is_some();
    assert!(!has_parent);
  }

  /// Full CRDT bincode round-trip preserves data and the logical clock.
  #[test]
  #[cfg(feature = "binary")]
  fn test_crdt_binary_serialization() {
    let mut crdt: CRDT<String, String, String> = CRDT::new(1, None);

    // Add some data
    let fields = vec![
      ("name".to_string(), "Alice".to_string()),
      ("age".to_string(), "30".to_string()),
    ];
    let _ = crdt.insert_or_update(&"user1".to_string(), fields);

    // Serialize to bytes
    let bytes = crdt.to_bytes().unwrap();

    // Deserialize
    let deserialized: CRDT<String, String, String> = CRDT::from_bytes(&bytes).unwrap();

    // Verify data is preserved
    assert_eq!(crdt.get_data().len(), deserialized.get_data().len());
    assert_eq!(
      crdt.get_record(&"user1".to_string()).unwrap().fields,
      deserialized.get_record(&"user1".to_string()).unwrap().fields
    );

    // Verify clock is preserved
    assert_eq!(
      crdt.get_clock().current_time(),
      deserialized.get_clock().current_time()
    );
  }

  /// Serializing a child CRDT must not drag the parent along: after a
  /// round-trip the child keeps only its own records and `parent` is `None`.
  #[test]
  #[cfg(feature = "serde")]
  fn test_parent_not_serialized() {
    // Create a parent CRDT
    let mut parent: CRDT<String, String, String> = CRDT::new(1, None);
    let fields = vec![("parent_field".to_string(), "parent_value".to_string())];
    let _ = parent.insert_or_update(&"parent_record".to_string(), fields);

    // Create a child CRDT with parent
    let parent_arc = Arc::new(parent);
    let mut child = CRDT::new(2, Some(parent_arc.clone()));
    let child_fields = vec![("child_field".to_string(), "child_value".to_string())];
    let _ = child.insert_or_update(&"child_record".to_string(), child_fields);

    // Serialize and deserialize the child
    #[cfg(feature = "json")]
    {
      let json = serde_json::to_string(&child).unwrap();
      let deserialized: CRDT<String, String, String> = serde_json::from_str(&json).unwrap();

      // Verify parent is None
      assert!(deserialized.parent.is_none());

      // Verify child's own data is preserved
      assert!(deserialized.get_record(&"child_record".to_string()).is_some());

      // Verify parent's data is NOT in deserialized child
      assert!(deserialized.get_record(&"parent_record".to_string()).is_none());
    }
  }
}