Skip to main content

scheduler/
execution_guard.rs

1use crate::error::ExecutionGuardErrorKind;
2use chrono::{DateTime, Utc};
3use std::convert::Infallible;
4use std::future::Future;
5use std::sync::Arc;
6use std::time::Duration;
7
8#[derive(Debug, Clone, PartialEq, Eq)]
9pub struct ExecutionSlot {
10    pub job_id: String,
11    pub scheduled_at: DateTime<Utc>,
12}
13
14impl ExecutionSlot {
15    pub fn new(job_id: impl Into<String>, scheduled_at: DateTime<Utc>) -> Self {
16        Self {
17            job_id: job_id.into(),
18            scheduled_at,
19        }
20    }
21}
22
23#[derive(Debug, Clone, PartialEq, Eq)]
24pub struct ExecutionLease {
25    pub job_id: String,
26    pub scheduled_at: DateTime<Utc>,
27    pub token: String,
28    pub lease_key: String,
29}
30
31impl ExecutionLease {
32    pub fn new(
33        job_id: impl Into<String>,
34        scheduled_at: DateTime<Utc>,
35        token: impl Into<String>,
36        lease_key: impl Into<String>,
37    ) -> Self {
38        Self {
39            job_id: job_id.into(),
40            scheduled_at,
41            token: token.into(),
42            lease_key: lease_key.into(),
43        }
44    }
45}
46
47#[derive(Debug, Clone, PartialEq, Eq)]
48pub enum ExecutionGuardAcquire {
49    Acquired(ExecutionLease),
50    Contended,
51}
52
/// Successful result type of `ExecutionGuard::renew`.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum ExecutionGuardRenewal {
    /// The lease was renewed.
    Renewed,
    /// The lease was not renewed.
    // NOTE(review): presumably the lease is no longer held by this caller —
    // confirm against the concrete guard implementations.
    Lost,
}
58
59pub trait ExecutionGuard {
60    type Error: std::error::Error + Send + Sync + 'static;
61
62    fn acquire(
63        &self,
64        slot: ExecutionSlot,
65    ) -> impl Future<Output = Result<ExecutionGuardAcquire, Self::Error>> + Send;
66
67    fn renew(
68        &self,
69        lease: &ExecutionLease,
70    ) -> impl Future<Output = Result<ExecutionGuardRenewal, Self::Error>> + Send;
71
72    fn release(
73        &self,
74        lease: &ExecutionLease,
75    ) -> impl Future<Output = Result<(), Self::Error>> + Send;
76
77    fn classify_error(_error: &Self::Error) -> ExecutionGuardErrorKind
78    where
79        Self: Sized,
80    {
81        ExecutionGuardErrorKind::Unknown
82    }
83
84    fn renew_interval(&self, _lease: &ExecutionLease) -> Option<Duration> {
85        None
86    }
87}
88
/// Field-less guard type; a unit struct that is `Copy` and `Default`.
#[derive(Clone, Copy, Debug, Default)]
pub struct NoopExecutionGuard;
91
92impl ExecutionGuard for NoopExecutionGuard {
93    type Error = Infallible;
94
95    async fn acquire(&self, slot: ExecutionSlot) -> Result<ExecutionGuardAcquire, Self::Error> {
96        Ok(ExecutionGuardAcquire::Acquired(ExecutionLease::new(
97            slot.job_id,
98            slot.scheduled_at,
99            "",
100            "",
101        )))
102    }
103
104    async fn renew(&self, _lease: &ExecutionLease) -> Result<ExecutionGuardRenewal, Self::Error> {
105        Ok(ExecutionGuardRenewal::Renewed)
106    }
107
108    async fn release(&self, _lease: &ExecutionLease) -> Result<(), Self::Error> {
109        Ok(())
110    }
111}
112
113impl<T> ExecutionGuard for Arc<T>
114where
115    T: ExecutionGuard + Send + Sync,
116{
117    type Error = T::Error;
118
119    async fn acquire(&self, slot: ExecutionSlot) -> Result<ExecutionGuardAcquire, Self::Error> {
120        self.as_ref().acquire(slot).await
121    }
122
123    async fn renew(&self, lease: &ExecutionLease) -> Result<ExecutionGuardRenewal, Self::Error> {
124        self.as_ref().renew(lease).await
125    }
126
127    async fn release(&self, lease: &ExecutionLease) -> Result<(), Self::Error> {
128        self.as_ref().release(lease).await
129    }
130
131    fn classify_error(error: &Self::Error) -> ExecutionGuardErrorKind
132    where
133        Self: Sized,
134    {
135        T::classify_error(error)
136    }
137
138    fn renew_interval(&self, lease: &ExecutionLease) -> Option<Duration> {
139        self.as_ref().renew_interval(lease)
140    }
141}