Skip to main content

scheduler/store/
mod.rs

1mod in_memory;
2mod resilient;
3
4use crate::error::StoreErrorKind;
5use crate::model::JobState;
6use std::convert::Infallible;
7use std::error::Error;
8use std::future::{Future, ready};
9use std::sync::Arc;
10
11pub use in_memory::InMemoryStateStore;
12pub use resilient::ResilientStateStore;
13
14pub trait StateStore {
15    type Error: std::error::Error + Send + Sync + 'static;
16
17    fn load(
18        &self,
19        job_id: &str,
20    ) -> impl Future<Output = Result<Option<JobState>, Self::Error>> + Send;
21
22    fn save(&self, state: &JobState) -> impl Future<Output = Result<(), Self::Error>> + Send;
23
24    fn delete(&self, _job_id: &str) -> impl Future<Output = Result<(), Self::Error>> + Send {
25        ready(Ok(()))
26    }
27
28    fn drain_events(&self) -> impl Future<Output = Result<Vec<StoreEvent>, Self::Error>> + Send {
29        ready(Ok(Vec::new()))
30    }
31
32    fn classify_error(_error: &Self::Error) -> StoreErrorKind
33    where
34        Self: Sized,
35    {
36        StoreErrorKind::Unknown
37    }
38}
39
40/// Classifies store errors that should trigger a one-way downgrade to the
41/// in-process mirror store.
42pub trait ResilientStoreError: Error + Send + Sync + 'static {
43    fn is_connection_issue(&self) -> bool;
44}
45
46#[derive(Debug, Clone, Copy, PartialEq, Eq)]
47pub enum StoreOperation {
48    Load,
49    Save,
50    Delete,
51}
52
53#[derive(Debug, Clone, PartialEq, Eq)]
54pub enum StoreEvent {
55    Degraded {
56        operation: StoreOperation,
57        error: String,
58    },
59}
60
61impl<T> StateStore for Arc<T>
62where
63    T: StateStore + Send + Sync,
64{
65    type Error = T::Error;
66
67    async fn load(&self, job_id: &str) -> Result<Option<JobState>, Self::Error> {
68        self.as_ref().load(job_id).await
69    }
70
71    async fn save(&self, state: &JobState) -> Result<(), Self::Error> {
72        self.as_ref().save(state).await
73    }
74
75    async fn delete(&self, job_id: &str) -> Result<(), Self::Error> {
76        self.as_ref().delete(job_id).await
77    }
78
79    async fn drain_events(&self) -> Result<Vec<StoreEvent>, Self::Error> {
80        self.as_ref().drain_events().await
81    }
82
83    fn classify_error(error: &Self::Error) -> StoreErrorKind
84    where
85        Self: Sized,
86    {
87        T::classify_error(error)
88    }
89}
90
91impl ResilientStoreError for Infallible {
92    fn is_connection_issue(&self) -> bool {
93        match *self {}
94    }
95}