use std::process::Stdio;
use std::time::Instant;
use cstats_core::{api::MetricValue, config::Config, stats::StatsCollector, Result};
use tokio::process::Command;
use tracing::{debug, info};
use crate::CollectArgs;
use super::parse_metadata;
pub async fn collect_command(args: CollectArgs, config: &Config) -> Result<()> {
let collector = StatsCollector::from_config(config)?;
if let Some(cache) = collector.cache.as_ref() {
cache.init().await?;
}
if let Some(command) = &args.command {
collect_command_stats(&collector, &args.source, command, &args).await
} else {
collect_system_stats(&collector, &args.source, &args).await
}
}
async fn collect_command_stats(
collector: &StatsCollector,
source: &str,
command: &str,
args: &CollectArgs,
) -> Result<()> {
info!("Executing command: {}", command);
let start = Instant::now();
let timer_id = collector.start_timer("command_execution").await?;
let mut cmd_parts = command.split_whitespace();
let program = cmd_parts.next().unwrap_or(command);
let cmd_args: Vec<&str> = cmd_parts.collect();
let mut cmd = Command::new(program);
cmd.args(&cmd_args)
.stdout(Stdio::piped())
.stderr(Stdio::piped());
let output = cmd.output().await?;
let duration = start.elapsed();
collector.stop_timer(&timer_id).await?;
let exit_code = output.status.code().unwrap_or(-1);
let stdout_size = output.stdout.len() as i64;
let stderr_size = output.stderr.len() as i64;
let mut metrics = std::collections::HashMap::new();
metrics.insert(
"execution_time_ms".to_string(),
MetricValue::Integer(duration.as_millis() as i64),
);
metrics.insert(
"exit_code".to_string(),
MetricValue::Integer(exit_code as i64),
);
metrics.insert(
"stdout_size_bytes".to_string(),
MetricValue::Integer(stdout_size),
);
metrics.insert(
"stderr_size_bytes".to_string(),
MetricValue::Integer(stderr_size),
);
metrics.insert(
"success".to_string(),
MetricValue::Boolean(output.status.success()),
);
for metric_name in &args.metrics {
if !metrics.contains_key(metric_name) {
metrics.insert(metric_name.clone(), MetricValue::Integer(0));
}
}
if args.metrics.contains(&"system".to_string()) || args.metrics.is_empty() {
let system_record = collector.collect_system_stats(source).await?;
for (name, value) in system_record.metrics {
metrics.insert(name, value);
}
}
let metadata = parse_metadata(args.metadata.clone());
let mut record = cstats_core::stats::StatsRecord::new(source);
record.set_duration(duration);
for (name, value) in metrics {
record.add_metric(name, value);
}
for (key, value) in metadata {
record.add_metadata(key, value);
}
record.add_metadata("command", command);
collector.store_record(record).await?;
info!("Command completed in {:?}", duration);
debug!("Exit code: {}", exit_code);
if output.status.success() {
if !output.stdout.is_empty() {
print!("{}", String::from_utf8_lossy(&output.stdout));
}
} else if !output.stderr.is_empty() {
eprint!("{}", String::from_utf8_lossy(&output.stderr));
}
Ok(())
}
async fn collect_system_stats(
collector: &StatsCollector,
source: &str,
args: &CollectArgs,
) -> Result<()> {
info!("Collecting system statistics");
let mut record = collector.collect_system_stats(source).await?;
for metric_name in &args.metrics {
if !record.metrics.contains_key(metric_name) {
record.add_metric(metric_name, MetricValue::Integer(0));
}
}
let metadata = parse_metadata(args.metadata.clone());
for (key, value) in metadata {
record.add_metadata(key, value);
}
collector.store_record(record).await?;
info!("System statistics collected");
Ok(())
}