use std::collections::HashMap;
use std::time::Duration;
use stackpatrol_core::event::Event;
use tokio::net::TcpStream;
use tokio::task::JoinSet;
use tokio::time;
const CONNECT_TIMEOUT: Duration = Duration::from_secs(3);
pub struct PortProbe {
tcp_targets: Vec<String>,
last_state: HashMap<String, bool>,
initialized: bool,
}
impl PortProbe {
pub fn new(tcp_targets: Vec<String>) -> Self {
Self {
tcp_targets,
last_state: HashMap::new(),
initialized: false,
}
}
pub fn enabled(&self) -> bool {
!self.tcp_targets.is_empty()
}
pub async fn tick(&mut self) -> Vec<Event> {
if !self.enabled() {
return Vec::new();
}
let mut set = JoinSet::new();
for target in &self.tcp_targets {
let t = target.clone();
set.spawn(async move {
let up = check_tcp(&t).await;
(t, up)
});
}
let mut current = HashMap::new();
while let Some(res) = set.join_next().await {
if let Ok((target, up)) = res {
current.insert(target, up);
}
}
if !self.initialized {
self.last_state = current;
self.initialized = true;
return Vec::new();
}
let mut events = Vec::new();
for (target, up_now) in ¤t {
match self.last_state.get(target) {
Some(up_before) if up_before == up_now => {}
Some(_) => {
events.push(if *up_now {
Event::ServiceUp { name: target.clone() }
} else {
Event::ServiceDown { name: target.clone() }
});
}
None if !up_now => {
events.push(Event::ServiceDown { name: target.clone() });
}
None => {}
}
}
self.last_state = current;
events
}
}
async fn check_tcp(target: &str) -> bool {
matches!(
time::timeout(CONNECT_TIMEOUT, TcpStream::connect(target)).await,
Ok(Ok(_))
)
}
#[cfg(test)]
mod tests {
use super::*;
use tokio::net::TcpListener;
#[tokio::test]
async fn check_tcp_against_listening_port_succeeds() {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
assert!(check_tcp(&addr.to_string()).await);
}
#[tokio::test]
async fn check_tcp_against_dead_port_fails() {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
drop(listener);
assert!(!check_tcp(&addr.to_string()).await);
}
#[tokio::test]
async fn disabled_when_no_targets() {
let mut probe = PortProbe::new(vec![]);
assert!(!probe.enabled());
assert!(probe.tick().await.is_empty());
}
#[tokio::test]
async fn first_tick_seeds_baseline() {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap().to_string();
let mut probe = PortProbe::new(vec![addr]);
let events = probe.tick().await;
assert!(events.is_empty(), "first tick should emit nothing");
assert!(probe.initialized);
}
#[tokio::test]
async fn transitions_emit_events() {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap().to_string();
let mut probe = PortProbe::new(vec![addr.clone()]);
let events = probe.tick().await;
assert!(events.is_empty());
drop(listener);
let events = probe.tick().await;
assert_eq!(events.len(), 1);
assert!(matches!(&events[0], Event::ServiceDown { name } if name == &addr));
}
}