1mod in_memory;
2mod resilient;
3
4use crate::error::StoreErrorKind;
5use crate::model::JobState;
6use std::convert::Infallible;
7use std::error::Error;
8use std::future::{Future, ready};
9use std::sync::Arc;
10
11pub use in_memory::InMemoryStateStore;
12pub use resilient::ResilientStateStore;
13
14pub trait StateStore {
15 type Error: std::error::Error + Send + Sync + 'static;
16
17 fn load(
18 &self,
19 job_id: &str,
20 ) -> impl Future<Output = Result<Option<JobState>, Self::Error>> + Send;
21
22 fn save(&self, state: &JobState) -> impl Future<Output = Result<(), Self::Error>> + Send;
23
24 fn delete(&self, _job_id: &str) -> impl Future<Output = Result<(), Self::Error>> + Send {
25 ready(Ok(()))
26 }
27
28 fn drain_events(&self) -> impl Future<Output = Result<Vec<StoreEvent>, Self::Error>> + Send {
29 ready(Ok(Vec::new()))
30 }
31
32 fn classify_error(_error: &Self::Error) -> StoreErrorKind
33 where
34 Self: Sized,
35 {
36 StoreErrorKind::Unknown
37 }
38}
39
/// An error that can report whether it stems from a connection problem.
///
/// Implementors must also be `Error + Send + Sync + 'static`.
// NOTE(review): presumably consulted by `ResilientStateStore` to decide how to
// react to a failure -- confirm in the `resilient` module.
pub trait ResilientStoreError: Error + Send + Sync + 'static {
    /// Returns `true` when this error represents a connection issue.
    fn is_connection_issue(&self) -> bool;
}
45
/// Identifies which store operation a [`StoreEvent`] refers to.
///
/// The variants mirror the `load`, `save` and `delete` methods of
/// [`StateStore`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum StoreOperation {
    /// A `load` call.
    Load,
    /// A `save` call.
    Save,
    /// A `delete` call.
    Delete,
}
52
53#[derive(Debug, Clone, PartialEq, Eq)]
54pub enum StoreEvent {
55 Degraded {
56 operation: StoreOperation,
57 error: String,
58 },
59}
60
61impl<T> StateStore for Arc<T>
62where
63 T: StateStore + Send + Sync,
64{
65 type Error = T::Error;
66
67 async fn load(&self, job_id: &str) -> Result<Option<JobState>, Self::Error> {
68 self.as_ref().load(job_id).await
69 }
70
71 async fn save(&self, state: &JobState) -> Result<(), Self::Error> {
72 self.as_ref().save(state).await
73 }
74
75 async fn delete(&self, job_id: &str) -> Result<(), Self::Error> {
76 self.as_ref().delete(job_id).await
77 }
78
79 async fn drain_events(&self) -> Result<Vec<StoreEvent>, Self::Error> {
80 self.as_ref().drain_events().await
81 }
82
83 fn classify_error(error: &Self::Error) -> StoreErrorKind
84 where
85 Self: Sized,
86 {
87 T::classify_error(error)
88 }
89}
90
91impl ResilientStoreError for Infallible {
92 fn is_connection_issue(&self) -> bool {
93 match *self {}
94 }
95}