use std::collections::BTreeMap;
use serde::{Deserialize, Serialize};
use crate::error::ToolError;
/// How a downstream is probed.
///
/// Serialized by serde in `snake_case` (`"http"`, `"tcp"`, `"nats"`, `"passive"`);
/// the derived default is [`ProbeKind::Http`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "snake_case")]
pub enum ProbeKind {
/// HTTP probe; configuration validation requires a non-empty `target`.
#[default]
Http,
/// TCP probe; configuration validation requires a non-empty `target`.
Tcp,
/// NATS probe; configuration validation requires a non-empty `target`.
Nats,
/// The only kind that configuration validation accepts without a `target`.
// NOTE(review): presumably "no active probing, rely on observed traffic" — confirm with the prober.
Passive,
}
impl ProbeKind {
pub fn as_str(&self) -> &'static str {
match self {
ProbeKind::Http => "http",
ProbeKind::Tcp => "tcp",
ProbeKind::Nats => "nats",
ProbeKind::Passive => "passive",
}
}
}
/// Description of one downstream that gets its own circuit breaker.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct DownstreamSpec {
/// Name identifying the downstream; validation requires it to be non-empty and unique.
pub name: String,
/// Kind of probe configured for this downstream.
pub probe: ProbeKind,
/// Probe target; validation requires a non-empty value for every non-passive probe.
// NOTE(review): the expected format (URL, host:port, subject, ...) is not established here — confirm.
pub target: Option<String>,
}
impl DownstreamSpec {
pub fn default_named() -> Self {
Self {
name: "default".to_string(),
probe: ProbeKind::Passive,
target: None,
}
}
}
/// Circuit-breaker settings (error messages refer to this section as `spool.circuit`).
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct CircuitConfig {
/// Number of consecutive failures after which a closed breaker opens; must be >= 1.
pub trip_after: u32,
/// Milliseconds an open breaker waits before a probe is allowed.
pub probe_after_ms: u64,
/// Explicitly configured downstreams; may be empty.
pub downstream: Vec<DownstreamSpec>,
/// Probe interval in milliseconds.
// NOTE(review): not read by the code visible here; presumably the cadence of active probes — confirm.
pub probe_interval_ms: u64,
}
impl Default for CircuitConfig {
fn default() -> Self {
Self {
trip_after: 5,
probe_after_ms: 30_000,
downstream: Vec::new(),
probe_interval_ms: 5_000,
}
}
}
impl CircuitConfig {
pub fn downstreams(&self) -> Vec<DownstreamSpec> {
if self.downstream.is_empty() {
vec![DownstreamSpec::default_named()]
} else {
self.downstream.clone()
}
}
pub fn validate(&self) -> Result<(), ToolError> {
if self.trip_after == 0 {
return Err(ToolError::Configuration(
"spool.circuit.trip_after must be >= 1".to_string(),
));
}
let mut seen = std::collections::HashSet::new();
for d in &self.downstream {
if d.name.is_empty() {
return Err(ToolError::Configuration(
"spool.circuit.downstream[].name must be non-empty".to_string(),
));
}
if !seen.insert(d.name.clone()) {
return Err(ToolError::Configuration(format!(
"spool.circuit.downstream name '{}' is duplicated",
d.name
)));
}
if !matches!(d.probe, ProbeKind::Passive)
&& d.target.as_deref().unwrap_or("").is_empty()
{
return Err(ToolError::Configuration(format!(
"spool.circuit.downstream '{}' probe '{}' requires a 'target'",
d.name,
d.probe.as_str()
)));
}
}
Ok(())
}
pub fn parse(value: &serde_json::Value) -> Result<CircuitConfig, ToolError> {
let obj = value.as_object().ok_or_else(|| {
ToolError::Configuration("spool.circuit must be a mapping".to_string())
})?;
let mut cfg = CircuitConfig::default();
if let Some(t) = obj.get("trip_after").and_then(|v| v.as_u64()) {
cfg.trip_after = t as u32;
}
if let Some(p) = obj.get("probe_after_ms").and_then(|v| v.as_u64()) {
cfg.probe_after_ms = p;
}
if let Some(p) = obj.get("probe_interval_ms").and_then(|v| v.as_u64()) {
cfg.probe_interval_ms = p;
}
if let Some(list) = obj.get("downstream").and_then(|v| v.as_array()) {
for d in list {
let name = d
.get("name")
.and_then(|v| v.as_str())
.ok_or_else(|| {
ToolError::Configuration(
"spool.circuit.downstream[].name is required".to_string(),
)
})?
.to_string();
let probe = match d.get("type").or_else(|| d.get("probe")) {
Some(p) => crate::spool::parse_enum_str(p, "spool.circuit.downstream[].type", &[
("http", ProbeKind::Http),
("tcp", ProbeKind::Tcp),
("nats", ProbeKind::Nats),
("passive", ProbeKind::Passive),
])?,
None => ProbeKind::Http,
};
let target = d.get("target").and_then(|v| v.as_str()).map(str::to_string);
cfg.downstream.push(DownstreamSpec { name, probe, target });
}
}
cfg.validate()?;
Ok(cfg)
}
}
/// Phase of a circuit breaker.
///
/// Serialized by serde in `snake_case`; the derived default is [`CircuitPhase::Closed`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "snake_case")]
pub enum CircuitPhase {
/// Normal operation: the breaker decides to dispatch.
#[default]
Closed,
/// Tripped: the breaker decides to spool until the probe delay has elapsed.
Open,
/// A probe has been handed out; the breaker keeps spooling until its outcome is reported.
HalfOpen,
}
impl CircuitPhase {
pub fn as_str(&self) -> &'static str {
match self {
CircuitPhase::Closed => "closed",
CircuitPhase::Open => "open",
CircuitPhase::HalfOpen => "half_open",
}
}
}
/// Serializable state of a single circuit breaker.
///
/// All `*_ms` values are millisecond timestamps on whatever clock the caller
/// passes to the breaker as `now_ms`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)]
pub struct CircuitState {
/// Current phase of the breaker.
pub phase: CircuitPhase,
/// Failures recorded since the last recorded success.
pub consecutive_failures: u32,
/// When the breaker last moved to `Open`; cleared when it closes again.
pub opened_at_ms: Option<u64>,
/// Timestamp of the most recent phase change.
pub last_transition_ms: u64,
/// Number of times the breaker entered `Open` from another phase.
pub trips: u64,
}
/// Outcome of asking a breaker what to do next.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CircuitDecision {
/// Returned while the breaker is closed.
Dispatch,
/// Returned while the breaker is open (probe delay not yet elapsed) or half-open.
Spool,
/// Returned once, when an open breaker's probe delay has elapsed and it turns half-open.
Probe,
}
/// Circuit breaker for a single downstream, driven by caller-supplied timestamps.
#[derive(Debug, Clone)]
pub struct CircuitBreaker {
// Consecutive failures needed to trip a closed breaker; constructors clamp it to >= 1.
trip_after: u32,
// Milliseconds an open breaker waits before handing out a probe.
probe_after_ms: u64,
// Mutable, serializable part of the breaker.
state: CircuitState,
}
impl CircuitBreaker {
    /// Creates a closed breaker with a fresh (default) state.
    ///
    /// `trip_after` is clamped to at least 1.
    pub fn new(trip_after: u32, probe_after_ms: u64) -> Self {
        Self::from_state(trip_after, probe_after_ms, CircuitState::default())
    }

    /// Creates a breaker that resumes from a previously captured `state`.
    ///
    /// `trip_after` is clamped to at least 1; `state` is taken as-is.
    pub fn from_state(trip_after: u32, probe_after_ms: u64, state: CircuitState) -> Self {
        Self {
            trip_after: trip_after.max(1),
            probe_after_ms,
            state,
        }
    }

    /// Returns the current state.
    pub fn state(&self) -> &CircuitState {
        &self.state
    }

    /// Returns the current phase.
    pub fn phase(&self) -> CircuitPhase {
        self.state.phase
    }

    /// Decides what to do with the next unit of work at time `now_ms`.
    ///
    /// * Closed: [`CircuitDecision::Dispatch`].
    /// * Half-open: [`CircuitDecision::Spool`] (a probe is already outstanding).
    /// * Open: [`CircuitDecision::Spool`] until `probe_after_ms` has elapsed since the
    ///   breaker opened; then the breaker turns half-open and this call returns
    ///   [`CircuitDecision::Probe`] exactly once.
    pub fn decide(&mut self, now_ms: u64) -> CircuitDecision {
        match self.state.phase {
            CircuitPhase::Closed => CircuitDecision::Dispatch,
            CircuitPhase::HalfOpen => CircuitDecision::Spool,
            CircuitPhase::Open => {
                // A state handed to `from_state` may be open without a timestamp.
                // Anchor it on first observation: substituting `now_ms` afresh on every
                // call would keep the elapsed time at zero and the breaker would never
                // reach the probe stage.
                let opened = *self.state.opened_at_ms.get_or_insert(now_ms);
                if now_ms.saturating_sub(opened) >= self.probe_after_ms {
                    self.state.phase = CircuitPhase::HalfOpen;
                    self.state.last_transition_ms = now_ms;
                    CircuitDecision::Probe
                } else {
                    CircuitDecision::Spool
                }
            }
        }
    }

    /// Records a success: resets the failure counter and closes the breaker.
    ///
    /// Returns `true` when the breaker was open or half-open before the call,
    /// i.e. when this call closed it.
    pub fn record_success(&mut self, now_ms: u64) -> bool {
        let was_tripped = !matches!(self.state.phase, CircuitPhase::Closed);
        self.state.consecutive_failures = 0;
        if was_tripped {
            self.state.phase = CircuitPhase::Closed;
            self.state.opened_at_ms = None;
            self.state.last_transition_ms = now_ms;
        }
        was_tripped
    }

    /// Records a failure and trips the breaker once `trip_after` consecutive
    /// failures have accumulated.
    ///
    /// Returns `true` only when this call moved the breaker from closed to open.
    /// While the breaker is open or half-open the counter still increases, but the
    /// phase is left untouched and `false` is returned.
    pub fn record_failure(&mut self, now_ms: u64) -> bool {
        self.state.consecutive_failures = self.state.consecutive_failures.saturating_add(1);
        let should_trip = matches!(self.state.phase, CircuitPhase::Closed)
            && self.state.consecutive_failures >= self.trip_after;
        if should_trip {
            self.open(now_ms);
        }
        should_trip
    }

    /// Reports the outcome of a probe.
    ///
    /// A successful probe is handled like [`CircuitBreaker::record_success`] (same
    /// return value). A failed probe (re)opens the breaker, restarting the probe
    /// delay at `now_ms`, and returns `false`.
    pub fn on_probe(&mut self, ok: bool, now_ms: u64) -> bool {
        if ok {
            self.record_success(now_ms)
        } else {
            self.open(now_ms);
            false
        }
    }

    /// Moves the breaker to `Open` at `now_ms`.
    ///
    /// `trips` is incremented only when the breaker was not already open.
    fn open(&mut self, now_ms: u64) {
        let fresh = !matches!(self.state.phase, CircuitPhase::Open);
        self.state.phase = CircuitPhase::Open;
        self.state.opened_at_ms = Some(now_ms);
        self.state.last_transition_ms = now_ms;
        if fresh {
            self.state.trips = self.state.trips.saturating_add(1);
        }
    }

    /// Returns the milliseconds elapsed since the breaker last opened.
    ///
    /// Yields 0 while the breaker is closed or when no opening timestamp is recorded.
    pub fn open_for_ms(&self, now_ms: u64) -> u64 {
        match (self.state.phase, self.state.opened_at_ms) {
            (CircuitPhase::Closed, _) | (_, None) => 0,
            (_, Some(opened)) => now_ms.saturating_sub(opened),
        }
    }
}
/// Set of circuit breakers keyed by downstream name.
#[derive(Debug, Clone)]
pub struct CircuitRegistry {
// Settings applied to every breaker this registry creates.
trip_after: u32,
probe_after_ms: u64,
// One breaker per downstream name.
breakers: BTreeMap<String, CircuitBreaker>,
// Downstream specs the registry was constructed with, keyed by name.
downstreams: BTreeMap<String, DownstreamSpec>,
}
impl CircuitRegistry {
    /// Builds a registry with one closed breaker per downstream in `config`.
    ///
    /// Uses [`CircuitConfig::downstreams`], so an empty configuration still yields
    /// the implicit `"default"` downstream.
    pub fn new(config: &CircuitConfig) -> Self {
        let mut breakers = BTreeMap::new();
        let mut downstreams = BTreeMap::new();
        for spec in config.downstreams() {
            breakers.insert(
                spec.name.clone(),
                CircuitBreaker::new(config.trip_after, config.probe_after_ms),
            );
            downstreams.insert(spec.name.clone(), spec);
        }
        Self {
            trip_after: config.trip_after,
            probe_after_ms: config.probe_after_ms,
            breakers,
            downstreams,
        }
    }

    /// Iterates over the downstream specs the registry was built from, in name order.
    pub fn downstreams(&self) -> impl Iterator<Item = &DownstreamSpec> {
        self.downstreams.values()
    }

    /// Picks the breaker name to use for a resolved downstream.
    ///
    /// Preference order: `resolved` if a breaker of that name exists, then
    /// `"default"` if such a breaker exists, then the first breaker name in sorted
    /// order, and finally the literal `"default"` when the registry is empty.
    pub fn route<'a>(&'a self, resolved: Option<&str>) -> &'a str {
        // One lookup per candidate; no `contains_key` + `unwrap` pairs.
        resolved
            .and_then(|name| self.breakers.get_key_value(name))
            .or_else(|| self.breakers.get_key_value("default"))
            .or_else(|| self.breakers.iter().next())
            .map_or("default", |(name, _)| name.as_str())
    }

    /// Returns the breaker called `name`, creating a closed one with the registry's
    /// settings if none exists yet.
    pub fn breaker_mut(&mut self, name: &str) -> &mut CircuitBreaker {
        let (trip, probe) = (self.trip_after, self.probe_after_ms);
        self.breakers
            .entry(name.to_string())
            .or_insert_with(|| CircuitBreaker::new(trip, probe))
    }

    /// Returns the breaker called `name`, if any.
    pub fn breaker(&self, name: &str) -> Option<&CircuitBreaker> {
        self.breakers.get(name)
    }

    /// Captures a copy of every breaker's state, keyed by name.
    pub fn snapshot(&self) -> BTreeMap<String, CircuitState> {
        self.breakers
            .iter()
            .map(|(name, breaker)| (name.clone(), breaker.state().clone()))
            .collect()
    }

    /// Replaces (or adds) breakers from a snapshot.
    ///
    /// Each restored breaker uses the registry's current settings. Names that are
    /// not among the configured downstreams are restored as well; breakers missing
    /// from the snapshot are left untouched.
    pub fn restore(&mut self, snapshot: &BTreeMap<String, CircuitState>) {
        let (trip, probe) = (self.trip_after, self.probe_after_ms);
        self.breakers.extend(snapshot.iter().map(|(name, state)| {
            (
                name.clone(),
                CircuitBreaker::from_state(trip, probe, state.clone()),
            )
        }));
    }

    /// Returns `true` when at least one breaker is not closed (open or half-open).
    pub fn any_open(&self) -> bool {
        self.breakers
            .values()
            .any(|b| !matches!(b.phase(), CircuitPhase::Closed))
    }
}
// Unit tests for the breaker state machine, the registry and config parsing.
#[cfg(test)]
mod tests {
use super::*;
// A closed breaker opens on the N-th consecutive failure and counts one trip.
#[test]
fn trips_after_n_consecutive_failures() {
let mut b = CircuitBreaker::new(3, 1000);
assert_eq!(b.decide(0), CircuitDecision::Dispatch);
assert!(!b.record_failure(0));
assert!(!b.record_failure(0));
// The third failure reaches `trip_after` and opens the breaker.
assert!(b.record_failure(0)); assert_eq!(b.phase(), CircuitPhase::Open);
assert_eq!(b.decide(0), CircuitDecision::Spool);
assert_eq!(b.state().trips, 1);
}
// A success in between restarts the consecutive-failure count.
#[test]
fn a_success_resets_the_failure_count() {
let mut b = CircuitBreaker::new(3, 1000);
b.record_failure(0);
b.record_failure(0);
// After the reset, the next failure is only the first of a new streak.
b.record_success(0); assert!(!b.record_failure(0)); assert_eq!(b.phase(), CircuitPhase::Closed);
}
// Open -> (probe delay elapsed) -> half-open -> (probe ok) -> closed.
#[test]
fn half_open_probe_then_close_on_success() {
let mut b = CircuitBreaker::new(1, 1000);
// Before the delay the breaker spools; at exactly the delay it hands out a probe.
b.record_failure(0); assert_eq!(b.decide(500), CircuitDecision::Spool); assert_eq!(b.decide(1000), CircuitDecision::Probe); assert_eq!(b.phase(), CircuitPhase::HalfOpen);
// While half-open, further work is spooled until the probe outcome is reported.
assert_eq!(b.decide(1000), CircuitDecision::Spool); assert!(b.on_probe(true, 1100)); assert_eq!(b.phase(), CircuitPhase::Closed);
assert_eq!(b.decide(1100), CircuitDecision::Dispatch);
}
// A failed probe reopens the breaker and restarts the probe delay.
#[test]
fn half_open_probe_failure_reopens() {
let mut b = CircuitBreaker::new(1, 1000);
b.record_failure(0);
assert_eq!(b.decide(1000), CircuitDecision::Probe);
assert!(!b.on_probe(false, 1000)); assert_eq!(b.phase(), CircuitPhase::Open);
// The delay now counts from the failed probe at t=1000.
assert_eq!(b.decide(1500), CircuitDecision::Spool);
assert_eq!(b.decide(2000), CircuitDecision::Probe);
}
// Failures recorded while already open do not report (or count) another trip.
#[test]
fn record_failure_returns_true_only_on_fresh_trip() {
let mut b = CircuitBreaker::new(2, 1000);
assert!(!b.record_failure(0));
assert!(b.record_failure(0)); assert!(!b.record_failure(0)); assert_eq!(b.state().trips, 1);
}
// `open_for_ms` is 0 while closed and otherwise measures time since opening.
#[test]
fn open_for_ms_tracks_outage_duration() {
let mut b = CircuitBreaker::new(1, 100_000);
assert_eq!(b.open_for_ms(0), 0); b.record_failure(1000);
assert_eq!(b.open_for_ms(1000), 0);
assert_eq!(b.open_for_ms(31_000), 30_000); }
// The state survives a JSON round trip and can seed a new breaker.
#[test]
fn state_round_trips_through_serde() {
let mut b = CircuitBreaker::new(2, 5000);
b.record_failure(10);
b.record_failure(10); let json = serde_json::to_string(b.state()).unwrap();
let restored: CircuitState = serde_json::from_str(&json).unwrap();
let b2 = CircuitBreaker::from_state(2, 5000, restored);
assert_eq!(b2.phase(), CircuitPhase::Open);
assert_eq!(b2.state().trips, 1);
}
// Tripping one downstream leaves the other downstream's breaker closed.
#[test]
fn registry_scopes_per_downstream() {
let cfg = CircuitConfig {
trip_after: 1,
probe_after_ms: 1000,
downstream: vec![
DownstreamSpec { name: "warehouse".into(), probe: ProbeKind::Passive, target: None },
DownstreamSpec { name: "analytics".into(), probe: ProbeKind::Passive, target: None },
],
probe_interval_ms: 1000,
};
let mut reg = CircuitRegistry::new(&cfg);
reg.breaker_mut("warehouse").record_failure(0);
assert_eq!(reg.breaker("warehouse").unwrap().phase(), CircuitPhase::Open);
assert_eq!(
reg.breaker_mut("analytics").decide(0),
CircuitDecision::Dispatch
);
assert!(reg.any_open());
}
// With the default config only the implicit "default" breaker exists, so every route lands on it.
#[test]
fn registry_routes_resolved_target_else_default() {
let cfg = CircuitConfig::default(); let reg = CircuitRegistry::new(&cfg);
assert_eq!(reg.route(None), "default");
assert_eq!(reg.route(Some("nonexistent")), "default");
}
// A snapshot taken from one registry restores the same phase in another.
#[test]
fn registry_snapshot_restore_round_trip() {
let cfg = CircuitConfig { trip_after: 1, probe_after_ms: 1000, downstream: vec![
DownstreamSpec { name: "wh".into(), probe: ProbeKind::Passive, target: None },
], probe_interval_ms: 1000 };
let mut reg = CircuitRegistry::new(&cfg);
reg.breaker_mut("wh").record_failure(0); let snap = reg.snapshot();
let mut reg2 = CircuitRegistry::new(&cfg);
reg2.restore(&snap);
assert_eq!(reg2.breaker("wh").unwrap().phase(), CircuitPhase::Open);
}
// Parsing accepts a well-formed mapping and rejects invalid configurations.
#[test]
fn config_parse_and_validate() {
let v = serde_json::json!({
"trip_after": 4, "probe_after_ms": 8000,
"downstream": [{"name": "db", "type": "tcp", "target": "db:5432"}]
});
let cfg = CircuitConfig::parse(&v).unwrap();
assert_eq!(cfg.trip_after, 4);
assert_eq!(cfg.downstream[0].probe, ProbeKind::Tcp);
// `trip_after` of zero is rejected.
assert!(CircuitConfig::parse(&serde_json::json!({"trip_after": 0})).is_err());
// A non-passive probe without a target is rejected.
assert!(CircuitConfig::parse(&serde_json::json!({
"downstream": [{"name": "x", "type": "http"}]
})).is_err());
// Duplicate downstream names are rejected.
assert!(CircuitConfig::parse(&serde_json::json!({
"downstream": [{"name": "x", "type": "passive"}, {"name": "x", "type": "passive"}]
})).is_err());
}
}