use crate::execution::memory_pool::MemoryPool;
use crate::physical_plan::metrics::tracker::MemTrackingMetrics;
use crate::physical_plan::metrics::{
BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricValue, MetricsSet, Time,
Timestamp,
};
use crate::physical_plan::Metric;
use chrono::{TimeZone, Utc};
use std::sync::Arc;
use std::time::Duration;
/// A pair of [`ExecutionPlanMetricsSet`]s kept side by side: one that backs
/// the "intermediate" metrics handed out by this type and one that backs the
/// "final" metrics. `aggregate_all` folds both into a single [`MetricsSet`].
#[derive(Debug, Clone)]
pub struct CompositeMetricsSet {
/// Metrics set used by `new_intermediate_baseline` / `new_intermediate_tracking`.
mid: ExecutionPlanMetricsSet,
/// Metrics set used by `new_final_baseline` / `new_final_tracking`.
/// (Trailing underscore because `final` is a reserved word in Rust.)
final_: ExecutionPlanMetricsSet,
}
impl Default for CompositeMetricsSet {
fn default() -> Self {
Self::new()
}
}
impl CompositeMetricsSet {
pub fn new() -> Self {
Self {
mid: ExecutionPlanMetricsSet::new(),
final_: ExecutionPlanMetricsSet::new(),
}
}
pub fn new_intermediate_baseline(&self, partition: usize) -> BaselineMetrics {
BaselineMetrics::new(&self.mid, partition)
}
pub fn new_final_baseline(&self, partition: usize) -> BaselineMetrics {
BaselineMetrics::new(&self.final_, partition)
}
pub fn new_intermediate_tracking(
&self,
partition: usize,
pool: &Arc<dyn MemoryPool>,
) -> MemTrackingMetrics {
MemTrackingMetrics::new(&self.mid, pool, partition)
}
pub fn new_final_tracking(
&self,
partition: usize,
pool: &Arc<dyn MemoryPool>,
) -> MemTrackingMetrics {
MemTrackingMetrics::new(&self.final_, pool, partition)
}
fn merge_compute_time(&self, dest: &Time) {
let time1 = self
.mid
.clone_inner()
.elapsed_compute()
.map_or(0u64, |v| v as u64);
let time2 = self
.final_
.clone_inner()
.elapsed_compute()
.map_or(0u64, |v| v as u64);
dest.add_duration(Duration::from_nanos(time1));
dest.add_duration(Duration::from_nanos(time2));
}
fn merge_spill_count(&self, dest: &Count) {
let count1 = self.mid.clone_inner().spill_count().map_or(0, |v| v);
let count2 = self.final_.clone_inner().spill_count().map_or(0, |v| v);
dest.add(count1);
dest.add(count2);
}
fn merge_spilled_bytes(&self, dest: &Count) {
let count1 = self.mid.clone_inner().spilled_bytes().map_or(0, |v| v);
let count2 = self.final_.clone_inner().spill_count().map_or(0, |v| v);
dest.add(count1);
dest.add(count2);
}
fn merge_output_count(&self, dest: &Count) {
let count = self.final_.clone_inner().output_rows().map_or(0, |v| v);
dest.add(count);
}
fn merge_start_time(&self, dest: &Timestamp) {
let start1 = self
.mid
.clone_inner()
.sum(|metric| matches!(metric.value(), MetricValue::StartTimestamp(_)))
.map(|v| v.as_usize());
let start2 = self
.final_
.clone_inner()
.sum(|metric| matches!(metric.value(), MetricValue::StartTimestamp(_)))
.map(|v| v.as_usize());
match (start1, start2) {
(Some(start1), Some(start2)) => {
dest.set(Utc.timestamp_nanos(start1.min(start2) as i64))
}
(Some(start1), None) => dest.set(Utc.timestamp_nanos(start1 as i64)),
(None, Some(start2)) => dest.set(Utc.timestamp_nanos(start2 as i64)),
(None, None) => {}
}
}
fn merge_end_time(&self, dest: &Timestamp) {
let start1 = self
.mid
.clone_inner()
.sum(|metric| matches!(metric.value(), MetricValue::EndTimestamp(_)))
.map(|v| v.as_usize());
let start2 = self
.final_
.clone_inner()
.sum(|metric| matches!(metric.value(), MetricValue::EndTimestamp(_)))
.map(|v| v.as_usize());
match (start1, start2) {
(Some(start1), Some(start2)) => {
dest.set(Utc.timestamp_nanos(start1.max(start2) as i64))
}
(Some(start1), None) => dest.set(Utc.timestamp_nanos(start1 as i64)),
(None, Some(start2)) => dest.set(Utc.timestamp_nanos(start2 as i64)),
(None, None) => {}
}
}
pub fn aggregate_all(&self) -> MetricsSet {
let mut metrics = MetricsSet::new();
let elapsed_time = Time::new();
let spill_count = Count::new();
let spilled_bytes = Count::new();
let output_count = Count::new();
let start_time = Timestamp::new();
let end_time = Timestamp::new();
metrics.push(Arc::new(Metric::new(
MetricValue::ElapsedCompute(elapsed_time.clone()),
None,
)));
metrics.push(Arc::new(Metric::new(
MetricValue::SpillCount(spill_count.clone()),
None,
)));
metrics.push(Arc::new(Metric::new(
MetricValue::SpilledBytes(spilled_bytes.clone()),
None,
)));
metrics.push(Arc::new(Metric::new(
MetricValue::OutputRows(output_count.clone()),
None,
)));
metrics.push(Arc::new(Metric::new(
MetricValue::StartTimestamp(start_time.clone()),
None,
)));
metrics.push(Arc::new(Metric::new(
MetricValue::EndTimestamp(end_time.clone()),
None,
)));
self.merge_compute_time(&elapsed_time);
self.merge_spill_count(&spill_count);
self.merge_spilled_bytes(&spilled_bytes);
self.merge_output_count(&output_count);
self.merge_start_time(&start_time);
self.merge_end_time(&end_time);
metrics
}
}