Skip to main content

allframe_core/resilience/
offline.rs

//! Offline-aware resilience patterns
//!
//! Provides connectivity probing, offline circuit breakers, and
//! store-and-forward queuing for offline-first deployments.
5
6use std::future::Future;
7use std::pin::Pin;
8use std::sync::Arc;
9
10use async_trait::async_trait;
11use tokio::sync::Mutex;
12
/// Outcome of a single connectivity check performed by a probe.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum ConnectivityStatus {
    /// The network is fully reachable.
    Online,
    /// The network is not reachable at all.
    Offline,
    /// The network is only partially reachable.
    Degraded {
        /// Human-readable explanation of why connectivity is degraded.
        reason: String,
    },
}
26
/// Trait for checking network connectivity.
///
/// Implementors must be `Send + Sync` (required by the supertrait bounds).
/// The trait is declared through `#[async_trait]` so that `check` can be an
/// `async fn`.
#[async_trait]
pub trait ConnectivityProbe: Send + Sync {
    /// Check current connectivity status.
    ///
    /// Returns the [`ConnectivityStatus`] observed by this probe.
    async fn check(&self) -> ConnectivityStatus;
}
33
34/// Result of calling through an offline circuit breaker.
35#[derive(Debug)]
36pub enum CallResult<T, E> {
37    /// The operation was executed (may have succeeded or failed).
38    Executed(Result<T, E>),
39    /// The operation was queued because connectivity is offline.
40    Queued,
41}
42
43impl<T, E> CallResult<T, E> {
44    /// Returns true if the operation was queued rather than executed.
45    pub fn is_queued(&self) -> bool {
46        matches!(self, CallResult::Queued)
47    }
48}
49
/// Heap-allocated, sendable future that resolves to `()`.
type BoxedUnitFuture = Pin<Box<dyn Future<Output = ()> + Send>>;

/// Type-erased, one-shot operation: calling it yields the future that performs
/// the deferred work.
type BoxedFnOnce = Box<dyn FnOnce() -> BoxedUnitFuture + Send>;
51
52/// Circuit breaker that queues operations when offline.
53pub struct OfflineCircuitBreaker<P: ConnectivityProbe> {
54    #[allow(dead_code)]
55    name: String,
56    probe: P,
57    queue: Arc<Mutex<Vec<BoxedFnOnce>>>,
58}
59
60impl<P: ConnectivityProbe> OfflineCircuitBreaker<P> {
61    /// Create a new offline circuit breaker.
62    pub fn new(name: impl Into<String>, probe: P) -> Self {
63        Self {
64            name: name.into(),
65            probe,
66            queue: Arc::new(Mutex::new(Vec::new())),
67        }
68    }
69
70    /// Call a function, queuing it if offline.
71    pub async fn call<F, Fut, T, E>(&self, f: F) -> CallResult<T, E>
72    where
73        F: FnOnce() -> Fut + Send + 'static,
74        Fut: Future<Output = Result<T, E>> + Send + 'static,
75        T: Send + 'static,
76        E: Send + 'static,
77    {
78        match self.probe.check().await {
79            ConnectivityStatus::Online => {
80                let result = f().await;
81                CallResult::Executed(result)
82            }
83            _ => {
84                // Queue the operation (fire-and-forget wrapper)
85                let wrapper: BoxedFnOnce = Box::new(move || {
86                    Box::pin(async move {
87                        let _ = f().await;
88                    })
89                });
90                self.queue.lock().await.push(wrapper);
91                CallResult::Queued
92            }
93        }
94    }
95
96    /// Number of queued operations.
97    pub async fn queued_count(&self) -> usize {
98        self.queue.lock().await.len()
99    }
100
101    /// Drain and replay all queued operations.
102    pub async fn drain(&self) -> Result<(), String> {
103        let ops: Vec<BoxedFnOnce> = {
104            let mut q = self.queue.lock().await;
105            q.drain(..).collect()
106        };
107        for op in ops {
108            op().await;
109        }
110        Ok(())
111    }
112}
113
/// A pending operation in a store-and-forward queue.
///
/// Only the identifier is stored. Equality and hashing are derived from `id`,
/// so operations can be compared in assertions and used as set or map keys.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct PendingOperation {
    /// Unique identifier for this operation.
    pub id: String,
}
120
/// Report from replaying stored operations.
///
/// `Default` yields a report with both counters at zero; equality compares
/// both counters, which lets callers assert on a whole report at once.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ReplayReport {
    /// Number of operations successfully replayed.
    pub replayed: usize,
    /// Number of operations that failed during replay.
    pub failed: usize,
}
129
130/// In-memory queue for store-and-forward operations.
131#[derive(Clone)]
132pub struct InMemoryQueue {
133    ops: Arc<Mutex<Vec<PendingOperation>>>,
134}
135
136impl InMemoryQueue {
137    /// Create a new empty queue.
138    pub fn new() -> Self {
139        Self {
140            ops: Arc::new(Mutex::new(Vec::new())),
141        }
142    }
143
144    async fn push(&self, op: PendingOperation) {
145        self.ops.lock().await.push(op);
146    }
147
148    async fn drain_all(&self) -> Vec<PendingOperation> {
149        let mut q = self.ops.lock().await;
150        q.drain(..).collect()
151    }
152
153    async fn len(&self) -> usize {
154        self.ops.lock().await.len()
155    }
156
157    async fn peek_all(&self) -> Vec<PendingOperation> {
158        self.ops.lock().await.clone()
159    }
160}
161
162impl Default for InMemoryQueue {
163    fn default() -> Self {
164        Self::new()
165    }
166}
167
168/// Store-and-forward pattern: stores operations when offline, replays on reconnect.
169pub struct StoreAndForward<Q = InMemoryQueue, P: ConnectivityProbe = AlwaysOnlineProbe> {
170    queue: Q,
171    #[allow(dead_code)]
172    probe: P,
173}
174
/// A probe that always reports online. Used as default.
///
/// This is a stateless unit type, so it derives the common traits: it can be
/// debug-printed, copied freely, and built through `Default`.
#[derive(Debug, Clone, Copy, Default)]
pub struct AlwaysOnlineProbe;
177
178#[async_trait]
179impl ConnectivityProbe for AlwaysOnlineProbe {
180    async fn check(&self) -> ConnectivityStatus {
181        ConnectivityStatus::Online
182    }
183}
184
185impl StoreAndForward<InMemoryQueue, AlwaysOnlineProbe> {
186    /// Create a store-and-forward with in-memory queue and always-online probe.
187    pub fn default_new() -> Self {
188        Self {
189            queue: InMemoryQueue::new(),
190            probe: AlwaysOnlineProbe,
191        }
192    }
193}
194
195impl<P: ConnectivityProbe> StoreAndForward<InMemoryQueue, P> {
196    /// Create a new store-and-forward with the given queue and probe.
197    pub fn new(queue: InMemoryQueue, probe: P) -> Self {
198        Self { queue, probe }
199    }
200
201    /// Execute an operation; if it fails, store it for later replay.
202    pub async fn execute<F, Fut>(&self, id: &str, f: F)
203    where
204        F: FnOnce() -> Fut + Send,
205        Fut: Future<Output = Result<(), String>> + Send,
206    {
207        let result = f().await;
208        if result.is_err() {
209            self.queue
210                .push(PendingOperation { id: id.to_string() })
211                .await;
212        }
213    }
214
215    /// Number of pending operations.
216    pub async fn pending_count(&self) -> usize {
217        self.queue.len().await
218    }
219
220    /// Peek at all pending operations (FIFO order).
221    pub async fn peek_pending(&self) -> Vec<PendingOperation> {
222        self.queue.peek_all().await
223    }
224
225    /// Replay all pending operations through the given handler.
226    pub async fn replay_all<F, Fut>(&self, handler: F) -> Result<ReplayReport, String>
227    where
228        F: Fn(String) -> Fut + Send,
229        Fut: Future<Output = Result<(), String>> + Send,
230    {
231        let ops = self.queue.drain_all().await;
232        let mut replayed = 0;
233        let mut failed = 0;
234        for op in ops {
235            match handler(op.id).await {
236                Ok(()) => replayed += 1,
237                Err(_) => failed += 1,
238            }
239        }
240        Ok(ReplayReport { replayed, failed })
241    }
242}