consumer_demo/consumer-demo.rs
1use async_trait::async_trait;
2use bb8::Pool;
3use serde::{Deserialize, Serialize};
4use sidekiq::{
5 ChainIter, Job, Processor, RedisConnectionManager, Result, ServerMiddleware, Worker, WorkerRef,
6};
7use std::sync::Arc;
8use tracing::{error, info};
9
10#[derive(Clone)]
11struct HelloWorker;
12
13#[async_trait]
14impl Worker<()> for HelloWorker {
15 async fn perform(&self, _args: ()) -> Result<()> {
16 // I don't use any args. I do my own work.
17 Ok(())
18 }
19}
20
21#[derive(Clone)]
22struct PaymentReportWorker;
23
24impl PaymentReportWorker {
25 async fn send_report(&self, user_guid: String) -> Result<()> {
26 // TODO: Some actual work goes here...
27 info!({"user_guid" = user_guid, "class_name" = Self::class_name()}, "Sending payment report to user");
28
29 Ok(())
30 }
31}
32
33#[derive(Deserialize, Debug, Serialize)]
34struct PaymentReportArgs {
35 user_guid: String,
36}
37
38#[async_trait]
39impl Worker<PaymentReportArgs> for PaymentReportWorker {
40 fn opts() -> sidekiq::WorkerOpts<PaymentReportArgs, Self> {
41 sidekiq::WorkerOpts::new().queue("yolo")
42 }
43
44 async fn perform(&self, args: PaymentReportArgs) -> Result<()> {
45 self.send_report(args.user_guid).await
46 }
47}
48
49struct FilterExpiredUsersMiddleware;
50
51#[derive(Deserialize)]
52struct FiltereExpiredUsersArgs {
53 user_guid: String,
54}
55
56impl FiltereExpiredUsersArgs {
57 fn is_expired(&self) -> bool {
58 self.user_guid == "USR-123-EXPIRED"
59 }
60}
61
62#[async_trait]
63impl ServerMiddleware for FilterExpiredUsersMiddleware {
64 async fn call(
65 &self,
66 chain: ChainIter,
67 job: &Job,
68 worker: Arc<WorkerRef>,
69 redis: Pool<RedisConnectionManager>,
70 ) -> Result<()> {
71 let args: std::result::Result<(FiltereExpiredUsersArgs,), serde_json::Error> =
72 serde_json::from_value(job.args.clone());
73
74 // If we can safely deserialize then attempt to filter based on user guid.
75 if let Ok((filter,)) = args {
76 if filter.is_expired() {
77 error!({
78 "class" = &job.class,
79 "jid" = &job.jid,
80 "user_guid" = filter.user_guid,
81 }, "Detected an expired user, skipping this job");
82 return Ok(());
83 }
84 }
85
86 chain.next(job, worker, redis).await
87 }
88}
89
90#[tokio::main]
91async fn main() -> Result<()> {
92 tracing_subscriber::fmt::init();
93
94 // Redis
95 let manager = RedisConnectionManager::new("redis://127.0.0.1/")?;
96 let redis = Pool::builder().build(manager).await?;
97 //
98 // tokio::spawn({
99 // let mut redis = redis.clone();
100 //
101 // async move {
102 // loop {
103 // PaymentReportWorker::perform_async(
104 // &mut redis,
105 // PaymentReportArgs {
106 // user_guid: "USR-123".into(),
107 // },
108 // )
109 // .await
110 // .unwrap();
111 //
112 // tokio::time::sleep(std::time::Duration::from_secs(1)).await;
113 // }
114 // }
115 // });
116 //
117 // // Enqueue a job with the worker! There are many ways to do this.
118 // PaymentReportWorker::perform_async(
119 // &mut redis,
120 // PaymentReportArgs {
121 // user_guid: "USR-123".into(),
122 // },
123 // )
124 // .await?;
125 //
126 // PaymentReportWorker::perform_in(
127 // &mut redis,
128 // std::time::Duration::from_secs(10),
129 // PaymentReportArgs {
130 // user_guid: "USR-123".into(),
131 // },
132 // )
133 // .await?;
134 //
135 // PaymentReportWorker::opts()
136 // .queue("brolo")
137 // .perform_async(
138 // &mut redis,
139 // PaymentReportArgs {
140 // user_guid: "USR-123-EXPIRED".into(),
141 // },
142 // )
143 // .await?;
144 //
145 // sidekiq::perform_async(
146 // &mut redis,
147 // "PaymentReportWorker".into(),
148 // "yolo".into(),
149 // PaymentReportArgs {
150 // user_guid: "USR-123".to_string(),
151 // },
152 // )
153 // .await?;
154 //
155 // // Enqueue a job
156 // sidekiq::perform_async(
157 // &mut redis,
158 // "PaymentReportWorker".into(),
159 // "yolo".into(),
160 // PaymentReportArgs {
161 // user_guid: "USR-123".to_string(),
162 // },
163 // )
164 // .await?;
165 //
166 // // Enqueue a job with options
167 // sidekiq::opts()
168 // .queue("yolo".to_string())
169 // .perform_async(
170 // &mut redis,
171 // "PaymentReportWorker".into(),
172 // PaymentReportArgs {
173 // user_guid: "USR-123".to_string(),
174 // },
175 // )
176 // .await?;
177
178 // Sidekiq server
179 let mut p = Processor::new(redis.clone(), vec!["yolo".to_string(), "brolo".to_string()]);
180
181 // Add known workers
182 p.register(HelloWorker);
183 p.register(PaymentReportWorker);
184
185 // Custom Middlewares
186 p.using(FilterExpiredUsersMiddleware).await;
187
188 // // Reset cron jobs
189 // periodic::destroy_all(redis.clone()).await?;
190 //
191 // // Cron jobs
192 // periodic::builder("0 * * * * *")?
193 // .name("Payment report processing for a random user")
194 // .queue("yolo")
195 // //.args(PaymentReportArgs {
196 // // user_guid: "USR-123-PERIODIC".to_string(),
197 // //})?
198 // .args(json!({ "user_guid": "USR-123-PERIODIC" }))?
199 // .register(&mut p, PaymentReportWorker::new(logger.clone()))
200 // .await?;
201
202 p.run().await;
203 Ok(())
204}