use std::convert::identity;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::thread;
use std::time::Duration;
use crossbeam_queue::SegQueue;
pub struct CountdownEvent {
initial: usize,
counter: AtomicUsize,
waiting: SegQueue<thread::Thread>,
}
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
pub enum CountdownError {
SaturatedCounter,
TooManySignals,
AlreadySet,
}
impl CountdownEvent {
pub fn new(count: usize) -> CountdownEvent {
CountdownEvent {
initial: count,
counter: AtomicUsize::new(count),
waiting: SegQueue::new(),
}
}
pub fn reset(&mut self) {
self.counter = AtomicUsize::new(self.initial);
while let Some(thread) = self.waiting.pop() {
thread.unpark();
}
}
pub fn reset_to_count(&mut self, count: usize) {
self.initial = count;
self.reset();
}
pub fn count(&self) -> usize {
self.counter.load(Ordering::SeqCst)
}
pub fn add(&self, count: usize) -> Result<(), CountdownError> {
let mut current = self.count();
loop {
if current == 0 {
return Err(CountdownError::AlreadySet);
}
if let Some(new_count) = current.checked_add(count) {
let exchange_result = self.counter.compare_exchange_weak(
current,
new_count,
Ordering::SeqCst,
Ordering::SeqCst,
);
match exchange_result {
Ok(_) => return Ok(()),
Err(last_count) => current = last_count,
}
} else {
return Err(CountdownError::SaturatedCounter);
}
}
}
pub fn signal(&self, count: usize) -> Result<bool, CountdownError> {
let mut current = self.count();
loop {
if current == 0 {
return Err(CountdownError::AlreadySet);
}
if let Some(new_count) = current.checked_sub(count) {
let exchange_result = self.counter.compare_exchange_weak(
current,
new_count,
Ordering::SeqCst,
Ordering::SeqCst,
);
match exchange_result {
Ok(_) => {
current = new_count;
break;
}
Err(last_count) => current = last_count,
}
} else {
return Err(CountdownError::TooManySignals);
}
}
if current == 0 {
while let Some(thread) = self.waiting.pop() {
thread.unpark();
}
Ok(true)
} else {
Ok(false)
}
}
pub fn increment(&self) -> Result<(), CountdownError> {
self.add(1)
}
pub fn decrement(&self) -> Result<bool, CountdownError> {
self.signal(1)
}
pub fn guard(&self) -> Result<CountdownGuard, CountdownError> {
CountdownGuard::new(self)
}
pub fn wait(&self) {
self.waiting.push(thread::current());
let mut first = true;
while self.count() > 0 {
if first {
first = false;
} else {
self.waiting.push(thread::current());
}
thread::park();
}
}
pub fn wait_timeout(&self, timeout: Duration) -> usize {
use std::time::Instant;
self.waiting.push(thread::current());
let begin = Instant::now();
let mut first = true;
let mut remaining = timeout;
loop {
let current = self.count();
if current == 0 {
return 0;
}
if first {
first = false;
} else {
let elapsed = begin.elapsed();
if elapsed >= timeout {
return current;
} else {
remaining = timeout - elapsed;
}
self.waiting.push(thread::current());
}
thread::park_timeout(remaining);
}
}
}
pub struct CountdownGuard<'a> {
event: &'a CountdownEvent,
}
impl<'a> CountdownGuard<'a> {
fn new(event: &'a CountdownEvent) -> Result<CountdownGuard<'a>, CountdownError> {
event.increment()?;
Ok(CountdownGuard { event })
}
}
impl<'a> Drop for CountdownGuard<'a> {
fn drop(&mut self) {
self.event.decrement().ok();
}
}
#[derive(Debug, PartialEq, Copy, Clone)]
pub enum SignalKind {
Auto,
Manual,
}
pub struct SignalEvent {
reset: SignalKind,
signal: AtomicBool,
waiting: SegQueue<thread::Thread>,
}
impl SignalEvent {
pub fn new(init_state: bool, signal_kind: SignalKind) -> SignalEvent {
SignalEvent {
reset: signal_kind,
signal: AtomicBool::new(init_state),
waiting: SegQueue::new(),
}
}
pub fn auto(init_state: bool) -> SignalEvent {
SignalEvent::new(init_state, SignalKind::Auto)
}
pub fn manual(init_state: bool) -> SignalEvent {
SignalEvent::new(init_state, SignalKind::Manual)
}
pub fn status(&self) -> bool {
self.signal.load(Ordering::SeqCst)
}
pub fn signal(&self) {
self.signal.store(true, Ordering::SeqCst);
match self.reset {
SignalKind::Auto => {
while self.signal.load(Ordering::SeqCst) {
if let Some(thread) = self.waiting.pop() {
thread.unpark();
} else {
break;
}
}
}
SignalKind::Manual => {
while let Some(thread) = self.waiting.pop() {
thread.unpark();
}
}
}
}
pub fn reset(&self) {
self.signal.store(false, Ordering::SeqCst);
}
pub fn wait(&self) {
self.waiting.push(thread::current());
let mut first = true;
while !self.check_signal() {
if first {
first = false;
} else {
self.waiting.push(thread::current());
}
thread::park();
}
}
pub fn wait_timeout(&self, timeout: Duration) -> bool {
use std::time::Instant;
self.waiting.push(thread::current());
let begin = Instant::now();
let mut first = true;
let mut remaining = timeout;
loop {
if self.check_signal() {
return true;
}
if first {
first = false;
} else {
let elapsed = begin.elapsed();
if elapsed >= timeout {
return self.status();
} else {
remaining = timeout - elapsed;
}
self.waiting.push(thread::current());
}
thread::park_timeout(remaining);
}
}
fn check_signal(&self) -> bool {
self.signal
.compare_exchange_weak(
true,
self.reset == SignalKind::Manual,
Ordering::SeqCst,
Ordering::SeqCst,
)
.unwrap_or_else(identity)
}
}