use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;
use datafusion_common::instant::Instant;
use pin_project::{pin_project, pinned_drop};
use super::Time;
#[pin_project(PinnedDrop)]
pub struct ElapsedComputeFuture<T> {
#[pin]
inner: T,
curr: Duration,
elapsed_compute: Time,
}
#[pinned_drop]
impl<T> PinnedDrop for ElapsedComputeFuture<T> {
fn drop(self: Pin<&mut Self>) {
if self.curr > Duration::default() {
let self_projected = self.project();
self_projected
.elapsed_compute
.add_duration(*self_projected.curr);
}
}
}
impl<O, F: Future<Output = O>> Future for ElapsedComputeFuture<F> {
type Output = O;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let self_projected = self.project();
let start = Instant::now();
let result = self_projected.inner.poll(cx);
*self_projected.curr += start.elapsed();
if result.is_ready() {
self_projected
.elapsed_compute
.add_duration(*self_projected.curr);
*self_projected.curr = Duration::default();
}
result
}
}
pub trait ElapsedComputeFutureExt: Future + Sized {
fn with_elapsed_compute(self, elapsed_compute: Time) -> ElapsedComputeFuture<Self>;
}
impl<O, F: Future<Output = O>> ElapsedComputeFutureExt for F {
fn with_elapsed_compute(self, elapsed_compute: Time) -> ElapsedComputeFuture<Self> {
ElapsedComputeFuture {
inner: self,
curr: Duration::default(),
elapsed_compute,
}
}
}