use async_trait::async_trait;
use chrono::TimeDelta;
use serde::{de::DeserializeOwned, Serialize};
use std::{error::Error, fmt::Display};
use crate::{
backend::{Backend, Query},
backoff::{BackoffStrategy, Exponential, Jitter, Strategy},
global_backend::GlobalBackend,
job::{builder::JobBuilder, query::Where, uniqueness_criteria::UniquenessCriteria, Job, JobId},
RexecutorError,
};
/// Module-local shorthand for a [`std::result::Result`] whose error type is
/// fixed to [`RexecutorError`].
type Result<T> = ::std::result::Result<T, RexecutorError>;
/// Backoff strategy used by the provided implementation of [`Executor::backoff`].
///
/// Built with `BackoffStrategy::exponential(TimeDelta::seconds(4))`, then
/// `with_max(TimeDelta::days(7))` and `with_jitter(Jitter::Relative(0.1))`.
// NOTE(review): presumably this yields delays that grow exponentially from
// 4 seconds, never exceed 7 days and are randomised by roughly 10% — confirm
// against the `backoff` module, which is not visible from this file.
const DEFAULT_BACKOFF_STRATEGY: BackoffStrategy<Exponential> =
BackoffStrategy::exponential(TimeDelta::seconds(4))
.with_max(TimeDelta::days(7))
.with_jitter(Jitter::Relative(0.1));
/// Describes how one kind of job is executed.
///
/// An implementor picks the payload types ([`Executor::Data`] and
/// [`Executor::Metadata`]), gives the executor a [`Executor::NAME`] and
/// provides [`Executor::execute`]. Every other item has a default.
///
/// The job-management helpers come in pairs: `foo` resolves the backend through
/// `GlobalBackend::as_ref()` and then delegates to `foo_on_backend`, which takes
/// the backend explicitly.
#[async_trait]
pub trait Executor {
    /// Type of the `data` carried by this executor's jobs.
    type Data: Serialize + DeserializeOwned;

    /// Type of the metadata carried by this executor's jobs.
    type Metadata: Serialize + DeserializeOwned;

    /// Name of the executor.
    ///
    /// [`Executor::query_jobs_on_backend`] restricts its query to this name.
    // NOTE(review): presumably this must be unique per executor — confirm.
    const NAME: &'static str;

    /// Defaults to `5`.
    // NOTE(review): presumably the maximum number of times a job is attempted;
    // the enforcement lives outside this file — confirm.
    const MAX_ATTEMPTS: u16 = 5;

    /// Defaults to `None`.
    // NOTE(review): presumably an optional cap on concurrently running jobs of
    // this executor, with `None` meaning unlimited — confirm.
    const MAX_CONCURRENCY: Option<usize> = None;

    /// Defaults to `false`.
    // NOTE(review): presumably flags `execute` as blocking so the runner can
    // schedule it accordingly — confirm with the code that reads this constant.
    const BLOCKING: bool = false;

    /// Defaults to `None`.
    // NOTE(review): presumably the criteria used to de-duplicate jobs on
    // insertion — confirm against `UniquenessCriteria`.
    const UNIQUENESS_CRITERIA: Option<UniquenessCriteria<'static>> = None;

    /// Runs `job` and reports the outcome as an [`ExecutionResult`].
    async fn execute(job: Job<Self::Data, Self::Metadata>) -> ExecutionResult;

    /// Returns the delay produced by `DEFAULT_BACKOFF_STRATEGY` for the job's
    /// current `attempt`.
    ///
    /// Override to use a different strategy.
    fn backoff(job: &Job<Self::Data, Self::Metadata>) -> TimeDelta {
        let attempt = job.attempt;
        DEFAULT_BACKOFF_STRATEGY.backoff(attempt)
    }

    /// Returns the timeout for `job`; the provided implementation always
    /// returns `None`.
    // NOTE(review): presumably `None` means the job may run indefinitely — confirm.
    fn timeout(_job: &Job<Self::Data, Self::Metadata>) -> Option<std::time::Duration> {
        None
    }

    /// Returns the `Default` [`JobBuilder`] for this executor.
    fn builder<'a>() -> JobBuilder<'a, Self>
    where
        Self: Sized,
        Self::Data: Serialize + DeserializeOwned,
        Self::Metadata: Serialize + DeserializeOwned,
    {
        <JobBuilder<'a, Self> as Default>::default()
    }

    /// Cancels the job `job_id` on the global backend, recording
    /// `cancellation_reason`.
    ///
    /// # Errors
    ///
    /// Fails when `GlobalBackend::as_ref()` fails or when
    /// [`Executor::cancel_job_on_backend`] fails.
    async fn cancel_job(
        job_id: JobId,
        cancellation_reason: Box<dyn CancellationReason>,
    ) -> Result<()> {
        let backend = GlobalBackend::as_ref()?;
        Self::cancel_job_on_backend(job_id, cancellation_reason, backend).await
    }

    /// Cancels the job `job_id` on `backend` by calling its
    /// `mark_job_cancelled`, passing along the converted `cancellation_reason`.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the backend.
    async fn cancel_job_on_backend<B>(
        job_id: JobId,
        cancellation_reason: Box<dyn CancellationReason>,
        backend: &B,
    ) -> Result<()>
    where
        B: Backend + ?Sized + Sync,
    {
        // The target type of the conversion is dictated by `mark_job_cancelled`.
        let reason = cancellation_reason.into();
        backend.mark_job_cancelled(job_id, reason).await?;
        Ok(())
    }

    /// Asks the global backend to rerun the job `job_id`.
    ///
    /// # Errors
    ///
    /// Fails when `GlobalBackend::as_ref()` fails or when
    /// [`Executor::rerun_job_on_backend`] fails.
    async fn rerun_job(job_id: JobId) -> Result<()> {
        let backend = GlobalBackend::as_ref()?;
        Self::rerun_job_on_backend(job_id, backend).await
    }

    /// Asks `backend` to rerun the job `job_id`.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by the backend.
    async fn rerun_job_on_backend<B>(job_id: JobId, backend: &B) -> Result<()>
    where
        B: Backend + ?Sized + Sync,
    {
        backend
            .rerun_job(job_id)
            .await
            .map_err(RexecutorError::from)
    }

    /// Queries the global backend for this executor's jobs matching `query`.
    ///
    /// # Errors
    ///
    /// Fails when `GlobalBackend::as_ref()` fails or when
    /// [`Executor::query_jobs_on_backend`] fails.
    async fn query_jobs<'a>(
        query: Where<'a, Self::Data, Self::Metadata>,
    ) -> Result<Vec<Job<Self::Data, Self::Metadata>>>
    where
        Self::Data: 'static + Send + Serialize + DeserializeOwned + Sync,
        Self::Metadata: 'static + Send + Serialize + DeserializeOwned + Sync,
    {
        let backend = GlobalBackend::as_ref()?;
        Self::query_jobs_on_backend(query, backend).await
    }

    /// Queries `backend` for this executor's jobs matching `query`.
    ///
    /// The query is converted into a backend [`Query`], restricted to
    /// [`Executor::NAME`], and every returned job is converted back into a
    /// typed [`Job`].
    ///
    /// # Errors
    ///
    /// Fails when the query cannot be converted, when the backend returns an
    /// error, or when any returned job cannot be converted; the first failing
    /// conversion aborts the whole call.
    async fn query_jobs_on_backend<'a, B>(
        query: Where<'a, Self::Data, Self::Metadata>,
        backend: &B,
    ) -> Result<Vec<Job<Self::Data, Self::Metadata>>>
    where
        Self::Data: 'static + Send + Serialize + DeserializeOwned + Sync,
        Self::Metadata: 'static + Send + Serialize + DeserializeOwned + Sync,
        B: Backend + ?Sized + Sync,
    {
        let backend_query = Query::try_from(query.0)?.for_executor(Self::NAME);
        let stored_jobs = backend.query(backend_query).await?;
        let jobs: Vec<Job<Self::Data, Self::Metadata>> = stored_jobs
            .into_iter()
            .map(TryFrom::try_from)
            .collect::<std::result::Result<Vec<_>, _>>()?;
        Ok(jobs)
    }

    /// Writes `job` back to the global backend.
    ///
    /// # Errors
    ///
    /// Fails when `GlobalBackend::as_ref()` fails or when
    /// [`Executor::update_job_on_backend`] fails.
    async fn update_job(job: Job<Self::Data, Self::Metadata>) -> Result<()>
    where
        Self::Data: 'static + Send + Serialize + Sync,
        Self::Metadata: 'static + Send + Serialize + Sync,
    {
        let backend = GlobalBackend::as_ref()?;
        Self::update_job_on_backend(job, backend).await
    }

    /// Converts `job` into the backend's representation and passes it to the
    /// backend's `update_job`.
    ///
    /// # Errors
    ///
    /// Fails when the conversion fails or when the backend returns an error.
    async fn update_job_on_backend<B>(
        job: Job<Self::Data, Self::Metadata>,
        backend: &B,
    ) -> Result<()>
    where
        B: Backend + ?Sized + Sync,
        Self::Data: 'static + Serialize + Send + Sync,
        Self::Metadata: 'static + Serialize + Send + Sync,
    {
        // The target type of the conversion is dictated by `Backend::update_job`.
        let backend_job = job.try_into()?;
        backend
            .update_job(backend_job)
            .await
            .map_err(RexecutorError::from)
    }
}
/// Identifier of an executor: a thin wrapper around a `&'static str`.
///
/// Build one with `From<&'static str>`, read it back with
/// [`ExecutorIdentifier::as_str`], or use it wherever a `&str` is expected
/// thanks to its `Deref<Target = str>` implementation.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct ExecutorIdentifier(&'static str);

impl From<&'static str> for ExecutorIdentifier {
    /// Wraps `value` without copying it.
    fn from(value: &'static str) -> Self {
        ExecutorIdentifier(value)
    }
}

impl ExecutorIdentifier {
    /// Returns the wrapped string with its full `'static` lifetime.
    pub fn as_str(&self) -> &'static str {
        let Self(name) = *self;
        name
    }
}

impl std::ops::Deref for ExecutorIdentifier {
    type Target = str;

    /// Dereferences to the wrapped string.
    fn deref(&self) -> &Self::Target {
        self.as_str()
    }
}
/// Outcome reported by [`Executor::execute`] for a single run of a job.
// NOTE(review): how each variant is acted upon (retrying, rescheduling, ...)
// is decided by code outside this file.
pub enum ExecutionResult {
/// The job completed.
Done,
/// The job should be cancelled.
Cancel {
/// Displayable reason for the cancellation.
reason: Box<dyn CancellationReason>,
},
/// The job should be snoozed.
Snooze {
/// How long to snooze for.
// NOTE(review): presumably relative to the current time — confirm with the runner.
delay: TimeDelta,
},
/// The job failed with an error.
Error {
/// The error that occurred. Any `ExecutionError + 'static` value converts
/// into this variant through the `From` implementation below.
error: Box<dyn ExecutionError>,
},
}
impl<T> From<T> for ExecutionResult
where
T: ExecutionError + 'static,
{
fn from(value: T) -> Self {
Self::Error {
error: Box::new(value),
}
}
}
/// An error that can be carried inside an execution result.
///
/// Implementors must be [`Error`] and [`Send`].
pub trait ExecutionError: Send + Error {
    /// Returns a static label for this error.
    // NOTE(review): presumably used to classify or group errors when they are
    // recorded — confirm with the code that consumes it.
    fn error_type(&self) -> &'static str;
}
/// Marker trait for values that can be given as the reason for cancelling a job.
///
/// It has no methods of its own; it only requires [`Display`] and [`Send`].
pub trait CancellationReason: Send + Display {}

/// Every sized type that is both [`Display`] and [`Send`] is a
/// [`CancellationReason`], so string slices, `String`s, etc. can be used directly.
impl<T: Display + Send> CancellationReason for T {}
/// Test fixtures (`pub(crate)`, so other modules of the crate can reuse them)
/// and unit tests for the `*_on_backend` helpers provided by [`Executor`].
#[cfg(test)]
pub(crate) mod test {
    use async_trait::async_trait;
    use serde::{Deserialize, Serialize};

    use crate::{
        backend::{BackendError, MockBackend},
        job::JobStatus,
    };

    use super::*;

    /// Executor whose `execute` always returns [`ExecutionResult::Done`].
    pub(crate) struct SimpleExecutor;

    #[async_trait]
    impl Executor for SimpleExecutor {
        type Data = String;
        type Metadata = String;
        const NAME: &'static str = "simple_executor";
        const MAX_ATTEMPTS: u16 = 2;

        async fn execute(_job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
            ExecutionResult::Done
        }
    }

    /// Executor whose behaviour is scripted by the job's data, see
    /// [`MockExecutionResult`].
    pub(crate) struct MockReturnExecutor;

    /// Script for [`MockReturnExecutor`]: each variant selects what `execute`
    /// does with the job.
    #[derive(Debug, Clone, Serialize, Deserialize, Hash)]
    pub(crate) enum MockExecutionResult {
        /// Return [`ExecutionResult::Done`].
        Done,
        /// Panic inside `execute`.
        Panic,
        /// Sleep for 10ms while the executor's timeout for the job is 1ms.
        Timeout,
        /// Return [`ExecutionResult::Cancel`] with the given reason.
        Cancelled { reason: String },
        /// Return [`ExecutionResult::Snooze`] with the given delay.
        Snooze { delay: std::time::Duration },
        /// Return [`ExecutionResult::Error`] with the given error.
        Error { error: MockError },
    }

    /// Minimal [`ExecutionError`] that displays as its inner message.
    #[derive(Debug, Clone, Serialize, Deserialize, Hash)]
    pub(crate) struct MockError(pub String);

    impl std::fmt::Display for MockError {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            f.write_str(&self.0)
        }
    }

    impl Error for MockError {}

    impl ExecutionError for MockError {
        fn error_type(&self) -> &'static str {
            "custom"
        }
    }

    #[async_trait]
    impl Executor for MockReturnExecutor {
        type Data = MockExecutionResult;
        type Metadata = ();
        const NAME: &'static str = "basic_executor";
        const MAX_ATTEMPTS: u16 = 2;

        async fn execute(job: Job<Self::Data, Self::Metadata>) -> ExecutionResult {
            match job.data {
                MockExecutionResult::Done => ExecutionResult::Done,
                // NOTE(review): message spelling kept as-is; other tests may match on it.
                MockExecutionResult::Panic => panic!("job paniced"),
                MockExecutionResult::Timeout => {
                    // Longer than the 1ms returned by `timeout` below.
                    let nap = std::time::Duration::from_millis(10);
                    tokio::time::sleep(nap).await;
                    ExecutionResult::Done
                }
                MockExecutionResult::Cancelled { reason } => ExecutionResult::Cancel {
                    reason: Box::new(reason),
                },
                MockExecutionResult::Snooze { delay } => {
                    let delay = TimeDelta::from_std(delay).unwrap();
                    ExecutionResult::Snooze { delay }
                }
                MockExecutionResult::Error { error } => ExecutionResult::Error {
                    error: Box::new(error),
                },
            }
        }

        fn timeout(job: &Job<Self::Data, Self::Metadata>) -> Option<std::time::Duration> {
            match job.data {
                MockExecutionResult::Timeout => Some(std::time::Duration::from_millis(1)),
                _ => None,
            }
        }
    }

    /// A backend failure while cancelling surfaces as an error.
    #[tokio::test]
    async fn cancel_job_error() {
        let mut mock_backend = MockBackend::default();
        mock_backend
            .expect_mark_job_cancelled()
            .returning(|_, _| Err(BackendError::BadState));
        let job_id = 0.into();
        let reason = Box::new("No longer needed");

        let outcome = SimpleExecutor::cancel_job_on_backend(job_id, reason, &mock_backend).await;

        assert!(outcome.is_err());
    }

    /// Cancelling succeeds when the backend accepts the cancellation.
    #[tokio::test]
    async fn cancel_job_success() {
        let mut mock_backend = MockBackend::default();
        mock_backend
            .expect_mark_job_cancelled()
            .returning(|_, _| Ok(()));
        let job_id = 0.into();
        let reason = Box::new("No longer needed");

        let outcome = SimpleExecutor::cancel_job_on_backend(job_id, reason, &mock_backend).await;

        assert!(outcome.is_ok());
    }

    /// A backend failure while rerunning surfaces as an error.
    #[tokio::test]
    async fn rerun_job_error() {
        let mut mock_backend = MockBackend::default();
        mock_backend
            .expect_rerun_job()
            .returning(|_| Err(BackendError::BadState));
        let job_id = 0.into();

        let outcome = SimpleExecutor::rerun_job_on_backend(job_id, &mock_backend).await;

        assert!(outcome.is_err());
    }

    /// Rerunning succeeds when the backend accepts the request.
    #[tokio::test]
    async fn rerun_job_success() {
        let mut mock_backend = MockBackend::default();
        mock_backend.expect_rerun_job().returning(|_| Ok(()));
        let job_id = 0.into();

        let outcome = SimpleExecutor::rerun_job_on_backend(job_id, &mock_backend).await;

        assert!(outcome.is_ok());
    }

    /// A backend failure while querying surfaces as an error.
    #[tokio::test]
    async fn query_jobs_error() {
        let mut mock_backend = MockBackend::default();
        mock_backend
            .expect_query()
            .returning(|_| Err(BackendError::BadState));
        let filter = Where::status_equals(JobStatus::Cancelled);

        let outcome = SimpleExecutor::query_jobs_on_backend(filter, &mock_backend).await;

        assert!(outcome.is_err());
    }

    /// An empty backend response yields an empty list of jobs.
    #[tokio::test]
    async fn query_jobs_success_empty() {
        let mut mock_backend = MockBackend::default();
        mock_backend.expect_query().returning(|_| Ok(vec![]));
        let filter = Where::status_equals(JobStatus::Cancelled);

        let outcome = SimpleExecutor::query_jobs_on_backend(filter, &mock_backend).await;

        assert!(outcome.is_ok());
        assert!(outcome.unwrap().is_empty());
    }

    /// A job returned by the backend is converted and handed back to the caller.
    #[tokio::test]
    async fn query_jobs_success() {
        let mut mock_backend = MockBackend::default();
        mock_backend.expect_query().returning(|_| {
            let stored = crate::backend::Job::mock_job::<SimpleExecutor>().with_data("Data");
            Ok(vec![stored])
        });
        let filter = Where::status_equals(JobStatus::Cancelled);

        let outcome = SimpleExecutor::query_jobs_on_backend(filter, &mock_backend).await;

        assert!(outcome.is_ok());
        assert_eq!(outcome.unwrap().len(), 1);
    }

    /// A backend failure while updating surfaces as an error.
    #[tokio::test]
    async fn update_job_error() {
        let mut mock_backend = MockBackend::default();
        mock_backend
            .expect_update_job()
            .returning(|_| Err(BackendError::BadState));
        let stored = crate::backend::Job::mock_job::<SimpleExecutor>().with_data("Data");
        let job = stored.try_into().unwrap();

        let outcome = SimpleExecutor::update_job_on_backend(job, &mock_backend).await;

        assert!(outcome.is_err());
    }

    /// Updating succeeds when the backend accepts the job.
    #[tokio::test]
    async fn update_job_success() {
        let mut mock_backend = MockBackend::default();
        mock_backend.expect_update_job().returning(|_| Ok(()));
        let stored = crate::backend::Job::mock_job::<SimpleExecutor>().with_data("Data");
        let job = stored.try_into().unwrap();

        let outcome = SimpleExecutor::update_job_on_backend(job, &mock_backend).await;

        assert!(outcome.is_ok());
    }

    /// The provided `Executor::timeout` returns `None`.
    #[test]
    fn default_timeout_is_none() {
        let stored = crate::backend::Job::mock_job::<SimpleExecutor>().with_data("Data");
        let job = stored.try_into().unwrap();

        assert!(SimpleExecutor::timeout(&job).is_none());
    }
}