Skip to main content

oxillama_server/threads/
queue.rs

1//! Unbounded MPSC work queue for the Assistants API run processor.
2//!
3//! Route handlers submit `RunWorkItem`s through the `RunQueueSender` when a
4//! new run is created.  The `spawn_run_worker` background task drains
5//! `RunQueueReceiver` and executes each run against the inference engine.
6
7use tokio::sync::mpsc;
8
/// A single unit of work handed to the run worker queue.
///
/// The struct is plain owned data (strings and an integer), so it is cheap to
/// clone and can be compared by value — handy for assertions and for
/// detecting duplicate submissions.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RunWorkItem {
    /// ID of the thread the run belongs to.
    pub thread_id: String,
    /// ID of the run to execute.
    pub run_id: String,
    /// Optional model override; `None` means the server default is used.
    ///
    /// NOTE(review): how `Some("")` is treated is decided by the consumer of
    /// this item and is not visible here — confirm against the worker.
    pub model: Option<String>,
    /// Optional system-level instruction prepended to the thread's messages.
    pub instructions: Option<String>,
    /// Maximum number of tokens to generate for this run.
    pub max_tokens: usize,
}
23
24/// Sender half of the run work queue.
25///
26/// Cloning this is cheap; all clones share the same underlying channel.
27#[derive(Debug, Clone)]
28pub struct RunQueueSender(pub mpsc::UnboundedSender<RunWorkItem>);
29
30impl RunQueueSender {
31    /// Submit a work item to the run queue.
32    ///
33    /// Returns an error if the receiving end has been dropped (worker exited).
34    pub fn send(&self, item: RunWorkItem) -> Result<(), mpsc::error::SendError<RunWorkItem>> {
35        self.0.send(item)
36    }
37}
38
39/// Receiver half of the run work queue.
40pub struct RunQueueReceiver(pub mpsc::UnboundedReceiver<RunWorkItem>);
41
42impl RunQueueReceiver {
43    /// Receive the next work item, waiting asynchronously.
44    ///
45    /// Returns `None` when all senders have been dropped.
46    pub async fn recv(&mut self) -> Option<RunWorkItem> {
47        self.0.recv().await
48    }
49}
50
51/// Create an unbounded run work queue.
52///
53/// Returns `(sender, receiver)`.  The queue has no capacity bound; callers
54/// are responsible for back-pressure at the API level (e.g. per-thread run
55/// limits).
56pub fn new_run_queue() -> (RunQueueSender, RunQueueReceiver) {
57    let (tx, rx) = mpsc::unbounded_channel();
58    (RunQueueSender(tx), RunQueueReceiver(rx))
59}
60
61// ── Tests ─────────────────────────────────────────────────────────────────────
62
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a work item with no model override and no instructions, so each
    /// test only spells out the fields it actually cares about.
    fn work_item(thread_id: &str, run_id: &str, max_tokens: usize) -> RunWorkItem {
        RunWorkItem {
            thread_id: thread_id.to_string(),
            run_id: run_id.to_string(),
            model: None,
            instructions: None,
            max_tokens,
        }
    }

    /// A single item sent through the queue comes out unchanged.
    #[tokio::test]
    async fn queue_send_and_receive() {
        let (tx, mut rx) = new_run_queue();
        tx.send(work_item("thread_1", "run_1", 256))
            .expect("send should succeed");

        let item = rx.recv().await.expect("recv should yield an item");
        assert_eq!(item.thread_id, "thread_1");
        assert_eq!(item.run_id, "run_1");
        assert_eq!(item.max_tokens, 256);
    }

    /// Dropping the only sender closes the channel for the receiver.
    #[tokio::test]
    async fn queue_closed_when_sender_dropped() {
        let (tx, mut rx) = new_run_queue();
        drop(tx);
        assert!(
            rx.recv().await.is_none(),
            "closed channel should yield None"
        );
    }

    /// `send` reports an error, and hands the item back, once the receiver
    /// has been dropped.  No runtime is needed: sending never awaits.
    #[test]
    fn queue_send_fails_when_receiver_dropped() {
        let (tx, rx) = new_run_queue();
        drop(rx);

        let err = tx
            .send(work_item("thread_1", "run_1", 256))
            .expect_err("send should fail once the receiver is gone");
        // `SendError` wraps the rejected item so the caller can recover it.
        assert_eq!(err.0.thread_id, "thread_1");
        assert_eq!(err.0.run_id, "run_1");
    }

    /// Items are delivered in the order they were submitted (FIFO).
    #[tokio::test]
    async fn queue_multiple_items_in_order() {
        let (tx, mut rx) = new_run_queue();

        for i in 0..5_u32 {
            tx.send(work_item(&format!("thread_{i}"), &format!("run_{i}"), 128))
                .expect("send");
        }

        for i in 0..5_u32 {
            let item = rx.recv().await.expect("recv");
            assert_eq!(item.thread_id, format!("thread_{i}"));
            assert_eq!(item.run_id, format!("run_{i}"));
        }
    }

    /// A cloned sender feeds the same channel as the original.
    #[tokio::test]
    async fn queue_sender_clone_shares_channel() {
        let (tx, mut rx) = new_run_queue();
        let tx2 = tx.clone();

        tx.send(work_item("t1", "r1", 64)).expect("send from tx");
        tx2.send(work_item("t2", "r2", 64)).expect("send from tx2");

        let a = rx.recv().await.expect("first item");
        let b = rx.recv().await.expect("second item");
        assert_eq!(a.thread_id, "t1");
        assert_eq!(b.thread_id, "t2");
    }
}