// inferno 0.11.14
//
// Rust port of the FlameGraph performance profiling tool suite
// Documentation
use std::borrow::Cow;
use std::io;
#[cfg(feature = "multithreaded")]
use std::mem;
#[cfg(feature = "multithreaded")]
use std::sync::Arc;

use ahash::AHashMap;
#[cfg(feature = "multithreaded")]
use dashmap::DashMap;
use once_cell::sync::Lazy;

/// Builds an `Err` wrapping a `std::io::Error` of kind `InvalidData`.
///
/// The arguments are forwarded verbatim to `format!` and the resulting string
/// becomes the error message, so the macro accepts exactly what `format!` accepts.
///
/// The expansion only uses absolute `::std` paths, so it can be invoked from any
/// call site regardless of whether `std::io` has been imported there as `io`.
macro_rules! invalid_data_error {
    ($($arg:tt)*) => {{
        ::std::result::Result::Err(::std::io::Error::new(
            ::std::io::ErrorKind::InvalidData,
            ::std::format!($($arg)*),
        ))
    }};
}

/// Initial capacity requested for the maps that back `Occurrences`
/// (both the single-threaded and the multi-threaded variant).
const CAPACITY_HASHMAP: usize = 512;

/// Capacity of `128 * 1024`; not used in this file.
// NOTE(review): presumably the buffer size (in bytes) for input readers created
// elsewhere in the crate -- confirm against the callers.
pub(crate) const CAPACITY_READER: usize = 128 * 1024;

/// Internal parameter (not exposed to users) that determines how many stacks of
/// input data make up a "chunk" (unit that is sent to the threadpool for
/// processing). Chosen by benchmarking various values using the following tests:
/// * cargo test bench_nstacks_dtrace --release -- --ignored --nocapture
/// * cargo test bench_nstacks_perf --release -- --ignored --nocapture
pub(crate) const DEFAULT_NSTACKS_PER_JOB: usize = 100;

/// A guess at the number of bytes contained in any given stack of any given format.
/// Used to calculate the initial capacity of the vector used for sending input
/// data across threads.
#[cfg(feature = "multithreaded")]
const NBYTES_PER_STACK_GUESS: usize = 1024;

/// Length in bytes of the trailing hash that `fix_partially_demangled_rust_symbol`
/// looks for and strips: an `h` followed by 16 hexadecimal digits.
const RUST_HASH_LENGTH: usize = 17;

/// Default number of threads when the `multithreaded` feature is enabled:
/// lazily initialised from `num_cpus::get`.
#[cfg(feature = "multithreaded")]
#[doc(hidden)]
pub static DEFAULT_NTHREADS: Lazy<usize> = Lazy::new(num_cpus::get);
/// Default number of threads when the `multithreaded` feature is disabled: always `1`.
#[cfg(not(feature = "multithreaded"))]
#[doc(hidden)]
pub static DEFAULT_NTHREADS: Lazy<usize> = Lazy::new(|| 1);

/// Sealed trait for internal library authors.
///
/// If you implement this trait, your type will implement the public-facing
/// `Collapse` trait as well. Implementing this trait gives you parallelism
/// for free as long as you adhere to the requirements described in the
/// comments below.
pub trait CollapsePrivate: Send + Sized {
    // *********************************************************** //
    // ********************* REQUIRED METHODS ******************** //
    // *********************************************************** //

    /// Process any header lines that precede the main body of samples.
    ///
    /// Some formats, such as `dtrace`, contain a header or other non-stack
    /// information at the beginning of their input files. If header information
    /// is present, this method **must** consume it (i.e. advance the provided
    /// reader past it).
    ///
    /// This method also provides an opportunity to do processing of actual
    /// stack data on the main thread before worker threads are spun up. For
    /// example, `perf` requires reading the first stack in order to know how to
    /// process the rest; so this method is used for that "upfront" processing.
    ///
    /// If the format you are working with does not contain header information
    /// or does not need any special, up-front processing, just have this method
    /// return `Ok(())` immediately.
    fn pre_process<R>(&mut self, reader: &mut R, occurrences: &mut Occurrences) -> io::Result<()>
    where
        R: io::BufRead;

    /// Process all samples in a chunk of input (the primary method).
    ///
    /// This method receives a reader whose header has already been consumed (see above),
    /// as well as a mutable reference to an `Occurrences` instance (just a hashmap that
    /// works across multiple threads). Implementers should parse the stack data
    /// contained in the reader and write output to the provided `Occurrences` map.
    ///
    /// This method may be called multiple times to process batches of incoming samples.
    /// Therefore, make sure that when end-of-file is reached, the collapser now considers
    /// itself back at the top-level context (e.g., not in the middle of a stack). This
    /// means that some internal state, e.g. stack buffers, must be reset by the time this
    /// method returns. Other internal state, e.g. caches, however, may be kept.
    fn collapse_single_threaded<R>(
        &mut self,
        reader: R,
        occurrences: &mut Occurrences,
    ) -> io::Result<()>
    where
        R: io::BufRead;

    /// Determine the end of a stack.
    ///
    /// Worker threads **must** receive full stacks (as opposed to partial stacks); so this method
    /// determines, for your specific format, when the end of a stack has been reached.
    ///
    /// This method should return `true` if the provided line represents the end of a stack;
    /// `false` otherwise.
    ///
    /// If your format requires more information than merely a line of the input data in order
    /// to determine whether or not you are at the end of a stack, you can retrieve/store
    /// information on the `self` instance, which is also available to you in this method. This
    /// method will be called for every line of input data (excluding those consumed by the
    /// `pre_process` method).
    fn would_end_stack(&mut self, line: &[u8]) -> bool;

    /// Creates a copy and prepares it to be sent to a different thread.
    ///
    /// This method creates a copy of `self` in order to send it to a different thread.
    /// As such, it should clone all the internal fields of `self` **except** those that
    /// should be reset because the collapser will now operate in a different stack context.
    /// For example, any options should be cloned, but any stack buffers or similar "stack state"
    /// should be reset to, for example, an empty vector before this method returns.
    fn clone_and_reset_stack_context(&self) -> Self;

    /// Determine if this format corresponds to the input data.
    ///
    /// This method, used by the `guess` collapser, should return whether or not the
    /// implementation corresponds with the given input string, i.e. if the input data
    /// matches the collapser.
    ///
    /// - `None` means "not sure -- need more input"
    /// - `Some(true)` means "yes, this implementation should work with this string"
    /// - `Some(false)` means "no, this implementation definitely won't work"
    #[allow(clippy::wrong_self_convention)]
    fn is_applicable(&mut self, input: &str) -> Option<bool>;

    /// Returns the number of stacks per job to send to the threadpool.
    fn nstacks_per_job(&self) -> usize;

    /// Sets the number of stacks per job to send to the threadpool.
    fn set_nstacks_per_job(&mut self, n: usize);

    /// Returns the number of threads to use.
    fn nthreads(&self) -> usize;

    /// Sets the number of threads to use.
    fn set_nthreads(&mut self, n: usize);

    // *********************************************************** //
    // ******************** PROVIDED METHODS ********************* //
    // *********************************************************** //

    /// Collapses everything available from `reader` and writes the result to `writer`.
    ///
    /// The steps are:
    /// 1. create an `Occurrences` map suited to `self.nthreads()`;
    /// 2. call `pre_process` so that any header is consumed;
    /// 3. run `collapse_multi_threaded` if the map is concurrent, otherwise
    ///    `collapse_single_threaded`;
    /// 4. write the accumulated counts to `writer` via `Occurrences::write_and_clear`.
    ///
    /// Any `io::Error` produced by one of these steps is returned to the caller.
    /// `Occurrences::new` asserts that the thread count is non-zero.
    fn collapse<R, W>(&mut self, mut reader: R, writer: W) -> io::Result<()>
    where
        R: io::BufRead,
        W: io::Write,
    {
        let mut occurrences = Occurrences::new(self.nthreads());

        // Consume the header, if any, and do any other pre-processing
        // that needs to occur.
        self.pre_process(&mut reader, &mut occurrences)?;

        // Do collapsing.
        if occurrences.is_concurrent() {
            self.collapse_multi_threaded(reader, &mut occurrences)?;
        } else {
            self.collapse_single_threaded(reader, &mut occurrences)?;
        }

        // Write results.
        occurrences.write_and_clear(writer)
    }

    /// Placeholder used when the `multithreaded` feature is disabled.
    ///
    /// In that configuration `Occurrences::new` only ever builds the single-threaded
    /// variant, so `collapse` never takes the concurrent branch; calling this method
    /// directly hits `unimplemented!()` and panics.
    #[cfg(not(feature = "multithreaded"))]
    fn collapse_multi_threaded<R>(&mut self, _: R, _: &mut Occurrences) -> io::Result<()>
    where
        R: io::BufRead,
    {
        unimplemented!();
    }

    /// Collapses the input using `self.nthreads()` worker threads.
    ///
    /// The calling thread reads `reader` line by line, groups complete stacks into
    /// chunks of `self.nstacks_per_job()` stacks (using `would_end_stack` to find stack
    /// boundaries) and sends each chunk over a bounded channel. Every worker owns a
    /// collapser obtained from `clone_and_reset_stack_context` and a clone of
    /// `occurrences`, and runs `collapse_single_threaded` on each chunk it receives.
    ///
    /// If a worker fails, it tells the other workers to stop and hands its error to the
    /// calling thread, which returns it. A read error on `reader` is returned directly.
    ///
    /// # Panics
    ///
    /// Panics if `nstacks_per_job()` is zero, if `nthreads()` is not greater than one,
    /// or if `occurrences` is not the concurrent variant. The results of joining the
    /// worker handles and of the thread scope itself are unwrapped as well.
    #[cfg(feature = "multithreaded")]
    fn collapse_multi_threaded<R>(
        &mut self,
        mut reader: R,
        occurrences: &mut Occurrences,
    ) -> io::Result<()>
    where
        R: io::BufRead,
    {
        let nstacks_per_job = self.nstacks_per_job();
        let nthreads = self.nthreads();

        // Preconditions; `collapse` only reaches this method with a concurrent map.
        assert_ne!(nstacks_per_job, 0);
        assert!(nthreads > 1);
        assert!(occurrences.is_concurrent());

        crossbeam_utils::thread::scope(|scope| {
            // Channel for sending an error from the worker threads to the main thread
            // in the event a worker has failed.
            let (tx_error, rx_error) = crossbeam_channel::bounded::<io::Error>(1);

            // Channel for sending input data from the main thread to the worker threads.
            // We choose `2 * nthreads` as the channel size here in order to limit memory
            // usage in the case of particularly large input files.
            let (tx_input, rx_input) = crossbeam_channel::bounded::<Vec<u8>>(2 * nthreads);

            // Channel for worker threads that have errored to signal to all the other
            // worker threads that they should stop work immediately and return.
            let (tx_stop, rx_stop) = crossbeam_channel::bounded::<()>(nthreads - 1);

            let mut handles = Vec::with_capacity(nthreads);
            for _ in 0..nthreads {
                // Each worker gets its own handle to every channel...
                let tx_error = tx_error.clone();
                let rx_input = rx_input.clone();
                let (tx_stop, rx_stop) = (tx_stop.clone(), rx_stop.clone());

                // ...its own collapser with fresh stack state, and a clone of the
                // occurrences map to write into.
                let mut folder = self.clone_and_reset_stack_context();
                let mut occurrences = occurrences.clone();

                // Launch the worker thread...
                let handle = scope.spawn(move |_| loop {
                    crossbeam_channel::select! {
                        recv(rx_input) -> input => {
                            // Receive input from the main thread.
                            let data = match input {
                                Ok(data) => data,
                                // The main thread drops its handle to the input sender once it's
                                // finished sending data; so if we get an error here, it means
                                // there is no more data to be sent and we should exit.
                                Err(_) => return,
                            };
                            // If there is input data, process it.
                            if let Err(e) = folder.collapse_single_threaded(&data[..], &mut occurrences) {
                                // In the event of an error...
                                //
                                // We notify all the threads about it here, rather than wait for the main input
                                // loop to see the error, so that we can also stop the input loop from iterating
                                // through the rest of the file.
                                //
                                // If the channel is full, it means another thread has also errored
                                // and already sent a stop signal to the other threads; so there is
                                // no need to wait or to check for a `SendError` here.
                                for _ in 0..(nthreads - 1) {
                                    let _ = tx_stop.try_send(());
                                }

                                // Then, send the error produced to the main thread for
                                // propagation. If the channel is full, it means another thread
                                // has also errored and already sent its error back to the
                                // main thread; so there is no need to wait or to check for a
                                // `SendError` here.
                                let _ = tx_error.try_send(e);

                                // Finally, return.
                                return;
                            }
                            // If successful, return to the top of the loop and continue to poll
                            // the input and stop channels.
                        },
                        recv(rx_stop) -> _ => {
                            // Received a signal from another worker thread that it has errored;
                            // so should cease work immediately and return.
                            return;
                        },
                    }
                });
                handles.push(handle);
            }

            // On the main thread, we're about to start sending data to the worker threads,
            // but we only want to send data to the worker threads **if** they're still alive!
            // (if one of them produces an error, all of them will exit early). To ensure we don't try
            // to send data to dead worker threads, drop the main thread's handle to the input receiver
            // here. This way, if all the workers die, every handle to the input receiver will have
            // been dropped and we'll get an error when trying to send data on the input sender,
            // which will tell us (the main thread) to stop trying to send data and, instead,
            // skip to trying to pull an error off the error channel.
            drop(rx_input);

            // Now that we've dropped the main thread's handle to the input receiver, start
            // trying to send data to the worker threads...

            // `buf` accumulates the lines of the chunk currently being built; `index` is the
            // offset in `buf` where the next line starts, and `nstacks` counts the complete
            // stacks accumulated in the current chunk.
            let buf_capacity = usize::next_power_of_two(NBYTES_PER_STACK_GUESS * nstacks_per_job);
            let mut buf = Vec::with_capacity(buf_capacity);
            let (mut index, mut nstacks) = (0, 0);

            loop {
                let n = reader.read_until(b'\n', &mut buf)?;
                if n == 0 {
                    // If we've reached the end of the data, send the final chunk to the worker
                    // threads and break from the loop. The worker threads may or may not still
                    // be alive (depending on if one errored in between the sending of the last
                    // chunk and the sending of this one), but either way we should break the loop;
                    // so there's no need to check for a `SendError` here.
                    let _ = tx_input.send(buf);
                    break;
                }
                // The line just read is the last `n` bytes appended to `buf`.
                let line = &buf[index..index + n];
                index += n;
                if self.would_end_stack(line) {
                    // If we've reached the end of a stack, count it.
                    nstacks += 1;
                    if nstacks == nstacks_per_job {
                        // If we've accumulated enough stacks to make up a chunk to send to the
                        // worker threads, try to send it.
                        let buf_capacity = usize::next_power_of_two(buf.capacity());
                        let chunk = mem::replace(&mut buf, Vec::with_capacity(buf_capacity));
                        if tx_input.send(chunk).is_err() {
                            // If sending the chunk produces a `SendError`, this means that one
                            // of the worker threads has errored, sent a signal to all the other
                            // worker threads to shut down, and they have all shutdown, in which
                            // case we know there will be an error waiting for us on the error
                            // channel; so we should stop parsing input data (i.e. break).
                            break;
                        }
                        // Start the next chunk from an empty buffer.
                        index = 0;
                        nstacks = 0;
                    }
                    continue
                }
            }

            // The main thread needs to drop its handle to the input sender here because
            // that's how we signal to the worker threads that there is no more data coming
            // on the input channel, in which case they should exit.
            drop(tx_input);

            // The main thread needs to drop its handle to the error sender here because we
            // are about to poll the error receiver for errors, which will block until all
            // the error senders have been dropped (including ours).
            drop(tx_error);

            // Now we poll the error channel, which will block until either:
            // * all work has been completed successfully,
            //   in which case the expression below will evaluate to `None`, or
            // * an error has occurred on one of the worker threads,
            //   in which case the expression below will evaluate to `Some(<io::Error>)`.
            if let Some(e) = rx_error.iter().next() {
                return Err(e);
            }

            // No worker reported an error: wait for all of them to finish.
            for handle in handles {
                handle.join().unwrap();
            }

            Ok(())
        })
        .unwrap()
    }
}

/// Occurrences is a HashMap, which uses:
/// * AHashMap if single-threaded
/// * DashMap if multi-threaded
///
/// Keys are `String`s and values are `usize` counts.
///
/// This is public because it is part of the sealed `CollapsePrivate` trait's API, but it
/// is in a crate-private module so is not nameable by downstream library users.
#[derive(Clone, Debug)]
pub enum Occurrences {
    /// Counts held in a plain `AHashMap`; cloning this variant clones the map itself.
    SingleThreaded(AHashMap<String, usize>),
    /// Counts held in a `DashMap` behind an `Arc`; cloning this variant clones the `Arc`,
    /// so all clones refer to the same map. Only exists with the `multithreaded` feature.
    #[cfg(feature = "multithreaded")]
    MultiThreaded(Arc<DashMap<String, usize, ahash::RandomState>>),
}

impl Occurrences {
    #[cfg(feature = "multithreaded")]
    pub(crate) fn new(nthreads: usize) -> Self {
        assert_ne!(nthreads, 0);
        if nthreads == 1 {
            Self::new_single_threaded()
        } else {
            Self::new_multi_threaded()
        }
    }

    #[cfg(not(feature = "multithreaded"))]
    pub(crate) fn new(nthreads: usize) -> Self {
        assert_ne!(nthreads, 0);
        Self::new_single_threaded()
    }

    fn new_single_threaded() -> Self {
        let map =
            AHashMap::with_capacity_and_hasher(CAPACITY_HASHMAP, ahash::RandomState::default());
        Occurrences::SingleThreaded(map)
    }

    #[cfg(feature = "multithreaded")]
    fn new_multi_threaded() -> Self {
        let map =
            DashMap::with_capacity_and_hasher(CAPACITY_HASHMAP, ahash::RandomState::default());
        Occurrences::MultiThreaded(Arc::new(map))
    }

    /// Inserts a key-count pair into the map. If the map did not have this key
    /// present, `None` is returned. If the map did have this key present, the
    /// value is updated, and the old value is returned.
    pub(crate) fn insert(&mut self, key: String, count: usize) -> Option<usize> {
        use self::Occurrences::*;
        match self {
            SingleThreaded(map) => map.insert(key, count),
            #[cfg(feature = "multithreaded")]
            MultiThreaded(arc) => arc.insert(key, count),
        }
    }

    /// Inserts a key-count pair into the map if the key does not already exist.
    /// If the key does already exist, adds count to the current value of the
    /// existing key.
    pub(crate) fn insert_or_add(&mut self, key: String, count: usize) {
        use self::Occurrences::*;
        match self {
            SingleThreaded(map) => *map.entry(key).or_insert(0) += count,
            #[cfg(feature = "multithreaded")]
            MultiThreaded(arc) => *arc.entry(key).or_insert(0) += count,
        }
    }

    pub(crate) fn is_concurrent(&self) -> bool {
        use self::Occurrences::*;
        match self {
            SingleThreaded(_) => false,
            #[cfg(feature = "multithreaded")]
            MultiThreaded(_) => true,
        }
    }

    pub(crate) fn write_and_clear<W>(&mut self, mut writer: W) -> io::Result<()>
    where
        W: io::Write,
    {
        use self::Occurrences::*;
        match self {
            SingleThreaded(ref mut map) => {
                let mut contents: Vec<_> = map.drain().collect();
                contents.sort();
                for (key, value) in contents {
                    writeln!(writer, "{} {}", key, value)?;
                }
            }
            #[cfg(feature = "multithreaded")]
            MultiThreaded(ref mut arc) => {
                let map = match Arc::get_mut(arc) {
                    Some(map) => map,
                    None => panic!(
                        "Attempting to drain the contents of a concurrent HashMap \
                         when more than one thread has access to it, which is \
                         not allowed."
                    ),
                };
                let map = mem::replace(
                    map,
                    DashMap::with_capacity_and_hasher(
                        CAPACITY_HASHMAP,
                        ahash::RandomState::default(),
                    ),
                );
                let contents = map.iter().collect::<Vec<_>>();
                let mut pairs = contents.iter().map(|pair| pair.pair()).collect::<Vec<_>>();
                pairs.sort();
                for (key, value) in pairs {
                    writeln!(writer, "{} {}", key, value)?;
                }
            }
        }
        writer.flush()?;
        Ok(())
    }
}

/// Demangles partially demangled Rust symbols that were demangled incorrectly by profilers like
/// `sample` and `DTrace`.
///
/// For example:
///     `_$LT$grep_searcher..searcher..glue..ReadByLine$LT$$u27$s$C$$u20$M$C$$u20$R$C$$u20$S$GT$$GT$::run::h30ecedc997ad7e32`
/// becomes
///     `<grep_searcher::searcher::glue::ReadByLine<'s, M, R, S>>::run`
///
/// Non-Rust symobols, or Rust symbols that are already demangled, will be returned unchanged.
///
/// Based on code in https://github.com/alexcrichton/rustc-demangle/blob/master/src/legacy.rs
#[allow(clippy::cognitive_complexity)]
pub(crate) fn fix_partially_demangled_rust_symbol(symbol: &str) -> Cow<str> {
    // Rust hashes are hex digits with an `h` prepended.
    let is_rust_hash =
        |s: &str| s.starts_with('h') && s[1..].chars().all(|c| c.is_ascii_hexdigit());

    // If there's no trailing Rust hash just return the symbol as is.
    if symbol.len() < RUST_HASH_LENGTH || !is_rust_hash(&symbol[symbol.len() - RUST_HASH_LENGTH..])
    {
        return Cow::Borrowed(symbol);
    }

    // Strip off trailing hash.
    let mut rest = &symbol[..symbol.len() - RUST_HASH_LENGTH];

    if rest.ends_with("::") {
        rest = &rest[..rest.len() - 2];
    }

    if rest.starts_with("_$") {
        rest = &rest[1..];
    }

    let mut demangled = String::new();

    while !rest.is_empty() {
        if rest.starts_with('.') {
            if let Some('.') = rest[1..].chars().next() {
                demangled.push_str("::");
                rest = &rest[2..];
            } else {
                demangled.push('.');
                rest = &rest[1..];
            }
        } else if rest.starts_with('$') {
            macro_rules! demangle {
                ($($pat:expr => $demangled:expr,)*) => ({
                    $(if rest.starts_with($pat) {
                        demangled.push_str($demangled);
                        rest = &rest[$pat.len()..];
                        } else)*
                    {
                        demangled.push_str(rest);
                        break;
                    }

                })
            }

            demangle! {
                "$SP$" => "@",
                "$BP$" => "*",
                "$RF$" => "&",
                "$LT$" => "<",
                "$GT$" => ">",
                "$LP$" => "(",
                "$RP$" => ")",
                "$C$" => ",",
                "$u7e$" => "~",
                "$u20$" => " ",
                "$u27$" => "'",
                "$u3d$" => "=",
                "$u5b$" => "[",
                "$u5d$" => "]",
                "$u7b$" => "{",
                "$u7d$" => "}",
                "$u3b$" => ";",
                "$u2b$" => "+",
                "$u21$" => "!",
                "$u22$" => "\"",
            }
        } else {
            let idx = match rest.char_indices().find(|&(_, c)| c == '$' || c == '.') {
                None => rest.len(),
                Some((i, _)) => i,
            };
            demangled.push_str(&rest[..idx]);
            rest = &rest[idx..];
        }
    }

    Cow::Owned(demangled)
}

/// Helpers shared by the collapser tests and benchmarks.
#[cfg(test)]
pub(crate) mod testing {
    use std::collections::HashMap;
    use std::fmt;
    use std::fs::File;
    use std::io::Write;
    use std::io::{self, BufRead, Read};
    use std::path::{Path, PathBuf};
    use std::time::Instant;

    use libflate::gzip::Decoder;

    use super::*;
    use crate::collapse::Collapse;

    /// Reads every file in `inputs` fully into memory and returns the contents keyed by path.
    ///
    /// Files whose path ends in `.gz` are passed through a gzip decoder while being read.
    /// Panics if a path is not valid UTF-8.
    pub(crate) fn read_inputs<P>(inputs: &[P]) -> io::Result<HashMap<PathBuf, Vec<u8>>>
    where
        P: AsRef<Path>,
    {
        let mut map = HashMap::with_capacity(inputs.len());
        for path in inputs {
            let path = path.as_ref();
            let mut bytes = Vec::new();
            let mut file = File::open(path)?;
            if path.to_str().unwrap().ends_with(".gz") {
                let mut reader = Decoder::new(file)?;
                reader.read_to_end(&mut bytes)?;
            } else {
                file.read_to_end(&mut bytes)?;
            }
            map.insert(path.to_path_buf(), bytes);
        }
        Ok(map)
    }

    /// Checks that collapsing each input with 2 to 16 threads produces exactly the same
    /// output as collapsing it with a single thread.
    ///
    /// Note that this leaves `folder` configured with the last thread count tried.
    pub(crate) fn test_collapse_multi<C, P>(folder: &mut C, inputs: &[P]) -> io::Result<()>
    where
        C: Collapse + CollapsePrivate,
        P: AsRef<Path>,
    {
        const MAX_THREADS: usize = 16;
        for (path, bytes) in read_inputs(inputs)? {
            // Reference output: one thread.
            folder.set_nthreads(1);
            let mut reference = Vec::new();
            <C as Collapse>::collapse(folder, &bytes[..], &mut reference)?;
            let expected = std::str::from_utf8(&reference[..]).unwrap();

            for n in 2..=MAX_THREADS {
                folder.set_nthreads(n);
                let mut output = Vec::new();
                <C as Collapse>::collapse(folder, &bytes[..], &mut output)?;
                let actual = std::str::from_utf8(&output[..]).unwrap();

                assert_eq!(
                    actual,
                    expected,
                    "Collapsing with {} threads does not produce the same output as collapsing with 1 thread for {}",
                    n,
                    path.display()
                );
            }
        }

        Ok(())
    }

    /// Benchmarks `folder` on each input with many different `nstacks_per_job` values and
    /// prints, per input, the ten fastest settings relative to the collapser's default.
    ///
    /// Inputs with fewer than `MIN_LINES` lines are skipped. Progress and results are written
    /// to stdout, which stays locked for the duration of the call.
    pub(crate) fn bench_nstacks<C, P>(folder: &mut C, inputs: &[P]) -> io::Result<()>
    where
        C: CollapsePrivate,
        P: AsRef<Path>,
    {
        const MIN_LINES: usize = 2000;
        const NSAMPLES: usize = 100;
        const WARMUP_SECS: u64 = 3;

        // Only stdout is written to, so a single lock on it is all that is needed.
        let stdout_handle = io::stdout();
        let mut stdout = stdout_handle.lock();

        /// Timings collected for one input file.
        struct Foo<'a> {
            /// The collapser's `nstacks_per_job` before benchmarking started.
            default: usize,
            nlines: usize,
            nstacks: usize,
            path: &'a Path,
            /// Average duration in nanoseconds, keyed by `nstacks_per_job`.
            results: HashMap<usize, u64>,
        }

        impl<'a> Foo<'a> {
            /// Times `folder` on `bytes` for each candidate `nstacks_per_job`, printing a
            /// `.` per candidate. Returns `Ok(None)` when the input is too small.
            ///
            /// Note that `folder` is left set to the last candidate value tried.
            fn new<C>(
                folder: &mut C,
                path: &'a Path,
                bytes: &[u8],
                stdout: &mut io::StdoutLock,
            ) -> io::Result<Option<Self>>
            where
                C: CollapsePrivate,
            {
                let default = folder.nstacks_per_job();

                let (nlines, nstacks) = count_lines_and_stacks(bytes);
                if nlines < MIN_LINES {
                    return Ok(None);
                }

                let mut results = HashMap::default();
                // The default first, then 1..=10, then every tenth value up to `nstacks`.
                let candidates = std::iter::once(default)
                    .chain(1..=10)
                    .chain((20..=nstacks).step_by(10));
                for nstacks_per_job in candidates {
                    folder.set_nstacks_per_job(nstacks_per_job);
                    let mut durations = Vec::with_capacity(NSAMPLES);
                    for _ in 0..NSAMPLES {
                        let now = Instant::now();
                        folder.collapse(bytes, io::sink())?;
                        durations.push(now.elapsed().as_nanos());
                    }
                    let avg_duration =
                        (durations.iter().sum::<u128>() as f64 / durations.len() as f64) as u64;
                    results.insert(nstacks_per_job, avg_duration);
                    stdout.write_all(b".")?;
                    stdout.flush()?;
                }
                Ok(Some(Self {
                    default,
                    nlines,
                    nstacks,
                    path,
                    results,
                }))
            }
        }

        impl<'a> fmt::Display for Foo<'a> {
            /// Prints a header line followed by the ten fastest settings, fastest first.
            fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
                writeln!(
                    f,
                    "{} (nstacks: {}, lines: {})",
                    self.path.display(),
                    self.nstacks,
                    self.nlines
                )?;
                let default_duration = self.results[&self.default];
                let mut ranked = self.results.iter().collect::<Vec<_>>();
                ranked.sort_by_key(|&(_, duration)| *duration);
                for (nstacks_per_job, duration) in ranked.into_iter().take(10) {
                    writeln!(
                        f,
                        "    nstacks_per_job: {:>4} (% of total: {:>3.0}%) | time: {:.0}% of default",
                        nstacks_per_job,
                        (*nstacks_per_job as f32 / self.nstacks as f32) * 100.0,
                        *duration as f64 / default_duration as f64 * 100.0,
                    )?;
                }
                writeln!(f)
            }
        }

        /// Counts the lines in `bytes` and estimates the number of stacks as the number of
        /// blank (whitespace-only) lines plus one.
        fn count_lines_and_stacks(bytes: &[u8]) -> (usize, usize) {
            let mut reader = io::BufReader::new(bytes);
            let mut line = Vec::new();

            let (mut nlines, mut nstacks) = (0, 0);
            loop {
                line.clear();
                let n = reader.read_until(0x0A, &mut line).unwrap();
                if n == 0 {
                    // End of input closes the last stack.
                    nstacks += 1;
                    break;
                }
                nlines += 1;
                if String::from_utf8_lossy(&line).trim().is_empty() {
                    nstacks += 1;
                }
            }
            (nlines, nstacks)
        }

        let inputs = read_inputs(inputs)?;

        // Warmup
        let now = Instant::now();
        writeln!(
            stdout,
            "# Warming up for approximately {} seconds.",
            WARMUP_SECS
        )?;
        stdout.flush()?;
        while now.elapsed() < std::time::Duration::from_secs(WARMUP_SECS) {
            for bytes in inputs.values() {
                folder.collapse(&bytes[..], io::sink())?;
            }
        }

        // Time
        let mut foos = Vec::new();
        for (path, bytes) in &inputs {
            write!(stdout, "# {} ", path.display())?;
            stdout.flush()?;
            if let Some(foo) = Foo::new(folder, path, bytes, &mut stdout)? {
                foos.push(foo);
            }
            stdout.write_all(b"\n")?;
            stdout.flush()?;
        }
        stdout.write_all(b"\n")?;
        stdout.flush()?;

        // Report the inputs with the most stacks first.
        foos.sort_by(|a, b| b.nstacks.cmp(&a.nstacks));
        for foo in foos {
            write!(stdout, "{}", foo)?;
            stdout.flush()?;
        }

        Ok(())
    }
}

/// Unit tests for `fix_partially_demangled_rust_symbol`.
#[cfg(test)]
mod tests {
    use super::fix_partially_demangled_rust_symbol as fix_symbol;

    /// Asserts that fixing the first symbol yields the second one.
    macro_rules! t {
        ($a:expr, $b:expr) => {
            assert!(ok($a, $b))
        };
    }

    /// Asserts that fixing the symbol leaves it untouched.
    macro_rules! t_unchanged {
        ($a:expr) => {
            assert!(ok_unchanged($a))
        };
    }

    /// Returns whether fixing `sym` gives `expected`, printing both values on a mismatch.
    fn ok(sym: &str, expected: &str) -> bool {
        let result = fix_symbol(sym);
        let matches = result == expected;
        if !matches {
            println!("\n{}\n!=\n{}\n", result, expected);
        }
        matches
    }

    /// Returns whether fixing `sym` gives `sym` back, printing the result on a mismatch.
    fn ok_unchanged(sym: &str) -> bool {
        let result = fix_symbol(sym);
        let unchanged = result == sym;
        if !unchanged {
            println!("{} should have been unchanged, but got {}", sym, result);
        }
        unchanged
    }

    /// Partially demangled symbols with a trailing hash are fully demangled.
    #[test]
    fn fix_partially_demangled_rust_symbols() {
        t!(
            "std::sys::unix::fs::File::open::hb90e1c1c787080f0",
            "std::sys::unix::fs::File::open"
        );
        t!(
            "_$LT$std..fs..ReadDir$u20$as$u20$core..iter..traits..iterator..Iterator$GT$::next::hc14f1750ca79129b",
            "<std::fs::ReadDir as core::iter::traits::iterator::Iterator>::next"
        );
        t!(
            "rg::search_parallel::_$u7b$$u7b$closure$u7d$$u7d$::_$u7b$$u7b$closure$u7d$$u7d$::h6e849b55a66fcd85",
            "rg::search_parallel::_{{closure}}::_{{closure}}"
        );
        t!(
            "_$LT$F$u20$as$u20$alloc..boxed..FnBox$LT$A$GT$$GT$::call_box::h8612a2a83552fc2d",
            "<F as alloc::boxed::FnBox<A>>::call_box"
        );
        t!(
            "_$LT$$RF$std..fs..File$u20$as$u20$std..io..Read$GT$::read::h5d84059cf335c8e6",
            "<&std::fs::File as std::io::Read>::read"
        );
        t!(
            "_$LT$std..thread..JoinHandle$LT$T$GT$$GT$::join::hca6aa63e512626da",
            "<std::thread::JoinHandle<T>>::join"
        );
        t!(
            "std::sync::mpsc::shared::Packet$LT$T$GT$::recv::hfde2d9e28d13fd56",
            "std::sync::mpsc::shared::Packet<T>::recv"
        );
        t!(
            "crossbeam_utils::thread::ScopedThreadBuilder::spawn::_$u7b$$u7b$closure$u7d$$u7d$::h8fdc7d4f74c0da05",
            "crossbeam_utils::thread::ScopedThreadBuilder::spawn::_{{closure}}"
        );
    }

    /// Fully mangled symbols are passed through as they are.
    #[test]
    fn fix_partially_demangled_rust_symbol_on_fully_mangled_symbols() {
        t_unchanged!("_ZN4testE");
        t_unchanged!("_ZN4test1a2bcE");
        t_unchanged!("_ZN7inferno10flamegraph5merge6frames17hacfe2d67301633c2E");
        t_unchanged!("_ZN3std2rt19lang_start_internal17h540c897fe52ba9c5E");
        t_unchanged!(
            "_ZN116_$LT$core..str..pattern..CharSearcher$LT$$u27$a$GT$$u20$as$u20$core..str..pattern..ReverseSearcher$LT$$u27$a$GT$$GT$15next_match_back17h09d544049dd719bbE"
        );
        t_unchanged!("_ZN3std5panic12catch_unwind17h0562757d03ff60b3E");
        t_unchanged!("_ZN3std9panicking3try17h9c1cbc5599e1efbfE");
    }

    /// Symbols that are already fully demangled are passed through as they are.
    #[test]
    fn fix_partially_demangled_rust_symbol_on_fully_demangled_symbols() {
        t_unchanged!("std::sys::unix::fs::File::open");
        t_unchanged!("<F as alloc::boxed::FnBox<A>>::call_box");
        t_unchanged!("<std::fs::ReadDir as core::iter::traits::iterator::Iterator>::next");
        t_unchanged!("<rg::search::SearchWorker<W>>::search_impl");
        t_unchanged!("<grep_searcher::searcher::glue::ReadByLine<'s, M, R, S>>::run");
        t_unchanged!("<alloc::raw_vec::RawVec<T, A>>::reserve_internal");
    }
}