use super::super::DebouncePolicy;
use super::super::change::{IpChange, diff};
use crate::network::{AdapterSnapshot, AddressFetcher, FetchError};
use crate::time::Clock;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;
use tokio::time::{Interval, interval};
use tokio_stream::Stream;
/// A stream that periodically fetches adapter snapshots and yields the
/// differences between consecutive snapshots as batches of [`IpChange`]s.
///
/// When a [`DebouncePolicy`] is configured, changes are not emitted as soon as
/// they are seen. Instead the snapshot taken before the first change is kept as
/// a baseline, and once the debounce window has elapsed the net difference
/// between that baseline and the latest snapshot is emitted.
#[must_use = "streams do nothing unless polled"]
pub struct PollingStream<F, C> {
    /// Source of adapter snapshots; queried once per interval tick.
    fetcher: F,
    /// Supplies the timestamp attached to every computed diff.
    clock: C,
    /// Timer that paces the polls.
    interval: Interval,
    /// Optional debounce configuration; `None` emits changes immediately.
    debounce: Option<DebouncePolicy>,
    /// Snapshot from the most recent successful fetch (`None` before the first).
    prev_snapshot: Option<Vec<AdapterSnapshot>>,
    /// Instant at which the active debounce window opened (`None` when idle).
    debounce_start: Option<tokio::time::Instant>,
    /// Snapshot as it was before the first change of the active debounce window.
    debounce_baseline: Option<Vec<AdapterSnapshot>>,
}
impl<F, C> PollingStream<F, C>
where
    F: AddressFetcher,
    C: Clock,
{
    /// Creates a stream that polls `fetcher` once every `poll_interval`.
    ///
    /// `debounce` optionally coalesces bursts of changes; pass `None` to emit
    /// every non-empty diff as soon as it is observed.
    ///
    /// # Panics
    ///
    /// Panics if `poll_interval` is zero, because [`tokio::time::interval`]
    /// rejects a zero period.
    pub(super) fn new(
        fetcher: F,
        clock: C,
        poll_interval: Duration,
        debounce: Option<DebouncePolicy>,
    ) -> Self {
        Self {
            fetcher,
            clock,
            interval: interval(poll_interval),
            debounce,
            prev_snapshot: None,
            debounce_start: None,
            debounce_baseline: None,
        }
    }

    /// Returns the snapshot from the most recent successful poll, or `None`
    /// if no poll has succeeded yet.
    #[must_use]
    pub fn current_snapshot(&self) -> Option<&[AdapterSnapshot]> {
        self.prev_snapshot.as_deref()
    }

    /// Fetches a fresh snapshot, diffs it against the stored one and then
    /// stores it as the new "previous" snapshot.
    ///
    /// The first successful fetch yields no changes because there is nothing
    /// to compare against.
    ///
    /// # Errors
    ///
    /// Propagates the fetcher's [`FetchError`]; the stored snapshot is left
    /// untouched in that case.
    fn poll_once(&mut self) -> Result<Vec<IpChange>, FetchError> {
        let current = self.fetcher.fetch()?;
        let timestamp = self.clock.now();
        let changes = self
            .prev_snapshot
            .as_ref()
            .map_or_else(Vec::new, |prev| diff(prev, &current, timestamp));
        self.prev_snapshot = Some(current);
        Ok(changes)
    }

    /// Applies the debounce policy to the changes produced by one poll.
    ///
    /// * Without a policy, `raw_changes` is returned as-is (`None` if empty).
    /// * With a policy, the first non-empty diff opens a window and records
    ///   `pre_poll_snapshot` as the baseline. Later polls neither extend the
    ///   window nor replace the baseline. Once the window has elapsed the net
    ///   change since the baseline is returned via `finalize_debounce`.
    ///
    /// Returns `Some` only when there is a batch to emit right now.
    fn process_with_debounce(
        &mut self,
        raw_changes: Vec<IpChange>,
        pre_poll_snapshot: Option<Vec<AdapterSnapshot>>,
    ) -> Option<Vec<IpChange>> {
        let Some(debounce) = &self.debounce else {
            return if raw_changes.is_empty() {
                None
            } else {
                Some(raw_changes)
            };
        };

        let now = tokio::time::Instant::now();
        let start = match self.debounce_start {
            Some(start) => start,
            None => {
                // Idle and nothing changed: there is no window to open.
                if raw_changes.is_empty() {
                    return None;
                }
                // First change of a burst: open the window and remember the
                // state the burst started from.
                self.debounce_start = Some(now);
                self.debounce_baseline = pre_poll_snapshot;
                now
            }
        };

        if now.duration_since(start) >= debounce.window() {
            return self.finalize_debounce();
        }
        None
    }

    /// Closes the active debounce window and returns the net changes between
    /// the recorded baseline and the latest snapshot.
    ///
    /// Returns `None` when the net difference is empty (for example the
    /// addresses flapped back to their original state) or when either
    /// snapshot is unavailable.
    fn finalize_debounce(&mut self) -> Option<Vec<IpChange>> {
        // Clear the start time before any early return. Otherwise a missing
        // baseline would leave the window armed forever and block new
        // baselines from being captured.
        self.debounce_start = None;
        let baseline = self.debounce_baseline.take()?;
        let current = self.prev_snapshot.as_ref()?;
        let timestamp = self.clock.now();
        let changes = diff(&baseline, current, timestamp);
        if changes.is_empty() {
            None
        } else {
            Some(changes)
        }
    }
}
impl<F, C> Stream for PollingStream<F, C>
where
    F: AddressFetcher + Unpin,
    C: Clock + Unpin,
{
    type Item = Vec<IpChange>;

    /// Drives the polling loop: waits for the next interval tick, fetches a
    /// snapshot and yields a batch of changes when one is ready.
    ///
    /// The stream never ends on its own; it returns either `Poll::Pending` or
    /// `Poll::Ready(Some(_))`.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // Every field is `Unpin`, so the pin can be unwrapped once up front.
        let this = &mut *self;
        loop {
            // Not time to poll yet; the interval has registered the waker.
            if this.interval.poll_tick(cx).is_pending() {
                return Poll::Pending;
            }

            // Only while debouncing is enabled and idle can this poll open a
            // new window, so only then is the pre-poll state worth copying.
            let window_idle = this.debounce.is_some() && this.debounce_start.is_none();
            let baseline_candidate = if window_idle {
                this.prev_snapshot.clone()
            } else {
                None
            };

            // A failed fetch is dropped and the loop waits for the next tick.
            let batch = this
                .poll_once()
                .ok()
                .and_then(|raw| this.process_with_debounce(raw, baseline_candidate));
            if let Some(ready) = batch {
                return Poll::Ready(Some(ready));
            }
        }
    }
}