rayon-core 1.9.1

Core APIs for Rayon
Documentation
//! Debug Logging
//!
//! To use in a debug build, set the env var `RAYON_LOG` as
//! described below.  In a release build, logs are compiled out by
//! default unless Rayon is built with `--cfg rayon_rs_log` (try
//! `RUSTFLAGS="--cfg rayon_rs_log"`).
//!
//! Note that logs are an internal debugging tool and their format
//! is considered unstable, as are the details of how to enable them.
//!
//! # Valid values for RAYON_LOG
//!
//! The `RAYON_LOG` variable can take on the following values:
//!
//! * `tail:<file>` -- dumps the last 10,000 events into the given file;
//!   useful for tracking down deadlocks
//! * `profile:<file>` -- dumps only those events needed to reconstruct how
//!   many workers are active at a given time
//! * `all` -- dumps every event to stderr; useful for debugging

use crossbeam_channel::{self, Receiver, Sender};
use std::collections::VecDeque;
use std::env;
use std::fs::File;
use std::io::{self, BufWriter, Write};

/// True if logs are compiled in, i.e. when the crate is built with
/// debug assertions enabled or with `--cfg rayon_rs_log`.
pub(super) const LOG_ENABLED: bool = cfg!(any(rayon_rs_log, debug_assertions));

/// An event that can be delivered to the logger thread.
#[derive(Copy, Clone, PartialOrd, Ord, PartialEq, Eq, Debug)]
pub(super) enum Event {
    /// Flushes events to disk, used to terminate benchmarking.
    Flush,

    /// Indicates that a worker thread started execution.
    ThreadStart {
        worker: usize,
        terminate_addr: usize,
    },

    /// Indicates that a worker thread terminated.
    ThreadTerminate { worker: usize },

    /// Indicates that a worker thread became idle, blocked on `latch_addr`.
    ThreadIdle { worker: usize, latch_addr: usize },

    /// Indicates that an idle worker thread found work to do, after
    /// yield rounds. It should no longer be considered idle.
    ThreadFoundWork { worker: usize, yields: u32 },

    /// Indicates that a worker blocked on a latch observed that it was set.
    ///
    /// Internal debugging event that does not affect the state
    /// machine.
    ThreadSawLatchSet { worker: usize, latch_addr: usize },

    /// Indicates that an idle worker is getting sleepy. `jobs_counter` is the internal
    /// sleep state that we saw at the time.
    ThreadSleepy { worker: usize, jobs_counter: usize },

    /// Indicates that the thread's attempt to fall asleep was
    /// interrupted because the latch was set. (This is not, in and of
    /// itself, a change to the thread state.)
    ThreadSleepInterruptedByLatch { worker: usize, latch_addr: usize },

    /// Indicates that the thread's attempt to fall asleep was
    /// interrupted because a job was posted. (This is not, in and of
    /// itself, a change to the thread state.)
    ThreadSleepInterruptedByJob { worker: usize },

    /// Indicates that an idle worker has gone to sleep.
    ThreadSleeping { worker: usize, latch_addr: usize },

    /// Indicates that a sleeping worker has awoken.
    ThreadAwoken { worker: usize, latch_addr: usize },

    /// Indicates that the given worker thread was notified it should
    /// awaken.
    ThreadNotify { worker: usize },

    /// The given worker has pushed a job to its local deque.
    JobPushed { worker: usize },

    /// The given worker has popped a job from its local deque.
    JobPopped { worker: usize },

    /// The given worker has stolen a job from the deque of another.
    JobStolen { worker: usize, victim: usize },

    /// N jobs were injected into the global queue.
    JobsInjected { count: usize },

    /// A job was removed from the global queue.
    JobUninjected { worker: usize },

    /// When announcing a job, this was the value of the counters we observed.
    ///
    /// No effect on thread state, just a debugging event.
    JobThreadCounts {
        worker: usize,
        num_idle: u16,
        num_sleepers: u16,
    },
}

/// Handle to the logging thread, if any. You can use this to deliver
/// logs. You can also clone it freely.
///
/// A disabled logger holds no channel, and `log` is then a no-op.
#[derive(Clone)]
pub(super) struct Logger {
    /// Sending half of the channel to the logger thread; `None` when
    /// logging is disabled.
    sender: Option<Sender<Event>>,
}

impl Logger {
    /// Creates the logger for a pool of `num_workers` threads.
    ///
    /// Returns a disabled logger when logs are compiled out or when the
    /// `RAYON_LOG` environment variable cannot be read. Otherwise a
    /// background thread is spawned according to the variable's value
    /// (`tail:<file>`, `profile:<file>` or `all`).
    ///
    /// # Panics
    ///
    /// Panics if `RAYON_LOG` is set to an unrecognized value.
    pub(super) fn new(num_workers: usize) -> Logger {
        if !LOG_ENABLED {
            return Self::disabled();
        }

        // see the module-level doc comment for the format
        let env_log = match env::var("RAYON_LOG") {
            Ok(s) => s,
            Err(_) => return Self::disabled(),
        };

        let (sender, receiver) = crossbeam_channel::unbounded();

        if env_log.starts_with("tail:") {
            let filename = env_log["tail:".len()..].to_string();
            ::std::thread::spawn(move || {
                Self::tail_logger_thread(num_workers, filename, 10_000, receiver)
            });
        } else if env_log == "all" {
            ::std::thread::spawn(move || Self::all_logger_thread(num_workers, receiver));
        } else if env_log.starts_with("profile:") {
            let filename = env_log["profile:".len()..].to_string();
            ::std::thread::spawn(move || {
                Self::profile_logger_thread(num_workers, filename, 10_000, receiver)
            });
        } else {
            panic!("RAYON_LOG should be 'tail:<file>', 'profile:<file>' or 'all'");
        }

        Logger {
            sender: Some(sender),
        }
    }

    /// Creates a logger that discards every event.
    fn disabled() -> Logger {
        Logger { sender: None }
    }

    /// Sends the event produced by `event` to the logger thread.
    ///
    /// The closure is only invoked when logging is compiled in and this
    /// handle is enabled, so building the event costs nothing otherwise.
    ///
    /// # Panics
    ///
    /// Panics if the event cannot be sent to the logger thread.
    #[inline]
    pub(super) fn log(&self, event: impl FnOnce() -> Event) {
        if !LOG_ENABLED {
            return;
        }

        if let Some(sender) = &self.sender {
            sender.send(event()).unwrap();
        }
    }

    /// Body of the `profile:<file>` logger thread.
    ///
    /// Buffers up to `capacity` events, then replays them through the
    /// simulator and writes out only those that changed its state. A
    /// batch also ends on `Event::Flush` or after 30 seconds without
    /// any event. The thread returns once the channel is disconnected
    /// and the final batch has been flushed.
    ///
    /// # Panics
    ///
    /// Panics if the file cannot be created or written.
    fn profile_logger_thread(
        num_workers: usize,
        log_filename: String,
        capacity: usize,
        receiver: Receiver<Event>,
    ) {
        let file = File::create(&log_filename)
            .unwrap_or_else(|err| panic!("failed to open `{}`: {}", log_filename, err));

        let mut writer = BufWriter::new(file);
        let mut events = Vec::with_capacity(capacity);
        let mut state = SimulatorState::new(num_workers);
        let timeout = std::time::Duration::from_secs(30);

        loop {
            // A disconnected channel reports its error immediately, so
            // without this flag the outer loop would spin forever once
            // every sender has been dropped.
            let mut disconnected = false;

            loop {
                match receiver.recv_timeout(timeout) {
                    Ok(Event::Flush) => break,
                    Ok(event) => events.push(event),
                    Err(err) => {
                        disconnected = err.is_disconnected();
                        break;
                    }
                }

                if events.len() == capacity {
                    break;
                }
            }

            for event in events.drain(..) {
                if state.simulate(&event) {
                    state.dump(&mut writer, &event).unwrap();
                }
            }

            writer.flush().unwrap();

            if disconnected {
                return;
            }
        }
    }

    /// Body of the `tail:<file>` logger thread.
    ///
    /// Keeps the most recent `capacity` events in memory and writes
    /// them out after 30 seconds pass without any event arriving.
    /// Older events are still fed to the simulator so the dumped state
    /// stays consistent; a `...` line marks that some were dropped. The
    /// thread returns once the channel is disconnected and the
    /// remaining tail has been flushed.
    ///
    /// # Panics
    ///
    /// Panics if the file cannot be created or written.
    fn tail_logger_thread(
        num_workers: usize,
        log_filename: String,
        capacity: usize,
        receiver: Receiver<Event>,
    ) {
        let file = File::create(&log_filename)
            .unwrap_or_else(|err| panic!("failed to open `{}`: {}", log_filename, err));

        let mut writer = BufWriter::new(file);
        let mut events: VecDeque<Event> = VecDeque::with_capacity(capacity);
        let mut state = SimulatorState::new(num_workers);
        let timeout = std::time::Duration::from_secs(30);
        let mut skipped = false;

        loop {
            // See `profile_logger_thread`: needed to stop instead of
            // busy-looping once every sender has been dropped.
            let mut disconnected = false;

            loop {
                match receiver.recv_timeout(timeout) {
                    // We ignore Flush events in tail mode -- we're
                    // really just looking for deadlocks.
                    Ok(Event::Flush) => continue,

                    Ok(event) => {
                        if events.len() == capacity {
                            // Buffer is full: fold the oldest event
                            // into the simulated state and drop it.
                            let oldest = events.pop_front().unwrap();
                            state.simulate(&oldest);
                            skipped = true;
                        }

                        events.push_back(event);
                    }

                    Err(err) => {
                        disconnected = err.is_disconnected();
                        break;
                    }
                }
            }

            if skipped {
                writeln!(writer, "...").unwrap();
                skipped = false;
            }

            for event in events.drain(..) {
                // In tail mode, we dump *all* events out, whether or
                // not they were 'interesting' to the state machine.
                state.simulate(&event);
                state.dump(&mut writer, &event).unwrap();
            }

            writer.flush().unwrap();

            if disconnected {
                return;
            }
        }
    }

    /// Body of the `all` logger thread.
    ///
    /// Writes every received event to stderr as it arrives and returns
    /// when the channel is disconnected.
    ///
    /// # Panics
    ///
    /// Panics if writing to stderr fails.
    fn all_logger_thread(num_workers: usize, receiver: Receiver<Event>) {
        let stderr = std::io::stderr();
        let mut state = SimulatorState::new(num_workers);

        for event in receiver {
            // Lock stderr per event so other writers are not starved.
            let mut writer = BufWriter::new(stderr.lock());
            state.simulate(&event);
            state.dump(&mut writer, &event).unwrap();
            writer.flush().unwrap();
        }
    }
}

/// Simulated state of a single worker thread.
#[derive(Copy, Clone, PartialOrd, Ord, PartialEq, Eq, Debug)]
enum State {
    Working,
    Idle,
    Notified,
    Sleeping,
    Terminated,
}

impl State {
    /// Returns the one-character code used for this state in log dumps.
    fn letter(&self) -> char {
        // Indexed by discriminant; must stay in declaration order.
        const LETTERS: [char; 5] = ['W', 'I', 'N', 'S', 'T'];
        LETTERS[*self as usize]
    }
}

/// Model of the thread pool that is rebuilt by replaying logged events.
struct SimulatorState {
    /// Simulated number of jobs in each worker's local deque, indexed by worker.
    local_queue_size: Vec<usize>,
    /// Simulated state of each worker, indexed by worker.
    thread_states: Vec<State>,
    /// Simulated number of jobs in the global queue.
    injector_size: usize,
}

impl SimulatorState {
    /// Creates a simulator for `num_workers` workers, all `Working`
    /// and with every queue empty.
    fn new(num_workers: usize) -> Self {
        SimulatorState {
            local_queue_size: vec![0; num_workers],
            thread_states: vec![State::Working; num_workers],
            injector_size: 0,
        }
    }

    /// Moves `worker` from `expected` to `next`, panicking if it was
    /// not in the `expected` state.
    fn transition(&mut self, worker: usize, expected: State, next: State) {
        let slot = &mut self.thread_states[worker];
        assert_eq!(*slot, expected);
        *slot = next;
    }

    /// Applies `event` to the simulated state.
    ///
    /// Returns `true` if the event is one the simulator tracks, and
    /// `false` for pure debugging events that leave it untouched.
    fn simulate(&mut self, event: &Event) -> bool {
        match *event {
            // Unconditional thread-state changes.
            Event::ThreadStart { worker, .. } | Event::ThreadFoundWork { worker, .. } => {
                self.thread_states[worker] = State::Working;
            }
            Event::ThreadTerminate { worker, .. } => {
                self.thread_states[worker] = State::Terminated;
            }

            // Thread-state changes that must start from a known state.
            Event::ThreadIdle { worker, .. } => {
                self.transition(worker, State::Working, State::Idle);
            }
            Event::ThreadSleeping { worker, .. } => {
                self.transition(worker, State::Idle, State::Sleeping);
            }
            Event::ThreadNotify { worker } => {
                // Currently, this log event occurs while holding the
                // thread lock, so we should *always* see it before
                // the worker awakens.
                self.transition(worker, State::Sleeping, State::Notified);
            }
            Event::ThreadAwoken { worker, .. } => {
                self.transition(worker, State::Notified, State::Idle);
            }

            // Local deque bookkeeping.
            Event::JobPushed { worker } => self.local_queue_size[worker] += 1,
            Event::JobPopped { worker } => self.local_queue_size[worker] -= 1,
            Event::JobStolen { victim, .. } => self.local_queue_size[victim] -= 1,

            // Global queue bookkeeping.
            Event::JobsInjected { count } => self.injector_size += count,
            Event::JobUninjected { .. } => self.injector_size -= 1,

            // remaining events are no-ops from pov of simulating the
            // thread state
            _ => return false,
        }

        true
    }

    /// Writes one comma-separated line describing the current state
    /// and the `event` that led to it.
    ///
    /// The line holds the idle, sleeping and notified thread counts,
    /// the total of the local queue sizes, the injector size, the
    /// quoted debug form of `event`, and then a state letter and queue
    /// size for every worker.
    fn dump(&mut self, w: &mut impl Write, event: &Event) -> io::Result<()> {
        let states = &self.thread_states;
        let count_in = |wanted: State| states.iter().filter(|&&s| s == wanted).count();

        let thread_counts = [
            count_in(State::Idle),
            count_in(State::Sleeping),
            count_in(State::Notified),
        ];
        let pending_jobs: usize = self.local_queue_size.iter().sum();

        for count in &thread_counts {
            write!(w, "{:2},", count)?;
        }
        for size in &[pending_jobs, self.injector_size] {
            write!(w, "{:4},", size)?;
        }

        // Render to a `String` first so the width applies to the whole text.
        write!(w, r#""{:60}","#, format!("{:?}", event))?;

        let per_worker = self.thread_states.iter().zip(&self.local_queue_size);
        for (index, (state, &queued)) in per_worker.enumerate() {
            write!(w, " T{:02},{}", index, state.letter())?;

            if queued > 0 {
                write!(w, ",{:03},", queued)?;
            } else {
                write!(w, ",   ,")?;
            }
        }

        writeln!(w)
    }
}