// cloudiful-scheduler 0.4.2
//
// Single-job async scheduling library for background work with optional Valkey-backed state.
use crate::error::ExecutionGuardError;
use crate::observer::{NoopObserver, SchedulerEvent, SchedulerObserver};
use crate::{ExecutionGuard, ExecutionGuardAcquire, ExecutionGuardRenewal, ExecutionSlot};
use std::future::Future;
use std::sync::Arc;
use tokio::time::{Instant, interval_at};

/// Outcome of a guarded run attempt.
///
/// A run is only attempted after the execution guard has been asked for the slot:
/// either the guard granted it and the job ran, or the guard did not grant it and
/// the job was never started.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum GuardedRunResult<T> {
    /// The guard granted the slot; carries the value the job produced.
    Completed(T),
    /// The guard did not grant the slot, so the job closure was not invoked.
    Contended,
}

/// An acquired execution lease, ready to run a job via `GuardedSession::run`.
///
/// Cloning is cheap (the guard and observer are shared through `Arc`) and, unlike a
/// derived `Clone`, does not require `G: Clone`. Every clone refers to the same lease,
/// so running more than one clone will ask the guard to release that lease more than once.
pub struct GuardedSession<G> {
    /// Guard that issued `lease`; used for renewal and release.
    guard: Arc<G>,
    /// Receives the lifecycle events emitted while the session runs.
    observer: Arc<dyn SchedulerObserver>,
    /// The lease held by this session.
    lease: crate::ExecutionLease,
    /// Catch-up flag, forwarded verbatim in observer events.
    catch_up: bool,
    /// Trigger count, forwarded verbatim in observer events.
    trigger_count: u32,
}

// Written by hand because `#[derive(Clone)]` would add an unnecessary `G: Clone`
// bound even though `G` is only ever held behind an `Arc`.
impl<G> Clone for GuardedSession<G> {
    fn clone(&self) -> Self {
        Self {
            guard: Arc::clone(&self.guard),
            observer: Arc::clone(&self.observer),
            lease: self.lease.clone(),
            catch_up: self.catch_up,
            trigger_count: self.trigger_count,
        }
    }
}

/// Runs jobs under an [`ExecutionGuard`], reporting guard lifecycle events to an observer.
///
/// Cloning is cheap (the guard and observer are shared through `Arc`) and, unlike a
/// derived `Clone`, does not require `G: Clone`.
pub struct GuardedRunner<G> {
    /// Guard used to acquire execution slots.
    guard: Arc<G>,
    /// Receives acquisition/contention events and is handed to every session.
    observer: Arc<dyn SchedulerObserver>,
}

// Written by hand because `#[derive(Clone)]` would add an unnecessary `G: Clone`
// bound even though `G` is only ever held behind an `Arc`.
impl<G> Clone for GuardedRunner<G> {
    fn clone(&self) -> Self {
        Self {
            guard: Arc::clone(&self.guard),
            observer: Arc::clone(&self.observer),
        }
    }
}

impl<G> GuardedRunner<G>
where
    G: ExecutionGuard + Send + Sync + 'static,
{
    pub fn new(guard: G) -> Self {
        Self::with_observer(guard, NoopObserver)
    }

    pub fn with_observer<O>(guard: G, observer: O) -> Self
    where
        O: SchedulerObserver,
    {
        Self {
            guard: Arc::new(guard),
            observer: Arc::new(observer),
        }
    }

    pub async fn run<F, Fut, T>(
        &self,
        slot: ExecutionSlot,
        run: F,
    ) -> Result<GuardedRunResult<T>, ExecutionGuardError>
    where
        F: FnOnce() -> Fut,
        Fut: Future<Output = T>,
    {
        self.run_with_metadata(slot, false, 0, run).await
    }

    pub async fn run_with_metadata<F, Fut, T>(
        &self,
        slot: ExecutionSlot,
        catch_up: bool,
        trigger_count: u32,
        run: F,
    ) -> Result<GuardedRunResult<T>, ExecutionGuardError>
    where
        F: FnOnce() -> Fut,
        Fut: Future<Output = T>,
    {
        let Some(session) = self
            .acquire_with_metadata(slot, catch_up, trigger_count)
            .await?
        else {
            return Ok(GuardedRunResult::Contended);
        };

        Ok(GuardedRunResult::Completed(session.run(run).await))
    }

    pub async fn acquire(
        &self,
        slot: ExecutionSlot,
    ) -> Result<Option<GuardedSession<G>>, ExecutionGuardError> {
        self.acquire_with_metadata(slot, false, 0).await
    }

    async fn acquire_with_metadata(
        &self,
        slot: ExecutionSlot,
        catch_up: bool,
        trigger_count: u32,
    ) -> Result<Option<GuardedSession<G>>, ExecutionGuardError> {
        let acquired = self.guard.acquire(slot.clone()).await.map_err(|error| {
            let kind = G::classify_error(&error);
            ExecutionGuardError::new(error, kind)
        })?;

        let ExecutionGuardAcquire::Acquired(lease) = acquired else {
            self.observer
                .on_event(&SchedulerEvent::ExecutionGuardContended {
                    job_id: slot.job_id,
                    resource_id: slot.resource_id,
                    scope: slot.scope,
                    scheduled_at: slot.scheduled_at,
                    catch_up,
                    trigger_count,
                });
            return Ok(None);
        };

        self.observer
            .on_event(&SchedulerEvent::ExecutionGuardAcquired {
                job_id: lease.job_id.clone(),
                resource_id: lease.resource_id.clone(),
                scope: lease.scope,
                lease_key: lease.lease_key.clone(),
                scheduled_at: lease.scheduled_at,
                catch_up,
                trigger_count,
            });

        Ok(Some(GuardedSession {
            guard: self.guard.clone(),
            observer: self.observer.clone(),
            lease,
            catch_up,
            trigger_count,
        }))
    }
}

impl<G> GuardedSession<G>
where
    G: ExecutionGuard + Send + Sync + 'static,
{
    /// Runs the job built by `run` while holding this session's lease, then releases the lease.
    ///
    /// When the guard returns a renewal interval from `renew_interval`, the lease is renewed
    /// on that cadence while the job runs. Each successful renewal emits
    /// `ExecutionGuardRenewed`. The first renewal that reports [`ExecutionGuardRenewal::Lost`]
    /// or returns an error emits `ExecutionGuardLost` and stops further renewal attempts. An
    /// error additionally emits `ExecutionGuardRenewFailed` first. The job itself is not
    /// cancelled and keeps running to completion. A zero interval is treated as "no renewal",
    /// because tokio's `interval_at` panics on a zero period.
    ///
    /// Notes:
    /// - Renewal calls are awaited inline, so the job is not polled while one is in flight.
    /// - The lease is released only after the job finishes. If the future returned by this
    ///   method is dropped early, or the job panics, no release is attempted here.
    /// - A release failure is reported as `ExecutionGuardReleaseFailed` instead of being
    ///   returned, so the job's output is always handed back.
    pub async fn run<F, Fut, T>(self, run: F) -> T
    where
        F: FnOnce() -> Fut,
        Fut: Future<Output = T>,
    {
        // Both counters are only used for observer events.
        let mut renewal_count = 0u32;
        let mut failed_renewal_count = 0u32;

        let task = run();
        tokio::pin!(task);

        let mut renewal = self
            .guard
            .renew_interval(&self.lease)
            // `interval_at` panics on a zero period; treat a zero interval as "renewal disabled".
            .filter(|period| !period.is_zero())
            .map(|period| {
                // The first tick fires one full period from now rather than immediately.
                let mut ticker = interval_at(Instant::now() + period, period);
                // After a stall (slow renewal call, long job poll), renew once and then wait a
                // full period again instead of firing a burst of back-to-back catch-up renewals.
                ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
                ticker
            });

        let output = loop {
            // Once renewal is disabled (or was never enabled), just drive the job to completion.
            let Some(ticker) = renewal.as_mut() else {
                break task.await;
            };

            let keep_renewing = tokio::select! {
                result = &mut task => break result,
                _ = ticker.tick() => {
                    match self.guard.renew(&self.lease).await {
                        Ok(ExecutionGuardRenewal::Renewed) => {
                            renewal_count += 1;
                            self.observer.on_event(&SchedulerEvent::ExecutionGuardRenewed {
                                job_id: self.lease.job_id.clone(),
                                resource_id: self.lease.resource_id.clone(),
                                scope: self.lease.scope,
                                lease_key: self.lease.lease_key.clone(),
                                scheduled_at: self.lease.scheduled_at,
                                catch_up: self.catch_up,
                                trigger_count: self.trigger_count,
                                renewal_count,
                            });
                            true
                        }
                        Ok(ExecutionGuardRenewal::Lost) => {
                            self.report_lost(renewal_count, failed_renewal_count);
                            false
                        }
                        Err(error) => {
                            failed_renewal_count += 1;
                            self.observer.on_event(&SchedulerEvent::ExecutionGuardRenewFailed {
                                job_id: self.lease.job_id.clone(),
                                resource_id: self.lease.resource_id.clone(),
                                scope: self.lease.scope,
                                lease_key: self.lease.lease_key.clone(),
                                scheduled_at: self.lease.scheduled_at,
                                catch_up: self.catch_up,
                                trigger_count: self.trigger_count,
                                renewal_count,
                                failed_renewal_count,
                                error: error.to_string(),
                            });
                            // A failed renewal is handled like a lost lease: report it and
                            // stop renewing.
                            self.report_lost(renewal_count, failed_renewal_count);
                            false
                        }
                    }
                }
            };

            // Disabling renewal right here also guarantees `ExecutionGuardLost` fires at most once.
            if !keep_renewing {
                renewal = None;
            }
        };

        if let Err(error) = self.guard.release(&self.lease).await {
            self.observer
                .on_event(&SchedulerEvent::ExecutionGuardReleaseFailed {
                    job_id: self.lease.job_id.clone(),
                    resource_id: self.lease.resource_id.clone(),
                    scope: self.lease.scope,
                    lease_key: self.lease.lease_key.clone(),
                    scheduled_at: self.lease.scheduled_at,
                    catch_up: self.catch_up,
                    trigger_count: self.trigger_count,
                    error: error.to_string(),
                });
        } else {
            self.observer
                .on_event(&SchedulerEvent::ExecutionGuardReleased {
                    job_id: self.lease.job_id.clone(),
                    resource_id: self.lease.resource_id.clone(),
                    scope: self.lease.scope,
                    lease_key: self.lease.lease_key.clone(),
                    scheduled_at: self.lease.scheduled_at,
                    catch_up: self.catch_up,
                    trigger_count: self.trigger_count,
                });
        }

        output
    }

    /// Emits `ExecutionGuardLost` for this session's lease, together with the renewal
    /// counters accumulated so far.
    fn report_lost(&self, renewal_count: u32, failed_renewal_count: u32) {
        self.observer
            .on_event(&SchedulerEvent::ExecutionGuardLost {
                job_id: self.lease.job_id.clone(),
                resource_id: self.lease.resource_id.clone(),
                scope: self.lease.scope,
                lease_key: self.lease.lease_key.clone(),
                scheduled_at: self.lease.scheduled_at,
                catch_up: self.catch_up,
                trigger_count: self.trigger_count,
                renewal_count,
                failed_renewal_count,
            });
    }
}