1use std::collections::HashMap;
28use std::time::Instant;
29
30use serde::{Deserialize, Serialize};
31use tokio::sync::RwLock;
32
33pub struct OptimisticController {
35 versions: RwLock<HashMap<String, ResourceVersion>>,
37 resolution_strategies: RwLock<HashMap<String, ResolutionStrategy>>,
39 default_strategy: ResolutionStrategy,
41 conflict_history: RwLock<Vec<ConflictRecord>>,
43 max_history: usize,
45}
46
47#[derive(Debug, Clone, Serialize, Deserialize)]
49pub struct ResourceVersion {
50 pub version: u64,
52 pub content_hash: String,
54 pub last_modifier: String,
56 #[serde(skip, default = "Instant::now")]
58 pub modified_at: Instant,
59}
60
61impl ResourceVersion {
62 pub fn new(content_hash: impl Into<String>, modifier: impl Into<String>) -> Self {
64 Self {
65 version: 1,
66 content_hash: content_hash.into(),
67 last_modifier: modifier.into(),
68 modified_at: Instant::now(),
69 }
70 }
71
72 pub fn increment(&mut self, content_hash: impl Into<String>, modifier: impl Into<String>) {
74 self.version += 1;
75 self.content_hash = content_hash.into();
76 self.last_modifier = modifier.into();
77 self.modified_at = Instant::now();
78 }
79}
80
81#[derive(Debug, Clone, Default)]
83pub enum ResolutionStrategy {
84 LastWriterWins,
86 #[default]
88 FirstWriterWins,
89 Merge(MergeStrategy),
91 Escalate,
93 Retry {
95 max_attempts: u32,
97 },
98}
99
100#[derive(Debug, Clone)]
102pub enum MergeStrategy {
103 TextMerge,
105 JsonMerge,
107 Append,
109 Custom(String),
111}
112
113#[derive(Debug, Clone)]
115pub struct OptimisticConflict {
116 pub resource_id: String,
118 pub conflicting_agent: String,
120 pub expected_version: u64,
122 pub actual_version: u64,
124 pub holder_agent: String,
126 pub detected_at: Instant,
128}
129
130impl OptimisticConflict {
131 pub fn version_diff(&self) -> u64 {
133 self.actual_version.saturating_sub(self.expected_version)
134 }
135}
136
137#[derive(Debug, Clone)]
139pub struct OptimisticConflictDetails {
140 pub resource_id: String,
142 pub agent_a: String,
144 pub agent_b: String,
146 pub version_a: ResourceVersion,
148 pub version_b: ResourceVersion,
150 pub base_version: ResourceVersion,
152 pub content_a: Option<String>,
154 pub content_b: Option<String>,
156}
157
158#[derive(Debug, Clone)]
160pub enum Resolution {
161 UseVersion(String),
163 Merged(String),
165 AbortBoth,
167 KeepBoth {
169 suffix_a: String,
171 suffix_b: String,
173 },
174 Retry,
176 Escalate {
178 reason: String,
180 },
181}
182
183#[derive(Debug, Clone)]
185pub struct OptimisticToken {
186 pub resource_id: String,
188 pub base_version: u64,
190 pub base_hash: String,
192 pub agent_id: String,
194 pub created_at: Instant,
196}
197
198impl OptimisticToken {
199 pub fn is_stale(&self, max_age: std::time::Duration) -> bool {
201 self.created_at.elapsed() > max_age
202 }
203}
204
205#[derive(Debug, Clone)]
207pub struct ConflictRecord {
208 pub conflict: OptimisticConflict,
210 pub resolution: Resolution,
212 pub resolved_at: Instant,
214}
215
216impl OptimisticController {
217 pub fn new() -> Self {
219 Self {
220 versions: RwLock::new(HashMap::new()),
221 resolution_strategies: RwLock::new(HashMap::new()),
222 default_strategy: ResolutionStrategy::FirstWriterWins,
223 conflict_history: RwLock::new(Vec::new()),
224 max_history: 100,
225 }
226 }
227
228 pub fn with_default_strategy(strategy: ResolutionStrategy) -> Self {
230 Self {
231 versions: RwLock::new(HashMap::new()),
232 resolution_strategies: RwLock::new(HashMap::new()),
233 default_strategy: strategy,
234 conflict_history: RwLock::new(Vec::new()),
235 max_history: 100,
236 }
237 }
238
239 pub fn with_max_history(mut self, max: usize) -> Self {
241 self.max_history = max;
242 self
243 }
244
245 pub async fn begin_optimistic(&self, agent_id: &str, resource_id: &str) -> OptimisticToken {
247 let versions = self.versions.read().await;
248 let (base_version, base_hash) = versions
249 .get(resource_id)
250 .map(|v| (v.version, v.content_hash.clone()))
251 .unwrap_or((0, String::new()));
252
253 OptimisticToken {
254 resource_id: resource_id.to_string(),
255 base_version,
256 base_hash,
257 agent_id: agent_id.to_string(),
258 created_at: Instant::now(),
259 }
260 }
261
262 pub async fn commit_optimistic(
264 &self,
265 token: OptimisticToken,
266 new_content_hash: &str,
267 ) -> Result<u64, OptimisticConflict> {
268 let mut versions = self.versions.write().await;
269
270 if let Some(current) = versions.get(&token.resource_id)
272 && current.version != token.base_version
273 {
274 return Err(OptimisticConflict {
275 resource_id: token.resource_id,
276 conflicting_agent: token.agent_id,
277 expected_version: token.base_version,
278 actual_version: current.version,
279 holder_agent: current.last_modifier.clone(),
280 detected_at: Instant::now(),
281 });
282 }
283
284 let new_version = token.base_version + 1;
286 versions.insert(
287 token.resource_id,
288 ResourceVersion {
289 version: new_version,
290 content_hash: new_content_hash.to_string(),
291 last_modifier: token.agent_id,
292 modified_at: Instant::now(),
293 },
294 );
295
296 Ok(new_version)
297 }
298
299 pub async fn commit_or_resolve(
301 &self,
302 token: OptimisticToken,
303 new_content_hash: &str,
304 new_content: Option<&str>,
305 ) -> Result<CommitResult, String> {
306 match self
307 .commit_optimistic(token.clone(), new_content_hash)
308 .await
309 {
310 Ok(version) => Ok(CommitResult::Committed { version }),
311 Err(conflict) => {
312 let resolution = self.resolve_conflict_auto(&conflict, new_content).await;
313
314 self.record_conflict(conflict.clone(), resolution.clone())
316 .await;
317
318 match resolution {
319 Resolution::UseVersion(agent) => {
320 if agent == token.agent_id {
321 let version = self
323 .force_commit(&token.resource_id, new_content_hash, &token.agent_id)
324 .await;
325 Ok(CommitResult::Committed { version })
326 } else {
327 Ok(CommitResult::Rejected {
328 reason: format!("Conflict resolved in favor of {}", agent),
329 })
330 }
331 }
332 Resolution::Merged(merged_hash) => {
333 let version = self
334 .force_commit(&token.resource_id, &merged_hash, &token.agent_id)
335 .await;
336 Ok(CommitResult::Merged {
337 version,
338 merged_hash,
339 })
340 }
341 Resolution::Retry => Ok(CommitResult::RetryNeeded {
342 current_version: conflict.actual_version,
343 }),
344 Resolution::AbortBoth => Ok(CommitResult::Aborted {
345 reason: "Both operations aborted due to conflict".to_string(),
346 }),
347 Resolution::KeepBoth { suffix_a, suffix_b } => {
348 Ok(CommitResult::Split { suffix_a, suffix_b })
349 }
350 Resolution::Escalate { reason } => Ok(CommitResult::Escalated { reason }),
351 }
352 }
353 }
354 }
355
356 async fn force_commit(&self, resource_id: &str, content_hash: &str, agent_id: &str) -> u64 {
358 let mut versions = self.versions.write().await;
359 let current_version = versions.get(resource_id).map(|v| v.version).unwrap_or(0);
360 let new_version = current_version + 1;
361
362 versions.insert(
363 resource_id.to_string(),
364 ResourceVersion {
365 version: new_version,
366 content_hash: content_hash.to_string(),
367 last_modifier: agent_id.to_string(),
368 modified_at: Instant::now(),
369 },
370 );
371
372 new_version
373 }
374
375 async fn resolve_conflict_auto(
377 &self,
378 conflict: &OptimisticConflict,
379 _new_content: Option<&str>,
380 ) -> Resolution {
381 let strategies = self.resolution_strategies.read().await;
382 let strategy = strategies
383 .get(&conflict.resource_id)
384 .cloned()
385 .unwrap_or_else(|| self.default_strategy.clone());
386
387 match strategy {
388 ResolutionStrategy::LastWriterWins => {
389 Resolution::UseVersion(conflict.conflicting_agent.clone())
390 }
391 ResolutionStrategy::FirstWriterWins => {
392 Resolution::UseVersion(conflict.holder_agent.clone())
393 }
394 ResolutionStrategy::Retry { max_attempts } => {
395 if conflict.version_diff() < max_attempts as u64 {
397 Resolution::Retry
398 } else {
399 Resolution::Escalate {
400 reason: format!("Max retry attempts ({}) exceeded", max_attempts),
401 }
402 }
403 }
404 ResolutionStrategy::Escalate => Resolution::Escalate {
405 reason: "Configured to escalate all conflicts".to_string(),
406 },
407 ResolutionStrategy::Merge(_strategy) => {
408 Resolution::Escalate {
410 reason: "Merge requires content, not available".to_string(),
411 }
412 }
413 }
414 }
415
416 pub async fn resolve_conflict(&self, conflict: &OptimisticConflictDetails) -> Resolution {
418 let strategies = self.resolution_strategies.read().await;
419 let strategy = strategies
420 .get(&conflict.resource_id)
421 .cloned()
422 .unwrap_or_else(|| self.default_strategy.clone());
423
424 match strategy {
425 ResolutionStrategy::LastWriterWins => Resolution::UseVersion(conflict.agent_b.clone()),
426 ResolutionStrategy::FirstWriterWins => Resolution::UseVersion(conflict.agent_a.clone()),
427 ResolutionStrategy::Merge(merge_strategy) => {
428 self.try_merge(conflict, &merge_strategy).await
429 }
430 ResolutionStrategy::Escalate => Resolution::Escalate {
431 reason: "Policy requires manual resolution".to_string(),
432 },
433 ResolutionStrategy::Retry { .. } => Resolution::Retry,
434 }
435 }
436
437 async fn try_merge(
439 &self,
440 conflict: &OptimisticConflictDetails,
441 strategy: &MergeStrategy,
442 ) -> Resolution {
443 match (strategy, &conflict.content_a, &conflict.content_b) {
444 (MergeStrategy::Append, Some(a), Some(b)) => {
445 let merged = format!("{}\n{}", a, b);
446 Resolution::Merged(hash_content(&merged))
447 }
448 (MergeStrategy::TextMerge, Some(a), Some(b)) => {
449 let lines_a: Vec<&str> = a.lines().collect();
451 let lines_b: Vec<&str> = b.lines().collect();
452 let mut merged = Vec::new();
453 let mut used_b: Vec<bool> = vec![false; lines_b.len()];
454
455 for line_a in &lines_a {
456 merged.push(*line_a);
457 for (i, line_b) in lines_b.iter().enumerate() {
459 if !used_b[i] && line_a == line_b {
460 used_b[i] = true;
461 break;
462 }
463 }
464 }
465 for (i, line_b) in lines_b.iter().enumerate() {
467 if !used_b[i] {
468 merged.push(*line_b);
469 }
470 }
471
472 let merged_content = merged.join("\n");
473 Resolution::Merged(hash_content(&merged_content))
474 }
475 (MergeStrategy::JsonMerge, Some(a), Some(b)) => {
476 match (
478 serde_json::from_str::<serde_json::Value>(a),
479 serde_json::from_str::<serde_json::Value>(b),
480 ) {
481 (Ok(mut val_a), Ok(val_b)) => {
482 json_deep_merge(&mut val_a, &val_b);
483 let merged_content = serde_json::to_string_pretty(&val_a)
484 .unwrap_or_else(|_| format!("{}", val_a));
485 Resolution::Merged(hash_content(&merged_content))
486 }
487 _ => Resolution::Escalate {
488 reason: "Failed to parse content as JSON for merge".to_string(),
489 },
490 }
491 }
492 _ => Resolution::Escalate {
493 reason: "Content not available for merge".to_string(),
494 },
495 }
496 }
497
498 async fn record_conflict(&self, conflict: OptimisticConflict, resolution: Resolution) {
500 let mut history = self.conflict_history.write().await;
501
502 history.push(ConflictRecord {
503 conflict,
504 resolution,
505 resolved_at: Instant::now(),
506 });
507
508 while history.len() > self.max_history {
510 history.remove(0);
511 }
512 }
513
514 pub async fn register_strategy(&self, resource_pattern: &str, strategy: ResolutionStrategy) {
516 self.resolution_strategies
517 .write()
518 .await
519 .insert(resource_pattern.to_string(), strategy);
520 }
521
522 pub async fn get_version(&self, resource_id: &str) -> Option<ResourceVersion> {
524 self.versions.read().await.get(resource_id).cloned()
525 }
526
527 pub async fn has_changed(&self, resource_id: &str, since_version: u64) -> bool {
529 self.versions
530 .read()
531 .await
532 .get(resource_id)
533 .map(|v| v.version > since_version)
534 .unwrap_or(false)
535 }
536
537 pub async fn get_conflict_history(&self) -> Vec<ConflictRecord> {
539 self.conflict_history.read().await.clone()
540 }
541
542 pub async fn clear_history(&self) {
544 self.conflict_history.write().await.clear();
545 }
546
547 pub async fn get_stats(&self) -> OptimisticStats {
549 let history = self.conflict_history.read().await;
550 let versions = self.versions.read().await;
551
552 let total_conflicts = history.len();
553 let resolved_by_retry = history
554 .iter()
555 .filter(|r| matches!(r.resolution, Resolution::Retry))
556 .count();
557 let escalated = history
558 .iter()
559 .filter(|r| matches!(r.resolution, Resolution::Escalate { .. }))
560 .count();
561
562 OptimisticStats {
563 total_resources: versions.len(),
564 total_conflicts,
565 resolved_by_retry,
566 escalated,
567 }
568 }
569}
570
571impl Default for OptimisticController {
572 fn default() -> Self {
573 Self::new()
574 }
575}
576
577#[derive(Debug, Clone)]
579pub enum CommitResult {
580 Committed {
582 version: u64,
584 },
585 Merged {
587 version: u64,
589 merged_hash: String,
591 },
592 RetryNeeded {
594 current_version: u64,
596 },
597 Rejected {
599 reason: String,
601 },
602 Aborted {
604 reason: String,
606 },
607 Split {
609 suffix_a: String,
611 suffix_b: String,
613 },
614 Escalated {
616 reason: String,
618 },
619}
620
621impl CommitResult {
622 pub fn is_success(&self) -> bool {
624 matches!(
625 self,
626 CommitResult::Committed { .. } | CommitResult::Merged { .. }
627 )
628 }
629
630 pub fn version(&self) -> Option<u64> {
632 match self {
633 CommitResult::Committed { version } | CommitResult::Merged { version, .. } => {
634 Some(*version)
635 }
636 _ => None,
637 }
638 }
639}
640
641#[derive(Debug, Clone)]
643pub struct OptimisticStats {
644 pub total_resources: usize,
646 pub total_conflicts: usize,
648 pub resolved_by_retry: usize,
650 pub escalated: usize,
652}
653
654fn json_deep_merge(target: &mut serde_json::Value, source: &serde_json::Value) {
658 match (target, source) {
659 (serde_json::Value::Object(t), serde_json::Value::Object(s)) => {
660 for (key, value) in s {
661 json_deep_merge(
662 t.entry(key.clone()).or_insert(serde_json::Value::Null),
663 value,
664 );
665 }
666 }
667 (target, source) => {
668 *target = source.clone();
669 }
670 }
671}
672
673fn hash_content(content: &str) -> String {
674 use std::collections::hash_map::DefaultHasher;
675 use std::hash::{Hash, Hasher};
676
677 let mut hasher = DefaultHasher::new();
678 content.hash(&mut hasher);
679 format!("{:x}", hasher.finish())
680}
681
682#[cfg(test)]
683mod tests {
684 use super::*;
685
686 #[tokio::test]
687 async fn test_optimistic_commit_success() {
688 let controller = OptimisticController::new();
689
690 let token = controller.begin_optimistic("agent-1", "file.txt").await;
692 assert_eq!(token.base_version, 0);
693
694 let result = controller.commit_optimistic(token, "hash123").await;
696
697 assert!(result.is_ok());
698 assert_eq!(result.unwrap(), 1);
699 }
700
701 #[tokio::test]
702 async fn test_optimistic_commit_conflict() {
703 let controller = OptimisticController::new();
704
705 let token1 = controller.begin_optimistic("agent-1", "file.txt").await;
707
708 let token2 = controller.begin_optimistic("agent-2", "file.txt").await;
710
711 let result1 = controller.commit_optimistic(token1, "hash1").await;
713 assert!(result1.is_ok());
714
715 let result2 = controller.commit_optimistic(token2, "hash2").await;
717
718 assert!(result2.is_err());
719 let conflict = result2.unwrap_err();
720 assert_eq!(conflict.expected_version, 0);
721 assert_eq!(conflict.actual_version, 1);
722 assert_eq!(conflict.holder_agent, "agent-1");
723 }
724
725 #[tokio::test]
726 async fn test_version_tracking() {
727 let controller = OptimisticController::new();
728
729 let token1 = controller.begin_optimistic("agent-1", "file.txt").await;
731 controller.commit_optimistic(token1, "hash1").await.unwrap();
732
733 let token2 = controller.begin_optimistic("agent-1", "file.txt").await;
735 assert_eq!(token2.base_version, 1);
736 controller.commit_optimistic(token2, "hash2").await.unwrap();
737
738 let version = controller.get_version("file.txt").await.unwrap();
740 assert_eq!(version.version, 2);
741 assert_eq!(version.content_hash, "hash2");
742 }
743
744 #[tokio::test]
745 async fn test_resolution_strategy_last_writer_wins() {
746 let controller =
747 OptimisticController::with_default_strategy(ResolutionStrategy::LastWriterWins);
748
749 let token1 = controller.begin_optimistic("agent-1", "file.txt").await;
751 let token2 = controller.begin_optimistic("agent-2", "file.txt").await;
752
753 controller.commit_optimistic(token1, "hash1").await.unwrap();
755
756 let result = controller
758 .commit_or_resolve(token2, "hash2", None)
759 .await
760 .unwrap();
761
762 assert!(result.is_success());
764 }
765
766 #[tokio::test]
767 async fn test_resolution_strategy_first_writer_wins() {
768 let controller =
769 OptimisticController::with_default_strategy(ResolutionStrategy::FirstWriterWins);
770
771 let token1 = controller.begin_optimistic("agent-1", "file.txt").await;
773 let token2 = controller.begin_optimistic("agent-2", "file.txt").await;
774
775 controller.commit_optimistic(token1, "hash1").await.unwrap();
777
778 let result = controller
780 .commit_or_resolve(token2, "hash2", None)
781 .await
782 .unwrap();
783
784 match result {
786 CommitResult::Rejected { reason } => {
787 assert!(reason.contains("agent-1"));
788 }
789 _ => panic!("Expected rejection"),
790 }
791 }
792
793 #[tokio::test]
794 async fn test_has_changed() {
795 let controller = OptimisticController::new();
796
797 assert!(!controller.has_changed("file.txt", 0).await);
799
800 let token = controller.begin_optimistic("agent-1", "file.txt").await;
802 controller.commit_optimistic(token, "hash1").await.unwrap();
803
804 assert!(controller.has_changed("file.txt", 0).await);
806 assert!(!controller.has_changed("file.txt", 1).await);
808 }
809
810 #[tokio::test]
811 async fn test_conflict_history() {
812 let controller = OptimisticController::new();
813
814 let token1 = controller.begin_optimistic("agent-1", "file.txt").await;
816 let token2 = controller.begin_optimistic("agent-2", "file.txt").await;
817
818 controller.commit_optimistic(token1, "hash1").await.unwrap();
819
820 let _ = controller.commit_or_resolve(token2, "hash2", None).await;
822
823 let history = controller.get_conflict_history().await;
825 assert_eq!(history.len(), 1);
826 assert_eq!(history[0].conflict.conflicting_agent, "agent-2");
827 }
828
829 #[tokio::test]
830 async fn test_stats() {
831 let controller = OptimisticController::new();
832
833 for i in 0..5 {
835 let token = controller
836 .begin_optimistic("agent-1", &format!("file{}.txt", i))
837 .await;
838 controller
839 .commit_optimistic(token, &format!("hash{}", i))
840 .await
841 .unwrap();
842 }
843
844 let stats = controller.get_stats().await;
845 assert_eq!(stats.total_resources, 5);
846 assert_eq!(stats.total_conflicts, 0);
847 }
848
849 #[test]
850 fn test_token_staleness() {
851 let token = OptimisticToken {
852 resource_id: "test".to_string(),
853 base_version: 0,
854 base_hash: String::new(),
855 agent_id: "agent-1".to_string(),
856 created_at: Instant::now() - std::time::Duration::from_secs(120),
857 };
858
859 assert!(token.is_stale(std::time::Duration::from_secs(60)));
861 assert!(!token.is_stale(std::time::Duration::from_secs(180)));
863 }
864
865 #[tokio::test]
866 async fn test_custom_strategy_per_resource() {
867 let controller = OptimisticController::new();
868
869 controller
871 .register_strategy("special.txt", ResolutionStrategy::LastWriterWins)
872 .await;
873
874 let token1 = controller.begin_optimistic("agent-1", "special.txt").await;
876 let token2 = controller.begin_optimistic("agent-2", "special.txt").await;
877
878 controller.commit_optimistic(token1, "hash1").await.unwrap();
879
880 let result = controller
881 .commit_or_resolve(token2, "hash2", None)
882 .await
883 .unwrap();
884
885 assert!(result.is_success());
887 }
888}