use futures::FutureExt;
use std::panic::AssertUnwindSafe;
use std::sync::Arc;
use crate::context::JobState;
use crate::job_envelope::JobEnvelope;
use crate::worker::BoxedProcessable;
use crate::{Config, JobContext, OxanusError};
#[derive(Debug)]
enum ExecutionResult<ET> {
NotPanic(Result<(), ET>),
Panic(String),
}
pub(crate) enum ExecutionError<ET> {
NotPanic(ET),
Panic(),
}
pub async fn run<DT, ET>(
config: Arc<Config<DT, ET>>,
worker: BoxedProcessable<ET>,
envelope: &mut JobEnvelope,
) -> Result<Result<(), ExecutionError<ET>>, OxanusError>
where
DT: Send + Sync + Clone + 'static,
ET: std::error::Error + Send + Sync + 'static,
{
config.storage.internal.set_started_at(envelope).await?;
tracing::info!(
job_id = envelope.id,
queue = envelope.queue,
worker = envelope.job.name,
latency_ms = envelope.meta.latency_millis(),
"Job started"
);
let start = std::time::Instant::now();
let job_ctx = JobContext {
meta: envelope.meta.clone(),
state: JobState::new(
config.storage.clone(),
envelope.id.clone(),
envelope.meta.state.clone(),
),
};
let result = match AssertUnwindSafe(process(&worker, job_ctx, envelope))
.catch_unwind()
.await
{
Ok(result) => ExecutionResult::NotPanic(result),
Err(panic) => {
let panic_msg = if let Some(s) = panic.downcast_ref::<&str>() {
(*s).to_string()
} else if let Some(s) = panic.downcast_ref::<String>() {
s.clone()
} else {
"Unknown panic occurred".to_string()
};
ExecutionResult::Panic(panic_msg)
}
};
let duration = start.elapsed();
let is_err = !matches!(result, ExecutionResult::NotPanic(Ok(_)));
tracing::info!(
job_id = envelope.id,
queue = envelope.queue,
job = envelope.job.name,
success = !is_err,
duration = duration.as_millis(),
retries = envelope.meta.retries,
"Job finished"
);
let max_retries = worker.max_retries();
match result {
ExecutionResult::NotPanic(result) => {
match &result {
Ok(()) => {
if let Err(e) = config.storage.internal.finish_with_success(envelope).await {
tracing::error!("Failed to finish job: {}", e);
}
}
Err(e) => {
let default_delay = worker.retry_delay(envelope.meta.retries);
let retry_delay = config
.retry_delay_override
.as_ref()
.and_then(|f| f(e, envelope.meta.retries, default_delay))
.unwrap_or(default_delay);
#[cfg(feature = "sentry")]
sentry_core::capture_error(e);
tracing::error!(
job_id = envelope.id,
queue = envelope.queue,
worker = envelope.job.name,
"Job failed"
);
handle_err(config, &e.to_string(), envelope, retry_delay, max_retries).await;
}
}
Ok(result.map_err(ExecutionError::NotPanic))
}
ExecutionResult::Panic(panic_msg) => {
let retry_delay = worker.retry_delay(envelope.meta.retries);
#[cfg(feature = "sentry")]
sentry_core::capture_message(&panic_msg, sentry_core::Level::Error);
handle_err(config, &panic_msg, envelope, retry_delay, max_retries).await;
Ok(Err(ExecutionError::Panic()))
}
}
}
#[cfg_attr(feature = "tracing-instrument", tracing::instrument(skip_all, name = "job", fields(
job_id = envelope.id,
queue = envelope.queue,
worker = envelope.job.name,
args = %envelope.job.args,
retries = envelope.meta.retries,
latency_ms = envelope.meta.latency_millis(),
success = false,
)))]
async fn process<ET>(
worker: &BoxedProcessable<ET>,
job_ctx: JobContext,
#[cfg_attr(not(feature = "tracing-instrument"), allow(unused_variables))]
envelope: &JobEnvelope,
) -> Result<(), ET>
where
ET: std::error::Error + Send + Sync + 'static,
{
#[cfg(feature = "tracing-instrument")]
let span = tracing::Span::current();
let result = worker.process(&job_ctx).await;
#[cfg(feature = "tracing-instrument")]
span.record("success", result.is_ok());
result
}
async fn handle_err<DT, ET>(
config: Arc<Config<DT, ET>>,
err_msg: &str,
envelope: &JobEnvelope,
retry_delay: u64,
max_retries: u32,
) where
DT: Send + Sync + Clone + 'static,
ET: std::error::Error + Send + Sync + 'static,
{
if envelope.meta.retries < max_retries {
if let Err(e) = config.storage.internal.finish_with_failure(envelope).await {
tracing::error!("Failed to finish job: {}", e);
}
if let Err(e) = config
.storage
.internal
.retry_in(envelope.id.clone(), retry_delay, err_msg.to_string())
.await
{
tracing::error!("Failed to retry job: {}", e);
}
} else {
tracing::error!(
"Job {} failed after {} retries: {}",
envelope.id,
max_retries,
err_msg
);
if let Err(e) = config
.storage
.internal
.kill(envelope, err_msg.to_string())
.await
{
tracing::error!("Failed to kill job: {}", e);
}
}
}