use crate::{
dao::{self, pagination::Page},
entities::{iteration, metrics},
};
use itertools::{Itertools, MinMaxResult};
use sea_orm::DatabaseConnection;
use std::collections::{hash_map::Entry, HashMap};
/// Selects which scenarios a dataset should be built from.
pub enum ScenarioSelection {
    /// Every scenario in the database.
    All,
    /// Scenarios executed in the given run (run id kept as a string).
    InRun(String),
    /// Scenarios executed between the two timestamps — bounds semantics
    /// are decided by `dao::scenario::fetch_in_range` (TODO confirm there).
    InRange { from: i64, to: i64 },
    /// Scenarios whose name matches the given search string.
    Search(String),
}
/// Selects which runs of a single scenario a dataset should include.
pub enum RunSelection {
    /// Every run recorded for the scenario.
    All,
    /// Runs between the two timestamps — bounds semantics are decided by
    /// `dao::iteration::fetch_runs_in_range` (TODO confirm there).
    InRange { from: i64, to: i64 },
}
pub struct DatasetBuilder<'a> {
db: &'a DatabaseConnection,
}
impl<'a> DatasetBuilder<'a> {
pub fn new(db: &'a DatabaseConnection) -> Self {
DatasetBuilder { db }
}
pub fn scenario(&self, scenario: &str) -> DatasetRow {
DatasetRow {
scenario: scenario.to_string(),
db: self.db,
}
}
pub fn scenarios_all(&self) -> DatasetRowPager {
DatasetRowPager {
scenario_selection: ScenarioSelection::All,
db: self.db,
}
}
pub fn scenarios_in_run(&self, run: i32) -> DatasetRowPager {
DatasetRowPager {
scenario_selection: ScenarioSelection::InRun(run.to_string()),
db: self.db,
}
}
pub fn scenarios_in_range(&self, from: i64, to: i64) -> DatasetRowPager {
DatasetRowPager {
scenario_selection: ScenarioSelection::InRange { from, to },
db: self.db,
}
}
pub fn scenarios_by_name(&self, name: &str) -> DatasetRowPager {
DatasetRowPager {
scenario_selection: ScenarioSelection::Search(name.to_string()),
db: self.db,
}
}
}
pub struct DatasetRowPager<'a> {
scenario_selection: ScenarioSelection,
db: &'a DatabaseConnection,
}
impl<'a> DatasetRowPager<'a> {
pub fn all(self) -> DatasetRows<'a> {
DatasetRows {
scenario_selection: self.scenario_selection,
scenario_page: None,
db: self.db,
}
}
pub fn page(self, page_size: u64, page_num: u64) -> DatasetRows<'a> {
let scenario_page = Page {
size: page_size,
num: page_num,
};
DatasetRows {
scenario_selection: self.scenario_selection,
scenario_page: Some(scenario_page),
db: self.db,
}
}
}
/// Terminal stage of the scenario-oriented query chain: a scenario
/// selection plus an optional page, ready to fetch data.
pub struct DatasetRows<'a> {
    scenario_selection: ScenarioSelection,
    scenario_page: Option<Page>,
    db: &'a DatabaseConnection,
}
impl<'a> DatasetRows<'a> {
    /// Builds a [`Dataset`] covering the last `n` runs of each selected
    /// scenario.
    ///
    /// Fetches the matching scenarios, then the iterations of their last
    /// `n` runs, and finally the metrics recorded inside each iteration's
    /// time window.
    ///
    /// # Errors
    ///
    /// Propagates any database error raised by the underlying dao calls.
    pub async fn last_n_runs(&self, n: u64) -> anyhow::Result<Dataset> {
        let scenarios = match &self.scenario_selection {
            ScenarioSelection::All => dao::scenario::fetch_all(&self.scenario_page, self.db).await,
            ScenarioSelection::Search(name) => {
                dao::scenario::fetch_by_name(name, &self.scenario_page, self.db).await
            }
            ScenarioSelection::InRun(run) => {
                dao::scenario::fetch_in_run(run, &self.scenario_page, self.db).await
            }
            ScenarioSelection::InRange { from, to } => {
                dao::scenario::fetch_in_range(*from, *to, &self.scenario_page, self.db).await
            }
        }?;

        // Gather the iterations of the last `n` runs of every scenario.
        let mut all_iterations = vec![];
        for scenario in scenarios {
            all_iterations.extend(
                dao::iteration::fetch_runs_last_n(&scenario.scenario_name, n, self.db).await?,
            );
        }

        // Attach the metrics captured inside each iteration's time window.
        let mut rows = vec![];
        for iteration in all_iterations {
            let metrics = dao::metrics::fetch_within(
                iteration.run_id,
                iteration.start_time,
                iteration.stop_time,
                self.db,
            )
            .await?;
            rows.push(IterationWithMetrics::new(iteration, metrics));
        }

        Ok(Dataset::new(rows))
    }
}
pub struct DatasetRow<'a> {
scenario: String,
db: &'a DatabaseConnection,
}
impl<'a> DatasetRow<'a> {
pub fn runs_all(self) -> DatasetColPager<'a> {
DatasetColPager {
scenario: self.scenario,
run_selection: RunSelection::All,
db: self.db,
}
}
pub fn runs_in_range(self, from: i64, to: i64) -> DatasetColPager<'a> {
DatasetColPager {
scenario: self.scenario,
run_selection: RunSelection::InRange { from, to },
db: self.db,
}
}
}
/// Terminal stage of the single-scenario query chain.
pub struct DatasetColPager<'a> {
    scenario: String,
    run_selection: RunSelection,
    db: &'a DatabaseConnection,
}
impl<'a> DatasetColPager<'a> {
    /// Fetches one page of runs for the scenario and builds a [`Dataset`]
    /// from their iterations and metrics.
    ///
    /// # Errors
    ///
    /// Propagates any database error raised by the underlying dao calls.
    pub async fn page(&self, page_size: u64, page_num: u64) -> anyhow::Result<Dataset> {
        let page = Some(Page::new(page_size, page_num));
        let iterations = match self.run_selection {
            RunSelection::All => {
                dao::iteration::fetch_runs_all(&self.scenario, &page, self.db).await
            }
            RunSelection::InRange { from, to } => {
                dao::iteration::fetch_runs_in_range(&self.scenario, from, to, &page, self.db).await
            }
        }?;

        // Attach the metrics captured inside each iteration's time window.
        let mut rows = vec![];
        for iteration in iterations {
            let metrics = dao::metrics::fetch_within(
                iteration.run_id,
                iteration.start_time,
                iteration.stop_time,
                self.db,
            )
            .await?;
            rows.push(IterationWithMetrics::new(iteration, metrics));
        }

        Ok(Dataset::new(rows))
    }
}
/// A single scenario iteration paired with the metrics recorded while it
/// was running.
#[derive(Debug)]
pub struct IterationWithMetrics {
    iteration: iteration::Model,
    metrics: Vec<metrics::Model>,
}
impl IterationWithMetrics {
    /// Pairs an iteration with its metrics.
    pub fn new(iteration: iteration::Model, metrics: Vec<metrics::Model>) -> Self {
        Self { iteration, metrics }
    }

    /// Returns the iteration row.
    pub fn iteration(&self) -> &iteration::Model {
        &self.iteration
    }

    /// Returns the metrics captured during this iteration.
    pub fn metrics(&self) -> &[metrics::Model] {
        &self.metrics
    }

    /// Aggregates this iteration's metrics per process id, producing the
    /// min/max, mean and total cpu usage for each process.
    pub fn accumulate_by_process(&self) -> Vec<ProcessMetrics> {
        // Group the metrics by the process that produced them. The entry
        // API with `or_default` avoids the previous version's fresh
        // `vec![metric]` allocation on every metric, occupied or not.
        let mut metrics_by_process: HashMap<String, Vec<&metrics::Model>> = HashMap::new();
        for metric in &self.metrics {
            metrics_by_process
                .entry(metric.process_id.clone())
                .or_default()
                .push(metric);
        }

        metrics_by_process
            .into_iter()
            .map(|(process_id, metrics)| {
                let cpu_usage_minmax = metrics.iter().map(|m| m.cpu_usage).minmax();
                let cpu_usage_total: f64 = metrics.iter().map(|m| m.cpu_usage).sum();
                // Never divides by zero: every vec in the map was created
                // with at least one element.
                let cpu_usage_mean = cpu_usage_total / metrics.len() as f64;
                ProcessMetrics {
                    process_id,
                    cpu_usage_minmax,
                    cpu_usage_mean,
                    cpu_usage_total,
                }
            })
            .collect()
    }
}
/// Aggregated cpu-usage statistics for a single process.
#[derive(Debug)]
pub struct ProcessMetrics {
    // Identifier of the process the metrics were recorded for.
    process_id: String,
    // Min/max cpu usage observed; `NoElements` when there were no samples.
    cpu_usage_minmax: MinMaxResult<f64>,
    // Arithmetic mean of the cpu-usage samples.
    cpu_usage_mean: f64,
    // Sum of all cpu-usage samples.
    cpu_usage_total: f64,
}
impl ProcessMetrics {
    /// Returns the process identifier.
    pub fn process_id(&self) -> &str {
        &self.process_id
    }

    /// Returns the min/max cpu usage observed for this process.
    pub fn cpu_usage_minmax(&self) -> &MinMaxResult<f64> {
        &self.cpu_usage_minmax
    }

    /// Returns the mean cpu usage.
    pub fn cpu_usage_mean(&self) -> f64 {
        self.cpu_usage_mean
    }

    /// Returns the total cpu usage.
    pub fn cpu_usage_total(&self) -> f64 {
        self.cpu_usage_total
    }
}
/// A collection of iterations (with their metrics) that can be sliced by
/// scenario and by run.
pub struct Dataset {
    data: Vec<IterationWithMetrics>,
}
impl Dataset {
    /// Wraps the given iterations into a dataset.
    pub fn new(data: Vec<IterationWithMetrics>) -> Self {
        Self { data }
    }

    /// Returns all iterations in the dataset.
    pub fn data(&self) -> &[IterationWithMetrics] {
        &self.data
    }

    /// Groups the dataset by scenario name, preserving the order in which
    /// scenarios first appear in the data.
    pub fn by_scenario(&self) -> Vec<ScenarioDataset<'_>> {
        self.data
            .iter()
            .map(|x| &x.iteration.scenario_name)
            .unique()
            .map(|scenario_name| ScenarioDataset {
                scenario_name,
                data: self
                    .data
                    .iter()
                    .filter(|x| &x.iteration.scenario_name == scenario_name)
                    .collect(),
            })
            .collect()
    }

    /// Number of distinct scenario names in the dataset.
    pub fn total_unique_scenarios(&self) -> usize {
        self.data
            .iter()
            .map(|x| &x.iteration.scenario_name)
            .unique()
            .count()
    }

    /// Returns one page of distinct scenario names.
    ///
    /// Names keep the order in which they first appear in the data, so
    /// repeated calls with the same arguments return the same page. (The
    /// previous implementation round-tripped through a `HashSet`, whose
    /// iteration order is unspecified, making page contents
    /// non-deterministic.)
    pub fn paginated_unique_scenarios(&self, page: u32, limit: u32) -> Vec<String> {
        // Widen before multiplying so `page * limit` cannot overflow u32.
        let start = page as usize * limit as usize;
        self.data
            .iter()
            .map(|x| &x.iteration.scenario_name)
            .unique()
            .skip(start)
            .take(limit as usize)
            .cloned()
            .collect()
    }

    /// Returns up to `n` iterations of the given scenario, last-in-the-data
    /// first (same order as before, without the intermediate collect).
    pub fn last_n_runs_for_scenario(
        &self,
        scenario_name: &str,
        n: usize,
    ) -> Vec<&IterationWithMetrics> {
        self.data
            .iter()
            .rev()
            .filter(|x| x.iteration.scenario_name == scenario_name)
            .take(n)
            .collect()
    }

    /// Number of pages needed to list every unique scenario at `limit`
    /// scenarios per page. A `limit` of zero yields zero pages.
    pub fn total_pages(&self, limit: u32) -> u32 {
        if limit == 0 {
            // Guard the division; the old float version produced a
            // meaningless saturated value here.
            return 0;
        }
        let total_scenarios = self.total_unique_scenarios() as u32;
        // Integer ceiling division — no float round-trip needed.
        (total_scenarios + limit - 1) / limit
    }
}
/// All iterations of one scenario within a [`Dataset`].
#[derive(Debug)]
pub struct ScenarioDataset<'a> {
    scenario_name: &'a str,
    data: Vec<&'a IterationWithMetrics>,
}
impl<'a> ScenarioDataset<'a> {
    /// Returns the scenario's name.
    pub fn scenario_name(&'a self) -> &'a str {
        self.scenario_name
    }

    /// Returns the scenario's iterations.
    pub fn data(&'a self) -> &'a [&'a IterationWithMetrics] {
        &self.data
    }

    /// Groups this scenario's iterations by run id, preserving the order in
    /// which runs first appear.
    pub fn by_run(&'a self) -> Vec<RunDataset<'a>> {
        self.data
            .iter()
            .map(|x| &x.iteration.run_id)
            .unique()
            .map(|run_id| RunDataset {
                scenario_name: self.scenario_name,
                run_id: *run_id,
                data: self
                    .data
                    .iter()
                    .filter(|x| &x.iteration.run_id == run_id)
                    .cloned()
                    .collect(),
            })
            .collect()
    }
}
/// A single run of a scenario: its iterations and their metrics.
#[derive(Debug)]
pub struct RunDataset<'a> {
    scenario_name: &'a str,
    run_id: i32,
    data: Vec<&'a IterationWithMetrics>,
}
impl<'a> RunDataset<'a> {
    /// Returns the scenario this run belongs to.
    pub fn scenario_name(&'a self) -> &'a str {
        self.scenario_name
    }

    /// Returns the run's id.
    pub fn run_id(&'a self) -> i32 {
        self.run_id
    }

    /// Returns the run's iterations.
    pub fn by_iterations(&'a self) -> &'a [&'a IterationWithMetrics] {
        &self.data
    }

    /// Averages the per-process metrics over this run's iterations.
    ///
    /// Each iteration is first aggregated per process via
    /// [`IterationWithMetrics::accumulate_by_process`]; those per-iteration
    /// aggregates are then averaged per process across all iterations.
    ///
    /// Fixes two defects in the previous implementation: `a + b / 2.0`
    /// parsed as `a + (b / 2.0)` (not an average at all), and the pairwise
    /// `reduce` form `((a + b) / 2 + c) / 2` over-weighted later iterations
    /// whenever there were more than two.
    pub fn averaged(&self) -> Vec<ProcessMetrics> {
        // Per-iteration, per-process aggregates, grouped by process id.
        let mut by_process: HashMap<String, Vec<ProcessMetrics>> = HashMap::new();
        for pm in self.data.iter().flat_map(|i| i.accumulate_by_process()) {
            by_process.entry(pm.process_id.clone()).or_default().push(pm);
        }

        by_process
            .into_iter()
            .map(|(process_id, mut group)| {
                // A single iteration needs no averaging — return it as-is
                // (this also preserves a `OneElement` min/max).
                if group.len() == 1 {
                    return group.pop().expect("group holds exactly one element");
                }
                let count = group.len() as f64;
                let cpu_usage_mean = group.iter().map(|p| p.cpu_usage_mean).sum::<f64>() / count;
                let cpu_usage_total = group.iter().map(|p| p.cpu_usage_total).sum::<f64>() / count;
                // Collect each iteration's (min, max); fall back to
                // `NoElements` if any iteration recorded no samples at all,
                // matching the old sticky-NoElements behaviour.
                let bounds: Option<Vec<(f64, f64)>> = group
                    .iter()
                    .map(|p| match p.cpu_usage_minmax {
                        MinMaxResult::NoElements => None,
                        MinMaxResult::OneElement(v) => Some((v, v)),
                        MinMaxResult::MinMax(min, max) => Some((min, max)),
                    })
                    .collect();
                let cpu_usage_minmax = match bounds {
                    Some(b) => MinMaxResult::MinMax(
                        b.iter().map(|(lo, _)| lo).sum::<f64>() / count,
                        b.iter().map(|(_, hi)| hi).sum::<f64>() / count,
                    ),
                    None => MinMaxResult::NoElements,
                };
                ProcessMetrics {
                    process_id,
                    cpu_usage_minmax,
                    cpu_usage_mean,
                    cpu_usage_total,
                }
            })
            .collect()
    }
}
#[cfg(test)]
mod tests {
    // TODO: no unit tests yet. Dataset slicing (by_scenario / by_run) and
    // the per-process aggregation logic are good first candidates.
}