pub mod config;
pub mod dao;
pub mod data;
pub mod entities;
pub mod metrics;
pub mod metrics_logger;
pub mod migrations;
pub mod models;
pub mod server;
use crate::{
config::{Config, ExecutionMode},
data::dataset_builder::DatasetBuilder,
migrations::{Migrator, MigratorTrait},
};
use anyhow::{anyhow, Context};
use chrono::Utc;
use colored::Colorize;
use config::{ExecutionPlan, Process, ProcessToObserve, ProcessType, Redirect, Scenario};
use data::dataset_builder::DatasetRows;
use entities::{iteration, run};
use sea_orm::*;
use serde_json::Value;
use std::{
collections::HashMap,
fs::{self, OpenOptions},
io::Write,
path::Path,
process::exit,
time::Duration,
};
use subprocess::{Exec, NullFile, Redirection};
use sysinfo::{CpuRefreshKind, RefreshKind, System};
use tracing::{debug, info};
/// Prompts on stdout for a CPU name and reads the answer from stdin.
///
/// The returned name has leading and trailing whitespace removed, so the line
/// terminator that `read_line` leaves in the buffer never becomes part of the
/// name. A failed read simply prompts again.
fn ask_for_cpu() -> String {
    loop {
        print!("Please enter a CPU name: ");
        // `print!` does not emit a newline, so flush to make the prompt visible
        // before blocking on stdin. A failed flush is not worth aborting for.
        let _ = std::io::stdout().flush();

        let mut input = String::new();
        if std::io::stdin().read_line(&mut input).is_ok() {
            return input.trim().to_string();
        }
    }
}
/// Prompts on stdout for the CPU's TDP in watts and reads it from stdin.
///
/// Keeps asking until the input parses as a finite, strictly positive number.
/// A failed read prompts again without printing a message.
fn ask_for_tdp() -> f64 {
    loop {
        print!("Please enter the TDP of your CPU in watts: ");
        // Flush so the prompt is visible before blocking on stdin.
        let _ = std::io::stdout().flush();

        let mut input = String::new();
        if std::io::stdin().read_line(&mut input).is_err() {
            continue;
        }

        match input.trim().parse::<f64>() {
            // `f64::from_str` also accepts "nan", "inf" and negative numbers,
            // none of which is a usable power figure.
            Ok(watts) if watts.is_finite() && watts > 0.0 => return watts,
            _ => println!("{}", "Please enter a valid number.".yellow()),
        }
    }
}
/// Returns the brand string of the first CPU that `sysinfo` reports, or
/// `None` when it reports no CPUs at all.
fn find_cpu() -> Option<String> {
    let refresh = RefreshKind::new().with_cpu(CpuRefreshKind::everything());
    let system = System::new_with_specifics(refresh);
    let first_cpu = system.cpus().first()?;
    Some(first_cpu.brand().to_string())
}
/// Reads the number stored at `verbose.avg_power.value` in `json_obj`.
///
/// Returns `None` when any key along that path is missing or the final value
/// is not a number.
fn parse_json(json_obj: &Value) -> Option<f64> {
    let avg_power = json_obj.pointer("/verbose/avg_power/value")?;
    avg_power.as_f64()
}
async fn fetch_tdp(cpu_name: &str) -> anyhow::Result<f64> {
let client = reqwest::Client::new();
let mut json = HashMap::new();
json.insert("name", cpu_name);
let resp = client
.post("https://api.boavizta.org/v1/component/cpu")
.header("Content-Type", "application/json")
.json(&json)
.send()
.await?;
let json_obj = resp.json().await?;
parse_json(&json_obj).context("Error finding CPU stats from Boavizta")
}
/// Interactive setup: works out the CPU name and its average power, then
/// writes an example `cardamon.toml` into the current directory.
///
/// The CPU name is either detected on this machine (option 1) or typed in by
/// the user (option 2, or when detection fails). The power figure comes from
/// Boavizta when the lookup succeeds and from the user otherwise. The outcome
/// is reported on stdout; nothing is returned.
pub async fn init_config() {
    println!("\n{}", " Setting up Cardamon ".reversed().green());

    let cpu_name = loop {
        print!("Would you like to create a config for this computer [1] or another computer [2]? ");
        // Flush so the prompt is visible before blocking on stdin.
        let _ = std::io::stdout().flush();

        let mut ans = String::new();
        // A failed read and an unparseable answer are handled the same way:
        // ask again.
        let choice = std::io::stdin()
            .read_line(&mut ans)
            .ok()
            .and_then(|_| ans.trim().parse::<u32>().ok());

        match choice {
            Some(1) => {
                break match find_cpu() {
                    Some(name) => {
                        println!("{} {}", "It looks like you have a".yellow(), name);
                        name
                    }
                    None => {
                        println!("{}", "Unable to find CPU!".red());
                        ask_for_cpu()
                    }
                };
            }
            Some(2) => break ask_for_cpu(),
            _ => println!("{}", "Please enter 1 or 2.\n".yellow()),
        }
    };
    // Neither the API lookup nor the generated config should ever see stray
    // whitespace (e.g. a line terminator from interactive input).
    let cpu_name = cpu_name.trim().to_string();

    let tdp = match fetch_tdp(&cpu_name).await {
        Ok(tdp) => {
            println!("{} {}", "Boavizta reports an avg power of".yellow(), tdp);
            tdp
        }
        Err(_) => {
            println!("{}", "Cannot get avg power from Boavizta!".red());
            ask_for_tdp()
        }
    };

    match Config::write_example_to_file(&cpu_name, tdp, Path::new("./cardamon.toml")) {
        Ok(_) => {
            println!("{}", "cardamon.toml created!".green());
            println!("\n🤩\n");
        }
        Err(err) => {
            println!("{}\n{}", "Error creating config.".red(), err);
            println!("\n😭\n");
        }
    }
}
/// Connects to the database at `database_url`.
///
/// For SQLite the first connection is returned as is. For Postgres and MySQL
/// a database called `database_name` is created on the server and a second
/// connection to `<database_url>/<database_name>` is returned.
///
/// # Errors
/// Fails when the initial connection fails, when `database_name` is `None`
/// for Postgres/MySQL, when MySQL rejects the `CREATE DATABASE` statement, or
/// when the connection to the named database fails.
pub async fn db_connect(
    database_url: &str,
    database_name: Option<&str>,
) -> anyhow::Result<DatabaseConnection> {
    let db = Database::connect(database_url).await?;
    let backend = db.get_database_backend();
    // Avoid `//name` when the base URL already ends with a slash.
    let base_url = database_url.trim_end_matches('/');

    match backend {
        DbBackend::Sqlite => Ok(db),
        DbBackend::Postgres => {
            let database_name =
                database_name.context("Database name is required for postgres connections")?;
            // Double any embedded quote so the name stays one quoted identifier.
            let identifier = database_name.replace('"', "\"\"");
            // The result is deliberately discarded: Postgres has no
            // `CREATE DATABASE IF NOT EXISTS`, so this presumably fails when the
            // database already exists. Real problems surface on the connect below.
            db.execute(Statement::from_string(
                backend,
                format!("CREATE DATABASE \"{}\";", identifier),
            ))
            .await
            .ok();

            let url = format!("{}/{}", base_url, database_name);
            Database::connect(&url)
                .await
                .context("Error creating postgresql database.")
        }
        DbBackend::MySql => {
            let database_name =
                database_name.context("Database name is required for mysql connections")?;
            // Double any embedded backtick so the name stays one quoted identifier.
            let identifier = database_name.replace('`', "``");
            db.execute(Statement::from_string(
                backend,
                format!("CREATE DATABASE IF NOT EXISTS `{}`;", identifier),
            ))
            .await?;

            let url = format!("{}/{}", base_url, database_name);
            Database::connect(&url)
                .await
                .context("Error creating mysql database.")
        }
    }
}
/// Runs `Migrator::up` on `db_conn` with no step limit.
///
/// # Errors
/// Any migration failure is returned with the context
/// "Error migrating database.".
pub async fn db_migrate(db_conn: &DatabaseConnection) -> anyhow::Result<()> {
    let outcome = Migrator::up(db_conn, None).await;
    outcome.context("Error migrating database.")
}
/// Runs the `down` command of every managed process that has one.
///
/// Processes without a `down` command, and any variant other than
/// `ManagedPid` / `ManagedContainers`, are skipped. A `down` command that
/// fails to start is logged as a warning and does not stop the remaining
/// processes from being shut down, so this function always returns `Ok(())`.
fn shutdown_application(running_processes: &[ProcessToObserve]) -> anyhow::Result<()> {
    for proc in running_processes {
        // Both managed variants are shut down the same way: via their `down` command.
        let (process_name, down) = match proc {
            ProcessToObserve::ManagedPid {
                process_name,
                down: Some(down),
                ..
            }
            | ProcessToObserve::ManagedContainers {
                process_name,
                down: Some(down),
                ..
            } => (process_name, down),
            _ => continue,
        };

        print!("> stopping process {}", process_name.green());
        match run_command_detached(down, None) {
            Ok(_) => {
                println!("\t{}", "✓".green());
                println!("\t{}", format!("- {}", down).bright_black());
            }
            Err(err) => {
                tracing::warn!(
                    "Failed to shutdown process with name {}\n{}",
                    process_name,
                    err
                );
                // Terminate the "> stopping process ..." line started above.
                println!();
            }
        }
    }
    Ok(())
}
/// Deletes the `./.stdout` and `./.stderr` files if they are present.
///
/// # Errors
/// Returns the I/O error when a file exists but cannot be removed. A file
/// that is already absent is not an error.
pub fn cleanup_stdout_stderr() -> anyhow::Result<()> {
    debug!("Cleaning up stdout and stderr");
    for path in ["./.stdout", "./.stderr"] {
        // Remove directly instead of checking `exists()` first: the file could
        // vanish between the check and the removal.
        match fs::remove_file(path) {
            Ok(()) => {}
            Err(err) if err.kind() == std::io::ErrorKind::NotFound => {}
            Err(err) => return Err(err.into()),
        }
    }
    Ok(())
}
/// Spawns `command` as a detached process and returns its PID.
///
/// `command` is split into program and arguments with `shlex`. `redirect`
/// selects where the child's stdout/stderr go and defaults to
/// `Redirect::File`, which appends to `./.stdout` and `./.stderr`.
///
/// # Errors
/// Fails when `command` cannot be split, is empty, the redirect files cannot
/// be opened, the process cannot be spawned, or no PID is available for it.
fn run_command_detached(command: &str, redirect: Option<Redirect>) -> anyhow::Result<u32> {
    // The command comes from user configuration, so a malformed string is a
    // recoverable error rather than a reason to panic.
    let words = shlex::split(command)
        .with_context(|| format!("Command string is not POSIX compliant: {}", command))?;
    let (program, args) = words
        .split_first()
        .with_context(|| format!("Cannot run an empty command: {:?}", command))?;

    let exec = Exec::cmd(program).args(args);
    let exec = match redirect.unwrap_or(Redirect::File) {
        Redirect::Null => exec.stdout(NullFile).stderr(NullFile),
        Redirect::Parent => exec,
        Redirect::File => {
            let out_file = OpenOptions::new()
                .append(true)
                .create(true)
                .open("./.stdout")?;
            let err_file = OpenOptions::new()
                .append(true)
                .create(true)
                .open("./.stderr")?;
            exec.stdout(Redirection::File(out_file))
                .stderr(Redirection::File(err_file))
        }
    };

    exec.detached()
        .popen()
        .with_context(|| format!("Failed to spawn detached process, command: {}", program))?
        .pid()
        .context("Process should have a PID")
}
/// Starts `proc_to_exec` via its `up` command and describes what to observe.
///
/// Bare-metal processes are tracked by the PID of the spawned command, and
/// every `{pid}` placeholder in their `down` command is replaced with that
/// PID. Docker processes are tracked by their configured container names.
///
/// # Errors
/// Propagates any failure to spawn the `up` command.
fn run_process(proc_to_exec: &Process) -> anyhow::Result<ProcessToObserve> {
    let process_name = proc_to_exec.name.clone();

    let observed = match &proc_to_exec.process_type {
        ProcessType::BareMetal => {
            debug!(
                "Running command {} in detached mode ( Bare metal ) ",
                proc_to_exec.up
            );
            let pid = run_command_detached(&proc_to_exec.up, proc_to_exec.redirect)?;
            let pid_text = pid.to_string();
            let down = proc_to_exec
                .down
                .as_ref()
                .map(|template| template.replace("{pid}", &pid_text));
            ProcessToObserve::ManagedPid {
                process_name,
                pid,
                down,
            }
        }
        ProcessType::Docker { containers } => {
            debug!(
                "Running command {} in detached mode ( Docker ) ",
                proc_to_exec.up
            );
            // The PID of the launching command is not needed here; the
            // containers are identified by name.
            run_command_detached(&proc_to_exec.up, proc_to_exec.redirect)?;
            ProcessToObserve::ManagedContainers {
                process_name,
                container_names: containers.clone(),
                down: proc_to_exec.down.clone(),
            }
        }
    };

    Ok(observed)
}
async fn run_scenario<'a>(
run_id: i32,
scenario: &Scenario,
iteration: i32,
) -> anyhow::Result<iteration::ActiveModel> {
let start = Utc::now().timestamp_millis();
let command_parts = match shlex::split(&scenario.command) {
Some(command) => command,
None => vec!["error".to_string()],
};
let command = command_parts
.first()
.ok_or_else(|| anyhow::anyhow!("Empty command"))?;
let args = &command_parts[1..];
let output = tokio::process::Command::new(command)
.args(args)
.kill_on_drop(true)
.output()
.await
.context(format!("Tokio command failed to run {command}"))?;
info!("Ran command {}", scenario.command);
if output.status.success() {
let stop = Utc::now().timestamp_millis();
let scenario_iteration = iteration::ActiveModel {
id: ActiveValue::NotSet,
run_id: ActiveValue::Set(run_id),
scenario_name: ActiveValue::Set(scenario.name.clone()),
count: ActiveValue::Set(iteration),
start_time: ActiveValue::Set(start),
stop_time: ActiveValue::Set(stop),
};
Ok(scenario_iteration)
} else {
let error_message = String::from_utf8_lossy(&output.stderr).to_string();
Err(anyhow::anyhow!(
"Scenario execution failed: {}. Command: {}",
error_message,
scenario.command
))
}
}
/// Runs every scenario for its configured number of iterations.
///
/// Each iteration starts a metrics logger, runs the scenario, saves the
/// iteration record, then stops the logger and saves the collected metrics.
///
/// # Errors
/// Stops at the first failure of any of those steps and returns it.
async fn run_scenarios<'a>(
    run_id: i32,
    scenarios: Vec<&'a Scenario>,
    processes_to_observe: Vec<ProcessToObserve>,
    db: &DatabaseConnection,
) -> anyhow::Result<()> {
    for scenario in scenarios {
        // Inclusive range: counts 1..=iterations without the `+ 1` that would
        // overflow at the maximum value.
        for iteration in 1..=scenario.iterations {
            println!(
                "> running scenario {} - iteration {}/{}",
                scenario.name.green(),
                iteration,
                scenario.iterations
            );

            // The logger takes ownership of its process list, hence the clone
            // per iteration.
            let stop_handle = metrics_logger::start_logging(processes_to_observe.clone())?;

            let scenario_iteration = run_scenario(run_id, scenario, iteration).await?;
            scenario_iteration.save(db).await?;

            let metrics_log = stop_handle.stop().await?;
            metrics_log.save(run_id, db).await?;
        }
    }
    Ok(())
}
pub async fn run_live<'a>(
run_id: i32,
processes_to_observe: Vec<ProcessToObserve>,
db: &DatabaseConnection,
) -> anyhow::Result<()> {
let start = Utc::now().timestamp_millis();
let iteration = iteration::ActiveModel {
id: ActiveValue::NotSet,
run_id: ActiveValue::Set(run_id),
scenario_name: ActiveValue::Set("live".to_string()),
count: ActiveValue::Set(1),
start_time: ActiveValue::Set(start),
stop_time: ActiveValue::Set(start), };
iteration.save(db).await?;
let stop_handle = metrics_logger::start_logging(processes_to_observe.clone())?;
let shared_metrics_log = stop_handle.shared_metrics_log.clone();
loop {
tokio::time::sleep(Duration::from_secs(1)).await;
let shared_metrics_log = shared_metrics_log.clone();
let mut metrics_log = shared_metrics_log.lock().unwrap();
metrics_log.save(run_id, &db).await?;
metrics_log.clear();
let now = Utc::now().timestamp_millis();
let mut active_iteration = dao::iteration::fetch_live(run_id, &db)
.await?
.into_active_model();
active_iteration.stop_time = ActiveValue::Set(now);
active_iteration.update(db).await?;
let now = Utc::now().timestamp_millis();
let mut active_run = dao::run::fetch(run_id, &db).await?.into_active_model();
active_run.stop_time = ActiveValue::Set(now);
active_run.update(db).await?;
}
}
pub async fn run<'a>(
exec_plan: ExecutionPlan<'a>,
cpu_avg_power: f32,
db: &DatabaseConnection,
) -> anyhow::Result<DatasetRows> {
let mut processes_to_observe = exec_plan.external_processes_to_observe.unwrap_or(vec![]);
if !exec_plan.processes_to_execute.is_empty() {
for proc in exec_plan.processes_to_execute {
print!("> starting process {}", proc.name.green());
let process_to_observe = run_process(proc)?;
processes_to_observe.push(process_to_observe);
println!("{}", "\t✓".green());
println!("\t{}", format!("- {}", proc.up).bright_black());
}
}
let start_time = Utc::now().timestamp_millis();
let is_live = match exec_plan.execution_mode {
ExecutionMode::Live => true,
_ => false,
};
let mut active_run = run::ActiveModel {
id: sea_orm::ActiveValue::NotSet,
is_live: sea_orm::ActiveValue::Set(is_live),
cpu_avg_power: sea_orm::ActiveValue::Set(cpu_avg_power),
start_time: sea_orm::ActiveValue::Set(start_time),
stop_time: sea_orm::ActiveValue::set(start_time), }
.save(db)
.await?;
let run_id = active_run.clone().try_into_model()?.id;
let processes_to_shutdown = processes_to_observe.clone();
ctrlc::set_handler(move || {
println!();
shutdown_application(&processes_to_shutdown)
.expect("Error shutting down managed processes");
exit(0)
})?;
match exec_plan.execution_mode {
ExecutionMode::Observation(scenarios) => {
run_scenarios(run_id, scenarios, processes_to_observe.clone(), db).await?;
}
config::ExecutionMode::Live => {
run_live(run_id, processes_to_observe.clone(), db).await?;
}
};
let stop_time = Utc::now().timestamp_millis();
active_run.stop_time = ActiveValue::Set(stop_time);
active_run.save(db).await?;
shutdown_application(&processes_to_observe)?;
Ok(DatasetBuilder::new().scenarios_in_run(run_id).all())
}
#[cfg(test)]
pub mod tests {
    use super::*;
    use crate::{
        config::{Process, ProcessType},
        fetch_tdp, metrics_logger, run_process, ProcessToObserve,
    };
    use std::time::Duration;
    use sysinfo::{Pid, System};

    /// Executes the contents of each file in `fixtures` as one SQLite
    /// statement against `db`.
    ///
    /// # Errors
    /// Fails when a fixture file cannot be read or its statement is rejected.
    pub async fn setup_fixtures(fixtures: &[&str], db: &DatabaseConnection) -> anyhow::Result<()> {
        for path in fixtures {
            let path = Path::new(path);
            let stmt = std::fs::read_to_string(path)?;
            db.query_one(Statement::from_string(DatabaseBackend::Sqlite, stmt))
                .await
                .with_context(|| format!("Error applying fixture {:?}", path))?;
        }
        Ok(())
    }

    #[test]
    fn should_find_cpu() {
        let cpu_name = find_cpu();
        assert!(cpu_name.is_some())
    }

    // Performs a real request against the Boavizta API.
    #[tokio::test]
    async fn fetch_tdp_should_work() -> anyhow::Result<()> {
        let cpu_name = find_cpu().expect("no CPU was detected to look up");
        let tdp = fetch_tdp(&cpu_name).await?;
        assert!(tdp > 0f64);
        Ok(())
    }

    #[cfg(target_family = "windows")]
    mod windows {
        use super::*;

        #[test]
        fn can_run_a_bare_metal_process() -> anyhow::Result<()> {
            let proc = Process {
                name: "sleep".to_string(),
                up: "powershell sleep 15".to_string(),
                down: None,
                redirect: None,
                process_type: ProcessType::BareMetal,
            };
            let proc_to_observe = run_process(&proc)?;

            match proc_to_observe {
                ProcessToObserve::ManagedPid {
                    process_name: _,
                    pid,
                    down: _,
                } => {
                    let mut system = System::new();
                    system.refresh_all();
                    let proc = system.process(Pid::from_u32(pid));
                    assert!(proc.is_some());
                }
                _ => panic!("expected to find a process id"),
            }
            Ok(())
        }

        #[tokio::test]
        async fn log_scenario_should_return_metrics_log_without_errors() -> anyhow::Result<()> {
            let proc = Process {
                name: "sleep".to_string(),
                up: "powershell sleep 20".to_string(),
                down: None,
                redirect: None,
                process_type: ProcessType::BareMetal,
            };
            let proc_to_observe = run_process(&proc)?;

            // `start_logging` takes the process list by value, as in every
            // other call site.
            let stop_handle = metrics_logger::start_logging(vec![proc_to_observe])?;
            tokio::time::sleep(Duration::from_secs(10)).await;
            let metrics_log = stop_handle.stop().await?;

            assert!(!metrics_log.has_errors());
            assert!(!metrics_log.get_metrics().is_empty());
            Ok(())
        }
    }

    #[cfg(target_family = "unix")]
    mod unix {
        use std::ops::Deref;

        use super::*;
        use crate::config::Redirect;

        #[test]
        fn can_run_a_bare_metal_process() -> anyhow::Result<()> {
            let proc = Process {
                name: "sleep".to_string(),
                up: "sleep 15".to_string(),
                down: None,
                redirect: Some(Redirect::Null),
                process_type: ProcessType::BareMetal,
            };
            let proc_to_observe = run_process(&proc)?;

            match proc_to_observe {
                ProcessToObserve::ManagedPid {
                    process_name,
                    pid,
                    down: _,
                } => {
                    let mut system = System::new();
                    system.refresh_all();
                    let proc = system.process(Pid::from_u32(pid));
                    // Check presence before unwrapping so a missing process
                    // fails as an assertion.
                    assert!(proc.is_some());

                    let proc_name = proc.unwrap().name().to_os_string();
                    let proc_name = proc_name.to_string_lossy();
                    let proc_name = proc_name.deref().to_string();
                    assert_eq!(proc_name, process_name);
                }
                e => panic!("expected to find a process id {:?}", e),
            }
            Ok(())
        }

        #[tokio::test]
        async fn log_scenario_should_return_metrics_log_without_errors() -> anyhow::Result<()> {
            let proc = Process {
                name: "sleep".to_string(),
                up: "sleep 20".to_string(),
                down: None,
                redirect: Some(Redirect::Null),
                process_type: ProcessType::BareMetal,
            };
            let procs_to_observe = run_process(&proc)?;

            let stop_handle = metrics_logger::start_logging(vec![procs_to_observe])?;
            tokio::time::sleep(Duration::from_secs(10)).await;
            let metrics_log = stop_handle.stop().await?;

            assert!(!metrics_log.has_errors());
            assert!(!metrics_log.get_metrics().is_empty());
            Ok(())
        }
    }
}