1use std::sync::Arc;
2
3use crate::{consumer::Consumer, job::Job, redis_client::RedisClient};
4use anyhow::{Context, Result};
5use tokio::sync::mpsc;
6use tokio::sync::Mutex;
7use tokio::time::{sleep, Duration};
8
9#[derive(Clone)]
10pub struct Queue {
11 redis_client: RedisClient,
12 receiver: Arc<Mutex<mpsc::Receiver<Job>>>,
13 sender: mpsc::Sender<Job>,
14 prefetch: i32,
15}
16
17impl Queue {
18 pub fn new(redis_client: RedisClient, prefetch: i32) -> Queue {
19 let (tx, rx) = mpsc::channel(prefetch as usize);
20 Queue {
21 redis_client,
22 receiver: Arc::new(Mutex::new(rx)),
23 sender: tx,
24 prefetch: prefetch,
25 }
26 }
27
28 pub async fn start(&mut self) -> Result<()> {
29 self.batch_consume().await
30 }
31
32 pub fn new_consumer(&mut self) -> Consumer {
33 Consumer::new(self.receiver.clone())
34 }
35
36 async fn batch_consume(&mut self) -> Result<()> {
37 loop {
38 let jobs = self.redis_client.rpop(self.prefetch).await?;
39
40 if jobs.is_empty() {
41 sleep(Duration::from_millis(1000)).await;
42 } else {
43 for job in jobs {
44 self.sender.send(job).await.context("failed to send job")?
45 }
46 }
47 }
48 }
49
50 pub async fn publish(&mut self, job: &Job) -> Result<()> {
51 self.redis_client.lpush(job).await
52 }
53}