later 0.0.10

Distributed Background jobs manager and runner for Rust
Documentation
use crate::{
    encoder::{self},
    id::{Id, IdOf},
    models::{DelayedStage, Job, RecurringJob, RequeuedStage, Stage, StageName},
    storage::{Storage, StorageIter, StorageEx},
    JobId, RecurringJobId, UtcDateTime,
};
use serde::{de::DeserializeOwned, Serialize};

pub(crate) struct Persist {
    pub(crate) inner: Box<dyn Storage>,
    key_prefix: String,
}

pub(crate) struct Config<'s> {
    inner: &'s Box<dyn Storage>,
    key_prefix: String,
}

impl Persist {
    pub fn new(storage: Box<dyn Storage>, key_prefix: String) -> Self {
        Self {
            inner: storage,
            key_prefix,
        }
    }

    pub fn config(&self) -> Config {
        Config {
            inner: &self.inner,
            key_prefix: self.key_prefix.clone(),
        }
    }

    #[tracing::instrument(skip(self), level = "trace")]
    pub async fn get_job(&self, id: JobId) -> Option<Job> {
        let id = IdOf::SavedJob(id).get_id(&self.key_prefix);
        self.get_of_type::<Job>(id).await
    }

    #[tracing::instrument(skip(self), level = "trace")]
    pub async fn get_recurring_job(&self, id: RecurringJobId) -> Option<RecurringJob> {
        let id = IdOf::SavedRecurringJob(id).get_id(&self.key_prefix);
        self.get_of_type::<RecurringJob>(id).await
    }

    #[tracing::instrument(skip(self), level = "trace")]
    pub async fn expire(&self, job_id: JobId) -> anyhow::Result<()> {
        let id = IdOf::SavedJob(job_id).get_id(&self.key_prefix);
        Ok(self.inner.del(&id.to_string()).await?)
    }

    #[tracing::instrument(skip(self, range), level = "trace")]
    pub async fn trim(&self, range: Box<dyn StorageIter>) -> anyhow::Result<()> {
        Ok(self.inner.trim(range).await?)
    }

    pub async fn get_of_type<T>(&self, id: Id) -> Option<T>
    where
        T: DeserializeOwned,
    {
        self.inner
            .get(&id.to_string())
            .await
            .and_then(|bytes| encoder::decode::<T>(&bytes).ok())
    }

    pub async fn save_job(&self, job: &Job) -> anyhow::Result<()> {
        let id = IdOf::SavedJob(job.id.clone()).get_id(&self.key_prefix);
        self.save(id, job).await
    }

    #[tracing::instrument(skip(self))]
    pub async fn save_recurring_job(&self, job: &RecurringJob) -> anyhow::Result<()> {
        let id = IdOf::SavedRecurringJob(job.id.clone()).get_id(&self.key_prefix);
        self.save(id, job).await
    }

    #[tracing::instrument(skip(self))]
    pub async fn save<T>(&self, id: Id, item: T) -> anyhow::Result<()>
    where
        T: Serialize + std::fmt::Debug,
    {
        let bytes = encoder::encode(item)?;
        self.inner.set(&id.to_string(), &bytes).await
    }

    #[tracing::instrument(skip(self))]
    pub async fn push<T>(&self, id: Id, item: T) -> anyhow::Result<()>
    where
        T: Serialize + std::fmt::Debug,
    {
        let bytes = encoder::encode(item)?;
        self.inner.push(&id.to_string(), &bytes).await
    }

    #[tracing::instrument(skip(self))]
    pub async fn save_continuation(
        &self,
        job_id: &JobId,
        parent_job_id: JobId,
    ) -> anyhow::Result<()> {
        let id = IdOf::ContinuationOf(parent_job_id).get_id(&self.key_prefix);
        self.push(id, job_id).await
    }

    #[tracing::instrument(skip(self))]
    pub async fn get_continuation_job(&self, job: &Job) -> Option<Vec<Job>> {
        let id = IdOf::ContinuationOf(job.id.clone()).get_id(&self.key_prefix);

        let mut items = Vec::default();
        let mut iter = self.inner.scan_range(&id.to_string()).await;
        while let Some(id_bytes) = iter.next(&self.inner).await {
            if let Ok(job_id) = encoder::decode::<JobId>(&id_bytes) {
                if let Some(job) = self.get_job(job_id).await {
                    items.push(job);
                }
            }
        }

        if items.len() > 0 {
            Some(items)
        } else {
            None
        }
    }

    pub async fn del_get_continuation_job(&self, job: &Job) -> anyhow::Result<()> {
        let id = IdOf::ContinuationOf(job.id.clone()).get_id(&self.key_prefix);
        self.inner.del_range(&id.to_string()).await
    }

    pub async fn save_job_id(&self, id: &JobId, stage: &Stage) -> anyhow::Result<()> {
        let key = IdOf::JobsInStagesId(stage.get_name()).get_id(&&self.key_prefix);

        self.inner
            .push(&key.to_string(), &encoder::encode(&id)?)
            .await?;

        Ok(())
    }

    pub async fn get_delayed_jobs(&self) -> anyhow::Result<Box<dyn StorageIter>> {
        self.get_jobs_to_poll(&DelayedStage::get_name()).await
    }

    pub async fn get_reqd_jobs(&self) -> anyhow::Result<Box<dyn StorageIter>> {
        self.get_jobs_to_poll(&RequeuedStage::get_name()).await
    }

    async fn get_jobs_to_poll(&self, name: &str) -> Result<Box<dyn StorageIter>, anyhow::Error> {
        let key = IdOf::JobsInStagesId(name.to_string()).get_id(&&self.key_prefix);
        let iter = self.inner.scan_range(&key.to_string()).await;
        Ok(iter)
    }
}

impl<'s> Config<'s> {
    pub async fn poll_delayed_jobs_last_run(&self) -> UtcDateTime {
        let one_year =
            chrono::Duration::from_std(std::time::Duration::from_secs(3600 * 24 * 365)).unwrap();
        let def = chrono::Utc::now().checked_sub_signed(one_year).unwrap();

        self.get(
            IdOf::ConfigDateLastPolledForDelayedJobs.get_id(&self.key_prefix),
            def,
        )
        .await
    }

    pub async fn poll_requeued_jobs_last_run(&self) -> UtcDateTime {
        let one_year =
            chrono::Duration::from_std(std::time::Duration::from_secs(3600 * 24 * 365)).unwrap();
        let def = chrono::Utc::now().checked_sub_signed(one_year).unwrap();

        self.get(
            IdOf::ConfigDateLastPolledForReqdJobs.get_id(&self.key_prefix),
            def,
        )
        .await
    }

    pub async fn poll_delayed_jobs_last_run_set(&self) -> anyhow::Result<()> {
        self.set(
            IdOf::ConfigDateLastPolledForDelayedJobs.get_id(&self.key_prefix),
            chrono::Utc::now(),
        )
        .await
    }

    pub async fn poll_requeued_jobs_last_run_set(&self) -> anyhow::Result<()> {
        self.set(
            IdOf::ConfigDateLastPolledForReqdJobs.get_id(&self.key_prefix),
            chrono::Utc::now(),
        )
        .await
    }

    async fn get<T: DeserializeOwned>(&self, key: Id, def: T) -> T {
        self.inner
            .get(&key.to_string())
            .await
            .map(|b| encoder::decode::<T>(&b).ok())
            .flatten()
            .unwrap_or_else(|| def)
    }

    async fn set<T: Serialize>(&self, key: Id, val: T) -> anyhow::Result<()> {
        self.inner
            .set(&key.to_string(), &encoder::encode(&val)?)
            .await
    }
}