Skip to main content

ryo_executor/executor/
orchestrator.rs

1//! ExecutionOrchestrator: Coordinate multiple Executions for large-scale mutations
2//!
3//! L3 Orchestration layer that manages multiple L2 Executions, handling:
4//! - Speculative parallel execution with result composition
5//! - Conflict detection and resolution at ItemRef granularity
6//! - Resume points for failure recovery
7//!
8//! ```text
9//! ┌─────────────────────────────────────────────────────────────────┐
10//! │  L3 Orchestration                                                │
11//! │  ┌─────────────────────────────────────────────────────────────┐│
12//! │  │  ExecutionOrchestrator                                       ││
13//! │  │    ├── strategy: OrchestrationStrategy                       ││
14//! │  │    ├── run() → OrchestratedResult                            ││
15//! │  │    └── compose_results() → Merge at ItemRef level            ││
16//! │  └─────────────────────────────────────────────────────────────┘│
17//! │                              │                                   │
18//! │                              ▼ spawn executions                  │
19//! ├─────────────────────────────────────────────────────────────────┤
20//! │  L2 Executions                                                   │
21//! │  ┌───────────┐  ┌───────────┐  ┌───────────┐                    │
22//! │  │ Execution │  │ Execution │  │ Execution │  ...               │
23//! │  │  Group A  │  │  Group B  │  │  Group C  │                    │
24//! │  └───────────┘  └───────────┘  └───────────┘                    │
25//! └─────────────────────────────────────────────────────────────────┘
26//! ```
27
28use super::blueprint::ParallelBlueprint;
29use super::blueprint_executor::{BlueprintExecutor, ExecutionStrategy};
30use super::conflict;
31use super::spec::MutationSpec;
32use im::HashMap as ImHashMap;
33use rayon::prelude::*;
34use ryo_analysis::AnalysisContext;
35use ryo_source::pure::{PureFile, ToSynError};
36use ryo_symbol::{WorkspaceFilePath, WorkspacePathResolver};
37use ryo_verification::{FileChange, PipelineResult, VerificationInput, VerificationPipeline};
38use std::collections::{HashMap, HashSet};
39use std::sync::Arc;
40
/// Strategy used by the orchestrator to coordinate its executions.
///
/// The default is [`OrchestrationStrategy::Sequential`]. The type is `Eq` as
/// well as `PartialEq` (every field is an integer), so it can be compared and
/// used wherever full equality is required.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub enum OrchestrationStrategy {
    /// Execute all specs sequentially in a single execution (safe, debuggable).
    #[default]
    Sequential,

    /// Execute spec groups in parallel, then compose the results.
    Speculative,

    /// Agent-like execution with tick-based coordination (future).
    Murmuration {
        /// Tick budget in milliseconds.
        tick_budget_ms: u64,
    },
}
54
55/// Result of orchestrated execution
56#[derive(Debug, Clone)]
57pub enum OrchestratedResult {
58    /// All specs applied successfully
59    Success {
60        applied: Vec<usize>,
61        modified_files: Vec<WorkspaceFilePath>,
62        total_changes: usize,
63    },
64
65    /// Some specs applied, others had conflicts requiring resolution
66    PartialSuccess {
67        applied: Vec<usize>,
68        conflicts: Vec<ConflictInfo>,
69        modified_files: Vec<WorkspaceFilePath>,
70        total_changes: usize,
71    },
72
73    /// Execution failed
74    Error(OrchestratorError),
75}
76
77impl OrchestratedResult {
78    pub fn is_success(&self) -> bool {
79        matches!(self, Self::Success { .. })
80    }
81
82    pub fn applied_count(&self) -> usize {
83        match self {
84            Self::Success { applied, .. } => applied.len(),
85            Self::PartialSuccess { applied, .. } => applied.len(),
86            Self::Error(_) => 0,
87        }
88    }
89}
90
/// Error during orchestration.
///
/// Implements [`std::fmt::Display`] (rendered as `"<kind>: <message>"`) and
/// [`std::error::Error`], so it can be propagated with `?` into boxed or
/// wrapping error types.
#[derive(Debug, Clone)]
pub struct OrchestratorError {
    /// Category of the failure.
    pub kind: OrchestratorErrorKind,
    /// Human-readable detail.
    pub message: String,
}

impl std::fmt::Display for OrchestratorError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{}: {}", self.kind, self.message)
    }
}

impl std::error::Error for OrchestratorError {}

/// Category of an [`OrchestratorError`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum OrchestratorErrorKind {
    /// Blueprint has unresolved conflicts
    BlueprintConflict,
    /// Execution failed
    ExecutionFailed,
    /// Compose failed due to conflicts
    ComposeFailed,
}

impl std::fmt::Display for OrchestratorErrorKind {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            Self::BlueprintConflict => "blueprint conflict",
            Self::ExecutionFailed => "execution failed",
            Self::ComposeFailed => "compose failed",
        };
        f.write_str(label)
    }
}
107
108/// Information about a conflict between executions
109#[derive(Debug, Clone)]
110pub struct ConflictInfo {
111    /// File affected by the conflict
112    pub file: Option<WorkspaceFilePath>,
113    /// Indices of specs that conflicted
114    pub spec_indices: Vec<usize>,
115    /// Description of the conflict
116    pub description: String,
117}
118
119/// Result of verified orchestrated execution
120///
121/// Includes both the orchestration result and the verification pipeline result.
122#[derive(Debug)]
123pub struct VerifiedOrchestratedResult {
124    /// The underlying orchestration result
125    pub orchestration: OrchestratedResult,
126    /// Verification pipeline result (pre-check + post-check)
127    pub verification: Option<PipelineResult>,
128    /// Whether the result is fully verified
129    pub verified: bool,
130}
131
132impl VerifiedOrchestratedResult {
133    /// Create a new verified result
134    pub fn new(orchestration: OrchestratedResult, verification: PipelineResult) -> Self {
135        let verified = verification.is_success();
136        Self {
137            orchestration,
138            verification: Some(verification),
139            verified,
140        }
141    }
142
143    /// Create from orchestration failure (no verification attempted)
144    pub fn from_orchestration_failure(orchestration: OrchestratedResult) -> Self {
145        Self {
146            orchestration,
147            verification: None,
148            verified: false,
149        }
150    }
151
152    /// Check if both orchestration and verification succeeded
153    pub fn is_success(&self) -> bool {
154        self.orchestration.is_success() && self.verified
155    }
156
157    /// Get the number of applied specs
158    pub fn applied_count(&self) -> usize {
159        self.orchestration.applied_count()
160    }
161}
162
/// A set of specs that are executed together as one unit.
#[derive(Debug, Clone)]
struct SpecGroup {
    /// Positions of this group's specs within the orchestrator's spec list.
    indices: Vec<usize>,
}
169
170/// Result of executing a single group
171#[derive(Debug)]
172struct GroupResult {
173    /// Modified files from this group's execution
174    files: ImHashMap<WorkspaceFilePath, Arc<PureFile>>,
175    /// Indices of specs that were applied
176    applied_indices: Vec<usize>,
177    /// Files that were modified
178    modified_files: Vec<WorkspaceFilePath>,
179    /// Number of changes made
180    changes: usize,
181}
182
183/// Orchestrator for coordinating multiple executions
184pub struct ExecutionOrchestrator {
185    specs: Vec<MutationSpec>,
186    strategy: OrchestrationStrategy,
187}
188
189impl ExecutionOrchestrator {
190    pub fn new(specs: Vec<MutationSpec>) -> Self {
191        Self {
192            specs,
193            strategy: OrchestrationStrategy::default(),
194        }
195    }
196
197    pub fn with_strategy(mut self, strategy: OrchestrationStrategy) -> Self {
198        self.strategy = strategy;
199        self
200    }
201
202    /// Run the orchestrator with the configured strategy
203    pub fn run(&self, ctx: &mut AnalysisContext) -> OrchestratedResult {
204        if self.specs.is_empty() {
205            return OrchestratedResult::Success {
206                applied: vec![],
207                modified_files: vec![],
208                total_changes: 0,
209            };
210        }
211
212        match &self.strategy {
213            OrchestrationStrategy::Sequential => self.run_sequential(ctx),
214            OrchestrationStrategy::Speculative => self.run_speculative(ctx),
215            OrchestrationStrategy::Murmuration { .. } => {
216                // Murmuration falls back to Speculative for now
217                self.run_speculative(ctx)
218            }
219        }
220    }
221
222    /// Run speculative execution with verification pipeline.
223    ///
224    /// This method:
225    /// 1. Saves the original file state
226    /// 2. Runs the orchestrated execution
227    /// 3. If successful, runs verification pipeline (pre-check + post-check)
228    /// 4. Returns combined result with verification status
229    ///
230    /// # Arguments
231    /// - `ctx`: The analysis context (will be modified on success)
232    /// - `pipeline`: The verification pipeline to use
233    ///
234    /// # Returns
235    /// A `VerifiedOrchestratedResult` containing both orchestration and verification results.
236    pub fn run_speculative_verified(
237        &self,
238        ctx: &mut AnalysisContext,
239        pipeline: &VerificationPipeline,
240    ) -> Result<VerifiedOrchestratedResult, ToSynError> {
241        use ryo_verification::Status;
242
243        // Save original file state for diff calculation
244        let original_files = ctx.files.clone();
245        let original_sources = self.collect_sources(&original_files)?;
246
247        // Run orchestrated execution
248        let orchestration_result = self.run(ctx);
249
250        // If orchestration failed, return without verification
251        if !orchestration_result.is_success() {
252            return Ok(VerifiedOrchestratedResult::from_orchestration_failure(
253                orchestration_result,
254            ));
255        }
256
257        // Calculate file changes for verification
258        let changes =
259            FileChange::from_execution_diff(&original_files, &ctx.files, &original_sources)?;
260
261        if changes.is_empty() {
262            // No changes to verify
263            return Ok(VerifiedOrchestratedResult {
264                orchestration: orchestration_result,
265                verification: None,
266                verified: true,
267            });
268        }
269
270        // Create verification input
271        let resolver = WorkspacePathResolver::new(ctx.workspace_root.to_path_buf());
272        let input = VerificationInput::new(changes, resolver);
273
274        // Run pre-check (in-memory graph verification)
275        let precheck_result = pipeline.run_precheck(&input, ctx);
276        if !precheck_result.is_success() {
277            return Ok(VerifiedOrchestratedResult {
278                orchestration: orchestration_result,
279                verification: Some(precheck_result),
280                verified: false,
281            });
282        }
283
284        // Run post-check (filesystem verification with cargo check)
285        let postcheck_result = pipeline.run_postcheck(&input);
286        let pipeline_result = match postcheck_result {
287            Ok(result) => {
288                // Check success before moving results
289                let postcheck_success = result.pipeline_result.is_success();
290
291                // Combine precheck and postcheck results
292                let mut combined_results = precheck_result.results;
293                combined_results.extend(result.pipeline_result.results);
294
295                let overall = if postcheck_success {
296                    Status::Passed
297                } else {
298                    Status::Failed
299                };
300
301                PipelineResult {
302                    overall,
303                    results: combined_results,
304                }
305            }
306            Err(e) => {
307                // TempWorkspace creation failed - create error result
308                let error_result = ryo_verification::VerificationResult::failure(
309                    "postcheck",
310                    std::time::Duration::ZERO,
311                    vec![ryo_verification::Diagnostic::error(format!(
312                        "Post-check failed: {}",
313                        e
314                    ))],
315                );
316                PipelineResult {
317                    overall: Status::Failed,
318                    results: vec![error_result],
319                }
320            }
321        };
322
323        Ok(VerifiedOrchestratedResult::new(
324            orchestration_result,
325            pipeline_result,
326        ))
327    }
328
329    /// Collect source strings from files for diff calculation
330    fn collect_sources(
331        &self,
332        files: &ImHashMap<WorkspaceFilePath, Arc<PureFile>>,
333    ) -> Result<HashMap<WorkspaceFilePath, String>, ToSynError> {
334        files
335            .iter()
336            .map(|(wfp, pf)| Ok((wfp.clone(), pf.to_source()?)))
337            .collect()
338    }
339
340    /// Sequential execution: single BlueprintExecutor run
341    fn run_sequential(&self, ctx: &mut AnalysisContext) -> OrchestratedResult {
342        let blueprint = ParallelBlueprint::from_mutations(self.specs.clone());
343
344        // Check for blueprint-level conflicts
345        if blueprint.needs_escalation() {
346            return OrchestratedResult::Error(OrchestratorError {
347                kind: OrchestratorErrorKind::BlueprintConflict,
348                message: format!(
349                    "Blueprint has {} conflicts requiring escalation",
350                    blueprint.conflicts.len()
351                ),
352            });
353        }
354
355        let executor = BlueprintExecutor::new().with_strategy(ExecutionStrategy::Wavefront);
356        let result = executor.execute_v2(&blueprint, ctx);
357
358        if result.success {
359            let applied: Vec<usize> = (0..self.specs.len()).collect();
360            OrchestratedResult::Success {
361                applied,
362                modified_files: result.modified_files,
363                total_changes: result.total_changes,
364            }
365        } else {
366            OrchestratedResult::Error(OrchestratorError {
367                kind: OrchestratorErrorKind::ExecutionFailed,
368                message: result.error.unwrap_or_else(|| "Unknown error".to_string()),
369            })
370        }
371    }
372
373    /// Speculative execution: partition specs, run in parallel, compose results
374    ///
375    /// Uses ItemRef-based conflict detection for fine-grained parallelism.
376    fn run_speculative(&self, ctx: &mut AnalysisContext) -> OrchestratedResult {
377        // 1. Classify specs into parallel groups and sequential specs
378        let (parallel_groups, sequential_indices) = classify_for_parallel_execution(&self.specs);
379
380        // If no parallelizable groups or only one group, fall back to sequential
381        if parallel_groups.len() <= 1 && sequential_indices.is_empty() {
382            return self.run_sequential(ctx);
383        }
384
385        // 2. Convert to SpecGroups for parallel execution
386        let groups: Vec<SpecGroup> = parallel_groups
387            .into_iter()
388            .map(|indices| SpecGroup { indices })
389            .collect();
390
391        // If only one group after ItemRef analysis, fall back to sequential
392        if groups.len() <= 1 {
393            return self.run_sequential(ctx);
394        }
395
396        // 3. Execute parallel groups concurrently
397        // Clone the context for each group to preserve registry and graphs
398        let base_ctx = ctx.fork_clone();
399        let group_results: Vec<(SpecGroup, Result<GroupResult, String>)> = groups
400            .into_par_iter()
401            .map(|group| {
402                let result = self.execute_group(&group, &base_ctx);
403                (group, result)
404            })
405            .collect();
406
407        // 4. Compose parallel results
408        let mut result = self.compose_results(ctx, group_results);
409
410        // 5. Execute sequential specs after parallel ones complete
411        if !sequential_indices.is_empty() {
412            let sequential_specs: Vec<MutationSpec> = sequential_indices
413                .iter()
414                .map(|&i| self.specs[i].clone())
415                .collect();
416
417            let sequential_blueprint = ParallelBlueprint::from_mutations(sequential_specs);
418            let executor = BlueprintExecutor::new().with_strategy(ExecutionStrategy::Wavefront);
419            let seq_result = executor.execute_v2(&sequential_blueprint, ctx);
420
421            // Merge sequential results
422            result = match result {
423                OrchestratedResult::Success {
424                    mut applied,
425                    mut modified_files,
426                    total_changes,
427                } => {
428                    if seq_result.success {
429                        applied.extend(sequential_indices);
430                        modified_files.extend(seq_result.modified_files);
431                        OrchestratedResult::Success {
432                            applied,
433                            modified_files,
434                            total_changes: total_changes + seq_result.total_changes,
435                        }
436                    } else {
437                        OrchestratedResult::PartialSuccess {
438                            applied,
439                            conflicts: vec![ConflictInfo {
440                                file: None,
441                                spec_indices: sequential_indices,
442                                description: seq_result
443                                    .error
444                                    .unwrap_or_else(|| "Sequential execution failed".to_string()),
445                            }],
446                            modified_files,
447                            total_changes,
448                        }
449                    }
450                }
451                OrchestratedResult::PartialSuccess {
452                    mut applied,
453                    mut conflicts,
454                    mut modified_files,
455                    total_changes,
456                } => {
457                    if seq_result.success {
458                        applied.extend(sequential_indices);
459                        modified_files.extend(seq_result.modified_files);
460                    } else {
461                        conflicts.push(ConflictInfo {
462                            file: None,
463                            spec_indices: sequential_indices,
464                            description: seq_result
465                                .error
466                                .unwrap_or_else(|| "Sequential execution failed".to_string()),
467                        });
468                    }
469                    OrchestratedResult::PartialSuccess {
470                        applied,
471                        conflicts,
472                        modified_files,
473                        total_changes: total_changes + seq_result.total_changes,
474                    }
475                }
476                err @ OrchestratedResult::Error(_) => err,
477            };
478        }
479
480        result
481    }
482
483    /// Execute a single group of specs on a cloned context
484    fn execute_group(
485        &self,
486        group: &SpecGroup,
487        base_ctx: &AnalysisContext,
488    ) -> Result<GroupResult, String> {
489        // Clone the context with full registry and graphs for this group
490        let mut ctx = base_ctx.fork_clone();
491
492        // Collect specs for this group
493        let group_specs: Vec<MutationSpec> = group
494            .indices
495            .iter()
496            .map(|&idx| self.specs[idx].clone())
497            .collect();
498
499        let blueprint = ParallelBlueprint::from_mutations(group_specs);
500
501        if blueprint.needs_escalation() {
502            return Err(format!("Group {:?} has conflicts", group.indices));
503        }
504
505        let executor = BlueprintExecutor::new().with_strategy(ExecutionStrategy::Wavefront);
506        let result = executor.execute_v2(&blueprint, &mut ctx);
507
508        if result.success {
509            Ok(GroupResult {
510                files: ctx.files,
511                applied_indices: group.indices.clone(),
512                modified_files: result.modified_files,
513                changes: result.total_changes,
514            })
515        } else {
516            Err(result.error.unwrap_or_else(|| "Unknown error".to_string()))
517        }
518    }
519
520    /// Compose results from multiple group executions into the context
521    fn compose_results(
522        &self,
523        ctx: &mut AnalysisContext,
524        group_results: Vec<(SpecGroup, Result<GroupResult, String>)>,
525    ) -> OrchestratedResult {
526        let mut all_applied: Vec<usize> = Vec::new();
527        let mut all_modified: HashSet<WorkspaceFilePath> = HashSet::new();
528        let mut total_changes: usize = 0;
529        let mut conflicts: Vec<ConflictInfo> = Vec::new();
530
531        // Track which files were modified by which groups
532        let mut file_to_groups: HashMap<WorkspaceFilePath, Vec<usize>> = HashMap::new();
533
534        // First pass: collect successful results and detect file-level conflicts
535        let mut successful_results: Vec<(usize, GroupResult)> = Vec::new();
536
537        for (group_idx, (group, result)) in group_results.into_iter().enumerate() {
538            match result {
539                Ok(group_result) => {
540                    // Track file modifications for conflict detection
541                    for file in &group_result.modified_files {
542                        file_to_groups
543                            .entry(file.clone())
544                            .or_default()
545                            .push(group_idx);
546                    }
547                    successful_results.push((group_idx, group_result));
548                }
549                Err(err) => {
550                    conflicts.push(ConflictInfo {
551                        file: None,
552                        spec_indices: group.indices,
553                        description: err,
554                    });
555                }
556            }
557        }
558
559        // Detect file-level conflicts (multiple groups modifying same file)
560        let conflicting_files: HashSet<WorkspaceFilePath> = file_to_groups
561            .iter()
562            .filter(|(_, groups)| groups.len() > 1)
563            .map(|(key, _)| key.clone())
564            .collect();
565
566        // Apply non-conflicting results
567        for (_group_idx, group_result) in successful_results {
568            let has_conflict = group_result
569                .modified_files
570                .iter()
571                .any(|f| conflicting_files.contains(f));
572
573            if has_conflict {
574                // Record conflict for later resolution
575                let conflicting: Vec<WorkspaceFilePath> = group_result
576                    .modified_files
577                    .iter()
578                    .filter(|f| conflicting_files.contains(*f))
579                    .cloned()
580                    .collect();
581
582                for file_path in conflicting {
583                    if !conflicts
584                        .iter()
585                        .any(|c| c.file.as_ref() == Some(&file_path))
586                    {
587                        conflicts.push(ConflictInfo {
588                            file: Some(file_path.clone()),
589                            spec_indices: group_result.applied_indices.clone(),
590                            description: format!("Multiple groups modified file: {:?}", file_path),
591                        });
592                    }
593                }
594            } else {
595                // Apply this group's changes to ctx
596                for file_path in &group_result.modified_files {
597                    if let Some(pure_file) = group_result.files.get(file_path) {
598                        ctx.files_mut().insert(file_path.clone(), pure_file.clone());
599                    }
600                    all_modified.insert(file_path.clone());
601                }
602                all_applied.extend(group_result.applied_indices);
603                total_changes += group_result.changes;
604            }
605        }
606
607        if conflicts.is_empty() {
608            OrchestratedResult::Success {
609                applied: all_applied,
610                modified_files: all_modified.into_iter().collect(),
611                total_changes,
612            }
613        } else if !all_applied.is_empty() {
614            OrchestratedResult::PartialSuccess {
615                applied: all_applied,
616                conflicts,
617                modified_files: all_modified.into_iter().collect(),
618                total_changes,
619            }
620        } else {
621            OrchestratedResult::Error(OrchestratorError {
622                kind: OrchestratorErrorKind::ComposeFailed,
623                message: "All groups had conflicts".to_string(),
624            })
625        }
626    }
627}
628
629/// Extract a grouping key from a target string
630/// Suggest the best orchestration strategy based on specs and context
631pub fn suggest_orchestration(
632    specs: &[MutationSpec],
633    _ctx: &AnalysisContext,
634) -> OrchestrationStrategy {
635    let spec_count = specs.len();
636
637    // Estimate number of independent groups using conflict detection
638    let groups = conflict::group_by_conflicts(specs);
639    let estimated_groups = groups.len();
640
641    // Simple conflict density estimation based on target overlap
642    let conflict_density = if spec_count == 0 {
643        0.0
644    } else {
645        1.0 - (estimated_groups as f64 / spec_count as f64)
646    };
647
648    // Strategy selection (exclusive, evaluated top to bottom)
649    match (spec_count, estimated_groups, conflict_density) {
650        // 1. Few specs → Sequential
651        (n, _, _) if n <= 5 => OrchestrationStrategy::Sequential,
652
653        // 2. High conflict → Sequential (safety first)
654        (_, _, d) if d >= 0.5 => OrchestrationStrategy::Sequential,
655
656        // 3. Good independence & low conflict → Speculative
657        (_, g, d) if g >= 3 && d < 0.15 => OrchestrationStrategy::Speculative,
658
659        // 4. Otherwise → Murmuration (fallback to Speculative for now)
660        _ => OrchestrationStrategy::Murmuration { tick_budget_ms: 10 },
661    }
662}
663
664// ============================================================================
665// Public API: Re-export from conflict module
666// ============================================================================
667
/// Partition specs into conflict-free groups.
///
/// Uses Union-Find to efficiently group specs that transitively conflict.
/// This is `conflict::group_by_conflicts` re-exported under its former name,
/// `partition_by_item_refs`; detection is now MutationTargetSymbol-based.
672pub use conflict::group_by_conflicts as partition_by_item_refs;
673
674/// Classify specs into parallelizable groups (DEPRECATED).
675///
676/// This function is deprecated and will be removed. Use `conflict::group_by_conflicts` directly.
677/// The old logic depended on `item_refs()` which doesn't work with lazy MutationTargetSymbol.
678///
679/// For now, this function simply calls `group_by_conflicts` for all specs.
680pub fn classify_for_parallel_execution(specs: &[MutationSpec]) -> (Vec<Vec<usize>>, Vec<usize>) {
681    // All specs are grouped by conflicts now
682    // No special "sequential" handling needed - conflict detection handles project-wide mutations
683    let groups = conflict::group_by_conflicts(specs);
684    (groups, vec![])
685}
686
687#[cfg(test)]
688mod tests {
689    use super::*;
690    use crate::executor::conflict::{find_conflicting_pairs, specs_conflict};
691    use crate::executor::spec::{InsertPosition, MutationTargetSymbol, SymbolPath};
692    use ryo_analysis::testing::{ContextBuilder, ContextTestExt};
693    use ryo_symbol::SymbolId;
694
695    fn create_multi_file_context() -> AnalysisContext {
696        ContextBuilder::new()
697            .with_file("src/lib.rs", "// lib\n")
698            .with_file("src/models.rs", "struct User {}\n")
699            .with_file("src/api.rs", "struct Api {}\n")
700            .build()
701    }
702
703    /// Create a dummy SymbolId for testing (index starts from 1)
704    fn dummy_id(index: u32) -> SymbolId {
705        SymbolId::parse(&format!("{}v1", index)).expect("valid dummy id")
706    }
707
708    #[test]
709    fn test_orchestrator_sequential_basic() {
710        let mut ctx = create_multi_file_context();
711        let user_id = ctx.registry().lookup_by_name("User").unwrap();
712
713        let specs = vec![MutationSpec::AddDerive {
714            target: MutationTargetSymbol::ById(user_id),
715            derives: vec!["Debug".to_string()],
716        }];
717
718        let orchestrator =
719            ExecutionOrchestrator::new(specs).with_strategy(OrchestrationStrategy::Sequential);
720
721        let result = orchestrator.run(&mut ctx);
722
723        assert!(result.is_success(), "Sequential execution failed");
724        assert_eq!(result.applied_count(), 1);
725    }
726
727    #[test]
728    fn test_orchestrator_speculative_independent_groups() {
729        let mut ctx = create_multi_file_context();
730        let user_id = ctx.registry().lookup_by_name("User").unwrap();
731        let api_id = ctx.registry().lookup_by_name("Api").unwrap();
732
733        // Specs targeting different structs (should be parallelizable)
734        let specs = vec![
735            MutationSpec::AddDerive {
736                target: MutationTargetSymbol::ById(user_id),
737                derives: vec!["Debug".to_string()],
738            },
739            MutationSpec::AddDerive {
740                target: MutationTargetSymbol::ById(api_id),
741                derives: vec!["Clone".to_string()],
742            },
743        ];
744
745        let orchestrator =
746            ExecutionOrchestrator::new(specs).with_strategy(OrchestrationStrategy::Speculative);
747
748        let result = orchestrator.run(&mut ctx);
749
750        assert!(result.is_success(), "Speculative execution failed");
751        assert_eq!(result.applied_count(), 2);
752    }
753
754    #[test]
755    fn test_orchestrator_empty_specs() {
756        let mut ctx = create_multi_file_context();
757        let specs: Vec<MutationSpec> = vec![];
758
759        let orchestrator = ExecutionOrchestrator::new(specs);
760        let result = orchestrator.run(&mut ctx);
761
762        assert!(result.is_success());
763        assert_eq!(result.applied_count(), 0);
764    }
765
766    #[test]
767    fn test_suggest_orchestration_few_specs() {
768        let ctx = create_multi_file_context();
769
770        let specs = vec![MutationSpec::AddDerive {
771            target: MutationTargetSymbol::ById(dummy_id(1)),
772            derives: vec!["Debug".to_string()],
773        }];
774
775        let strategy = suggest_orchestration(&specs, &ctx);
776        assert_eq!(strategy, OrchestrationStrategy::Sequential);
777    }
778
779    #[test]
780    fn test_suggest_orchestration_many_independent() {
781        let ctx = create_multi_file_context();
782
783        // Many specs with different targets (use dummy IDs since we're testing orchestration logic)
784        let specs = vec![
785            MutationSpec::AddDerive {
786                target: MutationTargetSymbol::ById(dummy_id(1)),
787                derives: vec!["Debug".to_string()],
788            },
789            MutationSpec::AddDerive {
790                target: MutationTargetSymbol::ById(dummy_id(2)),
791                derives: vec!["Clone".to_string()],
792            },
793            MutationSpec::AddDerive {
794                target: MutationTargetSymbol::ById(dummy_id(3)),
795                derives: vec!["Default".to_string()],
796            },
797            MutationSpec::AddDerive {
798                target: MutationTargetSymbol::ById(dummy_id(4)),
799                derives: vec!["Hash".to_string()],
800            },
801            MutationSpec::AddDerive {
802                target: MutationTargetSymbol::ById(dummy_id(5)),
803                derives: vec!["Default".to_string()],
804            },
805            MutationSpec::AddDerive {
806                target: MutationTargetSymbol::ById(dummy_id(4)),
807                derives: vec!["Hash".to_string()],
808            },
809            MutationSpec::AddDerive {
810                target: MutationTargetSymbol::ById(dummy_id(5)),
811                derives: vec!["Eq".to_string()],
812            },
813            MutationSpec::AddDerive {
814                target: MutationTargetSymbol::ById(dummy_id(6)),
815                derives: vec!["PartialEq".to_string()],
816            },
817        ];
818
819        let strategy = suggest_orchestration(&specs, &ctx);
820        // With 8 specs and some conflicts (id4 and id5 appear twice each),
821        // we get 6 groups: [1], [2], [3], [4,4], [5,5], [6]
822        // conflict_density = 1.0 - 6/8 = 0.25 (>= 0.15)
823        // So Murmuration is returned (not Speculative which requires d < 0.15)
824        assert_eq!(
825            strategy,
826            OrchestrationStrategy::Murmuration { tick_budget_ms: 10 }
827        );
828    }
829
830    #[test]
831    #[ignore = "V1 path disabled - needs V2 migration"]
832    fn test_orchestrator_with_add_items() {
833        let mut ctx = create_multi_file_context();
834
835        let specs = vec![
836            MutationSpec::AddItem {
837                target: MutationTargetSymbol::ByPath(Box::new(
838                    SymbolPath::parse("test_crate::models").unwrap(),
839                )),
840                content: "pub struct Order {}".to_string(),
841                position: InsertPosition::Bottom,
842            },
843            MutationSpec::AddItem {
844                target: MutationTargetSymbol::ByPath(Box::new(
845                    SymbolPath::parse("test_crate::api").unwrap(),
846                )),
847                content: "pub fn handler() {}".to_string(),
848                position: InsertPosition::Bottom,
849            },
850        ];
851
852        // Use Sequential strategy as Speculative has known issues with module-based ItemRefs
853        let orchestrator =
854            ExecutionOrchestrator::new(specs).with_strategy(OrchestrationStrategy::Sequential);
855
856        let result = orchestrator.run(&mut ctx);
857
858        assert!(result.is_success(), "Speculative with AddItem failed");
859
860        let models = ctx.test_file("src/models.rs").unwrap().to_source().unwrap();
861        assert!(models.contains("Order"), "Order not added to models");
862
863        let api = ctx.test_file("src/api.rs").unwrap().to_source().unwrap();
864        assert!(api.contains("handler"), "handler not added to api");
865    }
866
867    #[test]
868    #[ignore = "V1 path disabled - needs V2 migration"]
869    fn test_orchestrator_verifies_changes_applied() {
870        let mut ctx = create_multi_file_context();
871        let user_id = ctx.registry().lookup_by_name("User").unwrap();
872        let api_id = ctx.registry().lookup_by_name("Api").unwrap();
873
874        let specs = vec![
875            MutationSpec::AddDerive {
876                target: MutationTargetSymbol::ById(user_id),
877                derives: vec!["Debug".to_string(), "Clone".to_string()],
878            },
879            MutationSpec::AddDerive {
880                target: MutationTargetSymbol::ById(api_id),
881                derives: vec!["Default".to_string()],
882            },
883        ];
884
885        let orchestrator =
886            ExecutionOrchestrator::new(specs).with_strategy(OrchestrationStrategy::Sequential);
887
888        let result = orchestrator.run(&mut ctx);
889        assert!(result.is_success());
890
891        // Verify changes were applied to ctx
892        let models = ctx.test_file("src/models.rs").unwrap().to_source().unwrap();
893        assert!(models.contains("Debug"), "Debug not added to User");
894        assert!(models.contains("Clone"), "Clone not added to User");
895
896        let api = ctx.test_file("src/api.rs").unwrap().to_source().unwrap();
897        assert!(api.contains("Default"), "Default not added to Api");
898    }
899
900    // ========================================================================
901    // ItemRef-based Conflict Detection Tests
902    // ========================================================================
903
904    #[test]
905    fn test_specs_conflict_different_fields_no_conflict() {
906        // AddField to same struct but different fields → NO conflict
907        let spec_a = MutationSpec::AddField {
908            target: MutationTargetSymbol::ById(dummy_id(1)),
909            field_name: "name".to_string(),
910            field_type: "String".to_string(),
911            visibility: Default::default(),
912        };
913        let spec_b = MutationSpec::AddField {
914            target: MutationTargetSymbol::ById(dummy_id(1)),
915            field_name: "email".to_string(),
916            field_type: "String".to_string(),
917            visibility: Default::default(),
918        };
919
920        // Different fields should NOT conflict - can run in parallel
921        // AddField only references the field path, not the struct
922        let conflicts = specs_conflict(&spec_a, &spec_b);
923        assert!(
924            !conflicts,
925            "AddField to different fields should NOT conflict"
926        );
927    }
928
929    #[test]
930    fn test_specs_conflict_same_field_conflict() {
931        // AddField and RemoveField on same field → CONFLICT
932        let spec_add = MutationSpec::AddField {
933            target: MutationTargetSymbol::ById(dummy_id(1)),
934            field_name: "email".to_string(),
935            field_type: "String".to_string(),
936            visibility: Default::default(),
937        };
938        let spec_remove = MutationSpec::RemoveField {
939            target: MutationTargetSymbol::ById(dummy_id(1)),
940            field_name: "email".to_string(),
941        };
942
943        assert!(
944            specs_conflict(&spec_add, &spec_remove),
945            "Same field operations should conflict"
946        );
947    }
948
949    #[test]
950    fn test_specs_conflict_different_structs_no_conflict() {
951        // AddField to different structs → no conflict
952        let spec_a = MutationSpec::AddField {
953            target: MutationTargetSymbol::ById(dummy_id(1)),
954            field_name: "name".to_string(),
955            field_type: "String".to_string(),
956            visibility: Default::default(),
957        };
958        let spec_b = MutationSpec::AddField {
959            target: MutationTargetSymbol::ById(dummy_id(2)),
960            field_name: "id".to_string(),
961            field_type: "u64".to_string(),
962            visibility: Default::default(),
963        };
964
965        assert!(
966            !specs_conflict(&spec_a, &spec_b),
967            "Different structs should not conflict"
968        );
969    }
970
971    #[test]
972    fn test_specs_conflict_parent_child_conflict() {
973        use crate::executor::ItemKind;
974
975        // RemoveItem on User vs AddField to User → conflict (parent-child)
976        let spec_remove = MutationSpec::RemoveItem {
977            target: MutationTargetSymbol::ById(dummy_id(1)),
978            item_kind: ItemKind::Struct,
979        };
980        let spec_add = MutationSpec::AddField {
981            target: MutationTargetSymbol::ById(dummy_id(1)),
982            field_name: "email".to_string(),
983            field_type: "String".to_string(),
984            visibility: Default::default(),
985        };
986
987        assert!(
988            specs_conflict(&spec_remove, &spec_add),
989            "Remove parent should conflict with add child"
990        );
991    }
992
993    #[test]
994    fn test_partition_by_item_refs_independent_groups() {
995        // Three specs targeting different structs → three groups
996        let specs = vec![
997            MutationSpec::AddDerive {
998                target: MutationTargetSymbol::ById(dummy_id(1)),
999                derives: vec!["Debug".to_string()],
1000            },
1001            MutationSpec::AddDerive {
1002                target: MutationTargetSymbol::ById(dummy_id(2)),
1003                derives: vec!["Clone".to_string()],
1004            },
1005            MutationSpec::AddDerive {
1006                target: MutationTargetSymbol::ById(dummy_id(3)),
1007                derives: vec!["Default".to_string()],
1008            },
1009        ];
1010
1011        let groups = partition_by_item_refs(&specs);
1012        assert_eq!(
1013            groups.len(),
1014            3,
1015            "Three independent specs should form three groups"
1016        );
1017    }
1018
1019    #[test]
1020    fn test_partition_by_item_refs_conflicting_merged() {
1021        // Two specs on same struct → one group
1022        let specs = vec![
1023            MutationSpec::AddDerive {
1024                target: MutationTargetSymbol::ById(dummy_id(1)),
1025                derives: vec!["Debug".to_string()],
1026            },
1027            MutationSpec::AddDerive {
1028                target: MutationTargetSymbol::ById(dummy_id(1)),
1029                derives: vec!["Clone".to_string()],
1030            },
1031        ];
1032
1033        let groups = partition_by_item_refs(&specs);
1034        assert_eq!(groups.len(), 1, "Conflicting specs should be in same group");
1035        assert_eq!(groups[0].len(), 2);
1036    }
1037
1038    #[test]
1039    fn test_classify_for_parallel_execution() {
1040        use ryo_symbol::{SymbolKind, SymbolPath, SymbolRegistry};
1041
1042        // Create dummy symbol IDs for testing
1043        let mut symbol_registry = SymbolRegistry::new();
1044        let path_user = SymbolPath::parse("test_crate::User").unwrap();
1045        let path_order = SymbolPath::parse("test_crate::Order").unwrap();
1046        let path_old = SymbolPath::parse("test_crate::old_name").unwrap();
1047        let symbol_user = symbol_registry
1048            .register(path_user, SymbolKind::Struct)
1049            .unwrap();
1050        let symbol_order = symbol_registry
1051            .register(path_order, SymbolKind::Struct)
1052            .unwrap();
1053        let symbol_old = symbol_registry
1054            .register(path_old, SymbolKind::Function)
1055            .unwrap();
1056
1057        // All three specs have item_refs (different targets) → all parallel
1058        let specs = vec![
1059            MutationSpec::AddDerive {
1060                target: MutationTargetSymbol::ById(symbol_user),
1061                derives: vec!["Debug".to_string()],
1062            },
1063            MutationSpec::AddDerive {
1064                target: MutationTargetSymbol::ById(symbol_order),
1065                derives: vec!["Clone".to_string()],
1066            },
1067            // Rename with "from" generates ItemRef for crate::old_name
1068            MutationSpec::Rename {
1069                target: MutationTargetSymbol::ById(symbol_old),
1070                to: "new_name".to_string(),
1071                scope: Default::default(),
1072            },
1073        ];
1074
1075        let (parallel, sequential) = classify_for_parallel_execution(&specs);
1076
1077        // User, Order, and old_name are all independent → 3 parallel groups
1078        assert_eq!(parallel.len(), 3, "Should have 3 parallel groups");
1079
1080        // No specs are sequential (all have item_refs)
1081        assert_eq!(sequential.len(), 0, "No specs should be sequential");
1082    }
1083
1084    #[test]
1085    fn test_find_conflicting_pairs() {
1086        let specs = vec![
1087            MutationSpec::AddDerive {
1088                target: MutationTargetSymbol::ById(dummy_id(1)),
1089                derives: vec!["Debug".to_string()],
1090            },
1091            MutationSpec::AddDerive {
1092                target: MutationTargetSymbol::ById(dummy_id(1)),
1093                derives: vec!["Clone".to_string()],
1094            },
1095            MutationSpec::AddDerive {
1096                target: MutationTargetSymbol::ById(dummy_id(2)),
1097                derives: vec!["Default".to_string()],
1098            },
1099        ];
1100
1101        let conflicts = find_conflicting_pairs(&specs);
1102
1103        // specs[0] and specs[1] conflict (same target: User)
1104        assert!(conflicts.contains(&(0, 1)), "User specs should conflict");
1105        // specs[0]/specs[1] and specs[2] don't conflict
1106        assert!(
1107            !conflicts.contains(&(0, 2)),
1108            "User and Order should not conflict"
1109        );
1110        assert!(
1111            !conflicts.contains(&(1, 2)),
1112            "User and Order should not conflict"
1113        );
1114    }
1115
1116    // ========================================================================
1117    // Verified Orchestration Tests
1118    // ========================================================================
1119
1120    #[test]
1121    fn test_verified_orchestrated_result_success() {
1122        let orch_result = OrchestratedResult::Success {
1123            applied: vec![0, 1],
1124            modified_files: vec![],
1125            total_changes: 2,
1126        };
1127
1128        let pipeline_result = PipelineResult {
1129            overall: ryo_verification::Status::Passed,
1130            results: vec![],
1131        };
1132
1133        let verified = VerifiedOrchestratedResult::new(orch_result, pipeline_result);
1134
1135        assert!(verified.is_success());
1136        assert!(verified.verified);
1137        assert_eq!(verified.applied_count(), 2);
1138    }
1139
1140    #[test]
1141    fn test_verified_orchestrated_result_orchestration_failure() {
1142        let orch_result = OrchestratedResult::Error(OrchestratorError {
1143            kind: OrchestratorErrorKind::ExecutionFailed,
1144            message: "Test error".to_string(),
1145        });
1146
1147        let verified = VerifiedOrchestratedResult::from_orchestration_failure(orch_result);
1148
1149        assert!(!verified.is_success());
1150        assert!(!verified.verified);
1151        assert!(verified.verification.is_none());
1152        assert_eq!(verified.applied_count(), 0);
1153    }
1154
1155    #[test]
1156    fn test_verified_orchestrated_result_verification_failure() {
1157        let orch_result = OrchestratedResult::Success {
1158            applied: vec![0],
1159            modified_files: vec![],
1160            total_changes: 1,
1161        };
1162
1163        let pipeline_result = PipelineResult {
1164            overall: ryo_verification::Status::Failed,
1165            results: vec![ryo_verification::VerificationResult::failure(
1166                "test",
1167                std::time::Duration::ZERO,
1168                vec![ryo_verification::Diagnostic::error("Test error")],
1169            )],
1170        };
1171
1172        let verified = VerifiedOrchestratedResult::new(orch_result, pipeline_result);
1173
1174        assert!(!verified.is_success());
1175        assert!(!verified.verified);
1176        assert!(verified.orchestration.is_success());
1177    }
1178
1179    #[test]
1180    fn test_run_speculative_verified_empty_specs() {
1181        let mut ctx = create_multi_file_context();
1182
1183        let orchestrator = ExecutionOrchestrator::new(vec![]);
1184        let pipeline = VerificationPipeline::new();
1185
1186        let result = orchestrator
1187            .run_speculative_verified(&mut ctx, &pipeline)
1188            .unwrap();
1189
1190        // Empty specs should succeed trivially
1191        assert!(result.is_success());
1192        assert!(result.orchestration.is_success());
1193        assert!(result.verified);
1194    }
1195
1196    #[test]
1197    fn test_run_speculative_verified_basic() {
1198        let mut ctx = create_multi_file_context();
1199
1200        let specs = vec![MutationSpec::AddDerive {
1201            target: MutationTargetSymbol::ById(dummy_id(1)),
1202            derives: vec!["Debug".to_string()],
1203        }];
1204
1205        let orchestrator =
1206            ExecutionOrchestrator::new(specs).with_strategy(OrchestrationStrategy::Sequential);
1207
1208        // Use empty pipeline (no verifiers)
1209        let pipeline = VerificationPipeline::new();
1210
1211        let result = orchestrator
1212            .run_speculative_verified(&mut ctx, &pipeline)
1213            .unwrap();
1214
1215        // Orchestration should succeed
1216        assert!(result.orchestration.is_success());
1217        // With no verifiers, verification passes trivially
1218        assert!(result.verified);
1219    }
1220
1221    #[test]
1222    fn test_run_speculative_verified_with_graph_verifier() {
1223        use ryo_verification::GraphVerifier;
1224
1225        let mut ctx = create_multi_file_context();
1226
1227        let specs = vec![MutationSpec::AddDerive {
1228            target: MutationTargetSymbol::ById(dummy_id(1)),
1229            derives: vec!["Debug".to_string()],
1230        }];
1231
1232        let orchestrator =
1233            ExecutionOrchestrator::new(specs).with_strategy(OrchestrationStrategy::Sequential);
1234
1235        // Pipeline with GraphVerifier only (fast pre-check)
1236        let pipeline = VerificationPipeline::new().add_in_memory(GraphVerifier::new());
1237
1238        let result = orchestrator
1239            .run_speculative_verified(&mut ctx, &pipeline)
1240            .unwrap();
1241
1242        // Both orchestration and pre-check should succeed
1243        assert!(result.orchestration.is_success());
1244        // Verification status depends on pre-check result
1245        assert!(result.verification.is_some() || result.verified);
1246    }
1247}