use crate::core::server::ResourceStateManager;
use crate::core::{Update, Version};
use serde_json::{json, Value};
/// Resolves incoming updates for a resource, routing updates whose merge type is
/// `"diamond"` through the merge path and passing every other update through unchanged.
#[derive(Clone)]
pub struct ConflictResolver {
/// State manager through which all resource reads and writes are performed.
resource_manager: ResourceStateManager,
}
impl ConflictResolver {
#[must_use]
pub fn new(resource_manager: ResourceStateManager) -> Self {
Self { resource_manager }
}
/// Resolves `update` for `resource_id` on behalf of `agent_id`.
///
/// Updates whose merge type is exactly `"diamond"` are handed to the diamond merge
/// path; any other update (including one with no merge type) is returned as a clone.
///
/// # Errors
///
/// Returns whatever error the diamond merge path reports.
pub async fn resolve_update(
    &self,
    resource_id: &str,
    update: &Update,
    agent_id: &str,
) -> Result<Update, String> {
    let is_diamond = matches!(&update.merge_type, Some(kind) if kind == "diamond");
    if !is_diamond {
        // Nothing to merge: hand the caller back an unchanged copy.
        return Ok(update.clone());
    }
    self.resolve_diamond_merge(resource_id, update, agent_id)
        .await
}
/// Applies a `"diamond"` update to the resource and returns the merged result.
///
/// When the update has a body, it is decoded as UTF-8 (lossily). A body that starts with
/// `{`, ends with `}` and parses as a JSON object is delegated to `handle_diamond_json`
/// together with the update's first version and its parents, and that call's result is
/// returned directly. Any other body is applied as-is through the manager's
/// `apply_update`.
///
/// Afterwards (also when there is no body) the merged response is built, and if it
/// carries a version, that version is registered against the CRDT's local frontier.
///
/// # Errors
///
/// Propagates errors from `apply_update`, `handle_diamond_json` and
/// `build_merged_response`, and returns an error (instead of panicking) if the resource
/// cannot be fetched when registering the version mapping.
async fn resolve_diamond_merge(
    &self,
    resource_id: &str,
    update: &Update,
    agent_id: &str,
) -> Result<Update, String> {
    if let Some(body_bytes) = &update.body {
        let body_str = String::from_utf8_lossy(body_bytes);
        // Needed by both the JSON path and the plain-body path, so compute it once.
        let version_id = update.version.get(0).map(|v| v.to_string());

        if body_str.starts_with('{') && body_str.ends_with('}') {
            if let Ok(json_data) = serde_json::from_str::<Value>(&body_str) {
                if json_data.is_object() {
                    let parents_vec: Vec<String> =
                        update.parents.iter().map(|v| v.to_string()).collect();
                    // An empty parent list is passed on as "no parents".
                    let parents = if parents_vec.is_empty() {
                        None
                    } else {
                        Some(parents_vec.as_slice())
                    };
                    return self
                        .handle_diamond_json(
                            resource_id,
                            &json_data,
                            agent_id,
                            version_id.as_deref(),
                            parents,
                        )
                        .await;
                }
            }
        }

        // Not a JSON object: apply the decoded body directly.
        let _ = self.resource_manager.apply_update(
            resource_id,
            &body_str,
            agent_id,
            version_id.as_deref(),
            None,
        )?;
    }

    let merged = self.build_merged_response(resource_id, agent_id).await?;
    if let Some(v) = merged.version.get(0) {
        let frontier = {
            // Report a missing resource as an error rather than panicking.
            let resource = self
                .resource_manager
                .get_resource(resource_id)
                .ok_or_else(|| "Failed to retrieve resource for version mapping".to_string())?;
            // The guard lives only inside this block, so it is released before the
            // mapping is registered.
            let state = resource.lock();
            state.crdt.get_local_frontier()
        };
        self.resource_manager
            .register_version_mapping(resource_id, v.to_string(), frontier);
    }
    Ok(merged)
}
pub async fn get_history(
&self,
resource_id: &str,
since_versions: &[&str],
) -> Result<Vec<Update>, String> {
let serialized_ops_list = self
.resource_manager
.get_history(resource_id, since_versions)?;
let mut updates = Vec::new();
for ops in serialized_ops_list {
let update = Update::snapshot(
crate::core::Version::new("history-delta"),
serde_json::to_vec(&ops).map_err(|e| e.to_string())?,
);
updates.push(update);
}
Ok(updates)
}
/// Applies the insert and delete operations found in `json_data`, then reports the
/// resulting resource state.
///
/// Inserts are applied before deletes. The returned snapshot is tagged with merge type
/// `"diamond"`; its body is a JSON object with the state's `"content"` (empty string if
/// absent), the manager's merge quality (0 if unavailable) and a one-element `"agents"`
/// list holding `agent_id`. Its version is the state's `"version"`, falling back to
/// `merged-<agent_id>`.
///
/// # Errors
///
/// Returns an error if the resource state cannot be read after the operations ran.
async fn handle_diamond_json(
    &self,
    resource_id: &str,
    json_data: &Value,
    agent_id: &str,
    version_id: Option<&str>,
    parents: Option<&[String]>,
) -> Result<Update, String> {
    // The operation helpers take borrowed `&str` parents, so re-borrow the owned ids.
    let borrowed_parents: Option<Vec<&str>> = match parents {
        Some(ids) => Some(ids.iter().map(String::as_str).collect()),
        None => None,
    };
    let parent_slice: Option<&[&str]> = borrowed_parents.as_deref();

    self.apply_insert_operations(resource_id, json_data, agent_id, version_id, parent_slice);
    self.apply_delete_operations(resource_id, json_data, agent_id, version_id, parent_slice);

    let state = match self.resource_manager.get_resource_state(resource_id) {
        Some(state) => state,
        None => return Err("Failed to retrieve resource state after merge".to_string()),
    };

    let content = extract_string(&state, "content", "");
    let merge_quality = self
        .resource_manager
        .get_merge_quality(resource_id)
        .unwrap_or(0);
    let fallback_version = format!("merged-{}", agent_id);
    let version_id_out = extract_string(&state, "version", &fallback_version);

    let body = json!({
        "content": content,
        "merge_quality": merge_quality,
        "agents": [agent_id],
    })
    .to_string();

    let snapshot = Update::snapshot(Version::new(&version_id_out), body);
    Ok(snapshot.with_merge_type("diamond"))
}
fn apply_insert_operations(
&self,
resource_id: &str,
json_data: &Value,
agent_id: &str,
version_id: Option<&str>,
parents: Option<&[&str]>,
) {
if let Some(inserts) = json_data.get("inserts").and_then(|v| v.as_array()) {
for insert in inserts {
if let (Some(pos), Some(text)) = (
insert.get("pos").and_then(|v| v.as_u64()),
insert.get("text").and_then(|v| v.as_str()),
) {
if let Some(p) = parents {
let _ = self.resource_manager.apply_remote_insert_versioned(
resource_id,
agent_id,
p,
pos as usize,
text,
version_id,
Some("diamond"),
);
} else {
let _ = self.resource_manager.apply_remote_insert(
resource_id,
agent_id,
pos as usize,
text,
version_id,
Some("diamond"),
);
}
}
}
}
}
fn apply_delete_operations(
&self,
resource_id: &str,
json_data: &Value,
agent_id: &str,
version_id: Option<&str>,
parents: Option<&[&str]>,
) {
if let Some(deletes) = json_data.get("deletes").and_then(|v| v.as_array()) {
for delete in deletes {
if let (Some(start), Some(end)) = (
delete.get("start").and_then(|v| v.as_u64()),
delete.get("end").and_then(|v| v.as_u64()),
) {
if let Some(p) = parents {
let _ = self.resource_manager.apply_remote_delete_versioned(
resource_id,
agent_id,
p,
(start as usize)..(end as usize),
version_id,
Some("diamond"),
);
} else {
let _ = self.resource_manager.apply_remote_delete(
resource_id,
agent_id,
start as usize,
end as usize,
version_id,
Some("diamond"),
);
}
}
}
}
}
/// Builds a `"diamond"` snapshot update from the resource's current state.
///
/// The body is the state's `"content"` string (empty if absent). The version is the
/// state's `"version"` string, falling back to `merged-<agent_id>`.
///
/// # Errors
///
/// Returns an error if the manager has no state for `resource_id`.
async fn build_merged_response(
    &self,
    resource_id: &str,
    agent_id: &str,
) -> Result<Update, String> {
    let state = match self.resource_manager.get_resource_state(resource_id) {
        Some(state) => state,
        None => return Err("Failed to retrieve merged resource state".to_string()),
    };

    let body = extract_string(&state, "content", "");
    let fallback_version = format!("merged-{}", agent_id);
    let version_id = extract_string(&state, "version", &fallback_version);

    let snapshot = Update::snapshot(Version::new(&version_id), body);
    Ok(snapshot.with_merge_type("diamond"))
}
/// Returns the `"content"` string of the resource's current state.
///
/// Yields `None` when the manager has no state for `resource_id` or when `"content"` is
/// missing or not a string.
#[inline]
#[must_use]
pub fn get_resource_content(&self, resource_id: &str) -> Option<String> {
    let state = self.resource_manager.get_resource_state(resource_id)?;
    let content = state["content"].as_str()?;
    Some(content.to_owned())
}
/// Returns the resource's current version, built from the `"version"` string of its state.
///
/// Yields `None` when the manager has no state for `resource_id` or when `"version"` is
/// missing or not a string.
#[inline]
#[must_use]
pub fn get_resource_version(&self, resource_id: &str) -> Option<Version> {
    let state = self.resource_manager.get_resource_state(resource_id)?;
    let id = state["version"].as_str()?;
    Some(Version::new(id))
}
}
fn extract_string(json: &Value, key: &str, default: &str) -> String {
json.get(key)
.and_then(|v| v.as_str())
.unwrap_or(default)
.to_string()
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a resolver on top of a freshly created state manager.
    fn fresh_resolver() -> ConflictResolver {
        ConflictResolver::new(ResourceStateManager::new())
    }

    /// An update without a merge type resolves successfully.
    #[tokio::test]
    async fn test_resolve_non_diamond_update() {
        let resolver = fresh_resolver();
        let plain = Update::snapshot(Version::new("v1"), "test content");

        let outcome = resolver.resolve_update("doc1", &plain, "alice").await;

        assert!(outcome.is_ok());
    }

    /// A diamond update resolves successfully and the result keeps the diamond merge type.
    #[tokio::test]
    async fn test_resolve_diamond_update() {
        let resolver = fresh_resolver();
        let diamond =
            Update::snapshot(Version::new("v1"), "test content").with_merge_type("diamond");

        let outcome = resolver.resolve_update("doc1", &diamond, "alice").await;

        assert!(outcome.is_ok());
        let resolved = outcome.unwrap();
        assert_eq!(resolved.merge_type.as_deref(), Some("diamond"));
    }

    /// Two diamond updates from different agents on the same resource: the second resolves.
    #[tokio::test]
    async fn test_concurrent_diamond_merges() {
        let resolver = fresh_resolver();
        let from_alice = Update::snapshot(Version::new("v1"), "hello").with_merge_type("diamond");
        let from_bob = Update::snapshot(Version::new("v2"), "world").with_merge_type("diamond");

        // The first result is intentionally not inspected; only the second is asserted on.
        let _ = resolver.resolve_update("doc1", &from_alice, "alice").await;
        let outcome = resolver.resolve_update("doc1", &from_bob, "bob").await;

        assert!(outcome.is_ok());
    }
}