use std::sync::OnceLock;
use async_trait::async_trait;
use chrono::{Local, Utc};
use ratatui::style::Color;
use regex::Regex;
use serde::{Deserialize, Serialize};
use tokio::sync::mpsc;
use tracing::debug;
use crate::orchestration::state::OrchestratorState;
#[cfg(feature = "web-monitoring")]
use utoipa::ToSchema;
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[cfg_attr(feature = "web-monitoring", derive(ToSchema))]
#[serde(rename_all = "lowercase")]
pub enum LogLevel {
Info,
Success,
Warn,
Error,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "web-monitoring", derive(ToSchema))]
pub struct LogEntry {
pub timestamp: String,
#[serde(with = "chrono::serde::ts_seconds")]
pub created_at: chrono::DateTime<chrono::Utc>,
pub message: String,
#[serde(skip)]
#[cfg_attr(feature = "web-monitoring", schema(ignore = true))]
pub color: Color,
pub level: LogLevel,
pub change_id: Option<String>,
pub operation: Option<String>,
pub iteration: Option<u32>,
pub workspace_path: Option<String>,
}
fn ansi_csi_regex() -> &'static Regex {
static REGEX: OnceLock<Regex> = OnceLock::new();
REGEX.get_or_init(|| Regex::new(r"\x1b\[[0-?]*[ -/]*[@-~]").expect("Invalid ANSI CSI regex"))
}
fn ansi_fragment_regex() -> &'static Regex {
static REGEX: OnceLock<Regex> = OnceLock::new();
REGEX.get_or_init(|| Regex::new(r"\[[0-9;]{1,}m").expect("Invalid ANSI fragment regex"))
}
fn sanitize_log_message(message: &str) -> String {
let without_ansi = ansi_csi_regex().replace_all(message, "");
let without_fragments = ansi_fragment_regex().replace_all(&without_ansi, "");
without_fragments
.chars()
.filter(|ch| !ch.is_control())
.collect()
}
impl LogEntry {
pub fn info(message: impl Into<String>) -> Self {
let message = message.into();
let message = sanitize_log_message(&message);
let now_local = Local::now();
let now_utc = Utc::now();
Self {
timestamp: now_local.format("%H:%M:%S").to_string(),
created_at: now_utc,
message,
color: Color::White,
level: LogLevel::Info,
change_id: None,
operation: None,
iteration: None,
workspace_path: None,
}
}
pub fn success(message: impl Into<String>) -> Self {
let message = message.into();
let message = sanitize_log_message(&message);
let now_local = Local::now();
let now_utc = Utc::now();
Self {
timestamp: now_local.format("%H:%M:%S").to_string(),
created_at: now_utc,
message,
color: Color::Green,
level: LogLevel::Success,
change_id: None,
operation: None,
iteration: None,
workspace_path: None,
}
}
pub fn warn(message: impl Into<String>) -> Self {
let message = message.into();
let message = sanitize_log_message(&message);
let now_local = Local::now();
let now_utc = Utc::now();
Self {
timestamp: now_local.format("%H:%M:%S").to_string(),
created_at: now_utc,
message,
color: Color::Yellow,
level: LogLevel::Warn,
change_id: None,
operation: None,
iteration: None,
workspace_path: None,
}
}
pub fn error(message: impl Into<String>) -> Self {
let message = message.into();
let message = sanitize_log_message(&message);
let now_local = Local::now();
let now_utc = Utc::now();
Self {
timestamp: now_local.format("%H:%M:%S").to_string(),
created_at: now_utc,
message,
color: Color::Red,
level: LogLevel::Error,
change_id: None,
operation: None,
iteration: None,
workspace_path: None,
}
}
#[allow(dead_code)]
pub fn with_change_id(mut self, change_id: impl Into<String>) -> Self {
self.change_id = Some(change_id.into());
self
}
pub fn with_operation(mut self, operation: impl Into<String>) -> Self {
self.operation = Some(operation.into());
self
}
pub fn with_iteration(mut self, iteration: u32) -> Self {
self.iteration = Some(iteration);
self
}
#[allow(dead_code)]
pub fn with_workspace_path(mut self, workspace_path: impl Into<String>) -> Self {
self.workspace_path = Some(workspace_path.into());
self
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RejectionOutcome {
Confirm,
Resume,
Block,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StalledBlocker {
pub category: String,
pub phase: String,
pub gate: String,
pub error_summary: String,
pub next_action: String,
pub resumable: bool,
pub worktree_preserved: bool,
}
fn classify_stalled_blocker_category(error_summary: &str) -> &'static str {
let lower = error_summary.to_ascii_lowercase();
if lower.contains("credential")
|| lower.contains(" api key")
|| lower.contains("token")
|| lower.contains("auth")
{
"credential"
} else if lower.contains("still running")
|| lower.contains("pending")
|| lower.contains("agent-exec")
|| lower.contains("managed verification job")
{
"pending_verification"
} else if lower.contains("docker")
|| lower.contains("daemon")
|| lower.contains("dns")
|| lower.contains("network")
|| lower.contains("port conflict")
|| lower.contains("address already in use")
|| lower.contains("image")
|| lower.contains("timeout")
{
"infrastructure"
} else if lower.contains("external service")
|| lower.contains("rate limit")
|| lower.contains("429")
|| lower.contains("service outage")
{
"external_service"
} else {
"infrastructure"
}
}
impl StalledBlocker {
pub fn permission_denial(
phase: impl Into<String>,
denial: &crate::permission::PermissionDenial,
) -> Self {
Self {
category: format!("permission:{}", denial.category.as_str()),
phase: phase.into(),
gate: "permission_policy".to_string(),
error_summary: format!(
"repeated unresolved permission/tool policy denial for {}: {}; evidence: {}",
denial.category.as_str(),
denial.denied_target,
denial.evidence
),
next_action: denial.format_guidance(),
resumable: true,
worktree_preserved: true,
}
}
pub fn acceptance_infrastructure(error_summary: impl Into<String>) -> Self {
let error_summary = error_summary.into();
Self {
category: classify_stalled_blocker_category(&error_summary).to_string(),
phase: "acceptance".to_string(),
gate: "acceptance".to_string(),
error_summary,
next_action: "resolve the external verification blocker and rerun acceptance"
.to_string(),
resumable: true,
worktree_preserved: true,
}
}
pub fn summary(&self) -> String {
format!(
"category={}, phase={}, gate={}, resumable={}, next_action={}, error={}",
self.category,
self.phase,
self.gate,
self.resumable,
self.next_action,
self.error_summary
)
}
pub fn worktree_snapshot(&self) -> String {
if self.worktree_preserved {
"existing worktree and WIP context are preserved while stalled".to_string()
} else {
"worktree preservation unavailable for this stalled hold".to_string()
}
}
}
#[derive(Debug, Clone)]
pub enum ExecutionEvent {
ProcessingStarted(String),
ProcessingCompleted(String),
ProcessingError { id: String, error: String },
#[allow(dead_code)]
ApplyStarted { change_id: String, command: String },
ApplyCompleted {
change_id: String,
#[allow(dead_code)]
revision: String,
},
#[allow(dead_code)]
ApplyFailed { change_id: String, error: String },
#[allow(dead_code)]
ApplyOutput {
change_id: String,
output: String,
iteration: Option<u32>,
},
ArchiveStarted { change_id: String, command: String },
ArchiveResumed {
change_id: String,
reason: Option<String>,
summary: Option<String>,
},
ArchiveRetryScheduled {
change_id: String,
attempt: u32,
max_attempts: u32,
reason: Option<String>,
summary: Option<String>,
},
ChangeArchived(String),
#[allow(dead_code)]
ArchiveFailed {
change_id: String,
error: String,
reason: Option<String>,
summary: Option<String>,
},
#[allow(dead_code)]
ArchiveOutput {
change_id: String,
output: String,
iteration: u32,
},
AcceptanceStarted { change_id: String, command: String },
AcceptanceCompleted { change_id: String },
#[allow(dead_code)]
AcceptanceFailed { change_id: String, error: String },
ChangeRejected { change_id: String, reason: String },
RejectionReviewCompleted {
change_id: String,
outcome: RejectionOutcome,
},
RejectionReviewFailed { change_id: String, error: String },
#[allow(dead_code)]
AcceptanceOutput {
change_id: String,
output: String,
iteration: Option<u32>,
},
ProgressUpdated {
change_id: String,
completed: u32,
total: u32,
},
#[allow(dead_code)]
WorkspaceCreated {
change_id: String,
workspace: String,
},
WorkspaceStatusUpdated {
change_id: String,
#[allow(dead_code)]
workspace_name: String,
#[allow(dead_code)]
status: crate::vcs::WorkspaceStatus,
},
#[allow(dead_code)]
WorkspaceResumed {
change_id: String,
workspace: String,
},
#[allow(dead_code)]
WorkspacePreserved {
change_id: String,
workspace_name: String,
},
#[allow(dead_code)]
CleanupStarted { workspace: String },
CleanupCompleted {
#[allow(dead_code)]
workspace: String,
},
#[allow(dead_code)]
MergeStarted { revisions: Vec<String> },
MergeCompleted {
change_id: String,
#[allow(dead_code)]
revision: String,
},
#[allow(dead_code)]
MergeDeferred {
change_id: String,
reason: String,
auto_resumable: bool,
},
ResolveStarted { change_id: String, command: String },
ResolveCompleted {
change_id: String,
worktree_change_ids: Option<std::collections::HashSet<String>>,
},
ResolveFailed { change_id: String, error: String },
#[allow(dead_code)]
MergeConflict { files: Vec<String> },
ConflictResolutionStarted,
ConflictResolutionCompleted,
#[allow(dead_code)]
ConflictResolutionFailed { error: String },
#[allow(dead_code)]
ChangeSkipped { change_id: String, reason: String },
DependencyBlocked {
change_id: String,
#[allow(dead_code)]
dependency_ids: Vec<String>,
},
DependencyResolved { change_id: String },
AcceptanceGated {
change_id: String,
blocker: StalledBlocker,
},
ExecutionBlocked {
change_id: String,
blocker: StalledBlocker,
},
#[allow(dead_code)]
AnalysisStarted { remaining_changes: usize },
#[allow(dead_code)]
AnalysisOutput { output: String, iteration: u32 },
#[allow(dead_code)]
AnalysisCompleted { groups_found: usize },
#[allow(dead_code)]
ResolveOutput {
change_id: String,
output: String,
iteration: Option<u32>,
},
#[allow(dead_code)]
HookStarted {
change_id: String,
hook_type: String,
},
#[allow(dead_code)]
HookCompleted {
change_id: String,
hook_type: String,
},
#[allow(dead_code)]
HookFailed {
change_id: String,
hook_type: String,
error: String,
},
Warning { title: String, message: String },
ParallelStartRejected {
change_ids: Vec<String>,
reason: String,
},
Log(LogEntry),
Stopping,
Stopped,
AllCompleted,
Error { message: String },
ChangesRefreshed {
changes: Vec<crate::openspec::Change>,
committed_change_ids: std::collections::HashSet<String>,
uncommitted_file_change_ids: std::collections::HashSet<String>,
worktree_change_ids: std::collections::HashSet<String>,
worktree_paths: std::collections::HashMap<String, std::path::PathBuf>,
worktree_not_ahead_ids: std::collections::HashSet<String>,
merge_wait_ids: std::collections::HashSet<String>,
},
WorktreesRefreshed {
worktrees: Vec<crate::tui::types::WorktreeInfo>,
},
BranchMergeStarted { branch_name: String },
BranchMergeCompleted { branch_name: String },
BranchMergeFailed { branch_name: String, error: String },
ChangeDequeued { change_id: String },
#[allow(dead_code)]
ChangeStopped { change_id: String },
#[allow(dead_code)]
ChangeStopFailed { change_id: String, error: String },
RemoteChangeUpdate {
id: String,
completed_tasks: u32,
total_tasks: u32,
status: Option<String>,
iteration_number: Option<u32>,
},
}
#[async_trait]
pub trait EventSink: Send + Sync {
async fn on_event(&self, event: &ExecutionEvent);
async fn on_state_changed(&self, state: &OrchestratorState);
}
pub struct NoopEventSink;
#[async_trait]
impl EventSink for NoopEventSink {
async fn on_event(&self, _event: &ExecutionEvent) {}
async fn on_state_changed(&self, _state: &OrchestratorState) {}
}
pub async fn send_event(tx: &Option<mpsc::Sender<ExecutionEvent>>, event: ExecutionEvent) {
if let Some(ref tx) = tx {
if let Err(e) = tx.send(event).await {
debug!("Failed to send execution event: {}", e);
}
}
}
pub async fn dispatch_event(
state: &tokio::sync::RwLock<OrchestratorState>,
sinks: &[std::sync::Arc<dyn EventSink>],
event: ExecutionEvent,
) {
let state_snapshot = {
let mut guard = state.write().await;
guard.apply_execution_event(&event);
guard.clone()
};
for sink in sinks {
sink.on_event(&event).await;
}
for sink in sinks {
sink.on_state_changed(&state_snapshot).await;
}
}
pub fn cli_event_sinks() -> Vec<std::sync::Arc<dyn EventSink>> {
vec![std::sync::Arc::new(NoopEventSink)]
}
#[derive(Default)]
#[allow(dead_code)]
pub struct MockEventSink {
events: tokio::sync::Mutex<Vec<ExecutionEvent>>,
}
#[allow(dead_code)]
impl MockEventSink {
pub fn new() -> Self {
Self::default()
}
pub async fn events(&self) -> Vec<ExecutionEvent> {
self.events.lock().await.clone()
}
}
#[async_trait]
impl EventSink for MockEventSink {
async fn on_event(&self, event: &ExecutionEvent) {
self.events.lock().await.push(event.clone());
}
async fn on_state_changed(&self, _state: &OrchestratorState) {}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_execution_event_debug() {
let event = ExecutionEvent::WorkspaceCreated {
change_id: "test".to_string(),
workspace: "ws-test".to_string(),
};
let debug_str = format!("{:?}", event);
assert!(debug_str.contains("WorkspaceCreated"));
}
#[tokio::test]
async fn test_dispatch_event_notifies_mock_sink() {
let state = tokio::sync::RwLock::new(crate::orchestration::state::OrchestratorState::new(
vec!["change-a".to_string()],
10,
));
let mock_sink = std::sync::Arc::new(MockEventSink::new());
let sinks: Vec<std::sync::Arc<dyn EventSink>> = vec![mock_sink.clone()];
dispatch_event(
&state,
&sinks,
ExecutionEvent::ProcessingStarted("change-a".to_string()),
)
.await;
let captured = mock_sink.events().await;
assert_eq!(captured.len(), 1);
assert!(matches!(
captured.first(),
Some(ExecutionEvent::ProcessingStarted(id)) if id == "change-a"
));
}
#[test]
fn test_log_entry_info() {
let entry = LogEntry::info("test message");
assert_eq!(entry.message, "test message");
assert!(matches!(entry.color, Color::White));
assert!(entry.change_id.is_none());
}
#[test]
fn test_log_entry_strips_ansi_sequences() {
let entry = LogEntry::info("\x1b[96mRead\x1b[0m");
assert_eq!(entry.message, "Read");
}
#[test]
fn test_log_entry_strips_sgr_fragments() {
let entry = LogEntry::info("[96m[1m| [0m[90m Read");
assert_eq!(entry.message, "| Read");
}
#[test]
fn test_log_entry_with_change_id() {
let entry = LogEntry::info("test").with_change_id("test-change");
assert_eq!(entry.change_id, Some("test-change".to_string()));
}
#[test]
fn test_hook_started_event() {
let event = ExecutionEvent::HookStarted {
change_id: "test-change".to_string(),
hook_type: "pre_apply".to_string(),
};
let debug_str = format!("{:?}", event);
assert!(debug_str.contains("HookStarted"));
assert!(debug_str.contains("test-change"));
assert!(debug_str.contains("pre_apply"));
}
#[test]
fn test_hook_completed_event() {
let event = ExecutionEvent::HookCompleted {
change_id: "test-change".to_string(),
hook_type: "post_apply".to_string(),
};
let debug_str = format!("{:?}", event);
assert!(debug_str.contains("HookCompleted"));
assert!(debug_str.contains("post_apply"));
}
#[test]
fn test_hook_failed_event() {
let event = ExecutionEvent::HookFailed {
change_id: "test-change".to_string(),
hook_type: "pre_archive".to_string(),
error: "Hook timed out".to_string(),
};
let debug_str = format!("{:?}", event);
assert!(debug_str.contains("HookFailed"));
assert!(debug_str.contains("pre_archive"));
assert!(debug_str.contains("Hook timed out"));
}
#[test]
fn test_progress_updated_event() {
let event = ExecutionEvent::ProgressUpdated {
change_id: "test-change".to_string(),
completed: 5,
total: 10,
};
let debug_str = format!("{:?}", event);
assert!(debug_str.contains("ProgressUpdated"));
assert!(debug_str.contains("test-change"));
}
#[test]
fn test_log_entry_with_operation() {
let entry = LogEntry::info("test").with_operation("apply");
assert_eq!(entry.operation, Some("apply".to_string()));
}
#[test]
fn test_log_entry_with_iteration() {
let entry = LogEntry::info("test").with_iteration(2);
assert_eq!(entry.iteration, Some(2));
}
#[test]
fn test_log_entry_with_operation_and_iteration() {
let entry = LogEntry::info("test")
.with_change_id("test-change")
.with_operation("apply")
.with_iteration(3);
assert_eq!(entry.change_id, Some("test-change".to_string()));
assert_eq!(entry.operation, Some("apply".to_string()));
assert_eq!(entry.iteration, Some(3));
}
#[test]
fn test_log_entry_info_level() {
let entry = LogEntry::info("test");
assert_eq!(entry.level, LogLevel::Info);
assert!(matches!(entry.color, Color::White));
}
#[test]
fn test_log_entry_success_level() {
let entry = LogEntry::success("test");
assert_eq!(entry.level, LogLevel::Success);
assert!(matches!(entry.color, Color::Green));
}
#[test]
fn test_log_entry_warn_level() {
let entry = LogEntry::warn("test");
assert_eq!(entry.level, LogLevel::Warn);
assert!(matches!(entry.color, Color::Yellow));
}
#[test]
fn test_log_entry_error_level() {
let entry = LogEntry::error("test");
assert_eq!(entry.level, LogLevel::Error);
assert!(matches!(entry.color, Color::Red));
}
#[test]
fn test_log_level_equality() {
assert_eq!(LogLevel::Info, LogLevel::Info);
assert_ne!(LogLevel::Info, LogLevel::Error);
}
#[test]
fn test_acceptance_started_event_with_command() {
let event = ExecutionEvent::AcceptanceStarted {
change_id: "test-change".to_string(),
command: "claude --dangerously-skip-permissions acceptance test-change".to_string(),
};
let debug_str = format!("{:?}", event);
assert!(debug_str.contains("AcceptanceStarted"));
assert!(debug_str.contains("test-change"));
assert!(debug_str.contains("acceptance"));
}
#[test]
fn test_archive_started_event_with_command() {
let event = ExecutionEvent::ArchiveStarted {
change_id: "test-change".to_string(),
command: "claude --dangerously-skip-permissions archive test-change".to_string(),
};
let debug_str = format!("{:?}", event);
assert!(debug_str.contains("ArchiveStarted"));
assert!(debug_str.contains("test-change"));
assert!(debug_str.contains("archive"));
}
#[test]
fn test_resolve_started_event_with_command() {
let event = ExecutionEvent::ResolveStarted {
change_id: "test-change".to_string(),
command: "claude --dangerously-skip-permissions resolve test-change".to_string(),
};
let debug_str = format!("{:?}", event);
assert!(debug_str.contains("ResolveStarted"));
assert!(debug_str.contains("test-change"));
assert!(debug_str.contains("resolve"));
}
}