inferd_daemon/router.rs
1//! Backend router — priority-ordered policy + per-backend circuit
2//! breaker.
3//!
4//! Per ADR 0007 §"Implementation shape":
5//!
6//! - **Policy** is priority-order. Backends are registered in the
7//! operator's preferred order (highest priority first); `dispatch`
8//! walks the list and returns the first backend that's both
9//! `ready()` and not currently circuit-broken.
10//! - **Circuit breaker** is per-backend. After `failure_threshold`
11//! pre-stream or mid-stream failures within `failure_window`, the
//! breaker opens and the backend is skipped for the `cooldown` duration.
13//! After cooldown the breaker is half-open: the next dispatch
14//! tries the backend again; success closes it, failure re-opens
15//! for another `cooldown`.
16//! - **No retry, no failover**. `dispatch` returns one backend; the
17//! caller (lifecycle / lifecycle_v2) generates against it once
18//! and reports the outcome via `record_success` / `record_failure`.
19//! If the backend errors mid-stream the daemon emits one terminal
20//! error frame — it does *not* try a different backend (ADR 0007).
21//!
22//! Apps do not pick the backend (ADR 0006). There is no per-request
23//! `backend` field on the wire.
24//!
25//! The `Router` itself is `Send + Sync` and shared via `Arc`. Internal
26//! breaker state lives behind an `RwLock`; reads (the dispatch hot
27//! path) take the read lock, and the only writers are the
28//! `record_*` feedback hooks.
29
30use inferd_engine::Backend;
31use std::collections::HashMap;
32use std::sync::{Arc, RwLock};
33use std::time::{Duration, Instant};
34
/// Default number of failures inside the window that trips a backend's
/// breaker. Three is deliberately modest: that many failures in one
/// window already marks a cloud adapter as unhealthy, and tolerating
/// more would be too lenient.
pub const DEFAULT_FAILURE_THRESHOLD: u32 = 3;

/// Default sliding window over which failures are counted. Three 5xx
/// responses within a single minute is unusual for a healthy upstream.
pub const DEFAULT_FAILURE_WINDOW: Duration = Duration::from_secs(60);

/// Default time a tripped breaker stays open. Long enough that a
/// flapping cloud upstream is not hammered, short enough that a brief
/// outage clears on its own without operator action.
pub const DEFAULT_COOLDOWN: Duration = Duration::from_secs(30);
48
49/// Tunables for the per-backend circuit breaker. The same policy
50/// applies to every registered backend.
51#[derive(Debug, Clone, Copy)]
52pub struct BreakerPolicy {
53 /// Failures within `failure_window` that open the breaker.
54 pub failure_threshold: u32,
55 /// Sliding window over which failures are counted.
56 pub failure_window: Duration,
57 /// Open-state duration. After this elapses, the breaker
58 /// transitions to half-open on the next dispatch.
59 pub cooldown: Duration,
60}
61
62impl Default for BreakerPolicy {
63 fn default() -> Self {
64 Self {
65 failure_threshold: DEFAULT_FAILURE_THRESHOLD,
66 failure_window: DEFAULT_FAILURE_WINDOW,
67 cooldown: DEFAULT_COOLDOWN,
68 }
69 }
70}
71
/// Per-backend breaker state. Internal to the router.
#[derive(Debug, Clone)]
struct BreakerState {
    /// Timestamps of recent failures. Entries older than the policy's
    /// `failure_window` are dropped by `prune` before a new failure is
    /// counted.
    failures: Vec<Instant>,
    /// `Some(until)` once the breaker has tripped. While `now < until`
    /// the breaker is open and dispatch skips the backend. Once `until`
    /// has passed, the backend is eligible again (half-open). Reset to
    /// `None` when the router closes the breaker.
    open_until: Option<Instant>,
}

impl BreakerState {
    /// A closed breaker with no recorded failures.
    fn new() -> Self {
        Self {
            failures: Vec::new(),
            open_until: None,
        }
    }

    /// Drop failures that happened more than `window` before `now`.
    ///
    /// Each failure's age is measured with `saturating_duration_since`
    /// instead of computing a `now - window` cutoff. That subtraction
    /// can fail for a large window on some platforms. A fallback of
    /// `cutoff = now` would then discard *every* failure, and the
    /// breaker could never trip. Failures stamped at or after `now`
    /// have age zero and are kept, exactly as with the cutoff form.
    fn prune(&mut self, now: Instant, window: Duration) {
        self.failures
            .retain(|&t| now.saturating_duration_since(t) <= window);
    }

    /// `true` while the breaker is open at `now`: a trip deadline is
    /// set and has not yet been reached.
    fn is_open(&self, now: Instant) -> bool {
        self.open_until.is_some_and(|until| now < until)
    }
}
103
104/// A backend known to the router with its circuit-breaker slot.
105struct Slot {
106 backend: Arc<dyn Backend>,
107 name: String,
108 state: BreakerState,
109}
110
111/// Outcome of `Router::dispatch` — chosen backend plus the diagnostic
112/// info the lifecycle layer needs to report success / failure later.
113pub struct Dispatch {
114 /// Chosen backend (use exactly once; report the outcome via
115 /// `Router::record_success` / `record_failure`).
116 pub backend: Arc<dyn Backend>,
117 /// Stable name (`Backend::name()` snapshot — saves a vtable hop
118 /// at outcome-recording time).
119 pub name: String,
120}
121
122/// Errors returned by `Router::dispatch`.
123#[derive(Debug, thiserror::Error, PartialEq, Eq)]
124pub enum RouterError {
125 /// No backends are registered. Configuration error.
126 #[error("no backends registered")]
127 NoBackends,
128 /// Every registered backend is unavailable — either not ready
129 /// or its circuit breaker is open. Surfaces to the caller as
130 /// `code: backend_unavailable`.
131 #[error("no backend available")]
132 NoneAvailable,
133}
134
135/// Backend router. Holds the priority-ordered backend list and
136/// per-backend breaker state. Cheap to clone via `Arc`.
137pub struct Router {
138 slots: RwLock<Vec<Slot>>,
139 policy: BreakerPolicy,
140 /// Map from `Backend::name()` → index into `slots`. The map is
141 /// populated once at construction and never changes — backends
142 /// aren't added or removed at runtime in v0.2 (no admin API for
143 /// it). Keeps name-keyed feedback (`record_success` /
144 /// `record_failure`) O(1) without a linear scan over the slot
145 /// list.
146 name_index: HashMap<String, usize>,
147}
148
149impl Router {
150 /// Build a router from a priority-ordered list of backends.
151 /// First entry is highest priority. Uses the default breaker
152 /// policy.
153 pub fn new(backends: Vec<Arc<dyn Backend>>) -> Self {
154 Self::with_policy(backends, BreakerPolicy::default())
155 }
156
157 /// Build a router with explicit breaker tuning. Used by tests
158 /// (short windows + cooldowns) and operator-tuned configs.
159 pub fn with_policy(backends: Vec<Arc<dyn Backend>>, policy: BreakerPolicy) -> Self {
160 let mut name_index = HashMap::with_capacity(backends.len());
161 let slots: Vec<Slot> = backends
162 .into_iter()
163 .enumerate()
164 .map(|(i, b)| {
165 let name = b.name().to_string();
166 name_index.insert(name.clone(), i);
167 Slot {
168 backend: b,
169 name,
170 state: BreakerState::new(),
171 }
172 })
173 .collect();
174 Self {
175 slots: RwLock::new(slots),
176 policy,
177 name_index,
178 }
179 }
180
181 /// Number of registered backends. For diagnostics and tests.
182 pub fn len(&self) -> usize {
183 self.slots.read().expect("router rwlock poisoned").len()
184 }
185
186 /// `true` if no backends are registered.
187 pub fn is_empty(&self) -> bool {
188 self.slots
189 .read()
190 .expect("router rwlock poisoned")
191 .is_empty()
192 }
193
194 /// Pick a backend for the next request.
195 ///
196 /// Walks the slot list in priority order and returns the first
197 /// slot whose backend is `ready()` and whose breaker is not
198 /// currently open. Slots whose breaker has timed out into the
199 /// half-open state count as eligible — the next outcome reported
200 /// for them either closes the breaker (`record_success`) or
201 /// re-opens it (`record_failure`).
202 pub fn dispatch(&self) -> Result<Dispatch, RouterError> {
203 self.dispatch_filtered(|_| true)
204 }
205
206 /// Pick a backend for an embed request — same priority + breaker
207 /// rules as `dispatch`, but skip slots whose backend doesn't
208 /// advertise `capabilities().embed`. Without this filter, a
209 /// multi-backend config that puts a generate-only backend ahead
210 /// of an embed-capable one in the priority list would always
211 /// land embed requests on the generate slot and yield
212 /// `EmbedUnsupported`.
213 pub fn dispatch_embed(&self) -> Result<Dispatch, RouterError> {
214 self.dispatch_filtered(|backend| backend.capabilities().embed)
215 }
216
217 fn dispatch_filtered<F>(&self, predicate: F) -> Result<Dispatch, RouterError>
218 where
219 F: Fn(&Arc<dyn Backend>) -> bool,
220 {
221 let now = Instant::now();
222 let mut guard = self.slots.write().expect("router rwlock poisoned");
223 if guard.is_empty() {
224 return Err(RouterError::NoBackends);
225 }
226
227 for slot in guard.iter_mut() {
228 // Prune old failures so a stale window doesn't keep the
229 // breaker open after a long quiet period.
230 slot.state.prune(now, self.policy.failure_window);
231
232 if !slot.backend.ready() {
233 continue;
234 }
235
236 if !predicate(&slot.backend) {
237 continue;
238 }
239
240 if slot.state.is_open(now) {
241 continue;
242 }
243 // If the breaker had been open and the cooldown has
244 // expired, transition to half-open by clearing the
245 // `open_until` mark. The backend gets one shot; the
246 // next outcome decides whether it stays closed.
247 if slot.state.open_until.is_some() {
248 slot.state.open_until = None;
249 slot.state.failures.clear();
250 }
251 return Ok(Dispatch {
252 backend: Arc::clone(&slot.backend),
253 name: slot.name.clone(),
254 });
255 }
256
257 Err(RouterError::NoneAvailable)
258 }
259
260 /// `true` once every registered backend reports ready. The
261 /// lifecycle uses this to gate listener creation
262 /// (THREAT_MODEL F-13). Breaker state is *not* consulted —
263 /// readiness here is the boot-time engine-loaded check, not
264 /// the per-request availability check.
265 pub fn all_ready(&self) -> bool {
266 let guard = self.slots.read().expect("router rwlock poisoned");
267 !guard.is_empty() && guard.iter().all(|s| s.backend.ready())
268 }
269
270 /// Record a successful generation against the given backend.
271 /// Closes the circuit breaker (clears the open mark and the
272 /// failure window). Called by the lifecycle layer after a
273 /// terminal `Done` frame for a request served by this backend.
274 pub fn record_success(&self, name: &str) {
275 let Some(&idx) = self.name_index.get(name) else {
276 return;
277 };
278 let mut guard = self.slots.write().expect("router rwlock poisoned");
279 if let Some(slot) = guard.get_mut(idx) {
280 slot.state.failures.clear();
281 slot.state.open_until = None;
282 }
283 }
284
285 /// Record a failure against the given backend. Counts toward
286 /// the failure window; opens the breaker once the threshold is
287 /// reached. Called from both the pre-stream error path
288 /// (`generate*` returned `GenerateError`) and the mid-stream
289 /// error path (stream terminated without a `Done` frame).
290 /// Both kinds of failure count equally per ADR 0007.
291 pub fn record_failure(&self, name: &str) {
292 let Some(&idx) = self.name_index.get(name) else {
293 return;
294 };
295 let now = Instant::now();
296 let mut guard = self.slots.write().expect("router rwlock poisoned");
297 let Some(slot) = guard.get_mut(idx) else {
298 return;
299 };
300 slot.state.prune(now, self.policy.failure_window);
301 slot.state.failures.push(now);
302 if slot.state.failures.len() as u32 >= self.policy.failure_threshold {
303 slot.state.open_until = Some(now + self.policy.cooldown);
304 }
305 }
306
307 /// `true` if the named backend's circuit breaker is currently
308 /// open. Diagnostic surface; the dispatch path checks this
309 /// internally and returns `NoneAvailable` if every backend is
310 /// open.
311 pub fn breaker_open(&self, name: &str) -> bool {
312 let Some(&idx) = self.name_index.get(name) else {
313 return false;
314 };
315 let now = Instant::now();
316 let guard = self.slots.read().expect("router rwlock poisoned");
317 guard.get(idx).is_some_and(|slot| slot.state.is_open(now))
318 }
319}
320
321#[cfg(test)]
322mod tests {
323 use super::*;
324 use inferd_engine::mock::Mock;
325
326 fn fast_policy() -> BreakerPolicy {
327 BreakerPolicy {
328 failure_threshold: 2,
329 failure_window: Duration::from_millis(500),
330 cooldown: Duration::from_millis(100),
331 }
332 }
333
334 #[test]
335 fn empty_router_dispatch_returns_no_backends() {
336 let router = Router::new(vec![]);
337 assert!(router.is_empty());
338 assert_eq!(router.dispatch().err(), Some(RouterError::NoBackends));
339 }
340
341 #[test]
342 fn dispatch_returns_ready_backend() {
343 let mock = Arc::new(Mock::new());
344 let router = Router::new(vec![mock.clone()]);
345 let chosen = router.dispatch().expect("dispatch ok");
346 assert_eq!(chosen.name, "mock");
347 assert_eq!(chosen.backend.name(), "mock");
348 assert!(router.all_ready());
349 }
350
351 #[test]
352 fn unready_backend_returns_none_available() {
353 let mock = Arc::new(Mock::new());
354 mock.set_ready(false);
355 let router = Router::new(vec![mock]);
356 assert_eq!(router.dispatch().err(), Some(RouterError::NoneAvailable));
357 assert!(!router.all_ready());
358 }
359
360 #[test]
361 fn priority_picks_first_ready_backend() {
362 // Two named backends — Mock currently always returns "mock"
363 // for name(), so we use a thin wrapper to give them distinct
364 // identities for the priority test.
365 struct Named {
366 inner: Mock,
367 name: &'static str,
368 }
369 #[async_trait::async_trait]
370 impl Backend for Named {
371 fn name(&self) -> &str {
372 self.name
373 }
374 fn ready(&self) -> bool {
375 self.inner.ready()
376 }
377 async fn generate(
378 &self,
379 req: inferd_proto::Resolved,
380 ) -> Result<inferd_engine::TokenStream, inferd_engine::GenerateError> {
381 self.inner.generate(req).await
382 }
383 }
384
385 let high = Arc::new(Named {
386 inner: Mock::new(),
387 name: "high",
388 });
389 let low = Arc::new(Named {
390 inner: Mock::new(),
391 name: "low",
392 });
393 high.inner.set_ready(false);
394 let router = Router::new(vec![high.clone(), low.clone()]);
395 // High not ready → fall through to low.
396 assert_eq!(router.dispatch().unwrap().name, "low");
397 // High recovers → router prefers high again.
398 high.inner.set_ready(true);
399 assert_eq!(router.dispatch().unwrap().name, "high");
400 }
401
402 #[test]
403 fn breaker_opens_after_threshold_failures() {
404 let mock = Arc::new(Mock::new());
405 let router = Router::with_policy(vec![mock], fast_policy());
406 // First failure — still closed.
407 router.record_failure("mock");
408 assert!(!router.breaker_open("mock"));
409 // Second failure → meets threshold (2), breaker opens.
410 router.record_failure("mock");
411 assert!(router.breaker_open("mock"));
412 // Dispatch sees no available backends.
413 assert_eq!(router.dispatch().err(), Some(RouterError::NoneAvailable));
414 }
415
416 #[test]
417 fn success_resets_failure_count() {
418 let mock = Arc::new(Mock::new());
419 let router = Router::with_policy(vec![mock], fast_policy());
420 router.record_failure("mock");
421 router.record_success("mock");
422 // Next failure restarts the count from zero — breaker stays
423 // closed even though we've now had 2 lifetime failures.
424 router.record_failure("mock");
425 assert!(!router.breaker_open("mock"));
426 }
427
428 #[test]
429 fn breaker_recovers_after_cooldown() {
430 let mock = Arc::new(Mock::new());
431 let router = Router::with_policy(vec![mock], fast_policy());
432 router.record_failure("mock");
433 router.record_failure("mock");
434 assert!(router.breaker_open("mock"));
435 // Wait past the cooldown.
436 std::thread::sleep(Duration::from_millis(150));
437 assert!(!router.breaker_open("mock"));
438 // Dispatch puts the breaker into half-open and yields the
439 // backend.
440 assert_eq!(router.dispatch().unwrap().name, "mock");
441 }
442
443 #[test]
444 fn old_failures_outside_window_dont_open_breaker() {
445 let mock = Arc::new(Mock::new());
446 let router = Router::with_policy(vec![mock], fast_policy());
447 router.record_failure("mock");
448 std::thread::sleep(Duration::from_millis(600)); // exceed the 500ms window
449 router.record_failure("mock");
450 assert!(!router.breaker_open("mock"));
451 }
452
453 #[test]
454 fn record_failure_for_unknown_backend_is_a_noop() {
455 let mock = Arc::new(Mock::new());
456 let router = Router::new(vec![mock]);
457 router.record_failure("does-not-exist"); // should not panic
458 router.record_success("does-not-exist");
459 assert!(!router.breaker_open("does-not-exist"));
460 }
461
462 #[test]
463 fn dispatch_embed_skips_non_embed_backends() {
464 // Wrapper that overrides `capabilities().embed` to false but
465 // delegates everything else to Mock. Models the multi-backend
466 // shape inferd ships in its first-boot default config: a
467 // generate-only backend ahead of an embed-capable one in the
468 // priority list.
469 struct GenerateOnly {
470 inner: Mock,
471 name: &'static str,
472 }
473 #[async_trait::async_trait]
474 impl Backend for GenerateOnly {
475 fn name(&self) -> &str {
476 self.name
477 }
478 fn ready(&self) -> bool {
479 self.inner.ready()
480 }
481 fn capabilities(&self) -> inferd_engine::BackendCapabilities {
482 inferd_engine::BackendCapabilities {
483 embed: false,
484 ..self.inner.capabilities()
485 }
486 }
487 async fn generate(
488 &self,
489 req: inferd_proto::Resolved,
490 ) -> Result<inferd_engine::TokenStream, inferd_engine::GenerateError> {
491 self.inner.generate(req).await
492 }
493 }
494
495 let generate_only = Arc::new(GenerateOnly {
496 inner: Mock::new(),
497 name: "generate-only",
498 });
499 let embed_capable = Arc::new(Mock::new()); // mock advertises embed=true
500 let router = Router::new(vec![generate_only.clone(), embed_capable.clone()]);
501
502 // Plain dispatch picks the first ready backend regardless of
503 // capability — that's the existing generate-path semantics.
504 assert_eq!(router.dispatch().unwrap().name, "generate-only");
505
506 // dispatch_embed must skip generate-only and land on the
507 // embed-capable backend.
508 let chosen = router.dispatch_embed().expect("embed dispatch ok");
509 assert_eq!(chosen.name, "mock");
510 assert!(chosen.backend.capabilities().embed);
511 }
512
513 #[test]
514 fn dispatch_embed_returns_none_available_when_no_backend_supports_embed() {
515 struct GenerateOnly {
516 inner: Mock,
517 }
518 #[async_trait::async_trait]
519 impl Backend for GenerateOnly {
520 fn name(&self) -> &str {
521 "generate-only"
522 }
523 fn ready(&self) -> bool {
524 self.inner.ready()
525 }
526 fn capabilities(&self) -> inferd_engine::BackendCapabilities {
527 inferd_engine::BackendCapabilities {
528 embed: false,
529 ..self.inner.capabilities()
530 }
531 }
532 async fn generate(
533 &self,
534 req: inferd_proto::Resolved,
535 ) -> Result<inferd_engine::TokenStream, inferd_engine::GenerateError> {
536 self.inner.generate(req).await
537 }
538 }
539
540 let router = Router::new(vec![Arc::new(GenerateOnly { inner: Mock::new() })]);
541 assert_eq!(
542 router.dispatch_embed().err(),
543 Some(RouterError::NoneAvailable)
544 );
545 }
546}