braid-core 0.1.4

Unified Braid Protocol implementation in Rust, including Braid-HTTP, Antimatter CRDT, and BraidFS.
Documentation
//! Per-resource state management with thread-safe CRDT synchronization.
//!
//! This module provides centralized state management for collaborative document editing.
//! It maintains an in-memory registry of documents, each with its own CRDT instance and
//! metadata. All access is thread-safe via `Arc<RwLock<>>`.

use crate::core::merge::DiamondCRDT;
use parking_lot::Mutex;
use serde_json::Value;
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::SystemTime;
use tokio::sync::broadcast;

/// The state of a single collaborative resource.
///
/// Each resource maintains its own CRDT instance along with synchronization metadata.
/// State is protected by a Mutex for safe concurrent access from multiple async tasks.
#[derive(Debug, Clone)]
pub struct ResourceState {
    /// The document's CRDT with full operation history
    pub crdt: DiamondCRDT,

    /// When this resource was last modified
    pub last_sync: SystemTime,

    /// Set of version IDs seen for this resource (for idempotence)
    pub seen_versions: HashSet<String>,

    /// The merge strategy for this resource (e.g., "diamond", "sync9")
    pub merge_type: String,
}

/// Thread-safe registry of collaborative document resources.
///
/// `ResourceStateManager` maintains the canonical state for all active resources in the
/// system. It provides methods for creating resources, applying edits, and querying state.
/// All operations are atomic and thread-safe.
pub struct ResourceStateManager {
    /// Resource ID -> Arc<Mutex<ResourceState>>
    resources: Arc<Mutex<HashMap<String, Arc<Mutex<ResourceState>>>>>,

    /// Broadcast channel for new resource creation events
    new_resource_tx: broadcast::Sender<String>,
}

impl ResourceStateManager {
    /// Create a new empty resource manager.
    #[must_use]
    pub fn new() -> Self {
        let (tx, _) = broadcast::channel(1024);
        Self {
            resources: Arc::new(Mutex::new(HashMap::new())),
            new_resource_tx: tx,
        }
    }

    // ========== Resource Lifecycle ==========

    /// Get or create a resource, initializing its CRDT if needed.
    #[must_use]
    pub fn get_or_create_resource(
        &self,
        resource_id: &str,
        initial_agent_id: &str,
        requested_merge_type: Option<&str>,
    ) -> Arc<Mutex<ResourceState>> {
        let mut resources = self.resources.lock();

        resources
            .entry(resource_id.to_string())
            .or_insert_with(|| {
                let merge_type = requested_merge_type
                    .unwrap_or(crate::core::protocol_mod::constants::merge_types::DIAMOND)
                    .to_string();

                // Notify subscribers about the NEW resource
                let _ = self.new_resource_tx.send(resource_id.to_string());

                Arc::new(Mutex::new(ResourceState {
                    crdt: DiamondCRDT::new(initial_agent_id),
                    last_sync: SystemTime::now(),
                    seen_versions: HashSet::new(),
                    merge_type,
                }))
            })
            .clone()
    }

    /// Subscribe to new resource creation events.
    pub fn subscribe_to_indices(&self) -> broadcast::Receiver<String> {
        self.new_resource_tx.subscribe()
    }

    /// Get an existing resource without creating it.
    #[inline]
    #[must_use]
    pub fn get_resource(&self, resource_id: &str) -> Option<Arc<Mutex<ResourceState>>> {
        let resources = self.resources.lock();
        resources.get(resource_id).cloned()
    }

    /// List all resource IDs currently in memory.
    #[must_use]
    pub fn list_resources(&self) -> Vec<String> {
        let resources = self.resources.lock();
        resources.keys().cloned().collect()
    }

    /// Check if a resource has already seen a specific version.
    #[must_use]
    pub fn has_version(&self, resource_id: &str, version_id: &str) -> bool {
        if let Some(resource) = self.get_resource(resource_id) {
            let state = resource.lock();
            state.seen_versions.contains(version_id)
        } else {
            false
        }
    }

    // ========== Edit Operations ==========

    /// Apply a full document update (replacement).
    pub fn apply_update(
        &self,
        resource_id: &str,
        content: &str,
        agent_id: &str,
        version_id: Option<&str>,
        requested_merge_type: Option<&str>,
    ) -> Result<Value, String> {
        let resource = self.get_or_create_resource(resource_id, agent_id, requested_merge_type);
        let mut state = resource.lock();

        if let Some(req_mt) = requested_merge_type {
            if state.merge_type != req_mt {
                return Err(format!(
                    "Merge-type mismatch: resource is {}, requested {}",
                    state.merge_type, req_mt
                ));
            }
        }

        if let Some(vid) = version_id {
            if state.seen_versions.contains(vid) {
                return Ok(state.crdt.export_operations());
            }
            state.seen_versions.insert(vid.to_string());
        }

        state.crdt.add_insert(0, content);

        // Also register version mapping if provided
        if let Some(vid) = version_id {
            let frontier = state.crdt.get_local_frontier();
            state
                .crdt
                .register_version_mapping(vid.to_string(), frontier);
        }

        state.last_sync = SystemTime::now();
        Ok(state.crdt.export_operations())
    }

    /// Apply a remote insertion operation.
    pub fn apply_remote_insert(
        &self,
        resource_id: &str,
        agent_id: &str,
        pos: usize,
        text: &str,
        version_id: Option<&str>,
        requested_merge_type: Option<&str>,
    ) -> Result<Value, String> {
        let resource = self.get_or_create_resource(resource_id, agent_id, requested_merge_type);
        let mut state = resource.lock();

        if let Some(req_mt) = requested_merge_type {
            if state.merge_type != req_mt {
                return Err(format!(
                    "Merge-type mismatch: {} vs {}",
                    state.merge_type, req_mt
                ));
            }
        }

        if let Some(vid) = version_id {
            if state.seen_versions.contains(vid) {
                return Ok(state.crdt.export_operations());
            }
            state.seen_versions.insert(vid.to_string());
        }

        state.crdt.add_insert_remote(agent_id, pos, text);
        state.last_sync = SystemTime::now();

        Ok(state.crdt.export_operations())
    }

    /// Apply a remote insertion operation at a specific version.
    pub fn apply_remote_insert_versioned(
        &self,
        resource_id: &str,
        agent_id: &str,
        parents: &[&str],
        pos: usize,
        text: &str,
        version_id: Option<&str>,
        requested_merge_type: Option<&str>,
    ) -> Result<Value, String> {
        let resource = self.get_or_create_resource(resource_id, agent_id, requested_merge_type);
        let mut state = resource.lock();

        if let Some(vid) = version_id {
            if state.seen_versions.contains(vid) {
                return Ok(state.crdt.export_operations());
            }
            state.seen_versions.insert(vid.to_string());
        }

        state
            .crdt
            .add_insert_remote_versioned(agent_id, parents, pos, text, version_id);
        state.last_sync = SystemTime::now();

        Ok(state.crdt.export_operations())
    }

    /// Apply a remote deletion operation.
    pub fn apply_remote_delete(
        &self,
        resource_id: &str,
        agent_id: &str,
        start: usize,
        end: usize,
        version_id: Option<&str>,
        requested_merge_type: Option<&str>,
    ) -> Result<Value, String> {
        let resource = self.get_or_create_resource(resource_id, agent_id, requested_merge_type);
        let mut state = resource.lock();

        if let Some(req_mt) = requested_merge_type {
            if state.merge_type != req_mt {
                return Err(format!(
                    "Merge-type mismatch: {} vs {}",
                    state.merge_type, req_mt
                ));
            }
        }

        if let Some(vid) = version_id {
            if state.seen_versions.contains(vid) {
                return Ok(state.crdt.export_operations());
            }
            state.seen_versions.insert(vid.to_string());
        }

        state.crdt.add_delete_remote(agent_id, start..end);
        state.last_sync = SystemTime::now();

        Ok(state.crdt.export_operations())
    }

    /// Apply a remote deletion operation at a specific version.
    pub fn apply_remote_delete_versioned(
        &self,
        resource_id: &str,
        agent_id: &str,
        parents: &[&str],
        range: std::ops::Range<usize>,
        version_id: Option<&str>,
        requested_merge_type: Option<&str>,
    ) -> Result<Value, String> {
        let resource = self.get_or_create_resource(resource_id, agent_id, requested_merge_type);
        let mut state = resource.lock();

        if let Some(vid) = version_id {
            if state.seen_versions.contains(vid) {
                return Ok(state.crdt.export_operations());
            }
            state.seen_versions.insert(vid.to_string());
        }

        state
            .crdt
            .add_delete_remote_versioned(agent_id, parents, range, version_id);
        state.last_sync = SystemTime::now();

        Ok(state.crdt.export_operations())
    }

    // ========== Query Methods ==========

    /// Get a snapshot of a resource's current state.
    #[inline]
    #[must_use]
    pub fn get_resource_state(&self, resource_id: &str) -> Option<Value> {
        self.get_resource(resource_id).map(|resource| {
            let state = resource.lock();
            state.crdt.checkpoint()
        })
    }

    /// Get the merge quality score for a resource.
    #[inline]
    #[must_use]
    pub fn get_merge_quality(&self, resource_id: &str) -> Option<u32> {
        self.get_resource(resource_id).map(|resource| {
            let state = resource.lock();
            state.crdt.merge_quality()
        })
    }

    /// Register a version mapping for a resource.
    pub fn register_version_mapping(
        &self,
        resource_id: &str,
        version: String,
        frontier: crate::vendor::diamond_types::Frontier,
    ) {
        if let Some(resource) = self.get_resource(resource_id) {
            let mut state = resource.lock();
            state.crdt.register_version_mapping(version, frontier);
        }
    }

    /// Get history for a resource since a set of versions.
    pub fn get_history(
        &self,
        resource_id: &str,
        since_versions: &[&str],
    ) -> Result<Vec<crate::vendor::diamond_types::SerializedOpsOwned>, String> {
        let resource = self
            .get_resource(resource_id)
            .ok_or_else(|| format!("Resource not found: {}", resource_id))?;
        let state = resource.lock();

        let mut frontiers = Vec::new();
        for v in since_versions {
            if let Some(f) = state.crdt.resolve_version(v) {
                frontiers.push(f.clone());
            } else {
                return Err(format!("Version not found/pruned: {}", v));
            }
        }

        Ok(state.crdt.get_ops_since(&frontiers))
    }
}

impl Clone for ResourceStateManager {
    fn clone(&self) -> Self {
        Self {
            resources: Arc::clone(&self.resources),
            new_resource_tx: self.new_resource_tx.clone(),
        }
    }
}

impl Default for ResourceStateManager {
    fn default() -> Self {
        Self::new()
    }
}