use crate::monitor::DebouncePolicy;
use crate::monitor::change::{IpChange, diff};
use crate::monitor::error::ApiError;
use crate::network::{AdapterSnapshot, AddressFetcher, FetchError};
use crate::time::Clock;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;
use tokio::time::{Interval, interval};
use tokio_stream::Stream;
/// Operating mode of a [`HybridStream`].
///
/// The stream starts in [`StreamState::Hybrid`] and moves to
/// [`StreamState::PollingOnly`] once the API notification stream reports an
/// error or terminates.
#[derive(Debug)]
enum StreamState<S> {
    /// Both the API notification stream and the polling interval can trigger
    /// a check; the notification stream is polled first.
    Hybrid { api_stream: S },
    /// Only the polling interval triggers checks; the notification stream
    /// has been dropped.
    PollingOnly,
}
/// Outcome of one pass over the event sources inside `poll_next`.
#[derive(Debug)]
enum PollTrigger {
    /// The API notification stream yielded an event.
    ApiEvent,
    /// The API notification stream yielded an error or ended.
    ApiDegraded,
    /// The polling interval ticked.
    Interval,
    /// No source was ready.
    Pending,
}

impl PollTrigger {
    /// Returns the human-readable name of this trigger used in log messages.
    const fn label(&self) -> &'static str {
        match *self {
            PollTrigger::ApiEvent => "API event",
            PollTrigger::ApiDegraded => "API degradation",
            PollTrigger::Interval => "polling interval",
            PollTrigger::Pending => "pending",
        }
    }
}
/// Stream of IP address changes driven by two sources: an API notification
/// stream (`S`) and a fixed polling interval.
///
/// Every trigger causes a fresh snapshot to be fetched through `F` and
/// compared with the previously stored one. When a debounce policy is
/// configured, changes are held back and reported relative to the snapshot
/// that was current when the debounce window opened.
#[derive(Debug)]
pub struct HybridStream<F, S, C> {
    /// Source of adapter snapshots.
    fetcher: F,
    /// Provides the timestamp handed to `diff` when changes are computed.
    clock: C,
    /// Timer that triggers periodic checks.
    interval: Interval,
    /// Debounce configuration; `None` means changes are emitted immediately.
    debounce: Option<DebouncePolicy>,
    /// Current operating mode (hybrid or polling-only).
    state: StreamState<S>,
    /// Most recently fetched snapshot; `None` until the first successful fetch.
    prev_snapshot: Option<Vec<AdapterSnapshot>>,
    /// Instant at which the open debounce window started; `None` when no
    /// window is open.
    debounce_start: Option<tokio::time::Instant>,
    /// Snapshot that was current just before the debounce window opened.
    debounce_baseline: Option<Vec<AdapterSnapshot>>,
}
impl<F, S, C> HybridStream<F, S, C>
where
    F: AddressFetcher,
    S: Stream<Item = Result<(), ApiError>> + Unpin,
    C: Clock,
{
    /// Creates a stream in hybrid mode with no stored snapshot and no open
    /// debounce window.
    ///
    /// * `fetcher` - source of adapter snapshots.
    /// * `api_stream` - notification stream; each `Ok(())` triggers a check.
    /// * `clock` - provides the timestamp passed to `diff`.
    /// * `poll_interval` - period of the fallback polling timer.
    /// * `debounce` - optional debounce policy; `None` disables debouncing.
    ///
    /// # Panics
    ///
    /// Panics if `poll_interval` is zero, because `tokio::time::interval`
    /// rejects a zero period.
    pub(super) fn new(
        fetcher: F,
        api_stream: S,
        clock: C,
        poll_interval: Duration,
        debounce: Option<DebouncePolicy>,
    ) -> Self {
        Self {
            fetcher,
            clock,
            interval: interval(poll_interval),
            debounce,
            state: StreamState::Hybrid { api_stream },
            prev_snapshot: None,
            debounce_start: None,
            debounce_baseline: None,
        }
    }

    /// Returns `true` once the stream has fallen back to polling-only mode.
    #[must_use]
    pub const fn is_polling_only(&self) -> bool {
        matches!(self.state, StreamState::PollingOnly)
    }

    /// Returns the most recently fetched snapshot, or `None` if no fetch has
    /// succeeded yet.
    #[must_use]
    pub fn current_snapshot(&self) -> Option<&[AdapterSnapshot]> {
        self.prev_snapshot.as_deref()
    }

    /// Fetches a fresh snapshot, diffs it against the stored one and stores
    /// the fresh snapshot as the new reference.
    ///
    /// The very first successful fetch yields an empty change list because
    /// there is nothing to compare against.
    ///
    /// # Errors
    ///
    /// Propagates the fetcher's [`FetchError`]; the stored snapshot is left
    /// untouched in that case.
    fn fetch_changes(&mut self) -> Result<Vec<IpChange>, FetchError> {
        let current = self.fetcher.fetch()?;
        let timestamp = self.clock.now();
        let changes = self
            .prev_snapshot
            .as_ref()
            .map_or_else(Vec::new, |prev| diff(prev, &current, timestamp));
        self.prev_snapshot = Some(current);
        Ok(changes)
    }

    /// Applies the debounce policy to the outcome of one check.
    ///
    /// * `raw_changes` - changes between the previous and the fresh snapshot.
    /// * `pre_fetch_snapshot` - the snapshot stored before the fetch; used as
    ///   the baseline when this call opens a debounce window.
    /// * `triggered_by_api` - whether an API event caused this check.
    ///
    /// Without a debounce policy, non-empty `raw_changes` are returned as-is.
    /// With a policy, `raw_changes` only serve as a signal: a window is opened
    /// when none is open, a baseline is available, and either changes were
    /// seen or the check came from an API event. Once the window has elapsed
    /// the changes are recomputed against the baseline. Expiry is evaluated
    /// only when this method is called, using `tokio::time::Instant`.
    ///
    /// Returns `None` when there is nothing to emit.
    fn process_with_debounce(
        &mut self,
        raw_changes: Vec<IpChange>,
        pre_fetch_snapshot: Option<Vec<AdapterSnapshot>>,
        triggered_by_api: bool,
    ) -> Option<Vec<IpChange>> {
        let Some(debounce) = &self.debounce else {
            return (!raw_changes.is_empty()).then_some(raw_changes);
        };

        let now = tokio::time::Instant::now();
        let has_valid_baseline = pre_fetch_snapshot.is_some();
        let should_start_window = self.debounce_start.is_none()
            && has_valid_baseline
            && (!raw_changes.is_empty() || triggered_by_api);

        if should_start_window {
            if triggered_by_api && raw_changes.is_empty() {
                tracing::trace!(
                    "API event triggered but no changes detected yet, starting observation window"
                );
            }
            self.debounce_start = Some(now);
            self.debounce_baseline = pre_fetch_snapshot;
        }

        if let Some(start) = self.debounce_start {
            if now.duration_since(start) >= debounce.window() {
                return self.finalize_debounce();
            }
        }
        None
    }

    /// Closes the debounce window and returns the net changes between the
    /// window's baseline and the currently stored snapshot.
    ///
    /// The window state is cleared unconditionally. Returns `None` when the
    /// baseline or the current snapshot is missing, or when the net diff is
    /// empty.
    fn finalize_debounce(&mut self) -> Option<Vec<IpChange>> {
        // Reset the window first so it is closed even on the early returns.
        let baseline = self.debounce_baseline.take();
        self.debounce_start = None;

        let baseline = baseline?;
        let current = self.prev_snapshot.as_ref()?;
        let timestamp = self.clock.now();
        let changes = diff(&baseline, current, timestamp);
        (!changes.is_empty()).then_some(changes)
    }

    /// Switches to polling-only mode, dropping the API notification stream.
    fn degrade_to_polling(&mut self) {
        self.state = StreamState::PollingOnly;
    }
}
impl<F, S, C> Stream for HybridStream<F, S, C>
where
F: AddressFetcher + Unpin,
S: Stream<Item = Result<(), ApiError>> + Unpin,
C: Clock + Unpin,
{
type Item = Vec<IpChange>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
loop {
let trigger = match &mut self.state {
StreamState::Hybrid { api_stream } => {
match Pin::new(api_stream).poll_next(cx) {
Poll::Ready(Some(Ok(()))) => PollTrigger::ApiEvent,
Poll::Ready(Some(Err(_)) | None) => {
PollTrigger::ApiDegraded
}
Poll::Pending => {
if Pin::new(&mut self.interval).poll_tick(cx).is_ready() {
PollTrigger::Interval
} else {
PollTrigger::Pending
}
}
}
}
StreamState::PollingOnly => {
if Pin::new(&mut self.interval).poll_tick(cx).is_ready() {
PollTrigger::Interval
} else {
PollTrigger::Pending
}
}
};
match trigger {
PollTrigger::Pending => return Poll::Pending,
PollTrigger::ApiDegraded => {
self.degrade_to_polling();
}
PollTrigger::ApiEvent | PollTrigger::Interval => {
tracing::debug!("Check triggered by {}", trigger.label());
let pre_fetch_snapshot =
if self.debounce.is_some() && self.debounce_start.is_none() {
self.prev_snapshot.clone()
} else {
None
};
let Ok(changes) = self.fetch_changes() else {
continue;
};
let triggered_by_api = matches!(trigger, PollTrigger::ApiEvent);
if let Some(result) =
self.process_with_debounce(changes, pre_fetch_snapshot, triggered_by_api)
{
tracing::debug!(
"Emitting {} change(s) triggered by {}",
result.len(),
trigger.label()
);
return Poll::Ready(Some(result));
}
}
}
}
}
}