Skip to main content

scheduler/
coordinated_store.rs

1use crate::error::{ExecutionGuardErrorKind, StoreErrorKind};
2use crate::execution_guard::{ExecutionGuardRenewal, ExecutionLease};
3use crate::model::JobState;
4use chrono::{DateTime, Utc};
5use std::convert::Infallible;
6use std::future::Future;
7use std::sync::Arc;
8use std::time::Duration;
9
/// Timing parameters for a coordinated execution lease.
///
/// Call [`CoordinatedLeaseConfig::validate`] before use to reject zero
/// durations and renewal intervals that do not fit inside the TTL.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub struct CoordinatedLeaseConfig {
    /// Lease time-to-live; must be non-zero.
    pub ttl: Duration,
    /// Interval between lease renewals; must be non-zero and strictly
    /// shorter than `ttl`.
    pub renew_interval: Duration,
}

impl CoordinatedLeaseConfig {
    /// Checks the timing invariants and returns the configuration unchanged
    /// when all of them hold.
    ///
    /// # Errors
    ///
    /// Returns a static message describing the first violated rule, checked
    /// in this order:
    /// 1. `ttl` is zero,
    /// 2. `renew_interval` is zero,
    /// 3. `renew_interval` is not strictly less than `ttl`.
    pub fn validate(self) -> Result<Self, &'static str> {
        let Self {
            ttl,
            renew_interval,
        } = self;

        // Pick the first failing rule; `None` means the config is usable.
        let violation = if ttl.is_zero() {
            Some("coordinated lease ttl must be greater than zero")
        } else if renew_interval.is_zero() {
            Some("coordinated lease renew_interval must be greater than zero")
        } else if renew_interval >= ttl {
            Some("coordinated lease renew_interval must be less than ttl")
        } else {
            None
        };

        match violation {
            Some(reason) => Err(reason),
            None => Ok(self),
        }
    }
}
30
31#[derive(Debug, Clone, PartialEq, Eq)]
32pub struct CoordinatedRuntimeState {
33    pub state: JobState,
34    pub revision: u64,
35}
36
37#[derive(Debug, Clone, PartialEq, Eq)]
38pub struct CoordinatedPendingTrigger {
39    pub scheduled_at: DateTime<Utc>,
40    pub catch_up: bool,
41    pub trigger_count: u32,
42}
43
44#[derive(Debug, Clone, PartialEq, Eq)]
45pub struct CoordinatedClaim {
46    pub state: CoordinatedRuntimeState,
47    pub trigger: CoordinatedPendingTrigger,
48    pub lease: ExecutionLease,
49    pub replayed: bool,
50}
51
52pub trait CoordinatedStateStore {
53    type Error: std::error::Error + Send + Sync + 'static;
54
55    fn load_or_initialize(
56        &self,
57        job_id: &str,
58        initial_state: JobState,
59    ) -> impl Future<Output = Result<CoordinatedRuntimeState, Self::Error>> + Send;
60
61    fn save_state(
62        &self,
63        job_id: &str,
64        revision: u64,
65        state: &JobState,
66    ) -> impl Future<Output = Result<bool, Self::Error>> + Send;
67
68    fn reclaim_inflight(
69        &self,
70        job_id: &str,
71        resource_id: &str,
72        lease_config: CoordinatedLeaseConfig,
73    ) -> impl Future<Output = Result<Option<CoordinatedClaim>, Self::Error>> + Send;
74
75    fn claim_trigger(
76        &self,
77        job_id: &str,
78        resource_id: &str,
79        revision: u64,
80        trigger: CoordinatedPendingTrigger,
81        next_state: &JobState,
82        lease_config: CoordinatedLeaseConfig,
83    ) -> impl Future<Output = Result<Option<CoordinatedClaim>, Self::Error>> + Send;
84
85    fn renew(
86        &self,
87        lease: &ExecutionLease,
88        lease_config: CoordinatedLeaseConfig,
89    ) -> impl Future<Output = Result<ExecutionGuardRenewal, Self::Error>> + Send;
90
91    fn complete(
92        &self,
93        job_id: &str,
94        revision: u64,
95        lease: &ExecutionLease,
96        state: &JobState,
97    ) -> impl Future<Output = Result<bool, Self::Error>> + Send;
98
99    fn delete(&self, job_id: &str) -> impl Future<Output = Result<(), Self::Error>> + Send;
100
101    fn classify_store_error(_error: &Self::Error) -> StoreErrorKind
102    where
103        Self: Sized,
104    {
105        StoreErrorKind::Unknown
106    }
107
108    fn classify_guard_error(_error: &Self::Error) -> ExecutionGuardErrorKind
109    where
110        Self: Sized,
111    {
112        ExecutionGuardErrorKind::Unknown
113    }
114}
115
116#[derive(Debug, Clone, Copy, Default)]
117pub struct NoopCoordinatedStateStore;
118
119impl CoordinatedStateStore for NoopCoordinatedStateStore {
120    type Error = Infallible;
121
122    async fn load_or_initialize(
123        &self,
124        _job_id: &str,
125        initial_state: JobState,
126    ) -> Result<CoordinatedRuntimeState, Self::Error> {
127        Ok(CoordinatedRuntimeState {
128            state: initial_state,
129            revision: 0,
130        })
131    }
132
133    async fn save_state(
134        &self,
135        _job_id: &str,
136        _revision: u64,
137        _state: &JobState,
138    ) -> Result<bool, Self::Error> {
139        Ok(true)
140    }
141
142    async fn reclaim_inflight(
143        &self,
144        _job_id: &str,
145        _resource_id: &str,
146        _lease_config: CoordinatedLeaseConfig,
147    ) -> Result<Option<CoordinatedClaim>, Self::Error> {
148        Ok(None)
149    }
150
151    async fn claim_trigger(
152        &self,
153        _job_id: &str,
154        _resource_id: &str,
155        _revision: u64,
156        _trigger: CoordinatedPendingTrigger,
157        _next_state: &JobState,
158        _lease_config: CoordinatedLeaseConfig,
159    ) -> Result<Option<CoordinatedClaim>, Self::Error> {
160        Ok(None)
161    }
162
163    async fn renew(
164        &self,
165        _lease: &ExecutionLease,
166        _lease_config: CoordinatedLeaseConfig,
167    ) -> Result<ExecutionGuardRenewal, Self::Error> {
168        Ok(ExecutionGuardRenewal::Lost)
169    }
170
171    async fn complete(
172        &self,
173        _job_id: &str,
174        _revision: u64,
175        _lease: &ExecutionLease,
176        _state: &JobState,
177    ) -> Result<bool, Self::Error> {
178        Ok(true)
179    }
180
181    async fn delete(&self, _job_id: &str) -> Result<(), Self::Error> {
182        Ok(())
183    }
184}
185
186impl<T> CoordinatedStateStore for Arc<T>
187where
188    T: CoordinatedStateStore + Send + Sync,
189{
190    type Error = T::Error;
191
192    async fn load_or_initialize(
193        &self,
194        job_id: &str,
195        initial_state: JobState,
196    ) -> Result<CoordinatedRuntimeState, Self::Error> {
197        self.as_ref()
198            .load_or_initialize(job_id, initial_state)
199            .await
200    }
201
202    async fn save_state(
203        &self,
204        job_id: &str,
205        revision: u64,
206        state: &JobState,
207    ) -> Result<bool, Self::Error> {
208        self.as_ref().save_state(job_id, revision, state).await
209    }
210
211    async fn reclaim_inflight(
212        &self,
213        job_id: &str,
214        resource_id: &str,
215        lease_config: CoordinatedLeaseConfig,
216    ) -> Result<Option<CoordinatedClaim>, Self::Error> {
217        self.as_ref()
218            .reclaim_inflight(job_id, resource_id, lease_config)
219            .await
220    }
221
222    async fn claim_trigger(
223        &self,
224        job_id: &str,
225        resource_id: &str,
226        revision: u64,
227        trigger: CoordinatedPendingTrigger,
228        next_state: &JobState,
229        lease_config: CoordinatedLeaseConfig,
230    ) -> Result<Option<CoordinatedClaim>, Self::Error> {
231        self.as_ref()
232            .claim_trigger(
233                job_id,
234                resource_id,
235                revision,
236                trigger,
237                next_state,
238                lease_config,
239            )
240            .await
241    }
242
243    async fn renew(
244        &self,
245        lease: &ExecutionLease,
246        lease_config: CoordinatedLeaseConfig,
247    ) -> Result<ExecutionGuardRenewal, Self::Error> {
248        self.as_ref().renew(lease, lease_config).await
249    }
250
251    async fn complete(
252        &self,
253        job_id: &str,
254        revision: u64,
255        lease: &ExecutionLease,
256        state: &JobState,
257    ) -> Result<bool, Self::Error> {
258        self.as_ref().complete(job_id, revision, lease, state).await
259    }
260
261    async fn delete(&self, job_id: &str) -> Result<(), Self::Error> {
262        self.as_ref().delete(job_id).await
263    }
264
265    fn classify_store_error(error: &Self::Error) -> StoreErrorKind
266    where
267        Self: Sized,
268    {
269        T::classify_store_error(error)
270    }
271
272    fn classify_guard_error(error: &Self::Error) -> ExecutionGuardErrorKind
273    where
274        Self: Sized,
275    {
276        T::classify_guard_error(error)
277    }
278}