1use crate::error::{ExecutionGuardErrorKind, StoreErrorKind};
2use crate::execution_guard::{ExecutionGuardRenewal, ExecutionLease};
3use crate::model::JobState;
4use chrono::{DateTime, Utc};
5use std::convert::Infallible;
6use std::future::Future;
7use std::sync::Arc;
8use std::time::Duration;
9
10#[derive(Debug, Clone, Copy, PartialEq, Eq)]
11pub struct CoordinatedLeaseConfig {
12 pub ttl: Duration,
13 pub renew_interval: Duration,
14}
15
16impl CoordinatedLeaseConfig {
17 pub fn validate(self) -> Result<Self, &'static str> {
18 if self.ttl.is_zero() {
19 return Err("coordinated lease ttl must be greater than zero");
20 }
21 if self.renew_interval.is_zero() {
22 return Err("coordinated lease renew_interval must be greater than zero");
23 }
24 if self.renew_interval >= self.ttl {
25 return Err("coordinated lease renew_interval must be less than ttl");
26 }
27 Ok(self)
28 }
29}
30
31#[derive(Debug, Clone, PartialEq, Eq)]
32pub struct CoordinatedRuntimeState {
33 pub state: JobState,
34 pub revision: u64,
35}
36
37#[derive(Debug, Clone, PartialEq, Eq)]
38pub struct CoordinatedPendingTrigger {
39 pub scheduled_at: DateTime<Utc>,
40 pub catch_up: bool,
41 pub trigger_count: u32,
42}
43
44#[derive(Debug, Clone, PartialEq, Eq)]
45pub struct CoordinatedClaim {
46 pub state: CoordinatedRuntimeState,
47 pub trigger: CoordinatedPendingTrigger,
48 pub lease: ExecutionLease,
49 pub replayed: bool,
50}
51
52pub trait CoordinatedStateStore {
53 type Error: std::error::Error + Send + Sync + 'static;
54
55 fn load_or_initialize(
56 &self,
57 job_id: &str,
58 initial_state: JobState,
59 ) -> impl Future<Output = Result<CoordinatedRuntimeState, Self::Error>> + Send;
60
61 fn save_state(
62 &self,
63 job_id: &str,
64 revision: u64,
65 state: &JobState,
66 ) -> impl Future<Output = Result<bool, Self::Error>> + Send;
67
68 fn reclaim_inflight(
69 &self,
70 job_id: &str,
71 resource_id: &str,
72 lease_config: CoordinatedLeaseConfig,
73 ) -> impl Future<Output = Result<Option<CoordinatedClaim>, Self::Error>> + Send;
74
75 fn claim_trigger(
76 &self,
77 job_id: &str,
78 resource_id: &str,
79 revision: u64,
80 trigger: CoordinatedPendingTrigger,
81 next_state: &JobState,
82 lease_config: CoordinatedLeaseConfig,
83 ) -> impl Future<Output = Result<Option<CoordinatedClaim>, Self::Error>> + Send;
84
85 fn renew(
86 &self,
87 lease: &ExecutionLease,
88 lease_config: CoordinatedLeaseConfig,
89 ) -> impl Future<Output = Result<ExecutionGuardRenewal, Self::Error>> + Send;
90
91 fn complete(
92 &self,
93 job_id: &str,
94 revision: u64,
95 lease: &ExecutionLease,
96 state: &JobState,
97 ) -> impl Future<Output = Result<bool, Self::Error>> + Send;
98
99 fn delete(&self, job_id: &str) -> impl Future<Output = Result<(), Self::Error>> + Send;
100
101 fn classify_store_error(_error: &Self::Error) -> StoreErrorKind
102 where
103 Self: Sized,
104 {
105 StoreErrorKind::Unknown
106 }
107
108 fn classify_guard_error(_error: &Self::Error) -> ExecutionGuardErrorKind
109 where
110 Self: Sized,
111 {
112 ExecutionGuardErrorKind::Unknown
113 }
114}
115
116#[derive(Debug, Clone, Copy, Default)]
117pub struct NoopCoordinatedStateStore;
118
119impl CoordinatedStateStore for NoopCoordinatedStateStore {
120 type Error = Infallible;
121
122 async fn load_or_initialize(
123 &self,
124 _job_id: &str,
125 initial_state: JobState,
126 ) -> Result<CoordinatedRuntimeState, Self::Error> {
127 Ok(CoordinatedRuntimeState {
128 state: initial_state,
129 revision: 0,
130 })
131 }
132
133 async fn save_state(
134 &self,
135 _job_id: &str,
136 _revision: u64,
137 _state: &JobState,
138 ) -> Result<bool, Self::Error> {
139 Ok(true)
140 }
141
142 async fn reclaim_inflight(
143 &self,
144 _job_id: &str,
145 _resource_id: &str,
146 _lease_config: CoordinatedLeaseConfig,
147 ) -> Result<Option<CoordinatedClaim>, Self::Error> {
148 Ok(None)
149 }
150
151 async fn claim_trigger(
152 &self,
153 _job_id: &str,
154 _resource_id: &str,
155 _revision: u64,
156 _trigger: CoordinatedPendingTrigger,
157 _next_state: &JobState,
158 _lease_config: CoordinatedLeaseConfig,
159 ) -> Result<Option<CoordinatedClaim>, Self::Error> {
160 Ok(None)
161 }
162
163 async fn renew(
164 &self,
165 _lease: &ExecutionLease,
166 _lease_config: CoordinatedLeaseConfig,
167 ) -> Result<ExecutionGuardRenewal, Self::Error> {
168 Ok(ExecutionGuardRenewal::Lost)
169 }
170
171 async fn complete(
172 &self,
173 _job_id: &str,
174 _revision: u64,
175 _lease: &ExecutionLease,
176 _state: &JobState,
177 ) -> Result<bool, Self::Error> {
178 Ok(true)
179 }
180
181 async fn delete(&self, _job_id: &str) -> Result<(), Self::Error> {
182 Ok(())
183 }
184}
185
186impl<T> CoordinatedStateStore for Arc<T>
187where
188 T: CoordinatedStateStore + Send + Sync,
189{
190 type Error = T::Error;
191
192 async fn load_or_initialize(
193 &self,
194 job_id: &str,
195 initial_state: JobState,
196 ) -> Result<CoordinatedRuntimeState, Self::Error> {
197 self.as_ref()
198 .load_or_initialize(job_id, initial_state)
199 .await
200 }
201
202 async fn save_state(
203 &self,
204 job_id: &str,
205 revision: u64,
206 state: &JobState,
207 ) -> Result<bool, Self::Error> {
208 self.as_ref().save_state(job_id, revision, state).await
209 }
210
211 async fn reclaim_inflight(
212 &self,
213 job_id: &str,
214 resource_id: &str,
215 lease_config: CoordinatedLeaseConfig,
216 ) -> Result<Option<CoordinatedClaim>, Self::Error> {
217 self.as_ref()
218 .reclaim_inflight(job_id, resource_id, lease_config)
219 .await
220 }
221
222 async fn claim_trigger(
223 &self,
224 job_id: &str,
225 resource_id: &str,
226 revision: u64,
227 trigger: CoordinatedPendingTrigger,
228 next_state: &JobState,
229 lease_config: CoordinatedLeaseConfig,
230 ) -> Result<Option<CoordinatedClaim>, Self::Error> {
231 self.as_ref()
232 .claim_trigger(
233 job_id,
234 resource_id,
235 revision,
236 trigger,
237 next_state,
238 lease_config,
239 )
240 .await
241 }
242
243 async fn renew(
244 &self,
245 lease: &ExecutionLease,
246 lease_config: CoordinatedLeaseConfig,
247 ) -> Result<ExecutionGuardRenewal, Self::Error> {
248 self.as_ref().renew(lease, lease_config).await
249 }
250
251 async fn complete(
252 &self,
253 job_id: &str,
254 revision: u64,
255 lease: &ExecutionLease,
256 state: &JobState,
257 ) -> Result<bool, Self::Error> {
258 self.as_ref().complete(job_id, revision, lease, state).await
259 }
260
261 async fn delete(&self, job_id: &str) -> Result<(), Self::Error> {
262 self.as_ref().delete(job_id).await
263 }
264
265 fn classify_store_error(error: &Self::Error) -> StoreErrorKind
266 where
267 Self: Sized,
268 {
269 T::classify_store_error(error)
270 }
271
272 fn classify_guard_error(error: &Self::Error) -> ExecutionGuardErrorKind
273 where
274 Self: Sized,
275 {
276 T::classify_guard_error(error)
277 }
278}