scheduler/
execution_guard.rs1use crate::error::ExecutionGuardErrorKind;
2use chrono::{DateTime, Utc};
3use std::convert::Infallible;
4use std::future::Future;
5use std::sync::Arc;
6use std::time::Duration;
7
8#[derive(Debug, Clone, PartialEq, Eq)]
9pub struct ExecutionSlot {
10 pub job_id: String,
11 pub scheduled_at: DateTime<Utc>,
12}
13
14impl ExecutionSlot {
15 pub fn new(job_id: impl Into<String>, scheduled_at: DateTime<Utc>) -> Self {
16 Self {
17 job_id: job_id.into(),
18 scheduled_at,
19 }
20 }
21}
22
23#[derive(Debug, Clone, PartialEq, Eq)]
24pub struct ExecutionLease {
25 pub job_id: String,
26 pub scheduled_at: DateTime<Utc>,
27 pub token: String,
28 pub lease_key: String,
29}
30
31impl ExecutionLease {
32 pub fn new(
33 job_id: impl Into<String>,
34 scheduled_at: DateTime<Utc>,
35 token: impl Into<String>,
36 lease_key: impl Into<String>,
37 ) -> Self {
38 Self {
39 job_id: job_id.into(),
40 scheduled_at,
41 token: token.into(),
42 lease_key: lease_key.into(),
43 }
44 }
45}
46
47#[derive(Debug, Clone, PartialEq, Eq)]
48pub enum ExecutionGuardAcquire {
49 Acquired(ExecutionLease),
50 Contended,
51}
52
53#[derive(Debug, Clone, Copy, PartialEq, Eq)]
54pub enum ExecutionGuardRenewal {
55 Renewed,
56 Lost,
57}
58
59pub trait ExecutionGuard {
60 type Error: std::error::Error + Send + Sync + 'static;
61
62 fn acquire(
63 &self,
64 slot: ExecutionSlot,
65 ) -> impl Future<Output = Result<ExecutionGuardAcquire, Self::Error>> + Send;
66
67 fn renew(
68 &self,
69 lease: &ExecutionLease,
70 ) -> impl Future<Output = Result<ExecutionGuardRenewal, Self::Error>> + Send;
71
72 fn release(
73 &self,
74 lease: &ExecutionLease,
75 ) -> impl Future<Output = Result<(), Self::Error>> + Send;
76
77 fn classify_error(_error: &Self::Error) -> ExecutionGuardErrorKind
78 where
79 Self: Sized,
80 {
81 ExecutionGuardErrorKind::Unknown
82 }
83
84 fn renew_interval(&self, _lease: &ExecutionLease) -> Option<Duration> {
85 None
86 }
87}
88
89#[derive(Debug, Clone, Copy, Default)]
90pub struct NoopExecutionGuard;
91
92impl ExecutionGuard for NoopExecutionGuard {
93 type Error = Infallible;
94
95 async fn acquire(&self, slot: ExecutionSlot) -> Result<ExecutionGuardAcquire, Self::Error> {
96 Ok(ExecutionGuardAcquire::Acquired(ExecutionLease::new(
97 slot.job_id,
98 slot.scheduled_at,
99 "",
100 "",
101 )))
102 }
103
104 async fn renew(&self, _lease: &ExecutionLease) -> Result<ExecutionGuardRenewal, Self::Error> {
105 Ok(ExecutionGuardRenewal::Renewed)
106 }
107
108 async fn release(&self, _lease: &ExecutionLease) -> Result<(), Self::Error> {
109 Ok(())
110 }
111}
112
113impl<T> ExecutionGuard for Arc<T>
114where
115 T: ExecutionGuard + Send + Sync,
116{
117 type Error = T::Error;
118
119 async fn acquire(&self, slot: ExecutionSlot) -> Result<ExecutionGuardAcquire, Self::Error> {
120 self.as_ref().acquire(slot).await
121 }
122
123 async fn renew(&self, lease: &ExecutionLease) -> Result<ExecutionGuardRenewal, Self::Error> {
124 self.as_ref().renew(lease).await
125 }
126
127 async fn release(&self, lease: &ExecutionLease) -> Result<(), Self::Error> {
128 self.as_ref().release(lease).await
129 }
130
131 fn classify_error(error: &Self::Error) -> ExecutionGuardErrorKind
132 where
133 Self: Sized,
134 {
135 T::classify_error(error)
136 }
137
138 fn renew_interval(&self, lease: &ExecutionLease) -> Option<Duration> {
139 self.as_ref().renew_interval(lease)
140 }
141}