Skip to main content

inferd_daemon/
router.rs

1//! Backend router — priority-ordered policy + per-backend circuit
2//! breaker.
3//!
4//! Per ADR 0007 §"Implementation shape":
5//!
6//! - **Policy** is priority-order. Backends are registered in the
7//!   operator's preferred order (highest priority first); `dispatch`
8//!   walks the list and returns the first backend that's both
9//!   `ready()` and not currently circuit-broken.
10//! - **Circuit breaker** is per-backend. After `failure_threshold`
11//!   pre-stream or mid-stream failures within `failure_window`, the
12//!   breaker opens and the backend is skipped for `cooldown` seconds.
13//!   After cooldown the breaker is half-open: the next dispatch
14//!   tries the backend again; success closes it, failure re-opens
15//!   for another `cooldown`.
16//! - **No retry, no failover**. `dispatch` returns one backend; the
17//!   caller (lifecycle / lifecycle_v2) generates against it once
18//!   and reports the outcome via `record_success` / `record_failure`.
19//!   If the backend errors mid-stream the daemon emits one terminal
20//!   error frame — it does *not* try a different backend (ADR 0007).
21//!
22//! Apps do not pick the backend (ADR 0006). There is no per-request
23//! `backend` field on the wire.
24//!
25//! The `Router` itself is `Send + Sync` and shared via `Arc`. Internal
26//! breaker state lives behind an `RwLock`; reads (the dispatch hot
27//! path) take the read lock, and the only writers are the
28//! `record_*` feedback hooks.
29
30use inferd_engine::Backend;
31use std::collections::HashMap;
32use std::sync::{Arc, RwLock};
33use std::time::{Duration, Instant};
34
/// Default failure threshold for the breaker. Modest — three
/// failures in a window is enough to call a cloud adapter hot,
/// more is over-tolerance. Pre-stream and mid-stream failures are
/// both reported through `Router::record_failure` and count alike.
///
/// This is the `failure_threshold` of `BreakerPolicy::default()`.
pub const DEFAULT_FAILURE_THRESHOLD: u32 = 3;

/// Default observation window for failures. A burst of three
/// 5xx responses inside 60s is unusual for a healthy upstream.
///
/// This is the `failure_window` of `BreakerPolicy::default()`.
pub const DEFAULT_FAILURE_WINDOW: Duration = Duration::from_secs(60);

/// Default cooldown after the breaker opens. Long enough that
/// a flapping cloud upstream doesn't get hammered, short enough
/// that an outage of <1 minute clears on its own.
///
/// This is the `cooldown` of `BreakerPolicy::default()`.
pub const DEFAULT_COOLDOWN: Duration = Duration::from_secs(30);
48
49/// Tunables for the per-backend circuit breaker. The same policy
50/// applies to every registered backend.
51#[derive(Debug, Clone, Copy)]
52pub struct BreakerPolicy {
53    /// Failures within `failure_window` that open the breaker.
54    pub failure_threshold: u32,
55    /// Sliding window over which failures are counted.
56    pub failure_window: Duration,
57    /// Open-state duration. After this elapses, the breaker
58    /// transitions to half-open on the next dispatch.
59    pub cooldown: Duration,
60}
61
62impl Default for BreakerPolicy {
63    fn default() -> Self {
64        Self {
65            failure_threshold: DEFAULT_FAILURE_THRESHOLD,
66            failure_window: DEFAULT_FAILURE_WINDOW,
67            cooldown: DEFAULT_COOLDOWN,
68        }
69    }
70}
71
/// Per-backend breaker state. Internal to the router.
#[derive(Debug, Clone)]
struct BreakerState {
    /// Timestamps of recent failures. Entries that have fallen out
    /// of the observation window are dropped by `prune`.
    failures: Vec<Instant>,
    /// `Some(when)` once the breaker has been opened; `when` is the
    /// earliest instant at which the backend may be tried again.
    /// The breaker only counts as open while `now < when` (see
    /// `is_open`).
    open_until: Option<Instant>,
}

impl BreakerState {
    /// Fresh state: no recorded failures, breaker closed.
    fn new() -> Self {
        Self {
            failures: Vec::new(),
            open_until: None,
        }
    }

    /// Drop failures older than `window` (measured back from `now`)
    /// from the count window. A failure exactly `window` old is
    /// kept.
    fn prune(&mut self, now: Instant, window: Duration) {
        // `checked_sub` returns `None` when `now - window` cannot be
        // represented by the platform clock, i.e. the window reaches
        // back further than any instant we could have recorded. In
        // that case every stored failure is still inside the window,
        // so nothing may be dropped. (Falling back to `now` as the
        // cutoff would wrongly discard every earlier failure and
        // keep the breaker from ever reaching its threshold.)
        if let Some(cutoff) = now.checked_sub(window) {
            self.failures.retain(|&t| t >= cutoff);
        }
    }

    /// `true` if the breaker is currently open at instant `now`,
    /// i.e. an open mark is set and has not yet expired.
    fn is_open(&self, now: Instant) -> bool {
        self.open_until.is_some_and(|until| now < until)
    }
}
103
/// A backend known to the router with its circuit-breaker slot.
/// Slots are stored in priority order (highest priority first).
struct Slot {
    /// The backend itself, shared with whoever receives a `Dispatch`.
    backend: Arc<dyn Backend>,
    /// `Backend::name()` captured once when the router was built.
    name: String,
    /// Circuit-breaker bookkeeping for this backend.
    state: BreakerState,
}
110
/// Outcome of `Router::dispatch` — chosen backend plus the diagnostic
/// info the lifecycle layer needs to report success / failure later.
pub struct Dispatch {
    /// Chosen backend (use exactly once; report the outcome via
    /// `Router::record_success` / `record_failure`).
    pub backend: Arc<dyn Backend>,
    /// Stable name (`Backend::name()` snapshot — saves a vtable hop
    /// at outcome-recording time). Pass this to
    /// `Router::record_success` / `record_failure`, which look the
    /// backend up by name.
    pub name: String,
}
121
/// Errors returned by `Router::dispatch` and `Router::dispatch_embed`.
#[derive(Debug, thiserror::Error, PartialEq, Eq)]
pub enum RouterError {
    /// No backends are registered. Configuration error.
    #[error("no backends registered")]
    NoBackends,
    /// Every registered backend is unavailable — not ready, filtered
    /// out by the dispatch kind (e.g. no embed capability), or its
    /// circuit breaker is open. Surfaces to the caller as
    /// `code: backend_unavailable`.
    #[error("no backend available")]
    NoneAvailable,
}
134
/// Backend router. Holds the priority-ordered backend list and
/// per-backend breaker state. `Router` itself is not `Clone`; share
/// one instance by wrapping it in an `Arc`.
pub struct Router {
    /// Backends in priority order, each with its breaker state.
    /// Behind a lock because breaker state is mutated through
    /// `&self`.
    slots: RwLock<Vec<Slot>>,
    /// Breaker tuning applied to every slot.
    policy: BreakerPolicy,
    /// Map from `Backend::name()` → index into `slots`. The map is
    /// populated once at construction and never changes — backends
    /// aren't added or removed at runtime in v0.2 (no admin API for
    /// it). Keeps name-keyed feedback (`record_success` /
    /// `record_failure`) O(1) without a linear scan over the slot
    /// list.
    name_index: HashMap<String, usize>,
}
148
149impl Router {
150    /// Build a router from a priority-ordered list of backends.
151    /// First entry is highest priority. Uses the default breaker
152    /// policy.
153    pub fn new(backends: Vec<Arc<dyn Backend>>) -> Self {
154        Self::with_policy(backends, BreakerPolicy::default())
155    }
156
157    /// Build a router with explicit breaker tuning. Used by tests
158    /// (short windows + cooldowns) and operator-tuned configs.
159    pub fn with_policy(backends: Vec<Arc<dyn Backend>>, policy: BreakerPolicy) -> Self {
160        let mut name_index = HashMap::with_capacity(backends.len());
161        let slots: Vec<Slot> = backends
162            .into_iter()
163            .enumerate()
164            .map(|(i, b)| {
165                let name = b.name().to_string();
166                name_index.insert(name.clone(), i);
167                Slot {
168                    backend: b,
169                    name,
170                    state: BreakerState::new(),
171                }
172            })
173            .collect();
174        Self {
175            slots: RwLock::new(slots),
176            policy,
177            name_index,
178        }
179    }
180
181    /// Number of registered backends. For diagnostics and tests.
182    pub fn len(&self) -> usize {
183        self.slots.read().expect("router rwlock poisoned").len()
184    }
185
186    /// `true` if no backends are registered.
187    pub fn is_empty(&self) -> bool {
188        self.slots
189            .read()
190            .expect("router rwlock poisoned")
191            .is_empty()
192    }
193
194    /// Pick a backend for the next request.
195    ///
196    /// Walks the slot list in priority order and returns the first
197    /// slot whose backend is `ready()` and whose breaker is not
198    /// currently open. Slots whose breaker has timed out into the
199    /// half-open state count as eligible — the next outcome reported
200    /// for them either closes the breaker (`record_success`) or
201    /// re-opens it (`record_failure`).
202    pub fn dispatch(&self) -> Result<Dispatch, RouterError> {
203        self.dispatch_filtered(|_| true)
204    }
205
206    /// Pick a backend for an embed request — same priority + breaker
207    /// rules as `dispatch`, but skip slots whose backend doesn't
208    /// advertise `capabilities().embed`. Without this filter, a
209    /// multi-backend config that puts a generate-only backend ahead
210    /// of an embed-capable one in the priority list would always
211    /// land embed requests on the generate slot and yield
212    /// `EmbedUnsupported`.
213    pub fn dispatch_embed(&self) -> Result<Dispatch, RouterError> {
214        self.dispatch_filtered(|backend| backend.capabilities().embed)
215    }
216
217    fn dispatch_filtered<F>(&self, predicate: F) -> Result<Dispatch, RouterError>
218    where
219        F: Fn(&Arc<dyn Backend>) -> bool,
220    {
221        let now = Instant::now();
222        let mut guard = self.slots.write().expect("router rwlock poisoned");
223        if guard.is_empty() {
224            return Err(RouterError::NoBackends);
225        }
226
227        for slot in guard.iter_mut() {
228            // Prune old failures so a stale window doesn't keep the
229            // breaker open after a long quiet period.
230            slot.state.prune(now, self.policy.failure_window);
231
232            if !slot.backend.ready() {
233                continue;
234            }
235
236            if !predicate(&slot.backend) {
237                continue;
238            }
239
240            if slot.state.is_open(now) {
241                continue;
242            }
243            // If the breaker had been open and the cooldown has
244            // expired, transition to half-open by clearing the
245            // `open_until` mark. The backend gets one shot; the
246            // next outcome decides whether it stays closed.
247            if slot.state.open_until.is_some() {
248                slot.state.open_until = None;
249                slot.state.failures.clear();
250            }
251            return Ok(Dispatch {
252                backend: Arc::clone(&slot.backend),
253                name: slot.name.clone(),
254            });
255        }
256
257        Err(RouterError::NoneAvailable)
258    }
259
260    /// `true` once every registered backend reports ready. The
261    /// lifecycle uses this to gate listener creation
262    /// (THREAT_MODEL F-13). Breaker state is *not* consulted —
263    /// readiness here is the boot-time engine-loaded check, not
264    /// the per-request availability check.
265    pub fn all_ready(&self) -> bool {
266        let guard = self.slots.read().expect("router rwlock poisoned");
267        !guard.is_empty() && guard.iter().all(|s| s.backend.ready())
268    }
269
270    /// Record a successful generation against the given backend.
271    /// Closes the circuit breaker (clears the open mark and the
272    /// failure window). Called by the lifecycle layer after a
273    /// terminal `Done` frame for a request served by this backend.
274    pub fn record_success(&self, name: &str) {
275        let Some(&idx) = self.name_index.get(name) else {
276            return;
277        };
278        let mut guard = self.slots.write().expect("router rwlock poisoned");
279        if let Some(slot) = guard.get_mut(idx) {
280            slot.state.failures.clear();
281            slot.state.open_until = None;
282        }
283    }
284
285    /// Record a failure against the given backend. Counts toward
286    /// the failure window; opens the breaker once the threshold is
287    /// reached. Called from both the pre-stream error path
288    /// (`generate*` returned `GenerateError`) and the mid-stream
289    /// error path (stream terminated without a `Done` frame).
290    /// Both kinds of failure count equally per ADR 0007.
291    pub fn record_failure(&self, name: &str) {
292        let Some(&idx) = self.name_index.get(name) else {
293            return;
294        };
295        let now = Instant::now();
296        let mut guard = self.slots.write().expect("router rwlock poisoned");
297        let Some(slot) = guard.get_mut(idx) else {
298            return;
299        };
300        slot.state.prune(now, self.policy.failure_window);
301        slot.state.failures.push(now);
302        if slot.state.failures.len() as u32 >= self.policy.failure_threshold {
303            slot.state.open_until = Some(now + self.policy.cooldown);
304        }
305    }
306
307    /// `true` if the named backend's circuit breaker is currently
308    /// open. Diagnostic surface; the dispatch path checks this
309    /// internally and returns `NoneAvailable` if every backend is
310    /// open.
311    pub fn breaker_open(&self, name: &str) -> bool {
312        let Some(&idx) = self.name_index.get(name) else {
313            return false;
314        };
315        let now = Instant::now();
316        let guard = self.slots.read().expect("router rwlock poisoned");
317        guard.get(idx).is_some_and(|slot| slot.state.is_open(now))
318    }
319}
320
#[cfg(test)]
mod tests {
    use super::*;
    use inferd_engine::mock::Mock;

    /// `Mock` reports "mock" from `name()`; this thin wrapper gives
    /// a backend its own identity while delegating readiness and
    /// generation to the wrapped mock.
    struct Named {
        inner: Mock,
        name: &'static str,
    }

    #[async_trait::async_trait]
    impl Backend for Named {
        fn name(&self) -> &str {
            self.name
        }
        fn ready(&self) -> bool {
            self.inner.ready()
        }
        async fn generate(
            &self,
            req: inferd_proto::Resolved,
        ) -> Result<inferd_engine::TokenStream, inferd_engine::GenerateError> {
            self.inner.generate(req).await
        }
    }

    /// Wrapper that forces `capabilities().embed` to `false` and
    /// otherwise behaves like the wrapped mock. Models the
    /// multi-backend shape inferd ships in its first-boot default
    /// config: a generate-only backend ahead of an embed-capable one
    /// in the priority list.
    struct GenerateOnly {
        inner: Mock,
        name: &'static str,
    }

    #[async_trait::async_trait]
    impl Backend for GenerateOnly {
        fn name(&self) -> &str {
            self.name
        }
        fn ready(&self) -> bool {
            self.inner.ready()
        }
        fn capabilities(&self) -> inferd_engine::BackendCapabilities {
            inferd_engine::BackendCapabilities {
                embed: false,
                ..self.inner.capabilities()
            }
        }
        async fn generate(
            &self,
            req: inferd_proto::Resolved,
        ) -> Result<inferd_engine::TokenStream, inferd_engine::GenerateError> {
            self.inner.generate(req).await
        }
    }

    /// Breaker tuning with a tiny window and cooldown so the timing
    /// tests finish quickly.
    fn fast_policy() -> BreakerPolicy {
        let failure_window = Duration::from_millis(500);
        let cooldown = Duration::from_millis(100);
        BreakerPolicy {
            failure_threshold: 2,
            failure_window,
            cooldown,
        }
    }

    #[test]
    fn empty_router_dispatch_returns_no_backends() {
        let router = Router::new(Vec::new());
        assert!(router.is_empty());
        assert!(matches!(router.dispatch(), Err(RouterError::NoBackends)));
    }

    #[test]
    fn dispatch_returns_ready_backend() {
        let router = Router::new(vec![Arc::new(Mock::new())]);
        let picked = router.dispatch().expect("dispatch ok");
        assert_eq!(picked.name, "mock");
        assert_eq!(picked.backend.name(), "mock");
        assert!(router.all_ready());
    }

    #[test]
    fn unready_backend_returns_none_available() {
        let backend = Arc::new(Mock::new());
        backend.set_ready(false);
        let router = Router::new(vec![backend]);
        assert!(matches!(router.dispatch(), Err(RouterError::NoneAvailable)));
        assert!(!router.all_ready());
    }

    #[test]
    fn priority_picks_first_ready_backend() {
        let primary = Arc::new(Named {
            inner: Mock::new(),
            name: "high",
        });
        let secondary = Arc::new(Named {
            inner: Mock::new(),
            name: "low",
        });
        primary.inner.set_ready(false);
        let router = Router::new(vec![primary.clone(), secondary]);

        // Primary is down, so the lower-priority backend is chosen.
        let picked = router.dispatch().unwrap();
        assert_eq!(picked.name, "low");

        // Once the primary is back it wins again.
        primary.inner.set_ready(true);
        let picked = router.dispatch().unwrap();
        assert_eq!(picked.name, "high");
    }

    #[test]
    fn breaker_opens_after_threshold_failures() {
        let router = Router::with_policy(vec![Arc::new(Mock::new())], fast_policy());

        // One failure is below the threshold of 2.
        router.record_failure("mock");
        assert!(!router.breaker_open("mock"));

        // The second failure trips the breaker.
        router.record_failure("mock");
        assert!(router.breaker_open("mock"));

        // With the only backend broken, nothing can be dispatched.
        assert!(matches!(router.dispatch(), Err(RouterError::NoneAvailable)));
    }

    #[test]
    fn success_resets_failure_count() {
        let router = Router::with_policy(vec![Arc::new(Mock::new())], fast_policy());
        router.record_failure("mock");
        router.record_success("mock");
        // The success wiped the window, so this is failure one of
        // two again, even though it is the second failure overall.
        router.record_failure("mock");
        assert!(!router.breaker_open("mock"));
    }

    #[test]
    fn breaker_recovers_after_cooldown() {
        let router = Router::with_policy(vec![Arc::new(Mock::new())], fast_policy());
        for _ in 0..2 {
            router.record_failure("mock");
        }
        assert!(router.breaker_open("mock"));

        // Let the 100ms cooldown run out.
        std::thread::sleep(Duration::from_millis(150));
        assert!(!router.breaker_open("mock"));

        // Half-open: the backend is handed out again.
        let picked = router.dispatch().unwrap();
        assert_eq!(picked.name, "mock");
    }

    #[test]
    fn old_failures_outside_window_dont_open_breaker() {
        let router = Router::with_policy(vec![Arc::new(Mock::new())], fast_policy());
        router.record_failure("mock");
        // Sleep past the 500ms window so the first failure ages out.
        std::thread::sleep(Duration::from_millis(600));
        router.record_failure("mock");
        assert!(!router.breaker_open("mock"));
    }

    #[test]
    fn record_failure_for_unknown_backend_is_a_noop() {
        let router = Router::new(vec![Arc::new(Mock::new())]);
        // Neither call may panic for a name the router never saw.
        router.record_failure("does-not-exist");
        router.record_success("does-not-exist");
        assert!(!router.breaker_open("does-not-exist"));
    }

    #[test]
    fn dispatch_embed_skips_non_embed_backends() {
        let no_embed = Arc::new(GenerateOnly {
            inner: Mock::new(),
            name: "generate-only",
        });
        // The plain mock advertises embed=true.
        let with_embed = Arc::new(Mock::new());
        let router = Router::new(vec![no_embed.clone(), with_embed.clone()]);

        // The generate path ignores capabilities and takes the
        // first ready backend.
        let picked = router.dispatch().unwrap();
        assert_eq!(picked.name, "generate-only");

        // The embed path has to pass over the generate-only slot.
        let picked = router.dispatch_embed().expect("embed dispatch ok");
        assert_eq!(picked.name, "mock");
        assert!(picked.backend.capabilities().embed);
    }

    #[test]
    fn dispatch_embed_returns_none_available_when_no_backend_supports_embed() {
        let only = Arc::new(GenerateOnly {
            inner: Mock::new(),
            name: "generate-only",
        });
        let router = Router::new(vec![only]);
        assert!(matches!(
            router.dispatch_embed(),
            Err(RouterError::NoneAvailable)
        ));
    }
}
546}