use std::collections::HashMap;
use std::time::{Duration, Instant};
use terraphim_spawner::health::{CircuitBreaker, CircuitBreakerConfig, CircuitState, HealthStatus};
use tracing::{debug, info, warn};
use crate::kg_router::KgRouter;
/// Outcome of running the probe command for one provider/model pair.
///
/// Values of this type are what `ProviderHealthMap::save_results` serializes
/// to JSON.
#[derive(Debug, Clone, serde::Serialize)]
pub struct ProbeResult {
/// Provider name the probe was run for.
pub provider: String,
/// Model identifier the probe was run for.
pub model: String,
/// Basename of the first whitespace-separated word of the rendered probe
/// command; empty when no command template was available.
pub cli_tool: String,
/// Whether the probe succeeded, failed, or timed out.
pub status: ProbeStatus,
/// Elapsed wall-clock time of the probe in milliseconds; `None` when no
/// command was run.
pub latency_ms: Option<u64>,
/// Description of the failure; `None` on success.
pub error: Option<String>,
/// RFC 3339 UTC timestamp captured when the probe started.
pub timestamp: String,
}
/// Classification of a single probe run.
///
/// Serialized in `snake_case` (`"success"`, `"error"`, `"timeout"`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, serde::Serialize)]
#[serde(rename_all = "snake_case")]
pub enum ProbeStatus {
/// The probe command exited with a success status.
Success,
/// The probe could not be run (no template, spawn/wait failure) or the
/// command exited with a failure status.
Error,
/// The probe command did not finish within the probe timeout.
Timeout,
}
/// Health view over providers and their models, combining the most recent
/// probe results with per-model circuit breakers.
///
/// Where both exist, the health queries consult probe results first and fall
/// back to circuit-breaker state.
pub struct ProviderHealthMap {
/// Circuit breakers keyed by `"{provider}:{model}"`. `record_failure` adds a
/// `"{provider}:*"` entry for a provider that has no breaker yet.
breakers: HashMap<String, CircuitBreaker>,
/// Results of the most recent `probe_all` run (empty before the first run).
results: Vec<ProbeResult>,
/// When the most recent `probe_all` finished; `None` if it has never run.
probed_at: Option<Instant>,
/// Age at which probe results are considered stale.
ttl: Duration,
/// Configuration cloned into every circuit breaker this map creates.
cb_config: CircuitBreakerConfig,
}
// Summary-style `Debug`: prints counts and staleness instead of dumping every
// breaker and probe result. Note that `providers` is the number of circuit
// breakers, not the number of distinct provider names.
impl std::fmt::Debug for ProviderHealthMap {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let breaker_count = self.breakers.len();
        let result_count = self.results.len();
        let stale = self.is_stale();

        let mut summary = f.debug_struct("ProviderHealthMap");
        summary.field("providers", &breaker_count);
        summary.field("results", &result_count);
        summary.field("stale", &stale);
        summary.finish()
    }
}
impl ProviderHealthMap {
/// Creates an empty health map whose probe results go stale after `ttl`.
///
/// The map starts with no breakers and no probe results, so it reports
/// itself as stale until the first `probe_all` completes. Breakers created
/// later use a failure threshold of 5, a 60-second cooldown and a success
/// threshold of 1.
pub fn new(ttl: Duration) -> Self {
    let cb_config = CircuitBreakerConfig {
        failure_threshold: 5,
        cooldown: Duration::from_secs(60),
        success_threshold: 1,
    };

    Self {
        breakers: HashMap::new(),
        results: Vec::new(),
        probed_at: None,
        ttl,
        cb_config,
    }
}
/// Returns `true` when no probe has run yet or the last probe is at least
/// `ttl` old.
pub fn is_stale(&self) -> bool {
    match self.probed_at {
        Some(last_probe) => last_probe.elapsed() >= self.ttl,
        // Never probed: there is nothing fresh to rely on.
        None => true,
    }
}
pub async fn probe_all(&mut self, kg_router: &KgRouter) {
let mut seen = HashMap::new();
let mut tasks = Vec::new();
for rule in kg_router.all_routes() {
let key = format!("{}:{}", rule.provider, rule.model);
if seen.contains_key(&key) {
continue;
}
seen.insert(key, true);
let provider = rule.provider.clone();
let model = rule.model.clone();
let action = rule.action.clone();
tasks.push(tokio::spawn(async move {
probe_single(&provider, &model, action.as_deref()).await
}));
}
let mut results = Vec::new();
for task in tasks {
match task.await {
Ok(result) => results.push(result),
Err(e) => warn!(error = %e, "probe task panicked"),
}
}
for result in &results {
let key = format!("{}:{}", result.provider, result.model);
let breaker = self
.breakers
.entry(key)
.or_insert_with(|| CircuitBreaker::new(self.cb_config.clone()));
match result.status {
ProbeStatus::Success => breaker.record_success(),
ProbeStatus::Error | ProbeStatus::Timeout => breaker.record_failure(),
}
}
info!(
providers_probed = results.len(),
healthy = results
.iter()
.filter(|r| r.status == ProbeStatus::Success)
.count(),
"provider probe complete"
);
self.results = results;
self.probed_at = Some(Instant::now());
}
pub fn model_health(&self, provider: &str, model: &str) -> HealthStatus {
let key = format!("{provider}:{model}");
if let Some(result) = self
.results
.iter()
.find(|r| r.provider == provider && r.model == model)
{
return match result.status {
ProbeStatus::Success => HealthStatus::Healthy,
ProbeStatus::Error => HealthStatus::Unhealthy,
ProbeStatus::Timeout => HealthStatus::Unhealthy,
};
}
match self.breakers.get(&key) {
Some(breaker) => match breaker.state() {
CircuitState::Closed => HealthStatus::Healthy,
CircuitState::HalfOpen => HealthStatus::Degraded,
CircuitState::Open => HealthStatus::Unhealthy,
},
None => HealthStatus::Healthy,
}
}
/// Reports the health of a provider as a whole.
///
/// If any probe results exist for the provider they decide the answer: the
/// provider is healthy when at least one of its models probed successfully,
/// unhealthy otherwise. Without probe results, the provider is healthy when
/// it has no circuit breakers at all or when at least one of its breakers
/// still allows requests, and unhealthy otherwise.
pub fn provider_health(&self, provider: &str) -> HealthStatus {
    let mut has_probe_result = false;
    for result in self.results.iter().filter(|r| r.provider == provider) {
        if result.status == ProbeStatus::Success {
            return HealthStatus::Healthy;
        }
        has_probe_result = true;
    }
    if has_probe_result {
        return HealthStatus::Unhealthy;
    }

    // Build the key prefix once instead of once per breaker.
    let prefix = format!("{provider}:");
    let mut has_breaker = false;
    for (_, breaker) in self
        .breakers
        .iter()
        .filter(|(key, _)| key.starts_with(&prefix))
    {
        if breaker.should_allow() {
            return HealthStatus::Healthy;
        }
        has_breaker = true;
    }

    if has_breaker {
        HealthStatus::Unhealthy
    } else {
        // Nothing is known about this provider; assume it is usable.
        HealthStatus::Healthy
    }
}
/// Returns `true` when the provider's health is `Healthy` or `Degraded`.
pub fn is_healthy(&self, provider: &str) -> bool {
    let status = self.provider_health(provider);
    matches!(status, HealthStatus::Healthy | HealthStatus::Degraded)
}
/// Returns `true` when the given provider/model pair is `Healthy` or
/// `Degraded`.
pub fn is_model_healthy(&self, provider: &str, model: &str) -> bool {
    let status = self.model_health(provider, model);
    matches!(status, HealthStatus::Healthy | HealthStatus::Degraded)
}
/// Lists the providers currently considered unhealthy, sorted by name.
///
/// The rules mirror `provider_health`: a provider with probe results is
/// unhealthy when none of its models probed successfully, and its circuit
/// breakers are not consulted. A provider without probe results is unhealthy
/// when it has breakers and none of them allows requests. Sorting makes the
/// output independent of hash-map iteration order.
pub fn unhealthy_providers(&self) -> Vec<String> {
    // provider -> did at least one of its models probe successfully?
    let mut probed: HashMap<&str, bool> = HashMap::new();
    for result in &self.results {
        *probed.entry(result.provider.as_str()).or_insert(false) |=
            result.status == ProbeStatus::Success;
    }

    // provider -> does at least one breaker allow requests?
    // Only providers without probe results are tracked here, because probe
    // results take precedence over breaker state.
    let mut breaker_only: HashMap<&str, bool> = HashMap::new();
    for (key, breaker) in &self.breakers {
        // Keys look like "provider:model"; everything before the first ':'
        // is the provider name.
        let provider = key.split(':').next().unwrap_or(key.as_str());
        if probed.contains_key(provider) {
            continue;
        }
        *breaker_only.entry(provider).or_insert(false) |= breaker.should_allow();
    }

    // The two maps have disjoint keys, so chaining cannot produce duplicates.
    let mut unhealthy: Vec<String> = probed
        .into_iter()
        .chain(breaker_only)
        .filter(|(_, any_usable)| !*any_usable)
        .map(|(name, _)| name.to_string())
        .collect();
    unhealthy.sort_unstable();
    unhealthy
}
/// Records a success on every existing circuit breaker of `provider`.
///
/// Does nothing when the provider has no breakers yet.
pub fn record_success(&mut self, provider: &str) {
    let prefix = format!("{provider}:");
    self.breakers
        .iter_mut()
        .filter(|(key, _)| key.starts_with(&prefix))
        .for_each(|(_, breaker)| breaker.record_success());
}
/// Records a success on the circuit breaker of one provider/model pair.
///
/// Does nothing when that pair has no breaker yet.
pub fn record_model_success(&mut self, provider: &str, model: &str) {
    match self.breakers.get_mut(&format!("{provider}:{model}")) {
        Some(breaker) => breaker.record_success(),
        None => {}
    }
}
/// Records a failure against every circuit breaker of `provider`.
///
/// When the provider has no breaker yet, a `"{provider}:*"` breaker is
/// created and the failure is recorded on it, so repeated failures of a
/// never-probed provider can still trip a circuit.
pub fn record_failure(&mut self, provider: &str) {
    let prefix = format!("{provider}:");

    // One mutable pass over the breakers; no key cloning or re-lookup needed.
    let mut matched = false;
    for (_, breaker) in self
        .breakers
        .iter_mut()
        .filter(|(key, _)| key.starts_with(&prefix))
    {
        breaker.record_failure();
        matched = true;
    }

    if !matched {
        self.breakers
            .entry(format!("{provider}:*"))
            .or_insert_with(|| CircuitBreaker::new(self.cb_config.clone()))
            .record_failure();
    }

    warn!(
        provider = provider,
        "provider failure recorded (all models)"
    );
}
/// Records a failure on the circuit breaker of one provider/model pair,
/// creating the breaker first if the pair has none, and logs the resulting
/// breaker state.
pub fn record_model_failure(&mut self, provider: &str, model: &str) {
    let breaker = self
        .breakers
        .entry(format!("{provider}:{model}"))
        .or_insert_with(|| CircuitBreaker::new(self.cb_config.clone()));
    breaker.record_failure();

    let state = breaker.state();
    warn!(provider, model, state = %state, "model failure recorded");
}
/// Returns the results of the most recent probe run (empty if none has run).
pub fn results(&self) -> &[ProbeResult] {
    self.results.as_slice()
}
pub async fn save_results(&self, dir: &std::path::Path) -> std::io::Result<()> {
tokio::fs::create_dir_all(dir).await?;
let json = serde_json::to_string_pretty(&self.results).map_err(std::io::Error::other)?;
let timestamp = chrono::Utc::now().format("%Y-%m-%d-%H%M%S");
let timestamped = dir.join(format!("{timestamp}.json"));
let latest = dir.join("latest.json");
tokio::fs::write(×tamped, &json).await?;
tokio::fs::write(&latest, &json).await?;
info!(
path = %timestamped.display(),
results = self.results.len(),
"probe results saved"
);
Ok(())
}
}
async fn probe_single(provider: &str, model: &str, action_template: Option<&str>) -> ProbeResult {
let timestamp = chrono::Utc::now().to_rfc3339();
let test_prompt = "echo hello";
let action = match action_template {
Some(tmpl) => tmpl
.replace("{{ model }}", model)
.replace("{{model}}", model)
.replace("{{ prompt }}", test_prompt)
.replace("{{prompt}}", test_prompt),
None => {
return ProbeResult {
provider: provider.to_string(),
model: model.to_string(),
cli_tool: String::new(),
status: ProbeStatus::Error,
latency_ms: None,
error: Some("no action:: template defined".to_string()),
timestamp,
};
}
};
let cli_tool = action
.split_whitespace()
.next()
.unwrap_or("")
.rsplit('/')
.next()
.unwrap_or("")
.to_string();
let start = Instant::now();
let timeout = Duration::from_secs(60);
debug!(provider, model, action = %action, "running probe command");
let home = std::env::var("HOME").unwrap_or_else(|_| "/home/alex".to_string());
let path_prefix =
format!("{home}/.local/bin:{home}/.bun/bin:{home}/bin:{home}/.cargo/bin:{home}/go/bin",);
let result = tokio::time::timeout(timeout, async {
let output = tokio::process::Command::new("bash")
.arg("-c")
.arg(&action)
.env(
"PATH",
format!(
"{path_prefix}:{}",
std::env::var("PATH").unwrap_or_default()
),
)
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped())
.spawn()
.map_err(|e| format!("spawn failed: {e}"))?
.wait_with_output()
.await
.map_err(|e| format!("wait failed: {e}"))?;
if output.status.success() {
Ok(())
} else {
let stderr = String::from_utf8_lossy(&output.stderr);
Err(format!(
"exit {}: {}",
output.status,
stderr.chars().take(200).collect::<String>()
))
}
})
.await;
let latency_ms = start.elapsed().as_millis() as u64;
match result {
Ok(Ok(())) => {
info!(provider, model, latency_ms, "probe success");
ProbeResult {
provider: provider.to_string(),
model: model.to_string(),
cli_tool,
status: ProbeStatus::Success,
latency_ms: Some(latency_ms),
error: None,
timestamp,
}
}
Ok(Err(e)) => {
warn!(provider, model, error = %e, "probe failed");
ProbeResult {
provider: provider.to_string(),
model: model.to_string(),
cli_tool,
status: ProbeStatus::Error,
latency_ms: Some(latency_ms),
error: Some(e),
timestamp,
}
}
Err(_) => {
warn!(provider, model, "probe timed out after 30s");
ProbeResult {
provider: provider.to_string(),
model: model.to_string(),
cli_tool,
status: ProbeStatus::Timeout,
latency_ms: Some(latency_ms),
error: Some("timeout after 60s".to_string()),
timestamp,
}
}
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Empty health map with the five-minute TTL every test uses.
    fn fresh_map() -> ProviderHealthMap {
        ProviderHealthMap::new(Duration::from_secs(300))
    }

    /// Builds a probe result fixture with an empty timestamp.
    fn probe(
        provider: &str,
        model: &str,
        cli_tool: &str,
        status: ProbeStatus,
        latency_ms: u64,
        error: Option<&str>,
    ) -> ProbeResult {
        ProbeResult {
            provider: provider.to_string(),
            model: model.to_string(),
            cli_tool: cli_tool.to_string(),
            status,
            latency_ms: Some(latency_ms),
            error: error.map(str::to_string),
            timestamp: String::new(),
        }
    }

    // A map that has never been probed has nothing fresh to offer.
    #[test]
    fn new_health_map_is_stale() {
        let map = fresh_map();
        assert!(map.is_stale());
    }

    // Providers the map knows nothing about are assumed usable.
    #[test]
    fn unknown_provider_is_healthy() {
        let map = fresh_map();
        assert!(map.is_healthy("nonexistent"));
        assert_eq!(map.provider_health("nonexistent"), HealthStatus::Healthy);
    }

    // Reaching the failure threshold (5) trips the provider's breaker.
    #[test]
    fn record_failures_opens_circuit() {
        let mut map = fresh_map();
        for _ in 0..5 {
            map.record_failure("kimi");
        }
        assert!(!map.is_healthy("kimi"));
        assert_eq!(map.provider_health("kimi"), HealthStatus::Unhealthy);
        assert_eq!(map.unhealthy_providers(), vec!["kimi".to_string()]);
    }

    // A single failure followed by a success leaves the provider usable.
    #[test]
    fn record_success_keeps_healthy() {
        let mut map = fresh_map();
        map.record_failure("kimi");
        map.record_success("kimi");
        assert!(map.is_healthy("kimi"));
    }

    // A timed-out probe marks the provider unhealthy without any breaker.
    #[test]
    fn probe_timeout_marks_unhealthy_immediately() {
        let mut map = fresh_map();
        map.results = vec![probe(
            "kimi",
            "kimi-for-coding/k2p5",
            "opencode",
            ProbeStatus::Timeout,
            30000,
            Some("timeout"),
        )];
        assert!(!map.is_healthy("kimi"));
        assert_eq!(map.provider_health("kimi"), HealthStatus::Unhealthy);
        assert!(map.unhealthy_providers().contains(&"kimi".to_string()));
    }

    // Probe results take precedence over previously recorded failures.
    #[test]
    fn probe_success_overrides_circuit_breaker_failures() {
        let mut map = fresh_map();
        for _ in 0..3 {
            map.record_failure("kimi");
        }
        map.results = vec![probe(
            "kimi",
            "kimi-for-coding/k2p5",
            "opencode",
            ProbeStatus::Success,
            5000,
            None,
        )];
        assert!(map.is_healthy("kimi"));
    }

    // One working model is enough for the provider to count as healthy,
    // while per-model health still distinguishes the two models.
    #[test]
    fn mixed_model_results_any_success_means_healthy() {
        let mut map = fresh_map();
        map.results = vec![
            probe(
                "minimax",
                "opencode-go/minimax-m2.5",
                "opencode",
                ProbeStatus::Timeout,
                30000,
                Some("timeout"),
            ),
            probe(
                "minimax",
                "minimax-coding-plan/MiniMax-M2.5",
                "opencode",
                ProbeStatus::Success,
                10000,
                None,
            ),
        ];
        assert!(map.is_healthy("minimax"));
        assert!(!map.is_model_healthy("minimax", "opencode-go/minimax-m2.5"));
        assert!(map.is_model_healthy("minimax", "minimax-coding-plan/MiniMax-M2.5"));
    }

    // A failing model does not drag down its sibling or the provider.
    #[test]
    fn per_model_failure_does_not_affect_other_models() {
        let mut map = fresh_map();
        map.results = vec![
            probe(
                "anthropic",
                "claude-opus-4-6",
                "claude",
                ProbeStatus::Error,
                5000,
                Some("rate limited"),
            ),
            probe(
                "anthropic",
                "claude-sonnet-4-6",
                "claude",
                ProbeStatus::Success,
                8000,
                None,
            ),
        ];
        assert!(map.is_healthy("anthropic"));
        assert!(!map.is_model_healthy("anthropic", "claude-opus-4-6"));
        assert!(map.is_model_healthy("anthropic", "claude-sonnet-4-6"));
        assert!(!map.unhealthy_providers().contains(&"anthropic".to_string()));
    }
}