pub trait Worker<Args>: Send + Sync {
    // Required method
    fn perform<'life0, 'async_trait>(
        &'life0 self,
        args: Args,
    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>
    where Self: 'async_trait,
          'life0: 'async_trait;

    // Provided methods
    fn disable_argument_coercion(&self) -> bool { ... }
    fn opts() -> WorkerOpts<Args, Self>
    where Self: Sized { ... }
    fn max_retries(&self) -> usize { ... }
    fn class_name() -> String
    where Self: Sized { ... }
    fn perform_async<'life0, 'async_trait>(
        redis: &'life0 RedisPool,
        args: Args,
    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>
    where Self: Sized + 'async_trait,
          Args: Send + Sync + Serialize + 'static,
          'life0: 'async_trait { ... }
    fn perform_in<'life0, 'async_trait>(
        redis: &'life0 RedisPool,
        duration: Duration,
        args: Args,
    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>
    where Self: Sized + 'async_trait,
          Args: Send + Sync + Serialize + 'static,
          'life0: 'async_trait { ... }
}
Required Methods§
fn perform<'life0, 'async_trait>(
    &'life0 self,
    args: Args,
) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>
where
    Self: 'async_trait,
    'life0: 'async_trait,
Provided Methods§
fn disable_argument_coercion(&self) -> bool
Signal to `WorkerRef` to not attempt to modify the `JsonValue` args before calling the `perform` function. This is useful if the args are expected to be a `Vec<T>` that might be `len() == 1` or a single sized tuple `(T,)`.
fn opts() -> WorkerOpts<Args, Self>
where
    Self: Sized,
Examples found in repository?
examples/unique.rs (line 57)
29async fn main() -> Result<()> {
30 tracing_subscriber::fmt::init();
31
32 // Redis
33 let manager = RedisConnectionManager::new("redis://127.0.0.1/")?;
34 let redis = Pool::builder().build(manager).await?;
35
36 // Sidekiq server
37 let mut p = Processor::new(redis.clone(), vec!["customers".to_string()]);
38
39 // Add known workers
40 p.register(CustomerNotificationWorker);
41
42 // Create a bunch of jobs with the default uniqueness options. Only
43 // one of these should be created within a 30 second period.
44 for _ in 1..10 {
45 CustomerNotificationWorker::perform_async(
46 &redis,
47 CustomerNotification {
48 customer_guid: "CST-123".to_string(),
49 },
50 )
51 .await?;
52 }
53
54 // Override the unique_for option. Note: Because the code above
55 // uses the default unique_for value of 30, this code is essentially
56 // a no-op.
57 CustomerNotificationWorker::opts()
58 .unique_for(std::time::Duration::from_secs(90))
59 .perform_async(
60 &redis,
61 CustomerNotification {
62 customer_guid: "CST-123".to_string(),
63 },
64 )
65 .await?;
66
67 p.run().await;
68 Ok(())
69}
More examples
examples/demo.rs (line 158)
114async fn main() -> Result<()> {
115 tracing_subscriber::fmt().with_max_level(Level::INFO).init();
116
117 // Redis
118 let manager = RedisConnectionManager::new("redis://127.0.0.1/")?;
119 let redis = Pool::builder().build(manager).await?;
120
121 tokio::spawn({
122 let redis = redis.clone();
123
124 async move {
125 loop {
126 PaymentReportWorker::perform_async(
127 &redis,
128 PaymentReportArgs {
129 user_guid: "USR-123".into(),
130 },
131 )
132 .await
133 .unwrap();
134
135 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
136 }
137 }
138 });
139
140 // Enqueue a job with the worker! There are many ways to do this.
141 PaymentReportWorker::perform_async(
142 &redis,
143 PaymentReportArgs {
144 user_guid: "USR-123".into(),
145 },
146 )
147 .await?;
148
149 PaymentReportWorker::perform_in(
150 &redis,
151 std::time::Duration::from_secs(10),
152 PaymentReportArgs {
153 user_guid: "USR-123".into(),
154 },
155 )
156 .await?;
157
158 PaymentReportWorker::opts()
159 .queue("brolo")
160 .perform_async(
161 &redis,
162 PaymentReportArgs {
163 user_guid: "USR-123-EXPIRED".into(),
164 },
165 )
166 .await?;
167
168 sidekiq::perform_async(
169 &redis,
170 "PaymentReportWorker".into(),
171 "yolo".into(),
172 PaymentReportArgs {
173 user_guid: "USR-123".to_string(),
174 },
175 )
176 .await?;
177
178 // Enqueue a job
179 sidekiq::perform_async(
180 &redis,
181 "PaymentReportWorker".into(),
182 "yolo".into(),
183 PaymentReportArgs {
184 user_guid: "USR-123".to_string(),
185 },
186 )
187 .await?;
188
189 // Enqueue a job with options
190 sidekiq::opts()
191 .queue("yolo".to_string())
192 .perform_async(
193 &redis,
194 "PaymentReportWorker".into(),
195 PaymentReportArgs {
196 user_guid: "USR-123".to_string(),
197 },
198 )
199 .await?;
200
201 // Sidekiq server
202 let mut p = Processor::new(redis.clone(), vec!["yolo".to_string(), "brolo".to_string()]);
203
204 // Add known workers
205 p.register(HelloWorker);
206 p.register(PaymentReportWorker::new(redis.clone()));
207
208 // Custom Middlewares
209 p.using(FilterExpiredUsersMiddleware).await;
210
211 // Reset cron jobs
212 periodic::destroy_all(redis.clone()).await?;
213
214 // Cron jobs
215 periodic::builder("0 * * * * *")?
216 .name("Payment report processing for a user using json args")
217 .queue("yolo")
218 .args(json!({ "user_guid": "USR-123-PERIODIC-FROM-JSON-ARGS" }))?
219 .register(&mut p, PaymentReportWorker::new(redis.clone()))
220 .await?;
221
222 periodic::builder("0 * * * * *")?
223 .name("Payment report processing for a user using typed args")
224 .queue("yolo")
225 .args(PaymentReportArgs {
226 user_guid: "USR-123-PERIODIC-FROM-TYPED-ARGS".to_string(),
227 })?
228 .register(&mut p, PaymentReportWorker::new(redis.clone()))
229 .await?;
230
231 p.run().await;
232 Ok(())
233}
fn max_retries(&self) -> usize
fn class_name() -> String
where
    Self: Sized,
Derive a `class_name` from the `Worker` type to be used with sidekiq. By default this method will [the sentence is cut off at this point in the original documentation].
Examples found in repository?
More examples
fn perform_async<'life0, 'async_trait>(
    redis: &'life0 RedisPool,
    args: Args,
) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>
where
    Self: Sized + 'async_trait,
    Args: Send + Sync + Serialize + 'static,
    'life0: 'async_trait,
Examples found in repository?
examples/namespaced_demo.rs (line 34)
18async fn main() -> Result<()> {
19 tracing_subscriber::fmt::init();
20
21 // Redis
22 let manager = RedisConnectionManager::new("redis://127.0.0.1/")?;
23 let redis = Pool::builder()
24 .max_size(100)
25 .connection_customizer(sidekiq::with_custom_namespace("yolo_app".to_string()))
26 .build(manager)
27 .await?;
28
29 tokio::spawn({
30 let redis = redis.clone();
31
32 async move {
33 loop {
34 HelloWorker::perform_async(&redis, ()).await.unwrap();
35
36 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
37 }
38 }
39 });
40
41 // Sidekiq server
42 let mut p = Processor::new(redis.clone(), vec!["default".to_string()]);
43
44 // Add known workers
45 p.register(HelloWorker);
46
47 // Start!
48 p.run().await;
49 Ok(())
50}
More examples
examples/unique.rs (lines 45-50)
29async fn main() -> Result<()> {
30 tracing_subscriber::fmt::init();
31
32 // Redis
33 let manager = RedisConnectionManager::new("redis://127.0.0.1/")?;
34 let redis = Pool::builder().build(manager).await?;
35
36 // Sidekiq server
37 let mut p = Processor::new(redis.clone(), vec!["customers".to_string()]);
38
39 // Add known workers
40 p.register(CustomerNotificationWorker);
41
42 // Create a bunch of jobs with the default uniqueness options. Only
43 // one of these should be created within a 30 second period.
44 for _ in 1..10 {
45 CustomerNotificationWorker::perform_async(
46 &redis,
47 CustomerNotification {
48 customer_guid: "CST-123".to_string(),
49 },
50 )
51 .await?;
52 }
53
54 // Override the unique_for option. Note: Because the code above
55 // uses the default unique_for value of 30, this code is essentially
56 // a no-op.
57 CustomerNotificationWorker::opts()
58 .unique_for(std::time::Duration::from_secs(90))
59 .perform_async(
60 &redis,
61 CustomerNotification {
62 customer_guid: "CST-123".to_string(),
63 },
64 )
65 .await?;
66
67 p.run().await;
68 Ok(())
69}
examples/demo.rs (lines 126-131)
114async fn main() -> Result<()> {
115 tracing_subscriber::fmt().with_max_level(Level::INFO).init();
116
117 // Redis
118 let manager = RedisConnectionManager::new("redis://127.0.0.1/")?;
119 let redis = Pool::builder().build(manager).await?;
120
121 tokio::spawn({
122 let redis = redis.clone();
123
124 async move {
125 loop {
126 PaymentReportWorker::perform_async(
127 &redis,
128 PaymentReportArgs {
129 user_guid: "USR-123".into(),
130 },
131 )
132 .await
133 .unwrap();
134
135 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
136 }
137 }
138 });
139
140 // Enqueue a job with the worker! There are many ways to do this.
141 PaymentReportWorker::perform_async(
142 &redis,
143 PaymentReportArgs {
144 user_guid: "USR-123".into(),
145 },
146 )
147 .await?;
148
149 PaymentReportWorker::perform_in(
150 &redis,
151 std::time::Duration::from_secs(10),
152 PaymentReportArgs {
153 user_guid: "USR-123".into(),
154 },
155 )
156 .await?;
157
158 PaymentReportWorker::opts()
159 .queue("brolo")
160 .perform_async(
161 &redis,
162 PaymentReportArgs {
163 user_guid: "USR-123-EXPIRED".into(),
164 },
165 )
166 .await?;
167
168 sidekiq::perform_async(
169 &redis,
170 "PaymentReportWorker".into(),
171 "yolo".into(),
172 PaymentReportArgs {
173 user_guid: "USR-123".to_string(),
174 },
175 )
176 .await?;
177
178 // Enqueue a job
179 sidekiq::perform_async(
180 &redis,
181 "PaymentReportWorker".into(),
182 "yolo".into(),
183 PaymentReportArgs {
184 user_guid: "USR-123".to_string(),
185 },
186 )
187 .await?;
188
189 // Enqueue a job with options
190 sidekiq::opts()
191 .queue("yolo".to_string())
192 .perform_async(
193 &redis,
194 "PaymentReportWorker".into(),
195 PaymentReportArgs {
196 user_guid: "USR-123".to_string(),
197 },
198 )
199 .await?;
200
201 // Sidekiq server
202 let mut p = Processor::new(redis.clone(), vec!["yolo".to_string(), "brolo".to_string()]);
203
204 // Add known workers
205 p.register(HelloWorker);
206 p.register(PaymentReportWorker::new(redis.clone()));
207
208 // Custom Middlewares
209 p.using(FilterExpiredUsersMiddleware).await;
210
211 // Reset cron jobs
212 periodic::destroy_all(redis.clone()).await?;
213
214 // Cron jobs
215 periodic::builder("0 * * * * *")?
216 .name("Payment report processing for a user using json args")
217 .queue("yolo")
218 .args(json!({ "user_guid": "USR-123-PERIODIC-FROM-JSON-ARGS" }))?
219 .register(&mut p, PaymentReportWorker::new(redis.clone()))
220 .await?;
221
222 periodic::builder("0 * * * * *")?
223 .name("Payment report processing for a user using typed args")
224 .queue("yolo")
225 .args(PaymentReportArgs {
226 user_guid: "USR-123-PERIODIC-FROM-TYPED-ARGS".to_string(),
227 })?
228 .register(&mut p, PaymentReportWorker::new(redis.clone()))
229 .await?;
230
231 p.run().await;
232 Ok(())
233}
examples/producer-demo.rs (lines 105-110)
93async fn main() -> Result<()> {
94 tracing_subscriber::fmt::init();
95
96 // Redis
97 let manager = RedisConnectionManager::new("redis://127.0.0.1/")?;
98 let redis = Pool::builder().build(manager).await?;
99
100 let mut n = 0;
101 let mut last = 0;
102 let mut then = std::time::Instant::now();
103
104 loop {
105 PaymentReportWorker::perform_async(
106 &redis,
107 PaymentReportArgs {
108 user_guid: "USR-123".into(),
109 },
110 )
111 .await
112 .unwrap();
113
114 //tokio::time::sleep(std::time::Duration::from_millis(1)).await;
115
116 n += 1;
117
118 if n % 100000 == 0 {
119 let now = std::time::Instant::now();
120 let delta = n - last;
121 last = n;
122 let delta_time = now - then;
123 if delta_time.as_secs() == 0 {
124 continue;
125 }
126 then = now;
127 let rate = delta / delta_time.as_secs();
128 println!("Iterations since last: {delta} at a rate of: {rate} iter/sec");
129 }
130 }
131
132 // // Enqueue a job with the worker! There are many ways to do this.
133 // PaymentReportWorker::perform_async(
134 // &mut redis,
135 // PaymentReportArgs {
136 // user_guid: "USR-123".into(),
137 // },
138 // )
139 // .await?;
140 //
141 // PaymentReportWorker::perform_in(
142 // &mut redis,
143 // std::time::Duration::from_secs(10),
144 // PaymentReportArgs {
145 // user_guid: "USR-123".into(),
146 // },
147 // )
148 // .await?;
149 //
150 // PaymentReportWorker::opts()
151 // .queue("brolo")
152 // .perform_async(
153 // &mut redis,
154 // PaymentReportArgs {
155 // user_guid: "USR-123-EXPIRED".into(),
156 // },
157 // )
158 // .await?;
159 //
160 // sidekiq::perform_async(
161 // &mut redis,
162 // "PaymentReportWorker".into(),
163 // "yolo".into(),
164 // PaymentReportArgs {
165 // user_guid: "USR-123".to_string(),
166 // },
167 // )
168 // .await?;
169 //
170 // // Enqueue a job
171 // sidekiq::perform_async(
172 // &mut redis,
173 // "PaymentReportWorker".into(),
174 // "yolo".into(),
175 // PaymentReportArgs {
176 // user_guid: "USR-123".to_string(),
177 // },
178 // )
179 // .await?;
180 //
181 // // Enqueue a job with options
182 // sidekiq::opts()
183 // .queue("yolo".to_string())
184 // .perform_async(
185 // &mut redis,
186 // "PaymentReportWorker".into(),
187 // PaymentReportArgs {
188 // user_guid: "USR-123".to_string(),
189 // },
190 // )
191 // .await?;
192
193 // // Sidekiq server
194 // let mut p = Processor::new(
195 // redis.clone(),
196 // logger.clone(),
197 // //vec!["yolo".to_string(), "brolo".to_string()],
198 // vec![],
199 // );
200 //
201 // // // Add known workers
202 // // p.register(HelloWorker);
203 // // p.register(PaymentReportWorker::new(logger.clone()));
204 // //
205 // // Custom Middlewares
206 // p.using(FilterExpiredUsersMiddleware::new(logger.clone()))
207 // .await;
208 //
209 // // Reset cron jobs
210 // periodic::destroy_all(redis.clone()).await?;
211 //
212 // // Cron jobs
213 // periodic::builder("0 * * * * *")?
214 // .name("Payment report processing for a random user")
215 // .queue("yolo")
216 // //.args(PaymentReportArgs {
217 // // user_guid: "USR-123-PERIODIC".to_string(),
218 // //})?
219 // .args(json!({ "user_guid": "USR-123-PERIODIC" }))?
220 // .register(&mut p, PaymentReportWorker::new(logger.clone()))
221 // .await?;
222 //
223 // p.run().await;
224}
fn perform_in<'life0, 'async_trait>(
    redis: &'life0 RedisPool,
    duration: Duration,
    args: Args,
) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>
where
    Self: Sized + 'async_trait,
    Args: Send + Sync + Serialize + 'static,
    'life0: 'async_trait,
Examples found in repository?
examples/demo.rs (lines 149-155)
114async fn main() -> Result<()> {
115 tracing_subscriber::fmt().with_max_level(Level::INFO).init();
116
117 // Redis
118 let manager = RedisConnectionManager::new("redis://127.0.0.1/")?;
119 let redis = Pool::builder().build(manager).await?;
120
121 tokio::spawn({
122 let redis = redis.clone();
123
124 async move {
125 loop {
126 PaymentReportWorker::perform_async(
127 &redis,
128 PaymentReportArgs {
129 user_guid: "USR-123".into(),
130 },
131 )
132 .await
133 .unwrap();
134
135 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
136 }
137 }
138 });
139
140 // Enqueue a job with the worker! There are many ways to do this.
141 PaymentReportWorker::perform_async(
142 &redis,
143 PaymentReportArgs {
144 user_guid: "USR-123".into(),
145 },
146 )
147 .await?;
148
149 PaymentReportWorker::perform_in(
150 &redis,
151 std::time::Duration::from_secs(10),
152 PaymentReportArgs {
153 user_guid: "USR-123".into(),
154 },
155 )
156 .await?;
157
158 PaymentReportWorker::opts()
159 .queue("brolo")
160 .perform_async(
161 &redis,
162 PaymentReportArgs {
163 user_guid: "USR-123-EXPIRED".into(),
164 },
165 )
166 .await?;
167
168 sidekiq::perform_async(
169 &redis,
170 "PaymentReportWorker".into(),
171 "yolo".into(),
172 PaymentReportArgs {
173 user_guid: "USR-123".to_string(),
174 },
175 )
176 .await?;
177
178 // Enqueue a job
179 sidekiq::perform_async(
180 &redis,
181 "PaymentReportWorker".into(),
182 "yolo".into(),
183 PaymentReportArgs {
184 user_guid: "USR-123".to_string(),
185 },
186 )
187 .await?;
188
189 // Enqueue a job with options
190 sidekiq::opts()
191 .queue("yolo".to_string())
192 .perform_async(
193 &redis,
194 "PaymentReportWorker".into(),
195 PaymentReportArgs {
196 user_guid: "USR-123".to_string(),
197 },
198 )
199 .await?;
200
201 // Sidekiq server
202 let mut p = Processor::new(redis.clone(), vec!["yolo".to_string(), "brolo".to_string()]);
203
204 // Add known workers
205 p.register(HelloWorker);
206 p.register(PaymentReportWorker::new(redis.clone()));
207
208 // Custom Middlewares
209 p.using(FilterExpiredUsersMiddleware).await;
210
211 // Reset cron jobs
212 periodic::destroy_all(redis.clone()).await?;
213
214 // Cron jobs
215 periodic::builder("0 * * * * *")?
216 .name("Payment report processing for a user using json args")
217 .queue("yolo")
218 .args(json!({ "user_guid": "USR-123-PERIODIC-FROM-JSON-ARGS" }))?
219 .register(&mut p, PaymentReportWorker::new(redis.clone()))
220 .await?;
221
222 periodic::builder("0 * * * * *")?
223 .name("Payment report processing for a user using typed args")
224 .queue("yolo")
225 .args(PaymentReportArgs {
226 user_guid: "USR-123-PERIODIC-FROM-TYPED-ARGS".to_string(),
227 })?
228 .register(&mut p, PaymentReportWorker::new(redis.clone()))
229 .await?;
230
231 p.run().await;
232 Ok(())
233}