cflx 0.6.98

Conflux – a spec-driven parallel coding orchestrator that runs AI agents on git worktrees
use std::{collections::HashSet, sync::Arc, time::Instant};

use tokio::sync::RwLock;

use tracing::warn;

use crate::{
    openspec::Change,
    orchestration::state::{OrchestratorState, ReducerCommand},
    task_parser,
    tui::events::{LogEntry, LogLevel},
};

use super::{AppState, ChangeState, TuiCommand};

pub(super) fn collect_start_processing_targets(
    changes: &[ChangeState],
    parallel_mode: bool,
) -> Result<Vec<String>, String> {
    let selected: Vec<String> = changes
        .iter()
        .filter(|c| c.selected && matches!(c.display_status_cache.as_str(), "not queued"))
        .map(|c| c.id.clone())
        .collect();

    if parallel_mode {
        let ineligible: Vec<String> = changes
            .iter()
            .filter(|c| {
                c.selected
                    && !c.is_parallel_eligible
                    && matches!(c.display_status_cache.as_str(), "not queued")
            })
            .map(|c| c.id.clone())
            .collect();
        if !ineligible.is_empty() {
            return Err(format!(
                "Parallel mode requires committed changes. Uncommitted: {}",
                ineligible.join(", ")
            ));
        }
    }

    if selected.is_empty() {
        return Err("No changes selected".to_string());
    }

    Ok(selected)
}

pub(super) fn collect_resume_processing_targets(
    changes: &[ChangeState],
) -> Result<Vec<String>, String> {
    let marked_ids: Vec<String> = changes
        .iter()
        .filter(|c| c.selected && matches!(c.display_status_cache.as_str(), "not queued"))
        .map(|c| c.id.clone())
        .collect();

    if marked_ids.is_empty() {
        return Err("No changes marked for execution".to_string());
    }

    Ok(marked_ids)
}

pub(super) fn collect_retry_error_targets(changes: &[ChangeState]) -> Vec<String> {
    changes
        .iter()
        .filter(|c| c.display_status_cache == "error")
        .map(|c| c.id.clone())
        .collect()
}

pub(super) fn mark_changes_queued(changes: &mut [ChangeState], ids: &[String]) {
    for change in changes {
        if ids.contains(&change.id) {
            let was_error = change.display_status_cache == "error";
            change.set_display_status_cache("queued");
            if was_error {
                change.selected = true;
            }
        }
    }
}

pub(super) fn sync_queue_intent(shared: Option<&Arc<RwLock<OrchestratorState>>>, ids: &[String]) {
    if let Some(shared) = shared {
        if let Ok(mut guard) = shared.try_write() {
            for id in ids {
                guard.apply_command(ReducerCommand::AddToQueue(id.clone()));
            }
        }
    }
}

pub(super) fn emit_retry_logs(ids: &[String]) -> Vec<LogEntry> {
    ids.iter()
        .map(|id| {
            let mut entry = LogEntry::info(format!("Retrying: {}", id));
            entry.level = LogLevel::Info;
            entry
        })
        .collect()
}

pub(super) fn build_start_command(ids: Vec<String>) -> TuiCommand {
    TuiCommand::StartProcessing(ids)
}

pub(super) fn update_changes(state: &mut AppState, fetched_changes: Vec<Change>) {
    let rejected_changes = crate::openspec::list_rejected_changes_native().unwrap_or_else(|err| {
        warn!(error = %err, "Failed to list rejected changes for TUI refresh");
        Vec::new()
    });

    update_changes_with_rejected(state, fetched_changes, rejected_changes);
}

pub(super) fn update_changes_with_rejected(
    state: &mut AppState,
    fetched_changes: Vec<Change>,
    rejected_changes: Vec<Change>,
) {
    let active_ids: HashSet<String> = fetched_changes.iter().map(|c| c.id.clone()).collect();
    let rejected_ids: HashSet<String> = rejected_changes.iter().map(|c| c.id.clone()).collect();

    if let Some(shared_state) = &state.shared_orchestrator_state {
        if let Ok(mut guard) = shared_state.try_write() {
            for change in &fetched_changes {
                if change.total_tasks > 0 {
                    guard.set_task_progress(
                        change.id.clone(),
                        change.completed_tasks,
                        change.total_tasks,
                    );
                }
            }
            for change in &rejected_changes {
                if change.total_tasks > 0 {
                    guard.set_task_progress(
                        change.id.clone(),
                        change.completed_tasks,
                        change.total_tasks,
                    );
                }
            }
        }
    }

    let new_ids: Vec<String> = fetched_changes
        .iter()
        .chain(rejected_changes.iter())
        .filter(|c| !state.known_change_ids.contains(&c.id))
        .map(|c| c.id.clone())
        .collect();

    for fetched in &fetched_changes {
        if let Some(existing) = state.changes.iter_mut().find(|c| c.id == fetched.id) {
            let was_rejected = existing.display_status_cache == "rejected";
            let was_archived = existing.display_status_cache == "archived";
            let is_merge_wait = existing.display_status_cache == "merge wait";
            let is_resolve_wait = existing.display_status_cache == "resolve pending";

            if was_rejected && !rejected_ids.contains(&fetched.id) {
                existing.set_display_status_cache("not queued");
                existing.selected = false;
            }

            let (completed, total) = if let Some(shared_state) = &state.shared_orchestrator_state {
                if let Ok(guard) = shared_state.try_read() {
                    let progress = guard.task_progress(&fetched.id);
                    if progress.1 > 0 {
                        progress
                    } else {
                        (fetched.completed_tasks, fetched.total_tasks)
                    }
                } else {
                    (fetched.completed_tasks, fetched.total_tasks)
                }
            } else {
                (fetched.completed_tasks, fetched.total_tasks)
            };

            if was_archived {
                existing.set_display_status_cache("not queued");
                if total > 0 {
                    existing.completed_tasks = completed;
                    existing.total_tasks = total;
                }
            } else if is_merge_wait || is_resolve_wait {
                if total > 0 {
                    existing.completed_tasks = completed;
                    existing.total_tasks = total;
                }
            } else if total > 0 {
                existing.completed_tasks = completed;
                existing.total_tasks = total;
            } else {
                let worktree_path = state.worktree_paths.get(&fetched.id).map(|p| p.as_path());
                match existing.display_status_cache.as_str() {
                    "archiving" | "resolving" | "archived" | "merged" => {
                        if let Ok(progress) =
                            task_parser::parse_progress_with_fallback(&fetched.id, worktree_path)
                        {
                            if progress.total > 0 {
                                existing.completed_tasks = progress.completed;
                                existing.total_tasks = progress.total;
                            }
                        }
                    }
                    _ => {}
                }
            }
        }
    }

    for rejected in &rejected_changes {
        if let Some(existing) = state.changes.iter_mut().find(|c| c.id == rejected.id) {
            let (completed, total) = if let Some(shared_state) = &state.shared_orchestrator_state {
                if let Ok(guard) = shared_state.try_read() {
                    let progress = guard.task_progress(&rejected.id);
                    if progress.1 > 0 {
                        progress
                    } else {
                        (rejected.completed_tasks, rejected.total_tasks)
                    }
                } else {
                    (rejected.completed_tasks, rejected.total_tasks)
                }
            } else {
                (rejected.completed_tasks, rejected.total_tasks)
            };

            if total > 0 {
                existing.completed_tasks = completed;
                existing.total_tasks = total;
            }
            existing.selected = false;
            existing.set_display_status_cache("rejected");
        }
    }

    for id in &new_ids {
        if let Some(fetched) = fetched_changes.iter().find(|c| &c.id == id) {
            let mut new_state = ChangeState::from_change(fetched);
            new_state.is_new = true;
            state.changes.push(new_state);
            continue;
        }

        if let Some(rejected) = rejected_changes.iter().find(|c| &c.id == id) {
            let mut rejected_state = ChangeState::from_change(rejected);
            rejected_state.is_new = false;
            rejected_state.selected = false;
            rejected_state.set_display_status_cache("rejected");
            state.changes.push(rejected_state);
        }
    }

    state.known_change_ids.extend(new_ids);
    state.new_change_count = state.changes.iter().filter(|c| c.is_new).count();
    state.last_refresh = Instant::now();

    if let Some(shared_state) = &state.shared_orchestrator_state {
        if let Ok(guard) = shared_state.try_read() {
            for change in &mut state.changes {
                let apply_count = guard.apply_count(&change.id);
                if apply_count > 0 {
                    match change.iteration_number {
                        Some(existing) => {
                            if apply_count > existing {
                                change.iteration_number = Some(apply_count);
                            }
                        }
                        None => change.iteration_number = Some(apply_count),
                    }
                }
            }
        }
    }

    state.changes.retain(|c| {
        active_ids.contains(&c.id)
            || rejected_ids.contains(&c.id)
            || c.started_at.is_some()
            || matches!(
                c.display_status_cache.as_str(),
                "archiving"
                    | "archived"
                    | "merged"
                    | "merge wait"
                    | "resolving"
                    | "resolve pending"
                    | "rejected"
                    | "error"
            )
    });

    if state.cursor_index >= state.changes.len() && !state.changes.is_empty() {
        state.cursor_index = state.changes.len() - 1;
        state.list_state.select(Some(state.cursor_index));
    }
}