use std::io;
use std::net::{IpAddr, SocketAddr};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use std::time::{Duration, Instant};
use std::thread;
use dashmap::DashMap;
use rand::Rng;
use tokio::net::{TcpStream, UdpSocket};
use tokio::sync::Semaphore;
use crate::config::{ProxyKind, ProxySpec, ScanPlan};
/// Reports whether `host` has been tracked for longer than `limit`.
///
/// The first call for a host stores the current instant in `host_start`;
/// that call and every later one compare the time elapsed since the stored
/// instant against `limit`.
pub(crate) fn host_over_deadline(
    host_start: &DashMap<IpAddr, Instant>,
    host: IpAddr,
    limit: Duration,
) -> bool {
    let checked_at = Instant::now();
    // `or_insert` keeps an already-recorded start and only stores
    // `checked_at` when this is the first time the host is seen.
    let first_seen = *host_start.entry(host).or_insert(checked_at);
    let spent = checked_at.duration_since(first_seen);
    spent > limit
}
/// Picks the pause to insert before a probe.
///
/// * neither value set: `None`;
/// * only `scan_delay`: exactly that delay;
/// * only `max_scan_delay`: a uniformly random delay in `0..=max`;
/// * both: a uniformly random delay in `min..=max`, or `min` when `max <= min`.
pub(crate) fn sample_inter_probe_delay(
    scan_delay: Option<Duration>,
    max_scan_delay: Option<Duration>,
) -> Option<Duration> {
    let mut rng = rand::thread_rng();
    // Reduce every randomised case to "floor plus random jitter"; a missing
    // minimum behaves as a floor of zero.
    let (floor, ceiling) = match (scan_delay, max_scan_delay) {
        (None, None) => return None,
        (Some(fixed), None) => return Some(fixed),
        (None, Some(ceiling)) => (Duration::ZERO, ceiling),
        (Some(floor), Some(ceiling)) => (floor, ceiling),
    };
    if ceiling <= floor {
        return Some(floor);
    }
    let span = ceiling.saturating_sub(floor).as_nanos();
    let jitter = rng.gen_range(0u128..=span);
    Some(floor.saturating_add(duration_from_nanos_saturating(jitter)))
}
/// Converts a nanosecond count into a `Duration`, clamping values above
/// `u64::MAX` nanoseconds to exactly `u64::MAX` nanoseconds.
fn duration_from_nanos_saturating(n: u128) -> Duration {
    let clamped = n.min(u128::from(u64::MAX));
    // The cast is lossless: `clamped` was just bounded by `u64::MAX`.
    Duration::from_nanos(clamped as u64)
}
/// Asynchronously sleeps for a delay sampled by `sample_inter_probe_delay`.
///
/// Returns immediately when no delay is configured or the sampled delay is zero.
pub(crate) async fn sleep_inter_probe_delay(
    scan_delay: Option<Duration>,
    max_scan_delay: Option<Duration>,
) {
    match sample_inter_probe_delay(scan_delay, max_scan_delay) {
        Some(pause) if !pause.is_zero() => tokio::time::sleep(pause).await,
        _ => {}
    }
}
/// Blocks the current thread for a delay sampled by `sample_inter_probe_delay`.
///
/// Returns immediately when no delay is configured or the sampled delay is zero.
pub fn sleep_inter_probe_delay_sync(
    scan_delay: Option<Duration>,
    max_scan_delay: Option<Duration>,
) {
    let pause = sample_inter_probe_delay(scan_delay, max_scan_delay).filter(|d| !d.is_zero());
    if let Some(pause) = pause {
        thread::sleep(pause);
    }
}
/// Spaces probes out so that consecutive turns are at least `interval` apart.
///
/// The pacer is shared between workers; each caller reserves the next slot
/// under a mutex and then sleeps (outside the lock) until that slot arrives.
pub struct ProbeRatePacer {
    /// Earliest instant at which the next caller may proceed.
    next_slot: Mutex<Instant>,
    /// Spacing between consecutive slots (`1 / probes_per_second`).
    interval: Duration,
}

impl ProbeRatePacer {
    /// Builds a shared pacer when a positive `max_rate` (probes per second) is given.
    ///
    /// Returns `None` when `max_rate` is `None` or `Some(0)`: a zero rate
    /// cannot be turned into an interval and would otherwise trip the
    /// assertion in [`ProbeRatePacer::new`]. `_min_rate` is accepted but unused.
    pub fn maybe_new(max_rate: Option<u64>, _min_rate: Option<u64>) -> Option<Arc<Self>> {
        max_rate
            .filter(|&rate| rate > 0)
            .map(|rate| Arc::new(Self::new(rate as f64)))
    }

    /// Creates a pacer that allows `probes_per_second` turns per second.
    ///
    /// # Panics
    ///
    /// Panics when `probes_per_second` is not strictly positive and finite.
    pub fn new(probes_per_second: f64) -> Self {
        assert!(probes_per_second > 0.0 && probes_per_second.is_finite());
        Self {
            next_slot: Mutex::new(Instant::now()),
            interval: Duration::from_secs_f64(1.0 / probes_per_second),
        }
    }

    /// Reserves the next slot and returns how long the caller must wait for it.
    ///
    /// If the stored slot is still in the future the caller takes it and the
    /// slot advances by one interval; otherwise the caller proceeds at once
    /// and the next slot is placed one interval from now.
    fn reserve_slot(&self) -> Duration {
        let mut next = self.next_slot.lock().expect("probe rate pacer");
        let now = Instant::now();
        let start = *next;
        if start > now {
            *next = start + self.interval;
            start - now
        } else {
            *next = now + self.interval;
            Duration::ZERO
        }
    }

    /// Asynchronously waits until this caller's slot arrives.
    pub async fn wait_turn(&self) {
        // The mutex guard is released inside `reserve_slot`, so it is never
        // held across the `.await` below.
        let sleep_for = self.reserve_slot();
        if !sleep_for.is_zero() {
            tokio::time::sleep(sleep_for).await;
        }
    }

    /// Blocks the current thread until this caller's slot arrives.
    pub fn wait_turn_sync(&self) {
        let sleep_for = self.reserve_slot();
        if !sleep_for.is_zero() {
            thread::sleep(sleep_for);
        }
    }
}
/// Milliseconds `udp_scan` sleeps after a probe's final timeout before it
/// looks up the ICMP note recorded for that `(host, port)`.
const UDP_ICMP_DRAIN_MS: u64 = 2;
/// ICMP-derived verdict noted for a UDP probe.
///
/// When notes are merged, `Closed` takes precedence over `Filtered`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum UdpIcmpOutcome {
    /// Reported by `udp_scan` as state `closed` with `PortReason::IcmpPortUnreachable`.
    Closed,
    /// Reported by `udp_scan` as state `filtered` with `PortReason::IcmpUnreachableFiltered`.
    Filtered,
}
/// Shared concurrent map from `(host, port)` to the ICMP outcome noted for that UDP probe.
pub type UdpIcmpNotes = Arc<DashMap<(IpAddr, u16), UdpIcmpOutcome>>;
/// Records an ICMP outcome for `k`, letting `Closed` win over `Filtered`.
///
/// A first observation is stored as-is. For an existing entry, only a new
/// `Closed` observation changes the stored value; a later `Filtered` never
/// downgrades a stored `Closed`.
pub(crate) fn merge_udp_icmp_note(notes: &UdpIcmpNotes, k: (IpAddr, u16), new: UdpIcmpOutcome) {
    let upgrades_to_closed = matches!(new, UdpIcmpOutcome::Closed);
    notes
        .entry(k)
        .and_modify(|stored| {
            if upgrades_to_closed {
                *stored = new;
            }
        })
        .or_insert(new);
}
/// Why a port was assigned its reported state.
///
/// Variants without a doc comment are not produced by the probes visible in
/// this part of the file; presumably other scan types set them — confirm
/// against their producers.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PortReason {
    /// The TCP connect probe succeeded (state `open`).
    SynAck,
    /// The TCP connect failed with `ConnectionRefused` (state `closed`).
    ConnRefused,
    TcpRst,
    TcpWindowRst,
    /// Every attempt timed out.
    Timeout,
    /// The per-host time limit was exceeded before the probe could run.
    HostTimeout,
    /// The probe failed with an I/O error other than refusal or timeout.
    Error,
    /// A non-empty UDP datagram was received (state `open`).
    UdpResponse,
    /// The ICMP notes recorded `UdpIcmpOutcome::Closed` for the probe.
    IcmpPortUnreachable,
    /// The ICMP notes recorded `UdpIcmpOutcome::Filtered` for the probe.
    IcmpUnreachableFiltered,
    IcmpProtoUnreachable,
    FtpBounceOpen,
    FtpBounceClosed,
    SctpInitAck,
    SctpCookieAck,
    SctpAbort,
    IdleIpIdOpen,
    IdleIpIdClosed,
    IdleProbeFailed,
}
/// One result row: the state determined for a single `(host, port)` probe.
#[derive(Debug, Clone)]
pub struct PortLine {
    /// Address of the probed host.
    pub host: IpAddr,
    /// Probed port number.
    pub port: u16,
    /// Protocol label; the probes in this file use `"tcp"` and `"udp"`.
    pub proto: &'static str,
    /// State label; the probes in this file use `"open"`, `"closed"`,
    /// `"filtered"` and `"open|filtered"`.
    pub state: &'static str,
    /// Why `state` was chosen.
    pub reason: PortReason,
    /// Elapsed milliseconds measured for the probe, when one was recorded.
    pub latency_ms: Option<u128>,
    /// Optional version text; `PortLine::new` always leaves this as `None`.
    pub version_info: Option<String>,
}
impl PortLine {
pub(crate) fn new(
host: IpAddr,
port: u16,
proto: &'static str,
state: &'static str,
reason: PortReason,
latency_ms: Option<u128>,
) -> Self {
Self {
host,
port,
proto,
state,
reason,
latency_ms,
version_info: None,
}
}
}
async fn connect_via_proxy(proxy: &ProxySpec, target: SocketAddr) -> io::Result<TcpStream> {
let proxy_addr: SocketAddr = SocketAddr::new(
proxy.host.parse().map_err(|e| {
io::Error::new(io::ErrorKind::InvalidInput, format!("bad proxy host: {e}"))
})?,
proxy.port,
);
match proxy.kind {
ProxyKind::Socks4 => tokio_socks::tcp::Socks4Stream::connect(proxy_addr, target)
.await
.map(|s| s.into_inner())
.map_err(io::Error::other),
ProxyKind::Http => {
let mut stream = TcpStream::connect(proxy_addr).await?;
let req = format!(
"CONNECT {}:{} HTTP/1.1\r\nHost: {}:{}\r\n\r\n",
target.ip(),
target.port(),
target.ip(),
target.port()
);
use tokio::io::{AsyncReadExt, AsyncWriteExt};
stream.write_all(req.as_bytes()).await?;
let mut buf = [0u8; 1024];
let n = stream.read(&mut buf).await?;
let resp = String::from_utf8_lossy(&buf[..n]);
if resp.contains("200") {
Ok(stream)
} else {
Err(io::Error::new(
io::ErrorKind::ConnectionRefused,
format!(
"HTTP CONNECT rejected: {}",
resp.lines().next().unwrap_or("")
),
))
}
}
}
}
/// State shared by all workers of one TCP connect scan.
struct TcpConnectCtx {
    /// Targets to probe; results are stored at the same index.
    work: Vec<(IpAddr, u16)>,
    /// Next unclaimed index into `work`; workers claim one with `fetch_add`.
    next_idx: AtomicUsize,
    /// Timeout applied to each individual connect attempt.
    timeout: Duration,
    /// When set, a port whose attempts all time out is reported as
    /// `"open|filtered"` instead of `"filtered"`.
    no_ping: bool,
    /// Number of timed-out attempts after which the probe gives up.
    max_tries: u32,
    /// Optional rate pacer consulted before the first attempt of each probe.
    pacer: Option<Arc<ProbeRatePacer>>,
    /// Per-host instants recorded by `host_over_deadline`; present when a host limit is set.
    host_deadline: Option<Arc<DashMap<IpAddr, Instant>>>,
    /// Per-host time limit, if any.
    host_limit: Option<Duration>,
    /// Lower bound (or fixed value) of the delay slept before the first attempt.
    scan_delay: Option<Duration>,
    /// Upper bound of the randomised delay slept before the first attempt.
    max_scan_delay: Option<Duration>,
    /// Configured proxies; the async probe uses only the first entry.
    proxies: Vec<ProxySpec>,
    /// Optional counter incremented once per completed probe.
    progress: Option<Arc<AtomicUsize>>,
}
/// Pre-sized result storage with one initially uninitialised cell per work item.
///
/// `tcp_connect_scan` workers write a cell through its raw pointer and the
/// cells are read back with `assume_init` after the workers finish.
struct ResultSlots {
    slots: Vec<std::cell::UnsafeCell<std::mem::MaybeUninit<PortLine>>>,
}
// SAFETY (relies on how `tcp_connect_scan` uses this type): every index is
// handed out by a single atomic `fetch_add`, so no two workers ever touch the
// same cell, and cells are only read after the workers have been joined.
// NOTE(review): any new user of `ResultSlots` must uphold the same discipline.
unsafe impl Send for ResultSlots {}
unsafe impl Sync for ResultSlots {}
/// Runs a TCP connect scan over `work` and returns one [`PortLine`] per
/// entry, in the same order as `work`.
///
/// Workers repeatedly claim the next index from a shared atomic counter and
/// store the probe result in the slot with that index. When no proxies are
/// configured and there are at least 64 targets, the probes run on scoped OS
/// threads inside `spawn_blocking`; otherwise they run as Tokio tasks.
/// `progress`, when given, is incremented once per finished probe.
///
/// # Panics
///
/// Panics if a worker panics or is cancelled, because its result slot may
/// then never have been written.
pub async fn tcp_connect_scan(
    work: Vec<(IpAddr, u16)>,
    plan: Arc<ScanPlan>,
    progress: Option<Arc<AtomicUsize>>,
) -> Vec<PortLine> {
    let n = work.len();
    if n == 0 {
        return Vec::new();
    }
    // At least one worker must run: with zero workers no slot would ever be
    // written and reading the results below would be undefined behaviour.
    // `n >= 1` here, so the clamp bounds are valid.
    let conc = plan.effective_probe_concurrency().clamp(1, n);
    let max_tries = 1u32.saturating_add(plan.connect_retries);
    let use_blocking = plan.proxies.is_empty() && n >= 64;
    let ctx = Arc::new(TcpConnectCtx {
        work,
        next_idx: AtomicUsize::new(0),
        timeout: plan.connect_timeout,
        no_ping: plan.no_ping,
        max_tries,
        pacer: ProbeRatePacer::maybe_new(plan.max_probe_rate, plan.min_probe_rate),
        host_deadline: plan.host_timeout.map(|_| Arc::new(DashMap::new())),
        host_limit: plan.host_timeout,
        scan_delay: plan.scan_delay,
        max_scan_delay: plan.max_scan_delay,
        proxies: plan.proxies.clone(),
        progress,
    });
    let results = Arc::new(ResultSlots {
        slots: (0..n)
            .map(|_| std::cell::UnsafeCell::new(std::mem::MaybeUninit::uninit()))
            .collect(),
    });
    if use_blocking {
        let ctx2 = ctx.clone();
        let results2 = results.clone();
        tokio::task::spawn_blocking(move || {
            thread::scope(|s| {
                for _ in 0..conc {
                    let ctx = &ctx2;
                    let results = &results2;
                    s.spawn(move || loop {
                        let i = ctx.next_idx.fetch_add(1, Ordering::Relaxed);
                        if i >= ctx.work.len() {
                            break;
                        }
                        let (host, port) = ctx.work[i];
                        let line = tcp_connect_one_probe_blocking(ctx, host, port);
                        // SAFETY: `fetch_add` hands index `i` to exactly one
                        // worker, so this is the only access to slot `i`.
                        unsafe { (*results.slots[i].get()).write(line) };
                        if let Some(ref p) = ctx.progress {
                            p.fetch_add(1, Ordering::Relaxed);
                        }
                    });
                }
            });
        })
        .await
        .expect("blocking tcp connect pool");
    } else {
        let mut workers = Vec::with_capacity(conc);
        for _ in 0..conc {
            let ctx = ctx.clone();
            let results = results.clone();
            workers.push(tokio::spawn(async move {
                loop {
                    let i = ctx.next_idx.fetch_add(1, Ordering::Relaxed);
                    if i >= ctx.work.len() {
                        break;
                    }
                    let (host, port) = ctx.work[i];
                    let line = tcp_connect_one_probe_async(&ctx, host, port).await;
                    // SAFETY: `fetch_add` hands index `i` to exactly one
                    // worker, so this is the only access to slot `i`.
                    unsafe { (*results.slots[i].get()).write(line) };
                    if let Some(ref p) = ctx.progress {
                        p.fetch_add(1, Ordering::Relaxed);
                    }
                }
            }));
        }
        for w in workers {
            // A worker that panicked or was cancelled may have claimed an
            // index without writing its slot. Stop here, as the blocking
            // branch does, rather than read uninitialised memory below.
            w.await.expect("tcp connect worker task");
        }
    }
    let slots = Arc::try_unwrap(results).unwrap_or_else(|_| panic!("workers still hold results"));
    // SAFETY: at least one worker ran, every worker ran to completion (a
    // failure would have panicked above), and a worker only stops once the
    // claimed index reaches `n`. Hence each index in `0..n` was claimed by
    // some worker and its slot was written before that worker finished.
    slots
        .slots
        .into_iter()
        .map(|cell| unsafe { cell.into_inner().assume_init() })
        .collect()
}
fn tcp_connect_one_probe_blocking(ctx: &TcpConnectCtx, host: IpAddr, port: u16) -> PortLine {
let addr = SocketAddr::new(host, port);
let mut timeouts = 0u32;
loop {
if let (Some(limit), Some(ref hs)) = (ctx.host_limit, ctx.host_deadline.as_ref()) {
if host_over_deadline(hs.as_ref(), host, limit) {
return PortLine::new(host, port, "tcp", "filtered", PortReason::HostTimeout, None);
}
}
if timeouts == 0 {
sleep_inter_probe_delay_sync(ctx.scan_delay, ctx.max_scan_delay);
if let Some(p) = ctx.pacer.as_ref() {
p.wait_turn_sync();
}
}
let start = Instant::now();
let res = std::net::TcpStream::connect_timeout(&addr, ctx.timeout);
let elapsed = start.elapsed().as_millis();
match res {
Ok(stream) => {
drop(stream);
return PortLine::new(host, port, "tcp", "open", PortReason::SynAck, Some(elapsed));
}
Err(e) => {
let kind = e.kind();
if kind == io::ErrorKind::ConnectionRefused {
return PortLine::new(
host,
port,
"tcp",
"closed",
PortReason::ConnRefused,
Some(elapsed),
);
} else if kind == io::ErrorKind::TimedOut || kind == io::ErrorKind::WouldBlock {
timeouts += 1;
if timeouts >= ctx.max_tries {
return PortLine::new(
host,
port,
"tcp",
if ctx.no_ping {
"open|filtered"
} else {
"filtered"
},
PortReason::Timeout,
None,
);
}
} else {
return PortLine::new(
host,
port,
"tcp",
"filtered",
PortReason::Error,
Some(elapsed),
);
}
}
}
}
}
async fn tcp_connect_one_probe_async(ctx: &TcpConnectCtx, host: IpAddr, port: u16) -> PortLine {
let addr = SocketAddr::new(host, port);
let mut timeouts = 0u32;
loop {
if let (Some(limit), Some(ref hs)) = (ctx.host_limit, ctx.host_deadline.as_ref()) {
if host_over_deadline(hs.as_ref(), host, limit) {
return PortLine::new(host, port, "tcp", "filtered", PortReason::HostTimeout, None);
}
}
if timeouts == 0 {
sleep_inter_probe_delay(ctx.scan_delay, ctx.max_scan_delay).await;
if let Some(p) = ctx.pacer.as_ref() {
p.wait_turn().await;
}
}
let start = Instant::now();
let res = if let Some(proxy) = ctx.proxies.first() {
tokio::time::timeout(ctx.timeout, connect_via_proxy(proxy, addr)).await
} else {
tokio::time::timeout(ctx.timeout, TcpStream::connect(addr)).await
};
let elapsed = start.elapsed().as_millis();
match res {
Ok(Ok(stream)) => {
drop(stream);
return PortLine::new(host, port, "tcp", "open", PortReason::SynAck, Some(elapsed));
}
Ok(Err(e)) => {
let (state, reason): (&'static str, PortReason) =
if e.kind() == io::ErrorKind::ConnectionRefused {
("closed", PortReason::ConnRefused)
} else {
("filtered", PortReason::Error)
};
return PortLine::new(host, port, "tcp", state, reason, Some(elapsed));
}
Err(_) => {
timeouts += 1;
if timeouts >= ctx.max_tries {
return PortLine::new(
host,
port,
"tcp",
if ctx.no_ping {
"open|filtered"
} else {
"filtered"
},
PortReason::Timeout,
None,
);
}
}
}
}
}
pub async fn udp_scan(
work: Vec<(IpAddr, u16)>,
plan: Arc<ScanPlan>,
icmp_notes: Option<UdpIcmpNotes>,
) -> Vec<PortLine> {
let conc = plan.effective_probe_concurrency();
let timeout = plan.connect_timeout;
let pacer = ProbeRatePacer::maybe_new(plan.max_probe_rate, plan.min_probe_rate);
let host_deadline = plan.host_timeout.map(|_| Arc::new(DashMap::new()));
let host_limit = plan.host_timeout;
let scan_delay = plan.scan_delay;
let max_scan_delay = plan.max_scan_delay;
let connect_retries = plan.connect_retries;
let max_tries = 1u32.saturating_add(connect_retries);
let sem = Arc::new(Semaphore::new(conc));
let n = work.len();
let mut handles = Vec::with_capacity(n);
for (host, port) in work {
let sem = sem.clone();
let icmp_notes = icmp_notes.clone();
let pacer = pacer.clone();
let host_deadline = host_deadline.clone();
handles.push(tokio::spawn(async move {
if let (Some(limit), Some(ref hs)) = (host_limit, host_deadline.as_ref()) {
if host_over_deadline(hs.as_ref(), host, limit) {
return PortLine::new(
host,
port,
"udp",
"filtered",
PortReason::HostTimeout,
None,
);
}
}
let bind_addr: SocketAddr = match host {
IpAddr::V4(_) => "0.0.0.0:0".parse().unwrap(),
IpAddr::V6(_) => "[::]:0".parse().unwrap(),
};
let dst = SocketAddr::new(host, port);
let payload = [0x00u8];
let overall_start = Instant::now();
let mut timeouts = 0u32;
let _permit = sem.acquire().await.expect("semaphore closed");
let Some(socket) = UdpSocket::bind(bind_addr).await.ok() else {
return PortLine::new(
host,
port,
"udp",
"filtered",
PortReason::Error,
Some(overall_start.elapsed().as_millis()),
);
};
loop {
if timeouts == 0 {
sleep_inter_probe_delay(scan_delay, max_scan_delay).await;
if let Some(p) = pacer.as_ref() {
p.wait_turn().await;
}
}
let start = Instant::now();
if socket.send_to(&payload, dst).await.is_err() {
return PortLine::new(
host,
port,
"udp",
"filtered",
PortReason::Error,
Some(overall_start.elapsed().as_millis()),
);
}
let mut buf = [0u8; 512];
let recv = socket.recv_from(&mut buf);
let res = tokio::time::timeout(timeout, recv).await;
let elapsed = start.elapsed().as_millis();
match res {
Ok(Ok((n, _))) if n > 0 => {
return PortLine::new(
host,
port,
"udp",
"open",
PortReason::UdpResponse,
Some(elapsed),
);
}
Ok(_) => {
return PortLine::new(
host,
port,
"udp",
"open|filtered",
PortReason::Error,
Some(elapsed),
);
}
Err(_) => {
timeouts += 1;
if timeouts >= max_tries {
if let Some(ref notes) = icmp_notes {
tokio::time::sleep(Duration::from_millis(UDP_ICMP_DRAIN_MS)).await;
if let Some(out) = notes.get(&(host, port)).as_deref().copied() {
return match out {
UdpIcmpOutcome::Closed => PortLine::new(
host,
port,
"udp",
"closed",
PortReason::IcmpPortUnreachable,
None,
),
UdpIcmpOutcome::Filtered => PortLine::new(
host,
port,
"udp",
"filtered",
PortReason::IcmpUnreachableFiltered,
None,
),
};
}
}
return PortLine::new(
host,
port,
"udp",
"open|filtered",
PortReason::Timeout,
None,
);
}
}
}
}
}));
}
let mut results = Vec::with_capacity(n);
for handle in handles {
results.push(handle.await.expect("udp probe task panicked"));
}
results
}
#[cfg(test)]
mod inter_probe_delay_tests {
    use super::sample_inter_probe_delay;
    use std::time::Duration;

    /// A lone `scan_delay` is returned unchanged.
    #[test]
    fn sample_fixed_when_only_scan_delay() {
        let fixed = Duration::from_millis(40);
        let sampled = sample_inter_probe_delay(Some(fixed), None);
        assert_eq!(sampled, Some(Duration::from_millis(40)));
    }

    /// With only a maximum, the sample never exceeds it.
    #[test]
    fn sample_inter_probe_delay_max_only_is_bounded() {
        let ceiling = Duration::from_millis(20);
        let sampled = sample_inter_probe_delay(None, Some(ceiling)).unwrap();
        assert!(sampled <= ceiling);
    }

    /// No configuration means no delay at all.
    #[test]
    fn sample_inter_probe_delay_both_none() {
        let sampled = sample_inter_probe_delay(None, None);
        assert!(sampled.is_none());
    }

    /// Equal bounds collapse to the minimum.
    #[test]
    fn sample_inter_probe_delay_min_max_equal_returns_min() {
        let bound = Duration::from_millis(50);
        let sampled = sample_inter_probe_delay(Some(bound), Some(bound));
        assert_eq!(sampled, Some(bound));
    }

    /// With both bounds set, the sample lies within them.
    #[test]
    fn sample_inter_probe_delay_min_max_range_is_bounded() {
        let floor = Duration::from_millis(10);
        let ceiling = Duration::from_millis(30);
        let sampled = sample_inter_probe_delay(Some(floor), Some(ceiling)).unwrap();
        assert!(sampled >= floor);
        assert!(sampled <= ceiling);
    }

    /// A zero maximum yields a zero delay.
    #[test]
    fn sample_inter_probe_delay_max_zero_is_zero() {
        let sampled = sample_inter_probe_delay(None, Some(Duration::ZERO));
        assert_eq!(sampled, Some(Duration::ZERO));
    }

    /// An inverted range falls back to the minimum.
    #[test]
    fn sample_inter_probe_delay_max_less_than_min_returns_min() {
        let floor = Duration::from_millis(100);
        let ceiling = Duration::from_millis(20);
        let sampled = sample_inter_probe_delay(Some(floor), Some(ceiling));
        assert_eq!(sampled, Some(floor));
    }
}
#[cfg(test)]
mod host_deadline_tests {
    use super::host_over_deadline;
    use dashmap::DashMap;
    use std::net::{IpAddr, Ipv4Addr};
    use std::time::Duration;

    /// Two checks in quick succession stay under a generous limit.
    #[test]
    fn host_deadline_two_probes_within_limit() {
        let starts = DashMap::new();
        let host = IpAddr::V4(Ipv4Addr::new(9, 8, 7, 6));
        let limit = Duration::from_secs(60);
        assert!(!host_over_deadline(&starts, host, limit));
        assert!(!host_over_deadline(&starts, host, limit));
    }

    /// Once more time than the limit has passed, the host is over its deadline.
    #[test]
    fn host_deadline_expired_after_limit() {
        let starts = DashMap::new();
        let host = IpAddr::V4(Ipv4Addr::new(1, 2, 3, 4));
        let limit = Duration::from_millis(1);
        assert!(!host_over_deadline(&starts, host, limit));
        std::thread::sleep(Duration::from_millis(5));
        assert!(host_over_deadline(&starts, host, limit));
    }
}
#[cfg(test)]
mod pacer_tests {
    use super::ProbeRatePacer;
    use std::time::{Duration, Instant};

    /// At a very high rate two consecutive turns complete almost immediately.
    #[test]
    fn probe_rate_pacer_high_rate_allows_back_to_back_sync() {
        let pacer = ProbeRatePacer::new(1_000_000.0);
        let began = Instant::now();
        for _ in 0..2 {
            pacer.wait_turn_sync();
        }
        assert!(began.elapsed() < Duration::from_millis(5));
    }

    /// Without a maximum rate no pacer is created, whatever the minimum is.
    #[test]
    fn maybe_new_none_without_max_rate() {
        let pacer = ProbeRatePacer::maybe_new(None, Some(100));
        assert!(pacer.is_none());
    }

    /// A maximum rate alone is enough to create a pacer.
    #[test]
    fn maybe_new_some_with_max_rate() {
        let pacer = ProbeRatePacer::maybe_new(Some(500), None);
        assert!(pacer.is_some());
    }
}
#[cfg(test)]
mod merge_tests {
    use super::{merge_udp_icmp_note, UdpIcmpNotes, UdpIcmpOutcome};
    use dashmap::DashMap;
    use std::net::{IpAddr, Ipv4Addr};
    use std::sync::Arc;

    /// A later `Closed` replaces an earlier `Filtered`.
    #[test]
    fn merge_prefers_closed_over_filtered() {
        let notes: UdpIcmpNotes = Arc::new(DashMap::new());
        let k = (IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1)), 7);
        merge_udp_icmp_note(&notes, k, UdpIcmpOutcome::Filtered);
        merge_udp_icmp_note(&notes, k, UdpIcmpOutcome::Closed);
        assert_eq!(*notes.get(&k).unwrap(), UdpIcmpOutcome::Closed);
    }

    /// A later `Filtered` never downgrades a stored `Closed`.
    #[test]
    fn merge_keeps_closed_when_later_filtered() {
        let notes: UdpIcmpNotes = Arc::new(DashMap::new());
        let k = (IpAddr::V4(Ipv4Addr::new(10, 0, 0, 2)), 9);
        merge_udp_icmp_note(&notes, k, UdpIcmpOutcome::Closed);
        merge_udp_icmp_note(&notes, k, UdpIcmpOutcome::Filtered);
        assert_eq!(*notes.get(&k).unwrap(), UdpIcmpOutcome::Closed);
    }

    /// Repeated `Filtered` observations stay `Filtered`.
    #[test]
    fn merge_filtered_stays_filtered() {
        let notes: UdpIcmpNotes = Arc::new(DashMap::new());
        let k = (IpAddr::V4(Ipv4Addr::new(10, 0, 0, 3)), 11);
        merge_udp_icmp_note(&notes, k, UdpIcmpOutcome::Filtered);
        merge_udp_icmp_note(&notes, k, UdpIcmpOutcome::Filtered);
        assert_eq!(*notes.get(&k).unwrap(), UdpIcmpOutcome::Filtered);
    }

    /// The first observation for a key is stored as-is.
    #[test]
    fn merge_inserts_first_observation_unchanged() {
        let notes: UdpIcmpNotes = Arc::new(DashMap::new());
        let k = (IpAddr::V4(Ipv4Addr::new(10, 0, 0, 3)), 11);
        merge_udp_icmp_note(&notes, k, UdpIcmpOutcome::Filtered);
        assert_eq!(*notes.get(&k).unwrap(), UdpIcmpOutcome::Filtered);
    }

    /// Repeated `Closed` observations stay `Closed`.
    #[test]
    fn merge_does_not_overwrite_closed_with_closed() {
        let notes: UdpIcmpNotes = Arc::new(DashMap::new());
        let k = (IpAddr::V4(Ipv4Addr::new(10, 0, 0, 4)), 12);
        merge_udp_icmp_note(&notes, k, UdpIcmpOutcome::Closed);
        merge_udp_icmp_note(&notes, k, UdpIcmpOutcome::Closed);
        assert_eq!(*notes.get(&k).unwrap(), UdpIcmpOutcome::Closed);
    }

    /// Notes for different ports of the same host do not affect each other.
    #[test]
    fn merge_independent_keys_isolated() {
        let notes: UdpIcmpNotes = Arc::new(DashMap::new());
        let k1 = (IpAddr::V4(Ipv4Addr::new(10, 0, 0, 5)), 13);
        let k2 = (IpAddr::V4(Ipv4Addr::new(10, 0, 0, 5)), 14);
        merge_udp_icmp_note(&notes, k1, UdpIcmpOutcome::Closed);
        merge_udp_icmp_note(&notes, k2, UdpIcmpOutcome::Filtered);
        assert_eq!(*notes.get(&k1).unwrap(), UdpIcmpOutcome::Closed);
        assert_eq!(*notes.get(&k2).unwrap(), UdpIcmpOutcome::Filtered);
    }
}
#[cfg(test)]
mod nano_duration_tests {
    use super::duration_from_nanos_saturating;
    use std::time::Duration;

    /// The largest value the conversion can produce.
    const SATURATED: Duration = Duration::from_nanos(u64::MAX);

    /// One billion nanoseconds is exactly one second.
    #[test]
    fn nanos_within_u64_range_round_trips() {
        let converted = duration_from_nanos_saturating(1_000_000_000);
        assert_eq!(converted, Duration::from_secs(1));
    }

    /// Zero nanoseconds maps to the zero duration.
    #[test]
    fn nanos_zero_yields_zero() {
        let converted = duration_from_nanos_saturating(0);
        assert_eq!(converted, Duration::ZERO);
    }

    /// The boundary value itself converts without loss.
    #[test]
    fn nanos_at_u64_max_does_not_overflow() {
        let boundary = u64::MAX as u128;
        assert_eq!(duration_from_nanos_saturating(boundary), SATURATED);
    }

    /// Anything above the boundary is clamped to it.
    #[test]
    fn nanos_above_u64_max_saturates() {
        assert_eq!(duration_from_nanos_saturating(u128::MAX), SATURATED);
    }
}
#[cfg(test)]
mod sample_delay_tests {
    use super::sample_inter_probe_delay;
    use std::time::Duration;

    /// No configuration yields no delay.
    #[test]
    fn neither_set_returns_none() {
        let sampled = sample_inter_probe_delay(None, None);
        assert!(sampled.is_none());
    }

    /// A zero maximum yields the zero duration.
    #[test]
    fn only_max_zero_returns_zero_duration() {
        let sampled = sample_inter_probe_delay(None, Some(Duration::ZERO)).unwrap();
        assert_eq!(sampled, Duration::ZERO);
    }

    /// Repeated samples with only a maximum never exceed it.
    #[test]
    fn only_max_nonzero_returns_value_within_bounds() {
        let max = Duration::from_millis(10);
        let mut round = 0;
        while round < 20 {
            let d = sample_inter_probe_delay(None, Some(max)).unwrap();
            assert!(d <= max, "got {d:?} > max {max:?}");
            round += 1;
        }
    }

    /// Equal bounds return that bound.
    #[test]
    fn both_min_equals_max_returns_min() {
        let bound = Duration::from_millis(5);
        let sampled = sample_inter_probe_delay(Some(bound), Some(bound)).unwrap();
        assert_eq!(sampled, bound);
    }

    /// An inverted range returns the minimum.
    #[test]
    fn both_max_less_than_min_clamps_to_min() {
        let floor = Duration::from_millis(10);
        let ceiling = Duration::from_millis(5);
        let sampled = sample_inter_probe_delay(Some(floor), Some(ceiling)).unwrap();
        assert_eq!(sampled, floor);
    }

    /// Repeated samples with both bounds stay inside the inclusive range.
    #[test]
    fn both_min_lt_max_returns_within_inclusive_range() {
        let min = Duration::from_millis(2);
        let max = Duration::from_millis(8);
        let mut round = 0;
        while round < 20 {
            let d = sample_inter_probe_delay(Some(min), Some(max)).unwrap();
            assert!(d >= min && d <= max, "got {d:?} outside [{min:?}, {max:?}]");
            round += 1;
        }
    }
}
#[cfg(test)]
mod port_line_tests {
    use super::{PortLine, PortReason};
    use std::net::{IpAddr, Ipv4Addr};

    /// Shorthand for building an IPv4 `IpAddr` from its four octets.
    fn v4(a: u8, b: u8, c: u8, d: u8) -> IpAddr {
        IpAddr::V4(Ipv4Addr::new(a, b, c, d))
    }

    /// Every argument lands in its field and `version_info` starts empty.
    #[test]
    fn new_sets_all_fields_and_defaults_version_info_to_none() {
        let line = PortLine::new(v4(127, 0, 0, 1), 22, "tcp", "open", PortReason::SynAck, Some(15));
        assert_eq!(line.host, IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)));
        assert_eq!(line.port, 22);
        assert_eq!(line.proto, "tcp");
        assert_eq!(line.state, "open");
        assert_eq!(line.reason, PortReason::SynAck);
        assert_eq!(line.latency_ms, Some(15));
        assert!(line.version_info.is_none(), "version_info defaults to None");
    }

    /// A missing latency measurement is stored as `None`.
    #[test]
    fn new_accepts_no_latency_measurement() {
        let line = PortLine::new(v4(10, 0, 0, 1), 80, "tcp", "filtered", PortReason::Timeout, None);
        assert!(line.latency_ms.is_none());
    }

    /// The `udp` protocol label is kept.
    #[test]
    fn new_udp_proto_preserved() {
        let host = IpAddr::V4(Ipv4Addr::LOCALHOST);
        let line = PortLine::new(host, 53, "udp", "open", PortReason::UdpResponse, Some(2));
        assert_eq!(line.proto, "udp");
    }

    /// The `sctp` protocol label is kept.
    #[test]
    fn new_sctp_proto_preserved() {
        let host = IpAddr::V4(Ipv4Addr::LOCALHOST);
        let line = PortLine::new(host, 3868, "sctp", "open", PortReason::SctpInitAck, None);
        assert_eq!(line.proto, "sctp");
    }

    /// Both the `ip` protocol label and the state are kept.
    #[test]
    fn new_ip_proto_state_preserved() {
        let line = PortLine::new(
            v4(10, 0, 0, 1),
            6,
            "ip",
            "closed",
            PortReason::IcmpProtoUnreachable,
            None,
        );
        assert_eq!(line.proto, "ip");
        assert_eq!(line.state, "closed");
    }

    /// Host and port are stored exactly as given.
    #[test]
    fn new_preserves_host_and_port() {
        let host = v4(192, 0, 2, 1);
        let line = PortLine::new(host, 8080, "tcp", "open", PortReason::SynAck, Some(5));
        assert_eq!(line.host, host);
        assert_eq!(line.port, 8080);
    }

    /// The reason is stored exactly as given.
    #[test]
    fn new_filtered_state_with_timeout_reason() {
        let host = IpAddr::V4(Ipv4Addr::LOCALHOST);
        let line = PortLine::new(host, 1, "tcp", "filtered", PortReason::Timeout, None);
        assert_eq!(line.reason, PortReason::Timeout);
    }
}
#[cfg(test)]
mod pacer_extra_tests {
    use super::ProbeRatePacer;
    use std::time::{Duration, Instant};

    /// A maximum rate alone creates a pacer.
    #[test]
    fn maybe_new_some_with_max_rate() {
        let pacer = ProbeRatePacer::maybe_new(Some(100), None);
        assert!(pacer.is_some());
    }

    /// A pacer is also created when both rates are supplied.
    #[test]
    fn maybe_new_some_with_both_rates() {
        let pacer = ProbeRatePacer::maybe_new(Some(100), Some(10));
        assert!(pacer.is_some());
    }

    /// At 100 probes per second the second turn has to wait for its slot.
    #[test]
    fn pacer_low_rate_enforces_spacing_sync() {
        let pacer = ProbeRatePacer::new(100.0);
        let began = Instant::now();
        pacer.wait_turn_sync();
        pacer.wait_turn_sync();
        let elapsed = began.elapsed();
        assert!(
            elapsed >= Duration::from_millis(5),
            "second wait should be enforced, got {elapsed:?}"
        );
    }

    /// A zero rate is rejected by the constructor.
    #[test]
    #[should_panic]
    fn pacer_panics_on_zero_rate() {
        let _pacer = ProbeRatePacer::new(0.0);
    }

    /// A negative rate is rejected by the constructor.
    #[test]
    #[should_panic]
    fn pacer_panics_on_negative_rate() {
        let _pacer = ProbeRatePacer::new(-1.0);
    }

    /// An infinite rate is rejected by the constructor.
    #[test]
    #[should_panic]
    fn pacer_panics_on_infinite_rate() {
        let _pacer = ProbeRatePacer::new(f64::INFINITY);
    }
}
#[cfg(test)]
mod host_deadline_extra_tests {
    use super::host_over_deadline;
    use dashmap::DashMap;
    use std::net::{IpAddr, Ipv4Addr};
    use std::time::Duration;

    /// The first check records a start instant and is under the limit.
    #[test]
    fn host_deadline_initial_call_records_start_and_is_under_limit() {
        let starts = DashMap::new();
        let host = IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1));
        let over = host_over_deadline(&starts, host, Duration::from_secs(60));
        assert!(!over);
        assert_eq!(starts.len(), 1, "first call records start instant for host");
    }

    /// Each host gets its own entry.
    #[test]
    fn host_deadline_different_hosts_independent_starts() {
        let starts = DashMap::new();
        let limit = Duration::from_secs(60);
        for last_octet in [1u8, 2u8] {
            let host = IpAddr::V4(Ipv4Addr::new(192, 168, 1, last_octet));
            let _ = host_over_deadline(&starts, host, limit);
        }
        assert_eq!(starts.len(), 2);
    }

    /// With a zero limit, any measurable elapsed time is over the deadline.
    #[test]
    fn host_deadline_zero_limit_quickly_marks_over() {
        let starts = DashMap::new();
        let host = IpAddr::V4(Ipv4Addr::new(1, 2, 3, 4));
        let _ = host_over_deadline(&starts, host, Duration::ZERO);
        std::thread::sleep(Duration::from_millis(2));
        assert!(host_over_deadline(&starts, host, Duration::ZERO));
    }

    /// IPv4 and IPv6 hosts are tracked under separate keys.
    #[test]
    fn host_deadline_ipv6_tracked_separately() {
        let starts = DashMap::new();
        let limit = Duration::from_secs(30);
        let v4 = IpAddr::V4(Ipv4Addr::new(10, 0, 0, 1));
        let v6: IpAddr = "::1".parse().unwrap();
        assert!(!host_over_deadline(&starts, v4, limit));
        assert!(!host_over_deadline(&starts, v6, limit));
        assert_eq!(starts.len(), 2);
    }
}