datafusion_physical_expr_common/metrics/elapsed_compute.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use std::future::Future;
19use std::pin::Pin;
20use std::task::{Context, Poll};
21use std::time::Duration;
22
23use datafusion_common::instant::Instant;
24use pin_project::{pin_project, pinned_drop};
25
26use super::Time;
27
28/// Wraps any [`Future`] and accumulates the wall-clock time spent inside
29/// each [`Future::poll`] call into `elapsed_compute`. Everything that
30/// executes synchronously within a `poll()` scope is measured — including
31/// CPU-bound work, memory copies, and any blocking the future performs
32/// before returning. Time between polls (when the runtime has suspended the
33/// future waiting for I/O, a channel, or a waker) is not measured.
34///
35/// For futures that mix synchronous CPU work with async I/O this gives a
36/// good approximation of CPU time: async I/O causes the future to yield
37/// (`Poll::Pending`), so the I/O latency is excluded automatically.
38///
39/// Note: uses `pin-project` rather than `pin-project-lite` in order to
40/// support `PinnedDrop`, which ensures accumulated time is flushed even
41/// if the future is cancelled (dropped before completion).
42#[pin_project(PinnedDrop)]
43pub struct ElapsedComputeFuture<T> {
44 #[pin]
45 inner: T,
46 /// Local accumulator: elapsed time is collected here during each `poll()`
47 /// and only flushed to `elapsed_compute` on completion (`Poll::Ready`) or
48 /// on drop (`PinnedDrop`). Keeping a separate local `Duration` avoids
49 /// performing an atomic operation on every `poll()` call, at the cost of
50 /// the reported metric value being unavailable until the future finishes.
51 curr: Duration,
52 elapsed_compute: Time,
53}
54
55#[pinned_drop]
56impl<T> PinnedDrop for ElapsedComputeFuture<T> {
57 fn drop(self: Pin<&mut Self>) {
58 if self.curr > Duration::default() {
59 let self_projected = self.project();
60 self_projected
61 .elapsed_compute
62 .add_duration(*self_projected.curr);
63 }
64 }
65}
66
67impl<O, F: Future<Output = O>> Future for ElapsedComputeFuture<F> {
68 type Output = O;
69
70 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
71 let self_projected = self.project();
72 let start = Instant::now();
73 let result = self_projected.inner.poll(cx);
74 *self_projected.curr += start.elapsed();
75 if result.is_ready() {
76 self_projected
77 .elapsed_compute
78 .add_duration(*self_projected.curr);
79 *self_projected.curr = Duration::default();
80 }
81 result
82 }
83}
84
85/// Extension trait that wraps any [`Future`] with [`ElapsedComputeFuture`].
86pub trait ElapsedComputeFutureExt: Future + Sized {
87 /// Wraps this future so that the time spent inside each [`Future::poll`]
88 /// call is accumulated into `elapsed_compute`. See [`ElapsedComputeFuture`]
89 /// for a full description of what is and is not measured.
90 fn with_elapsed_compute(self, elapsed_compute: Time) -> ElapsedComputeFuture<Self>;
91}
92
93impl<O, F: Future<Output = O>> ElapsedComputeFutureExt for F {
94 fn with_elapsed_compute(self, elapsed_compute: Time) -> ElapsedComputeFuture<Self> {
95 ElapsedComputeFuture {
96 inner: self,
97 curr: Duration::default(),
98 elapsed_compute,
99 }
100 }
101}