use super::{DebouncePolicy, IpChange, PollingMonitor, merge_changes};
use crate::network::{AdapterKind, AdapterSnapshot, AddressFetcher, FetchError};
use crate::time::Clock;
use std::collections::VecDeque;
use std::net::IpAddr;
use std::sync::Mutex;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{Duration, SystemTime};
use tokio_stream::StreamExt;
struct MockClock {
secs: AtomicU64,
}
impl MockClock {
fn new(initial_secs: u64) -> Self {
Self {
secs: AtomicU64::new(initial_secs),
}
}
}
impl Clock for MockClock {
fn now(&self) -> SystemTime {
SystemTime::UNIX_EPOCH + Duration::from_secs(self.secs.load(Ordering::SeqCst))
}
}
struct MockFetcher {
results: Mutex<VecDeque<Result<Vec<AdapterSnapshot>, FetchError>>>,
}
impl MockFetcher {
fn new(results: Vec<Result<Vec<AdapterSnapshot>, FetchError>>) -> Self {
Self {
results: Mutex::new(results.into()),
}
}
fn returning_snapshots(snapshots: Vec<Vec<AdapterSnapshot>>) -> Self {
Self::new(snapshots.into_iter().map(Ok).collect())
}
}
impl AddressFetcher for MockFetcher {
fn fetch(&self) -> Result<Vec<AdapterSnapshot>, FetchError> {
self.results
.lock()
.unwrap()
.pop_front()
.unwrap_or_else(|| Ok(vec![]))
}
}
fn make_snapshot(name: &str, ipv4: Vec<&str>, ipv6: Vec<&str>) -> AdapterSnapshot {
AdapterSnapshot::new(
name,
AdapterKind::Ethernet,
ipv4.into_iter().map(|s| s.parse().unwrap()).collect(),
ipv6.into_iter().map(|s| s.parse().unwrap()).collect(),
)
}
fn timestamp() -> SystemTime {
SystemTime::UNIX_EPOCH
}
mod merge_changes_fn {
use super::*;
#[test]
fn empty_input_returns_empty() {
let result = merge_changes(&[], timestamp());
assert!(result.is_empty());
}
#[test]
fn single_added_preserved() {
let addr: IpAddr = "192.168.1.1".parse().unwrap();
let changes = vec![IpChange::added("eth0", addr, timestamp())];
let result = merge_changes(&changes, timestamp());
assert_eq!(result.len(), 1);
assert!(result[0].is_added());
assert_eq!(result[0].address, addr);
}
#[test]
fn single_removed_preserved() {
let addr: IpAddr = "192.168.1.1".parse().unwrap();
let changes = vec![IpChange::removed("eth0", addr, timestamp())];
let result = merge_changes(&changes, timestamp());
assert_eq!(result.len(), 1);
assert!(result[0].is_removed());
}
#[test]
fn added_then_removed_cancels_out() {
let addr: IpAddr = "192.168.1.1".parse().unwrap();
let changes = vec![
IpChange::added("eth0", addr, timestamp()),
IpChange::removed("eth0", addr, timestamp()),
];
let result = merge_changes(&changes, timestamp());
assert!(result.is_empty());
}
#[test]
fn removed_then_added_cancels_out() {
let addr: IpAddr = "192.168.1.1".parse().unwrap();
let changes = vec![
IpChange::removed("eth0", addr, timestamp()),
IpChange::added("eth0", addr, timestamp()),
];
let result = merge_changes(&changes, timestamp());
assert!(result.is_empty());
}
#[test]
fn multiple_adds_merge_to_single() {
let addr: IpAddr = "192.168.1.1".parse().unwrap();
let changes = vec![
IpChange::added("eth0", addr, timestamp()),
IpChange::added("eth0", addr, timestamp()),
IpChange::added("eth0", addr, timestamp()),
];
let result = merge_changes(&changes, timestamp());
assert_eq!(result.len(), 1);
assert!(result[0].is_added());
}
#[test]
fn multiple_removes_merge_to_single() {
let addr: IpAddr = "192.168.1.1".parse().unwrap();
let changes = vec![
IpChange::removed("eth0", addr, timestamp()),
IpChange::removed("eth0", addr, timestamp()),
];
let result = merge_changes(&changes, timestamp());
assert_eq!(result.len(), 1);
assert!(result[0].is_removed());
}
#[test]
fn different_addresses_independent() {
let addr1: IpAddr = "192.168.1.1".parse().unwrap();
let addr2: IpAddr = "192.168.1.2".parse().unwrap();
let changes = vec![
IpChange::added("eth0", addr1, timestamp()),
IpChange::removed("eth0", addr2, timestamp()),
];
let result = merge_changes(&changes, timestamp());
assert_eq!(result.len(), 2);
}
#[test]
fn different_adapters_independent() {
let addr: IpAddr = "192.168.1.1".parse().unwrap();
let changes = vec![
IpChange::added("eth0", addr, timestamp()),
IpChange::removed("eth1", addr, timestamp()),
];
let result = merge_changes(&changes, timestamp());
assert_eq!(result.len(), 2);
}
#[test]
fn complex_sequence_with_partial_cancellation() {
let addr1: IpAddr = "192.168.1.1".parse().unwrap();
let addr2: IpAddr = "192.168.1.2".parse().unwrap();
let changes = vec![
IpChange::added("eth0", addr1, timestamp()),
IpChange::removed("eth0", addr1, timestamp()),
IpChange::added("eth0", addr1, timestamp()),
IpChange::removed("eth0", addr2, timestamp()),
IpChange::added("eth0", addr2, timestamp()),
];
let result = merge_changes(&changes, timestamp());
assert_eq!(result.len(), 1);
assert!(result[0].is_added());
assert_eq!(result[0].address, addr1);
}
#[test]
fn uses_provided_timestamp() {
let addr: IpAddr = "192.168.1.1".parse().unwrap();
let old_ts = SystemTime::UNIX_EPOCH;
let new_ts = SystemTime::UNIX_EPOCH + Duration::from_secs(1000);
let changes = vec![IpChange::added("eth0", addr, old_ts)];
let result = merge_changes(&changes, new_ts);
assert_eq!(result[0].timestamp, new_ts);
}
}
mod polling_monitor {
use super::*;
#[test]
fn new_creates_with_system_clock() {
let fetcher = MockFetcher::returning_snapshots(vec![]);
let monitor = PollingMonitor::new(fetcher, Duration::from_secs(60));
assert_eq!(monitor.interval(), Duration::from_secs(60));
assert!(monitor.debounce().is_none());
}
#[test]
fn with_clock_allows_custom_clock() {
let fetcher = MockFetcher::returning_snapshots(vec![]);
let clock = MockClock::new(1000);
let monitor = PollingMonitor::with_clock(fetcher, clock, Duration::from_secs(30));
assert_eq!(monitor.interval(), Duration::from_secs(30));
}
#[test]
fn with_debounce_sets_policy() {
let fetcher = MockFetcher::returning_snapshots(vec![]);
let policy = DebouncePolicy::new(Duration::from_millis(500));
let monitor =
PollingMonitor::new(fetcher, Duration::from_secs(60)).with_debounce(policy.clone());
assert_eq!(monitor.debounce(), Some(&policy));
}
#[test]
fn interval_accessor() {
let fetcher = MockFetcher::returning_snapshots(vec![]);
let monitor = PollingMonitor::new(fetcher, Duration::from_secs(120));
assert_eq!(monitor.interval(), Duration::from_secs(120));
}
}
mod polling_stream {
use super::*;
#[tokio::test]
async fn emits_changes_when_addresses_change() {
let snapshot1 = make_snapshot("eth0", vec!["192.168.1.1"], vec![]);
let snapshot2 = make_snapshot("eth0", vec!["192.168.1.2"], vec![]);
let fetcher = MockFetcher::returning_snapshots(vec![
vec![snapshot1], vec![snapshot2], ]);
let clock = MockClock::new(1000);
let monitor = PollingMonitor::with_clock(fetcher, clock, Duration::from_millis(10));
let stream = monitor.into_stream();
let changes: Vec<_> = stream.take(1).collect().await;
assert_eq!(changes.len(), 1);
let batch = &changes[0];
assert_eq!(batch.len(), 2); }
#[tokio::test]
async fn no_emission_when_unchanged() {
let snapshot = make_snapshot("eth0", vec!["192.168.1.1"], vec![]);
let fetcher = MockFetcher::returning_snapshots(vec![
vec![snapshot.clone()],
vec![snapshot.clone()],
vec![snapshot.clone()],
vec![make_snapshot("eth0", vec!["192.168.1.2"], vec![])], ]);
let clock = MockClock::new(0);
let monitor = PollingMonitor::with_clock(fetcher, clock, Duration::from_millis(5));
let stream = monitor.into_stream();
let changes: Vec<_> = stream.take(1).collect().await;
assert_eq!(changes.len(), 1);
}
#[tokio::test]
async fn uses_clock_for_timestamps() {
let snapshot1 = make_snapshot("eth0", vec![], vec![]);
let snapshot2 = make_snapshot("eth0", vec!["192.168.1.1"], vec![]);
let fetcher = MockFetcher::returning_snapshots(vec![vec![snapshot1], vec![snapshot2]]);
let clock = MockClock::new(12345);
let monitor = PollingMonitor::with_clock(fetcher, clock, Duration::from_millis(5));
let stream = monitor.into_stream();
let changes: Vec<_> = stream.take(1).collect().await;
let batch = &changes[0];
let expected_time = SystemTime::UNIX_EPOCH + Duration::from_secs(12345);
assert!(batch.iter().all(|c| c.timestamp == expected_time));
}
#[tokio::test]
async fn handles_adapter_appearing() {
let fetcher = MockFetcher::returning_snapshots(vec![
vec![], vec![make_snapshot("eth0", vec!["192.168.1.1"], vec![])],
]);
let clock = MockClock::new(0);
let monitor = PollingMonitor::with_clock(fetcher, clock, Duration::from_millis(5));
let stream = monitor.into_stream();
let changes: Vec<_> = stream.take(1).collect().await;
let batch = &changes[0];
assert_eq!(batch.len(), 1);
assert!(batch[0].is_added());
assert_eq!(batch[0].adapter, "eth0");
}
#[tokio::test]
async fn handles_adapter_disappearing() {
let fetcher = MockFetcher::returning_snapshots(vec![
vec![make_snapshot("eth0", vec!["192.168.1.1"], vec![])],
vec![], ]);
let clock = MockClock::new(0);
let monitor = PollingMonitor::with_clock(fetcher, clock, Duration::from_millis(5));
let stream = monitor.into_stream();
let changes: Vec<_> = stream.take(1).collect().await;
let batch = &changes[0];
assert_eq!(batch.len(), 1);
assert!(batch[0].is_removed());
}
#[tokio::test]
async fn handles_multiple_adapters() {
let fetcher = MockFetcher::returning_snapshots(vec![
vec![
make_snapshot("eth0", vec!["192.168.1.1"], vec![]),
make_snapshot("eth1", vec!["10.0.0.1"], vec![]),
],
vec![
make_snapshot("eth0", vec!["192.168.1.2"], vec![]), make_snapshot("eth1", vec!["10.0.0.1"], vec![]), ],
]);
let clock = MockClock::new(0);
let monitor = PollingMonitor::with_clock(fetcher, clock, Duration::from_millis(5));
let stream = monitor.into_stream();
let changes: Vec<_> = stream.take(1).collect().await;
let batch = &changes[0];
assert_eq!(batch.len(), 2); assert!(batch.iter().all(|c| c.adapter == "eth0"));
}
#[tokio::test(start_paused = true)]
async fn continues_after_fetch_error() {
let fetcher = MockFetcher::new(vec![
Ok(vec![make_snapshot("eth0", vec!["192.168.1.1"], vec![])]),
Err(FetchError::Platform {
message: "transient error".to_string(),
}),
Ok(vec![make_snapshot("eth0", vec!["192.168.1.2"], vec![])]),
]);
let clock = MockClock::new(0);
let monitor = PollingMonitor::with_clock(fetcher, clock, Duration::from_millis(5));
let stream = monitor.into_stream();
let changes: Vec<_> = stream.take(1).collect().await;
assert!(!changes.is_empty());
}
#[tokio::test(start_paused = true)]
async fn debounce_emits_after_window_expires() {
let snapshot1 = make_snapshot("eth0", vec!["192.168.1.1"], vec![]);
let snapshot2 = make_snapshot("eth0", vec!["192.168.1.2"], vec![]);
let fetcher = MockFetcher::returning_snapshots(vec![
vec![snapshot1], vec![snapshot2.clone()], vec![snapshot2], ]);
let clock = MockClock::new(1000);
let debounce = DebouncePolicy::new(Duration::from_millis(50));
let monitor = PollingMonitor::with_clock(fetcher, clock, Duration::from_millis(100))
.with_debounce(debounce);
let stream = monitor.into_stream();
let changes: Vec<_> = stream.take(1).collect().await;
assert_eq!(changes.len(), 1);
let batch = &changes[0];
assert_eq!(batch.len(), 2);
assert!(batch.iter().any(IpChange::is_removed));
assert!(batch.iter().any(IpChange::is_added));
}
#[tokio::test(start_paused = true)]
async fn debounce_cancels_add_then_remove() {
let snapshot1 = make_snapshot("eth0", vec!["192.168.1.1"], vec![]);
let snapshot2 = make_snapshot("eth0", vec!["192.168.1.1", "192.168.1.2"], vec![]);
let snapshot3 = make_snapshot("eth0", vec!["192.168.1.1"], vec![]);
let snapshot4 = make_snapshot("eth0", vec!["10.0.0.1"], vec![]);
let fetcher = MockFetcher::returning_snapshots(vec![
vec![snapshot1], vec![snapshot2], vec![snapshot3.clone()], vec![snapshot3], vec![snapshot4.clone()], vec![snapshot4], ]);
let clock = MockClock::new(0);
let debounce = DebouncePolicy::new(Duration::from_millis(50));
let monitor = PollingMonitor::with_clock(fetcher, clock, Duration::from_millis(100))
.with_debounce(debounce);
let stream = monitor.into_stream();
let changes: Vec<_> = stream.take(1).collect().await;
assert_eq!(changes.len(), 1);
let batch = &changes[0];
assert!(
batch
.iter()
.any(|c| c.is_removed() && c.address.to_string() == "192.168.1.1")
);
assert!(
batch
.iter()
.any(|c| c.is_added() && c.address.to_string() == "10.0.0.1")
);
}
#[tokio::test(start_paused = true)]
async fn debounce_no_changes_no_emit() {
let snapshot = make_snapshot("eth0", vec!["192.168.1.1"], vec![]);
let snapshot_changed = make_snapshot("eth0", vec!["192.168.1.2"], vec![]);
let snapshot_final = make_snapshot("eth0", vec!["10.0.0.1"], vec![]);
let fetcher = MockFetcher::returning_snapshots(vec![
vec![snapshot.clone()], vec![snapshot_changed], vec![snapshot.clone()], vec![snapshot], vec![snapshot_final.clone()], vec![snapshot_final], ]);
let clock = MockClock::new(0);
let debounce = DebouncePolicy::new(Duration::from_millis(50));
let monitor = PollingMonitor::with_clock(fetcher, clock, Duration::from_millis(100))
.with_debounce(debounce);
let stream = monitor.into_stream();
let changes: Vec<_> = stream.take(1).collect().await;
assert_eq!(changes.len(), 1);
assert!(
changes[0]
.iter()
.any(|c| c.address.to_string() == "10.0.0.1")
);
}
}