consume/
consume.rs

1
2use std::time::Duration;
3
4use anyhow::Result;
5use futures::future::try_join_all;
6use redis_mq::job::Job;
7use redis_mq::queue::Queue;
8use redis_mq::redis_client::RedisClient;
9use tokio::time::sleep;
10
11#[tokio::main]
12async fn main() -> Result<()> {
13    let redis = RedisClient::new().await?;
14    let mut queue = Queue::new(redis, 10);
15
16    let _t2 = tokio::spawn({
17        let mut q = queue.clone();
18
19        async move {
20            let mut i: i32 = 0;
21            loop {
22                let job = Job { id: format!("{i}") };
23                i += 1;
24
25                q.publish(&job).await.unwrap();
26                sleep(Duration::from_millis(5)).await;
27            }
28        }
29    });
30
31    let consumers: Vec<_> = (1..50)
32        .into_iter()
33        .map(|i| {
34            let mut consumer = queue.new_consumer();
35
36            tokio::spawn({
37                async move {
38                    loop {
39                        let job = consumer.fetch_task().await;
40                        println!("consumer_id: {}, consumed job {:?}", i, job)
41                    }
42                }
43            })
44        })
45        .collect();
46
47    queue.start().await?;
48    try_join_all(consumers).await?;
49
50    Ok(())
51}