use crate::events::{EventSink, ExecutionEvent, LogEntry};
use crate::openspec::Change;
use crate::tui::types::WorktreeInfo;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use tokio::sync::{broadcast, mpsc, Mutex, RwLock};
#[cfg(feature = "web-monitoring")]
use utoipa::ToSchema;
/// Command sent to the orchestrator over the control channel registered with
/// `WebState::set_control_channel` and dispatched by `WebState::send_control_command`.
///
/// The meaning of each variant is defined by whoever consumes the receiving end of
/// that channel, which is outside this file.
///
/// `PartialEq`/`Eq` are derived so that commands can be compared and asserted on.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ControlCommand {
    Start,
    Stop,
    CancelStop,
    ForceStop,
    Retry,
}
/// Message broadcast to subscribers whenever the web-visible state changes.
///
/// Serialized with serde; the optional sections (`logs`, `worktrees`, `app_mode`)
/// are left out of the serialized form when they are `None`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "web-monitoring", derive(ToSchema))]
pub struct StateUpdate {
/// Message discriminator, serialized under the key `type`. Every producer in this
/// file sets it to `"state_update"`.
#[serde(rename = "type")]
pub msg_type: String,
/// Creation time of the message; producers in this file use
/// `chrono::Utc::now().to_rfc3339()`.
pub timestamp: String,
/// Change statuses carried by this message.
pub changes: Vec<ChangeStatus>,
/// Log entries attached to this message, if any.
#[serde(skip_serializing_if = "Option::is_none")]
pub logs: Option<Vec<LogEntry>>,
/// Worktree list attached to this message, if any.
#[serde(skip_serializing_if = "Option::is_none")]
pub worktrees: Option<Vec<WorktreeInfo>>,
/// Application mode attached to this message, if any (e.g. `"running"`, `"stopped"`).
#[serde(skip_serializing_if = "Option::is_none")]
pub app_mode: Option<String>,
}
/// Web-facing status of a single change.
///
/// `PartialEq` is used by `WebState::compute_diff` to detect modified entries.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[cfg_attr(feature = "web-monitoring", derive(ToSchema))]
pub struct ChangeStatus {
/// Identifier of the change; used as the lookup key throughout this file.
pub id: String,
/// Number of tasks completed so far.
pub completed_tasks: u32,
/// Total number of tasks.
pub total_tasks: u32,
/// Completion percentage derived from the two task counters.
pub progress_percent: f32,
/// Progress label. Values written in this file are `"pending"`, `"in_progress"`,
/// `"complete"`, `"error"` and `"archived"`.
pub status: String,
/// Dependencies copied from the source `Change`.
pub dependencies: Vec<String>,
/// Display status taken from the shared orchestrator state; `None` when that state
/// reports `"not queued"` or is unavailable. Omitted from serialization when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub queue_status: Option<String>,
/// Iteration counter, set from the shared state's apply count or from `ApplyOutput`
/// events. Omitted from serialization when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub iteration_number: Option<u32>,
}
impl From<&Change> for ChangeStatus {
fn from(change: &Change) -> Self {
let status = if change.is_complete() {
"complete"
} else if change.completed_tasks > 0 {
"in_progress"
} else {
"pending"
};
Self {
id: change.id.clone(),
completed_tasks: change.completed_tasks,
total_tasks: change.total_tasks,
progress_percent: change.progress_percent(),
status: status.to_string(),
dependencies: change.dependencies.clone(),
queue_status: None, iteration_number: None, }
}
}
/// Full web-facing snapshot of the orchestrator: the change list, summary counters
/// derived from the changes' queue statuses, retained logs, worktrees and mode flags.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "web-monitoring", derive(ToSchema))]
pub struct OrchestratorStateSnapshot {
/// Status of every known change.
pub changes: Vec<ChangeStatus>,
/// Number of entries in `changes`.
pub total_changes: usize,
/// Changes whose queue status is `"archived"` or `"merged"`.
pub completed_changes: usize,
/// Changes whose queue status is `"applying"`, `"accepting"`, `"archiving"` or
/// `"resolving"`.
pub in_progress_changes: usize,
/// Changes whose queue status is `"queued"`.
pub pending_changes: usize,
/// RFC 3339 timestamp of the last summary computation.
pub last_updated: String,
/// Retained log entries (capped at 1000 by `WebState::apply_execution_event`).
pub logs: Vec<LogEntry>,
/// Most recently reported worktrees.
pub worktrees: Vec<WorktreeInfo>,
/// Current application mode; starts as `"select"`.
pub app_mode: String,
/// Set while a resolve operation is running (between `ResolveStarted` and
/// `ResolveCompleted`/`ResolveFailed` events).
pub is_resolving: bool,
}
impl OrchestratorStateSnapshot {
pub fn from_changes(changes: &[Change]) -> Self {
Self::from_changes_with_shared_state(changes, None)
}
pub fn from_changes_with_shared_state(
changes: &[Change],
shared_state: Option<&crate::orchestration::state::OrchestratorState>,
) -> Self {
let mut change_statuses: Vec<ChangeStatus> =
changes.iter().map(ChangeStatus::from).collect();
if let Some(shared) = shared_state {
for status in &mut change_statuses {
let display = shared.display_status(&status.id);
if display != "not queued" {
status.queue_status = Some(display.to_string());
}
let apply_count = shared.apply_count(&status.id);
if apply_count > 0 {
status.iteration_number = Some(apply_count);
}
}
}
let completed = change_statuses
.iter()
.filter(|c| {
c.queue_status
.as_ref()
.is_some_and(|s| s == "archived" || s == "merged")
})
.count();
let in_progress = change_statuses
.iter()
.filter(|c| {
c.queue_status.as_ref().is_some_and(|s| {
s == "applying" || s == "accepting" || s == "archiving" || s == "resolving"
})
})
.count();
let pending = change_statuses
.iter()
.filter(|c| c.queue_status.as_ref().is_some_and(|s| s == "queued"))
.count();
Self {
total_changes: change_statuses.len(),
completed_changes: completed,
in_progress_changes: in_progress,
pending_changes: pending,
changes: change_statuses,
last_updated: chrono::Utc::now().to_rfc3339(),
logs: Vec::new(),
worktrees: Vec::new(),
app_mode: "select".to_string(),
is_resolving: false,
}
}
}
/// Returns `completed / total` as a percentage, or `0.0` when `total` is zero.
///
/// The result is not clamped: `completed > total` yields a value above `100.0`.
fn progress_percent(completed: u32, total: u32) -> f32 {
    match total {
        0 => 0.0,
        _ => (completed as f32 / total as f32) * 100.0,
    }
}
/// Maps task counters to a status label.
///
/// Returns `"complete"` when there is at least one task and all of them are done,
/// `"in_progress"` when some task is done, and `"pending"` otherwise.
fn status_from_progress(completed: u32, total: u32) -> &'static str {
    match (completed, total) {
        (done, all) if all > 0 && done >= all => "complete",
        (0, _) => "pending",
        _ => "in_progress",
    }
}
/// Re-derives queue status and iteration number for every change from `shared`.
///
/// The queue status is overwritten unconditionally: it becomes `None` when the shared
/// state reports `"not queued"` and the reported display status otherwise. The
/// iteration number is only written when the apply count is positive; an existing
/// value is left untouched when the count is zero.
fn apply_reducer_derived_queue_statuses(
    state: &mut OrchestratorStateSnapshot,
    shared: &crate::orchestration::state::OrchestratorState,
) {
    for entry in state.changes.iter_mut() {
        let label = shared.display_status(&entry.id);
        entry.queue_status = if label != "not queued" {
            Some(label.to_string())
        } else {
            None
        };

        let applied = shared.apply_count(&entry.id);
        if applied > 0 {
            entry.iteration_number = Some(applied);
        }
    }
}
/// Recomputes the summary counters and `last_updated` from the current change list.
///
/// Counters are keyed on `queue_status`: `"archived"`/`"merged"` count as completed,
/// `"applying"`/`"accepting"`/`"archiving"`/`"resolving"` as in progress, and
/// `"queued"` as pending. Changes with any other (or no) queue status are only
/// reflected in `total_changes`.
fn refresh_summary(state: &mut OrchestratorStateSnapshot) {
    let (mut done, mut active, mut waiting) = (0usize, 0usize, 0usize);
    for change in &state.changes {
        match change.queue_status.as_deref() {
            Some("archived" | "merged") => done += 1,
            Some("applying" | "accepting" | "archiving" | "resolving") => active += 1,
            Some("queued") => waiting += 1,
            _ => {}
        }
    }

    state.total_changes = state.changes.len();
    state.completed_changes = done;
    state.in_progress_changes = active;
    state.pending_changes = waiting;
    state.last_updated = chrono::Utc::now().to_rfc3339();
}
/// Event sink that forwards execution events into a shared `WebState`.
pub struct WebEventSink {
    /// State object that receives every forwarded event.
    web_state: Arc<WebState>,
}

impl WebEventSink {
    /// Creates a sink that feeds `web_state`.
    pub fn new(web_state: Arc<WebState>) -> Self {
        WebEventSink { web_state }
    }
}
#[async_trait]
impl EventSink for WebEventSink {
async fn on_event(&self, event: &ExecutionEvent) {
self.web_state.apply_execution_event(event).await;
}
async fn on_state_changed(&self, _state: &crate::orchestration::state::OrchestratorState) {}
}
pub struct WebState {
state: RwLock<OrchestratorStateSnapshot>,
tx: broadcast::Sender<StateUpdate>,
control_tx: Mutex<Option<mpsc::UnboundedSender<ControlCommand>>>,
shared_orchestrator_state: tokio::sync::RwLock<
Option<std::sync::Arc<tokio::sync::RwLock<crate::orchestration::state::OrchestratorState>>>,
>,
}
impl WebState {
pub fn new(initial_changes: &[Change]) -> Self {
let (tx, _) = broadcast::channel(100);
let state = OrchestratorStateSnapshot::from_changes(initial_changes);
Self {
state: RwLock::new(state),
tx,
control_tx: Mutex::new(None),
shared_orchestrator_state: tokio::sync::RwLock::new(None),
}
}
/// Registers (or replaces) the sender used by `send_control_command`.
pub async fn set_control_channel(&self, control_tx: mpsc::UnboundedSender<ControlCommand>) {
    let mut slot = self.control_tx.lock().await;
    *slot = Some(control_tx);
}
/// Sends `command` over the registered control channel without blocking.
///
/// # Errors
///
/// Returns an error when the channel lock is currently held elsewhere, when no
/// control channel has been registered, or when sending on the channel fails.
pub fn send_control_command(
    &self,
    command: ControlCommand,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    // This is a synchronous function, so the async mutex can only be tried, not awaited.
    let guard = self
        .control_tx
        .try_lock()
        .map_err(|_| "Control channel lock contention")?;

    let Some(sender) = guard.as_ref() else {
        return Err("Control channel not initialized".into());
    };

    sender
        .send(command)
        .map_err(|e| format!("Failed to send control command: {}", e))?;
    Ok(())
}
/// Registers (or replaces) the shared orchestrator state consulted when deriving
/// queue statuses and iteration numbers.
pub async fn set_shared_state(
    &self,
    shared_state: std::sync::Arc<
        tokio::sync::RwLock<crate::orchestration::state::OrchestratorState>,
    >,
) {
    let mut slot = self.shared_orchestrator_state.write().await;
    *slot = Some(shared_state);
}
pub async fn get_state(&self) -> OrchestratorStateSnapshot {
self.state.read().await.clone()
}
pub async fn update(&self, changes: &[Change]) {
let shared_state_opt = self.shared_orchestrator_state.read().await;
let shared_state_data = if let Some(ref shared_arc) = *shared_state_opt {
shared_arc.try_read().ok()
} else {
None
};
let mut new_state = OrchestratorStateSnapshot::from_changes_with_shared_state(
changes,
shared_state_data.as_deref(),
);
drop(shared_state_data); drop(shared_state_opt);
let (old_changes, old_app_mode, old_is_resolving) = {
let old_state = self.state.read().await;
(
old_state.changes.clone(),
old_state.app_mode.clone(),
old_state.is_resolving,
)
};
new_state.app_mode = old_app_mode.clone();
new_state.is_resolving = old_is_resolving;
for new_change in &mut new_state.changes {
if let Some(existing) = old_changes.iter().find(|c| c.id == new_change.id) {
if new_change.queue_status.is_none() {
new_change.queue_status = existing.queue_status.clone();
}
if new_change.iteration_number.is_none() {
new_change.iteration_number = existing.iteration_number;
}
if new_change.total_tasks == 0
&& (existing.completed_tasks > 0 || existing.total_tasks > 0)
{
new_change.completed_tasks = existing.completed_tasks;
new_change.total_tasks = existing.total_tasks;
new_change.progress_percent = existing.progress_percent;
new_change.status = existing.status.clone();
}
}
}
let has_changes = !self
.compute_diff(&old_changes, &new_state.changes)
.is_empty();
{
let mut state = self.state.write().await;
*state = new_state.clone();
}
if has_changes {
self.broadcast_snapshot(new_state.changes).await;
}
}
pub async fn update_with_mode(&self, changes: &[Change], app_mode: &str) {
let shared_state_opt = self.shared_orchestrator_state.read().await;
let shared_state_data = if let Some(ref shared_arc) = *shared_state_opt {
shared_arc.try_read().ok()
} else {
None
};
let mut new_state = OrchestratorStateSnapshot::from_changes_with_shared_state(
changes,
shared_state_data.as_deref(),
);
drop(shared_state_data); drop(shared_state_opt);
new_state.app_mode = app_mode.to_string();
let (old_changes, old_app_mode, old_is_resolving) = {
let old_state = self.state.read().await;
(
old_state.changes.clone(),
old_state.app_mode.clone(),
old_state.is_resolving,
)
};
new_state.is_resolving = old_is_resolving;
for new_change in &mut new_state.changes {
if let Some(existing) = old_changes.iter().find(|c| c.id == new_change.id) {
new_change.queue_status = existing.queue_status.clone();
new_change.iteration_number = existing.iteration_number;
if new_change.total_tasks == 0
&& (existing.completed_tasks > 0 || existing.total_tasks > 0)
{
new_change.completed_tasks = existing.completed_tasks;
new_change.total_tasks = existing.total_tasks;
new_change.progress_percent = existing.progress_percent;
new_change.status = existing.status.clone();
}
}
}
let has_changes = !self
.compute_diff(&old_changes, &new_state.changes)
.is_empty();
let app_mode_changed = new_state.app_mode != old_app_mode;
{
let mut state = self.state.write().await;
*state = new_state.clone();
}
if has_changes || app_mode_changed {
self.broadcast_snapshot(new_state.changes).await;
}
}
/// Applies a single execution event to the snapshot and broadcasts a `StateUpdate`
/// when the event changed a change entry, added a log, replaced the worktrees or
/// switched the application mode.
///
/// The snapshot is mutated under the write lock; the broadcast itself is sent after
/// the lock has been released. Events not listed in the `match` are ignored.
pub async fn apply_execution_event(&self, event: &ExecutionEvent) {
let mut broadcast_update = None;
{
let mut state = self.state.write().await;
// `updated` marks that a change entry (or the change list) was touched; it triggers
// the queue-status re-derivation and summary refresh after the `match`.
let mut updated = false;
// Optional sections of the outgoing message; each stays `None` unless the event
// produced that kind of data.
let mut log_broadcast = None;
let mut worktree_broadcast = None;
let mut mode_broadcast = None;
match event {
// Mark the change as in progress and switch the app to "running" (the mode is
// switched even when the change id is unknown).
ExecutionEvent::ProcessingStarted(change_id) => {
if let Some(change) = state.changes.iter_mut().find(|c| c.id == *change_id) {
change.status = "in_progress".to_string();
change.progress_percent =
progress_percent(change.completed_tasks, change.total_tasks);
updated = true;
}
state.app_mode = "running".to_string();
mode_broadcast = Some("running".to_string());
}
// Mark the change complete, raising the completed counter to the total if it lags.
ExecutionEvent::ProcessingCompleted(change_id) => {
if let Some(change) = state.changes.iter_mut().find(|c| c.id == *change_id) {
if change.completed_tasks < change.total_tasks {
change.completed_tasks = change.total_tasks;
}
change.status = "complete".to_string();
change.progress_percent =
progress_percent(change.completed_tasks, change.total_tasks);
updated = true;
}
}
// Flag the change as failed and put the whole app into "error" mode.
ExecutionEvent::ProcessingError { id, error: _ } => {
if let Some(change) = state.changes.iter_mut().find(|c| c.id == *id) {
change.status = "error".to_string();
updated = true;
}
state.app_mode = "error".to_string();
mode_broadcast = Some("error".to_string());
}
// Only the iteration number is taken from apply output; events without one are
// ignored.
ExecutionEvent::ApplyOutput {
change_id,
iteration,
..
} => {
if let Some(change) = state.changes.iter_mut().find(|c| c.id == *change_id) {
if let Some(iter) = iteration {
change.iteration_number = Some(*iter);
updated = true;
}
}
}
// The following lifecycle events recompute the percentage from the stored counters
// and set `updated`, so the queue status is re-derived from the shared state below.
ExecutionEvent::AcceptanceStarted { change_id, .. } => {
if let Some(change) = state.changes.iter_mut().find(|c| c.id == *change_id) {
change.progress_percent =
progress_percent(change.completed_tasks, change.total_tasks);
updated = true;
}
}
ExecutionEvent::AcceptanceCompleted { change_id } => {
if let Some(change) = state.changes.iter_mut().find(|c| c.id == *change_id) {
change.progress_percent =
progress_percent(change.completed_tasks, change.total_tasks);
updated = true;
}
}
ExecutionEvent::ArchiveStarted {
change_id,
command: _,
} => {
if let Some(change) = state.changes.iter_mut().find(|c| c.id == *change_id) {
change.progress_percent =
progress_percent(change.completed_tasks, change.total_tasks);
updated = true;
}
}
ExecutionEvent::ChangeArchived(change_id) => {
if let Some(change) = state.changes.iter_mut().find(|c| c.id == *change_id) {
change.progress_percent =
progress_percent(change.completed_tasks, change.total_tasks);
updated = true;
}
}
ExecutionEvent::ArchiveOutput { change_id, .. } => {
if let Some(change) = state.changes.iter_mut().find(|c| c.id == *change_id) {
change.progress_percent =
progress_percent(change.completed_tasks, change.total_tasks);
updated = true;
}
}
// Progress reports with `total == 0` are ignored so that previously known progress
// is not overwritten.
ExecutionEvent::ProgressUpdated {
change_id,
completed,
total,
} => {
if let Some(change) = state.changes.iter_mut().find(|c| c.id == *change_id) {
if *total > 0 {
change.completed_tasks = *completed;
change.total_tasks = *total;
change.progress_percent = progress_percent(*completed, *total);
change.status = status_from_progress(*completed, *total).to_string();
updated = true;
}
}
}
ExecutionEvent::MergeCompleted { change_id, .. } => {
if let Some(change) = state.changes.iter_mut().find(|c| c.id == *change_id) {
change.status = "complete".to_string();
updated = true;
}
}
// Resolve events toggle the snapshot-wide `is_resolving` flag regardless of whether
// the change id is known.
ExecutionEvent::ResolveStarted {
change_id,
command: _,
} => {
state.is_resolving = true;
if let Some(change) = state.changes.iter_mut().find(|c| c.id == *change_id) {
change.progress_percent =
progress_percent(change.completed_tasks, change.total_tasks);
updated = true;
}
}
ExecutionEvent::ResolveCompleted { change_id, .. } => {
state.is_resolving = false;
if let Some(change) = state.changes.iter_mut().find(|c| c.id == *change_id) {
change.progress_percent =
progress_percent(change.completed_tasks, change.total_tasks);
updated = true;
}
}
ExecutionEvent::ResolveFailed {
change_id,
error: _,
} => {
state.is_resolving = false;
if let Some(change) = state.changes.iter_mut().find(|c| c.id == *change_id) {
change.status = "error".to_string();
updated = true;
}
}
// A deferred merge keeps the change "in_progress" only while a resolve is running
// or the deferral is auto-resumable; otherwise the status is left as it was.
ExecutionEvent::MergeDeferred {
change_id,
reason: _,
auto_resumable,
} => {
// Copied out first because `state.changes` is mutably borrowed below.
let is_resolving = state.is_resolving;
if let Some(change) = state.changes.iter_mut().find(|c| c.id == *change_id) {
if is_resolving || *auto_resumable {
change.status = "in_progress".to_string();
}
updated = true;
}
}
// Append to the retained logs, dropping the oldest entries beyond 1000, and
// broadcast just the new entry.
ExecutionEvent::Log(log_entry) => {
state.logs.push(log_entry.clone());
let logs_len = state.logs.len();
if logs_len > 1000 {
state.logs.drain(0..(logs_len - 1000));
}
log_broadcast = Some(vec![log_entry.clone()]);
}
// Rebuild the change list from the refreshed changes. Iteration numbers are kept,
// and previously known progress is kept when the refreshed change reports zero
// total tasks. Queue statuses are not copied here; they are re-derived from the
// shared state after the `match` (when it can be read).
ExecutionEvent::ChangesRefreshed {
changes,
committed_change_ids: _,
uncommitted_file_change_ids: _,
worktree_change_ids: _,
worktree_paths: _,
worktree_not_ahead_ids: _,
merge_wait_ids: _,
} => {
let mut new_change_statuses: Vec<ChangeStatus> =
changes.iter().map(ChangeStatus::from).collect();
for new_change in &mut new_change_statuses {
if let Some(existing) = state.changes.iter().find(|c| c.id == new_change.id)
{
new_change.iteration_number = existing.iteration_number;
if new_change.total_tasks == 0
&& (existing.completed_tasks > 0 || existing.total_tasks > 0)
{
new_change.completed_tasks = existing.completed_tasks;
new_change.total_tasks = existing.total_tasks;
new_change.progress_percent = existing.progress_percent;
new_change.status = existing.status.clone();
}
}
}
state.changes = new_change_statuses;
refresh_summary(&mut state);
updated = true;
}
ExecutionEvent::WorktreesRefreshed { worktrees } => {
state.worktrees = worktrees.clone();
worktree_broadcast = Some(worktrees.clone());
}
ExecutionEvent::DependencyBlocked {
change_id,
dependency_ids: _,
} => {
if let Some(change) = state.changes.iter_mut().find(|c| c.id == *change_id) {
change.status = "pending".to_string();
updated = true;
}
}
ExecutionEvent::DependencyResolved { change_id } => {
if let Some(change) = state.changes.iter_mut().find(|c| c.id == *change_id) {
change.progress_percent =
progress_percent(change.completed_tasks, change.total_tasks);
updated = true;
}
}
// Pure mode switches: no change entry is touched.
ExecutionEvent::Stopping => {
state.app_mode = "stopping".to_string();
mode_broadcast = Some("stopping".to_string());
}
ExecutionEvent::Stopped => {
state.app_mode = "stopped".to_string();
mode_broadcast = Some("stopped".to_string());
}
ExecutionEvent::AllCompleted => {
state.app_mode = "select".to_string();
mode_broadcast = Some("select".to_string());
}
// NOTE(review): the error entry is broadcast to subscribers but, unlike `Log`
// events, it is not appended to `state.logs` — confirm this is intended.
ExecutionEvent::Error { message } => {
state.app_mode = "error".to_string();
mode_broadcast = Some("error".to_string());
log_broadcast = Some(vec![LogEntry::error(message.clone())]);
}
_ => {}
}
if updated {
// Best effort: both locks are only tried, never awaited, while the snapshot write
// lock is held. If either is busy the queue statuses are left as they are.
if let Ok(shared_state_opt) = self.shared_orchestrator_state.try_read() {
if let Some(shared_arc) = shared_state_opt.as_ref() {
if let Ok(shared) = shared_arc.try_read() {
apply_reducer_derived_queue_statuses(&mut state, &shared);
}
}
}
refresh_summary(&mut state);
}
if updated
|| log_broadcast.is_some()
|| worktree_broadcast.is_some()
|| mode_broadcast.is_some()
{
// The message always carries the full change list; the optional sections are
// only present when this event produced them.
broadcast_update = Some(StateUpdate {
msg_type: "state_update".to_string(),
timestamp: chrono::Utc::now().to_rfc3339(),
changes: state.changes.clone(),
logs: log_broadcast,
worktrees: worktree_broadcast,
app_mode: mode_broadcast,
});
}
}
// Sent after the write lock is released. A send error is deliberately ignored.
if let Some(update) = broadcast_update {
let _ = self.tx.send(update);
}
}
/// Broadcasts `changes` together with the currently stored app mode and worktrees.
///
/// No logs are attached. A send error is deliberately ignored.
async fn broadcast_snapshot(&self, changes: Vec<ChangeStatus>) {
    // Copy what is needed and release the read lock before sending.
    let snapshot = self.state.read().await;
    let mode = snapshot.app_mode.clone();
    let worktrees = snapshot.worktrees.clone();
    drop(snapshot);

    let _ = self.tx.send(StateUpdate {
        msg_type: "state_update".to_string(),
        timestamp: chrono::Utc::now().to_rfc3339(),
        changes,
        logs: None,
        worktrees: Some(worktrees),
        app_mode: Some(mode),
    });
}
fn compute_diff(&self, old: &[ChangeStatus], new: &[ChangeStatus]) -> Vec<ChangeStatus> {
let mut diff = Vec::new();
for new_change in new {
let old_change = old.iter().find(|c| c.id == new_change.id);
match old_change {
Some(old) if old != new_change => {
diff.push(new_change.clone());
}
None => {
diff.push(new_change.clone());
}
_ => {
}
}
}
for old_change in old {
if !new.iter().any(|c| c.id == old_change.id) {
let mut archived = old_change.clone();
archived.status = "archived".to_string();
diff.push(archived);
}
}
diff
}
/// Reloads changes, their task progress and the worktree list from disk and folds
/// them into the snapshot.
///
/// The current working directory is used as the repository root. Failures to look up
/// a change's worktree, to read its progress, or to list worktrees are logged at debug
/// level and otherwise tolerated. The application mode is left unchanged. A worktree
/// update is broadcast only when the worktree list differs from the stored one.
///
/// # Errors
///
/// Returns an error when the current directory cannot be resolved or when the change
/// list cannot be read.
pub async fn refresh_from_disk(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    use crate::openspec;

    let repo_root =
        std::env::current_dir().map_err(|e| format!("Failed to resolve repo root: {}", e))?;
    let mut changes = openspec::list_changes_native()
        .map_err(|e| format!("Failed to refresh changes from disk: {}", e))?;

    for change in &mut changes {
        let worktree_path =
            match crate::vcs::git::get_worktree_path_for_change(&repo_root, &change.id).await {
                Ok(found) => found,
                Err(e) => {
                    tracing::debug!("Failed to get worktree path for {}: {}", change.id, e);
                    None
                }
            };

        // On failure the counters reported by `list_changes_native` are kept.
        match crate::task_parser::parse_progress_with_fallback(
            &change.id,
            worktree_path.as_deref(),
        ) {
            Ok(progress) => {
                change.completed_tasks = progress.completed;
                change.total_tasks = progress.total;
            }
            Err(e) => {
                tracing::debug!("Failed to read progress for {}: {}", change.id, e);
            }
        }
    }

    let worktrees = match crate::worktree_ops::get_worktrees(&repo_root).await {
        Ok(wts) => wts,
        Err(e) => {
            tracing::debug!("Failed to retrieve worktrees: {}", e);
            Vec::new()
        }
    };

    // Re-apply the current mode so that the refresh does not change it.
    let current_app_mode = {
        let state = self.state.read().await;
        state.app_mode.clone()
    };
    self.update_with_mode(&changes, &current_app_mode).await;

    let worktrees_changed = {
        let mut state = self.state.write().await;
        let changed = state.worktrees != worktrees;
        state.worktrees = worktrees.clone();
        changed
    };

    if worktrees_changed {
        let update = StateUpdate {
            msg_type: "state_update".to_string(),
            timestamp: chrono::Utc::now().to_rfc3339(),
            changes: self.state.read().await.changes.clone(),
            logs: None,
            worktrees: Some(worktrees),
            app_mode: None,
        };
        // A send error is deliberately ignored.
        let _ = self.tx.send(update);
    }

    Ok(())
}
/// Returns a new receiver for the `StateUpdate` broadcast channel.
pub fn subscribe(&self) -> broadcast::Receiver<StateUpdate> {
    broadcast::Sender::subscribe(&self.tx)
}
/// Returns a clone of the first change whose id equals `id`, or `None` if there is none.
pub async fn get_change(&self, id: &str) -> Option<ChangeStatus> {
    let snapshot = self.state.read().await;
    for change in &snapshot.changes {
        if change.id == id {
            return Some(change.clone());
        }
    }
    None
}
/// Returns a clone of the current change list.
pub async fn list_changes(&self) -> Vec<ChangeStatus> {
    let snapshot = self.state.read().await;
    snapshot.changes.clone()
}
}
impl Default for WebState {
fn default() -> Self {
Self::new(&[])
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::openspec::ProposalMetadata;
fn create_test_change(id: &str, completed: u32, total: u32) -> Change {
Change {
id: id.to_string(),
completed_tasks: completed,
total_tasks: total,
last_modified: "1m ago".to_string(),
dependencies: Vec::new(),
metadata: ProposalMetadata::default(),
}
}
#[test]
fn test_change_status_from_change() {
let change = create_test_change("test-change", 3, 5);
let status = ChangeStatus::from(&change);
assert_eq!(status.id, "test-change");
assert_eq!(status.completed_tasks, 3);
assert_eq!(status.total_tasks, 5);
assert!((status.progress_percent - 60.0).abs() < 0.01);
assert_eq!(status.status, "in_progress");
}
#[test]
fn test_change_status_pending() {
let change = create_test_change("pending-change", 0, 5);
let status = ChangeStatus::from(&change);
assert_eq!(status.status, "pending");
}
#[test]
fn test_change_status_complete() {
let change = create_test_change("complete-change", 5, 5);
let status = ChangeStatus::from(&change);
assert_eq!(status.status, "complete");
}
#[test]
fn test_orchestrator_state_snapshot_from_changes() {
let changes = vec![
create_test_change("change-a", 0, 3),
create_test_change("change-b", 2, 5),
create_test_change("change-c", 4, 4),
];
let mut state = OrchestratorStateSnapshot::from_changes(&changes);
assert_eq!(state.total_changes, 3);
assert_eq!(state.pending_changes, 0);
assert_eq!(state.in_progress_changes, 0);
assert_eq!(state.completed_changes, 0);
state.changes[0].queue_status = Some("queued".to_string());
state.changes[1].queue_status = Some("applying".to_string());
state.changes[2].queue_status = Some("archived".to_string());
refresh_summary(&mut state);
assert_eq!(state.pending_changes, 1);
assert_eq!(state.in_progress_changes, 1);
assert_eq!(state.completed_changes, 1);
}
#[tokio::test]
async fn test_web_state_get_state() {
let changes = vec![create_test_change("test", 1, 3)];
let web_state = WebState::new(&changes);
let state = web_state.get_state().await;
assert_eq!(state.total_changes, 1);
assert_eq!(state.changes[0].id, "test");
}
#[tokio::test]
async fn test_web_state_update() {
let web_state = WebState::new(&[]);
let mut rx = web_state.subscribe();
let changes = vec![create_test_change("new-change", 2, 4)];
web_state.update(&changes).await;
let state = web_state.get_state().await;
assert_eq!(state.total_changes, 1);
assert_eq!(state.changes[0].id, "new-change");
let update = rx.try_recv().unwrap();
assert_eq!(update.msg_type, "state_update");
assert_eq!(update.changes[0].id, "new-change");
}
#[tokio::test]
async fn test_apply_execution_event_processing_started_sets_in_progress() {
let changes = vec![create_test_change("change-a", 0, 3)];
let web_state = WebState::new(&changes);
web_state
.apply_execution_event(&ExecutionEvent::ProcessingStarted("change-a".to_string()))
.await;
let state = web_state.get_state().await;
assert_eq!(state.changes[0].status, "in_progress");
assert_eq!(state.changes[0].queue_status, None);
}
#[tokio::test]
async fn test_apply_execution_event_acceptance_started() {
let changes = vec![create_test_change("change-a", 5, 10)];
let web_state = WebState::new(&changes);
web_state
.apply_execution_event(&ExecutionEvent::AcceptanceStarted {
change_id: "change-a".to_string(),
command: "test command".to_string(),
})
.await;
let state = web_state.get_state().await;
assert_eq!(state.changes[0].queue_status, None);
}
#[tokio::test]
async fn test_apply_execution_event_acceptance_completed() {
let changes = vec![create_test_change("change-a", 10, 10)];
let web_state = WebState::new(&changes);
web_state
.apply_execution_event(&ExecutionEvent::AcceptanceCompleted {
change_id: "change-a".to_string(),
})
.await;
let state = web_state.get_state().await;
assert_eq!(state.changes[0].queue_status, None);
}
#[tokio::test]
async fn test_apply_execution_event_progress_updated_updates_counts() {
let changes = vec![create_test_change("change-a", 0, 3)];
let web_state = WebState::new(&changes);
web_state
.apply_execution_event(&ExecutionEvent::ProgressUpdated {
change_id: "change-a".to_string(),
completed: 2,
total: 4,
})
.await;
let state = web_state.get_state().await;
let change = &state.changes[0];
assert_eq!(change.completed_tasks, 2);
assert_eq!(change.total_tasks, 4);
assert!((change.progress_percent - 50.0).abs() < 0.01);
assert_eq!(change.status, "in_progress");
}
#[tokio::test]
async fn test_web_state_get_change() {
let changes = vec![
create_test_change("change-a", 1, 3),
create_test_change("change-b", 2, 5),
];
let web_state = WebState::new(&changes);
let change = web_state.get_change("change-b").await;
assert!(change.is_some());
assert_eq!(change.unwrap().id, "change-b");
let missing = web_state.get_change("nonexistent").await;
assert!(missing.is_none());
}
#[tokio::test]
async fn test_compute_diff_no_changes() {
let changes = vec![create_test_change("change-a", 2, 5)];
let web_state = WebState::new(&changes);
let mut rx = web_state.subscribe();
web_state.update(&changes).await;
assert!(rx.try_recv().is_err());
}
#[tokio::test]
async fn test_compute_diff_progress_update() {
let initial = vec![
create_test_change("change-a", 2, 5),
create_test_change("change-b", 1, 5),
];
let web_state = WebState::new(&initial);
let mut rx = web_state.subscribe();
let updated = vec![
create_test_change("change-a", 3, 5),
create_test_change("change-b", 1, 5),
];
web_state.update(&updated).await;
let update = rx.try_recv().unwrap();
assert_eq!(update.changes.len(), 2);
assert!(update.changes.iter().any(|change| change.id == "change-a"));
assert!(update.changes.iter().any(|change| change.id == "change-b"));
let updated_change = update
.changes
.iter()
.find(|change| change.id == "change-a")
.unwrap();
assert_eq!(updated_change.completed_tasks, 3);
}
#[tokio::test]
async fn test_compute_diff_archived_change() {
let initial = vec![
create_test_change("change-a", 2, 5),
create_test_change("change-b", 3, 5),
];
let web_state = WebState::new(&initial);
let mut rx = web_state.subscribe();
let updated = vec![create_test_change("change-a", 2, 5)];
web_state.update(&updated).await;
let update = rx.try_recv().unwrap();
assert_eq!(update.changes.len(), 1);
assert_eq!(update.changes[0].id, "change-a");
assert_eq!(update.changes[0].status, "in_progress");
}
#[tokio::test]
async fn test_compute_diff_new_change() {
let initial = vec![create_test_change("change-a", 2, 5)];
let web_state = WebState::new(&initial);
let mut rx = web_state.subscribe();
let updated = vec![
create_test_change("change-a", 2, 5),
create_test_change("change-b", 0, 3),
];
web_state.update(&updated).await;
let update = rx.try_recv().unwrap();
assert_eq!(update.changes.len(), 2);
assert!(update.changes.iter().any(|change| change.id == "change-a"));
assert!(update.changes.iter().any(|change| change.id == "change-b"));
}
#[tokio::test]
async fn test_progress_updated_zero_preserves_existing_progress() {
let changes = vec![create_test_change("change-a", 5, 10)];
let web_state = WebState::new(&changes);
web_state
.apply_execution_event(&ExecutionEvent::ProgressUpdated {
change_id: "change-a".to_string(),
completed: 0,
total: 0,
})
.await;
let state = web_state.get_state().await;
assert_eq!(
state.changes[0].completed_tasks, 5,
"completed_tasks should be preserved on 0/0"
);
assert_eq!(
state.changes[0].total_tasks, 10,
"total_tasks should be preserved on 0/0"
);
}
#[tokio::test]
async fn test_progress_updated_valid_updates_progress() {
let changes = vec![create_test_change("change-a", 5, 10)];
let web_state = WebState::new(&changes);
web_state
.apply_execution_event(&ExecutionEvent::ProgressUpdated {
change_id: "change-a".to_string(),
completed: 8,
total: 12,
})
.await;
let state = web_state.get_state().await;
assert_eq!(
state.changes[0].completed_tasks, 8,
"completed_tasks should be updated with valid data"
);
assert_eq!(
state.changes[0].total_tasks, 12,
"total_tasks should be updated with valid data"
);
}
#[tokio::test]
async fn test_update_method_preserves_progress_on_zero() {
let initial = vec![create_test_change("change-a", 7, 10)];
let web_state = WebState::new(&initial);
let updated = vec![Change {
id: "change-a".to_string(),
completed_tasks: 0,
total_tasks: 0,
last_modified: "now".to_string(),
dependencies: Vec::new(),
metadata: ProposalMetadata::default(),
}];
web_state.update(&updated).await;
let state = web_state.get_state().await;
assert_eq!(
state.changes[0].completed_tasks, 7,
"completed_tasks should be preserved on update with 0/0"
);
assert_eq!(
state.changes[0].total_tasks, 10,
"total_tasks should be preserved on update with 0/0"
);
}
#[tokio::test]
async fn test_update_method_updates_progress_with_valid_data() {
let initial = vec![create_test_change("change-a", 5, 10)];
let web_state = WebState::new(&initial);
let updated = vec![create_test_change("change-a", 9, 12)];
web_state.update(&updated).await;
let state = web_state.get_state().await;
assert_eq!(
state.changes[0].completed_tasks, 9,
"completed_tasks should be updated with valid data"
);
assert_eq!(
state.changes[0].total_tasks, 12,
"total_tasks should be updated with valid data"
);
}
#[tokio::test]
async fn test_changes_refreshed_preserves_progress_on_zero() {
let initial = vec![create_test_change("change-a", 7, 10)];
let web_state = WebState::new(&initial);
web_state
.apply_execution_event(&ExecutionEvent::ProcessingStarted("change-a".to_string()))
.await;
use std::collections::{HashMap, HashSet};
web_state
.apply_execution_event(&ExecutionEvent::ChangesRefreshed {
changes: vec![Change {
id: "change-a".to_string(),
completed_tasks: 0,
total_tasks: 0,
last_modified: "now".to_string(),
dependencies: Vec::new(),
metadata: ProposalMetadata::default(),
}],
committed_change_ids: HashSet::new(),
uncommitted_file_change_ids: HashSet::new(),
worktree_change_ids: HashSet::new(),
worktree_paths: HashMap::new(),
worktree_not_ahead_ids: HashSet::new(),
merge_wait_ids: HashSet::new(),
})
.await;
let state = web_state.get_state().await;
assert_eq!(
state.changes[0].completed_tasks, 7,
"completed_tasks should be preserved on ChangesRefreshed with 0/0"
);
assert_eq!(
state.changes[0].total_tasks, 10,
"total_tasks should be preserved on ChangesRefreshed with 0/0"
);
}
#[tokio::test]
async fn test_changes_refreshed_updates_progress_with_valid_data() {
let initial = vec![create_test_change("change-a", 5, 10)];
let web_state = WebState::new(&initial);
use std::collections::{HashMap, HashSet};
web_state
.apply_execution_event(&ExecutionEvent::ChangesRefreshed {
changes: vec![create_test_change("change-a", 9, 12)],
committed_change_ids: HashSet::new(),
uncommitted_file_change_ids: HashSet::new(),
worktree_change_ids: HashSet::new(),
worktree_paths: HashMap::new(),
worktree_not_ahead_ids: HashSet::new(),
merge_wait_ids: HashSet::new(),
})
.await;
let state = web_state.get_state().await;
assert_eq!(
state.changes[0].completed_tasks, 9,
"completed_tasks should be updated with valid data"
);
assert_eq!(
state.changes[0].total_tasks, 12,
"total_tasks should be updated with valid data"
);
}
#[tokio::test]
async fn test_archive_started_preserves_progress_when_zero() {
let initial = vec![create_test_change("change-a", 5, 10)];
let web_state = WebState::new(&initial);
web_state
.apply_execution_event(&ExecutionEvent::ArchiveStarted {
change_id: "change-a".to_string(),
command: "test command".to_string(),
})
.await;
let state = web_state.get_state().await;
assert_eq!(
state.changes[0].completed_tasks, 5,
"completed_tasks should be preserved during archiving"
);
assert_eq!(
state.changes[0].total_tasks, 10,
"total_tasks should be preserved during archiving"
);
assert_eq!(state.changes[0].queue_status, None);
}
/// After `ArchiveStarted`, a `ProgressUpdated` event reporting 0/0 must not
/// overwrite the previously known 7/10 task counts.
#[tokio::test]
async fn test_progress_updated_preserves_existing_during_archiving() {
    let seed = vec![create_test_change("change-a", 7, 10)];
    let web_state = WebState::new(&seed);

    // Delivered in order: archiving starts, then an empty progress report.
    let events = [
        ExecutionEvent::ArchiveStarted {
            change_id: String::from("change-a"),
            command: String::from("test command"),
        },
        ExecutionEvent::ProgressUpdated {
            change_id: String::from("change-a"),
            completed: 0,
            total: 0,
        },
    ];
    for event in &events {
        web_state.apply_execution_event(event).await;
    }

    let snapshot = web_state.get_state().await;
    let change = &snapshot.changes[0];
    assert_eq!(
        change.completed_tasks, 7,
        "completed_tasks should be preserved on 0/0 update during archiving"
    );
    assert_eq!(
        change.total_tasks, 10,
        "total_tasks should be preserved on 0/0 update during archiving"
    );
}
/// After `ResolveStarted`, a `ProgressUpdated` event reporting 0/0 must not
/// overwrite the previously known 8/10 task counts.
#[tokio::test]
async fn test_progress_updated_preserves_existing_during_resolving() {
    let seed = vec![create_test_change("change-a", 8, 10)];
    let web_state = WebState::new(&seed);

    // Delivered in order: resolving starts, then an empty progress report.
    let events = [
        ExecutionEvent::ResolveStarted {
            change_id: String::from("change-a"),
            command: String::from("test resolve command"),
        },
        ExecutionEvent::ProgressUpdated {
            change_id: String::from("change-a"),
            completed: 0,
            total: 0,
        },
    ];
    for event in &events {
        web_state.apply_execution_event(event).await;
    }

    let snapshot = web_state.get_state().await;
    let change = &snapshot.changes[0];
    assert_eq!(
        change.completed_tasks, 8,
        "completed_tasks should be preserved on 0/0 update during resolving"
    );
    assert_eq!(
        change.total_tasks, 10,
        "total_tasks should be preserved on 0/0 update during resolving"
    );
}
/// After `ArchiveStarted`, a `ChangesRefreshed` event whose payload reports
/// 0/0 for the change must leave the earlier 6/10 counts in place; the
/// reported `queue_status` is `None`.
#[tokio::test]
async fn test_changes_refreshed_preserves_progress_during_archiving() {
    use std::collections::{HashMap, HashSet};

    let seed = vec![create_test_change("change-a", 6, 10)];
    let web_state = WebState::new(&seed);

    let archive_started = ExecutionEvent::ArchiveStarted {
        change_id: String::from("change-a"),
        command: String::from("test command"),
    };
    web_state.apply_execution_event(&archive_started).await;

    // Refresh carrying an empty (0/0) task count for the same change.
    let empty_refresh = ExecutionEvent::ChangesRefreshed {
        changes: vec![create_test_change("change-a", 0, 0)],
        committed_change_ids: HashSet::new(),
        uncommitted_file_change_ids: HashSet::new(),
        worktree_change_ids: HashSet::new(),
        worktree_paths: HashMap::new(),
        worktree_not_ahead_ids: HashSet::new(),
        merge_wait_ids: HashSet::new(),
    };
    web_state.apply_execution_event(&empty_refresh).await;

    let snapshot = web_state.get_state().await;
    let change = &snapshot.changes[0];
    assert_eq!(
        change.completed_tasks, 6,
        "completed_tasks should be preserved on ChangesRefreshed with 0/0 during archiving"
    );
    assert_eq!(
        change.total_tasks, 10,
        "total_tasks should be preserved on ChangesRefreshed with 0/0 during archiving"
    );
    assert_eq!(change.queue_status, None);
}
/// After `ResolveStarted`, a `ChangesRefreshed` event whose payload reports
/// 0/0 for the change must leave the earlier 9/10 counts in place; the
/// reported `queue_status` is `None`.
#[tokio::test]
async fn test_changes_refreshed_preserves_progress_during_resolving() {
    use std::collections::{HashMap, HashSet};

    let seed = vec![create_test_change("change-a", 9, 10)];
    let web_state = WebState::new(&seed);

    let resolve_started = ExecutionEvent::ResolveStarted {
        change_id: String::from("change-a"),
        command: String::from("test resolve command"),
    };
    web_state.apply_execution_event(&resolve_started).await;

    // Refresh carrying an empty (0/0) task count for the same change.
    let empty_refresh = ExecutionEvent::ChangesRefreshed {
        changes: vec![create_test_change("change-a", 0, 0)],
        committed_change_ids: HashSet::new(),
        uncommitted_file_change_ids: HashSet::new(),
        worktree_change_ids: HashSet::new(),
        worktree_paths: HashMap::new(),
        worktree_not_ahead_ids: HashSet::new(),
        merge_wait_ids: HashSet::new(),
    };
    web_state.apply_execution_event(&empty_refresh).await;

    let snapshot = web_state.get_state().await;
    let change = &snapshot.changes[0];
    assert_eq!(
        change.completed_tasks, 9,
        "completed_tasks should be preserved on ChangesRefreshed with 0/0 during resolving"
    );
    assert_eq!(
        change.total_tasks, 10,
        "total_tasks should be preserved on ChangesRefreshed with 0/0 during resolving"
    );
    assert_eq!(change.queue_status, None);
}
/// Applies `ResolveStarted` (checking that `is_resolving` becomes true) and
/// then an auto-resumable `MergeDeferred` for the same change.
///
/// NOTE(review): the test name mentions "resolve pending", yet the final
/// assertion expects `queue_status` to be `None` — presumably because no
/// shared reducer state is attached in this test; confirm the intent.
#[tokio::test]
async fn test_merge_deferred_during_resolve_sets_resolve_pending() {
    let seed = vec![create_test_change("change-a", 5, 10)];
    let web_state = WebState::new(&seed);

    let resolve_started = ExecutionEvent::ResolveStarted {
        change_id: String::from("change-a"),
        command: String::from("test command"),
    };
    web_state.apply_execution_event(&resolve_started).await;

    let during_resolve = web_state.get_state().await;
    assert!(during_resolve.is_resolving, "is_resolving should be true");

    let merge_deferred = ExecutionEvent::MergeDeferred {
        change_id: String::from("change-a"),
        reason: String::from("test reason"),
        auto_resumable: true,
    };
    web_state.apply_execution_event(&merge_deferred).await;

    let after_deferral = web_state.get_state().await;
    assert_eq!(after_deferral.changes[0].queue_status, None);
}
/// Applies a non-auto-resumable `MergeDeferred` without any preceding resolve
/// and checks that `is_resolving` stays false.
///
/// NOTE(review): the test name mentions "merge wait", yet the assertion
/// expects `queue_status` to be `None` — presumably because no shared reducer
/// state is attached in this test; confirm the intent.
#[tokio::test]
async fn test_merge_deferred_not_resolving_sets_merge_wait() {
    let seed = vec![create_test_change("change-a", 5, 10)];
    let web_state = WebState::new(&seed);

    let merge_deferred = ExecutionEvent::MergeDeferred {
        change_id: String::from("change-a"),
        reason: String::from("test reason"),
        auto_resumable: false,
    };
    web_state.apply_execution_event(&merge_deferred).await;

    let snapshot = web_state.get_state().await;
    assert_eq!(snapshot.changes[0].queue_status, None);
    assert!(!snapshot.is_resolving, "is_resolving should be false");
}
/// `ResolveStarted` flips `is_resolving` to true; the change's reported
/// `queue_status` is `None`.
#[tokio::test]
async fn test_resolve_started_sets_is_resolving() {
    let seed = vec![create_test_change("change-a", 5, 10)];
    let web_state = WebState::new(&seed);

    let resolve_started = ExecutionEvent::ResolveStarted {
        change_id: String::from("change-a"),
        command: String::from("test command"),
    };
    web_state.apply_execution_event(&resolve_started).await;

    let snapshot = web_state.get_state().await;
    assert!(snapshot.is_resolving, "is_resolving should be true");
    assert_eq!(snapshot.changes[0].queue_status, None);
}
/// `ResolveCompleted` following `ResolveStarted` leaves `is_resolving` false;
/// the change's reported `queue_status` is `None`.
#[tokio::test]
async fn test_resolve_completed_clears_is_resolving() {
    let seed = vec![create_test_change("change-a", 5, 10)];
    let web_state = WebState::new(&seed);

    let resolve_started = ExecutionEvent::ResolveStarted {
        change_id: String::from("change-a"),
        command: String::from("test command"),
    };
    web_state.apply_execution_event(&resolve_started).await;

    let resolve_completed = ExecutionEvent::ResolveCompleted {
        change_id: String::from("change-a"),
        worktree_change_ids: None,
    };
    web_state.apply_execution_event(&resolve_completed).await;

    let snapshot = web_state.get_state().await;
    assert!(!snapshot.is_resolving, "is_resolving should be false");
    assert_eq!(snapshot.changes[0].queue_status, None);
}
/// `ResolveFailed` following `ResolveStarted` leaves `is_resolving` false;
/// the change's reported `queue_status` is `None`.
#[tokio::test]
async fn test_resolve_failed_clears_is_resolving() {
    let seed = vec![create_test_change("change-a", 5, 10)];
    let web_state = WebState::new(&seed);

    let resolve_started = ExecutionEvent::ResolveStarted {
        change_id: String::from("change-a"),
        command: String::from("test command"),
    };
    web_state.apply_execution_event(&resolve_started).await;

    let resolve_failed = ExecutionEvent::ResolveFailed {
        change_id: String::from("change-a"),
        error: String::from("test error"),
    };
    web_state.apply_execution_event(&resolve_failed).await;

    let snapshot = web_state.get_state().await;
    assert!(!snapshot.is_resolving, "is_resolving should be false");
    assert_eq!(snapshot.changes[0].queue_status, None);
}
/// Applies an auto-resumable `MergeDeferred` with no resolve in flight and
/// inspects the resulting `queue_status`.
///
/// NOTE(review): the test name mentions "resolve pending", yet the assertion
/// expects `queue_status` to be `None` — presumably because no shared reducer
/// state is attached in this test; confirm the intent.
#[tokio::test]
async fn test_auto_resumable_merge_deferred_without_resolve_shows_resolve_pending() {
    let seed = vec![create_test_change("change-b", 5, 10)];
    let web_state = WebState::new(&seed);

    let merge_deferred = ExecutionEvent::MergeDeferred {
        change_id: String::from("change-b"),
        reason: String::from("Merge in progress (MERGE_HEAD exists)"),
        auto_resumable: true,
    };
    web_state.apply_execution_event(&merge_deferred).await;

    let snapshot = web_state.get_state().await;
    assert_eq!(snapshot.changes[0].queue_status, None);
}
/// Builds a snapshot from a change list plus a shared `OrchestratorState` and
/// checks that each change's `queue_status` matches what was applied to the
/// reducer: queued, untouched (`None`), and queued-then-archived.
#[test]
fn test_web_snapshot_uses_reducer_display_status_without_payload_change() {
    use crate::orchestration::state::{OrchestratorState, ReducerCommand};

    let tracked_ids = vec![
        String::from("ch-queued"),
        String::from("ch-notqueued"),
        String::from("ch-archived"),
    ];
    let mut reducer = OrchestratorState::new(tracked_ids, 0);

    let changes = vec![
        create_test_change("ch-queued", 0, 3),
        create_test_change("ch-notqueued", 0, 3),
        create_test_change("ch-archived", 3, 3),
    ];

    // Two changes are queued; one of them is subsequently archived.
    reducer.apply_command(ReducerCommand::AddToQueue(String::from("ch-queued")));
    reducer.apply_command(ReducerCommand::AddToQueue(String::from("ch-archived")));
    reducer.apply_execution_event(&crate::events::ExecutionEvent::ChangeArchived(
        String::from("ch-archived"),
    ));

    let snapshot =
        OrchestratorStateSnapshot::from_changes_with_shared_state(&changes, Some(&reducer));

    // Locate a change in the snapshot by id; panics if it is missing.
    let lookup = |id: &str| snapshot.changes.iter().find(|c| c.id == id).unwrap();
    let queued = lookup("ch-queued");
    let notqueued = lookup("ch-notqueued");
    let archived = lookup("ch-archived");

    assert_eq!(queued.queue_status, Some("queued".to_string()));
    assert_eq!(notqueued.queue_status, None);
    assert_eq!(archived.queue_status, Some("archived".to_string()));
}
/// A change that was rejected in the shared reducer and then reappears in a
/// `ChangesRefreshed` event goes back to "not queued" there; once that reducer
/// is attached to the web state, the same refresh must yield a `queue_status`
/// of `None` rather than a lingering rejected status.
#[tokio::test]
async fn test_changes_refreshed_reactivated_change_clears_rejected_queue_status() {
    use crate::events::ExecutionEvent;
    use crate::orchestration::state::OrchestratorState;
    use std::collections::{HashMap, HashSet};
    use std::sync::Arc;

    let seed = vec![create_test_change("change-a", 0, 1)];
    let web_state = WebState::new(&seed);
    let shared = Arc::new(tokio::sync::RwLock::new(OrchestratorState::new(
        vec![String::from("change-a")],
        0,
    )));

    // The identical refresh event is delivered to the reducer and the web state.
    let make_refresh = || ExecutionEvent::ChangesRefreshed {
        changes: vec![create_test_change("change-a", 0, 1)],
        committed_change_ids: HashSet::new(),
        uncommitted_file_change_ids: HashSet::new(),
        worktree_change_ids: HashSet::new(),
        worktree_paths: HashMap::new(),
        worktree_not_ahead_ids: HashSet::new(),
        merge_wait_ids: HashSet::new(),
    };

    let mut reducer = shared.write().await;
    reducer.apply_execution_event(&ExecutionEvent::ChangeRejected {
        change_id: String::from("change-a"),
        reason: String::from("blocked"),
    });
    assert_eq!(reducer.display_status("change-a"), "rejected");
    reducer.apply_execution_event(&make_refresh());
    assert_eq!(reducer.display_status("change-a"), "not queued");
    // Release the write lock before the web state is given the shared reducer.
    drop(reducer);

    web_state.set_shared_state(Arc::clone(&shared)).await;
    web_state.apply_execution_event(&make_refresh()).await;

    let snapshot = web_state.get_state().await;
    assert_eq!(
        snapshot.changes[0].queue_status, None,
        "reactivated change should not keep rejected queue_status"
    );
}
/// With a shared reducer attached, the web state's `queue_status` follows the
/// reducer: "blocked" after `DependencyBlocked`, and "queued" again after
/// `DependencyResolved`. Each event is applied to the reducer first and then
/// to the web state.
#[tokio::test]
async fn test_dependency_blocked_and_resolved_converges_to_reducer_queue_status() {
    use crate::orchestration::state::{OrchestratorState, ReducerCommand};
    use std::sync::Arc;
    use tokio::sync::RwLock;

    let seed = vec![create_test_change("change-b", 0, 3)];
    let web_state = WebState::new(&seed);

    let mut reducer = OrchestratorState::new(vec![String::from("change-b")], 0);
    reducer.apply_command(ReducerCommand::AddToQueue(String::from("change-b")));
    let shared = Arc::new(RwLock::new(reducer));
    web_state.set_shared_state(Arc::clone(&shared)).await;

    // Phase 1: the change becomes blocked on a dependency.
    let blocked = ExecutionEvent::DependencyBlocked {
        change_id: String::from("change-b"),
        dependency_ids: vec![String::from("change-a")],
    };
    // The write guard is a temporary, so the lock is released at the end of
    // this statement, before the web state processes the event.
    shared.write().await.apply_execution_event(&blocked);
    web_state.apply_execution_event(&blocked).await;

    let blocked_state = web_state.get_state().await;
    assert_eq!(
        blocked_state.changes[0].queue_status,
        Some("blocked".to_string()),
        "web state should converge to reducer-derived blocked status"
    );

    // Phase 2: the dependency is resolved.
    let resolved = ExecutionEvent::DependencyResolved {
        change_id: String::from("change-b"),
    };
    shared.write().await.apply_execution_event(&resolved);
    web_state.apply_execution_event(&resolved).await;

    let resolved_state = web_state.get_state().await;
    assert_eq!(
        resolved_state.changes[0].queue_status,
        Some("queued".to_string()),
        "web state should converge back to queued after dependency resolved"
    );
}
}