Skip to main content

oxillama_server/batch_spool/
queue.rs

1//! Tokio mpsc-based work queue for the batch processing pipeline.
2//!
3//! The `BatchQueue` provides a bounded channel through which route handlers
4//! submit job IDs to the background `BatchWorker`.  The worker receives job
5//! IDs, reads the input from disk, processes each line through the inference
6//! engine, and writes results back to disk.
7
8use tokio::sync::mpsc;
9
/// A single unit of work handed to the batch worker: the ID of one job.
///
/// Equality and hashing are derived from `job_id`, so items can be compared
/// in assertions and de-duplicated with `HashSet`/`HashMap`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct BatchWorkItem {
    /// The batch job ID (subdirectory name under the spool dir).
    pub job_id: String,
}
16
17/// Sender half of the batch work queue.
18pub type BatchQueueSender = mpsc::Sender<BatchWorkItem>;
19
20/// Receiver half of the batch work queue.
21pub type BatchQueueReceiver = mpsc::Receiver<BatchWorkItem>;
22
23/// Create a bounded batch work queue with the given capacity.
24pub fn new_batch_queue(capacity: usize) -> (BatchQueueSender, BatchQueueReceiver) {
25    mpsc::channel(capacity)
26}
27
#[cfg(test)]
mod tests {
    use super::*;

    /// An item pushed into the queue comes out the other end with the same
    /// job ID.
    #[tokio::test]
    async fn queue_send_and_receive() {
        let (sender, mut receiver) = new_batch_queue(8);

        let submitted = BatchWorkItem {
            job_id: "batch_123".to_string(),
        };
        sender.send(submitted).await.expect("send should succeed");

        let received = receiver.recv().await.expect("recv should yield an item");
        assert_eq!(received.job_id, "batch_123");
    }

    /// Once the only sender is gone, the receiver reports the channel as
    /// closed by returning `None`.
    #[tokio::test]
    async fn queue_closed_when_sender_dropped() {
        let (sender, mut receiver) = new_batch_queue(4);
        drop(sender);

        let next = receiver.recv().await;
        assert!(next.is_none(), "closed channel should yield None");
    }
}