Struct apalis_core::request::JobRequest
source · pub struct JobRequest<T> { /* private fields */ }Expand description
Represents a job which can be serialized and executed
Implementations§
source§impl<T> JobRequest<T>
impl<T> JobRequest<T>
sourcepub fn new(job: T) -> Self
pub fn new(job: T) -> Self
Creates a new JobRequest
sourcepub fn new_with_context(job: T, ctx: JobContext) -> Self
pub fn new_with_context(job: T, ctx: JobContext) -> Self
Creates a Job request with context provided
sourcepub fn inner(&self) -> &T
pub fn inner(&self) -> &T
Get the underlying reference of the [Job]
Examples found in repository?
139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223
async fn handle_job(&mut self, mut job: JobRequest<Self::Job>) -> Result<JobResult, JobError> {
let instant = Instant::now();
let mut storage = self.storage.clone();
let worker_id = self.id.to_string();
let handle = self.service().ready().await?;
let job_id = job.id();
job.set_status(JobState::Running);
job.set_lock_at(Some(Utc::now()));
job.record_attempt();
job.set_lock_by(Some(worker_id.clone()));
if let Err(e) = storage.update_by_id(job_id.clone(), &job).await {
#[cfg(feature = "broker")]
Broker::global()
.issue_send(WorkerMessage::new(
worker_id.clone(),
WorkerEvent::Error(format!("{}", e)),
))
.await;
T::on_worker_error(job.inner(), &job, &WorkerError::Storage(e));
};
T::on_service_ready(job.inner(), &job, instant.elapsed());
let res = handle.call(job).await;
if let Ok(Some(mut job)) = storage.fetch_by_id(job_id.clone()).await {
job.set_done_at(Some(Utc::now()));
let finalize = match res {
Ok(ref r) => match r {
JobResult::Success => {
job.set_status(JobState::Done);
storage.ack(worker_id.clone(), job_id.clone()).await
}
JobResult::Retry => {
job.set_status(JobState::Retry);
storage.retry(worker_id.clone(), job_id.clone()).await
}
JobResult::Kill => {
job.set_status(JobState::Killed);
storage.kill(worker_id.clone(), job_id.clone()).await
}
JobResult::Reschedule(wait) => {
job.set_status(JobState::Retry);
storage.reschedule(&job, *wait).await
}
},
Err(ref e) => {
job.set_status(JobState::Failed);
job.set_last_error(format!("{}", e));
#[cfg(feature = "broker")]
Broker::global()
.issue_send(WorkerMessage::new(
worker_id.clone(),
WorkerEvent::Error(format!("{}", e)),
))
.await;
// let base: i32 = 2; // an explicit type is required
// let millis = base.pow(job.attempts());
storage.reschedule(&job, Duration::from_millis(10000)).await
}
};
if let Err(e) = finalize {
#[cfg(feature = "broker")]
Broker::global()
.issue_send(WorkerMessage::new(
worker_id.clone(),
WorkerEvent::Error(format!("{}", e)),
))
.await;
T::on_worker_error(job.inner(), &job, &WorkerError::Storage(e));
}
if let Err(e) = storage.update_by_id(job_id.clone(), &job).await {
#[cfg(feature = "broker")]
Broker::global()
.issue_send(WorkerMessage::new(
worker_id.clone(),
WorkerEvent::Error(format!("{}", e)),
))
.await;
T::on_worker_error(job.inner(), &job, &WorkerError::Storage(e));
};
}
res
}sourcepub fn context_mut(&mut self) -> &mut JobContext
pub fn context_mut(&mut self) -> &mut JobContext
Gets a mutable reference to the job context.
sourcepub fn context(&self) -> &JobContext
pub fn context(&self) -> &JobContext
Gets a reference to the job context.
sourcepub fn record_attempt(&mut self)
pub fn record_attempt(&mut self)
Records a job attempt
Examples found in repository?
More examples
139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223
async fn handle_job(&mut self, mut job: JobRequest<Self::Job>) -> Result<JobResult, JobError> {
let instant = Instant::now();
let mut storage = self.storage.clone();
let worker_id = self.id.to_string();
let handle = self.service().ready().await?;
let job_id = job.id();
job.set_status(JobState::Running);
job.set_lock_at(Some(Utc::now()));
job.record_attempt();
job.set_lock_by(Some(worker_id.clone()));
if let Err(e) = storage.update_by_id(job_id.clone(), &job).await {
#[cfg(feature = "broker")]
Broker::global()
.issue_send(WorkerMessage::new(
worker_id.clone(),
WorkerEvent::Error(format!("{}", e)),
))
.await;
T::on_worker_error(job.inner(), &job, &WorkerError::Storage(e));
};
T::on_service_ready(job.inner(), &job, instant.elapsed());
let res = handle.call(job).await;
if let Ok(Some(mut job)) = storage.fetch_by_id(job_id.clone()).await {
job.set_done_at(Some(Utc::now()));
let finalize = match res {
Ok(ref r) => match r {
JobResult::Success => {
job.set_status(JobState::Done);
storage.ack(worker_id.clone(), job_id.clone()).await
}
JobResult::Retry => {
job.set_status(JobState::Retry);
storage.retry(worker_id.clone(), job_id.clone()).await
}
JobResult::Kill => {
job.set_status(JobState::Killed);
storage.kill(worker_id.clone(), job_id.clone()).await
}
JobResult::Reschedule(wait) => {
job.set_status(JobState::Retry);
storage.reschedule(&job, *wait).await
}
},
Err(ref e) => {
job.set_status(JobState::Failed);
job.set_last_error(format!("{}", e));
#[cfg(feature = "broker")]
Broker::global()
.issue_send(WorkerMessage::new(
worker_id.clone(),
WorkerEvent::Error(format!("{}", e)),
))
.await;
// let base: i32 = 2; // an explicit type is required
// let millis = base.pow(job.attempts());
storage.reschedule(&job, Duration::from_millis(10000)).await
}
};
if let Err(e) = finalize {
#[cfg(feature = "broker")]
Broker::global()
.issue_send(WorkerMessage::new(
worker_id.clone(),
WorkerEvent::Error(format!("{}", e)),
))
.await;
T::on_worker_error(job.inner(), &job, &WorkerError::Storage(e));
}
if let Err(e) = storage.update_by_id(job_id.clone(), &job).await {
#[cfg(feature = "broker")]
Broker::global()
.issue_send(WorkerMessage::new(
worker_id.clone(),
WorkerEvent::Error(format!("{}", e)),
))
.await;
T::on_worker_error(job.inner(), &job, &WorkerError::Storage(e));
};
}
res
}Methods from Deref<Target = JobContext>§
sourcepub fn data_opt<D: Any + Send + Sync>(&self) -> Option<&D>
pub fn data_opt<D: Any + Send + Sync>(&self) -> Option<&D>
Get a reference to a type previously inserted on this JobContext.
Example
let mut ctx = JobContext::new(1.to_string());
assert!(ctx.data_opt::<i32>().is_none());
ctx.insert(5i32);
assert_eq!(ctx.data_opt::<i32>(), Some(&5i32));sourcepub fn insert<D: Any + Send + Sync>(&mut self, data: D) -> Option<D>
pub fn insert<D: Any + Send + Sync>(&mut self, data: D) -> Option<D>
Insert a type into this JobContext.
Important for embedding data for a job. If a extension of this type already existed, it will be returned.
Example
let mut ctx = JobContext::new(1.to_string());
assert!(ctx.insert(5i32).is_none());
assert!(ctx.insert(4u8).is_none());
assert_eq!(ctx.insert(9i32), Some(5i32));sourcepub fn set_max_attempts(&mut self, max_attempts: i32)
pub fn set_max_attempts(&mut self, max_attempts: i32)
Set the number of attempts
sourcepub fn max_attempts(&self) -> i32
pub fn max_attempts(&self) -> i32
Gets the maximum attempts for a job. Default 25
Examples found in repository?
23 24 25 26 27 28 29 30 31 32 33 34 35
fn retry(&self, req: &Req<T>, result: Result<&Res, &Err>) -> Option<Self::Future> {
match result {
Ok(_) => {
// Treat all `Response`s as success,
// so don't retry...
None
}
Err(_) if (req.max_attempts() - req.attempts() > 0) => {
Some(future::ready(DefaultRetryPolicy))
}
Err(_) => None,
}
}sourcepub fn id(&self) -> String
pub fn id(&self) -> String
Get the id for a job
Examples found in repository?
149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164
fn call(&mut self, request: JobRequest<J>) -> Self::Future {
let op = J::NAME;
let trx_ctx = sentry_core::TransactionContext::new(op, "apalis.job");
let job_type = std::any::type_name::<J>().to_string();
let job_details = JobDetails {
job_id: request.id(),
current_attempt: request.attempts(),
job_type,
};
SentryHttpFuture {
on_first_poll: Some((job_details, trx_ctx)),
transaction: None,
future: self.service.call(request),
}
}More examples
139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223
async fn handle_job(&mut self, mut job: JobRequest<Self::Job>) -> Result<JobResult, JobError> {
let instant = Instant::now();
let mut storage = self.storage.clone();
let worker_id = self.id.to_string();
let handle = self.service().ready().await?;
let job_id = job.id();
job.set_status(JobState::Running);
job.set_lock_at(Some(Utc::now()));
job.record_attempt();
job.set_lock_by(Some(worker_id.clone()));
if let Err(e) = storage.update_by_id(job_id.clone(), &job).await {
#[cfg(feature = "broker")]
Broker::global()
.issue_send(WorkerMessage::new(
worker_id.clone(),
WorkerEvent::Error(format!("{}", e)),
))
.await;
T::on_worker_error(job.inner(), &job, &WorkerError::Storage(e));
};
T::on_service_ready(job.inner(), &job, instant.elapsed());
let res = handle.call(job).await;
if let Ok(Some(mut job)) = storage.fetch_by_id(job_id.clone()).await {
job.set_done_at(Some(Utc::now()));
let finalize = match res {
Ok(ref r) => match r {
JobResult::Success => {
job.set_status(JobState::Done);
storage.ack(worker_id.clone(), job_id.clone()).await
}
JobResult::Retry => {
job.set_status(JobState::Retry);
storage.retry(worker_id.clone(), job_id.clone()).await
}
JobResult::Kill => {
job.set_status(JobState::Killed);
storage.kill(worker_id.clone(), job_id.clone()).await
}
JobResult::Reschedule(wait) => {
job.set_status(JobState::Retry);
storage.reschedule(&job, *wait).await
}
},
Err(ref e) => {
job.set_status(JobState::Failed);
job.set_last_error(format!("{}", e));
#[cfg(feature = "broker")]
Broker::global()
.issue_send(WorkerMessage::new(
worker_id.clone(),
WorkerEvent::Error(format!("{}", e)),
))
.await;
// let base: i32 = 2; // an explicit type is required
// let millis = base.pow(job.attempts());
storage.reschedule(&job, Duration::from_millis(10000)).await
}
};
if let Err(e) = finalize {
#[cfg(feature = "broker")]
Broker::global()
.issue_send(WorkerMessage::new(
worker_id.clone(),
WorkerEvent::Error(format!("{}", e)),
))
.await;
T::on_worker_error(job.inner(), &job, &WorkerError::Storage(e));
}
if let Err(e) = storage.update_by_id(job_id.clone(), &job).await {
#[cfg(feature = "broker")]
Broker::global()
.issue_send(WorkerMessage::new(
worker_id.clone(),
WorkerEvent::Error(format!("{}", e)),
))
.await;
T::on_worker_error(job.inner(), &job, &WorkerError::Storage(e));
};
}
res
}sourcepub fn attempts(&self) -> i32
pub fn attempts(&self) -> i32
Gets the current attempts for a job. Default 0
Examples found in repository?
More examples
23 24 25 26 27 28 29 30 31 32 33 34 35
fn retry(&self, req: &Req<T>, result: Result<&Res, &Err>) -> Option<Self::Future> {
match result {
Ok(_) => {
// Treat all `Response`s as success,
// so don't retry...
None
}
Err(_) if (req.max_attempts() - req.attempts() > 0) => {
Some(future::ready(DefaultRetryPolicy))
}
Err(_) => None,
}
}149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164
fn call(&mut self, request: JobRequest<J>) -> Self::Future {
let op = J::NAME;
let trx_ctx = sentry_core::TransactionContext::new(op, "apalis.job");
let job_type = std::any::type_name::<J>().to_string();
let job_details = JobDetails {
job_id: request.id(),
current_attempt: request.attempts(),
job_type,
};
SentryHttpFuture {
on_first_poll: Some((job_details, trx_ctx)),
transaction: None,
future: self.service.call(request),
}
}sourcepub fn set_attempts(&mut self, attempts: i32)
pub fn set_attempts(&mut self, attempts: i32)
Set the number of attempts
sourcepub fn set_done_at(&mut self, done_at: Option<DateTime<Utc>>)
pub fn set_done_at(&mut self, done_at: Option<DateTime<Utc>>)
Set the time a job was done
Examples found in repository?
139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223
async fn handle_job(&mut self, mut job: JobRequest<Self::Job>) -> Result<JobResult, JobError> {
let instant = Instant::now();
let mut storage = self.storage.clone();
let worker_id = self.id.to_string();
let handle = self.service().ready().await?;
let job_id = job.id();
job.set_status(JobState::Running);
job.set_lock_at(Some(Utc::now()));
job.record_attempt();
job.set_lock_by(Some(worker_id.clone()));
if let Err(e) = storage.update_by_id(job_id.clone(), &job).await {
#[cfg(feature = "broker")]
Broker::global()
.issue_send(WorkerMessage::new(
worker_id.clone(),
WorkerEvent::Error(format!("{}", e)),
))
.await;
T::on_worker_error(job.inner(), &job, &WorkerError::Storage(e));
};
T::on_service_ready(job.inner(), &job, instant.elapsed());
let res = handle.call(job).await;
if let Ok(Some(mut job)) = storage.fetch_by_id(job_id.clone()).await {
job.set_done_at(Some(Utc::now()));
let finalize = match res {
Ok(ref r) => match r {
JobResult::Success => {
job.set_status(JobState::Done);
storage.ack(worker_id.clone(), job_id.clone()).await
}
JobResult::Retry => {
job.set_status(JobState::Retry);
storage.retry(worker_id.clone(), job_id.clone()).await
}
JobResult::Kill => {
job.set_status(JobState::Killed);
storage.kill(worker_id.clone(), job_id.clone()).await
}
JobResult::Reschedule(wait) => {
job.set_status(JobState::Retry);
storage.reschedule(&job, *wait).await
}
},
Err(ref e) => {
job.set_status(JobState::Failed);
job.set_last_error(format!("{}", e));
#[cfg(feature = "broker")]
Broker::global()
.issue_send(WorkerMessage::new(
worker_id.clone(),
WorkerEvent::Error(format!("{}", e)),
))
.await;
// let base: i32 = 2; // an explicit type is required
// let millis = base.pow(job.attempts());
storage.reschedule(&job, Duration::from_millis(10000)).await
}
};
if let Err(e) = finalize {
#[cfg(feature = "broker")]
Broker::global()
.issue_send(WorkerMessage::new(
worker_id.clone(),
WorkerEvent::Error(format!("{}", e)),
))
.await;
T::on_worker_error(job.inner(), &job, &WorkerError::Storage(e));
}
if let Err(e) = storage.update_by_id(job_id.clone(), &job).await {
#[cfg(feature = "broker")]
Broker::global()
.issue_send(WorkerMessage::new(
worker_id.clone(),
WorkerEvent::Error(format!("{}", e)),
))
.await;
T::on_worker_error(job.inner(), &job, &WorkerError::Storage(e));
};
}
res
}sourcepub fn set_run_at(&mut self, run_at: DateTime<Utc>)
pub fn set_run_at(&mut self, run_at: DateTime<Utc>)
Set the time a job should run
sourcepub fn set_lock_at(&mut self, lock_at: Option<DateTime<Utc>>)
pub fn set_lock_at(&mut self, lock_at: Option<DateTime<Utc>>)
Set the lock_at value
Examples found in repository?
139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223
async fn handle_job(&mut self, mut job: JobRequest<Self::Job>) -> Result<JobResult, JobError> {
let instant = Instant::now();
let mut storage = self.storage.clone();
let worker_id = self.id.to_string();
let handle = self.service().ready().await?;
let job_id = job.id();
job.set_status(JobState::Running);
job.set_lock_at(Some(Utc::now()));
job.record_attempt();
job.set_lock_by(Some(worker_id.clone()));
if let Err(e) = storage.update_by_id(job_id.clone(), &job).await {
#[cfg(feature = "broker")]
Broker::global()
.issue_send(WorkerMessage::new(
worker_id.clone(),
WorkerEvent::Error(format!("{}", e)),
))
.await;
T::on_worker_error(job.inner(), &job, &WorkerError::Storage(e));
};
T::on_service_ready(job.inner(), &job, instant.elapsed());
let res = handle.call(job).await;
if let Ok(Some(mut job)) = storage.fetch_by_id(job_id.clone()).await {
job.set_done_at(Some(Utc::now()));
let finalize = match res {
Ok(ref r) => match r {
JobResult::Success => {
job.set_status(JobState::Done);
storage.ack(worker_id.clone(), job_id.clone()).await
}
JobResult::Retry => {
job.set_status(JobState::Retry);
storage.retry(worker_id.clone(), job_id.clone()).await
}
JobResult::Kill => {
job.set_status(JobState::Killed);
storage.kill(worker_id.clone(), job_id.clone()).await
}
JobResult::Reschedule(wait) => {
job.set_status(JobState::Retry);
storage.reschedule(&job, *wait).await
}
},
Err(ref e) => {
job.set_status(JobState::Failed);
job.set_last_error(format!("{}", e));
#[cfg(feature = "broker")]
Broker::global()
.issue_send(WorkerMessage::new(
worker_id.clone(),
WorkerEvent::Error(format!("{}", e)),
))
.await;
// let base: i32 = 2; // an explicit type is required
// let millis = base.pow(job.attempts());
storage.reschedule(&job, Duration::from_millis(10000)).await
}
};
if let Err(e) = finalize {
#[cfg(feature = "broker")]
Broker::global()
.issue_send(WorkerMessage::new(
worker_id.clone(),
WorkerEvent::Error(format!("{}", e)),
))
.await;
T::on_worker_error(job.inner(), &job, &WorkerError::Storage(e));
}
if let Err(e) = storage.update_by_id(job_id.clone(), &job).await {
#[cfg(feature = "broker")]
Broker::global()
.issue_send(WorkerMessage::new(
worker_id.clone(),
WorkerEvent::Error(format!("{}", e)),
))
.await;
T::on_worker_error(job.inner(), &job, &WorkerError::Storage(e));
};
}
res
}sourcepub fn set_status(&mut self, status: JobState)
pub fn set_status(&mut self, status: JobState)
Set the job status
Examples found in repository?
139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223
async fn handle_job(&mut self, mut job: JobRequest<Self::Job>) -> Result<JobResult, JobError> {
let instant = Instant::now();
let mut storage = self.storage.clone();
let worker_id = self.id.to_string();
let handle = self.service().ready().await?;
let job_id = job.id();
job.set_status(JobState::Running);
job.set_lock_at(Some(Utc::now()));
job.record_attempt();
job.set_lock_by(Some(worker_id.clone()));
if let Err(e) = storage.update_by_id(job_id.clone(), &job).await {
#[cfg(feature = "broker")]
Broker::global()
.issue_send(WorkerMessage::new(
worker_id.clone(),
WorkerEvent::Error(format!("{}", e)),
))
.await;
T::on_worker_error(job.inner(), &job, &WorkerError::Storage(e));
};
T::on_service_ready(job.inner(), &job, instant.elapsed());
let res = handle.call(job).await;
if let Ok(Some(mut job)) = storage.fetch_by_id(job_id.clone()).await {
job.set_done_at(Some(Utc::now()));
let finalize = match res {
Ok(ref r) => match r {
JobResult::Success => {
job.set_status(JobState::Done);
storage.ack(worker_id.clone(), job_id.clone()).await
}
JobResult::Retry => {
job.set_status(JobState::Retry);
storage.retry(worker_id.clone(), job_id.clone()).await
}
JobResult::Kill => {
job.set_status(JobState::Killed);
storage.kill(worker_id.clone(), job_id.clone()).await
}
JobResult::Reschedule(wait) => {
job.set_status(JobState::Retry);
storage.reschedule(&job, *wait).await
}
},
Err(ref e) => {
job.set_status(JobState::Failed);
job.set_last_error(format!("{}", e));
#[cfg(feature = "broker")]
Broker::global()
.issue_send(WorkerMessage::new(
worker_id.clone(),
WorkerEvent::Error(format!("{}", e)),
))
.await;
// let base: i32 = 2; // an explicit type is required
// let millis = base.pow(job.attempts());
storage.reschedule(&job, Duration::from_millis(10000)).await
}
};
if let Err(e) = finalize {
#[cfg(feature = "broker")]
Broker::global()
.issue_send(WorkerMessage::new(
worker_id.clone(),
WorkerEvent::Error(format!("{}", e)),
))
.await;
T::on_worker_error(job.inner(), &job, &WorkerError::Storage(e));
}
if let Err(e) = storage.update_by_id(job_id.clone(), &job).await {
#[cfg(feature = "broker")]
Broker::global()
.issue_send(WorkerMessage::new(
worker_id.clone(),
WorkerEvent::Error(format!("{}", e)),
))
.await;
T::on_worker_error(job.inner(), &job, &WorkerError::Storage(e));
};
}
res
}sourcepub fn set_lock_by(&mut self, lock_by: Option<String>)
pub fn set_lock_by(&mut self, lock_by: Option<String>)
Set lock_by
Examples found in repository?
139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223
async fn handle_job(&mut self, mut job: JobRequest<Self::Job>) -> Result<JobResult, JobError> {
let instant = Instant::now();
let mut storage = self.storage.clone();
let worker_id = self.id.to_string();
let handle = self.service().ready().await?;
let job_id = job.id();
job.set_status(JobState::Running);
job.set_lock_at(Some(Utc::now()));
job.record_attempt();
job.set_lock_by(Some(worker_id.clone()));
if let Err(e) = storage.update_by_id(job_id.clone(), &job).await {
#[cfg(feature = "broker")]
Broker::global()
.issue_send(WorkerMessage::new(
worker_id.clone(),
WorkerEvent::Error(format!("{}", e)),
))
.await;
T::on_worker_error(job.inner(), &job, &WorkerError::Storage(e));
};
T::on_service_ready(job.inner(), &job, instant.elapsed());
let res = handle.call(job).await;
if let Ok(Some(mut job)) = storage.fetch_by_id(job_id.clone()).await {
job.set_done_at(Some(Utc::now()));
let finalize = match res {
Ok(ref r) => match r {
JobResult::Success => {
job.set_status(JobState::Done);
storage.ack(worker_id.clone(), job_id.clone()).await
}
JobResult::Retry => {
job.set_status(JobState::Retry);
storage.retry(worker_id.clone(), job_id.clone()).await
}
JobResult::Kill => {
job.set_status(JobState::Killed);
storage.kill(worker_id.clone(), job_id.clone()).await
}
JobResult::Reschedule(wait) => {
job.set_status(JobState::Retry);
storage.reschedule(&job, *wait).await
}
},
Err(ref e) => {
job.set_status(JobState::Failed);
job.set_last_error(format!("{}", e));
#[cfg(feature = "broker")]
Broker::global()
.issue_send(WorkerMessage::new(
worker_id.clone(),
WorkerEvent::Error(format!("{}", e)),
))
.await;
// let base: i32 = 2; // an explicit type is required
// let millis = base.pow(job.attempts());
storage.reschedule(&job, Duration::from_millis(10000)).await
}
};
if let Err(e) = finalize {
#[cfg(feature = "broker")]
Broker::global()
.issue_send(WorkerMessage::new(
worker_id.clone(),
WorkerEvent::Error(format!("{}", e)),
))
.await;
T::on_worker_error(job.inner(), &job, &WorkerError::Storage(e));
}
if let Err(e) = storage.update_by_id(job_id.clone(), &job).await {
#[cfg(feature = "broker")]
Broker::global()
.issue_send(WorkerMessage::new(
worker_id.clone(),
WorkerEvent::Error(format!("{}", e)),
))
.await;
T::on_worker_error(job.inner(), &job, &WorkerError::Storage(e));
};
}
res
}sourcepub fn last_error(&self) -> &Option<String>
pub fn last_error(&self) -> &Option<String>
Get the time a job was locked
sourcepub fn set_last_error(&mut self, error: String)
pub fn set_last_error(&mut self, error: String)
Set the last error
Examples found in repository?
139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223
async fn handle_job(&mut self, mut job: JobRequest<Self::Job>) -> Result<JobResult, JobError> {
let instant = Instant::now();
let mut storage = self.storage.clone();
let worker_id = self.id.to_string();
let handle = self.service().ready().await?;
let job_id = job.id();
job.set_status(JobState::Running);
job.set_lock_at(Some(Utc::now()));
job.record_attempt();
job.set_lock_by(Some(worker_id.clone()));
if let Err(e) = storage.update_by_id(job_id.clone(), &job).await {
#[cfg(feature = "broker")]
Broker::global()
.issue_send(WorkerMessage::new(
worker_id.clone(),
WorkerEvent::Error(format!("{}", e)),
))
.await;
T::on_worker_error(job.inner(), &job, &WorkerError::Storage(e));
};
T::on_service_ready(job.inner(), &job, instant.elapsed());
let res = handle.call(job).await;
if let Ok(Some(mut job)) = storage.fetch_by_id(job_id.clone()).await {
job.set_done_at(Some(Utc::now()));
let finalize = match res {
Ok(ref r) => match r {
JobResult::Success => {
job.set_status(JobState::Done);
storage.ack(worker_id.clone(), job_id.clone()).await
}
JobResult::Retry => {
job.set_status(JobState::Retry);
storage.retry(worker_id.clone(), job_id.clone()).await
}
JobResult::Kill => {
job.set_status(JobState::Killed);
storage.kill(worker_id.clone(), job_id.clone()).await
}
JobResult::Reschedule(wait) => {
job.set_status(JobState::Retry);
storage.reschedule(&job, *wait).await
}
},
Err(ref e) => {
job.set_status(JobState::Failed);
job.set_last_error(format!("{}", e));
#[cfg(feature = "broker")]
Broker::global()
.issue_send(WorkerMessage::new(
worker_id.clone(),
WorkerEvent::Error(format!("{}", e)),
))
.await;
// let base: i32 = 2; // an explicit type is required
// let millis = base.pow(job.attempts());
storage.reschedule(&job, Duration::from_millis(10000)).await
}
};
if let Err(e) = finalize {
#[cfg(feature = "broker")]
Broker::global()
.issue_send(WorkerMessage::new(
worker_id.clone(),
WorkerEvent::Error(format!("{}", e)),
))
.await;
T::on_worker_error(job.inner(), &job, &WorkerError::Storage(e));
}
if let Err(e) = storage.update_by_id(job_id.clone(), &job).await {
#[cfg(feature = "broker")]
Broker::global()
.issue_send(WorkerMessage::new(
worker_id.clone(),
WorkerEvent::Error(format!("{}", e)),
))
.await;
T::on_worker_error(job.inner(), &job, &WorkerError::Storage(e));
};
}
res
}Trait Implementations§
source§impl<T: Clone> Clone for JobRequest<T>
impl<T: Clone> Clone for JobRequest<T>
source§fn clone(&self) -> JobRequest<T>
fn clone(&self) -> JobRequest<T>
1.0.0 · source§fn clone_from(&mut self, source: &Self)
fn clone_from(&mut self, source: &Self)
source. Read moresource§impl<T: Debug> Debug for JobRequest<T>
impl<T: Debug> Debug for JobRequest<T>
source§impl<T> Deref for JobRequest<T>
impl<T> Deref for JobRequest<T>
source§impl<T> DerefMut for JobRequest<T>
impl<T> DerefMut for JobRequest<T>
source§impl<'de, T> Deserialize<'de> for JobRequest<T>where
T: Deserialize<'de>,
impl<'de, T> Deserialize<'de> for JobRequest<T>where
T: Deserialize<'de>,
source§fn deserialize<__D>(__deserializer: __D) -> Result<Self, __D::Error>where
__D: Deserializer<'de>,
fn deserialize<__D>(__deserializer: __D) -> Result<Self, __D::Error>where
__D: Deserializer<'de>,
source§impl<T> Policy<JobRequest<T>, JobResult, JobError> for DefaultRetryPolicywhere
T: Clone,
Available on crate feature retry only.
impl<T> Policy<JobRequest<T>, JobResult, JobError> for DefaultRetryPolicywhere
T: Clone,
retry only.§type Future = Ready<DefaultRetryPolicy>
type Future = Ready<DefaultRetryPolicy>
Future type returned by Policy::retry.source§fn retry(
&self,
req: &JobRequest<T>,
result: Result<&JobResult, &JobError>
) -> Option<Self::Future>
fn retry(
&self,
req: &JobRequest<T>,
result: Result<&JobResult, &JobError>
) -> Option<Self::Future>
source§fn clone_request(&self, req: &JobRequest<T>) -> Option<JobRequest<T>>
fn clone_request(&self, req: &JobRequest<T>) -> Option<JobRequest<T>>
source§impl<T> Serialize for JobRequest<T>where
T: Serialize,
impl<T> Serialize for JobRequest<T>where
T: Serialize,
source§impl<J, S, T> Service<JobRequest<J>> for AddExtension<S, T>where
S: Service<JobRequest<J>>,
T: Clone + Send + Sync + 'static,
Available on crate feature extensions only.
impl<J, S, T> Service<JobRequest<J>> for AddExtension<S, T>where
S: Service<JobRequest<J>>,
T: Clone + Send + Sync + 'static,
extensions only.§type Response = <S as Service<JobRequest<J>>>::Response
type Response = <S as Service<JobRequest<J>>>::Response
§type Error = <S as Service<JobRequest<J>>>::Error
type Error = <S as Service<JobRequest<J>>>::Error
§type Future = <S as Service<JobRequest<J>>>::Future
type Future = <S as Service<JobRequest<J>>>::Future
source§fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>
Poll::Ready(Ok(())) when the service is able to process requests. Read moresource§fn call(&mut self, req: JobRequest<J>) -> Self::Future
fn call(&mut self, req: JobRequest<J>) -> Self::Future
source§impl<S, J, F> Service<JobRequest<J>> for PrometheusService<S>where
S: Service<JobRequest<J>, Response = JobResult, Error = JobError, Future = F>,
F: Future<Output = Result<JobResult, JobError>> + 'static,
J: Job,
Available on crate feature prometheus only.
impl<S, J, F> Service<JobRequest<J>> for PrometheusService<S>where
S: Service<JobRequest<J>, Response = JobResult, Error = JobError, Future = F>,
F: Future<Output = Result<JobResult, JobError>> + 'static,
J: Job,
prometheus only.§type Response = <S as Service<JobRequest<J>>>::Response
type Response = <S as Service<JobRequest<J>>>::Response
§type Error = <S as Service<JobRequest<J>>>::Error
type Error = <S as Service<JobRequest<J>>>::Error
§type Future = ResponseFuture<F>
type Future = ResponseFuture<F>
source§fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>
Poll::Ready(Ok(())) when the service is able to process requests. Read moresource§fn call(&mut self, request: JobRequest<J>) -> Self::Future
fn call(&mut self, request: JobRequest<J>) -> Self::Future
source§impl<S, J, F> Service<JobRequest<J>> for SentryJobService<S>where
S: Service<JobRequest<J>, Response = JobResult, Error = JobError, Future = F>,
F: Future<Output = Result<JobResult, JobError>> + 'static,
J: Job,
Available on crate feature sentry only.
impl<S, J, F> Service<JobRequest<J>> for SentryJobService<S>where
S: Service<JobRequest<J>, Response = JobResult, Error = JobError, Future = F>,
F: Future<Output = Result<JobResult, JobError>> + 'static,
J: Job,
sentry only.§type Response = <S as Service<JobRequest<J>>>::Response
type Response = <S as Service<JobRequest<J>>>::Response
§type Error = <S as Service<JobRequest<J>>>::Error
type Error = <S as Service<JobRequest<J>>>::Error
§type Future = SentryHttpFuture<<S as Service<JobRequest<J>>>::Future>
type Future = SentryHttpFuture<<S as Service<JobRequest<J>>>::Future>
source§fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>
Poll::Ready(Ok(())) when the service is able to process requests. Read moresource§fn call(&mut self, request: JobRequest<J>) -> Self::Future
fn call(&mut self, request: JobRequest<J>) -> Self::Future
source§impl<J, S, OnRequestT, OnResponseT, OnFailureT, MakeSpanT, F> Service<JobRequest<J>> for Trace<S, MakeSpanT, OnRequestT, OnResponseT, OnFailureT>where
S: Service<JobRequest<J>, Response = JobResult, Error = JobError, Future = F> + Unpin + Send + 'static,
S::Error: Display + 'static,
MakeSpanT: MakeSpan<J>,
OnRequestT: OnRequest<J>,
OnResponseT: OnResponse + Clone + 'static,
F: Future<Output = Result<JobResult, JobError>> + 'static,
OnFailureT: OnFailure + Clone + 'static,
Available on crate feature trace only.
impl<J, S, OnRequestT, OnResponseT, OnFailureT, MakeSpanT, F> Service<JobRequest<J>> for Trace<S, MakeSpanT, OnRequestT, OnResponseT, OnFailureT>where
S: Service<JobRequest<J>, Response = JobResult, Error = JobError, Future = F> + Unpin + Send + 'static,
S::Error: Display + 'static,
MakeSpanT: MakeSpan<J>,
OnRequestT: OnRequest<J>,
OnResponseT: OnResponse + Clone + 'static,
F: Future<Output = Result<JobResult, JobError>> + 'static,
OnFailureT: OnFailure + Clone + 'static,
trace only.§type Future = ResponseFuture<F, OnResponseT, OnFailureT>
type Future = ResponseFuture<F, OnResponseT, OnFailureT>
source§fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>>
Poll::Ready(Ok(())) when the service is able to process requests. Read moresource§fn call(&mut self, req: JobRequest<J>) -> Self::Future
fn call(&mut self, req: JobRequest<J>) -> Self::Future
source§impl<T, F, Res, Request> Service<JobRequest<Request>> for JobFn<T>where
Request: Debug + 'static + Job,
T: Fn(Request, JobContext) -> F,
Res: IntoJobResponse,
F: Future<Output = Res> + 'static + Send,
impl<T, F, Res, Request> Service<JobRequest<Request>> for JobFn<T>where
Request: Debug + 'static + Job,
T: Fn(Request, JobContext) -> F,
Res: IntoJobResponse,
F: Future<Output = Res> + 'static + Send,
§type Future = JobFnHttpFuture<Pin<Box<dyn Future<Output = Result<JobResult, JobError>> + Send + 'static, Global>>>
type Future = JobFnHttpFuture<Pin<Box<dyn Future<Output = Result<JobResult, JobError>> + Send + 'static, Global>>>
source§fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>>
fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>>
Poll::Ready(Ok(())) when the service is able to process requests. Read more