use crate::config::OrchestratorConfig;
use crate::error::Result;
use crate::openspec::Change;
use crate::vcs::{GitWorkspaceManager, WorkspaceManager};
use crossterm::event::{self, Event, KeyEventKind, MouseEventKind};
use ratatui::DefaultTerminal;
use std::collections::HashSet;
use std::path::{Path, PathBuf};
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;
use tracing::{debug, info, warn};
use super::command_handlers::{handle_tui_command, TuiCommandContext};
use super::events::{LogEntry, OrchestratorEvent, TuiCommand};
use super::key_handlers::{handle_key_event, KeyEventContext};
use super::log_deduplicator;
use super::queue::DynamicQueue;
use super::render::{render, SPINNER_CHARS};
use super::state::{AppState, AUTO_REFRESH_INTERVAL_SECS};
use super::terminal::restore_terminal;
use super::worktrees::load_worktrees_with_conflict_check;
#[allow(dead_code)]
pub async fn run_tui(
initial_changes: Vec<Change>,
config: OrchestratorConfig,
web_url: Option<String>,
#[cfg(feature = "web-monitoring")] web_state: Option<Arc<crate::web::WebState>>,
) -> Result<()> {
run_tui_with_remote(
initial_changes,
config,
web_url,
#[cfg(feature = "web-monitoring")]
web_state,
None,
)
.await
}
/// Returns `true` when `repo_root` currently resolves to a directory.
///
/// Any failure to read the path's metadata (missing path, permission
/// problem, ...) counts as "not usable".
fn is_refresh_root_usable(repo_root: &Path) -> bool {
    match std::fs::metadata(repo_root) {
        Ok(metadata) => metadata.is_dir(),
        Err(_) => false,
    }
}
/// Decides whether the current auto-refresh tick must be skipped because the
/// refresh root is no longer a usable directory.
///
/// `stale_refresh_root_warned` is the caller-owned "already warned" latch: a
/// warning is logged only on the first consecutive skip, and the latch is
/// cleared again as soon as the root is usable.
///
/// Returns `true` when the refresh should be skipped.
fn should_skip_local_refresh(repo_root: &Path, stale_refresh_root_warned: &mut bool) -> bool {
    if is_refresh_root_usable(repo_root) {
        // Root is fine (again): re-arm the warning for the next outage.
        *stale_refresh_root_warned = false;
        return false;
    }

    // Set the latch and learn whether it had been set before in one step.
    let warned_before = std::mem::replace(stale_refresh_root_warned, true);
    if !warned_before {
        warn!(
            repo_root = %repo_root.display(),
            "Skipping local TUI auto-refresh: stale or missing refresh root"
        );
    }
    true
}
/// Returns `true` when the local auto-refresh path must not run at all.
///
/// The local refresh is wanted exactly when the TUI is not in remote mode.
fn should_bypass_local_refresh(is_remote_mode: bool) -> bool {
    let local_refresh_wanted = !is_remote_mode;
    !local_refresh_wanted
}
/// Loads the active and rejected change lists relative to `repo_root`.
///
/// Returns `(active, rejected)`. The active list is read first; an error from
/// either listing is propagated to the caller.
fn refresh_local_changes(repo_root: &Path) -> Result<(Vec<Change>, Vec<Change>)> {
    // Tuple fields are evaluated left to right, so the active list is read first.
    Ok((
        crate::openspec::list_changes_native_from(repo_root)?,
        crate::openspec::list_rejected_changes_native_from(repo_root)?,
    ))
}
/// Tells whether `event` must be applied to the shared orchestrator state
/// (the reducer) before the TUI pulls a fresh display-status snapshot.
///
/// The match is deliberately exhaustive (no `_` arm) so that adding a new
/// `ExecutionEvent` variant forces a decision here at compile time.
fn should_apply_event_to_tui_reducer(event: &crate::events::ExecutionEvent) -> bool {
    use crate::events::ExecutionEvent as Ev;

    match event {
        // ---- Events that are synced to the reducer ----
        // Processing lifecycle
        Ev::ProcessingStarted(_)
        | Ev::ProcessingCompleted(_)
        | Ev::ProcessingError { .. }
        // Apply lifecycle
        | Ev::ApplyStarted { .. }
        | Ev::ApplyCompleted { .. }
        | Ev::ApplyFailed { .. }
        // Acceptance lifecycle
        | Ev::AcceptanceStarted { .. }
        | Ev::AcceptanceCompleted { .. }
        | Ev::AcceptanceFailed { .. }
        | Ev::AcceptanceGated { .. }
        // Archive lifecycle
        | Ev::ArchiveStarted { .. }
        | Ev::ArchiveResumed { .. }
        | Ev::ArchiveRetryScheduled { .. }
        | Ev::ArchiveFailed { .. }
        | Ev::ChangeArchived(_)
        // Rejection
        | Ev::ChangeRejected { .. }
        | Ev::RejectionReviewCompleted { .. }
        | Ev::RejectionReviewFailed { .. }
        // Merge / resolve outcomes
        | Ev::MergeCompleted { .. }
        | Ev::MergeDeferred { .. }
        | Ev::ResolveStarted { .. }
        | Ev::ResolveCompleted { .. }
        | Ev::ResolveFailed { .. }
        // Scheduling / blocking
        | Ev::DependencyBlocked { .. }
        | Ev::DependencyResolved { .. }
        | Ev::ExecutionBlocked { .. }
        | Ev::ChangeDequeued { .. }
        | Ev::ChangeStopped { .. }
        // Status and change-list snapshots
        | Ev::WorkspaceStatusUpdated { .. }
        | Ev::ChangesRefreshed { .. } => true,

        // ---- Events that are not synced to the reducer ----
        // Streaming command output
        Ev::ApplyOutput { .. }
        | Ev::ArchiveOutput { .. }
        | Ev::AcceptanceOutput { .. }
        | Ev::AnalysisOutput { .. }
        | Ev::ResolveOutput { .. }
        // Task progress counters
        | Ev::ProgressUpdated { .. }
        // Workspace bookkeeping
        | Ev::WorkspaceCreated { .. }
        | Ev::WorkspaceResumed { .. }
        | Ev::WorkspacePreserved { .. }
        | Ev::CleanupStarted { .. }
        | Ev::CleanupCompleted { .. }
        // Merge progress and conflict handling
        | Ev::MergeStarted { .. }
        | Ev::MergeConflict { .. }
        | Ev::ConflictResolutionStarted
        | Ev::ConflictResolutionCompleted
        | Ev::ConflictResolutionFailed { .. }
        | Ev::BranchMergeStarted { .. }
        | Ev::BranchMergeCompleted { .. }
        | Ev::BranchMergeFailed { .. }
        // Analysis
        | Ev::AnalysisStarted { .. }
        | Ev::AnalysisCompleted { .. }
        // Hooks
        | Ev::HookStarted { .. }
        | Ev::HookCompleted { .. }
        | Ev::HookFailed { .. }
        // Notices, logs and rejected requests
        | Ev::ChangeSkipped { .. }
        | Ev::ChangeStopFailed { .. }
        | Ev::ParallelStartRejected { .. }
        | Ev::Warning { .. }
        | Ev::Error { .. }
        | Ev::Log(_)
        // Run-level signals
        | Ev::Stopping
        | Ev::Stopped
        | Ev::AllCompleted
        // Worktree and remote snapshots
        | Ev::WorktreesRefreshed { .. }
        | Ev::RemoteChangeUpdate { .. } => false,
    }
}
/// How `shutdown_local_orchestrator_task` ended up disposing of the local
/// orchestrator task.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum LocalOrchestratorShutdownOutcome {
/// No join handle was supplied, so there was nothing to shut down.
NoTask,
/// The task had already finished when shutdown was requested.
AlreadyFinished,
/// The task was joined before the grace period elapsed (whatever its result).
Graceful,
/// The grace period elapsed first and the task was aborted.
AbortedAfterTimeout,
}
pub(crate) async fn shutdown_local_orchestrator_task(
orchestrator_handle: Option<tokio::task::JoinHandle<Result<()>>>,
orchestrator_cancel: Option<CancellationToken>,
grace_period: Duration,
) -> LocalOrchestratorShutdownOutcome {
if let Some(cancel) = orchestrator_cancel {
info!(
grace_ms = grace_period.as_millis(),
"Cancelling local TUI orchestrator during shutdown"
);
cancel.cancel();
}
let Some(handle) = orchestrator_handle else {
debug!("No local TUI orchestrator task to shut down");
return LocalOrchestratorShutdownOutcome::NoTask;
};
if handle.is_finished() {
let _ = handle.await;
info!("Local TUI orchestrator task was already finished during shutdown");
return LocalOrchestratorShutdownOutcome::AlreadyFinished;
}
tokio::pin!(handle);
tokio::select! {
join_result = &mut handle => {
match join_result {
Ok(Ok(())) => info!("Local TUI orchestrator task finished gracefully during shutdown"),
Ok(Err(err)) => warn!(error = %err, "Local TUI orchestrator task exited with error during shutdown"),
Err(err) => warn!(error = %err, "Local TUI orchestrator task join failed during shutdown"),
}
LocalOrchestratorShutdownOutcome::Graceful
}
_ = tokio::time::sleep(grace_period) => {
warn!(
grace_ms = grace_period.as_millis(),
"Local TUI orchestrator did not finish before shutdown grace period; aborting task to prevent detached local work"
);
handle.as_ref().abort_handle().abort();
match tokio::time::timeout(Duration::from_secs(1), &mut handle).await {
Ok(Ok(_)) => info!("Aborted local TUI orchestrator task joined after abort"),
Ok(Err(err)) if err.is_cancelled() => {
info!("Local TUI orchestrator task aborted successfully")
}
Ok(Err(err)) => {
warn!(error = %err, "Local TUI orchestrator task join failed after abort")
}
Err(_) => warn!("Timed out while joining aborted local TUI orchestrator task"),
}
LocalOrchestratorShutdownOutcome::AbortedAfterTimeout
}
}
}
/// Initialises the terminal, runs the TUI loop and restores the terminal
/// afterwards, whether the loop succeeded or failed.
///
/// Before the terminal is initialised, the process panic hook is wrapped so
/// that a panic first calls `restore_terminal()` and then the previously
/// installed hook. The wrapped hook stays installed after this function
/// returns.
///
/// Returns whatever `run_tui_loop` returned.
pub async fn run_tui_with_remote(
    initial_changes: Vec<Change>,
    config: OrchestratorConfig,
    web_url: Option<String>,
    #[cfg(feature = "web-monitoring")] web_state: Option<Arc<crate::web::WebState>>,
    remote_client: Option<crate::remote::RemoteClient>,
) -> Result<()> {
    // Chain onto the existing hook so its output is still produced, but only
    // after the terminal has been put back into a usable state.
    let previous_hook = std::panic::take_hook();
    std::panic::set_hook(Box::new(move |panic_info| {
        restore_terminal();
        previous_hook(panic_info);
    }));

    let mut terminal = ratatui::init();
    let tui_session = run_tui_loop(
        &mut terminal,
        initial_changes,
        config,
        web_url,
        #[cfg(feature = "web-monitoring")]
        web_state,
        remote_client,
    );
    let outcome = tui_session.await;

    // Always undo terminal setup before handing the result back.
    restore_terminal();
    outcome
}
async fn run_tui_loop(
terminal: &mut DefaultTerminal,
initial_changes: Vec<Change>,
config: OrchestratorConfig,
web_url: Option<String>,
#[cfg(feature = "web-monitoring")] web_state: Option<Arc<crate::web::WebState>>,
remote_client: Option<crate::remote::RemoteClient>,
) -> Result<()> {
let repo_root = std::env::current_dir()?;
let (committed_change_ids, uncommitted_file_change_ids): (HashSet<String>, HashSet<String>) =
if remote_client.is_some() {
(
initial_changes
.iter()
.map(|change| change.id.clone())
.collect(),
HashSet::new(),
)
} else {
let committed_change_ids: HashSet<String> =
match crate::vcs::git::commands::list_changes_in_head(&repo_root).await {
Ok(ids) => ids.into_iter().collect(),
Err(err) => {
warn!("Failed to load committed change snapshot: {}", err);
initial_changes
.iter()
.map(|change| change.id.clone())
.collect()
}
};
let uncommitted_file_change_ids: HashSet<String> =
match crate::vcs::git::commands::list_changes_with_uncommitted_files(&repo_root)
.await
{
Ok(ids) => ids.into_iter().collect(),
Err(err) => {
warn!("Failed to detect uncommitted files in changes: {}", err);
HashSet::new()
}
};
(committed_change_ids, uncommitted_file_change_ids)
};
let worktree_base_dir = config
.get_workspace_base_dir()
.map(PathBuf::from)
.unwrap_or_else(|| crate::config::defaults::default_workspace_base_dir(Some(&repo_root)));
let worktree_manager = GitWorkspaceManager::new(
worktree_base_dir.clone(),
repo_root.clone(),
config.get_max_concurrent_workspaces(),
config.clone(),
);
let worktree_change_ids: HashSet<String> =
match worktree_manager.list_worktree_change_ids().await {
Ok(ids) => ids,
Err(err) => {
warn!("Failed to load worktree snapshot: {}", err);
HashSet::new()
}
};
let mut initial_worktree_paths = std::collections::HashMap::new();
for change in &initial_changes {
match crate::vcs::git::get_worktree_path_for_change(&repo_root, &change.id).await {
Ok(Some(wt_path)) => {
initial_worktree_paths.insert(change.id.clone(), wt_path);
}
Ok(None) => {
}
Err(e) => {
debug!("Failed to get worktree path for {}: {}", change.id, e);
}
}
}
let change_ids: Vec<String> = initial_changes.iter().map(|c| c.id.clone()).collect();
let max_iterations = config.get_max_iterations();
let shared_state = std::sync::Arc::new(tokio::sync::RwLock::new(
crate::orchestration::state::OrchestratorState::new(change_ids, max_iterations),
));
let tui_config = crate::tui::config::TuiConfig::load_user_config()?;
let mut app = AppState::new(initial_changes);
app.set_tui_config(tui_config);
app.worktree_paths = initial_worktree_paths;
app.set_shared_state(shared_state.clone());
let git_dir_exists = crate::cli::check_git_directory();
let mut parallel_available = crate::cli::check_parallel_available();
let mut parallel_mode = config.resolve_parallel_mode(false, git_dir_exists);
if remote_client.is_some() {
parallel_available = false;
parallel_mode = false;
}
if parallel_mode && !parallel_available {
parallel_mode = false;
app.warning_message =
Some("Parallel mode disabled because git is not available".to_string());
}
app.parallel_available = parallel_available;
app.parallel_mode = parallel_mode;
app.apply_parallel_eligibility(&committed_change_ids, &uncommitted_file_change_ids);
app.apply_worktree_status(&worktree_change_ids);
app.max_concurrent = config.get_max_concurrent_workspaces();
app.web_url = web_url;
use crate::ai_command_runner::{AiCommandRunner, SharedStaggerState};
use crate::command_queue::CommandQueueConfig;
use crate::config::defaults::*;
let shared_stagger_state: SharedStaggerState = Arc::new(tokio::sync::Mutex::new(None));
let queue_config = CommandQueueConfig {
stagger_delay_ms: config
.command_queue_stagger_delay_ms
.unwrap_or(DEFAULT_STAGGER_DELAY_MS),
max_retries: config
.command_queue_max_retries
.unwrap_or(DEFAULT_MAX_RETRIES),
retry_delay_ms: config
.command_queue_retry_delay_ms
.unwrap_or(DEFAULT_RETRY_DELAY_MS),
retry_error_patterns: config
.command_queue_retry_patterns
.clone()
.unwrap_or_else(default_retry_patterns),
retry_if_duration_under_secs: config
.command_queue_retry_if_duration_under_secs
.unwrap_or(DEFAULT_RETRY_IF_DURATION_UNDER_SECS),
inactivity_timeout_secs: config.get_command_inactivity_timeout_secs(),
inactivity_kill_grace_secs: config.get_command_inactivity_kill_grace_secs(),
inactivity_timeout_max_retries: config.get_command_inactivity_timeout_max_retries(),
strict_process_cleanup: config.get_command_strict_process_cleanup(),
};
let stream_json_textify = config.get_stream_json_textify();
let mut ai_runner = AiCommandRunner::new(queue_config.clone(), shared_stagger_state.clone());
ai_runner.set_stream_json_textify(stream_json_textify);
ai_runner.set_strict_process_cleanup(config.get_command_strict_process_cleanup());
let (tx, mut rx) = mpsc::channel::<OrchestratorEvent>(100);
let (cmd_tx, mut cmd_rx) = mpsc::channel::<TuiCommand>(100);
#[cfg(feature = "web-monitoring")]
if let Some(ref ws) = web_state {
ws.set_shared_state(shared_state.clone()).await;
}
let dynamic_queue = DynamicQueue::new();
let manual_resolve_counter = Arc::new(std::sync::atomic::AtomicUsize::new(0));
let cancel_token = CancellationToken::new();
#[cfg(feature = "web-monitoring")]
if let Some(ref ws) = web_state {
let (control_tx, mut control_rx) =
mpsc::unbounded_channel::<crate::web::state::ControlCommand>();
ws.set_control_channel(control_tx).await;
let bridge_cmd_tx = cmd_tx.clone();
let bridge_cancel = cancel_token.clone();
tokio::spawn(async move {
loop {
tokio::select! {
_ = bridge_cancel.cancelled() => {
break;
}
Some(control_cmd) = control_rx.recv() => {
use crate::web::state::ControlCommand;
let tui_cmd_opt = match control_cmd {
ControlCommand::Start => {
Some(TuiCommand::StartProcessing(vec![]))
}
ControlCommand::Stop => Some(TuiCommand::Stop),
ControlCommand::CancelStop => Some(TuiCommand::CancelStop),
ControlCommand::ForceStop => Some(TuiCommand::ForceStop),
ControlCommand::Retry => Some(TuiCommand::Retry),
};
if let Some(tui_cmd) = tui_cmd_opt {
if bridge_cmd_tx.send(tui_cmd).await.is_err() {
break;
}
}
}
}
}
});
}
let remote_client_actions = remote_client.clone();
let _ws_handle: Option<tokio::task::JoinHandle<()>> = if let Some(client) = remote_client {
let ws_url = client.ws_url();
let ws_token = client.token().map(str::to_owned);
let ws_tx = tx.clone();
let ws_cancel = cancel_token.clone();
info!("Starting remote WebSocket subscriber: {}", ws_url);
let (ws_msg_tx, mut ws_msg_rx) =
tokio::sync::mpsc::channel::<crate::remote::RemoteStateUpdate>(64);
let ws_task = tokio::spawn(async move {
loop {
match crate::remote::ws::connect_and_subscribe(
ws_url.clone(),
ws_token.as_deref(),
ws_msg_tx.clone(),
)
.await
{
Ok(recv_handle) => {
let abort_handle = recv_handle.abort_handle();
tokio::select! {
_ = ws_cancel.cancelled() => {
abort_handle.abort();
break;
}
result = recv_handle => {
let _ = result; warn!("WS connection dropped, will reconnect in 5s");
}
}
}
Err(e) => {
warn!("WS connect failed: {}, retrying in 5s", e);
}
}
for _ in 0..5u32 {
if ws_cancel.is_cancelled() {
return;
}
tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
}
}
});
let translate_tx = ws_tx;
tokio::spawn(async move {
let mut project_name_map: std::collections::HashMap<String, String> =
std::collections::HashMap::new();
while let Some(update) = ws_msg_rx.recv().await {
use crate::remote::types::RemoteStateUpdate;
match update {
RemoteStateUpdate::FullState { projects, .. } => {
project_name_map.clear();
for proj in &projects {
project_name_map.insert(proj.id.clone(), proj.name.clone());
}
let changes = crate::remote::group_changes_by_project(&projects);
let _ = translate_tx
.send(super::events::OrchestratorEvent::ChangesRefreshed {
changes,
rejected_changes: Vec::new(),
committed_change_ids: std::collections::HashSet::new(),
uncommitted_file_change_ids: std::collections::HashSet::new(),
worktree_change_ids: std::collections::HashSet::new(),
worktree_paths: std::collections::HashMap::new(),
worktree_not_ahead_ids: std::collections::HashSet::new(),
merge_wait_ids: std::collections::HashSet::new(),
})
.await;
for proj in &projects {
let project_display = proj.name.clone();
for ch in &proj.changes {
let id = format!("{}::{}/{}", proj.id, project_display, ch.id);
let _ = translate_tx
.send(super::events::OrchestratorEvent::RemoteChangeUpdate {
id,
completed_tasks: ch.completed_tasks,
total_tasks: ch.total_tasks,
status: Some(ch.status.clone()),
iteration_number: ch.iteration_number,
})
.await;
}
}
}
RemoteStateUpdate::ChangeUpdate { change } => {
let project_display = project_name_map
.get(&change.project)
.cloned()
.unwrap_or_else(|| change.project.clone());
let id = format!("{}::{}/{}", change.project, project_display, change.id);
let _ = translate_tx
.send(super::events::OrchestratorEvent::RemoteChangeUpdate {
id,
completed_tasks: change.completed_tasks,
total_tasks: change.total_tasks,
status: Some(change.status),
iteration_number: change.iteration_number,
})
.await;
}
RemoteStateUpdate::Log { entry } => {
use crate::tui::events::LogLevel;
let level = match entry.level.as_str() {
"error" => LogLevel::Error,
"warn" | "warning" => LogLevel::Warn,
"success" => LogLevel::Success,
_ => LogLevel::Info,
};
let effective_change_id =
entry.change_id.or_else(|| entry.project_id.clone());
let log_entry = crate::tui::events::LogEntry {
timestamp: entry.timestamp.clone(),
created_at: chrono::Utc::now(),
message: entry.message,
color: ratatui::style::Color::Reset,
level,
change_id: effective_change_id,
operation: entry.operation,
iteration: entry.iteration,
workspace_path: None,
};
let _ = translate_tx
.send(super::events::OrchestratorEvent::Log(log_entry))
.await;
}
RemoteStateUpdate::ChangeRemoved { .. } | RemoteStateUpdate::Ping => {
}
}
}
});
Some(ws_task)
} else {
None
};
let is_remote_mode = _ws_handle.is_some();
let refresh_tx = tx.clone();
let refresh_cancel = cancel_token.clone();
let refresh_repo_root = repo_root.clone();
let refresh_worktree_base_dir = worktree_base_dir.clone();
let refresh_config = config.clone();
let refresh_handle = tokio::spawn(async move {
if should_bypass_local_refresh(is_remote_mode) {
return;
}
let worktree_manager = GitWorkspaceManager::new(
refresh_worktree_base_dir,
refresh_repo_root.clone(),
refresh_config.get_max_concurrent_workspaces(),
refresh_config,
);
let mut stale_refresh_root_warned = false;
let mut interval = tokio::time::interval(Duration::from_secs(AUTO_REFRESH_INTERVAL_SECS));
loop {
tokio::select! {
_ = refresh_cancel.cancelled() => {
break;
}
_ = interval.tick() => {
if should_skip_local_refresh(
&refresh_repo_root,
&mut stale_refresh_root_warned,
) {
continue;
}
match refresh_local_changes(&refresh_repo_root) {
Ok((mut changes, rejected_changes)) => {
let committed_change_ids: HashSet<String> =
match crate::vcs::git::commands::list_changes_in_head(&refresh_repo_root).await {
Ok(ids) => ids.into_iter().collect(),
Err(err) => {
warn!("Failed to refresh committed change snapshot: {}", err);
changes.iter().map(|change| change.id.clone()).collect()
}
};
let uncommitted_file_change_ids: HashSet<String> =
match crate::vcs::git::commands::list_changes_with_uncommitted_files(&refresh_repo_root).await {
Ok(ids) => ids.into_iter().collect(),
Err(err) => {
warn!("Failed to refresh uncommitted files snapshot: {}", err);
HashSet::new()
}
};
let worktree_change_ids: HashSet<String> =
match worktree_manager.list_worktree_change_ids().await {
Ok(ids) => ids,
Err(err) => {
warn!("Failed to refresh worktree snapshot: {}", err);
HashSet::new()
}
};
let mut worktree_paths = std::collections::HashMap::new();
for change in &mut changes {
match crate::vcs::git::get_worktree_path_for_change(
&refresh_repo_root,
&change.id
).await {
Ok(Some(wt_path)) => {
worktree_paths.insert(change.id.clone(), wt_path.clone());
match crate::task_parser::parse_progress_with_fallback(
&change.id,
Some(&wt_path)
) {
Ok(progress) => {
if progress.total > 0 {
change.completed_tasks = progress.completed;
change.total_tasks = progress.total;
} else {
debug!("Keeping existing progress for {} (parsed: 0/0)", change.id);
}
}
Err(e) => {
debug!("Failed to read progress for {}: {}", change.id, e);
}
}
}
Ok(None) => {
}
Err(e) => {
warn!("Failed to get worktree path for {}: {}", change.id, e);
}
}
}
let mut worktree_not_ahead_ids = std::collections::HashSet::new();
let mut merge_wait_ids = std::collections::HashSet::new();
if let Ok(Some(base_branch)) = crate::vcs::git::commands::get_current_branch(&refresh_repo_root).await {
for (change_id, wt_path) in &worktree_paths {
if let Ok(Some(worktree_branch)) = crate::vcs::git::commands::get_current_branch(wt_path).await {
match crate::vcs::git::commands::count_commits_ahead(
&refresh_repo_root,
&base_branch,
&worktree_branch
).await {
Ok(0) => {
worktree_not_ahead_ids.insert(change_id.clone());
}
Ok(_) => {
}
Err(e) => {
debug!("Failed to count commits ahead for {}: {}", change_id, e);
}
}
}
match crate::execution::state::detect_workspace_state(change_id, wt_path, &base_branch).await {
Ok(crate::execution::state::WorkspaceState::Archived) => {
merge_wait_ids.insert(change_id.clone());
debug!("Detected MergeWait for '{}': archive complete, waiting for merge", change_id);
}
Ok(_) => {
}
Err(e) => {
debug!("Failed to detect workspace state for {}: {}", change_id, e);
}
}
}
}
if refresh_tx
.send(OrchestratorEvent::ChangesRefreshed {
changes,
rejected_changes,
committed_change_ids,
uncommitted_file_change_ids,
worktree_change_ids,
worktree_paths,
worktree_not_ahead_ids,
merge_wait_ids,
})
.await
.is_err()
{
break;
}
}
Err(e) => {
let _ = refresh_tx
.send(OrchestratorEvent::Log(LogEntry::error(format!(
"Refresh failed: {}",
e
))))
.await;
}
}
let wt_refresh_tx = refresh_tx.clone();
let wt_refresh_repo_root = refresh_repo_root.clone();
tokio::spawn(async move {
match load_worktrees_with_conflict_check(&wt_refresh_repo_root).await {
Ok(worktrees) => {
let _ = wt_refresh_tx
.send(OrchestratorEvent::WorktreesRefreshed { worktrees })
.await;
}
Err(e) => {
debug!("Failed to refresh worktrees: {}", e);
}
}
});
log_deduplicator::maybe_log_summary();
}
}
}
});
let mut orchestrator_handle: Option<tokio::task::JoinHandle<Result<()>>> = None;
let mut orchestrator_cancel: Option<CancellationToken> = None;
let graceful_stop_flag = Arc::new(AtomicBool::new(false));
loop {
app.spinner_frame = (app.spinner_frame + 1) % SPINNER_CHARS.len();
terminal.draw(|frame| render(frame, &mut app))?;
if event::poll(Duration::from_millis(100))? {
match event::read()? {
Event::Key(key) if key.kind == KeyEventKind::Press => {
let mut key_ctx = KeyEventContext {
app: &mut app,
terminal,
repo_root: &repo_root,
config: &config,
worktree_base_dir: &worktree_base_dir,
tx: &tx,
cmd_tx: &cmd_tx,
ai_runner: &ai_runner,
graceful_stop_flag: &graceful_stop_flag,
orchestrator_cancel: &orchestrator_cancel,
orchestrator_handle: &orchestrator_handle,
};
match handle_key_event(key, &mut key_ctx).await {
Ok(Some(cmd)) => {
let _ = cmd_tx.send(cmd).await;
}
Ok(None) => {
}
Err(e) => {
app.add_log(LogEntry::error(format!("Key handling error: {}", e)));
}
}
if app.should_quit {
break;
}
}
Event::Mouse(mouse) => {
match mouse.kind {
MouseEventKind::ScrollUp => {
app.scroll_logs_up(3);
}
MouseEventKind::ScrollDown => {
app.scroll_logs_down(3);
}
_ => {}
}
}
_ => {}
}
}
while let Ok(event) = rx.try_recv() {
if should_apply_event_to_tui_reducer(&event) {
let display_map = {
let mut state = shared_state.write().await;
state.apply_execution_event(&event);
state.all_display_statuses()
};
app.apply_display_statuses_from_reducer(&display_map);
}
#[cfg(feature = "web-monitoring")]
if let Some(ref web_state) = web_state {
use crate::events::ExecutionEvent;
match &event {
ExecutionEvent::ChangesRefreshed { changes, .. } => {
web_state.update(changes).await;
}
ExecutionEvent::ProcessingStarted(_)
| ExecutionEvent::ProcessingError { .. }
| ExecutionEvent::ChangeArchived(_)
| ExecutionEvent::MergeCompleted { .. }
| ExecutionEvent::ResolveStarted { .. }
| ExecutionEvent::ResolveCompleted { .. }
| ExecutionEvent::ResolveFailed { .. }
| ExecutionEvent::MergeDeferred { .. }
| ExecutionEvent::WorkspaceStatusUpdated { .. }
| ExecutionEvent::RejectionReviewCompleted { .. }
| ExecutionEvent::RejectionReviewFailed { .. }
| ExecutionEvent::Stopping
| ExecutionEvent::Stopped
| ExecutionEvent::AllCompleted => {
web_state.apply_execution_event(&event).await;
}
_ => {
}
}
}
if let Some(cmd) = app.handle_orchestrator_event(event) {
let _ = cmd_tx.send(cmd).await;
}
}
while let Ok(cmd) = cmd_rx.try_recv() {
let mut cmd_ctx = TuiCommandContext {
app: &mut app,
repo_root: &repo_root,
config: &config,
tx: &tx,
dynamic_queue: &dynamic_queue,
remote_client: remote_client_actions.clone(),
orchestrator_running: orchestrator_handle
.as_ref()
.is_some_and(|handle| !handle.is_finished()),
#[cfg(feature = "web-monitoring")]
web_state: &web_state,
};
match handle_tui_command(
cmd,
&mut cmd_ctx,
&graceful_stop_flag,
&shared_state,
&manual_resolve_counter,
&mut orchestrator_cancel,
)
.await
{
Ok(Some(handle)) => {
orchestrator_handle = Some(handle);
}
Ok(None) => {
}
Err(e) => {
app.add_log(LogEntry::error(format!("Command handling error: {}", e)));
}
}
}
if app.should_quit {
break;
}
}
cancel_token.cancel();
refresh_handle.abort();
let _ = shutdown_local_orchestrator_task(
orchestrator_handle,
orchestrator_cancel,
Duration::from_secs(5),
)
.await;
Ok(())
}
#[cfg(test)]
mod tests {
    //! Unit tests for the TUI runner helpers: reducer-sync classification,
    //! refresh-root checks, local change refresh and orchestrator shutdown.

    use super::{
        is_refresh_root_usable, refresh_local_changes, should_apply_event_to_tui_reducer,
        shutdown_local_orchestrator_task, LocalOrchestratorShutdownOutcome,
    };
    use crate::events::{ExecutionEvent, RejectionOutcome, StalledBlocker};
    use crate::openspec::{Change, ProposalMetadata};
    use crate::vcs::WorkspaceStatus;
    use std::collections::{HashMap, HashSet};
    use std::path::PathBuf;
    use std::time::Duration;
    use tokio_util::sync::CancellationToken;

    /// Minimal change fixture with one open task.
    fn sample_change() -> Change {
        Change {
            id: "change-a".to_string(),
            completed_tasks: 0,
            total_tasks: 1,
            last_modified: "now".to_string(),
            dependencies: Vec::new(),
            metadata: ProposalMetadata::default(),
        }
    }

    /// `ChangesRefreshed` event carrying one change and otherwise empty sets.
    fn empty_changes_refreshed_event() -> ExecutionEvent {
        ExecutionEvent::ChangesRefreshed {
            changes: vec![sample_change()],
            rejected_changes: Vec::new(),
            committed_change_ids: HashSet::new(),
            uncommitted_file_change_ids: HashSet::new(),
            worktree_change_ids: HashSet::new(),
            worktree_paths: HashMap::<String, PathBuf>::new(),
            worktree_not_ahead_ids: HashSet::new(),
            merge_wait_ids: HashSet::new(),
        }
    }

    /// Blocker fixture shared by the gated / blocked events.
    fn stalled_blocker() -> StalledBlocker {
        StalledBlocker::acceptance_infrastructure("managed verification job still running")
    }

    /// Lifecycle events must be applied to the reducer.
    #[test]
    fn tui_reducer_sync_includes_running_lifecycle_display_events() {
        let reducer_visible_events = vec![
            ExecutionEvent::ProcessingStarted("change-a".to_string()),
            ExecutionEvent::ProcessingCompleted("change-a".to_string()),
            ExecutionEvent::ProcessingError {
                id: "change-a".to_string(),
                error: "boom".to_string(),
            },
            ExecutionEvent::ApplyStarted {
                change_id: "change-a".to_string(),
                command: "apply".to_string(),
            },
            ExecutionEvent::ApplyCompleted {
                change_id: "change-a".to_string(),
                revision: "rev-a".to_string(),
            },
            ExecutionEvent::ApplyFailed {
                change_id: "change-a".to_string(),
                error: "boom".to_string(),
            },
            ExecutionEvent::AcceptanceStarted {
                change_id: "change-a".to_string(),
                command: "accept".to_string(),
            },
            ExecutionEvent::AcceptanceCompleted {
                change_id: "change-a".to_string(),
            },
            ExecutionEvent::AcceptanceFailed {
                change_id: "change-a".to_string(),
                error: "boom".to_string(),
            },
            ExecutionEvent::ArchiveStarted {
                change_id: "change-a".to_string(),
                command: "archive".to_string(),
            },
            ExecutionEvent::ArchiveResumed {
                change_id: "change-a".to_string(),
                reason: Some("resume".to_string()),
                summary: Some("resume archive".to_string()),
            },
            ExecutionEvent::ArchiveRetryScheduled {
                change_id: "change-a".to_string(),
                attempt: 1,
                max_attempts: 2,
                reason: Some("retry".to_string()),
                summary: Some("retry archive".to_string()),
            },
            ExecutionEvent::ChangeArchived("change-a".to_string()),
            ExecutionEvent::ArchiveFailed {
                change_id: "change-a".to_string(),
                error: "boom".to_string(),
                reason: Some("failed".to_string()),
                summary: Some("archive failed".to_string()),
            },
            ExecutionEvent::MergeDeferred {
                change_id: "change-a".to_string(),
                reason: "dirty base".to_string(),
                auto_resumable: true,
            },
            ExecutionEvent::MergeCompleted {
                change_id: "change-a".to_string(),
                revision: "rev-a".to_string(),
            },
            ExecutionEvent::ResolveStarted {
                change_id: "change-a".to_string(),
                command: "resolve".to_string(),
            },
            ExecutionEvent::ResolveCompleted {
                change_id: "change-a".to_string(),
                worktree_change_ids: None,
            },
            ExecutionEvent::ResolveFailed {
                change_id: "change-a".to_string(),
                error: "boom".to_string(),
            },
            ExecutionEvent::WorkspaceStatusUpdated {
                change_id: "change-a".to_string(),
                workspace_name: "ws-a".to_string(),
                status: WorkspaceStatus::Applying,
            },
            ExecutionEvent::RejectionReviewCompleted {
                change_id: "change-a".to_string(),
                outcome: RejectionOutcome::Resume,
            },
            ExecutionEvent::RejectionReviewFailed {
                change_id: "change-a".to_string(),
                error: "boom".to_string(),
            },
            ExecutionEvent::DependencyBlocked {
                change_id: "change-a".to_string(),
                dependency_ids: vec!["dep".to_string()],
            },
            ExecutionEvent::DependencyResolved {
                change_id: "change-a".to_string(),
            },
            ExecutionEvent::AcceptanceGated {
                change_id: "change-a".to_string(),
                blocker: stalled_blocker(),
            },
            ExecutionEvent::ExecutionBlocked {
                change_id: "change-a".to_string(),
                blocker: stalled_blocker(),
            },
            ExecutionEvent::ChangeDequeued {
                change_id: "change-a".to_string(),
            },
            ExecutionEvent::ChangeStopped {
                change_id: "change-a".to_string(),
            },
            empty_changes_refreshed_event(),
        ];
        for event in reducer_visible_events {
            assert!(
                should_apply_event_to_tui_reducer(&event),
                "event should sync to TUI reducer before display snapshot: {event:?}"
            );
        }
    }

    /// Output, progress, log and snapshot events must bypass the reducer.
    #[test]
    fn tui_reducer_sync_excludes_presentation_only_events() {
        let presentation_events = vec![
            ExecutionEvent::ApplyOutput {
                change_id: "change-a".to_string(),
                output: "chunk".to_string(),
                iteration: Some(1),
            },
            ExecutionEvent::ProgressUpdated {
                change_id: "change-a".to_string(),
                completed: 1,
                total: 2,
            },
            ExecutionEvent::Log(crate::events::LogEntry::info("hello")),
            ExecutionEvent::WorktreesRefreshed { worktrees: vec![] },
            ExecutionEvent::RemoteChangeUpdate {
                id: "change-a".to_string(),
                completed_tasks: 0,
                total_tasks: 1,
                status: Some("applying".to_string()),
                iteration_number: Some(1),
            },
        ];
        for event in presentation_events {
            assert!(
                !should_apply_event_to_tui_reducer(&event),
                "presentation-only event should not sync to TUI reducer: {event:?}"
            );
        }
    }

    #[test]
    fn refresh_root_usable_for_existing_directory() {
        let temp_dir = tempfile::tempdir().expect("tempdir should be created");
        assert!(is_refresh_root_usable(temp_dir.path()));
    }

    #[test]
    fn refresh_root_not_usable_for_missing_directory() {
        let temp_dir = tempfile::tempdir().expect("tempdir should be created");
        let missing_path = temp_dir.path().join("missing-root");
        assert!(!is_refresh_root_usable(&missing_path));
    }

    /// The warned latch is set on the first stale check, stays set while the
    /// root is stale, and is cleared once the root is usable again.
    #[test]
    fn stale_refresh_root_sets_warned_once_and_resets_when_root_recovers() {
        let temp_dir = tempfile::tempdir().expect("tempdir should be created");
        let missing_path = temp_dir.path().join("missing-root");
        let mut warned = false;
        assert!(super::should_skip_local_refresh(&missing_path, &mut warned));
        assert!(warned, "first stale root check should set warned flag");
        assert!(super::should_skip_local_refresh(&missing_path, &mut warned));
        assert!(warned, "second stale root check keeps warned flag set");
        assert!(!super::should_skip_local_refresh(
            temp_dir.path(),
            &mut warned
        ));
        assert!(!warned, "usable root should reset warned flag");
    }

    #[test]
    fn local_refresh_not_skipped_for_existing_root() {
        let temp_dir = tempfile::tempdir().expect("tempdir should be created");
        let mut warned = false;
        assert!(!super::should_skip_local_refresh(
            temp_dir.path(),
            &mut warned
        ));
        assert!(
            !warned,
            "existing root should not trigger stale warning suppression"
        );
    }

    /// The refresh must read from the explicit repo root, not the process cwd.
    #[test]
    fn refresh_local_changes_uses_explicit_repo_root_for_active_and_rejected_rows() {
        let _lock = crate::test_support::cwd_lock().lock().unwrap();
        let repo_dir = tempfile::tempdir().expect("repo tempdir");
        let other_dir = tempfile::tempdir().expect("cwd tempdir");
        let changes_dir = repo_dir.path().join("openspec").join("changes");
        let active_dir = changes_dir.join("change-active");
        std::fs::create_dir_all(&active_dir).expect("active dir");
        std::fs::write(active_dir.join("proposal.md"), "# proposal").expect("active proposal");
        std::fs::write(active_dir.join("tasks.md"), "- [ ] task").expect("active tasks");
        let rejected_dir = changes_dir.join("change-rejected");
        std::fs::create_dir_all(&rejected_dir).expect("rejected dir");
        std::fs::write(rejected_dir.join("proposal.md"), "# proposal").expect("rejected proposal");
        std::fs::write(rejected_dir.join("tasks.md"), "- [ ] task").expect("rejected tasks");
        std::fs::write(rejected_dir.join("REJECTED.md"), "# REJECTED").expect("rejected marker");

        let original_dir = std::env::current_dir().expect("cwd");
        std::env::set_current_dir(other_dir.path()).expect("set cwd elsewhere");
        let refresh_result = refresh_local_changes(repo_dir.path());
        // Restore the cwd before unwrapping, so a refresh failure cannot leave
        // the process in a temp directory that is about to be deleted.
        std::env::set_current_dir(original_dir).expect("restore cwd");
        let (active, rejected) = refresh_result.expect("refresh succeeds");

        assert_eq!(
            active.iter().map(|c| c.id.as_str()).collect::<Vec<_>>(),
            vec!["change-active"]
        );
        assert_eq!(
            rejected.iter().map(|c| c.id.as_str()).collect::<Vec<_>>(),
            vec!["change-rejected"]
        );
    }

    #[test]
    fn remote_mode_bypasses_local_refresh_path() {
        assert!(super::should_bypass_local_refresh(true));
        assert!(!super::should_bypass_local_refresh(false));
    }

    /// A task that notices cancellation but never finishes must be aborted
    /// and must not emit anything afterwards.
    #[tokio::test]
    async fn shutdown_local_orchestrator_cancels_and_aborts_non_finishing_task() {
        let token = CancellationToken::new();
        let task_token = token.clone();
        let (cancelled_tx, cancelled_rx) = tokio::sync::oneshot::channel();
        let (post_abort_tx, mut post_abort_rx) = tokio::sync::mpsc::channel::<()>(1);
        let handle = tokio::spawn(async move {
            task_token.cancelled().await;
            let _ = cancelled_tx.send(());
            // Outlive the 10 ms grace period so the abort path is taken.
            tokio::time::sleep(Duration::from_secs(60)).await;
            loop {
                let _ = post_abort_tx.send(()).await;
                tokio::time::sleep(Duration::from_millis(20)).await;
            }
            #[allow(unreachable_code)]
            Ok(())
        });
        let outcome =
            shutdown_local_orchestrator_task(Some(handle), Some(token), Duration::from_millis(10))
                .await;
        assert_eq!(
            outcome,
            LocalOrchestratorShutdownOutcome::AbortedAfterTimeout
        );
        assert!(
            cancelled_rx.await.is_ok(),
            "shutdown should cancel orchestrator token"
        );
        tokio::task::yield_now().await;
        let post_cleanup_event =
            tokio::time::timeout(Duration::from_millis(80), post_abort_rx.recv()).await;
        assert!(
            !matches!(post_cleanup_event, Ok(Some(()))),
            "aborted local orchestrator must not keep sending events after cleanup"
        );
    }

    #[tokio::test]
    async fn shutdown_local_orchestrator_without_handle_is_remote_client_safe_noop() {
        let outcome = shutdown_local_orchestrator_task(None, None, Duration::from_millis(1)).await;
        assert_eq!(outcome, LocalOrchestratorShutdownOutcome::NoTask);
    }
}