1use async_trait::async_trait;
2use bb8::Pool;
3use serde::{Deserialize, Serialize};
4use serde_json::json;
5use sidekiq::{
6 periodic, ChainIter, Job, Processor, RedisConnectionManager, RedisPool, Result,
7 ServerMiddleware, Worker, WorkerRef,
8};
9use std::sync::Arc;
10use tracing::{debug, error, info, Level};
11
12#[derive(Clone)]
13struct HelloWorker;
14
15#[async_trait]
16impl Worker<()> for HelloWorker {
17 async fn perform(&self, _args: ()) -> Result<()> {
18 Ok(())
20 }
21}
22
23#[derive(Clone)]
24struct PaymentReportWorker {
25 redis: RedisPool,
26}
27
28impl PaymentReportWorker {
29 fn new(redis: RedisPool) -> Self {
30 Self { redis }
31 }
32
33 async fn send_report(&self, user_guid: String) -> Result<()> {
34 info!({
36 "user_guid" = user_guid,
37 "class_name" = Self::class_name()
38 }, "Sending payment report to user");
39
40 Ok(())
41 }
42}
43
44#[derive(Deserialize, Debug, Serialize)]
45struct PaymentReportArgs {
46 user_guid: String,
47}
48
49#[async_trait]
50impl Worker<PaymentReportArgs> for PaymentReportWorker {
51 fn opts() -> sidekiq::WorkerOpts<PaymentReportArgs, Self> {
52 sidekiq::WorkerOpts::new().queue("yolo")
53 }
54
55 async fn perform(&self, args: PaymentReportArgs) -> Result<()> {
56 use redis::AsyncCommands;
57
58 let times_called: usize = self
59 .redis
60 .get()
61 .await?
62 .unnamespaced_borrow_mut()
63 .incr("example_of_accessing_the_raw_redis_connection", 1)
64 .await?;
65
66 debug!({ "times_called" = times_called }, "Called this worker");
67
68 self.send_report(args.user_guid).await
69 }
70}
71
72struct FilterExpiredUsersMiddleware;
73
74#[derive(Deserialize)]
75struct FiltereExpiredUsersArgs {
76 user_guid: String,
77}
78
79impl FiltereExpiredUsersArgs {
80 fn is_expired(&self) -> bool {
81 self.user_guid == "USR-123-EXPIRED"
82 }
83}
84
85#[async_trait]
86impl ServerMiddleware for FilterExpiredUsersMiddleware {
87 async fn call(
88 &self,
89 chain: ChainIter,
90 job: &Job,
91 worker: Arc<WorkerRef>,
92 redis: RedisPool,
93 ) -> Result<()> {
94 let args: std::result::Result<(FiltereExpiredUsersArgs,), serde_json::Error> =
95 serde_json::from_value(job.args.clone());
96
97 if let Ok((filter,)) = args {
99 if filter.is_expired() {
100 error!({
101 "class" = &job.class,
102 "jid" = &job.jid,
103 "user_guid" = filter.user_guid
104 }, "Detected an expired user, skipping this job");
105 return Ok(());
106 }
107 }
108
109 chain.next(job, worker, redis).await
110 }
111}
112
113#[tokio::main]
114async fn main() -> Result<()> {
115 tracing_subscriber::fmt().with_max_level(Level::INFO).init();
116
117 let manager = RedisConnectionManager::new("redis://127.0.0.1/")?;
119 let redis = Pool::builder().build(manager).await?;
120
121 tokio::spawn({
122 let redis = redis.clone();
123
124 async move {
125 loop {
126 PaymentReportWorker::perform_async(
127 &redis,
128 PaymentReportArgs {
129 user_guid: "USR-123".into(),
130 },
131 )
132 .await
133 .unwrap();
134
135 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
136 }
137 }
138 });
139
140 PaymentReportWorker::perform_async(
142 &redis,
143 PaymentReportArgs {
144 user_guid: "USR-123".into(),
145 },
146 )
147 .await?;
148
149 PaymentReportWorker::perform_in(
150 &redis,
151 std::time::Duration::from_secs(10),
152 PaymentReportArgs {
153 user_guid: "USR-123".into(),
154 },
155 )
156 .await?;
157
158 PaymentReportWorker::opts()
159 .queue("brolo")
160 .perform_async(
161 &redis,
162 PaymentReportArgs {
163 user_guid: "USR-123-EXPIRED".into(),
164 },
165 )
166 .await?;
167
168 sidekiq::perform_async(
169 &redis,
170 "PaymentReportWorker".into(),
171 "yolo".into(),
172 PaymentReportArgs {
173 user_guid: "USR-123".to_string(),
174 },
175 )
176 .await?;
177
178 sidekiq::perform_async(
180 &redis,
181 "PaymentReportWorker".into(),
182 "yolo".into(),
183 PaymentReportArgs {
184 user_guid: "USR-123".to_string(),
185 },
186 )
187 .await?;
188
189 sidekiq::opts()
191 .queue("yolo".to_string())
192 .perform_async(
193 &redis,
194 "PaymentReportWorker".into(),
195 PaymentReportArgs {
196 user_guid: "USR-123".to_string(),
197 },
198 )
199 .await?;
200
201 let mut p = Processor::new(redis.clone(), vec!["yolo".to_string(), "brolo".to_string()]);
203
204 p.register(HelloWorker);
206 p.register(PaymentReportWorker::new(redis.clone()));
207
208 p.using(FilterExpiredUsersMiddleware).await;
210
211 periodic::destroy_all(redis.clone()).await?;
213
214 periodic::builder("0 * * * * *")?
216 .name("Payment report processing for a user using json args")
217 .queue("yolo")
218 .args(json!({ "user_guid": "USR-123-PERIODIC-FROM-JSON-ARGS" }))?
219 .register(&mut p, PaymentReportWorker::new(redis.clone()))
220 .await?;
221
222 periodic::builder("0 * * * * *")?
223 .name("Payment report processing for a user using typed args")
224 .queue("yolo")
225 .args(PaymentReportArgs {
226 user_guid: "USR-123-PERIODIC-FROM-TYPED-ARGS".to_string(),
227 })?
228 .register(&mut p, PaymentReportWorker::new(redis.clone()))
229 .await?;
230
231 p.run().await;
232 Ok(())
233}