use std::collections::{HashMap, HashSet};
use crate::error_history::{CircuitBreakerConfig, ErrorHistory};
/// How the orchestrator treats a change once it has been archived.
///
/// The mode is fixed when the state is built (`OrchestratorState::with_mode`)
/// and is consulted when a `ChangeArchived` event is reduced.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum ExecutionMode {
    /// Default mode: an archived change immediately becomes terminal
    /// (`TerminalState::Archived`).
    #[default]
    Serial,
    /// An archived change is parked in a merge/resolve wait state instead of
    /// becoming terminal.
    Parallel,
}
/// Whether a change is currently requested to be in the processing queue.
///
/// This is the lowest-priority input to the displayed status: it is only
/// shown when the change has no terminal state, no activity and no wait state.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub enum QueueIntent {
    /// Default: the change is not requested for processing ("not queued").
    #[default]
    NotQueued,
    /// The change has been requested for processing ("queued").
    Queued,
}
/// The operation currently running for a change, if any.
///
/// Any variant other than `Idle` counts as "active"
/// (`ChangeRuntimeState::is_active`).
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub enum ActivityState {
    /// Default: nothing is running for the change.
    #[default]
    Idle,
    /// Displayed as "applying".
    Applying,
    /// Displayed as "accepting".
    Accepting,
    /// Displayed as "rejecting".
    Rejecting,
    /// Displayed as "archiving".
    Archiving,
    /// Displayed as "resolving".
    Resolving,
}
/// Why an idle, non-terminal change is currently not progressing.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub enum WaitState {
    /// Default: the change is not waiting on anything.
    #[default]
    None,
    /// Displayed as "merge wait".
    MergeWait,
    /// Displayed as "resolve pending".
    ResolveWait,
    /// Displayed as "blocked".
    DependencyBlocked,
    /// Displayed as "stalled".
    Stalled,
    /// Displayed as "gated".
    AcceptanceGated,
}
/// Free-form context recorded while a change is blocked, stalled or gated.
///
/// All fields are `None` by default and are either all set together or all
/// cleared together by the runtime-state helpers.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct BlockedMetadata {
    /// Why the change is blocked.
    pub blocker_reason: Option<String>,
    /// What has to happen before the change can continue.
    pub unblock_metadata: Option<String>,
    /// Note describing the worktree context associated with the blocker.
    pub worktree_snapshot: Option<String>,
}
/// Final outcome of a change, if it has one.
///
/// Any variant other than `None` counts as "terminal"
/// (`ChangeRuntimeState::is_terminal`) and takes precedence over every other
/// field when the display status is derived.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub enum TerminalState {
    /// Default: the change has not reached a final outcome.
    #[default]
    None,
    /// Displayed as "archived".
    Archived,
    /// Displayed as "merged".
    Merged,
    /// Displayed as "rejected"; carries the rejection reason.
    Rejected(String),
    /// Displayed as "error"; carries the error message.
    Error(String),
    /// Displayed as "stopped".
    Stopped,
}
/// Most recent externally observed fact about a change's workspace.
///
/// Observations are fed in through `OrchestratorState::apply_observation`.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub enum WorkspaceObservation {
    /// Default: nothing has been observed.
    #[default]
    None,
    /// The workspace was observed as archived; puts an idle, non-terminal
    /// change into `WaitState::MergeWait` unless it is already waiting on a
    /// merge or resolve.
    WorkspaceArchived,
    /// The worktree was observed as not ahead; releases a change that is in
    /// `WaitState::MergeWait`.
    WorktreeNotAhead,
}
/// Runtime state tracked for a single change.
///
/// The user-visible status is derived from these fields by `display_status`,
/// checking `terminal`, then `activity`, then `wait_state`, then `queue_intent`.
#[derive(Debug, Clone, Default)]
pub struct ChangeRuntimeState {
/// Whether the change is currently requested to be queued.
pub queue_intent: QueueIntent,
/// Operation currently running for the change (`Idle` when none).
pub activity: ActivityState,
/// Reason the change is parked while idle (`None` when not waiting).
pub wait_state: WaitState,
/// Context describing the current blocker; empty when not blocked.
pub blocked_metadata: BlockedMetadata,
/// Final outcome (`None` while the change is still live).
pub terminal: TerminalState,
/// Last workspace observation recorded by `apply_observation`.
pub observation: WorkspaceObservation,
/// Set when the change was dequeued (`DequeueChange` command or a
/// `ChangeDequeued`/`ChangeStopped` event) and cleared by `AddToQueue`;
/// several execution events are ignored while it is set.
pub dequeued: bool,
}
impl ChangeRuntimeState {
fn clear_blocked_metadata(&mut self) {
self.blocked_metadata = BlockedMetadata::default();
}
fn set_blocked_metadata(
&mut self,
blocker_reason: impl Into<String>,
unblock_metadata: impl Into<String>,
worktree_snapshot: impl Into<String>,
) {
self.blocked_metadata = BlockedMetadata {
blocker_reason: Some(blocker_reason.into()),
unblock_metadata: Some(unblock_metadata.into()),
worktree_snapshot: Some(worktree_snapshot.into()),
};
}
pub fn is_active(&self) -> bool {
!matches!(self.activity, ActivityState::Idle)
}
pub fn is_terminal(&self) -> bool {
!matches!(self.terminal, TerminalState::None)
}
#[allow(dead_code)]
pub fn invariants_hold(&self) -> bool {
if self.is_terminal() && self.is_active() {
return false;
}
if matches!(self.wait_state, WaitState::ResolveWait)
&& matches!(self.activity, ActivityState::Resolving)
{
return false;
}
true
}
pub fn display_status(&self) -> &'static str {
match &self.terminal {
TerminalState::Archived => return "archived",
TerminalState::Merged => return "merged",
TerminalState::Rejected(_) => return "rejected",
TerminalState::Error(_) => return "error",
TerminalState::Stopped => return "stopped",
TerminalState::None => {}
}
match self.activity {
ActivityState::Applying => return "applying",
ActivityState::Accepting => return "accepting",
ActivityState::Rejecting => return "rejecting",
ActivityState::Archiving => return "archiving",
ActivityState::Resolving => return "resolving",
ActivityState::Idle => {}
}
match self.wait_state {
WaitState::MergeWait => return "merge wait",
WaitState::ResolveWait => return "resolve pending",
WaitState::DependencyBlocked => return "blocked",
WaitState::Stalled => return "stalled",
WaitState::AcceptanceGated => return "gated",
WaitState::None => {}
}
match self.queue_intent {
QueueIntent::Queued => "queued",
QueueIntent::NotQueued => "not queued",
}
}
#[allow(dead_code)]
pub fn display_color(&self) -> ratatui::style::Color {
match self.display_status() {
"not queued" => ratatui::style::Color::DarkGray,
"queued" => ratatui::style::Color::Yellow,
"blocked" => ratatui::style::Color::Gray,
"stalled" => ratatui::style::Color::LightYellow,
"gated" => ratatui::style::Color::LightRed,
"applying" => ratatui::style::Color::Cyan,
"accepting" => ratatui::style::Color::LightGreen,
"rejecting" => ratatui::style::Color::LightYellow,
"archiving" => ratatui::style::Color::Magenta,
"archived" => ratatui::style::Color::Blue,
"merged" => ratatui::style::Color::LightBlue,
"rejected" => ratatui::style::Color::LightRed,
"merge wait" => ratatui::style::Color::LightMagenta,
"resolving" => ratatui::style::Color::LightCyan,
"resolve pending" => ratatui::style::Color::Magenta,
"error" => ratatui::style::Color::Red,
"stopped" => ratatui::style::Color::DarkGray,
_ => ratatui::style::Color::DarkGray,
}
}
#[allow(dead_code)]
pub fn error_message(&self) -> Option<&str> {
match &self.terminal {
TerminalState::Error(message) => Some(message.as_str()),
_ => None,
}
}
}
/// Commands accepted by `OrchestratorState::apply_command`.
///
/// Each variant carries the id of the change it targets.
#[derive(Debug, Clone)]
pub enum ReducerCommand {
/// Request that the change be queued for processing.
AddToQueue(String),
/// Withdraw the queue request without otherwise resetting the change.
RemoveFromQueue(String),
/// Move a change that is waiting on a merge into the resolve-wait state.
ResolveMerge(String),
/// Reset the change to idle/not queued and mark it as dequeued.
DequeueChange(String),
/// Put a non-terminal change into the `Stopped` terminal state.
#[allow(dead_code)]
StopChange(String),
}
/// Result of reducing a `ReducerCommand`.
#[derive(Debug, Clone)]
#[allow(dead_code)]
pub enum ReduceOutcome {
/// The command changed state; the payload describes what was set.
Changed(ReducerEffect),
/// The command was ignored and no effect is reported.
NoOp,
}
/// Description of the state field a command changed, reported via
/// `ReduceOutcome::Changed`.
#[derive(Debug, Clone)]
#[allow(clippy::enum_variant_names, dead_code)]
pub enum ReducerEffect {
/// The queue intent of `change_id` was set to `intent`.
QueueIntentSet {
change_id: String,
intent: QueueIntent,
},
/// The wait state of `change_id` was set to `wait`.
WaitStateSet { change_id: String, wait: WaitState },
/// The terminal state of `change_id` was set to `terminal`.
TerminalStateSet {
change_id: String,
terminal: TerminalState,
},
}
/// Bookkeeping for one orchestrator run: which changes exist, which are still
/// pending, per-change runtime state, and iteration counters.
#[derive(Debug, Clone)]
pub struct OrchestratorState {
// Every id known to the run: the constructor input plus dynamically added ids.
initial_change_ids: HashSet<String>,
// Ids that have not yet been archived, dropped or removed.
pending_changes: HashSet<String>,
// Ids moved out of `pending_changes` by `mark_archived`.
archived_changes: HashSet<String>,
// Number of completed apply attempts per change.
apply_counts: HashMap<String, u32>,
// Ids recorded via `mark_stalled`.
stalled_change_ids: HashSet<String>,
// Ids recorded via `mark_skipped`.
skipped_change_ids: HashSet<String>,
// (completed, total) task counts per change.
task_progress: HashMap<String, (u32, u32)>,
// Error history per change, used for the circuit breaker.
error_histories: HashMap<String, ErrorHistory>,
// Count of changes archived through `mark_archived`.
changes_processed: usize,
// Number of changes in the run; adjusted by dynamic adds and drops.
total_changes: usize,
// Iteration cap; 0 disables the limit.
max_iterations: u32,
// Iterations performed so far.
iteration: u32,
// Change currently being processed, if any.
current_change_id: Option<String>,
// Per-change runtime state, keyed by change id.
change_runtime: HashMap<String, ChangeRuntimeState>,
// Ids queued for a resolve, in insertion order, without duplicates.
resolve_wait_queue: Vec<String>,
// Mode chosen at construction; affects how `ChangeArchived` is reduced.
execution_mode: ExecutionMode,
}
#[allow(dead_code)] impl OrchestratorState {
/// Creates state for `change_ids` in serial execution mode.
///
/// Shorthand for `with_mode(change_ids, max_iterations, ExecutionMode::Serial)`.
pub fn new(change_ids: Vec<String>, max_iterations: u32) -> Self {
Self::with_mode(change_ids, max_iterations, ExecutionMode::Serial)
}
/// Creates state for `change_ids` with an explicit execution mode.
///
/// Every id starts pending with a default runtime entry. Duplicate ids are
/// collapsed, and `total_changes` counts *distinct* ids so that it agrees
/// with the pending set and with `add_dynamic_change`, which also counts
/// each id only once.
///
/// A `max_iterations` of 0 disables the iteration limit.
pub fn with_mode(
    change_ids: Vec<String>,
    max_iterations: u32,
    execution_mode: ExecutionMode,
) -> Self {
    let initial_change_ids: HashSet<String> = change_ids.into_iter().collect();
    let pending_changes = initial_change_ids.clone();
    // Count after deduplication; the raw Vec length would over-count repeats.
    let total_changes = initial_change_ids.len();
    let change_runtime: HashMap<String, ChangeRuntimeState> = initial_change_ids
        .iter()
        .map(|id| (id.clone(), ChangeRuntimeState::default()))
        .collect();
    Self {
        initial_change_ids,
        pending_changes,
        archived_changes: HashSet::new(),
        apply_counts: HashMap::new(),
        stalled_change_ids: HashSet::new(),
        skipped_change_ids: HashSet::new(),
        task_progress: HashMap::new(),
        error_histories: HashMap::new(),
        changes_processed: 0,
        total_changes,
        max_iterations,
        iteration: 0,
        current_change_id: None,
        change_runtime,
        resolve_wait_queue: Vec::new(),
        execution_mode,
    }
}
/// Returns every change id known to this state: the ids given at
/// construction plus any added later through `add_dynamic_change`.
pub fn initial_change_ids(&self) -> &HashSet<String> {
&self.initial_change_ids
}
/// Returns the ids that are still pending.
pub fn pending_changes(&self) -> &HashSet<String> {
&self.pending_changes
}
/// Returns the ids that have been archived via `mark_archived`.
pub fn archived_changes(&self) -> &HashSet<String> {
&self.archived_changes
}
/// Returns the ids recorded as stalled via `mark_stalled`.
pub fn stalled_change_ids(&self) -> &HashSet<String> {
&self.stalled_change_ids
}
/// Returns the ids recorded as skipped via `mark_skipped`.
pub fn skipped_change_ids(&self) -> &HashSet<String> {
&self.skipped_change_ids
}
/// Records `change_id` as stalled. Marking an id twice has no further effect.
pub fn mark_stalled(&mut self, change_id: String) {
self.stalled_change_ids.insert(change_id);
}
/// Records `change_id` as skipped.
///
/// Returns `true` if the id was newly inserted, `false` if it was already
/// marked as skipped.
pub fn mark_skipped(&mut self, change_id: String) -> bool {
self.skipped_change_ids.insert(change_id)
}
/// Forgets that `change_id` was stalled so it can be attempted afresh.
///
/// Removes the id from both the stalled and the skipped sets, resets its
/// apply counter, and clears `current_change_id` if it points at this change.
/// Previously the stalled set itself was never cleared, so a change stayed
/// reported by `stalled_change_ids` even after this call.
pub fn clear_stalled_change(&mut self, change_id: &str) {
    self.stalled_change_ids.remove(change_id);
    self.skipped_change_ids.remove(change_id);
    self.apply_counts.remove(change_id);
    if self.current_change_id.as_deref() == Some(change_id) {
        self.current_change_id = None;
    }
}
/// Returns how many changes have been archived through `mark_archived`.
pub fn changes_processed(&self) -> usize {
self.changes_processed
}
/// Returns the total number of changes in this run, including dynamically
/// added ones and excluding changes dropped via `drop_pending_change`.
pub fn total_changes(&self) -> usize {
self.total_changes
}
/// Returns the number of changes that are still pending.
pub fn remaining_changes(&self) -> usize {
self.pending_changes.len()
}
/// Returns the number of iterations performed so far.
pub fn iteration(&self) -> u32 {
self.iteration
}
/// Returns the configured iteration cap; 0 means "no limit".
pub fn max_iterations(&self) -> u32 {
self.max_iterations
}
/// Returns the id of the change currently being processed, if any.
pub fn current_change_id(&self) -> Option<&String> {
self.current_change_id.as_ref()
}
/// Returns how many apply attempts have been counted for `change_id`
/// (0 when none have been recorded).
pub fn apply_count(&self, change_id: &str) -> u32 {
    self.apply_counts.get(change_id).copied().unwrap_or(0)
}
/// Returns the `(completed, total)` task counts stored for `change_id`,
/// or `(0, 0)` when no progress has been recorded.
pub fn task_progress(&self, change_id: &str) -> (u32, u32) {
    match self.task_progress.get(change_id) {
        Some(&progress) => progress,
        None => (0, 0),
    }
}
/// Stores the `(completed, total)` task counts for `change_id`, replacing
/// any previous value.
pub fn set_task_progress(&mut self, change_id: String, completed: u32, total: u32) {
self.task_progress.insert(change_id, (completed, total));
}
/// Returns `true` if `change_id` is one of the known change ids
/// (initial or dynamically added).
pub fn is_in_snapshot(&self, change_id: &str) -> bool {
self.initial_change_ids.contains(change_id)
}
/// Returns `true` if `change_id` is still pending.
pub fn is_pending(&self, change_id: &str) -> bool {
self.pending_changes.contains(change_id)
}
/// Returns `true` if `change_id` has been archived via `mark_archived`.
pub fn is_archived(&self, change_id: &str) -> bool {
self.archived_changes.contains(change_id)
}
/// Returns `true` when no changes remain pending.
pub fn is_complete(&self) -> bool {
self.pending_changes.is_empty()
}
/// Returns `true` once the iteration counter has reached the configured cap.
///
/// Always `false` when the cap is 0 (unlimited).
pub fn is_iteration_limit_reached(&self) -> bool {
    if self.max_iterations == 0 {
        return false;
    }
    self.iteration >= self.max_iterations
}
/// Returns `true` exactly when the iteration counter sits at 80% of the cap
/// (rounded down).
///
/// Always `false` when the cap is 0 (unlimited). The comparison is an
/// equality, so the predicate holds for a single iteration value only.
/// NOTE(review): presumably so a warning is emitted once; confirm with callers.
///
/// The threshold is computed with integer arithmetic. The previous `f32`
/// computation could be off by one for caps in the millions because `0.8`
/// is not exactly representable and `f32` has only 24 bits of precision.
pub fn is_approaching_iteration_limit(&self) -> bool {
    if self.max_iterations == 0 {
        return false;
    }
    // Widen to u64 so `max * 4` cannot overflow.
    let threshold = u64::from(self.max_iterations) * 4 / 5;
    u64::from(self.iteration) == threshold
}
/// Advances the iteration counter by one.
pub fn increment_iteration(&mut self) {
self.iteration += 1;
}
/// Sets (or clears, with `None`) the change currently being processed.
pub fn set_current_change(&mut self, change_id: Option<String>) {
self.current_change_id = change_id;
}
/// Bumps the apply counter for `change_id` and returns the new value.
///
/// A change without a counter starts from 0, so the first call returns 1.
pub fn increment_apply_count(&mut self, change_id: &str) -> u32 {
    let slot = self.apply_counts.entry(change_id.to_owned()).or_default();
    *slot += 1;
    *slot
}
/// Moves a pending change into the archived set.
///
/// Does nothing if the change is not pending. Otherwise it also bumps
/// `changes_processed`, forgets the change's apply counter and clears
/// `current_change_id` when it points at this change.
pub fn mark_archived(&mut self, change_id: &str) {
    let was_pending = self.pending_changes.remove(change_id);
    if !was_pending {
        return;
    }
    self.archived_changes.insert(change_id.to_owned());
    self.changes_processed += 1;
    self.apply_counts.remove(change_id);
    let is_current = self.current_change_id.as_deref() == Some(change_id);
    if is_current {
        self.current_change_id = None;
    }
}
/// Registers a change that was not part of the original set.
///
/// Idempotent: an id that is already known, pending or archived is ignored.
/// A new id becomes pending, increases `total_changes` and gets a default
/// runtime entry unless one already exists.
pub fn add_dynamic_change(&mut self, change_id: String) {
    let already_tracked = self.initial_change_ids.contains(&change_id)
        || self.pending_changes.contains(&change_id)
        || self.archived_changes.contains(&change_id);
    if already_tracked {
        return;
    }
    self.initial_change_ids.insert(change_id.clone());
    self.pending_changes.insert(change_id.clone());
    self.total_changes += 1;
    self.change_runtime.entry(change_id).or_default();
}
/// Returns the mutable runtime state for `change_id`, inserting a default
/// entry first if the change has none yet.
fn runtime_entry(&mut self, change_id: &str) -> &mut ChangeRuntimeState {
self.change_runtime
.entry(change_id.to_string())
.or_default()
}
/// Returns the runtime state for `change_id`, or `None` if none is tracked.
pub fn change_runtime(&self, change_id: &str) -> Option<&ChangeRuntimeState> {
self.change_runtime.get(change_id)
}
/// Returns the display label for `change_id`.
///
/// Changes without a runtime entry are reported as "not queued".
pub fn display_status(&self, change_id: &str) -> &'static str {
    self.change_runtime
        .get(change_id)
        .map_or("not queued", |rt| rt.display_status())
}
/// Returns `true` if `change_id` is tracked and has an operation running.
pub fn is_active_change(&self, change_id: &str) -> bool {
    matches!(self.change_runtime.get(change_id), Some(rt) if rt.is_active())
}
/// Returns `true` if any non-terminal change is currently resolving.
pub fn is_resolving_active(&self) -> bool {
    self.change_runtime
        .values()
        .filter(|rt| rt.activity == ActivityState::Resolving)
        .any(|rt| !rt.is_terminal())
}
/// Returns the ids of all non-terminal changes whose wait state is
/// `ResolveWait`. The order follows the map's iteration order and is
/// therefore unspecified.
pub fn resolve_wait_change_ids(&self) -> Vec<String> {
    self.change_runtime
        .iter()
        .filter(|(_, rt)| rt.wait_state == WaitState::ResolveWait && !rt.is_terminal())
        .map(|(id, _)| id.clone())
        .collect()
}
/// Returns the display label of every tracked change, keyed by change id.
pub fn all_display_statuses(&self) -> HashMap<String, &'static str> {
    let mut statuses = HashMap::with_capacity(self.change_runtime.len());
    for (id, rt) in &self.change_runtime {
        statuses.insert(id.clone(), rt.display_status());
    }
    statuses
}
/// Returns `true` if `change_id` is tracked and has reached a final outcome.
pub fn is_terminal_change(&self, change_id: &str) -> bool {
    matches!(self.change_runtime.get(change_id), Some(rt) if rt.is_terminal())
}
/// Removes `change_id` from the pending set without archiving it and
/// without touching `total_changes`; also clears `current_change_id` when it
/// points at this change.
pub fn remove_from_pending(&mut self, change_id: &str) {
    self.pending_changes.remove(change_id);
    let is_current = matches!(
        self.current_change_id.as_deref(),
        Some(current) if current == change_id
    );
    if is_current {
        self.current_change_id = None;
    }
}
/// Drops `change_id` from the run entirely.
///
/// If the change was pending it is removed and `total_changes` is reduced by
/// one (never below zero). `current_change_id` is cleared when it points at
/// this change, whether or not the change was pending.
///
/// Returns `true` if the change had been pending.
pub fn drop_pending_change(&mut self, change_id: &str) -> bool {
    let was_pending = self.pending_changes.remove(change_id);
    if self.current_change_id.as_deref() == Some(change_id) {
        self.current_change_id = None;
    }
    if !was_pending {
        return false;
    }
    self.total_changes = self.total_changes.saturating_sub(1);
    true
}
/// Empties the pending set; no other counters or sets are modified.
pub fn clear_pending_changes(&mut self) {
self.pending_changes.clear();
}
/// Appends `error` to the history of `change_id` and reports whether the
/// history now detects a repeated error.
///
/// `config` is only used to create the history the first time an error is
/// recorded for the change; later calls reuse the existing history.
pub fn record_error_and_check_circuit_breaker(
    &mut self,
    change_id: &str,
    error: &str,
    config: CircuitBreakerConfig,
) -> bool {
    let key = change_id.to_string();
    let history = self
        .error_histories
        .entry(key)
        .or_insert_with(|| ErrorHistory::new(config));
    history.record_error(error);
    history.detect_same_error()
}
pub fn last_error(&self, change_id: &str) -> Option<&str> {
self.error_histories
.get(change_id)
.and_then(ErrorHistory::last_error)
}
/// Discards the error history stored for `change_id`, if any.
pub fn clear_error_history(&mut self, change_id: &str) {
self.error_histories.remove(change_id);
}
pub fn apply_command(&mut self, cmd: ReducerCommand) -> ReduceOutcome {
match cmd {
ReducerCommand::AddToQueue(change_id) => {
{
let rt = self.runtime_entry(&change_id);
if matches!(
rt.terminal,
TerminalState::Archived
| TerminalState::Merged
| TerminalState::Rejected(_)
) {
return ReduceOutcome::NoOp;
}
if !rt.is_terminal()
&& !rt.dequeued
&& (rt.is_active() || rt.queue_intent == QueueIntent::Queued)
{
return ReduceOutcome::NoOp;
}
if rt.is_terminal() {
rt.terminal = TerminalState::None;
rt.activity = ActivityState::Idle;
rt.wait_state = WaitState::None;
rt.clear_blocked_metadata();
}
rt.dequeued = false;
rt.queue_intent = QueueIntent::Queued;
rt.wait_state = WaitState::None;
rt.clear_blocked_metadata();
}
self.add_dynamic_change(change_id.clone());
ReduceOutcome::Changed(ReducerEffect::QueueIntentSet {
change_id,
intent: QueueIntent::Queued,
})
}
ReducerCommand::RemoveFromQueue(change_id) => {
let rt = self.runtime_entry(&change_id);
if rt.queue_intent == QueueIntent::NotQueued {
return ReduceOutcome::NoOp;
}
rt.queue_intent = QueueIntent::NotQueued;
ReduceOutcome::Changed(ReducerEffect::QueueIntentSet {
change_id,
intent: QueueIntent::NotQueued,
})
}
ReducerCommand::ResolveMerge(change_id) => {
let rt = self.runtime_entry(&change_id);
if !matches!(rt.wait_state, WaitState::MergeWait | WaitState::ResolveWait) {
return ReduceOutcome::NoOp;
}
rt.wait_state = WaitState::ResolveWait;
if !self.resolve_wait_queue.contains(&change_id) {
self.resolve_wait_queue.push(change_id.clone());
}
ReduceOutcome::Changed(ReducerEffect::WaitStateSet {
change_id,
wait: WaitState::ResolveWait,
})
}
ReducerCommand::DequeueChange(change_id) => {
let rt = self.runtime_entry(&change_id);
if matches!(
rt.terminal,
TerminalState::Archived | TerminalState::Merged | TerminalState::Rejected(_)
) {
return ReduceOutcome::NoOp;
}
rt.terminal = TerminalState::None;
rt.activity = ActivityState::Idle;
rt.wait_state = WaitState::None;
rt.clear_blocked_metadata();
rt.queue_intent = QueueIntent::NotQueued;
rt.dequeued = true;
ReduceOutcome::Changed(ReducerEffect::QueueIntentSet {
change_id,
intent: QueueIntent::NotQueued,
})
}
ReducerCommand::StopChange(change_id) => {
let rt = self.runtime_entry(&change_id);
if rt.is_terminal() {
return ReduceOutcome::NoOp;
}
rt.terminal = TerminalState::Stopped;
rt.activity = ActivityState::Idle;
rt.wait_state = WaitState::None;
rt.clear_blocked_metadata();
rt.queue_intent = QueueIntent::NotQueued;
ReduceOutcome::Changed(ReducerEffect::TerminalStateSet {
change_id,
terminal: TerminalState::Stopped,
})
}
}
}
/// Folds a workspace observation into the runtime state of `change_id`.
///
/// Observations are ignored while the change is active or terminal. A
/// runtime entry is created for the change if it does not exist yet.
///
/// * `WorkspaceArchived`: unless the change is already waiting on a merge or
///   resolve, it enters `MergeWait` and the observation is recorded.
/// * `WorktreeNotAhead`: a change in `MergeWait` is released (wait state and
///   blocker metadata cleared); the observation is always recorded.
/// * `None`: resets the recorded observation.
pub fn apply_observation(&mut self, change_id: &str, obs: WorkspaceObservation) {
    let rt = self.runtime_entry(change_id);
    if rt.is_active() || rt.is_terminal() {
        return;
    }
    match obs {
        WorkspaceObservation::None => {
            rt.observation = WorkspaceObservation::None;
        }
        WorkspaceObservation::WorktreeNotAhead => {
            if rt.wait_state == WaitState::MergeWait {
                rt.wait_state = WaitState::None;
                rt.clear_blocked_metadata();
            }
            rt.observation = WorkspaceObservation::WorktreeNotAhead;
        }
        WorkspaceObservation::WorkspaceArchived => {
            let already_waiting =
                matches!(rt.wait_state, WaitState::MergeWait | WaitState::ResolveWait);
            if !already_waiting {
                rt.wait_state = WaitState::MergeWait;
                rt.observation = WorkspaceObservation::WorkspaceArchived;
            }
        }
    }
}
/// Reduces an execution event into the orchestrator and per-change state.
///
/// Each arm looks up (creating if necessary) the runtime entry of the change
/// the event refers to and updates it. Most arms ignore the event when the
/// change is already terminal, and many also ignore it when the change has
/// been dequeued. Events not listed here leave the state untouched.
pub fn apply_execution_event(&mut self, event: &crate::events::ExecutionEvent) {
use crate::events::ExecutionEvent;
match event {
// Processing began: remember the current change and mark it queued.
ExecutionEvent::ProcessingStarted(change_id) => {
self.set_current_change(Some(change_id.clone()));
let rt = self.runtime_entry(change_id);
if !rt.is_terminal() && !rt.dequeued {
rt.queue_intent = QueueIntent::Queued;
}
}
// Completion of processing carries no state change by itself.
ExecutionEvent::ProcessingCompleted(change_id) => {
let _ = change_id;
}
// Processing failed: the change leaves the pending set and becomes `Error`.
ExecutionEvent::ProcessingError { id, error } => {
self.remove_from_pending(id);
let rt = self.runtime_entry(id);
if !rt.is_terminal() && !rt.dequeued {
rt.terminal = TerminalState::Error(error.clone());
rt.activity = ActivityState::Idle;
rt.wait_state = WaitState::None;
rt.clear_blocked_metadata();
}
}
// Apply began: mark the change as applying and make it the current change.
ExecutionEvent::ApplyStarted {
change_id,
command: _,
} => {
let mut should_start = false;
{
let rt = self.runtime_entry(change_id);
if !rt.is_terminal() && !rt.dequeued {
rt.activity = ActivityState::Applying;
rt.wait_state = WaitState::None;
rt.clear_blocked_metadata();
should_start = true;
}
}
if should_start {
self.set_current_change(Some(change_id.clone()));
}
}
// Apply finished: count the attempt and return to idle if still applying.
ExecutionEvent::ApplyCompleted { change_id, .. } => {
self.increment_apply_count(change_id);
let rt = self.runtime_entry(change_id);
if matches!(rt.activity, ActivityState::Applying) {
rt.activity = ActivityState::Idle;
}
}
// Apply failed: the change leaves the pending set and becomes `Error`.
ExecutionEvent::ApplyFailed { change_id, error } => {
self.remove_from_pending(change_id);
let rt = self.runtime_entry(change_id);
if !rt.is_terminal() {
rt.terminal = TerminalState::Error(error.clone());
rt.activity = ActivityState::Idle;
rt.wait_state = WaitState::None;
rt.clear_blocked_metadata();
}
}
ExecutionEvent::AcceptanceStarted { change_id, .. } => {
let rt = self.runtime_entry(change_id);
if !rt.is_terminal() && !rt.dequeued {
rt.activity = ActivityState::Accepting;
}
}
ExecutionEvent::AcceptanceCompleted { change_id } => {
let rt = self.runtime_entry(change_id);
if matches!(rt.activity, ActivityState::Accepting) {
rt.activity = ActivityState::Idle;
}
}
// Acceptance failed: becomes `Error`; wait state and pending set are left as is.
ExecutionEvent::AcceptanceFailed { change_id, error } => {
let rt = self.runtime_entry(change_id);
if !rt.is_terminal() {
rt.terminal = TerminalState::Error(error.clone());
rt.activity = ActivityState::Idle;
}
}
// Rejected: the change leaves the pending set and becomes `Rejected(reason)`.
ExecutionEvent::ChangeRejected { change_id, reason } => {
self.remove_from_pending(change_id);
let rt = self.runtime_entry(change_id);
if !rt.is_terminal() {
rt.terminal = TerminalState::Rejected(reason.clone());
rt.activity = ActivityState::Idle;
rt.wait_state = WaitState::None;
rt.clear_blocked_metadata();
rt.queue_intent = QueueIntent::NotQueued;
}
}
// Rejection review finished with one of three outcomes: confirm, resume or block.
ExecutionEvent::RejectionReviewCompleted { change_id, outcome } => {
// Only a confirmed rejection removes the change from the pending set.
let should_mark_pending_removed =
matches!(outcome, crate::events::RejectionOutcome::Confirm);
if should_mark_pending_removed {
self.remove_from_pending(change_id);
}
let rt = self.runtime_entry(change_id);
if rt.is_terminal() || rt.dequeued {
return;
}
match *outcome {
crate::events::RejectionOutcome::Confirm => {
rt.activity = ActivityState::Idle;
rt.wait_state = WaitState::None;
rt.clear_blocked_metadata();
rt.queue_intent = QueueIntent::NotQueued;
rt.terminal = TerminalState::Rejected(
"rejecting review confirmed rejection".to_string(),
);
}
// Resume puts the change straight back into the applying activity.
crate::events::RejectionOutcome::Resume => {
rt.activity = ActivityState::Applying;
rt.wait_state = WaitState::None;
rt.clear_blocked_metadata();
rt.terminal = TerminalState::None;
}
// Block parks the change as stalled and records why.
crate::events::RejectionOutcome::Block => {
rt.activity = ActivityState::Idle;
rt.wait_state = WaitState::Stalled;
rt.terminal = TerminalState::None;
rt.set_blocked_metadata(
"rejection review returned block; unresolved blocker remains",
"resolve unresolved blocker tasks in openspec/changes/<change_id>/tasks.md, then trigger explicit resume",
"existing worktree and WIP context are preserved for stalled rejection review",
);
}
}
}
// The review itself failed: the change leaves the pending set and becomes `Error`.
ExecutionEvent::RejectionReviewFailed { change_id, error } => {
self.remove_from_pending(change_id);
let rt = self.runtime_entry(change_id);
if !rt.is_terminal() && !rt.dequeued {
rt.activity = ActivityState::Idle;
rt.wait_state = WaitState::None;
rt.terminal = TerminalState::Error(error.clone());
}
}
// Mirror the reported workspace status into the activity / wait state.
ExecutionEvent::WorkspaceStatusUpdated {
change_id,
workspace_name: _,
status,
} => {
let rt = self.runtime_entry(change_id);
if !rt.is_terminal() && !rt.dequeued {
match status {
crate::vcs::WorkspaceStatus::Applying => {
rt.activity = ActivityState::Applying;
}
crate::vcs::WorkspaceStatus::Accepting => {
rt.activity = ActivityState::Accepting;
}
// A blocked workspace is not an activity: park the change as stalled.
crate::vcs::WorkspaceStatus::Blocked => {
rt.activity = ActivityState::Idle;
rt.wait_state = WaitState::Stalled;
rt.set_blocked_metadata(
"apply reported recoverable blocker; workspace remains stalled",
"resolve implementation blocker section and pending unblock tasks before explicit retry",
"existing worktree and WIP context are preserved while stalled",
);
}
crate::vcs::WorkspaceStatus::Rejecting => {
rt.activity = ActivityState::Rejecting;
}
crate::vcs::WorkspaceStatus::Archiving => {
rt.activity = ActivityState::Archiving;
}
crate::vcs::WorkspaceStatus::Resolving => {
rt.activity = ActivityState::Resolving;
}
// Any other workspace status leaves the runtime state unchanged.
_ => {}
}
}
}
ExecutionEvent::ArchiveStarted { change_id, .. } => {
let rt = self.runtime_entry(change_id);
if !rt.is_terminal() && !rt.dequeued {
rt.activity = ActivityState::Archiving;
}
}
// Unlike `ArchiveStarted`, resume and retry are not suppressed by `dequeued`.
ExecutionEvent::ArchiveResumed { change_id, .. } => {
let rt = self.runtime_entry(change_id);
if !rt.is_terminal() {
rt.activity = ActivityState::Archiving;
}
}
ExecutionEvent::ArchiveRetryScheduled { change_id, .. } => {
let rt = self.runtime_entry(change_id);
if !rt.is_terminal() {
rt.activity = ActivityState::Archiving;
}
}
// Archived: update the pending/archived bookkeeping, then branch on execution mode.
ExecutionEvent::ChangeArchived(change_id) => {
self.mark_archived(change_id);
let mode = self.execution_mode;
// Computed before borrowing the runtime entry mutably.
let has_other_resolving = self.change_runtime.iter().any(|(id, runtime)| {
id != change_id
&& matches!(runtime.activity, ActivityState::Resolving)
&& !runtime.is_terminal()
});
let rt = self.runtime_entry(change_id);
rt.activity = ActivityState::Idle;
match mode {
// Serial: archiving is the final outcome.
ExecutionMode::Serial => {
rt.terminal = TerminalState::Archived;
rt.wait_state = WaitState::None;
rt.queue_intent = QueueIntent::NotQueued;
}
// Parallel: wait for a merge, or for a resolve if another change is resolving.
ExecutionMode::Parallel => {
rt.wait_state = if has_other_resolving {
WaitState::ResolveWait
} else {
WaitState::MergeWait
};
}
}
}
ExecutionEvent::ArchiveFailed {
change_id, error, ..
} => {
let rt = self.runtime_entry(change_id);
if !rt.is_terminal() {
rt.terminal = TerminalState::Error(error.clone());
rt.activity = ActivityState::Idle;
}
}
// Merge deferred: auto-resumable changes queue for a resolve, others wait for a merge.
ExecutionEvent::MergeDeferred {
change_id,
auto_resumable,
..
} => {
let rt = self.runtime_entry(change_id);
if !rt.is_terminal() && !rt.is_active() {
if *auto_resumable {
rt.wait_state = WaitState::ResolveWait;
if !self.resolve_wait_queue.contains(change_id) {
self.resolve_wait_queue.push(change_id.clone());
}
} else {
rt.wait_state = WaitState::MergeWait;
}
}
}
ExecutionEvent::MergeCompleted { change_id, .. } => {
let rt = self.runtime_entry(change_id);
if !rt.is_terminal() {
rt.terminal = TerminalState::Merged;
rt.activity = ActivityState::Idle;
rt.wait_state = WaitState::None;
rt.queue_intent = QueueIntent::NotQueued;
self.resolve_wait_queue.retain(|id| id != change_id);
}
}
ExecutionEvent::ResolveStarted { change_id, .. } => {
let rt = self.runtime_entry(change_id);
if !rt.is_terminal() && !rt.dequeued {
rt.activity = ActivityState::Resolving;
rt.wait_state = WaitState::None;
}
}
// Resolve finished: a change that was resolving or waiting for a resolve becomes `Merged`.
ExecutionEvent::ResolveCompleted { change_id, .. } => {
let rt = self.runtime_entry(change_id);
if !rt.is_terminal()
&& !rt.dequeued
&& (matches!(rt.activity, ActivityState::Resolving)
|| matches!(rt.wait_state, WaitState::ResolveWait))
{
rt.activity = ActivityState::Idle;
rt.wait_state = WaitState::None;
rt.terminal = TerminalState::Merged;
rt.queue_intent = QueueIntent::NotQueued;
rt.clear_blocked_metadata();
}
// The queue entry is dropped even when the state update above was skipped.
self.resolve_wait_queue.retain(|id| id != change_id);
}
// Resolve failed: fall back to waiting for a merge.
ExecutionEvent::ResolveFailed { change_id, .. } => {
let rt = self.runtime_entry(change_id);
if !rt.is_terminal() {
if matches!(rt.activity, ActivityState::Resolving) {
rt.activity = ActivityState::Idle;
}
if matches!(rt.activity, ActivityState::Idle)
&& matches!(rt.wait_state, WaitState::ResolveWait | WaitState::None)
{
rt.wait_state = WaitState::MergeWait;
self.resolve_wait_queue.retain(|id| id != change_id);
}
}
}
ExecutionEvent::DependencyBlocked { change_id, .. } => {
let rt = self.runtime_entry(change_id);
if !rt.is_terminal() && !rt.is_active() {
rt.wait_state = WaitState::DependencyBlocked;
rt.set_blocked_metadata(
"dependency_blocked",
"resolve dependencies and retry queue",
"worktree snapshot not required for dependency blocker",
);
}
}
ExecutionEvent::DependencyResolved { change_id } => {
let rt = self.runtime_entry(change_id);
if matches!(rt.wait_state, WaitState::DependencyBlocked) {
rt.wait_state = WaitState::None;
rt.clear_blocked_metadata();
}
}
// Acceptance gated: park the change; the event's reason is stored as the worktree snapshot text.
ExecutionEvent::AcceptanceGated { change_id, reason } => {
let rt = self.runtime_entry(change_id);
if !rt.is_terminal() && !rt.dequeued {
rt.activity = ActivityState::Idle;
rt.wait_state = WaitState::AcceptanceGated;
rt.terminal = TerminalState::None;
rt.set_blocked_metadata(
"acceptance-gated",
"review acceptance blocker evidence and decide resume/reject route",
reason.clone(),
);
}
}
// Dequeued and stopped events are handled identically: reset to idle/not queued
// and mark as dequeued, unless the change is archived, merged or rejected.
ExecutionEvent::ChangeDequeued { change_id }
| ExecutionEvent::ChangeStopped { change_id } => {
let rt = self.runtime_entry(change_id);
if matches!(
rt.terminal,
TerminalState::Archived | TerminalState::Merged | TerminalState::Rejected(_)
) {
return;
}
rt.terminal = TerminalState::None;
rt.activity = ActivityState::Idle;
rt.wait_state = WaitState::None;
rt.clear_blocked_metadata();
rt.queue_intent = QueueIntent::NotQueued;
rt.dequeued = true;
self.resolve_wait_queue.retain(|id| id != change_id);
}
// Refreshed change list: register new changes, re-open rejected ones that are
// listed again, then apply the accompanying workspace observations.
ExecutionEvent::ChangesRefreshed {
changes,
merge_wait_ids,
worktree_not_ahead_ids,
..
} => {
// Collected first because `add_dynamic_change` needs `&mut self`.
let new_ids: Vec<String> = changes
.iter()
.filter(|c| {
!self.initial_change_ids.contains(&c.id)
&& !self.archived_changes.contains(&c.id)
})
.map(|c| c.id.clone())
.collect();
for id in new_ids {
self.add_dynamic_change(id);
}
for change in changes {
let rt = self.runtime_entry(&change.id);
if matches!(rt.terminal, TerminalState::Rejected(_)) {
rt.terminal = TerminalState::None;
rt.activity = ActivityState::Idle;
rt.wait_state = WaitState::None;
rt.clear_blocked_metadata();
rt.queue_intent = QueueIntent::NotQueued;
}
}
let mw: Vec<String> = merge_wait_ids.iter().cloned().collect();
let nah: Vec<String> = worktree_not_ahead_ids.iter().cloned().collect();
for id in mw {
self.apply_observation(&id.clone(), WorkspaceObservation::WorkspaceArchived);
}
for id in nah {
self.apply_observation(&id.clone(), WorkspaceObservation::WorktreeNotAhead);
}
}
// All remaining events do not affect this state.
_ => {}
}
}
}
impl Default for OrchestratorState {
fn default() -> Self {
Self::new(Vec::new(), 0)
}
}
impl OrchestratorState {
/// Returns the execution mode this state was constructed with.
#[allow(dead_code)]
pub fn execution_mode(&self) -> ExecutionMode {
self.execution_mode
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_new_state() {
let state =
OrchestratorState::new(vec!["change-a".to_string(), "change-b".to_string()], 10);
assert_eq!(state.total_changes(), 2);
assert_eq!(state.remaining_changes(), 2);
assert_eq!(state.changes_processed(), 0);
assert_eq!(state.iteration(), 0);
assert_eq!(state.max_iterations(), 10);
assert!(state.current_change_id().is_none());
}
#[test]
fn test_is_in_snapshot() {
let state = OrchestratorState::new(vec!["change-a".to_string()], 0);
assert!(state.is_in_snapshot("change-a"));
assert!(!state.is_in_snapshot("change-b"));
}
#[test]
fn test_apply_count_increment() {
let mut state = OrchestratorState::new(vec!["change-a".to_string()], 0);
assert_eq!(state.apply_count("change-a"), 0);
assert_eq!(state.increment_apply_count("change-a"), 1);
assert_eq!(state.increment_apply_count("change-a"), 2);
assert_eq!(state.apply_count("change-a"), 2);
}
#[test]
fn test_mark_archived() {
let mut state =
OrchestratorState::new(vec!["change-a".to_string(), "change-b".to_string()], 0);
state.set_current_change(Some("change-a".to_string()));
state.increment_apply_count("change-a");
assert!(state.is_pending("change-a"));
assert!(!state.is_archived("change-a"));
assert_eq!(state.remaining_changes(), 2);
state.mark_archived("change-a");
assert!(!state.is_pending("change-a"));
assert!(state.is_archived("change-a"));
assert_eq!(state.remaining_changes(), 1);
assert_eq!(state.changes_processed(), 1);
assert!(state.current_change_id().is_none());
assert_eq!(state.apply_count("change-a"), 0); }
#[test]
fn test_is_complete() {
let mut state = OrchestratorState::new(vec!["change-a".to_string()], 0);
assert!(!state.is_complete());
state.mark_archived("change-a");
assert!(state.is_complete());
}
#[test]
fn test_iteration_limit() {
let mut state = OrchestratorState::new(vec![], 10);
assert!(!state.is_iteration_limit_reached());
for _ in 0..8 {
state.increment_iteration();
}
assert!(state.is_approaching_iteration_limit()); assert!(!state.is_iteration_limit_reached());
state.increment_iteration(); assert!(!state.is_iteration_limit_reached());
state.increment_iteration(); assert!(state.is_iteration_limit_reached());
}
#[test]
fn test_no_iteration_limit() {
let mut state = OrchestratorState::new(vec![], 0);
for _ in 0..100 {
state.increment_iteration();
}
assert!(!state.is_iteration_limit_reached());
assert!(!state.is_approaching_iteration_limit());
}
#[test]
fn test_add_dynamic_change() {
let mut state = OrchestratorState::new(vec!["change-a".to_string()], 0);
assert_eq!(state.total_changes(), 1);
assert!(!state.is_pending("change-b"));
state.add_dynamic_change("change-b".to_string());
assert_eq!(state.total_changes(), 2);
assert!(state.is_pending("change-b"));
assert!(state.is_in_snapshot("change-b"));
}
#[test]
fn test_add_dynamic_change_idempotent() {
let mut state = OrchestratorState::new(vec!["change-a".to_string()], 0);
state.add_dynamic_change("change-a".to_string()); state.add_dynamic_change("change-b".to_string());
state.add_dynamic_change("change-b".to_string());
assert_eq!(state.total_changes(), 2); }
#[test]
fn test_remove_from_pending() {
let mut state = OrchestratorState::new(vec!["change-a".to_string()], 0);
state.set_current_change(Some("change-a".to_string()));
assert!(state.is_pending("change-a"));
assert!(state.current_change_id().is_some());
state.remove_from_pending("change-a");
assert!(!state.is_pending("change-a"));
assert!(!state.is_archived("change-a")); assert!(state.current_change_id().is_none());
}
#[test]
fn test_error_history_management_and_circuit_breaker() {
let mut state = OrchestratorState::new(vec!["change-a".to_string()], 0);
let config = CircuitBreakerConfig {
enabled: true,
threshold: 2,
};
assert!(!state.record_error_and_check_circuit_breaker(
"change-a",
"same error",
config.clone()
));
assert!(state.record_error_and_check_circuit_breaker("change-a", "same error", config));
assert!(state.last_error("change-a").is_some());
state.clear_error_history("change-a");
assert!(state.last_error("change-a").is_none());
}
#[test]
fn test_orchestrator_state_initializes_change_runtime() {
let state = OrchestratorState::new(vec!["change-a".to_string(), "change-b".to_string()], 0);
let rt_a = state
.change_runtime("change-a")
.expect("runtime for change-a");
assert_eq!(rt_a.queue_intent, QueueIntent::NotQueued);
assert_eq!(rt_a.activity, ActivityState::Idle);
assert_eq!(rt_a.wait_state, WaitState::None);
assert!(matches!(rt_a.terminal, TerminalState::None));
let rt_b = state
.change_runtime("change-b")
.expect("runtime for change-b");
assert_eq!(rt_b.queue_intent, QueueIntent::NotQueued);
}
#[test]
fn test_change_runtime_invariants() {
let valid = ChangeRuntimeState::default();
assert!(valid.invariants_hold());
let invalid = ChangeRuntimeState {
terminal: TerminalState::Merged,
activity: ActivityState::Applying,
..Default::default()
};
assert!(!invalid.invariants_hold());
let invalid2 = ChangeRuntimeState {
wait_state: WaitState::ResolveWait,
activity: ActivityState::Resolving,
..Default::default()
};
assert!(!invalid2.invariants_hold());
let ok = ChangeRuntimeState {
wait_state: WaitState::MergeWait,
..Default::default()
};
assert!(ok.invariants_hold());
let in_flight_rejecting = ChangeRuntimeState {
activity: ActivityState::Rejecting,
terminal: TerminalState::None,
..Default::default()
};
assert!(in_flight_rejecting.invariants_hold());
}
// Checks the status string reported by `display_status` as queue intent,
// activity and terminal state are changed one after another.
#[test]
fn test_display_status_derivation() {
    // Untouched state: a known id and an unknown id both report "not queued".
    let untouched = OrchestratorState::new(vec!["c".to_string()], 0);
    assert_eq!(untouched.display_status("c"), "not queued");
    assert_eq!(untouched.display_status("unknown"), "not queued");

    // Queueing through the reducer command flips the status to "queued".
    let mut orchestrator = OrchestratorState::new(vec!["c".to_string()], 0);
    orchestrator.apply_command(ReducerCommand::AddToQueue("c".to_string()));
    assert_eq!(orchestrator.display_status("c"), "queued");

    // With an activity set, the status reports "applying" instead of "queued".
    orchestrator.runtime_entry("c").activity = ActivityState::Applying;
    assert_eq!(orchestrator.display_status("c"), "applying");

    // With a terminal state set, the status reports "archived".
    orchestrator.runtime_entry("c").terminal = TerminalState::Archived;
    assert_eq!(orchestrator.display_status("c"), "archived");
}
// Walks a single `ChangeRuntimeState` through a series of field mutations and
// checks the colour returned by `display_color` after each one. The mutations
// are cumulative, so the order of the steps matters.
#[test]
fn test_display_color_derivation() {
    let mut runtime = ChangeRuntimeState::default();

    // Default state, then queued, then dependency-blocked while still queued.
    assert_eq!(runtime.display_color(), ratatui::style::Color::DarkGray);
    runtime.queue_intent = QueueIntent::Queued;
    assert_eq!(runtime.display_color(), ratatui::style::Color::Yellow);
    runtime.wait_state = WaitState::DependencyBlocked;
    assert_eq!(runtime.display_color(), ratatui::style::Color::Gray);

    // Activities (the wait state above is still set while these are checked).
    runtime.activity = ActivityState::Applying;
    assert_eq!(runtime.display_color(), ratatui::style::Color::Cyan);
    runtime.activity = ActivityState::Accepting;
    assert_eq!(runtime.display_color(), ratatui::style::Color::LightGreen);
    runtime.activity = ActivityState::Archiving;
    assert_eq!(runtime.display_color(), ratatui::style::Color::Magenta);
    runtime.activity = ActivityState::Resolving;
    assert_eq!(runtime.display_color(), ratatui::style::Color::LightCyan);

    // Back to idle: merge-wait and resolve-wait colours.
    runtime.activity = ActivityState::Idle;
    runtime.wait_state = WaitState::MergeWait;
    assert_eq!(runtime.display_color(), ratatui::style::Color::LightMagenta);
    runtime.wait_state = WaitState::ResolveWait;
    assert_eq!(runtime.display_color(), ratatui::style::Color::Magenta);

    // No wait state: one colour per terminal variant.
    runtime.wait_state = WaitState::None;
    runtime.terminal = TerminalState::Archived;
    assert_eq!(runtime.display_color(), ratatui::style::Color::Blue);
    runtime.terminal = TerminalState::Merged;
    assert_eq!(runtime.display_color(), ratatui::style::Color::LightBlue);
    runtime.terminal = TerminalState::Rejected("blocked".to_string());
    assert_eq!(runtime.display_color(), ratatui::style::Color::LightRed);
    runtime.terminal = TerminalState::Stopped;
    assert_eq!(runtime.display_color(), ratatui::style::Color::DarkGray);
    runtime.terminal = TerminalState::Error("boom".to_string());
    assert_eq!(runtime.display_color(), ratatui::style::Color::Red);
}
// `error_message` yields the payload of an `Error` terminal and `None` for a
// `Merged` terminal.
#[test]
fn test_error_message_derivation() {
    let errored = ChangeRuntimeState {
        terminal: TerminalState::Error("fatal".to_string()),
        ..Default::default()
    };
    assert_eq!(errored.error_message(), Some("fatal"));

    let merged = ChangeRuntimeState {
        terminal: TerminalState::Merged,
        ..Default::default()
    };
    assert_eq!(merged.error_message(), None);
}
// Checks `is_active_change` / `is_terminal_change` against directly edited
// runtime fields.
#[test]
fn test_runtime_state_active_classification() {
    let mut orchestrator = OrchestratorState::new(vec!["c".to_string()], 0);

    // A fresh change is not active.
    assert!(!orchestrator.is_active_change("c"));

    // Setting an activity makes it active.
    orchestrator.runtime_entry("c").activity = ActivityState::Applying;
    assert!(orchestrator.is_active_change("c"));

    // Archived terminal plus idle activity: no longer active, but terminal.
    orchestrator.runtime_entry("c").terminal = TerminalState::Archived;
    orchestrator.runtime_entry("c").activity = ActivityState::Idle;
    assert!(!orchestrator.is_active_change("c"));
    assert!(orchestrator.is_terminal_change("c"));
}
// Drives the queue-related reducer commands (`AddToQueue`, `RemoveFromQueue`,
// `DequeueChange`) and checks both the returned `ReduceOutcome` and the
// resulting display status after each step.
#[test]
fn test_apply_command_queue_intent() {
    use crate::events::ExecutionEvent;
    use crate::openspec::{Change, ProposalMetadata};
    use std::collections::{HashMap, HashSet};

    let mut orchestrator = OrchestratorState::new(vec!["c".to_string()], 0);

    // Adding changes state; adding again is a no-op.
    let first_add = orchestrator.apply_command(ReducerCommand::AddToQueue("c".to_string()));
    assert!(matches!(first_add, ReduceOutcome::Changed(_)));
    assert_eq!(orchestrator.display_status("c"), "queued");
    let repeated_add = orchestrator.apply_command(ReducerCommand::AddToQueue("c".to_string()));
    assert!(matches!(repeated_add, ReduceOutcome::NoOp));

    // Removing changes state; removing again is a no-op.
    let first_remove =
        orchestrator.apply_command(ReducerCommand::RemoveFromQueue("c".to_string()));
    assert!(matches!(first_remove, ReduceOutcome::Changed(_)));
    assert_eq!(orchestrator.display_status("c"), "not queued");
    let repeated_remove =
        orchestrator.apply_command(ReducerCommand::RemoveFromQueue("c".to_string()));
    assert!(matches!(repeated_remove, ReduceOutcome::NoOp));

    // Dequeue after re-adding reports Changed, and so does a second dequeue.
    orchestrator.apply_command(ReducerCommand::AddToQueue("c".to_string()));
    let first_dequeue =
        orchestrator.apply_command(ReducerCommand::DequeueChange("c".to_string()));
    assert!(matches!(first_dequeue, ReduceOutcome::Changed(_)));
    assert_eq!(orchestrator.display_status("c"), "not queued");
    let repeated_dequeue =
        orchestrator.apply_command(ReducerCommand::DequeueChange("c".to_string()));
    assert!(matches!(repeated_dequeue, ReduceOutcome::Changed(_)));
    assert_eq!(orchestrator.display_status("c"), "not queued");

    // While the change carries a Rejected terminal, AddToQueue is a no-op.
    orchestrator.runtime_entry("c").terminal = TerminalState::Rejected("blocked".to_string());
    let add_while_rejected =
        orchestrator.apply_command(ReducerCommand::AddToQueue("c".to_string()));
    assert!(matches!(add_while_rejected, ReduceOutcome::NoOp));

    // After a refresh that lists the change again, it shows "not queued" and
    // can be queued once more.
    let refreshed_change = Change {
        id: "c".to_string(),
        completed_tasks: 0,
        total_tasks: 1,
        last_modified: "now".to_string(),
        dependencies: Vec::new(),
        metadata: ProposalMetadata::default(),
    };
    orchestrator.apply_execution_event(&ExecutionEvent::ChangesRefreshed {
        changes: vec![refreshed_change],
        committed_change_ids: HashSet::new(),
        uncommitted_file_change_ids: HashSet::new(),
        worktree_change_ids: HashSet::new(),
        worktree_paths: HashMap::new(),
        worktree_not_ahead_ids: HashSet::new(),
        merge_wait_ids: HashSet::new(),
    });
    assert_eq!(orchestrator.display_status("c"), "not queued");
    let add_after_refresh =
        orchestrator.apply_command(ReducerCommand::AddToQueue("c".to_string()));
    assert!(matches!(add_after_refresh, ReduceOutcome::Changed(_)));
    assert_eq!(orchestrator.display_status("c"), "queued");
}
// Feeds a sequence of execution events through the reducer and checks the
// display status after each step. Covers apply, acceptance, and the three
// rejection-review results exercised here (Resume, failure, Confirm).
#[test]
fn test_apply_execution_event_transitions() {
use crate::events::{ExecutionEvent, RejectionOutcome};
let mut state = OrchestratorState::new(vec!["c".to_string()], 0);
// Apply phase: started -> "applying", completed -> back to "not queued".
state.apply_execution_event(&ExecutionEvent::ApplyStarted {
change_id: "c".to_string(),
command: "cmd".to_string(),
});
assert_eq!(state.display_status("c"), "applying");
state.apply_execution_event(&ExecutionEvent::ApplyCompleted {
change_id: "c".to_string(),
revision: "rev1".to_string(),
});
assert_eq!(state.display_status("c"), "not queued");
// Acceptance phase.
state.apply_execution_event(&ExecutionEvent::AcceptanceStarted {
change_id: "c".to_string(),
command: "cmd".to_string(),
});
assert_eq!(state.display_status("c"), "accepting");
state.apply_execution_event(&ExecutionEvent::AcceptanceCompleted {
change_id: "c".to_string(),
});
// A workspace status of Rejecting is reflected as "rejecting".
state.apply_execution_event(&ExecutionEvent::WorkspaceStatusUpdated {
change_id: "c".to_string(),
workspace_name: "ws-c".to_string(),
status: crate::vcs::WorkspaceStatus::Rejecting,
});
assert_eq!(state.display_status("c"), "rejecting");
// Rejection review result Resume: status becomes "applying" and the
// activity is no longer Rejecting.
state.apply_execution_event(&ExecutionEvent::RejectionReviewCompleted {
change_id: "c".to_string(),
outcome: RejectionOutcome::Resume,
});
assert_eq!(state.display_status("c"), "applying");
assert_ne!(
state
.change_runtime("c")
.expect("runtime for c after rejecting resume")
.activity,
ActivityState::Rejecting
);
// Rejection review failure: status becomes "error" and the activity is
// not Rejecting.
state.apply_execution_event(&ExecutionEvent::RejectionReviewFailed {
change_id: "c".to_string(),
error: "rejecting failed".to_string(),
});
assert_eq!(state.display_status("c"), "error");
assert_ne!(
state
.change_runtime("c")
.expect("runtime for c after rejecting failure")
.activity,
ActivityState::Rejecting
);
// Fresh state for the Confirm result: ends as "rejected".
let mut state = OrchestratorState::new(vec!["c".to_string()], 0);
state.apply_execution_event(&ExecutionEvent::ProcessingStarted("c".to_string()));
state.apply_execution_event(&ExecutionEvent::WorkspaceStatusUpdated {
change_id: "c".to_string(),
workspace_name: "ws-c".to_string(),
status: crate::vcs::WorkspaceStatus::Rejecting,
});
assert_eq!(state.display_status("c"), "rejecting");
state.apply_execution_event(&ExecutionEvent::RejectionReviewCompleted {
change_id: "c".to_string(),
outcome: RejectionOutcome::Confirm,
});
assert_eq!(state.display_status("c"), "rejected");
assert_ne!(
state
.change_runtime("c")
.expect("runtime for c after rejecting confirm")
.activity,
ActivityState::Rejecting
);
}
// A rejection review that completes with the Block result must leave the
// change stalled (idle, non-terminal) and populate all three blocked-metadata
// fields with the exact messages asserted below.
#[test]
fn test_rejection_review_block_transitions_to_blocked_with_metadata() {
use crate::events::{ExecutionEvent, RejectionOutcome};
let mut state = OrchestratorState::new(vec!["c".to_string()], 0);
// Bring the change into the "rejecting" status first.
state.apply_execution_event(&ExecutionEvent::ProcessingStarted("c".to_string()));
state.apply_execution_event(&ExecutionEvent::WorkspaceStatusUpdated {
change_id: "c".to_string(),
workspace_name: "ws-c".to_string(),
status: crate::vcs::WorkspaceStatus::Rejecting,
});
assert_eq!(state.display_status("c"), "rejecting");
state.apply_execution_event(&ExecutionEvent::RejectionReviewCompleted {
change_id: "c".to_string(),
outcome: RejectionOutcome::Block,
});
let runtime = state
.change_runtime("c")
.expect("runtime for c after rejecting block");
// Stalled wait state, idle activity, no terminal state.
assert_eq!(state.display_status("c"), "stalled");
assert_eq!(runtime.activity, ActivityState::Idle);
assert_eq!(runtime.wait_state, WaitState::Stalled);
assert!(matches!(runtime.terminal, TerminalState::None));
// Blocked metadata: reason, unblock instructions, worktree snapshot note.
assert_eq!(
runtime.blocked_metadata.blocker_reason.as_deref(),
Some("rejection review returned block; unresolved blocker remains")
);
assert_eq!(
runtime.blocked_metadata.unblock_metadata.as_deref(),
Some(
"resolve unresolved blocker tasks in openspec/changes/<change_id>/tasks.md, then trigger explicit resume"
)
);
assert_eq!(
runtime.blocked_metadata.worktree_snapshot.as_deref(),
Some("existing worktree and WIP context are preserved for stalled rejection review")
);
}
// Starting from a change that shows "merge wait" after a WorkspaceArchived
// observation, a Rejecting workspace status followed by a Resume review
// result must end in "applying".
#[test]
fn test_rejection_review_resume_from_archived_workspace_context_sets_applying() {
use crate::events::{ExecutionEvent, RejectionOutcome};
let mut state = OrchestratorState::new(vec!["c".to_string()], 0);
state.apply_observation("c", WorkspaceObservation::WorkspaceArchived);
assert_eq!(state.display_status("c"), "merge wait");
state.apply_execution_event(&ExecutionEvent::WorkspaceStatusUpdated {
change_id: "c".to_string(),
workspace_name: "ws-c".to_string(),
status: crate::vcs::WorkspaceStatus::Rejecting,
});
assert_eq!(state.display_status("c"), "rejecting");
state.apply_execution_event(&ExecutionEvent::RejectionReviewCompleted {
change_id: "c".to_string(),
outcome: RejectionOutcome::Resume,
});
assert_eq!(state.display_status("c"), "applying");
}
// Starting from a change that shows "merge wait" after a WorkspaceArchived
// observation, a Rejecting workspace status followed by a Block review
// result must end in "stalled".
#[test]
fn test_rejection_review_block_from_archived_workspace_context_sets_stalled() {
use crate::events::{ExecutionEvent, RejectionOutcome};
let mut state = OrchestratorState::new(vec!["c".to_string()], 0);
state.apply_observation("c", WorkspaceObservation::WorkspaceArchived);
assert_eq!(state.display_status("c"), "merge wait");
state.apply_execution_event(&ExecutionEvent::WorkspaceStatusUpdated {
change_id: "c".to_string(),
workspace_name: "ws-c".to_string(),
status: crate::vcs::WorkspaceStatus::Rejecting,
});
assert_eq!(state.display_status("c"), "rejecting");
state.apply_execution_event(&ExecutionEvent::RejectionReviewCompleted {
change_id: "c".to_string(),
outcome: RejectionOutcome::Block,
});
assert_eq!(state.display_status("c"), "stalled");
}
// Checks how workspace observations affect the displayed status:
// WorkspaceArchived yields "merge wait", WorktreeNotAhead returns it to
// "not queued", and an Applying change keeps showing "applying".
#[test]
fn test_apply_observation_reconcile_merge_wait() {
let mut state = OrchestratorState::new(vec!["c".to_string()], 0);
state.apply_observation("c", WorkspaceObservation::WorkspaceArchived);
assert_eq!(state.display_status("c"), "merge wait");
state.apply_observation("c", WorkspaceObservation::WorktreeNotAhead);
assert_eq!(state.display_status("c"), "not queued");
// With an activity in flight, the archived observation does not change
// the displayed status.
state.runtime_entry("c").activity = ActivityState::Applying;
state.apply_observation("c", WorkspaceObservation::WorkspaceArchived);
assert_eq!(state.display_status("c"), "applying");
}
// A ChangesRefreshed event whose `merge_wait_ids` contains the change must
// result in the "merge wait" display status.
#[test]
fn test_changes_refreshed_uses_reducer_observation_path() {
use crate::events::ExecutionEvent;
use std::collections::{HashMap, HashSet};
let mut state = OrchestratorState::new(vec!["c".to_string()], 0);
let mut merge_wait_ids = HashSet::new();
merge_wait_ids.insert("c".to_string());
// Every other field of the refresh is empty; only merge_wait_ids carries "c".
state.apply_execution_event(&ExecutionEvent::ChangesRefreshed {
changes: vec![],
committed_change_ids: HashSet::new(),
uncommitted_file_change_ids: HashSet::new(),
worktree_change_ids: HashSet::new(),
worktree_paths: HashMap::new(),
worktree_not_ahead_ids: HashSet::new(),
merge_wait_ids,
});
assert_eq!(state.display_status("c"), "merge wait");
}
// With two queued changes, a ChangeRejected event for "a" must turn only "a"
// into "rejected" while "b" stays "queued".
#[test]
fn test_change_rejected_clears_only_target_queue_intent() {
use crate::events::ExecutionEvent;
let mut state = OrchestratorState::new(vec!["a".to_string(), "b".to_string()], 0);
state.apply_command(ReducerCommand::AddToQueue("a".to_string()));
state.apply_command(ReducerCommand::AddToQueue("b".to_string()));
assert_eq!(state.display_status("a"), "queued");
assert_eq!(state.display_status("b"), "queued");
state.apply_execution_event(&ExecutionEvent::ChangeRejected {
change_id: "a".to_string(),
reason: "blocked".to_string(),
});
assert_eq!(state.display_status("a"), "rejected");
assert_eq!(state.display_status("b"), "queued");
}
// A change that was manually put into a Rejected terminal (with leftover
// activity, wait state and queue intent) must be fully reset once a
// ChangesRefreshed event lists it again.
#[test]
fn test_changes_refreshed_reactivates_rejected_change() {
use crate::events::ExecutionEvent;
use crate::openspec::{Change, ProposalMetadata};
use std::collections::{HashMap, HashSet};
let mut state = OrchestratorState::new(vec!["c".to_string()], 0);
// Seed the runtime entry directly with non-default values in every field
// that is asserted after the refresh.
let rt = state.runtime_entry("c");
rt.terminal = TerminalState::Rejected("blocked".to_string());
rt.activity = ActivityState::Rejecting;
rt.wait_state = WaitState::MergeWait;
rt.queue_intent = QueueIntent::Queued;
state.apply_execution_event(&ExecutionEvent::ChangesRefreshed {
changes: vec![Change {
id: "c".to_string(),
completed_tasks: 0,
total_tasks: 1,
last_modified: "now".to_string(),
dependencies: Vec::new(),
metadata: ProposalMetadata::default(),
}],
committed_change_ids: HashSet::new(),
uncommitted_file_change_ids: HashSet::new(),
worktree_change_ids: HashSet::new(),
worktree_paths: HashMap::new(),
worktree_not_ahead_ids: HashSet::new(),
merge_wait_ids: HashSet::new(),
});
// All four fields are back at their defaults and the status is "not queued".
let rt = state.change_runtime("c").expect("runtime for c");
assert!(matches!(rt.terminal, TerminalState::None));
assert!(matches!(rt.activity, ActivityState::Idle));
assert!(matches!(rt.wait_state, WaitState::None));
assert!(matches!(rt.queue_intent, QueueIntent::NotQueued));
assert_eq!(state.display_status("c"), "not queued");
}
// A change in "merge wait" must return to "not queued" when a refresh reports
// it in `worktree_not_ahead_ids`.
#[test]
fn test_merge_wait_release_after_external_merge() {
use crate::events::ExecutionEvent;
use std::collections::{HashMap, HashSet};
let mut state = OrchestratorState::new(vec!["c".to_string()], 0);
state.apply_observation("c", WorkspaceObservation::WorkspaceArchived);
assert_eq!(state.display_status("c"), "merge wait");
let mut not_ahead = HashSet::new();
not_ahead.insert("c".to_string());
// Only worktree_not_ahead_ids is populated in this refresh.
state.apply_execution_event(&ExecutionEvent::ChangesRefreshed {
changes: vec![],
committed_change_ids: HashSet::new(),
uncommitted_file_change_ids: HashSet::new(),
worktree_change_ids: HashSet::new(),
worktree_paths: HashMap::new(),
worktree_not_ahead_ids: not_ahead,
merge_wait_ids: HashSet::new(),
});
assert_eq!(state.display_status("c"), "not queued");
}
// A WorkspaceArchived observation on a fresh change must produce the
// `MergeWait` wait state (displayed as "merge wait").
#[test]
fn test_workspace_archived_recovers_merge_wait() {
    let mut orchestrator = OrchestratorState::new(vec!["c".to_string()], 0);
    orchestrator.apply_observation("c", WorkspaceObservation::WorkspaceArchived);
    assert_eq!(orchestrator.display_status("c"), "merge wait");

    // Inspect the raw wait state as well as the derived status string.
    let is_merge_wait = matches!(
        orchestrator.change_runtime("c").unwrap().wait_state,
        WaitState::MergeWait
    );
    assert!(
        is_merge_wait,
        "observation should set MergeWait, not ResolveWait"
    );
}
// A queued change must still display "queued" after a WorkspaceArchived
// observation followed by a refresh that reports it as not ahead.
#[test]
fn test_queue_add_not_overwritten_by_merge_wait_refresh() {
use crate::events::ExecutionEvent;
use std::collections::{HashMap, HashSet};
let mut state = OrchestratorState::new(vec!["c".to_string()], 0);
state.apply_command(ReducerCommand::AddToQueue("c".to_string()));
assert_eq!(state.display_status("c"), "queued");
state.apply_observation("c", WorkspaceObservation::WorkspaceArchived);
let mut not_ahead = HashSet::new();
not_ahead.insert("c".to_string());
state.apply_execution_event(&ExecutionEvent::ChangesRefreshed {
changes: vec![],
committed_change_ids: HashSet::new(),
uncommitted_file_change_ids: HashSet::new(),
worktree_change_ids: HashSet::new(),
worktree_paths: HashMap::new(),
worktree_not_ahead_ids: not_ahead,
merge_wait_ids: HashSet::new(),
});
// The queue intent set by the user command is still what is displayed.
assert_eq!(state.display_status("c"), "queued");
}
// Runs merge/resolve events through the reducer and checks the status after
// each: deferred -> "merge wait", resolve started -> "resolving", resolve
// completed -> "merged". Later MergeCompleted and ResolveFailed events must
// leave the status at "merged".
#[test]
fn test_parallel_merge_events_drive_reducer_wait_states() {
use crate::events::ExecutionEvent;
let mut state = OrchestratorState::new(vec!["c".to_string()], 0);
state.apply_execution_event(&ExecutionEvent::MergeDeferred {
change_id: "c".to_string(),
reason: "base dirty".to_string(),
auto_resumable: false,
});
assert_eq!(state.display_status("c"), "merge wait");
state.apply_execution_event(&ExecutionEvent::ResolveStarted {
change_id: "c".to_string(),
command: "resolve".to_string(),
});
assert_eq!(state.display_status("c"), "resolving");
state.apply_execution_event(&ExecutionEvent::ResolveCompleted {
change_id: "c".to_string(),
worktree_change_ids: None,
});
assert_eq!(state.display_status("c"), "merged");
// Events arriving after the change is merged do not alter the status.
state.apply_execution_event(&ExecutionEvent::MergeCompleted {
change_id: "c".to_string(),
revision: "rev".to_string(),
});
assert_eq!(state.display_status("c"), "merged");
state.apply_execution_event(&ExecutionEvent::ResolveFailed {
change_id: "c".to_string(),
error: "late".to_string(),
});
assert_eq!(state.display_status("c"), "merged");
}
// In Parallel mode: after a ResolveMerge command the change shows
// "resolve pending" and is listed by `resolve_wait_change_ids`; a refresh
// keeps it there. After ResolveCompleted it shows "merged", the resolve-wait
// list is empty, and an identical refresh must not bring "resolve pending" back.
#[test]
fn test_resolve_completed_clears_resolve_wait_and_survives_refresh() {
use crate::events::ExecutionEvent;
let mut state =
OrchestratorState::with_mode(vec!["c".to_string()], 0, ExecutionMode::Parallel);
state.apply_execution_event(&ExecutionEvent::ChangeArchived("c".to_string()));
assert_eq!(state.display_status("c"), "merge wait");
state.apply_command(ReducerCommand::ResolveMerge("c".to_string()));
assert_eq!(state.display_status("c"), "resolve pending");
assert_eq!(state.resolve_wait_change_ids(), vec!["c".to_string()]);
// Refresh that still reports "c" as a worktree change in merge wait.
state.apply_execution_event(&ExecutionEvent::ChangesRefreshed {
changes: vec![],
committed_change_ids: std::collections::HashSet::new(),
uncommitted_file_change_ids: std::collections::HashSet::new(),
worktree_change_ids: std::collections::HashSet::from(["c".to_string()]),
worktree_paths: std::collections::HashMap::new(),
worktree_not_ahead_ids: std::collections::HashSet::new(),
merge_wait_ids: std::collections::HashSet::from(["c".to_string()]),
});
assert_eq!(state.display_status("c"), "resolve pending");
state.apply_execution_event(&ExecutionEvent::ResolveCompleted {
change_id: "c".to_string(),
worktree_change_ids: None,
});
assert_eq!(state.display_status("c"), "merged");
assert!(
state.resolve_wait_change_ids().is_empty(),
"resolve completion must clear queued resolve intent"
);
// Same refresh payload as before, now applied after the resolve completed.
state.apply_execution_event(&ExecutionEvent::ChangesRefreshed {
changes: vec![],
committed_change_ids: std::collections::HashSet::new(),
uncommitted_file_change_ids: std::collections::HashSet::new(),
worktree_change_ids: std::collections::HashSet::from(["c".to_string()]),
worktree_paths: std::collections::HashMap::new(),
worktree_not_ahead_ids: std::collections::HashSet::new(),
merge_wait_ids: std::collections::HashSet::from(["c".to_string()]),
});
assert_eq!(state.display_status("c"), "merged");
// Implied by the assert_eq above; kept for its explicit regression message.
assert_ne!(
state.display_status("c"),
"resolve pending",
"row must not regress to resolve pending after successful resolve + refresh"
);
}
// A ResolveFailed event after a started resolve must bring the change back to
// "merge wait"; the same event on an already merged change must leave it
// "merged".
#[test]
fn test_resolve_failed_restores_merge_wait() {
use crate::events::ExecutionEvent;
let mut state = OrchestratorState::new(vec!["c".to_string()], 0);
// Case 1: deferred merge, resolve requested and started, then failed.
state.apply_execution_event(&ExecutionEvent::MergeDeferred {
change_id: "c".to_string(),
reason: "base dirty".to_string(),
auto_resumable: false,
});
state.apply_command(ReducerCommand::ResolveMerge("c".to_string()));
state.apply_execution_event(&ExecutionEvent::ResolveStarted {
change_id: "c".to_string(),
command: "resolve".to_string(),
});
state.apply_execution_event(&ExecutionEvent::ResolveFailed {
change_id: "c".to_string(),
error: "conflict".to_string(),
});
assert_eq!(state.display_status("c"), "merge wait");
// Case 2: the failure arrives after the merge already completed.
let mut state2 = OrchestratorState::new(vec!["c".to_string()], 0);
state2.apply_execution_event(&ExecutionEvent::MergeCompleted {
change_id: "c".to_string(),
revision: "rev".to_string(),
});
state2.apply_execution_event(&ExecutionEvent::ResolveFailed {
change_id: "c".to_string(),
error: "late".to_string(),
});
assert_eq!(state2.display_status("c"), "merged");
}
// With two changes both applying, a WorkspaceStatusUpdated event addressed to
// "b" must change only "b" to "rejecting"; "a" stays "applying".
#[test]
fn test_workspace_status_update_targets_explicit_change_id() {
use crate::events::ExecutionEvent;
use crate::vcs::WorkspaceStatus;
let mut state = OrchestratorState::new(vec!["a".to_string(), "b".to_string()], 0);
state.apply_execution_event(&ExecutionEvent::ApplyStarted {
change_id: "a".to_string(),
command: "apply-a".to_string(),
});
state.apply_execution_event(&ExecutionEvent::ApplyStarted {
change_id: "b".to_string(),
command: "apply-b".to_string(),
});
state.apply_execution_event(&ExecutionEvent::WorkspaceStatusUpdated {
change_id: "b".to_string(),
workspace_name: "ws-b".to_string(),
status: WorkspaceStatus::Rejecting,
});
assert_eq!(state.display_status("a"), "applying");
assert_eq!(state.display_status("b"), "rejecting");
}
// After a ChangeDequeued event, later ApplyStarted, AcceptanceStarted and
// ProcessingError events for the same change must all leave the status at
// "not queued".
#[test]
fn test_late_events_after_stop_do_not_regress_state() {
use crate::events::ExecutionEvent;
let mut state = OrchestratorState::new(vec!["c".to_string()], 0);
state.apply_execution_event(&ExecutionEvent::ChangeDequeued {
change_id: "c".to_string(),
});
assert_eq!(state.display_status("c"), "not queued");
// Each of the following events arrives after the dequeue.
state.apply_execution_event(&ExecutionEvent::ApplyStarted {
change_id: "c".to_string(),
command: "cmd".to_string(),
});
assert_eq!(state.display_status("c"), "not queued");
state.apply_execution_event(&ExecutionEvent::AcceptanceStarted {
change_id: "c".to_string(),
command: "cmd".to_string(),
});
assert_eq!(state.display_status("c"), "not queued");
state.apply_execution_event(&ExecutionEvent::ProcessingError {
id: "c".to_string(),
error: "late error".to_string(),
});
assert_eq!(state.display_status("c"), "not queued");
}
// Once a change shows "archived" (state built with `OrchestratorState::new`),
// later ResolveFailed and ApplyStarted events must not change that status.
#[test]
fn test_reducer_idempotency_and_precedence() {
use crate::events::ExecutionEvent;
let mut state = OrchestratorState::new(vec!["c".to_string()], 0);
state.apply_execution_event(&ExecutionEvent::ChangeArchived("c".to_string()));
assert_eq!(state.display_status("c"), "archived");
state.apply_execution_event(&ExecutionEvent::ResolveFailed {
change_id: "c".to_string(),
error: "late".to_string(),
});
assert_eq!(state.display_status("c"), "archived");
state.apply_execution_event(&ExecutionEvent::ApplyStarted {
change_id: "c".to_string(),
command: "cmd".to_string(),
});
assert_eq!(state.display_status("c"), "archived");
}
// Cross-checks the reducer-derived display status against the other
// accessors on `OrchestratorState` (`is_pending`, `is_archived`,
// `current_change_id`, `apply_count`) while three changes go through
// queueing, applying, archiving and dequeueing.
#[test]
fn test_reducer_runtime_and_legacy_aggregates_stay_consistent() {
use crate::events::ExecutionEvent;
let mut state =
OrchestratorState::new(vec!["a".to_string(), "b".to_string(), "c".to_string()], 5);
// Initially every change is "not queued" and pending.
assert_eq!(state.display_status("a"), "not queued");
assert_eq!(state.display_status("b"), "not queued");
assert_eq!(state.display_status("c"), "not queued");
assert!(state.is_pending("a"));
assert!(state.is_pending("b"));
assert!(state.is_pending("c"));
// Queueing "a" and "b" changes their status but they remain pending.
state.apply_command(ReducerCommand::AddToQueue("a".to_string()));
state.apply_command(ReducerCommand::AddToQueue("b".to_string()));
assert_eq!(state.display_status("a"), "queued");
assert_eq!(state.display_status("b"), "queued");
assert!(state.is_pending("a"));
assert!(state.is_pending("b"));
// Start applying "a": current change and apply count are tracked alongside.
state.set_current_change(Some("a".to_string()));
state.apply_execution_event(&ExecutionEvent::ApplyStarted {
change_id: "a".to_string(),
command: "cmd".to_string(),
});
state.increment_apply_count("a");
assert_eq!(state.display_status("a"), "applying");
assert_eq!(state.current_change_id(), Some(&"a".to_string()));
assert_eq!(state.apply_count("a"), 1);
// Archive "a" through both the event and `mark_archived`.
state.apply_execution_event(&ExecutionEvent::ChangeArchived("a".to_string()));
state.mark_archived("a");
assert_eq!(state.display_status("a"), "archived");
assert!(state.is_archived("a"));
assert!(!state.is_pending("a"));
// Dequeue "b"; the untouched "c" is still pending and not archived.
state.apply_command(ReducerCommand::DequeueChange("b".to_string()));
assert_eq!(state.display_status("b"), "not queued");
assert_eq!(state.display_status("c"), "not queued");
assert!(state.is_pending("c")); assert!(!state.is_archived("c"));
}
// In Serial mode (asserted for a state built with `new`), ChangeArchived must
// make the change terminal with status "archived", and a later MergeCompleted
// must not replace that status.
#[test]
fn test_serial_mode_change_archived_is_terminal() {
use crate::events::ExecutionEvent;
let mut state = OrchestratorState::new(vec!["c".to_string()], 0);
assert_eq!(state.execution_mode(), ExecutionMode::Serial);
state.apply_execution_event(&ExecutionEvent::ChangeArchived("c".to_string()));
assert_eq!(state.display_status("c"), "archived");
assert!(state.is_terminal_change("c"));
state.apply_execution_event(&ExecutionEvent::MergeCompleted {
change_id: "c".to_string(),
revision: "rev".to_string(),
});
assert_eq!(
state.display_status("c"),
"archived",
"Serial: MergeCompleted must not override Archived terminal"
);
}
// In Parallel mode, ChangeArchived must leave the change non-terminal in
// "merge wait"; only the following MergeCompleted makes it terminal with
// status "merged".
#[test]
fn test_parallel_mode_change_archived_transitions_to_merge_wait() {
use crate::events::ExecutionEvent;
let mut state =
OrchestratorState::with_mode(vec!["c".to_string()], 0, ExecutionMode::Parallel);
assert_eq!(state.execution_mode(), ExecutionMode::Parallel);
// Apply and archive phases leading up to the archived event.
state.apply_execution_event(&ExecutionEvent::ApplyStarted {
change_id: "c".to_string(),
command: "cmd".to_string(),
});
state.apply_execution_event(&ExecutionEvent::ApplyCompleted {
change_id: "c".to_string(),
revision: "rev1".to_string(),
});
state.apply_execution_event(&ExecutionEvent::ArchiveStarted {
change_id: "c".to_string(),
command: "archive".to_string(),
});
state.apply_execution_event(&ExecutionEvent::ChangeArchived("c".to_string()));
assert_eq!(
state.display_status("c"),
"merge wait",
"Parallel: ChangeArchived must transition to merge wait when no resolving change exists"
);
assert!(
!state.is_terminal_change("c"),
"Parallel: change must not be terminal after archive"
);
state.apply_execution_event(&ExecutionEvent::MergeCompleted {
change_id: "c".to_string(),
revision: "merge-rev".to_string(),
});
assert_eq!(
state.display_status("c"),
"merged",
"Parallel: MergeCompleted must transition to merged terminal"
);
assert!(state.is_terminal_change("c"));
}
// In Parallel mode, when another change is currently "resolving", a
// ChangeArchived event must put the archived change into "resolve pending"
// (non-terminal) rather than "merge wait".
#[test]
fn test_parallel_mode_change_archived_uses_resolve_pending_when_other_change_is_resolving() {
use crate::events::ExecutionEvent;
// The two change ids are literally named "resolving" and "archived".
let mut state = OrchestratorState::with_mode(
vec!["resolving".to_string(), "archived".to_string()],
0,
ExecutionMode::Parallel,
);
state.apply_execution_event(&ExecutionEvent::ResolveStarted {
change_id: "resolving".to_string(),
command: "resolve resolving".to_string(),
});
assert_eq!(state.display_status("resolving"), "resolving");
state.apply_execution_event(&ExecutionEvent::ChangeArchived("archived".to_string()));
assert_eq!(
state.display_status("archived"),
"resolve pending",
"Parallel: ChangeArchived must transition to resolve pending while another change is resolving"
);
assert!(!state.is_terminal_change("archived"));
}
// Parallel-mode walk-through for change "a": queued, applying, archived
// ("merge wait", non-terminal), then merged (terminal). Change "b" is never
// touched and must stay "not queued" and non-terminal.
#[test]
fn test_parallel_mode_full_lifecycle() {
use crate::events::ExecutionEvent;
let mut state = OrchestratorState::with_mode(
vec!["a".to_string(), "b".to_string()],
0,
ExecutionMode::Parallel,
);
state.apply_command(ReducerCommand::AddToQueue("a".to_string()));
state.apply_execution_event(&ExecutionEvent::ApplyStarted {
change_id: "a".to_string(),
command: "cmd".to_string(),
});
assert_eq!(state.display_status("a"), "applying");
state.apply_execution_event(&ExecutionEvent::ChangeArchived("a".to_string()));
assert_eq!(state.display_status("a"), "merge wait");
assert!(!state.is_terminal_change("a"));
state.apply_execution_event(&ExecutionEvent::MergeCompleted {
change_id: "a".to_string(),
revision: "rev-a".to_string(),
});
assert_eq!(state.display_status("a"), "merged");
assert!(state.is_terminal_change("a"));
// The other change is unaffected by everything above.
assert_eq!(state.display_status("b"), "not queued");
assert!(!state.is_terminal_change("b"));
}
// In Parallel mode, a non-auto-resumable MergeDeferred on a change already in
// "merge wait" keeps it in "merge wait"; MergeCompleted then makes it
// "merged" and terminal.
#[test]
fn test_parallel_mode_merge_deferred_then_completed() {
use crate::events::ExecutionEvent;
let mut state =
OrchestratorState::with_mode(vec!["c".to_string()], 0, ExecutionMode::Parallel);
state.apply_execution_event(&ExecutionEvent::ChangeArchived("c".to_string()));
assert_eq!(state.display_status("c"), "merge wait");
state.apply_execution_event(&ExecutionEvent::MergeDeferred {
change_id: "c".to_string(),
reason: "base dirty".to_string(),
auto_resumable: false,
});
assert_eq!(state.display_status("c"), "merge wait");
state.apply_execution_event(&ExecutionEvent::MergeCompleted {
change_id: "c".to_string(),
revision: "rev".to_string(),
});
assert_eq!(state.display_status("c"), "merged");
assert!(state.is_terminal_change("c"));
}
// In Parallel mode, once a change is "merged", later ResolveFailed and
// ApplyStarted events must leave the status at "merged".
#[test]
fn test_parallel_mode_late_events_do_not_regress_merged() {
use crate::events::ExecutionEvent;
let mut state =
OrchestratorState::with_mode(vec!["c".to_string()], 0, ExecutionMode::Parallel);
state.apply_execution_event(&ExecutionEvent::ChangeArchived("c".to_string()));
state.apply_execution_event(&ExecutionEvent::MergeCompleted {
change_id: "c".to_string(),
revision: "rev".to_string(),
});
assert_eq!(state.display_status("c"), "merged");
// Late events follow.
state.apply_execution_event(&ExecutionEvent::ResolveFailed {
change_id: "c".to_string(),
error: "late".to_string(),
});
assert_eq!(state.display_status("c"), "merged");
state.apply_execution_event(&ExecutionEvent::ApplyStarted {
change_id: "c".to_string(),
command: "cmd".to_string(),
});
assert_eq!(state.display_status("c"), "merged");
}
// A MergeDeferred event with `auto_resumable: true` must display as
// "resolve pending" rather than "merge wait".
#[test]
fn test_auto_resumable_merge_deferred_sets_resolve_wait() {
use crate::events::ExecutionEvent;
let mut state = OrchestratorState::new(vec!["b".to_string()], 0);
state.apply_execution_event(&ExecutionEvent::MergeDeferred {
change_id: "b".to_string(),
reason: "Merge in progress (MERGE_HEAD exists)".to_string(),
auto_resumable: true,
});
assert_eq!(
state.display_status("b"),
"resolve pending",
"auto-resumable deferred change must enter ResolveWait, not MergeWait"
);
}
// An auto-resumable deferred change showing "resolve pending" must keep that
// status after a refresh that lists it in `merge_wait_ids`.
#[test]
fn test_auto_resumable_deferred_survives_workspace_refresh() {
use crate::events::ExecutionEvent;
let mut state = OrchestratorState::new(vec!["b".to_string()], 0);
state.apply_execution_event(&ExecutionEvent::MergeDeferred {
change_id: "b".to_string(),
reason: "Working tree has uncommitted changes".to_string(),
auto_resumable: true,
});
assert_eq!(state.display_status("b"), "resolve pending");
// Refresh with only merge_wait_ids populated.
state.apply_execution_event(&ExecutionEvent::ChangesRefreshed {
changes: vec![],
committed_change_ids: Default::default(),
uncommitted_file_change_ids: Default::default(),
worktree_change_ids: Default::default(),
worktree_paths: Default::default(),
worktree_not_ahead_ids: Default::default(),
merge_wait_ids: ["b".to_string()].into_iter().collect(),
});
assert_eq!(
state.display_status("b"),
"resolve pending",
"workspace refresh must not regress auto-resumable deferred change to merge wait"
);
}
// An auto-resumable deferred change ("resolve pending") must become "merged"
// and terminal once MergeCompleted arrives.
#[test]
fn test_auto_resumable_deferred_then_merge_completed() {
use crate::events::ExecutionEvent;
let mut state = OrchestratorState::new(vec!["b".to_string()], 0);
state.apply_execution_event(&ExecutionEvent::MergeDeferred {
change_id: "b".to_string(),
reason: "Merge in progress (MERGE_HEAD exists)".to_string(),
auto_resumable: true,
});
assert_eq!(state.display_status("b"), "resolve pending");
state.apply_execution_event(&ExecutionEvent::MergeCompleted {
change_id: "b".to_string(),
revision: "abc123".to_string(),
});
assert_eq!(state.display_status("b"), "merged");
assert!(state.is_terminal_change("b"));
}
// After a manual ResolveMerge command puts the change into "resolve pending",
// a following auto-resumable MergeDeferred must keep it in "resolve pending".
#[test]
fn test_manual_resolve_blocked_by_merge_in_progress_becomes_resolve_pending() {
use crate::events::ExecutionEvent;
let mut state = OrchestratorState::new(vec!["c".to_string()], 0);
// A non-auto-resumable deferral first yields "merge wait".
state.apply_execution_event(&ExecutionEvent::MergeDeferred {
change_id: "c".to_string(),
reason: "base dirty".to_string(),
auto_resumable: false,
});
assert_eq!(state.display_status("c"), "merge wait");
state.apply_command(ReducerCommand::ResolveMerge("c".to_string()));
assert_eq!(state.display_status("c"), "resolve pending");
state.apply_execution_event(&ExecutionEvent::MergeDeferred {
change_id: "c".to_string(),
reason: "Merge in progress (MERGE_HEAD exists)".to_string(),
auto_resumable: true,
});
assert_eq!(
state.display_status("c"),
"resolve pending",
"auto-resumable dirty base must stay as resolve pending"
);
}
// After a manual ResolveMerge command puts the change into "resolve pending",
// a ResolveFailed event must bring it back to "merge wait".
#[test]
fn test_manual_resolve_blocked_by_uncommitted_changes_stays_merge_wait() {
use crate::events::ExecutionEvent;
let mut state = OrchestratorState::new(vec!["c".to_string()], 0);
state.apply_execution_event(&ExecutionEvent::MergeDeferred {
change_id: "c".to_string(),
reason: "base dirty".to_string(),
auto_resumable: false,
});
assert_eq!(state.display_status("c"), "merge wait");
state.apply_command(ReducerCommand::ResolveMerge("c".to_string()));
assert_eq!(state.display_status("c"), "resolve pending");
// The failure carries an "uncommitted changes" error message.
state.apply_execution_event(&ExecutionEvent::ResolveFailed {
change_id: "c".to_string(),
error: "Base is dirty: Working tree has uncommitted changes".to_string(),
});
assert_eq!(
state.display_status("c"),
"merge wait",
"uncommitted-changes dirty base must revert to merge wait"
);
}
// While "a" is resolving, "b" is deferred as auto-resumable and shows
// "resolve pending". After "a" finishes resolving ("merged"), a
// MergeCompleted for "b" must make "b" "merged" and terminal.
#[test]
fn test_auto_resumable_deferred_resolve_auto_retries_after_preceding_completes() {
use crate::events::ExecutionEvent;
let mut state = OrchestratorState::new(vec!["a".to_string(), "b".to_string()], 0);
state.apply_execution_event(&ExecutionEvent::ResolveStarted {
change_id: "a".to_string(),
command: "resolve a".to_string(),
});
assert_eq!(state.display_status("a"), "resolving");
state.apply_execution_event(&ExecutionEvent::MergeDeferred {
change_id: "b".to_string(),
reason: "Merge in progress (MERGE_HEAD exists)".to_string(),
auto_resumable: true,
});
assert_eq!(state.display_status("b"), "resolve pending");
state.apply_execution_event(&ExecutionEvent::ResolveCompleted {
change_id: "a".to_string(),
worktree_change_ids: None,
});
assert_eq!(state.display_status("a"), "merged");
// The test injects the MergeCompleted for "b" itself.
state.apply_execution_event(&ExecutionEvent::MergeCompleted {
change_id: "b".to_string(),
revision: "rev-b".to_string(),
});
assert_eq!(state.display_status("b"), "merged");
assert!(state.is_terminal_change("b"));
}
/// `AddToQueue` on a change that ended in an error must report `Changed`,
/// display the change as "queued", and clear its terminal status.
#[test]
fn test_add_to_queue_retries_error_terminal() {
    use crate::events::ExecutionEvent;

    let id = "c";
    let mut state = OrchestratorState::new(vec![id.to_owned()], 0);

    // Drive the change into the error terminal state.
    let failure = ExecutionEvent::ProcessingError {
        id: id.to_owned(),
        error: String::from("apply failed"),
    };
    state.apply_execution_event(&failure);
    assert_eq!(state.display_status(id), "error");
    assert!(state.is_terminal_change(id));

    // Re-queueing acts as a retry.
    let outcome = state.apply_command(ReducerCommand::AddToQueue(id.to_owned()));
    let changed = matches!(outcome, ReduceOutcome::Changed(_));
    assert!(changed, "AddToQueue on error change must be Changed, not NoOp");

    assert_eq!(
        state.display_status(id),
        "queued",
        "after retry, change must display as queued"
    );
    let still_terminal = state.is_terminal_change(id);
    assert!(!still_terminal, "error terminal must be cleared by AddToQueue");
}
/// `DequeueChange` on a queued change reports `Changed` and displays it as
/// "not queued" without making it terminal; a later `AddToQueue` queues it again.
#[test]
fn test_dequeue_change_resets_to_not_queued_idle() {
    let id = "c";
    let mut state = OrchestratorState::new(vec![id.to_owned()], 0);

    state.apply_command(ReducerCommand::AddToQueue(id.to_owned()));

    // Dequeue: the change goes back to "not queued".
    let dequeued = state.apply_command(ReducerCommand::DequeueChange(id.to_owned()));
    assert!(matches!(dequeued, ReduceOutcome::Changed(_)));
    assert_eq!(state.display_status(id), "not queued");
    assert!(!state.is_terminal_change(id));

    // Queue again: the change is accepted back into the queue.
    let requeued = state.apply_command(ReducerCommand::AddToQueue(id.to_owned()));
    assert!(matches!(requeued, ReduceOutcome::Changed(_)));
    assert_eq!(state.display_status(id), "queued");
}
/// `AddToQueue` on an archived change must be a `NoOp` and leave the display
/// status at "archived".
#[test]
fn test_add_to_queue_noop_on_archived() {
    use crate::events::ExecutionEvent;

    let id = "c";
    let mut state = OrchestratorState::new(vec![id.to_owned()], 0);

    let archived = ExecutionEvent::ChangeArchived(id.to_owned());
    state.apply_execution_event(&archived);
    assert_eq!(state.display_status(id), "archived");

    let outcome = state.apply_command(ReducerCommand::AddToQueue(id.to_owned()));
    let was_noop = matches!(outcome, ReduceOutcome::NoOp);
    assert!(was_noop, "AddToQueue on archived change must be NoOp");
    assert_eq!(state.display_status(id), "archived");
}
/// A queued change that is blocked on a dependency must display as "queued"
/// again once `DependencyResolved` arrives.
#[test]
fn test_dependency_resolved_restores_queued_after_block() {
    use crate::events::ExecutionEvent;

    let id = "c";
    let mut state = OrchestratorState::new(vec![id.to_owned()], 0);

    state.apply_command(ReducerCommand::AddToQueue(id.to_owned()));
    assert_eq!(state.display_status(id), "queued");

    // Block the change on a dependency.
    let blocked = ExecutionEvent::DependencyBlocked {
        change_id: id.to_owned(),
        dependency_ids: vec![String::from("dep")],
    };
    state.apply_execution_event(&blocked);
    assert_eq!(state.display_status(id), "blocked");

    // Resolve the dependency.
    let resolved = ExecutionEvent::DependencyResolved {
        change_id: id.to_owned(),
    };
    state.apply_execution_event(&resolved);
    assert_eq!(
        state.display_status(id),
        "queued",
        "DependencyResolved must restore queued (not not-queued)"
    );
}
/// The change stays "queued" across a dependency block/resolve cycle and only
/// becomes "not queued" after an explicit `RemoveFromQueue` command.
#[test]
fn test_dependency_blocked_and_resolved_preserve_queue_intent_until_user_dequeue() {
    use crate::events::ExecutionEvent;

    let id = "c";
    let mut state = OrchestratorState::new(vec![id.to_owned()], 0);
    state.apply_command(ReducerCommand::AddToQueue(id.to_owned()));

    // Dependency block, then resolution.
    let blocked = ExecutionEvent::DependencyBlocked {
        change_id: id.to_owned(),
        dependency_ids: vec![String::from("dep-a")],
    };
    state.apply_execution_event(&blocked);
    assert_eq!(state.display_status(id), "blocked");

    let resolved = ExecutionEvent::DependencyResolved {
        change_id: id.to_owned(),
    };
    state.apply_execution_event(&resolved);
    assert_eq!(state.display_status(id), "queued");

    // Only the user's removal command clears the queued display.
    state.apply_command(ReducerCommand::RemoveFromQueue(id.to_owned()));
    assert_eq!(
        state.display_status(id),
        "not queued",
        "queue intent should only clear on explicit dequeue command"
    );
}
/// A `ChangesRefreshed` event whose collections are all empty must not change
/// the "queued" display of a change that was queued beforehand.
#[test]
fn test_changes_refreshed_preserves_queue_intent() {
    use crate::events::ExecutionEvent;
    use std::collections::{HashMap, HashSet};

    let id = "c";
    let mut state = OrchestratorState::new(vec![id.to_owned()], 0);

    state.apply_command(ReducerCommand::AddToQueue(id.to_owned()));
    assert_eq!(state.display_status(id), "queued");

    // Refresh carrying no changes and no observations at all.
    let empty_refresh = ExecutionEvent::ChangesRefreshed {
        changes: Vec::new(),
        committed_change_ids: HashSet::new(),
        uncommitted_file_change_ids: HashSet::new(),
        worktree_change_ids: HashSet::new(),
        worktree_paths: HashMap::new(),
        worktree_not_ahead_ids: HashSet::new(),
        merge_wait_ids: HashSet::new(),
    };
    state.apply_execution_event(&empty_refresh);

    assert_eq!(
        state.display_status(id),
        "queued",
        "ChangesRefreshed must not overwrite queue_intent = Queued"
    );
}
/// Once a change is displayed as "merged" after `ResolveCompleted`, a later
/// `ChangesRefreshed` that lists it in `merge_wait_ids` must not move the
/// display back to "merge wait".
#[test]
fn test_fast_forward_merged_survives_archived_observation() {
    use crate::events::ExecutionEvent;

    let id = "ff";
    let mut state = OrchestratorState::new(vec![id.to_owned()], 0);

    // Deferred (not auto-resumable) -> "merge wait".
    let deferred = ExecutionEvent::MergeDeferred {
        change_id: id.to_owned(),
        reason: String::from("base dirty"),
        auto_resumable: false,
    };
    state.apply_execution_event(&deferred);
    assert_eq!(state.display_status(id), "merge wait");

    // Manual resolve request -> "resolve pending".
    state.apply_command(ReducerCommand::ResolveMerge(id.to_owned()));
    assert_eq!(state.display_status(id), "resolve pending");

    // Resolve starts -> "resolving".
    let started = ExecutionEvent::ResolveStarted {
        change_id: id.to_owned(),
        command: String::from("resolve-cmd"),
    };
    state.apply_execution_event(&started);
    assert_eq!(state.display_status(id), "resolving");

    // Resolve completes -> "merged".
    let completed = ExecutionEvent::ResolveCompleted {
        change_id: id.to_owned(),
        worktree_change_ids: None,
    };
    state.apply_execution_event(&completed);
    assert_eq!(state.display_status(id), "merged");

    // The refresh still reports the change among the merge-wait ids.
    let refresh = ExecutionEvent::ChangesRefreshed {
        changes: Vec::new(),
        committed_change_ids: Default::default(),
        uncommitted_file_change_ids: Default::default(),
        worktree_change_ids: Default::default(),
        worktree_paths: Default::default(),
        worktree_not_ahead_ids: Default::default(),
        merge_wait_ids: std::iter::once(id.to_owned()).collect(),
    };
    state.apply_execution_event(&refresh);

    assert_eq!(
        state.display_status(id),
        "merged",
        "Terminal Merged from fast-forward resolve must not regress to merge wait"
    );
}
}