Skip to main content

scheduler/
execution_guard.rs

1use crate::error::ExecutionGuardErrorKind;
2use chrono::{DateTime, Utc};
3use std::convert::Infallible;
4use std::future::Future;
5use std::sync::Arc;
6use std::time::Duration;
7
8#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
9pub enum ExecutionGuardScope {
10    #[default]
11    Occurrence,
12    Resource,
13}
14
15#[derive(Debug, Clone, PartialEq, Eq)]
16pub struct ExecutionSlot {
17    pub job_id: String,
18    pub resource_id: String,
19    pub scope: ExecutionGuardScope,
20    pub scheduled_at: Option<DateTime<Utc>>,
21}
22
23impl ExecutionSlot {
24    pub fn new(job_id: impl Into<String>, scheduled_at: DateTime<Utc>) -> Self {
25        let job_id = job_id.into();
26        Self::for_occurrence(job_id.clone(), job_id, scheduled_at)
27    }
28
29    pub fn for_occurrence(
30        job_id: impl Into<String>,
31        resource_id: impl Into<String>,
32        scheduled_at: DateTime<Utc>,
33    ) -> Self {
34        Self {
35            job_id: job_id.into(),
36            resource_id: resource_id.into(),
37            scope: ExecutionGuardScope::Occurrence,
38            scheduled_at: Some(scheduled_at),
39        }
40    }
41
42    pub fn for_resource(job_id: impl Into<String>, resource_id: impl Into<String>) -> Self {
43        Self {
44            job_id: job_id.into(),
45            resource_id: resource_id.into(),
46            scope: ExecutionGuardScope::Resource,
47            scheduled_at: None,
48        }
49    }
50}
51
52#[derive(Debug, Clone, PartialEq, Eq)]
53pub struct ExecutionLease {
54    pub job_id: String,
55    pub resource_id: String,
56    pub scope: ExecutionGuardScope,
57    pub scheduled_at: Option<DateTime<Utc>>,
58    pub token: String,
59    pub lease_key: String,
60}
61
62impl ExecutionLease {
63    pub fn new(
64        job_id: impl Into<String>,
65        resource_id: impl Into<String>,
66        scope: ExecutionGuardScope,
67        scheduled_at: Option<DateTime<Utc>>,
68        token: impl Into<String>,
69        lease_key: impl Into<String>,
70    ) -> Self {
71        Self {
72            job_id: job_id.into(),
73            resource_id: resource_id.into(),
74            scope,
75            scheduled_at,
76            token: token.into(),
77            lease_key: lease_key.into(),
78        }
79    }
80}
81
82#[derive(Debug, Clone, PartialEq, Eq)]
83pub enum ExecutionGuardAcquire {
84    Acquired(ExecutionLease),
85    Contended,
86}
87
88#[derive(Debug, Clone, Copy, PartialEq, Eq)]
89pub enum ExecutionGuardRenewal {
90    Renewed,
91    Lost,
92}
93
94pub trait ExecutionGuard {
95    type Error: std::error::Error + Send + Sync + 'static;
96
97    fn acquire(
98        &self,
99        slot: ExecutionSlot,
100    ) -> impl Future<Output = Result<ExecutionGuardAcquire, Self::Error>> + Send;
101
102    fn renew(
103        &self,
104        lease: &ExecutionLease,
105    ) -> impl Future<Output = Result<ExecutionGuardRenewal, Self::Error>> + Send;
106
107    fn release(
108        &self,
109        lease: &ExecutionLease,
110    ) -> impl Future<Output = Result<(), Self::Error>> + Send;
111
112    fn classify_error(_error: &Self::Error) -> ExecutionGuardErrorKind
113    where
114        Self: Sized,
115    {
116        ExecutionGuardErrorKind::Unknown
117    }
118
119    fn renew_interval(&self, _lease: &ExecutionLease) -> Option<Duration> {
120        None
121    }
122}
123
124#[derive(Debug, Clone, Copy, Default)]
125pub struct NoopExecutionGuard;
126
127impl ExecutionGuard for NoopExecutionGuard {
128    type Error = Infallible;
129
130    async fn acquire(&self, slot: ExecutionSlot) -> Result<ExecutionGuardAcquire, Self::Error> {
131        Ok(ExecutionGuardAcquire::Acquired(ExecutionLease::new(
132            slot.job_id,
133            slot.resource_id,
134            slot.scope,
135            slot.scheduled_at,
136            "",
137            "",
138        )))
139    }
140
141    async fn renew(&self, _lease: &ExecutionLease) -> Result<ExecutionGuardRenewal, Self::Error> {
142        Ok(ExecutionGuardRenewal::Renewed)
143    }
144
145    async fn release(&self, _lease: &ExecutionLease) -> Result<(), Self::Error> {
146        Ok(())
147    }
148}
149
150impl<T> ExecutionGuard for Arc<T>
151where
152    T: ExecutionGuard + Send + Sync,
153{
154    type Error = T::Error;
155
156    async fn acquire(&self, slot: ExecutionSlot) -> Result<ExecutionGuardAcquire, Self::Error> {
157        self.as_ref().acquire(slot).await
158    }
159
160    async fn renew(&self, lease: &ExecutionLease) -> Result<ExecutionGuardRenewal, Self::Error> {
161        self.as_ref().renew(lease).await
162    }
163
164    async fn release(&self, lease: &ExecutionLease) -> Result<(), Self::Error> {
165        self.as_ref().release(lease).await
166    }
167
168    fn classify_error(error: &Self::Error) -> ExecutionGuardErrorKind
169    where
170        Self: Sized,
171    {
172        T::classify_error(error)
173    }
174
175    fn renew_interval(&self, lease: &ExecutionLease) -> Option<Duration> {
176        self.as_ref().renew_interval(lease)
177    }
178}