pub struct Processor { /* private fields */ }
Implementations§
Source§impl Processor
impl Processor
Sourcepub fn new(redis: RedisPool, queues: Vec<String>) -> Self
pub fn new(redis: RedisPool, queues: Vec<String>) -> Self
Examples found in repository?
examples/namespaced_demo.rs (line 42)
18async fn main() -> Result<()> {
19 tracing_subscriber::fmt::init();
20
21 // Redis
22 let manager = RedisConnectionManager::new("redis://127.0.0.1/")?;
23 let redis = Pool::builder()
24 .max_size(100)
25 .connection_customizer(sidekiq::with_custom_namespace("yolo_app".to_string()))
26 .build(manager)
27 .await?;
28
29 tokio::spawn({
30 let redis = redis.clone();
31
32 async move {
33 loop {
34 HelloWorker::perform_async(&redis, ()).await.unwrap();
35
36 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
37 }
38 }
39 });
40
41 // Sidekiq server
42 let mut p = Processor::new(redis.clone(), vec!["default".to_string()]);
43
44 // Add known workers
45 p.register(HelloWorker);
46
47 // Start!
48 p.run().await;
49 Ok(())
50}
More examples
examples/unique.rs (line 37)
29async fn main() -> Result<()> {
30 tracing_subscriber::fmt::init();
31
32 // Redis
33 let manager = RedisConnectionManager::new("redis://127.0.0.1/")?;
34 let redis = Pool::builder().build(manager).await?;
35
36 // Sidekiq server
37 let mut p = Processor::new(redis.clone(), vec!["customers".to_string()]);
38
39 // Add known workers
40 p.register(CustomerNotificationWorker);
41
42 // Create a bunch of jobs with the default uniqueness options. Only
43 // one of these should be created within a 30 second period.
44 for _ in 1..10 {
45 CustomerNotificationWorker::perform_async(
46 &redis,
47 CustomerNotification {
48 customer_guid: "CST-123".to_string(),
49 },
50 )
51 .await?;
52 }
53
54 // Override the unique_for option. Note: Because the code above
55 // uses the default unique_for value of 30, this code is essentially
56 // a no-op.
57 CustomerNotificationWorker::opts()
58 .unique_for(std::time::Duration::from_secs(90))
59 .perform_async(
60 &redis,
61 CustomerNotification {
62 customer_guid: "CST-123".to_string(),
63 },
64 )
65 .await?;
66
67 p.run().await;
68 Ok(())
69}
examples/demo.rs (line 202)
114async fn main() -> Result<()> {
115 tracing_subscriber::fmt().with_max_level(Level::INFO).init();
116
117 // Redis
118 let manager = RedisConnectionManager::new("redis://127.0.0.1/")?;
119 let redis = Pool::builder().build(manager).await?;
120
121 tokio::spawn({
122 let redis = redis.clone();
123
124 async move {
125 loop {
126 PaymentReportWorker::perform_async(
127 &redis,
128 PaymentReportArgs {
129 user_guid: "USR-123".into(),
130 },
131 )
132 .await
133 .unwrap();
134
135 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
136 }
137 }
138 });
139
140 // Enqueue a job with the worker! There are many ways to do this.
141 PaymentReportWorker::perform_async(
142 &redis,
143 PaymentReportArgs {
144 user_guid: "USR-123".into(),
145 },
146 )
147 .await?;
148
149 PaymentReportWorker::perform_in(
150 &redis,
151 std::time::Duration::from_secs(10),
152 PaymentReportArgs {
153 user_guid: "USR-123".into(),
154 },
155 )
156 .await?;
157
158 PaymentReportWorker::opts()
159 .queue("brolo")
160 .perform_async(
161 &redis,
162 PaymentReportArgs {
163 user_guid: "USR-123-EXPIRED".into(),
164 },
165 )
166 .await?;
167
168 sidekiq::perform_async(
169 &redis,
170 "PaymentReportWorker".into(),
171 "yolo".into(),
172 PaymentReportArgs {
173 user_guid: "USR-123".to_string(),
174 },
175 )
176 .await?;
177
178 // Enqueue a job
179 sidekiq::perform_async(
180 &redis,
181 "PaymentReportWorker".into(),
182 "yolo".into(),
183 PaymentReportArgs {
184 user_guid: "USR-123".to_string(),
185 },
186 )
187 .await?;
188
189 // Enqueue a job with options
190 sidekiq::opts()
191 .queue("yolo".to_string())
192 .perform_async(
193 &redis,
194 "PaymentReportWorker".into(),
195 PaymentReportArgs {
196 user_guid: "USR-123".to_string(),
197 },
198 )
199 .await?;
200
201 // Sidekiq server
202 let mut p = Processor::new(redis.clone(), vec!["yolo".to_string(), "brolo".to_string()]);
203
204 // Add known workers
205 p.register(HelloWorker);
206 p.register(PaymentReportWorker::new(redis.clone()));
207
208 // Custom Middlewares
209 p.using(FilterExpiredUsersMiddleware).await;
210
211 // Reset cron jobs
212 periodic::destroy_all(redis.clone()).await?;
213
214 // Cron jobs
215 periodic::builder("0 * * * * *")?
216 .name("Payment report processing for a user using json args")
217 .queue("yolo")
218 .args(json!({ "user_guid": "USR-123-PERIODIC-FROM-JSON-ARGS" }))?
219 .register(&mut p, PaymentReportWorker::new(redis.clone()))
220 .await?;
221
222 periodic::builder("0 * * * * *")?
223 .name("Payment report processing for a user using typed args")
224 .queue("yolo")
225 .args(PaymentReportArgs {
226 user_guid: "USR-123-PERIODIC-FROM-TYPED-ARGS".to_string(),
227 })?
228 .register(&mut p, PaymentReportWorker::new(redis.clone()))
229 .await?;
230
231 p.run().await;
232 Ok(())
233}
examples/consumer-demo.rs (line 179)
91async fn main() -> Result<()> {
92 tracing_subscriber::fmt::init();
93
94 // Redis
95 let manager = RedisConnectionManager::new("redis://127.0.0.1/")?;
96 let redis = Pool::builder().build(manager).await?;
97 //
98 // tokio::spawn({
99 // let mut redis = redis.clone();
100 //
101 // async move {
102 // loop {
103 // PaymentReportWorker::perform_async(
104 // &mut redis,
105 // PaymentReportArgs {
106 // user_guid: "USR-123".into(),
107 // },
108 // )
109 // .await
110 // .unwrap();
111 //
112 // tokio::time::sleep(std::time::Duration::from_secs(1)).await;
113 // }
114 // }
115 // });
116 //
117 // // Enqueue a job with the worker! There are many ways to do this.
118 // PaymentReportWorker::perform_async(
119 // &mut redis,
120 // PaymentReportArgs {
121 // user_guid: "USR-123".into(),
122 // },
123 // )
124 // .await?;
125 //
126 // PaymentReportWorker::perform_in(
127 // &mut redis,
128 // std::time::Duration::from_secs(10),
129 // PaymentReportArgs {
130 // user_guid: "USR-123".into(),
131 // },
132 // )
133 // .await?;
134 //
135 // PaymentReportWorker::opts()
136 // .queue("brolo")
137 // .perform_async(
138 // &mut redis,
139 // PaymentReportArgs {
140 // user_guid: "USR-123-EXPIRED".into(),
141 // },
142 // )
143 // .await?;
144 //
145 // sidekiq::perform_async(
146 // &mut redis,
147 // "PaymentReportWorker".into(),
148 // "yolo".into(),
149 // PaymentReportArgs {
150 // user_guid: "USR-123".to_string(),
151 // },
152 // )
153 // .await?;
154 //
155 // // Enqueue a job
156 // sidekiq::perform_async(
157 // &mut redis,
158 // "PaymentReportWorker".into(),
159 // "yolo".into(),
160 // PaymentReportArgs {
161 // user_guid: "USR-123".to_string(),
162 // },
163 // )
164 // .await?;
165 //
166 // // Enqueue a job with options
167 // sidekiq::opts()
168 // .queue("yolo".to_string())
169 // .perform_async(
170 // &mut redis,
171 // "PaymentReportWorker".into(),
172 // PaymentReportArgs {
173 // user_guid: "USR-123".to_string(),
174 // },
175 // )
176 // .await?;
177
178 // Sidekiq server
179 let mut p = Processor::new(redis.clone(), vec!["yolo".to_string(), "brolo".to_string()]);
180
181 // Add known workers
182 p.register(HelloWorker);
183 p.register(PaymentReportWorker);
184
185 // Custom Middlewares
186 p.using(FilterExpiredUsersMiddleware).await;
187
188 // // Reset cron jobs
189 // periodic::destroy_all(redis.clone()).await?;
190 //
191 // // Cron jobs
192 // periodic::builder("0 * * * * *")?
193 // .name("Payment report processing for a random user")
194 // .queue("yolo")
195 // //.args(PaymentReportArgs {
196 // // user_guid: "USR-123-PERIODIC".to_string(),
197 // //})?
198 // .args(json!({ "user_guid": "USR-123-PERIODIC" }))?
199 // .register(&mut p, PaymentReportWorker::new(logger.clone()))
200 // .await?;
201
202 p.run().await;
203 Ok(())
204}
pub fn with_config(self, config: ProcessorConfig) -> Self
pub async fn fetch(&mut self) -> Result<Option<UnitOfWork>>
pub async fn process_one(&mut self) -> Result<()>
pub async fn process_one_tick_once(&mut self) -> Result<WorkFetcher>
Sourcepub fn register<Args: Sync + Send + for<'de> Deserialize<'de> + 'static, W: Worker<Args> + 'static>(
&mut self,
worker: W,
)
pub fn register<Args: Sync + Send + for<'de> Deserialize<'de> + 'static, W: Worker<Args> + 'static>( &mut self, worker: W, )
Examples found in repository?
examples/namespaced_demo.rs (line 45)
18async fn main() -> Result<()> {
19 tracing_subscriber::fmt::init();
20
21 // Redis
22 let manager = RedisConnectionManager::new("redis://127.0.0.1/")?;
23 let redis = Pool::builder()
24 .max_size(100)
25 .connection_customizer(sidekiq::with_custom_namespace("yolo_app".to_string()))
26 .build(manager)
27 .await?;
28
29 tokio::spawn({
30 let redis = redis.clone();
31
32 async move {
33 loop {
34 HelloWorker::perform_async(&redis, ()).await.unwrap();
35
36 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
37 }
38 }
39 });
40
41 // Sidekiq server
42 let mut p = Processor::new(redis.clone(), vec!["default".to_string()]);
43
44 // Add known workers
45 p.register(HelloWorker);
46
47 // Start!
48 p.run().await;
49 Ok(())
50}
More examples
examples/unique.rs (line 40)
29async fn main() -> Result<()> {
30 tracing_subscriber::fmt::init();
31
32 // Redis
33 let manager = RedisConnectionManager::new("redis://127.0.0.1/")?;
34 let redis = Pool::builder().build(manager).await?;
35
36 // Sidekiq server
37 let mut p = Processor::new(redis.clone(), vec!["customers".to_string()]);
38
39 // Add known workers
40 p.register(CustomerNotificationWorker);
41
42 // Create a bunch of jobs with the default uniqueness options. Only
43 // one of these should be created within a 30 second period.
44 for _ in 1..10 {
45 CustomerNotificationWorker::perform_async(
46 &redis,
47 CustomerNotification {
48 customer_guid: "CST-123".to_string(),
49 },
50 )
51 .await?;
52 }
53
54 // Override the unique_for option. Note: Because the code above
55 // uses the default unique_for value of 30, this code is essentially
56 // a no-op.
57 CustomerNotificationWorker::opts()
58 .unique_for(std::time::Duration::from_secs(90))
59 .perform_async(
60 &redis,
61 CustomerNotification {
62 customer_guid: "CST-123".to_string(),
63 },
64 )
65 .await?;
66
67 p.run().await;
68 Ok(())
69}
examples/demo.rs (line 205)
114async fn main() -> Result<()> {
115 tracing_subscriber::fmt().with_max_level(Level::INFO).init();
116
117 // Redis
118 let manager = RedisConnectionManager::new("redis://127.0.0.1/")?;
119 let redis = Pool::builder().build(manager).await?;
120
121 tokio::spawn({
122 let redis = redis.clone();
123
124 async move {
125 loop {
126 PaymentReportWorker::perform_async(
127 &redis,
128 PaymentReportArgs {
129 user_guid: "USR-123".into(),
130 },
131 )
132 .await
133 .unwrap();
134
135 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
136 }
137 }
138 });
139
140 // Enqueue a job with the worker! There are many ways to do this.
141 PaymentReportWorker::perform_async(
142 &redis,
143 PaymentReportArgs {
144 user_guid: "USR-123".into(),
145 },
146 )
147 .await?;
148
149 PaymentReportWorker::perform_in(
150 &redis,
151 std::time::Duration::from_secs(10),
152 PaymentReportArgs {
153 user_guid: "USR-123".into(),
154 },
155 )
156 .await?;
157
158 PaymentReportWorker::opts()
159 .queue("brolo")
160 .perform_async(
161 &redis,
162 PaymentReportArgs {
163 user_guid: "USR-123-EXPIRED".into(),
164 },
165 )
166 .await?;
167
168 sidekiq::perform_async(
169 &redis,
170 "PaymentReportWorker".into(),
171 "yolo".into(),
172 PaymentReportArgs {
173 user_guid: "USR-123".to_string(),
174 },
175 )
176 .await?;
177
178 // Enqueue a job
179 sidekiq::perform_async(
180 &redis,
181 "PaymentReportWorker".into(),
182 "yolo".into(),
183 PaymentReportArgs {
184 user_guid: "USR-123".to_string(),
185 },
186 )
187 .await?;
188
189 // Enqueue a job with options
190 sidekiq::opts()
191 .queue("yolo".to_string())
192 .perform_async(
193 &redis,
194 "PaymentReportWorker".into(),
195 PaymentReportArgs {
196 user_guid: "USR-123".to_string(),
197 },
198 )
199 .await?;
200
201 // Sidekiq server
202 let mut p = Processor::new(redis.clone(), vec!["yolo".to_string(), "brolo".to_string()]);
203
204 // Add known workers
205 p.register(HelloWorker);
206 p.register(PaymentReportWorker::new(redis.clone()));
207
208 // Custom Middlewares
209 p.using(FilterExpiredUsersMiddleware).await;
210
211 // Reset cron jobs
212 periodic::destroy_all(redis.clone()).await?;
213
214 // Cron jobs
215 periodic::builder("0 * * * * *")?
216 .name("Payment report processing for a user using json args")
217 .queue("yolo")
218 .args(json!({ "user_guid": "USR-123-PERIODIC-FROM-JSON-ARGS" }))?
219 .register(&mut p, PaymentReportWorker::new(redis.clone()))
220 .await?;
221
222 periodic::builder("0 * * * * *")?
223 .name("Payment report processing for a user using typed args")
224 .queue("yolo")
225 .args(PaymentReportArgs {
226 user_guid: "USR-123-PERIODIC-FROM-TYPED-ARGS".to_string(),
227 })?
228 .register(&mut p, PaymentReportWorker::new(redis.clone()))
229 .await?;
230
231 p.run().await;
232 Ok(())
233}
examples/consumer-demo.rs (line 182)
91async fn main() -> Result<()> {
92 tracing_subscriber::fmt::init();
93
94 // Redis
95 let manager = RedisConnectionManager::new("redis://127.0.0.1/")?;
96 let redis = Pool::builder().build(manager).await?;
97 //
98 // tokio::spawn({
99 // let mut redis = redis.clone();
100 //
101 // async move {
102 // loop {
103 // PaymentReportWorker::perform_async(
104 // &mut redis,
105 // PaymentReportArgs {
106 // user_guid: "USR-123".into(),
107 // },
108 // )
109 // .await
110 // .unwrap();
111 //
112 // tokio::time::sleep(std::time::Duration::from_secs(1)).await;
113 // }
114 // }
115 // });
116 //
117 // // Enqueue a job with the worker! There are many ways to do this.
118 // PaymentReportWorker::perform_async(
119 // &mut redis,
120 // PaymentReportArgs {
121 // user_guid: "USR-123".into(),
122 // },
123 // )
124 // .await?;
125 //
126 // PaymentReportWorker::perform_in(
127 // &mut redis,
128 // std::time::Duration::from_secs(10),
129 // PaymentReportArgs {
130 // user_guid: "USR-123".into(),
131 // },
132 // )
133 // .await?;
134 //
135 // PaymentReportWorker::opts()
136 // .queue("brolo")
137 // .perform_async(
138 // &mut redis,
139 // PaymentReportArgs {
140 // user_guid: "USR-123-EXPIRED".into(),
141 // },
142 // )
143 // .await?;
144 //
145 // sidekiq::perform_async(
146 // &mut redis,
147 // "PaymentReportWorker".into(),
148 // "yolo".into(),
149 // PaymentReportArgs {
150 // user_guid: "USR-123".to_string(),
151 // },
152 // )
153 // .await?;
154 //
155 // // Enqueue a job
156 // sidekiq::perform_async(
157 // &mut redis,
158 // "PaymentReportWorker".into(),
159 // "yolo".into(),
160 // PaymentReportArgs {
161 // user_guid: "USR-123".to_string(),
162 // },
163 // )
164 // .await?;
165 //
166 // // Enqueue a job with options
167 // sidekiq::opts()
168 // .queue("yolo".to_string())
169 // .perform_async(
170 // &mut redis,
171 // "PaymentReportWorker".into(),
172 // PaymentReportArgs {
173 // user_guid: "USR-123".to_string(),
174 // },
175 // )
176 // .await?;
177
178 // Sidekiq server
179 let mut p = Processor::new(redis.clone(), vec!["yolo".to_string(), "brolo".to_string()]);
180
181 // Add known workers
182 p.register(HelloWorker);
183 p.register(PaymentReportWorker);
184
185 // Custom Middlewares
186 p.using(FilterExpiredUsersMiddleware).await;
187
188 // // Reset cron jobs
189 // periodic::destroy_all(redis.clone()).await?;
190 //
191 // // Cron jobs
192 // periodic::builder("0 * * * * *")?
193 // .name("Payment report processing for a random user")
194 // .queue("yolo")
195 // //.args(PaymentReportArgs {
196 // // user_guid: "USR-123-PERIODIC".to_string(),
197 // //})?
198 // .args(json!({ "user_guid": "USR-123-PERIODIC" }))?
199 // .register(&mut p, PaymentReportWorker::new(logger.clone()))
200 // .await?;
201
202 p.run().await;
203 Ok(())
204}
pub fn get_cancellation_token(&self) -> CancellationToken
Sourcepub async fn run(self)
pub async fn run(self)
Takes self to consume the processor. This is for life-cycle management, not memory safety because you can clone processor pretty easily.
Examples found in repository?
examples/namespaced_demo.rs (line 48)
18async fn main() -> Result<()> {
19 tracing_subscriber::fmt::init();
20
21 // Redis
22 let manager = RedisConnectionManager::new("redis://127.0.0.1/")?;
23 let redis = Pool::builder()
24 .max_size(100)
25 .connection_customizer(sidekiq::with_custom_namespace("yolo_app".to_string()))
26 .build(manager)
27 .await?;
28
29 tokio::spawn({
30 let redis = redis.clone();
31
32 async move {
33 loop {
34 HelloWorker::perform_async(&redis, ()).await.unwrap();
35
36 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
37 }
38 }
39 });
40
41 // Sidekiq server
42 let mut p = Processor::new(redis.clone(), vec!["default".to_string()]);
43
44 // Add known workers
45 p.register(HelloWorker);
46
47 // Start!
48 p.run().await;
49 Ok(())
50}
More examples
examples/unique.rs (line 67)
29async fn main() -> Result<()> {
30 tracing_subscriber::fmt::init();
31
32 // Redis
33 let manager = RedisConnectionManager::new("redis://127.0.0.1/")?;
34 let redis = Pool::builder().build(manager).await?;
35
36 // Sidekiq server
37 let mut p = Processor::new(redis.clone(), vec!["customers".to_string()]);
38
39 // Add known workers
40 p.register(CustomerNotificationWorker);
41
42 // Create a bunch of jobs with the default uniqueness options. Only
43 // one of these should be created within a 30 second period.
44 for _ in 1..10 {
45 CustomerNotificationWorker::perform_async(
46 &redis,
47 CustomerNotification {
48 customer_guid: "CST-123".to_string(),
49 },
50 )
51 .await?;
52 }
53
54 // Override the unique_for option. Note: Because the code above
55 // uses the default unique_for value of 30, this code is essentially
56 // a no-op.
57 CustomerNotificationWorker::opts()
58 .unique_for(std::time::Duration::from_secs(90))
59 .perform_async(
60 &redis,
61 CustomerNotification {
62 customer_guid: "CST-123".to_string(),
63 },
64 )
65 .await?;
66
67 p.run().await;
68 Ok(())
69}
examples/demo.rs (line 231)
114async fn main() -> Result<()> {
115 tracing_subscriber::fmt().with_max_level(Level::INFO).init();
116
117 // Redis
118 let manager = RedisConnectionManager::new("redis://127.0.0.1/")?;
119 let redis = Pool::builder().build(manager).await?;
120
121 tokio::spawn({
122 let redis = redis.clone();
123
124 async move {
125 loop {
126 PaymentReportWorker::perform_async(
127 &redis,
128 PaymentReportArgs {
129 user_guid: "USR-123".into(),
130 },
131 )
132 .await
133 .unwrap();
134
135 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
136 }
137 }
138 });
139
140 // Enqueue a job with the worker! There are many ways to do this.
141 PaymentReportWorker::perform_async(
142 &redis,
143 PaymentReportArgs {
144 user_guid: "USR-123".into(),
145 },
146 )
147 .await?;
148
149 PaymentReportWorker::perform_in(
150 &redis,
151 std::time::Duration::from_secs(10),
152 PaymentReportArgs {
153 user_guid: "USR-123".into(),
154 },
155 )
156 .await?;
157
158 PaymentReportWorker::opts()
159 .queue("brolo")
160 .perform_async(
161 &redis,
162 PaymentReportArgs {
163 user_guid: "USR-123-EXPIRED".into(),
164 },
165 )
166 .await?;
167
168 sidekiq::perform_async(
169 &redis,
170 "PaymentReportWorker".into(),
171 "yolo".into(),
172 PaymentReportArgs {
173 user_guid: "USR-123".to_string(),
174 },
175 )
176 .await?;
177
178 // Enqueue a job
179 sidekiq::perform_async(
180 &redis,
181 "PaymentReportWorker".into(),
182 "yolo".into(),
183 PaymentReportArgs {
184 user_guid: "USR-123".to_string(),
185 },
186 )
187 .await?;
188
189 // Enqueue a job with options
190 sidekiq::opts()
191 .queue("yolo".to_string())
192 .perform_async(
193 &redis,
194 "PaymentReportWorker".into(),
195 PaymentReportArgs {
196 user_guid: "USR-123".to_string(),
197 },
198 )
199 .await?;
200
201 // Sidekiq server
202 let mut p = Processor::new(redis.clone(), vec!["yolo".to_string(), "brolo".to_string()]);
203
204 // Add known workers
205 p.register(HelloWorker);
206 p.register(PaymentReportWorker::new(redis.clone()));
207
208 // Custom Middlewares
209 p.using(FilterExpiredUsersMiddleware).await;
210
211 // Reset cron jobs
212 periodic::destroy_all(redis.clone()).await?;
213
214 // Cron jobs
215 periodic::builder("0 * * * * *")?
216 .name("Payment report processing for a user using json args")
217 .queue("yolo")
218 .args(json!({ "user_guid": "USR-123-PERIODIC-FROM-JSON-ARGS" }))?
219 .register(&mut p, PaymentReportWorker::new(redis.clone()))
220 .await?;
221
222 periodic::builder("0 * * * * *")?
223 .name("Payment report processing for a user using typed args")
224 .queue("yolo")
225 .args(PaymentReportArgs {
226 user_guid: "USR-123-PERIODIC-FROM-TYPED-ARGS".to_string(),
227 })?
228 .register(&mut p, PaymentReportWorker::new(redis.clone()))
229 .await?;
230
231 p.run().await;
232 Ok(())
233}
examples/consumer-demo.rs (line 202)
91async fn main() -> Result<()> {
92 tracing_subscriber::fmt::init();
93
94 // Redis
95 let manager = RedisConnectionManager::new("redis://127.0.0.1/")?;
96 let redis = Pool::builder().build(manager).await?;
97 //
98 // tokio::spawn({
99 // let mut redis = redis.clone();
100 //
101 // async move {
102 // loop {
103 // PaymentReportWorker::perform_async(
104 // &mut redis,
105 // PaymentReportArgs {
106 // user_guid: "USR-123".into(),
107 // },
108 // )
109 // .await
110 // .unwrap();
111 //
112 // tokio::time::sleep(std::time::Duration::from_secs(1)).await;
113 // }
114 // }
115 // });
116 //
117 // // Enqueue a job with the worker! There are many ways to do this.
118 // PaymentReportWorker::perform_async(
119 // &mut redis,
120 // PaymentReportArgs {
121 // user_guid: "USR-123".into(),
122 // },
123 // )
124 // .await?;
125 //
126 // PaymentReportWorker::perform_in(
127 // &mut redis,
128 // std::time::Duration::from_secs(10),
129 // PaymentReportArgs {
130 // user_guid: "USR-123".into(),
131 // },
132 // )
133 // .await?;
134 //
135 // PaymentReportWorker::opts()
136 // .queue("brolo")
137 // .perform_async(
138 // &mut redis,
139 // PaymentReportArgs {
140 // user_guid: "USR-123-EXPIRED".into(),
141 // },
142 // )
143 // .await?;
144 //
145 // sidekiq::perform_async(
146 // &mut redis,
147 // "PaymentReportWorker".into(),
148 // "yolo".into(),
149 // PaymentReportArgs {
150 // user_guid: "USR-123".to_string(),
151 // },
152 // )
153 // .await?;
154 //
155 // // Enqueue a job
156 // sidekiq::perform_async(
157 // &mut redis,
158 // "PaymentReportWorker".into(),
159 // "yolo".into(),
160 // PaymentReportArgs {
161 // user_guid: "USR-123".to_string(),
162 // },
163 // )
164 // .await?;
165 //
166 // // Enqueue a job with options
167 // sidekiq::opts()
168 // .queue("yolo".to_string())
169 // .perform_async(
170 // &mut redis,
171 // "PaymentReportWorker".into(),
172 // PaymentReportArgs {
173 // user_guid: "USR-123".to_string(),
174 // },
175 // )
176 // .await?;
177
178 // Sidekiq server
179 let mut p = Processor::new(redis.clone(), vec!["yolo".to_string(), "brolo".to_string()]);
180
181 // Add known workers
182 p.register(HelloWorker);
183 p.register(PaymentReportWorker);
184
185 // Custom Middlewares
186 p.using(FilterExpiredUsersMiddleware).await;
187
188 // // Reset cron jobs
189 // periodic::destroy_all(redis.clone()).await?;
190 //
191 // // Cron jobs
192 // periodic::builder("0 * * * * *")?
193 // .name("Payment report processing for a random user")
194 // .queue("yolo")
195 // //.args(PaymentReportArgs {
196 // // user_guid: "USR-123-PERIODIC".to_string(),
197 // //})?
198 // .args(json!({ "user_guid": "USR-123-PERIODIC" }))?
199 // .register(&mut p, PaymentReportWorker::new(logger.clone()))
200 // .await?;
201
202 p.run().await;
203 Ok(())
204}
Sourcepub async fn using<M>(&mut self, middleware: M)
pub async fn using<M>(&mut self, middleware: M)
Examples found in repository?
examples/demo.rs (line 209)
114async fn main() -> Result<()> {
115 tracing_subscriber::fmt().with_max_level(Level::INFO).init();
116
117 // Redis
118 let manager = RedisConnectionManager::new("redis://127.0.0.1/")?;
119 let redis = Pool::builder().build(manager).await?;
120
121 tokio::spawn({
122 let redis = redis.clone();
123
124 async move {
125 loop {
126 PaymentReportWorker::perform_async(
127 &redis,
128 PaymentReportArgs {
129 user_guid: "USR-123".into(),
130 },
131 )
132 .await
133 .unwrap();
134
135 tokio::time::sleep(std::time::Duration::from_secs(1)).await;
136 }
137 }
138 });
139
140 // Enqueue a job with the worker! There are many ways to do this.
141 PaymentReportWorker::perform_async(
142 &redis,
143 PaymentReportArgs {
144 user_guid: "USR-123".into(),
145 },
146 )
147 .await?;
148
149 PaymentReportWorker::perform_in(
150 &redis,
151 std::time::Duration::from_secs(10),
152 PaymentReportArgs {
153 user_guid: "USR-123".into(),
154 },
155 )
156 .await?;
157
158 PaymentReportWorker::opts()
159 .queue("brolo")
160 .perform_async(
161 &redis,
162 PaymentReportArgs {
163 user_guid: "USR-123-EXPIRED".into(),
164 },
165 )
166 .await?;
167
168 sidekiq::perform_async(
169 &redis,
170 "PaymentReportWorker".into(),
171 "yolo".into(),
172 PaymentReportArgs {
173 user_guid: "USR-123".to_string(),
174 },
175 )
176 .await?;
177
178 // Enqueue a job
179 sidekiq::perform_async(
180 &redis,
181 "PaymentReportWorker".into(),
182 "yolo".into(),
183 PaymentReportArgs {
184 user_guid: "USR-123".to_string(),
185 },
186 )
187 .await?;
188
189 // Enqueue a job with options
190 sidekiq::opts()
191 .queue("yolo".to_string())
192 .perform_async(
193 &redis,
194 "PaymentReportWorker".into(),
195 PaymentReportArgs {
196 user_guid: "USR-123".to_string(),
197 },
198 )
199 .await?;
200
201 // Sidekiq server
202 let mut p = Processor::new(redis.clone(), vec!["yolo".to_string(), "brolo".to_string()]);
203
204 // Add known workers
205 p.register(HelloWorker);
206 p.register(PaymentReportWorker::new(redis.clone()));
207
208 // Custom Middlewares
209 p.using(FilterExpiredUsersMiddleware).await;
210
211 // Reset cron jobs
212 periodic::destroy_all(redis.clone()).await?;
213
214 // Cron jobs
215 periodic::builder("0 * * * * *")?
216 .name("Payment report processing for a user using json args")
217 .queue("yolo")
218 .args(json!({ "user_guid": "USR-123-PERIODIC-FROM-JSON-ARGS" }))?
219 .register(&mut p, PaymentReportWorker::new(redis.clone()))
220 .await?;
221
222 periodic::builder("0 * * * * *")?
223 .name("Payment report processing for a user using typed args")
224 .queue("yolo")
225 .args(PaymentReportArgs {
226 user_guid: "USR-123-PERIODIC-FROM-TYPED-ARGS".to_string(),
227 })?
228 .register(&mut p, PaymentReportWorker::new(redis.clone()))
229 .await?;
230
231 p.run().await;
232 Ok(())
233}
More examples
examples/consumer-demo.rs (line 186)
91async fn main() -> Result<()> {
92 tracing_subscriber::fmt::init();
93
94 // Redis
95 let manager = RedisConnectionManager::new("redis://127.0.0.1/")?;
96 let redis = Pool::builder().build(manager).await?;
97 //
98 // tokio::spawn({
99 // let mut redis = redis.clone();
100 //
101 // async move {
102 // loop {
103 // PaymentReportWorker::perform_async(
104 // &mut redis,
105 // PaymentReportArgs {
106 // user_guid: "USR-123".into(),
107 // },
108 // )
109 // .await
110 // .unwrap();
111 //
112 // tokio::time::sleep(std::time::Duration::from_secs(1)).await;
113 // }
114 // }
115 // });
116 //
117 // // Enqueue a job with the worker! There are many ways to do this.
118 // PaymentReportWorker::perform_async(
119 // &mut redis,
120 // PaymentReportArgs {
121 // user_guid: "USR-123".into(),
122 // },
123 // )
124 // .await?;
125 //
126 // PaymentReportWorker::perform_in(
127 // &mut redis,
128 // std::time::Duration::from_secs(10),
129 // PaymentReportArgs {
130 // user_guid: "USR-123".into(),
131 // },
132 // )
133 // .await?;
134 //
135 // PaymentReportWorker::opts()
136 // .queue("brolo")
137 // .perform_async(
138 // &mut redis,
139 // PaymentReportArgs {
140 // user_guid: "USR-123-EXPIRED".into(),
141 // },
142 // )
143 // .await?;
144 //
145 // sidekiq::perform_async(
146 // &mut redis,
147 // "PaymentReportWorker".into(),
148 // "yolo".into(),
149 // PaymentReportArgs {
150 // user_guid: "USR-123".to_string(),
151 // },
152 // )
153 // .await?;
154 //
155 // // Enqueue a job
156 // sidekiq::perform_async(
157 // &mut redis,
158 // "PaymentReportWorker".into(),
159 // "yolo".into(),
160 // PaymentReportArgs {
161 // user_guid: "USR-123".to_string(),
162 // },
163 // )
164 // .await?;
165 //
166 // // Enqueue a job with options
167 // sidekiq::opts()
168 // .queue("yolo".to_string())
169 // .perform_async(
170 // &mut redis,
171 // "PaymentReportWorker".into(),
172 // PaymentReportArgs {
173 // user_guid: "USR-123".to_string(),
174 // },
175 // )
176 // .await?;
177
178 // Sidekiq server
179 let mut p = Processor::new(redis.clone(), vec!["yolo".to_string(), "brolo".to_string()]);
180
181 // Add known workers
182 p.register(HelloWorker);
183 p.register(PaymentReportWorker);
184
185 // Custom Middlewares
186 p.using(FilterExpiredUsersMiddleware).await;
187
188 // // Reset cron jobs
189 // periodic::destroy_all(redis.clone()).await?;
190 //
191 // // Cron jobs
192 // periodic::builder("0 * * * * *")?
193 // .name("Payment report processing for a random user")
194 // .queue("yolo")
195 // //.args(PaymentReportArgs {
196 // // user_guid: "USR-123-PERIODIC".to_string(),
197 // //})?
198 // .args(json!({ "user_guid": "USR-123-PERIODIC" }))?
199 // .register(&mut p, PaymentReportWorker::new(logger.clone()))
200 // .await?;
201
202 p.run().await;
203 Ok(())
204}
Trait Implementations§
Auto Trait Implementations§
impl Freeze for Processor
impl !RefUnwindSafe for Processor
impl Send for Processor
impl Sync for Processor
impl Unpin for Processor
impl !UnwindSafe for Processor
Blanket Implementations§
Source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
Mutably borrows from an owned value. Read more