Skip to main content

datafusion_physical_expr_common/metrics/
elapsed_compute.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::future::Future;
19use std::pin::Pin;
20use std::task::{Context, Poll};
21use std::time::Duration;
22
23use datafusion_common::instant::Instant;
24use pin_project::{pin_project, pinned_drop};
25
26use super::Time;
27
28/// Wraps any [`Future`] and accumulates the wall-clock time spent inside
29/// each [`Future::poll`] call into `elapsed_compute`. Everything that
30/// executes synchronously within a `poll()` scope is measured — including
31/// CPU-bound work, memory copies, and any blocking the future performs
32/// before returning. Time between polls (when the runtime has suspended the
33/// future waiting for I/O, a channel, or a waker) is not measured.
34///
35/// For futures that mix synchronous CPU work with async I/O this gives a
36/// good approximation of CPU time: async I/O causes the future to yield
37/// (`Poll::Pending`), so the I/O latency is excluded automatically.
38///
39/// Note: uses `pin-project` rather than `pin-project-lite` in order to
40/// support `PinnedDrop`, which ensures accumulated time is flushed even
41/// if the future is cancelled (dropped before completion).
42#[pin_project(PinnedDrop)]
43pub struct ElapsedComputeFuture<T> {
44    #[pin]
45    inner: T,
46    /// Local accumulator: elapsed time is collected here during each `poll()`
47    /// and only flushed to `elapsed_compute` on completion (`Poll::Ready`) or
48    /// on drop (`PinnedDrop`). Keeping a separate local `Duration` avoids
49    /// performing an atomic operation on every `poll()` call, at the cost of
50    /// the reported metric value being unavailable until the future finishes.
51    curr: Duration,
52    elapsed_compute: Time,
53}
54
55#[pinned_drop]
56impl<T> PinnedDrop for ElapsedComputeFuture<T> {
57    fn drop(self: Pin<&mut Self>) {
58        if self.curr > Duration::default() {
59            let self_projected = self.project();
60            self_projected
61                .elapsed_compute
62                .add_duration(*self_projected.curr);
63        }
64    }
65}
66
67impl<O, F: Future<Output = O>> Future for ElapsedComputeFuture<F> {
68    type Output = O;
69
70    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
71        let self_projected = self.project();
72        let start = Instant::now();
73        let result = self_projected.inner.poll(cx);
74        *self_projected.curr += start.elapsed();
75        if result.is_ready() {
76            self_projected
77                .elapsed_compute
78                .add_duration(*self_projected.curr);
79            *self_projected.curr = Duration::default();
80        }
81        result
82    }
83}
84
/// Extension trait that wraps any [`Future`] with [`ElapsedComputeFuture`].
///
/// Bring this trait into scope to call `with_elapsed_compute` as a method
/// on a future.
pub trait ElapsedComputeFutureExt: Future + Sized {
    /// Wraps this future so that the time spent inside each [`Future::poll`]
    /// call is accumulated into `elapsed_compute`. See [`ElapsedComputeFuture`]
    /// for a full description of what is and is not measured.
    ///
    /// Consumes `self` and returns the wrapping [`ElapsedComputeFuture`];
    /// `elapsed_compute` is the [`Time`] metric the wrapper reports to.
    fn with_elapsed_compute(self, elapsed_compute: Time) -> ElapsedComputeFuture<Self>;
}
92
93impl<O, F: Future<Output = O>> ElapsedComputeFutureExt for F {
94    fn with_elapsed_compute(self, elapsed_compute: Time) -> ElapsedComputeFuture<Self> {
95        ElapsedComputeFuture {
96            inner: self,
97            curr: Duration::default(),
98            elapsed_compute,
99        }
100    }
101}