1use crate::error::ExecutionGuardErrorKind;
2use chrono::{DateTime, Utc};
3use std::convert::Infallible;
4use std::future::Future;
5use std::sync::Arc;
6use std::time::Duration;
7
/// Scope that an execution slot or lease applies to.
///
/// The default scope is [`ExecutionGuardScope::Occurrence`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum ExecutionGuardScope {
    /// Scope used by slots built for one scheduled occurrence (they carry a
    /// scheduled timestamp).
    #[default]
    Occurrence,
    /// Scope used by slots built for a resource (they carry no scheduled
    /// timestamp).
    Resource,
}
14
15#[derive(Debug, Clone, PartialEq, Eq)]
16pub struct ExecutionSlot {
17 pub job_id: String,
18 pub resource_id: String,
19 pub scope: ExecutionGuardScope,
20 pub scheduled_at: Option<DateTime<Utc>>,
21}
22
23impl ExecutionSlot {
24 pub fn new(job_id: impl Into<String>, scheduled_at: DateTime<Utc>) -> Self {
25 let job_id = job_id.into();
26 Self::for_occurrence(job_id.clone(), job_id, scheduled_at)
27 }
28
29 pub fn for_occurrence(
30 job_id: impl Into<String>,
31 resource_id: impl Into<String>,
32 scheduled_at: DateTime<Utc>,
33 ) -> Self {
34 Self {
35 job_id: job_id.into(),
36 resource_id: resource_id.into(),
37 scope: ExecutionGuardScope::Occurrence,
38 scheduled_at: Some(scheduled_at),
39 }
40 }
41
42 pub fn for_resource(job_id: impl Into<String>, resource_id: impl Into<String>) -> Self {
43 Self {
44 job_id: job_id.into(),
45 resource_id: resource_id.into(),
46 scope: ExecutionGuardScope::Resource,
47 scheduled_at: None,
48 }
49 }
50}
51
52#[derive(Debug, Clone, PartialEq, Eq)]
53pub struct ExecutionLease {
54 pub job_id: String,
55 pub resource_id: String,
56 pub scope: ExecutionGuardScope,
57 pub scheduled_at: Option<DateTime<Utc>>,
58 pub token: String,
59 pub lease_key: String,
60}
61
62impl ExecutionLease {
63 pub fn new(
64 job_id: impl Into<String>,
65 resource_id: impl Into<String>,
66 scope: ExecutionGuardScope,
67 scheduled_at: Option<DateTime<Utc>>,
68 token: impl Into<String>,
69 lease_key: impl Into<String>,
70 ) -> Self {
71 Self {
72 job_id: job_id.into(),
73 resource_id: resource_id.into(),
74 scope,
75 scheduled_at,
76 token: token.into(),
77 lease_key: lease_key.into(),
78 }
79 }
80}
81
82#[derive(Debug, Clone, PartialEq, Eq)]
83pub enum ExecutionGuardAcquire {
84 Acquired(ExecutionLease),
85 Contended,
86}
87
/// Outcome of a successful call to [`ExecutionGuard::renew`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ExecutionGuardRenewal {
    /// The lease was renewed.
    Renewed,
    /// The lease is no longer held.
    Lost,
}
93
94pub trait ExecutionGuard {
95 type Error: std::error::Error + Send + Sync + 'static;
96
97 fn acquire(
98 &self,
99 slot: ExecutionSlot,
100 ) -> impl Future<Output = Result<ExecutionGuardAcquire, Self::Error>> + Send;
101
102 fn renew(
103 &self,
104 lease: &ExecutionLease,
105 ) -> impl Future<Output = Result<ExecutionGuardRenewal, Self::Error>> + Send;
106
107 fn release(
108 &self,
109 lease: &ExecutionLease,
110 ) -> impl Future<Output = Result<(), Self::Error>> + Send;
111
112 fn classify_error(_error: &Self::Error) -> ExecutionGuardErrorKind
113 where
114 Self: Sized,
115 {
116 ExecutionGuardErrorKind::Unknown
117 }
118
119 fn renew_interval(&self, _lease: &ExecutionLease) -> Option<Duration> {
120 None
121 }
122}
123
/// Stateless guard that grants every acquisition and never fails.
#[derive(Debug, Clone, Copy, Default)]
pub struct NoopExecutionGuard;
126
127impl ExecutionGuard for NoopExecutionGuard {
128 type Error = Infallible;
129
130 async fn acquire(&self, slot: ExecutionSlot) -> Result<ExecutionGuardAcquire, Self::Error> {
131 Ok(ExecutionGuardAcquire::Acquired(ExecutionLease::new(
132 slot.job_id,
133 slot.resource_id,
134 slot.scope,
135 slot.scheduled_at,
136 "",
137 "",
138 )))
139 }
140
141 async fn renew(&self, _lease: &ExecutionLease) -> Result<ExecutionGuardRenewal, Self::Error> {
142 Ok(ExecutionGuardRenewal::Renewed)
143 }
144
145 async fn release(&self, _lease: &ExecutionLease) -> Result<(), Self::Error> {
146 Ok(())
147 }
148}
149
150impl<T> ExecutionGuard for Arc<T>
151where
152 T: ExecutionGuard + Send + Sync,
153{
154 type Error = T::Error;
155
156 async fn acquire(&self, slot: ExecutionSlot) -> Result<ExecutionGuardAcquire, Self::Error> {
157 self.as_ref().acquire(slot).await
158 }
159
160 async fn renew(&self, lease: &ExecutionLease) -> Result<ExecutionGuardRenewal, Self::Error> {
161 self.as_ref().renew(lease).await
162 }
163
164 async fn release(&self, lease: &ExecutionLease) -> Result<(), Self::Error> {
165 self.as_ref().release(lease).await
166 }
167
168 fn classify_error(error: &Self::Error) -> ExecutionGuardErrorKind
169 where
170 Self: Sized,
171 {
172 T::classify_error(error)
173 }
174
175 fn renew_interval(&self, lease: &ExecutionLease) -> Option<Duration> {
176 self.as_ref().renew_interval(lease)
177 }
178}