crdt_lite/
lib.rs

1//! # crdt-lite
2//!
3//! A lightweight, column-based CRDT (Conflict-free Replicated Data Type) implementation in Rust.
4//!
5//! This library provides a generic CRDT with last-write-wins semantics, supporting:
6//! - Generic key and value types
7//! - Logical clock for causality tracking
8//! - Tombstone-based deletion
9//! - Parent-child CRDT hierarchies
10//! - Custom merge rules and comparators
11//! - Change compression
12//! - Optional serialization support via Serde
13//!
14//! ## Features
15//!
16//! This crate provides optional features through feature flags:
17//!
18//! - `std` - Enables standard library support (enabled by default)
19//! - `alloc` - Enables allocator support for no_std environments (pulls in hashbrown for HashMap)
20//! - `serde` - Enables Serde support for all CRDT types
21//! - `json` - Enables JSON serialization (includes `serde` + `serde_json`)
22//! - `binary` - Enables binary serialization (includes `serde` + `bincode`)
23//! - `node-id-u128` - Uses u128 for NodeId instead of u64 (useful for UUID-based node IDs)
24//!
25//! ### no_std Support
26//!
27//! For no_std environments, disable default features and enable the `alloc` feature:
28//!
29//! ```toml
30//! [dependencies]
31//! crdt-lite = { version = "0.2", default-features = false, features = ["alloc"] }
32//! ```
33//!
34//! ### Usage Example with Serialization
35//!
36//! ```toml
37//! [dependencies]
//! crdt-lite = { version = "0.2", features = ["json", "binary"] }
39//! ```
40//!
41//! ```rust,ignore
42//! use crdt_lite::CRDT;
43//!
44//! // Create and populate a CRDT
45//! let mut crdt: CRDT<String, String> = CRDT::new(1, None);
46//! let fields = vec![("name".to_string(), "Alice".to_string())];
47//! crdt.insert_or_update(&"user1".to_string(), fields);
48//!
49//! // Serialize to JSON (requires "json" feature)
50//! let json = crdt.to_json().unwrap();
51//!
52//! // Deserialize from JSON
53//! let restored: CRDT<String, String> = CRDT::from_json(&json).unwrap();
54//!
55//! // Serialize to binary (requires "binary" feature)
56//! let bytes = crdt.to_bytes().unwrap();
57//! let restored: CRDT<String, String> = CRDT::from_bytes(&bytes).unwrap();
58//!
59//! // Or use generic serde with any format (requires "serde" feature)
60//! let json = serde_json::to_string(&crdt).unwrap();
61//! let restored: CRDT<String, String> = serde_json::from_str(&json).unwrap();
62//! ```
63//!
64//! **Note on Parent Relationships**: Parent-child CRDT hierarchies are not serialized.
65//! After deserialization, the `parent` field will always be `None`. Applications must
66//! rebuild parent-child relationships if needed.
67//!
68//! ## Security and Resource Management
69//!
70//! ### Logical Clock Overflow
71//! The logical clock uses `u64` for version numbers. While overflow is theoretically possible
72//! after 2^64 operations (extremely unlikely in practice), applications with extreme longevity
73//! should be aware of this limitation.
74//!
75//! ### DoS Protection and Tombstone Management
76//! Tombstones accumulate indefinitely unless manually compacted. To prevent memory exhaustion:
77//! - Call `compact_tombstones()` periodically after all nodes have acknowledged a version
78//! - Implement application-level rate limiting for operations
79//! - Consider setting resource limits on the number of records and tombstones
80//!
81//! **Important**: Only compact tombstones when ALL participating nodes have acknowledged
82//! the minimum version. Compacting too early may cause deleted records to reappear on
83//! nodes that haven't received the deletion yet.
84
85#![cfg_attr(not(feature = "std"), no_std)]
86
87// Ensure valid feature combination: either std or alloc must be enabled
88#[cfg(not(any(feature = "std", feature = "alloc")))]
89compile_error!("Either 'std' (default) or 'alloc' feature must be enabled. For no_std environments, use: cargo build --no-default-features --features alloc");
90
91#[cfg(not(feature = "std"))]
92extern crate alloc;
93
94#[cfg(feature = "std")]
95use std::{
96  cmp::Ordering,
97  collections::{HashMap, HashSet},
98  hash::Hash,
99  sync::Arc,
100};
101
102#[cfg(not(feature = "std"))]
103use alloc::{
104  string::String,
105  sync::Arc,
106  vec::Vec,
107};
108#[cfg(not(feature = "std"))]
109use core::{cmp::Ordering, hash::Hash};
110#[cfg(all(not(feature = "std"), feature = "alloc"))]
111use hashbrown::{HashMap, HashSet};
112
/// Type alias for node IDs
///
/// By default, NodeId is u64. Use the `node-id-u128` feature to enable u128 for UUID-based IDs:
/// ```toml
/// crdt-lite = { version = "0.4", features = ["node-id-u128"] }
/// ```
#[cfg(feature = "node-id-u128")]
pub type NodeId = u128;

/// Default NodeId representation when the `node-id-u128` feature is disabled.
#[cfg(not(feature = "node-id-u128"))]
pub type NodeId = u64;

/// Type alias for column keys (field names)
pub type ColumnKey = String;

/// Column version used for tombstone changes
/// Using u64::MAX ensures tombstones are treated as having the highest possible version,
/// so a record deletion outranks any concurrent column update during conflict resolution.
const TOMBSTONE_COL_VERSION: u64 = u64::MAX;
131
/// Represents a single change in the CRDT.
///
/// A change can represent:
/// - An insertion or update of a column value (when `col_name` is `Some`)
/// - A deletion of a specific column (when `col_name` is `Some` and `value` is `None`)
/// - A deletion of an entire record (when `col_name` is `None`)
#[derive(Debug, Clone, PartialEq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "serde", serde(bound(serialize = "K: serde::Serialize, C: serde::Serialize, V: serde::Serialize")))]
#[cfg_attr(feature = "serde", serde(bound(deserialize = "K: serde::de::DeserializeOwned, C: serde::de::DeserializeOwned, V: serde::de::DeserializeOwned")))]
pub struct Change<K, C, V> {
  /// Identifier of the record this change applies to
  pub record_id: K,
  /// `None` represents tombstone of the record
  pub col_name: Option<C>,
  /// `None` represents deletion of the column (not the record)
  pub value: Option<V>,
  /// Per-column write counter at the originating node
  pub col_version: u64,
  /// Logical-clock time at the node that originated the change
  pub db_version: u64,
  /// Node that originated the change
  pub node_id: NodeId,
  /// Local db_version when the change was created (useful for `get_changes_since`)
  pub local_db_version: u64,
  /// Optional flags to indicate the type of change (ephemeral, not stored)
  pub flags: u32,
}
156
157impl<K: Eq, C: Eq, V: Eq> Eq for Change<K, C, V> {}
158
159impl<K, C, V> Change<K, C, V> {
160  /// Creates a new Change with all parameters
161  #[allow(clippy::too_many_arguments)]
162  pub fn new(
163    record_id: K,
164    col_name: Option<C>,
165    value: Option<V>,
166    col_version: u64,
167    db_version: u64,
168    node_id: NodeId,
169    local_db_version: u64,
170    flags: u32,
171  ) -> Self {
172    Self {
173      record_id,
174      col_name,
175      value,
176      col_version,
177      db_version,
178      node_id,
179      local_db_version,
180      flags,
181    }
182  }
183}
184
/// Represents version information for a column.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct ColumnVersion {
  /// Per-column write counter
  pub col_version: u64,
  /// Logical-clock time at the node that originated the write
  pub db_version: u64,
  /// Node that originated the write
  pub node_id: NodeId,
  /// Local db_version when the change was created
  pub local_db_version: u64,
}

impl ColumnVersion {
  /// Builds a `ColumnVersion` from its raw parts.
  pub fn new(col_version: u64, db_version: u64, node_id: NodeId, local_db_version: u64) -> Self {
    Self {
      col_version,
      db_version,
      node_id,
      local_db_version,
    }
  }
}
206
207/// Minimal version information for tombstones.
208///
209/// Stores essential data: db_version for conflict resolution, node_id for sync exclusion,
210/// and local_db_version for sync.
211#[derive(Debug, Clone, Copy, PartialEq, Eq)]
212#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
213pub struct TombstoneInfo {
214  pub db_version: u64,
215  pub node_id: NodeId,
216  pub local_db_version: u64,
217}
218
219impl TombstoneInfo {
220  pub fn new(db_version: u64, node_id: NodeId, local_db_version: u64) -> Self {
221    Self {
222      db_version,
223      node_id,
224      local_db_version,
225    }
226  }
227
228  /// Helper to create a ColumnVersion for comparison with regular columns
229  pub fn as_column_version(&self) -> ColumnVersion {
230    ColumnVersion::new(
231      TOMBSTONE_COL_VERSION,
232      self.db_version,
233      self.node_id,
234      self.local_db_version,
235    )
236  }
237}
238
/// Represents a logical clock for maintaining causality.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
pub struct LogicalClock {
  time: u64,
}

impl LogicalClock {
  /// Creates a new logical clock starting at 0.
  pub fn new() -> Self {
    LogicalClock { time: 0 }
  }

  /// Advances the clock by one for a local event and returns the new time.
  pub fn tick(&mut self) -> u64 {
    let next = self.time + 1;
    self.time = next;
    next
  }

  /// Merges a received time into the clock (taking the maximum of the two),
  /// advances it by one, and returns the new time.
  pub fn update(&mut self, received_time: u64) -> u64 {
    let next = self.time.max(received_time) + 1;
    self.time = next;
    next
  }

  /// Sets the logical clock to a specific time.
  pub fn set_time(&mut self, time: u64) {
    self.time = time;
  }

  /// Retrieves the current time.
  pub fn current_time(&self) -> u64 {
    self.time
  }
}

impl Default for LogicalClock {
  fn default() -> Self {
    LogicalClock::new()
  }
}
281
282/// Storage for tombstones (deleted records).
283///
284/// Uses a HashMap for efficient lookups and supports compaction.
285#[derive(Debug, Clone)]
286#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
287pub struct TombstoneStorage<K: Hash + Eq> {
288  entries: HashMap<K, TombstoneInfo>,
289}
290
291impl<K: Hash + Eq> TombstoneStorage<K> {
292  pub fn new() -> Self {
293    Self {
294      entries: HashMap::new(),
295    }
296  }
297
298  pub fn insert_or_assign(&mut self, key: K, info: TombstoneInfo) {
299    self.entries.insert(key, info);
300  }
301
302  pub fn find(&self, key: &K) -> Option<TombstoneInfo> {
303    self.entries.get(key).copied()
304  }
305
306  pub fn erase(&mut self, key: &K) -> bool {
307    self.entries.remove(key).is_some()
308  }
309
310  pub fn clear(&mut self) {
311    self.entries.clear();
312  }
313
314  pub fn iter(&self) -> impl Iterator<Item = (&K, &TombstoneInfo)> {
315    self.entries.iter()
316  }
317
318  pub fn len(&self) -> usize {
319    self.entries.len()
320  }
321
322  pub fn is_empty(&self) -> bool {
323    self.entries.is_empty()
324  }
325
326  /// Compact tombstones older than the specified version.
327  ///
328  /// Returns the number of tombstones removed.
329  pub fn compact(&mut self, min_acknowledged_version: u64) -> usize {
330    let initial_len = self.entries.len();
331    self
332      .entries
333      .retain(|_, info| info.db_version >= min_acknowledged_version);
334    initial_len - self.entries.len()
335  }
336}
337
338impl<K: Hash + Eq> Default for TombstoneStorage<K> {
339  fn default() -> Self {
340    Self::new()
341  }
342}
343
344/// Represents a record in the CRDT.
345#[derive(Debug, Clone)]
346#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
347#[cfg_attr(feature = "serde", serde(bound(serialize = "C: serde::Serialize + Hash + Eq, V: serde::Serialize")))]
348#[cfg_attr(feature = "serde", serde(bound(deserialize = "C: serde::de::DeserializeOwned + Hash + Eq, V: serde::de::DeserializeOwned")))]
349pub struct Record<C, V> {
350  pub fields: HashMap<C, V>,
351  pub column_versions: HashMap<C, ColumnVersion>,
352  /// Track version boundaries for efficient filtering
353  pub lowest_local_db_version: u64,
354  pub highest_local_db_version: u64,
355}
356
357impl<C: Hash + Eq, V> Record<C, V> {
358  pub fn new() -> Self {
359    Self {
360      fields: HashMap::new(),
361      column_versions: HashMap::new(),
362      lowest_local_db_version: u64::MAX,
363      highest_local_db_version: 0,
364    }
365  }
366
367  /// Creates a record from existing fields and column versions
368  pub fn from_parts(
369    fields: HashMap<C, V>,
370    column_versions: HashMap<C, ColumnVersion>,
371  ) -> Self {
372    let mut lowest = u64::MAX;
373    let mut highest = 0;
374
375    for ver in column_versions.values() {
376      if ver.local_db_version < lowest {
377        lowest = ver.local_db_version;
378      }
379      if ver.local_db_version > highest {
380        highest = ver.local_db_version;
381      }
382    }
383
384    Self {
385      fields,
386      column_versions,
387      lowest_local_db_version: lowest,
388      highest_local_db_version: highest,
389    }
390  }
391}
392
393impl<C: Hash + Eq + PartialEq, V: PartialEq> PartialEq for Record<C, V> {
394  fn eq(&self, other: &Self) -> bool {
395    // Compare only fields, not column_versions (those will differ per node)
396    self.fields == other.fields
397  }
398}
399
400impl<C: Hash + Eq, V> Default for Record<C, V> {
401  fn default() -> Self {
402    Self::new()
403  }
404}
405
/// Trait for merge rules that determine conflict resolution.
///
/// Implementations should return `true` if the remote change should be accepted,
/// `false` otherwise.
pub trait MergeRule<K, C, V> {
  /// Determines whether to accept a remote change over a local one
  ///
  /// The `local_*` / `remote_*` arguments are the (col_version, db_version,
  /// node_id) triples of the two competing writes.
  fn should_accept(
    &self,
    local_col: u64,
    local_db: u64,
    local_node: NodeId,
    remote_col: u64,
    remote_db: u64,
    remote_node: NodeId,
  ) -> bool;

  /// Convenience method for Change objects
  fn should_accept_change(&self, local: &Change<K, C, V>, remote: &Change<K, C, V>) -> bool {
    self.should_accept(
      local.col_version,
      local.db_version,
      local.node_id,
      remote.col_version,
      remote.db_version,
      remote.node_id,
    )
  }
}
434
435/// Default merge rule implementing last-write-wins semantics.
436///
437/// Comparison priority:
438/// 1. Column version (higher wins)
439/// 2. DB version (higher wins)
440/// 3. Node ID (higher wins as tiebreaker)
441#[derive(Debug, Clone, Copy, Default)]
442pub struct DefaultMergeRule;
443
444impl<K, C, V> MergeRule<K, C, V> for DefaultMergeRule {
445  fn should_accept(
446    &self,
447    local_col: u64,
448    local_db: u64,
449    local_node: NodeId,
450    remote_col: u64,
451    remote_db: u64,
452    remote_node: NodeId,
453  ) -> bool {
454    match remote_col.cmp(&local_col) {
455      Ordering::Greater => true,
456      Ordering::Less => false,
457      Ordering::Equal => match remote_db.cmp(&local_db) {
458        Ordering::Greater => true,
459        Ordering::Less => false,
460        Ordering::Equal => remote_node > local_node,
461      },
462    }
463  }
464}
465
/// Trait for change comparators used in sorting and compression.
pub trait ChangeComparator<K, C, V> {
  /// Returns the ordering of `a` relative to `b`.
  fn compare(&self, a: &Change<K, C, V>, b: &Change<K, C, V>) -> Ordering;
}
470
471/// Default change comparator.
472///
473/// Sorts by:
474/// 1. Record ID (ascending)
475/// 2. Column name presence (deletions/tombstones last)
476/// 3. Column name (ascending)
477/// 4. Column version (descending - most recent first)
478/// 5. DB version (descending)
479/// 6. Node ID (descending)
480#[derive(Debug, Clone, Copy, Default)]
481pub struct DefaultChangeComparator;
482
483impl<K: Ord, C: Ord, V> ChangeComparator<K, C, V> for DefaultChangeComparator {
484  fn compare(&self, a: &Change<K, C, V>, b: &Change<K, C, V>) -> Ordering {
485    // Compare record IDs
486    match a.record_id.cmp(&b.record_id) {
487      Ordering::Equal => {}
488      ord => return ord,
489    }
490
491    // Deletions (None) come last for each record
492    match (a.col_name.as_ref(), b.col_name.as_ref()) {
493      (None, None) => {}
494      (None, Some(_)) => return Ordering::Greater,
495      (Some(_), None) => return Ordering::Less,
496      (Some(a_col), Some(b_col)) => match a_col.cmp(b_col) {
497        Ordering::Equal => {}
498        ord => return ord,
499      },
500    }
501
502    // Compare versions (descending - most recent first)
503    match b.col_version.cmp(&a.col_version) {
504      Ordering::Equal => {}
505      ord => return ord,
506    }
507
508    match b.db_version.cmp(&a.db_version) {
509      Ordering::Equal => {}
510      ord => return ord,
511    }
512
513    b.node_id.cmp(&a.node_id)
514  }
515}
516
/// Main CRDT structure, generic over key (K), column (C), and value (V) types.
///
/// This implements a column-based CRDT with last-write-wins semantics.
#[derive(Debug)]
#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
#[cfg_attr(feature = "serde", serde(bound(serialize = "K: serde::Serialize, C: serde::Serialize, V: serde::Serialize")))]
#[cfg_attr(feature = "serde", serde(bound(deserialize = "K: serde::de::DeserializeOwned + Hash + Eq + Clone, C: serde::de::DeserializeOwned + Hash + Eq + Clone, V: serde::de::DeserializeOwned + Clone")))]
pub struct CRDT<K: Hash + Eq + Clone, C: Hash + Eq + Clone, V: Clone> {
  /// Identifier of this node; stamped on every locally-created change.
  node_id: NodeId,
  /// Logical clock driving db_version / local_db_version stamps.
  clock: LogicalClock,
  /// Live records keyed by record id.
  data: HashMap<K, Record<C, V>>,
  /// Deleted-record markers; consulted before inserts/updates and merges.
  tombstones: TombstoneStorage<K>,
  /// Optional parent CRDT. Skipped during (de)serialization, so it is always
  /// `None` after deserializing (see module docs).
  #[cfg_attr(feature = "serde", serde(skip, default))]
  parent: Option<Arc<CRDT<K, C, V>>>,
  /// Parent's clock time captured at construction (not used in this portion
  /// of the file).
  #[allow(dead_code)]
  base_version: u64,
}
534
535impl<K: Hash + Eq + Clone, C: Hash + Eq + Clone, V: Clone> CRDT<K, C, V> {
536  /// Creates a new empty CRDT.
537  ///
538  /// # Arguments
539  ///
540  /// * `node_id` - Unique identifier for this CRDT node
541  /// * `parent` - Optional parent CRDT for hierarchical structures
542  pub fn new(node_id: NodeId, parent: Option<Arc<CRDT<K, C, V>>>) -> Self {
543    let (clock, base_version) = if let Some(ref p) = parent {
544      let parent_clock = p.clock;
545      let base = parent_clock.current_time();
546      (parent_clock, base)
547    } else {
548      (LogicalClock::new(), 0)
549    };
550
551    Self {
552      node_id,
553      clock,
554      data: HashMap::new(),
555      tombstones: TombstoneStorage::new(),
556      parent,
557      base_version,
558    }
559  }
560
  /// Creates a CRDT from a list of changes (e.g., loaded from disk).
  ///
  /// # Arguments
  ///
  /// * `node_id` - The unique identifier for this CRDT node
  /// * `changes` - A list of changes to apply to reconstruct the CRDT state
  ///
  /// The resulting CRDT has no parent; its clock is fast-forwarded to the
  /// maximum db_version found in `changes` (see `apply_changes`).
  pub fn from_changes(node_id: NodeId, changes: Vec<Change<K, C, V>>) -> Self {
    let mut crdt = Self::new(node_id, None);
    crdt.apply_changes(changes);
    crdt
  }
572
  /// Resets the CRDT to a state as if it was constructed with the given changes.
  ///
  /// # Arguments
  ///
  /// * `changes` - A list of changes to apply to reconstruct the CRDT state
  ///
  /// Note: `parent` and `base_version` are left untouched, so a child CRDT
  /// keeps its parent link after a reset.
  pub fn reset(&mut self, changes: Vec<Change<K, C, V>>) {
    self.data.clear();
    self.tombstones.clear();
    // Fresh clock; `apply_changes` fast-forwards it to the max db_version
    // found in `changes`.
    self.clock = LogicalClock::new();
    self.apply_changes(changes);
  }
584
585  /// Applies a list of changes to reconstruct the CRDT state.
586  fn apply_changes(&mut self, changes: Vec<Change<K, C, V>>) {
587    // Determine the maximum db_version from the changes
588    let max_db_version = changes
589      .iter()
590      .map(|c| c.db_version.max(c.local_db_version))
591      .max()
592      .unwrap_or(0);
593
594    // Set the logical clock to the maximum db_version
595    self.clock.set_time(max_db_version);
596
597    // Apply each change to reconstruct the CRDT state
598    for change in changes {
599      let record_id = change.record_id.clone();
600      let col_name = change.col_name.clone();
601      let remote_col_version = change.col_version;
602      let remote_db_version = change.db_version;
603      let remote_node_id = change.node_id;
604      let remote_local_db_version = change.local_db_version;
605      let remote_value = change.value;
606
607      if col_name.is_none() {
608        // Handle deletion
609        self.data.remove(&record_id);
610
611        // Store deletion information in tombstones
612        self.tombstones.insert_or_assign(
613          record_id,
614          TombstoneInfo::new(remote_db_version, remote_node_id, remote_local_db_version),
615        );
616      } else if let Some(col_key) = col_name {
617        // Handle insertion or update
618        if !self.is_record_tombstoned(&record_id, false) {
619          let record = self.get_or_create_record_unchecked(&record_id, false);
620
621          // Insert or update the field value
622          if let Some(value) = remote_value {
623            record.fields.insert(col_key.clone(), value);
624          }
625
626          // Update the column version info
627          let col_ver = ColumnVersion::new(
628            remote_col_version,
629            remote_db_version,
630            remote_node_id,
631            remote_local_db_version,
632          );
633          record.column_versions.insert(col_key, col_ver);
634
635          // Update version boundaries
636          if remote_local_db_version < record.lowest_local_db_version {
637            record.lowest_local_db_version = remote_local_db_version;
638          }
639          if remote_local_db_version > record.highest_local_db_version {
640            record.highest_local_db_version = remote_local_db_version;
641          }
642        }
643      }
644    }
645  }
646
  /// Inserts a new record or updates an existing record in the CRDT.
  ///
  /// Delegates to `insert_or_update_with_flags` with `flags = 0`.
  ///
  /// # Arguments
  ///
  /// * `record_id` - The unique identifier for the record
  /// * `fields` - An iterator of (column_name, value) pairs
  ///
  /// # Returns
  ///
  /// A vector of changes created by this operation
  #[must_use = "changes should be propagated to other nodes"]
  pub fn insert_or_update<I>(&mut self, record_id: &K, fields: I) -> Vec<Change<K, C, V>>
  where
    I: IntoIterator<Item = (C, V)>,
  {
    self.insert_or_update_with_flags(record_id, 0, fields)
  }
664
  /// Inserts a new record or updates an existing record with flags.
  ///
  /// # Arguments
  ///
  /// * `record_id` - The unique identifier for the record
  /// * `flags` - Flags to indicate the type of change
  /// * `fields` - An iterator of (column_name, value) pairs
  ///
  /// # Returns
  ///
  /// A vector of changes created by this operation. Returns an empty vector
  /// (writing nothing) when the record is tombstoned.
  #[must_use = "changes should be propagated to other nodes"]
  pub fn insert_or_update_with_flags<I>(
    &mut self,
    record_id: &K,
    flags: u32,
    fields: I,
  ) -> Vec<Change<K, C, V>>
  where
    I: IntoIterator<Item = (C, V)>,
  {
    // NOTE(review): the clock is ticked before the tombstone check, so a
    // rejected write on a tombstoned record still advances the logical clock.
    let db_version = self.clock.tick();

    // Check if the record is tombstoned
    if self.is_record_tombstoned(record_id, false) {
      return Vec::new();
    }

    let mut changes = Vec::new();
    let node_id = self.node_id; // Store node_id before mutable borrow
    let record = self.get_or_create_record_unchecked(record_id, false);

    for (col_name, value) in fields {
      // Existing column: bump its per-column counter and restamp it with this
      // write's db_version/node. New column: start its counter at 1.
      let col_version = if let Some(col_info) = record.column_versions.get_mut(&col_name) {
        col_info.col_version += 1;
        col_info.db_version = db_version;
        col_info.node_id = node_id;
        col_info.local_db_version = db_version;
        col_info.col_version
      } else {
        record.column_versions.insert(
          col_name.clone(),
          ColumnVersion::new(1, db_version, node_id, db_version),
        );
        1
      };

      // Update record version boundaries
      if db_version < record.lowest_local_db_version {
        record.lowest_local_db_version = db_version;
      }
      if db_version > record.highest_local_db_version {
        record.highest_local_db_version = db_version;
      }

      record.fields.insert(col_name.clone(), value.clone());
      // For locally-originated changes, db_version and local_db_version are
      // the same clock tick.
      changes.push(Change::new(
        record_id.clone(),
        Some(col_name),
        Some(value),
        col_version,
        db_version,
        node_id,
        db_version,
        flags,
      ));
    }

    changes
  }
735
  /// Deletes a record by marking it as tombstoned.
  ///
  /// Delegates to `delete_record_with_flags` with `flags = 0`.
  ///
  /// # Arguments
  ///
  /// * `record_id` - The unique identifier for the record
  ///
  /// # Returns
  ///
  /// An optional Change representing the deletion
  #[must_use = "changes should be propagated to other nodes"]
  pub fn delete_record(&mut self, record_id: &K) -> Option<Change<K, C, V>> {
    self.delete_record_with_flags(record_id, 0)
  }
749
750  /// Deletes a record with flags.
751  ///
752  /// # Arguments
753  ///
754  /// * `record_id` - The unique identifier for the record
755  /// * `flags` - Flags to indicate the type of change
756  ///
757  /// # Returns
758  ///
759  /// An optional Change representing the deletion
760  #[must_use = "changes should be propagated to other nodes"]
761  pub fn delete_record_with_flags(&mut self, record_id: &K, flags: u32) -> Option<Change<K, C, V>> {
762    if self.is_record_tombstoned(record_id, false) {
763      return None;
764    }
765
766    let db_version = self.clock.tick();
767
768    // Mark as tombstone and remove data
769    self.data.remove(record_id);
770
771    // Store deletion information in tombstones
772    self.tombstones.insert_or_assign(
773      record_id.clone(),
774      TombstoneInfo::new(db_version, self.node_id, db_version),
775    );
776
777    Some(Change::new(
778      record_id.clone(),
779      None,
780      None,
781      TOMBSTONE_COL_VERSION,
782      db_version,
783      self.node_id,
784      db_version,
785      flags,
786    ))
787  }
788
  /// Deletes a specific field from a record.
  ///
  /// Delegates to `delete_field_with_flags` with `flags = 0`.
  ///
  /// # Arguments
  ///
  /// * `record_id` - The unique identifier for the record
  /// * `field_name` - The name of the field to delete
  ///
  /// # Returns
  ///
  /// An optional Change representing the field deletion. Returns None if:
  /// - The record is tombstoned
  /// - The record doesn't exist
  /// - The field doesn't exist in the record
  #[must_use = "changes should be propagated to other nodes"]
  pub fn delete_field(&mut self, record_id: &K, field_name: &C) -> Option<Change<K, C, V>> {
    self.delete_field_with_flags(record_id, field_name, 0)
  }
806
  /// Deletes a specific field from a record with flags.
  ///
  /// # Arguments
  ///
  /// * `record_id` - The unique identifier for the record
  /// * `field_name` - The name of the field to delete
  /// * `flags` - Flags to indicate the type of change
  ///
  /// # Returns
  ///
  /// An optional Change representing the field deletion. Returns None if:
  /// - The record is tombstoned
  /// - The record doesn't exist
  /// - The field doesn't exist in the record
  ///
  /// Unlike `insert_or_update_with_flags`, the clock is only ticked after all
  /// validation passes, so a no-op deletion does not advance the clock.
  #[must_use = "changes should be propagated to other nodes"]
  pub fn delete_field_with_flags(
    &mut self,
    record_id: &K,
    field_name: &C,
    flags: u32,
  ) -> Option<Change<K, C, V>> {
    // Check if the record is tombstoned
    if self.is_record_tombstoned(record_id, false) {
      return None;
    }

    // Get the record (return None if it doesn't exist)
    let record = self.data.get_mut(record_id)?;

    // Check if the field exists
    if !record.fields.contains_key(field_name) {
      return None;
    }

    let db_version = self.clock.tick();

    // Get or create column version and increment it
    let col_version = if let Some(col_info) = record.column_versions.get_mut(field_name) {
      col_info.col_version += 1;
      col_info.db_version = db_version;
      col_info.node_id = self.node_id;
      col_info.local_db_version = db_version;
      col_info.col_version
    } else {
      // This shouldn't happen if the field exists, but handle it gracefully
      record.column_versions.insert(
        field_name.clone(),
        ColumnVersion::new(1, db_version, self.node_id, db_version),
      );
      1
    };

    // Update record version boundaries
    if db_version < record.lowest_local_db_version {
      record.lowest_local_db_version = db_version;
    }
    if db_version > record.highest_local_db_version {
      record.highest_local_db_version = db_version;
    }

    // Remove the field from the record (but keep the ColumnVersion as field tombstone)
    record.fields.remove(field_name);

    Some(Change::new(
      record_id.clone(),
      Some(field_name.clone()),
      None, // None value indicates field deletion
      col_version,
      db_version,
      self.node_id,
      db_version,
      flags,
    ))
  }
881
  /// Merges incoming changes into the CRDT.
  ///
  /// # Arguments
  ///
  /// * `changes` - Vector of changes to merge
  /// * `merge_rule` - The merge rule to use for conflict resolution
  ///
  /// # Returns
  ///
  /// Vector of accepted changes (if requested). Accepted changes are echoed
  /// back with `local_db_version` restamped to this node's clock.
  pub fn merge_changes<R: MergeRule<K, C, V>>(
    &mut self,
    changes: Vec<Change<K, C, V>>,
    merge_rule: &R,
  ) -> Vec<Change<K, C, V>> {
    // `ignore_parent = false` is forwarded to the tombstone/record lookups in
    // `merge_changes_impl`.
    self.merge_changes_impl(changes, false, merge_rule)
  }
899
  /// Core merge loop shared by the public merge entry points.
  ///
  /// For each incoming change: advances the logical clock, skips changes to
  /// tombstoned records, compares the remote version against the matching
  /// local version via `merge_rule`, and applies accepted changes. Accepted
  /// changes are echoed back with `local_db_version` restamped to this
  /// node's clock so callers can persist/forward them.
  fn merge_changes_impl<R: MergeRule<K, C, V>>(
    &mut self,
    changes: Vec<Change<K, C, V>>,
    ignore_parent: bool,
    merge_rule: &R,
  ) -> Vec<Change<K, C, V>> {
    let mut accepted_changes = Vec::new();

    if changes.is_empty() {
      return accepted_changes;
    }

    for change in changes {
      // Move values from change to avoid unnecessary clones
      let Change {
        record_id,
        col_name,
        value: remote_value,
        col_version: remote_col_version,
        db_version: remote_db_version,
        node_id: remote_node_id,
        flags,
        ..
      } = change;

      // Always update the logical clock to maintain causal consistency
      // (this happens even for changes that are later skipped or rejected).
      let new_local_db_version = self.clock.update(remote_db_version);

      // Skip all changes for tombstoned records
      if self.is_record_tombstoned(&record_id, ignore_parent) {
        continue;
      }

      // Retrieve local column version information
      let local_col_info = if col_name.is_none() {
        // For deletions, check tombstones
        self
          .tombstones
          .find(&record_id)
          .map(|info| info.as_column_version())
      } else if let Some(ref col) = col_name {
        // For column updates, check the record
        self
          .get_record_ptr(&record_id, ignore_parent)
          .and_then(|record| record.column_versions.get(col).copied())
      } else {
        // Unreachable in practice: `col_name` is either None or Some above.
        None
      };

      // Determine whether to accept the remote change.
      // No local version info means we have never seen this column/record:
      // accept unconditionally.
      let should_accept = if let Some(local_info) = local_col_info {
        merge_rule.should_accept(
          local_info.col_version,
          local_info.db_version,
          local_info.node_id,
          remote_col_version,
          remote_db_version,
          remote_node_id,
        )
      } else {
        true
      };

      if should_accept {
        if let Some(col_key) = col_name {
          // Handle insertion or update
          let record = self.get_or_create_record_unchecked(&record_id, ignore_parent);

          // Update field value
          if let Some(value) = remote_value.clone() {
            record.fields.insert(col_key.clone(), value);
          } else {
            // If remote_value is None, remove the field
            // (the ColumnVersion inserted below acts as a field tombstone)
            record.fields.remove(&col_key);
          }

          // Update the column version info and record version boundaries.
          // The remote col/db versions are kept, but local_db_version is
          // restamped with this node's clock.
          record.column_versions.insert(
            col_key.clone(),
            ColumnVersion::new(
              remote_col_version,
              remote_db_version,
              remote_node_id,
              new_local_db_version,
            ),
          );

          // Update version boundaries
          if new_local_db_version < record.lowest_local_db_version {
            record.lowest_local_db_version = new_local_db_version;
          }
          if new_local_db_version > record.highest_local_db_version {
            record.highest_local_db_version = new_local_db_version;
          }

          accepted_changes.push(Change::new(
            record_id,
            Some(col_key),
            remote_value,
            remote_col_version,
            remote_db_version,
            remote_node_id,
            new_local_db_version,
            flags,
          ));
        } else {
          // Handle deletion
          self.data.remove(&record_id);

          // Store deletion information in tombstones
          self.tombstones.insert_or_assign(
            record_id.clone(),
            TombstoneInfo::new(remote_db_version, remote_node_id, new_local_db_version),
          );

          accepted_changes.push(Change::new(
            record_id,
            None,
            None,
            remote_col_version,
            remote_db_version,
            remote_node_id,
            new_local_db_version,
            flags,
          ));
        }
      }
    }

    accepted_changes
  }
1031
1032  /// Retrieves all changes since a given `last_db_version`.
1033  ///
1034  /// # Arguments
1035  ///
1036  /// * `last_db_version` - The database version to retrieve changes since
1037  ///
1038  /// # Returns
1039  ///
1040  /// A vector of changes
1041  #[must_use]
1042  pub fn get_changes_since(&self, last_db_version: u64) -> Vec<Change<K, C, V>>
1043  where
1044    K: Ord,
1045    C: Ord,
1046  {
1047    self.get_changes_since_excluding(last_db_version, &HashSet::new())
1048  }
1049
1050  /// Retrieves all changes since a given `last_db_version`, excluding specific nodes.
1051  pub fn get_changes_since_excluding(
1052    &self,
1053    last_db_version: u64,
1054    excluding: &HashSet<NodeId>,
1055  ) -> Vec<Change<K, C, V>>
1056  where
1057    K: Ord,
1058    C: Ord,
1059  {
1060    let mut changes = Vec::new();
1061
1062    // Get changes from parent
1063    if let Some(ref parent) = self.parent {
1064      let parent_changes = parent.get_changes_since_excluding(last_db_version, excluding);
1065      changes.extend(parent_changes);
1066    }
1067
1068    // Get changes from regular records
1069    for (record_id, record) in &self.data {
1070      // Skip records that haven't changed since last_db_version
1071      if record.highest_local_db_version <= last_db_version {
1072        continue;
1073      }
1074
1075      for (col_name, clock_info) in &record.column_versions {
1076        if clock_info.local_db_version > last_db_version && !excluding.contains(&clock_info.node_id)
1077        {
1078          let value = record.fields.get(col_name).cloned();
1079
1080          changes.push(Change::new(
1081            record_id.clone(),
1082            Some(col_name.clone()),
1083            value,
1084            clock_info.col_version,
1085            clock_info.db_version,
1086            clock_info.node_id,
1087            clock_info.local_db_version,
1088            0,
1089          ));
1090        }
1091      }
1092    }
1093
1094    // Get deletion changes from tombstones
1095    for (record_id, tombstone_info) in self.tombstones.iter() {
1096      if tombstone_info.local_db_version > last_db_version
1097        && !excluding.contains(&tombstone_info.node_id)
1098      {
1099        changes.push(Change::new(
1100          record_id.clone(),
1101          None,
1102          None,
1103          TOMBSTONE_COL_VERSION,
1104          tombstone_info.db_version,
1105          tombstone_info.node_id,
1106          tombstone_info.local_db_version,
1107          0,
1108        ));
1109      }
1110    }
1111
1112    if self.parent.is_some() {
1113      // Compress changes to remove redundant operations
1114      Self::compress_changes(&mut changes);
1115    }
1116
1117    changes
1118  }
1119
  /// Compresses a vector of changes in-place by removing redundant changes.
  ///
  /// Changes are sorted and then compressed using a two-pointer technique.
  /// After sorting, entries for the same record (and column) are adjacent, so a
  /// single forward pass keeps only the effective change per record/column:
  /// the first-sorted change per (record, column) pair survives, and a record
  /// deletion (`col_name == None`) discards all field updates for that record.
  ///
  /// # Performance
  ///
  /// This method uses `sort_unstable_by` which provides O(n log n) average time complexity
  /// but does not preserve the relative order of equal elements. Since the comparator
  /// provides a total ordering, stability is not required.
  pub fn compress_changes(changes: &mut Vec<Change<K, C, V>>)
  where
    K: Ord,
    C: Ord,
  {
    if changes.is_empty() {
      return;
    }

    // Sort changes using the DefaultChangeComparator
    // Use sort_unstable for better performance since we don't need stable sorting
    let comparator = DefaultChangeComparator;
    changes.sort_unstable_by(|a, b| comparator.compare(a, b));

    // Use two-pointer technique to compress in-place.
    // `write` points at the last kept element; `read` scans ahead.
    let mut write = 0;
    for read in 1..changes.len() {
      if changes[read].record_id != changes[write].record_id {
        // New record, always keep it
        write += 1;
        if write != read {
          changes[write] = changes[read].clone();
        }
      } else if changes[read].col_name.is_none() && changes[write].col_name.is_some() {
        // Current read is a deletion, backtrack to first change for this record
        // and replace it with the deletion, effectively discarding all field updates
        let mut first_pos = write;
        while first_pos > 0 && changes[first_pos - 1].record_id == changes[read].record_id {
          first_pos -= 1;
        }
        changes[first_pos] = changes[read].clone();
        write = first_pos;
      } else if changes[read].col_name != changes[write].col_name
        && changes[write].col_name.is_some()
      {
        // New column for the same record
        write += 1;
        if write != read {
          changes[write] = changes[read].clone();
        }
      }
      // Else: same record and column, keep the existing one (most recent due to sorting).
      // This branch also silently drops field updates that follow a kept deletion,
      // since `changes[write].col_name` is then `None`.
    }

    // Everything past `write` is stale or duplicated; drop it.
    changes.truncate(write + 1);
  }
1175
1176  /// Retrieves a reference to a record if it exists.
1177  pub fn get_record(&self, record_id: &K) -> Option<&Record<C, V>> {
1178    self.get_record_ptr(record_id, false)
1179  }
1180
1181  /// Checks if a record is tombstoned.
1182  pub fn is_tombstoned(&self, record_id: &K) -> bool {
1183    self.is_record_tombstoned(record_id, false)
1184  }
1185
1186  /// Gets tombstone information for a record.
1187  pub fn get_tombstone(&self, record_id: &K) -> Option<TombstoneInfo> {
1188    if let Some(info) = self.tombstones.find(record_id) {
1189      return Some(info);
1190    }
1191
1192    if let Some(ref parent) = self.parent {
1193      return parent.get_tombstone(record_id);
1194    }
1195
1196    None
1197  }
1198
  /// Removes tombstones older than the specified version.
  ///
  /// Only compacts this CRDT's own tombstones; any parent's tombstones are untouched.
  ///
  /// # Safety and DoS Mitigation
  ///
  /// **IMPORTANT**: Only call this method when ALL participating nodes have acknowledged
  /// the `min_acknowledged_version`. Compacting too early may cause deleted records to
  /// reappear on nodes that haven't received the deletion yet.
  ///
  /// To prevent DoS via tombstone accumulation:
  /// - Call this method periodically as part of your sync protocol
  /// - Track which versions have been acknowledged by all nodes
  /// - Consider implementing a tombstone limit and rejecting operations when exceeded
  ///
  /// # Arguments
  ///
  /// * `min_acknowledged_version` - Tombstones with db_version < this value will be removed
  ///
  /// # Returns
  ///
  /// The number of tombstones removed
  pub fn compact_tombstones(&mut self, min_acknowledged_version: u64) -> usize {
    self.tombstones.compact(min_acknowledged_version)
  }
1222
1223  /// Gets the number of tombstones currently stored.
1224  pub fn tombstone_count(&self) -> usize {
1225    self.tombstones.len()
1226  }
1227
1228  /// Gets the current logical clock.
1229  pub fn get_clock(&self) -> &LogicalClock {
1230    &self.clock
1231  }
1232
1233  /// Gets a reference to the internal data map.
1234  pub fn get_data(&self) -> &HashMap<K, Record<C, V>> {
1235    &self.data
1236  }
1237
  /// Serializes the CRDT to a JSON string.
  ///
  /// Requires the `json` feature.
  ///
  /// Note: The parent relationship is not serialized and must be rebuilt after deserialization.
  ///
  /// # Errors
  ///
  /// Returns an error if serialization fails.
  #[cfg(feature = "json")]
  pub fn to_json(&self) -> Result<String, serde_json::Error>
  where
    K: serde::Serialize,
    C: serde::Serialize,
    V: serde::Serialize,
  {
    serde_json::to_string(self)
  }
1254
  /// Deserializes a CRDT from a JSON string.
  ///
  /// Requires the `json` feature.
  ///
  /// Note: The parent relationship is not deserialized and will be `None`.
  /// Applications must rebuild parent-child relationships if needed.
  ///
  /// # Errors
  ///
  /// Returns an error if deserialization fails.
  #[cfg(feature = "json")]
  pub fn from_json(json: &str) -> Result<Self, serde_json::Error>
  where
    K: serde::de::DeserializeOwned + Hash + Eq + Clone,
    C: serde::de::DeserializeOwned + Hash + Eq + Clone,
    V: serde::de::DeserializeOwned + Clone,
  {
    serde_json::from_str(json)
  }
1272
  /// Serializes the CRDT to bytes using bincode (standard configuration).
  ///
  /// Requires the `binary` feature.
  ///
  /// Note: The parent relationship is not serialized and must be rebuilt after deserialization.
  ///
  /// # Errors
  ///
  /// Returns an error if serialization fails.
  #[cfg(feature = "binary")]
  pub fn to_bytes(&self) -> Result<Vec<u8>, bincode::error::EncodeError>
  where
    K: serde::Serialize,
    C: serde::Serialize,
    V: serde::Serialize,
  {
    bincode::serde::encode_to_vec(self, bincode::config::standard())
  }
1289
1290  /// Deserializes a CRDT from bytes using bincode.
1291  ///
1292  /// Note: The parent relationship is not deserialized and will be `None`.
1293  /// Applications must rebuild parent-child relationships if needed.
1294  ///
1295  /// # Errors
1296  ///
1297  /// Returns an error if deserialization fails.
1298  #[cfg(feature = "binary")]
1299  pub fn from_bytes(bytes: &[u8]) -> Result<Self, bincode::error::DecodeError>
1300  where
1301    K: serde::de::DeserializeOwned + Hash + Eq + Clone,
1302    C: serde::de::DeserializeOwned + Hash + Eq + Clone,
1303    V: serde::de::DeserializeOwned + Clone,
1304  {
1305    let (result, _len) = bincode::serde::decode_from_slice(bytes, bincode::config::standard())?;
1306    Ok(result)
1307  }
1308
1309  // Helper methods
1310
1311  fn is_record_tombstoned(&self, record_id: &K, ignore_parent: bool) -> bool {
1312    if self.tombstones.find(record_id).is_some() {
1313      return true;
1314    }
1315
1316    if !ignore_parent {
1317      if let Some(ref parent) = self.parent {
1318        return parent.is_record_tombstoned(record_id, false);
1319      }
1320    }
1321
1322    false
1323  }
1324
  /// Returns a mutable reference to the record for `record_id`, creating it if absent.
  ///
  /// When a record must be created and `ignore_parent` is false, the new record
  /// starts as a clone of the parent's record for the same key (if one exists);
  /// otherwise it starts empty.
  ///
  /// "Unchecked": tombstones are not consulted here; the merge path checks
  /// `is_record_tombstoned` before calling.
  fn get_or_create_record_unchecked(
    &mut self,
    record_id: &K,
    ignore_parent: bool,
  ) -> &mut Record<C, V> {
    // `Entry` comes from std's HashMap or hashbrown's, depending on features.
    #[cfg(feature = "std")]
    use std::collections::hash_map::Entry;
    #[cfg(all(not(feature = "std"), feature = "alloc"))]
    use hashbrown::hash_map::Entry;

    match self.data.entry(record_id.clone()) {
      Entry::Occupied(e) => e.into_mut(),
      Entry::Vacant(e) => {
        // Seed from a clone of the parent's record so child CRDTs inherit fields.
        let record = if !ignore_parent {
          self
            .parent
            .as_ref()
            .and_then(|p| p.get_record_ptr(record_id, false))
            .cloned()
            .unwrap_or_else(Record::new)
        } else {
          Record::new()
        };
        e.insert(record)
      }
    }
  }
1352
1353  fn get_record_ptr(&self, record_id: &K, ignore_parent: bool) -> Option<&Record<C, V>> {
1354    if let Some(record) = self.data.get(record_id) {
1355      return Some(record);
1356    }
1357
1358    if !ignore_parent {
1359      if let Some(ref parent) = self.parent {
1360        return parent.get_record_ptr(record_id, false);
1361      }
1362    }
1363
1364    None
1365  }
1366}
1367
#[cfg(test)]
mod tests {
  //! Unit tests covering the logical clock, tombstone storage, basic CRDT
  //! operations, merge semantics, and (feature-gated) serialization round-trips.

  use super::*;

  // Clock starts at 0, `tick` increments by one, `update` advances past the remote value.
  #[test]
  fn test_logical_clock() {
    let mut clock = LogicalClock::new();
    assert_eq!(clock.current_time(), 0);

    let t1 = clock.tick();
    assert_eq!(t1, 1);
    assert_eq!(clock.current_time(), 1);

    // update(5) with local time 1 yields 6, i.e. strictly after the remote time.
    let t2 = clock.update(5);
    assert_eq!(t2, 6);
    assert_eq!(clock.current_time(), 6);
  }

  // Tombstones can be inserted, found by key, and compacted away by version threshold.
  #[test]
  fn test_tombstone_storage() {
    let mut storage = TombstoneStorage::new();
    let info = TombstoneInfo::new(10, 1, 10);

    storage.insert_or_assign("key1".to_string(), info);
    assert_eq!(storage.len(), 1);

    assert_eq!(storage.find(&"key1".to_string()), Some(info));
    assert_eq!(storage.find(&"key2".to_string()), None);

    // A threshold above the tombstone's version removes it.
    let removed = storage.compact(15);
    assert_eq!(removed, 1);
    assert_eq!(storage.len(), 0);
  }

  // Inserting a record yields one change per field and stores all field values.
  #[test]
  fn test_basic_insert() {
    let mut crdt: CRDT<String, String, String> = CRDT::new(1, None);

    let fields = vec![
      ("name".to_string(), "Alice".to_string()),
      ("age".to_string(), "30".to_string()),
    ];

    let changes = crdt.insert_or_update(&"user1".to_string(), fields);

    assert_eq!(changes.len(), 2);
    assert_eq!(crdt.get_data().len(), 1);

    let record = crdt.get_record(&"user1".to_string()).unwrap();
    assert_eq!(record.fields.get("name").unwrap(), "Alice");
    assert_eq!(record.fields.get("age").unwrap(), "30");
  }

  // Deleting a record removes it from data and leaves a tombstone behind.
  #[test]
  fn test_delete_record() {
    let mut crdt: CRDT<String, String, String> = CRDT::new(1, None);

    let fields = vec![("name".to_string(), "Bob".to_string())];
    let _ = crdt.insert_or_update(&"user2".to_string(), fields);

    let delete_change = crdt.delete_record(&"user2".to_string());
    assert!(delete_change.is_some());
    assert!(crdt.is_tombstoned(&"user2".to_string()));
    assert_eq!(crdt.get_data().len(), 0);
  }

  // Two nodes writing the same column converge to the same value after a
  // bidirectional merge (deterministic conflict resolution).
  #[test]
  fn test_merge_changes() {
    let mut crdt1: CRDT<String, String, String> = CRDT::new(1, None);
    let mut crdt2: CRDT<String, String, String> = CRDT::new(2, None);

    let fields1 = vec![("tag".to_string(), "Node1".to_string())];
    let changes1 = crdt1.insert_or_update(&"record1".to_string(), fields1);

    let fields2 = vec![("tag".to_string(), "Node2".to_string())];
    let changes2 = crdt2.insert_or_update(&"record1".to_string(), fields2);

    let merge_rule = DefaultMergeRule;
    crdt1.merge_changes(changes2, &merge_rule);
    crdt2.merge_changes(changes1, &merge_rule);

    // Node2 has higher node_id, so its value should win
    assert_eq!(
      crdt1
        .get_record(&"record1".to_string())
        .unwrap()
        .fields
        .get("tag")
        .unwrap(),
      "Node2"
    );
    assert_eq!(crdt1.get_data(), crdt2.get_data());
  }

  // A single Change round-trips losslessly through JSON and/or bincode.
  #[test]
  #[cfg(feature = "serde")]
  fn test_change_serialization() {
    #[allow(unused_variables)]
    let change = Change::new(
      "record1".to_string(),
      Some("name".to_string()),
      Some("Alice".to_string()),
      1,
      10,
      1,
      10,
      0,
    );

    // Test JSON serialization
    #[cfg(feature = "json")]
    {
      let json = serde_json::to_string(&change).unwrap();
      let deserialized: Change<String, String, String> = serde_json::from_str(&json).unwrap();
      assert_eq!(change, deserialized);
    }

    // Test binary serialization
    #[cfg(feature = "binary")]
    {
      let bytes = bincode::serde::encode_to_vec(&change, bincode::config::standard()).unwrap();
      let (deserialized, _): (Change<String, String, String>, _) =
        bincode::serde::decode_from_slice(&bytes, bincode::config::standard()).unwrap();
      assert_eq!(change, deserialized);
    }
  }

  // A Record (fields + column versions) round-trips losslessly.
  #[test]
  #[cfg(feature = "serde")]
  fn test_record_serialization() {
    let mut fields = HashMap::new();
    fields.insert("name".to_string(), "Bob".to_string());
    fields.insert("age".to_string(), "25".to_string());

    let mut column_versions = HashMap::new();
    column_versions.insert("name".to_string(), ColumnVersion::new(1, 10, 1, 10));
    column_versions.insert("age".to_string(), ColumnVersion::new(1, 11, 1, 11));

    #[allow(unused_variables)]
    let record = Record::from_parts(fields, column_versions);

    // Test JSON serialization
    #[cfg(feature = "json")]
    {
      let json = serde_json::to_string(&record).unwrap();
      let deserialized: Record<String, String> = serde_json::from_str(&json).unwrap();
      assert_eq!(record, deserialized);
    }

    // Test binary serialization
    #[cfg(feature = "binary")]
    {
      let bytes = bincode::serde::encode_to_vec(&record, bincode::config::standard()).unwrap();
      let (deserialized, _): (Record<String, String>, _) =
        bincode::serde::decode_from_slice(&bytes, bincode::config::standard()).unwrap();
      assert_eq!(record, deserialized);
    }
  }

  // Whole-CRDT JSON round-trip preserves data, tombstones, and the clock,
  // while the parent link is intentionally dropped.
  #[test]
  #[cfg(feature = "json")]
  fn test_crdt_json_serialization() {
    let mut crdt: CRDT<String, String, String> = CRDT::new(1, None);

    // Add some data
    let fields = vec![
      ("name".to_string(), "Alice".to_string()),
      ("age".to_string(), "30".to_string()),
    ];
    let _ = crdt.insert_or_update(&"user1".to_string(), fields);

    let fields2 = vec![("name".to_string(), "Bob".to_string())];
    let _ = crdt.insert_or_update(&"user2".to_string(), fields2);

    // Delete one record
    let _ = crdt.delete_record(&"user2".to_string());

    // Serialize to JSON
    let json = crdt.to_json().unwrap();

    // Deserialize
    let deserialized: CRDT<String, String, String> = CRDT::from_json(&json).unwrap();

    // Verify data is preserved
    assert_eq!(crdt.get_data().len(), deserialized.get_data().len());
    assert_eq!(
      crdt.get_record(&"user1".to_string()).unwrap().fields,
      deserialized.get_record(&"user1".to_string()).unwrap().fields
    );

    // Verify tombstones are preserved
    assert_eq!(crdt.tombstone_count(), deserialized.tombstone_count());
    assert!(deserialized.is_tombstoned(&"user2".to_string()));

    // Verify clock is preserved
    assert_eq!(
      crdt.get_clock().current_time(),
      deserialized.get_clock().current_time()
    );

    // Verify parent is None after deserialization
    let has_parent = deserialized.parent.is_some();
    assert!(!has_parent);
  }

  // Whole-CRDT bincode round-trip preserves data and the clock.
  #[test]
  #[cfg(feature = "binary")]
  fn test_crdt_binary_serialization() {
    let mut crdt: CRDT<String, String, String> = CRDT::new(1, None);

    // Add some data
    let fields = vec![
      ("name".to_string(), "Alice".to_string()),
      ("age".to_string(), "30".to_string()),
    ];
    let _ = crdt.insert_or_update(&"user1".to_string(), fields);

    // Serialize to bytes
    let bytes = crdt.to_bytes().unwrap();

    // Deserialize
    let deserialized: CRDT<String, String, String> = CRDT::from_bytes(&bytes).unwrap();

    // Verify data is preserved
    assert_eq!(crdt.get_data().len(), deserialized.get_data().len());
    assert_eq!(
      crdt.get_record(&"user1".to_string()).unwrap().fields,
      deserialized.get_record(&"user1".to_string()).unwrap().fields
    );

    // Verify clock is preserved
    assert_eq!(
      crdt.get_clock().current_time(),
      deserialized.get_clock().current_time()
    );
  }

  // Serializing a child CRDT excludes the parent and the parent's records.
  #[test]
  #[cfg(feature = "serde")]
  fn test_parent_not_serialized() {
    // Create a parent CRDT
    let mut parent: CRDT<String, String, String> = CRDT::new(1, None);
    let fields = vec![("parent_field".to_string(), "parent_value".to_string())];
    let _ = parent.insert_or_update(&"parent_record".to_string(), fields);

    // Create a child CRDT with parent
    let parent_arc = Arc::new(parent);
    let mut child = CRDT::new(2, Some(parent_arc.clone()));
    let child_fields = vec![("child_field".to_string(), "child_value".to_string())];
    let _ = child.insert_or_update(&"child_record".to_string(), child_fields);

    // Serialize and deserialize the child
    #[cfg(feature = "json")]
    {
      let json = serde_json::to_string(&child).unwrap();
      let deserialized: CRDT<String, String, String> = serde_json::from_str(&json).unwrap();

      // Verify parent is None
      assert!(deserialized.parent.is_none());

      // Verify child's own data is preserved
      assert!(deserialized.get_record(&"child_record".to_string()).is_some());

      // Verify parent's data is NOT in deserialized child
      assert!(deserialized.get_record(&"parent_record".to_string()).is_none());
    }
  }
}