ddns_a/monitor/poller/
stream.rs1use super::super::DebouncePolicy;
7use super::super::change::{IpChange, diff};
8use crate::network::{AdapterSnapshot, AddressFetcher, FetchError};
9use crate::time::Clock;
10use std::pin::Pin;
11use std::task::{Context, Poll};
12use std::time::Duration;
13use tokio::time::{Interval, interval};
14use tokio_stream::Stream;
15
16pub struct PollingStream<F, C> {
21 fetcher: F,
22 clock: C,
23 interval: Interval,
24 debounce: Option<DebouncePolicy>,
25 prev_snapshot: Option<Vec<AdapterSnapshot>>,
27 debounce_start: Option<tokio::time::Instant>,
29 debounce_baseline: Option<Vec<AdapterSnapshot>>,
31}
32
33impl<F, C> PollingStream<F, C>
34where
35 F: AddressFetcher,
36 C: Clock,
37{
38 pub(super) fn new(
39 fetcher: F,
40 clock: C,
41 poll_interval: Duration,
42 debounce: Option<DebouncePolicy>,
43 ) -> Self {
44 Self {
45 fetcher,
46 clock,
47 interval: interval(poll_interval),
48 debounce,
49 prev_snapshot: None,
50 debounce_start: None,
51 debounce_baseline: None,
52 }
53 }
54
55 #[must_use]
59 pub fn current_snapshot(&self) -> Option<&[AdapterSnapshot]> {
60 self.prev_snapshot.as_deref()
61 }
62
63 fn poll_once(&mut self) -> Result<Vec<IpChange>, FetchError> {
65 let current = self.fetcher.fetch()?;
66 let timestamp = self.clock.now();
67
68 let changes = self
69 .prev_snapshot
70 .as_ref()
71 .map_or_else(Vec::new, |prev| diff(prev, ¤t, timestamp));
72
73 self.prev_snapshot = Some(current);
74 Ok(changes)
75 }
76
77 fn process_with_debounce(
82 &mut self,
83 raw_changes: Vec<IpChange>,
84 pre_poll_snapshot: Option<Vec<AdapterSnapshot>>,
85 ) -> Option<Vec<IpChange>> {
86 let Some(debounce) = &self.debounce else {
87 return if raw_changes.is_empty() {
89 None
90 } else {
91 Some(raw_changes)
92 };
93 };
94
95 if raw_changes.is_empty() && self.debounce_start.is_none() {
96 return None;
98 }
99
100 let now = tokio::time::Instant::now();
101
102 if !raw_changes.is_empty() && self.debounce_start.is_none() {
103 self.debounce_start = Some(now);
105 self.debounce_baseline = pre_poll_snapshot;
106 }
107
108 if let Some(start) = self.debounce_start {
110 if now.duration_since(start) >= debounce.window() {
111 return self.finalize_debounce();
113 }
114 }
115
116 None
117 }
118
119 fn finalize_debounce(&mut self) -> Option<Vec<IpChange>> {
121 let baseline = self.debounce_baseline.take()?;
122 self.debounce_start = None;
123
124 let current = self.prev_snapshot.as_ref()?;
125 let timestamp = self.clock.now();
126
127 let changes = diff(&baseline, current, timestamp);
128 if changes.is_empty() {
129 None
130 } else {
131 Some(changes)
132 }
133 }
134}
135
136impl<F, C> Stream for PollingStream<F, C>
137where
138 F: AddressFetcher + Unpin,
139 C: Clock + Unpin,
140{
141 type Item = Vec<IpChange>;
142
143 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
144 loop {
145 if Pin::new(&mut self.interval).poll_tick(cx).is_pending() {
147 return Poll::Pending;
148 }
149
150 let pre_poll_snapshot = if self.debounce.is_some() && self.debounce_start.is_none() {
153 self.prev_snapshot.clone()
154 } else {
155 None
156 };
157
158 let Ok(changes) = self.poll_once() else {
162 continue;
164 };
165
166 if let Some(result) = self.process_with_debounce(changes, pre_poll_snapshot) {
167 return Poll::Ready(Some(result));
168 }
169 }
171 }
172}