// pollen-crdt 0.1.0
//
// CRDT synchronization for Pollen
// Documentation
//! CRDT key-value store implementation.

use crate::{merkle::MerkleTree, CrdtEntry, CrdtEvent, CrdtKv, CrdtValue};
use async_trait::async_trait;
use bytes::Bytes;
use dashmap::DashMap;
use parking_lot::RwLock;
use pollen_clock::SharedClock;
use pollen_store::StoreBackend;
use pollen_types::{NodeId, Result};
use std::sync::Arc;
use tokio::sync::broadcast;
use tracing::info;

/// CRDT key-value store.
///
/// Keeps the latest [`CrdtEntry`] per key in memory, mirrors accepted entries
/// to a [`StoreBackend`], maintains a [`MerkleTree`] over entry data and fans
/// out [`CrdtEvent`]s to subscribers.
pub struct CrdtStore {
    /// Local node ID.
    #[allow(dead_code)] // Used for node identification in distributed sync
    node_id: NodeId,
    /// Clock used to stamp locally generated writes and deletes.
    clock: SharedClock,
    /// Persistent storage that receives a snapshot of every applied entry.
    store: Arc<StoreBackend>,
    /// In-memory state, keyed by entry key. Deleted keys stay present as
    /// tombstone entries (`deleted == true`).
    state: DashMap<String, CrdtEntry>,
    /// Merkle tree for anti-entropy.
    merkle: RwLock<MerkleTree>,
    /// Event broadcaster that receives every applied change.
    event_tx: broadcast::Sender<CrdtEvent>,
    /// Prefix-specific subscribers, keyed by the subscribed prefix.
    prefix_subscribers: DashMap<String, broadcast::Sender<CrdtEvent>>,
}

impl CrdtStore {
    /// Build an empty store bound to `node_id`, `clock` and `store`.
    ///
    /// Nothing is read from `store` here; the in-memory map, the Merkle tree
    /// and the prefix subscriber table all start out empty.
    pub fn new(node_id: NodeId, clock: SharedClock, store: Arc<StoreBackend>) -> Self {
        // Only the sending half is kept; the initial receiver is dropped and
        // new receivers are created on demand from the sender.
        let event_tx = broadcast::channel(1000).0;
        let merkle = RwLock::new(MerkleTree::new());
        let state = DashMap::new();
        let prefix_subscribers = DashMap::new();

        Self {
            node_id,
            clock,
            store,
            state,
            merkle,
            event_tx,
            prefix_subscribers,
        }
    }

    /// Load state from persistent storage.
    pub async fn load(&self) -> Result<()> {
        // Load CRDT snapshots from store
        let keys = self.store.read(|_r| {
            // This is a simplified version - in production you'd iterate over snapshots
            Ok(Vec::<String>::new())
        }).await?;

        for key in keys {
            let key_clone = key.clone();
            if let Some(data) = self.store.read(move |r| r.get_crdt_snapshot(&key_clone)).await? {
                if let Ok(entry) = bincode::deserialize::<CrdtEntry>(&data) {
                    self.state.insert(key.clone(), entry.clone());
                    self.merkle.write().insert(&key, &entry.data);
                }
            }
        }

        info!("Loaded {} CRDT entries from storage", self.state.len());
        Ok(())
    }

    /// Apply a remote delta.
    pub async fn apply_delta(&self, entry: CrdtEntry) -> Result<bool> {
        let key = entry.key.clone();
        let mut changed = false;

        // Check if we should apply this update (LWW semantics with delete-wins tie-breaker)
        let should_apply = match self.state.get(&key) {
            Some(existing) => {
                if entry.timestamp > existing.timestamp {
                    true
                } else if entry.timestamp == existing.timestamp {
                    // Same timestamp: delete wins (or if both same state, no change needed)
                    entry.deleted && !existing.deleted
                } else {
                    false
                }
            }
            None => true,
        };

        if should_apply {
            // Update in-memory state
            self.state.insert(key.clone(), entry.clone());

            // Update Merkle tree
            if entry.deleted {
                self.merkle.write().remove(&key);
            } else {
                self.merkle.write().insert(&key, &entry.data);
            }

            // Persist to storage
            let data = bincode::serialize(&entry)?;
            let key_for_storage = key.clone();
            let timestamp = entry.timestamp;
            self.store.write(move |w| {
                w.save_crdt_snapshot(&key_for_storage, &data, timestamp)
            }).await?;

            // Notify subscribers
            let event = if entry.deleted {
                CrdtEvent::Deleted { key: key.clone() }
            } else {
                CrdtEvent::Updated { key: key.clone() }
            };

            let _ = self.event_tx.send(event.clone());

            // Notify prefix subscribers
            for sub in self.prefix_subscribers.iter() {
                if key.starts_with(sub.key()) {
                    let _ = sub.value().send(event.clone());
                }
            }

            changed = true;
        }

        Ok(changed)
    }

    /// Return the current root hash of the Merkle tree.
    ///
    /// Takes the tree's read lock for the duration of the call.
    pub fn merkle_root(&self) -> Bytes {
        let tree = self.merkle.read();
        tree.root_hash()
    }

    /// Return the hashes the Merkle tree reports for `level`.
    ///
    /// The result is whatever `MerkleTree::level_hashes` yields for that
    /// level; the tree's read lock is held for the duration of the call.
    pub fn merkle_level(&self, level: usize) -> Vec<(String, Bytes)> {
        let tree = self.merkle.read();
        tree.level_hashes(level)
    }

    /// Collect clones of all entries whose key lies in `[start, end)`.
    ///
    /// Keys are compared as strings; `start` is inclusive and `end` is
    /// exclusive. Tombstones are included. The order of the result follows
    /// the map's iteration order.
    pub fn entries_in_range(&self, start: &str, end: &str) -> Vec<CrdtEntry> {
        let mut selected = Vec::new();
        for item in self.state.iter() {
            let candidate = item.key().as_str();
            if start <= candidate && candidate < end {
                selected.push(item.value().clone());
            }
        }
        selected
    }

    /// Collect clones of every stored entry, tombstones included.
    pub fn all_entries(&self) -> Vec<CrdtEntry> {
        let mut snapshot = Vec::new();
        for item in self.state.iter() {
            snapshot.push(item.value().clone());
        }
        snapshot
    }
}

#[async_trait]
impl CrdtKv for CrdtStore {
    /// Look up `key` and decode its payload as `T`.
    ///
    /// Returns `None` when the key is absent, when its entry is a tombstone,
    /// or when the stored bytes do not decode as `T`.
    fn get<T: CrdtValue>(&self, key: &str) -> Option<T> {
        let entry = self.state.get(key)?;
        if entry.deleted {
            return None;
        }
        bincode::deserialize(&entry.data).ok()
    }

    /// Store `value` under `key`, stamped with the current clock reading.
    ///
    /// The value is bincode-encoded and wrapped in a live (non-deleted)
    /// [`CrdtEntry`] whose `crdt_type` is `std::any::type_name::<T>()`, then
    /// routed through `apply_delta`. Whether the entry was actually applied
    /// or ignored as stale is not reported to the caller.
    ///
    /// NOTE(review): the clock value is narrowed with `as_u128() as u64`,
    /// which keeps only the low 64 bits - confirm the clock's 128-bit value
    /// always fits, otherwise ordering information is lost.
    ///
    /// # Errors
    ///
    /// Fails if `value` cannot be serialized or if `apply_delta` fails.
    async fn set<T: CrdtValue>(&self, key: &str, value: T) -> Result<()> {
        let now = self.clock.now();
        let encoded = bincode::serialize(&value)?;

        let delta = CrdtEntry {
            key: key.to_string(),
            crdt_type: std::any::type_name::<T>().to_string(),
            data: Bytes::from(encoded),
            timestamp: now.as_u128() as u64,
            deleted: false,
        };

        self.apply_delta(delta).await.map(|_| ())
    }

    /// Delete `key` by applying a tombstone stamped with the current clock.
    ///
    /// The tombstone goes through `apply_delta`, so it is subject to the same
    /// last-write-wins check as any other delta; the applied/ignored outcome
    /// is not reported to the caller.
    ///
    /// NOTE(review): the clock value is narrowed with `as_u128() as u64`,
    /// which keeps only the low 64 bits - confirm the clock's 128-bit value
    /// always fits.
    ///
    /// # Errors
    ///
    /// Fails if `apply_delta` fails.
    async fn delete(&self, key: &str) -> Result<()> {
        let stamp = self.clock.now().as_u128() as u64;
        let tombstone = CrdtEntry::tombstone(key.to_string(), stamp);
        self.apply_delta(tombstone).await.map(|_| ())
    }

    /// Subscribe to change events for keys starting with `prefix`.
    ///
    /// An empty prefix attaches to the global event channel. Any other prefix
    /// gets its own channel (capacity 100), created on first use and shared
    /// by later subscribers to the same prefix.
    fn subscribe(&self, prefix: &str) -> broadcast::Receiver<CrdtEvent> {
        match prefix {
            "" => self.event_tx.subscribe(),
            scoped => self
                .prefix_subscribers
                .entry(scoped.to_string())
                .or_insert_with(|| broadcast::channel(100).0)
                .subscribe(),
        }
    }

    /// No-op at this layer: the peer argument is ignored and `Ok(())` is
    /// returned immediately.
    async fn sync_with(&self, _peer: NodeId) -> Result<()> {
        // Sync is handled by CrdtSyncService
        Ok(())
    }

    /// List the keys of all live (non-tombstoned) entries.
    ///
    /// The order follows the map's iteration order.
    fn keys(&self) -> Vec<String> {
        let mut live = Vec::new();
        for item in self.state.iter() {
            if !item.value().deleted {
                live.push(item.key().clone());
            }
        }
        live
    }

    /// List the keys of live (non-tombstoned) entries that start with `prefix`.
    ///
    /// The order follows the map's iteration order.
    fn keys_with_prefix(&self, prefix: &str) -> Vec<String> {
        self.state
            .iter()
            .filter_map(|item| {
                let visible = !item.value().deleted && item.key().starts_with(prefix);
                visible.then(|| item.key().clone())
            })
            .collect()
    }
}

/// Last-Write-Wins Register CRDT wrapper.
///
/// Pairs a value with the timestamp of the write that produced it.
#[derive(Clone, Debug, serde::Serialize, serde::Deserialize)]
pub struct LwwRegister<T> {
    /// The currently held value.
    value: T,
    /// Timestamp associated with `value`.
    timestamp: u64,
}

impl<T: Clone + Send + Sync + 'static + serde::Serialize + serde::de::DeserializeOwned> LwwRegister<T> {
    pub fn new(value: T, timestamp: u64) -> Self {
        Self { value, timestamp }
    }

    pub fn value(&self) -> &T {
        &self.value
    }

    pub fn timestamp(&self) -> u64 {
        self.timestamp
    }
}

impl<T> CrdtValue for LwwRegister<T>
where
    T: Clone + Send + Sync + 'static + serde::Serialize + serde::de::DeserializeOwned,
{
    /// Adopt `other`'s value and timestamp when `other` is strictly newer.
    ///
    /// On equal timestamps the receiver keeps its own value.
    fn merge(&mut self, other: &Self) {
        if other.timestamp <= self.timestamp {
            return;
        }
        self.value = other.value.clone();
        self.timestamp = other.timestamp;
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use pollen_store::{MemoryStore, StoreBackend};

    /// Round-trips a value through `set`/`get`, then checks that `delete`
    /// hides it from subsequent reads.
    #[tokio::test]
    async fn test_crdt_store_basic() {
        let id = NodeId::new();
        let backend = Arc::new(StoreBackend::Memory(MemoryStore::new()));
        let kv = CrdtStore::new(id, pollen_clock::new_clock_with_id(id), backend);

        // Write a register, then read it back.
        let register = LwwRegister::new("hello".to_string(), 1);
        kv.set("test:key", register).await.unwrap();

        let fetched: Option<LwwRegister<String>> = kv.get("test:key");
        assert!(fetched.is_some());
        assert_eq!(fetched.unwrap().value(), "hello");

        // Tombstone the key; reads must now miss.
        kv.delete("test:key").await.unwrap();

        let after_delete: Option<LwwRegister<String>> = kv.get("test:key");
        assert!(after_delete.is_none());
    }
}