Skip to main content

inferd_daemon/
queue.rs

//! Bounded admission gate.
//!
//! Per `docs/protocol-v1.md` §"Admission semantics": at most
//! `active_permits + queue_depth` outstanding requests across the
//! whole daemon at any time. The (active_permits + 1)th request is
//! still admitted (it just queues behind the active one); the
//! (active_permits + queue_depth + 1)th is rejected immediately
//! with `Response::Error{code: queue_full}`.
//!
//! Implementation: a single `tokio::sync::Semaphore` whose total
//! permit count is `active_permits + queue_depth` (with
//! `active_permits` floored at 1). Per-request flow:
//! 1. `try_acquire_owned` — non-blocking. If no permit is available,
//!    return `QueueFull`. The wire layer translates that into a
//!    terminal `Response::Error` frame.
//! 2. Hold the permit for the duration of the generation (the
//!    `OwnedSemaphorePermit` guard lives in the request future's
//!    state).
//! 3. Drop the permit on completion / cancellation. It returns to the
//!    pool, freeing a slot for the next admit.
//!
//! The gate itself does not distinguish "running" from "queued"
//! permits; it only bounds the total. The split between the two is
//! whatever the backend enforces internally.
//!
//! For v0.1 the daemon's only backend (`llamacpp`) is single-
//! threaded internally — concurrent generates serialise on its
//! inner mutex. Setting `active_permits=1` matches that reality
//! without bottlenecking the wire layer (which is happy to read
//! and queue many requests). v0.2's continuous-batching backends
//! will raise `active_permits` above 1.
28
29use std::sync::Arc;
30use tokio::sync::{OwnedSemaphorePermit, Semaphore, TryAcquireError};
31
32/// Errors returned by `Admission::try_admit`.
33#[derive(Debug, thiserror::Error, PartialEq, Eq)]
34pub enum SubmitError {
35    /// Admission gate is at capacity (active + queued). The wire
36    /// layer must respond with `Response::Error{code: queue_full}`
37    /// and close the request stream.
38    #[error("queue full")]
39    QueueFull,
40    /// Admission gate has been shut down (semaphore closed).
41    /// Submits during shutdown are rejected.
42    #[error("queue closed")]
43    Closed,
44}
45
46/// Shared admission gate handed to every per-connection task via
47/// `lifecycle::AcceptContext`. Cheap to clone (just an `Arc` bump).
48#[derive(Clone)]
49pub struct Admission {
50    inner: Arc<Semaphore>,
51    /// Cached for `capacity()` reporting; the semaphore itself
52    /// doesn't expose its initial size.
53    capacity: usize,
54}
55
56impl Admission {
57    /// Build an admission gate sized for `active_permits +
58    /// queue_depth` simultaneous outstanding requests across the
59    /// daemon.
60    pub fn new(active_permits: usize, queue_depth: usize) -> Self {
61        // Saturate at 1 below; the v0.1 invariant is active=1, but
62        // operators can pass 0 by accident (env var unset → "0"
63        // parse on some setups). Treat as "at least one slot."
64        let total = active_permits.max(1) + queue_depth;
65        Self {
66            inner: Arc::new(Semaphore::new(total)),
67            capacity: total,
68        }
69    }
70
71    /// Attempt to admit one request. Non-blocking. The returned
72    /// `OwnedSemaphorePermit` must be held for the duration of the
73    /// generation; dropping it returns the slot to the pool.
74    pub fn try_admit(&self) -> Result<OwnedSemaphorePermit, SubmitError> {
75        match Arc::clone(&self.inner).try_acquire_owned() {
76            Ok(permit) => Ok(permit),
77            Err(TryAcquireError::NoPermits) => Err(SubmitError::QueueFull),
78            Err(TryAcquireError::Closed) => Err(SubmitError::Closed),
79        }
80    }
81
82    /// Total slots configured (active_permits + queue_depth).
83    /// Diagnostic only; the wire layer doesn't surface this.
84    pub fn capacity(&self) -> usize {
85        self.capacity
86    }
87
88    /// Approximate count of slots currently free. Diagnostic;
89    /// racy by definition (a concurrent admit can change the
90    /// answer between the call and the use).
91    pub fn available_permits(&self) -> usize {
92        self.inner.available_permits()
93    }
94}
95
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn admit_succeeds_until_capacity() {
        let a = Admission::new(1, 2);
        assert_eq!(a.capacity(), 3);
        let _p1 = a.try_admit().unwrap();
        let _p2 = a.try_admit().unwrap();
        let _p3 = a.try_admit().unwrap();
        // Fourth admit fails — over capacity.
        assert_eq!(a.try_admit().unwrap_err(), SubmitError::QueueFull);
    }

    #[test]
    fn dropping_permit_frees_slot() {
        let a = Admission::new(1, 1);
        let p1 = a.try_admit().unwrap();
        let _p2 = a.try_admit().unwrap();
        assert_eq!(a.try_admit().unwrap_err(), SubmitError::QueueFull);
        drop(p1);
        // Exactly one slot is now available...
        let _p3 = a.try_admit().unwrap();
        // ...and taking it fills the gate again.
        assert_eq!(a.try_admit().unwrap_err(), SubmitError::QueueFull);
    }

    #[test]
    fn zero_active_permits_treated_as_one() {
        // Operator misconfigured to 0; we still admit at least one
        // slot (queue_depth=0 + 1 active = 1 total). Otherwise the
        // daemon would refuse every request which is worse than a
        // silent floor.
        let a = Admission::new(0, 0);
        assert_eq!(a.capacity(), 1);
        let _p = a.try_admit().unwrap();
        assert_eq!(a.try_admit().unwrap_err(), SubmitError::QueueFull);
    }

    #[test]
    fn available_permits_reflects_holds() {
        let a = Admission::new(1, 2);
        assert_eq!(a.available_permits(), 3);
        let p1 = a.try_admit().unwrap();
        assert_eq!(a.available_permits(), 2);
        let p2 = a.try_admit().unwrap();
        assert_eq!(a.available_permits(), 1);
        drop((p1, p2));
        // Released permits are counted again.
        assert_eq!(a.available_permits(), 3);
    }

    #[tokio::test]
    async fn admission_is_clone_safe_across_tasks() {
        let a = Admission::new(1, 2);
        let held = a.try_admit().unwrap();
        let a2 = a.clone();
        let h = tokio::spawn(async move {
            // The clone shares the parent's semaphore, so the permit
            // taken through `a` is already counted here.
            assert_eq!(a2.available_permits(), 2);
            let _p = a2.try_admit().unwrap();
            // Evaluated while `_p` is still held; it drops on task exit.
            a2.available_permits()
        });
        assert_eq!(h.await.unwrap(), 1);
        drop(held);
        // After the spawned task drops its permit and we release
        // ours, full capacity is back.
        assert_eq!(a.available_permits(), a.capacity());
        let p1 = a.try_admit().unwrap();
        let p2 = a.try_admit().unwrap();
        let p3 = a.try_admit().unwrap();
        assert_eq!(a.try_admit().unwrap_err(), SubmitError::QueueFull);
        drop((p1, p2, p3));
    }
}