oxanus 1.1.1

A simple & fast job queue system.
Documentation
use std::collections::HashMap;
use std::str::FromStr;

use crate::error::OxanusError;
use crate::worker::{BoundJob, BoxedProcessable, FromContext, Worker};

pub(crate) type JobFactory<DT, ET> =
    fn(serde_json::Value, &DT) -> Result<BoxedProcessable<ET>, OxanusError>;

pub struct WorkerRegistry<DT, ET> {
    jobs: HashMap<String, JobFactory<DT, ET>>,
    pub schedules: HashMap<String, CronJob>,
}

pub struct WorkerConfig<DT, ET> {
    pub name: String,
    pub factory: JobFactory<DT, ET>,
    pub kind: WorkerConfigKind,
}

pub enum WorkerConfigKind {
    Normal,
    Cron {
        schedule: String,
        queue_key: String,
        resurrect: bool,
    },
}

#[derive(Debug, Clone)]
pub struct CronJob {
    pub schedule: cron::Schedule,
    pub queue_key: String,
    pub resurrect: bool,
}

pub fn job_factory<W, A, DT, ET>(
    value: serde_json::Value,
    ctx: &DT,
) -> Result<BoxedProcessable<ET>, OxanusError>
where
    W: Worker<A, Error = ET> + FromContext<DT> + 'static,
    A: serde::de::DeserializeOwned + Send + Sync + 'static,
    DT: Send + Sync + Clone + 'static,
    ET: std::error::Error + Send + Sync + 'static,
{
    let job: A = serde_json::from_value(value)?;
    let worker = W::from_context(ctx);
    Ok(Box::new(BoundJob { worker, job }))
}

impl<DT, ET> WorkerRegistry<DT, ET> {
    pub fn new() -> Self {
        Self {
            jobs: HashMap::new(),
            schedules: HashMap::new(),
        }
    }

    pub fn register_worker_with(&mut self, config: WorkerConfig<DT, ET>) {
        match config.kind {
            WorkerConfigKind::Normal => {
                self.jobs.insert(config.name, config.factory);
            }
            WorkerConfigKind::Cron {
                schedule,
                queue_key,
                resurrect,
            } => {
                self.jobs.insert(config.name.clone(), config.factory);

                let schedule = cron::Schedule::from_str(&schedule).unwrap_or_else(|_| {
                    panic!("{}: Invalid cron schedule: {schedule}", config.name)
                });

                self.schedules.insert(
                    config.name,
                    CronJob {
                        schedule,
                        queue_key,
                        resurrect,
                    },
                );
            }
        }
    }

    pub fn worker_names(&self) -> Vec<&str> {
        self.jobs.keys().map(|s| s.as_str()).collect()
    }

    pub fn has_registered(&self, name: &str) -> bool {
        self.jobs.contains_key(name)
    }

    pub fn has_registered_cron(&self, name: &str) -> bool {
        self.schedules.contains_key(name)
    }

    pub fn build(
        &self,
        name: &str,
        json: serde_json::Value,
        ctx: &DT,
    ) -> Result<BoxedProcessable<ET>, OxanusError> {
        let factory = self
            .jobs
            .get(name)
            .ok_or_else(|| OxanusError::GenericError(format!("Job type {name} not registered")))?;
        match factory(json, ctx) {
            Ok(job) => Ok(job),
            Err(e) => Err(OxanusError::JobFactoryError(format!(
                "Failed to build job {name}: {e}"
            ))),
        }
    }
}

impl<DT, ET> Default for WorkerRegistry<DT, ET> {
    fn default() -> Self {
        Self::new()
    }
}