// oxibonsai_runtime/request_queue.rs
1//! Bounded request queue with backpressure for the inference pipeline.
2//!
3//! [`BoundedQueue`] is a generic FIFO queue with a fixed capacity.  When the
4//! queue is full, [`BoundedQueue::try_push`] returns `false` immediately,
5//! allowing the caller to issue an HTTP 503 response rather than blocking
6//! indefinitely.  [`BoundedQueue::push_timeout`] blocks for up to a given
7//! [`Duration`] waiting for a slot to become available.
8//!
9//! [`InferenceQueue`] builds on top of [`BoundedQueue`] and wraps every
10//! submitted work item with a one-shot [`std::sync::mpsc`] channel so that
11//! callers can await the inference result asynchronously.
12
13use std::collections::VecDeque;
14use std::sync::atomic::{AtomicU64, Ordering};
15use std::sync::{Arc, Condvar, Mutex};
16use std::time::{Duration, Instant};
17
18use crate::sampling::SamplingParams;
19
20// ─────────────────────────────────────────────────────────────────────────────
21// QueueStats
22// ─────────────────────────────────────────────────────────────────────────────
23
/// A serialisable snapshot of queue utilisation.
///
/// Produced by [`BoundedQueue::stats`].  Counters are cumulative since the
/// queue was created.  The snapshot is not taken under a single lock — `len`
/// and the counters are read separately — so the fields may be mutually
/// slightly stale under concurrent use.
#[derive(Debug, Clone, serde::Serialize)]
pub struct QueueStats {
    /// Number of items currently waiting in the queue.
    pub len: usize,
    /// Maximum number of items the queue can hold.
    pub capacity: usize,
    /// `len / capacity` as a fraction in `[0.0, 1.0]`.
    pub utilization: f32,
    /// Total items ever successfully enqueued.
    pub total_enqueued: u64,
    /// Total items ever successfully dequeued.
    pub total_dequeued: u64,
    /// Total items dropped due to backpressure.
    pub total_dropped: u64,
    /// `total_dropped / (total_enqueued + total_dropped)`; `0.0` when no push
    /// has ever been attempted.
    pub drop_rate: f32,
}
42
43// ─────────────────────────────────────────────────────────────────────────────
44// BoundedQueue
45// ─────────────────────────────────────────────────────────────────────────────
46
/// Thread-safe bounded FIFO queue with condvar-based blocking and backpressure.
///
/// The internal state is stored behind a `Mutex<VecDeque<(T, Instant)>>`;
/// the `Instant` records when each item was enqueued.
/// Two [`Condvar`]s (`not_empty` and `not_full`) allow producers and consumers
/// to park efficiently instead of busy-waiting.
pub struct BoundedQueue<T> {
    /// Inner queue guarded by a mutex; each entry pairs the item with its
    /// enqueue timestamp.
    queue: Mutex<VecDeque<(T, Instant)>>,
    /// Signalled whenever an item is pushed (wakes blocked consumers).
    not_empty: Condvar,
    /// Signalled whenever an item is popped (wakes blocked producers).
    not_full: Condvar,
    /// Hard capacity limit; pushes beyond this are rejected or block.
    capacity: usize,
    /// Cumulative items enqueued without being dropped.
    pub total_enqueued: AtomicU64,
    /// Cumulative items dequeued.
    pub total_dequeued: AtomicU64,
    /// Cumulative items rejected due to a full queue.
    pub total_dropped: AtomicU64,
}
68
69impl<T: Send> BoundedQueue<T> {
70    /// Create a new bounded queue with the given maximum capacity.
71    pub fn new(capacity: usize) -> Self {
72        assert!(capacity > 0, "queue capacity must be at least 1");
73        Self {
74            queue: Mutex::new(VecDeque::with_capacity(capacity)),
75            not_empty: Condvar::new(),
76            not_full: Condvar::new(),
77            capacity,
78            total_enqueued: AtomicU64::new(0),
79            total_dequeued: AtomicU64::new(0),
80            total_dropped: AtomicU64::new(0),
81        }
82    }
83
84    /// Attempt to push `item` without blocking.
85    ///
86    /// Returns `true` on success, `false` if the queue is already at capacity
87    /// (backpressure: the caller should propagate a 503 to the client).
88    pub fn try_push(&self, item: T) -> bool {
89        let mut guard = self
90            .queue
91            .lock()
92            .expect("queue mutex should not be poisoned");
93
94        if guard.len() >= self.capacity {
95            self.total_dropped.fetch_add(1, Ordering::Relaxed);
96            return false;
97        }
98
99        guard.push_back((item, Instant::now()));
100        self.total_enqueued.fetch_add(1, Ordering::Relaxed);
101        self.not_empty.notify_one();
102        true
103    }
104
105    /// Push `item`, blocking up to `timeout` for a free slot.
106    ///
107    /// Returns `true` if the item was enqueued before the timeout elapsed,
108    /// `false` otherwise.
109    pub fn push_timeout(&self, item: T, timeout: Duration) -> bool {
110        let deadline = Instant::now() + timeout;
111
112        let mut guard = self
113            .queue
114            .lock()
115            .expect("queue mutex should not be poisoned");
116
117        loop {
118            if guard.len() < self.capacity {
119                guard.push_back((item, Instant::now()));
120                self.total_enqueued.fetch_add(1, Ordering::Relaxed);
121                self.not_empty.notify_one();
122                return true;
123            }
124
125            let remaining = match deadline.checked_duration_since(Instant::now()) {
126                Some(d) => d,
127                None => {
128                    self.total_dropped.fetch_add(1, Ordering::Relaxed);
129                    return false;
130                }
131            };
132
133            let (new_guard, timed_out) = self
134                .not_full
135                .wait_timeout(guard, remaining)
136                .expect("queue condvar should not be poisoned");
137            guard = new_guard;
138
139            if timed_out.timed_out() {
140                self.total_dropped.fetch_add(1, Ordering::Relaxed);
141                return false;
142            }
143        }
144    }
145
146    /// Remove and return the oldest item without blocking.
147    ///
148    /// Returns `None` if the queue is empty.
149    pub fn pop(&self) -> Option<T> {
150        let mut guard = self
151            .queue
152            .lock()
153            .expect("queue mutex should not be poisoned");
154
155        guard.pop_front().map(|(item, _enqueued_at)| {
156            self.total_dequeued.fetch_add(1, Ordering::Relaxed);
157            self.not_full.notify_one();
158            item
159        })
160    }
161
162    /// Remove and return the oldest item, blocking up to `timeout` for one to
163    /// become available.
164    ///
165    /// Returns `None` on timeout.
166    pub fn pop_timeout(&self, timeout: Duration) -> Option<T> {
167        let deadline = Instant::now() + timeout;
168
169        let mut guard = self
170            .queue
171            .lock()
172            .expect("queue mutex should not be poisoned");
173
174        loop {
175            if let Some((item, _)) = guard.pop_front() {
176                self.total_dequeued.fetch_add(1, Ordering::Relaxed);
177                self.not_full.notify_one();
178                return Some(item);
179            }
180
181            let remaining = deadline.checked_duration_since(Instant::now())?;
182
183            let (new_guard, timed_out) = self
184                .not_empty
185                .wait_timeout(guard, remaining)
186                .expect("queue condvar should not be poisoned");
187            guard = new_guard;
188
189            if timed_out.timed_out() && guard.is_empty() {
190                return None;
191            }
192        }
193    }
194
195    /// Current number of items in the queue.
196    pub fn len(&self) -> usize {
197        self.queue
198            .lock()
199            .expect("queue mutex should not be poisoned")
200            .len()
201    }
202
203    /// `true` if the queue contains no items.
204    pub fn is_empty(&self) -> bool {
205        self.len() == 0
206    }
207
208    /// `true` if the queue is at capacity.
209    pub fn is_full(&self) -> bool {
210        self.len() >= self.capacity
211    }
212
213    /// Maximum number of items the queue can hold.
214    pub fn capacity(&self) -> usize {
215        self.capacity
216    }
217
218    /// Current fill level as a fraction in `[0.0, 1.0]`.
219    pub fn utilization(&self) -> f32 {
220        self.len() as f32 / self.capacity as f32
221    }
222
223    /// Take a snapshot of queue statistics.
224    pub fn stats(&self) -> QueueStats {
225        let len = self.len();
226        let enqueued = self.total_enqueued.load(Ordering::Relaxed);
227        let dropped = self.total_dropped.load(Ordering::Relaxed);
228        let attempted = enqueued + dropped;
229        let drop_rate = if attempted == 0 {
230            0.0
231        } else {
232            dropped as f32 / attempted as f32
233        };
234
235        QueueStats {
236            len,
237            capacity: self.capacity,
238            utilization: len as f32 / self.capacity as f32,
239            total_enqueued: enqueued,
240            total_dequeued: self.total_dequeued.load(Ordering::Relaxed),
241            total_dropped: dropped,
242            drop_rate,
243        }
244    }
245
246    /// Drain all items from the queue and return them in FIFO order.
247    pub fn drain(&self) -> Vec<T> {
248        let mut guard = self
249            .queue
250            .lock()
251            .expect("queue mutex should not be poisoned");
252
253        let count = guard.len();
254        let items: Vec<T> = guard.drain(..).map(|(item, _)| item).collect();
255        self.total_dequeued
256            .fetch_add(count as u64, Ordering::Relaxed);
257        self.not_full.notify_all();
258        items
259    }
260}
261
262// ─────────────────────────────────────────────────────────────────────────────
263// InferenceWorkItem
264// ─────────────────────────────────────────────────────────────────────────────
265
/// A single unit of work to be processed by the inference engine.
pub struct InferenceWorkItem {
    /// Unique monotonically-increasing request identifier.
    pub id: u64,
    /// Pre-tokenised prompt.
    pub prompt_tokens: Vec<u32>,
    /// Maximum number of tokens to generate.
    pub max_tokens: usize,
    /// Sampling hyper-parameters for this request.
    pub params: SamplingParams,
    /// Wall-clock time at which this item was submitted to the queue.
    pub created_at: Instant,
    /// Channel through which the inference result is delivered.  Created by
    /// [`InferenceQueue::submit`] as a `sync_channel` of capacity 1.
    pub result_tx: std::sync::mpsc::SyncSender<Vec<u32>>,
}
281
282impl InferenceWorkItem {
283    /// How long this item has been waiting in the queue.
284    pub fn wait_time(&self) -> Duration {
285        self.created_at.elapsed()
286    }
287
288    /// Whether this item has been waiting longer than `ttl`.
289    pub fn is_expired(&self, ttl: Duration) -> bool {
290        self.wait_time() > ttl
291    }
292}
293
294// ─────────────────────────────────────────────────────────────────────────────
295// InferenceQueue
296// ─────────────────────────────────────────────────────────────────────────────
297
/// High-level inference request queue wrapping [`BoundedQueue<InferenceWorkItem>`].
///
/// Each call to [`InferenceQueue::submit`] returns a [`std::sync::mpsc::Receiver`]
/// through which the caller can retrieve the generated token IDs once inference
/// completes.  Returns `None` immediately if the queue is full (backpressure).
pub struct InferenceQueue {
    /// Shared bounded queue of pending work items.
    queue: Arc<BoundedQueue<InferenceWorkItem>>,
    /// Next request id to hand out; starts at 1 and increments per submit.
    next_id: AtomicU64,
}
307
308impl InferenceQueue {
309    /// Create a new inference queue with the given maximum pending-request capacity.
310    pub fn new(capacity: usize) -> Self {
311        Self {
312            queue: Arc::new(BoundedQueue::new(capacity)),
313            next_id: AtomicU64::new(1),
314        }
315    }
316
317    /// Submit an inference request.
318    ///
319    /// Returns a `Receiver` that will yield the generated token IDs once the
320    /// request is processed, or `None` if the queue is full.
321    pub fn submit(
322        &self,
323        prompt_tokens: Vec<u32>,
324        max_tokens: usize,
325        params: SamplingParams,
326    ) -> Option<std::sync::mpsc::Receiver<Vec<u32>>> {
327        let id = self.next_id.fetch_add(1, Ordering::Relaxed);
328        let (tx, rx) = std::sync::mpsc::sync_channel(1);
329
330        let item = InferenceWorkItem {
331            id,
332            prompt_tokens,
333            max_tokens,
334            params,
335            created_at: Instant::now(),
336            result_tx: tx,
337        };
338
339        if self.queue.try_push(item) {
340            Some(rx)
341        } else {
342            None
343        }
344    }
345
346    /// Number of pending requests currently in the queue.
347    pub fn queue_depth(&self) -> usize {
348        self.queue.len()
349    }
350
351    /// `true` if the queue is at capacity.
352    pub fn is_full(&self) -> bool {
353        self.queue.is_full()
354    }
355
356    /// Take a statistics snapshot.
357    pub fn stats(&self) -> QueueStats {
358        self.queue.stats()
359    }
360}
361
362// ─────────────────────────────────────────────────────────────────────────────
363// Tests
364// ─────────────────────────────────────────────────────────────────────────────
365
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::Ordering;

    // ── BoundedQueue ──────────────────────────────────────────────────────

    #[test]
    fn test_bounded_queue_try_push() {
        let queue: BoundedQueue<u32> = BoundedQueue::new(4);
        for value in [1u32, 2] {
            assert!(queue.try_push(value));
        }
        assert_eq!(queue.total_enqueued.load(Ordering::Relaxed), 2);
        assert_eq!(queue.len(), 2);
    }

    #[test]
    fn test_bounded_queue_try_push_full_returns_false() {
        let queue: BoundedQueue<u32> = BoundedQueue::new(2);
        assert!(queue.try_push(10));
        assert!(queue.try_push(20));
        // At capacity: the next push is rejected and counted as dropped.
        assert!(!queue.try_push(30));
        assert_eq!(queue.len(), 2);
        assert_eq!(queue.total_dropped.load(Ordering::Relaxed), 1);
    }

    #[test]
    fn test_bounded_queue_pop_empty_returns_none() {
        let queue: BoundedQueue<u32> = BoundedQueue::new(4);
        assert!(queue.pop().is_none());
    }

    #[test]
    fn test_bounded_queue_fifo_order() {
        let queue: BoundedQueue<u32> = BoundedQueue::new(8);
        (0..5u32).for_each(|n| assert!(queue.try_push(n)));
        // Popping until empty must yield items in insertion order.
        let drained: Vec<u32> = std::iter::from_fn(|| queue.pop()).collect();
        assert_eq!(drained, vec![0, 1, 2, 3, 4]);
        assert!(queue.pop().is_none());
    }

    #[test]
    fn test_bounded_queue_stats() {
        let queue: BoundedQueue<u32> = BoundedQueue::new(4);
        queue.try_push(1);
        queue.try_push(2);
        queue.pop();

        let snapshot = queue.stats();
        assert_eq!(snapshot.len, 1);
        assert_eq!(snapshot.capacity, 4);
        assert_eq!(snapshot.total_enqueued, 2);
        assert_eq!(snapshot.total_dequeued, 1);
        assert_eq!(snapshot.total_dropped, 0);
        assert!((snapshot.utilization - 0.25).abs() < f32::EPSILON);
    }

    #[test]
    fn test_bounded_queue_drain() {
        let queue: BoundedQueue<u32> = BoundedQueue::new(8);
        for n in 0..4u32 {
            queue.try_push(n);
        }
        assert_eq!(queue.drain(), vec![0, 1, 2, 3]);
        assert!(queue.is_empty());
        assert_eq!(queue.total_dequeued.load(Ordering::Relaxed), 4);
    }

    // ── InferenceQueue ────────────────────────────────────────────────────

    #[test]
    fn test_inference_queue_submit_and_depth() {
        let iq = InferenceQueue::new(8);
        let _rx1 = iq
            .submit(vec![1, 2, 3], 16, SamplingParams::default())
            .expect("submit should succeed on an empty queue");
        let _rx2 = iq
            .submit(vec![4, 5, 6], 16, SamplingParams::default())
            .expect("second submit should succeed");

        assert!(!iq.is_full());
        assert_eq!(iq.queue_depth(), 2);
    }

    #[test]
    fn test_inference_queue_full_returns_none() {
        let iq = InferenceQueue::new(2);

        let _rx1 = iq
            .submit(vec![1], 8, SamplingParams::default())
            .expect("first submit");
        let _rx2 = iq
            .submit(vec![2], 8, SamplingParams::default())
            .expect("second submit");

        // Two pending items in a capacity-2 queue: the next submit is rejected.
        assert!(iq.is_full());
        assert!(
            iq.submit(vec![3], 8, SamplingParams::default()).is_none(),
            "submit to a full queue must return None"
        );
    }
}
471}