producer_demo/
producer-demo.rs

1use async_trait::async_trait;
2use bb8::Pool;
3use serde::{Deserialize, Serialize};
4use sidekiq::{
5    ChainIter, Job, RedisConnectionManager, Result, ServerMiddleware, Worker, WorkerRef,
6};
7use std::sync::Arc;
8use tracing::{error, info};
9
10#[derive(Clone)]
11struct HelloWorker;
12
13#[async_trait]
14impl Worker<()> for HelloWorker {
15    async fn perform(&self, _args: ()) -> Result<()> {
16        // I don't use any args. I do my own work.
17        Ok(())
18    }
19}
20
21#[derive(Clone)]
22struct PaymentReportWorker {}
23
24impl PaymentReportWorker {
25    async fn send_report(&self, user_guid: String) -> Result<()> {
26        // TODO: Some actual work goes here...
27        info!({"user_guid" = user_guid, "class_name" = Self::class_name()}, "Sending payment report to user");
28
29        Ok(())
30    }
31}
32
33#[derive(Deserialize, Debug, Serialize)]
34struct PaymentReportArgs {
35    user_guid: String,
36}
37
38#[async_trait]
39impl Worker<PaymentReportArgs> for PaymentReportWorker {
40    fn opts() -> sidekiq::WorkerOpts<PaymentReportArgs, Self> {
41        sidekiq::WorkerOpts::new().queue("yolo")
42    }
43
44    async fn perform(&self, args: PaymentReportArgs) -> Result<()> {
45        self.send_report(args.user_guid).await
46    }
47}
48
49struct FilterExpiredUsersMiddleware {}
50
51#[derive(Deserialize)]
52struct FiltereExpiredUsersArgs {
53    user_guid: String,
54}
55
56impl FiltereExpiredUsersArgs {
57    fn is_expired(&self) -> bool {
58        self.user_guid == "USR-123-EXPIRED"
59    }
60}
61
62#[async_trait]
63impl ServerMiddleware for FilterExpiredUsersMiddleware {
64    async fn call(
65        &self,
66        chain: ChainIter,
67        job: &Job,
68        worker: Arc<WorkerRef>,
69        redis: Pool<RedisConnectionManager>,
70    ) -> Result<()> {
71        let args: std::result::Result<(FiltereExpiredUsersArgs,), serde_json::Error> =
72            serde_json::from_value(job.args.clone());
73
74        // If we can safely deserialize then attempt to filter based on user guid.
75        if let Ok((filter,)) = args {
76            if filter.is_expired() {
77                error!({
78                        "class" = &job.class,
79                        "jid" = &job.jid,
80                        "user_guid" = filter.user_guid
81                    },
82                    "Detected an expired user, skipping this job"
83                );
84                return Ok(());
85            }
86        }
87
88        chain.next(job, worker, redis).await
89    }
90}
91
92#[tokio::main]
93async fn main() -> Result<()> {
94    tracing_subscriber::fmt::init();
95
96    // Redis
97    let manager = RedisConnectionManager::new("redis://127.0.0.1/")?;
98    let redis = Pool::builder().build(manager).await?;
99
100    let mut n = 0;
101    let mut last = 0;
102    let mut then = std::time::Instant::now();
103
104    loop {
105        PaymentReportWorker::perform_async(
106            &redis,
107            PaymentReportArgs {
108                user_guid: "USR-123".into(),
109            },
110        )
111        .await
112        .unwrap();
113
114        //tokio::time::sleep(std::time::Duration::from_millis(1)).await;
115
116        n += 1;
117
118        if n % 100000 == 0 {
119            let now = std::time::Instant::now();
120            let delta = n - last;
121            last = n;
122            let delta_time = now - then;
123            if delta_time.as_secs() == 0 {
124                continue;
125            }
126            then = now;
127            let rate = delta / delta_time.as_secs();
128            println!("Iterations since last: {delta} at a rate of: {rate} iter/sec");
129        }
130    }
131
132    //    // Enqueue a job with the worker! There are many ways to do this.
133    //    PaymentReportWorker::perform_async(
134    //        &mut redis,
135    //        PaymentReportArgs {
136    //            user_guid: "USR-123".into(),
137    //        },
138    //    )
139    //    .await?;
140    //
141    //    PaymentReportWorker::perform_in(
142    //        &mut redis,
143    //        std::time::Duration::from_secs(10),
144    //        PaymentReportArgs {
145    //            user_guid: "USR-123".into(),
146    //        },
147    //    )
148    //    .await?;
149    //
150    //    PaymentReportWorker::opts()
151    //        .queue("brolo")
152    //        .perform_async(
153    //            &mut redis,
154    //            PaymentReportArgs {
155    //                user_guid: "USR-123-EXPIRED".into(),
156    //            },
157    //        )
158    //        .await?;
159    //
160    //    sidekiq::perform_async(
161    //        &mut redis,
162    //        "PaymentReportWorker".into(),
163    //        "yolo".into(),
164    //        PaymentReportArgs {
165    //            user_guid: "USR-123".to_string(),
166    //        },
167    //    )
168    //    .await?;
169    //
170    //    // Enqueue a job
171    //    sidekiq::perform_async(
172    //        &mut redis,
173    //        "PaymentReportWorker".into(),
174    //        "yolo".into(),
175    //        PaymentReportArgs {
176    //            user_guid: "USR-123".to_string(),
177    //        },
178    //    )
179    //    .await?;
180    //
181    //    // Enqueue a job with options
182    //    sidekiq::opts()
183    //        .queue("yolo".to_string())
184    //        .perform_async(
185    //            &mut redis,
186    //            "PaymentReportWorker".into(),
187    //            PaymentReportArgs {
188    //                user_guid: "USR-123".to_string(),
189    //            },
190    //        )
191    //        .await?;
192
193    //    // Sidekiq server
194    //    let mut p = Processor::new(
195    //        redis.clone(),
196    //        logger.clone(),
197    //        //vec!["yolo".to_string(), "brolo".to_string()],
198    //        vec![],
199    //    );
200    //
201    //    //    // Add known workers
202    //    //    p.register(HelloWorker);
203    //    //    p.register(PaymentReportWorker::new(logger.clone()));
204    //    //
205    //    // Custom Middlewares
206    //    p.using(FilterExpiredUsersMiddleware::new(logger.clone()))
207    //        .await;
208    //
209    //    // Reset cron jobs
210    //    periodic::destroy_all(redis.clone()).await?;
211    //
212    //    // Cron jobs
213    //    periodic::builder("0 * * * * *")?
214    //        .name("Payment report processing for a random user")
215    //        .queue("yolo")
216    //        //.args(PaymentReportArgs {
217    //        //    user_guid: "USR-123-PERIODIC".to_string(),
218    //        //})?
219    //        .args(json!({ "user_guid": "USR-123-PERIODIC" }))?
220    //        .register(&mut p, PaymentReportWorker::new(logger.clone()))
221    //        .await?;
222    //
223    //    p.run().await;
224}