// scheduler/coordinated_store.rs

1use crate::error::{ExecutionGuardErrorKind, StoreErrorKind};
2use crate::execution_guard::{ExecutionGuardRenewal, ExecutionLease};
3use crate::model::JobState;
4use chrono::{DateTime, Utc};
5use std::convert::Infallible;
6use std::future::Future;
7use std::sync::Arc;
8use std::time::Duration;
9
/// Lease timing handed to the coordinated store's claim, reclaim and renew calls.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct CoordinatedLeaseConfig {
    /// Lease time-to-live; must be non-zero and strictly greater than `renew_interval`.
    pub ttl: Duration,
    /// Renewal cadence; must be non-zero and strictly less than `ttl`.
    pub renew_interval: Duration,
}

impl CoordinatedLeaseConfig {
    /// Checks the timing invariants and hands `self` back unchanged if they hold.
    ///
    /// # Errors
    ///
    /// Returns a static message for the first violated rule. The rules are
    /// checked in this order:
    /// 1. `ttl` is zero;
    /// 2. `renew_interval` is zero;
    /// 3. `renew_interval` is not strictly less than `ttl`.
    pub fn validate(self) -> Result<Self, &'static str> {
        let Self {
            ttl,
            renew_interval,
        } = self;

        let violation = if ttl.is_zero() {
            Some("coordinated lease ttl must be greater than zero")
        } else if renew_interval.is_zero() {
            Some("coordinated lease renew_interval must be greater than zero")
        } else if renew_interval >= ttl {
            Some("coordinated lease renew_interval must be less than ttl")
        } else {
            None
        };

        match violation {
            Some(message) => Err(message),
            None => Ok(self),
        }
    }
}
30
31#[derive(Debug, Clone, PartialEq, Eq)]
32pub struct CoordinatedRuntimeState {
33    pub state: JobState,
34    pub revision: u64,
35    pub paused: bool,
36}
37
38#[derive(Debug, Clone, PartialEq, Eq)]
39pub struct CoordinatedPendingTrigger {
40    pub scheduled_at: DateTime<Utc>,
41    pub catch_up: bool,
42    pub trigger_count: u32,
43}
44
45#[derive(Debug, Clone, PartialEq, Eq)]
46pub struct CoordinatedClaim {
47    pub state: CoordinatedRuntimeState,
48    pub trigger: CoordinatedPendingTrigger,
49    pub lease: ExecutionLease,
50    pub replayed: bool,
51}
52
53pub trait CoordinatedStateStore {
54    type Error: std::error::Error + Send + Sync + 'static;
55
56    fn load_or_initialize(
57        &self,
58        job_id: &str,
59        initial_state: JobState,
60    ) -> impl Future<Output = Result<CoordinatedRuntimeState, Self::Error>> + Send;
61
62    fn save_state(
63        &self,
64        job_id: &str,
65        revision: u64,
66        state: &JobState,
67    ) -> impl Future<Output = Result<bool, Self::Error>> + Send;
68
69    fn reclaim_inflight(
70        &self,
71        job_id: &str,
72        resource_id: &str,
73        lease_config: CoordinatedLeaseConfig,
74    ) -> impl Future<Output = Result<Option<CoordinatedClaim>, Self::Error>> + Send;
75
76    fn claim_trigger(
77        &self,
78        job_id: &str,
79        resource_id: &str,
80        revision: u64,
81        trigger: CoordinatedPendingTrigger,
82        next_state: &JobState,
83        lease_config: CoordinatedLeaseConfig,
84    ) -> impl Future<Output = Result<Option<CoordinatedClaim>, Self::Error>> + Send;
85
86    fn renew(
87        &self,
88        lease: &ExecutionLease,
89        lease_config: CoordinatedLeaseConfig,
90    ) -> impl Future<Output = Result<ExecutionGuardRenewal, Self::Error>> + Send;
91
92    fn complete(
93        &self,
94        job_id: &str,
95        revision: u64,
96        lease: &ExecutionLease,
97        state: &JobState,
98    ) -> impl Future<Output = Result<bool, Self::Error>> + Send;
99
100    fn delete(&self, job_id: &str) -> impl Future<Output = Result<(), Self::Error>> + Send;
101
102    fn pause(&self, job_id: &str) -> impl Future<Output = Result<bool, Self::Error>> + Send;
103
104    fn resume(&self, job_id: &str) -> impl Future<Output = Result<bool, Self::Error>> + Send;
105
106    fn classify_store_error(_error: &Self::Error) -> StoreErrorKind
107    where
108        Self: Sized,
109    {
110        StoreErrorKind::Unknown
111    }
112
113    fn classify_guard_error(_error: &Self::Error) -> ExecutionGuardErrorKind
114    where
115        Self: Sized,
116    {
117        ExecutionGuardErrorKind::Unknown
118    }
119}
120
121#[derive(Debug, Clone, Copy, Default)]
122pub struct NoopCoordinatedStateStore;
123
124impl CoordinatedStateStore for NoopCoordinatedStateStore {
125    type Error = Infallible;
126
127    async fn load_or_initialize(
128        &self,
129        _job_id: &str,
130        initial_state: JobState,
131    ) -> Result<CoordinatedRuntimeState, Self::Error> {
132        Ok(CoordinatedRuntimeState {
133            state: initial_state,
134            revision: 0,
135            paused: false,
136        })
137    }
138
139    async fn save_state(
140        &self,
141        _job_id: &str,
142        _revision: u64,
143        _state: &JobState,
144    ) -> Result<bool, Self::Error> {
145        Ok(true)
146    }
147
148    async fn reclaim_inflight(
149        &self,
150        _job_id: &str,
151        _resource_id: &str,
152        _lease_config: CoordinatedLeaseConfig,
153    ) -> Result<Option<CoordinatedClaim>, Self::Error> {
154        Ok(None)
155    }
156
157    async fn claim_trigger(
158        &self,
159        _job_id: &str,
160        _resource_id: &str,
161        _revision: u64,
162        _trigger: CoordinatedPendingTrigger,
163        _next_state: &JobState,
164        _lease_config: CoordinatedLeaseConfig,
165    ) -> Result<Option<CoordinatedClaim>, Self::Error> {
166        Ok(None)
167    }
168
169    async fn renew(
170        &self,
171        _lease: &ExecutionLease,
172        _lease_config: CoordinatedLeaseConfig,
173    ) -> Result<ExecutionGuardRenewal, Self::Error> {
174        Ok(ExecutionGuardRenewal::Lost)
175    }
176
177    async fn complete(
178        &self,
179        _job_id: &str,
180        _revision: u64,
181        _lease: &ExecutionLease,
182        _state: &JobState,
183    ) -> Result<bool, Self::Error> {
184        Ok(true)
185    }
186
187    async fn delete(&self, _job_id: &str) -> Result<(), Self::Error> {
188        Ok(())
189    }
190
191    async fn pause(&self, _job_id: &str) -> Result<bool, Self::Error> {
192        Ok(false)
193    }
194
195    async fn resume(&self, _job_id: &str) -> Result<bool, Self::Error> {
196        Ok(false)
197    }
198}
199
200impl<T> CoordinatedStateStore for Arc<T>
201where
202    T: CoordinatedStateStore + Send + Sync,
203{
204    type Error = T::Error;
205
206    async fn load_or_initialize(
207        &self,
208        job_id: &str,
209        initial_state: JobState,
210    ) -> Result<CoordinatedRuntimeState, Self::Error> {
211        self.as_ref()
212            .load_or_initialize(job_id, initial_state)
213            .await
214    }
215
216    async fn save_state(
217        &self,
218        job_id: &str,
219        revision: u64,
220        state: &JobState,
221    ) -> Result<bool, Self::Error> {
222        self.as_ref().save_state(job_id, revision, state).await
223    }
224
225    async fn reclaim_inflight(
226        &self,
227        job_id: &str,
228        resource_id: &str,
229        lease_config: CoordinatedLeaseConfig,
230    ) -> Result<Option<CoordinatedClaim>, Self::Error> {
231        self.as_ref()
232            .reclaim_inflight(job_id, resource_id, lease_config)
233            .await
234    }
235
236    async fn claim_trigger(
237        &self,
238        job_id: &str,
239        resource_id: &str,
240        revision: u64,
241        trigger: CoordinatedPendingTrigger,
242        next_state: &JobState,
243        lease_config: CoordinatedLeaseConfig,
244    ) -> Result<Option<CoordinatedClaim>, Self::Error> {
245        self.as_ref()
246            .claim_trigger(
247                job_id,
248                resource_id,
249                revision,
250                trigger,
251                next_state,
252                lease_config,
253            )
254            .await
255    }
256
257    async fn renew(
258        &self,
259        lease: &ExecutionLease,
260        lease_config: CoordinatedLeaseConfig,
261    ) -> Result<ExecutionGuardRenewal, Self::Error> {
262        self.as_ref().renew(lease, lease_config).await
263    }
264
265    async fn complete(
266        &self,
267        job_id: &str,
268        revision: u64,
269        lease: &ExecutionLease,
270        state: &JobState,
271    ) -> Result<bool, Self::Error> {
272        self.as_ref().complete(job_id, revision, lease, state).await
273    }
274
275    async fn delete(&self, job_id: &str) -> Result<(), Self::Error> {
276        self.as_ref().delete(job_id).await
277    }
278
279    async fn pause(&self, job_id: &str) -> Result<bool, Self::Error> {
280        self.as_ref().pause(job_id).await
281    }
282
283    async fn resume(&self, job_id: &str) -> Result<bool, Self::Error> {
284        self.as_ref().resume(job_id).await
285    }
286
287    fn classify_store_error(error: &Self::Error) -> StoreErrorKind
288    where
289        Self: Sized,
290    {
291        T::classify_store_error(error)
292    }
293
294    fn classify_guard_error(error: &Self::Error) -> ExecutionGuardErrorKind
295    where
296        Self: Sized,
297    {
298        T::classify_guard_error(error)
299    }
300}