use std::time::{Duration, Instant};
use anyhow::{Context, Result};
use stackpatrol_core::event::{Event, EventEnvelope};
use tokio::signal::unix::{SignalKind, signal};
use crate::buffer::{self, Buffer};
use crate::client::{SendOutcome, build_client, events_url, make_envelope, try_send_envelope};
use crate::config::Config;
use crate::probe_docker::DockerProbe;
use crate::probe_ports::PortProbe;
use crate::probe_resources::ResourceProbe;
use crate::probe_systemd::SystemdProbe;
use crate::style;
/// Lower bound for the daemon's tick interval; `run` raises any shorter
/// configured heartbeat interval to this value.
const MIN_INTERVAL: Duration = Duration::from_secs(5);
pub async fn run() -> Result<()> {
let cfg = Config::load().context("could not load config — run `stackpatrol init` first")?;
let url = events_url(&cfg);
let client = build_client()?;
let buffer = Buffer::new(buffer::default_path()?);
let mut interval = Duration::from_secs(cfg.daemon.heartbeat_interval_secs);
if interval < MIN_INTERVAL {
interval = MIN_INTERVAL;
}
let mut docker_probe = DockerProbe::new(cfg.docker.projects.clone());
let mut systemd_probe = SystemdProbe::new(cfg.systemd.units.clone());
let mut resource_probe = ResourceProbe::new(&cfg.resources);
let mut port_probe = PortProbe::new(cfg.ports.tcp.clone());
println!(
"{} server={} interval={}s",
style::header("stackpatrol daemon starting"),
style::bold(&cfg.server_name),
interval.as_secs(),
);
println!(" endpoint {}", style::dim(&cfg.api_endpoint));
println!(
" docker {}",
probe_summary(docker_probe.enabled(), cfg.docker.projects.len(), "project")
);
println!(
" systemd {}",
probe_summary(systemd_probe.enabled(), cfg.systemd.units.len(), "unit")
);
println!(
" ports {}",
probe_summary(port_probe.enabled(), cfg.ports.tcp.len(), "target")
);
println!(
" resources disk≥{}% mem≥{}% load1≥{:.2}",
cfg.resources.disk_high_percent,
cfg.resources.memory_high_percent,
resource_probe.load_threshold(),
);
println!(
" buffer {}",
style::dim(&buffer.path().display().to_string())
);
let started = Instant::now();
let mut ticker = tokio::time::interval(interval);
ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
let mut sigterm = signal(SignalKind::terminate()).context("installing SIGTERM handler")?;
let mut sigint = signal(SignalKind::interrupt()).context("installing SIGINT handler")?;
loop {
tokio::select! {
_ = sigterm.recv() => {
println!("daemon: SIGTERM received — exiting");
break;
}
_ = sigint.recv() => {
println!("daemon: SIGINT received — exiting");
break;
}
_ = ticker.tick() => {
if let Err(err) = drain_buffer(&client, &url, &cfg, &buffer).await {
eprintln!("daemon: buffer drain failed: {err:#}");
}
let uptime_secs = started.elapsed().as_secs();
send_or_queue(
&client, &url, &cfg, &buffer,
make_envelope(&cfg, Event::Heartbeat { uptime_secs }),
).await;
if docker_probe.enabled() {
for event in docker_probe.tick().await {
send_or_queue(&client, &url, &cfg, &buffer, make_envelope(&cfg, event)).await;
}
}
if systemd_probe.enabled() {
for event in systemd_probe.tick().await {
send_or_queue(&client, &url, &cfg, &buffer, make_envelope(&cfg, event)).await;
}
}
if port_probe.enabled() {
for event in port_probe.tick().await {
send_or_queue(&client, &url, &cfg, &buffer, make_envelope(&cfg, event)).await;
}
}
for event in resource_probe.tick() {
send_or_queue(&client, &url, &cfg, &buffer, make_envelope(&cfg, event)).await;
}
}
}
}
Ok(())
}
/// Makes a single attempt to deliver `envelope` to `url`.
///
/// * On success nothing else happens.
/// * On a transient failure the envelope is appended to `buffer`.
/// * On a permanent failure the envelope is dropped.
///
/// Nothing is returned to the caller: every failure, including a failed
/// buffer append, is reported on stderr.
async fn send_or_queue(
    client: &reqwest::Client,
    url: &str,
    cfg: &Config,
    buffer: &Buffer,
    envelope: EventEnvelope,
) {
    // Only the transient case falls through to the queueing logic below.
    let msg = match try_send_envelope(client, url, cfg, &envelope).await {
        SendOutcome::Ok => return,
        SendOutcome::Permanent(msg) => {
            eprintln!("daemon: dropping event after permanent failure: {msg}");
            return;
        }
        SendOutcome::Transient(msg) => msg,
    };

    eprintln!("daemon: queueing event after transient failure: {msg}");
    if let Err(err) = buffer.append(&envelope).await {
        eprintln!("daemon: buffer append failed: {err:#}");
    }
}
/// Re-sends the events currently held in `buffer`, in order.
///
/// Events that are delivered or permanently rejected are discarded. The first
/// transient failure stops the drain; that event and everything after it are
/// kept. The buffer is then rewritten with whatever is left (possibly nothing).
///
/// # Errors
///
/// Returns an error if reading the buffer or rewriting it fails.
async fn drain_buffer(
    client: &reqwest::Client,
    url: &str,
    cfg: &Config,
    buffer: &Buffer,
) -> Result<()> {
    let queued = buffer.read_all().await?;
    if queued.is_empty() {
        return Ok(());
    }
    eprintln!("daemon: draining {} queued event(s)", queued.len());

    let mut pending = queued.into_iter();
    let remaining: Vec<EventEnvelope> = loop {
        // Running out of events means the whole backlog was handled.
        let Some(env) = pending.next() else {
            break Vec::new();
        };
        match try_send_envelope(client, url, cfg, &env).await {
            SendOutcome::Ok => {}
            SendOutcome::Permanent(msg) => {
                eprintln!("daemon: dropping queued event (permanent): {msg}");
            }
            SendOutcome::Transient(msg) => {
                // `+ 1` accounts for the event that just failed.
                eprintln!(
                    "daemon: drain stopped after transient failure ({msg}); {} event(s) still queued",
                    pending.len() + 1
                );
                // Keep the failed event first so the original order is preserved.
                break std::iter::once(env).chain(pending).collect();
            }
        }
    };

    buffer.rewrite(&remaining).await
}
/// Builds the status text shown for one probe in the startup banner.
///
/// A disabled probe yields a dimmed `off`. An enabled probe yields the
/// `style::ok_mark()` marker followed by `count` and `noun`, with an `s`
/// appended to the noun unless `count` is exactly 1.
fn probe_summary(enabled: bool, count: usize, noun: &str) -> String {
    if !enabled {
        return style::dim("off");
    }
    let suffix = match count {
        1 => "",
        _ => "s",
    };
    format!("{} {} {}{}", style::ok_mark(), count, noun, suffix)
}