use crate::config::OrchestratorConfig;
use crate::error::Result;
use crate::orchestration::state::ReducerCommand;
use crate::tui::events::{LogEntry, OrchestratorEvent, TuiCommand};
use crate::tui::orchestrator::{run_orchestrator, run_orchestrator_parallel};
use crate::tui::queue::DynamicQueue;
use crate::tui::state::AppState;
use crate::tui::types::{AppMode, StopMode};
use std::path::Path;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;
use tracing::{debug, info, warn};
use super::worktrees::load_worktrees_with_conflict_check;
pub struct TuiCommandContext<'a> {
pub app: &'a mut AppState,
pub repo_root: &'a Path,
pub config: &'a OrchestratorConfig,
pub tx: &'a mpsc::Sender<OrchestratorEvent>,
pub dynamic_queue: &'a DynamicQueue,
pub remote_client: Option<crate::remote::RemoteClient>,
pub orchestrator_running: bool,
#[cfg(feature = "web-monitoring")]
pub web_state: &'a Option<Arc<crate::web::WebState>>,
}
pub async fn handle_start_processing_command(
ids: Vec<String>,
ctx: &mut TuiCommandContext<'_>,
graceful_stop_flag: &Arc<std::sync::atomic::AtomicBool>,
shared_state: &Arc<tokio::sync::RwLock<crate::orchestration::state::OrchestratorState>>,
manual_resolve_counter: &Arc<AtomicUsize>,
orchestrator_cancel: &mut Option<CancellationToken>,
) -> Option<tokio::task::JoinHandle<Result<()>>> {
let cmd = if ids.is_empty() {
if ctx.app.mode == AppMode::Error {
ctx.app.retry_error_changes()
} else if ctx.app.mode == AppMode::Stopped {
ctx.app.resume_processing()
} else {
ctx.app.start_processing()
}
} else {
Some(TuiCommand::StartProcessing(ids.clone()))
};
if let Some(TuiCommand::StartProcessing(selected_ids)) = cmd {
if !selected_ids.is_empty() {
if let Some(remote) = ctx.remote_client.as_ref() {
let mut by_project: std::collections::BTreeMap<String, Vec<String>> =
std::collections::BTreeMap::new();
for id in &selected_ids {
let Some((project_id, rest)) = id.split_once("::") else {
continue;
};
let change_id = rest.rsplit('/').next().unwrap_or(rest).to_string();
by_project
.entry(project_id.to_string())
.or_default()
.push(change_id);
}
for (project_id, change_ids) in by_project {
if let Err(e) = remote.control_run(&project_id, Some(change_ids)).await {
ctx.app.add_log(LogEntry::error(format!(
"Remote run failed for {}: {}",
project_id, e
)));
} else {
ctx.app.add_log(LogEntry::success(format!(
"Remote run started: {}",
project_id
)));
}
}
return None;
}
graceful_stop_flag.store(false, Ordering::SeqCst);
let orch_tx = ctx.tx.clone();
let orch_config = ctx.config.clone();
let orch_cancel = CancellationToken::new();
let orch_dynamic_queue = ctx.dynamic_queue.clone();
let orch_graceful_stop = graceful_stop_flag.clone();
let orch_shared_state = shared_state.clone();
let orch_manual_resolve = manual_resolve_counter.clone();
*orchestrator_cancel = Some(orch_cancel.clone());
let use_parallel = ctx.app.parallel_mode;
#[cfg(feature = "web-monitoring")]
let orch_web_state = ctx.web_state.clone();
return Some(tokio::spawn(async move {
#[cfg(feature = "web-monitoring")]
let result = if use_parallel {
run_orchestrator_parallel(
selected_ids,
orch_config,
orch_tx.clone(),
orch_cancel,
orch_dynamic_queue,
orch_graceful_stop,
orch_shared_state,
orch_manual_resolve.clone(),
orch_web_state,
)
.await
} else {
run_orchestrator(
selected_ids,
orch_config,
orch_tx.clone(),
orch_cancel,
orch_dynamic_queue,
orch_graceful_stop,
orch_shared_state,
orch_web_state,
)
.await
};
#[cfg(not(feature = "web-monitoring"))]
let result = if use_parallel {
run_orchestrator_parallel(
selected_ids,
orch_config,
orch_tx.clone(),
orch_cancel,
orch_dynamic_queue,
orch_graceful_stop,
orch_shared_state,
orch_manual_resolve,
)
.await
} else {
run_orchestrator(
selected_ids,
orch_config,
orch_tx.clone(),
orch_cancel,
orch_dynamic_queue,
orch_graceful_stop,
orch_shared_state,
)
.await
};
result
}));
}
}
None
}
#[allow(clippy::too_many_arguments)]
pub async fn handle_tui_command(
cmd: TuiCommand,
ctx: &mut TuiCommandContext<'_>,
graceful_stop_flag: &Arc<std::sync::atomic::AtomicBool>,
shared_state: &Arc<tokio::sync::RwLock<crate::orchestration::state::OrchestratorState>>,
manual_resolve_counter: &Arc<AtomicUsize>,
orchestrator_cancel: &mut Option<CancellationToken>,
) -> Result<Option<tokio::task::JoinHandle<Result<()>>>> {
match cmd {
TuiCommand::StartProcessing(ids) => {
let handle = handle_start_processing_command(
ids,
ctx,
graceful_stop_flag,
shared_state,
manual_resolve_counter,
orchestrator_cancel,
)
.await;
return Ok(handle);
}
TuiCommand::AddToQueue(id) => {
shared_state
.write()
.await
.apply_command(ReducerCommand::AddToQueue(id.clone()));
if ctx.dynamic_queue.push(id.clone()).await {
ctx.app
.add_log(LogEntry::info(format!("Added to dynamic queue: {}", id)));
} else {
ctx.app
.add_log(LogEntry::warn(format!("Already in dynamic queue: {}", id)));
}
}
TuiCommand::RemoveFromQueue(id) => {
shared_state
.write()
.await
.apply_command(ReducerCommand::RemoveFromQueue(id.clone()));
let removed_from_dynamic = ctx.dynamic_queue.remove(&id).await;
let removed_from_pending = ctx.dynamic_queue.mark_removed(id.clone()).await;
let mut details = Vec::new();
if removed_from_dynamic {
details.push("also removed from dynamic queue");
}
if removed_from_pending {
details.push("removed from pending");
}
let suffix = if details.is_empty() {
String::new()
} else {
format!(" ({})", details.join(", "))
};
ctx.app.add_log(LogEntry::info(format!(
"Removed from queue: {}{}",
id, suffix
)));
}
TuiCommand::DeleteWorktreeByPath(path, branch_name, skip_teardown) => {
match crate::vcs::git::commands::worktree_remove_with_options(
ctx.repo_root,
path.to_string_lossy().as_ref(),
crate::vcs::git::commands::WorktreeRemoveOptions { skip_teardown },
)
.await
{
Ok(_) => {
info!("Worktree deleted successfully: {}", path.display());
ctx.app.add_log(LogEntry::success(format!(
"Deleted worktree: {}",
path.display()
)));
if let Some(branch) = branch_name {
match crate::vcs::git::commands::branch_delete(ctx.repo_root, &branch).await
{
Ok(_) => {
info!("Branch deleted after worktree removal: {}", branch);
ctx.app.add_log(LogEntry::success(format!(
"Deleted branch: {}",
branch
)));
}
Err(e) => {
warn!(
"Branch deletion failed for '{}' after worktree removal: {} (non-fatal)",
branch, e
);
ctx.app.add_log(LogEntry::warn(format!(
"Failed to delete branch '{}': {}",
branch, e
)));
}
}
}
match load_worktrees_with_conflict_check(ctx.repo_root).await {
Ok(worktrees) => {
let _ = ctx
.tx
.send(OrchestratorEvent::WorktreesRefreshed { worktrees })
.await;
}
Err(e) => {
ctx.app.add_log(LogEntry::error(format!(
"Failed to refresh worktrees: {}",
e
)));
}
}
}
Err(e) => {
ctx.app.show_warning_popup(
"Worktree delete failed",
format!("Failed to delete worktree '{}': {}", path.display(), e),
);
ctx.app.add_log(LogEntry::error(format!(
"Worktree delete failed for '{}': {}",
path.display(),
e
)));
}
}
}
TuiCommand::Stop => {
if ctx.app.mode == AppMode::Running {
ctx.app.stop_mode = StopMode::GracefulPending;
graceful_stop_flag.store(true, Ordering::SeqCst);
ctx.app.mode = AppMode::Stopping;
ctx.app
.add_log(LogEntry::warn("Stopping after current change completes..."));
ctx.app
.handle_orchestrator_event(OrchestratorEvent::Stopping);
#[cfg(feature = "web-monitoring")]
if let Some(ref web_state) = ctx.web_state {
web_state
.apply_execution_event(&OrchestratorEvent::Stopping)
.await;
}
} else {
ctx.app.add_log(LogEntry::warn(format!(
"Cannot stop: not running (current mode: {:?})",
ctx.app.mode
)));
}
}
TuiCommand::CancelStop => {
if ctx.app.mode == AppMode::Stopping {
graceful_stop_flag.store(false, Ordering::SeqCst);
ctx.app.stop_mode = StopMode::None;
ctx.app.mode = AppMode::Running;
ctx.app
.add_log(LogEntry::info("Stop canceled, continuing..."));
#[cfg(feature = "web-monitoring")]
if let Some(ref web_state) = ctx.web_state {
web_state
.apply_execution_event(&OrchestratorEvent::ProcessingStarted(
"".to_string(),
))
.await;
}
} else {
ctx.app.add_log(LogEntry::warn(format!(
"Cannot cancel stop: not stopping (current mode: {:?})",
ctx.app.mode
)));
}
}
TuiCommand::ForceStop => {
if matches!(ctx.app.mode, AppMode::Running | AppMode::Stopping) {
ctx.app.stop_mode = StopMode::ForceStopped;
if let Some(cancel) = orchestrator_cancel {
cancel.cancel();
}
ctx.app
.handle_orchestrator_event(OrchestratorEvent::Stopped);
ctx.app.current_change = None;
ctx.app.add_log(LogEntry::warn("Force stopped"));
#[cfg(feature = "web-monitoring")]
if let Some(ref web_state) = ctx.web_state {
use crate::events::ExecutionEvent;
web_state
.apply_execution_event(&ExecutionEvent::Stopped)
.await;
}
} else {
ctx.app.add_log(LogEntry::warn(format!(
"Cannot force stop: not running or stopping (current mode: {:?})",
ctx.app.mode
)));
}
}
TuiCommand::Retry => {
if ctx.app.mode == AppMode::Error {
if let Some(TuiCommand::StartProcessing(ids)) = ctx.app.retry_error_changes() {
let handle = handle_start_processing_command(
ids,
ctx,
graceful_stop_flag,
shared_state,
manual_resolve_counter,
orchestrator_cancel,
)
.await;
return Ok(handle);
}
} else {
ctx.app.add_log(LogEntry::warn(format!(
"Cannot retry: not in error mode (current mode: {:?})",
ctx.app.mode
)));
}
}
TuiCommand::MergeWorktreeBranch {
worktree_path,
branch_name,
} => {
debug!(
"Processing TuiCommand::MergeWorktreeBranch: worktree_path={}, branch_name={}",
worktree_path.display(),
branch_name
);
let merge_tx = ctx.tx.clone();
let merge_repo_root = ctx.repo_root.to_path_buf();
let merge_branch = branch_name.clone();
let merge_config = ctx.config.clone();
let merge_worktree_path = worktree_path.clone();
tokio::spawn(async move {
debug!(
"Sending BranchMergeStarted event for branch: {}",
merge_branch
);
let _ = merge_tx
.send(OrchestratorEvent::BranchMergeStarted {
branch_name: merge_branch.clone(),
})
.await;
debug!(
"Executing merge in base repository: repo_root={}, branch={}",
merge_repo_root.display(),
merge_branch
);
match crate::vcs::git::commands::merge_branch(&merge_repo_root, &merge_branch).await
{
Ok(_) => {
debug!("Merge succeeded for branch: {}", merge_branch);
if let Some(change_id) =
crate::vcs::GitWorkspaceManager::extract_change_id_from_worktree_name(
&merge_branch,
)
{
let hooks_config = merge_config.get_hooks();
let merge_repo_root = std::env::current_dir()
.unwrap_or_else(|_| std::path::PathBuf::from("."));
let hooks = crate::hooks::HookRunner::with_event_tx(
hooks_config,
merge_repo_root,
merge_tx.clone(),
);
let (completed_tasks, total_tasks) =
match crate::openspec::list_changes_native() {
Ok(changes) => changes
.iter()
.find(|c| c.id == change_id)
.map(|c| (c.completed_tasks, c.total_tasks))
.unwrap_or((0, 0)),
Err(e) => {
warn!(
"Failed to fetch task counts for on_merged hook: {}",
e
);
(0, 0)
}
};
let hook_context = crate::hooks::HookContext::new(
0, 0, 0, false,
)
.with_change(&change_id, completed_tasks, total_tasks)
.with_apply_count(0)
.with_parallel_context(&merge_worktree_path.to_string_lossy(), None);
if let Err(e) = hooks
.run_hook(crate::hooks::HookType::OnMerged, &hook_context)
.await
{
let message = format!(
"on_merged hook failed for '{}'; branch merged transition blocked: {}",
change_id, e
);
warn!("{}", message);
let _ = merge_tx
.send(OrchestratorEvent::HookFailed {
change_id: change_id.clone(),
hook_type: crate::hooks::HookType::OnMerged.to_string(),
error: e.to_string(),
})
.await;
let _ = merge_tx
.send(OrchestratorEvent::BranchMergeFailed {
branch_name: merge_branch.clone(),
error: message,
})
.await;
return;
}
} else {
warn!(
"Could not extract change_id from branch name '{}', skipping on_merged hook",
merge_branch
);
}
let _ = merge_tx
.send(OrchestratorEvent::BranchMergeCompleted {
branch_name: merge_branch.clone(),
})
.await;
debug!("Refreshing worktree list after successful merge");
match load_worktrees_with_conflict_check(&merge_repo_root).await {
Ok(worktrees) => {
debug!("Worktree list refreshed: {} worktrees", worktrees.len());
let _ = merge_tx
.send(OrchestratorEvent::WorktreesRefreshed { worktrees })
.await;
}
Err(e) => {
debug!("Failed to refresh worktrees: {}", e);
let _ = merge_tx
.send(OrchestratorEvent::Log(LogEntry::error(format!(
"Failed to refresh worktrees: {}",
e
))))
.await;
}
}
}
Err(e) => {
debug!("Merge failed for branch {}: {}", merge_branch, e);
let _ = merge_tx
.send(OrchestratorEvent::BranchMergeFailed {
branch_name: merge_branch,
error: format!("{}", e),
})
.await;
}
}
});
}
TuiCommand::DequeueChange(id) => {
shared_state
.write()
.await
.apply_command(ReducerCommand::DequeueChange(id.clone()));
let had_token = ctx.dynamic_queue.force_kill(&id).await;
if had_token {
ctx.app.add_log(LogEntry::info(format!(
"Force-kill issued for active change: {}",
id
)));
} else {
ctx.app.add_log(LogEntry::info(format!(
"Stop-and-dequeue request received for: {}",
id
)));
}
}
TuiCommand::ResolveMerge(id) => {
shared_state
.write()
.await
.apply_command(ReducerCommand::ResolveMerge(id.clone()));
if ctx.orchestrator_running {
ctx.dynamic_queue.notify_scheduler();
ctx.app.add_log(LogEntry::info(format!(
"Scheduled merge-wait retry intent for '{}'; notified existing scheduler",
id
)));
} else {
graceful_stop_flag.store(false, Ordering::SeqCst);
let orch_tx = ctx.tx.clone();
let orch_config = ctx.config.clone();
let orch_cancel = CancellationToken::new();
let orch_dynamic_queue = ctx.dynamic_queue.clone();
let orch_graceful_stop = graceful_stop_flag.clone();
let orch_shared_state = shared_state.clone();
let orch_manual_resolve = manual_resolve_counter.clone();
*orchestrator_cancel = Some(orch_cancel.clone());
#[cfg(feature = "web-monitoring")]
let orch_web_state = ctx.web_state.clone();
let handle = tokio::spawn(async move {
#[cfg(feature = "web-monitoring")]
let result = run_orchestrator_parallel(
Vec::new(),
orch_config,
orch_tx,
orch_cancel,
orch_dynamic_queue,
orch_graceful_stop,
orch_shared_state,
orch_manual_resolve,
orch_web_state,
)
.await;
#[cfg(not(feature = "web-monitoring"))]
let result = run_orchestrator_parallel(
Vec::new(),
orch_config,
orch_tx,
orch_cancel,
orch_dynamic_queue,
orch_graceful_stop,
orch_shared_state,
orch_manual_resolve,
)
.await;
result
});
ctx.app.add_log(LogEntry::info(format!(
"Scheduled merge-wait retry intent for '{}'; started scheduler for manual resolve",
id
)));
return Ok(Some(handle));
}
}
}
Ok(None)
}
#[cfg(test)]
mod tests {
use super::*;
use crate::openspec::{Change, ProposalMetadata};
use crate::orchestration::state::OrchestratorState;
use std::path::Path;
use std::sync::atomic::{AtomicBool, AtomicUsize};
use tokio::sync::RwLock;
fn create_test_change(id: &str) -> Change {
Change {
id: id.to_string(),
completed_tasks: 0,
total_tasks: 1,
last_modified: "now".to_string(),
dependencies: Vec::new(),
metadata: ProposalMetadata::default(),
}
}
fn create_test_config() -> OrchestratorConfig {
OrchestratorConfig::default()
}
#[tokio::test]
async fn test_resolve_merge_starts_parallel_scheduler_when_idle() {
let (tx, _rx) = mpsc::channel(16);
let dynamic_queue = DynamicQueue::new();
let mut app = AppState::new(vec![create_test_change("change-a")]);
let config = create_test_config();
let shared_state = Arc::new(RwLock::new(OrchestratorState::with_mode(
vec!["change-a".to_string()],
10,
crate::orchestration::state::ExecutionMode::Parallel,
)));
{
let mut guard = shared_state.write().await;
guard.apply_observation(
"change-a",
crate::orchestration::state::WorkspaceObservation::WorkspaceArchived,
);
}
let graceful_stop_flag = Arc::new(AtomicBool::new(false));
let manual_resolve_counter = Arc::new(AtomicUsize::new(0));
let mut orchestrator_cancel: Option<CancellationToken> = None;
let mut ctx = TuiCommandContext {
app: &mut app,
repo_root: Path::new("."),
config: &config,
tx: &tx,
dynamic_queue: &dynamic_queue,
remote_client: None,
orchestrator_running: false,
#[cfg(feature = "web-monitoring")]
web_state: &None,
};
let handle = handle_tui_command(
TuiCommand::ResolveMerge("change-a".to_string()),
&mut ctx,
&graceful_stop_flag,
&shared_state,
&manual_resolve_counter,
&mut orchestrator_cancel,
)
.await
.expect("resolve merge command should succeed");
assert!(
handle.is_some(),
"idle scheduler must spawn a new orchestrator task"
);
assert!(
orchestrator_cancel.is_some(),
"spawned scheduler must install cancellation token"
);
assert!(
ctx.app.logs.iter().any(|entry| entry
.message
.contains("started scheduler for manual resolve")),
"log must report scheduler startup"
);
{
let state = shared_state.read().await;
assert_eq!(
state.display_status("change-a"),
"resolve pending",
"ResolveMerge reducer intent must move change to resolve pending"
);
assert_eq!(
state.resolve_wait_change_ids(),
vec!["change-a".to_string()],
"idle handoff must leave reducer-owned ResolveWait visible to scheduler startup"
);
}
if let Some(join) = handle {
join.abort();
}
}
#[tokio::test]
async fn test_resolve_merge_notifies_live_scheduler_without_duplicate_spawn() {
let (tx, _rx) = mpsc::channel(16);
let dynamic_queue = DynamicQueue::new();
let mut app = AppState::new(vec![create_test_change("change-a")]);
let config = create_test_config();
let shared_state = Arc::new(RwLock::new(OrchestratorState::new(
vec!["change-a".to_string()],
10,
)));
let graceful_stop_flag = Arc::new(AtomicBool::new(false));
let manual_resolve_counter = Arc::new(AtomicUsize::new(0));
let mut orchestrator_cancel: Option<CancellationToken> = None;
let mut ctx = TuiCommandContext {
app: &mut app,
repo_root: Path::new("."),
config: &config,
tx: &tx,
dynamic_queue: &dynamic_queue,
remote_client: None,
orchestrator_running: true,
#[cfg(feature = "web-monitoring")]
web_state: &None,
};
let handle = handle_tui_command(
TuiCommand::ResolveMerge("change-a".to_string()),
&mut ctx,
&graceful_stop_flag,
&shared_state,
&manual_resolve_counter,
&mut orchestrator_cancel,
)
.await
.expect("resolve merge command should succeed");
assert!(
handle.is_none(),
"live scheduler path must not spawn a duplicate orchestrator task"
);
assert!(
orchestrator_cancel.is_none(),
"live scheduler notification path must not replace cancel token"
);
assert!(
ctx.app
.logs
.iter()
.any(|entry| entry.message.contains("notified existing scheduler")),
"log must report scheduler notification"
);
}
#[tokio::test]
async fn test_resolve_merge_logs_scheduler_start_or_notify_truthfully() {
let (tx, _rx) = mpsc::channel(16);
let dynamic_queue = DynamicQueue::new();
let config = create_test_config();
let shared_state = Arc::new(RwLock::new(OrchestratorState::new(
vec!["change-a".to_string()],
10,
)));
let graceful_stop_flag = Arc::new(AtomicBool::new(false));
let manual_resolve_counter = Arc::new(AtomicUsize::new(0));
let mut app_idle = AppState::new(vec![create_test_change("change-a")]);
let mut cancel_idle: Option<CancellationToken> = None;
let mut idle_ctx = TuiCommandContext {
app: &mut app_idle,
repo_root: Path::new("."),
config: &config,
tx: &tx,
dynamic_queue: &dynamic_queue,
remote_client: None,
orchestrator_running: false,
#[cfg(feature = "web-monitoring")]
web_state: &None,
};
let idle_handle = handle_tui_command(
TuiCommand::ResolveMerge("change-a".to_string()),
&mut idle_ctx,
&graceful_stop_flag,
&shared_state,
&manual_resolve_counter,
&mut cancel_idle,
)
.await
.expect("idle resolve merge should succeed");
assert!(idle_ctx.app.logs.iter().any(|entry| entry
.message
.contains("started scheduler for manual resolve")));
if let Some(join) = idle_handle {
join.abort();
}
let mut app_live = AppState::new(vec![create_test_change("change-a")]);
let mut cancel_live: Option<CancellationToken> = None;
let mut live_ctx = TuiCommandContext {
app: &mut app_live,
repo_root: Path::new("."),
config: &config,
tx: &tx,
dynamic_queue: &dynamic_queue,
remote_client: None,
orchestrator_running: true,
#[cfg(feature = "web-monitoring")]
web_state: &None,
};
let live_handle = handle_tui_command(
TuiCommand::ResolveMerge("change-a".to_string()),
&mut live_ctx,
&graceful_stop_flag,
&shared_state,
&manual_resolve_counter,
&mut cancel_live,
)
.await
.expect("live resolve merge should succeed");
assert!(live_handle.is_none());
assert!(live_ctx
.app
.logs
.iter()
.any(|entry| entry.message.contains("notified existing scheduler")));
}
#[tokio::test]
async fn test_add_to_queue_updates_reducer_intent_even_if_dynamic_queue_already_contains_id() {
use crate::orchestration::state::ReducerCommand;
let (tx, _rx) = mpsc::channel(16);
let dynamic_queue = DynamicQueue::new();
let mut app = AppState::new(vec![create_test_change("change-a")]);
let config = create_test_config();
let shared_state = Arc::new(RwLock::new(OrchestratorState::new(
vec!["change-a".to_string()],
10,
)));
let graceful_stop_flag = Arc::new(AtomicBool::new(false));
let manual_resolve_counter = Arc::new(AtomicUsize::new(0));
let mut orchestrator_cancel: Option<CancellationToken> = None;
dynamic_queue.push("change-a".to_string()).await;
{
let mut guard = shared_state.write().await;
guard.apply_command(ReducerCommand::RemoveFromQueue("change-a".to_string()));
assert_eq!(guard.display_status("change-a"), "not queued");
}
let mut ctx = TuiCommandContext {
app: &mut app,
repo_root: Path::new("."),
config: &config,
tx: &tx,
dynamic_queue: &dynamic_queue,
remote_client: None,
orchestrator_running: true,
#[cfg(feature = "web-monitoring")]
web_state: &None,
};
let handle = handle_tui_command(
TuiCommand::AddToQueue("change-a".to_string()),
&mut ctx,
&graceful_stop_flag,
&shared_state,
&manual_resolve_counter,
&mut orchestrator_cancel,
)
.await
.expect("add-to-queue command should succeed");
assert!(
handle.is_none(),
"queue command should not spawn orchestrator"
);
assert_eq!(
shared_state.read().await.display_status("change-a"),
"queued",
"reducer queue intent must be queued even when dynamic queue push is duplicate"
);
assert!(ctx
.app
.logs
.iter()
.any(|entry| entry.message.contains("Already in dynamic queue: change-a")));
}
#[tokio::test]
async fn test_resolve_merge_scheduler_liveness_none_finished_live() {
let (tx, _rx) = mpsc::channel(16);
let dynamic_queue = DynamicQueue::new();
let config = create_test_config();
let shared_state = Arc::new(RwLock::new(OrchestratorState::new(
vec!["change-a".to_string()],
10,
)));
let graceful_stop_flag = Arc::new(AtomicBool::new(false));
let manual_resolve_counter = Arc::new(AtomicUsize::new(0));
for running in [false, false] {
let mut app = AppState::new(vec![create_test_change("change-a")]);
let mut orchestrator_cancel: Option<CancellationToken> = None;
let mut ctx = TuiCommandContext {
app: &mut app,
repo_root: Path::new("."),
config: &config,
tx: &tx,
dynamic_queue: &dynamic_queue,
remote_client: None,
orchestrator_running: running,
#[cfg(feature = "web-monitoring")]
web_state: &None,
};
let handle = handle_tui_command(
TuiCommand::ResolveMerge("change-a".to_string()),
&mut ctx,
&graceful_stop_flag,
&shared_state,
&manual_resolve_counter,
&mut orchestrator_cancel,
)
.await
.expect("resolve merge should succeed");
assert!(
handle.is_some(),
"non-live scheduler state must spawn scheduler"
);
if let Some(join) = handle {
join.abort();
}
}
let mut app_live = AppState::new(vec![create_test_change("change-a")]);
let mut orchestrator_cancel_live: Option<CancellationToken> = None;
let mut ctx_live = TuiCommandContext {
app: &mut app_live,
repo_root: Path::new("."),
config: &config,
tx: &tx,
dynamic_queue: &dynamic_queue,
remote_client: None,
orchestrator_running: true,
#[cfg(feature = "web-monitoring")]
web_state: &None,
};
let handle_live = handle_tui_command(
TuiCommand::ResolveMerge("change-a".to_string()),
&mut ctx_live,
&graceful_stop_flag,
&shared_state,
&manual_resolve_counter,
&mut orchestrator_cancel_live,
)
.await
.expect("resolve merge should succeed");
assert!(
handle_live.is_none(),
"live scheduler state must not spawn scheduler"
);
}
}