Skip to main content

inferd_daemon/
router.rs

1//! Backend router — priority-ordered policy + per-backend circuit
2//! breaker.
3//!
4//! Per ADR 0007 §"Implementation shape":
5//!
6//! - **Policy** is priority-order. Backends are registered in the
7//!   operator's preferred order (highest priority first); `dispatch`
8//!   walks the list and returns the first backend that's both
9//!   `ready()` and not currently circuit-broken.
10//! - **Circuit breaker** is per-backend. After `failure_threshold`
11//!   pre-stream or mid-stream failures within `failure_window`, the
12//!   breaker opens and the backend is skipped for `cooldown` seconds.
13//!   After cooldown the breaker is half-open: the next dispatch
14//!   tries the backend again; success closes it, failure re-opens
15//!   for another `cooldown`.
16//! - **No retry, no failover**. `dispatch` returns one backend; the
17//!   caller (lifecycle / lifecycle_v2) generates against it once
18//!   and reports the outcome via `record_success` / `record_failure`.
19//!   If the backend errors mid-stream the daemon emits one terminal
20//!   error frame — it does *not* try a different backend (ADR 0007).
21//!
22//! Apps do not pick the backend (ADR 0006). There is no per-request
23//! `backend` field on the wire.
24//!
25//! The `Router` itself is `Send + Sync` and shared via `Arc`. Internal
26//! breaker state lives behind an `RwLock`; reads (the dispatch hot
27//! path) take the read lock, and the only writers are the
28//! `record_*` feedback hooks.
29
30use inferd_engine::Backend;
31use std::collections::HashMap;
32use std::sync::{Arc, RwLock};
33use std::time::{Duration, Instant};
34
/// Number of failures inside the observation window that trips a
/// backend's breaker when no explicit policy is supplied. Kept
/// deliberately small: three pre-stream failures in one window is
/// already enough to call a cloud adapter hot, and tolerating more
/// would be over-tolerance.
pub const DEFAULT_FAILURE_THRESHOLD: u32 = 3;

/// Length of the window over which failures are counted by default.
/// Three 5xx responses inside 60s is unusual for a healthy upstream.
pub const DEFAULT_FAILURE_WINDOW: Duration = Duration::from_secs(60);

/// How long a tripped breaker stays open by default. Long enough
/// that a flapping cloud upstream isn't hammered, short enough that
/// an outage of under a minute clears on its own.
pub const DEFAULT_COOLDOWN: Duration = Duration::from_secs(30);
48
/// Circuit-breaker tuning. One policy is shared by every backend
/// registered with a router; there is no per-backend override.
#[derive(Clone, Copy, Debug)]
pub struct BreakerPolicy {
    /// How many failures must land inside `failure_window` before
    /// the breaker opens.
    pub failure_threshold: u32,
    /// Width of the sliding window used when counting failures.
    pub failure_window: Duration,
    /// How long the breaker stays open once tripped. When this has
    /// elapsed the breaker is half-open and the next dispatch may
    /// try the backend again.
    pub cooldown: Duration,
}
61
62impl Default for BreakerPolicy {
63    fn default() -> Self {
64        Self {
65            failure_threshold: DEFAULT_FAILURE_THRESHOLD,
66            failure_window: DEFAULT_FAILURE_WINDOW,
67            cooldown: DEFAULT_COOLDOWN,
68        }
69    }
70}
71
/// Per-backend breaker state. Internal to the router.
#[derive(Debug, Clone)]
struct BreakerState {
    /// Timestamps of recent failures. Entries that have aged out of
    /// the failure window are removed by [`BreakerState::prune`].
    failures: Vec<Instant>,
    /// `None` while the breaker is closed. `Some(deadline)` once it
    /// has tripped: the backend is skipped while `now < deadline`,
    /// and `deadline` is the earliest instant at which the backend
    /// may be tried again (half-open).
    open_until: Option<Instant>,
}

impl BreakerState {
    /// A closed breaker with no recorded failures.
    fn new() -> Self {
        Self {
            failures: Vec::new(),
            open_until: None,
        }
    }

    /// Drop failures that are older than `window` relative to `now`.
    ///
    /// A failure recorded exactly `window` ago is still kept.
    fn prune(&mut self, now: Instant, window: Duration) {
        // `checked_sub` returns `None` when `now - window` cannot be
        // represented as an `Instant` (a very large window, or a
        // platform whose clock cannot go before its own epoch). The
        // window then reaches back past every timestamp we could
        // possibly hold, so all failures are still inside it and
        // nothing may be dropped. Falling back to `now` as the cutoff
        // would instead discard every earlier failure and keep the
        // breaker from ever reaching its threshold.
        if let Some(cutoff) = now.checked_sub(window) {
            self.failures.retain(|&t| t >= cutoff);
        }
    }

    /// `true` if the breaker is open at instant `now`, i.e. it has
    /// tripped and its `open_until` deadline has not been reached.
    fn is_open(&self, now: Instant) -> bool {
        self.open_until.is_some_and(|until| now < until)
    }
}
103
104/// A backend known to the router with its circuit-breaker slot.
105struct Slot {
106    backend: Arc<dyn Backend>,
107    name: String,
108    state: BreakerState,
109}
110
111/// Outcome of `Router::dispatch` — chosen backend plus the diagnostic
112/// info the lifecycle layer needs to report success / failure later.
113pub struct Dispatch {
114    /// Chosen backend (use exactly once; report the outcome via
115    /// `Router::record_success` / `record_failure`).
116    pub backend: Arc<dyn Backend>,
117    /// Stable name (`Backend::name()` snapshot — saves a vtable hop
118    /// at outcome-recording time).
119    pub name: String,
120}
121
122/// Errors returned by `Router::dispatch`.
123#[derive(Debug, thiserror::Error, PartialEq, Eq)]
124pub enum RouterError {
125    /// No backends are registered. Configuration error.
126    #[error("no backends registered")]
127    NoBackends,
128    /// Every registered backend is unavailable — either not ready
129    /// or its circuit breaker is open. Surfaces to the caller as
130    /// `code: backend_unavailable`.
131    #[error("no backend available")]
132    NoneAvailable,
133}
134
135/// Backend router. Holds the priority-ordered backend list and
136/// per-backend breaker state. Cheap to clone via `Arc`.
137pub struct Router {
138    slots: RwLock<Vec<Slot>>,
139    policy: BreakerPolicy,
140    /// Map from `Backend::name()` → index into `slots`. The map is
141    /// populated once at construction and never changes — backends
142    /// aren't added or removed at runtime in v0.2 (no admin API for
143    /// it). Keeps name-keyed feedback (`record_success` /
144    /// `record_failure`) O(1) without a linear scan over the slot
145    /// list.
146    name_index: HashMap<String, usize>,
147}
148
149impl Router {
150    /// Build a router from a priority-ordered list of backends.
151    /// First entry is highest priority. Uses the default breaker
152    /// policy.
153    pub fn new(backends: Vec<Arc<dyn Backend>>) -> Self {
154        Self::with_policy(backends, BreakerPolicy::default())
155    }
156
157    /// Build a router with explicit breaker tuning. Used by tests
158    /// (short windows + cooldowns) and operator-tuned configs.
159    pub fn with_policy(backends: Vec<Arc<dyn Backend>>, policy: BreakerPolicy) -> Self {
160        let mut name_index = HashMap::with_capacity(backends.len());
161        let slots: Vec<Slot> = backends
162            .into_iter()
163            .enumerate()
164            .map(|(i, b)| {
165                let name = b.name().to_string();
166                name_index.insert(name.clone(), i);
167                Slot {
168                    backend: b,
169                    name,
170                    state: BreakerState::new(),
171                }
172            })
173            .collect();
174        Self {
175            slots: RwLock::new(slots),
176            policy,
177            name_index,
178        }
179    }
180
181    /// Number of registered backends. For diagnostics and tests.
182    pub fn len(&self) -> usize {
183        self.slots.read().expect("router rwlock poisoned").len()
184    }
185
186    /// `true` if no backends are registered.
187    pub fn is_empty(&self) -> bool {
188        self.slots
189            .read()
190            .expect("router rwlock poisoned")
191            .is_empty()
192    }
193
194    /// Pick a backend for the next request.
195    ///
196    /// Walks the slot list in priority order and returns the first
197    /// slot whose backend is `ready()` and whose breaker is not
198    /// currently open. Slots whose breaker has timed out into the
199    /// half-open state count as eligible — the next outcome reported
200    /// for them either closes the breaker (`record_success`) or
201    /// re-opens it (`record_failure`).
202    pub fn dispatch(&self) -> Result<Dispatch, RouterError> {
203        let now = Instant::now();
204        let mut guard = self.slots.write().expect("router rwlock poisoned");
205        if guard.is_empty() {
206            return Err(RouterError::NoBackends);
207        }
208
209        for slot in guard.iter_mut() {
210            // Prune old failures so a stale window doesn't keep the
211            // breaker open after a long quiet period.
212            slot.state.prune(now, self.policy.failure_window);
213
214            if !slot.backend.ready() {
215                continue;
216            }
217
218            if slot.state.is_open(now) {
219                continue;
220            }
221            // If the breaker had been open and the cooldown has
222            // expired, transition to half-open by clearing the
223            // `open_until` mark. The backend gets one shot; the
224            // next outcome decides whether it stays closed.
225            if slot.state.open_until.is_some() {
226                slot.state.open_until = None;
227                slot.state.failures.clear();
228            }
229            return Ok(Dispatch {
230                backend: Arc::clone(&slot.backend),
231                name: slot.name.clone(),
232            });
233        }
234
235        Err(RouterError::NoneAvailable)
236    }
237
238    /// `true` once every registered backend reports ready. The
239    /// lifecycle uses this to gate listener creation
240    /// (THREAT_MODEL F-13). Breaker state is *not* consulted —
241    /// readiness here is the boot-time engine-loaded check, not
242    /// the per-request availability check.
243    pub fn all_ready(&self) -> bool {
244        let guard = self.slots.read().expect("router rwlock poisoned");
245        !guard.is_empty() && guard.iter().all(|s| s.backend.ready())
246    }
247
248    /// Record a successful generation against the given backend.
249    /// Closes the circuit breaker (clears the open mark and the
250    /// failure window). Called by the lifecycle layer after a
251    /// terminal `Done` frame for a request served by this backend.
252    pub fn record_success(&self, name: &str) {
253        let Some(&idx) = self.name_index.get(name) else {
254            return;
255        };
256        let mut guard = self.slots.write().expect("router rwlock poisoned");
257        if let Some(slot) = guard.get_mut(idx) {
258            slot.state.failures.clear();
259            slot.state.open_until = None;
260        }
261    }
262
263    /// Record a failure against the given backend. Counts toward
264    /// the failure window; opens the breaker once the threshold is
265    /// reached. Called from both the pre-stream error path
266    /// (`generate*` returned `GenerateError`) and the mid-stream
267    /// error path (stream terminated without a `Done` frame).
268    /// Both kinds of failure count equally per ADR 0007.
269    pub fn record_failure(&self, name: &str) {
270        let Some(&idx) = self.name_index.get(name) else {
271            return;
272        };
273        let now = Instant::now();
274        let mut guard = self.slots.write().expect("router rwlock poisoned");
275        let Some(slot) = guard.get_mut(idx) else {
276            return;
277        };
278        slot.state.prune(now, self.policy.failure_window);
279        slot.state.failures.push(now);
280        if slot.state.failures.len() as u32 >= self.policy.failure_threshold {
281            slot.state.open_until = Some(now + self.policy.cooldown);
282        }
283    }
284
285    /// `true` if the named backend's circuit breaker is currently
286    /// open. Diagnostic surface; the dispatch path checks this
287    /// internally and returns `NoneAvailable` if every backend is
288    /// open.
289    pub fn breaker_open(&self, name: &str) -> bool {
290        let Some(&idx) = self.name_index.get(name) else {
291            return false;
292        };
293        let now = Instant::now();
294        let guard = self.slots.read().expect("router rwlock poisoned");
295        guard.get(idx).is_some_and(|slot| slot.state.is_open(now))
296    }
297}
298
#[cfg(test)]
mod tests {
    use super::*;
    use inferd_engine::mock::Mock;

    /// Breaker tuning with millisecond-scale timings so tests can
    /// cross the failure window and the cooldown with short sleeps.
    fn fast_policy() -> BreakerPolicy {
        BreakerPolicy {
            failure_threshold: 2,
            failure_window: Duration::from_millis(500),
            cooldown: Duration::from_millis(100),
        }
    }

    /// Router over a single `Mock` backend, tuned with `fast_policy`.
    fn fast_mock_router() -> Router {
        let backend = Arc::new(Mock::new());
        Router::with_policy(vec![backend], fast_policy())
    }

    #[test]
    fn empty_router_dispatch_returns_no_backends() {
        let router = Router::new(vec![]);
        assert!(router.is_empty());
        assert!(matches!(router.dispatch(), Err(RouterError::NoBackends)));
    }

    #[test]
    fn dispatch_returns_ready_backend() {
        let mock = Arc::new(Mock::new());
        let router = Router::new(vec![mock.clone()]);
        let picked = router.dispatch().expect("dispatch ok");
        assert_eq!(picked.name, "mock");
        assert_eq!(picked.backend.name(), "mock");
        assert!(router.all_ready());
    }

    #[test]
    fn unready_backend_returns_none_available() {
        let mock = Arc::new(Mock::new());
        mock.set_ready(false);
        let router = Router::new(vec![mock]);
        assert!(matches!(
            router.dispatch(),
            Err(RouterError::NoneAvailable)
        ));
        assert!(!router.all_ready());
    }

    #[test]
    fn priority_picks_first_ready_backend() {
        // `Mock::name()` is always "mock", so wrap it in a thin
        // adapter that lets each backend carry its own identity.
        struct Named {
            inner: Mock,
            name: &'static str,
        }
        #[async_trait::async_trait]
        impl Backend for Named {
            fn name(&self) -> &str {
                self.name
            }
            fn ready(&self) -> bool {
                self.inner.ready()
            }
            async fn generate(
                &self,
                req: inferd_proto::Resolved,
            ) -> Result<inferd_engine::TokenStream, inferd_engine::GenerateError> {
                self.inner.generate(req).await
            }
        }

        let primary = Arc::new(Named {
            inner: Mock::new(),
            name: "high",
        });
        let fallback = Arc::new(Named {
            inner: Mock::new(),
            name: "low",
        });
        primary.inner.set_ready(false);
        let router = Router::new(vec![primary.clone(), fallback.clone()]);

        // Primary is not ready, so the lower-priority backend is used.
        let first_pick = router.dispatch().unwrap();
        assert_eq!(first_pick.name, "low");

        // Once the primary is ready again it takes precedence.
        primary.inner.set_ready(true);
        let second_pick = router.dispatch().unwrap();
        assert_eq!(second_pick.name, "high");
    }

    #[test]
    fn breaker_opens_after_threshold_failures() {
        let router = fast_mock_router();
        // One failure is below the threshold of 2: still closed.
        router.record_failure("mock");
        assert!(!router.breaker_open("mock"));
        // The second failure reaches the threshold and trips it.
        router.record_failure("mock");
        assert!(router.breaker_open("mock"));
        // With the only backend broken, nothing can be dispatched.
        assert!(matches!(
            router.dispatch(),
            Err(RouterError::NoneAvailable)
        ));
    }

    #[test]
    fn success_resets_failure_count() {
        let router = fast_mock_router();
        router.record_failure("mock");
        router.record_success("mock");
        // The success wiped the window, so this failure counts as
        // the first one again — two lifetime failures, breaker
        // still closed.
        router.record_failure("mock");
        assert!(!router.breaker_open("mock"));
    }

    #[test]
    fn breaker_recovers_after_cooldown() {
        let router = fast_mock_router();
        for _ in 0..2 {
            router.record_failure("mock");
        }
        assert!(router.breaker_open("mock"));
        // Let the 100ms cooldown elapse.
        std::thread::sleep(Duration::from_millis(150));
        assert!(!router.breaker_open("mock"));
        // The half-open breaker lets the backend through again.
        let picked = router.dispatch().unwrap();
        assert_eq!(picked.name, "mock");
    }

    #[test]
    fn old_failures_outside_window_dont_open_breaker() {
        let router = fast_mock_router();
        router.record_failure("mock");
        // Sleep past the 500ms failure window.
        std::thread::sleep(Duration::from_millis(600));
        router.record_failure("mock");
        assert!(!router.breaker_open("mock"));
    }

    #[test]
    fn record_failure_for_unknown_backend_is_a_noop() {
        let mock = Arc::new(Mock::new());
        let router = Router::new(vec![mock]);
        // Neither hook may panic for a name that was never registered.
        router.record_failure("does-not-exist");
        router.record_success("does-not-exist");
        assert!(!router.breaker_open("does-not-exist"));
    }
}