unique/
unique.rs

1use async_trait::async_trait;
2use bb8::Pool;
3use serde::{Deserialize, Serialize};
4use sidekiq::{Processor, RedisConnectionManager, Result, Worker};
5
/// Stateless worker type that handles [`CustomerNotification`] jobs.
///
/// It carries no fields; all of its behaviour lives in its `Worker`
/// implementation.
#[derive(Clone)]
struct CustomerNotificationWorker;
8
9#[async_trait]
10impl Worker<CustomerNotification> for CustomerNotificationWorker {
11    fn opts() -> sidekiq::WorkerOpts<CustomerNotification, Self> {
12        // Use default options to set the unique_for option by default.
13        sidekiq::WorkerOpts::new()
14            .queue("customers")
15            .unique_for(std::time::Duration::from_secs(30))
16    }
17
18    async fn perform(&self, _args: CustomerNotification) -> Result<()> {
19        Ok(())
20    }
21}
22
23#[derive(Deserialize, Debug, Serialize)]
24struct CustomerNotification {
25    customer_guid: String,
26}
27
28#[tokio::main]
29async fn main() -> Result<()> {
30    tracing_subscriber::fmt::init();
31
32    // Redis
33    let manager = RedisConnectionManager::new("redis://127.0.0.1/")?;
34    let redis = Pool::builder().build(manager).await?;
35
36    // Sidekiq server
37    let mut p = Processor::new(redis.clone(), vec!["customers".to_string()]);
38
39    // Add known workers
40    p.register(CustomerNotificationWorker);
41
42    // Create a bunch of jobs with the default uniqueness options. Only
43    // one of these should be created within a 30 second period.
44    for _ in 1..10 {
45        CustomerNotificationWorker::perform_async(
46            &redis,
47            CustomerNotification {
48                customer_guid: "CST-123".to_string(),
49            },
50        )
51        .await?;
52    }
53
54    // Override the unique_for option. Note: Because the code above
55    // uses the default unique_for value of 30, this code is essentially
56    // a no-op.
57    CustomerNotificationWorker::opts()
58        .unique_for(std::time::Duration::from_secs(90))
59        .perform_async(
60            &redis,
61            CustomerNotification {
62                customer_guid: "CST-123".to_string(),
63            },
64        )
65        .await?;
66
67    p.run().await;
68    Ok(())
69}