Skip to main content

pollen_crdt/
lib.rs

1//! CRDT synchronization for Pollen.
2//!
3//! Provides eventually consistent distributed state using
4//! Conflict-free Replicated Data Types.
5//!
6//! # CRDT Types
7//!
8//! This crate provides several CRDT types:
9//!
10//! - [`LwwRegister<T>`] - Last-Write-Wins register for single values
11//! - [`OrSet<T>`] - Observed-Remove Set for distributed set operations
12//! - [`GCounter`] - Grow-only counter for distributed counting
13//! - [`PnCounter`] - Positive-Negative counter (can increment/decrement)
14//! - [`MvRegister<T>`] - Multi-value register for concurrent writes
15
16mod merkle;
17mod store;
18mod sync;
19mod types;
20
21pub use merkle::MerkleTree;
22pub use store::{CrdtStore, LwwRegister};
23pub use sync::CrdtSyncService;
24pub use types::{GCounter, MvRegister, OrSet, PnCounter};
25
26use async_trait::async_trait;
27use bytes::Bytes;
28use pollen_types::{NodeId, Result};
29use serde::{de::DeserializeOwned, Serialize};
30use std::sync::Arc;
31use tokio::sync::broadcast;
32
33/// Marker trait for CRDT-compatible values.
34pub trait CrdtValue: Serialize + DeserializeOwned + Clone + Send + Sync + 'static {
35    /// Merge another value into this one.
36    fn merge(&mut self, other: &Self);
37}
38
39/// CRDT key-value store trait.
40#[async_trait]
41pub trait CrdtKv: Send + Sync + 'static {
42    /// Get a value.
43    fn get<T: CrdtValue>(&self, key: &str) -> Option<T>;
44
45    /// Set a value (with merge semantics).
46    async fn set<T: CrdtValue>(&self, key: &str, value: T) -> Result<()>;
47
48    /// Delete a value.
49    async fn delete(&self, key: &str) -> Result<()>;
50
51    /// Subscribe to changes.
52    fn subscribe(&self, prefix: &str) -> broadcast::Receiver<CrdtEvent>;
53
54    /// Force sync with a specific peer.
55    async fn sync_with(&self, peer: NodeId) -> Result<()>;
56
57    /// Get all keys.
58    fn keys(&self) -> Vec<String>;
59
60    /// Get all keys with a prefix.
61    fn keys_with_prefix(&self, prefix: &str) -> Vec<String>;
62}
63
/// CRDT change event, as received from a store subscription.
///
/// Besides `Clone` and `Debug`, the type derives `PartialEq`, `Eq` and `Hash`
/// so events can be compared directly (for example with `assert_eq!`) and
/// collected into hashed containers.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub enum CrdtEvent {
    /// A key was updated.
    Updated {
        /// The key that was updated.
        key: String,
    },
    /// A key was deleted.
    Deleted {
        /// The key that was deleted.
        key: String,
    },
}
72
73/// Shared CRDT store.
74pub type SharedCrdtKv = Arc<dyn CrdtKv>;
75
76/// CRDT entry with metadata.
77#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
78pub struct CrdtEntry {
79    /// The key.
80    pub key: String,
81    /// CRDT type identifier.
82    pub crdt_type: String,
83    /// Serialized value.
84    pub data: Bytes,
85    /// HLC timestamp.
86    pub timestamp: u64,
87    /// Whether this entry is a tombstone.
88    pub deleted: bool,
89}
90
91impl CrdtEntry {
92    /// Create a new entry.
93    pub fn new(key: String, crdt_type: String, data: Bytes, timestamp: u64) -> Self {
94        Self {
95            key,
96            crdt_type,
97            data,
98            timestamp,
99            deleted: false,
100        }
101    }
102
103    /// Create a tombstone entry.
104    pub fn tombstone(key: String, timestamp: u64) -> Self {
105        Self {
106            key,
107            crdt_type: "tombstone".to_string(),
108            data: Bytes::new(),
109            timestamp,
110            deleted: true,
111        }
112    }
113}