producer_demo/producer-demo.rs
1use async_trait::async_trait;
2use bb8::Pool;
3use serde::{Deserialize, Serialize};
4use sidekiq::{
5 ChainIter, Job, RedisConnectionManager, Result, ServerMiddleware, Worker, WorkerRef,
6};
7use std::sync::Arc;
8use tracing::{error, info};
9
10#[derive(Clone)]
11struct HelloWorker;
12
13#[async_trait]
14impl Worker<()> for HelloWorker {
15 async fn perform(&self, _args: ()) -> Result<()> {
16 // I don't use any args. I do my own work.
17 Ok(())
18 }
19}
20
21#[derive(Clone)]
22struct PaymentReportWorker {}
23
24impl PaymentReportWorker {
25 async fn send_report(&self, user_guid: String) -> Result<()> {
26 // TODO: Some actual work goes here...
27 info!({"user_guid" = user_guid, "class_name" = Self::class_name()}, "Sending payment report to user");
28
29 Ok(())
30 }
31}
32
33#[derive(Deserialize, Debug, Serialize)]
34struct PaymentReportArgs {
35 user_guid: String,
36}
37
38#[async_trait]
39impl Worker<PaymentReportArgs> for PaymentReportWorker {
40 fn opts() -> sidekiq::WorkerOpts<PaymentReportArgs, Self> {
41 sidekiq::WorkerOpts::new().queue("yolo")
42 }
43
44 async fn perform(&self, args: PaymentReportArgs) -> Result<()> {
45 self.send_report(args.user_guid).await
46 }
47}
48
49struct FilterExpiredUsersMiddleware {}
50
51#[derive(Deserialize)]
52struct FiltereExpiredUsersArgs {
53 user_guid: String,
54}
55
56impl FiltereExpiredUsersArgs {
57 fn is_expired(&self) -> bool {
58 self.user_guid == "USR-123-EXPIRED"
59 }
60}
61
62#[async_trait]
63impl ServerMiddleware for FilterExpiredUsersMiddleware {
64 async fn call(
65 &self,
66 chain: ChainIter,
67 job: &Job,
68 worker: Arc<WorkerRef>,
69 redis: Pool<RedisConnectionManager>,
70 ) -> Result<()> {
71 let args: std::result::Result<(FiltereExpiredUsersArgs,), serde_json::Error> =
72 serde_json::from_value(job.args.clone());
73
74 // If we can safely deserialize then attempt to filter based on user guid.
75 if let Ok((filter,)) = args {
76 if filter.is_expired() {
77 error!({
78 "class" = &job.class,
79 "jid" = &job.jid,
80 "user_guid" = filter.user_guid
81 },
82 "Detected an expired user, skipping this job"
83 );
84 return Ok(());
85 }
86 }
87
88 chain.next(job, worker, redis).await
89 }
90}
91
92#[tokio::main]
93async fn main() -> Result<()> {
94 tracing_subscriber::fmt::init();
95
96 // Redis
97 let manager = RedisConnectionManager::new("redis://127.0.0.1/")?;
98 let redis = Pool::builder().build(manager).await?;
99
100 let mut n = 0;
101 let mut last = 0;
102 let mut then = std::time::Instant::now();
103
104 loop {
105 PaymentReportWorker::perform_async(
106 &redis,
107 PaymentReportArgs {
108 user_guid: "USR-123".into(),
109 },
110 )
111 .await
112 .unwrap();
113
114 //tokio::time::sleep(std::time::Duration::from_millis(1)).await;
115
116 n += 1;
117
118 if n % 100000 == 0 {
119 let now = std::time::Instant::now();
120 let delta = n - last;
121 last = n;
122 let delta_time = now - then;
123 if delta_time.as_secs() == 0 {
124 continue;
125 }
126 then = now;
127 let rate = delta / delta_time.as_secs();
128 println!("Iterations since last: {delta} at a rate of: {rate} iter/sec");
129 }
130 }
131
132 // // Enqueue a job with the worker! There are many ways to do this.
133 // PaymentReportWorker::perform_async(
134 // &mut redis,
135 // PaymentReportArgs {
136 // user_guid: "USR-123".into(),
137 // },
138 // )
139 // .await?;
140 //
141 // PaymentReportWorker::perform_in(
142 // &mut redis,
143 // std::time::Duration::from_secs(10),
144 // PaymentReportArgs {
145 // user_guid: "USR-123".into(),
146 // },
147 // )
148 // .await?;
149 //
150 // PaymentReportWorker::opts()
151 // .queue("brolo")
152 // .perform_async(
153 // &mut redis,
154 // PaymentReportArgs {
155 // user_guid: "USR-123-EXPIRED".into(),
156 // },
157 // )
158 // .await?;
159 //
160 // sidekiq::perform_async(
161 // &mut redis,
162 // "PaymentReportWorker".into(),
163 // "yolo".into(),
164 // PaymentReportArgs {
165 // user_guid: "USR-123".to_string(),
166 // },
167 // )
168 // .await?;
169 //
170 // // Enqueue a job
171 // sidekiq::perform_async(
172 // &mut redis,
173 // "PaymentReportWorker".into(),
174 // "yolo".into(),
175 // PaymentReportArgs {
176 // user_guid: "USR-123".to_string(),
177 // },
178 // )
179 // .await?;
180 //
181 // // Enqueue a job with options
182 // sidekiq::opts()
183 // .queue("yolo".to_string())
184 // .perform_async(
185 // &mut redis,
186 // "PaymentReportWorker".into(),
187 // PaymentReportArgs {
188 // user_guid: "USR-123".to_string(),
189 // },
190 // )
191 // .await?;
192
193 // // Sidekiq server
194 // let mut p = Processor::new(
195 // redis.clone(),
196 // logger.clone(),
197 // //vec!["yolo".to_string(), "brolo".to_string()],
198 // vec![],
199 // );
200 //
201 // // // Add known workers
202 // // p.register(HelloWorker);
203 // // p.register(PaymentReportWorker::new(logger.clone()));
204 // //
205 // // Custom Middlewares
206 // p.using(FilterExpiredUsersMiddleware::new(logger.clone()))
207 // .await;
208 //
209 // // Reset cron jobs
210 // periodic::destroy_all(redis.clone()).await?;
211 //
212 // // Cron jobs
213 // periodic::builder("0 * * * * *")?
214 // .name("Payment report processing for a random user")
215 // .queue("yolo")
216 // //.args(PaymentReportArgs {
217 // // user_guid: "USR-123-PERIODIC".to_string(),
218 // //})?
219 // .args(json!({ "user_guid": "USR-123-PERIODIC" }))?
220 // .register(&mut p, PaymentReportWorker::new(logger.clone()))
221 // .await?;
222 //
223 // p.run().await;
224}