housekeeping 0.0.3

A concurrent memory reclaimer for periodic cleanups.
Documentation
//! Testing QSBR by counting phases.
//!
//! This module implements tests for [`housekeeping::qsbr`]. In its tests, QSBR
//! schedules are created and registered against; QSBR phases are assigned
//! monotonically increasing "positions", and threads verify each others'
//! positions periodically. This is a good way of testing that all threads make
//! the appropriate amount of progress.
//!
//! Tests are provided for Loom, MIRI, and hardware.

use std::{
    fmt, mem,
    ops::RangeBounds,
    sync::{Arc, atomic::Ordering::Relaxed},
};

#[cfg(feature = "loom")]
use loom::sync::atomic::AtomicUsize;

#[cfg(not(feature = "loom"))]
use std::sync::atomic::AtomicUsize;

use housekeeping::qsbr;

//----------- Top-level Tests --------------------------------------------------

/// Test a small portion of the search space exhaustively under Loom.
#[cfg(feature = "loom")]
#[test]
fn small_exhaustive() {
    let runs = RunCounter::default();
    loom::model(move || {
        runs.start();

        let global = Arc::new(Global::new(Config {
            num_threads: 3,
            num_refreshes: 4,
            num_reregistrations: 1,
        }));

        for index in 0..global.config.num_threads {
            let global = global.clone();
            loom::thread::Builder::new()
                .name(format!("t{index}"))
                .spawn(move || Thread::new(index, global).run())
                .unwrap();
        }
    });
}

/// Test the search space randomly under MIRI.
#[cfg(all(miri, not(feature = "loom")))]
#[test]
fn random() {
    let global = Arc::new(Global::new(Config {
        num_threads: 3,
        num_refreshes: 64,
        num_reregistrations: 6,
    }));

    std::thread::scope(|s| {
        for index in 0..global.config.num_threads {
            let global = global.clone();
            std::thread::Builder::new()
                .name(format!("t{index}"))
                .spawn_scoped(s, move || Thread::new(index, global).run())
                .unwrap();
        }
    });
}

/// Stress-test hardware.
#[cfg(all(not(miri), not(feature = "loom")))]
#[test]
fn stress() {
    let global = Arc::new(Global::new(Config {
        num_threads: std::thread::available_parallelism().unwrap().get(),
        num_refreshes: 1048576,
        num_reregistrations: 16384,
    }));

    std::thread::scope(|s| {
        for index in 0..global.config.num_threads {
            let global = global.clone();
            std::thread::Builder::new()
                .name(format!("t{index}"))
                .spawn_scoped(s, move || Thread::new(index, global).run())
                .unwrap();
        }
    });
}

//----------- Infrastructure ---------------------------------------------------

/// Test configuration.
struct Config {
    /// The number of threads.
    num_threads: usize,

    /// The number of refreshes to perform on each thread.
    num_refreshes: usize,

    /// The number of re-registrations to perform on each thread.
    num_reregistrations: usize,
}

/// Global state for testing.
struct Global {
    /// Configuration.
    config: Config,

    /// The QSBR schedule.
    schedule: qsbr::Schedule,

    /// Phase position variables.
    ///
    /// Every thread has a corresponding phase position variable, which it
    /// updates when it changes phase. These writes are stand-ins for the
    /// arbitrary writes that happen during phases. Other threads will verify
    /// that they can observe the right writes, and thus that the QSBR schedule
    /// is maintaining its progress invariant.
    positions: Vec<AtomicUsize>,

    /// The base phase position.
    ///
    /// At any time, all threads use the same phase position or two adjacent
    /// phase positions. The QSBR schedule reports the phase position modulo 3,
    /// but some way is needed to recover the full (non-modulo) position. This
    /// variable tracks this "base" across all threads.
    ///
    /// Specifically, this variable holds the position of the last closed phase.
    /// Every registered thread is one or two phases ahead of this; thus, given
    /// this base and the position modulo 3, its full position can be recovered.
    base: AtomicUsize,
}

impl Global {
    /// Construct a new [`Global`].
    fn new(config: Config) -> Self {
        Self {
            schedule: qsbr::Schedule::new(),
            positions: (0..config.num_threads)
                .map(|_| AtomicUsize::new(usize::MAX))
                .collect(),
            base: AtomicUsize::new(2),
            config,
        }
    }
}

/// A thread's testing state.
struct Thread {
    /// The index of the thread.
    index: usize,

    /// The global state.
    global: Arc<Global>,

    /// The QSBR registration.
    user: Option<qsbr::User>,

    /// The phase position.
    pos: usize,
}

impl Thread {
    /// Construct a new [`Thread`].
    fn new(index: usize, global: Arc<Global>) -> Self {
        Self {
            index,
            global,
            user: None,
            pos: usize::MAX,
        }
    }

    /// Run this thread.
    fn run(mut self) {
        self.join();

        let config = &self.global.config;
        let num_iters = config.num_refreshes + config.num_reregistrations;
        let rereg_freq = config.num_reregistrations.div_ceil(num_iters);

        for iter in 0..num_iters {
            if iter.is_multiple_of(rereg_freq) {
                // Re-register the thread against the QSBR schedule.
                self.leave();
                self.join();
                continue;
            }

            // Refresh and progress the QSBR schedule.
            let mut user = self.user.take().unwrap();
            match user.progress(&self.global.schedule) {
                Some(leaving) => {
                    self.pos += 1;

                    // We are moving from 'pos - 1' to 'pos', and are currently
                    // registered against both; all threads must fall in the
                    // same range.
                    self.check_rest_positions(self.pos - 1..=self.pos);

                    if let Some(_last_leaving) = leaving.leave_last() {
                        // We are the last thread leaving 'pos - 1'. All other
                        // threads must be on 'pos' specifically.
                        self.check_rest_positions(self.pos..=self.pos);

                        self.global.base.store(self.pos - 1, Relaxed);
                    }
                }
                None => {
                    // We cannot place a strong bound on the phase positions of
                    // othre threads. We are at 'pos', so all threads must be on
                    // a consecutive phase.
                    self.check_rest_positions(self.pos.saturating_sub(1)..=self.pos + 1);
                }
            }
            assert_eq!(user.index(), self.pos % 3);
            self.user = Some(user);
        }

        self.leave();
    }

    /// Join the QSBR schedule.
    fn join(&mut self) {
        assert!(self.user.is_none() && self.pos == usize::MAX);
        let user = self.global.schedule.register();
        let index = user.index();
        let base = self.global.base.load(Relaxed);
        let pos = base + 3 - (base + 3 - index) % 3;
        self.global.positions[self.index].store(pos, Relaxed);
        self.user = Some(user);
        self.pos = pos;
    }

    /// Leave the QSBR schedule.
    fn leave(&mut self) {
        let user = self.user.take().unwrap();
        let pos = mem::replace(&mut self.pos, usize::MAX);
        self.global.positions[self.index].store(usize::MAX, Relaxed);
        let leaving = user.deregister(&self.global.schedule);
        self.check_rest_positions(pos.saturating_sub(1)..=pos + 1);
        if let Some(_last_leaving) = leaving.leave_last() {
            self.check_rest_positions(pos + 1..=pos + 1);
            self.global.base.store(pos, Relaxed);
        }
    }

    /// Verify that all *other* phase positions match a certain range.
    fn check_rest_positions<R>(&self, range: R)
    where
        R: RangeBounds<usize> + fmt::Debug,
    {
        for (index, position) in self.global.positions.iter().enumerate() {
            if index == self.index {
                // Ignore the current thread.
                continue;
            }

            // Load the position.
            let position = position.load(Relaxed);

            if position == usize::MAX {
                // The position is unset, ignore.
                continue;
            }

            assert!(
                range.contains(&position),
                "{position:?} (of [t{index}]) did not fit {range:?}"
            );
        }
    }
}

/// A run counter.
#[cfg(feature = "loom")]
#[derive(Default)]
struct RunCounter {
    /// The number of started runs.
    started: std::sync::atomic::AtomicUsize,
}

#[cfg(feature = "loom")]
impl RunCounter {
    /// Count a newly started run.
    fn start(&self) {
        use std::io::Write;

        let run = self.started.fetch_add(1, Relaxed);
        if run.is_multiple_of(4096) {
            print!("\rRun {run}");
            let _ = std::io::stdout().flush();
        }
    }
}

#[cfg(feature = "loom")]
impl Drop for RunCounter {
    fn drop(&mut self) {
        println!("\rFinished after {} runs", *self.started.get_mut());
    }
}