// piano_runtime/collector.rs
1//! Thread-local timing collector with RAII guards.
2//!
3//! Each instrumented function calls `enter(name)` which pushes a `StackEntry`
4//! onto a thread-local call stack and returns an RAII `Guard`. When the guard
5//! drops (on any exit path), it pops the stack entry, computes elapsed time,
6//! propagates children time to the parent, and records a `RawRecord`.
7//!
8//! `collect()` aggregates raw records into per-function summaries sorted by
9//! self-time descending. `reset()` clears all state for the current thread.
10//!
11//! Flush strategy: each thread's records live in an `Arc<Mutex<Vec<RawRecord>>>`
12//! registered in a global `THREAD_RECORDS` Vec. `shutdown()` (injected at the
13//! end of main by the AST rewriter) iterates all Arcs to collect data from every
14//! thread, including thread-pool workers whose TLS destructors may never fire.
15//!
16//! Thread-locality: stack and records are thread-local. Each thread produces an
17//! independent call tree by default. For cross-thread attribution (e.g. rayon
18//! scopes, spawned threads), use `fork()` / `adopt()` to propagate timing context
19//! so that child thread elapsed time is correctly subtracted from the parent's
20//! self-time. `SpanContext` auto-finalizes on Drop.
21
22use std::cell::{RefCell, UnsafeCell};
23use std::collections::{HashMap, HashSet};
24use std::io::Write;
25use std::path::PathBuf;
26use std::sync::atomic::{compiler_fence, Ordering};
27use std::sync::{Arc, Mutex, Once};
28use std::time::{Instant, SystemTime, UNIX_EPOCH};
29
/// Thread-safe, initialize-once cell using `Once` + `UnsafeCell`.
///
/// Equivalent to `OnceLock` (stabilized in 1.70) but works on Rust 1.59+.
/// The `Once` primitive guarantees single-writer semantics; after initialization
/// the value is read-only, so there are no data races.
struct SyncOnceCell<T> {
    /// Gates the one-time initialization performed by `get_or_init`.
    once: Once,
    /// `None` until `once` has run; `Some` (and never mutated again) afterwards.
    value: UnsafeCell<Option<T>>,
}
39
// SAFETY: `Once` synchronizes initialization. After `call_once` completes,
// the inner value is only read, never mutated, so `Sync` is sound.
// `T: Send` is also required because the value may be dropped (and thus
// accessed) on a thread other than the one that initialized it.
unsafe impl<T: Send + Sync> Sync for SyncOnceCell<T> {}
43
impl<T> SyncOnceCell<T> {
    /// Create an empty cell. `const` so it can initialize `static` items.
    const fn new() -> Self {
        Self {
            once: Once::new(),
            value: UnsafeCell::new(None),
        }
    }

    /// Return the stored value, running `f` to initialize it on first call.
    ///
    /// Concurrent callers block on the `Once` until initialization completes.
    fn get_or_init(&self, f: impl FnOnce() -> T) -> &T {
        self.once.call_once(|| {
            // SAFETY: `call_once` guarantees this runs exactly once, with
            // all subsequent callers blocking until it completes.
            unsafe { *self.value.get() = Some(f()) };
        });
        // SAFETY: After `call_once` returns (on any thread), `value` is
        // `Some` and never mutated again.
        unsafe { (*self.value.get()).as_ref().unwrap() }
    }
}
63
/// Process-wide run identifier.
///
/// All threads within a single process share this ID, so that JSON files
/// written by different threads can be correlated during report consolidation.
static RUN_ID: SyncOnceCell<String> = SyncOnceCell::new();

/// `"<pid>_<timestamp>"` — built lazily on first use, stable for the
/// lifetime of the process.
fn run_id() -> &'static str {
    RUN_ID.get_or_init(|| format!("{}_{}", std::process::id(), timestamp_ms()))
}
73
/// Process-start epoch for relative timestamps.
static EPOCH: SyncOnceCell<Instant> = SyncOnceCell::new();

/// Explicit runs directory set by the CLI via `set_runs_dir()`.
///
/// When set, `runs_dir()` returns this path instead of the env-var /
/// home-directory fallback. This ensures `flush()` and `shutdown()` write
/// to the same project-local directory that `shutdown_to()` uses.
///
/// Uses `SyncOnceCell<Mutex<_>>` instead of `static Mutex` because
/// `Mutex::new` is not `const fn` on Rust < 1.63 (runtime MSRV is 1.59).
static RUNS_DIR: SyncOnceCell<Mutex<Option<PathBuf>>> = SyncOnceCell::new();

/// Lazily-initialized accessor for `RUNS_DIR` (`None` until the CLI sets it).
fn runs_dir_lock() -> &'static Mutex<Option<PathBuf>> {
    RUNS_DIR.get_or_init(|| Mutex::new(None))
}
90
/// Lazily-initialized process epoch.
///
/// The first call calibrates the TSC and records a (tsc, Instant) origin
/// pair so later raw tick values can be converted to epoch-relative
/// nanoseconds via `crate::tsc::ticks_to_epoch_ns`.
fn epoch() -> Instant {
    *EPOCH.get_or_init(|| {
        crate::tsc::calibrate();
        // Read the counter and the wall clock back-to-back so the two
        // origins are as close as possible.
        let tsc_val = crate::tsc::read();
        let now = Instant::now();
        crate::tsc::set_epoch_tsc(tsc_val);
        now
    })
}
100
/// Aggregated timing data for a single function.
#[derive(Debug, Clone)]
pub struct FunctionRecord {
    /// Function name as passed to `enter()`.
    pub name: String,
    /// Number of recorded invocations.
    pub calls: u64,
    /// Total wall-clock time, including children.
    pub total_ms: f64,
    /// Wall-clock time excluding children.
    pub self_ms: f64,
    /// CPU self-time (only with the `cpu-time` feature).
    #[cfg(feature = "cpu-time")]
    pub cpu_self_ms: f64,
}
111
/// Per-function summary within a single frame.
#[derive(Debug, Clone)]
pub struct FrameFnSummary {
    /// Function name as passed to `enter()`.
    pub name: &'static str,
    /// Invocations within this frame.
    pub calls: u32,
    /// Wall-clock self-time (excluding children) within this frame.
    pub self_ns: u64,
    /// CPU self-time within this frame (only with the `cpu-time` feature).
    #[cfg(feature = "cpu-time")]
    pub cpu_self_ns: u64,
    /// Heap alloc/free counters captured from ALLOC_COUNTERS over the
    /// function's own scope.
    pub alloc_count: u64,
    pub alloc_bytes: u64,
    pub free_count: u64,
    pub free_bytes: u64,
}
125
/// Per-invocation measurement record with nanosecond precision.
#[derive(Debug, Clone)]
pub struct InvocationRecord {
    /// Function name as passed to `enter()`.
    pub name: &'static str,
    /// Start time relative to the process epoch.
    pub start_ns: u64,
    /// Wall-clock duration, including children.
    pub elapsed_ns: u64,
    /// Wall-clock duration, excluding children.
    pub self_ns: u64,
    /// CPU self-time (`cpu-time` feature only; recorded as 0 on the
    /// cross-thread migrated path in `drop_cold`).
    #[cfg(feature = "cpu-time")]
    pub cpu_self_ns: u64,
    /// Alloc/free counters for this scope (all 0 on the migrated path).
    pub alloc_count: u64,
    pub alloc_bytes: u64,
    pub free_count: u64,
    pub free_bytes: u64,
    /// Stack depth at `enter()` (0 on the migrated path).
    pub depth: u16,
}
141
/// Entry on the thread-local timing stack.
pub(crate) struct StackEntry {
    /// Function name as passed to `enter()`, or `"<phantom>"` for entries
    /// pushed by `Guard::check()` after a thread migration.
    pub(crate) name: &'static str,
    /// Wall-clock time of completed direct children, accumulated as they drop.
    pub(crate) children_ms: f64,
    /// CPU time of completed direct children (`cpu-time` feature only).
    #[cfg(feature = "cpu-time")]
    pub(crate) cpu_children_ns: u64,
    /// CPU clock at scope entry (`cpu-time` feature only).
    #[cfg(feature = "cpu-time")]
    pub(crate) cpu_start_ns: u64,
    /// Saved ALLOC_COUNTERS from before this scope, restored on Guard::drop.
    pub(crate) saved_alloc: crate::alloc::AllocSnapshot,
    /// Packed identity: `[cookie:32][name_id:16][depth:16]`.
    ///
    /// Regular entries: matches the Guard's packed value (same thread pushed it).
    /// Phantom entries: the migrated Guard's packed identity (different thread
    /// cookie), used as a unique key for PHANTOM_REGISTRY / PHANTOM_ARCS lookup.
    pub(crate) packed: u64,
}
159
/// Raw measurement produced when a Guard drops.
#[derive(Clone)]
struct RawRecord {
    /// Function name (interned `&'static str`).
    name: &'static str,
    /// Wall-clock time, including children.
    elapsed_ms: f64,
    /// Accumulated wall-clock time of direct children.
    children_ms: f64,
    /// CPU self-time (recorded as 0 on the cross-thread migrated path).
    #[cfg(feature = "cpu-time")]
    cpu_self_ns: u64,
}

/// Shared handle to one thread's raw-record buffer; see `THREAD_RECORDS`.
type ThreadRecordArc = Arc<Mutex<Vec<RawRecord>>>;
171
/// Per-phantom tracking for cross-thread forwarding and cleanup.
///
/// When `check()` pushes a phantom on thread B, it registers a `PhantomInfo`
/// in the global `PHANTOM_REGISTRY`. This enables:
/// - Forwarding: children_ms accumulated on B is readable by C via the Arc.
/// - Cleanup: `host_cookie` identifies B so the cleanup queue can target it.
///
/// The entry is removed either by `check()` on re-migration (replaced with a
/// new one) or by `drop_cold` when the migrated guard finally drops.
struct PhantomInfo {
    /// Thread cookie of the thread hosting this phantom StackEntry.
    host_cookie: u64,
    /// Shared children_ms accumulator. The host thread writes to this Arc
    /// (via PHANTOM_ARCS) when children update the phantom; the next thread
    /// reads from it to seed its own phantom.
    children_arc: Arc<Mutex<f64>>,
}
186
/// Global registry mapping guard packed identity -> phantom info.
///
/// `check()` inserts when pushing a phantom. On re-migration, `check()` reads
/// the old entry (forwarding + cleanup scheduling) and inserts a new one.
/// `drop_cold` removes the final entry.
static PHANTOM_REGISTRY: SyncOnceCell<Mutex<HashMap<u64, PhantomInfo>>> = SyncOnceCell::new();

/// Lazily-initialized accessor for `PHANTOM_REGISTRY`.
fn phantom_registry() -> &'static Mutex<HashMap<u64, PhantomInfo>> {
    PHANTOM_REGISTRY.get_or_init(|| Mutex::new(HashMap::new()))
}
197
/// Cleanup queue: `(host_cookie, guard_packed)` pairs. Thread B drains entries
/// where `host_cookie == B` on its next `enter_cold`, removing the stale phantom
/// from its stack.
static PHANTOM_CLEANUP: SyncOnceCell<Mutex<Vec<(u64, u64)>>> = SyncOnceCell::new();

/// Lazily-initialized accessor for `PHANTOM_CLEANUP`.
fn phantom_cleanup() -> &'static Mutex<Vec<(u64, u64)>> {
    PHANTOM_CLEANUP.get_or_init(|| Mutex::new(Vec::new()))
}

/// Fast-path guard: skip the cleanup lock when the queue is empty.
/// Set while holding the queue lock in `check()` and cleared while holding
/// the same lock in `drain_phantom_cleanup`, so flag and queue stay in sync.
static HAS_PHANTOM_CLEANUP: std::sync::atomic::AtomicBool =
    std::sync::atomic::AtomicBool::new(false);
210
/// Global registry of per-thread record storage.
/// Each thread registers its Arc on first access. collect_all() iterates all Arcs.
static THREAD_RECORDS: SyncOnceCell<Mutex<Vec<ThreadRecordArc>>> = SyncOnceCell::new();

/// Lazily-initialized accessor for `THREAD_RECORDS`.
fn thread_records() -> &'static Mutex<Vec<ThreadRecordArc>> {
    THREAD_RECORDS.get_or_init(|| Mutex::new(Vec::new()))
}
218
thread_local! {
    /// Per-thread call stack: one StackEntry per live Guard, plus any
    /// phantom entries pushed by `Guard::check()` after migration.
    pub(crate) static STACK: RefCell<Vec<StackEntry>> = RefCell::new(Vec::new());
    /// This thread's raw-record storage. The Arc is registered in the global
    /// THREAD_RECORDS on first access so shutdown() can reach it even when
    /// this thread's TLS destructors never run (e.g. pool workers).
    static RECORDS: Arc<Mutex<Vec<RawRecord>>> = {
        let arc = Arc::new(Mutex::new(Vec::new()));
        thread_records().lock().unwrap_or_else(|e| e.into_inner()).push(Arc::clone(&arc));
        arc
    };
    /// Names registered via `register()`; reported even with zero calls.
    static REGISTERED: RefCell<Vec<&'static str>> = RefCell::new(Vec::new());
    /// Full per-invocation log, kept only for tests / test internals.
    #[cfg(any(test, feature = "_test_internals"))]
    static INVOCATIONS: RefCell<Vec<InvocationRecord>> = RefCell::new(Vec::new());
    /// Invocations accumulated within the current frame (cleared on frame boundary).
    static FRAME_BUFFER: RefCell<Vec<InvocationRecord>> = RefCell::new(Vec::new());
    /// Completed per-frame summaries.
    static FRAMES: RefCell<Vec<Vec<FrameFnSummary>>> = RefCell::new(Vec::new());
    /// Per-thread phantom forwarding Arcs. When a phantom StackEntry exists on
    /// this thread (from Guard::check()), the corresponding Arc is stored here
    /// so child drops can write through to it.
    static PHANTOM_ARCS: RefCell<Vec<(u64, Arc<Mutex<f64>>)>> = RefCell::new(Vec::new());
    /// Reusable Vec for frame aggregation. Cleared and reused on each
    /// depth-0 boundary to avoid per-frame heap allocation.
    /// Linear scan is faster than HashMap for the typical 1-5 unique functions
    /// per frame (no hashing, no bucket probing, no memset for clear).
    static FRAME_AGG_VEC: RefCell<Vec<FrameFnSummary>> = RefCell::new(Vec::new());
    /// Fast local buffer for RawRecords. Avoids the Mutex lock on RECORDS
    /// for every drop_cold call. Flushed to RECORDS at depth-0 boundaries.
    static RECORDS_BUF: RefCell<Vec<RawRecord>> = RefCell::new(Vec::new());
}
246
247/// Drain RECORDS_BUF into the Mutex-guarded RECORDS.
248/// Called at depth-0 boundaries and before reading RECORDS.
249fn flush_records_buf() {
250    RECORDS_BUF.with(|buf| {
251        let mut buf = buf.borrow_mut();
252        if buf.is_empty() {
253            return;
254        }
255        RECORDS.with(|records| {
256            records
257                .lock()
258                .unwrap_or_else(|e| e.into_inner())
259                .extend(buf.drain(..));
260        });
261    });
262}
263
/// Sequential per-thread identifier. Cheaper than `std::thread::current().id()`
/// and packable into a u64 alongside the stack depth.
static NEXT_THREAD_COOKIE: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(1);

thread_local! {
    /// This thread's cookie, assigned once on first access. Starts at 1,
    /// so 0 is never a valid cookie.
    static THREAD_COOKIE: u64 = NEXT_THREAD_COOKIE.fetch_add(1, Ordering::Relaxed);
}
271
272// -- Interned function name table ------------------------------------------
273//
274// Maps u16 IDs to `&'static str` function names so the Guard can carry a
275// compact name reference without growing beyond 16 bytes.
276//
277// Layout: append-only Vec behind a Mutex. Reads during drop_cold take the
278// lock briefly; writes happen once per unique name in enter_cold.
279// A thread-local cache avoids the global lock on the hot path.
280
/// Global interned name table: index -> &'static str.
static NAME_TABLE: SyncOnceCell<Mutex<Vec<&'static str>>> = SyncOnceCell::new();

/// Lazily-initialized accessor for `NAME_TABLE`.
fn name_table() -> &'static Mutex<Vec<&'static str>> {
    NAME_TABLE.get_or_init(|| Mutex::new(Vec::new()))
}

// Thread-local cache mapping name pointer -> interned ID.
// Uses pointer identity (`&'static str` addresses are stable).
thread_local! {
    static NAME_CACHE: RefCell<HashMap<usize, u16>> = RefCell::new(HashMap::new());
}
293
294/// Intern a function name, returning its u16 ID.
295/// Fast path: thread-local cache hit (no global lock).
296/// Slow path: global table lookup/insert under lock, then cache.
297#[inline(always)]
298fn intern_name(name: &'static str) -> u16 {
299    let ptr = name.as_ptr() as usize;
300    let cached = NAME_CACHE.with(|cache| cache.borrow().get(&ptr).copied());
301    if let Some(id) = cached {
302        return id;
303    }
304    intern_name_slow(name, ptr)
305}
306
307#[inline(never)]
308fn intern_name_slow(name: &'static str, ptr: usize) -> u16 {
309    let mut table = name_table().lock().unwrap_or_else(|e| e.into_inner());
310    // Check if already in global table (another thread may have added it).
311    let id = if let Some(pos) = table.iter().position(|&n| n.as_ptr() as usize == ptr) {
312        pos as u16
313    } else {
314        let len = table.len();
315        debug_assert!(
316            len <= u16::MAX as usize,
317            "interned name table overflow: more than 65535 unique function names"
318        );
319        if len > u16::MAX as usize {
320            // Table full (65536 entries, indices 0..=u16::MAX). Saturate
321            // instead of wrapping. lookup_name handles out-of-bounds by
322            // returning "<unknown>", so this degrades gracefully.
323            return u16::MAX;
324        }
325        let id = len as u16;
326        table.push(name);
327        id
328    };
329    drop(table);
330    NAME_CACHE.with(|cache| {
331        cache.borrow_mut().insert(ptr, id);
332    });
333    id
334}
335
336/// Look up a function name by its interned ID.
337/// Returns `"<unknown>"` if the ID is out of bounds (should never happen).
338fn lookup_name(id: u16) -> &'static str {
339    let table = name_table().lock().unwrap_or_else(|e| e.into_inner());
340    table.get(id as usize).copied().unwrap_or("<unknown>")
341}
342
/// Pack a thread cookie (32 bits), name ID (16 bits), and stack depth (16 bits)
/// into a single u64. Guard stays 16 bytes.
///
/// Layout: `[cookie:32][name_id:16][depth:16]`
///
/// Note: only the low 32 bits of the cookie survive the shift, limiting unique
/// thread identification to ~4 billion threads per process. This is a
/// deliberate tradeoff (from a previous 48-bit cookie) to make room for the
/// 16-bit name_id. If THREAD_COOKIE exceeds 2^32, `unpack_cookie` returns only
/// the low 32 bits, which could cause false migration detection in `drop_cold`.
#[inline(always)]
fn pack_cookie_name_depth(cookie: u64, name_id: u16, depth: u16) -> u64 {
    let hi = cookie << 32;
    let mid = u64::from(name_id) << 16;
    let lo = u64::from(depth);
    hi | mid | lo
}

/// Extract the thread cookie from the high 32 bits of a packed identity.
#[inline(always)]
fn unpack_cookie(packed: u64) -> u64 {
    packed >> 32
}

/// Extract the interned name ID from bits 16..=31 of a packed identity.
#[inline(always)]
fn unpack_name_id(packed: u64) -> u16 {
    ((packed >> 16) & 0xFFFF) as u16
}

/// Extract the stack depth from the low 16 bits of a packed identity.
#[inline(always)]
fn unpack_depth(packed: u64) -> u16 {
    (packed & 0xFFFF) as u16
}
375
/// RAII timing guard. Records elapsed time on drop.
///
/// 16 bytes: fits in two registers on both x86_64 (rax+rdx) and
/// aarch64 (x0+x1), eliminating all memory stores from the
/// measurement window.
///
/// Uses a raw hardware counter (`rdtsc` / `cntvct_el0`) instead of
/// `Instant::now()` to minimize clock-read cost. The tsc-to-nanosecond
/// conversion happens in `drop_cold`, outside the measurement window.
#[must_use = "dropping the guard immediately records ~0ms; bind it with `let _guard = ...`"]
pub struct Guard {
    /// Raw counter value captured in `enter()`; converted to ns in `drop_cold`.
    start_tsc: u64,
    /// Bit layout: `[cookie:32][name_id:16][depth:16]`
    /// - cookie: identifies the thread that called enter()
    /// - name_id: index into the global interned name table
    /// - depth: stack depth at the time of enter()
    packed: u64,
}
394
395// Guard must be Send so async runtimes can move futures containing guards
396// across worker threads.
397const _: () = {
398    fn _assert_send<T: Send>() {}
399    fn _check() {
400        _assert_send::<Guard>();
401    }
402    // Verify Guard is exactly 16 bytes (fits in 2 registers).
403    fn _assert_size() {
404        let _ = core::mem::transmute::<Guard, [u8; 16]>;
405    }
406};
407
impl Guard {
    /// Check for thread migration after an .await point.
    ///
    /// If the Guard has migrated to a different thread, pushes a phantom
    /// StackEntry onto the new thread's stack so that children can update
    /// it via the normal parent fast path. Also registers the Guard as
    /// migrated so the original thread can clean up the orphaned entry.
    ///
    /// No-op if still on the same thread. Idempotent: safe to call
    /// multiple times (skips if phantom already exists on current stack).
    pub fn check(&self) {
        let current_cookie = THREAD_COOKIE.with(|c| *c);
        let enter_cookie = unpack_cookie(self.packed);
        if current_cookie == enter_cookie {
            return; // Same thread, no migration
        }

        // Push phantom on current thread's stack (idempotent).
        // Children will update it via the normal parent fast path.
        STACK.with(|stack| {
            let mut s = stack.borrow_mut();
            // Check if phantom already exists for this guard (exact packed match).
            let already_has = s
                .iter()
                .any(|e| e.packed == self.packed && e.name == "<phantom>");
            if already_has {
                return;
            }

            // Read forwarded children_ms from a previous thread's phantom and
            // schedule cleanup of the old host thread's phantom (multi-hop
            // migration: A -> B -> C).
            let (forwarded_children_ms, fwd_arc) = {
                // Lock registry: remove old entry (if any) and insert new one.
                let arc = Arc::new(Mutex::new(0.0));
                let old_info = {
                    let mut reg = phantom_registry().lock().unwrap_or_else(|e| e.into_inner());
                    let old = reg.remove(&self.packed);
                    reg.insert(
                        self.packed,
                        PhantomInfo {
                            host_cookie: current_cookie,
                            children_arc: Arc::clone(&arc),
                        },
                    );
                    old
                }; // registry lock dropped here

                // Process old entry (if any) without holding registry lock.
                let forwarded = if let Some(old) = old_info {
                    // Schedule cleanup of old host's phantom.
                    let mut cleanup = phantom_cleanup().lock().unwrap_or_else(|e| e.into_inner());
                    cleanup.push((old.host_cookie, self.packed));
                    HAS_PHANTOM_CLEANUP.store(true, Ordering::Relaxed);
                    drop(cleanup); // cleanup lock dropped

                    // Seed both the new phantom and its forwarding Arc with
                    // the children time accumulated on the previous host.
                    let val = *old.children_arc.lock().unwrap_or_else(|e| e.into_inner());
                    *arc.lock().unwrap_or_else(|e| e.into_inner()) = val;
                    val
                } else {
                    0.0
                };

                (forwarded, arc)
            };

            // Store the Arc for write-through from child drops.
            PHANTOM_ARCS.with(|arcs| {
                arcs.borrow_mut().push((self.packed, fwd_arc));
            });

            // The phantom carries the migrated guard's packed identity (its
            // cookie differs from this thread's), which is how drop_cold and
            // child drops recognize it. Alloc counters are not forwarded:
            // the phantom starts from a fresh snapshot.
            s.push(StackEntry {
                name: "<phantom>",
                children_ms: forwarded_children_ms,
                #[cfg(feature = "cpu-time")]
                cpu_children_ns: 0,
                #[cfg(feature = "cpu-time")]
                cpu_start_ns: 0,
                saved_alloc: crate::alloc::AllocSnapshot::new(),
                packed: self.packed,
            });
        });
    }
}
492
/// Bookkeeping half of Guard::drop(): thread check, alloc restore, stack pop,
/// recording. Kept out-of-line so the inlined drop is just a counter read + call.
#[inline(never)]
fn drop_cold(guard: &Guard, end_tsc: u64, #[cfg(feature = "cpu-time")] cpu_end_ns: u64) {
    let drop_cookie = THREAD_COOKIE.with(|c| *c);
    let enter_cookie = unpack_cookie(guard.packed);
    let migrated = drop_cookie != enter_cookie;

    if migrated {
        let name = lookup_name(unpack_name_id(guard.packed));
        let elapsed_ns = crate::tsc::elapsed_ns(guard.start_tsc, end_tsc);
        let elapsed_ms = elapsed_ns as f64 / 1_000_000.0;
        let start_ns = crate::tsc::ticks_to_epoch_ns(guard.start_tsc, crate::tsc::epoch_tsc());

        // Post-migration children (Case 2): find and pop our phantom
        // StackEntry. Its children_ms was updated by children via the
        // normal parent fast path, and seeded with forwarded children
        // from previous thread hops (multi-hop migration).
        let phantom_children_ms = STACK.with(|stack| {
            let mut s = stack.borrow_mut();
            if let Some(pos) = s
                .iter()
                .rposition(|e| e.packed == guard.packed && e.name == "<phantom>")
            {
                let phantom = s.remove(pos);
                phantom.children_ms
            } else {
                0.0
            }
        });

        // Clean up the phantom registry and PHANTOM_ARCS for this guard.
        {
            let mut reg = phantom_registry().lock().unwrap_or_else(|e| e.into_inner());
            reg.remove(&guard.packed);
        }
        PHANTOM_ARCS.with(|arcs| {
            arcs.borrow_mut().retain(|(pk, _)| *pk != guard.packed);
        });

        let children_ns = (phantom_children_ms * 1_000_000.0) as u64;
        let self_ns = elapsed_ns.saturating_sub(children_ns);

        // Migrated path: push directly to Mutex-guarded RECORDS (not the
        // fast buffer) because this thread has no depth-0 boundary to flush.
        RECORDS.with(|records| {
            records
                .lock()
                .unwrap_or_else(|e| e.into_inner())
                .push(RawRecord {
                    name,
                    elapsed_ms,
                    children_ms: phantom_children_ms,
                    // CPU time is not attributed across threads; recorded as 0.
                    #[cfg(feature = "cpu-time")]
                    cpu_self_ns: 0,
                });
        });

        // Alloc counters are likewise not tracked across a migration, and
        // the original stack depth is unknown here: all such fields are 0.
        let invocation = InvocationRecord {
            name,
            start_ns,
            elapsed_ns,
            self_ns,
            #[cfg(feature = "cpu-time")]
            cpu_self_ns: 0,
            alloc_count: 0,
            alloc_bytes: 0,
            free_count: 0,
            free_bytes: 0,
            depth: 0,
        };

        #[cfg(any(test, feature = "_test_internals"))]
        INVOCATIONS.with(|inv| {
            inv.borrow_mut().push(invocation.clone());
        });

        FRAME_BUFFER.with(|buf| {
            buf.borrow_mut().push(invocation);
        });
        return;
    }

    // Same thread -- existing logic with orphan drain prefix.
    let scope_alloc = crate::alloc::ALLOC_COUNTERS
        .try_with(|cell| cell.get())
        .unwrap_or_default();

    STACK.with(|stack| {
        // Drain orphaned entries left by migrated child guards. (A child
        // that migrated away was dropped on another thread, so its entry
        // was never popped here; anything deeper than this guard is such
        // an orphan.)
        {
            let mut s = stack.borrow_mut();
            let guard_depth = unpack_depth(guard.packed);
            while s
                .last()
                .map_or(false, |e| unpack_depth(e.packed) > guard_depth)
            {
                let orphan = s.pop().unwrap();
                let _ = crate::alloc::ALLOC_COUNTERS.try_with(|cell| {
                    cell.set(orphan.saved_alloc);
                });
            }
        }

        let entry = match stack.borrow_mut().pop() {
            Some(e) => e,
            None => {
                eprintln!("piano-runtime: guard dropped without matching stack entry (bug)");
                return;
            }
        };

        // Restore parent's saved alloc counters.
        let _ = crate::alloc::ALLOC_COUNTERS.try_with(|cell| {
            cell.set(entry.saved_alloc);
        });

        let elapsed_ns = crate::tsc::elapsed_ns(guard.start_tsc, end_tsc);
        let elapsed_ms = elapsed_ns as f64 / 1_000_000.0;
        let children_ns = (entry.children_ms * 1_000_000.0) as u64;
        let self_ns = elapsed_ns.saturating_sub(children_ns);
        let start_ns = crate::tsc::ticks_to_epoch_ns(guard.start_tsc, crate::tsc::epoch_tsc());
        let children_ms = entry.children_ms;

        #[cfg(feature = "cpu-time")]
        let cpu_elapsed_ns = cpu_end_ns.saturating_sub(entry.cpu_start_ns);
        #[cfg(feature = "cpu-time")]
        let cpu_self_ns = cpu_elapsed_ns.saturating_sub(entry.cpu_children_ns);

        // Propagate this scope's elapsed time to the parent's children_ms
        // so the parent's self-time excludes it.
        if let Some(parent) = stack.borrow_mut().last_mut() {
            parent.children_ms += elapsed_ms;
            // If parent is a phantom (cookie differs from this thread),
            // write through to its forwarding Arc so subsequent thread
            // hops can read the accumulated children_ms.
            if unpack_cookie(parent.packed) != drop_cookie {
                let children = parent.children_ms;
                let pk = parent.packed;
                PHANTOM_ARCS.with(|arcs| {
                    if let Some((_, arc)) = arcs.borrow().iter().find(|(k, _)| *k == pk) {
                        *arc.lock().unwrap_or_else(|e| e.into_inner()) = children;
                    }
                });
            }
            #[cfg(feature = "cpu-time")]
            {
                parent.cpu_children_ns += cpu_elapsed_ns;
            }
        }

        RECORDS_BUF.with(|buf| {
            buf.borrow_mut().push(RawRecord {
                name: entry.name,
                elapsed_ms,
                children_ms,
                #[cfg(feature = "cpu-time")]
                cpu_self_ns,
            });
        });

        let invocation = InvocationRecord {
            name: entry.name,
            start_ns,
            elapsed_ns,
            self_ns,
            #[cfg(feature = "cpu-time")]
            cpu_self_ns,
            alloc_count: scope_alloc.alloc_count,
            alloc_bytes: scope_alloc.alloc_bytes,
            free_count: scope_alloc.free_count,
            free_bytes: scope_alloc.free_bytes,
            depth: unpack_depth(entry.packed),
        };

        #[cfg(any(test, feature = "_test_internals"))]
        INVOCATIONS.with(|inv| {
            inv.borrow_mut().push(invocation.clone());
        });

        FRAME_BUFFER.with(|buf| {
            buf.borrow_mut().push(invocation);
        });

        // Flush RECORDS_BUF at frame boundaries. Normally this is depth 0,
        // but fork/adopt places an entry at depth 0 on worker threads,
        // pushing real functions to depth 1+. Flush when all remaining
        // stack entries are at depth 0 (all real work for this frame done).
        let remaining_all_base = stack.borrow().iter().all(|e| unpack_depth(e.packed) == 0);
        let is_frame_boundary = unpack_depth(entry.packed) == 0 || remaining_all_base;

        if is_frame_boundary {
            flush_records_buf();
        }
        if unpack_depth(entry.packed) == 0 {
            FRAME_BUFFER.with(|buf| {
                // Aggregate under an immutable borrow, then clear under a
                // separate mutable borrow (avoids RefCell double-borrow).
                {
                    let borrowed = buf.borrow();
                    aggregate_frame_into_frames(&borrowed);
                }
                buf.borrow_mut().clear();
            });
        }
    });
}
696
impl Drop for Guard {
    /// Inlined so the counter read (`rdtsc`/`cntvct_el0`) happens at the drop
    /// site as a single inline instruction.
    /// `compiler_fence` prevents the compiler from hoisting Guard field loads
    /// before the counter read.
    #[inline(always)]
    fn drop(&mut self) {
        // Read the clocks FIRST so drop_cold's bookkeeping cost stays
        // outside the measured window.
        let end_tsc = crate::tsc::read();
        #[cfg(feature = "cpu-time")]
        let cpu_end_ns = crate::cpu_clock::cpu_now_ns();

        // Prevent the compiler from moving Guard field reads (needed by
        // drop_cold) before the counter read above.
        compiler_fence(Ordering::SeqCst);

        drop_cold(
            self,
            end_tsc,
            #[cfg(feature = "cpu-time")]
            cpu_end_ns,
        );
    }
}
720
721/// Drain stale phantom StackEntries scheduled for cleanup on this thread.
722///
723/// Called from `enter_cold` before pushing the new entry. The cleanup queue
724/// is populated by `check()` on other threads when a guard re-migrates
725/// (e.g., A -> B -> C: C schedules B's phantom for cleanup).
726fn drain_phantom_cleanup(my_cookie: u64) {
727    if !HAS_PHANTOM_CLEANUP.load(Ordering::Relaxed) {
728        return;
729    }
730    let mut queue = phantom_cleanup().lock().unwrap_or_else(|e| e.into_inner());
731    let mine: Vec<u64> = queue
732        .iter()
733        .filter(|(cookie, _)| *cookie == my_cookie)
734        .map(|(_, packed)| *packed)
735        .collect();
736    queue.retain(|(cookie, _)| *cookie != my_cookie);
737    if queue.is_empty() {
738        HAS_PHANTOM_CLEANUP.store(false, Ordering::Relaxed);
739    }
740    drop(queue);
741
742    if mine.is_empty() {
743        return;
744    }
745
746    // Remove matching phantom StackEntries from this thread's stack.
747    STACK.with(|stack| {
748        stack
749            .borrow_mut()
750            .retain(|e| !(e.name == "<phantom>" && mine.contains(&e.packed)));
751    });
752    // Remove matching PHANTOM_ARCS entries.
753    PHANTOM_ARCS.with(|arcs| {
754        arcs.borrow_mut()
755            .retain(|(packed, _)| !mine.contains(packed));
756    });
757}
758
759/// Bookkeeping half of enter(): epoch, alloc save, stack push, name interning.
760/// Returns a packed u64: `[cookie:32][name_id:16][depth:16]`.
761#[inline(never)]
762fn enter_cold(name: &'static str) -> u64 {
763    let _ = epoch();
764
765    let cookie = THREAD_COOKIE.with(|c| *c);
766
767    // Drain stale phantoms before computing depth, so the new entry
768    // gets the correct stack position.
769    drain_phantom_cleanup(cookie);
770
771    let name_id = intern_name(name);
772
773    let saved_alloc = crate::alloc::ALLOC_COUNTERS
774        .try_with(|cell| {
775            let snap = cell.get();
776            cell.set(crate::alloc::AllocSnapshot::new());
777            snap
778        })
779        .unwrap_or_default();
780
781    #[cfg(feature = "cpu-time")]
782    let cpu_start_ns = crate::cpu_clock::cpu_now_ns();
783
784    STACK.with(|stack| {
785        let depth = stack.borrow().len() as u16;
786        let packed = pack_cookie_name_depth(cookie, name_id, depth);
787        stack.borrow_mut().push(StackEntry {
788            name,
789            children_ms: 0.0,
790            #[cfg(feature = "cpu-time")]
791            cpu_children_ns: 0,
792            #[cfg(feature = "cpu-time")]
793            cpu_start_ns,
794            saved_alloc,
795            packed,
796        });
797        packed
798    })
799}
800
801/// Start timing a function. Returns a Guard that records the measurement on drop.
802///
803/// Inlined so the counter read (`rdtsc`/`cntvct_el0`) happens at the call
804/// site as a single inline instruction — no function call, no vDSO overhead.
805///
806/// Guard is 16 bytes: fits in two registers on both x86_64 and aarch64,
807/// so the compiler keeps it in registers across the call — zero memory
808/// stores inside the measurement window.
809#[inline(always)]
810pub fn enter(name: &'static str) -> Guard {
811    let packed = enter_cold(name);
812    let start_tsc = crate::tsc::read();
813    Guard { start_tsc, packed }
814}
815
816/// Register a function name so it appears in output even if never called.
817///
818/// Must be called from the same thread that will later call `collect_all()`
819/// or `shutdown()`. In practice this means `main()` -- the AST rewriter
820/// injects `register()` calls at the top of `main()` and `shutdown()` at
821/// the end. Calling `register()` from worker threads will cause those
822/// function names to be missing from aggregated output because `REGISTERED`
823/// is thread-local.
824pub fn register(name: &'static str) {
825    REGISTERED.with(|reg| {
826        let mut reg = reg.borrow_mut();
827        if !reg.contains(&name) {
828            reg.push(name);
829        }
830    });
831}
832
/// Mutable accumulator used by `aggregate()` while folding raw records
/// into a single per-function summary.
struct AggEntry {
    // Number of recorded invocations folded in so far.
    calls: u64,
    // Sum of wall-clock elapsed time in ms (includes children).
    total_ms: f64,
    // Sum of (elapsed - children) per record, clamped at zero.
    self_ms: f64,
    // Sum of per-record CPU self time in nanoseconds.
    #[cfg(feature = "cpu-time")]
    cpu_self_ns: u64,
}
841
842impl AggEntry {
843    fn new() -> Self {
844        Self {
845            calls: 0,
846            total_ms: 0.0,
847            self_ms: 0.0,
848            #[cfg(feature = "cpu-time")]
849            cpu_self_ns: 0,
850        }
851    }
852}
853
854fn aggregate(raw: &[RawRecord], registered: &[&str]) -> Vec<FunctionRecord> {
855    let mut map: HashMap<&str, AggEntry> = HashMap::new();
856
857    for name in registered {
858        map.entry(name).or_insert_with(AggEntry::new);
859    }
860
861    for rec in raw {
862        let entry = map.entry(rec.name).or_insert_with(AggEntry::new);
863        entry.calls += 1;
864        entry.total_ms += rec.elapsed_ms;
865        entry.self_ms += (rec.elapsed_ms - rec.children_ms).max(0.0);
866        #[cfg(feature = "cpu-time")]
867        {
868            entry.cpu_self_ns += rec.cpu_self_ns;
869        }
870    }
871
872    let mut result: Vec<FunctionRecord> = map
873        .into_iter()
874        .map(|(name, e)| FunctionRecord {
875            name: name.to_owned(),
876            calls: e.calls,
877            total_ms: e.total_ms,
878            self_ms: e.self_ms,
879            #[cfg(feature = "cpu-time")]
880            cpu_self_ms: e.cpu_self_ns as f64 / 1_000_000.0,
881        })
882        .collect();
883
884    result.sort_by(|a, b| {
885        b.self_ms
886            .partial_cmp(&a.self_ms)
887            .unwrap_or(std::cmp::Ordering::Equal)
888    });
889    result
890}
891
892/// Aggregate raw records into per-function summaries, sorted by self_ms descending.
893/// Reads only from the current thread's record storage.
894pub fn collect() -> Vec<FunctionRecord> {
895    flush_records_buf();
896    RECORDS.with(|records| {
897        let recs = records.lock().unwrap_or_else(|e| e.into_inner());
898        REGISTERED.with(|reg| aggregate(&recs, &reg.borrow()))
899    })
900}
901
/// Return all raw per-invocation records (not aggregated).
///
/// Test-only introspection hook: clones the calling thread's `INVOCATIONS`
/// buffer. Compiled only under `cfg(test)` or the `_test_internals` feature.
#[cfg(any(test, feature = "_test_internals"))]
pub fn collect_invocations() -> Vec<InvocationRecord> {
    INVOCATIONS.with(|inv| inv.borrow().clone())
}
907
908/// Return completed per-frame summaries.
909///
910/// No `flush_records_buf()` call is needed here: FRAMES is populated at
911/// depth-0 boundaries -- the same point where RECORDS_BUF is flushed into
912/// RECORDS. By the time a frame appears in FRAMES, its records have already
913/// been flushed.
914pub fn collect_frames() -> Vec<Vec<FrameFnSummary>> {
915    FRAMES.with(|frames| frames.borrow().clone())
916}
917
918/// Aggregate invocation records and push directly into FRAMES.
919///
920/// Uses a thread-local reusable Vec with linear scan instead of HashMap.
921/// For the typical 1-5 unique functions per frame, linear scan avoids
922/// hashing, bucket probing, and memset overhead.
923fn aggregate_frame_into_frames(records: &[InvocationRecord]) {
924    FRAME_AGG_VEC.with(|vec_cell| {
925        let mut agg = vec_cell.borrow_mut();
926        agg.clear();
927        for rec in records {
928            // Linear scan: faster than HashMap for typical 1-5 unique functions.
929            // Function names are interned &'static str, so pointer comparison works.
930            if let Some(entry) = agg.iter_mut().find(|e| std::ptr::eq(e.name, rec.name)) {
931                entry.calls += 1;
932                entry.self_ns += rec.self_ns;
933                #[cfg(feature = "cpu-time")]
934                {
935                    entry.cpu_self_ns += rec.cpu_self_ns;
936                }
937                entry.alloc_count += rec.alloc_count;
938                entry.alloc_bytes += rec.alloc_bytes;
939                entry.free_count += rec.free_count;
940                entry.free_bytes += rec.free_bytes;
941            } else {
942                agg.push(FrameFnSummary {
943                    name: rec.name,
944                    calls: 1,
945                    self_ns: rec.self_ns,
946                    #[cfg(feature = "cpu-time")]
947                    cpu_self_ns: rec.cpu_self_ns,
948                    alloc_count: rec.alloc_count,
949                    alloc_bytes: rec.alloc_bytes,
950                    free_count: rec.free_count,
951                    free_bytes: rec.free_bytes,
952                });
953            }
954        }
955        FRAMES.with(|frames| {
956            frames.borrow_mut().push(agg.clone());
957        });
958    });
959}
960
961/// Collect records from ALL threads via the global registry.
962/// This is the primary collection method for cross-thread profiling — it
963/// captures data from thread-pool workers whose TLS destructors may never fire.
964///
965/// Clones the Arc handles under the global lock, then drops the lock before
966/// iterating per-thread records. This avoids blocking new thread registrations
967/// while aggregation is in progress.
968///
969/// Note: `REGISTERED` (the set of known function names) is read from the
970/// calling thread's TLS only. Function names registered on other threads
971/// will not appear in the output unless they were also recorded via `enter()`.
972/// In the normal flow the AST rewriter injects all `register()` calls into
973/// `main()`, so calling `collect_all()` from `main()` (via `shutdown()`)
974/// sees every registered name.
975pub fn collect_all() -> Vec<FunctionRecord> {
976    // Flush the calling thread's local buffer so its records are visible.
977    // Other threads' buffers are flushed at their own depth-0 boundaries.
978    // Records still buffered on other threads (not yet at a depth-0 boundary)
979    // will be missed. In practice this only affects detached spawns
980    // (std::thread::spawn, rayon::spawn) which have no completion guarantee;
981    // scoped concurrency always completes before the caller returns.
982    flush_records_buf();
983    let arcs: Vec<ThreadRecordArc> = {
984        let registry = thread_records().lock().unwrap_or_else(|e| e.into_inner());
985        registry.clone()
986    };
987    let mut all_raw: Vec<RawRecord> = Vec::new();
988    for arc in &arcs {
989        let records = arc.lock().unwrap_or_else(|e| e.into_inner());
990        all_raw.extend(records.iter().cloned());
991    }
992    let registered: Vec<&str> = REGISTERED
993        .try_with(|reg| reg.borrow().clone())
994        .unwrap_or_default();
995    aggregate(&all_raw, &registered)
996}
997
998/// Clear all collected timing data for the current thread.
999///
1000/// Also drains any pending phantom cleanup entries targeting this thread
1001/// from the global queue, so stale phantoms don't leak across test runs.
1002pub fn reset() {
1003    STACK.with(|stack| stack.borrow_mut().clear());
1004    RECORDS_BUF.with(|buf| buf.borrow_mut().clear());
1005    RECORDS.with(|records| {
1006        records.lock().unwrap_or_else(|e| e.into_inner()).clear();
1007    });
1008    REGISTERED.with(|reg| reg.borrow_mut().clear());
1009    #[cfg(any(test, feature = "_test_internals"))]
1010    INVOCATIONS.with(|inv| inv.borrow_mut().clear());
1011    FRAME_BUFFER.with(|buf| buf.borrow_mut().clear());
1012    FRAMES.with(|frames| frames.borrow_mut().clear());
1013    FRAME_AGG_VEC.with(|v| v.borrow_mut().clear());
1014    PHANTOM_ARCS.with(|arcs| arcs.borrow_mut().clear());
1015    // Drain any pending cleanup entries for this thread from the global queue.
1016    let cookie = THREAD_COOKIE.with(|c| *c);
1017    drain_phantom_cleanup(cookie);
1018}
1019
/// Clear collected timing data across ALL threads, plus the calling thread's
/// local state (stack, registrations, frame buffers).
///
/// Unlike `reset()` which only clears the calling thread's records,
/// `reset_all()` iterates every Arc in the global `THREAD_RECORDS` registry
/// so that a subsequent `collect_all()` sees no stale data from other threads.
#[cfg(test)]
pub fn reset_all() {
    // Snapshot the handles, then empty every thread's record vector.
    let handles: Vec<ThreadRecordArc> = {
        let registry = thread_records().lock().unwrap_or_else(|e| e.into_inner());
        registry.clone()
    };
    for handle in &handles {
        handle.lock().unwrap_or_else(|e| e.into_inner()).clear();
    }
    // Wipe the global phantom bookkeeping.
    phantom_registry()
        .lock()
        .unwrap_or_else(|e| e.into_inner())
        .clear();
    {
        // Flag store happens while the queue lock is held, matching the
        // drain path's ordering.
        let mut queue = phantom_cleanup().lock().unwrap_or_else(|e| e.into_inner());
        queue.clear();
        HAS_PHANTOM_CLEANUP.store(false, Ordering::Relaxed);
    }
    // And finally this thread's own local state.
    reset();
}
1049
/// Return the current time as milliseconds since the Unix epoch.
/// A pre-epoch clock (impossible in practice) yields 0 rather than panicking.
fn timestamp_ms() -> u128 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis(),
        Err(_) => 0,
    }
}
1057
1058/// Configure the directory where run files should be written.
1059///
1060/// Called by CLI-injected code at the start of `main()` so that both
1061/// `flush()` and `shutdown()` write to the project-local directory.
1062/// `PIANO_RUNS_DIR` env var still takes priority (for testing and user
1063/// overrides).
1064pub fn set_runs_dir(dir: &str) {
1065    *runs_dir_lock().lock().unwrap_or_else(|e| e.into_inner()) = Some(PathBuf::from(dir));
1066}
1067
/// Clear the configured runs directory.
///
/// Exposed for testing. Not part of the public API.
#[cfg(test)]
pub fn clear_runs_dir() {
    let mut slot = runs_dir_lock().lock().unwrap_or_else(|e| e.into_inner());
    *slot = None;
}
1075
1076/// Return the directory where run files should be written.
1077///
1078/// Priority: `PIANO_RUNS_DIR` env var > `set_runs_dir()` > `~/.piano/runs/`.
1079fn runs_dir() -> Option<PathBuf> {
1080    if let Ok(dir) = std::env::var("PIANO_RUNS_DIR") {
1081        return Some(PathBuf::from(dir));
1082    }
1083    if let Some(dir) = runs_dir_lock()
1084        .lock()
1085        .unwrap_or_else(|e| e.into_inner())
1086        .clone()
1087    {
1088        return Some(dir);
1089    }
1090    dirs_fallback().map(|home| home.join(".piano").join("runs"))
1091}
1092
/// Best-effort home directory detection (no deps): just `$HOME`.
fn dirs_fallback() -> Option<PathBuf> {
    match std::env::var_os("HOME") {
        Some(home) => Some(PathBuf::from(home)),
        None => None,
    }
}
1097
1098/// Write a JSON run file from the given function records.
1099///
1100/// Hand-written JSON via `write!()` — zero serde dependency.
1101fn write_json(records: &[FunctionRecord], path: &std::path::Path) -> std::io::Result<()> {
1102    if let Some(parent) = path.parent() {
1103        std::fs::create_dir_all(parent)?;
1104    }
1105    let mut f = std::fs::File::create(path)?;
1106    let ts = timestamp_ms();
1107    let run_id = run_id();
1108    write!(
1109        f,
1110        "{{\"run_id\":\"{run_id}\",\"timestamp_ms\":{ts},\"functions\":["
1111    )?;
1112    for (i, rec) in records.iter().enumerate() {
1113        if i > 0 {
1114            write!(f, ",")?;
1115        }
1116        // Escape the function name (in practice only ASCII identifiers, but be safe).
1117        let name = rec.name.replace('\\', "\\\\").replace('"', "\\\"");
1118        write!(
1119            f,
1120            "{{\"name\":\"{}\",\"calls\":{},\"total_ms\":{:.3},\"self_ms\":{:.3}",
1121            name, rec.calls, rec.total_ms, rec.self_ms
1122        )?;
1123        #[cfg(feature = "cpu-time")]
1124        write!(f, ",\"cpu_self_ms\":{:.3}", rec.cpu_self_ms)?;
1125        write!(f, "}}")?;
1126    }
1127    writeln!(f, "]}}")?;
1128    Ok(())
1129}
1130
1131/// Write an NDJSON file with frame-level data.
1132///
1133/// Line 1: header with metadata and function name table.
1134/// Lines 2+: one line per frame with per-function summaries.
1135fn write_ndjson(
1136    frames: &[Vec<FrameFnSummary>],
1137    fn_names: &[&str],
1138    path: &std::path::Path,
1139) -> std::io::Result<()> {
1140    if let Some(parent) = path.parent() {
1141        std::fs::create_dir_all(parent)?;
1142    }
1143    let mut f = std::fs::File::create(path)?;
1144    let ts = timestamp_ms();
1145    let run_id = run_id();
1146
1147    // Header line: metadata + function name table
1148    write!(
1149        f,
1150        "{{\"format_version\":3,\"run_id\":\"{run_id}\",\"timestamp_ms\":{ts}"
1151    )?;
1152    #[cfg(feature = "cpu-time")]
1153    write!(f, ",\"has_cpu_time\":true")?;
1154    write!(f, ",\"functions\":[")?;
1155    for (i, name) in fn_names.iter().enumerate() {
1156        if i > 0 {
1157            write!(f, ",")?;
1158        }
1159        let name = name.replace('\\', "\\\\").replace('"', "\\\"");
1160        write!(f, "\"{name}\"")?;
1161    }
1162    writeln!(f, "]}}")?;
1163
1164    // Build index for O(1) fn_id lookup
1165    let fn_id_map: HashMap<&str, usize> =
1166        fn_names.iter().enumerate().map(|(i, &n)| (n, i)).collect();
1167
1168    // One line per frame
1169    for (frame_idx, frame) in frames.iter().enumerate() {
1170        write!(f, "{{\"frame\":{frame_idx},\"fns\":[")?;
1171        for (i, s) in frame.iter().enumerate() {
1172            if i > 0 {
1173                write!(f, ",")?;
1174            }
1175            let fn_id = fn_id_map.get(s.name).copied().unwrap_or(0);
1176            write!(
1177                f,
1178                "{{\"id\":{},\"calls\":{},\"self_ns\":{},\"ac\":{},\"ab\":{},\"fc\":{},\"fb\":{}",
1179                fn_id, s.calls, s.self_ns, s.alloc_count, s.alloc_bytes, s.free_count, s.free_bytes
1180            )?;
1181            #[cfg(feature = "cpu-time")]
1182            write!(f, ",\"csn\":{}", s.cpu_self_ns)?;
1183            write!(f, "}}")?;
1184        }
1185        writeln!(f, "]}}")?;
1186    }
1187    Ok(())
1188}
1189
1190/// Flush collected timing data to disk.
1191///
1192/// If frame data is present, writes NDJSON format. Otherwise falls back
1193/// to JSON for non-frame workloads.
1194///
1195/// Normally you don't need to call this — `shutdown()` flushes all threads
1196/// at the end of main. This function exists for explicit mid-program flushes
1197/// of the current thread's data.
1198pub fn flush() {
1199    let dir = match runs_dir() {
1200        Some(d) => d,
1201        None => return,
1202    };
1203
1204    let frames = collect_frames();
1205    if !frames.is_empty() {
1206        let mut seen = HashSet::new();
1207        let mut fn_names: Vec<&str> = Vec::new();
1208        for frame in &frames {
1209            for s in frame {
1210                if seen.insert(s.name) {
1211                    fn_names.push(s.name);
1212                }
1213            }
1214        }
1215        let path = dir.join(format!("{}.ndjson", timestamp_ms()));
1216        let _ = write_ndjson(&frames, &fn_names, &path);
1217    } else {
1218        let records = collect();
1219        if records.is_empty() {
1220            return;
1221        }
1222        let path = dir.join(format!("{}.json", timestamp_ms()));
1223        let _ = write_json(&records, &path);
1224    }
1225    reset();
1226}
1227
/// No-op retained for API compatibility.
///
/// Flushing now happens via `shutdown()` at the end of main.
/// Instrumented code may still call `init()` — it's harmless, and safe to
/// call any number of times from any thread.
pub fn init() {}
1233
1234/// Flush all collected timing data from ALL threads and write to disk.
1235///
1236/// Collects from the global per-thread registry, so data from thread-pool
1237/// workers is included. Injected at the end of main() by the AST rewriter.
1238///
1239/// Writes NDJSON if frame data is present (from the calling thread), and
1240/// always writes JSON with cross-thread aggregation from all registered Arcs.
1241pub fn shutdown() {
1242    let dir = match runs_dir() {
1243        Some(d) => d,
1244        None => return,
1245    };
1246    if shutdown_impl(&dir) {
1247        std::process::exit(70);
1248    }
1249}
1250
1251/// Like `shutdown`, but writes run files to the specified directory.
1252///
1253/// Used by the CLI to write to project-local `target/piano/runs/` instead
1254/// of the global `~/.piano/runs/`. `PIANO_RUNS_DIR` env var takes priority
1255/// if set (for testing and user overrides).
1256pub fn shutdown_to(dir: &str) {
1257    let failed = if let Ok(override_dir) = std::env::var("PIANO_RUNS_DIR") {
1258        shutdown_impl(std::path::Path::new(&override_dir))
1259    } else {
1260        shutdown_impl(std::path::Path::new(dir))
1261    };
1262    if failed {
1263        std::process::exit(70);
1264    }
1265}
1266
1267/// Returns `true` if any write failed.
1268fn shutdown_impl(dir: &std::path::Path) -> bool {
1269    let mut write_failed = false;
1270    let ts = timestamp_ms();
1271
1272    // Write frame-level data if present (NDJSON format).
1273    let frames = collect_frames();
1274    if !frames.is_empty() {
1275        let mut seen = HashSet::new();
1276        let mut fn_names: Vec<&str> = Vec::new();
1277        for frame in &frames {
1278            for s in frame {
1279                if seen.insert(s.name) {
1280                    fn_names.push(s.name);
1281                }
1282            }
1283        }
1284        let path = dir.join(format!("{ts}.ndjson"));
1285        if let Err(e) = write_ndjson(&frames, &fn_names, &path) {
1286            eprintln!(
1287                "piano: failed to write profiling data to {}: {e}",
1288                path.display()
1289            );
1290            write_failed = true;
1291        }
1292    }
1293
1294    // Always write aggregated cross-thread data (JSON format).
1295    let records = collect_all();
1296    if !records.is_empty() {
1297        let path = dir.join(format!("{ts}.json"));
1298        if let Err(e) = write_json(&records, &path) {
1299            eprintln!(
1300                "piano: failed to write profiling data to {}: {e}",
1301                path.display()
1302            );
1303            write_failed = true;
1304        }
1305    }
1306    write_failed
1307}
1308
/// Context for propagating parent-child CPU timing across thread boundaries.
///
/// Created by `fork()` on the parent thread, passed to child threads via
/// `adopt()`. When the child completes, its CPU time is accumulated
/// in `children_cpu_ns` which the parent reads back via Drop (or explicit `finalize()`).
/// Wall time is NOT propagated cross-thread (it's not additive for parallel work).
pub struct SpanContext {
    // Name of the stack entry that was on top when `fork()` ran; `adopt()`
    // pushes a synthetic child-thread entry under this same name.
    parent_name: &'static str,
    // Shared accumulator: each AdoptGuard adds its thread's CPU ns on drop.
    #[cfg(feature = "cpu-time")]
    children_cpu_ns: Arc<Mutex<u64>>,
    // Set by `finalize()` so Drop doesn't apply the children time twice.
    finalized: bool,
}
1321
1322impl SpanContext {
1323    /// Explicitly finalize cross-thread attribution.
1324    /// Equivalent to dropping the SpanContext, but makes intent clear.
1325    pub fn finalize(mut self) {
1326        self.apply_children();
1327        self.finalized = true;
1328    }
1329
1330    fn apply_children(&self) {
1331        #[cfg(feature = "cpu-time")]
1332        {
1333            let children_cpu = *self
1334                .children_cpu_ns
1335                .lock()
1336                .unwrap_or_else(|e| e.into_inner());
1337            STACK.with(|stack| {
1338                if let Some(top) = stack.borrow_mut().last_mut() {
1339                    top.cpu_children_ns += children_cpu;
1340                }
1341            });
1342        }
1343    }
1344}
1345
1346impl Drop for SpanContext {
1347    fn drop(&mut self) {
1348        if !self.finalized {
1349            self.apply_children();
1350        }
1351    }
1352}
1353
/// RAII guard for cross-thread adoption. Pops the synthetic parent on drop
/// and propagates CPU time back to the parent's `SpanContext`.
#[must_use = "dropping AdoptGuard immediately records ~0ms; bind it with `let _guard = ...`"]
pub struct AdoptGuard {
    // CPU-clock reading taken when `adopt()` ran; drop subtracts it from "now".
    #[cfg(feature = "cpu-time")]
    cpu_start_ns: u64,
    // Handle to the parent SpanContext's shared accumulator.
    #[cfg(feature = "cpu-time")]
    ctx_children_cpu_ns: Arc<Mutex<u64>>,
}
1363
1364impl Drop for AdoptGuard {
1365    fn drop(&mut self) {
1366        // Restore the parent's saved alloc counters (same pattern as Guard::drop).
1367        // The adopted scope's alloc data isn't recorded into an InvocationRecord,
1368        // but the restore is necessary for correct nesting.
1369        STACK.with(|stack| {
1370            let entry = match stack.borrow_mut().pop() {
1371                Some(e) => e,
1372                None => return,
1373            };
1374
1375            let _ = crate::alloc::ALLOC_COUNTERS.try_with(|cell| {
1376                cell.set(entry.saved_alloc);
1377            });
1378
1379            // Propagate this thread's CPU time back to the parent context.
1380            #[cfg(feature = "cpu-time")]
1381            {
1382                let cpu_elapsed_ns =
1383                    crate::cpu_clock::cpu_now_ns().saturating_sub(self.cpu_start_ns);
1384                let mut cpu_children = self
1385                    .ctx_children_cpu_ns
1386                    .lock()
1387                    .unwrap_or_else(|e| e.into_inner());
1388                *cpu_children += cpu_elapsed_ns;
1389            }
1390        });
1391    }
1392}
1393
1394/// Capture the current stack top as a cross-thread span context.
1395///
1396/// Returns `None` if the call stack is empty (no active span to fork from).
1397/// Pass the returned context to child threads via `adopt()`.
1398pub fn fork() -> Option<SpanContext> {
1399    STACK.with(|stack| {
1400        let stack = stack.borrow();
1401        let top = stack.last()?;
1402        Some(SpanContext {
1403            parent_name: top.name,
1404            #[cfg(feature = "cpu-time")]
1405            children_cpu_ns: Arc::new(Mutex::new(0)),
1406            finalized: false,
1407        })
1408    })
1409}
1410
1411/// Adopt a parent span context on a child thread.
1412///
1413/// Pushes a synthetic parent entry so that `enter()`/`Guard::drop()` on this
1414/// thread correctly attributes children time. Returns an `AdoptGuard` that
1415/// propagates CPU time back to the parent on drop.
1416pub fn adopt(ctx: &SpanContext) -> AdoptGuard {
1417    // Save current alloc counters and zero them, same as enter().
1418    let saved_alloc = crate::alloc::ALLOC_COUNTERS
1419        .try_with(|cell| {
1420            let snap = cell.get();
1421            cell.set(crate::alloc::AllocSnapshot::new());
1422            snap
1423        })
1424        .unwrap_or_default();
1425
1426    #[cfg(feature = "cpu-time")]
1427    let cpu_start_ns = crate::cpu_clock::cpu_now_ns();
1428
1429    let cookie = THREAD_COOKIE.with(|c| *c);
1430    STACK.with(|stack| {
1431        let depth = stack.borrow().len() as u16;
1432        stack.borrow_mut().push(StackEntry {
1433            name: ctx.parent_name,
1434            children_ms: 0.0,
1435            #[cfg(feature = "cpu-time")]
1436            cpu_children_ns: 0,
1437            #[cfg(feature = "cpu-time")]
1438            cpu_start_ns,
1439            saved_alloc,
1440            packed: pack_cookie_name_depth(cookie, intern_name(ctx.parent_name), depth),
1441        });
1442    });
1443
1444    AdoptGuard {
1445        #[cfg(feature = "cpu-time")]
1446        cpu_start_ns,
1447        #[cfg(feature = "cpu-time")]
1448        ctx_children_cpu_ns: Arc::clone(&ctx.children_cpu_ns),
1449    }
1450}
1451
/// CPU-bound workload for testing: hash a buffer `iterations` times.
/// Uses wrapping arithmetic to prevent optimization while staying deterministic.
#[cfg(test)]
pub(crate) fn burn_cpu(iterations: u64) {
    let mut buf = [0x42u8; 4096];
    for round in 0..iterations {
        let delta = round as u8;
        buf.iter_mut()
            .for_each(|b| *b = b.wrapping_add(delta).wrapping_mul(31));
    }
    std::hint::black_box(&buf);
}
1464
1465#[cfg(test)]
1466mod tests {
1467    use super::*;
1468    use std::thread;
1469
    #[test]
    fn flush_writes_valid_output_to_env_dir() {
        // End-to-end: record one timed scope, point flush() at a temp dir via
        // PIANO_RUNS_DIR, and verify a readable run file with the data appears.
        reset();
        {
            let _g = enter("flush_test");
            burn_cpu(5_000);
        }

        let tmp = std::env::temp_dir().join(format!("piano_test_{}", std::process::id()));
        std::fs::create_dir_all(&tmp).unwrap();

        // Point flush at our temp dir.
        // SAFETY: Test runs serially (no concurrent env access).
        unsafe { std::env::set_var("PIANO_RUNS_DIR", &tmp) };
        flush();
        unsafe { std::env::remove_var("PIANO_RUNS_DIR") };

        // Find written file (NDJSON for frame workloads).
        let files: Vec<_> = std::fs::read_dir(&tmp)
            .unwrap()
            .filter_map(|e| e.ok())
            .filter(|e| {
                let ext = e.path().extension().map(|e| e.to_owned());
                ext.as_deref() == Some(std::ffi::OsStr::new("ndjson"))
                    || ext.as_deref() == Some(std::ffi::OsStr::new("json"))
            })
            .collect();
        assert!(!files.is_empty(), "expected at least one output file");

        let content = std::fs::read_to_string(files[0].path()).unwrap();
        assert!(
            content.contains("flush_test"),
            "should contain function name"
        );
        assert!(
            content.contains("timestamp_ms"),
            "should contain timestamp_ms"
        );

        // Cleanup.
        let _ = std::fs::remove_dir_all(&tmp);
    }
1512
    #[test]
    fn write_json_produces_valid_format() {
        // Feed write_json() two hand-built records and check the hand-rolled
        // JSON structure (run_id header, functions array, names, counts).
        let records = vec![
            FunctionRecord {
                name: "walk".into(),
                calls: 3,
                total_ms: 12.5,
                self_ms: 8.3,
                #[cfg(feature = "cpu-time")]
                cpu_self_ms: 7.0,
            },
            FunctionRecord {
                name: "resolve".into(),
                calls: 1,
                total_ms: 4.2,
                self_ms: 4.2,
                #[cfg(feature = "cpu-time")]
                cpu_self_ms: 4.1,
            },
        ];
        let tmp = std::env::temp_dir().join(format!("piano_json_{}.json", std::process::id()));
        write_json(&records, &tmp).unwrap();

        let content = std::fs::read_to_string(&tmp).unwrap();

        // Verify structure.
        assert!(
            content.starts_with("{\"run_id\":\""),
            "should start with run_id"
        );
        assert!(
            content.contains("\"timestamp_ms\":"),
            "should contain timestamp_ms"
        );
        assert!(
            content.contains("\"functions\":["),
            "should have functions array"
        );
        assert!(content.contains("\"walk\""), "should contain walk");
        assert!(content.contains("\"resolve\""), "should contain resolve");
        assert!(content.contains("\"calls\":3"), "should have calls count");

        let _ = std::fs::remove_file(&tmp);
    }
1557
    #[cfg(feature = "cpu-time")]
    #[test]
    fn write_json_includes_cpu_self_ms() {
        // With the cpu-time feature on, the per-function JSON objects must
        // carry cpu_self_ms formatted to three decimal places.
        let records = vec![FunctionRecord {
            name: "compute".into(),
            calls: 5,
            total_ms: 10.0,
            self_ms: 8.0,
            cpu_self_ms: 7.5,
        }];
        let tmp = std::env::temp_dir().join(format!("piano_cpu_json_{}.json", std::process::id()));
        write_json(&records, &tmp).unwrap();
        let content = std::fs::read_to_string(&tmp).unwrap();
        assert!(
            content.contains("\"cpu_self_ms\":7.500"),
            "JSON should contain cpu_self_ms, got: {content}"
        );
        let _ = std::fs::remove_file(&tmp);
    }
1577
    #[test]
    fn init_can_be_called_multiple_times() {
        // init() is a no-op retained for API compatibility.
        // Repeated calls must never panic or change observable state.
        init();
        init();
        init();
    }
1585
    #[test]
    fn single_function_timing() {
        // One enter()/drop cycle should aggregate to exactly one record
        // with a single call counted.
        reset();
        {
            let _g = enter("work");
            burn_cpu(5_000);
        }
        let records = collect();
        assert_eq!(records.len(), 1);
        assert_eq!(records[0].name, "work");
        assert_eq!(records[0].calls, 1);
    }
1598
    #[test]
    fn nested_function_self_time() {
        // Child time must be subtracted from the parent's self time:
        // outer.self < outer.total, while a leaf's self ~= total.
        reset();
        {
            let _outer = enter("outer");
            burn_cpu(5_000);
            {
                let _inner = enter("inner");
                burn_cpu(10_000);
            }
        }
        let records = collect();
        let outer = records
            .iter()
            .find(|r| r.name == "outer")
            .expect("outer not found");
        let inner = records
            .iter()
            .find(|r| r.name == "inner")
            .expect("inner not found");

        // Structural: outer self < total because inner subtracts.
        assert!(
            outer.self_ms < outer.total_ms,
            "self ({:.3}) should be less than total ({:.3})",
            outer.self_ms,
            outer.total_ms
        );
        // Inner is a leaf -- self ~ total within 10%.
        let diff = (inner.self_ms - inner.total_ms).abs();
        assert!(
            diff < inner.total_ms * 0.1,
            "inner self_ms={:.3} total_ms={:.3}",
            inner.self_ms,
            inner.total_ms
        );
    }
1636
    #[test]
    fn call_count_tracking() {
        // Five enter()/drop cycles of the same name aggregate into one
        // record with calls == 5.
        reset();
        for _ in 0..5 {
            let _g = enter("repeated");
        }
        let records = collect();
        assert_eq!(records.len(), 1);
        assert_eq!(records[0].name, "repeated");
        assert_eq!(records[0].calls, 5);
    }
1648
    #[test]
    fn reset_clears_state() {
        // After reset(), previously recorded spans must not survive into
        // a subsequent collect().
        reset();
        {
            let _g = enter("something");
            burn_cpu(1_000);
        }
        reset();
        let records = collect();
        assert!(
            records.is_empty(),
            "expected empty after reset, got {} records",
            records.len()
        );
    }
1664
1665    #[test]
1666    fn collect_sorts_by_self_time_descending() {
1667        reset();
1668        {
1669            let _g = enter("fast");
1670            burn_cpu(1_000);
1671        }
1672        {
1673            let _g = enter("slow");
1674            burn_cpu(50_000);
1675        }
1676        let records = collect();
1677        assert_eq!(
1678            records.len(),
1679            2,
1680            "expected 2 records, got {}: {:?}",
1681            records.len(),
1682            records.iter().map(|r| &r.name).collect::<Vec<_>>()
1683        );
1684        assert_eq!(
1685            records[0].name, "slow",
1686            "expected slow first, got {:?}",
1687            records[0].name
1688        );
1689        assert_eq!(
1690            records[1].name, "fast",
1691            "expected fast second, got {:?}",
1692            records[1].name
1693        );
1694    }
1695
1696    #[test]
1697    fn registered_but_uncalled_functions_appear_with_zero_calls() {
1698        reset();
1699        register("never_called");
1700        {
1701            let _g = enter("called_once");
1702            burn_cpu(1_000);
1703        }
1704        let records = collect();
1705        assert_eq!(records.len(), 2, "should have both functions");
1706        let never = records
1707            .iter()
1708            .find(|r| r.name == "never_called")
1709            .expect("never_called");
1710        assert_eq!(never.calls, 0);
1711        assert!((never.total_ms).abs() < f64::EPSILON);
1712        assert!((never.self_ms).abs() < f64::EPSILON);
1713        let called = records
1714            .iter()
1715            .find(|r| r.name == "called_once")
1716            .expect("called_once");
1717        assert_eq!(called.calls, 1);
1718    }
1719
1720    #[test]
1721    fn output_contains_run_id() {
1722        reset();
1723        {
1724            let _g = enter("rid_test");
1725            burn_cpu(1_000);
1726        }
1727        let tmp = std::env::temp_dir().join(format!("piano_rid_{}", std::process::id()));
1728        std::fs::create_dir_all(&tmp).unwrap();
1729        unsafe { std::env::set_var("PIANO_RUNS_DIR", &tmp) };
1730        flush();
1731        unsafe { std::env::remove_var("PIANO_RUNS_DIR") };
1732        let files: Vec<_> = std::fs::read_dir(&tmp)
1733            .unwrap()
1734            .filter_map(|e| e.ok())
1735            .filter(|e| {
1736                let ext = e.path().extension().map(|e| e.to_owned());
1737                ext.as_deref() == Some(std::ffi::OsStr::new("ndjson"))
1738                    || ext.as_deref() == Some(std::ffi::OsStr::new("json"))
1739            })
1740            .collect();
1741        assert!(!files.is_empty());
1742        let content = std::fs::read_to_string(files[0].path()).unwrap();
1743        assert!(
1744            content.contains("\"run_id\":\""),
1745            "should contain run_id field: {content}"
1746        );
1747        let _ = std::fs::remove_dir_all(&tmp);
1748    }
1749
    #[test]
    fn negative_self_time_clamped_to_zero() {
        // Regression test for the f64 drift clamp in aggregate().
        // Construct a synthetic RawRecord where children_ms slightly exceeds elapsed_ms
        // (simulating floating-point accumulation drift).
        let raw = vec![RawRecord {
            name: "drifted",
            elapsed_ms: 10.0,
            children_ms: 10.001,
            // Field only exists when the cpu-time feature is enabled.
            #[cfg(feature = "cpu-time")]
            cpu_self_ns: 0,
        }];
        // Second argument is the registered-function list; empty here.
        let result = aggregate(&raw, &[]);
        assert_eq!(result.len(), 1);
        // self = elapsed - children would be -0.001; aggregate() must clamp to 0.
        assert_eq!(
            result[0].self_ms, 0.0,
            "negative self-time should be clamped to zero"
        );
    }
1769
1770    #[test]
1771    fn deep_nesting_100_levels() {
1772        reset();
1773
1774        // Pre-generate static names for each level.
1775        let names: Vec<&'static str> = (0..100)
1776            .map(|i| -> &'static str { Box::leak(format!("level_{i}").into_boxed_str()) })
1777            .collect();
1778
1779        // Build nested call tree iteratively using a vec of guards.
1780        let mut guards = Vec::with_capacity(100);
1781        for name in &names {
1782            guards.push(enter(name));
1783            burn_cpu(1_000);
1784        }
1785        // Drop guards in reverse order (innermost first).
1786        while let Some(g) = guards.pop() {
1787            drop(g);
1788        }
1789
1790        let records = collect();
1791        assert_eq!(records.len(), 100, "expected 100 functions");
1792
1793        // No negative self-times.
1794        for rec in &records {
1795            assert!(
1796                rec.self_ms >= 0.0,
1797                "{} has negative self_ms: {}",
1798                rec.name,
1799                rec.self_ms
1800            );
1801        }
1802
1803        reset();
1804    }
1805
1806    #[test]
1807    fn fork_returns_none_with_empty_stack() {
1808        reset();
1809        assert!(fork().is_none(), "fork should return None with empty stack");
1810    }
1811
1812    #[test]
1813    fn fork_adopt_propagates_child_time_to_parent() {
1814        reset();
1815        {
1816            let _parent = enter("parent_fn");
1817            burn_cpu(5_000);
1818
1819            let ctx = fork().expect("should have parent on stack");
1820
1821            // Simulate a child thread (same thread for test simplicity).
1822            {
1823                let _adopt = adopt(&ctx);
1824                {
1825                    let _child = enter("child_fn");
1826                    burn_cpu(20_000);
1827                }
1828            }
1829
1830            ctx.finalize();
1831        }
1832
1833        let records = collect();
1834        let parent = records.iter().find(|r| r.name == "parent_fn").unwrap();
1835        let child = records.iter().find(|r| r.name == "child_fn").unwrap();
1836
1837        // Both recorded with correct call counts.
1838        assert_eq!(parent.calls, 1);
1839        assert_eq!(child.calls, 1);
1840        // Parent total exceeds child total.
1841        assert!(
1842            parent.total_ms > child.total_ms,
1843            "parent total ({:.1}ms) should exceed child total ({:.1}ms)",
1844            parent.total_ms,
1845            child.total_ms
1846        );
1847
1848        // Wall self no longer reduced by cross-thread children.
1849        assert!(
1850            parent.self_ms > parent.total_ms * 0.5,
1851            "parent self ({:.1}ms) should not be reduced by cross-thread child wall. total={:.1}ms",
1852            parent.self_ms,
1853            parent.total_ms
1854        );
1855    }
1856
1857    #[test]
1858    fn adopt_without_child_work_adds_minimal_overhead() {
1859        reset();
1860        {
1861            let _parent = enter("overhead_parent");
1862            let ctx = fork().unwrap();
1863            {
1864                let _adopt = adopt(&ctx);
1865                // No work on child thread.
1866            }
1867            ctx.finalize();
1868        }
1869
1870        let records = collect();
1871        let parent = records
1872            .iter()
1873            .find(|r| r.name == "overhead_parent")
1874            .unwrap();
1875        // Parent should still have valid timing.
1876        assert!(parent.calls == 1);
1877        assert!(parent.total_ms >= 0.0);
1878    }
1879
1880    #[test]
1881    fn multiple_children_accumulate_in_parent() {
1882        reset();
1883        {
1884            let _parent = enter("multi_parent");
1885            burn_cpu(5_000);
1886
1887            let ctx = fork().unwrap();
1888
1889            // Simulate 3 child threads.
1890            for _ in 0..3 {
1891                let _adopt = adopt(&ctx);
1892                {
1893                    let _child = enter("worker");
1894                    burn_cpu(10_000);
1895                }
1896            }
1897
1898            ctx.finalize();
1899        }
1900
1901        let records = collect();
1902        let parent = records.iter().find(|r| r.name == "multi_parent").unwrap();
1903        let worker = records.iter().find(|r| r.name == "worker").unwrap();
1904
1905        assert_eq!(parent.calls, 1, "parent should have 1 call");
1906        assert_eq!(worker.calls, 3, "should have 3 worker calls");
1907    }
1908
1909    #[test]
1910    fn invocation_records_capture_depth() {
1911        reset();
1912        {
1913            let _outer = enter("outer");
1914            burn_cpu(5_000);
1915            {
1916                let _inner = enter("inner");
1917                burn_cpu(5_000);
1918            }
1919        }
1920        let invocations = collect_invocations();
1921        let outer_inv = invocations.iter().find(|r| r.name == "outer").unwrap();
1922        let inner_inv = invocations.iter().find(|r| r.name == "inner").unwrap();
1923        assert_eq!(outer_inv.depth, 0);
1924        assert_eq!(inner_inv.depth, 1);
1925    }
1926
1927    #[test]
1928    fn cross_thread_fork_adopt_propagates() {
1929        reset();
1930        {
1931            let _parent = enter("parent_fn");
1932            burn_cpu(5_000);
1933
1934            let ctx = fork().expect("should have parent on stack");
1935
1936            thread::scope(|s| {
1937                s.spawn(|| {
1938                    let _adopt = adopt(&ctx);
1939                    {
1940                        let _child = enter("thread_child");
1941                        burn_cpu(10_000);
1942                    }
1943                });
1944            });
1945
1946            ctx.finalize();
1947        }
1948
1949        let records = collect();
1950        let parent = records.iter().find(|r| r.name == "parent_fn").unwrap();
1951
1952        // collect() is thread-local so we can only see the parent.
1953        // Wall self no longer reduced by cross-thread children.
1954        assert_eq!(parent.calls, 1);
1955        assert!(
1956            parent.self_ms > parent.total_ms * 0.5,
1957            "parent self ({:.1}ms) should not be reduced by cross-thread child wall. total={:.1}ms",
1958            parent.self_ms,
1959            parent.total_ms
1960        );
1961    }
1962
1963    #[test]
1964    fn write_ndjson_format() {
1965        reset();
1966        for _ in 0..2 {
1967            let _outer = enter("update");
1968            burn_cpu(5_000);
1969            {
1970                let _inner = enter("physics");
1971                burn_cpu(5_000);
1972            }
1973        }
1974
1975        let tmp = std::env::temp_dir().join(format!("piano_ndjson_{}", std::process::id()));
1976        std::fs::create_dir_all(&tmp).unwrap();
1977
1978        unsafe { std::env::set_var("PIANO_RUNS_DIR", &tmp) };
1979        flush();
1980        unsafe { std::env::remove_var("PIANO_RUNS_DIR") };
1981
1982        let files: Vec<_> = std::fs::read_dir(&tmp)
1983            .unwrap()
1984            .filter_map(|e| e.ok())
1985            .filter(|e| e.path().extension().is_some_and(|ext| ext == "ndjson"))
1986            .collect();
1987        assert!(!files.is_empty(), "should write .ndjson file");
1988
1989        let content = std::fs::read_to_string(files[0].path()).unwrap();
1990        let lines: Vec<&str> = content.lines().collect();
1991
1992        // First line is header
1993        assert!(lines[0].contains("\"format_version\":3"));
1994        assert!(lines[0].contains("\"functions\""));
1995
1996        // Remaining lines are frames
1997        assert!(lines.len() >= 3, "header + 2 frames, got {}", lines.len());
1998        assert!(lines[1].contains("\"frame\":0"));
1999        assert!(lines[2].contains("\"frame\":1"));
2000
2001        let _ = std::fs::remove_dir_all(&tmp);
2002    }
2003
2004    #[test]
2005    fn frame_boundary_aggregation() {
2006        reset();
2007        // Simulate 3 frames: depth-0 function called 3 times
2008        for _frame in 0..3u32 {
2009            let _outer = enter("update");
2010            burn_cpu(5_000);
2011            {
2012                let _inner = enter("physics");
2013                burn_cpu(5_000);
2014            }
2015        }
2016        let frames = collect_frames();
2017        assert_eq!(frames.len(), 3, "should have 3 frames");
2018        for frame in &frames {
2019            let update = frame.iter().find(|s| s.name == "update").unwrap();
2020            assert_eq!(update.calls, 1);
2021            let physics = frame.iter().find(|s| s.name == "physics").unwrap();
2022            assert_eq!(physics.calls, 1);
2023        }
2024    }
2025
2026    #[test]
2027    fn non_frame_workload_still_collects() {
2028        reset();
2029        // All calls at depth 0 but no "frame" structure
2030        {
2031            let _a = enter("parse");
2032            burn_cpu(5_000);
2033        }
2034        {
2035            let _b = enter("resolve");
2036            burn_cpu(5_000);
2037        }
2038        // Each depth-0 return is a frame boundary, so we get 2 single-function frames
2039        let frames = collect_frames();
2040        assert_eq!(frames.len(), 2, "each depth-0 return creates a frame");
2041
2042        // Aggregate collect() should still work
2043        let records = collect();
2044        assert_eq!(records.len(), 2);
2045    }
2046
2047    #[test]
2048    fn records_from_other_threads_are_captured_via_shutdown() {
2049        reset();
2050        // Spawn a thread that does work, then joins.
2051        // With TLS-only storage, the thread's records would be lost
2052        // if TLS destructors don't fire (as with rayon workers).
2053        // With per-thread Arc storage, collect_all() can collect them.
2054        std::thread::scope(|s| {
2055            s.spawn(|| {
2056                let _g = enter("thread_work");
2057                burn_cpu(10_000);
2058            });
2059        });
2060
2061        let records = collect_all();
2062        let thread_work = records.iter().find(|r| r.name == "thread_work");
2063        assert!(
2064            thread_work.is_some(),
2065            "thread_work should be captured via global registry. Got: {:?}",
2066            records.iter().map(|r| &r.name).collect::<Vec<_>>()
2067        );
2068        // Use >= instead of == because collect_all() reads all threads and
2069        // may include stale records from concurrent tests.
2070        assert!(thread_work.unwrap().calls >= 1);
2071    }
2072
2073    #[test]
2074    fn span_context_auto_finalizes_on_drop() {
2075        reset();
2076        {
2077            let _parent = enter("auto_parent");
2078            burn_cpu(5_000);
2079
2080            // fork + adopt, but do NOT call finalize() — rely on Drop.
2081            {
2082                let ctx = fork().expect("should have parent on stack");
2083                {
2084                    let _adopt = adopt(&ctx);
2085                    {
2086                        let _child = enter("auto_child");
2087                        burn_cpu(20_000);
2088                    }
2089                }
2090                // ctx drops here — should auto-finalize
2091            }
2092        }
2093
2094        let records = collect();
2095        let parent = records.iter().find(|r| r.name == "auto_parent").unwrap();
2096
2097        // Wall self no longer reduced by cross-thread children.
2098        assert!(
2099            parent.self_ms > parent.total_ms * 0.5,
2100            "parent self ({:.1}ms) should not be reduced by cross-thread child wall. total={:.1}ms",
2101            parent.self_ms,
2102            parent.total_ms
2103        );
2104    }
2105
2106    #[test]
2107    fn shutdown_writes_json_with_all_thread_data() {
2108        reset();
2109        std::thread::scope(|s| {
2110            s.spawn(|| {
2111                let _g = enter("shutdown_thread_work");
2112                burn_cpu(10_000);
2113            });
2114        });
2115        {
2116            let _g = enter("shutdown_main_work");
2117            burn_cpu(5_000);
2118        }
2119
2120        let tmp = std::env::temp_dir().join(format!("piano_shutdown_{}", timestamp_ms()));
2121        std::fs::create_dir_all(&tmp).unwrap();
2122        unsafe { std::env::set_var("PIANO_RUNS_DIR", &tmp) };
2123        shutdown();
2124        unsafe { std::env::remove_var("PIANO_RUNS_DIR") };
2125
2126        let files: Vec<_> = std::fs::read_dir(&tmp)
2127            .unwrap()
2128            .filter_map(|e| e.ok())
2129            .filter(|e| e.path().extension().is_some_and(|ext| ext == "json"))
2130            .collect();
2131        assert!(!files.is_empty(), "shutdown should write JSON");
2132
2133        let content = std::fs::read_to_string(files[0].path()).unwrap();
2134        assert!(
2135            content.contains("\"shutdown_thread_work\""),
2136            "should contain thread work: {content}"
2137        );
2138        assert!(
2139            content.contains("\"shutdown_main_work\""),
2140            "should contain main work: {content}"
2141        );
2142
2143        let _ = std::fs::remove_dir_all(&tmp);
2144    }
2145
2146    #[test]
2147    fn fork_adopt_does_not_inflate_reported_times() {
2148        // Verify that fork/adopt overhead is NOT attributed to any function.
2149        // Only instrumented functions (via enter()) should appear in output.
2150        reset();
2151        {
2152            let _parent = enter("timed_parent");
2153            burn_cpu(5_000);
2154
2155            let ctx = fork().unwrap();
2156
2157            // Simulate rayon: 4 children each doing work
2158            for _ in 0..4 {
2159                let _adopt = adopt(&ctx);
2160                {
2161                    let _child = enter("timed_child");
2162                    burn_cpu(10_000);
2163                }
2164            }
2165            // ctx auto-finalizes on drop
2166        }
2167
2168        // No cross-thread spawning here, so thread-local collect() is sufficient
2169        // and avoids picking up stale records from other threads in parallel tests.
2170        let records = collect();
2171
2172        // Only "timed_parent" and "timed_child" should appear. No adopt/fork entries.
2173        let names: Vec<&str> = records.iter().map(|r| r.name.as_str()).collect();
2174        assert!(
2175            !names
2176                .iter()
2177                .any(|n| n.contains("adopt") || n.contains("fork") || n.contains("piano")),
2178            "fork/adopt should not appear in output. Got: {names:?}",
2179        );
2180
2181        let parent = records.iter().find(|r| r.name == "timed_parent").unwrap();
2182        let child = records.iter().find(|r| r.name == "timed_child").unwrap();
2183
2184        // Parent should appear once, child 4 times.
2185        assert_eq!(parent.calls, 1);
2186        assert_eq!(child.calls, 4);
2187    }
2188
2189    #[test]
2190    #[ignore] // reset_all() clears ALL threads' records; must run in isolation
2191    fn reset_all_clears_cross_thread_records() {
2192        reset();
2193        // Produce records on a spawned thread.
2194        std::thread::scope(|s| {
2195            s.spawn(|| {
2196                let _g = enter("reset_all_thread");
2197                burn_cpu(5_000);
2198            });
2199        });
2200        // Verify the spawned-thread record is visible via collect_all().
2201        let before = collect_all();
2202        assert!(
2203            before.iter().any(|r| r.name == "reset_all_thread"),
2204            "should see cross-thread record before reset_all"
2205        );
2206
2207        // reset_all() should clear all threads' records.
2208        reset_all();
2209
2210        let after = collect_all();
2211        assert!(
2212            !after.iter().any(|r| r.name == "reset_all_thread"),
2213            "reset_all should have cleared cross-thread records. Got: {:?}",
2214            after.iter().map(|r| &r.name).collect::<Vec<_>>()
2215        );
2216    }
2217
    #[cfg(feature = "cpu-time")]
    #[test]
    fn cpu_time_propagated_across_threads_via_adopt() {
        // CPU time (unlike wall time) IS propagated cross-thread through
        // fork/adopt, so the child's CPU cost reduces the parent's cpu_self_ms.
        reset();
        {
            let _parent = enter("cpu_parent");
            burn_cpu(5_000); // parent's own work

            let ctx = fork().expect("should have parent on stack");

            thread::scope(|s| {
                s.spawn(|| {
                    let _adopt = adopt(&ctx);
                    {
                        let _child = enter("cpu_child");
                        burn_cpu(50_000); // much more child CPU
                    }
                });
            });

            ctx.finalize();
        }

        let records = collect();
        let parent = records
            .iter()
            .find(|r| r.name == "cpu_parent")
            .expect("cpu_parent not found");

        // Key insight: after the wall-time fix, parent.self_ms is large because
        // wall time is NOT subtracted cross-thread. But parent.cpu_self_ms should
        // be small because CPU time IS propagated across thread boundaries via
        // fork/adopt, so the child's CPU time was subtracted from the parent's
        // CPU budget.
        eprintln!(
            "cpu_parent: self_ms={:.3}, cpu_self_ms={:.3}, total_ms={:.3}",
            parent.self_ms, parent.cpu_self_ms, parent.total_ms
        );
        // 0.8 factor leaves headroom for timing noise on loaded CI machines.
        assert!(
            parent.cpu_self_ms < parent.self_ms * 0.8,
            "cpu_self_ms ({:.3}) should be significantly less than self_ms ({:.3}) \
             because child CPU time is propagated cross-thread but wall time is not",
            parent.cpu_self_ms,
            parent.self_ms,
        );
    }
2264
2265    #[test]
2266    fn fork_adopt_does_not_subtract_wall_time_from_parent() {
2267        // Wall time should NOT be subtracted cross-thread.
2268        // Parent wall self = elapsed - same-thread children only.
2269        reset();
2270        {
2271            let _parent = enter("wall_parent");
2272            burn_cpu(5_000);
2273
2274            let ctx = fork().unwrap();
2275
2276            {
2277                let _adopt = adopt(&ctx);
2278                {
2279                    let _child = enter("wall_child");
2280                    burn_cpu(50_000);
2281                }
2282            }
2283
2284            ctx.finalize();
2285        }
2286
2287        let records = collect();
2288        let parent = records.iter().find(|r| r.name == "wall_parent").unwrap();
2289        let child = records.iter().find(|r| r.name == "wall_child").unwrap();
2290
2291        // After fix: parent.self_ms ~ parent.total_ms (no cross-thread wall subtraction).
2292        assert!(
2293            parent.self_ms > child.self_ms * 0.5,
2294            "parent wall self ({:.3}ms) should NOT be reduced by cross-thread child wall ({:.3}ms). \
2295             parent.total={:.3}ms",
2296            parent.self_ms,
2297            child.self_ms,
2298            parent.total_ms,
2299        );
2300    }
2301
2302    // ---------------------------------------------------------------
2303    // Async / migration tests
2304    // ---------------------------------------------------------------
2305
2306    #[test]
2307    fn async_guard_same_thread() {
2308        reset();
2309        {
2310            let _outer = enter("outer");
2311            burn_cpu(5_000);
2312            {
2313                let _inner = enter("inner");
2314                burn_cpu(10_000);
2315            }
2316            burn_cpu(5_000);
2317        }
2318        let records = collect();
2319        let outer = records.iter().find(|r| r.name == "outer").unwrap();
2320        let inner = records.iter().find(|r| r.name == "inner").unwrap();
2321        assert!(
2322            outer.self_ms < outer.total_ms,
2323            "self should be less than total"
2324        );
2325        let diff = (inner.self_ms - inner.total_ms).abs();
2326        assert!(diff < inner.total_ms * 0.1, "inner is leaf: self ~ total");
2327    }
2328
2329    #[test]
2330    fn async_guard_migrated_wall_time() {
2331        reset();
2332        let guard = enter("migrating_fn");
2333        burn_cpu(10_000);
2334
2335        std::thread::scope(|s| {
2336            s.spawn(move || {
2337                burn_cpu(10_000);
2338                drop(guard);
2339            });
2340        });
2341
2342        let records = collect_all();
2343        let rec = records.iter().find(|r| r.name == "migrating_fn");
2344        assert!(
2345            rec.is_some(),
2346            "migrated guard should preserve function name 'migrating_fn'. Got: {:?}",
2347            records.iter().map(|r| &r.name).collect::<Vec<_>>()
2348        );
2349        assert!(
2350            rec.unwrap().total_ms > 0.5,
2351            "wall time should reflect work on both threads"
2352        );
2353    }
2354
2355    #[test]
2356    fn async_guard_orphan_cleanup() {
2357        reset();
2358        {
2359            let _parent = enter("parent");
2360            burn_cpu(5_000);
2361
2362            let child = enter("child");
2363            burn_cpu(5_000);
2364
2365            std::thread::scope(|s| {
2366                s.spawn(move || {
2367                    burn_cpu(5_000);
2368                    drop(child);
2369                });
2370            });
2371
2372            burn_cpu(5_000);
2373        }
2374
2375        let records = collect();
2376        let parent = records.iter().find(|r| r.name == "parent").unwrap();
2377        assert_eq!(parent.calls, 1, "parent should have exactly 1 call");
2378        assert!(parent.total_ms > 0.0, "parent wall time should be positive");
2379        assert!(parent.self_ms > 0.0, "parent self time should be positive");
2380    }
2381
2382    #[test]
2383    fn async_guard_nested_migration() {
2384        reset();
2385        {
2386            let _parent = enter("gp_parent");
2387            burn_cpu(5_000);
2388            {
2389                let _child = enter("gp_child");
2390                burn_cpu(5_000);
2391
2392                let grandchild = enter("gp_grandchild");
2393                burn_cpu(5_000);
2394
2395                std::thread::scope(|s| {
2396                    s.spawn(move || {
2397                        drop(grandchild);
2398                    });
2399                });
2400
2401                burn_cpu(5_000);
2402            }
2403            burn_cpu(5_000);
2404        }
2405
2406        let records = collect();
2407        let parent = records.iter().find(|r| r.name == "gp_parent").unwrap();
2408        let child = records.iter().find(|r| r.name == "gp_child").unwrap();
2409        assert_eq!(parent.calls, 1);
2410        assert_eq!(child.calls, 1);
2411        assert!(parent.self_ms > 0.0, "parent not corrupted");
2412        assert!(child.self_ms > 0.0, "child not corrupted");
2413        assert!(
2414            parent.self_ms < parent.total_ms,
2415            "parent has child time subtracted"
2416        );
2417    }
2418
    #[test]
    fn async_guard_alloc_restore_on_orphan() {
        // When a child guard migrates, its stack entry's saved_alloc is
        // orphaned. During the parent's drop, the orphan drain restores
        // those saved counters to ALLOC_COUNTERS before the parent's own
        // saved_alloc is restored. This ensures the grandparent scope
        // sees consistent alloc state after the parent completes.
        reset();

        // Set a known alloc baseline before any guards.
        crate::alloc::ALLOC_COUNTERS.with(|cell| {
            cell.set(crate::alloc::AllocSnapshot {
                alloc_count: 42,
                alloc_bytes: 4200,
                free_count: 0,
                free_bytes: 0,
            });
        });

        {
            let _parent = enter("alloc_parent");
            // enter() saved {42, 4200} and zeroed counters.
            // Simulate allocations in parent scope.
            crate::alloc::ALLOC_COUNTERS.with(|cell| {
                cell.set(crate::alloc::AllocSnapshot {
                    alloc_count: 10,
                    alloc_bytes: 1000,
                    free_count: 0,
                    free_bytes: 0,
                });
            });

            let child = enter("alloc_child");
            // enter() saved {10, 1000} and zeroed counters.

            // Move the child guard to another thread and drop it there,
            // leaving its entry on this thread's stack (migration).
            std::thread::scope(|s| {
                s.spawn(move || {
                    drop(child);
                });
            });
            // child's stack entry is now orphaned with saved_alloc = {10, 1000}.
        }
        // After parent drops: orphan drain restores {10, 1000}, then parent
        // restores its own saved {42, 4200}. ALLOC_COUNTERS should be {42, 4200}.
        let restored = crate::alloc::ALLOC_COUNTERS.with(|cell| cell.get());
        assert_eq!(
            restored.alloc_count, 42,
            "grandparent alloc_count should be restored after orphan drain"
        );
        assert_eq!(
            restored.alloc_bytes, 4200,
            "grandparent alloc_bytes should be restored after orphan drain"
        );
    }
2473
2474    #[test]
2475    fn set_runs_dir_used_by_flush() {
2476        // set_runs_dir() should configure where flush() writes data,
2477        // without requiring PIANO_RUNS_DIR env var or ~/.piano/ fallback.
2478        reset();
2479        {
2480            let _g = enter("set_dir_fn");
2481            burn_cpu(5_000);
2482        }
2483
2484        let tmp = std::env::temp_dir().join(format!("piano_setdir_{}", std::process::id()));
2485        std::fs::create_dir_all(&tmp).unwrap();
2486
2487        // Configure runs dir via set_runs_dir, NOT env var.
2488        set_runs_dir(tmp.to_str().unwrap());
2489        flush();
2490
2491        // Clear the global so other tests aren't affected.
2492        clear_runs_dir();
2493
2494        let files: Vec<_> = std::fs::read_dir(&tmp)
2495            .unwrap()
2496            .filter_map(|e| e.ok())
2497            .collect();
2498        assert!(
2499            !files.is_empty(),
2500            "flush() should write to set_runs_dir() path, got no files in {tmp:?}"
2501        );
2502
2503        let _ = std::fs::remove_dir_all(&tmp);
2504    }
2505
2506    #[test]
2507    fn shutdown_to_sets_runs_dir_for_flush() {
2508        // shutdown_to() should set the global runs dir so that any
2509        // earlier flush() calls (e.g. from panic hooks) would have
2510        // used the same directory. After shutdown_to(), the dir should
2511        // be set so future flushes also use it.
2512        reset();
2513
2514        // Produce data, then call set_runs_dir + flush to simulate
2515        // the scenario where the CLI injects set_runs_dir at start
2516        // of main, and flush() is called mid-program.
2517        let tmp = std::env::temp_dir().join(format!("piano_shutdown_to_{}", std::process::id()));
2518        std::fs::create_dir_all(&tmp).unwrap();
2519
2520        // First: set_runs_dir early (like CLI injection at start of main)
2521        set_runs_dir(tmp.to_str().unwrap());
2522
2523        // Generate some data and flush mid-program.
2524        {
2525            let _g = enter("mid_flush_fn");
2526            burn_cpu(5_000);
2527        }
2528        flush();
2529
2530        // Generate more data.
2531        {
2532            let _g = enter("shutdown_fn");
2533            burn_cpu(5_000);
2534        }
2535
2536        // shutdown_to uses the same dir.
2537        shutdown_to(tmp.to_str().unwrap());
2538        clear_runs_dir();
2539
2540        let files: Vec<_> = std::fs::read_dir(&tmp)
2541            .unwrap()
2542            .filter_map(|e| e.ok())
2543            .collect();
2544        // Should have files from both flush and shutdown_to.
2545        assert!(
2546            files.len() >= 2,
2547            "expected files from both flush() and shutdown_to(), got {} in {tmp:?}",
2548            files.len()
2549        );
2550
2551        let _ = std::fs::remove_dir_all(&tmp);
2552    }
2553
2554    #[cfg(feature = "cpu-time")]
2555    #[test]
2556    fn async_guard_cpu_time_skipped_on_migration() {
2557        reset();
2558        let guard = enter("cpu_migrated");
2559        burn_cpu(20_000);
2560
2561        std::thread::scope(|s| {
2562            s.spawn(move || {
2563                burn_cpu(20_000);
2564                drop(guard);
2565            });
2566        });
2567
2568        let records = collect_all();
2569        let rec = records
2570            .iter()
2571            .find(|r| r.name == "cpu_migrated")
2572            .expect("migrated guard should preserve name 'cpu_migrated'");
2573        assert!(rec.total_ms > 0.0, "wall time captured");
2574        assert!(
2575            rec.cpu_self_ms == 0.0,
2576            "cpu_self_ms should be exactly 0 for migrated guard, got {:.3}",
2577            rec.cpu_self_ms
2578        );
2579    }
2580
2581    #[test]
2582    fn stack_entry_is_64_bytes() {
2583        assert_eq!(
2584            core::mem::size_of::<StackEntry>(),
2585            64,
2586            "StackEntry must be exactly 64 bytes to preserve lsl #6 indexing"
2587        );
2588    }
2589
2590    #[test]
2591    fn guard_check_pushes_phantom_on_migration() {
2592        reset();
2593        let guard = enter("check_parent");
2594        std::thread::scope(|s| {
2595            s.spawn(move || {
2596                guard.check();
2597                STACK.with(|stack| {
2598                    let s = stack.borrow();
2599                    assert_eq!(s.len(), 1, "phantom should be pushed");
2600                    assert_eq!(s[0].name, "<phantom>");
2601                });
2602                drop(guard);
2603            });
2604        });
2605    }
2606
2607    #[test]
2608    fn guard_check_is_noop_on_same_thread() {
2609        reset();
2610        let guard = enter("same_thread");
2611        guard.check();
2612        STACK.with(|stack| {
2613            let s = stack.borrow();
2614            assert_eq!(s.len(), 1, "no phantom on same thread");
2615            assert_eq!(s[0].name, "same_thread");
2616        });
2617        drop(guard);
2618    }
2619
2620    #[test]
2621    fn guard_check_is_idempotent() {
2622        reset();
2623        let guard = enter("idempotent");
2624        std::thread::scope(|s| {
2625            s.spawn(move || {
2626                guard.check();
2627                guard.check();
2628                STACK.with(|stack| {
2629                    let s = stack.borrow();
2630                    assert_eq!(s.len(), 1, "only one phantom after two checks");
2631                });
2632                drop(guard);
2633            });
2634        });
2635    }
2636
2637    #[test]
2638    fn migrated_parent_subtracts_post_migration_children() {
2639        reset();
2640        let parent_guard = enter("mig_parent");
2641        let invocations = std::thread::scope(|s| {
2642            s.spawn(move || {
2643                parent_guard.check();
2644                {
2645                    let _child = enter("mig_child");
2646                    burn_cpu(20_000);
2647                }
2648                drop(parent_guard);
2649                collect_invocations()
2650            })
2651            .join()
2652            .unwrap()
2653        });
2654
2655        let parent_inv = invocations
2656            .iter()
2657            .find(|r| r.name == "mig_parent")
2658            .expect("migrated parent should preserve name 'mig_parent'");
2659        let child_inv = invocations
2660            .iter()
2661            .find(|r| r.name == "mig_child")
2662            .expect("child should produce an invocation");
2663
2664        assert!(
2665            parent_inv.self_ns < parent_inv.elapsed_ns,
2666            "self_ns ({}) should be < elapsed_ns ({}) after subtracting child",
2667            parent_inv.self_ns,
2668            parent_inv.elapsed_ns,
2669        );
2670        assert!(
2671            child_inv.elapsed_ns > 500_000,
2672            "child should have substantial elapsed time, got {}",
2673            child_inv.elapsed_ns,
2674        );
2675    }
2676
2677    #[test]
2678    fn migrated_record_has_children_subtracted_in_collect() {
2679        reset();
2680        let parent_guard = enter("rec_parent");
2681        std::thread::scope(|s| {
2682            s.spawn(move || {
2683                parent_guard.check();
2684                {
2685                    let _child = enter("rec_child");
2686                    burn_cpu(20_000);
2687                }
2688                drop(parent_guard);
2689            });
2690        });
2691
2692        let records = collect_all();
2693        let parent_rec = records
2694            .iter()
2695            .find(|r| r.name == "rec_parent")
2696            .expect("migrated parent should preserve name 'rec_parent'");
2697
2698        assert!(
2699            parent_rec.self_ms < parent_rec.total_ms,
2700            "self_ms ({:.3}) should be < total_ms ({:.3})",
2701            parent_rec.self_ms,
2702            parent_rec.total_ms,
2703        );
2704    }
2705
2706    #[test]
2707    fn root_function_does_not_affect_migrated_guard() {
2708        reset();
2709        {
2710            let _root = enter("root_fn");
2711            burn_cpu(20_000);
2712        }
2713
2714        let guard = std::thread::scope(|s| s.spawn(|| enter("other_thread")).join().unwrap());
2715        guard.check();
2716        drop(guard);
2717
2718        let invocations = collect_invocations();
2719        let migrated = invocations
2720            .iter()
2721            .find(|r| r.name == "other_thread")
2722            .expect("migrated guard should preserve name 'other_thread'");
2723
2724        assert_eq!(
2725            migrated.self_ns, migrated.elapsed_ns,
2726            "migrated guard with no children: self_ns ({}) should equal elapsed_ns ({})",
2727            migrated.self_ns, migrated.elapsed_ns,
2728        );
2729    }
2730
2731    #[test]
2732    fn phantom_on_second_migration_captures_children() {
2733        // A->B->C migration: guard enters on A, migrates to B (phantom on B,
2734        // child on B updates it), migrates to C (phantom on C, child on C
2735        // updates it), drops on C. Both B's and C's phantom children should
2736        // be subtracted from the migrated record's self_ns via forwarding.
2737        reset();
2738        let guard = enter("bc_parent");
2739        let (guard, _b_invocations) = std::thread::scope(|s| {
2740            s.spawn(move || {
2741                guard.check();
2742                {
2743                    let _child = enter("b_child");
2744                    burn_cpu(10_000);
2745                }
2746                let inv = collect_invocations();
2747                (guard, inv)
2748            })
2749            .join()
2750            .unwrap()
2751        });
2752
2753        let c_invocations = std::thread::scope(|s| {
2754            s.spawn(move || {
2755                guard.check();
2756                {
2757                    let _child = enter("c_child");
2758                    burn_cpu(10_000);
2759                }
2760                drop(guard);
2761                collect_invocations()
2762            })
2763            .join()
2764            .unwrap()
2765        });
2766
2767        let b_child_ns = _b_invocations
2768            .iter()
2769            .find(|r| r.name == "b_child")
2770            .expect("b_child invocation")
2771            .elapsed_ns;
2772
2773        let c_child_ns = c_invocations
2774            .iter()
2775            .find(|r| r.name == "c_child")
2776            .expect("c_child invocation")
2777            .elapsed_ns;
2778
2779        let migrated = c_invocations
2780            .iter()
2781            .find(|r| r.name == "bc_parent")
2782            .expect("migrated guard should preserve name 'bc_parent'");
2783
2784        // Both B's and C's children should be subtracted from self_ns.
2785        let children_ns = migrated.elapsed_ns - migrated.self_ns;
2786        assert!(
2787            migrated.self_ns < migrated.elapsed_ns,
2788            "self_ns ({}) should be < elapsed_ns ({}) with children on B and C",
2789            migrated.self_ns,
2790            migrated.elapsed_ns,
2791        );
2792        // Verify B's children were forwarded: children_ns should account
2793        // for both b_child and c_child (with tolerance for timing noise).
2794        let expected_children_min = (b_child_ns + c_child_ns) / 2;
2795        assert!(
2796            children_ns >= expected_children_min,
2797            "children_ns ({children_ns}) should include both b_child ({b_child_ns}) \
2798             and c_child ({c_child_ns}) (min threshold: {expected_children_min})",
2799        );
2800    }
2801
2802    #[test]
2803    fn multiple_checks_on_same_thread_are_idempotent() {
2804        reset();
2805        let guard = enter("multi_check");
2806        std::thread::scope(|s| {
2807            s.spawn(move || {
2808                guard.check();
2809                {
2810                    let _child1 = enter("child1");
2811                    burn_cpu(10_000);
2812                }
2813                guard.check();
2814                {
2815                    let _child2 = enter("child2");
2816                    burn_cpu(10_000);
2817                }
2818                STACK.with(|stack| {
2819                    let s = stack.borrow();
2820                    assert_eq!(s.len(), 1, "only one phantom on stack");
2821                    assert!(
2822                        s[0].children_ms > 0.0,
2823                        "phantom should have accumulated children_ms"
2824                    );
2825                });
2826                drop(guard);
2827            });
2828        });
2829    }
2830
2831    #[test]
2832    fn migrated_guard_preserves_function_name() {
2833        // Migrated guards should report the actual function name,
2834        // not a generic "<migrated>" placeholder.
2835        reset();
2836        let guard = enter("real_fn_name");
2837        burn_cpu(10_000);
2838
2839        std::thread::scope(|s| {
2840            s.spawn(move || {
2841                burn_cpu(10_000);
2842                drop(guard);
2843            });
2844        });
2845
2846        let records = collect_all();
2847        let rec = records.iter().find(|r| r.name == "real_fn_name");
2848        assert!(
2849            rec.is_some(),
2850            "migrated guard should preserve function name 'real_fn_name'. Got: {:?}",
2851            records.iter().map(|r| &r.name).collect::<Vec<_>>()
2852        );
2853        assert!(
2854            rec.unwrap().total_ms > 0.0,
2855            "should have recorded wall time"
2856        );
2857    }
2858
2859    #[test]
2860    fn migrated_guards_distinguish_multiple_functions() {
2861        // When multiple functions migrate, each should retain its own name
2862        // instead of collapsing into a single "<migrated>" bucket.
2863        reset();
2864        let guard_a = enter("fn_alpha");
2865        burn_cpu(5_000);
2866
2867        let guard_b = std::thread::scope(|s| s.spawn(|| enter("fn_beta")).join().unwrap());
2868        burn_cpu(5_000);
2869
2870        // Drop both guards on different threads than where they were created.
2871        std::thread::scope(|s| {
2872            s.spawn(move || {
2873                drop(guard_a);
2874            });
2875        });
2876        std::thread::scope(|s| {
2877            s.spawn(move || {
2878                drop(guard_b);
2879            });
2880        });
2881
2882        let records = collect_all();
2883        let names: Vec<&str> = records.iter().map(|r| r.name.as_str()).collect();
2884        assert!(
2885            names.contains(&"fn_alpha"),
2886            "should have fn_alpha in records. Got: {names:?}"
2887        );
2888        assert!(
2889            names.contains(&"fn_beta"),
2890            "should have fn_beta in records. Got: {names:?}"
2891        );
2892        assert!(
2893            !names.contains(&"<migrated>"),
2894            "should NOT have <migrated> placeholder. Got: {names:?}"
2895        );
2896    }
2897
2898    #[test]
2899    fn migrated_invocation_has_real_name() {
2900        // Verify InvocationRecord also carries the real function name.
2901        reset();
2902        let guard = enter("inv_migrated_fn");
2903        burn_cpu(10_000);
2904
2905        let invocations = std::thread::scope(|s| {
2906            s.spawn(move || {
2907                burn_cpu(10_000);
2908                drop(guard);
2909                collect_invocations()
2910            })
2911            .join()
2912            .unwrap()
2913        });
2914
2915        let inv = invocations.iter().find(|r| r.name == "inv_migrated_fn");
2916        assert!(
2917            inv.is_some(),
2918            "migrated invocation should have name 'inv_migrated_fn'. Got: {:?}",
2919            invocations.iter().map(|r| r.name).collect::<Vec<_>>()
2920        );
2921        assert!(
2922            inv.unwrap().elapsed_ns > 0,
2923            "should have recorded elapsed time"
2924        );
2925    }
2926
2927    #[test]
2928    fn pack_unpack_round_trip() {
2929        let cookie = 42u64;
2930        let name_id = 1234u16;
2931        let depth = 567u16;
2932        let packed = pack_cookie_name_depth(cookie, name_id, depth);
2933        assert_eq!(unpack_cookie(packed), cookie);
2934        assert_eq!(unpack_name_id(packed), name_id);
2935        assert_eq!(unpack_depth(packed), depth);
2936
2937        // Max values: verifies the full bit range.
2938        let packed_max = pack_cookie_name_depth(u32::MAX as u64, u16::MAX, u16::MAX);
2939        assert_eq!(unpack_cookie(packed_max), u32::MAX as u64);
2940        assert_eq!(unpack_name_id(packed_max), u16::MAX);
2941        assert_eq!(unpack_depth(packed_max), u16::MAX);
2942
2943        // Zero values: verifies zero-packing.
2944        let packed_zero = pack_cookie_name_depth(0, 0, 0);
2945        assert_eq!(unpack_cookie(packed_zero), 0);
2946        assert_eq!(unpack_name_id(packed_zero), 0);
2947        assert_eq!(unpack_depth(packed_zero), 0);
2948    }
2949
    #[test]
    fn phantom_cleaned_up_on_intermediate_thread() {
        // A -> B -> C migration: after the guard migrates from B to C, B's
        // phantom StackEntry should be cleaned up so that subsequent functions
        // on B get correct depth and frame boundaries.
        //
        // This test uses a long-lived thread B (via channels) so we can:
        // 1. Send the guard to B (check() pushes phantom)
        // 2. Send the guard to C (check() on C detects re-migration, drops on C)
        // 3. Run a new function on B and verify correct behavior
        //
        // NOTE: the phase ordering is enforced entirely by the blocking
        // recv() calls below; do not reorder sends/recvs.
        use std::sync::mpsc;

        reset();

        let guard = enter("async_fn");

        // Thread B: stays alive across the full test.
        // Four channels: guard hand-off to B, guard hand-back from B,
        // "guard dropped on C" signal, and B's measured results.
        let (tx_guard_to_b, rx_guard_on_b) = mpsc::channel::<Guard>();
        let (tx_guard_from_b, rx_guard_from_b) = mpsc::channel::<Guard>();
        let (tx_verify, rx_verify) = mpsc::channel::<()>();
        let (tx_results, rx_results) = mpsc::channel::<(u16, usize)>(); // (depth, frame_count)

        let b_handle = std::thread::spawn(move || {
            // Phase 1: receive guard, push phantom.
            let guard = rx_guard_on_b.recv().unwrap();
            guard.check();
            // Send guard to main so it can go to C.
            tx_guard_from_b.send(guard).unwrap();

            // Phase 2: wait for signal that guard has been dropped on C.
            rx_verify.recv().unwrap();

            // Phase 3: run a new top-level function on B.
            // Do NOT call reset() here -- that would clear the stack
            // and mask the bug. Only clear invocations and frames so we
            // can observe fresh results.
            INVOCATIONS.with(|inv| inv.borrow_mut().clear());
            FRAMES.with(|frames| frames.borrow_mut().clear());
            FRAME_BUFFER.with(|buf| buf.borrow_mut().clear());

            // If B's phantom leaked, this frame would presumably nest under
            // it (depth 1) and the depth-0 frame boundary below would not
            // fire -- which is exactly what the final assertions detect.
            {
                let _work = enter("b_later_work");
                burn_cpu(1_000);
            }

            // Collect results: depth and frame count.
            let invocations = collect_invocations();
            let frames = collect_frames();

            let work_rec = invocations
                .iter()
                .find(|r| r.name == "b_later_work")
                .expect("should have b_later_work record");

            tx_results.send((work_rec.depth, frames.len())).unwrap();
        });

        // Send guard to B.
        tx_guard_to_b.send(guard).unwrap();
        // Get guard back from B.
        let guard = rx_guard_from_b.recv().unwrap();

        // Send guard to C, where it drops.
        std::thread::scope(|s| {
            s.spawn(move || {
                guard.check();
                drop(guard);
            });
        });

        // Signal B that the guard has dropped.
        tx_verify.send(()).unwrap();

        // Get results from B.
        let (depth, frame_count) = rx_results.recv().unwrap();
        b_handle.join().unwrap();

        // After phantom cleanup, b_later_work should be depth 0 (top-level on B).
        assert_eq!(
            depth, 0,
            "b_later_work depth should be 0 after phantom cleanup (got {depth})"
        );
        // Frame boundary should have triggered (depth == 0).
        assert_eq!(
            frame_count, 1,
            "should have 1 frame after b_later_work completes (got {frame_count})"
        );
    }
3038
3039    #[test]
3040    fn shutdown_impl_reports_write_errors_to_stderr() {
3041        reset();
3042        // Produce some data so shutdown_impl has something to write.
3043        {
3044            let _g = enter("write_err_test");
3045        }
3046
3047        // Point at a path that cannot be a directory (a file, not a dir).
3048        let tmp = std::env::temp_dir().join(format!("piano_write_err_{}", std::process::id()));
3049        // Create a file where shutdown_impl expects a directory.
3050        std::fs::write(&tmp, b"not a directory").unwrap();
3051
3052        // shutdown_impl should try to write and fail, printing to stderr.
3053        // We can't easily capture stderr in a unit test, so instead verify
3054        // that the function does not panic and returns normally.
3055        shutdown_impl(&tmp);
3056
3057        // Clean up.
3058        let _ = std::fs::remove_file(&tmp);
3059    }
3060}