/// Outcome reported for a vCPU.
///
/// NOTE(review): nothing in the code visible here produces or consumes these values
/// apart from a unit test, so the per-variant descriptions below are inferred from the
/// variant names only — confirm them against the code that actually uses this enum.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum VcpuOutcome {
    /// Presumably: the vCPU should keep running.
    Continue,
    /// Presumably: the operation was interrupted by a cancellation request.
    Canceled,
    /// Presumably: the vCPU should stop.
    Stop,
    /// Presumably: the guest requested a system power-off.
    SystemOff,
}
/// Rendezvous point where vCPU threads park themselves, each depositing a value of
/// type `S`, while a coordinator waits for them and later releases them.
///
/// The pause request is a lock-free flag so it can be polled cheaply; all other
/// bookkeeping lives behind the mutex and is signalled through the condition variable.
pub struct PauseBarrier<S> {
    /// Set by `request_pause`, cleared by `resume`; parked threads stay blocked only
    /// while this is `true`.
    pause: std::sync::atomic::AtomicBool,
    /// Parked count, generation counter and deposited states.
    inner: std::sync::Mutex<PauseState<S>>,
    /// Notified whenever a thread parks and whenever `resume` runs. Parked threads
    /// and the waiting coordinator both wait on it.
    cv: std::sync::Condvar,
}
/// Mutex-protected bookkeeping of a `PauseBarrier`.
struct PauseState<S> {
    /// Number of park registrations since the last `resume` (which resets it to 0).
    parked: usize,
    /// Generation counter; `resume` increments it (wrapping) so that a parked thread
    /// can tell its own pause cycle has ended even if a new pause is already requested.
    gen: u64,
    /// `(vcpu_index, state)` pairs deposited by parking threads; drained by the
    /// `wait_*` methods and cleared by `resume`.
    states: Vec<(usize, S)>,
}
impl<S> Default for PauseBarrier<S> {
fn default() -> Self {
Self::new()
}
}
impl<S> PauseBarrier<S> {
pub fn new() -> Self {
PauseBarrier {
pause: std::sync::atomic::AtomicBool::new(false),
inner: std::sync::Mutex::new(PauseState {
parked: 0,
gen: 0,
states: Vec::new(),
}),
cv: std::sync::Condvar::new(),
}
}
#[inline]
pub fn request_pause(&self) {
self.pause.store(true, std::sync::atomic::Ordering::SeqCst);
}
#[inline]
pub fn is_paused(&self) -> bool {
self.pause.load(std::sync::atomic::Ordering::SeqCst)
}
pub fn park(&self, vcpu_index: usize, state: S) {
let mut g = self.inner.lock().unwrap_or_else(|e| e.into_inner());
g.states.push((vcpu_index, state));
g.parked += 1;
let my_gen = g.gen;
self.cv.notify_all();
while self.pause.load(std::sync::atomic::Ordering::SeqCst) && g.gen == my_gen {
g = self.cv.wait(g).unwrap_or_else(|e| e.into_inner());
}
}
pub fn wait_all_parked(&self, ncpus: usize) -> Vec<S> {
let mut g = self.inner.lock().unwrap_or_else(|e| e.into_inner());
while g.parked < ncpus {
g = self.cv.wait(g).unwrap_or_else(|e| e.into_inner());
}
let mut s = std::mem::take(&mut g.states);
s.sort_by_key(|(i, _)| *i);
s.into_iter().map(|(_, st)| st).collect()
}
pub fn resume(&self) {
let mut g = self.inner.lock().unwrap_or_else(|e| e.into_inner());
self.pause.store(false, std::sync::atomic::Ordering::SeqCst);
g.parked = 0;
g.states.clear();
g.gen = g.gen.wrapping_add(1);
self.cv.notify_all();
}
pub fn park_cancelable(
&self,
vcpu_index: usize,
state: S,
cancel: &std::sync::atomic::AtomicBool,
) {
let mut g = self.inner.lock().unwrap_or_else(|e| e.into_inner());
if !self.pause.load(std::sync::atomic::Ordering::SeqCst) {
return;
}
g.states.push((vcpu_index, state));
g.parked += 1;
let my_gen = g.gen;
self.cv.notify_all();
while self.pause.load(std::sync::atomic::Ordering::SeqCst)
&& g.gen == my_gen
&& !cancel.load(std::sync::atomic::Ordering::SeqCst)
{
let (ng, _to) = self
.cv
.wait_timeout(g, std::time::Duration::from_millis(50))
.unwrap_or_else(|e| e.into_inner());
g = ng;
}
}
pub fn wait_parked_until(
&self,
target: usize,
deadline: std::time::Instant,
cancel: &std::sync::atomic::AtomicBool,
) -> Vec<(usize, S)> {
let mut g = self.inner.lock().unwrap_or_else(|e| e.into_inner());
while g.parked < target && !cancel.load(std::sync::atomic::Ordering::SeqCst) {
let now = std::time::Instant::now();
if now >= deadline {
break;
}
let wait = (deadline - now).min(std::time::Duration::from_millis(50));
let (ng, _to) = self
.cv
.wait_timeout(g, wait)
.unwrap_or_else(|e| e.into_inner());
g = ng;
}
let mut s = std::mem::take(&mut g.states);
s.sort_by_key(|(i, _)| *i);
s
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// `VcpuOutcome` is a plain value: copying leaves the original usable and distinct
    /// variants compare unequal.
    #[test]
    fn outcome_is_copy_and_eq() {
        let original = VcpuOutcome::Continue;
        let copy = original;
        assert_eq!(original, copy);
        assert_ne!(VcpuOutcome::Stop, VcpuOutcome::SystemOff);
        assert_ne!(VcpuOutcome::Canceled, VcpuOutcome::Continue);
    }

    /// Repeated pause → park → collect → resume cycles with several worker threads:
    /// every cycle must yield exactly one state per worker, ordered by index.
    #[test]
    fn pause_barrier_round_trips_many_generations() {
        use std::sync::atomic::{AtomicBool, Ordering};
        use std::sync::Arc;

        const NCPUS: usize = 4;
        const GENS: u64 = 50;

        let barrier: Arc<PauseBarrier<usize>> = Arc::new(PauseBarrier::new());
        let done = Arc::new(AtomicBool::new(false));

        // Each worker spins until a pause is requested (or the test is over), then parks
        // with its own index as the deposited state.
        let workers: Vec<_> = (0..NCPUS)
            .map(|idx| {
                let shared = Arc::clone(&barrier);
                let finished = Arc::clone(&done);
                std::thread::spawn(move || loop {
                    while !shared.is_paused() {
                        if finished.load(Ordering::SeqCst) {
                            return;
                        }
                        std::thread::yield_now();
                    }
                    shared.park(idx, idx);
                })
            })
            .collect();

        let expected: Vec<usize> = (0..NCPUS).collect();
        for gen in 0..GENS {
            barrier.request_pause();
            let states = barrier.wait_all_parked(NCPUS);
            assert_eq!(
                states, expected,
                "generation {gen}: every vCPU parked exactly once, sorted by index"
            );
            barrier.resume();
        }

        // Let the workers leave their polling loop; the extra resume releases any worker
        // that might still be parked.
        done.store(true, Ordering::SeqCst);
        barrier.resume();
        for worker in workers {
            worker.join().expect("vcpu thread");
        }
    }

    /// Exercises three behaviours of the cancelable path in sequence: a timed wait that
    /// returns a partial result, a late park that must not leak into the next cycle, and
    /// cancellation releasing a parked thread.
    #[test]
    fn park_cancelable_timeout_late_park_and_cancel() {
        use std::sync::atomic::{AtomicBool, Ordering};
        use std::sync::Arc;
        use std::time::{Duration, Instant};

        let barrier: Arc<PauseBarrier<usize>> = Arc::new(PauseBarrier::new());
        let cancel = Arc::new(AtomicBool::new(false));

        // Phase 1: only one of the two awaited vCPUs ever parks, so the wait times out
        // and hands back at most that one state.
        barrier.request_pause();
        let one_parked = {
            let shared = Arc::clone(&barrier);
            let flag = Arc::clone(&cancel);
            std::thread::spawn(move || shared.park_cancelable(0, 0, &flag))
        };
        let first_deadline = Instant::now() + Duration::from_millis(300);
        let states = barrier.wait_parked_until(2, first_deadline, &cancel);
        assert!(
            states.len() <= 1,
            "partial under timeout, got {}",
            states.len()
        );
        barrier.resume();
        one_parked.join().unwrap();

        // Phase 2: a straggler parks after resume (no pause pending); its state must not
        // show up once the next pause is requested.
        barrier.park_cancelable(1, 999, &cancel);
        barrier.request_pause();
        let second_deadline = Instant::now() + Duration::from_millis(60);
        let s2 = barrier.wait_parked_until(1, second_deadline, &cancel);
        assert!(
            s2.is_empty(),
            "straggler 999 leaked into next cycle: {s2:?}"
        );
        barrier.resume();

        // Phase 3: setting the cancel flag must release a parked vCPU without a resume.
        let cancel2 = Arc::new(AtomicBool::new(false));
        barrier.request_pause();
        let parked = {
            let shared = Arc::clone(&barrier);
            let flag = Arc::clone(&cancel2);
            std::thread::spawn(move || shared.park_cancelable(0, 0, &flag))
        };
        let third_deadline = Instant::now() + Duration::from_millis(500);
        let _ = barrier.wait_parked_until(1, third_deadline, &cancel2);
        cancel2.store(true, Ordering::SeqCst);
        parked.join().expect("cancel must release the parked vCPU");
        barrier.resume();
    }
}