1use crate::core::server::ResourceStateManager;
8use crate::core::{Update, Version};
9use serde_json::{json, Value};
10
11#[derive(Clone)]
28pub struct ConflictResolver {
29 resource_manager: ResourceStateManager,
31}
32
33impl ConflictResolver {
34 #[must_use]
49 pub fn new(resource_manager: ResourceStateManager) -> Self {
50 Self { resource_manager }
51 }
52
53 pub async fn resolve_update(
77 &self,
78 resource_id: &str,
79 update: &Update,
80 agent_id: &str,
81 ) -> Result<Update, String> {
82 match &update.merge_type {
83 Some(merge_type) if merge_type == "diamond" => {
84 self.resolve_diamond_merge(resource_id, update, agent_id)
85 .await
86 }
87 _ => Ok(update.clone()),
88 }
89 }
90
91 async fn resolve_diamond_merge(
107 &self,
108 resource_id: &str,
109 update: &Update,
110 agent_id: &str,
111 ) -> Result<Update, String> {
112 if let Some(body_bytes) = &update.body {
113 let body_str = String::from_utf8_lossy(body_bytes);
114
115 if body_str.starts_with('{') && body_str.ends_with('}') {
116 if let Ok(json_data) = serde_json::from_str::<Value>(&body_str) {
117 if json_data.is_object() {
118 let version_id = update.version.get(0).map(|v| v.to_string());
119 let parents_vec: Vec<String> =
120 update.parents.iter().map(|v| v.to_string()).collect();
121 let parents = if parents_vec.is_empty() {
122 None
123 } else {
124 Some(parents_vec.as_slice())
125 };
126 return self
127 .handle_diamond_json(
128 resource_id,
129 &json_data,
130 agent_id,
131 version_id.as_deref(),
132 parents,
133 )
134 .await;
135 }
136 }
137 }
138
139 let version_id = update.version.get(0).map(|v| v.to_string());
140 let version_ref = version_id.as_deref();
141
142 let _ = self.resource_manager.apply_update(
143 resource_id,
144 &body_str,
145 agent_id,
146 version_ref,
147 None, )?;
149 }
150
151 let update = self.build_merged_response(resource_id, agent_id).await?;
152
153 if let Some(v) = update.version.get(0) {
155 let frontier = {
156 let resource = self.resource_manager.get_resource(resource_id).unwrap();
157 let state = resource.lock();
158 state.crdt.get_local_frontier()
159 };
160 self.resource_manager
161 .register_version_mapping(resource_id, v.to_string(), frontier);
162 }
163
164 Ok(update)
165 }
166
167 pub async fn get_history(
169 &self,
170 resource_id: &str,
171 since_versions: &[&str],
172 ) -> Result<Vec<Update>, String> {
173 let serialized_ops_list = self
174 .resource_manager
175 .get_history(resource_id, since_versions)?;
176
177 let mut updates = Vec::new();
178 for ops in serialized_ops_list {
179 let update = Update::snapshot(
184 crate::core::Version::new("history-delta"),
185 serde_json::to_vec(&ops).map_err(|e| e.to_string())?,
186 );
187 updates.push(update);
188 }
189
190 Ok(updates)
191 }
192
193 async fn handle_diamond_json(
213 &self,
214 resource_id: &str,
215 json_data: &Value,
216 agent_id: &str,
217 version_id: Option<&str>,
218 parents: Option<&[String]>,
219 ) -> Result<Update, String> {
220 let parent_strs: Option<Vec<&str>> =
221 parents.map(|p| p.iter().map(|s| s.as_str()).collect());
222 let parent_refs = parent_strs.as_ref().map(|v| v.as_slice());
223
224 self.apply_insert_operations(resource_id, json_data, agent_id, version_id, parent_refs);
225 self.apply_delete_operations(resource_id, json_data, agent_id, version_id, parent_refs);
226
227 let merged_state = self
228 .resource_manager
229 .get_resource_state(resource_id)
230 .ok_or_else(|| "Failed to retrieve resource state after merge".to_string())?;
231
232 let merged_content = extract_string(&merged_state, "content", "");
233 let quality = self
234 .resource_manager
235 .get_merge_quality(resource_id)
236 .unwrap_or(0);
237 let version_str = extract_string(&merged_state, "version", &format!("merged-{}", agent_id));
238
239 let response_body = json!({
240 "content": merged_content,
241 "merge_quality": quality,
242 "agents": [agent_id],
243 })
244 .to_string();
245
246 Ok(Update::snapshot(Version::new(&version_str), response_body).with_merge_type("diamond"))
247 }
248
249 fn apply_insert_operations(
253 &self,
254 resource_id: &str,
255 json_data: &Value,
256 agent_id: &str,
257 version_id: Option<&str>,
258 parents: Option<&[&str]>,
259 ) {
260 if let Some(inserts) = json_data.get("inserts").and_then(|v| v.as_array()) {
261 for insert in inserts {
262 if let (Some(pos), Some(text)) = (
263 insert.get("pos").and_then(|v| v.as_u64()),
264 insert.get("text").and_then(|v| v.as_str()),
265 ) {
266 if let Some(p) = parents {
267 let _ = self.resource_manager.apply_remote_insert_versioned(
268 resource_id,
269 agent_id,
270 p,
271 pos as usize,
272 text,
273 version_id,
274 Some("diamond"),
275 );
276 } else {
277 let _ = self.resource_manager.apply_remote_insert(
278 resource_id,
279 agent_id,
280 pos as usize,
281 text,
282 version_id,
283 Some("diamond"),
284 );
285 }
286 }
287 }
288 }
289 }
290
291 fn apply_delete_operations(
295 &self,
296 resource_id: &str,
297 json_data: &Value,
298 agent_id: &str,
299 version_id: Option<&str>,
300 parents: Option<&[&str]>,
301 ) {
302 if let Some(deletes) = json_data.get("deletes").and_then(|v| v.as_array()) {
303 for delete in deletes {
304 if let (Some(start), Some(end)) = (
305 delete.get("start").and_then(|v| v.as_u64()),
306 delete.get("end").and_then(|v| v.as_u64()),
307 ) {
308 if let Some(p) = parents {
309 let _ = self.resource_manager.apply_remote_delete_versioned(
310 resource_id,
311 agent_id,
312 p,
313 (start as usize)..(end as usize),
314 version_id,
315 Some("diamond"),
316 );
317 } else {
318 let _ = self.resource_manager.apply_remote_delete(
319 resource_id,
320 agent_id,
321 start as usize,
322 end as usize,
323 version_id,
324 Some("diamond"),
325 );
326 }
327 }
328 }
329 }
330 }
331
332 async fn build_merged_response(
336 &self,
337 resource_id: &str,
338 agent_id: &str,
339 ) -> Result<Update, String> {
340 let merged_state = self
341 .resource_manager
342 .get_resource_state(resource_id)
343 .ok_or_else(|| "Failed to retrieve merged resource state".to_string())?;
344
345 let merged_content = extract_string(&merged_state, "content", "");
346 let version_str = extract_string(&merged_state, "version", &format!("merged-{}", agent_id));
347
348 Ok(Update::snapshot(Version::new(&version_str), merged_content).with_merge_type("diamond"))
349 }
350
351 #[inline]
361 #[must_use]
362 pub fn get_resource_content(&self, resource_id: &str) -> Option<String> {
363 self.resource_manager
364 .get_resource_state(resource_id)
365 .and_then(|state| state["content"].as_str().map(|s| s.to_string()))
366 }
367
368 #[inline]
378 #[must_use]
379 pub fn get_resource_version(&self, resource_id: &str) -> Option<Version> {
380 self.resource_manager
381 .get_resource_state(resource_id)
382 .and_then(|state| state["version"].as_str().map(Version::new))
383 }
384}
385
386fn extract_string(json: &Value, key: &str, default: &str) -> String {
390 json.get(key)
391 .and_then(|v| v.as_str())
392 .unwrap_or(default)
393 .to_string()
394}
395
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a resolver around a newly constructed state manager.
    fn fresh_resolver() -> ConflictResolver {
        ConflictResolver::new(ResourceStateManager::new())
    }

    /// Builds a snapshot update tagged with the `"diamond"` merge type.
    fn diamond_snapshot(version: &'static str, body: &'static str) -> Update {
        Update::snapshot(Version::new(version), body).with_merge_type("diamond")
    }

    /// An update without a merge type resolves successfully.
    #[tokio::test]
    async fn test_resolve_non_diamond_update() {
        let resolver = fresh_resolver();
        let plain = Update::snapshot(Version::new("v1"), "test content");

        let outcome = resolver.resolve_update("doc1", &plain, "alice").await;

        assert!(outcome.is_ok());
    }

    /// A diamond update resolves successfully and keeps its merge type.
    #[tokio::test]
    async fn test_resolve_diamond_update() {
        let resolver = fresh_resolver();
        let diamond = diamond_snapshot("v1", "test content");

        let outcome = resolver.resolve_update("doc1", &diamond, "alice").await;
        assert!(outcome.is_ok());

        let resolved = outcome.unwrap();
        assert_eq!(resolved.merge_type, Some("diamond".to_string()));
    }

    /// Two diamond updates from different agents on one resource both go
    /// through; only the second result is checked.
    #[tokio::test]
    async fn test_concurrent_diamond_merges() {
        let resolver = fresh_resolver();
        let first = diamond_snapshot("v1", "hello");
        let second = diamond_snapshot("v2", "world");

        let _ = resolver.resolve_update("doc1", &first, "alice").await;
        let outcome = resolver.resolve_update("doc1", &second, "bob").await;

        assert!(outcome.is_ok());
    }
}