crdt_lite/
lib.rs

1//! # crdt-lite
2//!
3//! A lightweight, column-based CRDT (Conflict-free Replicated Data Type) implementation in Rust.
4//!
5//! This library provides a generic CRDT with last-write-wins semantics, supporting:
6//! - Generic key and value types
7//! - Logical clock for causality tracking
8//! - Tombstone-based deletion
9//! - Parent-child CRDT hierarchies
10//! - Custom merge rules and comparators
11//! - Change compression
12//! - Optional serialization support via Serde
13//!
14//! ## Features
15//!
16//! This crate provides optional features through feature flags:
17//!
18//! - `std` - Enables standard library support (enabled by default)
19//! - `alloc` - Enables allocator support for no_std environments (pulls in hashbrown for HashMap)
20//! - `serde` - Enables Serde support for all CRDT types
21//! - `json` - Enables JSON serialization (includes `serde` + `serde_json`)
22//! - `binary` - Enables binary serialization (includes `serde` + `bincode`)
23//!
24//! ### no_std Support
25//!
26//! For no_std environments, disable default features and enable the `alloc` feature:
27//!
28//! ```toml
29//! [dependencies]
30//! crdt-lite = { version = "0.2", default-features = false, features = ["alloc"] }
31//! ```
32//!
33//! ### Usage Example with Serialization
34//!
35//! ```toml
36//! [dependencies]
//! crdt-lite = { version = "0.2", features = ["json", "binary"] }
38//! ```
39//!
40//! ```rust,ignore
41//! use crdt_lite::CRDT;
42//!
43//! // Create and populate a CRDT
44//! let mut crdt: CRDT<String, String> = CRDT::new(1, None);
45//! let fields = vec![("name".to_string(), "Alice".to_string())];
46//! crdt.insert_or_update(&"user1".to_string(), fields);
47//!
48//! // Serialize to JSON (requires "json" feature)
49//! let json = crdt.to_json().unwrap();
50//!
51//! // Deserialize from JSON
52//! let restored: CRDT<String, String> = CRDT::from_json(&json).unwrap();
53//!
54//! // Serialize to binary (requires "binary" feature)
55//! let bytes = crdt.to_bytes().unwrap();
56//! let restored: CRDT<String, String> = CRDT::from_bytes(&bytes).unwrap();
57//!
58//! // Or use generic serde with any format (requires "serde" feature)
59//! let json = serde_json::to_string(&crdt).unwrap();
60//! let restored: CRDT<String, String> = serde_json::from_str(&json).unwrap();
61//! ```
62//!
63//! **Note on Parent Relationships**: Parent-child CRDT hierarchies are not serialized.
64//! After deserialization, the `parent` field will always be `None`. Applications must
65//! rebuild parent-child relationships if needed.
66//!
67//! ## Security and Resource Management
68//!
69//! ### Logical Clock Overflow
70//! The logical clock uses `u64` for version numbers. While overflow is theoretically possible
71//! after 2^64 operations (extremely unlikely in practice), applications with extreme longevity
72//! should be aware of this limitation.
73//!
74//! ### DoS Protection and Tombstone Management
75//! Tombstones accumulate indefinitely unless manually compacted. To prevent memory exhaustion:
76//! - Call `compact_tombstones()` periodically after all nodes have acknowledged a version
77//! - Implement application-level rate limiting for operations
78//! - Consider setting resource limits on the number of records and tombstones
79//!
80//! **Important**: Only compact tombstones when ALL participating nodes have acknowledged
81//! the minimum version. Compacting too early may cause deleted records to reappear on
82//! nodes that haven't received the deletion yet.
83
84#![cfg_attr(not(feature = "std"), no_std)]
85
86// Ensure valid feature combination: either std or alloc must be enabled
87#[cfg(not(any(feature = "std", feature = "alloc")))]
88compile_error!("Either 'std' (default) or 'alloc' feature must be enabled. For no_std environments, use: cargo build --no-default-features --features alloc");
89
90#[cfg(not(feature = "std"))]
91extern crate alloc;
92
93#[cfg(feature = "std")]
94use std::{
95  cmp::Ordering,
96  collections::{HashMap, HashSet},
97  hash::Hash,
98  sync::Arc,
99};
100
101#[cfg(not(feature = "std"))]
102use alloc::{
103  string::String,
104  sync::Arc,
105  vec::Vec,
106};
107#[cfg(not(feature = "std"))]
108use core::{cmp::Ordering, hash::Hash};
109#[cfg(all(not(feature = "std"), feature = "alloc"))]
110use hashbrown::{HashMap, HashSet};
111
/// Type alias for node IDs.
///
/// Each participating CRDT instance is identified by a `u64`; it is used as
/// the final tiebreaker during merge conflict resolution.
pub type NodeId = u64;

/// Type alias for column keys (field names)
pub type ColumnKey = String;

/// Column version used for tombstone changes
/// Using u64::MAX ensures tombstones are treated as having the highest possible version,
/// so a record deletion outranks any concurrent column update during merge.
const TOMBSTONE_COL_VERSION: u64 = u64::MAX;
121
/// Represents a single change in the CRDT.
///
/// A change can represent:
/// - An insertion or update of a column value (when `col_name` is `Some`)
/// - A deletion of a specific column (when `col_name` is `Some` and `value` is `None`)
/// - A deletion of an entire record (when `col_name` is `None`)
#[derive(Debug, Clone, PartialEq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "serde", serde(bound(serialize = "K: serde::Serialize, C: serde::Serialize, V: serde::Serialize")))]
#[cfg_attr(feature = "serde", serde(bound(deserialize = "K: serde::de::DeserializeOwned, C: serde::de::DeserializeOwned, V: serde::de::DeserializeOwned")))]
pub struct Change<K, C, V> {
  /// Identifier of the record this change applies to
  pub record_id: K,
  /// `None` represents tombstone of the record
  pub col_name: Option<C>,
  /// `None` represents deletion of the column (not the record)
  pub value: Option<V>,
  /// Per-column counter at the originating node (higher wins in merges)
  pub col_version: u64,
  /// Originating node's logical clock value when the change was made
  pub db_version: u64,
  /// Node that produced this change
  pub node_id: NodeId,
  /// Local db_version when the change was created (useful for `get_changes_since`)
  pub local_db_version: u64,
  /// Optional flags to indicate the type of change (ephemeral, not stored)
  pub flags: u32,
}

// Manual `Eq` marker impl: sound because the derived `PartialEq` is a full
// equivalence relation whenever K, C, and V are themselves `Eq`.
impl<K: Eq, C: Eq, V: Eq> Eq for Change<K, C, V> {}
148
149impl<K, C, V> Change<K, C, V> {
150  /// Creates a new Change with all parameters
151  #[allow(clippy::too_many_arguments)]
152  pub fn new(
153    record_id: K,
154    col_name: Option<C>,
155    value: Option<V>,
156    col_version: u64,
157    db_version: u64,
158    node_id: NodeId,
159    local_db_version: u64,
160    flags: u32,
161  ) -> Self {
162    Self {
163      record_id,
164      col_name,
165      value,
166      col_version,
167      db_version,
168      node_id,
169      local_db_version,
170      flags,
171    }
172  }
173}
174
/// Represents version information for a column.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct ColumnVersion {
  /// Per-column counter, incremented on each local write to the column
  pub col_version: u64,
  /// Logical clock value at the node that made the change
  pub db_version: u64,
  /// Node that last wrote this column
  pub node_id: NodeId,
  /// Local db_version when the change was created
  pub local_db_version: u64,
}

impl ColumnVersion {
  /// Creates a new `ColumnVersion` from its component parts.
  pub fn new(col_version: u64, db_version: u64, node_id: NodeId, local_db_version: u64) -> Self {
    Self {
      col_version,
      db_version,
      node_id,
      local_db_version,
    }
  }
}
196
/// Minimal version information for tombstones.
///
/// Stores essential data: db_version for conflict resolution, node_id for sync exclusion,
/// and local_db_version for sync.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct TombstoneInfo {
  /// Logical clock value at the node that performed the deletion
  pub db_version: u64,
  /// Node that performed the deletion
  pub node_id: NodeId,
  /// Local db_version when the deletion was recorded
  pub local_db_version: u64,
}

impl TombstoneInfo {
  /// Creates a new `TombstoneInfo` from its component parts.
  pub fn new(db_version: u64, node_id: NodeId, local_db_version: u64) -> Self {
    Self {
      db_version,
      node_id,
      local_db_version,
    }
  }

  /// Helper to create a ColumnVersion for comparison with regular columns.
  ///
  /// Uses `TOMBSTONE_COL_VERSION` (`u64::MAX`) as the column version so a
  /// record deletion outranks any column-level update in merge comparisons.
  pub fn as_column_version(&self) -> ColumnVersion {
    ColumnVersion::new(
      TOMBSTONE_COL_VERSION,
      self.db_version,
      self.node_id,
      self.local_db_version,
    )
  }
}
228
/// Represents a logical clock for maintaining causality.
///
/// The clock is a monotonically non-decreasing `u64` counter; every local
/// operation ticks it, and every received remote timestamp advances it past
/// the largest value observed so far.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct LogicalClock {
  time: u64,
}

impl LogicalClock {
  /// Creates a new logical clock starting at 0.
  pub fn new() -> Self {
    LogicalClock { time: 0 }
  }

  /// Increments the clock for a local event and returns the new time.
  pub fn tick(&mut self) -> u64 {
    self.time += 1;
    self.time
  }

  /// Updates the clock based on a received time and returns the new time.
  ///
  /// The result is `max(local, received) + 1`, keeping this clock strictly
  /// ahead of every timestamp it has observed.
  pub fn update(&mut self, received_time: u64) -> u64 {
    self.time = self.time.max(received_time) + 1;
    self.time
  }

  /// Sets the logical clock to a specific time.
  pub fn set_time(&mut self, time: u64) {
    self.time = time;
  }

  /// Retrieves the current time.
  pub fn current_time(&self) -> u64 {
    self.time
  }
}

impl Default for LogicalClock {
  fn default() -> Self {
    Self::new()
  }
}
271
272/// Storage for tombstones (deleted records).
273///
274/// Uses a HashMap for efficient lookups and supports compaction.
275#[derive(Debug, Clone)]
276#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
277pub struct TombstoneStorage<K: Hash + Eq> {
278  entries: HashMap<K, TombstoneInfo>,
279}
280
281impl<K: Hash + Eq> TombstoneStorage<K> {
282  pub fn new() -> Self {
283    Self {
284      entries: HashMap::new(),
285    }
286  }
287
288  pub fn insert_or_assign(&mut self, key: K, info: TombstoneInfo) {
289    self.entries.insert(key, info);
290  }
291
292  pub fn find(&self, key: &K) -> Option<TombstoneInfo> {
293    self.entries.get(key).copied()
294  }
295
296  pub fn erase(&mut self, key: &K) -> bool {
297    self.entries.remove(key).is_some()
298  }
299
300  pub fn clear(&mut self) {
301    self.entries.clear();
302  }
303
304  pub fn iter(&self) -> impl Iterator<Item = (&K, &TombstoneInfo)> {
305    self.entries.iter()
306  }
307
308  pub fn len(&self) -> usize {
309    self.entries.len()
310  }
311
312  pub fn is_empty(&self) -> bool {
313    self.entries.is_empty()
314  }
315
316  /// Compact tombstones older than the specified version.
317  ///
318  /// Returns the number of tombstones removed.
319  pub fn compact(&mut self, min_acknowledged_version: u64) -> usize {
320    let initial_len = self.entries.len();
321    self
322      .entries
323      .retain(|_, info| info.db_version >= min_acknowledged_version);
324    initial_len - self.entries.len()
325  }
326}
327
328impl<K: Hash + Eq> Default for TombstoneStorage<K> {
329  fn default() -> Self {
330    Self::new()
331  }
332}
333
334/// Represents a record in the CRDT.
335#[derive(Debug, Clone)]
336#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
337#[cfg_attr(feature = "serde", serde(bound(serialize = "C: serde::Serialize + Hash + Eq, V: serde::Serialize")))]
338#[cfg_attr(feature = "serde", serde(bound(deserialize = "C: serde::de::DeserializeOwned + Hash + Eq, V: serde::de::DeserializeOwned")))]
339pub struct Record<C, V> {
340  pub fields: HashMap<C, V>,
341  pub column_versions: HashMap<C, ColumnVersion>,
342  /// Track version boundaries for efficient filtering
343  pub lowest_local_db_version: u64,
344  pub highest_local_db_version: u64,
345}
346
347impl<C: Hash + Eq, V> Record<C, V> {
348  pub fn new() -> Self {
349    Self {
350      fields: HashMap::new(),
351      column_versions: HashMap::new(),
352      lowest_local_db_version: u64::MAX,
353      highest_local_db_version: 0,
354    }
355  }
356
357  /// Creates a record from existing fields and column versions
358  pub fn from_parts(
359    fields: HashMap<C, V>,
360    column_versions: HashMap<C, ColumnVersion>,
361  ) -> Self {
362    let mut lowest = u64::MAX;
363    let mut highest = 0;
364
365    for ver in column_versions.values() {
366      if ver.local_db_version < lowest {
367        lowest = ver.local_db_version;
368      }
369      if ver.local_db_version > highest {
370        highest = ver.local_db_version;
371      }
372    }
373
374    Self {
375      fields,
376      column_versions,
377      lowest_local_db_version: lowest,
378      highest_local_db_version: highest,
379    }
380  }
381}
382
383impl<C: Hash + Eq + PartialEq, V: PartialEq> PartialEq for Record<C, V> {
384  fn eq(&self, other: &Self) -> bool {
385    // Compare only fields, not column_versions (those will differ per node)
386    self.fields == other.fields
387  }
388}
389
390impl<C: Hash + Eq, V> Default for Record<C, V> {
391  fn default() -> Self {
392    Self::new()
393  }
394}
395
/// Trait for merge rules that determine conflict resolution.
///
/// Implementations should return `true` if the remote change should be accepted,
/// `false` otherwise.
pub trait MergeRule<K, C, V> {
  /// Determines whether to accept a remote change over a local one.
  ///
  /// The local/remote triples are: per-column version counter, originating
  /// db_version, and originating node id.
  fn should_accept(
    &self,
    local_col: u64,
    local_db: u64,
    local_node: NodeId,
    remote_col: u64,
    remote_db: u64,
    remote_node: NodeId,
  ) -> bool;

  /// Convenience method for Change objects; forwards the version fields of
  /// both changes to [`MergeRule::should_accept`].
  fn should_accept_change(&self, local: &Change<K, C, V>, remote: &Change<K, C, V>) -> bool {
    self.should_accept(
      local.col_version,
      local.db_version,
      local.node_id,
      remote.col_version,
      remote.db_version,
      remote.node_id,
    )
  }
}
424
425/// Default merge rule implementing last-write-wins semantics.
426///
427/// Comparison priority:
428/// 1. Column version (higher wins)
429/// 2. DB version (higher wins)
430/// 3. Node ID (higher wins as tiebreaker)
431#[derive(Debug, Clone, Copy, Default)]
432pub struct DefaultMergeRule;
433
434impl<K, C, V> MergeRule<K, C, V> for DefaultMergeRule {
435  fn should_accept(
436    &self,
437    local_col: u64,
438    local_db: u64,
439    local_node: NodeId,
440    remote_col: u64,
441    remote_db: u64,
442    remote_node: NodeId,
443  ) -> bool {
444    match remote_col.cmp(&local_col) {
445      Ordering::Greater => true,
446      Ordering::Less => false,
447      Ordering::Equal => match remote_db.cmp(&local_db) {
448        Ordering::Greater => true,
449        Ordering::Less => false,
450        Ordering::Equal => remote_node > local_node,
451      },
452    }
453  }
454}
455
/// Trait for change comparators used in sorting and compression.
pub trait ChangeComparator<K, C, V> {
  /// Returns the ordering of `a` relative to `b`.
  fn compare(&self, a: &Change<K, C, V>, b: &Change<K, C, V>) -> Ordering;
}
460
461/// Default change comparator.
462///
463/// Sorts by:
464/// 1. Record ID (ascending)
465/// 2. Column name presence (deletions/tombstones last)
466/// 3. Column name (ascending)
467/// 4. Column version (descending - most recent first)
468/// 5. DB version (descending)
469/// 6. Node ID (descending)
470#[derive(Debug, Clone, Copy, Default)]
471pub struct DefaultChangeComparator;
472
473impl<K: Ord, C: Ord, V> ChangeComparator<K, C, V> for DefaultChangeComparator {
474  fn compare(&self, a: &Change<K, C, V>, b: &Change<K, C, V>) -> Ordering {
475    // Compare record IDs
476    match a.record_id.cmp(&b.record_id) {
477      Ordering::Equal => {}
478      ord => return ord,
479    }
480
481    // Deletions (None) come last for each record
482    match (a.col_name.as_ref(), b.col_name.as_ref()) {
483      (None, None) => {}
484      (None, Some(_)) => return Ordering::Greater,
485      (Some(_), None) => return Ordering::Less,
486      (Some(a_col), Some(b_col)) => match a_col.cmp(b_col) {
487        Ordering::Equal => {}
488        ord => return ord,
489      },
490    }
491
492    // Compare versions (descending - most recent first)
493    match b.col_version.cmp(&a.col_version) {
494      Ordering::Equal => {}
495      ord => return ord,
496    }
497
498    match b.db_version.cmp(&a.db_version) {
499      Ordering::Equal => {}
500      ord => return ord,
501    }
502
503    b.node_id.cmp(&a.node_id)
504  }
505}
506
/// Main CRDT structure, generic over key (K), column (C), and value (V) types.
///
/// This implements a column-based CRDT with last-write-wins semantics.
#[derive(Debug)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "serde", serde(bound(serialize = "K: serde::Serialize, C: serde::Serialize, V: serde::Serialize")))]
#[cfg_attr(feature = "serde", serde(bound(deserialize = "K: serde::de::DeserializeOwned + Hash + Eq + Clone, C: serde::de::DeserializeOwned + Hash + Eq + Clone, V: serde::de::DeserializeOwned + Clone")))]
pub struct CRDT<K: Hash + Eq + Clone, C: Hash + Eq + Clone, V: Clone> {
  // Unique identifier of this node; used as the merge tiebreaker
  node_id: NodeId,
  // Logical clock driving db_version for all local operations
  clock: LogicalClock,
  // Live records, keyed by record id
  data: HashMap<K, Record<C, V>>,
  // Deleted records; consulted before inserts/updates/merges
  tombstones: TombstoneStorage<K>,
  // Optional parent CRDT for hierarchies. Skipped by serde, so it is always
  // `None` after deserialization (see the module docs).
  #[cfg_attr(feature = "serde", serde(skip, default))]
  parent: Option<Arc<CRDT<K, C, V>>>,
  // Parent clock time captured at construction; currently unused elsewhere
  // in this view, hence the dead_code allowance
  #[allow(dead_code)]
  base_version: u64,
}
524
525impl<K: Hash + Eq + Clone, C: Hash + Eq + Clone, V: Clone> CRDT<K, C, V> {
526  /// Creates a new empty CRDT.
527  ///
528  /// # Arguments
529  ///
530  /// * `node_id` - Unique identifier for this CRDT node
531  /// * `parent` - Optional parent CRDT for hierarchical structures
532  pub fn new(node_id: NodeId, parent: Option<Arc<CRDT<K, C, V>>>) -> Self {
533    let (clock, base_version) = if let Some(ref p) = parent {
534      let parent_clock = p.clock;
535      let base = parent_clock.current_time();
536      (parent_clock, base)
537    } else {
538      (LogicalClock::new(), 0)
539    };
540
541    Self {
542      node_id,
543      clock,
544      data: HashMap::new(),
545      tombstones: TombstoneStorage::new(),
546      parent,
547      base_version,
548    }
549  }
550
551  /// Creates a CRDT from a list of changes (e.g., loaded from disk).
552  ///
553  /// # Arguments
554  ///
555  /// * `node_id` - The unique identifier for this CRDT node
556  /// * `changes` - A list of changes to apply to reconstruct the CRDT state
557  pub fn from_changes(node_id: NodeId, changes: Vec<Change<K, C, V>>) -> Self {
558    let mut crdt = Self::new(node_id, None);
559    crdt.apply_changes(changes);
560    crdt
561  }
562
563  /// Resets the CRDT to a state as if it was constructed with the given changes.
564  ///
565  /// # Arguments
566  ///
567  /// * `changes` - A list of changes to apply to reconstruct the CRDT state
568  pub fn reset(&mut self, changes: Vec<Change<K, C, V>>) {
569    self.data.clear();
570    self.tombstones.clear();
571    self.clock = LogicalClock::new();
572    self.apply_changes(changes);
573  }
574
  /// Applies a list of changes to reconstruct the CRDT state.
  ///
  /// Unlike `merge_changes_impl`, no per-column conflict resolution is done:
  /// changes are applied in list order, so they are assumed to be this node's
  /// own (compressed) history. Tombstoned records are still skipped for
  /// column updates.
  fn apply_changes(&mut self, changes: Vec<Change<K, C, V>>) {
    // Determine the maximum db_version from the changes
    let max_db_version = changes
      .iter()
      .map(|c| c.db_version.max(c.local_db_version))
      .max()
      .unwrap_or(0);

    // Set the logical clock to the maximum db_version
    self.clock.set_time(max_db_version);

    // Apply each change to reconstruct the CRDT state
    for change in changes {
      let record_id = change.record_id.clone();
      let col_name = change.col_name.clone();
      let remote_col_version = change.col_version;
      let remote_db_version = change.db_version;
      let remote_node_id = change.node_id;
      let remote_local_db_version = change.local_db_version;
      let remote_value = change.value;

      if col_name.is_none() {
        // Handle deletion
        self.data.remove(&record_id);

        // Store deletion information in tombstones
        self.tombstones.insert_or_assign(
          record_id,
          TombstoneInfo::new(remote_db_version, remote_node_id, remote_local_db_version),
        );
      } else if let Some(col_key) = col_name {
        // Handle insertion or update; changes targeting a record tombstoned
        // earlier in this replay are dropped
        if !self.is_record_tombstoned(&record_id, false) {
          let record = self.get_or_create_record_unchecked(&record_id, false);

          // Insert or update the field value.
          // NOTE(review): a `None` value leaves any existing field in place
          // here, whereas `merge_changes_impl` removes the field for a
          // column tombstone — confirm replayed change lists never contain a
          // field deletion after an insert for the same column.
          if let Some(value) = remote_value {
            record.fields.insert(col_key.clone(), value);
          }

          // Update the column version info
          let col_ver = ColumnVersion::new(
            remote_col_version,
            remote_db_version,
            remote_node_id,
            remote_local_db_version,
          );
          record.column_versions.insert(col_key, col_ver);

          // Update version boundaries
          if remote_local_db_version < record.lowest_local_db_version {
            record.lowest_local_db_version = remote_local_db_version;
          }
          if remote_local_db_version > record.highest_local_db_version {
            record.highest_local_db_version = remote_local_db_version;
          }
        }
      }
    }
  }
636
  /// Inserts a new record or updates an existing record in the CRDT.
  ///
  /// Convenience wrapper around [`CRDT::insert_or_update_with_flags`] with
  /// `flags` set to 0.
  ///
  /// # Arguments
  ///
  /// * `record_id` - The unique identifier for the record
  /// * `fields` - An iterator of (column_name, value) pairs
  ///
  /// # Returns
  ///
  /// A vector of changes created by this operation
  #[must_use = "changes should be propagated to other nodes"]
  pub fn insert_or_update<I>(&mut self, record_id: &K, fields: I) -> Vec<Change<K, C, V>>
  where
    I: IntoIterator<Item = (C, V)>,
  {
    self.insert_or_update_with_flags(record_id, 0, fields)
  }
654
  /// Inserts a new record or updates an existing record with flags.
  ///
  /// Each supplied field gets its per-column version bumped (or initialized
  /// to 1) and produces one `Change`. Writes to tombstoned records are
  /// silently dropped and yield an empty vector.
  ///
  /// # Arguments
  ///
  /// * `record_id` - The unique identifier for the record
  /// * `flags` - Flags to indicate the type of change
  /// * `fields` - An iterator of (column_name, value) pairs
  ///
  /// # Returns
  ///
  /// A vector of changes created by this operation
  #[must_use = "changes should be propagated to other nodes"]
  pub fn insert_or_update_with_flags<I>(
    &mut self,
    record_id: &K,
    flags: u32,
    fields: I,
  ) -> Vec<Change<K, C, V>>
  where
    I: IntoIterator<Item = (C, V)>,
  {
    // The clock ticks before the tombstone check, so it advances even when
    // the write is rejected below
    let db_version = self.clock.tick();

    // Check if the record is tombstoned
    if self.is_record_tombstoned(record_id, false) {
      return Vec::new();
    }

    let mut changes = Vec::new();
    let node_id = self.node_id; // Store node_id before mutable borrow
    let record = self.get_or_create_record_unchecked(record_id, false);

    for (col_name, value) in fields {
      // Existing column: bump its counter in place; new column: start at 1
      let col_version = if let Some(col_info) = record.column_versions.get_mut(&col_name) {
        col_info.col_version += 1;
        col_info.db_version = db_version;
        col_info.node_id = node_id;
        col_info.local_db_version = db_version;
        col_info.col_version
      } else {
        record.column_versions.insert(
          col_name.clone(),
          ColumnVersion::new(1, db_version, node_id, db_version),
        );
        1
      };

      // Update record version boundaries
      if db_version < record.lowest_local_db_version {
        record.lowest_local_db_version = db_version;
      }
      if db_version > record.highest_local_db_version {
        record.highest_local_db_version = db_version;
      }

      record.fields.insert(col_name.clone(), value.clone());
      changes.push(Change::new(
        record_id.clone(),
        Some(col_name),
        Some(value),
        col_version,
        db_version,
        node_id,
        db_version,
        flags,
      ));
    }

    changes
  }
725
  /// Deletes a record by marking it as tombstoned.
  ///
  /// Convenience wrapper around [`CRDT::delete_record_with_flags`] with
  /// `flags` set to 0.
  ///
  /// # Arguments
  ///
  /// * `record_id` - The unique identifier for the record
  ///
  /// # Returns
  ///
  /// An optional Change representing the deletion
  #[must_use = "changes should be propagated to other nodes"]
  pub fn delete_record(&mut self, record_id: &K) -> Option<Change<K, C, V>> {
    self.delete_record_with_flags(record_id, 0)
  }
739
740  /// Deletes a record with flags.
741  ///
742  /// # Arguments
743  ///
744  /// * `record_id` - The unique identifier for the record
745  /// * `flags` - Flags to indicate the type of change
746  ///
747  /// # Returns
748  ///
749  /// An optional Change representing the deletion
750  #[must_use = "changes should be propagated to other nodes"]
751  pub fn delete_record_with_flags(&mut self, record_id: &K, flags: u32) -> Option<Change<K, C, V>> {
752    if self.is_record_tombstoned(record_id, false) {
753      return None;
754    }
755
756    let db_version = self.clock.tick();
757
758    // Mark as tombstone and remove data
759    self.data.remove(record_id);
760
761    // Store deletion information in tombstones
762    self.tombstones.insert_or_assign(
763      record_id.clone(),
764      TombstoneInfo::new(db_version, self.node_id, db_version),
765    );
766
767    Some(Change::new(
768      record_id.clone(),
769      None,
770      None,
771      TOMBSTONE_COL_VERSION,
772      db_version,
773      self.node_id,
774      db_version,
775      flags,
776    ))
777  }
778
  /// Deletes a specific field from a record.
  ///
  /// Convenience wrapper around [`CRDT::delete_field_with_flags`] with
  /// `flags` set to 0.
  ///
  /// # Arguments
  ///
  /// * `record_id` - The unique identifier for the record
  /// * `field_name` - The name of the field to delete
  ///
  /// # Returns
  ///
  /// An optional Change representing the field deletion. Returns None if:
  /// - The record is tombstoned
  /// - The record doesn't exist
  /// - The field doesn't exist in the record
  #[must_use = "changes should be propagated to other nodes"]
  pub fn delete_field(&mut self, record_id: &K, field_name: &C) -> Option<Change<K, C, V>> {
    self.delete_field_with_flags(record_id, field_name, 0)
  }
796
  /// Deletes a specific field from a record with flags.
  ///
  /// The field value is removed but its `ColumnVersion` is kept (and bumped)
  /// as a field-level tombstone, so this deletion wins over older concurrent
  /// writes during merge.
  ///
  /// # Arguments
  ///
  /// * `record_id` - The unique identifier for the record
  /// * `field_name` - The name of the field to delete
  /// * `flags` - Flags to indicate the type of change
  ///
  /// # Returns
  ///
  /// An optional Change representing the field deletion. Returns None if:
  /// - The record is tombstoned
  /// - The record doesn't exist
  /// - The field doesn't exist in the record
  #[must_use = "changes should be propagated to other nodes"]
  pub fn delete_field_with_flags(
    &mut self,
    record_id: &K,
    field_name: &C,
    flags: u32,
  ) -> Option<Change<K, C, V>> {
    // Check if the record is tombstoned
    if self.is_record_tombstoned(record_id, false) {
      return None;
    }

    // Get the record (return None if it doesn't exist)
    let record = self.data.get_mut(record_id)?;

    // Check if the field exists
    if !record.fields.contains_key(field_name) {
      return None;
    }

    // The clock only ticks after all early-return checks above — unlike
    // `insert_or_update_with_flags`, which ticks before its tombstone check
    let db_version = self.clock.tick();

    // Get or create column version and increment it
    let col_version = if let Some(col_info) = record.column_versions.get_mut(field_name) {
      col_info.col_version += 1;
      col_info.db_version = db_version;
      col_info.node_id = self.node_id;
      col_info.local_db_version = db_version;
      col_info.col_version
    } else {
      // This shouldn't happen if the field exists, but handle it gracefully
      record.column_versions.insert(
        field_name.clone(),
        ColumnVersion::new(1, db_version, self.node_id, db_version),
      );
      1
    };

    // Update record version boundaries
    if db_version < record.lowest_local_db_version {
      record.lowest_local_db_version = db_version;
    }
    if db_version > record.highest_local_db_version {
      record.highest_local_db_version = db_version;
    }

    // Remove the field from the record (but keep the ColumnVersion as field tombstone)
    record.fields.remove(field_name);

    Some(Change::new(
      record_id.clone(),
      Some(field_name.clone()),
      None, // None value indicates field deletion
      col_version,
      db_version,
      self.node_id,
      db_version,
      flags,
    ))
  }
871
  /// Merges incoming changes into the CRDT.
  ///
  /// Delegates to the internal merge implementation with `ignore_parent`
  /// set to `false`.
  ///
  /// # Arguments
  ///
  /// * `changes` - Vector of changes to merge
  /// * `merge_rule` - The merge rule to use for conflict resolution
  ///
  /// # Returns
  ///
  /// Vector of the changes that were accepted by the merge rule
  pub fn merge_changes<R: MergeRule<K, C, V>>(
    &mut self,
    changes: Vec<Change<K, C, V>>,
    merge_rule: &R,
  ) -> Vec<Change<K, C, V>> {
    self.merge_changes_impl(changes, false, merge_rule)
  }
889
  /// Shared implementation behind `merge_changes`.
  ///
  /// `ignore_parent` is forwarded to `is_record_tombstoned`, `get_record_ptr`
  /// and `get_or_create_record_unchecked`; presumably it controls whether
  /// lookups consult the parent CRDT — confirm against those helpers.
  /// Returns the subset of `changes` accepted by `merge_rule`, rewritten with
  /// this node's `local_db_version`.
  fn merge_changes_impl<R: MergeRule<K, C, V>>(
    &mut self,
    changes: Vec<Change<K, C, V>>,
    ignore_parent: bool,
    merge_rule: &R,
  ) -> Vec<Change<K, C, V>> {
    let mut accepted_changes = Vec::new();

    if changes.is_empty() {
      return accepted_changes;
    }

    for change in changes {
      // Move values from change to avoid unnecessary clones
      let Change {
        record_id,
        col_name,
        value: remote_value,
        col_version: remote_col_version,
        db_version: remote_db_version,
        node_id: remote_node_id,
        flags,
        ..
      } = change;

      // Always update the logical clock to maintain causal consistency —
      // even for changes that are skipped or rejected below
      let new_local_db_version = self.clock.update(remote_db_version);

      // Skip all changes for tombstoned records
      if self.is_record_tombstoned(&record_id, ignore_parent) {
        continue;
      }

      // Retrieve local column version information
      let local_col_info = if col_name.is_none() {
        // For deletions, check tombstones
        self
          .tombstones
          .find(&record_id)
          .map(|info| info.as_column_version())
      } else if let Some(ref col) = col_name {
        // For column updates, check the record
        self
          .get_record_ptr(&record_id, ignore_parent)
          .and_then(|record| record.column_versions.get(col).copied())
      } else {
        // Unreachable: col_name is either None or Some (kept for exhaustiveness)
        None
      };

      // Determine whether to accept the remote change; a change with no
      // local counterpart is always accepted
      let should_accept = if let Some(local_info) = local_col_info {
        merge_rule.should_accept(
          local_info.col_version,
          local_info.db_version,
          local_info.node_id,
          remote_col_version,
          remote_db_version,
          remote_node_id,
        )
      } else {
        true
      };

      if should_accept {
        if let Some(col_key) = col_name {
          // Handle insertion or update
          let record = self.get_or_create_record_unchecked(&record_id, ignore_parent);

          // Update field value
          if let Some(value) = remote_value.clone() {
            record.fields.insert(col_key.clone(), value);
          } else {
            // If remote_value is None, remove the field (column tombstone)
            record.fields.remove(&col_key);
          }

          // Update the column version info and record version boundaries
          record.column_versions.insert(
            col_key.clone(),
            ColumnVersion::new(
              remote_col_version,
              remote_db_version,
              remote_node_id,
              new_local_db_version,
            ),
          );

          // Update version boundaries
          if new_local_db_version < record.lowest_local_db_version {
            record.lowest_local_db_version = new_local_db_version;
          }
          if new_local_db_version > record.highest_local_db_version {
            record.highest_local_db_version = new_local_db_version;
          }

          // Echo the accepted change back, stamped with our local db_version,
          // so callers can persist or forward it
          accepted_changes.push(Change::new(
            record_id,
            Some(col_key),
            remote_value,
            remote_col_version,
            remote_db_version,
            remote_node_id,
            new_local_db_version,
            flags,
          ));
        } else {
          // Handle deletion
          self.data.remove(&record_id);

          // Store deletion information in tombstones
          self.tombstones.insert_or_assign(
            record_id.clone(),
            TombstoneInfo::new(remote_db_version, remote_node_id, new_local_db_version),
          );

          accepted_changes.push(Change::new(
            record_id,
            None,
            None,
            remote_col_version,
            remote_db_version,
            remote_node_id,
            new_local_db_version,
            flags,
          ));
        }
      }
    }

    accepted_changes
  }
1021
1022  /// Retrieves all changes since a given `last_db_version`.
1023  ///
1024  /// # Arguments
1025  ///
1026  /// * `last_db_version` - The database version to retrieve changes since
1027  ///
1028  /// # Returns
1029  ///
1030  /// A vector of changes
1031  #[must_use]
1032  pub fn get_changes_since(&self, last_db_version: u64) -> Vec<Change<K, C, V>>
1033  where
1034    K: Ord,
1035    C: Ord,
1036  {
1037    self.get_changes_since_excluding(last_db_version, &HashSet::new())
1038  }
1039
1040  /// Retrieves all changes since a given `last_db_version`, excluding specific nodes.
1041  pub fn get_changes_since_excluding(
1042    &self,
1043    last_db_version: u64,
1044    excluding: &HashSet<NodeId>,
1045  ) -> Vec<Change<K, C, V>>
1046  where
1047    K: Ord,
1048    C: Ord,
1049  {
1050    let mut changes = Vec::new();
1051
1052    // Get changes from parent
1053    if let Some(ref parent) = self.parent {
1054      let parent_changes = parent.get_changes_since_excluding(last_db_version, excluding);
1055      changes.extend(parent_changes);
1056    }
1057
1058    // Get changes from regular records
1059    for (record_id, record) in &self.data {
1060      // Skip records that haven't changed since last_db_version
1061      if record.highest_local_db_version <= last_db_version {
1062        continue;
1063      }
1064
1065      for (col_name, clock_info) in &record.column_versions {
1066        if clock_info.local_db_version > last_db_version && !excluding.contains(&clock_info.node_id)
1067        {
1068          let value = record.fields.get(col_name).cloned();
1069
1070          changes.push(Change::new(
1071            record_id.clone(),
1072            Some(col_name.clone()),
1073            value,
1074            clock_info.col_version,
1075            clock_info.db_version,
1076            clock_info.node_id,
1077            clock_info.local_db_version,
1078            0,
1079          ));
1080        }
1081      }
1082    }
1083
1084    // Get deletion changes from tombstones
1085    for (record_id, tombstone_info) in self.tombstones.iter() {
1086      if tombstone_info.local_db_version > last_db_version
1087        && !excluding.contains(&tombstone_info.node_id)
1088      {
1089        changes.push(Change::new(
1090          record_id.clone(),
1091          None,
1092          None,
1093          TOMBSTONE_COL_VERSION,
1094          tombstone_info.db_version,
1095          tombstone_info.node_id,
1096          tombstone_info.local_db_version,
1097          0,
1098        ));
1099      }
1100    }
1101
1102    if self.parent.is_some() {
1103      // Compress changes to remove redundant operations
1104      Self::compress_changes(&mut changes);
1105    }
1106
1107    changes
1108  }
1109
  /// Compresses a vector of changes in-place by removing redundant changes.
  ///
  /// Changes are sorted and then compressed using a two-pointer technique.
  ///
  /// Compression rules (after sorting with `DefaultChangeComparator`):
  /// - At most one change per (record, column) pair is kept; within a group
  ///   the first element after sorting wins (the comparator is assumed to
  ///   order the most recent change first — TODO confirm against the
  ///   comparator's definition).
  /// - A record-level deletion (`col_name == None`) replaces every previously
  ///   kept change for that record and suppresses any later column change for
  ///   the same record.
  ///
  /// # Performance
  ///
  /// This method uses `sort_unstable_by` which provides O(n log n) average time complexity
  /// but does not preserve the relative order of equal elements. Since the comparator
  /// provides a total ordering, stability is not required.
  pub fn compress_changes(changes: &mut Vec<Change<K, C, V>>)
  where
    K: Ord,
    C: Ord,
  {
    if changes.is_empty() {
      return;
    }

    // Sort changes using the DefaultChangeComparator
    // Use sort_unstable for better performance since we don't need stable sorting
    let comparator = DefaultChangeComparator;
    changes.sort_unstable_by(|a, b| comparator.compare(a, b));

    // Use two-pointer technique to compress in-place
    // Invariant: changes[0..=write] holds the kept changes; `read` scans ahead.
    let mut write = 0;
    for read in 1..changes.len() {
      if changes[read].record_id != changes[write].record_id {
        // New record, always keep it
        write += 1;
        if write != read {
          changes[write] = changes[read].clone();
        }
      } else if changes[read].col_name.is_none() && changes[write].col_name.is_some() {
        // Current read is a deletion, backtrack to first change for this record
        // and replace it with the deletion, effectively discarding all field updates
        let mut first_pos = write;
        while first_pos > 0 && changes[first_pos - 1].record_id == changes[read].record_id {
          first_pos -= 1;
        }
        changes[first_pos] = changes[read].clone();
        write = first_pos;
      } else if changes[read].col_name != changes[write].col_name
        && changes[write].col_name.is_some()
      {
        // New column for the same record.
        // Note: if changes[write] is a deletion (col_name None), this branch is
        // skipped, so column changes after a deletion are intentionally dropped.
        write += 1;
        if write != read {
          changes[write] = changes[read].clone();
        }
      }
      // Else: same record and column, keep the existing one (most recent due to sorting)
    }

    // Drop everything past the last kept change.
    changes.truncate(write + 1);
  }
1165
1166  /// Retrieves a reference to a record if it exists.
1167  pub fn get_record(&self, record_id: &K) -> Option<&Record<C, V>> {
1168    self.get_record_ptr(record_id, false)
1169  }
1170
1171  /// Checks if a record is tombstoned.
1172  pub fn is_tombstoned(&self, record_id: &K) -> bool {
1173    self.is_record_tombstoned(record_id, false)
1174  }
1175
1176  /// Gets tombstone information for a record.
1177  pub fn get_tombstone(&self, record_id: &K) -> Option<TombstoneInfo> {
1178    if let Some(info) = self.tombstones.find(record_id) {
1179      return Some(info);
1180    }
1181
1182    if let Some(ref parent) = self.parent {
1183      return parent.get_tombstone(record_id);
1184    }
1185
1186    None
1187  }
1188
1189  /// Removes tombstones older than the specified version.
1190  ///
1191  /// # Safety and DoS Mitigation
1192  ///
1193  /// **IMPORTANT**: Only call this method when ALL participating nodes have acknowledged
1194  /// the `min_acknowledged_version`. Compacting too early may cause deleted records to
1195  /// reappear on nodes that haven't received the deletion yet.
1196  ///
1197  /// To prevent DoS via tombstone accumulation:
1198  /// - Call this method periodically as part of your sync protocol
1199  /// - Track which versions have been acknowledged by all nodes
1200  /// - Consider implementing a tombstone limit and rejecting operations when exceeded
1201  ///
1202  /// # Arguments
1203  ///
1204  /// * `min_acknowledged_version` - Tombstones with db_version < this value will be removed
1205  ///
1206  /// # Returns
1207  ///
1208  /// The number of tombstones removed
1209  pub fn compact_tombstones(&mut self, min_acknowledged_version: u64) -> usize {
1210    self.tombstones.compact(min_acknowledged_version)
1211  }
1212
  /// Gets the number of tombstones currently stored.
  ///
  /// Only tombstones held directly by this CRDT are counted; a parent's
  /// tombstones (if any) are not included.
  pub fn tombstone_count(&self) -> usize {
    self.tombstones.len()
  }
1217
  /// Gets the current logical clock.
  ///
  /// Returns a shared reference; the clock cannot be mutated through it.
  pub fn get_clock(&self) -> &LogicalClock {
    &self.clock
  }
1222
  /// Gets a reference to the internal data map.
  ///
  /// Only this CRDT's own records are exposed; records visible through a
  /// parent are not merged in.
  pub fn get_data(&self) -> &HashMap<K, Record<C, V>> {
    &self.data
  }
1227
  /// Serializes the CRDT to a JSON string.
  ///
  /// Note: The parent relationship is not serialized and must be rebuilt after deserialization.
  ///
  /// Available only when the `json` feature is enabled.
  ///
  /// # Errors
  ///
  /// Returns an error if serialization fails.
  #[cfg(feature = "json")]
  pub fn to_json(&self) -> Result<String, serde_json::Error>
  where
    K: serde::Serialize,
    C: serde::Serialize,
    V: serde::Serialize,
  {
    serde_json::to_string(self)
  }
1244
  /// Deserializes a CRDT from a JSON string.
  ///
  /// Note: The parent relationship is not deserialized and will be `None`.
  /// Applications must rebuild parent-child relationships if needed.
  ///
  /// Available only when the `json` feature is enabled.
  ///
  /// # Errors
  ///
  /// Returns an error if deserialization fails.
  #[cfg(feature = "json")]
  pub fn from_json(json: &str) -> Result<Self, serde_json::Error>
  where
    K: serde::de::DeserializeOwned + Hash + Eq + Clone,
    C: serde::de::DeserializeOwned + Hash + Eq + Clone,
    V: serde::de::DeserializeOwned + Clone,
  {
    serde_json::from_str(json)
  }
1262
  /// Serializes the CRDT to bytes using bincode.
  ///
  /// Note: The parent relationship is not serialized and must be rebuilt after deserialization.
  ///
  /// Available only when the `binary` feature is enabled.
  ///
  /// # Errors
  ///
  /// Returns an error if serialization fails.
  #[cfg(feature = "binary")]
  pub fn to_bytes(&self) -> Result<Vec<u8>, bincode::error::EncodeError>
  where
    K: serde::Serialize,
    C: serde::Serialize,
    V: serde::Serialize,
  {
    // Uses bincode's standard configuration; `from_bytes` decodes with the
    // same configuration.
    bincode::serde::encode_to_vec(self, bincode::config::standard())
  }
1279
1280  /// Deserializes a CRDT from bytes using bincode.
1281  ///
1282  /// Note: The parent relationship is not deserialized and will be `None`.
1283  /// Applications must rebuild parent-child relationships if needed.
1284  ///
1285  /// # Errors
1286  ///
1287  /// Returns an error if deserialization fails.
1288  #[cfg(feature = "binary")]
1289  pub fn from_bytes(bytes: &[u8]) -> Result<Self, bincode::error::DecodeError>
1290  where
1291    K: serde::de::DeserializeOwned + Hash + Eq + Clone,
1292    C: serde::de::DeserializeOwned + Hash + Eq + Clone,
1293    V: serde::de::DeserializeOwned + Clone,
1294  {
1295    let (result, _len) = bincode::serde::decode_from_slice(bytes, bincode::config::standard())?;
1296    Ok(result)
1297  }
1298
1299  // Helper methods
1300
1301  fn is_record_tombstoned(&self, record_id: &K, ignore_parent: bool) -> bool {
1302    if self.tombstones.find(record_id).is_some() {
1303      return true;
1304    }
1305
1306    if !ignore_parent {
1307      if let Some(ref parent) = self.parent {
1308        return parent.is_record_tombstoned(record_id, false);
1309      }
1310    }
1311
1312    false
1313  }
1314
1315  fn get_or_create_record_unchecked(
1316    &mut self,
1317    record_id: &K,
1318    ignore_parent: bool,
1319  ) -> &mut Record<C, V> {
1320    #[cfg(feature = "std")]
1321    use std::collections::hash_map::Entry;
1322    #[cfg(all(not(feature = "std"), feature = "alloc"))]
1323    use hashbrown::hash_map::Entry;
1324
1325    match self.data.entry(record_id.clone()) {
1326      Entry::Occupied(e) => e.into_mut(),
1327      Entry::Vacant(e) => {
1328        let record = if !ignore_parent {
1329          self
1330            .parent
1331            .as_ref()
1332            .and_then(|p| p.get_record_ptr(record_id, false))
1333            .cloned()
1334            .unwrap_or_else(Record::new)
1335        } else {
1336          Record::new()
1337        };
1338        e.insert(record)
1339      }
1340    }
1341  }
1342
1343  fn get_record_ptr(&self, record_id: &K, ignore_parent: bool) -> Option<&Record<C, V>> {
1344    if let Some(record) = self.data.get(record_id) {
1345      return Some(record);
1346    }
1347
1348    if !ignore_parent {
1349      if let Some(ref parent) = self.parent {
1350        return parent.get_record_ptr(record_id, false);
1351      }
1352    }
1353
1354    None
1355  }
1356}
1357
#[cfg(test)]
mod tests {
  use super::*;

  // The logical clock starts at 0, `tick` advances it by one, and `update(5)`
  // from local time 1 yields 6 — the clock moves past the remote version.
  #[test]
  fn test_logical_clock() {
    let mut clock = LogicalClock::new();
    assert_eq!(clock.current_time(), 0);

    let t1 = clock.tick();
    assert_eq!(t1, 1);
    assert_eq!(clock.current_time(), 1);

    let t2 = clock.update(5);
    assert_eq!(t2, 6);
    assert_eq!(clock.current_time(), 6);
  }

  // Tombstone storage basics: insert, lookup, and compaction of entries
  // older than the acknowledged version.
  #[test]
  fn test_tombstone_storage() {
    let mut storage = TombstoneStorage::new();
    let info = TombstoneInfo::new(10, 1, 10);

    storage.insert_or_assign("key1".to_string(), info);
    assert_eq!(storage.len(), 1);

    assert_eq!(storage.find(&"key1".to_string()), Some(info));
    assert_eq!(storage.find(&"key2".to_string()), None);

    // compact(15) removes the entry created at version 10.
    let removed = storage.compact(15);
    assert_eq!(removed, 1);
    assert_eq!(storage.len(), 0);
  }

  // Inserting a record produces one change per field and stores the values.
  #[test]
  fn test_basic_insert() {
    let mut crdt: CRDT<String, String, String> = CRDT::new(1, None);

    let fields = vec![
      ("name".to_string(), "Alice".to_string()),
      ("age".to_string(), "30".to_string()),
    ];

    let changes = crdt.insert_or_update(&"user1".to_string(), fields);

    // Two fields -> two changes, one record.
    assert_eq!(changes.len(), 2);
    assert_eq!(crdt.get_data().len(), 1);

    let record = crdt.get_record(&"user1".to_string()).unwrap();
    assert_eq!(record.fields.get("name").unwrap(), "Alice");
    assert_eq!(record.fields.get("age").unwrap(), "30");
  }

  // Deleting a record removes it from the data map and leaves a tombstone.
  #[test]
  fn test_delete_record() {
    let mut crdt: CRDT<String, String, String> = CRDT::new(1, None);

    let fields = vec![("name".to_string(), "Bob".to_string())];
    let _ = crdt.insert_or_update(&"user2".to_string(), fields);

    let delete_change = crdt.delete_record(&"user2".to_string());
    assert!(delete_change.is_some());
    assert!(crdt.is_tombstoned(&"user2".to_string()));
    assert_eq!(crdt.get_data().len(), 0);
  }

  // Two nodes write the same column concurrently; after exchanging changes
  // both replicas converge to identical state.
  #[test]
  fn test_merge_changes() {
    let mut crdt1: CRDT<String, String, String> = CRDT::new(1, None);
    let mut crdt2: CRDT<String, String, String> = CRDT::new(2, None);

    let fields1 = vec![("tag".to_string(), "Node1".to_string())];
    let changes1 = crdt1.insert_or_update(&"record1".to_string(), fields1);

    let fields2 = vec![("tag".to_string(), "Node2".to_string())];
    let changes2 = crdt2.insert_or_update(&"record1".to_string(), fields2);

    let merge_rule = DefaultMergeRule;
    crdt1.merge_changes(changes2, &merge_rule);
    crdt2.merge_changes(changes1, &merge_rule);

    // Node2 has higher node_id, so its value should win
    assert_eq!(
      crdt1
        .get_record(&"record1".to_string())
        .unwrap()
        .fields
        .get("tag")
        .unwrap(),
      "Node2"
    );
    // Convergence: both replicas hold the same data.
    assert_eq!(crdt1.get_data(), crdt2.get_data());
  }

  // Round-trips a single Change through JSON and bincode.
  #[test]
  #[cfg(feature = "serde")]
  fn test_change_serialization() {
    // `change` is unused when neither `json` nor `binary` is enabled.
    #[allow(unused_variables)]
    let change = Change::new(
      "record1".to_string(),
      Some("name".to_string()),
      Some("Alice".to_string()),
      1,
      10,
      1,
      10,
      0,
    );

    // Test JSON serialization
    #[cfg(feature = "json")]
    {
      let json = serde_json::to_string(&change).unwrap();
      let deserialized: Change<String, String, String> = serde_json::from_str(&json).unwrap();
      assert_eq!(change, deserialized);
    }

    // Test binary serialization
    #[cfg(feature = "binary")]
    {
      let bytes = bincode::serde::encode_to_vec(&change, bincode::config::standard()).unwrap();
      let (deserialized, _): (Change<String, String, String>, _) =
        bincode::serde::decode_from_slice(&bytes, bincode::config::standard()).unwrap();
      assert_eq!(change, deserialized);
    }
  }

  // Round-trips a Record (fields + column versions) through JSON and bincode.
  #[test]
  #[cfg(feature = "serde")]
  fn test_record_serialization() {
    let mut fields = HashMap::new();
    fields.insert("name".to_string(), "Bob".to_string());
    fields.insert("age".to_string(), "25".to_string());

    let mut column_versions = HashMap::new();
    column_versions.insert("name".to_string(), ColumnVersion::new(1, 10, 1, 10));
    column_versions.insert("age".to_string(), ColumnVersion::new(1, 11, 1, 11));

    // `record` is unused when neither `json` nor `binary` is enabled.
    #[allow(unused_variables)]
    let record = Record::from_parts(fields, column_versions);

    // Test JSON serialization
    #[cfg(feature = "json")]
    {
      let json = serde_json::to_string(&record).unwrap();
      let deserialized: Record<String, String> = serde_json::from_str(&json).unwrap();
      assert_eq!(record, deserialized);
    }

    // Test binary serialization
    #[cfg(feature = "binary")]
    {
      let bytes = bincode::serde::encode_to_vec(&record, bincode::config::standard()).unwrap();
      let (deserialized, _): (Record<String, String>, _) =
        bincode::serde::decode_from_slice(&bytes, bincode::config::standard()).unwrap();
      assert_eq!(record, deserialized);
    }
  }

  // Full-CRDT JSON round-trip: data, tombstones, and clock survive; the
  // parent link (never serialized) comes back as None.
  #[test]
  #[cfg(feature = "json")]
  fn test_crdt_json_serialization() {
    let mut crdt: CRDT<String, String, String> = CRDT::new(1, None);

    // Add some data
    let fields = vec![
      ("name".to_string(), "Alice".to_string()),
      ("age".to_string(), "30".to_string()),
    ];
    let _ = crdt.insert_or_update(&"user1".to_string(), fields);

    let fields2 = vec![("name".to_string(), "Bob".to_string())];
    let _ = crdt.insert_or_update(&"user2".to_string(), fields2);

    // Delete one record
    let _ = crdt.delete_record(&"user2".to_string());

    // Serialize to JSON
    let json = crdt.to_json().unwrap();

    // Deserialize
    let deserialized: CRDT<String, String, String> = CRDT::from_json(&json).unwrap();

    // Verify data is preserved
    assert_eq!(crdt.get_data().len(), deserialized.get_data().len());
    assert_eq!(
      crdt.get_record(&"user1".to_string()).unwrap().fields,
      deserialized.get_record(&"user1".to_string()).unwrap().fields
    );

    // Verify tombstones are preserved
    assert_eq!(crdt.tombstone_count(), deserialized.tombstone_count());
    assert!(deserialized.is_tombstoned(&"user2".to_string()));

    // Verify clock is preserved
    assert_eq!(
      crdt.get_clock().current_time(),
      deserialized.get_clock().current_time()
    );

    // Verify parent is None after deserialization
    let has_parent = deserialized.parent.is_some();
    assert!(!has_parent);
  }

  // Full-CRDT bincode round-trip: data and clock survive.
  #[test]
  #[cfg(feature = "binary")]
  fn test_crdt_binary_serialization() {
    let mut crdt: CRDT<String, String, String> = CRDT::new(1, None);

    // Add some data
    let fields = vec![
      ("name".to_string(), "Alice".to_string()),
      ("age".to_string(), "30".to_string()),
    ];
    let _ = crdt.insert_or_update(&"user1".to_string(), fields);

    // Serialize to bytes
    let bytes = crdt.to_bytes().unwrap();

    // Deserialize
    let deserialized: CRDT<String, String, String> = CRDT::from_bytes(&bytes).unwrap();

    // Verify data is preserved
    assert_eq!(crdt.get_data().len(), deserialized.get_data().len());
    assert_eq!(
      crdt.get_record(&"user1".to_string()).unwrap().fields,
      deserialized.get_record(&"user1".to_string()).unwrap().fields
    );

    // Verify clock is preserved
    assert_eq!(
      crdt.get_clock().current_time(),
      deserialized.get_clock().current_time()
    );
  }

  // A child CRDT serialized and restored loses its parent link and the
  // parent's records, keeping only its own data.
  #[test]
  #[cfg(feature = "serde")]
  fn test_parent_not_serialized() {
    // Create a parent CRDT
    let mut parent: CRDT<String, String, String> = CRDT::new(1, None);
    let fields = vec![("parent_field".to_string(), "parent_value".to_string())];
    let _ = parent.insert_or_update(&"parent_record".to_string(), fields);

    // Create a child CRDT with parent
    let parent_arc = Arc::new(parent);
    let mut child = CRDT::new(2, Some(parent_arc.clone()));
    let child_fields = vec![("child_field".to_string(), "child_value".to_string())];
    let _ = child.insert_or_update(&"child_record".to_string(), child_fields);

    // Serialize and deserialize the child
    #[cfg(feature = "json")]
    {
      let json = serde_json::to_string(&child).unwrap();
      let deserialized: CRDT<String, String, String> = serde_json::from_str(&json).unwrap();

      // Verify parent is None
      assert!(deserialized.parent.is_none());

      // Verify child's own data is preserved
      assert!(deserialized.get_record(&"child_record".to_string()).is_some());

      // Verify parent's data is NOT in deserialized child
      assert!(deserialized.get_record(&"parent_record".to_string()).is_none());
    }
  }
}