pub mod config;
pub mod dao;
pub mod dataset;
pub mod entities;
pub mod metrics;
pub mod metrics_logger;
pub mod migrations;
pub mod server;
use crate::config::Config;
use anyhow::{anyhow, Context};
use chrono::Utc;
use colored::Colorize;
use config::{ExecutionPlan, ProcessToObserve, ProcessType, Redirect, ScenarioToExecute};
use dataset::{Dataset, DatasetBuilder};
use entities::{iteration, run};
use sea_orm::*;
use serde_json::Value;
use std::{
collections::HashMap,
fs::{self, OpenOptions},
io::Write,
path::Path,
};
use subprocess::{Exec, NullFile, Redirection};
use sysinfo::{CpuRefreshKind, RefreshKind, System};
use tracing::{debug, info};
/// Prompts on stdout for a CPU name and reads one line from stdin.
///
/// `read_line` keeps the line terminator in the buffer, so the answer is
/// trimmed before it is returned; otherwise the trailing newline would end up
/// in every place the name is used afterwards.
///
/// A failed read is retried. End of input (zero bytes read) is not an error
/// and yields an empty string.
fn ask_for_cpu() -> String {
    loop {
        print!("Please enter a CPU name: ");
        // `print!` does not flush; push the prompt out before blocking on stdin.
        let _ = std::io::stdout().flush();

        let mut input = String::new();
        if std::io::stdin().read_line(&mut input).is_ok() {
            return input.trim().to_string();
        }
    }
}
/// Prompts on stdout for the CPU's TDP in watts and keeps asking until the
/// answer parses as an `f64`.
///
/// A failed read silently re-prompts; an unparsable answer prints a hint
/// first and then re-prompts.
fn ask_for_tdp() -> f64 {
    loop {
        print!("Please enter the TDP of your CPU in watts: ");
        // `print!` does not flush; push the prompt out before blocking on stdin.
        let _ = std::io::stdout().flush();

        let mut line = String::new();
        if std::io::stdin().read_line(&mut line).is_err() {
            continue;
        }

        if let Ok(watts) = line.trim().parse::<f64>() {
            return watts;
        }
        println!("{}", "Please enter a valid number.".yellow());
    }
}
/// Returns the brand string of the first CPU that `sysinfo` lists, or `None`
/// when the CPU list is empty.
fn find_cpu() -> Option<String> {
    // Only CPU information is refreshed; nothing else is needed here.
    let refresh = RefreshKind::new().with_cpu(CpuRefreshKind::everything());
    let sys = System::new_with_specifics(refresh);

    let first_cpu = sys.cpus().first()?;
    Some(first_cpu.brand().to_string())
}
/// Extracts the number stored at `verbose.avg_power.value` in `json_obj`.
///
/// Returns `None` if any key along that path is missing or if the final value
/// cannot be represented as an `f64`.
fn parse_json(json_obj: &Value) -> Option<f64> {
    let verbose = json_obj.get("verbose")?;
    let avg_power = verbose.get("avg_power")?;
    let value = avg_power.get("value")?;
    value.as_f64()
}
async fn fetch_tdp(cpu_name: &str) -> anyhow::Result<f64> {
let client = reqwest::Client::new();
let mut json = HashMap::new();
json.insert("name", cpu_name);
let resp = client
.post("https://api.boavizta.org/v1/component/cpu")
.header("Content-Type", "application/json")
.json(&json)
.send()
.await?;
let json_obj = resp.json().await?;
parse_json(&json_obj).context("Error finding CPU stats from Boavizta")
}
/// Interactive wizard that writes an example `./cardamon.toml`.
///
/// Steps:
/// 1. Ask whether the config is for this computer (`1`) or another one (`2`).
///    For `1` the CPU name is detected with `find_cpu`, falling back to a
///    manual prompt; for `2` the name is always prompted for.
/// 2. Ask Boavizta for the CPU's average power; if that fails, prompt the user
///    for a TDP value instead.
/// 3. Write the example config and report success or failure on stdout.
///
/// The function never returns an error: every failure is reported to the user
/// on stdout.
pub async fn init_config() {
    println!();
    println!("🌱🌿🌳🌸🌸🌸🌸🌱🌿🌳");
    println!("Welcome to Cardamon");
    println!("🌱🌿🌳🌸🌸🌸🌸🌱🌿🌳");
    println!();

    let cpu_name = loop {
        print!("Would you like to create a config for this computer [1] or another computer [2]? ");
        // `print!` does not flush; push the prompt out before blocking on stdin.
        let _ = std::io::stdout().flush();

        let mut ans = String::new();
        // A failed read and an unparsable answer are handled the same way:
        // tell the user what is expected and ask again.
        let choice = match std::io::stdin().read_line(&mut ans) {
            Ok(_) => ans.trim().parse::<u32>().ok(),
            Err(_) => None,
        };

        match choice {
            Some(1) => {
                break match find_cpu() {
                    Some(name) => {
                        println!("{} {}", "It looks like you have a".yellow(), name);
                        name
                    }
                    None => {
                        println!("{}", "Unable to find CPU!".red());
                        ask_for_cpu()
                    }
                };
            }
            Some(2) => break ask_for_cpu(),
            _ => println!("{}", "Please enter 1 or 2.\n".yellow()),
        }
    };

    let tdp = match fetch_tdp(&cpu_name).await {
        Ok(tdp) => {
            println!("{} {}", "Boavizta reports an avg power of".yellow(), tdp);
            tdp
        }
        Err(_) => {
            println!("{}", "Cannot get avg power from Boavizta!".red());
            ask_for_tdp()
        }
    };

    match Config::write_example_to_file(&cpu_name, tdp, Path::new("./cardamon.toml")) {
        Ok(_) => {
            println!("{}", "cardamon.toml created!".green());
            println!("\n🤩\n");
        }
        Err(err) => {
            println!("{}\n{}", "Error creating config.".red(), err);
            println!("\n😭\n");
        }
    }
}
/// Deletes `./.stdout` and `./.stderr` if they exist.
///
/// # Errors
///
/// Returns the first removal error encountered; `./.stderr` is not attempted
/// if removing `./.stdout` fails.
pub fn cleanup_stdout_stderr() -> anyhow::Result<()> {
    debug!("Cleaning up stdout and stderr");

    for file_name in ["./.stdout", "./.stderr"] {
        let log_path = Path::new(file_name);
        if log_path.exists() {
            fs::remove_file(log_path)?;
        }
    }

    Ok(())
}
/// Spawns `command` as a detached process and returns its PID.
///
/// `command` is split into program and arguments with `shlex`. The `redirect`
/// setting controls the child's stdout/stderr and defaults to
/// `Redirect::File` when `None`:
///
/// * `Redirect::Null`   - both streams are sent to the null file.
/// * `Redirect::Parent` - the streams are left as `Exec` configures them by
///   default (presumably inherited from this process - confirm against the
///   `subprocess` docs).
/// * `Redirect::File`   - both streams are appended to `./.stdout` and
///   `./.stderr`, which are created if missing.
///
/// # Errors
///
/// Fails if `command` cannot be tokenised, if it contains no program name, if
/// the redirect files cannot be opened, if spawning fails, or if the spawned
/// process reports no PID.
fn run_command_detached(command: &str, redirect: &Option<Redirect>) -> anyhow::Result<u32> {
    let redirect = redirect.unwrap_or(Redirect::File);

    // The command comes from user configuration, so malformed input is
    // reported as an error rather than a panic.
    let words = shlex::split(command)
        .with_context(|| format!("Command string is not POSIX compliant: {command}"))?;
    let (program, args) = words
        .split_first()
        .with_context(|| format!("Cannot run an empty command: {command:?}"))?;

    let exec = Exec::cmd(program).args(args);
    let exec = match redirect {
        Redirect::Null => exec.stdout(NullFile).stderr(NullFile),
        Redirect::Parent => exec,
        Redirect::File => {
            let out_file = OpenOptions::new()
                .append(true)
                .create(true)
                .open("./.stdout")?;
            let err_file = OpenOptions::new()
                .append(true)
                .create(true)
                .open("./.stderr")?;
            exec.stdout(Redirection::File(out_file))
                .stderr(Redirection::File(err_file))
        }
    };

    exec.detached()
        .popen()
        .with_context(|| format!("Failed to spawn detached process, command: {command}"))?
        .pid()
        .context("Process should have a PID")
}
/// Starts the process described by `proc` via its `up` command and returns
/// what should be observed for it.
///
/// * Docker: one `ProcessToObserve::ContainerName` per configured container.
/// * Bare metal: a single `ProcessToObserve::Pid` carrying the process name
///   and the PID of the spawned command.
///
/// # Errors
///
/// Propagates any error from `run_command_detached`.
fn run_process(proc: &config::ProcessToExecute) -> anyhow::Result<Vec<ProcessToObserve>> {
    let observed = match &proc.process {
        config::ProcessType::Docker { containers } => {
            debug!("Running command {} in detached mode ( Docker ) ", proc.up);
            // The PID of the launcher command is not what gets observed for
            // Docker; the containers are identified by name instead.
            run_command_detached(&proc.up, &proc.redirect)?;

            let mut observed = Vec::with_capacity(containers.len());
            for container in containers {
                observed.push(ProcessToObserve::ContainerName(container.clone()));
            }
            observed
        }
        config::ProcessType::BareMetal => {
            debug!(
                "Running command {} in detached mode ( Bare metal ) ",
                proc.up
            );
            let pid = run_command_detached(&proc.up, &proc.redirect)?;
            vec![ProcessToObserve::Pid(Some(proc.name.clone()), pid)]
        }
    };

    Ok(observed)
}
/// Runs one iteration of a scenario command to completion and returns an
/// unsaved `iteration::ActiveModel` recording when it started and stopped.
///
/// The scenario's command string is split with `shlex` and executed through
/// `tokio::process::Command`; the child is killed if the future is dropped.
/// Timestamps are in milliseconds (`Utc::now().timestamp_millis()`).
///
/// # Errors
///
/// Fails if the command string cannot be tokenised or is empty, if the
/// process cannot be run, or if it exits unsuccessfully (the error then
/// carries the process's stderr output).
async fn run_scenario<'a>(
    run_id: i32,
    scenario_to_execute: &ScenarioToExecute<'a>,
) -> anyhow::Result<iteration::ActiveModel> {
    let start = Utc::now().timestamp_millis();

    // A command that cannot be tokenised is a configuration error; report it
    // instead of trying to execute a placeholder program.
    let command_parts =
        shlex::split(&scenario_to_execute.scenario.command).ok_or_else(|| {
            anyhow::anyhow!(
                "Scenario command is not POSIX compliant: {}",
                scenario_to_execute.scenario.command
            )
        })?;
    let (command, args) = command_parts
        .split_first()
        .ok_or_else(|| anyhow::anyhow!("Empty command"))?;

    println!(
        "Running scenario {} iteration {}",
        scenario_to_execute.scenario.name,
        scenario_to_execute.iteration + 1
    );

    let output = tokio::process::Command::new(command)
        .args(args)
        .kill_on_drop(true)
        .output()
        .await
        .with_context(|| format!("Tokio command failed to run {command}"))?;
    info!("Ran command {}", scenario_to_execute.scenario.command);

    if !output.status.success() {
        let error_message = String::from_utf8_lossy(&output.stderr);
        return Err(anyhow::anyhow!(
            "Scenario execution failed: {}. Command: {}",
            error_message,
            scenario_to_execute.scenario.command
        ));
    }

    let stop = Utc::now().timestamp_millis();
    Ok(iteration::ActiveModel {
        id: ActiveValue::NotSet,
        run_id: ActiveValue::Set(run_id),
        scenario_name: ActiveValue::Set(scenario_to_execute.scenario.name.clone()),
        count: ActiveValue::Set(scenario_to_execute.iteration),
        start_time: ActiveValue::Set(start),
        stop_time: ActiveValue::Set(stop),
    })
}
/// Runs the `down` command of every process in `exec_plan` that defines one.
///
/// For bare-metal processes the PID recorded in `running_processes` (matched
/// by process name) is substituted for the `{pid}` placeholder in the down
/// command; if no PID is found the process is skipped with a warning. Docker
/// processes run their down command unchanged.
///
/// Shutdown is best effort: failures are logged as warnings and the function
/// currently always returns `Ok(())`.
fn shutdown_application(
    exec_plan: &ExecutionPlan,
    running_processes: &[ProcessToObserve],
) -> anyhow::Result<()> {
    for proc in exec_plan.processes_to_execute.iter() {
        if let Some(down_command) = &proc.down {
            let res = match proc.process {
                ProcessType::BareMetal => {
                    let pid = running_processes.iter().find_map(|p| match p {
                        ProcessToObserve::Pid(Some(name), pid) if name == &proc.name => Some(*pid),
                        _ => None,
                    });
                    match pid {
                        Some(pid) => {
                            let down_command = down_command.replace("{pid}", &pid.to_string());
                            run_command_detached(&down_command, &proc.redirect)
                        }
                        None => {
                            tracing::warn!(
                                "Unable to find PID for bare-metal process with name: {}",
                                proc.name
                            );
                            continue;
                        }
                    }
                }
                ProcessType::Docker { containers: _ } => {
                    run_command_detached(down_command, &proc.redirect)
                }
            };

            // Keep going: one failed shutdown must not prevent the others.
            if let Err(err) = res {
                tracing::warn!(
                    "Failed to shutdown process with name {}\n{}",
                    proc.name,
                    err
                );
            }
        }
    }
    Ok(())
}
pub async fn run<'a>(
exec_plan: ExecutionPlan<'a>,
db: &DatabaseConnection,
) -> anyhow::Result<Dataset> {
let mut run_id: i32 = 0;
let mut processes_to_observe = exec_plan.external_processes_to_observe.to_vec();
if !exec_plan.processes_to_execute.is_empty() {
for proc in exec_plan.processes_to_execute.iter() {
let process_to_observe = run_process(proc)?;
processes_to_observe.extend(process_to_observe);
}
}
for scenario_to_execute in exec_plan.scenarios_to_execute.iter() {
let stop_handle = metrics_logger::start_logging(&processes_to_observe)?;
let start_time = Utc::now().timestamp_millis();
let mut active_run = run::ActiveModel {
id: sea_orm::ActiveValue::NotSet,
start_time: sea_orm::ActiveValue::Set(start_time),
stop_time: sea_orm::ActiveValue::set(None),
}
.save(db)
.await?;
run_id = active_run.clone().try_into_model()?.id;
let scenario_iteration = run_scenario(run_id, scenario_to_execute).await?;
let metrics_log = stop_handle.stop().await?;
if metrics_log.has_errors() {
for err in metrics_log.get_errors() {
tracing::error!("{}", err);
}
return Err(anyhow!("Metric log contained errors, please see logs."));
}
let stop_time = Utc::now().timestamp_millis();
active_run.stop_time = ActiveValue::Set(Some(stop_time));
active_run.save(db).await?;
scenario_iteration.save(db).await?;
for metrics in metrics_log.get_metrics() {
metrics.into_active_model(run_id).save(db).await?;
}
}
shutdown_application(&exec_plan, &processes_to_observe)?;
DatasetBuilder::new(db)
.scenarios_in_run(run_id)
.all()
.last_n_runs(3)
.await
}
#[cfg(test)]
mod tests {
    // These tests touch the real environment: they query the local CPU, call
    // the Boavizta web API and spawn short-lived `sleep` processes.
    use super::*;
    use crate::{
        config::{ProcessToExecute, ProcessType},
        fetch_tdp, metrics_logger, run_process, ProcessToObserve,
    };
    use std::time::Duration;
    use sysinfo::{Pid, System};

    #[test]
    fn should_find_cpu() {
        let cpu_name = find_cpu();
        assert!(cpu_name.is_some())
    }

    // Requires network access to the Boavizta API.
    #[tokio::test]
    async fn fetch_tdp_should_work() -> anyhow::Result<()> {
        let cpu_name = find_cpu().expect("a CPU should be detected on the test machine");
        let tdp = fetch_tdp(&cpu_name).await?;
        assert!(tdp > 0f64);
        Ok(())
    }

    #[cfg(target_family = "windows")]
    mod windows {
        use super::*;

        #[test]
        fn can_run_a_bare_metal_process() -> anyhow::Result<()> {
            let process = ProcessToExecute {
                name: "sleep".to_string(),
                up: "powershell sleep 15".to_string(),
                down: None,
                redirect: None,
                process: ProcessType::BareMetal,
            };
            let processes_to_observe = run_process(&process)?;
            assert_eq!(processes_to_observe.len(), 1);

            match processes_to_observe.first().expect("process should exist") {
                ProcessToObserve::Pid(_, pid) => {
                    let mut system = System::new();
                    system.refresh_all();
                    let proc = system.process(Pid::from_u32(*pid));
                    assert!(proc.is_some());
                }
                _ => panic!("expected to find a process id"),
            }
            Ok(())
        }

        #[tokio::test]
        async fn log_scenario_should_return_metrics_log_without_errors() -> anyhow::Result<()> {
            let process = ProcessToExecute {
                name: "sleep".to_string(),
                up: "powershell sleep 20".to_string(),
                down: None,
                redirect: None,
                process: ProcessType::BareMetal,
            };
            let processes_to_observe = run_process(&process)?;
            let stop_handle = metrics_logger::start_logging(&processes_to_observe)?;

            // Log for less time than the process sleeps so it is still alive
            // when logging stops.
            tokio::time::sleep(Duration::from_secs(10)).await;

            let metrics_log = stop_handle.stop().await?;
            assert!(!metrics_log.has_errors());
            assert!(!metrics_log.get_metrics().is_empty());
            Ok(())
        }
    }

    #[cfg(target_family = "unix")]
    mod unix {
        use super::*;
        use crate::config::Redirect;

        #[test]
        fn can_run_a_bare_metal_process() -> anyhow::Result<()> {
            let process = ProcessToExecute {
                name: "sleep".to_string(),
                up: "sleep 15".to_string(),
                down: None,
                redirect: Some(Redirect::Null),
                process: ProcessType::BareMetal,
            };
            let processes_to_observe = run_process(&process)?;
            assert_eq!(processes_to_observe.len(), 1);

            match processes_to_observe.first().expect("process should exist") {
                ProcessToObserve::Pid(name, pid) => {
                    let mut system = System::new();
                    system.refresh_all();
                    // Check that the process exists before reading its name,
                    // so a missing process fails with a clear message.
                    let proc = system
                        .process(Pid::from_u32(*pid))
                        .expect("spawned process should be visible to sysinfo");
                    let proc_name = proc.name().to_string_lossy().into_owned();
                    assert_eq!(name.as_deref(), Some(proc_name.as_str()));
                }
                e => panic!("expected to find a process id {:?}", e),
            }
            Ok(())
        }

        #[tokio::test]
        async fn log_scenario_should_return_metrics_log_without_errors() -> anyhow::Result<()> {
            let process = ProcessToExecute {
                name: "sleep".to_string(),
                up: "sleep 20".to_string(),
                down: None,
                redirect: Some(Redirect::Null),
                process: ProcessType::BareMetal,
            };
            let processes_to_observe = run_process(&process)?;
            let stop_handle = metrics_logger::start_logging(&processes_to_observe)?;

            // Log for less time than the process sleeps so it is still alive
            // when logging stops.
            tokio::time::sleep(Duration::from_secs(10)).await;

            let metrics_log = stop_handle.stop().await?;
            assert!(!metrics_log.has_errors());
            assert!(!metrics_log.get_metrics().is_empty());
            Ok(())
        }
    }
}