ddns_a/monitor/hybrid/
stream.rs

1//! Hybrid stream implementation.
2//!
3//! This module provides [`HybridStream`], a stream that combines API event
4//! notifications with periodic polling for IP address change detection.
5
6use crate::monitor::DebouncePolicy;
7use crate::monitor::change::{IpChange, diff};
8use crate::monitor::error::ApiError;
9use crate::network::{AdapterSnapshot, AddressFetcher, FetchError};
10use crate::time::Clock;
11use std::pin::Pin;
12use std::task::{Context, Poll};
13use std::time::Duration;
14use tokio::time::{Interval, interval};
15use tokio_stream::Stream;
16
/// Operating mode of a [`HybridStream`].
///
/// A stream starts out as [`StreamState::Hybrid`] and is switched to
/// [`StreamState::PollingOnly`] once the API notification stream reports an
/// error or terminates.
#[derive(Debug)]
enum StreamState<S> {
    /// Both sources are active: API notifications and the polling interval.
    Hybrid {
        /// Source of API change notifications.
        api_stream: S,
    },
    /// The API source is gone; the polling interval is the only trigger left.
    PollingOnly,
}
28
/// Outcome of checking the event sources during one pass of the poll loop.
#[derive(Debug)]
enum PollTrigger {
    /// The API stream delivered a notification.
    ApiEvent,
    /// The API stream reported an error or terminated; the stream should
    /// degrade to polling-only mode.
    ApiDegraded,
    /// The polling interval ticked.
    Interval,
    /// No source is ready yet.
    Pending,
}
41
42impl PollTrigger {
43    /// Returns a human-readable label for logging.
44    const fn label(&self) -> &'static str {
45        match self {
46            Self::ApiEvent => "API event",
47            Self::ApiDegraded => "API degradation",
48            Self::Interval => "polling interval",
49            Self::Pending => "pending",
50        }
51    }
52}
53
54/// A stream of IP address changes produced by hybrid monitoring.
55///
56/// This type is returned by [`super::HybridMonitor::into_stream`] and yields
57/// batches of [`IpChange`] events whenever changes are detected.
58///
59/// The stream operates in two modes:
60/// - **Hybrid**: Reacts to both API notifications and polling interval
61/// - **Polling-only**: Falls back to polling if the API fails
62///
63/// Degradation from hybrid to polling-only is automatic and permanent
64/// for the lifetime of this stream.
65#[derive(Debug)]
66pub struct HybridStream<F, S, C> {
67    fetcher: F,
68    clock: C,
69    interval: Interval,
70    debounce: Option<DebouncePolicy>,
71    state: StreamState<S>,
72    /// Previous snapshot for comparison.
73    prev_snapshot: Option<Vec<AdapterSnapshot>>,
74    /// Debounce state: `Some(start_time)` if currently debouncing.
75    debounce_start: Option<tokio::time::Instant>,
76    /// Snapshot taken at debounce start for final comparison.
77    debounce_baseline: Option<Vec<AdapterSnapshot>>,
78}
79
80impl<F, S, C> HybridStream<F, S, C>
81where
82    F: AddressFetcher,
83    S: Stream<Item = Result<(), ApiError>> + Unpin,
84    C: Clock,
85{
86    pub(super) fn new(
87        fetcher: F,
88        api_stream: S,
89        clock: C,
90        poll_interval: Duration,
91        debounce: Option<DebouncePolicy>,
92    ) -> Self {
93        Self {
94            fetcher,
95            clock,
96            interval: interval(poll_interval),
97            debounce,
98            state: StreamState::Hybrid { api_stream },
99            prev_snapshot: None,
100            debounce_start: None,
101            debounce_baseline: None,
102        }
103    }
104
105    /// Returns true if currently in polling-only mode.
106    #[must_use]
107    pub const fn is_polling_only(&self) -> bool {
108        matches!(self.state, StreamState::PollingOnly)
109    }
110
111    /// Returns the current (most recent) snapshot of network adapters.
112    ///
113    /// Returns `None` if no snapshot has been taken yet (before the first fetch).
114    #[must_use]
115    pub fn current_snapshot(&self) -> Option<&[AdapterSnapshot]> {
116        self.prev_snapshot.as_deref()
117    }
118
119    /// Performs a single fetch and returns changes if any.
120    fn fetch_changes(&mut self) -> Result<Vec<IpChange>, FetchError> {
121        let current = self.fetcher.fetch()?;
122        let timestamp = self.clock.now();
123
124        let changes = self
125            .prev_snapshot
126            .as_ref()
127            .map_or_else(Vec::new, |prev| diff(prev, &current, timestamp));
128
129        self.prev_snapshot = Some(current);
130        Ok(changes)
131    }
132
133    /// Handles debounce logic, returning changes to emit (if any).
134    ///
135    /// `pre_fetch_snapshot` is the snapshot state BEFORE this fetch cycle,
136    /// used as baseline when starting a new debounce window.
137    ///
138    /// `triggered_by_api`: When true, starts debounce window even if no changes
139    /// detected yet. This handles Windows API timing where `NotifyIpInterfaceChange`
140    /// fires before the new IP is visible in `GetAdaptersAddresses`.
141    fn process_with_debounce(
142        &mut self,
143        raw_changes: Vec<IpChange>,
144        pre_fetch_snapshot: Option<Vec<AdapterSnapshot>>,
145        triggered_by_api: bool,
146    ) -> Option<Vec<IpChange>> {
147        let Some(debounce) = &self.debounce else {
148            // No debounce configured - emit immediately if non-empty
149            return if raw_changes.is_empty() {
150                None
151            } else {
152                Some(raw_changes)
153            };
154        };
155
156        let now = tokio::time::Instant::now();
157
158        // Decide whether to start a new debounce window:
159        // - Either we detected changes (normal case)
160        // - Or API event fired (signal that changes are coming, even if not visible yet)
161        // - But only if we have a valid baseline to compare against
162        let has_valid_baseline = pre_fetch_snapshot.is_some();
163        let should_start_window = self.debounce_start.is_none()
164            && has_valid_baseline
165            && (!raw_changes.is_empty() || triggered_by_api);
166
167        if should_start_window {
168            if triggered_by_api && raw_changes.is_empty() {
169                tracing::trace!(
170                    "API event triggered but no changes detected yet, starting observation window"
171                );
172            }
173            // Start new debounce window, save baseline (state BEFORE changes)
174            self.debounce_start = Some(now);
175            self.debounce_baseline = pre_fetch_snapshot;
176        }
177
178        // Check if debounce window has elapsed
179        if let Some(start) = self.debounce_start {
180            if now.duration_since(start) >= debounce.window() {
181                // Window expired - compute final changes from baseline
182                return self.finalize_debounce();
183            }
184        }
185
186        None
187    }
188
189    /// Finalizes debounce by computing net changes from baseline to current state.
190    fn finalize_debounce(&mut self) -> Option<Vec<IpChange>> {
191        // Always reset debounce_start to avoid stuck state if baseline is None
192        let baseline = self.debounce_baseline.take();
193        self.debounce_start = None;
194
195        let baseline = baseline?;
196        let current = self.prev_snapshot.as_ref()?;
197        let timestamp = self.clock.now();
198
199        let changes = diff(&baseline, current, timestamp);
200        if changes.is_empty() {
201            None
202        } else {
203            Some(changes)
204        }
205    }
206
207    /// Transitions to polling-only mode.
208    fn degrade_to_polling(&mut self) {
209        self.state = StreamState::PollingOnly;
210    }
211}
212
213impl<F, S, C> Stream for HybridStream<F, S, C>
214where
215    F: AddressFetcher + Unpin,
216    S: Stream<Item = Result<(), ApiError>> + Unpin,
217    C: Clock + Unpin,
218{
219    type Item = Vec<IpChange>;
220
221    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
222        loop {
223            let trigger = match &mut self.state {
224                StreamState::Hybrid { api_stream } => {
225                    // Check API stream first (higher priority for responsiveness)
226                    match Pin::new(api_stream).poll_next(cx) {
227                        Poll::Ready(Some(Ok(()))) => PollTrigger::ApiEvent,
228                        Poll::Ready(Some(Err(_)) | None) => {
229                            // API failed or ended - will degrade
230                            PollTrigger::ApiDegraded
231                        }
232                        Poll::Pending => {
233                            // API not ready - check interval
234                            if Pin::new(&mut self.interval).poll_tick(cx).is_ready() {
235                                PollTrigger::Interval
236                            } else {
237                                PollTrigger::Pending
238                            }
239                        }
240                    }
241                }
242                StreamState::PollingOnly => {
243                    // Only check interval in polling-only mode
244                    if Pin::new(&mut self.interval).poll_tick(cx).is_ready() {
245                        PollTrigger::Interval
246                    } else {
247                        PollTrigger::Pending
248                    }
249                }
250            };
251
252            match trigger {
253                PollTrigger::Pending => return Poll::Pending,
254                PollTrigger::ApiDegraded => {
255                    // Degrade to polling-only mode
256                    self.degrade_to_polling();
257                    // Continue loop to check interval
258                }
259                PollTrigger::ApiEvent | PollTrigger::Interval => {
260                    tracing::debug!("Check triggered by {}", trigger.label());
261
262                    // Capture snapshot BEFORE fetch (needed for debounce baseline)
263                    // Only clone when we might start debouncing
264                    let pre_fetch_snapshot =
265                        if self.debounce.is_some() && self.debounce_start.is_none() {
266                            self.prev_snapshot.clone()
267                        } else {
268                            None
269                        };
270
271                    // Fetch and process changes
272                    let Ok(changes) = self.fetch_changes() else {
273                        // Fetch error - continue waiting for next trigger
274                        continue;
275                    };
276
277                    // API events start debounce even without detected changes,
278                    // because Windows may notify before IP is visible
279                    let triggered_by_api = matches!(trigger, PollTrigger::ApiEvent);
280
281                    if let Some(result) =
282                        self.process_with_debounce(changes, pre_fetch_snapshot, triggered_by_api)
283                    {
284                        tracing::debug!(
285                            "Emitting {} change(s) triggered by {}",
286                            result.len(),
287                            trigger.label()
288                        );
289                        return Poll::Ready(Some(result));
290                    }
291                    // No changes to emit - loop back to wait for next trigger
292                }
293            }
294        }
295    }
296}