ddns_a/monitor/hybrid/stream.rs
1//! Hybrid stream implementation.
2//!
3//! This module provides [`HybridStream`], a stream that combines API event
4//! notifications with periodic polling for IP address change detection.
5
6use crate::monitor::DebouncePolicy;
7use crate::monitor::change::{IpChange, diff};
8use crate::monitor::error::ApiError;
9use crate::network::{AdapterSnapshot, AddressFetcher, FetchError};
10use crate::time::Clock;
11use std::pin::Pin;
12use std::task::{Context, Poll};
13use std::time::Duration;
14use tokio::time::{Interval, interval};
15use tokio_stream::Stream;
16
/// Operating mode of the hybrid stream.
///
/// The stream is created in [`StreamState::Hybrid`] and moves to
/// [`StreamState::PollingOnly`] once the API notification source fails or ends.
#[derive(Debug)]
enum StreamState<S> {
    /// API notifications and the polling interval are both used as triggers.
    Hybrid {
        /// Source of API change notifications.
        api_stream: S,
    },
    /// The API source is no longer used; the polling interval is the only trigger.
    PollingOnly,
}
28
/// The reason a single iteration of the stream's poll loop is running.
#[derive(Debug)]
enum PollTrigger {
    /// The API stream delivered a notification.
    ApiEvent,
    /// The API stream reported an error or finished; the stream must degrade.
    ApiDegraded,
    /// The polling interval ticked.
    Interval,
    /// No source is ready yet.
    Pending,
}

impl PollTrigger {
    /// Short, human-readable name of the trigger, intended for log messages.
    const fn label(&self) -> &'static str {
        match *self {
            PollTrigger::ApiEvent => "API event",
            PollTrigger::ApiDegraded => "API degradation",
            PollTrigger::Interval => "polling interval",
            PollTrigger::Pending => "pending",
        }
    }
}
53
54/// A stream of IP address changes produced by hybrid monitoring.
55///
56/// This type is returned by [`super::HybridMonitor::into_stream`] and yields
57/// batches of [`IpChange`] events whenever changes are detected.
58///
59/// The stream operates in two modes:
60/// - **Hybrid**: Reacts to both API notifications and polling interval
61/// - **Polling-only**: Falls back to polling if the API fails
62///
63/// Degradation from hybrid to polling-only is automatic and permanent
64/// for the lifetime of this stream.
65#[derive(Debug)]
66pub struct HybridStream<F, S, C> {
67 fetcher: F,
68 clock: C,
69 interval: Interval,
70 debounce: Option<DebouncePolicy>,
71 state: StreamState<S>,
72 /// Previous snapshot for comparison.
73 prev_snapshot: Option<Vec<AdapterSnapshot>>,
74 /// Debounce state: `Some(start_time)` if currently debouncing.
75 debounce_start: Option<tokio::time::Instant>,
76 /// Snapshot taken at debounce start for final comparison.
77 debounce_baseline: Option<Vec<AdapterSnapshot>>,
78}
79
80impl<F, S, C> HybridStream<F, S, C>
81where
82 F: AddressFetcher,
83 S: Stream<Item = Result<(), ApiError>> + Unpin,
84 C: Clock,
85{
86 pub(super) fn new(
87 fetcher: F,
88 api_stream: S,
89 clock: C,
90 poll_interval: Duration,
91 debounce: Option<DebouncePolicy>,
92 ) -> Self {
93 Self {
94 fetcher,
95 clock,
96 interval: interval(poll_interval),
97 debounce,
98 state: StreamState::Hybrid { api_stream },
99 prev_snapshot: None,
100 debounce_start: None,
101 debounce_baseline: None,
102 }
103 }
104
105 /// Returns true if currently in polling-only mode.
106 #[must_use]
107 pub const fn is_polling_only(&self) -> bool {
108 matches!(self.state, StreamState::PollingOnly)
109 }
110
111 /// Returns the current (most recent) snapshot of network adapters.
112 ///
113 /// Returns `None` if no snapshot has been taken yet (before the first fetch).
114 #[must_use]
115 pub fn current_snapshot(&self) -> Option<&[AdapterSnapshot]> {
116 self.prev_snapshot.as_deref()
117 }
118
119 /// Performs a single fetch and returns changes if any.
120 fn fetch_changes(&mut self) -> Result<Vec<IpChange>, FetchError> {
121 let current = self.fetcher.fetch()?;
122 let timestamp = self.clock.now();
123
124 let changes = self
125 .prev_snapshot
126 .as_ref()
127 .map_or_else(Vec::new, |prev| diff(prev, ¤t, timestamp));
128
129 self.prev_snapshot = Some(current);
130 Ok(changes)
131 }
132
133 /// Handles debounce logic, returning changes to emit (if any).
134 ///
135 /// `pre_fetch_snapshot` is the snapshot state BEFORE this fetch cycle,
136 /// used as baseline when starting a new debounce window.
137 ///
138 /// `triggered_by_api`: When true, starts debounce window even if no changes
139 /// detected yet. This handles Windows API timing where `NotifyIpInterfaceChange`
140 /// fires before the new IP is visible in `GetAdaptersAddresses`.
141 fn process_with_debounce(
142 &mut self,
143 raw_changes: Vec<IpChange>,
144 pre_fetch_snapshot: Option<Vec<AdapterSnapshot>>,
145 triggered_by_api: bool,
146 ) -> Option<Vec<IpChange>> {
147 let Some(debounce) = &self.debounce else {
148 // No debounce configured - emit immediately if non-empty
149 return if raw_changes.is_empty() {
150 None
151 } else {
152 Some(raw_changes)
153 };
154 };
155
156 let now = tokio::time::Instant::now();
157
158 // Decide whether to start a new debounce window:
159 // - Either we detected changes (normal case)
160 // - Or API event fired (signal that changes are coming, even if not visible yet)
161 // - But only if we have a valid baseline to compare against
162 let has_valid_baseline = pre_fetch_snapshot.is_some();
163 let should_start_window = self.debounce_start.is_none()
164 && has_valid_baseline
165 && (!raw_changes.is_empty() || triggered_by_api);
166
167 if should_start_window {
168 if triggered_by_api && raw_changes.is_empty() {
169 tracing::trace!(
170 "API event triggered but no changes detected yet, starting observation window"
171 );
172 }
173 // Start new debounce window, save baseline (state BEFORE changes)
174 self.debounce_start = Some(now);
175 self.debounce_baseline = pre_fetch_snapshot;
176 }
177
178 // Check if debounce window has elapsed
179 if let Some(start) = self.debounce_start {
180 if now.duration_since(start) >= debounce.window() {
181 // Window expired - compute final changes from baseline
182 return self.finalize_debounce();
183 }
184 }
185
186 None
187 }
188
189 /// Finalizes debounce by computing net changes from baseline to current state.
190 fn finalize_debounce(&mut self) -> Option<Vec<IpChange>> {
191 // Always reset debounce_start to avoid stuck state if baseline is None
192 let baseline = self.debounce_baseline.take();
193 self.debounce_start = None;
194
195 let baseline = baseline?;
196 let current = self.prev_snapshot.as_ref()?;
197 let timestamp = self.clock.now();
198
199 let changes = diff(&baseline, current, timestamp);
200 if changes.is_empty() {
201 None
202 } else {
203 Some(changes)
204 }
205 }
206
207 /// Transitions to polling-only mode.
208 fn degrade_to_polling(&mut self) {
209 self.state = StreamState::PollingOnly;
210 }
211}
212
213impl<F, S, C> Stream for HybridStream<F, S, C>
214where
215 F: AddressFetcher + Unpin,
216 S: Stream<Item = Result<(), ApiError>> + Unpin,
217 C: Clock + Unpin,
218{
219 type Item = Vec<IpChange>;
220
221 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
222 loop {
223 let trigger = match &mut self.state {
224 StreamState::Hybrid { api_stream } => {
225 // Check API stream first (higher priority for responsiveness)
226 match Pin::new(api_stream).poll_next(cx) {
227 Poll::Ready(Some(Ok(()))) => PollTrigger::ApiEvent,
228 Poll::Ready(Some(Err(_)) | None) => {
229 // API failed or ended - will degrade
230 PollTrigger::ApiDegraded
231 }
232 Poll::Pending => {
233 // API not ready - check interval
234 if Pin::new(&mut self.interval).poll_tick(cx).is_ready() {
235 PollTrigger::Interval
236 } else {
237 PollTrigger::Pending
238 }
239 }
240 }
241 }
242 StreamState::PollingOnly => {
243 // Only check interval in polling-only mode
244 if Pin::new(&mut self.interval).poll_tick(cx).is_ready() {
245 PollTrigger::Interval
246 } else {
247 PollTrigger::Pending
248 }
249 }
250 };
251
252 match trigger {
253 PollTrigger::Pending => return Poll::Pending,
254 PollTrigger::ApiDegraded => {
255 // Degrade to polling-only mode
256 self.degrade_to_polling();
257 // Continue loop to check interval
258 }
259 PollTrigger::ApiEvent | PollTrigger::Interval => {
260 tracing::debug!("Check triggered by {}", trigger.label());
261
262 // Capture snapshot BEFORE fetch (needed for debounce baseline)
263 // Only clone when we might start debouncing
264 let pre_fetch_snapshot =
265 if self.debounce.is_some() && self.debounce_start.is_none() {
266 self.prev_snapshot.clone()
267 } else {
268 None
269 };
270
271 // Fetch and process changes
272 let Ok(changes) = self.fetch_changes() else {
273 // Fetch error - continue waiting for next trigger
274 continue;
275 };
276
277 // API events start debounce even without detected changes,
278 // because Windows may notify before IP is visible
279 let triggered_by_api = matches!(trigger, PollTrigger::ApiEvent);
280
281 if let Some(result) =
282 self.process_with_debounce(changes, pre_fetch_snapshot, triggered_by_api)
283 {
284 tracing::debug!(
285 "Emitting {} change(s) triggered by {}",
286 result.len(),
287 trigger.label()
288 );
289 return Poll::Ready(Some(result));
290 }
291 // No changes to emit - loop back to wait for next trigger
292 }
293 }
294 }
295 }
296}