// scheduler/store/resilient.rs
use super::{InMemoryStateStore, ResilientStoreError, StateStore, StoreEvent, StoreOperation};
use crate::error::StoreErrorKind;
use crate::model::JobState;
use std::collections::VecDeque;
use std::sync::atomic::{AtomicBool, Ordering};
use tokio::sync::Mutex;

/// Which backend a [`ResilientStateStore`] currently sends operations to.
#[derive(Debug)]
enum ResilientMode<S> {
    /// The wrapped primary store is still in use.
    Primary(S),
    /// The primary store has been abandoned; only the in-memory mirror is used.
    MemoryOnly,
}

14#[derive(Debug)]
17pub struct ResilientStateStore<S>
18where
19 S: StateStore,
20 S::Error: ResilientStoreError,
21{
22 mode: Mutex<ResilientMode<S>>,
23 degraded: AtomicBool,
24 mirror: InMemoryStateStore,
25 events: Mutex<VecDeque<StoreEvent>>,
26}
27
28impl<S> ResilientStateStore<S>
29where
30 S: StateStore,
31 S::Error: ResilientStoreError,
32{
33 pub fn new(store: S) -> Self {
34 Self {
35 mode: Mutex::new(ResilientMode::Primary(store)),
36 degraded: AtomicBool::new(false),
37 mirror: InMemoryStateStore::new(),
38 events: Mutex::new(VecDeque::new()),
39 }
40 }
41
42 pub fn degraded() -> Self {
43 Self {
44 mode: Mutex::new(ResilientMode::MemoryOnly),
45 degraded: AtomicBool::new(true),
46 mirror: InMemoryStateStore::new(),
47 events: Mutex::new(VecDeque::new()),
48 }
49 }
50
51 pub fn from_result(result: Result<S, S::Error>) -> Result<Self, S::Error> {
52 match result {
53 Ok(store) => Ok(Self::new(store)),
54 Err(error) if error.is_connection_issue() => Ok(Self::degraded()),
55 Err(error) => Err(error),
56 }
57 }
58
59 pub fn is_degraded(&self) -> bool {
62 self.degraded.load(Ordering::SeqCst)
63 }
64
65 async fn load_mirror(&self, job_id: &str) -> Result<Option<JobState>, S::Error> {
66 match self.mirror.load(job_id).await {
67 Ok(state) => Ok(state),
68 Err(never) => match never {},
69 }
70 }
71
72 async fn save_mirror(&self, state: &JobState) -> Result<(), S::Error> {
73 match self.mirror.save(state).await {
74 Ok(()) => Ok(()),
75 Err(never) => match never {},
76 }
77 }
78
79 async fn delete_mirror(&self, job_id: &str) -> Result<(), S::Error> {
80 match self.mirror.delete(job_id).await {
81 Ok(()) => Ok(()),
82 Err(never) => match never {},
83 }
84 }
85
86 async fn sync_mirror(&self, job_id: &str, state: Option<&JobState>) -> Result<(), S::Error> {
87 match state {
88 Some(state) => self.save_mirror(state).await,
89 None => self.delete_mirror(job_id).await,
90 }
91 }
92
93 async fn mark_degraded(&self, operation: StoreOperation, error: &S::Error) {
94 let was_degraded = self.degraded.swap(true, Ordering::SeqCst);
95 if !was_degraded {
96 self.events.lock().await.push_back(StoreEvent::Degraded {
97 operation,
98 error: error.to_string(),
99 });
100 }
101 }
102}
103
104impl<S> StateStore for ResilientStateStore<S>
105where
106 S: StateStore + Send,
107 S::Error: ResilientStoreError,
108{
109 type Error = S::Error;
110
111 async fn load(&self, job_id: &str) -> Result<Option<JobState>, Self::Error> {
112 let mut mode = self.mode.lock().await;
113
114 match &mut *mode {
115 ResilientMode::Primary(store) => match store.load(job_id).await {
116 Ok(state) => {
117 drop(mode);
118 self.sync_mirror(job_id, state.as_ref()).await?;
119 Ok(state)
120 }
121 Err(error) if error.is_connection_issue() => {
122 drop(mode);
123 self.mark_degraded(StoreOperation::Load, &error).await;
124 let mut mode = self.mode.lock().await;
125 *mode = ResilientMode::MemoryOnly;
126 drop(mode);
127 self.load_mirror(job_id).await
128 }
129 Err(error) => Err(error),
130 },
131 ResilientMode::MemoryOnly => {
132 drop(mode);
133 self.load_mirror(job_id).await
134 }
135 }
136 }
137
138 async fn save(&self, state: &JobState) -> Result<(), Self::Error> {
139 let mut mode = self.mode.lock().await;
140
141 match &mut *mode {
142 ResilientMode::Primary(store) => match store.save(state).await {
143 Ok(()) => {
144 drop(mode);
145 self.save_mirror(state).await
146 }
147 Err(error) if error.is_connection_issue() => {
148 drop(mode);
149 self.mark_degraded(StoreOperation::Save, &error).await;
150 self.save_mirror(state).await?;
151 let mut mode = self.mode.lock().await;
152 *mode = ResilientMode::MemoryOnly;
153 Ok(())
154 }
155 Err(error) => Err(error),
156 },
157 ResilientMode::MemoryOnly => {
158 drop(mode);
159 self.save_mirror(state).await
160 }
161 }
162 }
163
164 async fn delete(&self, job_id: &str) -> Result<(), Self::Error> {
165 let mut mode = self.mode.lock().await;
166
167 match &mut *mode {
168 ResilientMode::Primary(store) => match store.delete(job_id).await {
169 Ok(()) => {
170 drop(mode);
171 self.delete_mirror(job_id).await
172 }
173 Err(error) if error.is_connection_issue() => {
174 drop(mode);
175 self.mark_degraded(StoreOperation::Delete, &error).await;
176 self.delete_mirror(job_id).await?;
177 let mut mode = self.mode.lock().await;
178 *mode = ResilientMode::MemoryOnly;
179 Ok(())
180 }
181 Err(error) => Err(error),
182 },
183 ResilientMode::MemoryOnly => {
184 drop(mode);
185 self.delete_mirror(job_id).await
186 }
187 }
188 }
189
190 async fn drain_events(&self) -> Result<Vec<StoreEvent>, Self::Error> {
191 let mut events = self.events.lock().await;
192 Ok(events.drain(..).collect())
193 }
194
195 fn classify_error(error: &Self::Error) -> StoreErrorKind
196 where
197 Self: Sized,
198 {
199 S::classify_error(error)
200 }
201}