use notify::NotifySender;
use std::fmt::{self, Display, Formatter};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
mod notify;
pub use notify::{NotifyError, NotifyHandle, NotifyTimeoutError};
#[derive(Debug)]
pub struct Counter {
counter: Arc<AtomicUsize>,
notify: Vec<NotifySender>,
size: usize,
}
#[derive(Clone, Debug)]
pub struct WeakCounter {
counter: Arc<AtomicUsize>,
notify: Vec<NotifySender>,
}
pub struct CounterBuilder {
counter: Arc<AtomicUsize>,
size: usize,
notify: Vec<NotifySender>,
}
impl CounterBuilder {
pub fn size(mut self, v: usize) -> Self {
self.size = v;
self
}
pub fn create_notify(&mut self) -> NotifyHandle {
let (handle, sender) = NotifyHandle::new(Arc::clone(&self.counter));
self.notify.push(sender);
handle
}
pub fn build(self) -> Counter {
self.counter.fetch_add(self.size, Ordering::SeqCst);
Counter {
counter: self.counter,
notify: self.notify,
size: self.size,
}
}
}
impl Default for CounterBuilder {
fn default() -> Self {
Self {
counter: Arc::new(AtomicUsize::new(0)),
size: 1,
notify: vec![],
}
}
}
impl Counter {
pub fn builder() -> CounterBuilder {
CounterBuilder::default()
}
pub fn downgrade(self) -> WeakCounter {
self.spawn_downgrade()
}
pub fn spawn_downgrade(&self) -> WeakCounter {
WeakCounter {
notify: self.notify.clone(),
counter: Arc::clone(&self.counter),
}
}
#[inline]
pub fn count(&self) -> usize {
self.counter.load(Ordering::Acquire)
}
}
impl Clone for Counter {
fn clone(&self) -> Self {
self.counter.fetch_add(self.size, Ordering::SeqCst);
for sender in &self.notify {
sender.notify();
}
Counter {
notify: self.notify.clone(),
counter: Arc::clone(&self.counter),
size: self.size,
}
}
}
impl Display for Counter {
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
write!(f, "Counter(count={})", self.count())
}
}
impl Drop for Counter {
fn drop(&mut self) {
self.counter.fetch_sub(self.size, Ordering::SeqCst);
for sender in &self.notify {
sender.notify();
}
}
}
pub struct WeakCounterBuilder {
counter: Arc<AtomicUsize>,
notify: Vec<NotifySender>,
}
impl WeakCounterBuilder {
pub fn create_notify(&mut self) -> NotifyHandle {
let (handle, sender) = NotifyHandle::new(Arc::clone(&self.counter));
self.notify.push(sender);
handle
}
pub fn build(self) -> WeakCounter {
WeakCounter {
notify: self.notify,
counter: self.counter,
}
}
}
impl Default for WeakCounterBuilder {
fn default() -> Self {
Self {
counter: Arc::new(AtomicUsize::new(0)),
notify: vec![],
}
}
}
impl WeakCounter {
pub fn builder() -> WeakCounterBuilder {
WeakCounterBuilder::default()
}
#[inline]
pub fn count(&self) -> usize {
self.counter.load(Ordering::Acquire)
}
pub fn upgrade(self) -> Counter {
self.spawn_upgrade()
}
pub fn spawn_upgrade(&self) -> Counter {
self.spawn_upgrade_with_size(1)
}
pub fn spawn_upgrade_with_size(&self, size: usize) -> Counter {
self.counter.fetch_add(size, Ordering::SeqCst);
for sender in &self.notify {
sender.notify();
}
Counter {
notify: self.notify.clone(),
counter: Arc::clone(&self.counter),
size,
}
}
}
impl Display for WeakCounter {
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
write!(f, "WeakCounter(count={})", self.count())
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::thread;
use std::time::Duration;
#[test]
fn it_works() {
let counter = Counter::builder().build();
assert_eq!(counter.count(), 1);
let weak = counter.downgrade();
assert_eq!(weak.count(), 0);
{
let _counter1 = weak.spawn_upgrade();
assert_eq!(weak.count(), 1);
let _counter2 = weak.spawn_upgrade();
assert_eq!(weak.count(), 2);
}
assert_eq!(weak.count(), 0);
}
#[test]
fn different_sizes_work() {
let weak = WeakCounter::builder().build();
assert_eq!(weak.count(), 0);
let counter5 = weak.spawn_upgrade_with_size(5);
assert_eq!(weak.count(), 5);
{
let _moved_counter5 = counter5;
assert_eq!(weak.count(), 5);
let _counter1 = weak.spawn_upgrade();
assert_eq!(weak.count(), 6);
}
assert_eq!(weak.count(), 0);
}
#[test]
fn counter_with_size_works() {
let counter = Counter::builder().size(4).build();
assert_eq!(counter.count(), 4);
let weak = counter.spawn_downgrade();
assert_eq!(weak.count(), 4);
drop(counter);
assert_eq!(weak.count(), 0);
}
#[test]
fn wait_until_condition_works() {
run_wait_until_condition_test(|notify| notify.wait_until_condition(|v| v == 10).unwrap());
}
#[test]
fn wait_until_condition_with_timeout_works() {
run_wait_until_condition_test(|notify| {
notify
.wait_until_condition_timeout(|v| v == 10, Duration::from_secs(2))
.unwrap()
});
}
fn run_wait_until_condition_test(notify_fn: impl Fn(NotifyHandle)) {
let (weak, notify) = {
let mut builder = WeakCounter::builder();
let notify = builder.create_notify();
(builder.build(), notify)
};
let join_handle = thread::spawn(move || {
thread::sleep(Duration::from_millis(100));
let mut counters = vec![];
for _ in 0..10 {
counters.push(weak.spawn_upgrade());
}
counters
});
notify_fn(notify);
join_handle.join().unwrap();
}
#[test]
#[ignore]
fn test_wait_until_condition_always_occurs() {
let mut i = 0;
loop {
wait_until_condition_works();
println!("[{}] Completed.", i);
i += 1;
}
}
#[test]
fn notify_errors_when_all_references_are_dropped() {
let (weak, notify) = {
let mut builder = WeakCounter::builder();
let notify = builder.create_notify();
(builder.build(), notify)
};
thread::spawn(move || {
thread::sleep(Duration::from_millis(100));
let mut counters = vec![];
for _ in 0..5 {
counters.push(weak.spawn_upgrade());
}
});
assert_eq!(
notify.wait_until_condition(|v| v == 10),
Err(NotifyError::Disconnected),
);
}
#[test]
fn notify_checks_condition_before_erroring() {
let (weak, notify) = {
let mut builder = WeakCounter::builder();
let notify = builder.create_notify();
(builder.build(), notify)
};
drop(weak);
assert!(notify.wait_until_condition(|v| v == 0).is_ok());
}
#[test]
fn notify_with_timeout_can_timeout() {
let (weak, notify) = {
let mut builder = WeakCounter::builder();
let notify = builder.create_notify();
(builder.build(), notify)
};
assert_eq!(
notify.wait_until_condition_timeout(|v| v == 10, Duration::from_millis(100)),
Err(NotifyTimeoutError::Timeout)
);
drop(weak);
}
}