inferd_daemon/router.rs
1//! Backend router — priority-ordered policy + per-backend circuit
2//! breaker.
3//!
4//! Per ADR 0007 §"Implementation shape":
5//!
6//! - **Policy** is priority-order. Backends are registered in the
7//! operator's preferred order (highest priority first); `dispatch`
8//! walks the list and returns the first backend that's both
9//! `ready()` and not currently circuit-broken.
10//! - **Circuit breaker** is per-backend. After `failure_threshold`
11//! pre-stream or mid-stream failures within `failure_window`, the
12//! breaker opens and the backend is skipped for `cooldown` seconds.
13//! After cooldown the breaker is half-open: the next dispatch
14//! tries the backend again; success closes it, failure re-opens
15//! for another `cooldown`.
16//! - **No retry, no failover**. `dispatch` returns one backend; the
17//! caller (lifecycle / lifecycle_v2) generates against it once
18//! and reports the outcome via `record_success` / `record_failure`.
19//! If the backend errors mid-stream the daemon emits one terminal
20//! error frame — it does *not* try a different backend (ADR 0007).
21//!
22//! Apps do not pick the backend (ADR 0006). There is no per-request
23//! `backend` field on the wire.
24//!
25//! The `Router` itself is `Send + Sync` and shared via `Arc`. Internal
26//! breaker state lives behind an `RwLock`; reads (the dispatch hot
27//! path) take the read lock, and the only writers are the
28//! `record_*` feedback hooks.
29
30use inferd_engine::Backend;
31use std::collections::HashMap;
32use std::sync::{Arc, RwLock};
33use std::time::{Duration, Instant};
34
/// Number of failures inside the observation window that trips the
/// breaker when no explicit policy is supplied. Three is deliberately
/// modest: it is enough evidence that an adapter is misbehaving, and
/// tolerating more would only delay the reaction.
pub const DEFAULT_FAILURE_THRESHOLD: u32 = 3;

/// Length of the sliding window over which failures are counted when
/// no explicit policy is supplied. A healthy upstream should not
/// produce a burst of three failures within one minute.
pub const DEFAULT_FAILURE_WINDOW: Duration = Duration::from_secs(60);

/// How long a tripped breaker stays open when no explicit policy is
/// supplied. The value trades off two concerns: a flapping upstream
/// must not be hammered, yet a sub-minute outage should clear without
/// operator intervention.
pub const DEFAULT_COOLDOWN: Duration = Duration::from_secs(30);
48
49/// Tunables for the per-backend circuit breaker. The same policy
50/// applies to every registered backend.
51#[derive(Debug, Clone, Copy)]
52pub struct BreakerPolicy {
53 /// Failures within `failure_window` that open the breaker.
54 pub failure_threshold: u32,
55 /// Sliding window over which failures are counted.
56 pub failure_window: Duration,
57 /// Open-state duration. After this elapses, the breaker
58 /// transitions to half-open on the next dispatch.
59 pub cooldown: Duration,
60}
61
62impl Default for BreakerPolicy {
63 fn default() -> Self {
64 Self {
65 failure_threshold: DEFAULT_FAILURE_THRESHOLD,
66 failure_window: DEFAULT_FAILURE_WINDOW,
67 cooldown: DEFAULT_COOLDOWN,
68 }
69 }
70}
71
/// Per-backend breaker state. Internal to the router.
#[derive(Debug, Clone)]
struct BreakerState {
    /// Timestamps of recent failures. Entries that have aged out of
    /// the observation window are removed by [`BreakerState::prune`].
    failures: Vec<Instant>,
    /// `Some(deadline)` once the breaker has been opened. The breaker
    /// counts as open only while `now < deadline`; from `deadline`
    /// onward the backend may be tried again.
    open_until: Option<Instant>,
}

impl BreakerState {
    /// Fresh, closed breaker with no recorded failures.
    fn new() -> Self {
        Self {
            failures: Vec::new(),
            open_until: None,
        }
    }

    /// Drop failures that are older than `window` relative to `now`.
    ///
    /// Failures exactly `window` old are kept (the comparison is
    /// inclusive).
    fn prune(&mut self, now: Instant, window: Duration) {
        // `checked_sub` yields `None` when `now - window` cannot be
        // represented, i.e. the window reaches back past the clock's
        // origin. In that case every recorded failure is still inside
        // the window, so nothing may be dropped. (Falling back to
        // `now` as the cutoff would wrongly discard all earlier
        // failures and keep the breaker from ever reaching its
        // threshold.)
        if let Some(cutoff) = now.checked_sub(window) {
            self.failures.retain(|&t| t >= cutoff);
        }
    }

    /// `true` if the breaker is open at instant `now`, i.e. an open
    /// deadline is set and has not been reached yet.
    fn is_open(&self, now: Instant) -> bool {
        self.open_until.is_some_and(|until| now < until)
    }
}
103
104/// A backend known to the router with its circuit-breaker slot.
105struct Slot {
106 backend: Arc<dyn Backend>,
107 name: String,
108 state: BreakerState,
109}
110
111/// Outcome of `Router::dispatch` — chosen backend plus the diagnostic
112/// info the lifecycle layer needs to report success / failure later.
113pub struct Dispatch {
114 /// Chosen backend (use exactly once; report the outcome via
115 /// `Router::record_success` / `record_failure`).
116 pub backend: Arc<dyn Backend>,
117 /// Stable name (`Backend::name()` snapshot — saves a vtable hop
118 /// at outcome-recording time).
119 pub name: String,
120}
121
122/// Errors returned by `Router::dispatch`.
123#[derive(Debug, thiserror::Error, PartialEq, Eq)]
124pub enum RouterError {
125 /// No backends are registered. Configuration error.
126 #[error("no backends registered")]
127 NoBackends,
128 /// Every registered backend is unavailable — either not ready
129 /// or its circuit breaker is open. Surfaces to the caller as
130 /// `code: backend_unavailable`.
131 #[error("no backend available")]
132 NoneAvailable,
133}
134
135/// Backend router. Holds the priority-ordered backend list and
136/// per-backend breaker state. Cheap to clone via `Arc`.
137pub struct Router {
138 slots: RwLock<Vec<Slot>>,
139 policy: BreakerPolicy,
140 /// Map from `Backend::name()` → index into `slots`. The map is
141 /// populated once at construction and never changes — backends
142 /// aren't added or removed at runtime in v0.2 (no admin API for
143 /// it). Keeps name-keyed feedback (`record_success` /
144 /// `record_failure`) O(1) without a linear scan over the slot
145 /// list.
146 name_index: HashMap<String, usize>,
147}
148
149impl Router {
150 /// Build a router from a priority-ordered list of backends.
151 /// First entry is highest priority. Uses the default breaker
152 /// policy.
153 pub fn new(backends: Vec<Arc<dyn Backend>>) -> Self {
154 Self::with_policy(backends, BreakerPolicy::default())
155 }
156
157 /// Build a router with explicit breaker tuning. Used by tests
158 /// (short windows + cooldowns) and operator-tuned configs.
159 pub fn with_policy(backends: Vec<Arc<dyn Backend>>, policy: BreakerPolicy) -> Self {
160 let mut name_index = HashMap::with_capacity(backends.len());
161 let slots: Vec<Slot> = backends
162 .into_iter()
163 .enumerate()
164 .map(|(i, b)| {
165 let name = b.name().to_string();
166 name_index.insert(name.clone(), i);
167 Slot {
168 backend: b,
169 name,
170 state: BreakerState::new(),
171 }
172 })
173 .collect();
174 Self {
175 slots: RwLock::new(slots),
176 policy,
177 name_index,
178 }
179 }
180
181 /// Number of registered backends. For diagnostics and tests.
182 pub fn len(&self) -> usize {
183 self.slots.read().expect("router rwlock poisoned").len()
184 }
185
186 /// `true` if no backends are registered.
187 pub fn is_empty(&self) -> bool {
188 self.slots
189 .read()
190 .expect("router rwlock poisoned")
191 .is_empty()
192 }
193
194 /// Pick a backend for the next request.
195 ///
196 /// Walks the slot list in priority order and returns the first
197 /// slot whose backend is `ready()` and whose breaker is not
198 /// currently open. Slots whose breaker has timed out into the
199 /// half-open state count as eligible — the next outcome reported
200 /// for them either closes the breaker (`record_success`) or
201 /// re-opens it (`record_failure`).
202 pub fn dispatch(&self) -> Result<Dispatch, RouterError> {
203 let now = Instant::now();
204 let mut guard = self.slots.write().expect("router rwlock poisoned");
205 if guard.is_empty() {
206 return Err(RouterError::NoBackends);
207 }
208
209 for slot in guard.iter_mut() {
210 // Prune old failures so a stale window doesn't keep the
211 // breaker open after a long quiet period.
212 slot.state.prune(now, self.policy.failure_window);
213
214 if !slot.backend.ready() {
215 continue;
216 }
217
218 if slot.state.is_open(now) {
219 continue;
220 }
221 // If the breaker had been open and the cooldown has
222 // expired, transition to half-open by clearing the
223 // `open_until` mark. The backend gets one shot; the
224 // next outcome decides whether it stays closed.
225 if slot.state.open_until.is_some() {
226 slot.state.open_until = None;
227 slot.state.failures.clear();
228 }
229 return Ok(Dispatch {
230 backend: Arc::clone(&slot.backend),
231 name: slot.name.clone(),
232 });
233 }
234
235 Err(RouterError::NoneAvailable)
236 }
237
238 /// `true` once every registered backend reports ready. The
239 /// lifecycle uses this to gate listener creation
240 /// (THREAT_MODEL F-13). Breaker state is *not* consulted —
241 /// readiness here is the boot-time engine-loaded check, not
242 /// the per-request availability check.
243 pub fn all_ready(&self) -> bool {
244 let guard = self.slots.read().expect("router rwlock poisoned");
245 !guard.is_empty() && guard.iter().all(|s| s.backend.ready())
246 }
247
248 /// Record a successful generation against the given backend.
249 /// Closes the circuit breaker (clears the open mark and the
250 /// failure window). Called by the lifecycle layer after a
251 /// terminal `Done` frame for a request served by this backend.
252 pub fn record_success(&self, name: &str) {
253 let Some(&idx) = self.name_index.get(name) else {
254 return;
255 };
256 let mut guard = self.slots.write().expect("router rwlock poisoned");
257 if let Some(slot) = guard.get_mut(idx) {
258 slot.state.failures.clear();
259 slot.state.open_until = None;
260 }
261 }
262
263 /// Record a failure against the given backend. Counts toward
264 /// the failure window; opens the breaker once the threshold is
265 /// reached. Called from both the pre-stream error path
266 /// (`generate*` returned `GenerateError`) and the mid-stream
267 /// error path (stream terminated without a `Done` frame).
268 /// Both kinds of failure count equally per ADR 0007.
269 pub fn record_failure(&self, name: &str) {
270 let Some(&idx) = self.name_index.get(name) else {
271 return;
272 };
273 let now = Instant::now();
274 let mut guard = self.slots.write().expect("router rwlock poisoned");
275 let Some(slot) = guard.get_mut(idx) else {
276 return;
277 };
278 slot.state.prune(now, self.policy.failure_window);
279 slot.state.failures.push(now);
280 if slot.state.failures.len() as u32 >= self.policy.failure_threshold {
281 slot.state.open_until = Some(now + self.policy.cooldown);
282 }
283 }
284
285 /// `true` if the named backend's circuit breaker is currently
286 /// open. Diagnostic surface; the dispatch path checks this
287 /// internally and returns `NoneAvailable` if every backend is
288 /// open.
289 pub fn breaker_open(&self, name: &str) -> bool {
290 let Some(&idx) = self.name_index.get(name) else {
291 return false;
292 };
293 let now = Instant::now();
294 let guard = self.slots.read().expect("router rwlock poisoned");
295 guard.get(idx).is_some_and(|slot| slot.state.is_open(now))
296 }
297}
298
#[cfg(test)]
mod tests {
    use super::*;
    use inferd_engine::mock::Mock;

    /// Breaker tuning with millisecond-scale timings so the timing
    /// tests stay quick: two failures trip the breaker, failures are
    /// counted over 500ms, and the breaker stays open for 100ms.
    fn fast_policy() -> BreakerPolicy {
        BreakerPolicy {
            failure_threshold: 2,
            failure_window: Duration::from_millis(500),
            cooldown: Duration::from_millis(100),
        }
    }

    #[test]
    fn empty_router_dispatch_returns_no_backends() {
        let empty = Router::new(vec![]);
        assert!(empty.is_empty());
        let outcome = empty.dispatch().err();
        assert_eq!(outcome, Some(RouterError::NoBackends));
    }

    #[test]
    fn dispatch_returns_ready_backend() {
        let backend = Arc::new(Mock::new());
        let router = Router::new(vec![backend.clone()]);
        let picked = router.dispatch().expect("dispatch ok");
        // Both the snapshot and the live backend agree on the name.
        assert_eq!(picked.name, "mock");
        assert_eq!(picked.backend.name(), "mock");
        assert!(router.all_ready());
    }

    #[test]
    fn unready_backend_returns_none_available() {
        let backend = Arc::new(Mock::new());
        backend.set_ready(false);
        let router = Router::new(vec![backend]);
        let outcome = router.dispatch().err();
        assert_eq!(outcome, Some(RouterError::NoneAvailable));
        assert!(!router.all_ready());
    }

    #[test]
    fn priority_picks_first_ready_backend() {
        // `Mock::name()` is always "mock", so this thin wrapper lends
        // each instance its own identity; everything else is
        // forwarded to the wrapped mock.
        struct Named {
            inner: Mock,
            name: &'static str,
        }
        #[async_trait::async_trait]
        impl Backend for Named {
            fn name(&self) -> &str {
                self.name
            }
            fn ready(&self) -> bool {
                self.inner.ready()
            }
            async fn generate(
                &self,
                req: inferd_proto::Resolved,
            ) -> Result<inferd_engine::TokenStream, inferd_engine::GenerateError> {
                self.inner.generate(req).await
            }
        }

        let preferred = Arc::new(Named {
            inner: Mock::new(),
            name: "high",
        });
        let fallback = Arc::new(Named {
            inner: Mock::new(),
            name: "low",
        });
        preferred.inner.set_ready(false);
        let router = Router::new(vec![preferred.clone(), fallback.clone()]);

        // While the preferred backend is down the fallback is chosen.
        let first_pick = router.dispatch().unwrap();
        assert_eq!(first_pick.name, "low");

        // As soon as the preferred backend is ready it wins again.
        preferred.inner.set_ready(true);
        let second_pick = router.dispatch().unwrap();
        assert_eq!(second_pick.name, "high");
    }

    #[test]
    fn breaker_opens_after_threshold_failures() {
        let router = Router::with_policy(vec![Arc::new(Mock::new())], fast_policy());

        // One failure is below the threshold of two.
        router.record_failure("mock");
        assert!(!router.breaker_open("mock"));

        // The second failure reaches the threshold and trips it.
        router.record_failure("mock");
        assert!(router.breaker_open("mock"));

        // With its only backend broken the router has nothing to offer.
        let outcome = router.dispatch().err();
        assert_eq!(outcome, Some(RouterError::NoneAvailable));
    }

    #[test]
    fn success_resets_failure_count() {
        let router = Router::with_policy(vec![Arc::new(Mock::new())], fast_policy());
        router.record_failure("mock");
        router.record_success("mock");
        // The success wiped the earlier failure, so this one is
        // counted as the first again: two failures over the router's
        // lifetime, but the breaker remains closed.
        router.record_failure("mock");
        assert!(!router.breaker_open("mock"));
    }

    #[test]
    fn breaker_recovers_after_cooldown() {
        let router = Router::with_policy(vec![Arc::new(Mock::new())], fast_policy());
        for _ in 0..2 {
            router.record_failure("mock");
        }
        assert!(router.breaker_open("mock"));

        // Sleep comfortably longer than the 100ms cooldown.
        std::thread::sleep(Duration::from_millis(150));
        assert!(!router.breaker_open("mock"));

        // The backend is half-open now and is handed out again.
        let picked = router.dispatch().unwrap();
        assert_eq!(picked.name, "mock");
    }

    #[test]
    fn old_failures_outside_window_dont_open_breaker() {
        let router = Router::with_policy(vec![Arc::new(Mock::new())], fast_policy());
        router.record_failure("mock");
        // Let the first failure age out of the 500ms window.
        std::thread::sleep(Duration::from_millis(600));
        router.record_failure("mock");
        assert!(!router.breaker_open("mock"));
    }

    #[test]
    fn record_failure_for_unknown_backend_is_a_noop() {
        let router = Router::new(vec![Arc::new(Mock::new())]);
        // Neither feedback call may panic for a name nobody registered.
        router.record_failure("does-not-exist");
        router.record_success("does-not-exist");
        assert!(!router.breaker_open("does-not-exist"));
    }
}
439}