#![doc = include_str!("../README.md")]
mod collector;
mod config;
mod metrics;
mod output;
mod sentinel;
extern crate libc;
use collector::{
CpuCollector, DiskCollector, GpuCollector, MemoryCollector, NetworkCollector,
collect_host_info, spawn_cloud_discovery,
};
use config::{Config, OutputFormat};
use metrics::Sample;
use sentinel::{BatchUploader, RunContext, SentinelClient, close_run, samples_to_csv, start_run};
use std::io::Write;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
/// Shutdown request flag, raised asynchronously by [`handle_sigterm`] when
/// SIGTERM or SIGINT is delivered and polled by the sampling loop in `main`.
static SIGTERM_RECEIVED: AtomicBool = AtomicBool::new(false);

/// Signal handler shared by SIGTERM and SIGINT.
///
/// It only records that a shutdown was requested. A single atomic store is
/// async-signal-safe, whereas allocation, locking or I/O inside a handler is
/// not, so all real shutdown work is deferred to the main loop.
extern "C" fn handle_sigterm(_signum: libc::c_int) {
    SIGTERM_RECEIVED.store(true, Ordering::Relaxed);
}
/// Tears down Sentinel streaming (when active), closes the run, and exits the
/// process with `exit_code`. Never returns.
///
/// Streaming teardown happens only when all four streaming handles are
/// present, i.e. a Sentinel client exists *and* `start_run` succeeded. In that
/// case this function does three things:
/// 1. It raises the uploader's shutdown flag and joins its thread, collecting
///    the URIs the uploader reports as uploaded.
/// 2. If nothing was uploaded, it serialises `remaining` to CSV (using
///    `interval_secs`) so the samples travel inline with the close request.
/// 3. It calls `close_run` with the exit code, the optional CSV and the URIs.
///
/// Failure of `close_run` is reported on stderr only; it never changes the
/// exit code.
fn shutdown(
    exit_code: i32,
    sentinel: Option<&SentinelClient>,
    run_ctx: Option<Arc<Mutex<RunContext>>>,
    shutdown_flag: Option<Arc<AtomicBool>>,
    upload_handle: Option<std::thread::JoinHandle<Vec<String>>>,
    remaining: Vec<Sample>,
    interval_secs: u64,
) -> ! {
    if let (Some(client), Some(ctx_arc), Some(flag), Some(handle)) =
        (sentinel, run_ctx, shutdown_flag, upload_handle)
    {
        flag.store(true, Ordering::Relaxed);
        // A panicked uploader is treated as "uploaded nothing", so the locally
        // retained samples are sent inline below. Report it instead of
        // swallowing the panic silently.
        let uploaded_uris = handle.join().unwrap_or_else(|_| {
            eprintln!("warn: sentinel uploader thread panicked; treating as no uploads");
            Vec::new()
        });
        let remaining_csv = if uploaded_uris.is_empty() && !remaining.is_empty() {
            Some(samples_to_csv(&remaining, interval_secs))
        } else {
            None
        };
        // Recover from poisoning: the uploader may have panicked while holding
        // the lock, and closing the run is still worth attempting.
        let ctx = ctx_arc.lock().unwrap_or_else(|e| e.into_inner());
        if let Err(e) = close_run(
            &client.agent,
            &client.api_base,
            &client.token,
            &ctx,
            Some(exit_code),
            remaining_csv,
            &uploaded_uris,
        ) {
            eprintln!("warn: sentinel close_run failed: {e}");
        }
    }
    std::process::exit(exit_code);
}
fn main() {
unsafe {
libc::signal(
libc::SIGTERM,
handle_sigterm as *const () as libc::sighandler_t,
);
libc::signal(
libc::SIGINT,
handle_sigterm as *const () as libc::sighandler_t,
);
}
let mut config = Config::load();
let mut out_file: Option<std::io::BufWriter<std::fs::File>> = if config.quiet {
None
} else {
config.output_file.as_deref().map(|path| {
std::io::BufWriter::new(std::fs::File::create(path).unwrap_or_else(|e| {
eprintln!("error: cannot open output file {path}: {e}");
std::process::exit(1);
}))
})
};
macro_rules! emit {
($($arg:tt)*) => {
if !config.quiet {
if let Some(ref mut f) = out_file {
let _ = writeln!(f, $($arg)*);
let _ = f.flush();
} else {
eprintln!($($arg)*);
}
}
}
}
let mut child = if !config.command.is_empty() {
let (program, args) = config.command.split_first().expect("command is non-empty");
match std::process::Command::new(program).args(args).spawn() {
Ok(c) => {
config.pid = Some(i32::try_from(c.id()).unwrap_or(i32::MAX));
Some(c)
}
Err(e) => {
eprintln!("error: failed to spawn {:?}: {e}", program);
std::process::exit(1);
}
}
} else {
None
};
let interval = Duration::from_secs(config.interval_secs);
let mut cpu = CpuCollector::new(config.pid);
let memory = MemoryCollector::new();
let mut network = NetworkCollector::new();
let mut disk = DiskCollector::new(interval);
let mut gpu = GpuCollector::new();
let initial_gpus = gpu.collect().unwrap_or_default();
let host_info = collect_host_info(&initial_gpus);
let cloud_handle = spawn_cloud_discovery();
let _ = cpu.collect();
let _ = network.collect();
let _ = disk.collect();
std::thread::sleep(interval);
let cloud_info = cloud_handle.join().unwrap_or_default();
let sentinel = SentinelClient::from_env();
let (run_ctx_arc, sample_buffer, upload_shutdown_flag, upload_handle) = match &sentinel {
None => (None, None, None, None),
Some(client) => {
match start_run(
&client.agent,
&client.api_base,
&client.token,
&config.metadata,
config.pid,
&host_info,
&cloud_info,
) {
Err(e) => {
eprintln!("warn: sentinel start_run failed: {e}; streaming disabled");
(None, None, None, None)
}
Ok(ctx) => {
let ctx_arc = Arc::new(Mutex::new(ctx));
let upload_interval = std::env::var("TRACKER_UPLOAD_INTERVAL")
.ok()
.and_then(|v| v.parse().ok())
.unwrap_or(60u64);
let (uploader, buf) = BatchUploader::new(upload_interval, config.interval_secs);
let flag = uploader.shutdown_flag();
let handle = uploader.spawn(
Arc::clone(&ctx_arc),
client.agent.clone(),
client.api_base.clone(),
client.token.clone(),
);
(Some(ctx_arc), Some(buf), Some(flag), Some(handle))
}
}
}
};
if config.format == OutputFormat::Csv {
emit!("{}", output::csv::csv_header());
}
let mut unflushed: Vec<Sample> = Vec::new();
loop {
let timestamp_secs = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
let mut sample = Sample {
timestamp_secs,
job_name: config.metadata.job_name.clone(),
tracked_pid: config.pid,
cpu: cpu.collect().unwrap_or_default(),
memory: memory.collect().unwrap_or_default(),
network: network.collect().unwrap_or_default(),
disk: disk.collect().unwrap_or_default(),
gpu: gpu.collect().unwrap_or_default(),
};
let (vram_mib, gpu_usage, gpu_utilized) =
if config.pid.is_some() && !sample.cpu.process_tree_pids.is_empty() {
let pids_u32: Vec<u32> = sample
.cpu
.process_tree_pids
.iter()
.filter_map(|&p| u32::try_from(p).ok())
.collect();
gpu.process_gpu_info(&pids_u32, interval)
} else {
gpu.all_gpu_process_info(interval)
};
sample.cpu.process_gpu_vram_mib = vram_mib;
sample.cpu.process_gpu_usage = gpu_usage;
sample.cpu.process_gpu_utilized = gpu_utilized;
match config.format {
OutputFormat::Json => match serde_json::to_value(&sample) {
Ok(mut v) => {
v[format!("{}-version", env!("CARGO_PKG_NAME"))] =
serde_json::Value::String(env!("CARGO_PKG_VERSION").to_string());
emit!("{}", v);
}
Err(e) => eprintln!("warn: json serialize error: {e}"),
},
OutputFormat::Csv => {
emit!(
"{}",
output::csv::sample_to_csv_row(&sample, config.interval_secs)
);
}
}
if let Some(ref buf) = sample_buffer {
buf.lock()
.unwrap_or_else(|e| e.into_inner())
.push(sample.clone());
}
unflushed.push(sample);
if let Some(ref mut c) = child {
match c.try_wait() {
Ok(Some(status)) => {
let code = status.code().unwrap_or(1);
shutdown(
code,
sentinel.as_ref(),
run_ctx_arc,
upload_shutdown_flag,
upload_handle,
unflushed,
config.interval_secs,
);
}
Ok(None) => {}
Err(e) => eprintln!("warn: error checking child status: {e}"),
}
}
if SIGTERM_RECEIVED.load(Ordering::Relaxed) {
shutdown(
0,
sentinel.as_ref(),
run_ctx_arc,
upload_shutdown_flag,
upload_handle,
unflushed,
config.interval_secs,
);
}
std::thread::sleep(interval);
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// SIGINT must go through `handle_sigterm` and raise the same shutdown
    /// flag as SIGTERM.
    #[test]
    fn test_sigint_sets_shutdown_flag() {
        // Start from a cleared flag so the assertion reflects this signal only.
        SIGTERM_RECEIVED.store(false, Ordering::SeqCst);

        let handler = handle_sigterm as *const () as libc::sighandler_t;
        // SAFETY: installs a handler that only performs an atomic store, then
        // raises SIGINT at this thread. Per POSIX, `raise` does not return
        // until the handler has run.
        unsafe {
            libc::signal(libc::SIGINT, handler);
            libc::raise(libc::SIGINT);
        }

        let flag_set = SIGTERM_RECEIVED.load(Ordering::SeqCst);
        assert!(flag_set, "SIGTERM_RECEIVED flag must be true after SIGINT");

        // Put process-wide signal state back the way we found it.
        SIGTERM_RECEIVED.store(false, Ordering::SeqCst);
        // SAFETY: restoring the default disposition for SIGINT.
        unsafe {
            libc::signal(libc::SIGINT, libc::SIG_DFL);
        }
    }
}