crdt_lite/
lib.rs

1//! # crdt-lite
2//!
3//! A lightweight, column-based CRDT (Conflict-free Replicated Data Type) implementation in Rust.
4//!
5//! This library provides a generic CRDT with last-write-wins semantics, supporting:
6//! - Generic key and value types
7//! - Logical clock for causality tracking
8//! - Tombstone-based deletion
9//! - Parent-child CRDT hierarchies
10//! - Custom merge rules and comparators
11//! - Change compression
12//! - Optional serialization support via Serde
13//!
14//! ## Features
15//!
16//! This crate provides optional features through feature flags:
17//!
18//! - `std` - Enables standard library support (enabled by default)
19//! - `alloc` - Enables allocator support for no_std environments (pulls in hashbrown for HashMap)
20//! - `serde` - Enables Serde support for all CRDT types
21//! - `json` - Enables JSON serialization (includes `serde` + `serde_json`)
22//! - `binary` - Enables binary serialization (includes `serde` + `bincode`)
23//! - `persist` - Enables persistence layer with WAL and hooks (includes `binary` + `std`)
24//! - `node-id-u128` - Uses u128 for NodeId instead of u64 (useful for UUID-based node IDs)
25//!
26//! ### no_std Support
27//!
28//! For no_std environments, disable default features and enable the `alloc` feature:
29//!
30//! ```toml
31//! [dependencies]
32//! crdt-lite = { version = "0.2", default-features = false, features = ["alloc"] }
33//! ```
34//!
35//! ### Usage Example with Serialization
36//!
37//! ```toml
38//! [dependencies]
39//! crdt-lite = { version = "0.1", features = ["json", "binary"] }
40//! ```
41//!
42//! ```rust,ignore
43//! use crdt_lite::CRDT;
44//!
45//! // Create and populate a CRDT
46//! let mut crdt: CRDT<String, String> = CRDT::new(1, None);
47//! let fields = vec![("name".to_string(), "Alice".to_string())];
48//! crdt.insert_or_update(&"user1".to_string(), fields);
49//!
50//! // Serialize to JSON (requires "json" feature)
51//! let json = crdt.to_json().unwrap();
52//!
53//! // Deserialize from JSON
54//! let restored: CRDT<String, String> = CRDT::from_json(&json).unwrap();
55//!
56//! // Serialize to binary (requires "binary" feature)
57//! let bytes = crdt.to_bytes().unwrap();
58//! let restored: CRDT<String, String> = CRDT::from_bytes(&bytes).unwrap();
59//!
60//! // Or use generic serde with any format (requires "serde" feature)
61//! let json = serde_json::to_string(&crdt).unwrap();
62//! let restored: CRDT<String, String> = serde_json::from_str(&json).unwrap();
63//! ```
64//!
65//! **Note on Parent Relationships**: Parent-child CRDT hierarchies are not serialized.
66//! After deserialization, the `parent` field will always be `None`. Applications must
67//! rebuild parent-child relationships if needed.
68//!
69//! ## Security and Resource Management
70//!
71//! ### Logical Clock Overflow
72//! The logical clock uses `u64` for version numbers. While overflow is theoretically possible
73//! after 2^64 operations (extremely unlikely in practice), applications with extreme longevity
74//! should be aware of this limitation.
75//!
76//! ### DoS Protection and Tombstone Management
77//! Tombstones accumulate indefinitely unless manually compacted. To prevent memory exhaustion:
78//! - Call `compact_tombstones()` periodically after all nodes have acknowledged a version
79//! - Implement application-level rate limiting for operations
80//! - Consider setting resource limits on the number of records and tombstones
81//!
82//! **Important**: Only compact tombstones when ALL participating nodes have acknowledged
83//! the minimum version. Compacting too early may cause deleted records to reappear on
84//! nodes that haven't received the deletion yet.
85
86#![cfg_attr(not(feature = "std"), no_std)]
87
88// Persistence module (requires any persist-related feature)
89#[cfg(any(feature = "persist", feature = "persist-msgpack", feature = "persist-compressed"))]
90pub mod persist;
91
92// Ensure valid feature combination: either std or alloc must be enabled
93#[cfg(not(any(feature = "std", feature = "alloc")))]
94compile_error!("Either 'std' (default) or 'alloc' feature must be enabled. For no_std environments, use: cargo build --no-default-features --features alloc");
95
96#[cfg(not(feature = "std"))]
97extern crate alloc;
98
99#[cfg(feature = "std")]
100use std::{
101  cmp::Ordering,
102  collections::{HashMap, HashSet},
103  hash::Hash,
104  sync::Arc,
105};
106
107#[cfg(not(feature = "std"))]
108use alloc::{
109  string::String,
110  sync::Arc,
111  vec::Vec,
112};
113#[cfg(not(feature = "std"))]
114use core::{cmp::Ordering, hash::Hash};
115#[cfg(all(not(feature = "std"), feature = "alloc"))]
116use hashbrown::{HashMap, HashSet};
117
118// Conditional type aliases for sorted vs unsorted storage
119#[cfg(all(feature = "sorted-keys", feature = "std"))]
120type DataMap<K, V> = std::collections::BTreeMap<K, V>;
121#[cfg(all(feature = "sorted-keys", not(feature = "std"), feature = "alloc"))]
122type DataMap<K, V> = alloc::collections::BTreeMap<K, V>;
123#[cfg(all(not(feature = "sorted-keys"), feature = "std"))]
124type DataMap<K, V> = HashMap<K, V>;
125#[cfg(all(not(feature = "sorted-keys"), not(feature = "std"), feature = "alloc"))]
126type DataMap<K, V> = HashMap<K, V>;
127
128// Conditional Entry type aliases to match DataMap
129#[cfg(all(feature = "sorted-keys", feature = "std"))]
130type DataMapEntry<'a, K, V> = std::collections::btree_map::Entry<'a, K, V>;
131#[cfg(all(feature = "sorted-keys", not(feature = "std"), feature = "alloc"))]
132type DataMapEntry<'a, K, V> = alloc::collections::btree_map::Entry<'a, K, V>;
133#[cfg(all(not(feature = "sorted-keys"), feature = "std"))]
134type DataMapEntry<'a, K, V> = std::collections::hash_map::Entry<'a, K, V>;
135#[cfg(all(not(feature = "sorted-keys"), not(feature = "std"), feature = "alloc"))]
136type DataMapEntry<'a, K, V> = hashbrown::hash_map::Entry<'a, K, V, hashbrown::DefaultHashBuilder>;
137
138/// Type alias for node IDs
139///
140/// By default, NodeId is u64. Use the `node-id-u128` feature to enable u128 for UUID-based IDs:
141/// ```toml
142/// crdt-lite = { version = "0.4", features = ["node-id-u128"] }
143/// ```
144#[cfg(feature = "node-id-u128")]
145pub type NodeId = u128;
146
147#[cfg(not(feature = "node-id-u128"))]
148pub type NodeId = u64;
149
150/// Type alias for column keys (field names)
151pub type ColumnKey = String;
152
153/// Column version used for tombstone changes
154/// Using u64::MAX ensures tombstones are treated as having the highest possible version
155const TOMBSTONE_COL_VERSION: u64 = u64::MAX;
156
157/// Represents a single change in the CRDT.
158///
159/// A change can represent:
160/// - An insertion or update of a column value (when `col_name` is `Some`)
161/// - A deletion of a specific column (when `col_name` is `Some` and `value` is `None`)
162/// - A deletion of an entire record (when `col_name` is `None`)
163#[derive(Debug, Clone, PartialEq)]
164#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
165#[cfg_attr(feature = "serde", serde(bound(serialize = "K: serde::Serialize, C: serde::Serialize, V: serde::Serialize")))]
166#[cfg_attr(feature = "serde", serde(bound(deserialize = "K: serde::de::DeserializeOwned, C: serde::de::DeserializeOwned, V: serde::de::DeserializeOwned")))]
167pub struct Change<K, C, V> {
168  pub record_id: K,
169  /// `None` represents tombstone of the record
170  pub col_name: Option<C>,
171  /// `None` represents deletion of the column (not the record)
172  pub value: Option<V>,
173  pub col_version: u64,
174  pub db_version: u64,
175  pub node_id: NodeId,
176  /// Local db_version when the change was created (useful for `get_changes_since`)
177  pub local_db_version: u64,
178  /// Optional flags to indicate the type of change (ephemeral, not stored)
179  pub flags: u32,
180}
181
182impl<K: Eq, C: Eq, V: Eq> Eq for Change<K, C, V> {}
183
184impl<K, C, V> Change<K, C, V> {
185  /// Creates a new Change with all parameters
186  #[allow(clippy::too_many_arguments)]
187  pub fn new(
188    record_id: K,
189    col_name: Option<C>,
190    value: Option<V>,
191    col_version: u64,
192    db_version: u64,
193    node_id: NodeId,
194    local_db_version: u64,
195    flags: u32,
196  ) -> Self {
197    Self {
198      record_id,
199      col_name,
200      value,
201      col_version,
202      db_version,
203      node_id,
204      local_db_version,
205      flags,
206    }
207  }
208}
209
210/// Represents version information for a column.
211#[derive(Debug, Clone, Copy, PartialEq, Eq)]
212#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
213pub struct ColumnVersion {
214  pub col_version: u64,
215  pub db_version: u64,
216  pub node_id: NodeId,
217  /// Local db_version when the change was created
218  pub local_db_version: u64,
219}
220
221impl ColumnVersion {
222  pub fn new(col_version: u64, db_version: u64, node_id: NodeId, local_db_version: u64) -> Self {
223    Self {
224      col_version,
225      db_version,
226      node_id,
227      local_db_version,
228    }
229  }
230}
231
232/// Minimal version information for tombstones.
233///
234/// Stores essential data: db_version for conflict resolution, node_id for sync exclusion,
235/// and local_db_version for sync.
236#[derive(Debug, Clone, Copy, PartialEq, Eq)]
237#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
238pub struct TombstoneInfo {
239  pub db_version: u64,
240  pub node_id: NodeId,
241  pub local_db_version: u64,
242}
243
244impl TombstoneInfo {
245  pub fn new(db_version: u64, node_id: NodeId, local_db_version: u64) -> Self {
246    Self {
247      db_version,
248      node_id,
249      local_db_version,
250    }
251  }
252
253  /// Helper to create a ColumnVersion for comparison with regular columns
254  pub fn as_column_version(&self) -> ColumnVersion {
255    ColumnVersion::new(
256      TOMBSTONE_COL_VERSION,
257      self.db_version,
258      self.node_id,
259      self.local_db_version,
260    )
261  }
262}
263
264/// Represents a logical clock for maintaining causality.
265#[derive(Debug, Clone, Copy, PartialEq, Eq)]
266#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
267pub struct LogicalClock {
268  time: u64,
269}
270
271impl LogicalClock {
272  /// Creates a new logical clock starting at 0
273  pub fn new() -> Self {
274    Self { time: 0 }
275  }
276
277  /// Increments the clock for a local event and returns the new time
278  pub fn tick(&mut self) -> u64 {
279    self.time += 1;
280    self.time
281  }
282
283  /// Updates the clock based on a received time and returns the new time
284  pub fn update(&mut self, received_time: u64) -> u64 {
285    self.time = self.time.max(received_time);
286    self.time += 1;
287    self.time
288  }
289
290  /// Sets the logical clock to a specific time
291  pub fn set_time(&mut self, time: u64) {
292    self.time = time;
293  }
294
295  /// Retrieves the current time
296  pub fn current_time(&self) -> u64 {
297    self.time
298  }
299}
300
301impl Default for LogicalClock {
302  fn default() -> Self {
303    Self::new()
304  }
305}
306
307/// Storage for tombstones (deleted records).
308///
309/// Uses a HashMap for efficient lookups and supports compaction.
310#[derive(Debug, Clone)]
311#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
312pub struct TombstoneStorage<K: Hash + Eq> {
313  entries: HashMap<K, TombstoneInfo>,
314}
315
316impl<K: Hash + Eq> TombstoneStorage<K> {
317  pub fn new() -> Self {
318    Self {
319      entries: HashMap::new(),
320    }
321  }
322
323  pub fn insert_or_assign(&mut self, key: K, info: TombstoneInfo) {
324    self.entries.insert(key, info);
325  }
326
327  pub fn find(&self, key: &K) -> Option<TombstoneInfo> {
328    self.entries.get(key).copied()
329  }
330
331  pub fn erase(&mut self, key: &K) -> bool {
332    self.entries.remove(key).is_some()
333  }
334
335  pub fn clear(&mut self) {
336    self.entries.clear();
337  }
338
339  pub fn iter(&self) -> impl Iterator<Item = (&K, &TombstoneInfo)> {
340    self.entries.iter()
341  }
342
343  pub fn len(&self) -> usize {
344    self.entries.len()
345  }
346
347  pub fn is_empty(&self) -> bool {
348    self.entries.is_empty()
349  }
350
351  /// Compact tombstones older than the specified version.
352  ///
353  /// Returns the number of tombstones removed.
354  pub fn compact(&mut self, min_acknowledged_version: u64) -> usize {
355    let initial_len = self.entries.len();
356    self
357      .entries
358      .retain(|_, info| info.db_version >= min_acknowledged_version);
359    initial_len - self.entries.len()
360  }
361}
362
363impl<K: Hash + Eq> Default for TombstoneStorage<K> {
364  fn default() -> Self {
365    Self::new()
366  }
367}
368
369/// Represents a record in the CRDT.
370#[derive(Debug, Clone)]
371#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
372#[cfg_attr(feature = "serde", serde(bound(serialize = "C: serde::Serialize + Hash + Eq, V: serde::Serialize")))]
373#[cfg_attr(feature = "serde", serde(bound(deserialize = "C: serde::de::DeserializeOwned + Hash + Eq, V: serde::de::DeserializeOwned")))]
374pub struct Record<C, V> {
375  pub fields: HashMap<C, V>,
376  pub column_versions: HashMap<C, ColumnVersion>,
377  /// Track version boundaries for efficient filtering
378  pub lowest_local_db_version: u64,
379  pub highest_local_db_version: u64,
380}
381
382impl<C: Hash + Eq, V> Record<C, V> {
383  pub fn new() -> Self {
384    Self {
385      fields: HashMap::new(),
386      column_versions: HashMap::new(),
387      lowest_local_db_version: u64::MAX,
388      highest_local_db_version: 0,
389    }
390  }
391
392  /// Creates a record from existing fields and column versions
393  pub fn from_parts(
394    fields: HashMap<C, V>,
395    column_versions: HashMap<C, ColumnVersion>,
396  ) -> Self {
397    let mut lowest = u64::MAX;
398    let mut highest = 0;
399
400    for ver in column_versions.values() {
401      if ver.local_db_version < lowest {
402        lowest = ver.local_db_version;
403      }
404      if ver.local_db_version > highest {
405        highest = ver.local_db_version;
406      }
407    }
408
409    Self {
410      fields,
411      column_versions,
412      lowest_local_db_version: lowest,
413      highest_local_db_version: highest,
414    }
415  }
416}
417
418impl<C: Hash + Eq + PartialEq, V: PartialEq> PartialEq for Record<C, V> {
419  fn eq(&self, other: &Self) -> bool {
420    // Compare only fields, not column_versions (those will differ per node)
421    self.fields == other.fields
422  }
423}
424
425impl<C: Hash + Eq, V> Default for Record<C, V> {
426  fn default() -> Self {
427    Self::new()
428  }
429}
430
431/// Trait for merge rules that determine conflict resolution.
432///
433/// Implementations should return `true` if the remote change should be accepted,
434/// `false` otherwise.
435pub trait MergeRule<K, C, V> {
436  /// Determines whether to accept a remote change over a local one
437  fn should_accept(
438    &self,
439    local_col: u64,
440    local_db: u64,
441    local_node: NodeId,
442    remote_col: u64,
443    remote_db: u64,
444    remote_node: NodeId,
445  ) -> bool;
446
447  /// Convenience method for Change objects
448  fn should_accept_change(&self, local: &Change<K, C, V>, remote: &Change<K, C, V>) -> bool {
449    self.should_accept(
450      local.col_version,
451      local.db_version,
452      local.node_id,
453      remote.col_version,
454      remote.db_version,
455      remote.node_id,
456    )
457  }
458}
459
460/// Default merge rule implementing last-write-wins semantics.
461///
462/// Comparison priority:
463/// 1. Column version (higher wins)
464/// 2. DB version (higher wins)
465/// 3. Node ID (higher wins as tiebreaker)
466#[derive(Debug, Clone, Copy, Default)]
467pub struct DefaultMergeRule;
468
469impl<K, C, V> MergeRule<K, C, V> for DefaultMergeRule {
470  fn should_accept(
471    &self,
472    local_col: u64,
473    local_db: u64,
474    local_node: NodeId,
475    remote_col: u64,
476    remote_db: u64,
477    remote_node: NodeId,
478  ) -> bool {
479    match remote_col.cmp(&local_col) {
480      Ordering::Greater => true,
481      Ordering::Less => false,
482      Ordering::Equal => match remote_db.cmp(&local_db) {
483        Ordering::Greater => true,
484        Ordering::Less => false,
485        Ordering::Equal => remote_node > local_node,
486      },
487    }
488  }
489}
490
491/// Trait for change comparators used in sorting and compression.
492pub trait ChangeComparator<K, C, V> {
493  fn compare(&self, a: &Change<K, C, V>, b: &Change<K, C, V>) -> Ordering;
494}
495
496/// Default change comparator.
497///
498/// Sorts by:
499/// 1. Record ID (ascending)
500/// 2. Column name presence (deletions/tombstones last)
501/// 3. Column name (ascending)
502/// 4. Column version (descending - most recent first)
503/// 5. DB version (descending)
504/// 6. Node ID (descending)
505#[derive(Debug, Clone, Copy, Default)]
506pub struct DefaultChangeComparator;
507
508impl<K: Ord, C: Ord, V> ChangeComparator<K, C, V> for DefaultChangeComparator {
509  fn compare(&self, a: &Change<K, C, V>, b: &Change<K, C, V>) -> Ordering {
510    // Compare record IDs
511    match a.record_id.cmp(&b.record_id) {
512      Ordering::Equal => {}
513      ord => return ord,
514    }
515
516    // Deletions (None) come last for each record
517    match (a.col_name.as_ref(), b.col_name.as_ref()) {
518      (None, None) => {}
519      (None, Some(_)) => return Ordering::Greater,
520      (Some(_), None) => return Ordering::Less,
521      (Some(a_col), Some(b_col)) => match a_col.cmp(b_col) {
522        Ordering::Equal => {}
523        ord => return ord,
524      },
525    }
526
527    // Compare versions (descending - most recent first)
528    match b.col_version.cmp(&a.col_version) {
529      Ordering::Equal => {}
530      ord => return ord,
531    }
532
533    match b.db_version.cmp(&a.db_version) {
534      Ordering::Equal => {}
535      ord => return ord,
536    }
537
538    b.node_id.cmp(&a.node_id)
539  }
540}
541
542/// Main CRDT structure, generic over key (K), column (C), and value (V) types.
543///
544/// This implements a column-based CRDT with last-write-wins semantics.
545///
546/// # Sorted Keys Feature
547///
548/// When the `sorted-keys` feature is enabled, the internal storage uses `BTreeMap`
549/// instead of `HashMap`, enabling ordered iteration and range queries at the cost
550/// of O(log n) operations instead of O(1).
551///
552/// # Type Requirements
553///
554/// K (record key) must implement `Ord + Hash + Eq + Clone`:
555/// - `Ord` is required for potential use with sorted-keys feature (BTreeMap)
556/// - Even without sorted-keys, requiring `Ord` keeps the API consistent and enables
557///   seamless feature toggling. Most common types (String, u64, etc.) already implement Ord.
558/// - This is consistent with PersistedCRDT which also requires K: Ord for serialization.
559#[derive(Debug)]
560#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))]
561#[cfg_attr(feature = "serde", serde(bound(serialize = "K: serde::Serialize, C: serde::Serialize, V: serde::Serialize")))]
562#[cfg_attr(feature = "serde", serde(bound(deserialize = "K: serde::de::DeserializeOwned + Ord + Hash + Eq + Clone, C: serde::de::DeserializeOwned + Hash + Eq + Clone, V: serde::de::DeserializeOwned + Clone")))]
563pub struct CRDT<K: Ord + Hash + Eq + Clone, C: Hash + Eq + Clone, V: Clone> {
564  node_id: NodeId,
565  clock: LogicalClock,
566  data: DataMap<K, Record<C, V>>,
567  tombstones: TombstoneStorage<K>,
568  #[cfg_attr(feature = "serde", serde(skip, default))]
569  parent: Option<Arc<CRDT<K, C, V>>>,
570  #[allow(dead_code)]
571  base_version: u64,
572}
573
574// Main implementation (works with both HashMap and BTreeMap via DataMap alias)
575impl<K: Ord + Hash + Eq + Clone, C: Hash + Eq + Clone, V: Clone> CRDT<K, C, V> {
576  /// Creates a new empty CRDT.
577  ///
578  /// # Arguments
579  ///
580  /// * `node_id` - Unique identifier for this CRDT node
581  /// * `parent` - Optional parent CRDT for hierarchical structures
582  pub fn new(node_id: NodeId, parent: Option<Arc<CRDT<K, C, V>>>) -> Self {
583    let (clock, base_version) = if let Some(ref p) = parent {
584      let parent_clock = p.clock;
585      let base = parent_clock.current_time();
586      (parent_clock, base)
587    } else {
588      (LogicalClock::new(), 0)
589    };
590
591    Self {
592      node_id,
593      clock,
594      data: DataMap::new(),
595      tombstones: TombstoneStorage::new(),
596      parent,
597      base_version,
598    }
599  }
600
601  /// Creates a CRDT from a list of changes (e.g., loaded from disk).
602  ///
603  /// # Arguments
604  ///
605  /// * `node_id` - The unique identifier for this CRDT node
606  /// * `changes` - A list of changes to apply to reconstruct the CRDT state
607  pub fn from_changes(node_id: NodeId, changes: Vec<Change<K, C, V>>) -> Self {
608    let mut crdt = Self::new(node_id, None);
609    crdt.apply_changes(changes);
610    crdt
611  }
612
613  /// Resets the CRDT to a state as if it was constructed with the given changes.
614  ///
615  /// # Arguments
616  ///
617  /// * `changes` - A list of changes to apply to reconstruct the CRDT state
618  pub fn reset(&mut self, changes: Vec<Change<K, C, V>>) {
619    self.data.clear();
620    self.tombstones.clear();
621    self.clock = LogicalClock::new();
622    self.apply_changes(changes);
623  }
624
625  /// Applies a list of changes to reconstruct the CRDT state.
626  fn apply_changes(&mut self, changes: Vec<Change<K, C, V>>) {
627    // Determine the maximum db_version from the changes
628    let max_db_version = changes
629      .iter()
630      .map(|c| c.db_version.max(c.local_db_version))
631      .max()
632      .unwrap_or(0);
633
634    // Set the logical clock to the maximum db_version
635    self.clock.set_time(max_db_version);
636
637    // Apply each change to reconstruct the CRDT state
638    for change in changes {
639      let record_id = change.record_id.clone();
640      let col_name = change.col_name.clone();
641      let remote_col_version = change.col_version;
642      let remote_db_version = change.db_version;
643      let remote_node_id = change.node_id;
644      let remote_local_db_version = change.local_db_version;
645      let remote_value = change.value;
646
647      if col_name.is_none() {
648        // Handle deletion
649        self.data.remove(&record_id);
650
651        // Store deletion information in tombstones
652        self.tombstones.insert_or_assign(
653          record_id,
654          TombstoneInfo::new(remote_db_version, remote_node_id, remote_local_db_version),
655        );
656      } else if let Some(col_key) = col_name {
657        // Handle insertion or update
658        if !self.is_record_tombstoned(&record_id, false) {
659          let record = self.get_or_create_record_unchecked(&record_id, false);
660
661          // Insert or update the field value
662          if let Some(value) = remote_value {
663            record.fields.insert(col_key.clone(), value);
664          }
665
666          // Update the column version info
667          let col_ver = ColumnVersion::new(
668            remote_col_version,
669            remote_db_version,
670            remote_node_id,
671            remote_local_db_version,
672          );
673          record.column_versions.insert(col_key, col_ver);
674
675          // Update version boundaries
676          if remote_local_db_version < record.lowest_local_db_version {
677            record.lowest_local_db_version = remote_local_db_version;
678          }
679          if remote_local_db_version > record.highest_local_db_version {
680            record.highest_local_db_version = remote_local_db_version;
681          }
682        }
683      }
684    }
685  }
686
687  /// Inserts a new record or updates an existing record in the CRDT.
688  ///
689  /// # Arguments
690  ///
691  /// * `record_id` - The unique identifier for the record
692  /// * `fields` - An iterator of (column_name, value) pairs
693  ///
694  /// # Returns
695  ///
696  /// A vector of changes created by this operation
697  #[must_use = "changes should be propagated to other nodes"]
698  pub fn insert_or_update<I>(&mut self, record_id: &K, fields: I) -> Vec<Change<K, C, V>>
699  where
700    I: IntoIterator<Item = (C, V)>,
701  {
702    self.insert_or_update_with_flags(record_id, 0, fields)
703  }
704
705  /// Inserts a new record or updates an existing record with flags.
706  ///
707  /// # Arguments
708  ///
709  /// * `record_id` - The unique identifier for the record
710  /// * `flags` - Flags to indicate the type of change
711  /// * `fields` - An iterator of (column_name, value) pairs
712  ///
713  /// # Returns
714  ///
715  /// A vector of changes created by this operation
716  #[must_use = "changes should be propagated to other nodes"]
717  pub fn insert_or_update_with_flags<I>(
718    &mut self,
719    record_id: &K,
720    flags: u32,
721    fields: I,
722  ) -> Vec<Change<K, C, V>>
723  where
724    I: IntoIterator<Item = (C, V)>,
725  {
726    let db_version = self.clock.tick();
727
728    // Check if the record is tombstoned
729    if self.is_record_tombstoned(record_id, false) {
730      return Vec::new();
731    }
732
733    let mut changes = Vec::new();
734    let node_id = self.node_id; // Store node_id before mutable borrow
735    let record = self.get_or_create_record_unchecked(record_id, false);
736
737    for (col_name, value) in fields {
738      let col_version = if let Some(col_info) = record.column_versions.get_mut(&col_name) {
739        col_info.col_version += 1;
740        col_info.db_version = db_version;
741        col_info.node_id = node_id;
742        col_info.local_db_version = db_version;
743        col_info.col_version
744      } else {
745        record.column_versions.insert(
746          col_name.clone(),
747          ColumnVersion::new(1, db_version, node_id, db_version),
748        );
749        1
750      };
751
752      // Update record version boundaries
753      if db_version < record.lowest_local_db_version {
754        record.lowest_local_db_version = db_version;
755      }
756      if db_version > record.highest_local_db_version {
757        record.highest_local_db_version = db_version;
758      }
759
760      record.fields.insert(col_name.clone(), value.clone());
761      changes.push(Change::new(
762        record_id.clone(),
763        Some(col_name),
764        Some(value),
765        col_version,
766        db_version,
767        node_id,
768        db_version,
769        flags,
770      ));
771    }
772
773    changes
774  }
775
776  /// Deletes a record by marking it as tombstoned.
777  ///
778  /// # Arguments
779  ///
780  /// * `record_id` - The unique identifier for the record
781  ///
782  /// # Returns
783  ///
784  /// An optional Change representing the deletion
785  #[must_use = "changes should be propagated to other nodes"]
786  pub fn delete_record(&mut self, record_id: &K) -> Option<Change<K, C, V>> {
787    self.delete_record_with_flags(record_id, 0)
788  }
789
790  /// Deletes a record with flags.
791  ///
792  /// # Arguments
793  ///
794  /// * `record_id` - The unique identifier for the record
795  /// * `flags` - Flags to indicate the type of change
796  ///
797  /// # Returns
798  ///
799  /// An optional Change representing the deletion
800  #[must_use = "changes should be propagated to other nodes"]
801  pub fn delete_record_with_flags(&mut self, record_id: &K, flags: u32) -> Option<Change<K, C, V>> {
802    if self.is_record_tombstoned(record_id, false) {
803      return None;
804    }
805
806    let db_version = self.clock.tick();
807
808    // Mark as tombstone and remove data
809    self.data.remove(record_id);
810
811    // Store deletion information in tombstones
812    self.tombstones.insert_or_assign(
813      record_id.clone(),
814      TombstoneInfo::new(db_version, self.node_id, db_version),
815    );
816
817    Some(Change::new(
818      record_id.clone(),
819      None,
820      None,
821      TOMBSTONE_COL_VERSION,
822      db_version,
823      self.node_id,
824      db_version,
825      flags,
826    ))
827  }
828
829  /// Deletes a specific field from a record.
830  ///
831  /// # Arguments
832  ///
833  /// * `record_id` - The unique identifier for the record
834  /// * `field_name` - The name of the field to delete
835  ///
836  /// # Returns
837  ///
838  /// An optional Change representing the field deletion. Returns None if:
839  /// - The record is tombstoned
840  /// - The record doesn't exist
841  /// - The field doesn't exist in the record
842  #[must_use = "changes should be propagated to other nodes"]
843  pub fn delete_field(&mut self, record_id: &K, field_name: &C) -> Option<Change<K, C, V>> {
844    self.delete_field_with_flags(record_id, field_name, 0)
845  }
846
847  /// Deletes a specific field from a record with flags.
848  ///
849  /// # Arguments
850  ///
851  /// * `record_id` - The unique identifier for the record
852  /// * `field_name` - The name of the field to delete
853  /// * `flags` - Flags to indicate the type of change
854  ///
855  /// # Returns
856  ///
857  /// An optional Change representing the field deletion. Returns None if:
858  /// - The record is tombstoned
859  /// - The record doesn't exist
860  /// - The field doesn't exist in the record
861  #[must_use = "changes should be propagated to other nodes"]
862  pub fn delete_field_with_flags(
863    &mut self,
864    record_id: &K,
865    field_name: &C,
866    flags: u32,
867  ) -> Option<Change<K, C, V>> {
868    // Check if the record is tombstoned
869    if self.is_record_tombstoned(record_id, false) {
870      return None;
871    }
872
873    // Get the record (return None if it doesn't exist)
874    let record = self.data.get_mut(record_id)?;
875
876    // Check if the field exists
877    if !record.fields.contains_key(field_name) {
878      return None;
879    }
880
881    let db_version = self.clock.tick();
882
883    // Get or create column version and increment it
884    let col_version = if let Some(col_info) = record.column_versions.get_mut(field_name) {
885      col_info.col_version += 1;
886      col_info.db_version = db_version;
887      col_info.node_id = self.node_id;
888      col_info.local_db_version = db_version;
889      col_info.col_version
890    } else {
891      // This shouldn't happen if the field exists, but handle it gracefully
892      record.column_versions.insert(
893        field_name.clone(),
894        ColumnVersion::new(1, db_version, self.node_id, db_version),
895      );
896      1
897    };
898
899    // Update record version boundaries
900    if db_version < record.lowest_local_db_version {
901      record.lowest_local_db_version = db_version;
902    }
903    if db_version > record.highest_local_db_version {
904      record.highest_local_db_version = db_version;
905    }
906
907    // Remove the field from the record (but keep the ColumnVersion as field tombstone)
908    record.fields.remove(field_name);
909
910    Some(Change::new(
911      record_id.clone(),
912      Some(field_name.clone()),
913      None, // None value indicates field deletion
914      col_version,
915      db_version,
916      self.node_id,
917      db_version,
918      flags,
919    ))
920  }
921
922  /// Merges incoming changes into the CRDT.
923  ///
924  /// # Arguments
925  ///
926  /// * `changes` - Vector of changes to merge
927  /// * `merge_rule` - The merge rule to use for conflict resolution
928  ///
929  /// # Returns
930  ///
931  /// Vector of accepted changes (if requested)
932  pub fn merge_changes<R: MergeRule<K, C, V>>(
933    &mut self,
934    changes: Vec<Change<K, C, V>>,
935    merge_rule: &R,
936  ) -> Vec<Change<K, C, V>> {
937    self.merge_changes_impl(changes, false, merge_rule)
938  }
939
940  fn merge_changes_impl<R: MergeRule<K, C, V>>(
941    &mut self,
942    changes: Vec<Change<K, C, V>>,
943    ignore_parent: bool,
944    merge_rule: &R,
945  ) -> Vec<Change<K, C, V>> {
946    let mut accepted_changes = Vec::new();
947
948    if changes.is_empty() {
949      return accepted_changes;
950    }
951
952    for change in changes {
953      // Move values from change to avoid unnecessary clones
954      let Change {
955        record_id,
956        col_name,
957        value: remote_value,
958        col_version: remote_col_version,
959        db_version: remote_db_version,
960        node_id: remote_node_id,
961        flags,
962        ..
963      } = change;
964
965      // Always update the logical clock to maintain causal consistency
966      let new_local_db_version = self.clock.update(remote_db_version);
967
968      // Skip all changes for tombstoned records
969      if self.is_record_tombstoned(&record_id, ignore_parent) {
970        continue;
971      }
972
973      // Retrieve local column version information
974      let local_col_info = if col_name.is_none() {
975        // For deletions, check tombstones
976        self
977          .tombstones
978          .find(&record_id)
979          .map(|info| info.as_column_version())
980      } else if let Some(ref col) = col_name {
981        // For column updates, check the record
982        self
983          .get_record_ptr(&record_id, ignore_parent)
984          .and_then(|record| record.column_versions.get(col).copied())
985      } else {
986        None
987      };
988
989      // Determine whether to accept the remote change
990      let should_accept = if let Some(local_info) = local_col_info {
991        merge_rule.should_accept(
992          local_info.col_version,
993          local_info.db_version,
994          local_info.node_id,
995          remote_col_version,
996          remote_db_version,
997          remote_node_id,
998        )
999      } else {
1000        true
1001      };
1002
1003      if should_accept {
1004        if let Some(col_key) = col_name {
1005          // Handle insertion or update
1006          let record = self.get_or_create_record_unchecked(&record_id, ignore_parent);
1007
1008          // Update field value
1009          if let Some(value) = remote_value.clone() {
1010            record.fields.insert(col_key.clone(), value);
1011          } else {
1012            // If remote_value is None, remove the field
1013            record.fields.remove(&col_key);
1014          }
1015
1016          // Update the column version info and record version boundaries
1017          record.column_versions.insert(
1018            col_key.clone(),
1019            ColumnVersion::new(
1020              remote_col_version,
1021              remote_db_version,
1022              remote_node_id,
1023              new_local_db_version,
1024            ),
1025          );
1026
1027          // Update version boundaries
1028          if new_local_db_version < record.lowest_local_db_version {
1029            record.lowest_local_db_version = new_local_db_version;
1030          }
1031          if new_local_db_version > record.highest_local_db_version {
1032            record.highest_local_db_version = new_local_db_version;
1033          }
1034
1035          accepted_changes.push(Change::new(
1036            record_id,
1037            Some(col_key),
1038            remote_value,
1039            remote_col_version,
1040            remote_db_version,
1041            remote_node_id,
1042            new_local_db_version,
1043            flags,
1044          ));
1045        } else {
1046          // Handle deletion
1047          self.data.remove(&record_id);
1048
1049          // Store deletion information in tombstones
1050          self.tombstones.insert_or_assign(
1051            record_id.clone(),
1052            TombstoneInfo::new(remote_db_version, remote_node_id, new_local_db_version),
1053          );
1054
1055          accepted_changes.push(Change::new(
1056            record_id,
1057            None,
1058            None,
1059            remote_col_version,
1060            remote_db_version,
1061            remote_node_id,
1062            new_local_db_version,
1063            flags,
1064          ));
1065        }
1066      }
1067    }
1068
1069    accepted_changes
1070  }
1071
1072  /// Retrieves all changes since a given `last_db_version`.
1073  ///
1074  /// # Arguments
1075  ///
1076  /// * `last_db_version` - The database version to retrieve changes since
1077  ///
1078  /// # Returns
1079  ///
1080  /// A vector of changes
1081  #[must_use]
1082  pub fn get_changes_since(&self, last_db_version: u64) -> Vec<Change<K, C, V>>
1083  where
1084    K: Ord,
1085    C: Ord,
1086  {
1087    self.get_changes_since_excluding(last_db_version, &HashSet::new())
1088  }
1089
1090  /// Retrieves all changes since a given `last_db_version`, excluding specific nodes.
1091  pub fn get_changes_since_excluding(
1092    &self,
1093    last_db_version: u64,
1094    excluding: &HashSet<NodeId>,
1095  ) -> Vec<Change<K, C, V>>
1096  where
1097    K: Ord,
1098    C: Ord,
1099  {
1100    let mut changes = Vec::new();
1101
1102    // Get changes from parent
1103    if let Some(ref parent) = self.parent {
1104      let parent_changes = parent.get_changes_since_excluding(last_db_version, excluding);
1105      changes.extend(parent_changes);
1106    }
1107
1108    // Get changes from regular records
1109    for (record_id, record) in &self.data {
1110      // Skip records that haven't changed since last_db_version
1111      if record.highest_local_db_version <= last_db_version {
1112        continue;
1113      }
1114
1115      for (col_name, clock_info) in &record.column_versions {
1116        if clock_info.local_db_version > last_db_version && !excluding.contains(&clock_info.node_id)
1117        {
1118          let value = record.fields.get(col_name).cloned();
1119
1120          changes.push(Change::new(
1121            record_id.clone(),
1122            Some(col_name.clone()),
1123            value,
1124            clock_info.col_version,
1125            clock_info.db_version,
1126            clock_info.node_id,
1127            clock_info.local_db_version,
1128            0,
1129          ));
1130        }
1131      }
1132    }
1133
1134    // Get deletion changes from tombstones
1135    for (record_id, tombstone_info) in self.tombstones.iter() {
1136      if tombstone_info.local_db_version > last_db_version
1137        && !excluding.contains(&tombstone_info.node_id)
1138      {
1139        changes.push(Change::new(
1140          record_id.clone(),
1141          None,
1142          None,
1143          TOMBSTONE_COL_VERSION,
1144          tombstone_info.db_version,
1145          tombstone_info.node_id,
1146          tombstone_info.local_db_version,
1147          0,
1148        ));
1149      }
1150    }
1151
1152    if self.parent.is_some() {
1153      // Compress changes to remove redundant operations
1154      Self::compress_changes(&mut changes);
1155    }
1156
1157    changes
1158  }
1159
1160  /// Compresses a vector of changes in-place by removing redundant changes.
1161  ///
1162  /// Changes are sorted and then compressed using a two-pointer technique.
1163  ///
1164  /// # Performance
1165  ///
1166  /// This method uses `sort_unstable_by` which provides O(n log n) average time complexity
1167  /// but does not preserve the relative order of equal elements. Since the comparator
1168  /// provides a total ordering, stability is not required.
1169  pub fn compress_changes(changes: &mut Vec<Change<K, C, V>>)
1170  where
1171    K: Ord,
1172    C: Ord,
1173  {
1174    if changes.is_empty() {
1175      return;
1176    }
1177
1178    // Sort changes using the DefaultChangeComparator
1179    // Use sort_unstable for better performance since we don't need stable sorting
1180    let comparator = DefaultChangeComparator;
1181    changes.sort_unstable_by(|a, b| comparator.compare(a, b));
1182
1183    // Use two-pointer technique to compress in-place
1184    let mut write = 0;
1185    for read in 1..changes.len() {
1186      if changes[read].record_id != changes[write].record_id {
1187        // New record, always keep it
1188        write += 1;
1189        if write != read {
1190          changes[write] = changes[read].clone();
1191        }
1192      } else if changes[read].col_name.is_none() && changes[write].col_name.is_some() {
1193        // Current read is a deletion, backtrack to first change for this record
1194        // and replace it with the deletion, effectively discarding all field updates
1195        let mut first_pos = write;
1196        while first_pos > 0 && changes[first_pos - 1].record_id == changes[read].record_id {
1197          first_pos -= 1;
1198        }
1199        changes[first_pos] = changes[read].clone();
1200        write = first_pos;
1201      } else if changes[read].col_name != changes[write].col_name
1202        && changes[write].col_name.is_some()
1203      {
1204        // New column for the same record
1205        write += 1;
1206        if write != read {
1207          changes[write] = changes[read].clone();
1208        }
1209      }
1210      // Else: same record and column, keep the existing one (most recent due to sorting)
1211    }
1212
1213    changes.truncate(write + 1);
1214  }
1215
1216  /// Retrieves a reference to a record if it exists.
1217  pub fn get_record(&self, record_id: &K) -> Option<&Record<C, V>> {
1218    self.get_record_ptr(record_id, false)
1219  }
1220
1221  /// Checks if a record is tombstoned.
1222  pub fn is_tombstoned(&self, record_id: &K) -> bool {
1223    self.is_record_tombstoned(record_id, false)
1224  }
1225
1226  /// Gets tombstone information for a record.
1227  pub fn get_tombstone(&self, record_id: &K) -> Option<TombstoneInfo> {
1228    if let Some(info) = self.tombstones.find(record_id) {
1229      return Some(info);
1230    }
1231
1232    if let Some(ref parent) = self.parent {
1233      return parent.get_tombstone(record_id);
1234    }
1235
1236    None
1237  }
1238
1239  /// Removes tombstones older than the specified version.
1240  ///
1241  /// # Safety and DoS Mitigation
1242  ///
1243  /// **IMPORTANT**: Only call this method when ALL participating nodes have acknowledged
1244  /// the `min_acknowledged_version`. Compacting too early may cause deleted records to
1245  /// reappear on nodes that haven't received the deletion yet.
1246  ///
1247  /// To prevent DoS via tombstone accumulation:
1248  /// - Call this method periodically as part of your sync protocol
1249  /// - Track which versions have been acknowledged by all nodes
1250  /// - Consider implementing a tombstone limit and rejecting operations when exceeded
1251  ///
1252  /// # Arguments
1253  ///
1254  /// * `min_acknowledged_version` - Tombstones with db_version < this value will be removed
1255  ///
1256  /// # Returns
1257  ///
1258  /// The number of tombstones removed
1259  pub fn compact_tombstones(&mut self, min_acknowledged_version: u64) -> usize {
1260    self.tombstones.compact(min_acknowledged_version)
1261  }
1262
1263  /// Gets the number of tombstones currently stored.
1264  pub fn tombstone_count(&self) -> usize {
1265    self.tombstones.len()
1266  }
1267
1268  /// Gets the current logical clock.
1269  pub fn get_clock(&self) -> &LogicalClock {
1270    &self.clock
1271  }
1272
1273  /// Gets a reference to the internal data map.
1274  pub fn get_data(&self) -> &DataMap<K, Record<C, V>> {
1275    &self.data
1276  }
1277
1278  /// Serializes the CRDT to a JSON string.
1279  ///
1280  /// Note: The parent relationship is not serialized and must be rebuilt after deserialization.
1281  ///
1282  /// # Errors
1283  ///
1284  /// Returns an error if serialization fails.
1285  #[cfg(feature = "json")]
1286  pub fn to_json(&self) -> Result<String, serde_json::Error>
1287  where
1288    K: serde::Serialize,
1289    C: serde::Serialize,
1290    V: serde::Serialize,
1291  {
1292    serde_json::to_string(self)
1293  }
1294
1295  /// Deserializes a CRDT from a JSON string.
1296  ///
1297  /// Note: The parent relationship is not deserialized and will be `None`.
1298  /// Applications must rebuild parent-child relationships if needed.
1299  ///
1300  /// # Errors
1301  ///
1302  /// Returns an error if deserialization fails.
1303  #[cfg(feature = "json")]
1304  pub fn from_json(json: &str) -> Result<Self, serde_json::Error>
1305  where
1306    K: serde::de::DeserializeOwned + Hash + Eq + Clone,
1307    C: serde::de::DeserializeOwned + Hash + Eq + Clone,
1308    V: serde::de::DeserializeOwned + Clone,
1309  {
1310    serde_json::from_str(json)
1311  }
1312
1313  /// Serializes the CRDT to bytes using bincode.
1314  ///
1315  /// Note: The parent relationship is not serialized and must be rebuilt after deserialization.
1316  ///
1317  /// # Errors
1318  ///
1319  /// Returns an error if serialization fails.
1320  #[cfg(feature = "binary")]
1321  pub fn to_bytes(&self) -> Result<Vec<u8>, bincode::error::EncodeError>
1322  where
1323    K: serde::Serialize,
1324    C: serde::Serialize,
1325    V: serde::Serialize,
1326  {
1327    bincode::serde::encode_to_vec(self, bincode::config::standard())
1328  }
1329
1330  /// Deserializes a CRDT from bytes using bincode.
1331  ///
1332  /// Note: The parent relationship is not deserialized and will be `None`.
1333  /// Applications must rebuild parent-child relationships if needed.
1334  ///
1335  /// # Errors
1336  ///
1337  /// Returns an error if deserialization fails.
1338  #[cfg(feature = "binary")]
1339  pub fn from_bytes(bytes: &[u8]) -> Result<Self, bincode::error::DecodeError>
1340  where
1341    K: serde::de::DeserializeOwned + Hash + Eq + Clone,
1342    C: serde::de::DeserializeOwned + Hash + Eq + Clone,
1343    V: serde::de::DeserializeOwned + Clone,
1344  {
1345    let (result, _len) = bincode::serde::decode_from_slice(bytes, bincode::config::standard())?;
1346    Ok(result)
1347  }
1348
1349  /// Serializes the CRDT to bytes using MessagePack.
1350  ///
1351  /// MessagePack supports schema evolution - fields can be added with `#[serde(default)]`
1352  /// without breaking compatibility with older snapshots.
1353  ///
1354  /// Note: The parent relationship is not serialized and must be rebuilt after deserialization.
1355  ///
1356  /// # Errors
1357  ///
1358  /// Returns an error if serialization fails.
1359  #[cfg(feature = "msgpack")]
1360  pub fn to_msgpack_bytes(&self) -> Result<Vec<u8>, rmp_serde::encode::Error>
1361  where
1362    K: serde::Serialize,
1363    C: serde::Serialize,
1364    V: serde::Serialize,
1365  {
1366    rmp_serde::to_vec(self)
1367  }
1368
1369  /// Deserializes a CRDT from MessagePack bytes.
1370  ///
1371  /// MessagePack supports schema evolution - older snapshots can be loaded even if
1372  /// the CRDT structure has new fields with `#[serde(default)]`.
1373  ///
1374  /// Note: The parent relationship is not deserialized and will be `None`.
1375  /// Applications must rebuild parent-child relationships if needed.
1376  ///
1377  /// # Errors
1378  ///
1379  /// Returns an error if deserialization fails.
1380  #[cfg(feature = "msgpack")]
1381  pub fn from_msgpack_bytes(bytes: &[u8]) -> Result<Self, rmp_serde::decode::Error>
1382  where
1383    K: serde::de::DeserializeOwned + Hash + Eq + Clone,
1384    C: serde::de::DeserializeOwned + Hash + Eq + Clone,
1385    V: serde::de::DeserializeOwned + Clone,
1386  {
1387    rmp_serde::from_slice(bytes)
1388  }
1389
1390  /// Gets records and tombstones that have changed since a specific version.
1391  ///
1392  /// This is used for creating incremental snapshots, which only contain
1393  /// records that have been modified since the base snapshot.
1394  ///
1395  /// # Arguments
1396  ///
1397  /// * `since_version` - Only return changes after this db_version
1398  ///
1399  /// # Returns
1400  ///
1401  /// Tuple of (changed_records, new_tombstones)
1402  ///
1403  /// # Example
1404  ///
1405  /// ```ignore
1406  /// // Get all changes since version 1000
1407  /// let (records, tombstones) = crdt.get_changed_since(1000);
1408  /// // records contains only records modified after version 1000
1409  /// // tombstones contains only records deleted after version 1000
1410  /// ```
1411  #[cfg(feature = "std")]
1412  pub fn get_changed_since(&self, since_version: u64) -> (
1413    DataMap<K, Record<C, V>>,
1414    HashMap<K, TombstoneInfo>,
1415  ) {
1416    let records = self.data
1417      .iter()
1418      .filter(|(_, record)| record.highest_local_db_version > since_version)
1419      .map(|(k, v)| (k.clone(), v.clone()))
1420      .collect();
1421
1422    let tombstones = self.tombstones
1423      .iter()
1424      .filter(|(_, info)| info.local_db_version > since_version)
1425      .map(|(k, v)| (k.clone(), *v))
1426      .collect();
1427
1428    (records, tombstones)
1429  }
1430
1431  /// Gets records and tombstones that have changed since a specific version (no_std version).
1432  #[cfg(not(feature = "std"))]
1433  pub fn get_changed_since(&self, since_version: u64) -> (
1434    DataMap<K, Record<C, V>>,
1435    HashMap<K, TombstoneInfo>,
1436  ) {
1437    let records = self.data
1438      .iter()
1439      .filter(|(_, record)| record.highest_local_db_version > since_version)
1440      .map(|(k, v)| (k.clone(), v.clone()))
1441      .collect();
1442
1443    let tombstones = self.tombstones
1444      .iter()
1445      .filter(|(_, info)| info.local_db_version > since_version)
1446      .map(|(k, v)| (k.clone(), *v))
1447      .collect();
1448
1449    (records, tombstones)
1450  }
1451
1452  // Helper methods
1453
1454  fn is_record_tombstoned(&self, record_id: &K, ignore_parent: bool) -> bool {
1455    if self.tombstones.find(record_id).is_some() {
1456      return true;
1457    }
1458
1459    if !ignore_parent {
1460      if let Some(ref parent) = self.parent {
1461        return parent.is_record_tombstoned(record_id, false);
1462      }
1463    }
1464
1465    false
1466  }
1467
1468  fn get_or_create_record_unchecked(
1469    &mut self,
1470    record_id: &K,
1471    ignore_parent: bool,
1472  ) -> &mut Record<C, V> {
1473    match self.data.entry(record_id.clone()) {
1474      DataMapEntry::Occupied(e) => e.into_mut(),
1475      DataMapEntry::Vacant(e) => {
1476        let record = if !ignore_parent {
1477          self
1478            .parent
1479            .as_ref()
1480            .and_then(|p| p.get_record_ptr(record_id, false))
1481            .cloned()
1482            .unwrap_or_else(Record::new)
1483        } else {
1484          Record::new()
1485        };
1486        e.insert(record)
1487      }
1488    }
1489  }
1490
1491  fn get_record_ptr(&self, record_id: &K, ignore_parent: bool) -> Option<&Record<C, V>> {
1492    if let Some(record) = self.data.get(record_id) {
1493      return Some(record);
1494    }
1495
1496    if !ignore_parent {
1497      if let Some(ref parent) = self.parent {
1498        return parent.get_record_ptr(record_id, false);
1499      }
1500    }
1501
1502    None
1503  }
1504}
1505
1506// Additional methods requiring Ord (sorted-keys feature only)
1507#[cfg(feature = "sorted-keys")]
1508impl<K: Ord + Hash + Eq + Clone, C: Hash + Eq + Clone, V: Clone> CRDT<K, C, V> {
1509  /// Query records within a specific key range (only available with sorted-keys feature).
1510  ///
1511  /// This method leverages BTreeMap's range query capability to efficiently retrieve
1512  /// all records whose keys fall within the specified range.
1513  ///
1514  /// **IMPORTANT**: This method only queries the local CRDT's data. If this CRDT has a parent,
1515  /// parent records are NOT included in the range query results. For parent-aware queries,
1516  /// iterate over the full range and use `get_record()` for each key.
1517  ///
1518  /// # Arguments
1519  ///
1520  /// * `range` - A range of keys to query (e.g., `"session-abc-".."session-abd-"`)
1521  ///
1522  /// # Returns
1523  ///
1524  /// An iterator over key-value pairs within the range (local records only)
1525  ///
1526  /// # Example
1527  ///
1528  /// ```ignore
1529  /// // Get all records for a specific session
1530  /// for (key, record) in crdt.range("session-abc-".."session-abd-") {
1531  ///     println!("Found record: {:?}", record);
1532  /// }
1533  ///
1534  /// // Get all records with a specific prefix
1535  /// for (key, record) in crdt.range("user-".."user/") {
1536  ///     // Keys starting with "user-" (using "user/" as exclusive upper bound)
1537  /// }
1538  /// ```
1539  pub fn range<R>(&self, range: R) -> impl Iterator<Item = (&K, &Record<C, V>)>
1540  where
1541    R: core::ops::RangeBounds<K>,
1542  {
1543    self.data.range(range)
1544  }
1545}
1546
1547#[cfg(test)]
1548mod tests {
1549  use super::*;
1550
1551  #[cfg(not(feature = "std"))]
1552  use alloc::{string::ToString, vec};
1553
1554  #[test]
1555  fn test_logical_clock() {
1556    let mut clock = LogicalClock::new();
1557    assert_eq!(clock.current_time(), 0);
1558
1559    let t1 = clock.tick();
1560    assert_eq!(t1, 1);
1561    assert_eq!(clock.current_time(), 1);
1562
1563    let t2 = clock.update(5);
1564    assert_eq!(t2, 6);
1565    assert_eq!(clock.current_time(), 6);
1566  }
1567
1568  #[test]
1569  fn test_tombstone_storage() {
1570    let mut storage = TombstoneStorage::new();
1571    let info = TombstoneInfo::new(10, 1, 10);
1572
1573    storage.insert_or_assign("key1".to_string(), info);
1574    assert_eq!(storage.len(), 1);
1575
1576    assert_eq!(storage.find(&"key1".to_string()), Some(info));
1577    assert_eq!(storage.find(&"key2".to_string()), None);
1578
1579    let removed = storage.compact(15);
1580    assert_eq!(removed, 1);
1581    assert_eq!(storage.len(), 0);
1582  }
1583
1584  #[test]
1585  fn test_basic_insert() {
1586    let mut crdt: CRDT<String, String, String> = CRDT::new(1, None);
1587
1588    let fields = vec![
1589      ("name".to_string(), "Alice".to_string()),
1590      ("age".to_string(), "30".to_string()),
1591    ];
1592
1593    let changes = crdt.insert_or_update(&"user1".to_string(), fields);
1594
1595    assert_eq!(changes.len(), 2);
1596    assert_eq!(crdt.get_data().len(), 1);
1597
1598    let record = crdt.get_record(&"user1".to_string()).unwrap();
1599    assert_eq!(record.fields.get("name").unwrap(), "Alice");
1600    assert_eq!(record.fields.get("age").unwrap(), "30");
1601  }
1602
1603  #[test]
1604  fn test_delete_record() {
1605    let mut crdt: CRDT<String, String, String> = CRDT::new(1, None);
1606
1607    let fields = vec![("name".to_string(), "Bob".to_string())];
1608    let _ = crdt.insert_or_update(&"user2".to_string(), fields);
1609
1610    let delete_change = crdt.delete_record(&"user2".to_string());
1611    assert!(delete_change.is_some());
1612    assert!(crdt.is_tombstoned(&"user2".to_string()));
1613    assert_eq!(crdt.get_data().len(), 0);
1614  }
1615
1616  #[test]
1617  fn test_merge_changes() {
1618    let mut crdt1: CRDT<String, String, String> = CRDT::new(1, None);
1619    let mut crdt2: CRDT<String, String, String> = CRDT::new(2, None);
1620
1621    let fields1 = vec![("tag".to_string(), "Node1".to_string())];
1622    let changes1 = crdt1.insert_or_update(&"record1".to_string(), fields1);
1623
1624    let fields2 = vec![("tag".to_string(), "Node2".to_string())];
1625    let changes2 = crdt2.insert_or_update(&"record1".to_string(), fields2);
1626
1627    let merge_rule = DefaultMergeRule;
1628    crdt1.merge_changes(changes2, &merge_rule);
1629    crdt2.merge_changes(changes1, &merge_rule);
1630
1631    // Node2 has higher node_id, so its value should win
1632    assert_eq!(
1633      crdt1
1634        .get_record(&"record1".to_string())
1635        .unwrap()
1636        .fields
1637        .get("tag")
1638        .unwrap(),
1639      "Node2"
1640    );
1641    assert_eq!(crdt1.get_data(), crdt2.get_data());
1642  }
1643
1644  #[test]
1645  #[cfg(feature = "serde")]
1646  fn test_change_serialization() {
1647    #[allow(unused_variables)]
1648    let change = Change::new(
1649      "record1".to_string(),
1650      Some("name".to_string()),
1651      Some("Alice".to_string()),
1652      1,
1653      10,
1654      1,
1655      10,
1656      0,
1657    );
1658
1659    // Test JSON serialization
1660    #[cfg(feature = "json")]
1661    {
1662      let json = serde_json::to_string(&change).unwrap();
1663      let deserialized: Change<String, String, String> = serde_json::from_str(&json).unwrap();
1664      assert_eq!(change, deserialized);
1665    }
1666
1667    // Test binary serialization
1668    #[cfg(feature = "binary")]
1669    {
1670      let bytes = bincode::serde::encode_to_vec(&change, bincode::config::standard()).unwrap();
1671      let (deserialized, _): (Change<String, String, String>, _) =
1672        bincode::serde::decode_from_slice(&bytes, bincode::config::standard()).unwrap();
1673      assert_eq!(change, deserialized);
1674    }
1675  }
1676
1677  #[test]
1678  #[cfg(feature = "serde")]
1679  fn test_record_serialization() {
1680    let mut fields = HashMap::new();
1681    fields.insert("name".to_string(), "Bob".to_string());
1682    fields.insert("age".to_string(), "25".to_string());
1683
1684    let mut column_versions = HashMap::new();
1685    column_versions.insert("name".to_string(), ColumnVersion::new(1, 10, 1, 10));
1686    column_versions.insert("age".to_string(), ColumnVersion::new(1, 11, 1, 11));
1687
1688    #[allow(unused_variables)]
1689    let record = Record::from_parts(fields, column_versions);
1690
1691    // Test JSON serialization
1692    #[cfg(feature = "json")]
1693    {
1694      let json = serde_json::to_string(&record).unwrap();
1695      let deserialized: Record<String, String> = serde_json::from_str(&json).unwrap();
1696      assert_eq!(record, deserialized);
1697    }
1698
1699    // Test binary serialization
1700    #[cfg(feature = "binary")]
1701    {
1702      let bytes = bincode::serde::encode_to_vec(&record, bincode::config::standard()).unwrap();
1703      let (deserialized, _): (Record<String, String>, _) =
1704        bincode::serde::decode_from_slice(&bytes, bincode::config::standard()).unwrap();
1705      assert_eq!(record, deserialized);
1706    }
1707  }
1708
1709  #[test]
1710  #[cfg(feature = "json")]
1711  fn test_crdt_json_serialization() {
1712    let mut crdt: CRDT<String, String, String> = CRDT::new(1, None);
1713
1714    // Add some data
1715    let fields = vec![
1716      ("name".to_string(), "Alice".to_string()),
1717      ("age".to_string(), "30".to_string()),
1718    ];
1719    let _ = crdt.insert_or_update(&"user1".to_string(), fields);
1720
1721    let fields2 = vec![("name".to_string(), "Bob".to_string())];
1722    let _ = crdt.insert_or_update(&"user2".to_string(), fields2);
1723
1724    // Delete one record
1725    let _ = crdt.delete_record(&"user2".to_string());
1726
1727    // Serialize to JSON
1728    let json = crdt.to_json().unwrap();
1729
1730    // Deserialize
1731    let deserialized: CRDT<String, String, String> = CRDT::from_json(&json).unwrap();
1732
1733    // Verify data is preserved
1734    assert_eq!(crdt.get_data().len(), deserialized.get_data().len());
1735    assert_eq!(
1736      crdt.get_record(&"user1".to_string()).unwrap().fields,
1737      deserialized.get_record(&"user1".to_string()).unwrap().fields
1738    );
1739
1740    // Verify tombstones are preserved
1741    assert_eq!(crdt.tombstone_count(), deserialized.tombstone_count());
1742    assert!(deserialized.is_tombstoned(&"user2".to_string()));
1743
1744    // Verify clock is preserved
1745    assert_eq!(
1746      crdt.get_clock().current_time(),
1747      deserialized.get_clock().current_time()
1748    );
1749
1750    // Verify parent is None after deserialization
1751    let has_parent = deserialized.parent.is_some();
1752    assert!(!has_parent);
1753  }
1754
1755  #[test]
1756  #[cfg(feature = "binary")]
1757  fn test_crdt_binary_serialization() {
1758    let mut crdt: CRDT<String, String, String> = CRDT::new(1, None);
1759
1760    // Add some data
1761    let fields = vec![
1762      ("name".to_string(), "Alice".to_string()),
1763      ("age".to_string(), "30".to_string()),
1764    ];
1765    let _ = crdt.insert_or_update(&"user1".to_string(), fields);
1766
1767    // Serialize to bytes
1768    let bytes = crdt.to_bytes().unwrap();
1769
1770    // Deserialize
1771    let deserialized: CRDT<String, String, String> = CRDT::from_bytes(&bytes).unwrap();
1772
1773    // Verify data is preserved
1774    assert_eq!(crdt.get_data().len(), deserialized.get_data().len());
1775    assert_eq!(
1776      crdt.get_record(&"user1".to_string()).unwrap().fields,
1777      deserialized.get_record(&"user1".to_string()).unwrap().fields
1778    );
1779
1780    // Verify clock is preserved
1781    assert_eq!(
1782      crdt.get_clock().current_time(),
1783      deserialized.get_clock().current_time()
1784    );
1785  }
1786
1787  #[test]
1788  #[cfg(feature = "serde")]
1789  fn test_parent_not_serialized() {
1790    // Create a parent CRDT
1791    let mut parent: CRDT<String, String, String> = CRDT::new(1, None);
1792    let fields = vec![("parent_field".to_string(), "parent_value".to_string())];
1793    let _ = parent.insert_or_update(&"parent_record".to_string(), fields);
1794
1795    // Create a child CRDT with parent
1796    let parent_arc = Arc::new(parent);
1797    let mut child = CRDT::new(2, Some(parent_arc.clone()));
1798    let child_fields = vec![("child_field".to_string(), "child_value".to_string())];
1799    let _ = child.insert_or_update(&"child_record".to_string(), child_fields);
1800
1801    // Serialize and deserialize the child
1802    #[cfg(feature = "json")]
1803    {
1804      let json = serde_json::to_string(&child).unwrap();
1805      let deserialized: CRDT<String, String, String> = serde_json::from_str(&json).unwrap();
1806
1807      // Verify parent is None
1808      assert!(deserialized.parent.is_none());
1809
1810      // Verify child's own data is preserved
1811      assert!(deserialized.get_record(&"child_record".to_string()).is_some());
1812
1813      // Verify parent's data is NOT in deserialized child
1814      assert!(deserialized.get_record(&"parent_record".to_string()).is_none());
1815    }
1816  }
1817
1818  #[test]
1819  #[cfg(feature = "sorted-keys")]
1820  fn test_sorted_keys_range_queries() {
1821    let mut crdt: CRDT<String, String, String> = CRDT::new(1, None);
1822
1823    // Insert records with composite keys
1824    let _ = crdt.insert_or_update(
1825      &String::from("session-abc-001"),
1826      vec![(String::from("data"), String::from("first"))],
1827    );
1828    let _ = crdt.insert_or_update(
1829      &String::from("session-abc-002"),
1830      vec![(String::from("data"), String::from("second"))],
1831    );
1832    let _ = crdt.insert_or_update(
1833      &String::from("session-abc-003"),
1834      vec![(String::from("data"), String::from("third"))],
1835    );
1836    let _ = crdt.insert_or_update(
1837      &String::from("session-xyz-001"),
1838      vec![(String::from("data"), String::from("other"))],
1839    );
1840    let _ = crdt.insert_or_update(
1841      &String::from("user-001"),
1842      vec![(String::from("name"), String::from("Alice"))],
1843    );
1844
1845    // Test range query for session-abc records
1846    let session_abc_records: Vec<_> = crdt
1847      .range(String::from("session-abc-")..String::from("session-abd-"))
1848      .collect();
1849
1850    assert_eq!(session_abc_records.len(), 3);
1851    assert!(session_abc_records
1852      .iter()
1853      .all(|(k, _)| k.starts_with("session-abc-")));
1854
1855    // Test that we can iterate in sorted order
1856    let all_keys: Vec<String> = crdt.get_data().keys().cloned().collect();
1857    let mut sorted_keys = all_keys.clone();
1858    sorted_keys.sort();
1859    assert_eq!(all_keys, sorted_keys, "Keys should be in sorted order");
1860
1861    // Test range query boundaries
1862    let range_from_user: Vec<_> = crdt.range(String::from("user-")..).collect();
1863    assert_eq!(range_from_user.len(), 1);
1864    assert_eq!(range_from_user[0].0, "user-001");
1865  }
1866}