use std::{
sync::{Arc, RwLock},
time::{Duration, Instant},
};
/// Coarse category of a failed provider/model call, produced by
/// [`classify_str`] / [`classify_error`] and consumed by
/// [`ModelHealth::record_failure`] and [`cooling_backoff`].
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum ErrorKind {
    /// Throttling: HTTP 429, "rate limit", "too many requests".
    RateLimit,
    /// Likely-temporary upstream or network failure: 5xx, overloaded, timeouts.
    Transient,
    /// Credential or permission problem: 401/403, invalid API key.
    Auth,
    /// Account balance or quota exhausted.
    Balance,
    /// The requested model or endpoint does not exist or is not enabled.
    ModelMissing,
    /// Request rejected as malformed, e.g. an oversized `max_tokens`.
    BadRequest,
    /// The session outgrew the worker's context budget.
    ContextExceeded,
    /// Nothing the classifier recognised.
    Unknown,
}

impl ErrorKind {
    /// Whether this kind counts towards escalating a model straight to the
    /// maximum cooldown after repeated failures (see
    /// [`ModelHealth::record_failure`]).
    ///
    /// True for the account/configuration kinds: `Auth`, `Balance` and
    /// `ModelMissing`.
    pub fn is_disabling(&self) -> bool {
        match self {
            Self::Auth | Self::Balance | Self::ModelMissing => true,
            Self::RateLimit
            | Self::Transient
            | Self::BadRequest
            | Self::ContextExceeded
            | Self::Unknown => false,
        }
    }
}
/// Availability state of one model entry tracked by `ModelHealth`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ModelStatus {
/// Callable; no failures since the last success or reset.
Healthy,
/// Temporarily skipped. `ModelHealth::is_callable` treats the model as
/// callable again once `Instant::now() >= until`; the status value itself is
/// not switched back to `Healthy` automatically.
Cooling { until: Instant },
/// Never callable according to `ModelHealth::is_callable`; only
/// `record_success`/`reset` return it to `Healthy`. Nothing in this module
/// constructs this variant — presumably set by callers elsewhere (verify).
Disabled { reason: String },
}
/// Health record for a single model: its current [`ModelStatus`] plus the
/// failure bookkeeping used to size cooldowns.
#[derive(Debug, Clone)]
pub struct ModelHealth {
    /// Model identifier, as passed to [`ModelHealth::new`].
    pub model: String,
    /// Current availability state.
    pub status: ModelStatus,
    /// Body snippet from the most recent failure; cleared on success/reset.
    pub last_error: Option<String>,
    /// Failures recorded since the last success or reset (saturating).
    pub consecutive_failures: u32,
}

impl ModelHealth {
    /// Creates a `Healthy` record with no failure history.
    pub fn new(model: impl Into<String>) -> Self {
        let model = model.into();
        Self {
            model,
            status: ModelStatus::Healthy,
            last_error: None,
            consecutive_failures: 0,
        }
    }

    /// Whether the model may be called right now: `Healthy` always, `Cooling`
    /// once its `until` instant has been reached, `Disabled` never.
    pub fn is_callable(&self) -> bool {
        if let ModelStatus::Cooling { until } = self.status {
            return Instant::now() >= until;
        }
        !matches!(self.status, ModelStatus::Disabled { .. })
    }

    /// Clears all failure state after a successful call.
    pub fn record_success(&mut self) {
        self.reset();
    }

    /// Records a failed call of `kind` observed at `now` and moves the model
    /// into `Cooling`.
    ///
    /// The cooldown length comes from [`cooling_backoff`], except that a
    /// disabling kind (see [`ErrorKind::is_disabling`]) reaching
    /// [`AUTH_DISABLE_AFTER`] consecutive failures jumps straight to
    /// [`MAX_COOLDOWN`]. The counter includes failures of every kind. The
    /// model is never moved to `Disabled` here.
    pub fn record_failure(&mut self, kind: ErrorKind, body_snippet: String, now: Instant) {
        let failures = self.consecutive_failures.saturating_add(1);
        let escalate = kind.is_disabling() && failures >= AUTH_DISABLE_AFTER;
        let backoff = if escalate {
            MAX_COOLDOWN
        } else {
            cooling_backoff(failures, kind)
        };
        self.consecutive_failures = failures;
        self.last_error = Some(body_snippet);
        self.status = ModelStatus::Cooling {
            until: now + backoff,
        };
    }

    /// Returns the model to `Healthy` and forgets its failure history.
    pub fn reset(&mut self) {
        self.consecutive_failures = 0;
        self.last_error = None;
        self.status = ModelStatus::Healthy;
    }
}
#[derive(Debug, Clone, Default)]
pub struct ChainHealth {
pub entries: Vec<ModelHealth>,
}
impl ChainHealth {
pub fn from_chain<I, S>(models: I) -> Self
where
I: IntoIterator<Item = S>,
S: Into<String>,
{
Self {
entries: models.into_iter().map(ModelHealth::new).collect(),
}
}
pub fn next_callable(&self) -> Option<&ModelHealth> {
self.entries.iter().find(|e| e.is_callable())
}
pub fn get_mut(&mut self, model: &str) -> Option<&mut ModelHealth> {
self.entries.iter_mut().find(|e| e.model == model)
}
pub fn snapshot(&self) -> Vec<(String, ModelStatus, Option<String>, u32)> {
self.entries
.iter()
.map(|e| {
(
e.model.clone(),
e.status.clone(),
e.last_error.clone(),
e.consecutive_failures,
)
})
.collect()
}
pub fn all_unavailable(&self) -> bool {
!self.entries.is_empty() && self.entries.iter().all(|e| !e.is_callable())
}
}
/// Shared registry of per-model health.
///
/// Cloning is cheap and every clone observes the same state, because the chain
/// lives behind an `Arc<RwLock<_>>`. A poisoned lock is not treated as fatal:
/// the guard is recovered and used as-is.
#[derive(Clone, Default)]
pub struct ProviderHealthRegistry {
    inner: Arc<RwLock<ChainHealth>>,
}

impl ProviderHealthRegistry {
    /// Creates an empty registry.
    pub fn new() -> Self {
        Self::default()
    }

    /// Shared access to the chain, recovering the guard if the lock is poisoned.
    fn read_chain(&self) -> std::sync::RwLockReadGuard<'_, ChainHealth> {
        self.inner
            .read()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
    }

    /// Exclusive access to the chain, recovering the guard if the lock is poisoned.
    fn write_chain(&self) -> std::sync::RwLockWriteGuard<'_, ChainHealth> {
        self.inner
            .write()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
    }

    /// Appends a `Healthy` entry for every model in `models` that is not
    /// tracked yet, in the given order; existing entries are left untouched.
    pub fn ensure(&self, models: &[String]) {
        let mut chain = self.write_chain();
        for model in models {
            let known = chain.get_mut(model).is_some();
            if !known {
                chain.entries.push(ModelHealth::new(model.as_str()));
            }
        }
    }

    /// Whether `model` may be called right now. Untracked models are reported
    /// as callable.
    pub fn is_callable(&self, model: &str) -> bool {
        let chain = self.read_chain();
        match chain.entries.iter().find(|entry| entry.model == model) {
            Some(entry) => entry.is_callable(),
            None => true,
        }
    }

    /// Marks `model` healthy again; no-op for untracked models.
    pub fn record_success(&self, model: &str) {
        let mut chain = self.write_chain();
        if let Some(entry) = chain.get_mut(model) {
            entry.record_success();
        }
    }

    /// Records a failure of `kind` for `model`, timestamped with the current
    /// instant; no-op for untracked models.
    pub fn record_failure(&self, model: &str, kind: ErrorKind, body: String) {
        let mut chain = self.write_chain();
        if let Some(entry) = chain.get_mut(model) {
            entry.record_failure(kind, body, Instant::now());
        }
    }

    /// Returns `model` to `Healthy`; `false` if the model is not tracked.
    pub fn reset(&self, model: &str) -> bool {
        let mut chain = self.write_chain();
        match chain.get_mut(model) {
            Some(entry) => {
                entry.reset();
                true
            }
            None => false,
        }
    }

    /// Point-in-time, serializable view of every tracked model, in chain order.
    pub fn snapshot(&self) -> Vec<HealthEntrySnapshot> {
        let chain = self.read_chain();
        let now = Instant::now();
        let mut out = Vec::with_capacity(chain.entries.len());
        for entry in chain.entries.iter() {
            out.push(HealthEntrySnapshot {
                model: entry.model.clone(),
                status: status_label(&entry.status),
                reason: reason_for(&entry.status),
                cooldown_seconds: cooldown_seconds(&entry.status, now),
                last_error: entry.last_error.clone(),
                consecutive_failures: entry.consecutive_failures,
            });
        }
        out
    }

    /// Identifiers of all tracked models, in chain order.
    pub fn model_ids(&self) -> Vec<String> {
        let chain = self.read_chain();
        chain
            .entries
            .iter()
            .map(|entry| entry.model.to_owned())
            .collect()
    }
}
/// Serializable, point-in-time view of one model's health, as built by
/// `ProviderHealthRegistry::snapshot`.
#[derive(Debug, Clone, serde::Serialize)]
pub struct HealthEntrySnapshot {
/// Model identifier.
pub model: String,
/// As filled by `snapshot`: one of "Healthy", "Cooling" or "Disabled". The
/// label stays "Cooling" even after the cooldown instant has passed.
pub status: &'static str,
/// Disable reason; only populated for a `Disabled` status.
pub reason: Option<String>,
/// Seconds left on the cooldown at snapshot time; `None` unless the status
/// is `Cooling`. An expired cooldown reports `Some(0)`.
pub cooldown_seconds: Option<u64>,
/// Body snippet from the most recent failure, if any.
pub last_error: Option<String>,
/// Failures recorded since the last success or reset.
pub consecutive_failures: u32,
}
/// Human-readable name of a [`ModelStatus`] variant, used for the `status`
/// field of [`HealthEntrySnapshot`].
fn status_label(s: &ModelStatus) -> &'static str {
    match *s {
        ModelStatus::Disabled { .. } => "Disabled",
        ModelStatus::Cooling { .. } => "Cooling",
        ModelStatus::Healthy => "Healthy",
    }
}
/// The disable reason carried by a `Disabled` status; `None` for every
/// other variant.
fn reason_for(s: &ModelStatus) -> Option<String> {
    if let ModelStatus::Disabled { reason } = s {
        Some(reason.to_owned())
    } else {
        None
    }
}
/// Whole seconds left on a `Cooling` status as of `now`, or `None` for any
/// other status.
///
/// A partial second is rounded up, so a model that is still cooling always
/// reports at least `1`. `Some(0)` therefore means the cooldown has expired,
/// which agrees with `ModelHealth::is_callable` treating `now >= until` as
/// callable. (Truncating with `as_secs()` would report `0` for up to a second
/// while the model is still being skipped.)
fn cooldown_seconds(s: &ModelStatus, now: Instant) -> Option<u64> {
    match s {
        ModelStatus::Cooling { until } => {
            // Zero once `until` has passed; never panics on a past instant.
            let remaining = until.saturating_duration_since(now);
            let partial_second = u64::from(remaining.subsec_nanos() > 0);
            Some(remaining.as_secs().saturating_add(partial_second))
        }
        ModelStatus::Healthy | ModelStatus::Disabled { .. } => None,
    }
}
/// Consecutive failures after which a disabling [`ErrorKind`] jumps straight
/// to [`MAX_COOLDOWN`] (see `ModelHealth::record_failure`).
pub const AUTH_DISABLE_AFTER: u32 = 3;
/// Upper bound on any single cooldown: one hour.
pub const MAX_COOLDOWN: Duration = Duration::from_secs(3600);

/// Exponential cooldown for the `consecutive`-th failure of `kind`.
///
/// Starts at a per-kind base (5s request errors, 10s transient/unknown,
/// 30s rate limits, 60s auth/balance/missing model), doubles for each
/// failure after the first, and is capped at [`MAX_COOLDOWN`]. A
/// `consecutive` of 0 is treated like 1.
pub fn cooling_backoff(consecutive: u32, kind: ErrorKind) -> Duration {
    let base_secs: u64 = match kind {
        ErrorKind::BadRequest | ErrorKind::ContextExceeded => 5,
        ErrorKind::Transient | ErrorKind::Unknown => 10,
        ErrorKind::RateLimit => 30,
        ErrorKind::Auth | ErrorKind::Balance | ErrorKind::ModelMissing => 60,
    };
    // Limit the number of doublings so the multiplier stays small; the final
    // clamp to MAX_COOLDOWN does the real capping.
    let doublings = consecutive.saturating_sub(1).min(16);
    let scaled_secs = base_secs.saturating_mul(2u64.pow(doublings));
    std::cmp::min(Duration::from_secs(scaled_secs), MAX_COOLDOWN)
}
/// Classifies an `anyhow::Error` by rendering it with the alternate (`{:#}`)
/// formatter, which includes the error's context chain, and passing the
/// resulting text to [`classify_str`].
pub fn classify_error(err: &anyhow::Error) -> ErrorKind {
    classify_str(&format!("{err:#}"))
}
/// Classifies a raw provider error message (status line and/or response body)
/// into an [`ErrorKind`] by looking for well-known markers.
///
/// Matching is case-insensitive. Rules are checked in priority order and the
/// first hit wins:
/// 1. context exhaustion
/// 2. balance/quota
/// 3. missing model
/// 4. auth
/// 5. rate limit
/// 6. oversized `max_tokens`
/// 7. transient server/network failures
///
/// Anything else is [`ErrorKind::Unknown`].
///
/// HTTP status codes only count when they stand alone, i.e. are not part of a
/// longer run of digits. With a plain substring test, "resulted in 14012
/// tokens" would read as HTTP 401 (a disabling `Auth` error). Likewise a
/// request id like "45004" would read as HTTP 500.
pub fn classify_str(s: &str) -> ErrorKind {
    let lower = s.to_lowercase();
    let has = |needle: &str| lower.contains(needle);
    let status = |code: &str| contains_status_code(&lower, code);

    // Worker-side session budget exhaustion; must win over the generic
    // `400`/`max_tokens` rule further down.
    if has("session_ctx_exceeded") || has("exceed_context_size_error") || has("max-session-ctx") {
        return ErrorKind::ContextExceeded;
    }

    // Billing problems, checked before the auth and rate-limit rules so a
    // quota error is not misread by its status code. The CJK markers mean
    // "insufficient balance", "insufficient quota" and "in arrears".
    if has("insufficient_quota")
        || has("insufficient quota")
        || has("credit_balance_too_low")
        || (has("credit balance") && has("too low"))
        || has("accountoverdue")
        || has("balance_not_enough")
        || has("balance not enough")
        || s.contains("余额不足")
        || s.contains("额度不足")
        || s.contains("欠费")
        || (status("402") && (has("payment") || has("balance")))
    {
        return ErrorKind::Balance;
    }

    if has("model_not_found")
        || has("model not found")
        || has("does not exist or you do not have access")
        || has("modelnotopen")
        || has("endpointisnotenabled")
        || has("invalid model")
        || (has("not_found_error") && has("model"))
    {
        return ErrorKind::ModelMissing;
    }

    if status("401")
        || status("403")
        || has("invalid_api_key")
        || has("invalid api key")
        || has("authentication_error")
        || has("unauthorized")
        || has("permission_denied")
        || has("authentication fails")
    {
        return ErrorKind::Auth;
    }

    if status("429")
        || has("rate_limit")
        || has("rate limit")
        || has("too many requests")
        || has("ratelimit")
    {
        return ErrorKind::RateLimit;
    }

    if has("max_tokens") && (status("400") || has("exceed")) {
        return ErrorKind::BadRequest;
    }

    if status("500")
        || status("502")
        || status("503")
        || status("504")
        || has("overloaded")
        || has("server_error")
        || has("internal server error")
        || has("gateway timeout")
        || has("connection failed")
        || has("connect error")
        || has("timeout")
        || has("timed out")
    {
        return ErrorKind::Transient;
    }

    ErrorKind::Unknown
}

/// Returns `true` if `code` occurs in `haystack` as a standalone number.
///
/// `code` is an ASCII-digit HTTP status such as `"401"`. A match counts only
/// if no ASCII digit sits immediately before or after it.
fn contains_status_code(haystack: &str, code: &str) -> bool {
    let bytes = haystack.as_bytes();
    haystack.match_indices(code).any(|(start, matched)| {
        // Inspecting raw bytes is sound: ASCII digits never occur inside a
        // multi-byte UTF-8 sequence.
        let before = start.checked_sub(1).and_then(|i| bytes.get(i));
        let after = bytes.get(start + matched.len());
        !matches!(before, Some(b) if b.is_ascii_digit())
            && !matches!(after, Some(b) if b.is_ascii_digit())
    })
}
// Unit tests for error classification, per-model health transitions, chain
// selection and backoff sizing.
#[cfg(test)]
mod tests {
use super::*;
// Doubao overdue account: HTTP 402 body with AccountOverdueError and a
// Chinese "account in arrears, please top up" message.
#[test]
fn classify_balance_doubao() {
let body = r#"OpenAI API error 402 Payment Required: {"error":{"code":"AccountOverdueError","message":"账户欠费,请充值后重试"}}"#;
assert_eq!(classify_str(body), ErrorKind::Balance);
}
// OpenAI-style quota exhaustion (`insufficient_quota` type).
#[test]
fn classify_balance_openai() {
let body = r#"{"error":{"message":"You exceeded your current quota","type":"insufficient_quota"}}"#;
assert_eq!(classify_str(body), ErrorKind::Balance);
}
// Anthropic-style "credit balance is too low" message.
#[test]
fn classify_balance_anthropic() {
let body = r#"{"type":"error","error":{"type":"invalid_request_error","message":"Your credit balance is too low to access the Claude API"}}"#;
assert_eq!(classify_str(body), ErrorKind::Balance);
}
// Chinese "insufficient balance, please top up" with no status code.
#[test]
fn classify_balance_zh() {
let body = r#"call failed: 余额不足,请充值"#;
assert_eq!(classify_str(body), ErrorKind::Balance);
}
#[test]
fn classify_auth_401() {
let body = r#"OpenAI API error 401 Unauthorized: {"error":{"message":"Incorrect API key"}}"#;
assert_eq!(classify_str(body), ErrorKind::Auth);
}
// Auth detected from the error type alone, without a status code.
#[test]
fn classify_auth_invalid_key() {
let body = r#"{"error":{"type":"invalid_api_key","message":"..."}}"#;
assert_eq!(classify_str(body), ErrorKind::Auth);
}
#[test]
fn classify_rate_limit_429() {
let body = r#"OpenAI API error 429 Too Many Requests"#;
assert_eq!(classify_str(body), ErrorKind::RateLimit);
}
#[test]
fn classify_model_missing() {
let body = r#"{"error":{"code":"model_not_found","message":"The model gpt-5 does not exist"}}"#;
assert_eq!(classify_str(body), ErrorKind::ModelMissing);
}
// Volcengine reports an unprovisioned model as EndpointIsNotEnabled.
#[test]
fn classify_model_missing_volcengine() {
let body = r#"{"error":{"code":"EndpointIsNotEnabled","message":"endpoint is not enabled"}}"#;
assert_eq!(classify_str(body), ErrorKind::ModelMissing);
}
#[test]
fn classify_transient_503() {
let body = r#"upstream 503 Service Unavailable: overloaded"#;
assert_eq!(classify_str(body), ErrorKind::Transient);
}
#[test]
fn classify_transient_timeout() {
assert_eq!(classify_str("connection failed: timed out"), ErrorKind::Transient);
}
#[test]
fn classify_bad_request_max_tokens() {
let body = r#"400 Bad Request: max_tokens exceeds model ceiling"#;
assert_eq!(classify_str(body), ErrorKind::BadRequest);
}
// Worker session budget exhaustion, reported with a 413 status.
#[test]
fn classify_session_ctx_exceeded() {
let body = r#"413 Payload Too Large: {"error":{"code":"session_ctx_exceeded","detail":"session has grown past the worker's max-session-ctx budget; call /compact to summarize history or recreate the session"}}"#;
assert_eq!(classify_str(body), ErrorKind::ContextExceeded);
}
// Same condition wrapped in a 400 envelope; must beat the BadRequest rule.
#[test]
fn classify_session_ctx_exceeded_worker_envelope() {
let body = r#"{"error":{"code":400,"message":"session_ctx_exceeded: replay session would start with 35518 tokens, reaching --rsclaw-max-session-ctx=32768; compact history before replaying this session","type":"exceed_context_size_error","n_prompt_tokens":35518,"n_ctx":32768}}"#;
assert_eq!(classify_str(body), ErrorKind::ContextExceeded);
}
// A plain max_tokens rejection must not be mistaken for context exhaustion.
#[test]
fn classify_session_ctx_exceeded_not_confused_with_plain_max_tokens() {
let body = r#"400 Bad Request: max_tokens exceeds model ceiling"#;
assert_eq!(classify_str(body), ErrorKind::BadRequest);
}
#[test]
fn classify_unknown_falls_through() {
let body = r#"unrecognised gibberish from upstream"#;
assert_eq!(classify_str(body), ErrorKind::Unknown);
}
// A single failure moves a Healthy model into an active cooldown.
#[test]
fn health_transitions_healthy_to_cooling() {
let mut h = ModelHealth::new("doubao/x");
assert!(h.is_callable());
h.record_failure(
ErrorKind::Transient,
"503".into(),
Instant::now(),
);
assert!(matches!(h.status, ModelStatus::Cooling { .. }));
assert!(!h.is_callable());
assert_eq!(h.consecutive_failures, 1);
}
// Balance failures cool the model down instead of disabling it.
#[test]
fn health_balance_cools_bounded_not_disabled() {
let mut h = ModelHealth::new("doubao/x");
h.record_failure(ErrorKind::Balance, "402".into(), Instant::now());
assert!(matches!(h.status, ModelStatus::Cooling { .. }));
assert!(!h.is_callable());
}
// Below AUTH_DISABLE_AFTER the normal backoff applies; reaching it pins the
// cooldown to exactly MAX_COOLDOWN (still Cooling, never Disabled).
#[test]
fn health_auth_escalates_to_bounded_cooldown_not_disabled() {
let now = Instant::now();
let mut h = ModelHealth::new("doubao/x");
for _ in 0..AUTH_DISABLE_AFTER - 1 {
h.record_failure(ErrorKind::Auth, "401".into(), now);
}
assert!(matches!(h.status, ModelStatus::Cooling { .. }));
h.record_failure(ErrorKind::Auth, "401".into(), now);
match h.status {
ModelStatus::Cooling { until } => assert_eq!(until, now + MAX_COOLDOWN),
other => panic!("expected bounded Cooling, got {other:?}"),
}
}
#[test]
fn health_success_resets() {
let mut h = ModelHealth::new("doubao/x");
h.record_failure(ErrorKind::Transient, "503".into(), Instant::now());
h.record_success();
assert!(matches!(h.status, ModelStatus::Healthy));
assert_eq!(h.consecutive_failures, 0);
}
#[test]
fn health_reset_clears_cooldown() {
let mut h = ModelHealth::new("doubao/x");
h.record_failure(ErrorKind::Balance, "402".into(), Instant::now());
assert!(matches!(h.status, ModelStatus::Cooling { .. }));
h.reset();
assert!(matches!(h.status, ModelStatus::Healthy));
}
// NOTE(review): despite the name, entry "a" ends up Cooling (record_failure
// never produces Disabled); the test checks that cooling entries are skipped.
#[test]
fn chain_next_callable_skips_disabled() {
let mut chain = ChainHealth::from_chain(["a", "b", "c"]);
chain
.get_mut("a")
.unwrap()
.record_failure(ErrorKind::Balance, "".into(), Instant::now());
assert_eq!(chain.next_callable().unwrap().model, "b");
}
// With every entry cooling, the chain reports itself as fully unavailable.
#[test]
fn chain_all_unavailable_when_drained() {
let mut chain = ChainHealth::from_chain(["a", "b"]);
chain
.get_mut("a")
.unwrap()
.record_failure(ErrorKind::Balance, "".into(), Instant::now());
chain
.get_mut("b")
.unwrap()
.record_failure(ErrorKind::Balance, "".into(), Instant::now());
assert!(chain.all_unavailable());
assert!(chain.next_callable().is_none());
}
#[test]
fn cooling_backoff_caps_at_max() {
let d = cooling_backoff(20, ErrorKind::RateLimit);
assert_eq!(d, MAX_COOLDOWN);
}
// The first failure uses the per-kind base delay with no doubling.
#[test]
fn cooling_backoff_starts_at_base() {
assert_eq!(cooling_backoff(1, ErrorKind::RateLimit), Duration::from_secs(30));
assert_eq!(cooling_backoff(1, ErrorKind::Transient), Duration::from_secs(10));
}
}