1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
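//! Background health monitor for coding agents.
//!
//! Periodically probes registered agents' ACP endpoints and updates their
//! connection status. An agent becomes connected on its first successful probe
//! after registration. A failed probe marks it disconnected while consecutive
//! failures stay below a configurable threshold, and as errored once they reach
//! it. A later successful probe restores it to connected, which provides
//! reconnection after transient failures. Agents using the stdio transport, or
//! with an empty or placeholder `acp://` endpoint, are not probed.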
use std::sync::Arc;
use std::time::Duration;
use chrono::Utc;
use tokio::time::interval;
use tracing::{debug, info, warn};
use super::backend::ConfigDrivenBackend;
use super::backend::CodingAgentBackend;
use super::registry::CodingAgentRegistry;
use super::status::AgentConnectionStatus;
/// Tuning knobs for the background health monitor.
///
/// `Default` yields a 30-second probe interval and a threshold of three
/// consecutive failures.
#[derive(Debug, Clone)]
pub struct HealthMonitorConfig {
    /// Time between successive probe rounds (default: 30 seconds).
    pub check_interval: Duration,
    /// Number of consecutive failed probes after which an agent is reported
    /// as `Error` instead of `Disconnected` (default: 3).
    pub failure_threshold: u32,
}

impl Default for HealthMonitorConfig {
    fn default() -> Self {
        let failure_threshold = 3;
        let check_interval = Duration::from_millis(30_000);
        Self {
            check_interval,
            failure_threshold,
        }
    }
}
/// Tracks the number of consecutive failed health probes per agent id.
///
/// An agent with no entry is treated as having zero failures.
struct FailureTracker {
    /// Consecutive failure count keyed by agent id.
    counts: dashmap::DashMap<String, u32>,
}

impl FailureTracker {
    /// Creates an empty tracker.
    fn new() -> Self {
        Self {
            counts: dashmap::DashMap::new(),
        }
    }

    /// Resets the consecutive failure count for `agent_id` after a successful probe.
    fn record_success(&self, agent_id: &str) {
        self.counts.insert(agent_id.to_string(), 0);
    }

    /// Increments the consecutive failure count for `agent_id` and returns the
    /// new value.
    ///
    /// The counter saturates at `u32::MAX` instead of overflowing. A plain
    /// `+= 1` would panic in debug builds for an agent that stays unreachable
    /// long enough at a short probe interval.
    fn record_failure(&self, agent_id: &str) -> u32 {
        let mut count = self.counts.entry(agent_id.to_string()).or_insert(0);
        let next = (*count).saturating_add(1);
        *count = next;
        next
    }
}
/// Spawns the background health monitor loop.
///
/// Returns a `JoinHandle` that can be used to abort the monitor on shutdown.
pub fn spawn_health_monitor(
registry: Arc<CodingAgentRegistry>,
config: HealthMonitorConfig,
) -> tokio::task::JoinHandle<()> {
info!(
interval_secs = config.check_interval.as_secs(),
failure_threshold = config.failure_threshold,
"coding agent health monitor started"
);
tokio::spawn(async move {
let mut ticker = interval(config.check_interval);
let tracker = FailureTracker::new();
// Initial probe on startup (after a short delay to let agents register)
tokio::time::sleep(Duration::from_secs(5)).await;
loop {
ticker.tick().await;
let agents = registry.list_agents();
if agents.is_empty() {
debug!("health monitor: no agents registered, skipping");
continue;
}
// Clean up failure tracking for agents that were unregistered
let agent_ids: std::collections::HashSet<&str> =
agents.iter().map(|a| a.id.as_str()).collect();
tracker.counts.retain(|id, _| agent_ids.contains(id.as_str()));
for agent in &agents {
// Skip agents with stdio transport — they're managed by the process manager
if agent.config.transport.as_ref().is_some_and(|t| matches!(t, super::config::AgentTransport::Stdio { .. })) {
continue;
}
// Skip agents with placeholder endpoints (acp:// scheme)
if agent.endpoint.starts_with("acp://") || agent.endpoint.is_empty() {
continue;
}
let backend = ConfigDrivenBackend::new(
// We need the backend definition for the health check,
// but we can probe the endpoint directly
super::config::BackendDefinition {
agent_type: agent.backend_type.clone(),
display_name: String::new(),
cli_command: String::new(),
install_check_command: String::new(),
auth_method: super::config::AuthMethod::None,
capabilities: super::config::AgentCapabilities::default(),
install_instructions: String::new(),
install_instructions_windows: None,
install_instructions_linux: None,
},
Some(agent.endpoint.clone()),
);
let health = backend.health_check().await;
registry.record_health_check(&agent.id);
match health {
Ok(status) if status.reachable => {
tracker.record_success(&agent.id);
let current = &agent.status;
if !matches!(current, AgentConnectionStatus::Connected) {
info!(agent_id = %agent.id, "agent connected");
let _ = registry.update_status(
&agent.id,
AgentConnectionStatus::Connected,
);
}
}
Ok(_) | Err(_) => {
let failures = tracker.record_failure(&agent.id);
let current = &agent.status;
if failures >= config.failure_threshold {
if !matches!(current, AgentConnectionStatus::Error { .. }) {
warn!(
agent_id = %agent.id,
failures = failures,
"agent unreachable, marking as error"
);
let _ = registry.update_status(
&agent.id,
AgentConnectionStatus::Error {
message: format!(
"Unreachable after {} consecutive health checks",
failures
),
since: Utc::now(),
},
);
}
} else if !matches!(current, AgentConnectionStatus::Disconnected { .. }) {
debug!(
agent_id = %agent.id,
failures = failures,
threshold = config.failure_threshold,
"agent health check failed"
);
let _ = registry.update_status(
&agent.id,
AgentConnectionStatus::Disconnected { since: Utc::now() },
);
}
}
}
}
}
})
}