celery-rs 0.6.2

Community-maintained Rust implementation of Celery (fork of rusty-celery)
Documentation
//! This module contains the definition of application-provided scheduler backends.
use super::scheduled_task::ScheduledTask;
use crate::error::BeatError;
use std::{collections::BinaryHeap, future::Future, pin::Pin, time::Duration};

mod redis;
pub use redis::{RedisBackendConfig, RedisSchedulerBackend};

/// A `SchedulerBackend` is in charge of keeping track of the internal state of the scheduler
/// according to some source of truth, such as a database.
///
/// The default scheduler backend, [`LocalSchedulerBackend`], doesn't do any external
/// synchronization, so the source of truth is just the locally defined schedules.
///
/// Implementors must provide [`should_sync`](SchedulerBackend::should_sync) and
/// [`sync`](SchedulerBackend::sync). [`as_distributed`](SchedulerBackend::as_distributed)
/// has a default implementation that returns `None`.
pub trait SchedulerBackend {
    /// Check whether the internal state of the scheduler should be synchronized.
    ///
    /// If this method returns `true`, then `sync` will be called as soon as possible.
    fn should_sync(&self) -> bool;

    /// Synchronize the internal state of the scheduler.
    ///
    /// `scheduled_tasks` is the heap of scheduled tasks, handed over mutably so that the
    /// backend can update it in place.
    ///
    /// This method is called in the pauses between scheduled tasks. Synchronization should
    /// be as quick as possible, as it may otherwise delay the execution of due tasks.
    /// If synchronization is slow, it should be done incrementally (i.e., it should span
    /// multiple calls to `sync`).
    ///
    /// This method will not be called if `should_sync` returns `false`.
    ///
    /// # Errors
    ///
    /// Implementations report a failed synchronization through the returned `BeatError`.
    fn sync(&mut self, scheduled_tasks: &mut BinaryHeap<ScheduledTask>) -> Result<(), BeatError>;

    /// Return a mutable reference to the distributed capability of this backend, if any.
    ///
    /// Backends that do not support distributed coordination can leave the default
    /// implementation untouched, and the beat loop will fall back to single-instance
    /// behaviour.
    ///
    /// The default implementation returns `None`.
    fn as_distributed(&mut self) -> Option<&mut dyn DistributedScheduler> {
        None
    }

    // Maybe we should consider some methods to inform the backend that a task has been executed.
    // Not sure about what Python does, but at least it keeps a counter with the number of executed
    // tasks, and the backend has access to that.
}

/// The value produced by [`DistributedScheduler::before_tick`]: a flag saying whether tasks
/// should be executed, plus an optional sleep hint.
///
/// NOTE(review): the code that consumes a `TickDecision` is not part of this module, so the
/// field descriptions below follow the field names — confirm against the beat loop.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TickDecision {
    /// `true` when tasks should be executed; `false` when they should be skipped.
    pub execute_tasks: bool,
    /// Optional sleep duration suggested to the caller; `None` means no hint is given.
    pub sleep_hint: Option<Duration>,
}

impl TickDecision {
    /// Builds a decision with `execute_tasks` set to `true` and no sleep hint.
    pub fn execute() -> Self {
        Self {
            execute_tasks: true,
            sleep_hint: None,
        }
    }

    /// Builds a decision with `execute_tasks` set to `true` that also carries `sleep_hint`.
    pub fn execute_with_hint(sleep_hint: Duration) -> Self {
        Self {
            execute_tasks: true,
            sleep_hint: Some(sleep_hint),
        }
    }

    /// Builds a decision with `execute_tasks` set to `false` that carries `sleep_hint`.
    pub fn skip(sleep_hint: Duration) -> Self {
        Self {
            execute_tasks: false,
            sleep_hint: Some(sleep_hint),
        }
    }
}

/// The distributed capability a [`SchedulerBackend`] can expose through
/// [`SchedulerBackend::as_distributed`].
///
/// Every method returns a boxed, pinned future that borrows the backend for `'a`, which keeps
/// the trait usable as `dyn DistributedScheduler`.
///
/// NOTE(review): the caller of these hooks is not in this module; judging by the names they
/// bracket a scheduler tick — confirm against the beat loop.
pub trait DistributedScheduler {
    /// Resolves to the [`TickDecision`] for the upcoming tick, or to a `BeatError`.
    fn before_tick<'a>(
        &'a mut self,
    ) -> Pin<Box<dyn Future<Output = Result<TickDecision, BeatError>> + 'a>>;

    /// Hook that receives mutable access to the heap of scheduled tasks.
    ///
    /// Resolves to `Ok(())` on success, or to a `BeatError`.
    fn after_tick<'a>(
        &'a mut self,
        scheduled_tasks: &'a mut BinaryHeap<ScheduledTask>,
    ) -> Pin<Box<dyn Future<Output = Result<(), BeatError>> + 'a>>;

    /// Shutdown hook.
    ///
    /// The default implementation does nothing and resolves immediately to `Ok(())`.
    fn shutdown<'a>(&'a mut self) -> Pin<Box<dyn Future<Output = Result<(), BeatError>> + 'a>> {
        // An already-completed future: there is nothing to release by default.
        Box::pin(std::future::ready(Ok::<(), BeatError>(())))
    }
}

/// The default [`SchedulerBackend`].
///
/// The type has no fields, so every instance is interchangeable; [`LocalSchedulerBackend::new`]
/// and [`Default::default`] produce the same value.
#[derive(Debug, Default, Clone)]
pub struct LocalSchedulerBackend {}

impl LocalSchedulerBackend {
    /// Creates a new `LocalSchedulerBackend`.
    pub fn new() -> Self {
        Self {}
    }
}

impl SchedulerBackend for LocalSchedulerBackend {
    /// Always returns `false`: this backend never asks to be synchronized.
    fn should_sync(&self) -> bool {
        false
    }

    /// Not implemented for the local backend; calling it panics via `unimplemented!()`.
    ///
    /// `should_sync` always returns `false` here, so per the `SchedulerBackend` contract this
    /// method is not expected to be invoked.
    fn sync(&mut self, _tasks: &mut BinaryHeap<ScheduledTask>) -> Result<(), BeatError> {
        unimplemented!()
    }
}