use datafusion::{
common::instant::Instant,
physical_plan::metrics::{Count, ExecutionPlanMetricsSet, MetricBuilder, Time},
};
pub struct StartableTime {
pub(crate) metrics: Time,
pub(crate) start: Option<Instant>,
}
impl StartableTime {
pub(crate) fn start(&mut self) {
assert!(self.start.is_none());
self.start = Some(Instant::now());
}
pub(crate) fn stop(&mut self) {
if let Some(start) = self.start.take() {
self.metrics.add_elapsed(start);
}
}
}
pub(crate) struct FlightStreamMetrics {
pub time_processing: StartableTime,
pub time_reading_total: StartableTime,
pub poll_count: Count,
pub output_rows: Count,
pub bytes_decoded: Count,
}
impl FlightStreamMetrics {
pub(crate) fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
Self {
time_processing: StartableTime {
metrics: MetricBuilder::new(metrics).subset_time("time_processing", partition),
start: None,
},
time_reading_total: StartableTime {
metrics: MetricBuilder::new(metrics).subset_time("time_reading_total", partition),
start: None,
},
output_rows: MetricBuilder::new(metrics).output_rows(partition),
poll_count: MetricBuilder::new(metrics).counter("poll_count", partition),
bytes_decoded: MetricBuilder::new(metrics).counter("bytes_decoded", partition),
}
}
}