Skip to main content

braid_core/core/server/
resource_state.rs

1//! Per-resource state management with thread-safe CRDT synchronization.
2//!
3//! This module provides centralized state management for collaborative document editing.
//! It maintains an in-memory registry of documents, each with its own CRDT instance and
//! metadata. All access is thread-safe via `Arc<Mutex<..>>` (using `parking_lot::Mutex`).
6
7use crate::core::merge::DiamondCRDT;
8use parking_lot::Mutex;
9use serde_json::Value;
10use std::collections::{HashMap, HashSet};
11use std::sync::Arc;
12use std::time::SystemTime;
13use tokio::sync::broadcast;
14
15/// The state of a single collaborative resource.
16///
17/// Each resource maintains its own CRDT instance along with synchronization metadata.
18/// State is protected by a Mutex for safe concurrent access from multiple async tasks.
19#[derive(Debug, Clone)]
20pub struct ResourceState {
21    /// The document's CRDT with full operation history
22    pub crdt: DiamondCRDT,
23
24    /// When this resource was last modified
25    pub last_sync: SystemTime,
26
27    /// Set of version IDs seen for this resource (for idempotence)
28    pub seen_versions: HashSet<String>,
29
30    /// The merge strategy for this resource (e.g., "diamond", "sync9")
31    pub merge_type: String,
32}
33
34/// Thread-safe registry of collaborative document resources.
35///
36/// `ResourceStateManager` maintains the canonical state for all active resources in the
37/// system. It provides methods for creating resources, applying edits, and querying state.
38/// All operations are atomic and thread-safe.
39pub struct ResourceStateManager {
40    /// Resource ID -> Arc<Mutex<ResourceState>>
41    resources: Arc<Mutex<HashMap<String, Arc<Mutex<ResourceState>>>>>,
42
43    /// Broadcast channel for new resource creation events
44    new_resource_tx: broadcast::Sender<String>,
45}
46
47impl ResourceStateManager {
48    /// Create a new empty resource manager.
49    #[must_use]
50    pub fn new() -> Self {
51        let (tx, _) = broadcast::channel(1024);
52        Self {
53            resources: Arc::new(Mutex::new(HashMap::new())),
54            new_resource_tx: tx,
55        }
56    }
57
58    // ========== Resource Lifecycle ==========
59
60    /// Get or create a resource, initializing its CRDT if needed.
61    #[must_use]
62    pub fn get_or_create_resource(
63        &self,
64        resource_id: &str,
65        initial_agent_id: &str,
66        requested_merge_type: Option<&str>,
67    ) -> Arc<Mutex<ResourceState>> {
68        let mut resources = self.resources.lock();
69
70        resources
71            .entry(resource_id.to_string())
72            .or_insert_with(|| {
73                let merge_type = requested_merge_type
74                    .unwrap_or(crate::core::protocol_mod::constants::merge_types::DIAMOND)
75                    .to_string();
76
77                // Notify subscribers about the NEW resource
78                let _ = self.new_resource_tx.send(resource_id.to_string());
79
80                Arc::new(Mutex::new(ResourceState {
81                    crdt: DiamondCRDT::new(initial_agent_id),
82                    last_sync: SystemTime::now(),
83                    seen_versions: HashSet::new(),
84                    merge_type,
85                }))
86            })
87            .clone()
88    }
89
90    /// Subscribe to new resource creation events.
91    pub fn subscribe_to_indices(&self) -> broadcast::Receiver<String> {
92        self.new_resource_tx.subscribe()
93    }
94
95    /// Get an existing resource without creating it.
96    #[inline]
97    #[must_use]
98    pub fn get_resource(&self, resource_id: &str) -> Option<Arc<Mutex<ResourceState>>> {
99        let resources = self.resources.lock();
100        resources.get(resource_id).cloned()
101    }
102
103    /// List all resource IDs currently in memory.
104    #[must_use]
105    pub fn list_resources(&self) -> Vec<String> {
106        let resources = self.resources.lock();
107        resources.keys().cloned().collect()
108    }
109
110    /// Check if a resource has already seen a specific version.
111    #[must_use]
112    pub fn has_version(&self, resource_id: &str, version_id: &str) -> bool {
113        if let Some(resource) = self.get_resource(resource_id) {
114            let state = resource.lock();
115            state.seen_versions.contains(version_id)
116        } else {
117            false
118        }
119    }
120
121    // ========== Edit Operations ==========
122
123    /// Apply a full document update (replacement).
124    pub fn apply_update(
125        &self,
126        resource_id: &str,
127        content: &str,
128        agent_id: &str,
129        version_id: Option<&str>,
130        requested_merge_type: Option<&str>,
131    ) -> Result<Value, String> {
132        let resource = self.get_or_create_resource(resource_id, agent_id, requested_merge_type);
133        let mut state = resource.lock();
134
135        if let Some(req_mt) = requested_merge_type {
136            if state.merge_type != req_mt {
137                return Err(format!(
138                    "Merge-type mismatch: resource is {}, requested {}",
139                    state.merge_type, req_mt
140                ));
141            }
142        }
143
144        if let Some(vid) = version_id {
145            if state.seen_versions.contains(vid) {
146                return Ok(state.crdt.export_operations());
147            }
148            state.seen_versions.insert(vid.to_string());
149        }
150
151        state.crdt.add_insert(0, content);
152
153        // Also register version mapping if provided
154        if let Some(vid) = version_id {
155            let frontier = state.crdt.get_local_frontier();
156            state
157                .crdt
158                .register_version_mapping(vid.to_string(), frontier);
159        }
160
161        state.last_sync = SystemTime::now();
162        Ok(state.crdt.export_operations())
163    }
164
165    /// Apply a remote insertion operation.
166    pub fn apply_remote_insert(
167        &self,
168        resource_id: &str,
169        agent_id: &str,
170        pos: usize,
171        text: &str,
172        version_id: Option<&str>,
173        requested_merge_type: Option<&str>,
174    ) -> Result<Value, String> {
175        let resource = self.get_or_create_resource(resource_id, agent_id, requested_merge_type);
176        let mut state = resource.lock();
177
178        if let Some(req_mt) = requested_merge_type {
179            if state.merge_type != req_mt {
180                return Err(format!(
181                    "Merge-type mismatch: {} vs {}",
182                    state.merge_type, req_mt
183                ));
184            }
185        }
186
187        if let Some(vid) = version_id {
188            if state.seen_versions.contains(vid) {
189                return Ok(state.crdt.export_operations());
190            }
191            state.seen_versions.insert(vid.to_string());
192        }
193
194        state.crdt.add_insert_remote(agent_id, pos, text);
195        state.last_sync = SystemTime::now();
196
197        Ok(state.crdt.export_operations())
198    }
199
200    /// Apply a remote insertion operation at a specific version.
201    pub fn apply_remote_insert_versioned(
202        &self,
203        resource_id: &str,
204        agent_id: &str,
205        parents: &[&str],
206        pos: usize,
207        text: &str,
208        version_id: Option<&str>,
209        requested_merge_type: Option<&str>,
210    ) -> Result<Value, String> {
211        let resource = self.get_or_create_resource(resource_id, agent_id, requested_merge_type);
212        let mut state = resource.lock();
213
214        if let Some(vid) = version_id {
215            if state.seen_versions.contains(vid) {
216                return Ok(state.crdt.export_operations());
217            }
218            state.seen_versions.insert(vid.to_string());
219        }
220
221        state
222            .crdt
223            .add_insert_remote_versioned(agent_id, parents, pos, text, version_id);
224        state.last_sync = SystemTime::now();
225
226        Ok(state.crdt.export_operations())
227    }
228
229    /// Apply a remote deletion operation.
230    pub fn apply_remote_delete(
231        &self,
232        resource_id: &str,
233        agent_id: &str,
234        start: usize,
235        end: usize,
236        version_id: Option<&str>,
237        requested_merge_type: Option<&str>,
238    ) -> Result<Value, String> {
239        let resource = self.get_or_create_resource(resource_id, agent_id, requested_merge_type);
240        let mut state = resource.lock();
241
242        if let Some(req_mt) = requested_merge_type {
243            if state.merge_type != req_mt {
244                return Err(format!(
245                    "Merge-type mismatch: {} vs {}",
246                    state.merge_type, req_mt
247                ));
248            }
249        }
250
251        if let Some(vid) = version_id {
252            if state.seen_versions.contains(vid) {
253                return Ok(state.crdt.export_operations());
254            }
255            state.seen_versions.insert(vid.to_string());
256        }
257
258        state.crdt.add_delete_remote(agent_id, start..end);
259        state.last_sync = SystemTime::now();
260
261        Ok(state.crdt.export_operations())
262    }
263
264    /// Apply a remote deletion operation at a specific version.
265    pub fn apply_remote_delete_versioned(
266        &self,
267        resource_id: &str,
268        agent_id: &str,
269        parents: &[&str],
270        range: std::ops::Range<usize>,
271        version_id: Option<&str>,
272        requested_merge_type: Option<&str>,
273    ) -> Result<Value, String> {
274        let resource = self.get_or_create_resource(resource_id, agent_id, requested_merge_type);
275        let mut state = resource.lock();
276
277        if let Some(vid) = version_id {
278            if state.seen_versions.contains(vid) {
279                return Ok(state.crdt.export_operations());
280            }
281            state.seen_versions.insert(vid.to_string());
282        }
283
284        state
285            .crdt
286            .add_delete_remote_versioned(agent_id, parents, range, version_id);
287        state.last_sync = SystemTime::now();
288
289        Ok(state.crdt.export_operations())
290    }
291
292    // ========== Query Methods ==========
293
294    /// Get a snapshot of a resource's current state.
295    #[inline]
296    #[must_use]
297    pub fn get_resource_state(&self, resource_id: &str) -> Option<Value> {
298        self.get_resource(resource_id).map(|resource| {
299            let state = resource.lock();
300            state.crdt.checkpoint()
301        })
302    }
303
304    /// Get the merge quality score for a resource.
305    #[inline]
306    #[must_use]
307    pub fn get_merge_quality(&self, resource_id: &str) -> Option<u32> {
308        self.get_resource(resource_id).map(|resource| {
309            let state = resource.lock();
310            state.crdt.merge_quality()
311        })
312    }
313
314    /// Register a version mapping for a resource.
315    pub fn register_version_mapping(
316        &self,
317        resource_id: &str,
318        version: String,
319        frontier: crate::vendor::diamond_types::Frontier,
320    ) {
321        if let Some(resource) = self.get_resource(resource_id) {
322            let mut state = resource.lock();
323            state.crdt.register_version_mapping(version, frontier);
324        }
325    }
326
327    /// Get history for a resource since a set of versions.
328    pub fn get_history(
329        &self,
330        resource_id: &str,
331        since_versions: &[&str],
332    ) -> Result<Vec<crate::vendor::diamond_types::SerializedOpsOwned>, String> {
333        let resource = self
334            .get_resource(resource_id)
335            .ok_or_else(|| format!("Resource not found: {}", resource_id))?;
336        let state = resource.lock();
337
338        let mut frontiers = Vec::new();
339        for v in since_versions {
340            if let Some(f) = state.crdt.resolve_version(v) {
341                frontiers.push(f.clone());
342            } else {
343                return Err(format!("Version not found/pruned: {}", v));
344            }
345        }
346
347        Ok(state.crdt.get_ops_since(&frontiers))
348    }
349}
350
351impl Clone for ResourceStateManager {
352    fn clone(&self) -> Self {
353        Self {
354            resources: Arc::clone(&self.resources),
355            new_resource_tx: self.new_resource_tx.clone(),
356        }
357    }
358}
359
360impl Default for ResourceStateManager {
361    fn default() -> Self {
362        Self::new()
363    }
364}