use std::collections::HashMap;
use std::time::Instant;
use serde::{Deserialize, Serialize};
use tokio::sync::RwLock;
/// Tracks per-resource versions and resolves conflicts between optimistic writers.
///
/// All mutable state sits behind async `RwLock`s, so the methods only need `&self`.
pub struct OptimisticController {
/// Latest recorded version of each resource, keyed by resource id.
versions: RwLock<HashMap<String, ResourceVersion>>,
/// Per-resource strategy overrides; looked up by exact resource id.
resolution_strategies: RwLock<HashMap<String, ResolutionStrategy>>,
/// Strategy used when a resource has no registered override.
default_strategy: ResolutionStrategy,
/// Conflicts recorded by `commit_or_resolve`, oldest first.
conflict_history: RwLock<Vec<ConflictRecord>>,
/// Upper bound on `conflict_history`; the oldest records are dropped first.
max_history: usize,
}
/// Version metadata stored for a single resource.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResourceVersion {
/// Version counter; `ResourceVersion::new` starts it at 1.
pub version: u64,
/// Hash of the resource content at this version.
pub content_hash: String,
/// Identifier of the agent that wrote this version.
pub last_modifier: String,
/// When this version was written. Skipped by serde; deserialization fills in
/// `Instant::now()` instead.
#[serde(skip, default = "Instant::now")]
pub modified_at: Instant,
}
impl ResourceVersion {
    /// Builds the initial record (version 1) for a resource, stamped with the current time.
    pub fn new(content_hash: impl Into<String>, modifier: impl Into<String>) -> Self {
        let content_hash = content_hash.into();
        let last_modifier = modifier.into();
        Self {
            version: 1,
            content_hash,
            last_modifier,
            modified_at: Instant::now(),
        }
    }

    /// Advances the version by one and replaces the hash, modifier and timestamp in place.
    pub fn increment(&mut self, content_hash: impl Into<String>, modifier: impl Into<String>) {
        let Self {
            version,
            content_hash: hash_slot,
            last_modifier,
            modified_at,
        } = self;
        *version += 1;
        *hash_slot = content_hash.into();
        *last_modifier = modifier.into();
        *modified_at = Instant::now();
    }
}
/// Policy used to pick a `Resolution` when two writers collide on a resource.
#[derive(Debug, Clone, Default)]
pub enum ResolutionStrategy {
/// The writer whose commit hit the conflict wins
/// (`conflicting_agent` in `resolve_conflict_auto`, `agent_b` in `resolve_conflict`).
LastWriterWins,
/// The writer already holding the resource wins
/// (`holder_agent` in `resolve_conflict_auto`, `agent_a` in `resolve_conflict`).
/// This is the `Default` variant.
#[default]
FirstWriterWins,
/// Combine both sides with the given merge strategy. Only `resolve_conflict`
/// attempts the merge; `resolve_conflict_auto` escalates instead.
Merge(MergeStrategy),
/// Always escalate the conflict.
Escalate,
/// Ask the caller to retry. `resolve_conflict_auto` escalates once the version
/// distance reaches `max_attempts`; `resolve_conflict` ignores `max_attempts`.
Retry {
max_attempts: u32,
},
}
/// How `try_merge` combines the two conflicting contents.
#[derive(Debug, Clone)]
pub enum MergeStrategy {
/// Line-based merge: all lines of side A, then the lines of side B that were not
/// matched one-to-one against a line of A.
TextMerge,
/// Parse both sides as JSON and deep-merge side B into side A.
JsonMerge,
/// Concatenate side A and side B, separated by a newline.
Append,
/// Named custom strategy. No implementation is visible in this file; `try_merge`
/// falls through to escalation for it.
Custom(String),
}
/// Describes a rejected optimistic commit: the token's base version no longer matched
/// the version recorded for the resource.
#[derive(Debug, Clone)]
pub struct OptimisticConflict {
    /// Resource the commit targeted.
    pub resource_id: String,
    /// Agent whose commit was rejected.
    pub conflicting_agent: String,
    /// Base version carried by the rejected token.
    pub expected_version: u64,
    /// Version actually recorded when the commit was attempted.
    pub actual_version: u64,
    /// Agent that last modified the resource.
    pub holder_agent: String,
    /// When the conflict was detected.
    pub detected_at: Instant,
}

impl OptimisticConflict {
    /// Number of versions the resource is ahead of the token's base version.
    ///
    /// Saturates at 0 when `expected_version` is the larger of the two.
    pub fn version_diff(&self) -> u64 {
        let Self {
            expected_version,
            actual_version,
            ..
        } = self;
        actual_version.saturating_sub(*expected_version)
    }
}
/// Full description of a two-sided conflict, as consumed by `resolve_conflict`.
#[derive(Debug, Clone)]
pub struct OptimisticConflictDetails {
/// Resource both agents wrote to; also the key used to look up a strategy override.
pub resource_id: String,
/// Agent chosen by `FirstWriterWins`.
pub agent_a: String,
/// Agent chosen by `LastWriterWins`.
pub agent_b: String,
/// Version metadata for side A (not read by the code in this file).
pub version_a: ResourceVersion,
/// Version metadata for side B (not read by the code in this file).
pub version_b: ResourceVersion,
/// Version metadata for the common base (not read by the code in this file).
pub base_version: ResourceVersion,
/// Content of side A; a merge needs both contents to be present.
pub content_a: Option<String>,
/// Content of side B; a merge needs both contents to be present.
pub content_b: Option<String>,
}
/// Outcome chosen for a conflict.
#[derive(Debug, Clone)]
pub enum Resolution {
/// Keep the version written by the named agent.
UseVersion(String),
/// Both sides were merged; the payload is the hash of the merged content
/// (the merged content itself is not carried).
Merged(String),
/// Abort both operations. Not produced by the code in this file;
/// `commit_or_resolve` maps it to `CommitResult::Aborted`.
AbortBoth,
/// Keep both versions, distinguished by the two suffixes. Not produced by the code
/// in this file; `commit_or_resolve` maps it to `CommitResult::Split`.
KeepBoth {
suffix_a: String,
suffix_b: String,
},
/// The losing writer should start over and retry.
Retry,
/// Automatic resolution was not possible; `reason` says why.
Escalate {
reason: String,
},
}
/// Snapshot of a resource's version, handed out by `begin_optimistic` and presented
/// again when committing.
#[derive(Debug, Clone)]
pub struct OptimisticToken {
    /// Resource this token refers to.
    pub resource_id: String,
    /// Version of the resource when the token was created (0 if it had none).
    pub base_version: u64,
    /// Content hash of the resource when the token was created (empty if it had none).
    pub base_hash: String,
    /// Agent that requested the token.
    pub agent_id: String,
    /// Creation time, used by `is_stale`.
    pub created_at: Instant,
}

impl OptimisticToken {
    /// Returns `true` once strictly more than `max_age` has elapsed since `created_at`.
    pub fn is_stale(&self, max_age: std::time::Duration) -> bool {
        let age = self.created_at.elapsed();
        max_age < age
    }
}
/// One entry of the controller's conflict history.
#[derive(Debug, Clone)]
pub struct ConflictRecord {
/// The conflict that was detected.
pub conflict: OptimisticConflict,
/// The resolution chosen for it.
pub resolution: Resolution,
/// When the record was added to the history.
pub resolved_at: Instant,
}
impl OptimisticController {
/// Creates an empty controller: no versions, no strategy overrides,
/// `FirstWriterWins` as the default strategy and a history limit of 100 records.
pub fn new() -> Self {
    let versions = RwLock::new(HashMap::new());
    let resolution_strategies = RwLock::new(HashMap::new());
    let conflict_history = RwLock::new(Vec::new());
    Self {
        versions,
        resolution_strategies,
        default_strategy: ResolutionStrategy::FirstWriterWins,
        conflict_history,
        max_history: 100,
    }
}
pub fn with_default_strategy(strategy: ResolutionStrategy) -> Self {
Self {
versions: RwLock::new(HashMap::new()),
resolution_strategies: RwLock::new(HashMap::new()),
default_strategy: strategy,
conflict_history: RwLock::new(Vec::new()),
max_history: 100,
}
}
pub fn with_max_history(mut self, max: usize) -> Self {
self.max_history = max;
self
}
/// Snapshots the current version of `resource_id` into a token for a later commit.
///
/// A resource with no recorded version yields base version 0 and an empty base hash.
pub async fn begin_optimistic(&self, agent_id: &str, resource_id: &str) -> OptimisticToken {
    let snapshot = self.versions.read().await;
    let (base_version, base_hash) = match snapshot.get(resource_id) {
        Some(current) => (current.version, current.content_hash.clone()),
        None => (0, String::new()),
    };
    OptimisticToken {
        resource_id: resource_id.to_owned(),
        base_version,
        base_hash,
        agent_id: agent_id.to_owned(),
        created_at: Instant::now(),
    }
}
/// Commits `new_content_hash` for the token's resource if nobody else has written
/// to it since the token was created.
///
/// A resource with no recorded version counts as version 0, mirroring what
/// `begin_optimistic` hands out. On success the resource is stored at
/// `token.base_version + 1`, attributed to the token's agent, and that new version
/// is returned.
///
/// # Errors
///
/// Returns an [`OptimisticConflict`] when the recorded version differs from
/// `token.base_version`. If the resource has no record at all, the conflict reports
/// `actual_version` 0 and an empty `holder_agent`.
pub async fn commit_optimistic(
    &self,
    token: OptimisticToken,
    new_content_hash: &str,
) -> Result<u64, OptimisticConflict> {
    let mut versions = self.versions.write().await;
    let current = versions.get(&token.resource_id);
    // A missing entry must still be checked: otherwise a token with a non-zero base
    // version would be accepted unchecked and create the resource at an arbitrary version.
    let actual_version = current.map_or(0, |v| v.version);
    if actual_version != token.base_version {
        let holder_agent = current
            .map(|v| v.last_modifier.clone())
            .unwrap_or_default();
        return Err(OptimisticConflict {
            resource_id: token.resource_id,
            conflicting_agent: token.agent_id,
            expected_version: token.base_version,
            actual_version,
            holder_agent,
            detected_at: Instant::now(),
        });
    }
    let new_version = token.base_version + 1;
    versions.insert(
        token.resource_id,
        ResourceVersion {
            version: new_version,
            content_hash: new_content_hash.to_string(),
            last_modifier: token.agent_id,
            modified_at: Instant::now(),
        },
    );
    Ok(new_version)
}
/// Attempts an optimistic commit and, on conflict, applies the configured
/// resolution strategy automatically.
///
/// Every conflict is appended to the conflict history together with the resolution
/// chosen for it. The resolution is then translated into a [`CommitResult`]:
/// a win for this token's agent or a merge is force-committed on top of the current
/// version; every other resolution leaves the stored version untouched.
///
/// All paths in this body return `Ok`; `Err` is not produced here.
///
/// NOTE(review): winners are compared by agent id, so under `FirstWriterWins` a
/// stale commit from the *same* agent that currently holds the resource appears to
/// be force-committed rather than rejected — confirm whether that is intended.
pub async fn commit_or_resolve(
    &self,
    token: OptimisticToken,
    new_content_hash: &str,
    new_content: Option<&str>,
) -> Result<CommitResult, String> {
    // Keep the two identifiers we need after the token is consumed by the commit.
    let resource_id = token.resource_id.clone();
    let agent_id = token.agent_id.clone();

    let conflict = match self.commit_optimistic(token, new_content_hash).await {
        Ok(version) => return Ok(CommitResult::Committed { version }),
        Err(conflict) => conflict,
    };

    let resolution = self.resolve_conflict_auto(&conflict, new_content).await;
    let current_version = conflict.actual_version;
    self.record_conflict(conflict, resolution.clone()).await;

    let outcome = match resolution {
        Resolution::UseVersion(winner) if winner == agent_id => {
            let version = self
                .force_commit(&resource_id, new_content_hash, &agent_id)
                .await;
            CommitResult::Committed { version }
        }
        Resolution::UseVersion(winner) => CommitResult::Rejected {
            reason: format!("Conflict resolved in favor of {}", winner),
        },
        Resolution::Merged(merged_hash) => {
            let version = self
                .force_commit(&resource_id, &merged_hash, &agent_id)
                .await;
            CommitResult::Merged {
                version,
                merged_hash,
            }
        }
        Resolution::Retry => CommitResult::RetryNeeded { current_version },
        Resolution::AbortBoth => CommitResult::Aborted {
            reason: "Both operations aborted due to conflict".to_string(),
        },
        Resolution::KeepBoth { suffix_a, suffix_b } => CommitResult::Split { suffix_a, suffix_b },
        Resolution::Escalate { reason } => CommitResult::Escalated { reason },
    };
    Ok(outcome)
}
/// Writes a new version for `resource_id` without any version check and returns it.
///
/// The new version is one above whatever is currently recorded, or 1 when the
/// resource has no record.
async fn force_commit(&self, resource_id: &str, content_hash: &str, agent_id: &str) -> u64 {
    let mut table = self.versions.write().await;
    let new_version = match table.get(resource_id) {
        Some(existing) => existing.version + 1,
        None => 1,
    };
    let key = resource_id.to_owned();
    let entry = ResourceVersion {
        version: new_version,
        content_hash: content_hash.to_owned(),
        last_modifier: agent_id.to_owned(),
        modified_at: Instant::now(),
    };
    table.insert(key, entry);
    new_version
}
/// Picks a resolution for a commit-time conflict from the strategy registered for
/// the resource, or from the default strategy when none is registered.
///
/// `_new_content` is accepted but not used: `Merge` strategies always escalate on
/// this path. `Retry` is granted while the version distance is below
/// `max_attempts` and escalates otherwise.
async fn resolve_conflict_auto(
    &self,
    conflict: &OptimisticConflict,
    _new_content: Option<&str>,
) -> Resolution {
    let registry = self.resolution_strategies.read().await;
    let chosen = match registry.get(&conflict.resource_id) {
        Some(specific) => specific.clone(),
        None => self.default_strategy.clone(),
    };
    let escalate = |reason: String| Resolution::Escalate { reason };
    match chosen {
        ResolutionStrategy::FirstWriterWins => {
            Resolution::UseVersion(conflict.holder_agent.clone())
        }
        ResolutionStrategy::LastWriterWins => {
            Resolution::UseVersion(conflict.conflicting_agent.clone())
        }
        ResolutionStrategy::Escalate => {
            escalate("Configured to escalate all conflicts".to_string())
        }
        ResolutionStrategy::Merge(_) => {
            escalate("Merge requires content, not available".to_string())
        }
        ResolutionStrategy::Retry { max_attempts }
            if conflict.version_diff() < u64::from(max_attempts) =>
        {
            Resolution::Retry
        }
        ResolutionStrategy::Retry { max_attempts } => {
            escalate(format!("Max retry attempts ({}) exceeded", max_attempts))
        }
    }
}
/// Resolves a fully described two-sided conflict using the strategy registered for
/// the resource, or the default strategy when none is registered.
///
/// `Merge` strategies delegate to `try_merge`; `Retry` always yields
/// `Resolution::Retry` regardless of `max_attempts`.
pub async fn resolve_conflict(&self, conflict: &OptimisticConflictDetails) -> Resolution {
    let registry = self.resolution_strategies.read().await;
    let chosen = match registry.get(&conflict.resource_id) {
        Some(specific) => specific.clone(),
        None => self.default_strategy.clone(),
    };
    match chosen {
        ResolutionStrategy::Merge(ref merge_strategy) => {
            self.try_merge(conflict, merge_strategy).await
        }
        ResolutionStrategy::FirstWriterWins => Resolution::UseVersion(conflict.agent_a.clone()),
        ResolutionStrategy::LastWriterWins => Resolution::UseVersion(conflict.agent_b.clone()),
        ResolutionStrategy::Retry { .. } => Resolution::Retry,
        ResolutionStrategy::Escalate => {
            let reason = "Policy requires manual resolution".to_string();
            Resolution::Escalate { reason }
        }
    }
}
async fn try_merge(
&self,
conflict: &OptimisticConflictDetails,
strategy: &MergeStrategy,
) -> Resolution {
match (strategy, &conflict.content_a, &conflict.content_b) {
(MergeStrategy::Append, Some(a), Some(b)) => {
let merged = format!("{}\n{}", a, b);
Resolution::Merged(hash_content(&merged))
}
(MergeStrategy::TextMerge, Some(a), Some(b)) => {
let lines_a: Vec<&str> = a.lines().collect();
let lines_b: Vec<&str> = b.lines().collect();
let mut merged = Vec::new();
let mut used_b: Vec<bool> = vec![false; lines_b.len()];
for line_a in &lines_a {
merged.push(*line_a);
for (i, line_b) in lines_b.iter().enumerate() {
if !used_b[i] && line_a == line_b {
used_b[i] = true;
break;
}
}
}
for (i, line_b) in lines_b.iter().enumerate() {
if !used_b[i] {
merged.push(*line_b);
}
}
let merged_content = merged.join("\n");
Resolution::Merged(hash_content(&merged_content))
}
(MergeStrategy::JsonMerge, Some(a), Some(b)) => {
match (
serde_json::from_str::<serde_json::Value>(a),
serde_json::from_str::<serde_json::Value>(b),
) {
(Ok(mut val_a), Ok(val_b)) => {
json_deep_merge(&mut val_a, &val_b);
let merged_content = serde_json::to_string_pretty(&val_a)
.unwrap_or_else(|_| format!("{}", val_a));
Resolution::Merged(hash_content(&merged_content))
}
_ => Resolution::Escalate {
reason: "Failed to parse content as JSON for merge".to_string(),
},
}
}
_ => Resolution::Escalate {
reason: "Content not available for merge".to_string(),
},
}
}
/// Appends a conflict and its resolution to the history, then discards the oldest
/// records until at most `max_history` remain.
async fn record_conflict(&self, conflict: OptimisticConflict, resolution: Resolution) {
    let mut log = self.conflict_history.write().await;
    log.push(ConflictRecord {
        conflict,
        resolution,
        resolved_at: Instant::now(),
    });
    // Number of records beyond the limit; they sit at the front (oldest first).
    let overflow = log.len().saturating_sub(self.max_history);
    drop(log.drain(..overflow));
}
/// Registers `strategy` for a resource, replacing any earlier registration.
///
/// Despite the parameter name, lookups in this file match the key against the
/// resource id exactly; no pattern matching is performed.
pub async fn register_strategy(&self, resource_pattern: &str, strategy: ResolutionStrategy) {
    let mut registry = self.resolution_strategies.write().await;
    registry.insert(resource_pattern.to_owned(), strategy);
}
/// Returns a copy of the version record for `resource_id`, or `None` if it has none.
pub async fn get_version(&self, resource_id: &str) -> Option<ResourceVersion> {
    let table = self.versions.read().await;
    table.get(resource_id).cloned()
}
/// Reports whether `resource_id` has a recorded version strictly greater than
/// `since_version`. A resource with no record counts as unchanged.
pub async fn has_changed(&self, resource_id: &str, since_version: u64) -> bool {
    let table = self.versions.read().await;
    match table.get(resource_id) {
        Some(current) => current.version > since_version,
        None => false,
    }
}
/// Returns a copy of the recorded conflicts, oldest first.
pub async fn get_conflict_history(&self) -> Vec<ConflictRecord> {
    let log = self.conflict_history.read().await;
    log.to_vec()
}
/// Removes every record from the conflict history.
pub async fn clear_history(&self) {
    let mut log = self.conflict_history.write().await;
    log.clear();
}
/// Summarizes the controller state: number of tracked resources, number of
/// conflicts currently in the history, and how many of those were resolved as
/// `Retry` or `Escalate`.
pub async fn get_stats(&self) -> OptimisticStats {
    let history = self.conflict_history.read().await;
    let versions = self.versions.read().await;

    let mut resolved_by_retry = 0usize;
    let mut escalated = 0usize;
    for record in history.iter() {
        match record.resolution {
            Resolution::Retry => resolved_by_retry += 1,
            Resolution::Escalate { .. } => escalated += 1,
            Resolution::UseVersion(_)
            | Resolution::Merged(_)
            | Resolution::AbortBoth
            | Resolution::KeepBoth { .. } => {}
        }
    }

    OptimisticStats {
        total_resources: versions.len(),
        total_conflicts: history.len(),
        resolved_by_retry,
        escalated,
    }
}
}
impl Default for OptimisticController {
fn default() -> Self {
Self::new()
}
}
/// Outcome of `commit_or_resolve` as seen by the committing agent.
#[derive(Debug, Clone)]
pub enum CommitResult {
    /// The content was committed at `version`.
    Committed {
        version: u64,
    },
    /// A merged result was committed at `version`; `merged_hash` is its content hash.
    Merged {
        version: u64,
        merged_hash: String,
    },
    /// Nothing was written; the caller should retry on top of `current_version`.
    RetryNeeded {
        current_version: u64,
    },
    /// The conflict was resolved in favor of another agent.
    Rejected {
        reason: String,
    },
    /// Both operations were aborted.
    Aborted {
        reason: String,
    },
    /// Both versions are to be kept, distinguished by the two suffixes.
    Split {
        suffix_a: String,
        suffix_b: String,
    },
    /// The conflict was escalated instead of being resolved automatically.
    Escalated {
        reason: String,
    },
}

impl CommitResult {
    /// Returns `true` for the two outcomes that wrote a new version:
    /// `Committed` and `Merged`.
    pub fn is_success(&self) -> bool {
        match self {
            CommitResult::Committed { .. } | CommitResult::Merged { .. } => true,
            CommitResult::RetryNeeded { .. }
            | CommitResult::Rejected { .. }
            | CommitResult::Aborted { .. }
            | CommitResult::Split { .. }
            | CommitResult::Escalated { .. } => false,
        }
    }

    /// Returns the version that was written, or `None` for outcomes that wrote nothing.
    pub fn version(&self) -> Option<u64> {
        if let CommitResult::Committed { version } | CommitResult::Merged { version, .. } = self {
            Some(*version)
        } else {
            None
        }
    }
}
/// Counters reported by `OptimisticController::get_stats`.
#[derive(Debug, Clone)]
pub struct OptimisticStats {
/// Number of resources with a recorded version.
pub total_resources: usize,
/// Number of conflicts currently held in the history (bounded by the history limit).
pub total_conflicts: usize,
/// History entries whose resolution is `Resolution::Retry`.
pub resolved_by_retry: usize,
/// History entries whose resolution is `Resolution::Escalate`.
pub escalated: usize,
}
/// Recursively merges `source` into `target`.
///
/// When both values are JSON objects, each key of `source` is merged into the
/// matching key of `target` (missing keys start out as `null`). In every other
/// case `target` is overwritten with a clone of `source`.
fn json_deep_merge(target: &mut serde_json::Value, source: &serde_json::Value) {
    // A non-object source always replaces the target wholesale.
    let serde_json::Value::Object(incoming) = source else {
        *target = source.clone();
        return;
    };
    match target {
        serde_json::Value::Object(existing) => {
            for (key, value) in incoming {
                let slot = existing
                    .entry(key.clone())
                    .or_insert(serde_json::Value::Null);
                json_deep_merge(slot, value);
            }
        }
        other => *other = source.clone(),
    }
}
/// Hashes `content` with the standard library's `DefaultHasher` and returns the
/// 64-bit result as lowercase hexadecimal without padding.
fn hash_content(content: &str) -> String {
    use std::collections::hash_map::DefaultHasher;
    use std::hash::{Hash, Hasher};

    let mut state = DefaultHasher::new();
    Hash::hash(content, &mut state);
    let digest = state.finish();
    format!("{digest:x}")
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A first commit on an unknown resource starts from base version 0 and yields version 1.
    #[tokio::test]
    async fn test_optimistic_commit_success() {
        let controller = OptimisticController::new();
        let token = controller.begin_optimistic("agent-1", "file.txt").await;
        assert_eq!(token.base_version, 0);
        let result = controller.commit_optimistic(token, "hash123").await;
        assert!(result.is_ok());
        assert_eq!(result.unwrap(), 1);
    }

    /// Two tokens taken at the same version: the second commit must conflict.
    #[tokio::test]
    async fn test_optimistic_commit_conflict() {
        let controller = OptimisticController::new();
        let token1 = controller.begin_optimistic("agent-1", "file.txt").await;
        let token2 = controller.begin_optimistic("agent-2", "file.txt").await;
        let result1 = controller.commit_optimistic(token1, "hash1").await;
        assert!(result1.is_ok());
        let result2 = controller.commit_optimistic(token2, "hash2").await;
        assert!(result2.is_err());
        let conflict = result2.unwrap_err();
        assert_eq!(conflict.expected_version, 0);
        assert_eq!(conflict.actual_version, 1);
        assert_eq!(conflict.holder_agent, "agent-1");
    }

    /// Sequential commits advance the version and keep the latest hash.
    #[tokio::test]
    async fn test_version_tracking() {
        let controller = OptimisticController::new();
        let token1 = controller.begin_optimistic("agent-1", "file.txt").await;
        controller.commit_optimistic(token1, "hash1").await.unwrap();
        let token2 = controller.begin_optimistic("agent-1", "file.txt").await;
        assert_eq!(token2.base_version, 1);
        controller.commit_optimistic(token2, "hash2").await.unwrap();
        let version = controller.get_version("file.txt").await.unwrap();
        assert_eq!(version.version, 2);
        assert_eq!(version.content_hash, "hash2");
    }

    /// With `LastWriterWins` the conflicting (later) writer is committed.
    #[tokio::test]
    async fn test_resolution_strategy_last_writer_wins() {
        let controller =
            OptimisticController::with_default_strategy(ResolutionStrategy::LastWriterWins);
        let token1 = controller.begin_optimistic("agent-1", "file.txt").await;
        let token2 = controller.begin_optimistic("agent-2", "file.txt").await;
        controller.commit_optimistic(token1, "hash1").await.unwrap();
        let result = controller
            .commit_or_resolve(token2, "hash2", None)
            .await
            .unwrap();
        assert!(result.is_success());
    }

    /// With `FirstWriterWins` the later writer is rejected in favor of the holder.
    #[tokio::test]
    async fn test_resolution_strategy_first_writer_wins() {
        let controller =
            OptimisticController::with_default_strategy(ResolutionStrategy::FirstWriterWins);
        let token1 = controller.begin_optimistic("agent-1", "file.txt").await;
        let token2 = controller.begin_optimistic("agent-2", "file.txt").await;
        controller.commit_optimistic(token1, "hash1").await.unwrap();
        let result = controller
            .commit_or_resolve(token2, "hash2", None)
            .await
            .unwrap();
        match result {
            CommitResult::Rejected { reason } => {
                assert!(reason.contains("agent-1"));
            }
            _ => panic!("Expected rejection"),
        }
    }

    #[tokio::test]
    async fn test_has_changed() {
        let controller = OptimisticController::new();
        assert!(!controller.has_changed("file.txt", 0).await);
        let token = controller.begin_optimistic("agent-1", "file.txt").await;
        controller.commit_optimistic(token, "hash1").await.unwrap();
        assert!(controller.has_changed("file.txt", 0).await);
        assert!(!controller.has_changed("file.txt", 1).await);
    }

    /// A conflict handled by `commit_or_resolve` shows up in the history.
    #[tokio::test]
    async fn test_conflict_history() {
        let controller = OptimisticController::new();
        let token1 = controller.begin_optimistic("agent-1", "file.txt").await;
        let token2 = controller.begin_optimistic("agent-2", "file.txt").await;
        controller.commit_optimistic(token1, "hash1").await.unwrap();
        let _ = controller.commit_or_resolve(token2, "hash2", None).await;
        let history = controller.get_conflict_history().await;
        assert_eq!(history.len(), 1);
        assert_eq!(history[0].conflict.conflicting_agent, "agent-2");
    }

    #[tokio::test]
    async fn test_stats() {
        let controller = OptimisticController::new();
        for i in 0..5 {
            let token = controller
                .begin_optimistic("agent-1", &format!("file{}.txt", i))
                .await;
            controller
                .commit_optimistic(token, &format!("hash{}", i))
                .await
                .unwrap();
        }
        let stats = controller.get_stats().await;
        assert_eq!(stats.total_resources, 5);
        assert_eq!(stats.total_conflicts, 0);
    }

    #[test]
    fn test_token_staleness() {
        let fresh = OptimisticToken {
            resource_id: "test".to_string(),
            base_version: 0,
            base_hash: String::new(),
            agent_id: "agent-1".to_string(),
            created_at: Instant::now(),
        };
        assert!(!fresh.is_stale(std::time::Duration::from_secs(180)));

        // `Instant - Duration` panics when the result cannot be represented (for
        // example when the monotonic clock started less than two minutes ago), so
        // back-date the token with `checked_sub` instead.
        if let Some(created_at) = Instant::now().checked_sub(std::time::Duration::from_secs(120)) {
            let token = OptimisticToken {
                created_at,
                ..fresh
            };
            assert!(token.is_stale(std::time::Duration::from_secs(60)));
            assert!(!token.is_stale(std::time::Duration::from_secs(180)));
        }
    }

    /// A per-resource override takes precedence over the default strategy.
    #[tokio::test]
    async fn test_custom_strategy_per_resource() {
        let controller = OptimisticController::new();
        controller
            .register_strategy("special.txt", ResolutionStrategy::LastWriterWins)
            .await;
        let token1 = controller.begin_optimistic("agent-1", "special.txt").await;
        let token2 = controller.begin_optimistic("agent-2", "special.txt").await;
        controller.commit_optimistic(token1, "hash1").await.unwrap();
        let result = controller
            .commit_or_resolve(token2, "hash2", None)
            .await
            .unwrap();
        assert!(result.is_success());
    }
}