consumer_demo/
consumer-demo.rs

1use async_trait::async_trait;
2use bb8::Pool;
3use serde::{Deserialize, Serialize};
4use sidekiq::{
5    ChainIter, Job, Processor, RedisConnectionManager, Result, ServerMiddleware, Worker, WorkerRef,
6};
7use std::sync::Arc;
8use tracing::{error, info};
9
10#[derive(Clone)]
11struct HelloWorker;
12
13#[async_trait]
14impl Worker<()> for HelloWorker {
15    async fn perform(&self, _args: ()) -> Result<()> {
16        // I don't use any args. I do my own work.
17        Ok(())
18    }
19}
20
21#[derive(Clone)]
22struct PaymentReportWorker;
23
24impl PaymentReportWorker {
25    async fn send_report(&self, user_guid: String) -> Result<()> {
26        // TODO: Some actual work goes here...
27        info!({"user_guid" = user_guid, "class_name" = Self::class_name()}, "Sending payment report to user");
28
29        Ok(())
30    }
31}
32
33#[derive(Deserialize, Debug, Serialize)]
34struct PaymentReportArgs {
35    user_guid: String,
36}
37
38#[async_trait]
39impl Worker<PaymentReportArgs> for PaymentReportWorker {
40    fn opts() -> sidekiq::WorkerOpts<PaymentReportArgs, Self> {
41        sidekiq::WorkerOpts::new().queue("yolo")
42    }
43
44    async fn perform(&self, args: PaymentReportArgs) -> Result<()> {
45        self.send_report(args.user_guid).await
46    }
47}
48
49struct FilterExpiredUsersMiddleware;
50
51#[derive(Deserialize)]
52struct FiltereExpiredUsersArgs {
53    user_guid: String,
54}
55
56impl FiltereExpiredUsersArgs {
57    fn is_expired(&self) -> bool {
58        self.user_guid == "USR-123-EXPIRED"
59    }
60}
61
62#[async_trait]
63impl ServerMiddleware for FilterExpiredUsersMiddleware {
64    async fn call(
65        &self,
66        chain: ChainIter,
67        job: &Job,
68        worker: Arc<WorkerRef>,
69        redis: Pool<RedisConnectionManager>,
70    ) -> Result<()> {
71        let args: std::result::Result<(FiltereExpiredUsersArgs,), serde_json::Error> =
72            serde_json::from_value(job.args.clone());
73
74        // If we can safely deserialize then attempt to filter based on user guid.
75        if let Ok((filter,)) = args {
76            if filter.is_expired() {
77                error!({
78                    "class" = &job.class,
79                    "jid" = &job.jid,
80                    "user_guid" = filter.user_guid,
81                }, "Detected an expired user, skipping this job");
82                return Ok(());
83            }
84        }
85
86        chain.next(job, worker, redis).await
87    }
88}
89
90#[tokio::main]
91async fn main() -> Result<()> {
92    tracing_subscriber::fmt::init();
93
94    // Redis
95    let manager = RedisConnectionManager::new("redis://127.0.0.1/")?;
96    let redis = Pool::builder().build(manager).await?;
97    //
98    //    tokio::spawn({
99    //        let mut redis = redis.clone();
100    //
101    //        async move {
102    //            loop {
103    //                PaymentReportWorker::perform_async(
104    //                    &mut redis,
105    //                    PaymentReportArgs {
106    //                        user_guid: "USR-123".into(),
107    //                    },
108    //                )
109    //                .await
110    //                .unwrap();
111    //
112    //                tokio::time::sleep(std::time::Duration::from_secs(1)).await;
113    //            }
114    //        }
115    //    });
116    //
117    //    // Enqueue a job with the worker! There are many ways to do this.
118    //    PaymentReportWorker::perform_async(
119    //        &mut redis,
120    //        PaymentReportArgs {
121    //            user_guid: "USR-123".into(),
122    //        },
123    //    )
124    //    .await?;
125    //
126    //    PaymentReportWorker::perform_in(
127    //        &mut redis,
128    //        std::time::Duration::from_secs(10),
129    //        PaymentReportArgs {
130    //            user_guid: "USR-123".into(),
131    //        },
132    //    )
133    //    .await?;
134    //
135    //    PaymentReportWorker::opts()
136    //        .queue("brolo")
137    //        .perform_async(
138    //            &mut redis,
139    //            PaymentReportArgs {
140    //                user_guid: "USR-123-EXPIRED".into(),
141    //            },
142    //        )
143    //        .await?;
144    //
145    //    sidekiq::perform_async(
146    //        &mut redis,
147    //        "PaymentReportWorker".into(),
148    //        "yolo".into(),
149    //        PaymentReportArgs {
150    //            user_guid: "USR-123".to_string(),
151    //        },
152    //    )
153    //    .await?;
154    //
155    //    // Enqueue a job
156    //    sidekiq::perform_async(
157    //        &mut redis,
158    //        "PaymentReportWorker".into(),
159    //        "yolo".into(),
160    //        PaymentReportArgs {
161    //            user_guid: "USR-123".to_string(),
162    //        },
163    //    )
164    //    .await?;
165    //
166    //    // Enqueue a job with options
167    //    sidekiq::opts()
168    //        .queue("yolo".to_string())
169    //        .perform_async(
170    //            &mut redis,
171    //            "PaymentReportWorker".into(),
172    //            PaymentReportArgs {
173    //                user_guid: "USR-123".to_string(),
174    //            },
175    //        )
176    //        .await?;
177
178    // Sidekiq server
179    let mut p = Processor::new(redis.clone(), vec!["yolo".to_string(), "brolo".to_string()]);
180
181    // Add known workers
182    p.register(HelloWorker);
183    p.register(PaymentReportWorker);
184
185    // Custom Middlewares
186    p.using(FilterExpiredUsersMiddleware).await;
187
188    //    // Reset cron jobs
189    //    periodic::destroy_all(redis.clone()).await?;
190    //
191    //    // Cron jobs
192    //    periodic::builder("0 * * * * *")?
193    //        .name("Payment report processing for a random user")
194    //        .queue("yolo")
195    //        //.args(PaymentReportArgs {
196    //        //    user_guid: "USR-123-PERIODIC".to_string(),
197    //        //})?
198    //        .args(json!({ "user_guid": "USR-123-PERIODIC" }))?
199    //        .register(&mut p, PaymentReportWorker::new(logger.clone()))
200    //        .await?;
201
202    p.run().await;
203    Ok(())
204}