Skip to main content

braid_core/core/server/
conflict_resolver.rs

1//! Conflict resolution for Diamond-Types CRDT merges.
2//!
3//! This module handles incoming updates with the "diamond" merge-type, applying
4//! CRDT operations and returning merged results. It bridges Braid-HTTP protocol
5//! updates with the underlying diamond-types CRDT engine.
6
7use crate::core::server::ResourceStateManager;
8use crate::core::{Update, Version};
9use serde_json::{json, Value};
10
11/// Handles conflict resolution using Diamond-Types CRDT.
12///
13/// The conflict resolver intercepts updates marked with `merge-type: "diamond"`,
14/// applies them to the appropriate resource's CRDT, and returns the merged result.
15/// This ensures deterministic convergence across all peers.
16///
17/// # Request/Response Formats
18///
19/// **Plain Text Updates:**
20/// - Inserts text at position 0
21/// - Body can be plain text or JSON
22///
23/// **Structured JSON Updates:**
24/// - `"inserts"`: Array of `{pos, text}` objects
25/// - `"deletes"`: Array of `{start, end}` objects
26/// - All operations are applied and merged into the CRDT
27#[derive(Clone)]
28pub struct ConflictResolver {
29    /// Manages per-resource CRDT state
30    resource_manager: ResourceStateManager,
31}
32
33impl ConflictResolver {
34    /// Create a new conflict resolver with the given resource manager.
35    ///
36    /// # Arguments
37    ///
38    /// * `resource_manager` - The centralized resource state registry
39    ///
40    /// # Examples
41    ///
42    /// ```ignore
43    /// use crate::core::server::{ConflictResolver, ResourceStateManager};
44    ///
45    /// let manager = ResourceStateManager::new();
46    /// let resolver = ConflictResolver::new(manager);
47    /// ```
48    #[must_use]
49    pub fn new(resource_manager: ResourceStateManager) -> Self {
50        Self { resource_manager }
51    }
52
53    /// Resolve an update by applying CRDT semantics if needed.
54    ///
55    /// If the update has `merge-type: "diamond"`, it's applied to the resource's CRDT.
56    /// Otherwise, the update is returned unchanged (no merge strategy applied).
57    ///
58    /// # Arguments
59    ///
60    /// * `resource_id` - The resource being updated
61    /// * `update` - The incoming Braid update
62    /// * `agent_id` - Origin agent identifier
63    ///
64    /// # Returns
65    ///
66    /// The resolved update with merged content and current version.
67    ///
68    /// # Examples
69    ///
70    /// ```ignore
71    /// let resolver = ConflictResolver::new(manager);
72    /// let update = Update::snapshot(Version::new("v1"), "hello")
73    ///     .with_merge_type("diamond");
74    /// let result = resolver.resolve_update("doc1", &update, "alice").await?;
75    /// ```
76    pub async fn resolve_update(
77        &self,
78        resource_id: &str,
79        update: &Update,
80        agent_id: &str,
81    ) -> Result<Update, String> {
82        match &update.merge_type {
83            Some(merge_type) if merge_type == "diamond" => {
84                self.resolve_diamond_merge(resource_id, update, agent_id)
85                    .await
86            }
87            _ => Ok(update.clone()),
88        }
89    }
90
91    /// Apply and merge a diamond-type update.
92    ///
93    /// Detects whether the body is:
94    /// - Plain text (applies as insertion at position 0)
95    /// - Structured JSON with operation arrays (applies each operation)
96    ///
97    /// # Arguments
98    ///
99    /// * `resource_id` - Resource to update
100    /// * `update` - The Braid update with body
101    /// * `agent_id` - Origin agent
102    ///
103    /// # Returns
104    ///
105    /// A new update with merged content and current version.
106    async fn resolve_diamond_merge(
107        &self,
108        resource_id: &str,
109        update: &Update,
110        agent_id: &str,
111    ) -> Result<Update, String> {
112        if let Some(body_bytes) = &update.body {
113            let body_str = String::from_utf8_lossy(body_bytes);
114
115            if body_str.starts_with('{') && body_str.ends_with('}') {
116                if let Ok(json_data) = serde_json::from_str::<Value>(&body_str) {
117                    if json_data.is_object() {
118                        let version_id = update.version.get(0).map(|v| v.to_string());
119                        let parents_vec: Vec<String> =
120                            update.parents.iter().map(|v| v.to_string()).collect();
121                        let parents = if parents_vec.is_empty() {
122                            None
123                        } else {
124                            Some(parents_vec.as_slice())
125                        };
126                        return self
127                            .handle_diamond_json(
128                                resource_id,
129                                &json_data,
130                                agent_id,
131                                version_id.as_deref(),
132                                parents,
133                            )
134                            .await;
135                    }
136                }
137            }
138
139            let version_id = update.version.get(0).map(|v| v.to_string());
140            let version_ref = version_id.as_deref();
141
142            let _ = self.resource_manager.apply_update(
143                resource_id,
144                &body_str,
145                agent_id,
146                version_ref,
147                None, // Use existing merge type
148            )?;
149        }
150
151        let update = self.build_merged_response(resource_id, agent_id).await?;
152
153        // Register the new version mapping
154        if let Some(v) = update.version.get(0) {
155            let frontier = {
156                let resource = self.resource_manager.get_resource(resource_id).unwrap();
157                let state = resource.lock();
158                state.crdt.get_local_frontier()
159            };
160            self.resource_manager
161                .register_version_mapping(resource_id, v.to_string(), frontier);
162        }
163
164        Ok(update)
165    }
166
167    /// Retrieve history for a resource since specific versions.
168    pub async fn get_history(
169        &self,
170        resource_id: &str,
171        since_versions: &[&str],
172    ) -> Result<Vec<Update>, String> {
173        let serialized_ops_list = self
174            .resource_manager
175            .get_history(resource_id, since_versions)?;
176
177        let mut updates = Vec::new();
178        for ops in serialized_ops_list {
179            // Convert Diamond-Types internal ops to Braid updates
180            // (Note: This is a simplified conversion, actual implementation might need multi-patch)
181            // For now, we'll return a special Update that carries the ops
182            // In a real Braid system, these would be converted to application/braid-patch
183            let update = Update::snapshot(
184                crate::core::Version::new("history-delta"),
185                serde_json::to_vec(&ops).map_err(|e| e.to_string())?,
186            );
187            updates.push(update);
188        }
189
190        Ok(updates)
191    }
192
193    /// Parse and apply structured JSON operations.
194    ///
195    /// Expected JSON format:
196    /// ```json
197    /// {
198    ///   "inserts": [{"pos": 0, "text": "hello"}],
199    ///   "deletes": [{"start": 5, "end": 6}]
200    /// }
201    /// ```
202    ///
203    /// # Arguments
204    ///
205    /// * `resource_id` - Resource to update
206    /// * `json_data` - Parsed JSON operations
207    /// * `agent_id` - Origin agent
208    ///
209    /// # Returns
210    ///
211    /// A response update with merged state.
212    async fn handle_diamond_json(
213        &self,
214        resource_id: &str,
215        json_data: &Value,
216        agent_id: &str,
217        version_id: Option<&str>,
218        parents: Option<&[String]>,
219    ) -> Result<Update, String> {
220        let parent_strs: Option<Vec<&str>> =
221            parents.map(|p| p.iter().map(|s| s.as_str()).collect());
222        let parent_refs = parent_strs.as_ref().map(|v| v.as_slice());
223
224        self.apply_insert_operations(resource_id, json_data, agent_id, version_id, parent_refs);
225        self.apply_delete_operations(resource_id, json_data, agent_id, version_id, parent_refs);
226
227        let merged_state = self
228            .resource_manager
229            .get_resource_state(resource_id)
230            .ok_or_else(|| "Failed to retrieve resource state after merge".to_string())?;
231
232        let merged_content = extract_string(&merged_state, "content", "");
233        let quality = self
234            .resource_manager
235            .get_merge_quality(resource_id)
236            .unwrap_or(0);
237        let version_str = extract_string(&merged_state, "version", &format!("merged-{}", agent_id));
238
239        let response_body = json!({
240            "content": merged_content,
241            "merge_quality": quality,
242            "agents": [agent_id],
243        })
244        .to_string();
245
246        Ok(Update::snapshot(Version::new(&version_str), response_body).with_merge_type("diamond"))
247    }
248
249    /// Apply insertion operations from JSON array.
250    ///
251    /// Silently skips malformed operations.
252    fn apply_insert_operations(
253        &self,
254        resource_id: &str,
255        json_data: &Value,
256        agent_id: &str,
257        version_id: Option<&str>,
258        parents: Option<&[&str]>,
259    ) {
260        if let Some(inserts) = json_data.get("inserts").and_then(|v| v.as_array()) {
261            for insert in inserts {
262                if let (Some(pos), Some(text)) = (
263                    insert.get("pos").and_then(|v| v.as_u64()),
264                    insert.get("text").and_then(|v| v.as_str()),
265                ) {
266                    if let Some(p) = parents {
267                        let _ = self.resource_manager.apply_remote_insert_versioned(
268                            resource_id,
269                            agent_id,
270                            p,
271                            pos as usize,
272                            text,
273                            version_id,
274                            Some("diamond"),
275                        );
276                    } else {
277                        let _ = self.resource_manager.apply_remote_insert(
278                            resource_id,
279                            agent_id,
280                            pos as usize,
281                            text,
282                            version_id,
283                            Some("diamond"),
284                        );
285                    }
286                }
287            }
288        }
289    }
290
291    /// Apply deletion operations from JSON array.
292    ///
293    /// Silently skips malformed operations.
294    fn apply_delete_operations(
295        &self,
296        resource_id: &str,
297        json_data: &Value,
298        agent_id: &str,
299        version_id: Option<&str>,
300        parents: Option<&[&str]>,
301    ) {
302        if let Some(deletes) = json_data.get("deletes").and_then(|v| v.as_array()) {
303            for delete in deletes {
304                if let (Some(start), Some(end)) = (
305                    delete.get("start").and_then(|v| v.as_u64()),
306                    delete.get("end").and_then(|v| v.as_u64()),
307                ) {
308                    if let Some(p) = parents {
309                        let _ = self.resource_manager.apply_remote_delete_versioned(
310                            resource_id,
311                            agent_id,
312                            p,
313                            (start as usize)..(end as usize),
314                            version_id,
315                            Some("diamond"),
316                        );
317                    } else {
318                        let _ = self.resource_manager.apply_remote_delete(
319                            resource_id,
320                            agent_id,
321                            start as usize,
322                            end as usize,
323                            version_id,
324                            Some("diamond"),
325                        );
326                    }
327                }
328            }
329        }
330    }
331
332    /// Build a Braid response with merged content.
333    ///
334    /// Extracts content and version from the current resource state.
335    async fn build_merged_response(
336        &self,
337        resource_id: &str,
338        agent_id: &str,
339    ) -> Result<Update, String> {
340        let merged_state = self
341            .resource_manager
342            .get_resource_state(resource_id)
343            .ok_or_else(|| "Failed to retrieve merged resource state".to_string())?;
344
345        let merged_content = extract_string(&merged_state, "content", "");
346        let version_str = extract_string(&merged_state, "version", &format!("merged-{}", agent_id));
347
348        Ok(Update::snapshot(Version::new(&version_str), merged_content).with_merge_type("diamond"))
349    }
350
351    /// Get the current content of a resource.
352    ///
353    /// # Arguments
354    ///
355    /// * `resource_id` - Resource to query
356    ///
357    /// # Returns
358    ///
359    /// Current document text, or `None` if the resource doesn't exist.
360    #[inline]
361    #[must_use]
362    pub fn get_resource_content(&self, resource_id: &str) -> Option<String> {
363        self.resource_manager
364            .get_resource_state(resource_id)
365            .and_then(|state| state["content"].as_str().map(|s| s.to_string()))
366    }
367
368    /// Get the current version of a resource.
369    ///
370    /// # Arguments
371    ///
372    /// * `resource_id` - Resource to query
373    ///
374    /// # Returns
375    ///
376    /// Current version identifier, or `None` if the resource doesn't exist.
377    #[inline]
378    #[must_use]
379    pub fn get_resource_version(&self, resource_id: &str) -> Option<Version> {
380        self.resource_manager
381            .get_resource_state(resource_id)
382            .and_then(|state| state["version"].as_str().map(Version::new))
383    }
384}
385
386/// Helper to safely extract string values from JSON.
387///
388/// Returns the specified default if the field is missing or not a string.
389fn extract_string(json: &Value, key: &str, default: &str) -> String {
390    json.get(key)
391        .and_then(|v| v.as_str())
392        .unwrap_or(default)
393        .to_string()
394}
395
#[cfg(test)]
mod tests {
    use super::*;

    /// Resolver backed by a fresh, empty resource manager.
    fn new_resolver() -> ConflictResolver {
        ConflictResolver::new(ResourceStateManager::new())
    }

    /// Updates without the diamond merge type must come back untouched.
    #[tokio::test]
    async fn test_resolve_non_diamond_update() {
        let resolver = new_resolver();

        let update = Update::snapshot(Version::new("v1"), "test content");
        let resolved = resolver
            .resolve_update("doc1", &update, "alice")
            .await
            .expect("non-diamond updates must pass through");

        // Pass-through means the merge type and body are exactly what was sent.
        assert_eq!(resolved.merge_type, update.merge_type);
        assert!(resolved.body == update.body);
    }

    /// A diamond update resolves successfully and stays tagged as diamond.
    #[tokio::test]
    async fn test_resolve_diamond_update() {
        let resolver = new_resolver();

        let update =
            Update::snapshot(Version::new("v1"), "test content").with_merge_type("diamond");

        let resolved = resolver
            .resolve_update("doc1", &update, "alice")
            .await
            .expect("diamond update should resolve");

        assert_eq!(resolved.merge_type, Some("diamond".to_string()));
    }

    /// Two agents merging into the same resource must both succeed.
    #[tokio::test]
    async fn test_concurrent_diamond_merges() {
        let resolver = new_resolver();

        let update1 = Update::snapshot(Version::new("v1"), "hello").with_merge_type("diamond");
        let update2 = Update::snapshot(Version::new("v2"), "world").with_merge_type("diamond");

        // Check the first merge too; a failure here used to be discarded.
        let first = resolver.resolve_update("doc1", &update1, "alice").await;
        assert!(first.is_ok());

        let second = resolver.resolve_update("doc1", &update2, "bob").await;
        assert!(second.is_ok());
        assert_eq!(second.unwrap().merge_type, Some("diamond".to_string()));
    }
}
439}