// envoy/circuit.rs

use std::collections::HashMap;
use std::sync::{Arc, Mutex};

use chrono::{DateTime, Utc};
use serde::Serialize;
6
7/// Per-agent circuit breaker state.
8#[derive(Debug, Clone)]
9enum CircuitState {
10    Closed {
11        failures: u32,
12    },
13    Open {
14        opened_at: DateTime<Utc>,
15        failures: u32,
16    },
17    HalfOpen {
18        failures: u32,
19    },
20}
21
/// Verdict returned by [`CircuitBreaker::check`] for a single agent.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum CanDeliver {
    /// Circuit is closed: deliver normally.
    Yes,
    /// Circuit is open and still cooling down: skip delivery.
    No,
    /// Circuit is half-open: a trial delivery may be attempted; its outcome
    /// decides whether the circuit closes or re-opens.
    Probe,
}
29
30/// Snapshot of circuit state for status queries.
31#[derive(Debug, Clone, Serialize)]
32pub struct CircuitStatus {
33    pub agent_id: String,
34    pub state: String,
35    pub failures: u32,
36    pub opened_at: Option<String>,
37}
38
/// Tuning knobs shared by every agent's circuit in a [`CircuitBreaker`].
#[derive(Clone, Debug)]
pub struct CircuitConfig {
    /// Number of failures recorded while closed that trips the circuit open.
    pub failure_threshold: u32,
    /// Seconds an open circuit waits before a probe delivery is allowed.
    pub cooldown_seconds: u64,
}
45
46impl Default for CircuitConfig {
47    fn default() -> Self {
48        Self {
49            failure_threshold: 5,
50            cooldown_seconds: 60,
51        }
52    }
53}
54
55/// Per-agent circuit breaker for WS delivery.
56///
57/// Tracks delivery failures per agent and prevents hammering dead endpoints.
58/// Heartbeat from an agent resets its circuit to Closed (agent is alive).
59pub struct CircuitBreaker {
60    states: Arc<Mutex<HashMap<String, CircuitState>>>,
61    config: CircuitConfig,
62}
63
64impl CircuitBreaker {
65    pub fn new(config: CircuitConfig) -> Self {
66        Self {
67            states: Arc::new(Mutex::new(HashMap::new())),
68            config,
69        }
70    }
71
72    pub fn with_defaults() -> Self {
73        Self::new(CircuitConfig::default())
74    }
75
76    /// Check if delivery is allowed for this agent.
77    pub fn check(&self, agent_id: &str) -> CanDeliver {
78        let mut states = self.states.lock().unwrap_or_else(|e| e.into_inner());
79        let state = states
80            .entry(agent_id.to_string())
81            .or_insert_with(|| CircuitState::Closed { failures: 0 });
82
83        match state {
84            CircuitState::Closed { .. } => CanDeliver::Yes,
85            CircuitState::Open { opened_at, .. } => {
86                let elapsed = (Utc::now() - *opened_at).num_seconds();
87                if elapsed >= self.config.cooldown_seconds as i64 {
88                    let failures = match state {
89                        CircuitState::Open { failures, .. } => *failures,
90                        _ => 0,
91                    };
92                    *state = CircuitState::HalfOpen { failures };
93                    CanDeliver::Probe
94                } else {
95                    CanDeliver::No
96                }
97            }
98            CircuitState::HalfOpen { .. } => CanDeliver::Probe,
99        }
100    }
101
102    /// Record a delivery failure. May transition Closed -> Open.
103    pub fn record_failure(&self, agent_id: &str) {
104        let mut states = self.states.lock().unwrap_or_else(|e| e.into_inner());
105        let state = states
106            .entry(agent_id.to_string())
107            .or_insert_with(|| CircuitState::Closed { failures: 0 });
108
109        match state {
110            CircuitState::Closed { failures } => {
111                let new_failures = *failures + 1;
112                if new_failures >= self.config.failure_threshold {
113                    *state = CircuitState::Open {
114                        opened_at: Utc::now(),
115                        failures: new_failures,
116                    };
117                } else {
118                    *state = CircuitState::Closed {
119                        failures: new_failures,
120                    };
121                }
122            }
123            CircuitState::Open { failures, .. } => {
124                *state = CircuitState::Open {
125                    opened_at: Utc::now(),
126                    failures: *failures + 1,
127                };
128            }
129            CircuitState::HalfOpen { failures } => {
130                *state = CircuitState::Open {
131                    opened_at: Utc::now(),
132                    failures: *failures + 1,
133                };
134            }
135        }
136    }
137
138    /// Record a delivery success. Transitions HalfOpen -> Closed.
139    pub fn record_success(&self, agent_id: &str) {
140        let mut states = self.states.lock().unwrap_or_else(|e| e.into_inner());
141        if let Some(state) = states.get_mut(agent_id) {
142            if matches!(state, CircuitState::HalfOpen { .. }) {
143                *state = CircuitState::Closed { failures: 0 };
144            }
145        }
146    }
147
148    /// Reset circuit on heartbeat. Any state -> Closed.
149    pub fn reset(&self, agent_id: &str) {
150        let mut states = self.states.lock().unwrap_or_else(|e| e.into_inner());
151        states.insert(agent_id.to_string(), CircuitState::Closed { failures: 0 });
152    }
153
154    /// Remove circuit state for an agent (on disconnect).
155    pub fn remove(&self, agent_id: &str) {
156        let mut states = self.states.lock().unwrap_or_else(|e| e.into_inner());
157        states.remove(agent_id);
158    }
159
160    /// Evict entries that have been Open for longer than 1 hour.
161    /// Returns the number of evicted entries.
162    pub fn evict_stale(&self) -> usize {
163        let mut states = self.states.lock().unwrap_or_else(|e| e.into_inner());
164        let cutoff = Utc::now() - chrono::Duration::hours(1);
165        let before = states.len();
166        states.retain(|_, state| match state {
167            CircuitState::Open { opened_at, .. } => *opened_at > cutoff,
168            _ => true,
169        });
170        before - states.len()
171    }
172
173    /// Get current state for status queries.
174    pub fn get_state(&self, agent_id: &str) -> CircuitStatus {
175        let states = self.states.lock().unwrap_or_else(|e| e.into_inner());
176        match states.get(agent_id) {
177            None => CircuitStatus {
178                agent_id: agent_id.to_string(),
179                state: "closed".to_string(),
180                failures: 0,
181                opened_at: None,
182            },
183            Some(CircuitState::Closed { failures }) => CircuitStatus {
184                agent_id: agent_id.to_string(),
185                state: "closed".to_string(),
186                failures: *failures,
187                opened_at: None,
188            },
189            Some(CircuitState::Open {
190                opened_at,
191                failures,
192            }) => CircuitStatus {
193                agent_id: agent_id.to_string(),
194                state: "open".to_string(),
195                failures: *failures,
196                opened_at: Some(opened_at.to_rfc3339()),
197            },
198            Some(CircuitState::HalfOpen { failures }) => CircuitStatus {
199                agent_id: agent_id.to_string(),
200                state: "half_open".to_string(),
201                failures: *failures,
202                opened_at: None,
203            },
204        }
205    }
206
207    /// List all circuits with non-default state.
208    pub fn list_active(&self) -> Vec<CircuitStatus> {
209        let states = self.states.lock().unwrap_or_else(|e| e.into_inner());
210        let mut result = Vec::new();
211        for (agent_id, state) in states.iter() {
212            match state {
213                CircuitState::Closed { failures: 0 } => continue,
214                CircuitState::Closed { failures } => result.push(CircuitStatus {
215                    agent_id: agent_id.clone(),
216                    state: "closed".to_string(),
217                    failures: *failures,
218                    opened_at: None,
219                }),
220                CircuitState::Open {
221                    opened_at,
222                    failures,
223                } => result.push(CircuitStatus {
224                    agent_id: agent_id.clone(),
225                    state: "open".to_string(),
226                    failures: *failures,
227                    opened_at: Some(opened_at.to_rfc3339()),
228                }),
229                CircuitState::HalfOpen { failures } => result.push(CircuitStatus {
230                    agent_id: agent_id.clone(),
231                    state: "half_open".to_string(),
232                    failures: *failures,
233                    opened_at: None,
234                }),
235            }
236        }
237        result
238    }
239}
240
#[cfg(test)]
mod tests {
    use super::*;

    /// Breaker shared by all tests: trips after 3 failures, 60s cooldown.
    fn test_breaker() -> CircuitBreaker {
        CircuitBreaker::new(CircuitConfig {
            failure_threshold: 3,
            cooldown_seconds: 60,
        })
    }

    /// Record `n` failures in a row for `agent`.
    fn fail_times(cb: &CircuitBreaker, agent: &str, n: usize) {
        for _ in 0..n {
            cb.record_failure(agent);
        }
    }

    /// Overwrite `agent`'s internal state directly, bypassing the public API.
    fn force_state(cb: &CircuitBreaker, agent: &str, state: CircuitState) {
        cb.states
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .insert(agent.to_string(), state);
    }

    /// An `Open` state whose circuit opened `secs_ago` seconds in the past.
    fn open_since(secs_ago: i64, failures: u32) -> CircuitState {
        CircuitState::Open {
            opened_at: Utc::now() - chrono::Duration::seconds(secs_ago),
            failures,
        }
    }

    #[test]
    fn starts_closed() {
        let cb = test_breaker();
        assert_eq!(cb.check("agent1"), CanDeliver::Yes);
    }

    #[test]
    fn stays_closed_below_threshold() {
        let cb = test_breaker();
        fail_times(&cb, "agent1", 2);
        assert_eq!(cb.check("agent1"), CanDeliver::Yes);
        assert_eq!(cb.get_state("agent1").failures, 2);
    }

    #[test]
    fn opens_at_threshold() {
        let cb = test_breaker();
        fail_times(&cb, "agent1", 3);
        assert_eq!(cb.check("agent1"), CanDeliver::No);
        let status = cb.get_state("agent1");
        assert_eq!(status.state, "open");
        assert_eq!(status.failures, 3);
        assert!(status.opened_at.is_some());
    }

    #[test]
    fn failures_while_open_keep_counting() {
        let cb = test_breaker();
        fail_times(&cb, "agent1", 4);
        assert_eq!(cb.check("agent1"), CanDeliver::No);
        let status = cb.get_state("agent1");
        assert_eq!(status.state, "open");
        assert_eq!(status.failures, 4);
    }

    #[test]
    fn open_within_cooldown_blocks() {
        let cb = test_breaker();
        force_state(&cb, "agent1", open_since(30, 3));
        assert_eq!(cb.check("agent1"), CanDeliver::No);
    }

    #[test]
    fn half_open_after_cooldown() {
        let cb = test_breaker();
        fail_times(&cb, "agent1", 3);
        assert_eq!(cb.check("agent1"), CanDeliver::No);

        // Pretend the circuit opened well before the 60s cooldown.
        force_state(&cb, "agent1", open_since(120, 3));
        assert_eq!(cb.check("agent1"), CanDeliver::Probe);
        let status = cb.get_state("agent1");
        assert_eq!(status.state, "half_open");
        assert_eq!(status.failures, 3);
        assert!(status.opened_at.is_none());
    }

    #[test]
    fn half_open_probes_until_outcome_recorded() {
        let cb = test_breaker();
        force_state(&cb, "agent1", CircuitState::HalfOpen { failures: 3 });
        assert_eq!(cb.check("agent1"), CanDeliver::Probe);
        assert_eq!(cb.check("agent1"), CanDeliver::Probe);
    }

    #[test]
    fn half_open_success_closes() {
        let cb = test_breaker();
        force_state(&cb, "agent1", CircuitState::HalfOpen { failures: 3 });
        cb.record_success("agent1");
        assert_eq!(cb.check("agent1"), CanDeliver::Yes);
        assert_eq!(cb.get_state("agent1").failures, 0);
    }

    #[test]
    fn half_open_failure_reopens() {
        let cb = test_breaker();
        force_state(&cb, "agent1", CircuitState::HalfOpen { failures: 3 });
        cb.record_failure("agent1");
        assert_eq!(cb.check("agent1"), CanDeliver::No);
        assert_eq!(cb.get_state("agent1").failures, 4);
    }

    #[test]
    fn heartbeat_resets_closed_failures() {
        let cb = test_breaker();
        fail_times(&cb, "agent1", 2);
        cb.reset("agent1");
        assert_eq!(cb.check("agent1"), CanDeliver::Yes);
        assert_eq!(cb.get_state("agent1").failures, 0);
    }

    #[test]
    fn heartbeat_resets_open() {
        let cb = test_breaker();
        fail_times(&cb, "agent1", 3);
        assert_eq!(cb.check("agent1"), CanDeliver::No);

        cb.reset("agent1");
        assert_eq!(cb.check("agent1"), CanDeliver::Yes);
        assert_eq!(cb.get_state("agent1").failures, 0);
    }

    #[test]
    fn heartbeat_resets_half_open() {
        let cb = test_breaker();
        force_state(&cb, "agent1", CircuitState::HalfOpen { failures: 3 });
        cb.reset("agent1");
        assert_eq!(cb.check("agent1"), CanDeliver::Yes);
    }

    #[test]
    fn different_agents_independent() {
        let cb = test_breaker();
        fail_times(&cb, "agent1", 3);
        assert_eq!(cb.check("agent1"), CanDeliver::No);
        assert_eq!(cb.check("agent2"), CanDeliver::Yes);
    }

    #[test]
    fn remove_clears_state() {
        let cb = test_breaker();
        fail_times(&cb, "agent1", 3);
        cb.remove("agent1");
        assert_eq!(cb.check("agent1"), CanDeliver::Yes);
        assert_eq!(cb.get_state("agent1").failures, 0);
    }

    #[test]
    fn get_state_for_unknown_agent_is_closed() {
        let cb = test_breaker();
        let status = cb.get_state("ghost");
        assert_eq!(status.agent_id, "ghost");
        assert_eq!(status.state, "closed");
        assert_eq!(status.failures, 0);
        assert!(status.opened_at.is_none());
    }

    #[test]
    fn list_active_skips_healthy() {
        let cb = test_breaker();
        fail_times(&cb, "tripped", 3);
        fail_times(&cb, "flaky", 1);
        let _ = cb.check("checked");
        cb.reset("heartbeat");

        let mut ids: Vec<String> = cb.list_active().into_iter().map(|s| s.agent_id).collect();
        ids.sort();
        assert_eq!(ids, ["flaky", "tripped"]);
    }

    #[test]
    fn list_active_reports_half_open() {
        let cb = test_breaker();
        force_state(&cb, "agent1", CircuitState::HalfOpen { failures: 3 });
        let active = cb.list_active();
        assert_eq!(active.len(), 1);
        assert_eq!(active[0].state, "half_open");
        assert_eq!(active[0].failures, 3);
    }

    #[test]
    fn evict_stale_drops_only_old_open_circuits() {
        let cb = test_breaker();
        force_state(&cb, "old", open_since(2 * 60 * 60, 3));
        fail_times(&cb, "fresh", 3);
        fail_times(&cb, "flaky", 1);
        force_state(&cb, "probing", CircuitState::HalfOpen { failures: 3 });

        assert_eq!(cb.evict_stale(), 1);
        assert_eq!(cb.get_state("old").state, "closed");
        assert_eq!(cb.get_state("fresh").state, "open");
        assert_eq!(cb.get_state("flaky").failures, 1);
        assert_eq!(cb.get_state("probing").state, "half_open");
    }

    #[test]
    fn full_lifecycle() {
        let cb = test_breaker();

        // Start closed
        assert_eq!(cb.check("agent1"), CanDeliver::Yes);

        // Accumulate failures
        fail_times(&cb, "agent1", 3);
        assert_eq!(cb.check("agent1"), CanDeliver::No);

        // Expire cooldown -> half-open
        force_state(&cb, "agent1", open_since(120, 3));
        assert_eq!(cb.check("agent1"), CanDeliver::Probe);

        // Probe succeeds -> closed
        cb.record_success("agent1");
        assert_eq!(cb.check("agent1"), CanDeliver::Yes);
        assert_eq!(cb.get_state("agent1").failures, 0);
    }
}