Skip to main content

aura_effects/
network_monitor.rs

//! Debounced network-change monitoring handler.
//!
//! This handler wraps an upstream `NetworkChangeEffects` implementation and
//! suppresses transient unusable states for a configurable debounce window.
//! On `wasm32` targets no debounce is applied: unusable transitions are
//! delivered immediately.
5
6use std::time::Duration;
7
8use async_trait::async_trait;
9use aura_core::effects::{
10    NetworkChange, NetworkChangeEffects, NetworkChangeStream, NetworkError, NetworkUsability,
11};
12
/// Default debounce window, in milliseconds, for unusable network transitions.
const DEFAULT_UNUSABLE_DEBOUNCE_MS: u64 = 2_000;

/// Network monitor wrapper that debounces unusable transitions.
///
/// Pairs an upstream effects implementation with the debounce window that is
/// passed to each stream created through this handler.
#[derive(Clone, Debug)]
pub struct NetworkMonitorHandler<E> {
    /// Wrapped upstream implementation that produces the raw network changes.
    upstream: E,
    /// Debounce window for unusable transitions.
    unusable_debounce: Duration,
}

impl<E> NetworkMonitorHandler<E> {
    /// Create a monitor with the default unusable debounce duration (2 seconds).
    pub fn new(upstream: E) -> Self {
        // Delegate so the struct literal lives in exactly one place.
        Self::with_debounce(upstream, Duration::from_millis(DEFAULT_UNUSABLE_DEBOUNCE_MS))
    }

    /// Create a monitor with a custom unusable debounce duration.
    pub fn with_debounce(upstream: E, unusable_debounce: Duration) -> Self {
        Self {
            upstream,
            unusable_debounce,
        }
    }
}
40
41#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
42#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
43impl<E> NetworkChangeEffects for NetworkMonitorHandler<E>
44where
45    E: NetworkChangeEffects + Send + Sync,
46{
47    async fn subscribe_network_changes(
48        &self,
49    ) -> Result<Box<dyn NetworkChangeStream>, NetworkError> {
50        let stream = self.upstream.subscribe_network_changes().await?;
51        Ok(Box::new(DebouncedNetworkChangeStream::new(
52            stream,
53            self.unusable_debounce,
54        )))
55    }
56}
57
58struct DebouncedNetworkChangeStream {
59    inner: Box<dyn NetworkChangeStream>,
60    #[cfg(not(target_arch = "wasm32"))]
61    unusable_debounce: Duration,
62}
63
64impl DebouncedNetworkChangeStream {
65    fn new(inner: Box<dyn NetworkChangeStream>, unusable_debounce: Duration) -> Self {
66        #[cfg(target_arch = "wasm32")]
67        let _ = unusable_debounce;
68
69        Self {
70            inner,
71            #[cfg(not(target_arch = "wasm32"))]
72            unusable_debounce,
73        }
74    }
75}
76
77#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
78#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
79impl NetworkChangeStream for DebouncedNetworkChangeStream {
80    async fn next_change(&mut self) -> Result<Option<NetworkChange>, NetworkError> {
81        let Some(change) = self.inner.next_change().await? else {
82            return Ok(None);
83        };
84
85        if !matches!(change.usability, NetworkUsability::Unusable { .. }) {
86            return Ok(Some(change));
87        }
88
89        #[cfg(target_arch = "wasm32")]
90        {
91            // Browser harness/runtime paths do not have a Send-safe debounce timer future here.
92            // Deliver unusable transitions immediately on wasm instead of hanging the async_trait future.
93            return Ok(Some(change));
94        }
95
96        #[cfg(not(target_arch = "wasm32"))]
97        {
98            let mut pending_unusable = change;
99            let timer = tokio::time::sleep(self.unusable_debounce);
100            tokio::pin!(timer);
101
102            loop {
103                tokio::select! {
104                    _ = &mut timer => {
105                        return Ok(Some(pending_unusable));
106                    }
107                    next = self.inner.next_change() => {
108                        let Some(next_change) = next? else {
109                            return Ok(Some(pending_unusable));
110                        };
111
112                        match next_change.usability {
113                            NetworkUsability::Usable => return Ok(Some(next_change)),
114                            NetworkUsability::Unusable { .. } => {
115                                pending_unusable = next_change;
116                            }
117                        }
118                    }
119                }
120            }
121        }
122    }
123}
124
#[cfg(test)]
mod tests {
    use std::{collections::VecDeque, sync::Arc};

    use tokio::sync::Mutex as AsyncMutex;

    use super::*;

    /// Scripted changes shared between the effects handle and the streams it creates.
    type SharedQueue = Arc<AsyncMutex<VecDeque<NetworkChange>>>;

    /// In-memory upstream whose streams replay a fixed list of changes.
    #[derive(Clone, Default)]
    struct MemoryNetworkEffects {
        queue: SharedQueue,
    }

    impl MemoryNetworkEffects {
        /// Builds an upstream that will replay `changes` in order.
        fn new(changes: Vec<NetworkChange>) -> Self {
            let queue = Arc::new(AsyncMutex::new(VecDeque::from(changes)));
            Self { queue }
        }
    }

    /// Stream that pops scripted changes and yields `None` once the queue is empty.
    struct MemoryStream {
        queue: SharedQueue,
    }

    #[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
    #[cfg_attr(not(target_arch = "wasm32"), async_trait)]
    impl NetworkChangeStream for MemoryStream {
        async fn next_change(&mut self) -> Result<Option<NetworkChange>, NetworkError> {
            let mut scripted = self.queue.lock().await;
            Ok(scripted.pop_front())
        }
    }

    #[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
    #[cfg_attr(not(target_arch = "wasm32"), async_trait)]
    impl NetworkChangeEffects for MemoryNetworkEffects {
        async fn subscribe_network_changes(
            &self,
        ) -> Result<Box<dyn NetworkChangeStream>, NetworkError> {
            let stream = MemoryStream {
                queue: Arc::clone(&self.queue),
            };
            Ok(Box::new(stream))
        }
    }

    /// Builds a change with no interfaces and the given usability.
    fn change(generation: u64, usability: NetworkUsability) -> NetworkChange {
        NetworkChange {
            generation,
            usability,
            interfaces: vec![],
        }
    }

    /// Shorthand for a usable change.
    fn usable(generation: u64) -> NetworkChange {
        change(generation, NetworkUsability::Usable)
    }

    /// Shorthand for an unusable change carrying `reason`.
    fn unusable(generation: u64, reason: &str) -> NetworkChange {
        change(
            generation,
            NetworkUsability::Unusable {
                reason: reason.to_string(),
            },
        )
    }

    /// Runs `scripted` through a handler with `debounce` and returns the first emitted change.
    async fn first_emitted(scripted: Vec<NetworkChange>, debounce: Duration) -> NetworkChange {
        let handler =
            NetworkMonitorHandler::with_debounce(MemoryNetworkEffects::new(scripted), debounce);
        let mut stream = handler.subscribe_network_changes().await.unwrap();
        stream.next_change().await.unwrap().unwrap()
    }

    #[tokio::test]
    async fn suppresses_transient_unusable_when_usable_arrives_before_deadline() {
        let emitted = first_emitted(
            vec![unusable(1, "wifi-down"), usable(2)],
            Duration::from_millis(DEFAULT_UNUSABLE_DEBOUNCE_MS),
        )
        .await;

        assert_eq!(emitted.generation, 2);
        assert!(matches!(emitted.usability, NetworkUsability::Usable));
    }

    #[tokio::test]
    async fn emits_unusable_when_no_recovery_arrives() {
        let emitted =
            first_emitted(vec![unusable(10, "carrier-loss")], Duration::from_millis(1)).await;

        assert_eq!(emitted.generation, 10);
        assert!(matches!(emitted.usability, NetworkUsability::Unusable { .. }));
    }
}