pub struct WorkerOpts<Args, W: Worker<Args> + ?Sized> { /* private fields */ }
Implementations§
impl<Args, W> WorkerOpts<Args, W>
where
    W: Worker<Args>,
pub fn new() -> Self
Examples found in repository?
More examples
pub fn retry<RO>(self, retry: RO) -> Self
pub fn retry_queue<S: Into<String>>(self, retry_queue: S) -> Self
pub fn queue<S: Into<String>>(self, queue: S) -> Self
Examples found in repository?
More examples
examples/demo.rs (line 52)
51 fn opts() -> sidekiq::WorkerOpts<PaymentReportArgs, Self> {
52 sidekiq::WorkerOpts::new().queue("yolo")
53 }
54
55 async fn perform(&self, args: PaymentReportArgs) -> Result<()> {
56 use redis::AsyncCommands;
57
58 let times_called: usize = self
59 .redis
60 .get()
61 .await?
62 .unnamespaced_borrow_mut()
63 .incr("example_of_accessing_the_raw_redis_connection", 1)
64 .await?;
65
66 debug!({ "times_called" = times_called }, "Called this worker");
67
68 self.send_report(args.user_guid).await
69 }
70 }
71
72 struct FilterExpiredUsersMiddleware;
73
74 #[derive(Deserialize)]
75 struct FiltereExpiredUsersArgs {
76 user_guid: String,
77 }
78
79 impl FiltereExpiredUsersArgs {
80 fn is_expired(&self) -> bool {
81 self.user_guid == "USR-123-EXPIRED"
82 }
83 }
84
85 #[async_trait]
86 impl ServerMiddleware for FilterExpiredUsersMiddleware {
87 async fn call(
88 &self,
89 chain: ChainIter,
90 job: &Job,
91 worker: Arc<WorkerRef>,
92 redis: RedisPool,
93 ) -> Result<()> {
94 let args: std::result::Result<(FiltereExpiredUsersArgs,), serde_json::Error> =
95 serde_json::from_value(job.args.clone());
96
97 // If we can safely deserialize then attempt to filter based on user guid.
98 if let Ok((filter,)) = args {
99 if filter.is_expired() {
100 error!({
101 "class" = &job.class,
102 "jid" = &job.jid,
103 "user_guid" = filter.user_guid
104 }, "Detected an expired user, skipping this job");
105 return Ok(());
106 }
107 }
108
109 chain.next(job, worker, redis).await
110 }
111 }
112
113 #[tokio::main]
114 async fn main() -> Result<()> {
115 tracing_subscriber::fmt().with_max_level(Level::INFO).init();
116
117 // Redis
118 let manager = RedisConnectionManager::new("redis://127.0.0.1/")?;
119 let redis = Pool::builder().build(manager).await?;
120
121 tokio::spawn({
122 let redis = redis.clone();
123
124 async move {
125 loop {
126 PaymentReportWorker::perform_async(
127 &redis,
128 PaymentReportArgs {
129 user_guid: "USR-123".into(),
130 },
131 )
132 .await
133 .unwrap();
134
135 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
136 }
137 }
138 });
139
140 // Enqueue a job with the worker! There are many ways to do this.
141 PaymentReportWorker::perform_async(
142 &redis,
143 PaymentReportArgs {
144 user_guid: "USR-123".into(),
145 },
146 )
147 .await?;
148
149 PaymentReportWorker::perform_in(
150 &redis,
151 std::time::Duration::from_secs(10),
152 PaymentReportArgs {
153 user_guid: "USR-123".into(),
154 },
155 )
156 .await?;
157
158 PaymentReportWorker::opts()
159 .queue("brolo")
160 .perform_async(
161 &redis,
162 PaymentReportArgs {
163 user_guid: "USR-123-EXPIRED".into(),
164 },
165 )
166 .await?;
167
168 sidekiq::perform_async(
169 &redis,
170 "PaymentReportWorker".into(),
171 "yolo".into(),
172 PaymentReportArgs {
173 user_guid: "USR-123".to_string(),
174 },
175 )
176 .await?;
177
178 // Enqueue a job
179 sidekiq::perform_async(
180 &redis,
181 "PaymentReportWorker".into(),
182 "yolo".into(),
183 PaymentReportArgs {
184 user_guid: "USR-123".to_string(),
185 },
186 )
187 .await?;
188
189 // Enqueue a job with options
190 sidekiq::opts()
191 .queue("yolo".to_string())
192 .perform_async(
193 &redis,
194 "PaymentReportWorker".into(),
195 PaymentReportArgs {
196 user_guid: "USR-123".to_string(),
197 },
198 )
199 .await?;
200
201 // Sidekiq server
202 let mut p = Processor::new(redis.clone(), vec!["yolo".to_string(), "brolo".to_string()]);
203
204 // Add known workers
205 p.register(HelloWorker);
206 p.register(PaymentReportWorker::new(redis.clone()));
207
208 // Custom Middlewares
209 p.using(FilterExpiredUsersMiddleware).await;
210
211 // Reset cron jobs
212 periodic::destroy_all(redis.clone()).await?;
213
214 // Cron jobs
215 periodic::builder("0 * * * * *")?
216 .name("Payment report processing for a user using json args")
217 .queue("yolo")
218 .args(json!({ "user_guid": "USR-123-PERIODIC-FROM-JSON-ARGS" }))?
219 .register(&mut p, PaymentReportWorker::new(redis.clone()))
220 .await?;
221
222 periodic::builder("0 * * * * *")?
223 .name("Payment report processing for a user using typed args")
224 .queue("yolo")
225 .args(PaymentReportArgs {
226 user_guid: "USR-123-PERIODIC-FROM-TYPED-ARGS".to_string(),
227 })?
228 .register(&mut p, PaymentReportWorker::new(redis.clone()))
229 .await?;
230
231 p.run().await;
232 Ok(())
233 }
pub fn unique_for(self, unique_for: Duration) -> Self
Examples found in repository?
examples/unique.rs (line 15)
11 fn opts() -> sidekiq::WorkerOpts<CustomerNotification, Self> {
12 // Use default options to set the unique_for option by default.
13 sidekiq::WorkerOpts::new()
14 .queue("customers")
15 .unique_for(std::time::Duration::from_secs(30))
16 }
17
18 async fn perform(&self, _args: CustomerNotification) -> Result<()> {
19 Ok(())
20 }
21 }
22
23 #[derive(Deserialize, Debug, Serialize)]
24 struct CustomerNotification {
25 customer_guid: String,
26 }
27
28 #[tokio::main]
29 async fn main() -> Result<()> {
30 tracing_subscriber::fmt::init();
31
32 // Redis
33 let manager = RedisConnectionManager::new("redis://127.0.0.1/")?;
34 let redis = Pool::builder().build(manager).await?;
35
36 // Sidekiq server
37 let mut p = Processor::new(redis.clone(), vec!["customers".to_string()]);
38
39 // Add known workers
40 p.register(CustomerNotificationWorker);
41
42 // Create a bunch of jobs with the default uniqueness options. Only
43 // one of these should be created within a 30 second period.
44 for _ in 1..10 {
45 CustomerNotificationWorker::perform_async(
46 &redis,
47 CustomerNotification {
48 customer_guid: "CST-123".to_string(),
49 },
50 )
51 .await?;
52 }
53
54 // Override the unique_for option. Note: Because the code above
55 // uses the default unique_for value of 30, this code is essentially
56 // a no-op.
57 CustomerNotificationWorker::opts()
58 .unique_for(std::time::Duration::from_secs(90))
59 .perform_async(
60 &redis,
61 CustomerNotification {
62 customer_guid: "CST-123".to_string(),
63 },
64 )
65 .await?;
66
67 p.run().await;
68 Ok(())
69 }
pub fn into_opts(&self) -> EnqueueOpts
pub async fn perform_async(
    &self,
    redis: &RedisPool,
    args: impl Serialize + Send + 'static,
) -> Result<()>
Examples found in repository?
examples/unique.rs (lines 59-64)
29 async fn main() -> Result<()> {
30 tracing_subscriber::fmt::init();
31
32 // Redis
33 let manager = RedisConnectionManager::new("redis://127.0.0.1/")?;
34 let redis = Pool::builder().build(manager).await?;
35
36 // Sidekiq server
37 let mut p = Processor::new(redis.clone(), vec!["customers".to_string()]);
38
39 // Add known workers
40 p.register(CustomerNotificationWorker);
41
42 // Create a bunch of jobs with the default uniqueness options. Only
43 // one of these should be created within a 30 second period.
44 for _ in 1..10 {
45 CustomerNotificationWorker::perform_async(
46 &redis,
47 CustomerNotification {
48 customer_guid: "CST-123".to_string(),
49 },
50 )
51 .await?;
52 }
53
54 // Override the unique_for option. Note: Because the code above
55 // uses the default unique_for value of 30, this code is essentially
56 // a no-op.
57 CustomerNotificationWorker::opts()
58 .unique_for(std::time::Duration::from_secs(90))
59 .perform_async(
60 &redis,
61 CustomerNotification {
62 customer_guid: "CST-123".to_string(),
63 },
64 )
65 .await?;
66
67 p.run().await;
68 Ok(())
69 }
More examples
examples/demo.rs (lines 160-165)
114 async fn main() -> Result<()> {
115 tracing_subscriber::fmt().with_max_level(Level::INFO).init();
116
117 // Redis
118 let manager = RedisConnectionManager::new("redis://127.0.0.1/")?;
119 let redis = Pool::builder().build(manager).await?;
120
121 tokio::spawn({
122 let redis = redis.clone();
123
124 async move {
125 loop {
126 PaymentReportWorker::perform_async(
127 &redis,
128 PaymentReportArgs {
129 user_guid: "USR-123".into(),
130 },
131 )
132 .await
133 .unwrap();
134
135 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
136 }
137 }
138 });
139
140 // Enqueue a job with the worker! There are many ways to do this.
141 PaymentReportWorker::perform_async(
142 &redis,
143 PaymentReportArgs {
144 user_guid: "USR-123".into(),
145 },
146 )
147 .await?;
148
149 PaymentReportWorker::perform_in(
150 &redis,
151 std::time::Duration::from_secs(10),
152 PaymentReportArgs {
153 user_guid: "USR-123".into(),
154 },
155 )
156 .await?;
157
158 PaymentReportWorker::opts()
159 .queue("brolo")
160 .perform_async(
161 &redis,
162 PaymentReportArgs {
163 user_guid: "USR-123-EXPIRED".into(),
164 },
165 )
166 .await?;
167
168 sidekiq::perform_async(
169 &redis,
170 "PaymentReportWorker".into(),
171 "yolo".into(),
172 PaymentReportArgs {
173 user_guid: "USR-123".to_string(),
174 },
175 )
176 .await?;
177
178 // Enqueue a job
179 sidekiq::perform_async(
180 &redis,
181 "PaymentReportWorker".into(),
182 "yolo".into(),
183 PaymentReportArgs {
184 user_guid: "USR-123".to_string(),
185 },
186 )
187 .await?;
188
189 // Enqueue a job with options
190 sidekiq::opts()
191 .queue("yolo".to_string())
192 .perform_async(
193 &redis,
194 "PaymentReportWorker".into(),
195 PaymentReportArgs {
196 user_guid: "USR-123".to_string(),
197 },
198 )
199 .await?;
200
201 // Sidekiq server
202 let mut p = Processor::new(redis.clone(), vec!["yolo".to_string(), "brolo".to_string()]);
203
204 // Add known workers
205 p.register(HelloWorker);
206 p.register(PaymentReportWorker::new(redis.clone()));
207
208 // Custom Middlewares
209 p.using(FilterExpiredUsersMiddleware).await;
210
211 // Reset cron jobs
212 periodic::destroy_all(redis.clone()).await?;
213
214 // Cron jobs
215 periodic::builder("0 * * * * *")?
216 .name("Payment report processing for a user using json args")
217 .queue("yolo")
218 .args(json!({ "user_guid": "USR-123-PERIODIC-FROM-JSON-ARGS" }))?
219 .register(&mut p, PaymentReportWorker::new(redis.clone()))
220 .await?;
221
222 periodic::builder("0 * * * * *")?
223 .name("Payment report processing for a user using typed args")
224 .queue("yolo")
225 .args(PaymentReportArgs {
226 user_guid: "USR-123-PERIODIC-FROM-TYPED-ARGS".to_string(),
227 })?
228 .register(&mut p, PaymentReportWorker::new(redis.clone()))
229 .await?;
230
231 p.run().await;
232 Ok(())
233 }
pub async fn perform_in( &self, redis: &RedisPool, duration: Duration, args: impl Serialize + Send + 'static, ) -> Result<()>
Trait Implementations§
impl<Args, W: Worker<Args>> Default for WorkerOpts<Args, W>
impl<Args, W: Worker<Args>> From<&WorkerOpts<Args, W>> for EnqueueOpts
fn from(opts: &WorkerOpts<Args, W>) -> Self
Converts to this type from the input type.
Auto Trait Implementations§
impl<Args, W> Freeze for WorkerOpts<Args, W>
where
    W: ?Sized,
impl<Args, W> RefUnwindSafe for WorkerOpts<Args, W>
impl<Args, W> Send for WorkerOpts<Args, W>
impl<Args, W> Sync for WorkerOpts<Args, W>
impl<Args, W> Unpin for WorkerOpts<Args, W>
impl<Args, W> UnwindSafe for WorkerOpts<Args, W>
Blanket Implementations§
impl<T> BorrowMut<T> for T
where
    T: ?Sized,
fn borrow_mut(&mut self) -> &mut T
Mutably borrows from an owned value. Read more