pollen-crdt 0.1.0

CRDT synchronization for Pollen
Documentation
//! CRDT synchronization for Pollen.
//!
//! Provides eventually consistent distributed state using
//! Conflict-free Replicated Data Types.
//!
//! # CRDT Types
//!
//! This crate provides several CRDT types:
//!
//! - [`LwwRegister<T>`] - Last-Write-Wins register for single values
//! - [`OrSet<T>`] - Observed-Remove Set for distributed set operations
//! - [`GCounter`] - Grow-only counter for distributed counting
//! - [`PnCounter`] - Positive-Negative counter (can increment/decrement)
//! - [`MvRegister<T>`] - Multi-value register for concurrent writes

mod merkle;
mod store;
mod sync;
mod types;

pub use merkle::MerkleTree;
pub use store::{CrdtStore, LwwRegister};
pub use sync::CrdtSyncService;
pub use types::{GCounter, MvRegister, OrSet, PnCounter};

use async_trait::async_trait;
use bytes::Bytes;
use pollen_types::{NodeId, Result};
use serde::{de::DeserializeOwned, Serialize};
use std::sync::Arc;
use tokio::sync::broadcast;

/// Marker trait for CRDT-compatible values.
pub trait CrdtValue: Serialize + DeserializeOwned + Clone + Send + Sync + 'static {
    /// Merge another value into this one.
    fn merge(&mut self, other: &Self);
}

/// CRDT key-value store trait.
#[async_trait]
pub trait CrdtKv: Send + Sync + 'static {
    /// Get a value.
    fn get<T: CrdtValue>(&self, key: &str) -> Option<T>;

    /// Set a value (with merge semantics).
    async fn set<T: CrdtValue>(&self, key: &str, value: T) -> Result<()>;

    /// Delete a value.
    async fn delete(&self, key: &str) -> Result<()>;

    /// Subscribe to changes.
    fn subscribe(&self, prefix: &str) -> broadcast::Receiver<CrdtEvent>;

    /// Force sync with a specific peer.
    async fn sync_with(&self, peer: NodeId) -> Result<()>;

    /// Get all keys.
    fn keys(&self) -> Vec<String>;

    /// Get all keys with a prefix.
    fn keys_with_prefix(&self, prefix: &str) -> Vec<String>;
}

/// CRDT change event.
#[derive(Clone, Debug)]
pub enum CrdtEvent {
    /// A key was updated.
    Updated { key: String },
    /// A key was deleted.
    Deleted { key: String },
}

/// Shared CRDT store.
pub type SharedCrdtKv = Arc<dyn CrdtKv>;

/// CRDT entry with metadata.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct CrdtEntry {
    /// The key.
    pub key: String,
    /// CRDT type identifier.
    pub crdt_type: String,
    /// Serialized value.
    pub data: Bytes,
    /// HLC timestamp.
    pub timestamp: u64,
    /// Whether this entry is a tombstone.
    pub deleted: bool,
}

impl CrdtEntry {
    /// Create a new entry.
    pub fn new(key: String, crdt_type: String, data: Bytes, timestamp: u64) -> Self {
        Self {
            key,
            crdt_type,
            data,
            timestamp,
            deleted: false,
        }
    }

    /// Create a tombstone entry.
    pub fn tombstone(key: String, timestamp: u64) -> Self {
        Self {
            key,
            crdt_type: "tombstone".to_string(),
            data: Bytes::new(),
            timestamp,
            deleted: true,
        }
    }
}