mm1-common 0.7.23

An Erlang-style actor runtime for Rust.
Documentation
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::{Duration, Instant};

use metrics::{Counter, Histogram};

#[macro_export]
macro_rules! make_metrics {
    ($name:literal $(, $k:literal => $v:expr )* $(,)?) => {{
        const _: &str = $name;
        const POLL_COUNT: &str = concat!($name, "_poll_count");
        const DONE_COUNT: &str = concat!($name, "_done_count");
        const BUSY_TIME: &str = concat!($name, "_busy_sec");
        const WAIT_TIME: &str = concat!($name, "_wait_sec");
        const WALL_TIME: &str = concat!($name, "_wall_sec");

        let poll_count = ::metrics::counter!(POLL_COUNT $(, $k => $v )*);
        let done_count = ::metrics::counter!(DONE_COUNT $(, $k => $v )*);
        let busy_time = ::metrics::histogram!(BUSY_TIME $(, $k => $v )*);
        let wait_time = ::metrics::histogram!(WAIT_TIME $(, $k => $v )*);
        let wall_time = ::metrics::histogram!(WALL_TIME $(, $k => $v )*);

        $crate::metrics::Metrics {
            poll_count,
            done_count,
            busy_time,
            wait_time,
            wall_time,
        }
    }};
}

pub trait MeasuredFutureExt: Future + Sized {
    fn measured(self, metrics: Metrics) -> MeasuredFuture<Self> {
        MeasuredFuture::new(self, metrics)
    }
}
impl<F> MeasuredFutureExt for F where F: Future + Sized {}

#[pin_project::pin_project]
#[derive(derive_more::Debug)]
pub struct MeasuredFuture<F> {
    #[debug(skip)]
    #[pin]
    inner: F,

    metrics:      Metrics,
    measurements: Measurements,
}

#[derive(Debug)]
pub struct Metrics {
    pub poll_count: Counter,
    pub done_count: Counter,
    pub busy_time:  Histogram,
    pub wait_time:  Histogram,
    pub wall_time:  Histogram,
}

#[derive(Default, Debug)]
struct Measurements {
    first_poll: Option<Instant>,
    last_poll:  Option<Instant>,
    busy_time:  Duration,
    wait_time:  Duration,
}

impl<F> MeasuredFuture<F> {
    pub fn new(inner: F, metrics: Metrics) -> Self {
        Self {
            inner,
            metrics,
            measurements: Default::default(),
        }
    }
}

#[cfg(test)]
impl<F> MeasuredFuture<F> {
    fn recorded_wait_time(&self) -> Duration {
        self.measurements.wait_time
    }
}

impl<F> Future for MeasuredFuture<F>
where
    F: Future,
{
    type Output = F::Output;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.project();

        this.metrics.poll_count.increment(1);

        let t_start = Instant::now();
        if let Some(last_poll) = this.measurements.last_poll {
            this.measurements.wait_time += t_start.duration_since(last_poll);
        }
        let first_poll = this.measurements.first_poll.get_or_insert(t_start);

        let poll = this.inner.poll(cx);

        let t_end = Instant::now();
        this.measurements.busy_time += t_end.duration_since(t_start);
        // Record when this poll ended so the next poll can measure the idle gap.
        this.measurements.last_poll = Some(t_end);

        if poll.is_ready() {
            this.metrics
                .wall_time
                .record(t_end.duration_since(*first_poll));
            this.metrics.busy_time.record(this.measurements.busy_time);
            this.metrics.wait_time.record(this.measurements.wait_time);

            this.metrics.done_count.increment(1);
        }

        poll
    }
}

#[cfg(test)]
mod tests {
    use std::future::Future;
    use std::pin::Pin;
    use std::task::{Context, Poll, Waker};
    use std::time::Duration;

    use super::MeasuredFuture;

    // Pends once, then completes, so `poll` runs twice with a gap in between.
    struct PendOnce {
        polled: bool,
    }

    impl Future for PendOnce {
        type Output = ();

        fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<()> {
            if self.polled {
                Poll::Ready(())
            } else {
                self.polled = true;
                Poll::Pending
            }
        }
    }

    // Regression test for #146: the idle time between polls must accumulate.
    #[test]
    fn wait_time_accumulates_between_polls() {
        let mut fut = MeasuredFuture::new(
            PendOnce { polled: false },
            crate::make_metrics!("test_measured"),
        );
        let waker = Waker::noop();
        let mut cx = Context::from_waker(waker);

        assert!(Pin::new(&mut fut).poll(&mut cx).is_pending());
        std::thread::sleep(Duration::from_millis(5));
        assert!(Pin::new(&mut fut).poll(&mut cx).is_ready());

        assert!(
            fut.recorded_wait_time() > Duration::ZERO,
            "wait_time did not accumulate: {:?}",
            fut.recorded_wait_time()
        );
    }
}