Skip to main content

brainwires_agents/
optimistic.rs

1//! Optimistic Concurrency with Conflict Resolution
2//!
3//! Based on Multi-Agent Coordination Survey research, this module provides
4//! optimistic concurrency control that allows agents to proceed with operations
5//! without acquiring locks upfront. Conflicts are detected at commit time
6//! and resolved using configured strategies.
7//!
8//! # When to Use Optimistic vs Pessimistic
9//!
10//! | Scenario | Approach | Rationale |
11//! |----------|----------|-----------|
12//! | File reads | Optimistic | High contention unlikely |
13//! | File writes to different files | Optimistic | No actual conflict |
14//! | File writes to same file | Pessimistic | Real conflict likely |
15//! | Build operations | Pessimistic | Expensive to retry |
16//! | Git staging | Optimistic | Can merge staging areas |
17//! | Git commit | Pessimistic | Must be sequential |
18//! | Git push | Pessimistic | Remote state matters |
19//!
20//! # Key Concepts
21//!
22//! - **OptimisticToken**: Captures the version at the start of an operation
23//! - **ResourceVersion**: Tracks version, hash, and modifier for each resource
24//! - **ResolutionStrategy**: Configures how conflicts are resolved
25//! - **OptimisticConflict**: Describes a detected conflict for resolution
26
27use std::collections::HashMap;
28use std::time::Instant;
29
30use serde::{Deserialize, Serialize};
31use tokio::sync::RwLock;
32
/// Optimistic concurrency controller.
///
/// Keeps one [`ResourceVersion`] per resource id and detects conflicts at
/// commit time by comparing the version captured in an [`OptimisticToken`]
/// with the version currently stored. All mutable state lives behind
/// `tokio::sync::RwLock`s, so the async methods only need `&self`.
pub struct OptimisticController {
    /// Version tracking for resources, keyed by resource id.
    versions: RwLock<HashMap<String, ResourceVersion>>,
    /// Conflict resolution strategies by resource pattern.
    /// Lookups use the conflict's resource id as an exact map key.
    resolution_strategies: RwLock<HashMap<String, ResolutionStrategy>>,
    /// Default resolution strategy, used when no registered entry matches.
    default_strategy: ResolutionStrategy,
    /// Conflict history for debugging/analysis (new records are appended,
    /// trimming removes from the front).
    conflict_history: RwLock<Vec<ConflictRecord>>,
    /// Maximum history entries to keep
    max_history: usize,
}
46
/// Version information for a resource.
///
/// Serializable via serde; `modified_at` is excluded from the serialized form.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResourceVersion {
    /// Monotonic version number
    pub version: u64,
    /// Content hash for change detection
    pub content_hash: String,
    /// Agent that last modified this resource
    pub last_modifier: String,
    /// When the modification occurred.
    /// Skipped by serde; on deserialization it is filled with `Instant::now()`.
    #[serde(skip, default = "Instant::now")]
    pub modified_at: Instant,
}
60
61impl ResourceVersion {
62    /// Create a new resource version
63    pub fn new(content_hash: impl Into<String>, modifier: impl Into<String>) -> Self {
64        Self {
65            version: 1,
66            content_hash: content_hash.into(),
67            last_modifier: modifier.into(),
68            modified_at: Instant::now(),
69        }
70    }
71
72    /// Increment version with new hash
73    pub fn increment(&mut self, content_hash: impl Into<String>, modifier: impl Into<String>) {
74        self.version += 1;
75        self.content_hash = content_hash.into();
76        self.last_modifier = modifier.into();
77        self.modified_at = Instant::now();
78    }
79}
80
/// Strategy for resolving conflicts.
///
/// The derived `Default` is [`ResolutionStrategy::FirstWriterWins`].
#[derive(Debug, Clone, Default)]
pub enum ResolutionStrategy {
    /// Last writer wins (overwrite)
    LastWriterWins,
    /// First writer wins (reject later)
    #[default]
    FirstWriterWins,
    /// Attempt to merge changes using the given [`MergeStrategy`].
    Merge(MergeStrategy),
    /// Escalate to orchestrator/user
    Escalate,
    /// Retry the operation with fresh state
    Retry {
        /// Maximum number of retry attempts.
        max_attempts: u32,
    },
}
99
/// Strategies for merging conflicting changes
#[derive(Debug, Clone)]
pub enum MergeStrategy {
    /// Line-by-line merge for text files
    TextMerge,
    /// JSON deep merge
    JsonMerge,
    /// Append both versions (joined with a newline)
    Append,
    /// Custom merge function name (for extension).
    /// The `String` is the name of the custom merge function.
    Custom(String),
}
112
/// A conflict detected while committing an optimistic operation.
#[derive(Debug, Clone)]
pub struct OptimisticConflict {
    /// Id of the resource on which the commit conflicted.
    pub resource_id: String,
    /// Agent whose commit attempt ran into the conflict.
    pub conflicting_agent: String,
    /// Version the committing agent based its work on.
    pub expected_version: u64,
    /// Version actually stored when the commit was attempted.
    pub actual_version: u64,
    /// Agent recorded as the last modifier of the stored version.
    pub holder_agent: String,
    /// Instant at which the conflict was detected.
    pub detected_at: Instant,
}

impl OptimisticConflict {
    /// How far the stored version is ahead of the expected one.
    ///
    /// Returns 0 when the stored version is not ahead (the subtraction never
    /// underflows).
    pub fn version_diff(&self) -> u64 {
        let Self {
            expected_version,
            actual_version,
            ..
        } = self;
        actual_version.checked_sub(*expected_version).unwrap_or(0)
    }
}
136
/// Full conflict information for resolution
#[derive(Debug, Clone)]
pub struct OptimisticConflictDetails {
    /// The conflicting resource
    pub resource_id: String,
    /// Identifier of the first agent; `resolve_conflict` returns it as the
    /// winner under `FirstWriterWins`.
    pub agent_a: String,
    /// Identifier of the second agent; `resolve_conflict` returns it as the
    /// winner under `LastWriterWins`.
    pub agent_b: String,
    /// First agent's version
    pub version_a: ResourceVersion,
    /// Second agent's version
    pub version_b: ResourceVersion,
    /// Base version before both changes
    pub base_version: ResourceVersion,
    /// First agent's content (if available); required for any merge.
    pub content_a: Option<String>,
    /// Second agent's content (if available); required for any merge.
    pub content_b: Option<String>,
}
157
/// Result of conflict resolution
#[derive(Debug, Clone)]
pub enum Resolution {
    /// Use version from specified agent (the `String` is the agent id).
    UseVersion(String),
    /// Use merged content.
    /// NOTE(review): the merge code in this file stores a hash of the merged
    /// content here, not the content itself, and `commit_or_resolve` commits
    /// the value as a content hash.
    Merged(String),
    /// Abort both operations
    AbortBoth,
    /// Keep both as separate resources
    KeepBoth {
        /// Suffix for the first version.
        suffix_a: String,
        /// Suffix for the second version.
        suffix_b: String,
    },
    /// Retry with fresh state
    Retry,
    /// Escalate to higher authority
    Escalate {
        /// Reason for escalation.
        reason: String,
    },
}
182
/// Snapshot handed to an agent when it starts an optimistic operation.
#[derive(Debug, Clone)]
pub struct OptimisticToken {
    /// Id of the resource the operation targets.
    pub resource_id: String,
    /// Version observed when the operation started.
    pub base_version: u64,
    /// Content hash observed when the operation started.
    pub base_hash: String,
    /// Agent that owns this operation.
    pub agent_id: String,
    /// Instant at which the token was issued.
    pub created_at: Instant,
}

impl OptimisticToken {
    /// Report whether the token is older than `max_age`.
    ///
    /// The comparison is strict: a token whose age equals `max_age` exactly is
    /// not considered stale.
    pub fn is_stale(&self, max_age: std::time::Duration) -> bool {
        let age = self.created_at.elapsed();
        age > max_age
    }
}
204
/// Record of a conflict for history/debugging.
///
/// One record pairs a detected conflict with the resolution chosen for it.
#[derive(Debug, Clone)]
pub struct ConflictRecord {
    /// The conflict details
    pub conflict: OptimisticConflict,
    /// How it was resolved
    pub resolution: Resolution,
    /// When resolved
    pub resolved_at: Instant,
}
215
216impl OptimisticController {
217    /// Create a new optimistic controller with default settings
218    pub fn new() -> Self {
219        Self {
220            versions: RwLock::new(HashMap::new()),
221            resolution_strategies: RwLock::new(HashMap::new()),
222            default_strategy: ResolutionStrategy::FirstWriterWins,
223            conflict_history: RwLock::new(Vec::new()),
224            max_history: 100,
225        }
226    }
227
228    /// Create with a custom default strategy
229    pub fn with_default_strategy(strategy: ResolutionStrategy) -> Self {
230        Self {
231            versions: RwLock::new(HashMap::new()),
232            resolution_strategies: RwLock::new(HashMap::new()),
233            default_strategy: strategy,
234            conflict_history: RwLock::new(Vec::new()),
235            max_history: 100,
236        }
237    }
238
239    /// Set maximum history entries
240    pub fn with_max_history(mut self, max: usize) -> Self {
241        self.max_history = max;
242        self
243    }
244
245    /// Start an optimistic operation - returns token with current version
246    pub async fn begin_optimistic(&self, agent_id: &str, resource_id: &str) -> OptimisticToken {
247        let versions = self.versions.read().await;
248        let (base_version, base_hash) = versions
249            .get(resource_id)
250            .map(|v| (v.version, v.content_hash.clone()))
251            .unwrap_or((0, String::new()));
252
253        OptimisticToken {
254            resource_id: resource_id.to_string(),
255            base_version,
256            base_hash,
257            agent_id: agent_id.to_string(),
258            created_at: Instant::now(),
259        }
260    }
261
262    /// Commit optimistic operation - returns conflict if version changed
263    pub async fn commit_optimistic(
264        &self,
265        token: OptimisticToken,
266        new_content_hash: &str,
267    ) -> Result<u64, OptimisticConflict> {
268        let mut versions = self.versions.write().await;
269
270        // Check for conflict
271        if let Some(current) = versions.get(&token.resource_id)
272            && current.version != token.base_version
273        {
274            return Err(OptimisticConflict {
275                resource_id: token.resource_id,
276                conflicting_agent: token.agent_id,
277                expected_version: token.base_version,
278                actual_version: current.version,
279                holder_agent: current.last_modifier.clone(),
280                detected_at: Instant::now(),
281            });
282        }
283
284        // No conflict - commit the change
285        let new_version = token.base_version + 1;
286        versions.insert(
287            token.resource_id,
288            ResourceVersion {
289                version: new_version,
290                content_hash: new_content_hash.to_string(),
291                last_modifier: token.agent_id,
292                modified_at: Instant::now(),
293            },
294        );
295
296        Ok(new_version)
297    }
298
299    /// Try to commit, and if conflict occurs, resolve it
300    pub async fn commit_or_resolve(
301        &self,
302        token: OptimisticToken,
303        new_content_hash: &str,
304        new_content: Option<&str>,
305    ) -> Result<CommitResult, String> {
306        match self
307            .commit_optimistic(token.clone(), new_content_hash)
308            .await
309        {
310            Ok(version) => Ok(CommitResult::Committed { version }),
311            Err(conflict) => {
312                let resolution = self.resolve_conflict_auto(&conflict, new_content).await;
313
314                // Record the conflict
315                self.record_conflict(conflict.clone(), resolution.clone())
316                    .await;
317
318                match resolution {
319                    Resolution::UseVersion(agent) => {
320                        if agent == token.agent_id {
321                            // Force our version
322                            let version = self
323                                .force_commit(&token.resource_id, new_content_hash, &token.agent_id)
324                                .await;
325                            Ok(CommitResult::Committed { version })
326                        } else {
327                            Ok(CommitResult::Rejected {
328                                reason: format!("Conflict resolved in favor of {}", agent),
329                            })
330                        }
331                    }
332                    Resolution::Merged(merged_hash) => {
333                        let version = self
334                            .force_commit(&token.resource_id, &merged_hash, &token.agent_id)
335                            .await;
336                        Ok(CommitResult::Merged {
337                            version,
338                            merged_hash,
339                        })
340                    }
341                    Resolution::Retry => Ok(CommitResult::RetryNeeded {
342                        current_version: conflict.actual_version,
343                    }),
344                    Resolution::AbortBoth => Ok(CommitResult::Aborted {
345                        reason: "Both operations aborted due to conflict".to_string(),
346                    }),
347                    Resolution::KeepBoth { suffix_a, suffix_b } => {
348                        Ok(CommitResult::Split { suffix_a, suffix_b })
349                    }
350                    Resolution::Escalate { reason } => Ok(CommitResult::Escalated { reason }),
351                }
352            }
353        }
354    }
355
356    /// Force commit without version check (for resolution)
357    async fn force_commit(&self, resource_id: &str, content_hash: &str, agent_id: &str) -> u64 {
358        let mut versions = self.versions.write().await;
359        let current_version = versions.get(resource_id).map(|v| v.version).unwrap_or(0);
360        let new_version = current_version + 1;
361
362        versions.insert(
363            resource_id.to_string(),
364            ResourceVersion {
365                version: new_version,
366                content_hash: content_hash.to_string(),
367                last_modifier: agent_id.to_string(),
368                modified_at: Instant::now(),
369            },
370        );
371
372        new_version
373    }
374
375    /// Resolve a conflict using the configured strategy
376    async fn resolve_conflict_auto(
377        &self,
378        conflict: &OptimisticConflict,
379        _new_content: Option<&str>,
380    ) -> Resolution {
381        let strategies = self.resolution_strategies.read().await;
382        let strategy = strategies
383            .get(&conflict.resource_id)
384            .cloned()
385            .unwrap_or_else(|| self.default_strategy.clone());
386
387        match strategy {
388            ResolutionStrategy::LastWriterWins => {
389                Resolution::UseVersion(conflict.conflicting_agent.clone())
390            }
391            ResolutionStrategy::FirstWriterWins => {
392                Resolution::UseVersion(conflict.holder_agent.clone())
393            }
394            ResolutionStrategy::Retry { max_attempts } => {
395                // Check if we should retry
396                if conflict.version_diff() < max_attempts as u64 {
397                    Resolution::Retry
398                } else {
399                    Resolution::Escalate {
400                        reason: format!("Max retry attempts ({}) exceeded", max_attempts),
401                    }
402                }
403            }
404            ResolutionStrategy::Escalate => Resolution::Escalate {
405                reason: "Configured to escalate all conflicts".to_string(),
406            },
407            ResolutionStrategy::Merge(_strategy) => {
408                // Merge requires content - if not available, escalate
409                Resolution::Escalate {
410                    reason: "Merge requires content, not available".to_string(),
411                }
412            }
413        }
414    }
415
416    /// Manually resolve a conflict with full details
417    pub async fn resolve_conflict(&self, conflict: &OptimisticConflictDetails) -> Resolution {
418        let strategies = self.resolution_strategies.read().await;
419        let strategy = strategies
420            .get(&conflict.resource_id)
421            .cloned()
422            .unwrap_or_else(|| self.default_strategy.clone());
423
424        match strategy {
425            ResolutionStrategy::LastWriterWins => Resolution::UseVersion(conflict.agent_b.clone()),
426            ResolutionStrategy::FirstWriterWins => Resolution::UseVersion(conflict.agent_a.clone()),
427            ResolutionStrategy::Merge(merge_strategy) => {
428                self.try_merge(conflict, &merge_strategy).await
429            }
430            ResolutionStrategy::Escalate => Resolution::Escalate {
431                reason: "Policy requires manual resolution".to_string(),
432            },
433            ResolutionStrategy::Retry { .. } => Resolution::Retry,
434        }
435    }
436
437    /// Attempt to merge content
438    async fn try_merge(
439        &self,
440        conflict: &OptimisticConflictDetails,
441        strategy: &MergeStrategy,
442    ) -> Resolution {
443        match (strategy, &conflict.content_a, &conflict.content_b) {
444            (MergeStrategy::Append, Some(a), Some(b)) => {
445                let merged = format!("{}\n{}", a, b);
446                Resolution::Merged(hash_content(&merged))
447            }
448            (MergeStrategy::TextMerge, Some(a), Some(b)) => {
449                // Line-by-line merge: deduplicate shared lines, append unique lines from both
450                let lines_a: Vec<&str> = a.lines().collect();
451                let lines_b: Vec<&str> = b.lines().collect();
452                let mut merged = Vec::new();
453                let mut used_b: Vec<bool> = vec![false; lines_b.len()];
454
455                for line_a in &lines_a {
456                    merged.push(*line_a);
457                    // Mark matching lines in b as consumed
458                    for (i, line_b) in lines_b.iter().enumerate() {
459                        if !used_b[i] && line_a == line_b {
460                            used_b[i] = true;
461                            break;
462                        }
463                    }
464                }
465                // Append lines from b that weren't already present
466                for (i, line_b) in lines_b.iter().enumerate() {
467                    if !used_b[i] {
468                        merged.push(*line_b);
469                    }
470                }
471
472                let merged_content = merged.join("\n");
473                Resolution::Merged(hash_content(&merged_content))
474            }
475            (MergeStrategy::JsonMerge, Some(a), Some(b)) => {
476                // JSON deep merge: parse both as JSON objects and merge keys
477                match (
478                    serde_json::from_str::<serde_json::Value>(a),
479                    serde_json::from_str::<serde_json::Value>(b),
480                ) {
481                    (Ok(mut val_a), Ok(val_b)) => {
482                        json_deep_merge(&mut val_a, &val_b);
483                        let merged_content = serde_json::to_string_pretty(&val_a)
484                            .unwrap_or_else(|_| format!("{}", val_a));
485                        Resolution::Merged(hash_content(&merged_content))
486                    }
487                    _ => Resolution::Escalate {
488                        reason: "Failed to parse content as JSON for merge".to_string(),
489                    },
490                }
491            }
492            _ => Resolution::Escalate {
493                reason: "Content not available for merge".to_string(),
494            },
495        }
496    }
497
498    /// Record a conflict in history
499    async fn record_conflict(&self, conflict: OptimisticConflict, resolution: Resolution) {
500        let mut history = self.conflict_history.write().await;
501
502        history.push(ConflictRecord {
503            conflict,
504            resolution,
505            resolved_at: Instant::now(),
506        });
507
508        // Trim history if needed
509        while history.len() > self.max_history {
510            history.remove(0);
511        }
512    }
513
514    /// Register a resolution strategy for a resource pattern
515    pub async fn register_strategy(&self, resource_pattern: &str, strategy: ResolutionStrategy) {
516        self.resolution_strategies
517            .write()
518            .await
519            .insert(resource_pattern.to_string(), strategy);
520    }
521
522    /// Get the current version of a resource
523    pub async fn get_version(&self, resource_id: &str) -> Option<ResourceVersion> {
524        self.versions.read().await.get(resource_id).cloned()
525    }
526
527    /// Check if a resource has been modified since a given version
528    pub async fn has_changed(&self, resource_id: &str, since_version: u64) -> bool {
529        self.versions
530            .read()
531            .await
532            .get(resource_id)
533            .map(|v| v.version > since_version)
534            .unwrap_or(false)
535    }
536
537    /// Get conflict history
538    pub async fn get_conflict_history(&self) -> Vec<ConflictRecord> {
539        self.conflict_history.read().await.clone()
540    }
541
542    /// Clear conflict history
543    pub async fn clear_history(&self) {
544        self.conflict_history.write().await.clear();
545    }
546
547    /// Get statistics about conflicts
548    pub async fn get_stats(&self) -> OptimisticStats {
549        let history = self.conflict_history.read().await;
550        let versions = self.versions.read().await;
551
552        let total_conflicts = history.len();
553        let resolved_by_retry = history
554            .iter()
555            .filter(|r| matches!(r.resolution, Resolution::Retry))
556            .count();
557        let escalated = history
558            .iter()
559            .filter(|r| matches!(r.resolution, Resolution::Escalate { .. }))
560            .count();
561
562        OptimisticStats {
563            total_resources: versions.len(),
564            total_conflicts,
565            resolved_by_retry,
566            escalated,
567        }
568    }
569}
570
571impl Default for OptimisticController {
572    fn default() -> Self {
573        Self::new()
574    }
575}
576
/// Outcome of a commit attempt that may have gone through conflict resolution.
#[derive(Debug, Clone)]
pub enum CommitResult {
    /// The change was committed as-is.
    Committed {
        /// Version number assigned to the commit.
        version: u64,
    },
    /// The change was merged with the conflicting one and committed.
    Merged {
        /// Version number assigned to the merged commit.
        version: u64,
        /// Hash of the merged content.
        merged_hash: String,
    },
    /// The caller should start over from fresh state.
    RetryNeeded {
        /// Version currently stored, to retry against.
        current_version: u64,
    },
    /// The commit lost the conflict and was not applied.
    Rejected {
        /// Why the commit was rejected.
        reason: String,
    },
    /// Both conflicting operations were aborted.
    Aborted {
        /// Why the operations were aborted.
        reason: String,
    },
    /// Both versions are to be kept as separate resources.
    Split {
        /// Suffix for the first version.
        suffix_a: String,
        /// Suffix for the second version.
        suffix_b: String,
    },
    /// The conflict was handed to a higher authority.
    Escalated {
        /// Why the conflict was escalated.
        reason: String,
    },
}

impl CommitResult {
    /// `true` for the two outcomes that wrote a new version
    /// (`Committed` and `Merged`).
    pub fn is_success(&self) -> bool {
        self.version().is_some()
    }

    /// The version written by a successful commit, `None` for every other
    /// outcome.
    pub fn version(&self) -> Option<u64> {
        match self {
            Self::Committed { version } => Some(*version),
            Self::Merged { version, .. } => Some(*version),
            Self::RetryNeeded { .. }
            | Self::Rejected { .. }
            | Self::Aborted { .. }
            | Self::Split { .. }
            | Self::Escalated { .. } => None,
        }
    }
}
640
/// Statistics about optimistic concurrency.
///
/// Conflict counts are computed from the controller's retained conflict history.
#[derive(Debug, Clone)]
pub struct OptimisticStats {
    /// Number of resources being tracked
    pub total_resources: usize,
    /// Total number of conflicts recorded
    pub total_conflicts: usize,
    /// Conflicts resolved by retry
    pub resolved_by_retry: usize,
    /// Conflicts escalated
    pub escalated: usize,
}
653
654/// Helper function to hash content
655/// Recursively merge `source` into `target`. For objects, keys from `source`
656/// are inserted/overwritten in `target`. For non-object values, `source` wins.
657fn json_deep_merge(target: &mut serde_json::Value, source: &serde_json::Value) {
658    match (target, source) {
659        (serde_json::Value::Object(t), serde_json::Value::Object(s)) => {
660            for (key, value) in s {
661                json_deep_merge(
662                    t.entry(key.clone()).or_insert(serde_json::Value::Null),
663                    value,
664                );
665            }
666        }
667        (target, source) => {
668            *target = source.clone();
669        }
670    }
671}
672
/// Hash `content` with the standard library's `DefaultHasher` and return the
/// 64-bit result as lowercase hexadecimal (no padding, no prefix).
///
/// NOTE(review): the standard library does not specify `DefaultHasher`'s
/// algorithm, so these hashes should not be assumed stable across Rust
/// releases; confirm before persisting them.
fn hash_content(content: &str) -> String {
    let mut state = std::collections::hash_map::DefaultHasher::new();
    std::hash::Hash::hash(content, &mut state);
    let digest = std::hash::Hasher::finish(&state);
    format!("{digest:x}")
}
681
682#[cfg(test)]
683mod tests {
684    use super::*;
685
686    #[tokio::test]
687    async fn test_optimistic_commit_success() {
688        let controller = OptimisticController::new();
689
690        // Begin optimistic operation
691        let token = controller.begin_optimistic("agent-1", "file.txt").await;
692        assert_eq!(token.base_version, 0);
693
694        // Commit should succeed
695        let result = controller.commit_optimistic(token, "hash123").await;
696
697        assert!(result.is_ok());
698        assert_eq!(result.unwrap(), 1);
699    }
700
701    #[tokio::test]
702    async fn test_optimistic_commit_conflict() {
703        let controller = OptimisticController::new();
704
705        // Agent 1 begins operation
706        let token1 = controller.begin_optimistic("agent-1", "file.txt").await;
707
708        // Agent 2 begins operation
709        let token2 = controller.begin_optimistic("agent-2", "file.txt").await;
710
711        // Agent 1 commits first
712        let result1 = controller.commit_optimistic(token1, "hash1").await;
713        assert!(result1.is_ok());
714
715        // Agent 2 tries to commit - should fail
716        let result2 = controller.commit_optimistic(token2, "hash2").await;
717
718        assert!(result2.is_err());
719        let conflict = result2.unwrap_err();
720        assert_eq!(conflict.expected_version, 0);
721        assert_eq!(conflict.actual_version, 1);
722        assert_eq!(conflict.holder_agent, "agent-1");
723    }
724
725    #[tokio::test]
726    async fn test_version_tracking() {
727        let controller = OptimisticController::new();
728
729        // First commit
730        let token1 = controller.begin_optimistic("agent-1", "file.txt").await;
731        controller.commit_optimistic(token1, "hash1").await.unwrap();
732
733        // Second commit
734        let token2 = controller.begin_optimistic("agent-1", "file.txt").await;
735        assert_eq!(token2.base_version, 1);
736        controller.commit_optimistic(token2, "hash2").await.unwrap();
737
738        // Verify version
739        let version = controller.get_version("file.txt").await.unwrap();
740        assert_eq!(version.version, 2);
741        assert_eq!(version.content_hash, "hash2");
742    }
743
744    #[tokio::test]
745    async fn test_resolution_strategy_last_writer_wins() {
746        let controller =
747            OptimisticController::with_default_strategy(ResolutionStrategy::LastWriterWins);
748
749        // Two agents start
750        let token1 = controller.begin_optimistic("agent-1", "file.txt").await;
751        let token2 = controller.begin_optimistic("agent-2", "file.txt").await;
752
753        // Agent 1 commits
754        controller.commit_optimistic(token1, "hash1").await.unwrap();
755
756        // Agent 2 commits with resolution
757        let result = controller
758            .commit_or_resolve(token2, "hash2", None)
759            .await
760            .unwrap();
761
762        // With LastWriterWins, agent-2 should succeed
763        assert!(result.is_success());
764    }
765
766    #[tokio::test]
767    async fn test_resolution_strategy_first_writer_wins() {
768        let controller =
769            OptimisticController::with_default_strategy(ResolutionStrategy::FirstWriterWins);
770
771        // Two agents start
772        let token1 = controller.begin_optimistic("agent-1", "file.txt").await;
773        let token2 = controller.begin_optimistic("agent-2", "file.txt").await;
774
775        // Agent 1 commits
776        controller.commit_optimistic(token1, "hash1").await.unwrap();
777
778        // Agent 2 commits with resolution
779        let result = controller
780            .commit_or_resolve(token2, "hash2", None)
781            .await
782            .unwrap();
783
784        // With FirstWriterWins, agent-2 should be rejected
785        match result {
786            CommitResult::Rejected { reason } => {
787                assert!(reason.contains("agent-1"));
788            }
789            _ => panic!("Expected rejection"),
790        }
791    }
792
793    #[tokio::test]
794    async fn test_has_changed() {
795        let controller = OptimisticController::new();
796
797        // Initially no changes
798        assert!(!controller.has_changed("file.txt", 0).await);
799
800        // Make a commit
801        let token = controller.begin_optimistic("agent-1", "file.txt").await;
802        controller.commit_optimistic(token, "hash1").await.unwrap();
803
804        // Now changed since version 0
805        assert!(controller.has_changed("file.txt", 0).await);
806        // But not since version 1
807        assert!(!controller.has_changed("file.txt", 1).await);
808    }
809
810    #[tokio::test]
811    async fn test_conflict_history() {
812        let controller = OptimisticController::new();
813
814        // Create a conflict
815        let token1 = controller.begin_optimistic("agent-1", "file.txt").await;
816        let token2 = controller.begin_optimistic("agent-2", "file.txt").await;
817
818        controller.commit_optimistic(token1, "hash1").await.unwrap();
819
820        // This will create a conflict
821        let _ = controller.commit_or_resolve(token2, "hash2", None).await;
822
823        // Check history
824        let history = controller.get_conflict_history().await;
825        assert_eq!(history.len(), 1);
826        assert_eq!(history[0].conflict.conflicting_agent, "agent-2");
827    }
828
829    #[tokio::test]
830    async fn test_stats() {
831        let controller = OptimisticController::new();
832
833        // Create some commits
834        for i in 0..5 {
835            let token = controller
836                .begin_optimistic("agent-1", &format!("file{}.txt", i))
837                .await;
838            controller
839                .commit_optimistic(token, &format!("hash{}", i))
840                .await
841                .unwrap();
842        }
843
844        let stats = controller.get_stats().await;
845        assert_eq!(stats.total_resources, 5);
846        assert_eq!(stats.total_conflicts, 0);
847    }
848
849    #[test]
850    fn test_token_staleness() {
851        let token = OptimisticToken {
852            resource_id: "test".to_string(),
853            base_version: 0,
854            base_hash: String::new(),
855            agent_id: "agent-1".to_string(),
856            created_at: Instant::now() - std::time::Duration::from_secs(120),
857        };
858
859        // Token should be stale after 60 seconds
860        assert!(token.is_stale(std::time::Duration::from_secs(60)));
861        // But not after 180 seconds
862        assert!(!token.is_stale(std::time::Duration::from_secs(180)));
863    }
864
865    #[tokio::test]
866    async fn test_custom_strategy_per_resource() {
867        let controller = OptimisticController::new();
868
869        // Register LastWriterWins for specific file
870        controller
871            .register_strategy("special.txt", ResolutionStrategy::LastWriterWins)
872            .await;
873
874        // Conflict on special.txt should use LastWriterWins
875        let token1 = controller.begin_optimistic("agent-1", "special.txt").await;
876        let token2 = controller.begin_optimistic("agent-2", "special.txt").await;
877
878        controller.commit_optimistic(token1, "hash1").await.unwrap();
879
880        let result = controller
881            .commit_or_resolve(token2, "hash2", None)
882            .await
883            .unwrap();
884
885        // LastWriterWins means agent-2 succeeds
886        assert!(result.is_success());
887    }
888}