use tokio::sync::mpsc;
#[derive(Debug, Clone)]
pub struct RunWorkItem {
pub thread_id: String,
pub run_id: String,
pub model: Option<String>,
pub instructions: Option<String>,
pub max_tokens: usize,
}
#[derive(Debug, Clone)]
pub struct RunQueueSender(pub mpsc::UnboundedSender<RunWorkItem>);
impl RunQueueSender {
pub fn send(&self, item: RunWorkItem) -> Result<(), mpsc::error::SendError<RunWorkItem>> {
self.0.send(item)
}
}
pub struct RunQueueReceiver(pub mpsc::UnboundedReceiver<RunWorkItem>);
impl RunQueueReceiver {
pub async fn recv(&mut self) -> Option<RunWorkItem> {
self.0.recv().await
}
}
pub fn new_run_queue() -> (RunQueueSender, RunQueueReceiver) {
let (tx, rx) = mpsc::unbounded_channel();
(RunQueueSender(tx), RunQueueReceiver(rx))
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn queue_send_and_receive() {
let (tx, mut rx) = new_run_queue();
tx.send(RunWorkItem {
thread_id: "thread_1".to_string(),
run_id: "run_1".to_string(),
model: None,
instructions: None,
max_tokens: 256,
})
.expect("send should succeed");
let item = rx.recv().await.expect("recv should yield an item");
assert_eq!(item.thread_id, "thread_1");
assert_eq!(item.run_id, "run_1");
assert_eq!(item.max_tokens, 256);
}
#[tokio::test]
async fn queue_closed_when_sender_dropped() {
let (tx, mut rx) = new_run_queue();
drop(tx);
assert!(
rx.recv().await.is_none(),
"closed channel should yield None"
);
}
#[tokio::test]
async fn queue_multiple_items_in_order() {
let (tx, mut rx) = new_run_queue();
for i in 0..5_u32 {
tx.send(RunWorkItem {
thread_id: format!("thread_{i}"),
run_id: format!("run_{i}"),
model: None,
instructions: None,
max_tokens: 128,
})
.expect("send");
}
for i in 0..5_u32 {
let item = rx.recv().await.expect("recv");
assert_eq!(item.thread_id, format!("thread_{i}"));
}
}
#[tokio::test]
async fn queue_sender_clone_shares_channel() {
let (tx, mut rx) = new_run_queue();
let tx2 = tx.clone();
tx.send(RunWorkItem {
thread_id: "t1".into(),
run_id: "r1".into(),
model: None,
instructions: None,
max_tokens: 64,
})
.expect("send from tx");
tx2.send(RunWorkItem {
thread_id: "t2".into(),
run_id: "r2".into(),
model: None,
instructions: None,
max_tokens: 64,
})
.expect("send from tx2");
let a = rx.recv().await.expect("first item");
let b = rx.recv().await.expect("second item");
assert_eq!(a.thread_id, "t1");
assert_eq!(b.thread_id, "t2");
}
}