use chrono::{DateTime, Duration, Utc};
use rand::Rng;
pub use serde;
use serde::Serialize;
use sqlx::FromRow;
use sqlx::PgExecutor;
mod engine;
mod model;
mod queue;
mod result;
mod stager;
pub use ishikari_macros::{job, worker};
pub use engine::{Engine, Postgres, Storage};
pub use model::{Job, JobState};
pub use queue::Queue;
pub use result::{Cancel, Complete, PerformError, PerformResult, Snooze, Status};
pub use stager::Stager;
pub mod prelude {
pub use crate::{Cancel, Complete, Context, PerformResult, Snooze, Status, Worker};
}
pub(crate) type State = Arc<dyn std::any::Any + Send + Sync>;
pub enum Backoff {
Fixed(Duration),
Linear(Duration),
Exponential(Duration),
ExponentialJitter(Duration),
Custom(Box<dyn Fn(i32) -> DateTime<Utc> + Send + Sync>),
}
impl Backoff {
pub fn next_retry(&self, attempt: i32) -> DateTime<Utc> {
match self {
Backoff::Fixed(duration) => Utc::now() + *duration,
Backoff::Linear(base) => Utc::now() + (*base * attempt),
Backoff::Exponential(base) => {
let base_seconds = base.num_seconds();
let exp_delay = base_seconds * 2_i64.pow(attempt as u32);
Utc::now() + Duration::seconds(exp_delay)
}
Backoff::ExponentialJitter(base) => {
let base_seconds = base.num_seconds();
let exp_delay = base_seconds * 2_i64.pow(attempt as u32);
let jitter = rand::thread_rng().gen_range(0..exp_delay);
Utc::now() + Duration::seconds(exp_delay + jitter)
}
Backoff::Custom(strategy) => strategy(attempt),
}
}
}
use std::any::Any;
use std::fmt::Debug;
use std::sync::Arc;
use tracing::{info, instrument};
#[derive(Debug)]
pub struct Context {
pub job: Arc<Job>,
pub state: State,
}
impl Context {
pub(crate) fn new(job: Arc<Job>, state: State) -> Self {
Self { job, state }
}
pub fn job(&self) -> Arc<Job> {
Arc::clone(&self.job)
}
pub fn state<T: Any + Send + Sync + 'static>(&self) -> Result<Arc<T>, &'static str> {
if let Ok(downcasted) = Arc::clone(&self.state).downcast::<T>() {
Ok(downcasted)
} else {
Err("Failed to extract the specified type from the context")
}
}
}
#[typetag::serde(tag = "ishikari_worker")]
#[async_trait::async_trait]
pub trait Worker: Send + Sync {
fn worker() -> &'static str
where
Self: Sized,
{
std::any::type_name::<Self>()
}
fn queue(&self) -> &'static str {
"default"
}
fn max_attempts(&self) -> i32 {
20
}
fn backoff(&self, attempt: i32) -> DateTime<Utc> {
Backoff::Exponential(Duration::seconds(5)).next_retry(attempt)
}
async fn perform(&self, context: Context) -> PerformResult;
}
#[instrument(skip(executor))]
pub async fn insert<'a, J, E>(job: J, executor: E) -> Result<Job, sqlx::Error>
where
J: Debug + Serialize + Worker + Send + Sync + 'static,
E: PgExecutor<'a>,
{
let args = serde_json::to_value(&job as &dyn Worker).unwrap();
let row =
sqlx::query(r#"insert into jobs (queue, worker, args, max_attempts) values ($1, $2, $3, $4) returning *"#)
.bind(job.queue())
.bind(J::worker())
.bind(args)
.bind(job.max_attempts())
.fetch_one(executor)
.await?;
let inserted = Job::from_row(&row)?;
info!("Job inserted id={}, args={:?}", inserted.id, job);
Ok(inserted)
}