Skip to main content

piano_runtime/
piano_future.rs

//! Async function instrumentation -- PianoFuture wrapper.
//!
//! PianoFuture wraps an inner Future and accumulates per-poll alloc
//! and CPU deltas. On completion or cancellation (drop), it computes
//! self-time via the TLS children-time accumulator and aggregates
//! into the per-thread FnAgg vec.
//!
//! Same exit path as Guard: children TLS save/restore + aggregate.
//! No Measurement struct. No push_measurement. No span tree.
//!
//! Invariants:
//! - Wall time starts on first poll, not construction.
//! - Each poll saves/restores TLS children_ns (nested guards contribute).
//! - Alloc deltas accumulated per-poll via snapshot_alloc_counters.
//! - Bookkeeping allocs excluded via ReentrancyGuard.
//! - Cancelled/panicking futures emit best-effort aggregate via Drop.
//! - Never double-emits (emitted flag).
18
19use core::future::Future;
20use core::pin::Pin;
21use core::sync::atomic::{compiler_fence, Ordering};
22use core::task::{Context, Poll};
23
24use crate::aggregator;
25use crate::alloc::{snapshot_alloc_counters, ReentrancyGuard};
26use crate::children;
27use crate::session::ProfileSession;
28use crate::time::read;
29
30/// Future wrapper for async function instrumentation.
31pub struct PianoFuture<F> {
32    inner: F,
33    session: Option<&'static ProfileSession>,
34    name_id: u32,
35    start_ticks: u64,
36    saved_children_ns: u64,
37    alloc_count_acc: u64,
38    alloc_bytes_acc: u64,
39    free_count_acc: u64,
40    free_bytes_acc: u64,
41    cpu_acc_ns: u64,
42    children_ns_acc: u64,
43    emitted: bool,
44}
45
46/// Wrap an async function body for profiling.
47///
48/// If profiling is not active, returns a transparent wrapper whose
49/// poll delegates directly to the inner future with no overhead.
50pub fn enter_async<F: Future>(name_id: u32, body: F) -> PianoFuture<F> {
51    let session = match ProfileSession::get() {
52        Some(s) => s,
53        None => {
54            return PianoFuture {
55                inner: body,
56                session: None,
57                name_id: 0,
58                start_ticks: 0,
59                saved_children_ns: 0,
60                alloc_count_acc: 0,
61                alloc_bytes_acc: 0,
62                free_count_acc: 0,
63                free_bytes_acc: 0,
64                cpu_acc_ns: 0,
65                children_ns_acc: 0,
66                emitted: true, // prevent emit on drop
67            };
68        }
69    };
70
71    // Save parent's children_ns accumulator at construction time.
72    // This future's inclusive time will be reported to the parent on completion.
73    let saved_children_ns = children::save_and_zero();
74
75    PianoFuture {
76        inner: body,
77        session: Some(session),
78        name_id,
79        start_ticks: 0,
80        saved_children_ns,
81        alloc_count_acc: 0,
82        alloc_bytes_acc: 0,
83        free_count_acc: 0,
84        free_bytes_acc: 0,
85        cpu_acc_ns: 0,
86        children_ns_acc: 0,
87        emitted: false,
88    }
89}
90
91impl<F: Future> Future for PianoFuture<F> {
92    type Output = F::Output;
93
94    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<F::Output> {
95        // SAFETY: We project Pin to `inner` only. All other fields are
96        // Unpin primitives. We never move `inner` out of self.
97        let this = unsafe { self.get_unchecked_mut() };
98
99        let session = match this.session {
100            Some(s) => s,
101            None => {
102                // Inactive: transparent passthrough.
103                // SAFETY: inner is pinned through self. We never move it.
104                let inner = unsafe { Pin::new_unchecked(&mut this.inner) };
105                return inner.poll(cx);
106            }
107        };
108
109        // Save TLS children_ns for this poll. Nested sync Guards and
110        // PianoFutures will add their inclusive times to TLS during this poll.
111        let poll_children_saved = children::save_and_zero();
112
113        // Pre-poll bookkeeping
114        let snap_start;
115        {
116            let _reentrancy = ReentrancyGuard::enter();
117            snap_start = snapshot_alloc_counters();
118
119            if this.start_ticks == 0 {
120                this.start_ticks = read();
121            }
122        }
123
124        compiler_fence(Ordering::SeqCst);
125
126        let cpu_poll_start = if session.cpu_time_enabled {
127            crate::cpu_clock::cpu_now_ns()
128        } else {
129            0
130        };
131
132        // Poll inner future
133        // SAFETY: inner is pinned through self. We never move it.
134        let inner = unsafe { Pin::new_unchecked(&mut this.inner) };
135        let result = inner.poll(cx);
136
137        // Post-poll bookkeeping
138        let cpu_poll_end = if session.cpu_time_enabled {
139            crate::cpu_clock::cpu_now_ns()
140        } else {
141            0
142        };
143
144        compiler_fence(Ordering::SeqCst);
145
146        {
147            let _reentrancy = ReentrancyGuard::enter();
148            let snap_end = snapshot_alloc_counters();
149
150            this.alloc_count_acc += snap_end.alloc_count.saturating_sub(snap_start.alloc_count);
151            this.alloc_bytes_acc += snap_end.alloc_bytes.saturating_sub(snap_start.alloc_bytes);
152            this.free_count_acc += snap_end.free_count.saturating_sub(snap_start.free_count);
153            this.free_bytes_acc += snap_end.free_bytes.saturating_sub(snap_start.free_bytes);
154            this.cpu_acc_ns += cpu_poll_end.saturating_sub(cpu_poll_start);
155        }
156
157        // Accumulate children's inclusive time from this poll.
158        this.children_ns_acc += children::current_children_ns();
159
160        // Restore TLS children_ns for the outer scope.
161        // Don't report our time yet (we might have more polls).
162        children::restore_and_report(poll_children_saved, 0);
163
164        if result.is_ready() {
165            this.emit(session);
166        }
167
168        result
169    }
170}
171
172impl<F> PianoFuture<F> {
173    fn emit(&mut self, session: &'static ProfileSession) {
174        if self.emitted || self.start_ticks == 0 {
175            return;
176        }
177        self.emitted = true;
178
179        let end_ticks = read();
180        let start_ns = session.calibration.now_ns(self.start_ticks);
181        let end_ns = session.calibration.now_ns(end_ticks);
182        let inclusive_ns = end_ns.saturating_sub(start_ns);
183        let self_ns = inclusive_ns.saturating_sub(self.children_ns_acc);
184
185        aggregator::aggregate(
186            self.name_id,
187            self_ns,
188            inclusive_ns,
189            self.cpu_acc_ns,
190            self.alloc_count_acc,
191            self.alloc_bytes_acc,
192            self.free_count_acc,
193            self.free_bytes_acc,
194            &session.agg_registry,
195        );
196
197        // Report our inclusive time to the parent scope.
198        children::restore_and_report(self.saved_children_ns, inclusive_ns);
199    }
200}
201
202impl<F> Drop for PianoFuture<F> {
203    fn drop(&mut self) {
204        if let Some(session) = self.session {
205            self.emit(session);
206        }
207    }
208}
209
210// PianoFuture is Send if the inner future is Send.
211const _: () = {
212    fn _assert_send<T: Send>() {}
213    fn _check() {
214        _assert_send::<PianoFuture<core::future::Ready<()>>>();
215    }
216};