pub struct JobRequest<T> { /* private fields */ }
Expand description

Represents a job which can be serialized and executed

Implementations§

Creates a new JobRequest

Creates a Job request with context provided

Get the underlying reference of the [Job]

Examples found in repository?
src/storage/worker.rs (line 157)
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
    /// Runs a single job through the worker's service and records the outcome in storage.
    ///
    /// Before the service is called, the job is marked as `Running`, stamped with the
    /// current `Utc` time and this worker's id, has an attempt recorded, and is written
    /// back to storage. After the service returns, the job is fetched again by id and its
    /// final state is persisted according to the result.
    ///
    /// Storage failures along the way are reported through `T::on_worker_error` (and through
    /// the broker when the `broker` feature is enabled) but do not change the returned value.
    ///
    /// # Errors
    ///
    /// Returns early with the error if the service fails to become ready; otherwise returns
    /// whatever the service call produced.
    async fn handle_job(&mut self, mut job: JobRequest<Self::Job>) -> Result<JobResult, JobError> {
        // Started here so `on_service_ready` can report the time spent before the call.
        let instant = Instant::now();
        let mut storage = self.storage.clone();
        let worker_id = self.id.to_string();
        let handle = self.service().ready().await?;
        let job_id = job.id();
        // Mark the job as picked up by this worker.
        job.set_status(JobState::Running);
        job.set_lock_at(Some(Utc::now()));
        job.record_attempt();
        job.set_lock_by(Some(worker_id.clone()));
        // A failed update is reported but does not stop the job from being executed.
        if let Err(e) = storage.update_by_id(job_id.clone(), &job).await {
            #[cfg(feature = "broker")]
            Broker::global()
                .issue_send(WorkerMessage::new(
                    worker_id.clone(),
                    WorkerEvent::Error(format!("{}", e)),
                ))
                .await;
            T::on_worker_error(job.inner(), &job, &WorkerError::Storage(e));
        };
        T::on_service_ready(job.inner(), &job, instant.elapsed());
        // `job` is moved into the service here, so it has to be re-fetched afterwards.
        let res = handle.call(job).await;

        // If the job cannot be fetched (error or not found), no final state is persisted.
        if let Ok(Some(mut job)) = storage.fetch_by_id(job_id.clone()).await {
            // The completion time is stamped regardless of the outcome.
            job.set_done_at(Some(Utc::now()));
            // Map each outcome to a job state and the matching storage operation.
            let finalize = match res {
                Ok(ref r) => match r {
                    JobResult::Success => {
                        job.set_status(JobState::Done);
                        storage.ack(worker_id.clone(), job_id.clone()).await
                    }
                    JobResult::Retry => {
                        job.set_status(JobState::Retry);
                        storage.retry(worker_id.clone(), job_id.clone()).await
                    }
                    JobResult::Kill => {
                        job.set_status(JobState::Killed);
                        storage.kill(worker_id.clone(), job_id.clone()).await
                    }

                    // Rescheduled jobs use the wait duration carried by the result.
                    JobResult::Reschedule(wait) => {
                        job.set_status(JobState::Retry);
                        storage.reschedule(&job, *wait).await
                    }
                },
                Err(ref e) => {
                    job.set_status(JobState::Failed);
                    job.set_last_error(format!("{}", e));

                    #[cfg(feature = "broker")]
                    Broker::global()
                        .issue_send(WorkerMessage::new(
                            worker_id.clone(),
                            WorkerEvent::Error(format!("{}", e)),
                        ))
                        .await;
                    // let base: i32 = 2; // an explicit type is required
                    // let millis = base.pow(job.attempts());
                    // Failed jobs are rescheduled after a fixed 10 second delay.
                    storage.reschedule(&job, Duration::from_millis(10000)).await
                }
            };
            // A failed storage operation is reported; the state update below still runs.
            if let Err(e) = finalize {
                #[cfg(feature = "broker")]
                Broker::global()
                    .issue_send(WorkerMessage::new(
                        worker_id.clone(),
                        WorkerEvent::Error(format!("{}", e)),
                    ))
                    .await;
                T::on_worker_error(job.inner(), &job, &WorkerError::Storage(e));
            }
            // Persist the status, completion time and last error set above.
            if let Err(e) = storage.update_by_id(job_id.clone(), &job).await {
                #[cfg(feature = "broker")]
                Broker::global()
                    .issue_send(WorkerMessage::new(
                        worker_id.clone(),
                        WorkerEvent::Error(format!("{}", e)),
                    ))
                    .await;
                T::on_worker_error(job.inner(), &job, &WorkerError::Storage(e));
            };
        }

        // The service's own result is returned unchanged.
        res
    }

Gets a mutable reference to the job context.

Examples found in repository?
src/layers/extensions/mod.rs (line 81)
80
81
82
83
    /// Stores a clone of this layer's value in the request's job context and then
    /// forwards the request to the wrapped service.
    fn call(&mut self, mut req: JobRequest<J>) -> Self::Future {
        let extension = self.value.clone();
        let context = req.context_mut();
        context.insert(extension);
        self.inner.call(req)
    }

Gets a reference to the job context.

Records a job attempt

Examples found in repository?
src/layers/retry/mod.rs (line 39)
37
38
39
40
41
    /// Produces the request used for a retry: a clone of `req` with one more
    /// attempt recorded on it. Always returns `Some`.
    fn clone_request(&self, req: &Req<T>) -> Option<Req<T>> {
        let mut retried = req.clone();
        retried.record_attempt();
        Some(retried)
    }
More examples
Hide additional examples
src/storage/worker.rs (line 147)
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
    /// Runs a single job through the worker's service and records the outcome in storage.
    ///
    /// Before the service is called, the job is marked as `Running`, stamped with the
    /// current `Utc` time and this worker's id, has an attempt recorded, and is written
    /// back to storage. After the service returns, the job is fetched again by id and its
    /// final state is persisted according to the result.
    ///
    /// Storage failures along the way are reported through `T::on_worker_error` (and through
    /// the broker when the `broker` feature is enabled) but do not change the returned value.
    ///
    /// # Errors
    ///
    /// Returns early with the error if the service fails to become ready; otherwise returns
    /// whatever the service call produced.
    async fn handle_job(&mut self, mut job: JobRequest<Self::Job>) -> Result<JobResult, JobError> {
        // Started here so `on_service_ready` can report the time spent before the call.
        let instant = Instant::now();
        let mut storage = self.storage.clone();
        let worker_id = self.id.to_string();
        let handle = self.service().ready().await?;
        let job_id = job.id();
        // Mark the job as picked up by this worker.
        job.set_status(JobState::Running);
        job.set_lock_at(Some(Utc::now()));
        job.record_attempt();
        job.set_lock_by(Some(worker_id.clone()));
        // A failed update is reported but does not stop the job from being executed.
        if let Err(e) = storage.update_by_id(job_id.clone(), &job).await {
            #[cfg(feature = "broker")]
            Broker::global()
                .issue_send(WorkerMessage::new(
                    worker_id.clone(),
                    WorkerEvent::Error(format!("{}", e)),
                ))
                .await;
            T::on_worker_error(job.inner(), &job, &WorkerError::Storage(e));
        };
        T::on_service_ready(job.inner(), &job, instant.elapsed());
        // `job` is moved into the service here, so it has to be re-fetched afterwards.
        let res = handle.call(job).await;

        // If the job cannot be fetched (error or not found), no final state is persisted.
        if let Ok(Some(mut job)) = storage.fetch_by_id(job_id.clone()).await {
            // The completion time is stamped regardless of the outcome.
            job.set_done_at(Some(Utc::now()));
            // Map each outcome to a job state and the matching storage operation.
            let finalize = match res {
                Ok(ref r) => match r {
                    JobResult::Success => {
                        job.set_status(JobState::Done);
                        storage.ack(worker_id.clone(), job_id.clone()).await
                    }
                    JobResult::Retry => {
                        job.set_status(JobState::Retry);
                        storage.retry(worker_id.clone(), job_id.clone()).await
                    }
                    JobResult::Kill => {
                        job.set_status(JobState::Killed);
                        storage.kill(worker_id.clone(), job_id.clone()).await
                    }

                    // Rescheduled jobs use the wait duration carried by the result.
                    JobResult::Reschedule(wait) => {
                        job.set_status(JobState::Retry);
                        storage.reschedule(&job, *wait).await
                    }
                },
                Err(ref e) => {
                    job.set_status(JobState::Failed);
                    job.set_last_error(format!("{}", e));

                    #[cfg(feature = "broker")]
                    Broker::global()
                        .issue_send(WorkerMessage::new(
                            worker_id.clone(),
                            WorkerEvent::Error(format!("{}", e)),
                        ))
                        .await;
                    // let base: i32 = 2; // an explicit type is required
                    // let millis = base.pow(job.attempts());
                    // Failed jobs are rescheduled after a fixed 10 second delay.
                    storage.reschedule(&job, Duration::from_millis(10000)).await
                }
            };
            // A failed storage operation is reported; the state update below still runs.
            if let Err(e) = finalize {
                #[cfg(feature = "broker")]
                Broker::global()
                    .issue_send(WorkerMessage::new(
                        worker_id.clone(),
                        WorkerEvent::Error(format!("{}", e)),
                    ))
                    .await;
                T::on_worker_error(job.inner(), &job, &WorkerError::Storage(e));
            }
            // Persist the status, completion time and last error set above.
            if let Err(e) = storage.update_by_id(job_id.clone(), &job).await {
                #[cfg(feature = "broker")]
                Broker::global()
                    .issue_send(WorkerMessage::new(
                        worker_id.clone(),
                        WorkerEvent::Error(format!("{}", e)),
                    ))
                    .await;
                T::on_worker_error(job.inner(), &job, &WorkerError::Storage(e));
            };
        }

        // The service's own result is returned unchanged.
        res
    }

Methods from Deref<Target = JobContext>§

Get a reference to a type previously inserted on this JobContext.

Example
let mut ctx = JobContext::new(1.to_string());
assert!(ctx.data_opt::<i32>().is_none());
ctx.insert(5i32);

assert_eq!(ctx.data_opt::<i32>(), Some(&5i32));

Insert a type into this JobContext.

Important for embedding data for a job. If an extension of this type already existed, it will be returned.

Example
let mut ctx = JobContext::new(1.to_string());
assert!(ctx.insert(5i32).is_none());
assert!(ctx.insert(4u8).is_none());
assert_eq!(ctx.insert(9i32), Some(5i32));
Examples found in repository?
src/layers/extensions/mod.rs (line 81)
80
81
82
83
    /// Stores a clone of this layer's value in the request's job context and then
    /// forwards the request to the wrapped service.
    fn call(&mut self, mut req: JobRequest<J>) -> Self::Future {
        let extension = self.value.clone();
        let context = req.context_mut();
        context.insert(extension);
        self.inner.call(req)
    }

Set the maximum number of attempts

Gets the maximum attempts for a job. Default 25

Examples found in repository?
src/layers/retry/mod.rs (line 30)
23
24
25
26
27
28
29
30
31
32
33
34
35
    /// Decides whether a job should be retried after it has produced `result`.
    ///
    /// Successful results are never retried. A failed result is retried with a
    /// ready `DefaultRetryPolicy` only while the request's attempt count is still
    /// below its maximum number of attempts; otherwise `None` is returned.
    fn retry(&self, req: &Req<T>, result: Result<&Res, &Err>) -> Option<Self::Future> {
        match result {
            // Treat all `Response`s as success, so don't retry.
            Ok(_) => None,
            // Compare the two counts directly rather than subtracting them, so the
            // check cannot overflow or underflow for extreme attempt values.
            Err(_) if req.max_attempts() > req.attempts() => {
                Some(future::ready(DefaultRetryPolicy))
            }
            // Attempts are exhausted: give up.
            Err(_) => None,
        }
    }

Get the id for a job

Examples found in repository?
src/layers/sentry/mod.rs (line 154)
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
    /// Wraps the inner service call in a `SentryHttpFuture`.
    ///
    /// The job's id, current attempt count and type name are captured together with a
    /// transaction context (named after `J::NAME`, operation `"apalis.job"`) and handed
    /// to the future as its `on_first_poll` state; no transaction is set at this point.
    fn call(&mut self, request: JobRequest<J>) -> Self::Future {
        let transaction_context = sentry_core::TransactionContext::new(J::NAME, "apalis.job");
        let details = JobDetails {
            job_id: request.id(),
            current_attempt: request.attempts(),
            job_type: std::any::type_name::<J>().to_owned(),
        };
        let first_poll_state = Some((details, transaction_context));

        SentryHttpFuture {
            on_first_poll: first_poll_state,
            transaction: None,
            future: self.service.call(request),
        }
    }
More examples
Hide additional examples
src/storage/worker.rs (line 144)
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
    /// Runs a single job through the worker's service and records the outcome in storage.
    ///
    /// Before the service is called, the job is marked as `Running`, stamped with the
    /// current `Utc` time and this worker's id, has an attempt recorded, and is written
    /// back to storage. After the service returns, the job is fetched again by id and its
    /// final state is persisted according to the result.
    ///
    /// Storage failures along the way are reported through `T::on_worker_error` (and through
    /// the broker when the `broker` feature is enabled) but do not change the returned value.
    ///
    /// # Errors
    ///
    /// Returns early with the error if the service fails to become ready; otherwise returns
    /// whatever the service call produced.
    async fn handle_job(&mut self, mut job: JobRequest<Self::Job>) -> Result<JobResult, JobError> {
        // Started here so `on_service_ready` can report the time spent before the call.
        let instant = Instant::now();
        let mut storage = self.storage.clone();
        let worker_id = self.id.to_string();
        let handle = self.service().ready().await?;
        let job_id = job.id();
        // Mark the job as picked up by this worker.
        job.set_status(JobState::Running);
        job.set_lock_at(Some(Utc::now()));
        job.record_attempt();
        job.set_lock_by(Some(worker_id.clone()));
        // A failed update is reported but does not stop the job from being executed.
        if let Err(e) = storage.update_by_id(job_id.clone(), &job).await {
            #[cfg(feature = "broker")]
            Broker::global()
                .issue_send(WorkerMessage::new(
                    worker_id.clone(),
                    WorkerEvent::Error(format!("{}", e)),
                ))
                .await;
            T::on_worker_error(job.inner(), &job, &WorkerError::Storage(e));
        };
        T::on_service_ready(job.inner(), &job, instant.elapsed());
        // `job` is moved into the service here, so it has to be re-fetched afterwards.
        let res = handle.call(job).await;

        // If the job cannot be fetched (error or not found), no final state is persisted.
        if let Ok(Some(mut job)) = storage.fetch_by_id(job_id.clone()).await {
            // The completion time is stamped regardless of the outcome.
            job.set_done_at(Some(Utc::now()));
            // Map each outcome to a job state and the matching storage operation.
            let finalize = match res {
                Ok(ref r) => match r {
                    JobResult::Success => {
                        job.set_status(JobState::Done);
                        storage.ack(worker_id.clone(), job_id.clone()).await
                    }
                    JobResult::Retry => {
                        job.set_status(JobState::Retry);
                        storage.retry(worker_id.clone(), job_id.clone()).await
                    }
                    JobResult::Kill => {
                        job.set_status(JobState::Killed);
                        storage.kill(worker_id.clone(), job_id.clone()).await
                    }

                    // Rescheduled jobs use the wait duration carried by the result.
                    JobResult::Reschedule(wait) => {
                        job.set_status(JobState::Retry);
                        storage.reschedule(&job, *wait).await
                    }
                },
                Err(ref e) => {
                    job.set_status(JobState::Failed);
                    job.set_last_error(format!("{}", e));

                    #[cfg(feature = "broker")]
                    Broker::global()
                        .issue_send(WorkerMessage::new(
                            worker_id.clone(),
                            WorkerEvent::Error(format!("{}", e)),
                        ))
                        .await;
                    // let base: i32 = 2; // an explicit type is required
                    // let millis = base.pow(job.attempts());
                    // Failed jobs are rescheduled after a fixed 10 second delay.
                    storage.reschedule(&job, Duration::from_millis(10000)).await
                }
            };
            // A failed storage operation is reported; the state update below still runs.
            if let Err(e) = finalize {
                #[cfg(feature = "broker")]
                Broker::global()
                    .issue_send(WorkerMessage::new(
                        worker_id.clone(),
                        WorkerEvent::Error(format!("{}", e)),
                    ))
                    .await;
                T::on_worker_error(job.inner(), &job, &WorkerError::Storage(e));
            }
            // Persist the status, completion time and last error set above.
            if let Err(e) = storage.update_by_id(job_id.clone(), &job).await {
                #[cfg(feature = "broker")]
                Broker::global()
                    .issue_send(WorkerMessage::new(
                        worker_id.clone(),
                        WorkerEvent::Error(format!("{}", e)),
                    ))
                    .await;
                T::on_worker_error(job.inner(), &job, &WorkerError::Storage(e));
            };
        }

        // The service's own result is returned unchanged.
        res
    }

Gets the current attempts for a job. Default 0

Examples found in repository?
src/request.rs (line 72)
71
72
73
    /// Records a job attempt by storing the context's current attempt count plus one.
    pub fn record_attempt(&mut self) {
        let next_attempt = self.context.attempts() + 1;
        self.context.set_attempts(next_attempt);
    }
More examples
Hide additional examples
src/layers/retry/mod.rs (line 30)
23
24
25
26
27
28
29
30
31
32
33
34
35
    /// Decides whether a job should be retried after it has produced `result`.
    ///
    /// Successful results are never retried. A failed result is retried with a
    /// ready `DefaultRetryPolicy` only while the request's attempt count is still
    /// below its maximum number of attempts; otherwise `None` is returned.
    fn retry(&self, req: &Req<T>, result: Result<&Res, &Err>) -> Option<Self::Future> {
        match result {
            // Treat all `Response`s as success, so don't retry.
            Ok(_) => None,
            // Compare the two counts directly rather than subtracting them, so the
            // check cannot overflow or underflow for extreme attempt values.
            Err(_) if req.max_attempts() > req.attempts() => {
                Some(future::ready(DefaultRetryPolicy))
            }
            // Attempts are exhausted: give up.
            Err(_) => None,
        }
    }
src/layers/sentry/mod.rs (line 155)
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
    /// Wraps the inner service call in a `SentryHttpFuture`.
    ///
    /// The job's id, current attempt count and type name are captured together with a
    /// transaction context (named after `J::NAME`, operation `"apalis.job"`) and handed
    /// to the future as its `on_first_poll` state; no transaction is set at this point.
    fn call(&mut self, request: JobRequest<J>) -> Self::Future {
        let transaction_context = sentry_core::TransactionContext::new(J::NAME, "apalis.job");
        let details = JobDetails {
            job_id: request.id(),
            current_attempt: request.attempts(),
            job_type: std::any::type_name::<J>().to_owned(),
        };
        let first_poll_state = Some((details, transaction_context));

        SentryHttpFuture {
            on_first_poll: first_poll_state,
            transaction: None,
            future: self.service.call(request),
        }
    }

Set the number of attempts

Examples found in repository?
src/request.rs (line 72)
71
72
73
    /// Records a job attempt by storing the context's current attempt count plus one.
    pub fn record_attempt(&mut self) {
        let next_attempt = self.context.attempts() + 1;
        self.context.set_attempts(next_attempt);
    }

Get the time a job was done

Set the time a job was done

Examples found in repository?
src/storage/worker.rs (line 163)
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
    /// Runs a single job through the worker's service and records the outcome in storage.
    ///
    /// Before the service is called, the job is marked as `Running`, stamped with the
    /// current `Utc` time and this worker's id, has an attempt recorded, and is written
    /// back to storage. After the service returns, the job is fetched again by id and its
    /// final state is persisted according to the result.
    ///
    /// Storage failures along the way are reported through `T::on_worker_error` (and through
    /// the broker when the `broker` feature is enabled) but do not change the returned value.
    ///
    /// # Errors
    ///
    /// Returns early with the error if the service fails to become ready; otherwise returns
    /// whatever the service call produced.
    async fn handle_job(&mut self, mut job: JobRequest<Self::Job>) -> Result<JobResult, JobError> {
        // Started here so `on_service_ready` can report the time spent before the call.
        let instant = Instant::now();
        let mut storage = self.storage.clone();
        let worker_id = self.id.to_string();
        let handle = self.service().ready().await?;
        let job_id = job.id();
        // Mark the job as picked up by this worker.
        job.set_status(JobState::Running);
        job.set_lock_at(Some(Utc::now()));
        job.record_attempt();
        job.set_lock_by(Some(worker_id.clone()));
        // A failed update is reported but does not stop the job from being executed.
        if let Err(e) = storage.update_by_id(job_id.clone(), &job).await {
            #[cfg(feature = "broker")]
            Broker::global()
                .issue_send(WorkerMessage::new(
                    worker_id.clone(),
                    WorkerEvent::Error(format!("{}", e)),
                ))
                .await;
            T::on_worker_error(job.inner(), &job, &WorkerError::Storage(e));
        };
        T::on_service_ready(job.inner(), &job, instant.elapsed());
        // `job` is moved into the service here, so it has to be re-fetched afterwards.
        let res = handle.call(job).await;

        // If the job cannot be fetched (error or not found), no final state is persisted.
        if let Ok(Some(mut job)) = storage.fetch_by_id(job_id.clone()).await {
            // The completion time is stamped regardless of the outcome.
            job.set_done_at(Some(Utc::now()));
            // Map each outcome to a job state and the matching storage operation.
            let finalize = match res {
                Ok(ref r) => match r {
                    JobResult::Success => {
                        job.set_status(JobState::Done);
                        storage.ack(worker_id.clone(), job_id.clone()).await
                    }
                    JobResult::Retry => {
                        job.set_status(JobState::Retry);
                        storage.retry(worker_id.clone(), job_id.clone()).await
                    }
                    JobResult::Kill => {
                        job.set_status(JobState::Killed);
                        storage.kill(worker_id.clone(), job_id.clone()).await
                    }

                    // Rescheduled jobs use the wait duration carried by the result.
                    JobResult::Reschedule(wait) => {
                        job.set_status(JobState::Retry);
                        storage.reschedule(&job, *wait).await
                    }
                },
                Err(ref e) => {
                    job.set_status(JobState::Failed);
                    job.set_last_error(format!("{}", e));

                    #[cfg(feature = "broker")]
                    Broker::global()
                        .issue_send(WorkerMessage::new(
                            worker_id.clone(),
                            WorkerEvent::Error(format!("{}", e)),
                        ))
                        .await;
                    // let base: i32 = 2; // an explicit type is required
                    // let millis = base.pow(job.attempts());
                    // Failed jobs are rescheduled after a fixed 10 second delay.
                    storage.reschedule(&job, Duration::from_millis(10000)).await
                }
            };
            // A failed storage operation is reported; the state update below still runs.
            if let Err(e) = finalize {
                #[cfg(feature = "broker")]
                Broker::global()
                    .issue_send(WorkerMessage::new(
                        worker_id.clone(),
                        WorkerEvent::Error(format!("{}", e)),
                    ))
                    .await;
                T::on_worker_error(job.inner(), &job, &WorkerError::Storage(e));
            }
            // Persist the status, completion time and last error set above.
            if let Err(e) = storage.update_by_id(job_id.clone(), &job).await {
                #[cfg(feature = "broker")]
                Broker::global()
                    .issue_send(WorkerMessage::new(
                        worker_id.clone(),
                        WorkerEvent::Error(format!("{}", e)),
                    ))
                    .await;
                T::on_worker_error(job.inner(), &job, &WorkerError::Storage(e));
            };
        }

        // The service's own result is returned unchanged.
        res
    }

Get the time a job is supposed to start

Set the time a job should run

Get the time a job was locked

Set the lock_at value

Examples found in repository?
src/storage/worker.rs (line 146)
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
    /// Runs a single job through the worker's service and records the outcome in storage.
    ///
    /// Before the service is called, the job is marked as `Running`, stamped with the
    /// current `Utc` time and this worker's id, has an attempt recorded, and is written
    /// back to storage. After the service returns, the job is fetched again by id and its
    /// final state is persisted according to the result.
    ///
    /// Storage failures along the way are reported through `T::on_worker_error` (and through
    /// the broker when the `broker` feature is enabled) but do not change the returned value.
    ///
    /// # Errors
    ///
    /// Returns early with the error if the service fails to become ready; otherwise returns
    /// whatever the service call produced.
    async fn handle_job(&mut self, mut job: JobRequest<Self::Job>) -> Result<JobResult, JobError> {
        // Started here so `on_service_ready` can report the time spent before the call.
        let instant = Instant::now();
        let mut storage = self.storage.clone();
        let worker_id = self.id.to_string();
        let handle = self.service().ready().await?;
        let job_id = job.id();
        // Mark the job as picked up by this worker.
        job.set_status(JobState::Running);
        job.set_lock_at(Some(Utc::now()));
        job.record_attempt();
        job.set_lock_by(Some(worker_id.clone()));
        // A failed update is reported but does not stop the job from being executed.
        if let Err(e) = storage.update_by_id(job_id.clone(), &job).await {
            #[cfg(feature = "broker")]
            Broker::global()
                .issue_send(WorkerMessage::new(
                    worker_id.clone(),
                    WorkerEvent::Error(format!("{}", e)),
                ))
                .await;
            T::on_worker_error(job.inner(), &job, &WorkerError::Storage(e));
        };
        T::on_service_ready(job.inner(), &job, instant.elapsed());
        // `job` is moved into the service here, so it has to be re-fetched afterwards.
        let res = handle.call(job).await;

        // If the job cannot be fetched (error or not found), no final state is persisted.
        if let Ok(Some(mut job)) = storage.fetch_by_id(job_id.clone()).await {
            // The completion time is stamped regardless of the outcome.
            job.set_done_at(Some(Utc::now()));
            // Map each outcome to a job state and the matching storage operation.
            let finalize = match res {
                Ok(ref r) => match r {
                    JobResult::Success => {
                        job.set_status(JobState::Done);
                        storage.ack(worker_id.clone(), job_id.clone()).await
                    }
                    JobResult::Retry => {
                        job.set_status(JobState::Retry);
                        storage.retry(worker_id.clone(), job_id.clone()).await
                    }
                    JobResult::Kill => {
                        job.set_status(JobState::Killed);
                        storage.kill(worker_id.clone(), job_id.clone()).await
                    }

                    // Rescheduled jobs use the wait duration carried by the result.
                    JobResult::Reschedule(wait) => {
                        job.set_status(JobState::Retry);
                        storage.reschedule(&job, *wait).await
                    }
                },
                Err(ref e) => {
                    job.set_status(JobState::Failed);
                    job.set_last_error(format!("{}", e));

                    #[cfg(feature = "broker")]
                    Broker::global()
                        .issue_send(WorkerMessage::new(
                            worker_id.clone(),
                            WorkerEvent::Error(format!("{}", e)),
                        ))
                        .await;
                    // let base: i32 = 2; // an explicit type is required
                    // let millis = base.pow(job.attempts());
                    // Failed jobs are rescheduled after a fixed 10 second delay.
                    storage.reschedule(&job, Duration::from_millis(10000)).await
                }
            };
            // A failed storage operation is reported; the state update below still runs.
            if let Err(e) = finalize {
                #[cfg(feature = "broker")]
                Broker::global()
                    .issue_send(WorkerMessage::new(
                        worker_id.clone(),
                        WorkerEvent::Error(format!("{}", e)),
                    ))
                    .await;
                T::on_worker_error(job.inner(), &job, &WorkerError::Storage(e));
            }
            // Persist the status, completion time and last error set above.
            if let Err(e) = storage.update_by_id(job_id.clone(), &job).await {
                #[cfg(feature = "broker")]
                Broker::global()
                    .issue_send(WorkerMessage::new(
                        worker_id.clone(),
                        WorkerEvent::Error(format!("{}", e)),
                    ))
                    .await;
                T::on_worker_error(job.inner(), &job, &WorkerError::Storage(e));
            };
        }

        // The service's own result is returned unchanged.
        res
    }

Get the job status

Set the job status

Examples found in repository?
src/storage/worker.rs (line 145)
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
    /// Runs a single job through the worker's service and records the outcome in storage.
    ///
    /// Before the service is called, the job is marked as `Running`, stamped with the
    /// current `Utc` time and this worker's id, has an attempt recorded, and is written
    /// back to storage. After the service returns, the job is fetched again by id and its
    /// final state is persisted according to the result.
    ///
    /// Storage failures along the way are reported through `T::on_worker_error` (and through
    /// the broker when the `broker` feature is enabled) but do not change the returned value.
    ///
    /// # Errors
    ///
    /// Returns early with the error if the service fails to become ready; otherwise returns
    /// whatever the service call produced.
    async fn handle_job(&mut self, mut job: JobRequest<Self::Job>) -> Result<JobResult, JobError> {
        // Started here so `on_service_ready` can report the time spent before the call.
        let instant = Instant::now();
        let mut storage = self.storage.clone();
        let worker_id = self.id.to_string();
        let handle = self.service().ready().await?;
        let job_id = job.id();
        // Mark the job as picked up by this worker.
        job.set_status(JobState::Running);
        job.set_lock_at(Some(Utc::now()));
        job.record_attempt();
        job.set_lock_by(Some(worker_id.clone()));
        // A failed update is reported but does not stop the job from being executed.
        if let Err(e) = storage.update_by_id(job_id.clone(), &job).await {
            #[cfg(feature = "broker")]
            Broker::global()
                .issue_send(WorkerMessage::new(
                    worker_id.clone(),
                    WorkerEvent::Error(format!("{}", e)),
                ))
                .await;
            T::on_worker_error(job.inner(), &job, &WorkerError::Storage(e));
        };
        T::on_service_ready(job.inner(), &job, instant.elapsed());
        // `job` is moved into the service here, so it has to be re-fetched afterwards.
        let res = handle.call(job).await;

        // If the job cannot be fetched (error or not found), no final state is persisted.
        if let Ok(Some(mut job)) = storage.fetch_by_id(job_id.clone()).await {
            // The completion time is stamped regardless of the outcome.
            job.set_done_at(Some(Utc::now()));
            // Map each outcome to a job state and the matching storage operation.
            let finalize = match res {
                Ok(ref r) => match r {
                    JobResult::Success => {
                        job.set_status(JobState::Done);
                        storage.ack(worker_id.clone(), job_id.clone()).await
                    }
                    JobResult::Retry => {
                        job.set_status(JobState::Retry);
                        storage.retry(worker_id.clone(), job_id.clone()).await
                    }
                    JobResult::Kill => {
                        job.set_status(JobState::Killed);
                        storage.kill(worker_id.clone(), job_id.clone()).await
                    }

                    // Rescheduled jobs use the wait duration carried by the result.
                    JobResult::Reschedule(wait) => {
                        job.set_status(JobState::Retry);
                        storage.reschedule(&job, *wait).await
                    }
                },
                Err(ref e) => {
                    job.set_status(JobState::Failed);
                    job.set_last_error(format!("{}", e));

                    #[cfg(feature = "broker")]
                    Broker::global()
                        .issue_send(WorkerMessage::new(
                            worker_id.clone(),
                            WorkerEvent::Error(format!("{}", e)),
                        ))
                        .await;
                    // let base: i32 = 2; // an explicit type is required
                    // let millis = base.pow(job.attempts());
                    // Failed jobs are rescheduled after a fixed 10 second delay.
                    storage.reschedule(&job, Duration::from_millis(10000)).await
                }
            };
            // A failed storage operation is reported; the state update below still runs.
            if let Err(e) = finalize {
                #[cfg(feature = "broker")]
                Broker::global()
                    .issue_send(WorkerMessage::new(
                        worker_id.clone(),
                        WorkerEvent::Error(format!("{}", e)),
                    ))
                    .await;
                T::on_worker_error(job.inner(), &job, &WorkerError::Storage(e));
            }
            // Persist the status, completion time and last error set above.
            if let Err(e) = storage.update_by_id(job_id.clone(), &job).await {
                #[cfg(feature = "broker")]
                Broker::global()
                    .issue_send(WorkerMessage::new(
                        worker_id.clone(),
                        WorkerEvent::Error(format!("{}", e)),
                    ))
                    .await;
                T::on_worker_error(job.inner(), &job, &WorkerError::Storage(e));
            };
        }

        // The service's own result is returned unchanged.
        res
    }

Get the worker that locked the job

Set lock_by

Examples found in repository?
src/storage/worker.rs (line 148)
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
    /// Runs a single job through the worker's service and records the outcome in storage.
    ///
    /// Before the service is called, the job is marked as `Running`, stamped with the
    /// current `Utc` time and this worker's id, has an attempt recorded, and is written
    /// back to storage. After the service returns, the job is fetched again by id and its
    /// final state is persisted according to the result.
    ///
    /// Storage failures along the way are reported through `T::on_worker_error` (and through
    /// the broker when the `broker` feature is enabled) but do not change the returned value.
    ///
    /// # Errors
    ///
    /// Returns early with the error if the service fails to become ready; otherwise returns
    /// whatever the service call produced.
    async fn handle_job(&mut self, mut job: JobRequest<Self::Job>) -> Result<JobResult, JobError> {
        // Started here so `on_service_ready` can report the time spent before the call.
        let instant = Instant::now();
        let mut storage = self.storage.clone();
        let worker_id = self.id.to_string();
        let handle = self.service().ready().await?;
        let job_id = job.id();
        // Mark the job as picked up by this worker.
        job.set_status(JobState::Running);
        job.set_lock_at(Some(Utc::now()));
        job.record_attempt();
        job.set_lock_by(Some(worker_id.clone()));
        // A failed update is reported but does not stop the job from being executed.
        if let Err(e) = storage.update_by_id(job_id.clone(), &job).await {
            #[cfg(feature = "broker")]
            Broker::global()
                .issue_send(WorkerMessage::new(
                    worker_id.clone(),
                    WorkerEvent::Error(format!("{}", e)),
                ))
                .await;
            T::on_worker_error(job.inner(), &job, &WorkerError::Storage(e));
        };
        T::on_service_ready(job.inner(), &job, instant.elapsed());
        // `job` is moved into the service here, so it has to be re-fetched afterwards.
        let res = handle.call(job).await;

        // If the job cannot be fetched (error or not found), no final state is persisted.
        if let Ok(Some(mut job)) = storage.fetch_by_id(job_id.clone()).await {
            // The completion time is stamped regardless of the outcome.
            job.set_done_at(Some(Utc::now()));
            // Map each outcome to a job state and the matching storage operation.
            let finalize = match res {
                Ok(ref r) => match r {
                    JobResult::Success => {
                        job.set_status(JobState::Done);
                        storage.ack(worker_id.clone(), job_id.clone()).await
                    }
                    JobResult::Retry => {
                        job.set_status(JobState::Retry);
                        storage.retry(worker_id.clone(), job_id.clone()).await
                    }
                    JobResult::Kill => {
                        job.set_status(JobState::Killed);
                        storage.kill(worker_id.clone(), job_id.clone()).await
                    }

                    // Rescheduled jobs use the wait duration carried by the result.
                    JobResult::Reschedule(wait) => {
                        job.set_status(JobState::Retry);
                        storage.reschedule(&job, *wait).await
                    }
                },
                Err(ref e) => {
                    job.set_status(JobState::Failed);
                    job.set_last_error(format!("{}", e));

                    #[cfg(feature = "broker")]
                    Broker::global()
                        .issue_send(WorkerMessage::new(
                            worker_id.clone(),
                            WorkerEvent::Error(format!("{}", e)),
                        ))
                        .await;
                    // let base: i32 = 2; // an explicit type is required
                    // let millis = base.pow(job.attempts());
                    // Failed jobs are rescheduled after a fixed 10 second delay.
                    storage.reschedule(&job, Duration::from_millis(10000)).await
                }
            };
            // A failed storage operation is reported; the state update below still runs.
            if let Err(e) = finalize {
                #[cfg(feature = "broker")]
                Broker::global()
                    .issue_send(WorkerMessage::new(
                        worker_id.clone(),
                        WorkerEvent::Error(format!("{}", e)),
                    ))
                    .await;
                T::on_worker_error(job.inner(), &job, &WorkerError::Storage(e));
            }
            // Persist the status, completion time and last error set above.
            if let Err(e) = storage.update_by_id(job_id.clone(), &job).await {
                #[cfg(feature = "broker")]
                Broker::global()
                    .issue_send(WorkerMessage::new(
                        worker_id.clone(),
                        WorkerEvent::Error(format!("{}", e)),
                    ))
                    .await;
                T::on_worker_error(job.inner(), &job, &WorkerError::Storage(e));
            };
        }

        // The service's own result is returned unchanged.
        res
    }

Get the time a job was locked

Set the last error

Examples found in repository?
src/storage/worker.rs (line 186)
Lines 139–223:
    async fn handle_job(&mut self, mut job: JobRequest<Self::Job>) -> Result<JobResult, JobError> {
        let instant = Instant::now();
        let mut storage = self.storage.clone();
        let worker_id = self.id.to_string();
        let handle = self.service().ready().await?;
        let job_id = job.id();
        job.set_status(JobState::Running);
        job.set_lock_at(Some(Utc::now()));
        job.record_attempt();
        job.set_lock_by(Some(worker_id.clone()));
        if let Err(e) = storage.update_by_id(job_id.clone(), &job).await {
            #[cfg(feature = "broker")]
            Broker::global()
                .issue_send(WorkerMessage::new(
                    worker_id.clone(),
                    WorkerEvent::Error(format!("{}", e)),
                ))
                .await;
            T::on_worker_error(job.inner(), &job, &WorkerError::Storage(e));
        };
        T::on_service_ready(job.inner(), &job, instant.elapsed());
        let res = handle.call(job).await;

        if let Ok(Some(mut job)) = storage.fetch_by_id(job_id.clone()).await {
            job.set_done_at(Some(Utc::now()));
            let finalize = match res {
                Ok(ref r) => match r {
                    JobResult::Success => {
                        job.set_status(JobState::Done);
                        storage.ack(worker_id.clone(), job_id.clone()).await
                    }
                    JobResult::Retry => {
                        job.set_status(JobState::Retry);
                        storage.retry(worker_id.clone(), job_id.clone()).await
                    }
                    JobResult::Kill => {
                        job.set_status(JobState::Killed);
                        storage.kill(worker_id.clone(), job_id.clone()).await
                    }

                    JobResult::Reschedule(wait) => {
                        job.set_status(JobState::Retry);
                        storage.reschedule(&job, *wait).await
                    }
                },
                Err(ref e) => {
                    job.set_status(JobState::Failed);
                    job.set_last_error(format!("{}", e));

                    #[cfg(feature = "broker")]
                    Broker::global()
                        .issue_send(WorkerMessage::new(
                            worker_id.clone(),
                            WorkerEvent::Error(format!("{}", e)),
                        ))
                        .await;
                    // let base: i32 = 2; // an explicit type is required
                    // let millis = base.pow(job.attempts());
                    storage.reschedule(&job, Duration::from_millis(10000)).await
                }
            };
            if let Err(e) = finalize {
                #[cfg(feature = "broker")]
                Broker::global()
                    .issue_send(WorkerMessage::new(
                        worker_id.clone(),
                        WorkerEvent::Error(format!("{}", e)),
                    ))
                    .await;
                T::on_worker_error(job.inner(), &job, &WorkerError::Storage(e));
            }
            if let Err(e) = storage.update_by_id(job_id.clone(), &job).await {
                #[cfg(feature = "broker")]
                Broker::global()
                    .issue_send(WorkerMessage::new(
                        worker_id.clone(),
                        WorkerEvent::Error(format!("{}", e)),
                    ))
                    .await;
                T::on_worker_error(job.inner(), &job, &WorkerError::Storage(e));
            };
        }

        res
    }

Trait Implementations§

Returns a copy of the value. Read more
Performs copy-assignment from source. Read more
Formats the value using the given formatter. Read more
The resulting type after dereferencing.
Dereferences the value.
Mutably dereferences the value.
Deserialize this value from the given Serde deserializer. Read more
The Future type returned by Policy::retry.
Check the policy if a certain request should be retried. Read more
Tries to clone a request before being passed to the inner service. Read more
Serialize this value into the given Serde serializer. Read more
Responses given by the service.
Errors produced by the service.
The future response value.
Returns Poll::Ready(Ok(())) when the service is able to process requests. Read more
Process the request and return the response asynchronously. Read more
Responses given by the service.
Errors produced by the service.
The future response value.
Returns Poll::Ready(Ok(())) when the service is able to process requests. Read more
Process the request and return the response asynchronously. Read more
Responses given by the service.
Errors produced by the service.
The future response value.
Returns Poll::Ready(Ok(())) when the service is able to process requests. Read more
Process the request and return the response asynchronously. Read more
Responses given by the service.
Errors produced by the service.
The future response value.
Returns Poll::Ready(Ok(())) when the service is able to process requests. Read more
Process the request and return the response asynchronously. Read more
Responses given by the service.
Errors produced by the service.
The future response value.
Returns Poll::Ready(Ok(())) when the service is able to process requests. Read more
Process the request and return the response asynchronously. Read more

Auto Trait Implementations§

Blanket Implementations§

Gets the TypeId of self. Read more
Immutably borrows from an owned value. Read more
Mutably borrows from an owned value. Read more

Returns the argument unchanged.

Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
Instruments this type with the current Span, returning an Instrumented wrapper. Read more
Instruments this type with the provided Span, returning an Instrumented wrapper. Read more
Instruments this type with the current Span, returning an Instrumented wrapper. Read more

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

The alignment of the pointer.
The type for initializers.
Initializes an object with the given initializer. Read more
Dereferences the given pointer. Read more
Mutably dereferences the given pointer. Read more
Drops the object pointed to by the given pointer. Read more
The resulting type after obtaining ownership.
Creates owned data from borrowed data, usually by cloning. Read more
Uses borrowed data to replace owned data, usually by cloning. Read more
The type returned in the event of a conversion error.
Performs the conversion.
The type returned in the event of a conversion error.
Performs the conversion.
Attaches the provided Subscriber to this type, returning a WithDispatch wrapper. Read more
Attaches the current default Subscriber to this type, returning a WithDispatch wrapper. Read more