#![warn(
missing_docs,
rust_2018_idioms,
missing_debug_implementations,
intra_doc_link_resolution_failure,
clippy::all
)]
mod pausability_state;
mod pausable_instant;
mod pause_state;
mod resumability_state;
mod unpausable_task_guard;
mod unresumable_task_guard;
use pausability_state::{
PausabilityState, PausabilityStateTrait, PAUSING_REQUESTED_MASK,
};
pub use pausable_instant::PausableInstant;
use pause_state::{PauseState, PauseStateTrait};
use resumability_state::{
ResumabilityState, ResumabilityStateTrait, RESUMING_REQUESTED_MASK,
};
use std::time::{Duration, Instant};
use unpausable_task_guard::UnpausableTaskGuard;
use unresumable_task_guard::UnresumableTaskGuard;
#[cfg(loom)]
use loom::sync::atomic::{compiler_fence, AtomicU32, AtomicU64, Ordering};
#[cfg(loom)]
use loom::sync::{Condvar, Mutex, MutexGuard};
#[cfg(not(loom))]
use std::sync::atomic::{compiler_fence, AtomicU32, AtomicU64, Ordering};
#[cfg(not(loom))]
use std::sync::{Condvar, Mutex, MutexGuard};
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
enum CoursePauseState {
Paused,
Pausing,
Resumed,
Resuming,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
enum CoursePausabilityState {
Unused,
Pausing,
Released,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
enum CourseResumabilityState {
Unused,
Resuming,
Released,
}
#[derive(Debug)]
pub struct PausableClock {
zero_instant: Instant,
pause_state: AtomicU64,
pause_state_lock: Mutex<CoursePauseState>,
pause_state_condition: Condvar,
pausability_state: AtomicU32,
pausability_lock: Mutex<CoursePausabilityState>,
pausability_condition: Condvar,
resumability_state: AtomicU32,
resumability_lock: Mutex<CourseResumabilityState>,
resumability_condition: Condvar,
}
impl Default for PausableClock {
fn default() -> Self {
PausableClock::new(Default::default(), false)
}
}
impl PausableClock {
pub fn new(elapsed_time: Duration, paused: bool) -> PausableClock {
let now = Instant::now();
let zero_instant = now - elapsed_time;
let current_state = PauseState::new(
true,
false,
true,
false,
elapsed_time.as_millis() as u64,
);
let result = PausableClock {
zero_instant,
pause_state: AtomicU64::new(current_state),
pause_state_lock: Mutex::new(CoursePauseState::Paused),
pause_state_condition: Condvar::default(),
pausability_state: Default::default(),
pausability_lock: Mutex::new(CoursePausabilityState::Unused),
pausability_condition: Default::default(),
resumability_state: Default::default(),
resumability_lock: Mutex::new(CourseResumabilityState::Unused),
resumability_condition: Default::default(),
};
if !paused {
result.resume();
}
result
}
pub fn now_std(&self) -> Instant {
self.now().into()
}
pub fn now(&self) -> PausableInstant {
self.now_impl().0
}
pub fn pause(&self) -> bool {
let mut paused_guard = self
.pause_state_lock
.lock()
.expect("Failed to get pause lock");
match *paused_guard {
CoursePauseState::Paused => return false,
CoursePauseState::Pausing => {
panic!("Inconsistent pause state");
}
_ => {}
}
*paused_guard = CoursePauseState::Pausing;
{
let mut pausability_guard = self
.pausability_lock
.lock()
.expect("Failed to get pause guard lock");
if *pausability_guard != CoursePausabilityState::Unused {
panic!("Inconsistent pausable state");
}
*pausability_guard = CoursePausabilityState::Pausing;
}
let starting_state = self.current_state(Ordering::SeqCst);
let pausing = starting_state.with_pausing_flag();
self.set_state(pausing);
let pausability_state = self.set_pausing_flag_on_guard_state();
if pausability_state.get_unpausable_task_count() > 0 {
self.wait_for_unpausable_tasks_to_clear();
}
let (freeze_instant, real_time_at_freeze) = self.now_impl();
self.set_state(PauseState::new(
false,
true,
true,
false,
freeze_instant.elapsed_millis,
));
let fake_resume_millis = self.millis_since_zero(real_time_at_freeze)
- freeze_instant.elapsed_millis;
let frozen_millis =
self.zero_instant.elapsed().as_millis() as u64 - fake_resume_millis;
let paused = PauseState::new(true, false, true, false, frozen_millis);
*paused_guard = CoursePauseState::Paused;
self.set_state(paused);
self.unset_pausing_flag_on_guard_state();
{
let mut unpausable_task_guard_lock = self
.pausability_lock
.lock()
.expect("Failed to get pause guard lock");
*unpausable_task_guard_lock = CoursePausabilityState::Unused;
}
self.pause_state_condition.notify_all();
true
}
fn wait_for_unpausable_tasks_to_clear(&self) {
let unpausable_task_guard_lock = self
.pausability_lock
.lock()
.expect("Failed to get pause guard lock");
let _lock = self
.pausability_condition
.wait_while(unpausable_task_guard_lock, |s| {
*s != CoursePausabilityState::Released
})
.expect("Expected valid return from pausability lock");
}
fn wait_for_unresumable_tasks_to_clear(&self) {
let unresumable_task_guard_lock = self
.resumability_lock
.lock()
.expect("Failed to get resume guard lock");
let _lock = self
.resumability_condition
.wait_while(unresumable_task_guard_lock, |s| {
*s != CourseResumabilityState::Released
})
.expect("Expected valid return from resumability lock");
}
pub fn resume(&self) -> bool {
let mut resumed_guard = self
.pause_state_lock
.lock()
.expect("Failed to get pause lock");
match *resumed_guard {
CoursePauseState::Resumed => return false,
CoursePauseState::Resuming => {
panic!("Inconsistent pause state");
}
_ => {}
}
*resumed_guard = CoursePauseState::Resuming;
{
let mut resumability_guard = self
.resumability_lock
.lock()
.expect("Failed to get resume guard lock");
if *resumability_guard != CourseResumabilityState::Unused {
panic!("Inconsistent pausable state");
}
*resumability_guard = CourseResumabilityState::Resuming;
}
let starting_state = self.current_state(Ordering::SeqCst);
let resuming = starting_state.with_pausing_flag();
self.set_state(resuming);
let resumability_state = self.set_resuming_flag_on_guard_state();
if resumability_state.get_unresumable_task_count() > 0 {
self.wait_for_unresumable_tasks_to_clear();
}
let paused_millis = starting_state.get_millis();
let stored_millis =
self.zero_instant.elapsed().as_millis() as u64 - paused_millis;
let resumed_state =
PauseState::new(false, false, false, false, stored_millis);
*resumed_guard = CoursePauseState::Resumed;
self.set_state(resumed_state);
self.unset_resuming_flag_on_guard_state();
{
let mut unresumable_task_guard_lock = self
.resumability_lock
.lock()
.expect("Failed to get resume guard lock");
*unresumable_task_guard_lock = CourseResumabilityState::Unused;
}
self.pause_state_condition.notify_all();
true
}
pub fn is_paused(&self) -> bool {
self.is_paused_ordered(Ordering::Relaxed)
}
pub fn is_pausing(&self) -> bool {
self.is_pausing_ordered(Ordering::Relaxed)
}
pub fn is_paused_or_pausing(&self) -> bool {
self.is_paused_or_pausing_ordered(Ordering::Relaxed)
}
pub fn wait_for_resume(&self) {
let _guard = self.wait_for_resume_impl();
}
fn wait_for_resume_impl(&self) -> Option<MutexGuard<'_, CoursePauseState>> {
if !self.is_paused_or_pausing_ordered(Ordering::Acquire) {
return None;
}
let guard = self
.pause_state_lock
.lock()
.expect("Failed to get pause-state lock");
let guard = self
.pause_state_condition
.wait_while(guard, |p| *p != CoursePauseState::Resumed)
.expect("Failed to reacquire lock on pause state after resume");
Some(guard)
}
fn wait_for_pause_impl(&self) -> Option<MutexGuard<'_, CoursePauseState>> {
if !self.is_resumed_or_resuming_ordered(Ordering::Acquire) {
return None;
}
let guard = self
.pause_state_lock
.lock()
.expect("Failed to get pause-state lock");
let guard = self
.pause_state_condition
.wait_while(guard, |p| *p != CoursePauseState::Paused)
.expect("Failed to reacquire lock on pause state after pause");
Some(guard)
}
pub fn run_unpausable<F, T>(&self, action: F) -> T
where
F: FnOnce() -> T,
{
self.run_paused_blocking_task(true, action).unwrap()
}
pub fn run_if_resumed<F, T>(&self, action: F) -> Option<T>
where
F: FnOnce() -> T,
{
self.run_paused_blocking_task(false, action)
}
fn run_paused_blocking_task<F, T>(
&self,
wait_if_paused: bool,
action: F,
) -> Option<T>
where
F: FnOnce() -> T,
{
let guard_opt = match UnpausableTaskGuard::try_lock(self) {
Ok(guard) => {
let pause_state = self.current_state(Ordering::Acquire);
if pause_state.is_paused() {
None
} else if pause_state.is_pausing() {
self.set_pausing_flag_on_guard_state();
None
} else {
Some(guard)
}
}
_ => None,
};
if let Some(_guard) = guard_opt {
Some(action())
} else if !wait_if_paused {
None
} else {
let mut guard_opt = self.wait_for_resume_impl();
if guard_opt.is_some() {
let _unpausable_task_guard = {
let _active_guard = guard_opt.take();
UnpausableTaskGuard::try_lock(self)
};
Some(action())
} else {
self.run_paused_blocking_task(wait_if_paused, action)
}
}
}
pub fn run_unresumable<F, T>(&self, action: F) -> T
where
F: FnOnce() -> T,
{
self.run_resume_blocking_task(true, action).unwrap()
}
pub fn run_if_paused<F, T>(&self, action: F) -> Option<T>
where
F: FnOnce() -> T,
{
self.run_resume_blocking_task(false, action)
}
fn run_resume_blocking_task<F, T>(
&self,
wait_if_resumed: bool,
action: F,
) -> Option<T>
where
F: FnOnce() -> T,
{
let guard_opt = match UnresumableTaskGuard::try_lock(self) {
Ok(guard) => {
let pause_state = self.current_state(Ordering::Acquire);
if pause_state.is_resumed() {
None
} else if pause_state.is_resuming() {
self.set_resuming_flag_on_guard_state();
None
} else {
Some(guard)
}
}
_ => None,
};
if let Some(_guard) = guard_opt {
Some(action())
} else if !wait_if_resumed {
None
} else {
let mut guard_opt = self.wait_for_pause_impl();
if guard_opt.is_some() {
let _unresumable_task_guard = {
let _active_guard = guard_opt.take();
UnpausableTaskGuard::try_lock(self)
};
Some(action())
} else {
self.run_paused_blocking_task(wait_if_resumed, action)
}
}
}
fn current_state(&self, ordering: Ordering) -> PauseState {
self.pause_state.load(ordering)
}
fn now_impl(&self) -> (PausableInstant, Instant) {
let now = Instant::now();
compiler_fence(Ordering::SeqCst);
let state = self.current_state(Ordering::Relaxed);
if state.is_time_paused() {
(
PausableInstant::new(self.zero_instant, state.get_millis()),
now,
)
} else {
(
PausableInstant::new(
self.zero_instant,
(now - self.zero_instant).as_millis() as u64
- state.get_millis(),
),
now,
)
}
}
fn set_resuming_flag_on_guard_state(&self) -> ResumabilityState {
self.resumability_state
.fetch_or(RESUMING_REQUESTED_MASK, Ordering::AcqRel)
| RESUMING_REQUESTED_MASK
}
fn unset_resuming_flag_on_guard_state(&self) -> ResumabilityState {
self.resumability_state
.fetch_and(!RESUMING_REQUESTED_MASK, Ordering::AcqRel)
& (!RESUMING_REQUESTED_MASK)
}
fn set_pausing_flag_on_guard_state(&self) -> PausabilityState {
self.pausability_state
.fetch_or(PAUSING_REQUESTED_MASK, Ordering::AcqRel)
| PAUSING_REQUESTED_MASK
}
fn unset_pausing_flag_on_guard_state(&self) -> PausabilityState {
self.pausability_state
.fetch_and(!PAUSING_REQUESTED_MASK, Ordering::AcqRel)
& (!PAUSING_REQUESTED_MASK)
}
fn set_state(&self, new_state: PauseState) {
self.pause_state.store(new_state, Ordering::SeqCst)
}
pub fn is_paused_ordered(&self, ordering: Ordering) -> bool {
self.current_state(ordering).is_paused()
}
pub fn is_pausing_ordered(&self, ordering: Ordering) -> bool {
self.current_state(ordering).is_pausing()
}
pub fn is_paused_or_pausing_ordered(&self, ordering: Ordering) -> bool {
self.current_state(ordering).is_paused_or_pausing()
}
pub fn is_resumed_or_resuming_ordered(&self, ordering: Ordering) -> bool {
self.current_state(ordering).is_resumed_or_resuming()
}
fn millis_since_zero(&self, instant: Instant) -> u64 {
(instant - self.zero_instant).as_millis() as u64
}
pub(crate) fn increment_unresumable_task_guards(
&self,
) -> ResumabilityState {
self.resumability_state.fetch_add(1, Ordering::Acquire) + 1
}
pub(crate) fn decrement_unresumable_task_guards(
&self,
) -> ResumabilityState {
let result =
self.resumability_state.fetch_sub(1, Ordering::Acquire) - 1;
if result.get_unresumable_task_count() == 0
&& result.is_resuming_requested()
{
{
let mut resumability_guard = self
.resumability_lock
.lock()
.expect("Failed to get resume guard lock");
if *resumability_guard == CourseResumabilityState::Resuming {
*resumability_guard = CourseResumabilityState::Released;
}
}
self.resumability_condition.notify_all();
}
result
}
pub(crate) fn increment_unpausable_task_guards(&self) -> PausabilityState {
self.pausability_state.fetch_add(1, Ordering::Acquire) + 1
}
pub(crate) fn decrement_unpausable_task_guards(&self) -> PausabilityState {
let result = self.pausability_state.fetch_sub(1, Ordering::Acquire) - 1;
if result.get_unpausable_task_count() == 0
&& result.is_pausing_requested()
{
{
let mut pausability_guard = self
.pausability_lock
.lock()
.expect("Failed to get pause guard lock");
if *pausability_guard == CoursePausabilityState::Pausing {
*pausability_guard = CoursePausabilityState::Released;
}
}
self.pausability_condition.notify_all();
}
result
}
}
#[cfg(test)]
mod tests {
use super::*;
#[cfg(not(loom))]
use std::sync::{
atomic::{AtomicBool, AtomicU64},
Arc, Condvar, Mutex,
};
#[cfg(not(loom))]
use std::thread;
#[cfg(loom)]
use loom::sync::{
atomic::{AtomicBool, AtomicU64},
Arc, Condvar, Mutex,
};
#[cfg(loom)]
use loom::thread;
#[test]
fn it_works() {
let clock = Arc::new(PausableClock::default());
assert!(clock.now_std().elapsed().as_millis() == 0);
clock.pause();
assert!(clock.is_paused());
let clock_clone = clock.clone();
let j = thread::spawn(move || {
let bef_real = Instant::now();
let bef = clock_clone.now();
clock_clone.wait_for_resume();
let aft = clock_clone.now();
let aft_real = Instant::now();
let elapsed_clock_millis = aft.elapsed_millis - bef.elapsed_millis;
let elapsed_real_millis = (aft_real - bef_real).as_millis();
assert!(elapsed_real_millis >= 1000);
assert!(elapsed_clock_millis <= 1);
});
let now = Instant::now();
thread::sleep(Duration::from_secs(1));
let slept_millis = now.elapsed().as_secs_f64();
clock.resume();
assert!(!clock.is_paused());
j.join().expect("Must be an assert fail in spawned thread");
let elapsed = clock.now_std().elapsed();
assert!((elapsed.as_secs_f64() - slept_millis).abs() <= 0.001);
}
#[test]
fn test_multiple_pauses() {
let pause_duration = Duration::from_millis(10);
let resume_duration = Duration::from_millis(20);
let pause_count = 100;
let clock = PausableClock::default();
let mut resuming_time = Duration::from_millis(0);
for _ in 0..pause_count {
assert!(clock.pause());
thread::sleep(pause_duration);
assert!(clock.resume());
let now = Instant::now();
thread::sleep(resume_duration);
resuming_time += now.elapsed();
}
let actual_elapsed_millis = clock.now().elapsed_millis as f64;
let expected_elapsed_millis = resuming_time.as_millis() as f64;
assert!(
(actual_elapsed_millis - expected_elapsed_millis).abs()
/ expected_elapsed_millis
< 0.005
);
}
#[test]
fn test_time_max_when_paused() {
let clock = Arc::new(PausableClock::default());
let threads = 1000;
let last_times: Arc<Vec<AtomicU64>> = Arc::new(
std::iter::repeat_with(|| AtomicU64::default())
.take(threads)
.collect(),
);
clock.pause();
let keep_going = Arc::new(AtomicBool::new(true));
let mut join_handles = Vec::with_capacity(threads);
for i in 0..threads {
let clock_clone = clock.clone();
let last_times_clone = last_times.clone();
let keep_going_clone = keep_going.clone();
join_handles.push(thread::spawn(move || {
clock_clone.wait_for_resume();
while keep_going_clone.load(Ordering::Relaxed) {
last_times_clone.get(i).unwrap().store(
clock_clone.now().elapsed_millis,
Ordering::Relaxed,
);
}
}));
}
thread::sleep(Duration::from_millis(10));
clock.resume();
while last_times
.iter()
.filter(|v| v.load(Ordering::Relaxed) == 0)
.next()
.is_some()
{}
clock.pause();
let expected_max_elapsed = clock.now().elapsed_millis;
keep_going.store(false, Ordering::Relaxed);
join_handles.into_iter().for_each(|j| {
let _ = j.join();
});
for reader_latest in last_times.iter() {
let actual = reader_latest.load(Ordering::Relaxed);
println!("Asserting {} > {}", expected_max_elapsed, actual);
assert!(actual > 0);
assert!(actual <= expected_max_elapsed);
}
}
#[test]
fn test_unpausable_wont_run_while_paused() {
let clock = Arc::new(PausableClock::default());
clock.pause();
let clock_clone = clock.clone();
let counter = Arc::new(AtomicU64::default());
let counter_clone = counter.clone();
thread::spawn(move || {
clock_clone.run_unpausable(|| {
counter_clone.store(1, Ordering::SeqCst);
});
});
thread::sleep(Duration::from_millis(50));
assert_eq!(0, counter.load(Ordering::SeqCst));
clock.resume();
thread::sleep(Duration::from_millis(50));
assert_eq!(1, counter.load(Ordering::SeqCst));
}
#[test]
fn test_pause_blocks_until_unpausable_exits() {
let clock = Arc::new(PausableClock::default());
clock.resume();
let lock = Arc::new(Mutex::new(true));
let cond = Arc::new(Condvar::default());
let clock_clone = clock.clone();
let lock_clone = lock.clone();
let cond_clone = cond.clone();
let if_paused_counter = Arc::new(AtomicU32::default());
let counter_clone = if_paused_counter.clone();
thread::spawn(move || {
clock_clone.run_if_resumed(move || {
counter_clone.fetch_add(1, Ordering::SeqCst);
});
});
let clock_clone = clock.clone();
thread::spawn(move || {
clock_clone.run_unpausable(|| {
{
let mut lock = lock_clone.lock().unwrap();
*lock = false;
}
cond_clone.notify_all();
thread::sleep(Duration::from_millis(1000));
});
});
let before = Instant::now();
{
let lock = lock.lock().unwrap();
let _lock = cond.wait_while(lock, |v| *v);
}
assert_eq!(1, if_paused_counter.load(Ordering::SeqCst));
clock.pause();
let time_to_pause = before.elapsed();
println!("{:?}", time_to_pause);
assert!(time_to_pause.as_secs_f64() >= 1.);
clock.run_if_resumed(|| unreachable!());
}
#[test]
fn test_resume_blocks_until_unresumable_exits() {
let clock = Arc::new(PausableClock::default());
let lock = Arc::new(Mutex::new(true));
let cond = Arc::new(Condvar::default());
let clock_clone = clock.clone();
let lock_clone = lock.clone();
let cond_clone = cond.clone();
let if_resumed_counter = Arc::new(AtomicU32::default());
let counter_clone = if_resumed_counter.clone();
clock.pause();
thread::spawn(move || {
clock_clone.run_if_paused(move || {
counter_clone.fetch_add(1, Ordering::SeqCst);
});
});
let clock_clone = clock.clone();
thread::spawn(move || {
clock_clone.run_unresumable(|| {
{
let mut lock = lock_clone.lock().unwrap();
*lock = false;
}
cond_clone.notify_all();
thread::sleep(Duration::from_millis(1000));
});
});
let before = Instant::now();
{
let lock = lock.lock().unwrap();
let _lock = cond.wait_while(lock, |v| *v);
}
assert_eq!(1, if_resumed_counter.load(Ordering::SeqCst));
clock.resume();
let time_to_resume = before.elapsed();
println!("{:?}", time_to_resume);
assert!(time_to_resume.as_secs_f64() >= 1.);
clock.run_if_paused(|| unreachable!());
}
#[test]
#[cfg(loom)]
fn loom_test_pause_and_unpausable_interaction() {
loom::model(|| {
let clock = Arc::new(PausableClock::default());
let clock_clone = clock.clone();
thread::spawn(move || {
clock_clone.pause();
});
let clock_clone = clock.clone();
thread::spawn(move || {
let clock_clone_2 = clock_clone.clone();
clock_clone.run_unpausable(|| {
assert!(!clock_clone_2.is_paused_ordered(Ordering::Relaxed));
});
});
});
}
}