1use crate::error::{ExecutionGuardErrorKind, StoreErrorKind};
2use crate::execution_guard::{ExecutionGuardRenewal, ExecutionLease};
3use crate::model::JobState;
4use chrono::{DateTime, Utc};
5use std::convert::Infallible;
6use std::future::Future;
7use std::sync::Arc;
8use std::time::Duration;
9
/// Timing parameters for a coordinated execution lease.
///
/// Construct the value directly and pass it through
/// [`CoordinatedLeaseConfig::validate`] to confirm that the two durations are
/// mutually consistent.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct CoordinatedLeaseConfig {
    /// Lease time-to-live. Must be non-zero to pass validation.
    pub ttl: Duration,
    /// Renewal interval. Must be non-zero and strictly shorter than `ttl` to
    /// pass validation.
    pub renew_interval: Duration,
}

impl CoordinatedLeaseConfig {
    /// Checks the configuration and hands it back unchanged when it is valid.
    ///
    /// # Errors
    ///
    /// Returns a static message describing the first rule that is violated,
    /// checked in this order:
    ///
    /// 1. `ttl` is zero,
    /// 2. `renew_interval` is zero,
    /// 3. `renew_interval` is greater than or equal to `ttl`.
    pub fn validate(self) -> Result<Self, &'static str> {
        let Self {
            ttl,
            renew_interval,
        } = self;

        if ttl == Duration::ZERO {
            Err("coordinated lease ttl must be greater than zero")
        } else if renew_interval == Duration::ZERO {
            Err("coordinated lease renew_interval must be greater than zero")
        } else if ttl <= renew_interval {
            Err("coordinated lease renew_interval must be less than ttl")
        } else {
            Ok(self)
        }
    }
}
30
31#[derive(Debug, Clone, PartialEq, Eq)]
32pub struct CoordinatedRuntimeState {
33 pub state: JobState,
34 pub revision: u64,
35 pub paused: bool,
36}
37
38#[derive(Debug, Clone, PartialEq, Eq)]
39pub struct CoordinatedPendingTrigger {
40 pub scheduled_at: DateTime<Utc>,
41 pub catch_up: bool,
42 pub trigger_count: u32,
43}
44
45#[derive(Debug, Clone, PartialEq, Eq)]
46pub struct CoordinatedClaim {
47 pub state: CoordinatedRuntimeState,
48 pub trigger: CoordinatedPendingTrigger,
49 pub lease: ExecutionLease,
50 pub replayed: bool,
51}
52
53pub trait CoordinatedStateStore {
54 type Error: std::error::Error + Send + Sync + 'static;
55
56 fn load_or_initialize(
57 &self,
58 job_id: &str,
59 initial_state: JobState,
60 ) -> impl Future<Output = Result<CoordinatedRuntimeState, Self::Error>> + Send;
61
62 fn save_state(
63 &self,
64 job_id: &str,
65 revision: u64,
66 state: &JobState,
67 ) -> impl Future<Output = Result<bool, Self::Error>> + Send;
68
69 fn reclaim_inflight(
70 &self,
71 job_id: &str,
72 resource_id: &str,
73 lease_config: CoordinatedLeaseConfig,
74 ) -> impl Future<Output = Result<Option<CoordinatedClaim>, Self::Error>> + Send;
75
76 fn claim_trigger(
77 &self,
78 job_id: &str,
79 resource_id: &str,
80 revision: u64,
81 trigger: CoordinatedPendingTrigger,
82 next_state: &JobState,
83 lease_config: CoordinatedLeaseConfig,
84 ) -> impl Future<Output = Result<Option<CoordinatedClaim>, Self::Error>> + Send;
85
86 fn renew(
87 &self,
88 lease: &ExecutionLease,
89 lease_config: CoordinatedLeaseConfig,
90 ) -> impl Future<Output = Result<ExecutionGuardRenewal, Self::Error>> + Send;
91
92 fn complete(
93 &self,
94 job_id: &str,
95 revision: u64,
96 lease: &ExecutionLease,
97 state: &JobState,
98 ) -> impl Future<Output = Result<bool, Self::Error>> + Send;
99
100 fn delete(&self, job_id: &str) -> impl Future<Output = Result<(), Self::Error>> + Send;
101
102 fn pause(&self, job_id: &str) -> impl Future<Output = Result<bool, Self::Error>> + Send;
103
104 fn resume(&self, job_id: &str) -> impl Future<Output = Result<bool, Self::Error>> + Send;
105
106 fn classify_store_error(_error: &Self::Error) -> StoreErrorKind
107 where
108 Self: Sized,
109 {
110 StoreErrorKind::Unknown
111 }
112
113 fn classify_guard_error(_error: &Self::Error) -> ExecutionGuardErrorKind
114 where
115 Self: Sized,
116 {
117 ExecutionGuardErrorKind::Unknown
118 }
119}
120
121#[derive(Debug, Clone, Copy, Default)]
122pub struct NoopCoordinatedStateStore;
123
124impl CoordinatedStateStore for NoopCoordinatedStateStore {
125 type Error = Infallible;
126
127 async fn load_or_initialize(
128 &self,
129 _job_id: &str,
130 initial_state: JobState,
131 ) -> Result<CoordinatedRuntimeState, Self::Error> {
132 Ok(CoordinatedRuntimeState {
133 state: initial_state,
134 revision: 0,
135 paused: false,
136 })
137 }
138
139 async fn save_state(
140 &self,
141 _job_id: &str,
142 _revision: u64,
143 _state: &JobState,
144 ) -> Result<bool, Self::Error> {
145 Ok(true)
146 }
147
148 async fn reclaim_inflight(
149 &self,
150 _job_id: &str,
151 _resource_id: &str,
152 _lease_config: CoordinatedLeaseConfig,
153 ) -> Result<Option<CoordinatedClaim>, Self::Error> {
154 Ok(None)
155 }
156
157 async fn claim_trigger(
158 &self,
159 _job_id: &str,
160 _resource_id: &str,
161 _revision: u64,
162 _trigger: CoordinatedPendingTrigger,
163 _next_state: &JobState,
164 _lease_config: CoordinatedLeaseConfig,
165 ) -> Result<Option<CoordinatedClaim>, Self::Error> {
166 Ok(None)
167 }
168
169 async fn renew(
170 &self,
171 _lease: &ExecutionLease,
172 _lease_config: CoordinatedLeaseConfig,
173 ) -> Result<ExecutionGuardRenewal, Self::Error> {
174 Ok(ExecutionGuardRenewal::Lost)
175 }
176
177 async fn complete(
178 &self,
179 _job_id: &str,
180 _revision: u64,
181 _lease: &ExecutionLease,
182 _state: &JobState,
183 ) -> Result<bool, Self::Error> {
184 Ok(true)
185 }
186
187 async fn delete(&self, _job_id: &str) -> Result<(), Self::Error> {
188 Ok(())
189 }
190
191 async fn pause(&self, _job_id: &str) -> Result<bool, Self::Error> {
192 Ok(false)
193 }
194
195 async fn resume(&self, _job_id: &str) -> Result<bool, Self::Error> {
196 Ok(false)
197 }
198}
199
200impl<T> CoordinatedStateStore for Arc<T>
201where
202 T: CoordinatedStateStore + Send + Sync,
203{
204 type Error = T::Error;
205
206 async fn load_or_initialize(
207 &self,
208 job_id: &str,
209 initial_state: JobState,
210 ) -> Result<CoordinatedRuntimeState, Self::Error> {
211 self.as_ref()
212 .load_or_initialize(job_id, initial_state)
213 .await
214 }
215
216 async fn save_state(
217 &self,
218 job_id: &str,
219 revision: u64,
220 state: &JobState,
221 ) -> Result<bool, Self::Error> {
222 self.as_ref().save_state(job_id, revision, state).await
223 }
224
225 async fn reclaim_inflight(
226 &self,
227 job_id: &str,
228 resource_id: &str,
229 lease_config: CoordinatedLeaseConfig,
230 ) -> Result<Option<CoordinatedClaim>, Self::Error> {
231 self.as_ref()
232 .reclaim_inflight(job_id, resource_id, lease_config)
233 .await
234 }
235
236 async fn claim_trigger(
237 &self,
238 job_id: &str,
239 resource_id: &str,
240 revision: u64,
241 trigger: CoordinatedPendingTrigger,
242 next_state: &JobState,
243 lease_config: CoordinatedLeaseConfig,
244 ) -> Result<Option<CoordinatedClaim>, Self::Error> {
245 self.as_ref()
246 .claim_trigger(
247 job_id,
248 resource_id,
249 revision,
250 trigger,
251 next_state,
252 lease_config,
253 )
254 .await
255 }
256
257 async fn renew(
258 &self,
259 lease: &ExecutionLease,
260 lease_config: CoordinatedLeaseConfig,
261 ) -> Result<ExecutionGuardRenewal, Self::Error> {
262 self.as_ref().renew(lease, lease_config).await
263 }
264
265 async fn complete(
266 &self,
267 job_id: &str,
268 revision: u64,
269 lease: &ExecutionLease,
270 state: &JobState,
271 ) -> Result<bool, Self::Error> {
272 self.as_ref().complete(job_id, revision, lease, state).await
273 }
274
275 async fn delete(&self, job_id: &str) -> Result<(), Self::Error> {
276 self.as_ref().delete(job_id).await
277 }
278
279 async fn pause(&self, job_id: &str) -> Result<bool, Self::Error> {
280 self.as_ref().pause(job_id).await
281 }
282
283 async fn resume(&self, job_id: &str) -> Result<bool, Self::Error> {
284 self.as_ref().resume(job_id).await
285 }
286
287 fn classify_store_error(error: &Self::Error) -> StoreErrorKind
288 where
289 Self: Sized,
290 {
291 T::classify_store_error(error)
292 }
293
294 fn classify_guard_error(error: &Self::Error) -> ExecutionGuardErrorKind
295 where
296 Self: Sized,
297 {
298 T::classify_guard_error(error)
299 }
300}