aura_effects/
network_monitor.rs1use std::time::Duration;
7
8use async_trait::async_trait;
9use aura_core::effects::{
10 NetworkChange, NetworkChangeEffects, NetworkChangeStream, NetworkError, NetworkUsability,
11};
12
13const DEFAULT_UNUSABLE_DEBOUNCE_MS: u64 = 2_000;
15
16#[derive(Clone)]
18pub struct NetworkMonitorHandler<E> {
19 upstream: E,
20 unusable_debounce: Duration,
21}
22
23impl<E> NetworkMonitorHandler<E> {
24 pub fn new(upstream: E) -> Self {
26 Self {
27 upstream,
28 unusable_debounce: Duration::from_millis(DEFAULT_UNUSABLE_DEBOUNCE_MS),
29 }
30 }
31
32 pub fn with_debounce(upstream: E, unusable_debounce: Duration) -> Self {
34 Self {
35 upstream,
36 unusable_debounce,
37 }
38 }
39}
40
41#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
42#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
43impl<E> NetworkChangeEffects for NetworkMonitorHandler<E>
44where
45 E: NetworkChangeEffects + Send + Sync,
46{
47 async fn subscribe_network_changes(
48 &self,
49 ) -> Result<Box<dyn NetworkChangeStream>, NetworkError> {
50 let stream = self.upstream.subscribe_network_changes().await?;
51 Ok(Box::new(DebouncedNetworkChangeStream::new(
52 stream,
53 self.unusable_debounce,
54 )))
55 }
56}
57
58struct DebouncedNetworkChangeStream {
59 inner: Box<dyn NetworkChangeStream>,
60 #[cfg(not(target_arch = "wasm32"))]
61 unusable_debounce: Duration,
62}
63
64impl DebouncedNetworkChangeStream {
65 fn new(inner: Box<dyn NetworkChangeStream>, unusable_debounce: Duration) -> Self {
66 #[cfg(target_arch = "wasm32")]
67 let _ = unusable_debounce;
68
69 Self {
70 inner,
71 #[cfg(not(target_arch = "wasm32"))]
72 unusable_debounce,
73 }
74 }
75}
76
77#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
78#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
79impl NetworkChangeStream for DebouncedNetworkChangeStream {
80 async fn next_change(&mut self) -> Result<Option<NetworkChange>, NetworkError> {
81 let Some(change) = self.inner.next_change().await? else {
82 return Ok(None);
83 };
84
85 if !matches!(change.usability, NetworkUsability::Unusable { .. }) {
86 return Ok(Some(change));
87 }
88
89 #[cfg(target_arch = "wasm32")]
90 {
91 return Ok(Some(change));
94 }
95
96 #[cfg(not(target_arch = "wasm32"))]
97 {
98 let mut pending_unusable = change;
99 let timer = tokio::time::sleep(self.unusable_debounce);
100 tokio::pin!(timer);
101
102 loop {
103 tokio::select! {
104 _ = &mut timer => {
105 return Ok(Some(pending_unusable));
106 }
107 next = self.inner.next_change() => {
108 let Some(next_change) = next? else {
109 return Ok(Some(pending_unusable));
110 };
111
112 match next_change.usability {
113 NetworkUsability::Usable => return Ok(Some(next_change)),
114 NetworkUsability::Unusable { .. } => {
115 pending_unusable = next_change;
116 }
117 }
118 }
119 }
120 }
121 }
122 }
123}
124
125#[cfg(test)]
126mod tests {
127 use std::{collections::VecDeque, sync::Arc};
128
129 use tokio::sync::Mutex as AsyncMutex;
130
131 use super::*;
132
133 #[derive(Clone, Default)]
134 struct MemoryNetworkEffects {
135 queue: Arc<AsyncMutex<VecDeque<NetworkChange>>>,
136 }
137
138 impl MemoryNetworkEffects {
139 fn new(changes: Vec<NetworkChange>) -> Self {
140 Self {
141 queue: Arc::new(AsyncMutex::new(changes.into_iter().collect())),
142 }
143 }
144 }
145
146 struct MemoryStream {
147 queue: Arc<AsyncMutex<VecDeque<NetworkChange>>>,
148 }
149
150 #[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
151 #[cfg_attr(not(target_arch = "wasm32"), async_trait)]
152 impl NetworkChangeStream for MemoryStream {
153 async fn next_change(&mut self) -> Result<Option<NetworkChange>, NetworkError> {
154 Ok(self.queue.lock().await.pop_front())
155 }
156 }
157
158 #[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
159 #[cfg_attr(not(target_arch = "wasm32"), async_trait)]
160 impl NetworkChangeEffects for MemoryNetworkEffects {
161 async fn subscribe_network_changes(
162 &self,
163 ) -> Result<Box<dyn NetworkChangeStream>, NetworkError> {
164 Ok(Box::new(MemoryStream {
165 queue: Arc::clone(&self.queue),
166 }))
167 }
168 }
169
170 fn usable(generation: u64) -> NetworkChange {
171 NetworkChange {
172 generation,
173 usability: NetworkUsability::Usable,
174 interfaces: vec![],
175 }
176 }
177
178 fn unusable(generation: u64, reason: &str) -> NetworkChange {
179 NetworkChange {
180 generation,
181 usability: NetworkUsability::Unusable {
182 reason: reason.to_string(),
183 },
184 interfaces: vec![],
185 }
186 }
187
188 #[tokio::test]
189 async fn suppresses_transient_unusable_when_usable_arrives_before_deadline() {
190 let upstream = MemoryNetworkEffects::new(vec![unusable(1, "wifi-down"), usable(2)]);
191 let handler = NetworkMonitorHandler::with_debounce(
192 upstream,
193 Duration::from_millis(DEFAULT_UNUSABLE_DEBOUNCE_MS),
194 );
195
196 let mut stream = handler.subscribe_network_changes().await.unwrap();
197 let first = stream.next_change().await.unwrap().unwrap();
198
199 assert_eq!(first.generation, 2);
200 assert!(matches!(first.usability, NetworkUsability::Usable));
201 }
202
203 #[tokio::test]
204 async fn emits_unusable_when_no_recovery_arrives() {
205 let upstream = MemoryNetworkEffects::new(vec![unusable(10, "carrier-loss")]);
206 let handler = NetworkMonitorHandler::with_debounce(upstream, Duration::from_millis(1));
207
208 let mut stream = handler.subscribe_network_changes().await.unwrap();
209 let first = stream.next_change().await.unwrap().unwrap();
210
211 assert_eq!(first.generation, 10);
212 assert!(matches!(first.usability, NetworkUsability::Unusable { .. }));
213 }
214}