oxillama_server/threads/
queue.rs1use tokio::sync::mpsc;
8
/// A single unit of work placed on the run queue.
///
/// The struct is plain owned data, so besides `Debug` and `Clone` it also
/// derives `PartialEq`/`Eq`; this lets callers and tests compare whole items
/// instead of checking field by field.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RunWorkItem {
    /// Identifier of the thread associated with this work item.
    pub thread_id: String,
    /// Identifier of the run associated with this work item.
    pub run_id: String,
    /// Optional model name. NOTE(review): `None` presumably means "use the
    /// consumer's default model" — confirm against the queue consumer.
    pub model: Option<String>,
    /// Optional instructions text. NOTE(review): how `None` is treated is
    /// decided by the consumer, which is not visible here.
    pub instructions: Option<String>,
    /// Token limit carried with the run. NOTE(review): presumably an upper
    /// bound on generated tokens — confirm against the consumer.
    pub max_tokens: usize,
}
23
24#[derive(Debug, Clone)]
28pub struct RunQueueSender(pub mpsc::UnboundedSender<RunWorkItem>);
29
30impl RunQueueSender {
31 pub fn send(&self, item: RunWorkItem) -> Result<(), mpsc::error::SendError<RunWorkItem>> {
35 self.0.send(item)
36 }
37}
38
39pub struct RunQueueReceiver(pub mpsc::UnboundedReceiver<RunWorkItem>);
41
42impl RunQueueReceiver {
43 pub async fn recv(&mut self) -> Option<RunWorkItem> {
47 self.0.recv().await
48 }
49}
50
51pub fn new_run_queue() -> (RunQueueSender, RunQueueReceiver) {
57 let (tx, rx) = mpsc::unbounded_channel();
58 (RunQueueSender(tx), RunQueueReceiver(rx))
59}
60
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a work item with no model and no instructions, which is the
    /// shape every test in this module needs.
    fn work_item(
        thread_id: impl Into<String>,
        run_id: impl Into<String>,
        max_tokens: usize,
    ) -> RunWorkItem {
        RunWorkItem {
            thread_id: thread_id.into(),
            run_id: run_id.into(),
            model: None,
            instructions: None,
            max_tokens,
        }
    }

    /// A single item sent through the queue arrives with its fields intact.
    #[tokio::test]
    async fn queue_send_and_receive() {
        let (sender, mut receiver) = new_run_queue();
        sender
            .send(work_item("thread_1", "run_1", 256))
            .expect("send should succeed");

        let received = receiver.recv().await.expect("recv should yield an item");
        assert_eq!(received.thread_id, "thread_1");
        assert_eq!(received.run_id, "run_1");
        assert_eq!(received.max_tokens, 256);
    }

    /// Dropping the only sender closes the queue, so `recv` yields `None`.
    #[tokio::test]
    async fn queue_closed_when_sender_dropped() {
        let (sender, mut receiver) = new_run_queue();
        drop(sender);

        let outcome = receiver.recv().await;
        assert!(outcome.is_none(), "closed channel should yield None");
    }

    /// Items come out in the same order they were sent.
    #[tokio::test]
    async fn queue_multiple_items_in_order() {
        let (sender, mut receiver) = new_run_queue();
        let indices = 0..5_u32;

        for i in indices.clone() {
            sender
                .send(work_item(format!("thread_{i}"), format!("run_{i}"), 128))
                .expect("send");
        }

        for i in indices {
            let received = receiver.recv().await.expect("recv");
            assert_eq!(received.thread_id, format!("thread_{i}"));
        }
    }

    /// A cloned sender feeds the same channel as the handle it was cloned from.
    #[tokio::test]
    async fn queue_sender_clone_shares_channel() {
        let (original, mut receiver) = new_run_queue();
        let cloned = original.clone();

        original
            .send(work_item("t1", "r1", 64))
            .expect("send from tx");
        cloned
            .send(work_item("t2", "r2", 64))
            .expect("send from tx2");

        let first = receiver.recv().await.expect("first item");
        let second = receiver.recv().await.expect("second item");
        assert_eq!(first.thread_id, "t1");
        assert_eq!(second.thread_id, "t2");
    }
}