Skip to main content

ralph_workflow/reducer/state/connectivity/
mod.rs

1//! Connectivity state for offline detection and freeze-and-resume workflow.
2//!
3//! Tracks network connectivity status to enable the pipeline to detect when
4//! network connectivity is lost, freeze workflow state without consuming
5//! continuation/retry budgets, and automatically resume when connectivity returns.
6
7use serde::{Deserialize, Serialize};
8
9/// Tracks network connectivity status for offline detection.
10///
11/// This state is used to implement the freeze-and-resume workflow feature:
12/// - When network connectivity is lost, the pipeline enters offline mode
13/// - While offline, all budget-consuming operations are suspended
14/// - When connectivity returns, the pipeline resumes from the frozen checkpoint
15///
16/// # State Transitions
17///
18/// ```text
19/// [Online] --(probe failed)--> [Probe Failing] --(threshold reached)--> [Offline]
20/// [Offline] --(probe succeeded)--> [Back Online] --(orchestrator resumes)--> [Online]
21/// ```
22///
23/// # Debouncing
24///
25/// To prevent rapid offline/online thrashing on unstable connections:
26/// - Must fail N consecutive probes before entering offline mode (default: 2)
27/// - Must succeed 1 probe before exiting offline mode (default: 1)
28#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
29pub struct ConnectivityState {
30    /// True when we have confirmed the pipeline is in offline mode (poll loop active).
31    pub is_offline: bool,
32
33    /// True when an InvocationFailed{Network} event was received and we need to verify
34    /// connectivity before consuming retry budget.
35    pub check_pending: bool,
36
37    /// True when offline is confirmed and we are in the poll-for-reconnect loop.
38    pub poll_pending: bool,
39
40    /// Consecutive failed probes (used for debounce threshold before entering offline mode).
41    pub consecutive_failures: u32,
42
43    /// Consecutive successful probes (used for debounce before exiting offline mode).
44    pub consecutive_successes: u32,
45
46    /// How many consecutive failures before entering offline mode (default: 2).
47    pub required_failures_to_go_offline: u32,
48
49    /// How many consecutive successes before exiting offline mode (default: 1).
50    pub required_successes_to_go_online: u32,
51
52    /// Milliseconds to wait between polls while offline (default: 5000).
53    pub offline_poll_interval_ms: u64,
54}
55
56impl Default for ConnectivityState {
57    fn default() -> Self {
58        Self {
59            is_offline: false,
60            check_pending: false,
61            poll_pending: false,
62            consecutive_failures: 0,
63            consecutive_successes: 0,
64            required_failures_to_go_offline: 2,
65            required_successes_to_go_online: 1,
66            offline_poll_interval_ms: 5000,
67        }
68    }
69}
70
impl ConnectivityState {
    /// Create a new connectivity state (fully online, no pending checks).
    ///
    /// Equivalent to [`ConnectivityState::default`].
    #[must_use]
    pub fn new() -> Self {
        Self::default()
    }

    /// Mark that a connectivity check is needed (triggered by a Network error).
    ///
    /// Only `check_pending` changes; every other field is carried over.
    #[must_use]
    pub fn trigger_check(self) -> Self {
        Self {
            check_pending: true,
            ..self
        }
    }

    /// Clear the `check_pending` flag, leaving every other field untouched.
    #[must_use]
    pub fn clear_check_pending(self) -> Self {
        Self {
            check_pending: false,
            ..self
        }
    }

    /// Record a failed connectivity probe.
    ///
    /// Increments `consecutive_failures` (saturating) and resets
    /// `consecutive_successes` to 0. The returned state is offline when it was
    /// already offline, or when the failure count has reached
    /// `required_failures_to_go_offline`.
    ///
    /// While offline, `poll_pending` is set and `check_pending` is cleared; while
    /// still below the threshold, `check_pending` stays set so probing continues.
    #[must_use]
    pub fn on_probe_failed(self) -> Self {
        let consecutive_failures = self.consecutive_failures.saturating_add(1);
        let threshold_reached = consecutive_failures >= self.required_failures_to_go_offline;
        // Offline is sticky on failure: a failed probe must never bring the
        // pipeline back online. Without the `self.is_offline` term, a failure
        // counter that was zeroed while offline (via `reset_debounce`, or by a
        // success that did not yet meet `required_successes_to_go_online`) would
        // make the next failed probe clear `is_offline`.
        let is_offline = self.is_offline || threshold_reached;
        Self {
            is_offline,
            poll_pending: is_offline,
            // Keep checking until offline mode is confirmed.
            check_pending: !is_offline,
            consecutive_failures,
            consecutive_successes: 0,
            ..self
        }
    }

    /// Record a successful connectivity probe.
    ///
    /// Increments `consecutive_successes` (saturating), resets
    /// `consecutive_failures` to 0 and clears `check_pending`. Once the success
    /// count reaches `required_successes_to_go_online`, `is_offline` and
    /// `poll_pending` are cleared; below that threshold both keep their value.
    #[must_use]
    pub fn on_probe_succeeded(self) -> Self {
        let consecutive_successes = self.consecutive_successes.saturating_add(1);
        let back_online = consecutive_successes >= self.required_successes_to_go_online;
        let is_offline = self.is_offline && !back_online;
        let poll_pending = self.poll_pending && !back_online;
        Self {
            is_offline,
            poll_pending,
            check_pending: false,
            consecutive_failures: 0,
            consecutive_successes,
            ..self
        }
    }

    /// Reset debounce counters without changing offline/online status.
    ///
    /// Use when a transient state change should reset the debounce counters.
    #[must_use]
    pub fn reset_debounce(self) -> Self {
        Self {
            consecutive_failures: 0,
            consecutive_successes: 0,
            ..self
        }
    }

    /// Returns true if we have entered offline mode (debounce threshold was met).
    #[must_use]
    pub const fn is_offline_mode(&self) -> bool {
        self.is_offline
    }

    /// Returns true if a connectivity check is pending.
    #[must_use]
    pub const fn is_check_pending(&self) -> bool {
        self.check_pending
    }

    /// Returns true if we are actively polling for connectivity (offline mode).
    #[must_use]
    pub const fn is_poll_pending(&self) -> bool {
        self.poll_pending
    }
}
174
#[cfg(test)]
mod tests {
    use super::*;

    /// Starting point for every scenario: the default (online, idle) state.
    fn fresh() -> ConnectivityState {
        ConnectivityState::default()
    }

    #[test]
    fn test_default_state_is_fully_online() {
        let ConnectivityState {
            is_offline,
            check_pending,
            poll_pending,
            consecutive_failures,
            consecutive_successes,
            ..
        } = fresh();

        assert!(!is_offline);
        assert!(!check_pending);
        assert!(!poll_pending);
        assert_eq!(consecutive_failures, 0);
        assert_eq!(consecutive_successes, 0);
    }

    #[test]
    fn test_trigger_check_sets_check_pending() {
        let triggered = fresh().trigger_check();

        assert!(triggered.check_pending);
        assert!(!triggered.is_offline);
        assert!(!triggered.poll_pending);
    }

    #[test]
    fn test_single_probe_failure_below_threshold() {
        // The default offline threshold is 2 failures, so one is not enough.
        let after_one = fresh().on_probe_failed();

        assert!(
            !after_one.is_offline,
            "Should not be offline after only 1 failure"
        );
        assert!(
            after_one.check_pending,
            "Should still be checking after 1 failure"
        );
        assert!(!after_one.poll_pending);
        assert_eq!(after_one.consecutive_failures, 1);
    }

    #[test]
    fn test_threshold_probe_failures_trigger_offline() {
        // The default offline threshold is 2 failures.
        let after_one = fresh().on_probe_failed();
        let after_two = after_one.on_probe_failed();

        assert!(
            after_two.is_offline,
            "Should be offline after 2 consecutive failures"
        );
        assert!(
            !after_two.check_pending,
            "Should not be checking when offline"
        );
        assert!(after_two.poll_pending);
        assert_eq!(after_two.consecutive_failures, 2);
    }

    #[test]
    fn test_probe_success_while_online_clears_check() {
        let checking = fresh().trigger_check();
        let verified = checking.on_probe_succeeded();

        assert!(!verified.check_pending);
        assert!(!verified.is_offline);
        assert_eq!(verified.consecutive_failures, 0);
    }

    #[test]
    fn test_probe_success_while_offline_triggers_back_online() {
        // The default online threshold is 1 success.
        let offline = ConnectivityState {
            is_offline: true,
            poll_pending: true,
            check_pending: false,
            consecutive_failures: 2,
            consecutive_successes: 0,
            ..Default::default()
        };
        let recovered = offline.on_probe_succeeded();

        assert!(
            !recovered.is_offline,
            "Should be back online after 1 successful probe"
        );
        assert!(!recovered.poll_pending);
        assert!(!recovered.check_pending);
        assert_eq!(recovered.consecutive_failures, 0);
        assert_eq!(recovered.consecutive_successes, 1);
    }

    #[test]
    fn test_debounce_resets_on_success() {
        let failed_once = fresh().on_probe_failed();
        let then_succeeded = failed_once.on_probe_succeeded();

        assert_eq!(
            then_succeeded.consecutive_failures, 0,
            "Failures should reset to 0 on success"
        );
        assert_eq!(then_succeeded.consecutive_successes, 1);
    }

    #[test]
    fn test_clear_check_pending() {
        let checking = fresh().trigger_check();
        let cleared = checking.clear_check_pending();

        assert!(!cleared.check_pending);
    }

    #[test]
    fn test_reset_debounce() {
        let with_counts = ConnectivityState {
            consecutive_failures: 3,
            consecutive_successes: 2,
            ..Default::default()
        };
        let reset = with_counts.reset_debounce();

        assert_eq!(reset.consecutive_failures, 0);
        assert_eq!(reset.consecutive_successes, 0);
        // Resetting the counters must leave the online status as it was.
        assert!(!reset.is_offline);
    }
}