ddns_a/monitor/poller/
stream.rs

//! Polling stream implementation.
//!
//! This module provides [`PollingStream`], a stream that periodically
//! fetches network adapter snapshots and yields IP address changes.
5
6use super::super::DebouncePolicy;
7use super::super::change::{IpChange, diff};
8use crate::network::{AdapterSnapshot, AddressFetcher, FetchError};
9use crate::time::Clock;
10use std::pin::Pin;
11use std::task::{Context, Poll};
12use std::time::Duration;
13use tokio::time::{Interval, interval};
14use tokio_stream::Stream;
15
16/// A stream of IP address changes produced by polling.
17///
18/// This type is returned by [`super::PollingMonitor::into_stream`] and yields
19/// batches of [`IpChange`] events whenever changes are detected.
20pub struct PollingStream<F, C> {
21    fetcher: F,
22    clock: C,
23    interval: Interval,
24    debounce: Option<DebouncePolicy>,
25    /// Previous snapshot for comparison
26    prev_snapshot: Option<Vec<AdapterSnapshot>>,
27    /// Debounce state: `Some(start_time)` if currently debouncing
28    debounce_start: Option<tokio::time::Instant>,
29    /// Snapshot taken at debounce start for final comparison
30    debounce_baseline: Option<Vec<AdapterSnapshot>>,
31}
32
33impl<F, C> PollingStream<F, C>
34where
35    F: AddressFetcher,
36    C: Clock,
37{
38    pub(super) fn new(
39        fetcher: F,
40        clock: C,
41        poll_interval: Duration,
42        debounce: Option<DebouncePolicy>,
43    ) -> Self {
44        Self {
45            fetcher,
46            clock,
47            interval: interval(poll_interval),
48            debounce,
49            prev_snapshot: None,
50            debounce_start: None,
51            debounce_baseline: None,
52        }
53    }
54
55    /// Returns the current (most recent) snapshot of network adapters.
56    ///
57    /// Returns `None` if no snapshot has been taken yet (before the first poll).
58    #[must_use]
59    pub fn current_snapshot(&self) -> Option<&[AdapterSnapshot]> {
60        self.prev_snapshot.as_deref()
61    }
62
63    /// Performs a single poll and returns changes if any.
64    fn poll_once(&mut self) -> Result<Vec<IpChange>, FetchError> {
65        let current = self.fetcher.fetch()?;
66        let timestamp = self.clock.now();
67
68        let changes = self
69            .prev_snapshot
70            .as_ref()
71            .map_or_else(Vec::new, |prev| diff(prev, &current, timestamp));
72
73        self.prev_snapshot = Some(current);
74        Ok(changes)
75    }
76
77    /// Handles debounce logic, returning changes to emit (if any).
78    ///
79    /// `pre_poll_snapshot` is the snapshot state BEFORE this poll cycle,
80    /// used as baseline when starting a new debounce window.
81    fn process_with_debounce(
82        &mut self,
83        raw_changes: Vec<IpChange>,
84        pre_poll_snapshot: Option<Vec<AdapterSnapshot>>,
85    ) -> Option<Vec<IpChange>> {
86        let Some(debounce) = &self.debounce else {
87            // No debounce configured - emit immediately if non-empty
88            return if raw_changes.is_empty() {
89                None
90            } else {
91                Some(raw_changes)
92            };
93        };
94
95        if raw_changes.is_empty() && self.debounce_start.is_none() {
96            // No changes and not debouncing - nothing to do
97            return None;
98        }
99
100        let now = tokio::time::Instant::now();
101
102        if !raw_changes.is_empty() && self.debounce_start.is_none() {
103            // Start new debounce window, save baseline (state BEFORE changes)
104            self.debounce_start = Some(now);
105            self.debounce_baseline = pre_poll_snapshot;
106        }
107
108        // Check if debounce window has elapsed
109        if let Some(start) = self.debounce_start {
110            if now.duration_since(start) >= debounce.window() {
111                // Window expired - compute final changes from baseline
112                return self.finalize_debounce();
113            }
114        }
115
116        None
117    }
118
119    /// Finalizes debounce by computing net changes from baseline to current state.
120    fn finalize_debounce(&mut self) -> Option<Vec<IpChange>> {
121        let baseline = self.debounce_baseline.take()?;
122        self.debounce_start = None;
123
124        let current = self.prev_snapshot.as_ref()?;
125        let timestamp = self.clock.now();
126
127        let changes = diff(&baseline, current, timestamp);
128        if changes.is_empty() {
129            None
130        } else {
131            Some(changes)
132        }
133    }
134}
135
136impl<F, C> Stream for PollingStream<F, C>
137where
138    F: AddressFetcher + Unpin,
139    C: Clock + Unpin,
140{
141    type Item = Vec<IpChange>;
142
143    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
144        loop {
145            // Poll the interval timer - registers waker for next tick when Pending
146            if Pin::new(&mut self.interval).poll_tick(cx).is_pending() {
147                return Poll::Pending;
148            }
149
150            // Capture snapshot BEFORE poll_once updates it (needed for debounce baseline)
151            // Only clone when we might start debouncing (entering debounce mode)
152            let pre_poll_snapshot = if self.debounce.is_some() && self.debounce_start.is_none() {
153                self.prev_snapshot.clone()
154            } else {
155                None
156            };
157
158            // Interval ticked - perform a poll
159            // Fetch errors are intentionally swallowed for resilient polling:
160            // transient network/system errors should not terminate the stream.
161            let Ok(changes) = self.poll_once() else {
162                // Error occurred - loop back to re-register waker via poll_tick
163                continue;
164            };
165
166            if let Some(result) = self.process_with_debounce(changes, pre_poll_snapshot) {
167                return Poll::Ready(Some(result));
168            }
169            // No changes to emit - loop back to re-register waker via poll_tick
170        }
171    }
172}