allframe_core/resilience/
offline.rs1use std::future::Future;
7use std::pin::Pin;
8use std::sync::Arc;
9
10use async_trait::async_trait;
11use tokio::sync::Mutex;
12
/// Connectivity state reported by a [`ConnectivityProbe`].
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum ConnectivityStatus {
    /// The probe reports that connectivity is available.
    Online,
    /// The probe reports that connectivity is unavailable.
    Offline,
    /// The probe reports impaired connectivity.
    Degraded {
        /// Free-form explanation supplied by the probe.
        reason: String,
    },
}
26
27#[async_trait]
29pub trait ConnectivityProbe: Send + Sync {
30 async fn check(&self) -> ConnectivityStatus;
32}
33
/// Outcome of a call routed through [`OfflineCircuitBreaker::call`].
#[derive(Debug)]
pub enum CallResult<T, E> {
    /// The operation ran immediately; carries the `Result` it produced.
    Executed(Result<T, E>),
    /// The operation was not run and was placed on the queue instead.
    Queued,
}

impl<T, E> CallResult<T, E> {
    /// Returns `true` for [`CallResult::Queued`] and `false` for [`CallResult::Executed`].
    pub fn is_queued(&self) -> bool {
        matches!(self, Self::Queued)
    }
}
49
/// A heap-allocated, `Send` future that resolves to `()`.
type BoxedUnitFuture = Pin<Box<dyn Future<Output = ()> + Send>>;

/// A boxed, one-shot, `Send` closure that produces a [`BoxedUnitFuture`] when invoked.
///
/// This is the element type of the deferred-operation queue.
type BoxedFnOnce = Box<dyn FnOnce() -> BoxedUnitFuture + Send>;
51
52pub struct OfflineCircuitBreaker<P: ConnectivityProbe> {
54 #[allow(dead_code)]
55 name: String,
56 probe: P,
57 queue: Arc<Mutex<Vec<BoxedFnOnce>>>,
58}
59
60impl<P: ConnectivityProbe> OfflineCircuitBreaker<P> {
61 pub fn new(name: impl Into<String>, probe: P) -> Self {
63 Self {
64 name: name.into(),
65 probe,
66 queue: Arc::new(Mutex::new(Vec::new())),
67 }
68 }
69
70 pub async fn call<F, Fut, T, E>(&self, f: F) -> CallResult<T, E>
72 where
73 F: FnOnce() -> Fut + Send + 'static,
74 Fut: Future<Output = Result<T, E>> + Send + 'static,
75 T: Send + 'static,
76 E: Send + 'static,
77 {
78 match self.probe.check().await {
79 ConnectivityStatus::Online => {
80 let result = f().await;
81 CallResult::Executed(result)
82 }
83 _ => {
84 let wrapper: BoxedFnOnce = Box::new(move || {
86 Box::pin(async move {
87 let _ = f().await;
88 })
89 });
90 self.queue.lock().await.push(wrapper);
91 CallResult::Queued
92 }
93 }
94 }
95
96 pub async fn queued_count(&self) -> usize {
98 self.queue.lock().await.len()
99 }
100
101 pub async fn drain(&self) -> Result<(), String> {
103 let ops: Vec<BoxedFnOnce> = {
104 let mut q = self.queue.lock().await;
105 q.drain(..).collect()
106 };
107 for op in ops {
108 op().await;
109 }
110 Ok(())
111 }
112}
113
/// Record of an operation waiting in a queue, identified only by its `id`.
#[derive(Clone, Debug)]
pub struct PendingOperation {
    /// Identifier of the operation.
    pub id: String,
}
120
/// Tally produced by a replay pass.
#[derive(Clone, Debug)]
pub struct ReplayReport {
    /// Number of operations whose handler returned `Ok`.
    pub replayed: usize,
    /// Number of operations whose handler returned `Err`.
    pub failed: usize,
}
129
130#[derive(Clone)]
132pub struct InMemoryQueue {
133 ops: Arc<Mutex<Vec<PendingOperation>>>,
134}
135
136impl InMemoryQueue {
137 pub fn new() -> Self {
139 Self {
140 ops: Arc::new(Mutex::new(Vec::new())),
141 }
142 }
143
144 async fn push(&self, op: PendingOperation) {
145 self.ops.lock().await.push(op);
146 }
147
148 async fn drain_all(&self) -> Vec<PendingOperation> {
149 let mut q = self.ops.lock().await;
150 q.drain(..).collect()
151 }
152
153 async fn len(&self) -> usize {
154 self.ops.lock().await.len()
155 }
156
157 async fn peek_all(&self) -> Vec<PendingOperation> {
158 self.ops.lock().await.clone()
159 }
160}
161
162impl Default for InMemoryQueue {
163 fn default() -> Self {
164 Self::new()
165 }
166}
167
168pub struct StoreAndForward<Q = InMemoryQueue, P: ConnectivityProbe = AlwaysOnlineProbe> {
170 queue: Q,
171 #[allow(dead_code)]
172 probe: P,
173}
174
/// Stateless probe type; its [`ConnectivityProbe`] implementation always reports `Online`.
///
/// As a field-less unit struct it is trivially `Debug`, `Clone`, `Copy` and `Default`,
/// so those traits are derived to let it be logged, duplicated and default-constructed.
#[derive(Debug, Clone, Copy, Default)]
pub struct AlwaysOnlineProbe;
177
178#[async_trait]
179impl ConnectivityProbe for AlwaysOnlineProbe {
180 async fn check(&self) -> ConnectivityStatus {
181 ConnectivityStatus::Online
182 }
183}
184
185impl StoreAndForward<InMemoryQueue, AlwaysOnlineProbe> {
186 pub fn default_new() -> Self {
188 Self {
189 queue: InMemoryQueue::new(),
190 probe: AlwaysOnlineProbe,
191 }
192 }
193}
194
195impl<P: ConnectivityProbe> StoreAndForward<InMemoryQueue, P> {
196 pub fn new(queue: InMemoryQueue, probe: P) -> Self {
198 Self { queue, probe }
199 }
200
201 pub async fn execute<F, Fut>(&self, id: &str, f: F)
203 where
204 F: FnOnce() -> Fut + Send,
205 Fut: Future<Output = Result<(), String>> + Send,
206 {
207 let result = f().await;
208 if result.is_err() {
209 self.queue
210 .push(PendingOperation { id: id.to_string() })
211 .await;
212 }
213 }
214
215 pub async fn pending_count(&self) -> usize {
217 self.queue.len().await
218 }
219
220 pub async fn peek_pending(&self) -> Vec<PendingOperation> {
222 self.queue.peek_all().await
223 }
224
225 pub async fn replay_all<F, Fut>(&self, handler: F) -> Result<ReplayReport, String>
227 where
228 F: Fn(String) -> Fut + Send,
229 Fut: Future<Output = Result<(), String>> + Send,
230 {
231 let ops = self.queue.drain_all().await;
232 let mut replayed = 0;
233 let mut failed = 0;
234 for op in ops {
235 match handler(op.id).await {
236 Ok(()) => replayed += 1,
237 Err(_) => failed += 1,
238 }
239 }
240 Ok(ReplayReport { replayed, failed })
241 }
242}