stackpatrol 0.2.5

Single-binary Rust CLI that monitors a server and reports to the StackPatrol control plane.

use std::time::{Duration, Instant};

use anyhow::{Context, Result};
use stackpatrol_core::event::{Event, EventEnvelope};
use tokio::signal::unix::{SignalKind, signal};

use crate::buffer::{self, Buffer};
use crate::client::{SendOutcome, build_client, events_url, make_envelope, try_send_envelope};
use crate::config::Config;
use crate::probe_docker::DockerProbe;
use crate::probe_ports::PortProbe;
use crate::probe_resources::ResourceProbe;
use crate::probe_systemd::SystemdProbe;
use crate::style;

/// Minimum interval between heartbeats. Anything lower is silly given Telegram + Postgres
/// round-trip costs and risks rate-limiting on the server side.
const MIN_INTERVAL: Duration = Duration::from_secs(5);

pub async fn run() -> Result<()> {
    let cfg = Config::load().context("could not load config — run `stackpatrol init` first")?;
    let url = events_url(&cfg);
    let client = build_client()?;
    let buffer = Buffer::new(buffer::default_path()?);

    let interval = Duration::from_secs(cfg.daemon.heartbeat_interval_secs).max(MIN_INTERVAL);

    let mut docker_probe = DockerProbe::new(cfg.docker.projects.clone());
    let mut systemd_probe = SystemdProbe::new(cfg.systemd.units.clone());
    let mut resource_probe = ResourceProbe::new(&cfg.resources);
    let mut port_probe = PortProbe::new(cfg.ports.tcp.clone());

    println!(
        "{} server={} interval={}s",
        style::header("stackpatrol daemon starting"),
        style::bold(&cfg.server_name),
        interval.as_secs(),
    );
    println!("  endpoint   {}", style::dim(&cfg.api_endpoint));
    println!(
        "  docker     {}",
        probe_summary(docker_probe.enabled(), cfg.docker.projects.len(), "project")
    );
    println!(
        "  systemd    {}",
        probe_summary(systemd_probe.enabled(), cfg.systemd.units.len(), "unit")
    );
    println!(
        "  ports      {}",
        probe_summary(port_probe.enabled(), cfg.ports.tcp.len(), "target")
    );
    println!(
        "  resources  disk≥{}% mem≥{}% load1≥{:.2}",
        cfg.resources.disk_high_percent,
        cfg.resources.memory_high_percent,
        resource_probe.load_threshold(),
    );
    println!(
        "  buffer     {}",
        style::dim(&buffer.path().display().to_string())
    );

    let started = Instant::now();
    // The first tick fires immediately, which is exactly what we want: confirm liveness on boot.
    let mut ticker = tokio::time::interval(interval);
    // If a tick is missed (a slow probe run, a suspended host), fire the late tick once and
    // space subsequent ticks from it rather than bursting to catch up.
    ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);

    let mut sigterm = signal(SignalKind::terminate()).context("installing SIGTERM handler")?;
    let mut sigint = signal(SignalKind::interrupt()).context("installing SIGINT handler")?;

    loop {
        tokio::select! {
            _ = sigterm.recv() => {
                println!("daemon: SIGTERM received — exiting");
                break;
            }
            _ = sigint.recv() => {
                println!("daemon: SIGINT received — exiting");
                break;
            }
            _ = ticker.tick() => {
                // Drain queued events first. If the API is still down we'll re-queue
                // remaining items; if it's up, the backlog clears before we fire fresh
                // events so timestamps stay roughly in order.
                if let Err(err) = drain_buffer(&client, &url, &cfg, &buffer).await {
                    eprintln!("daemon: buffer drain failed: {err:#}");
                }

                let uptime_secs = started.elapsed().as_secs();
                send_or_queue(
                    &client, &url, &cfg, &buffer,
                    make_envelope(&cfg, Event::Heartbeat { uptime_secs }),
                ).await;

                if docker_probe.enabled() {
                    for event in docker_probe.tick().await {
                        send_or_queue(&client, &url, &cfg, &buffer, make_envelope(&cfg, event)).await;
                    }
                }

                if systemd_probe.enabled() {
                    for event in systemd_probe.tick().await {
                        send_or_queue(&client, &url, &cfg, &buffer, make_envelope(&cfg, event)).await;
                    }
                }

                if port_probe.enabled() {
                    for event in port_probe.tick().await {
                        send_or_queue(&client, &url, &cfg, &buffer, make_envelope(&cfg, event)).await;
                    }
                }

                // Resource checks are not gated like the other probes; they run every tick
                // with the thresholds from config.
                for event in resource_probe.tick() {
                    send_or_queue(&client, &url, &cfg, &buffer, make_envelope(&cfg, event)).await;
                }
            }
        }
    }

    Ok(())
}
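
// How this loop is presumably wired into the binary's entry point. A minimal sketch,
// not the project's actual main.rs: the real CLI almost certainly dispatches on
// subcommands first (`stackpatrol init` is referenced above), and the `daemon`
// module name here is an assumption.
//
//     #[tokio::main]
//     async fn main() -> anyhow::Result<()> {
//         daemon::run().await
//     }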

/// Send one envelope. On transient failure, append to the buffer for the next tick.
/// On permanent failure (bad token, validation), log and drop — retrying won't help
/// and we don't want to fill the buffer with poison events.
async fn send_or_queue(
    client: &reqwest::Client,
    url: &str,
    cfg: &Config,
    buffer: &Buffer,
    envelope: EventEnvelope,
) {
    match try_send_envelope(client, url, cfg, &envelope).await {
        SendOutcome::Ok => {}
        SendOutcome::Transient(msg) => {
            eprintln!("daemon: queueing event after transient failure: {msg}");
            if let Err(err) = buffer.append(&envelope).await {
                eprintln!("daemon: buffer append failed: {err:#}");
            }
        }
        SendOutcome::Permanent(msg) => {
            eprintln!("daemon: dropping event after permanent failure: {msg}");
        }
    }
}
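
// For orientation when reading the match arms above and in `drain_buffer` below:
// `SendOutcome` from crate::client presumably has roughly this shape. The variant
// names are taken from the matches in this file; the String payloads are an
// assumption (the values are only ever formatted for logging).
//
//     pub enum SendOutcome {
//         Ok,
//         Transient(String),
//         Permanent(String),
//     }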

/// Read all queued envelopes and try to send each. Stop on the first transient
/// failure (the API is presumably still down — no point flooding it). On permanent
/// failure, drop the offending event and continue with the rest. Rewrite the buffer
/// with whatever wasn't successfully sent.
async fn drain_buffer(
    client: &reqwest::Client,
    url: &str,
    cfg: &Config,
    buffer: &Buffer,
) -> Result<()> {
    let queued = buffer.read_all().await?;
    if queued.is_empty() {
        return Ok(());
    }

    eprintln!("daemon: draining {} queued event(s)", queued.len());
    let mut remaining: Vec<EventEnvelope> = Vec::new();
    let mut iter = queued.into_iter();
    while let Some(env) = iter.next() {
        match try_send_envelope(client, url, cfg, &env).await {
            SendOutcome::Ok => {}
            SendOutcome::Permanent(msg) => {
                eprintln!("daemon: dropping queued event (permanent): {msg}");
            }
            SendOutcome::Transient(msg) => {
                eprintln!(
                    "daemon: drain stopped after transient failure ({msg}); {} event(s) still queued",
                    iter.len() + 1
                );
                remaining.push(env);
                remaining.extend(iter);
                break;
            }
        }
    }

    buffer.rewrite(&remaining).await
}

fn probe_summary(enabled: bool, count: usize, noun: &str) -> String {
    if enabled {
        let plural = if count == 1 { "" } else { "s" };
        format!("{} {} {}{}", style::ok_mark(), count, noun, plural)
    } else {
        style::dim("off")
    }
}
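
// Illustrative tests for the summary formatting, added as a sketch. Assertions stick to
// substrings so they do not depend on the exact ANSI decoration the `style` helpers apply;
// the disabled case assumes `style::dim` keeps its argument verbatim inside that decoration.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn probe_summary_pluralizes() {
        let one = probe_summary(true, 1, "project");
        assert!(one.contains("1 project"));
        assert!(!one.contains("projects"));

        let many = probe_summary(true, 3, "unit");
        assert!(many.contains("3 units"));
    }

    #[test]
    fn probe_summary_reports_disabled_probes() {
        assert!(probe_summary(false, 0, "target").contains("off"));
    }
}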