rexecutor-sqlx 0.1.1

A robust job processing library
Documentation
use std::ops::Sub;

use chrono::{DateTime, Utc};
use rexecutor::{backend::BackendError, executor::ExecutorIdentifier};

use crate::RexecutorPgBackend;

pub struct ReadyJobStream {
    pub(crate) backend: RexecutorPgBackend,
    pub(crate) executor_identifier: ExecutorIdentifier,
    pub(crate) receiver: tokio::sync::mpsc::UnboundedReceiver<DateTime<Utc>>,
}

impl ReadyJobStream {
    const DEFAULT_DELAY: std::time::Duration = std::time::Duration::from_secs(30);
    const DELTA: std::time::Duration = std::time::Duration::from_millis(15);

    pub async fn next(&mut self) -> Result<rexecutor::backend::Job, BackendError> {
        loop {
            let delay = match self
                .backend
                .next_available_job_scheduled_at_for_executor(self.executor_identifier.as_str())
                .await
                .map_err(|_| BackendError::BadState)?
            {
                Some(timestamp) => timestamp
                    .sub(Utc::now())
                    .to_std()
                    .unwrap_or(Self::DELTA)
                    .min(Self::DEFAULT_DELAY),
                _ => Self::DEFAULT_DELAY,
            };
            if delay <= Self::DELTA {
                if let Some(job) = self
                    .backend
                    .load_job_mark_as_executing_for_executor(self.executor_identifier.as_str())
                    .await
                    .map_err(|_| BackendError::BadState)?
                {
                    return job.try_into();
                }
            }
            tokio::select! {
                _ = self.receiver.recv() => { },
                _ = tokio::time::sleep(delay) => { },

            }
        }
    }
}