// redis_mq/queue.rs

1use std::sync::Arc;
2
3use crate::{consumer::Consumer, job::Job, redis_client::RedisClient};
4use anyhow::{Context, Result};
5use tokio::sync::mpsc;
6use tokio::sync::Mutex;
7use tokio::time::{sleep, Duration};
8
9#[derive(Clone)]
10pub struct Queue {
11    redis_client: RedisClient,
12    receiver: Arc<Mutex<mpsc::Receiver<Job>>>,
13    sender: mpsc::Sender<Job>,
14    prefetch: i32,
15}
16
17impl Queue {
18    pub fn new(redis_client: RedisClient, prefetch: i32) -> Queue {
19        let (tx, rx) = mpsc::channel(prefetch as usize);
20        Queue {
21            redis_client,
22            receiver: Arc::new(Mutex::new(rx)),
23            sender: tx,
24            prefetch: prefetch,
25        }
26    }
27
28    pub async fn start(&mut self) -> Result<()> {
29        self.batch_consume().await
30    }
31
32    pub fn new_consumer(&mut self) -> Consumer {
33        Consumer::new(self.receiver.clone())
34    }
35
36    async fn batch_consume(&mut self) -> Result<()> {
37        loop {
38            let jobs = self.redis_client.rpop(self.prefetch).await?;
39
40            if jobs.is_empty() {
41                sleep(Duration::from_millis(1000)).await;
42            } else {
43                for job in jobs {
44                    self.sender.send(job).await.context("failed to send job")?
45                }
46            }
47        }
48    }
49
50    pub async fn publish(&mut self, job: &Job) -> Result<()> {
51        self.redis_client.lpush(job).await
52    }
53}