cflx 0.6.130

Conflux – a spec-driven parallel coding orchestrator that runs AI agents on git worktrees
//! Orchestration logic for parallel execution with order-based re-analysis.
//!
//! This module handles the main scheduler loop that:
//! - Does NOT block on dispatch (spawn tasks into JoinSet)
//! - Continues re-analysis even when apply commands are running
//! - Tracks in-flight changes to calculate available slots
//! - Responds to queue notifications, debounce timers, and task completions

use crate::error::Result;
use crate::events::LogEntry;
use std::collections::HashSet;
use std::sync::atomic::Ordering;
use std::sync::Arc;
use tokio::sync::Semaphore;
use tokio::task::JoinSet;
use tracing::{error, info, warn};

use super::cleanup::WorkspaceCleanupGuard;
use super::dynamic_queue::ReanalysisReason;
use super::events::send_event;
use super::types::WorkspaceResult;
use super::ParallelEvent;
use super::ParallelExecutor;
use super::SchedulerLifetime;

impl ParallelExecutor {
    /// Reports whether the scheduler loop may stop because no work remains anywhere.
    ///
    /// The three flags describe loop-local state owned by the caller (the task
    /// `JoinSet`, the queued candidates and the in-flight set). They are combined
    /// with executor-owned state: the resolve-wait and reject-wait collections, the
    /// active manual-resolve count and the pending-merge counter.
    ///
    /// Returns `true` only when every one of those sources is empty/zero *and* the
    /// scheduler lifetime is [`SchedulerLifetime::Finite`]; any other lifetime
    /// never exits through this check.
    pub(super) fn should_exit_when_idle(
        &self,
        join_set_empty: bool,
        queued_empty: bool,
        in_flight_empty: bool,
    ) -> bool {
        // Loop-local state reported by the caller.
        if !(join_set_empty && queued_empty && in_flight_empty) {
            return false;
        }

        // Changes parked in either wait lane still count as outstanding work.
        if !self.resolve_wait_changes.is_empty() || !self.reject_wait_changes.is_empty() {
            return false;
        }

        // Manual resolves that are still active keep the loop alive.
        if self.manual_resolve_active() != 0 {
            return false;
        }

        // So do merges that have not finished yet.
        if self.pending_merge_count.load(Ordering::Relaxed) != 0 {
            return false;
        }

        self.scheduler_lifetime == SchedulerLifetime::Finite
    }

    /// Execute changes with order-based dependency analysis and concurrent re-analysis.
    ///
    /// This method uses a `tokio::select!` based scheduler loop that:
    /// - Does NOT block on dispatch (spawn tasks into JoinSet)
    /// - Continues re-analysis even when apply commands are running
    /// - Tracks in-flight changes to calculate available slots
    /// - Responds to queue notifications, debounce timers, and task completions
    ///
    /// # Arguments
    /// * `changes` - Initial list of changes to execute
    /// * `analyzer` - Async function that returns AnalysisResult (order + dependencies)
    ///   - First parameter: queued changes to analyze
    ///   - Second parameter: in-flight change IDs (currently executing)
    ///   - Third parameter: iteration number
    ///
    /// # Errors
    /// Returns `Err` when `workspace_manager.prepare_for_parallel()` fails (after
    /// emitting a `ParallelEvent::Error`), or when `perform_reanalysis_and_dispatch`
    /// returns an error inside the loop.
    ///
    /// # Events
    /// - `ParallelEvent::AllCompleted` is sent immediately when `changes` is empty
    ///   and the shared orchestrator state shows no queued or resolve/reject-wait
    ///   change IDs.
    /// - After the loop, `ParallelEvent::Stopped` is sent if the loop ended because
    ///   of cancellation; every other loop exit sends `ParallelEvent::AllCompleted`.
    pub async fn execute_with_order_based_reanalysis<F>(
        &mut self,
        changes: Vec<crate::openspec::Change>,
        analyzer: F,
    ) -> Result<()>
    where
        for<'a> F: Fn(
                &'a [crate::openspec::Change],
                &'a [String],
                u32,
            ) -> std::pin::Pin<
                Box<dyn std::future::Future<Output = crate::analyzer::AnalysisResult> + Send + 'a>,
            > + Send
            + Sync,
    {
        // Empty input is only a no-op when the shared (reducer-owned) state also
        // shows nothing to do; otherwise the loop is started with an empty local queue.
        if changes.is_empty() {
            // If there is no shared state, or `try_read()` fails, both flags fall
            // back to `false` and the function completes immediately below.
            let (reducer_has_queued_intent, reducer_has_lane_wait) = self
                .shared_orchestrator_state
                .as_ref()
                .and_then(|state| state.try_read().ok())
                .map(|state| {
                    (
                        !state.queued_change_ids().is_empty(),
                        !state.resolve_wait_change_ids().is_empty()
                            || !state.reject_wait_change_ids().is_empty(),
                    )
                })
                .unwrap_or((false, false));
            if !reducer_has_queued_intent && !reducer_has_lane_wait {
                send_event(&self.event_tx, ParallelEvent::AllCompleted).await;
                return Ok(());
            }
            if reducer_has_lane_wait {
                info!(
                    "Starting scheduler loop with reducer-visible base-lane wait retry intent and empty local queue"
                );
            } else {
                info!(
                    "Starting scheduler loop with reducer-visible queued intent and empty local queue"
                );
            }
        }

        info!(
            "Starting order-based execution with re-analysis for {} changes",
            changes.len()
        );

        // Prepare for parallel execution (clean check for git)
        info!("Preparing for parallel execution...");
        match self.workspace_manager.prepare_for_parallel().await {
            // A warning is surfaced both in the log and as an event; execution continues.
            Ok(Some(warning)) => {
                warn!("{}", warning.message);
                send_event(
                    &self.event_tx,
                    ParallelEvent::Warning {
                        title: warning.title,
                        message: warning.message,
                    },
                )
                .await;
            }
            Ok(None) => {}
            // A preparation failure is reported as an event and aborts the run.
            Err(e) => {
                let error_msg = format!("Failed to prepare for parallel execution: {}", e);
                error!("{}", error_msg);
                send_event(&self.event_tx, ParallelEvent::Error { message: error_msg }).await;
                return Err(e.into());
            }
        }
        info!("Preparation complete");

        // Initialize scheduler state
        let max_parallelism = self.workspace_manager.max_concurrent();
        // One permit per allowed concurrent workspace; handed to the dispatch helper.
        let semaphore = Arc::new(Semaphore::new(max_parallelism));
        // Spawned workspace tasks; completions are polled in the select! below.
        let mut join_set: JoinSet<WorkspaceResult> = JoinSet::new();
        // Bounded channel (capacity 64): the sender is passed to
        // `handle_workspace_completion`, the receiver is polled in the select! below.
        let (merge_result_tx, mut merge_result_rx) = tokio::sync::mpsc::channel(64);
        // IDs of changes currently being executed.
        let mut in_flight: HashSet<String> = HashSet::new();
        // Changes waiting to be analyzed/dispatched; starts as the caller's list.
        let mut queued: Vec<crate::openspec::Change> = changes;
        // Iteration counter forwarded to the re-analysis helper, which returns the next value.
        let mut iteration = 1u32;
        let mut cleanup_guard = WorkspaceCleanupGuard::new(
            self.workspace_manager.backend_type(),
            self.repo_root.clone(),
        );

        // Reanalysis reason is derived from scheduler events/state each iteration.
        let mut reanalysis_reason = ReanalysisReason::Initial;
        // Records whether the loop ended via cancellation; selects the final event.
        let mut cancelled = false;

        // Main scheduler loop: wait for triggers and dispatch changes
        loop {
            // Check for cancellation. This is only evaluated at the top of the loop;
            // the 500 ms timer in the select! below bounds how long that can take.
            if self.is_cancelled() {
                let remaining_changes: Vec<String> = queued.iter().map(|c| c.id.clone()).collect();
                // The message lists both queued and in-flight IDs (in-flight order
                // follows HashSet iteration and is therefore unspecified).
                let cancel_msg = format!(
                    "Cancelled parallel execution ({} queued, {} in-flight: queued=[{}], in-flight=[{}])",
                    remaining_changes.len(),
                    in_flight.len(),
                    remaining_changes.join(", "),
                    in_flight.iter().cloned().collect::<Vec<_>>().join(", ")
                );
                send_event(
                    &self.event_tx,
                    ParallelEvent::Log(LogEntry::warn(&cancel_msg)),
                )
                .await;
                cancelled = true;
                break;
            }

            // Step 1: Check dynamic queue for newly added changes (TUI mode)
            self.check_dynamic_queue_and_add_changes(
                &mut queued,
                &in_flight,
                &mut reanalysis_reason,
            )
            .await;

            // Step 2: Sync reducer-owned ResolveWait intent before scheduler drain/idle checks.
            // This keeps manual resolve dispatch reducer-owned while making scheduler work detection truthful.
            self.sync_resolve_wait_from_shared_state_nonblocking();
            self.maybe_dispatch_resolve_wait_retry().await;

            // Step 3: Reconcile reducer-visible queue intent into scheduler-local candidates.
            // Queued additions take precedence over repair additions when choosing the reason.
            let reconciliation = self
                .reconcile_queued_candidates_from_shared_state(&mut queued, &in_flight)
                .await;
            if reconciliation.has_queued_additions() {
                reanalysis_reason = ReanalysisReason::QueueNotification;
            } else if reconciliation.has_repair_additions() {
                reanalysis_reason = ReanalysisReason::RepairCandidate;
            }

            // Step 4: Re-analysis decision is derived from scheduler state.
            // Note: `work_drained` does not look at `join_set`; the later
            // `should_exit_when_idle` check additionally requires it to be empty.
            let work_drained = queued.is_empty()
                && in_flight.is_empty()
                && self.resolve_wait_changes.is_empty()
                && self.reject_wait_changes.is_empty()
                && self.manual_resolve_active() == 0
                && self.pending_merge_count.load(Ordering::Relaxed) == 0;
            if work_drained && self.scheduler_lifetime == SchedulerLifetime::Finite {
                info!(
                    "All changes completed (queued/in-flight/resolve_wait/manual_resolve empty), stopping"
                );
                break;
            }
            // A persistent scheduler only logs here and keeps looping; this message
            // is emitted on every idle pass through the loop.
            if work_drained && self.scheduler_lifetime == SchedulerLifetime::Persistent {
                info!(
                    "Scheduler idle with no work; waiting for dynamic queue notifications (persistent lifetime)"
                );
            }

            // Analysis/dispatch is attempted only when something is queued AND at
            // least one slot is free; otherwise a diagnostic is emitted instead.
            if !queued.is_empty() {
                let available_slots = self.calculate_available_slots(max_parallelism, &in_flight);
                let should_attempt_reanalysis = available_slots > 0;
                if should_attempt_reanalysis {
                    // NOTE(review): an error propagated by `?` here returns before the
                    // final Stopped/AllCompleted event below is sent — confirm callers
                    // do not rely on always receiving a terminal event.
                    let (should_break, new_iteration) = self
                        .perform_reanalysis_and_dispatch(
                            &mut queued,
                            &mut in_flight,
                            max_parallelism,
                            iteration,
                            reanalysis_reason,
                            &analyzer,
                            semaphore.clone(),
                            &mut join_set,
                            &mut cleanup_guard,
                        )
                        .await?;

                    iteration = new_iteration;

                    // The helper can request that the scheduler loop stop.
                    if should_break {
                        break;
                    }
                } else {
                    self.emit_no_analysis_diagnostic(
                        &queued,
                        &in_flight,
                        max_parallelism,
                        "no_available_slots",
                    )
                    .await;
                }
            }

            // Step 5: Check if all work is done (before waiting on select)
            if self.should_exit_when_idle(
                join_set.is_empty(),
                queued.is_empty(),
                in_flight.is_empty(),
            ) {
                info!(
                    "All work completed (join_set/queued/resolve_wait/manual_resolve empty), exiting scheduler loop"
                );
                break;
            }

            // Step 6: Wait for events using tokio::select!
            // This makes the loop non-blocking and responsive to multiple triggers
            tokio::select! {
                // Join completion: task finished (apply+archive)
                // (`join_next()` yields `None` on an empty set, which does not match
                // the `Some(..)` pattern, so this branch is skipped for that pass.)
                Some(result) = join_set.join_next() => {
                    match result {
                        Ok(workspace_result) => {
                            self.handle_workspace_completion(workspace_result, max_parallelism, &mut in_flight, &merge_result_tx).await;

                            // Re-analysis is state-derived each loop.
                            // If a manual resolve is still active, keep the generic completion reason;
                            // otherwise treat the slot release as resolve-aware capacity recovery.
                            // A missing counter is treated as zero active manual resolves.
                            let manual_resolves_active = self
                                .manual_resolve_count
                                .as_ref()
                                .map(|counter| counter.load(std::sync::atomic::Ordering::Relaxed))
                                .unwrap_or(0);
                            reanalysis_reason = if manual_resolves_active == 0 {
                                ReanalysisReason::ResolveCompletion
                            } else {
                                ReanalysisReason::Completion
                            };
                            self.trigger_resolve_wait_retry_dispatch();
                        }
                        // The task did not produce a result (JoinError); only logged.
                        // NOTE(review): `in_flight` is not touched on this path, so the
                        // failed task's change ID presumably stays in `in_flight` and
                        // could keep the idle checks from ever passing — confirm.
                        Err(e) => {
                            error!("Task panicked: {:?}", e);
                        }
                    }
                }

                // Background merge completion: merge+cleanup finished asynchronously
                // Only a result reported as merged triggers a retry dispatch and
                // changes the re-analysis reason.
                Some(merge_result) = merge_result_rx.recv() => {
                    let merged = self.handle_merge_result(merge_result).await;
                    if merged {
                        self.trigger_resolve_wait_retry_dispatch();
                        reanalysis_reason = ReanalysisReason::ResolveCompletion;
                    }
                }

                // Queue notification: dynamic queue has new items
                // Without a dynamic queue the future is `pending()` and never
                // resolves, so this branch never fires.
                Some(_) = async {
                    if let Some(queue) = &self.dynamic_queue {
                        queue.notified().await;
                        Some(())
                    } else {
                        std::future::pending().await
                    }
                } => {
                    info!("Queue notification received, will check queue on next iteration");
                    self.trigger_resolve_wait_retry_dispatch();
                    // Queue check happens at loop start
                }

                // Debounce timer: wait before allowing re-analysis
                // A fresh 500 ms sleep is created on every pass, so the loop wakes
                // at least that often even when no other branch completes.
                _ = tokio::time::sleep(std::time::Duration::from_millis(500)) => {
                    // Timer expired; next loop derives re-analysis from current scheduler state.
                }
            }
        }

        // Drop cleanup guard without calling commit()
        // Workspaces are preserved by default for resume/debugging
        // Cleanup is only performed explicitly after successful merge via cleanup_workspace()
        drop(cleanup_guard);

        // Send appropriate completion event based on how we exited.
        // Every non-cancelled exit (drained, idle, or helper-requested break)
        // reports AllCompleted.
        if cancelled {
            send_event(&self.event_tx, ParallelEvent::Stopped).await;
        } else {
            send_event(&self.event_tx, ParallelEvent::AllCompleted).await;
        }
        Ok(())
    }
}