Skip to main content

netwatch/netmon/
actor.rs

1use n0_future::time::{self, Duration, Instant};
2use n0_watcher::Watchable;
3pub(super) use os::Error;
4use os::RouteMonitor;
5#[cfg(not(wasm_browser))]
6pub(crate) use os::is_interesting_interface;
7use tokio::sync::mpsc;
8use tracing::{debug, trace};
9
10#[cfg(target_os = "android")]
11use super::android as os;
12#[cfg(any(
13    target_os = "freebsd",
14    target_os = "openbsd",
15    target_os = "netbsd",
16    target_os = "macos",
17    target_os = "ios"
18))]
19use super::bsd as os;
20#[cfg(target_os = "linux")]
21use super::linux as os;
22#[cfg(wasm_browser)]
23use super::wasm_browser as os;
24#[cfg(target_os = "windows")]
25use super::windows as os;
26use crate::interfaces::State;
27
28/// The message sent by the OS specific monitors.
29#[derive(Debug, Copy, Clone)]
30pub(super) enum NetworkMessage {
31    /// A change was detected.
32    #[allow(dead_code)]
33    Change,
34}
35
36/// How often we execute a check for big jumps in wall time.
37#[cfg(not(any(target_os = "ios", target_os = "android")))]
38const POLL_WALL_TIME_INTERVAL: Duration = Duration::from_secs(15);
39/// Set background polling time to 1h to effectively disable it on mobile,
40/// to avoid increased battery usage. Sleep detection won't work this way there.
41#[cfg(any(target_os = "ios", target_os = "android"))]
42const POLL_WALL_TIME_INTERVAL: Duration = Duration::from_secs(60 * 60);
43const MON_CHAN_CAPACITY: usize = 16;
44const ACTOR_CHAN_CAPACITY: usize = 16;
45
46pub(super) struct Actor {
47    /// Latest known interface state.
48    interface_state: Watchable<State>,
49    /// Latest observed wall time.
50    wall_time: Instant,
51    /// OS specific monitor.
52    #[allow(dead_code)]
53    route_monitor: RouteMonitor,
54    mon_receiver: mpsc::Receiver<NetworkMessage>,
55    actor_receiver: mpsc::Receiver<ActorMessage>,
56    actor_sender: mpsc::Sender<ActorMessage>,
57}
58
59pub(super) enum ActorMessage {
60    NetworkChange,
61}
62
63impl Actor {
64    pub(super) async fn new() -> Result<Self, os::Error> {
65        let interface_state = State::new().await;
66        let wall_time = Instant::now();
67
68        let (mon_sender, mon_receiver) = mpsc::channel(MON_CHAN_CAPACITY);
69        let route_monitor = RouteMonitor::new(mon_sender)?;
70        let (actor_sender, actor_receiver) = mpsc::channel(ACTOR_CHAN_CAPACITY);
71
72        Ok(Actor {
73            interface_state: Watchable::new(interface_state),
74            wall_time,
75            route_monitor,
76            mon_receiver,
77            actor_receiver,
78            actor_sender,
79        })
80    }
81
82    pub(super) fn state(&self) -> &Watchable<State> {
83        &self.interface_state
84    }
85
86    pub(super) fn subscribe(&self) -> mpsc::Sender<ActorMessage> {
87        self.actor_sender.clone()
88    }
89
90    pub(super) async fn run(mut self) {
91        const DEBOUNCE: Duration = Duration::from_millis(250);
92
93        let mut pending_change = false;
94        let mut pending_time_jump = false;
95        let debounce = time::sleep(DEBOUNCE);
96        tokio::pin!(debounce);
97        let mut wall_time_interval = time::interval(POLL_WALL_TIME_INTERVAL);
98
99        loop {
100            tokio::select! {
101                _ = &mut debounce, if pending_change || pending_time_jump => {
102                    self.handle_potential_change(pending_time_jump).await;
103                    pending_change = false;
104                    pending_time_jump = false;
105                }
106                _ = wall_time_interval.tick() => {
107                    trace!("tick: wall_time_interval");
108                    if self.check_wall_time_advance() {
109                        pending_time_jump = true;
110                        debounce.as_mut().reset(Instant::now() + DEBOUNCE);
111                    }
112                }
113                event = self.mon_receiver.recv() => {
114                    match event {
115                        Some(NetworkMessage::Change) => {
116                            trace!("network activity detected");
117                            pending_change = true;
118                            debounce.as_mut().reset(Instant::now() + DEBOUNCE);
119                        }
120                        None => {
121                            debug!("shutting down, network monitor receiver gone");
122                            break;
123                        }
124                    }
125                }
126                msg = self.actor_receiver.recv() => {
127                    match msg {
128                        Some(ActorMessage::NetworkChange) => {
129                            trace!("external network activity detected");
130                            pending_change = true;
131                            debounce.as_mut().reset(Instant::now() + DEBOUNCE);
132                        }
133                        None => {
134                            debug!("shutting down, actor receiver gone");
135                            break;
136                        }
137                    }
138                }
139            }
140        }
141    }
142
143    async fn handle_potential_change(&mut self, time_jumped: bool) {
144        trace!("potential change");
145
146        let mut new_state = State::new().await;
147        let old_state = &self.interface_state.get();
148
149        if time_jumped {
150            new_state.last_unsuspend.replace(Instant::now());
151        } else if old_state == &new_state {
152            // No major changes, continue on
153            debug!("no changes detected");
154            return;
155        }
156
157        self.interface_state.set(new_state).ok();
158    }
159
160    /// Reports whether wall time jumped more than 150%
161    /// of `POLL_WALL_TIME_INTERVAL`, indicating we probably just came out of sleep.
162    fn check_wall_time_advance(&mut self) -> bool {
163        let now = Instant::now();
164        let jumped = if let Some(elapsed) = now.checked_duration_since(self.wall_time) {
165            elapsed > POLL_WALL_TIME_INTERVAL * 3 / 2
166        } else {
167            false
168        };
169
170        self.wall_time = now;
171        jumped
172    }
173}