stackpatrol 0.2.5

Single-binary Rust CLI that monitors a server and reports to the StackPatrol control plane.
use std::collections::HashMap;
use std::time::Duration;

use stackpatrol_core::event::Event;
use tokio::net::TcpStream;
use tokio::task::JoinSet;
use tokio::time;

/// Per-target connect deadline. Long enough for slow links, short enough that the
/// probe finishes well within a 30s heartbeat tick when many targets are checked
/// in parallel.
const CONNECT_TIMEOUT: Duration = Duration::from_secs(3);

pub struct PortProbe {
    tcp_targets: Vec<String>,
    /// target -> was_up_last_tick
    last_state: HashMap<String, bool>,
    initialized: bool,
}

impl PortProbe {
    pub fn new(tcp_targets: Vec<String>) -> Self {
        Self {
            tcp_targets,
            last_state: HashMap::new(),
            initialized: false,
        }
    }

    pub fn enabled(&self) -> bool {
        !self.tcp_targets.is_empty()
    }

    /// Check every configured target in parallel and return events for state
    /// transitions since the previous tick. Same baseline-on-first-tick + diff-only
    /// pattern as the other probes.
    pub async fn tick(&mut self) -> Vec<Event> {
        if !self.enabled() {
            return Vec::new();
        }

        let mut set = JoinSet::new();
        for target in &self.tcp_targets {
            let t = target.clone();
            set.spawn(async move {
                let up = check_tcp(&t).await;
                (t, up)
            });
        }

        let mut current = HashMap::new();
        while let Some(res) = set.join_next().await {
            if let Ok((target, up)) = res {
                current.insert(target, up);
            }
        }

        if !self.initialized {
            self.last_state = current;
            self.initialized = true;
            return Vec::new();
        }

        let mut events = Vec::new();
        for (target, up_now) in &current {
            match self.last_state.get(target) {
                Some(up_before) if up_before == up_now => {}
                Some(_) => {
                    events.push(if *up_now {
                        Event::ServiceUp {
                            name: target.clone(),
                        }
                    } else {
                        Event::ServiceDown {
                            name: target.clone(),
                        }
                    });
                }
                None if !up_now => {
                    events.push(Event::ServiceDown {
                        name: target.clone(),
                    });
                }
                None => {}
            }
        }

        self.last_state = current;
        events
    }
}

async fn check_tcp(target: &str) -> bool {
    matches!(
        time::timeout(CONNECT_TIMEOUT, TcpStream::connect(target)).await,
        Ok(Ok(_))
    )
}

#[cfg(test)]
mod tests {
    use super::*;
    use tokio::net::TcpListener;

    #[tokio::test]
    async fn check_tcp_against_listening_port_succeeds() {
        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = listener.local_addr().unwrap();
        assert!(check_tcp(&addr.to_string()).await);
    }

    #[tokio::test]
    async fn check_tcp_against_dead_port_fails() {
        // Bind, capture the port, drop the listener — the port is now closed.
        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = listener.local_addr().unwrap();
        drop(listener);
        assert!(!check_tcp(&addr.to_string()).await);
    }

    #[tokio::test]
    async fn disabled_when_no_targets() {
        let mut probe = PortProbe::new(vec![]);
        assert!(!probe.enabled());
        assert!(probe.tick().await.is_empty());
    }

    #[tokio::test]
    async fn first_tick_seeds_baseline() {
        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = listener.local_addr().unwrap().to_string();
        let mut probe = PortProbe::new(vec![addr]);
        let events = probe.tick().await;
        assert!(events.is_empty(), "first tick should emit nothing");
        assert!(probe.initialized);
    }

    #[tokio::test]
    async fn transitions_emit_events() {
        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = listener.local_addr().unwrap().to_string();
        let mut probe = PortProbe::new(vec![addr.clone()]);

        // First tick: target is up, baseline recorded.
        let events = probe.tick().await;
        assert!(events.is_empty());

        // Drop the listener — port closes.
        drop(listener);
        let events = probe.tick().await;
        assert_eq!(events.len(), 1);
        assert!(matches!(&events[0], Event::ServiceDown { name } if name == &addr));
    }
}