use async_trait::async_trait;
use chrono::{DateTime, NaiveDateTime, Utc};
use rand::distributions::{Distribution, Uniform};
use serde::{Deserialize, Serialize};
use std::time::{SystemTime, UNIX_EPOCH};
use crate::error::TaskError;
mod async_result;
mod options;
mod request;
mod signature;
pub use async_result::AsyncResult;
pub use options::TaskOptions;
pub use request::Request;
pub use signature::Signature;
/// Result alias used throughout this module: `Ok` carries a value of type `R`
/// and `Err` carries a `TaskError`.
pub type TaskResult<R> = Result<R, TaskError>;
/// Exposes the value type carried by a result-like type as the associated
/// type `Returns`.
///
/// The trait has no methods; it exists purely for type-level lookups and is
/// excluded from the generated documentation.
#[doc(hidden)]
pub trait AsTaskResult {
    /// The carried value type. It must be debuggable and safe to send and
    /// share across threads.
    type Returns: std::fmt::Debug + Send + Sync;
}
/// Every `TaskResult<R>` reports its success type `R` as `Returns`.
impl<R: std::fmt::Debug + Send + Sync> AsTaskResult for TaskResult<R> {
    type Returns = R;
}
/// A unit of work that is built from a [`Request`], executed with
/// [`Task::run`], and can ask to be retried.
///
/// Implementors provide the constants, the associated types and the four
/// required methods (`from_request`, `request`, `options`, `run`). Everything
/// else has a default implementation.
///
/// # Option precedence
///
/// The option accessors (`max_retries`, `min_retry_delay`, ...) look at
/// `Self::DEFAULTS` first and only fall back to `self.options()` when the
/// corresponding `DEFAULTS` field is `None`.
#[async_trait]
pub trait Task: Send + Sync + std::marker::Sized {
    /// The name of the task, returned by [`Task::name`].
    const NAME: &'static str;

    /// Static list of strings describing the task's arguments.
    // NOTE(review): presumably the argument names in positional order; confirm
    // against the code that consumes this constant.
    const ARGS: &'static [&'static str];

    /// Task-level option defaults. Every field is `None` unless an implementor
    /// overrides this constant.
    const DEFAULTS: TaskOptions = TaskOptions {
        time_limit: None,
        hard_time_limit: None,
        max_retries: None,
        min_retry_delay: None,
        max_retry_delay: None,
        retry_for_unexpected: None,
        acks_late: None,
        content_type: None,
    };

    /// The (de)serializable parameter type accepted by [`Task::run`].
    type Params: Clone + Send + Sync + Serialize + for<'de> Deserialize<'de>;

    /// The value type produced by a successful [`Task::run`].
    type Returns: Send + Sync + std::fmt::Debug;

    /// Builds a task instance from a request and a set of options.
    fn from_request(request: Request<Self>, options: TaskOptions) -> Self;

    /// Returns the request this task instance is associated with.
    fn request(&self) -> &Request<Self>;

    /// Returns the options this task instance is associated with.
    fn options(&self) -> &TaskOptions;

    /// Executes the task with the given parameters.
    async fn run(&self, params: Self::Params) -> TaskResult<Self::Returns>;

    /// Hook that receives a task error. The default implementation does nothing.
    // NOTE(review): presumably invoked by the executor when `run` fails; the
    // call site is not in this file.
    // The default body ignores its argument, hence the lint allowance.
    #[allow(unused_variables)]
    async fn on_failure(&self, err: &TaskError) {}

    /// Hook that receives the returned value. The default implementation does
    /// nothing.
    // NOTE(review): presumably invoked by the executor when `run` succeeds; the
    // call site is not in this file.
    // The default body ignores its argument, hence the lint allowance.
    #[allow(unused_variables)]
    async fn on_success(&self, returned: &Self::Returns) {}

    /// Returns [`Task::NAME`].
    fn name(&self) -> &'static str {
        Self::NAME
    }

    /// Builds a retry error whose ETA is `countdown` seconds from now.
    ///
    /// This never returns `Ok`; it is meant to be returned from `run`.
    ///
    /// # Errors
    ///
    /// * `TaskError::Retry(Some(eta))` in the normal case, where `eta` is the
    ///   current time (millisecond precision) plus `countdown` seconds.
    /// * `TaskError::Retry(None)` if the system clock is before the Unix epoch.
    /// * `TaskError::UnexpectedError` if the resulting timestamp cannot be
    ///   represented.
    fn retry_with_countdown(&self, countdown: u32) -> TaskResult<Self::Returns> {
        let eta = match SystemTime::now().duration_since(UNIX_EPOCH) {
            Ok(now) => {
                // Add in u64 so neither the current time nor the sum is truncated.
                let eta = now
                    .as_secs()
                    .checked_add(u64::from(countdown))
                    .and_then(|eta_secs| utc_from_unix_millis(eta_secs, now.subsec_millis()))
                    .ok_or_else(|| {
                        TaskError::UnexpectedError(format!(
                            "Invalid countdown seconds {countdown}",
                        ))
                    })?;
                Some(eta)
            }
            Err(_) => None,
        };
        Err(TaskError::Retry(eta))
    }

    /// Builds a retry error with an explicit ETA.
    ///
    /// This never returns `Ok`; the result is always
    /// `Err(TaskError::Retry(Some(eta)))`.
    fn retry_with_eta(&self, eta: DateTime<Utc>) -> TaskResult<Self::Returns> {
        Err(TaskError::Retry(Some(eta)))
    }

    /// Computes the default ETA for the next retry using exponential backoff.
    ///
    /// The delay is `2^retries` seconds (with `retries` taken from the
    /// request), capped at [`Task::max_retry_delay`] and then raised to at
    /// least [`Task::min_retry_delay`], plus a random jitter of 0-999 ms.
    ///
    /// Returns `None` if the system clock is before the Unix epoch or the
    /// resulting timestamp cannot be represented.
    fn retry_eta(&self) -> Option<DateTime<Utc>> {
        let retries = self.request().retries;
        let max_delay = self.max_retry_delay();
        // `2^retries` overflows u32 from 32 retries onwards; treat that as
        // "already at the cap".
        let backoff_secs = 2u32
            .checked_pow(retries)
            .unwrap_or(max_delay)
            .min(max_delay);
        // The minimum is applied last, so it wins if it exceeds the maximum.
        let delay_secs = backoff_secs.max(self.min_retry_delay());

        // Sub-second jitter so retries scheduled together do not all fire at
        // exactly the same instant.
        let jitter_millis = Uniform::from(0..1000u32).sample(&mut rand::thread_rng());

        let now = SystemTime::now().duration_since(UNIX_EPOCH).ok()?;
        let eta_secs = now.as_secs().checked_add(u64::from(delay_secs))?;
        // The millisecond sum may reach 1998; the helper carries the excess
        // into the seconds.
        utc_from_unix_millis(eta_secs, now.subsec_millis() + jitter_millis)
    }

    /// Returns the `retry_for_unexpected` setting, defaulting to `true` when
    /// neither `DEFAULTS` nor `options()` specifies it.
    fn retry_for_unexpected(&self) -> bool {
        Self::DEFAULTS
            .retry_for_unexpected
            .or(self.options().retry_for_unexpected)
            .unwrap_or(true)
    }

    /// Returns the effective time limit, if any.
    ///
    /// A `time_limit` set on the request wins. Otherwise the result is the
    /// smaller of the configured `time_limit` and `hard_time_limit`, or
    /// whichever of the two is set, or `None` when neither is.
    fn time_limit(&self) -> Option<u32> {
        self.request().time_limit.or_else(|| {
            let soft = Self::DEFAULTS.time_limit.or(self.options().time_limit);
            let hard = Self::DEFAULTS
                .hard_time_limit
                .or(self.options().hard_time_limit);
            match (soft, hard) {
                (Some(soft), Some(hard)) => Some(soft.min(hard)),
                (soft, hard) => soft.or(hard),
            }
        })
    }

    /// Returns the `max_retries` setting, or `None` when it is not configured.
    fn max_retries(&self) -> Option<u32> {
        Self::DEFAULTS.max_retries.or(self.options().max_retries)
    }

    /// Returns the `min_retry_delay` setting, defaulting to `0`.
    /// [`Task::retry_eta`] uses it as a number of seconds.
    fn min_retry_delay(&self) -> u32 {
        Self::DEFAULTS
            .min_retry_delay
            .or(self.options().min_retry_delay)
            .unwrap_or(0)
    }

    /// Returns the `max_retry_delay` setting, defaulting to `3600`.
    /// [`Task::retry_eta`] uses it as a number of seconds.
    fn max_retry_delay(&self) -> u32 {
        Self::DEFAULTS
            .max_retry_delay
            .or(self.options().max_retry_delay)
            .unwrap_or(3600)
    }

    /// Returns the `acks_late` setting, defaulting to `false`.
    fn acks_late(&self) -> bool {
        Self::DEFAULTS
            .acks_late
            .or(self.options().acks_late)
            .unwrap_or(false)
    }
}

/// Converts a Unix timestamp given as whole seconds plus milliseconds into a
/// UTC datetime.
///
/// `millis` may be 1000 or more; the excess is carried into the seconds.
/// Returns `None` if the timestamp overflows or is outside the range chrono
/// can represent.
fn utc_from_unix_millis(secs: u64, millis: u32) -> Option<DateTime<Utc>> {
    let secs = secs.checked_add(u64::from(millis / 1000))?;
    // chrono takes signed seconds; refuse anything that would not survive the cast.
    if secs > i64::MAX as u64 {
        return None;
    }
    // `from_timestamp_opt` expects nanoseconds, not micro- or milliseconds.
    let nanos = (millis % 1000) * 1_000_000;
    NaiveDateTime::from_timestamp_opt(secs as i64, nanos)
        .map(|naive| DateTime::<Utc>::from_naive_utc_and_offset(naive, Utc))
}
/// Crate-internal event concerning a task.
#[derive(Clone, Debug)]
pub(crate) enum TaskEvent {
    /// Carries a `TaskStatus` value.
    // NOTE(review): presumably emitted when a task's status changes to the
    // carried value; confirm against the emitter.
    StatusChange(TaskStatus),
}
/// Crate-internal task status, carried by `TaskEvent::StatusChange`.
#[derive(Clone, Debug)]
pub(crate) enum TaskStatus {
    // NOTE(review): presumably "received but not yet completed"; confirm
    // against the code that emits this status.
    Pending,
    // NOTE(review): presumably "execution has ended"; confirm whether this
    // covers failures as well as successes.
    Finished,
}
/// Extension methods for converting a result into a `Result<T, TaskError>`.
///
/// Both methods take `f` as an extra argument; what it is used for is up to
/// the implementation. The implementation for `Result<T, E>` in this file
/// calls `f` only on the error path and uses its return value as a context
/// message that prefixes the original error in the resulting `TaskError`.
pub trait TaskResultExt<T, E, F, C> {
    /// Converts `self` into a `Result<T, TaskError>`. The implementation in
    /// this file maps an error to `TaskError::ExpectedError`.
    fn with_expected_err(self, f: F) -> Result<T, TaskError>;
    /// Converts `self` into a `Result<T, TaskError>`. The implementation in
    /// this file maps an error to `TaskError::UnexpectedError`.
    fn with_unexpected_err(self, f: F) -> Result<T, TaskError>;
}
/// Adds context-attaching conversions to any `Result` whose error implements
/// `std::error::Error`.
///
/// On `Ok` the value passes through untouched and `f` is never called. On
/// `Err` the closure is invoked once and its output is combined with the
/// `Debug` rendering of the original error into the message of the resulting
/// `TaskError`.
impl<T, E, F, C> TaskResultExt<T, E, F, C> for Result<T, E>
where
    C: std::fmt::Display + Send + Sync + 'static,
    E: std::error::Error,
    F: FnOnce() -> C,
{
    /// Wraps an error as `TaskError::ExpectedError`, prefixed with the context
    /// produced by `f`.
    fn with_expected_err(self, f: F) -> Result<T, TaskError> {
        match self {
            Ok(value) => Ok(value),
            Err(cause) => {
                let message = format!("{} ➥ Cause: {:?}", f(), cause);
                Err(TaskError::ExpectedError(message))
            }
        }
    }

    /// Wraps an error as `TaskError::UnexpectedError`, prefixed with the
    /// context produced by `f`.
    fn with_unexpected_err(self, f: F) -> Result<T, TaskError> {
        match self {
            Ok(value) => Ok(value),
            Err(cause) => {
                let message = format!("{} ➥ Cause: {:?}", f(), cause);
                Err(TaskError::UnexpectedError(message))
            }
        }
    }
}