inferd_daemon/queue.rs
1//! Bounded admission gate.
2//!
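//! Per `docs/protocol-v1.md` §"Admission semantics": at most
//! `active_permits + queue_depth` requests may be outstanding across
//! the whole daemon at any time. The `(active_permits + 1)`-th
//! concurrent request is still admitted (it just waits behind the
//! active one); the `(active_permits + queue_depth + 1)`-th is
//! rejected immediately with `Response::Error{code: queue_full}`.
//! This gate does not itself distinguish "active" from "queued"
//! admits — it only bounds their sum; the actual waiting happens
//! downstream (e.g. on the backend's inner mutex, see below).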
9//!
10//! Implementation: a single `tokio::sync::Semaphore` whose total
11//! permit count is `active_permits + queue_depth`. Per-request
12//! flow:
//! 1. `try_acquire_owned` — non-blocking. If no permit is available,
//!    return `QueueFull` (or `Closed` if the gate has been shut down).
//!    The wire layer translates that into a terminal `Response::Error`
//!    frame.
//! 2. Hold the permit for the duration of the generation (the
//!    `OwnedSemaphorePermit` guard is owned by the request future,
//!    so it lives exactly as long as the request does).
//! 3. Drop it on completion or cancellation. The permit returns to
//!    the pool, freeing a slot for the next admission.
21//!
22//! For v0.1 the daemon's only backend (`llamacpp`) is single-
23//! threaded internally — concurrent generates serialise on its
24//! inner mutex. Setting `active_permits=1` matches that reality
25//! without bottlenecking the wire layer (which is happy to read
26//! and queue many requests). v0.2's continuous-batching backends
27//! will raise `active_permits` above 1.
28
29use std::sync::Arc;
30use tokio::sync::{OwnedSemaphorePermit, Semaphore, TryAcquireError};
31
32/// Errors returned by `Admission::try_admit`.
33#[derive(Debug, thiserror::Error, PartialEq, Eq)]
34pub enum SubmitError {
35 /// Admission gate is at capacity (active + queued). The wire
36 /// layer must respond with `Response::Error{code: queue_full}`
37 /// and close the request stream.
38 #[error("queue full")]
39 QueueFull,
40 /// Admission gate has been shut down (semaphore closed).
41 /// Submits during shutdown are rejected.
42 #[error("queue closed")]
43 Closed,
44}
45
46/// Shared admission gate handed to every per-connection task via
47/// `lifecycle::AcceptContext`. Cheap to clone (just an `Arc` bump).
48#[derive(Clone)]
49pub struct Admission {
50 inner: Arc<Semaphore>,
51 /// Cached for `capacity()` reporting; the semaphore itself
52 /// doesn't expose its initial size.
53 capacity: usize,
54}
55
56impl Admission {
57 /// Build an admission gate sized for `active_permits +
58 /// queue_depth` simultaneous outstanding requests across the
59 /// daemon.
60 pub fn new(active_permits: usize, queue_depth: usize) -> Self {
61 // Saturate at 1 below; the v0.1 invariant is active=1, but
62 // operators can pass 0 by accident (env var unset → "0"
63 // parse on some setups). Treat as "at least one slot."
64 let total = active_permits.max(1) + queue_depth;
65 Self {
66 inner: Arc::new(Semaphore::new(total)),
67 capacity: total,
68 }
69 }
70
71 /// Attempt to admit one request. Non-blocking. The returned
72 /// `OwnedSemaphorePermit` must be held for the duration of the
73 /// generation; dropping it returns the slot to the pool.
74 pub fn try_admit(&self) -> Result<OwnedSemaphorePermit, SubmitError> {
75 match Arc::clone(&self.inner).try_acquire_owned() {
76 Ok(permit) => Ok(permit),
77 Err(TryAcquireError::NoPermits) => Err(SubmitError::QueueFull),
78 Err(TryAcquireError::Closed) => Err(SubmitError::Closed),
79 }
80 }
81
82 /// Total slots configured (active_permits + queue_depth).
83 /// Diagnostic only; the wire layer doesn't surface this.
84 pub fn capacity(&self) -> usize {
85 self.capacity
86 }
87
88 /// Approximate count of slots currently free. Diagnostic;
89 /// racy by definition (a concurrent admit can change the
90 /// answer between the call and the use).
91 pub fn available_permits(&self) -> usize {
92 self.inner.available_permits()
93 }
94}
95
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn admit_succeeds_until_capacity() {
        let a = Admission::new(1, 2);
        assert_eq!(a.capacity(), 3);
        let _p1 = a.try_admit().unwrap();
        let _p2 = a.try_admit().unwrap();
        let _p3 = a.try_admit().unwrap();
        // Fourth admit fails — over capacity.
        assert_eq!(a.try_admit().unwrap_err(), SubmitError::QueueFull);
    }

    #[test]
    fn dropping_permit_frees_slot() {
        let a = Admission::new(1, 1);
        let p1 = a.try_admit().unwrap();
        let _p2 = a.try_admit().unwrap();
        // Pin the specific rejection reason, not just "some error".
        assert_eq!(a.try_admit().unwrap_err(), SubmitError::QueueFull);
        drop(p1);
        // Slot now available.
        let _p3 = a.try_admit().unwrap();
    }

    #[test]
    fn zero_active_permits_treated_as_one() {
        // Operator misconfigured to 0; we still admit at least one
        // slot (queue_depth=0 + 1 active = 1 total). Otherwise the
        // daemon would refuse every request which is worse than a
        // silent floor.
        let a = Admission::new(0, 0);
        assert_eq!(a.capacity(), 1);
        let _p = a.try_admit().unwrap();
        assert_eq!(a.try_admit().unwrap_err(), SubmitError::QueueFull);
    }

    #[test]
    fn zero_active_floor_still_adds_queue_depth() {
        // The floor applies to the active share only; the queue depth
        // is added on top of it.
        let a = Admission::new(0, 3);
        assert_eq!(a.capacity(), 4);
        assert_eq!(a.available_permits(), 4);
    }

    #[test]
    fn available_permits_reflects_holds() {
        let a = Admission::new(1, 2);
        assert_eq!(a.available_permits(), 3);
        let p1 = a.try_admit().unwrap();
        assert_eq!(a.available_permits(), 2);
        let _p2 = a.try_admit().unwrap();
        assert_eq!(a.available_permits(), 1);
        // Releasing a permit is visible immediately.
        drop(p1);
        assert_eq!(a.available_permits(), 2);
    }

    #[test]
    fn clones_share_one_gate() {
        let a = Admission::new(1, 1);
        let b = a.clone();
        let _pa = a.try_admit().unwrap();
        // A permit taken through `a` is counted by `b`.
        assert_eq!(b.available_permits(), 1);
        let _pb = b.try_admit().unwrap();
        assert_eq!(a.try_admit().unwrap_err(), SubmitError::QueueFull);
    }

    #[tokio::test]
    async fn admission_is_clone_safe_across_tasks() {
        let a = Admission::new(1, 2);
        let a2 = a.clone();
        let h = tokio::spawn(async move {
            let _p = a2.try_admit().unwrap();
            // Permit drops on task exit.
        });
        h.await.unwrap();
        // After the spawned task drops its permit, full capacity
        // is back...
        let p1 = a.try_admit().unwrap();
        let p2 = a.try_admit().unwrap();
        let p3 = a.try_admit().unwrap();
        // ...and nothing beyond it.
        assert_eq!(a.try_admit().unwrap_err(), SubmitError::QueueFull);
        drop((p1, p2, p3));
    }
}