Skip to main content

scheduler/store/
resilient.rs

1use super::{InMemoryStateStore, ResilientStoreError, StateStore, StoreEvent, StoreOperation};
2use crate::error::StoreErrorKind;
3use crate::model::JobState;
4use std::collections::VecDeque;
5use std::sync::atomic::{AtomicBool, Ordering};
6use tokio::sync::Mutex;
7
/// Backend currently serving requests for a [`ResilientStateStore`].
#[derive(Debug)]
enum ResilientMode<S> {
    /// Operations are sent to the wrapped primary store.
    Primary(S),
    /// The primary has been abandoned; operations are served by the
    /// in-process mirror only.
    MemoryOnly,
}
13
14/// Wraps a primary store with an in-process mirror that takes over after
15/// connection-class failures.
16#[derive(Debug)]
17pub struct ResilientStateStore<S>
18where
19    S: StateStore,
20    S::Error: ResilientStoreError,
21{
22    mode: Mutex<ResilientMode<S>>,
23    degraded: AtomicBool,
24    mirror: InMemoryStateStore,
25    events: Mutex<VecDeque<StoreEvent>>,
26}
27
28impl<S> ResilientStateStore<S>
29where
30    S: StateStore,
31    S::Error: ResilientStoreError,
32{
33    pub fn new(store: S) -> Self {
34        Self {
35            mode: Mutex::new(ResilientMode::Primary(store)),
36            degraded: AtomicBool::new(false),
37            mirror: InMemoryStateStore::new(),
38            events: Mutex::new(VecDeque::new()),
39        }
40    }
41
42    pub fn degraded() -> Self {
43        Self {
44            mode: Mutex::new(ResilientMode::MemoryOnly),
45            degraded: AtomicBool::new(true),
46            mirror: InMemoryStateStore::new(),
47            events: Mutex::new(VecDeque::new()),
48        }
49    }
50
51    pub fn from_result(result: Result<S, S::Error>) -> Result<Self, S::Error> {
52        match result {
53            Ok(store) => Ok(Self::new(store)),
54            Err(error) if error.is_connection_issue() => Ok(Self::degraded()),
55            Err(error) => Err(error),
56        }
57    }
58
59    /// Returns true once the store has permanently fallen back to its
60    /// in-process mirror.
61    pub fn is_degraded(&self) -> bool {
62        self.degraded.load(Ordering::SeqCst)
63    }
64
65    async fn load_mirror(&self, job_id: &str) -> Result<Option<JobState>, S::Error> {
66        match self.mirror.load(job_id).await {
67            Ok(state) => Ok(state),
68            Err(never) => match never {},
69        }
70    }
71
72    async fn save_mirror(&self, state: &JobState) -> Result<(), S::Error> {
73        match self.mirror.save(state).await {
74            Ok(()) => Ok(()),
75            Err(never) => match never {},
76        }
77    }
78
79    async fn delete_mirror(&self, job_id: &str) -> Result<(), S::Error> {
80        match self.mirror.delete(job_id).await {
81            Ok(()) => Ok(()),
82            Err(never) => match never {},
83        }
84    }
85
86    async fn sync_mirror(&self, job_id: &str, state: Option<&JobState>) -> Result<(), S::Error> {
87        match state {
88            Some(state) => self.save_mirror(state).await,
89            None => self.delete_mirror(job_id).await,
90        }
91    }
92
93    async fn mark_degraded(&self, operation: StoreOperation, error: &S::Error) {
94        let was_degraded = self.degraded.swap(true, Ordering::SeqCst);
95        if !was_degraded {
96            self.events.lock().await.push_back(StoreEvent::Degraded {
97                operation,
98                error: error.to_string(),
99            });
100        }
101    }
102}
103
104impl<S> StateStore for ResilientStateStore<S>
105where
106    S: StateStore + Send,
107    S::Error: ResilientStoreError,
108{
109    type Error = S::Error;
110
111    async fn load(&self, job_id: &str) -> Result<Option<JobState>, Self::Error> {
112        let mut mode = self.mode.lock().await;
113
114        match &mut *mode {
115            ResilientMode::Primary(store) => match store.load(job_id).await {
116                Ok(state) => {
117                    drop(mode);
118                    self.sync_mirror(job_id, state.as_ref()).await?;
119                    Ok(state)
120                }
121                Err(error) if error.is_connection_issue() => {
122                    drop(mode);
123                    self.mark_degraded(StoreOperation::Load, &error).await;
124                    let mut mode = self.mode.lock().await;
125                    *mode = ResilientMode::MemoryOnly;
126                    drop(mode);
127                    self.load_mirror(job_id).await
128                }
129                Err(error) => Err(error),
130            },
131            ResilientMode::MemoryOnly => {
132                drop(mode);
133                self.load_mirror(job_id).await
134            }
135        }
136    }
137
138    async fn save(&self, state: &JobState) -> Result<(), Self::Error> {
139        let mut mode = self.mode.lock().await;
140
141        match &mut *mode {
142            ResilientMode::Primary(store) => match store.save(state).await {
143                Ok(()) => {
144                    drop(mode);
145                    self.save_mirror(state).await
146                }
147                Err(error) if error.is_connection_issue() => {
148                    drop(mode);
149                    self.mark_degraded(StoreOperation::Save, &error).await;
150                    self.save_mirror(state).await?;
151                    let mut mode = self.mode.lock().await;
152                    *mode = ResilientMode::MemoryOnly;
153                    Ok(())
154                }
155                Err(error) => Err(error),
156            },
157            ResilientMode::MemoryOnly => {
158                drop(mode);
159                self.save_mirror(state).await
160            }
161        }
162    }
163
164    async fn delete(&self, job_id: &str) -> Result<(), Self::Error> {
165        let mut mode = self.mode.lock().await;
166
167        match &mut *mode {
168            ResilientMode::Primary(store) => match store.delete(job_id).await {
169                Ok(()) => {
170                    drop(mode);
171                    self.delete_mirror(job_id).await
172                }
173                Err(error) if error.is_connection_issue() => {
174                    drop(mode);
175                    self.mark_degraded(StoreOperation::Delete, &error).await;
176                    self.delete_mirror(job_id).await?;
177                    let mut mode = self.mode.lock().await;
178                    *mode = ResilientMode::MemoryOnly;
179                    Ok(())
180                }
181                Err(error) => Err(error),
182            },
183            ResilientMode::MemoryOnly => {
184                drop(mode);
185                self.delete_mirror(job_id).await
186            }
187        }
188    }
189
190    async fn drain_events(&self) -> Result<Vec<StoreEvent>, Self::Error> {
191        let mut events = self.events.lock().await;
192        Ok(events.drain(..).collect())
193    }
194
195    fn classify_error(error: &Self::Error) -> StoreErrorKind
196    where
197        Self: Sized,
198    {
199        S::classify_error(error)
200    }
201}