apalis-core 0.4.9

Core for apalis: simple, extensible multithreaded background processing for Rust
use std::{marker::PhantomData, time::Duration};

use crate::worker::{HeartBeat, WorkerId};

use super::Storage;

/// Heartbeat that sends a keep-alive for a single worker to a storage.
///
/// Tuple fields, in order:
/// 0. the id of the worker the keep-alive is sent for,
/// 1. the storage that receives the keep-alive call,
/// 2. the period reported by `HeartBeat::interval`,
/// 3. a zero-sized marker that carries the `Service` type parameter.
#[derive(Debug, Clone)]
pub(super) struct KeepAlive<ST, Service>(WorkerId, ST, Duration, PhantomData<Service>);

impl<ST, Service> KeepAlive<ST, Service> {
    /// Creates a keep-alive heartbeat for `worker_id` backed by `storage`.
    ///
    /// The worker id is cloned, so the caller keeps ownership of its own copy.
    /// `interval` is stored unchanged and later returned by `interval()`.
    pub(super) fn new<J>(worker_id: &WorkerId, storage: ST, interval: Duration) -> Self
    where
        J: 'static,
        ST: Storage<Output = J>,
    {
        let owned_id = worker_id.clone();
        Self(owned_id, storage, interval, PhantomData)
    }
}

#[async_trait::async_trait]
impl<S: Storage + Send + Sync, Service: Sync + Send> HeartBeat for KeepAlive<S, Service> {
    async fn heart_beat(&mut self) {
        match self.1.keep_alive::<Service>(&self.0).await {
            Err(e) => {
                tracing::warn!("An error occurred while attempting the keep alive heartbeat for worker {}. Error: {}", self.0, e)
            }
            _ => {
                tracing::trace!("keep alive heartbeat successful for worker: {}", self.0)
            }
        }
    }
    fn interval(&self) -> Duration {
        self.2
    }
}

/// Heartbeat that sends a `StorageWorkerPulse::ReenqueueOrphaned` pulse to a
/// storage.
///
/// Tuple fields, in order:
/// 0. the storage that receives the pulse,
/// 1. the `count` value forwarded in the pulse,
/// 2. the period reported by `HeartBeat::interval`,
/// 3. the `timeout_worker` value forwarded in the pulse.
#[derive(Debug, Clone)]
pub(super) struct ReenqueueOrphaned<ST>(ST, i32, Duration, Duration);

impl<ST> ReenqueueOrphaned<ST> {
    /// Creates a reenqueue-orphaned heartbeat backed by `storage`.
    ///
    /// `count` and `timeout_worker` are passed to the storage on every beat;
    /// `interval` is stored unchanged and later returned by `interval()`.
    pub(super) fn new<J>(
        storage: ST,
        count: i32,
        interval: Duration,
        timeout_worker: Duration,
    ) -> Self
    where
        J: 'static,
        ST: Storage<Output = J>,
    {
        Self(storage, count, interval, timeout_worker)
    }
}

#[async_trait::async_trait]
impl<S: Storage + Send + Sync> HeartBeat for ReenqueueOrphaned<S> {
    async fn heart_beat(&mut self) {
        match self
            .0
            .heartbeat(super::StorageWorkerPulse::ReenqueueOrphaned {
                count: self.1,
                timeout_worker: self.3,
            })
            .await
        {
            Err(e) => {
                tracing::warn!("An error occurred while attempting the reenqueue orphaned heartbeat. Error: {}", e)
            }
            _ => {
                tracing::trace!("reenqueue orphaned heartbeat successful for storage")
            }
        }
    }
    fn interval(&self) -> Duration {
        self.2
    }
}

/// Heartbeat that sends a `StorageWorkerPulse::EnqueueScheduled` pulse to a
/// storage.
///
/// Tuple fields, in order:
/// 0. the storage that receives the pulse,
/// 1. the `count` value forwarded in the pulse,
/// 2. the period reported by `HeartBeat::interval`.
#[derive(Debug, Clone)]
pub(super) struct EnqueueScheduled<ST>(ST, i32, Duration);

impl<ST> EnqueueScheduled<ST> {
    /// Creates an enqueue-scheduled heartbeat backed by `storage`.
    ///
    /// `count` is passed to the storage on every beat; `interval` is stored
    /// unchanged and later returned by `interval()`.
    pub(super) fn new<J>(storage: ST, count: i32, interval: Duration) -> Self
    where
        J: 'static,
        ST: Storage<Output = J>,
    {
        Self(storage, count, interval)
    }
}

// Enqueue-scheduled heartbeat: forwards the stored `count` to the storage's
// `heartbeat` method and logs the outcome.
#[async_trait::async_trait]
impl<S> HeartBeat for EnqueueScheduled<S>
where
    S: Storage + Send + Sync,
{
    /// Sends one `EnqueueScheduled` pulse to the storage.
    ///
    /// A failure is logged at `warn` level and otherwise ignored; success is
    /// logged at `trace` level.
    async fn heart_beat(&mut self) {
        let pulse = super::StorageWorkerPulse::EnqueueScheduled { count: self.1 };
        let outcome = self.0.heartbeat(pulse).await;
        if let Err(err) = outcome {
            tracing::warn!(
                "An error occurred while attempting the enqueue scheduled heartbeat. Error: {}",
                err
            );
            return;
        }
        tracing::trace!("enqueue scheduled heartbeat successful for storage");
    }

    /// Returns the interval supplied when this heartbeat was constructed.
    fn interval(&self) -> Duration {
        self.2
    }
}