use std::cell::Cell;
use std::pin::Pin;
use std::rc::Rc;
use std::task::{Context, Poll};
use pin_project_lite::pin_project;
use web_time::{Duration, Instant};
use super::{HandoffTag, SubgraphTag};
use crate::util::slot_vec::SecondarySlotVec;
/// Collection of runtime metrics, stored per subgraph and per handoff.
///
/// Marked `#[non_exhaustive]`, so code outside this crate cannot build it with a struct literal.
#[derive(Default, Clone)]
#[non_exhaustive]
pub struct DfirMetrics {
/// Metrics for each subgraph, keyed by the subgraph's slot key.
pub subgraphs: SecondarySlotVec<SubgraphTag, SubgraphMetrics>,
/// Metrics for each handoff, keyed by the handoff's slot key.
pub handoffs: SecondarySlotVec<HandoffTag, HandoffMetrics>,
}
impl DfirMetrics {
    /// Subtracts the earlier snapshot `other` from `self`, in place.
    ///
    /// Only keys present in both `self` and `other` are touched: entries that exist solely in
    /// `self` keep their values, and entries that exist solely in `other` are ignored. The
    /// per-entry arithmetic is delegated to the `diff` method of each metrics struct.
    pub(super) fn diff(&mut self, other: &Self) {
        for (sg_key, baseline) in other.subgraphs.iter() {
            let Some(current) = self.subgraphs.get_mut(sg_key) else {
                // No matching entry in `self`; nothing to subtract from.
                continue;
            };
            current.diff(baseline);
        }

        for (handoff_key, baseline) in other.handoffs.iter() {
            let Some(current) = self.handoffs.get_mut(handoff_key) else {
                continue;
            };
            current.diff(baseline);
        }
    }
}
/// Produces per-interval metrics by comparing the tracked metrics against the snapshot stored by
/// the previous call to `take_interval`.
#[derive(Clone)]
pub struct DfirMetricsIntervals {
/// Shared handle to the metrics being tracked.
pub(super) curr: Rc<DfirMetrics>,
/// Snapshot stored by the most recent `take_interval` call; `None` until one has been stored.
pub(super) prev: Option<DfirMetrics>,
}
impl DfirMetricsIntervals {
    /// Returns the metrics accumulated since the previous call to this method.
    ///
    /// A copy of the current metrics is stored as the baseline for the next call. When no
    /// earlier baseline exists, the returned value is simply a copy of the current metrics.
    pub fn take_interval(&mut self) -> DfirMetrics {
        let snapshot = (*self.curr).clone();
        // Store the undiffed snapshot as the new baseline and retrieve the old one.
        let baseline = self.prev.replace(snapshot.clone());
        match baseline {
            Some(earlier) => {
                let mut interval = snapshot;
                interval.diff(&earlier);
                interval
            }
            None => snapshot,
        }
    }

    /// Returns another shared handle to the tracked metrics (no diffing applied).
    pub fn all_metrics(&self) -> Rc<DfirMetrics> {
        let shared = &self.curr;
        Rc::clone(shared)
    }
}
/// Generates a metrics struct whose fields are all `Cell`s, together with a by-value getter for
/// each field and a private `diff` method.
///
/// Input is a `pub struct` whose fields each have, in order: optional doc comments, a mandatory
/// `#[diff(total)]` or `#[diff(curr)]` marker, optional further attributes, and a
/// `name: Cell<Type>` declaration followed by a comma.
///
/// The generated struct derives `Default`, `Debug` and `Clone` and is `#[non_exhaustive]`.
macro_rules! define_metrics {
(
$(#[$struct_attr:meta])*
pub struct $struct_name:ident {
$(
// Doc comments are captured on their own so they can be attached to the getter below.
$( #[doc = $doc:literal] )*
// Selects which arm of `define_metrics_diff_field!` is used for this field.
#[diff($diff:ident)]
$( #[$field_attr:meta] )*
$field_name:ident: Cell<$field_type:ty>,
)*
}
) => {
$(#[$struct_attr])*
#[derive(Default, Debug, Clone)]
#[non_exhaustive] pub struct $struct_name {
$(
// The raw `Cell` field is public but hidden from docs; the getter carries the docs.
#[doc(hidden)] $(#[$field_attr])*
pub $field_name: Cell<$field_type>,
)*
}
impl $struct_name {
$(
// Getter named after the field; returns the cell's value via `Cell::get`.
$( #[doc = $doc] )*
pub fn $field_name(&self) -> $field_type {
self.$field_name.get()
}
)*
// Applies the per-field diff rule to every field, with `other` as the earlier snapshot.
fn diff(&mut self, other: &Self) {
$(
define_metrics_diff_field!($diff, $field_name, self, other);
)*
}
}
};
}
/// Emits the statements that diff one `Cell` field of `$slf` against the same field of `$other`.
///
/// * `curr` fields are left untouched.
/// * `total` fields have the value from `$other` subtracted in place. Debug builds assert that
///   `$other`'s value does not exceed `$slf`'s.
macro_rules! define_metrics_diff_field {
    (curr, $field:ident, $slf:ident, $other:ident) => {};
    (total, $field:ident, $slf:ident, $other:ident) => {
        debug_assert!($other.$field.get() <= $slf.$field.get());
        $slf.$field.set($slf.$field.get() - $other.$field.get());
    };
}
define_metrics! {
/// Metrics recorded for a single handoff.
pub struct HandoffMetrics {
/// Current item count. Marked `curr`, so interval diffing leaves this value unchanged.
#[diff(curr)]
curr_items_count: Cell<usize>,
/// Cumulative item count. Marked `total`, so interval diffing subtracts the earlier snapshot.
#[diff(total)]
total_items_count: Cell<usize>,
}
}
define_metrics! {
/// Metrics recorded for a single subgraph. Every field is marked `total`, so interval diffing
/// subtracts the earlier snapshot from each of them.
pub struct SubgraphMetrics {
/// Cumulative run count. Not updated by `InstrumentSubgraph`; presumably maintained by the scheduler.
#[diff(total)]
total_run_count: Cell<usize>,
/// Cumulative time spent inside `poll` calls of the future wrapped by `InstrumentSubgraph`.
#[diff(total)]
total_poll_duration: Cell<Duration>,
/// Cumulative number of `poll` calls made through `InstrumentSubgraph`.
#[diff(total)]
total_poll_count: Cell<usize>,
/// Cumulative time between the end of one `InstrumentSubgraph` poll and the start of the next.
#[diff(total)]
total_idle_duration: Cell<Duration>,
/// Cumulative number of idle periods recorded by `InstrumentSubgraph`.
#[diff(total)]
total_idle_count: Cell<usize>,
}
}
pin_project! {
/// Future wrapper that records poll and idle timings of the wrapped future into a
/// `SubgraphMetrics`.
#[doc(hidden)]
pub struct InstrumentSubgraph<'a, Fut> {
// The wrapped future; structurally pinned so it can be polled through the projection.
#[pin]
future: Fut,
// Instant at which the previous `poll` call finished; `None` before the first poll.
idle_start: Option<Instant>,
// Destination for the recorded timings and counts.
metrics: &'a SubgraphMetrics,
}
}
impl<'a, Fut> InstrumentSubgraph<'a, Fut> {
pub fn new(future: Fut, metrics: &'a SubgraphMetrics) -> Self {
Self {
future,
idle_start: None,
metrics,
}
}
}
impl<'a, Fut> Future for InstrumentSubgraph<'a, Fut>
where
Fut: Future,
{
type Output = Fut::Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
if let Some(idle_start) = this.idle_start {
this.metrics
.total_idle_duration
.update(|x| x + idle_start.elapsed());
this.metrics.total_idle_count.update(|x| x + 1);
}
let poll_start = Instant::now();
let out = this.future.poll(cx);
this.metrics
.total_poll_duration
.update(|x| x + poll_start.elapsed());
this.metrics.total_poll_count.update(|x| x + 1);
this.idle_start.replace(Instant::now());
out
}
}
#[cfg(test)]
mod test {
    use super::*;
    use crate::scheduled::{HandoffId, SubgraphId};

    /// The first interval reports the seeded values as-is; the second reports only the change
    /// in `total` fields since the first, while `curr` fields are reported undiffed.
    #[test]
    fn test_dfir_metrics_intervals() {
        let sg_id = SubgraphId::from_raw(0);
        let handoff_id = HandoffId::from_raw(0);

        // Seed one subgraph entry and one handoff entry.
        let seed_sg = SubgraphMetrics::default();
        seed_sg.total_run_count.set(5);
        seed_sg.total_poll_count.set(10);
        seed_sg.total_idle_count.set(2);
        seed_sg.total_poll_duration.set(Duration::from_millis(500));
        seed_sg.total_idle_duration.set(Duration::from_millis(200));

        let seed_handoff = HandoffMetrics::default();
        seed_handoff.curr_items_count.set(3);
        seed_handoff.total_items_count.set(100);

        let mut seeded = DfirMetrics::default();
        seeded.subgraphs.insert(sg_id, seed_sg);
        seeded.handoffs.insert(handoff_id, seed_handoff);

        let shared = Rc::new(seeded);
        let mut intervals = DfirMetricsIntervals {
            curr: Rc::clone(&shared),
            prev: None,
        };

        // First interval: no baseline yet, so values come back unchanged.
        let first = intervals.take_interval();
        let first_sg = &first.subgraphs[sg_id];
        assert_eq!(first_sg.total_run_count(), 5);
        let first_handoff = &first.handoffs[handoff_id];
        assert_eq!(first_handoff.total_items_count(), 100);
        assert_eq!(first_handoff.curr_items_count(), 3);

        // Advance the live metrics through the shared handle.
        let live_sg = &shared.subgraphs[sg_id];
        live_sg.total_run_count.set(12);
        live_sg.total_poll_count.set(25);
        live_sg.total_idle_count.set(7);
        live_sg.total_poll_duration.set(Duration::from_millis(1200));
        live_sg.total_idle_duration.set(Duration::from_millis(600));

        let live_handoff = &shared.handoffs[handoff_id];
        live_handoff.total_items_count.set(250);
        live_handoff.curr_items_count.set(10);

        // Second interval: totals are differenced against the first snapshot.
        let second = intervals.take_interval();
        let second_sg = &second.subgraphs[sg_id];
        assert_eq!(second_sg.total_run_count(), 7);
        assert_eq!(second_sg.total_poll_count(), 15);
        assert_eq!(second_sg.total_idle_count(), 5);

        let second_handoff = &second.handoffs[handoff_id];
        assert_eq!(second_handoff.total_items_count(), 150);
        assert_eq!(second_handoff.curr_items_count(), 10);
    }
}