#![allow(dead_code)]
use std::collections::VecDeque;
use std::sync::{Arc, Condvar, Mutex};
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AtomicCounter {
value: u64,
}
impl AtomicCounter {
#[must_use]
pub fn new(v: u64) -> Self {
Self { value: v }
}
pub fn increment(&mut self) -> u64 {
self.value = self.value.saturating_add(1);
self.value
}
pub fn decrement(&mut self) -> u64 {
self.value = self.value.saturating_sub(1);
self.value
}
#[must_use]
pub fn get(&self) -> u64 {
self.value
}
pub fn set(&mut self, v: u64) {
self.value = v;
}
}
impl Default for AtomicCounter {
fn default() -> Self {
Self::new(0)
}
}
#[derive(Debug, Clone)]
pub struct Semaphore {
count: i64,
max: i64,
}
impl Semaphore {
#[must_use]
pub fn new(max: i64) -> Self {
Self { count: max, max }
}
pub fn acquire(&mut self) -> bool {
if self.count > 0 {
self.count -= 1;
true
} else {
false
}
}
pub fn release(&mut self) -> bool {
if self.count < self.max {
self.count += 1;
true
} else {
false
}
}
#[must_use]
pub fn available(&self) -> i64 {
self.count
}
#[must_use]
pub fn max(&self) -> i64 {
self.max
}
}
#[derive(Debug)]
pub struct SimRwLock<T> {
data: T,
readers: u32,
writer: bool,
}
impl<T> SimRwLock<T> {
#[must_use]
pub fn new(data: T) -> Self {
Self {
data,
readers: 0,
writer: false,
}
}
pub fn read(&mut self) -> Option<&T> {
if self.writer {
return None;
}
self.readers += 1;
Some(&self.data)
}
pub fn release_read(&mut self) {
if self.readers > 0 {
self.readers -= 1;
}
}
pub fn write(&mut self) -> Option<&mut T> {
if self.writer || self.readers > 0 {
return None;
}
self.writer = true;
Some(&mut self.data)
}
pub fn try_write(&mut self) -> Option<&mut T> {
self.write()
}
pub fn release_write(&mut self) {
self.writer = false;
}
#[must_use]
pub fn reader_count(&self) -> u32 {
self.readers
}
#[must_use]
pub fn is_writing(&self) -> bool {
self.writer
}
}
#[derive(Debug, Clone)]
pub struct Barrier {
count: usize,
waiting: usize,
}
impl Barrier {
#[must_use]
pub fn new(count: usize) -> Self {
Self { count, waiting: 0 }
}
pub fn wait(&mut self) -> bool {
self.waiting += 1;
if self.waiting >= self.count {
self.reset();
true
} else {
false
}
}
pub fn reset(&mut self) {
self.waiting = 0;
}
#[must_use]
pub fn waiting_count(&self) -> usize {
self.waiting
}
#[must_use]
pub fn target_count(&self) -> usize {
self.count
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ChannelError {
Disconnected,
Full,
Empty,
}
impl std::fmt::Display for ChannelError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::Disconnected => write!(f, "channel disconnected"),
Self::Full => write!(f, "channel is full"),
Self::Empty => write!(f, "channel is empty"),
}
}
}
impl std::error::Error for ChannelError {}
struct ChannelInner<T> {
queue: Mutex<ChannelState<T>>,
not_full: Condvar,
not_empty: Condvar,
}
struct ChannelState<T> {
buf: VecDeque<T>,
capacity: usize,
sender_count: usize,
receiver_count: usize,
}
impl<T> ChannelState<T> {
fn is_closed_for_send(&self) -> bool {
self.receiver_count == 0
}
fn is_closed_for_recv(&self) -> bool {
self.sender_count == 0 && self.buf.is_empty()
}
}
pub struct BoundedSender<T> {
inner: Arc<ChannelInner<T>>,
}
impl<T> Clone for BoundedSender<T> {
fn clone(&self) -> Self {
let mut state = self.inner.queue.lock().expect("channel mutex poisoned");
state.sender_count += 1;
drop(state);
Self {
inner: Arc::clone(&self.inner),
}
}
}
impl<T> Drop for BoundedSender<T> {
fn drop(&mut self) {
let mut state = self.inner.queue.lock().expect("channel mutex poisoned");
state.sender_count -= 1;
let was_last = state.sender_count == 0;
drop(state);
if was_last {
self.inner.not_empty.notify_all();
}
}
}
impl<T: Send> BoundedSender<T> {
pub fn send(&self, item: T) -> Result<(), ChannelError> {
let mut state = self.inner.queue.lock().expect("channel mutex poisoned");
loop {
if state.is_closed_for_send() {
return Err(ChannelError::Disconnected);
}
if state.buf.len() < state.capacity {
state.buf.push_back(item);
drop(state);
self.inner.not_empty.notify_one();
return Ok(());
}
state = self
.inner
.not_full
.wait(state)
.expect("channel condvar poisoned");
}
}
pub fn try_send(&self, item: T) -> Result<(), ChannelError> {
let mut state = self.inner.queue.lock().expect("channel mutex poisoned");
if state.is_closed_for_send() {
return Err(ChannelError::Disconnected);
}
if state.buf.len() >= state.capacity {
return Err(ChannelError::Full);
}
state.buf.push_back(item);
drop(state);
self.inner.not_empty.notify_one();
Ok(())
}
pub fn len(&self) -> usize {
self.inner
.queue
.lock()
.expect("channel mutex poisoned")
.buf
.len()
}
pub fn is_empty(&self) -> bool {
self.len() == 0
}
pub fn capacity(&self) -> usize {
self.inner
.queue
.lock()
.expect("channel mutex poisoned")
.capacity
}
}
pub struct BoundedReceiver<T> {
inner: Arc<ChannelInner<T>>,
}
impl<T> Clone for BoundedReceiver<T> {
fn clone(&self) -> Self {
let mut state = self.inner.queue.lock().expect("channel mutex poisoned");
state.receiver_count += 1;
drop(state);
Self {
inner: Arc::clone(&self.inner),
}
}
}
impl<T> Drop for BoundedReceiver<T> {
fn drop(&mut self) {
let mut state = self.inner.queue.lock().expect("channel mutex poisoned");
state.receiver_count -= 1;
let was_last = state.receiver_count == 0;
drop(state);
if was_last {
self.inner.not_full.notify_all();
}
}
}
impl<T: Send> BoundedReceiver<T> {
pub fn recv(&self) -> Result<T, ChannelError> {
let mut state = self.inner.queue.lock().expect("channel mutex poisoned");
loop {
if let Some(item) = state.buf.pop_front() {
drop(state);
self.inner.not_full.notify_one();
return Ok(item);
}
if state.is_closed_for_recv() {
return Err(ChannelError::Disconnected);
}
state = self
.inner
.not_empty
.wait(state)
.expect("channel condvar poisoned");
}
}
pub fn try_recv(&self) -> Result<T, ChannelError> {
let mut state = self.inner.queue.lock().expect("channel mutex poisoned");
if let Some(item) = state.buf.pop_front() {
drop(state);
self.inner.not_full.notify_one();
return Ok(item);
}
if state.is_closed_for_recv() {
Err(ChannelError::Disconnected)
} else {
Err(ChannelError::Empty)
}
}
pub fn len(&self) -> usize {
self.inner
.queue
.lock()
.expect("channel mutex poisoned")
.buf
.len()
}
pub fn is_empty(&self) -> bool {
self.len() == 0
}
}
pub struct BoundedChannel<T> {
inner: Arc<ChannelInner<T>>,
}
impl<T: Send> BoundedChannel<T> {
#[must_use]
pub fn new(capacity: usize) -> Self {
assert!(capacity > 0, "BoundedChannel capacity must be non-zero");
let state = ChannelState {
buf: VecDeque::with_capacity(capacity),
capacity,
sender_count: 1,
receiver_count: 1,
};
Self {
inner: Arc::new(ChannelInner {
queue: Mutex::new(state),
not_full: Condvar::new(),
not_empty: Condvar::new(),
}),
}
}
pub fn into_split(self) -> (BoundedSender<T>, BoundedReceiver<T>) {
let tx = BoundedSender {
inner: Arc::clone(&self.inner),
};
let rx = BoundedReceiver {
inner: Arc::clone(&self.inner),
};
std::mem::forget(self);
(tx, rx)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_counter_increment() {
let mut c = AtomicCounter::new(0);
assert_eq!(c.increment(), 1);
assert_eq!(c.increment(), 2);
}
#[test]
fn test_counter_decrement() {
let mut c = AtomicCounter::new(5);
assert_eq!(c.decrement(), 4);
assert_eq!(c.decrement(), 3);
}
#[test]
fn test_counter_decrement_saturates() {
let mut c = AtomicCounter::new(0);
assert_eq!(c.decrement(), 0);
}
#[test]
fn test_counter_get_set() {
let mut c = AtomicCounter::new(10);
assert_eq!(c.get(), 10);
c.set(42);
assert_eq!(c.get(), 42);
}
#[test]
fn test_semaphore_acquire_release() {
let mut sem = Semaphore::new(3);
assert_eq!(sem.available(), 3);
assert!(sem.acquire());
assert_eq!(sem.available(), 2);
assert!(sem.release());
assert_eq!(sem.available(), 3);
}
#[test]
fn test_semaphore_exhausted() {
let mut sem = Semaphore::new(1);
assert!(sem.acquire());
assert!(!sem.acquire()); }
#[test]
fn test_semaphore_release_at_max() {
let mut sem = Semaphore::new(2);
assert!(!sem.release()); assert_eq!(sem.available(), 2);
}
#[test]
fn test_semaphore_max() {
let sem = Semaphore::new(5);
assert_eq!(sem.max(), 5);
}
#[test]
fn test_rwlock_read_success() {
let mut lock = SimRwLock::new(42u32);
let val = lock.read().copied();
assert_eq!(val, Some(42));
assert_eq!(lock.reader_count(), 1);
lock.release_read();
assert_eq!(lock.reader_count(), 0);
}
#[test]
fn test_rwlock_write_blocked_by_readers() {
let mut lock = SimRwLock::new(0u32);
let _ = lock.read();
assert!(lock.write().is_none());
lock.release_read();
assert!(lock.write().is_some());
}
#[test]
fn test_rwlock_read_blocked_by_writer() {
let mut lock = SimRwLock::new(0u32);
let _ = lock.write();
assert!(lock.read().is_none());
lock.release_write();
assert!(lock.read().is_some());
}
#[test]
fn test_rwlock_try_write() {
let mut lock = SimRwLock::new(99u32);
{
let w = lock.try_write().expect("try_write should succeed");
*w = 100;
}
lock.release_write();
let v = lock.read().copied();
assert_eq!(v, Some(100));
}
#[test]
fn test_barrier_fires() {
let mut b = Barrier::new(3);
assert!(!b.wait()); assert!(!b.wait()); assert!(b.wait()); assert_eq!(b.waiting_count(), 0);
}
#[test]
fn test_barrier_cyclic() {
let mut b = Barrier::new(2);
assert!(!b.wait());
assert!(b.wait()); assert!(!b.wait());
assert!(b.wait()); }
#[test]
fn test_barrier_target_count() {
let b = Barrier::new(7);
assert_eq!(b.target_count(), 7);
}
#[test]
fn test_bounded_channel_basic() {
let (tx, rx) = BoundedChannel::<i32>::new(4).into_split();
tx.send(10).expect("send");
tx.send(20).expect("send");
assert_eq!(rx.recv().expect("recv"), 10);
assert_eq!(rx.recv().expect("recv"), 20);
}
#[test]
fn test_bounded_channel_try_send_full() {
let (tx, rx) = BoundedChannel::<i32>::new(2).into_split();
assert!(tx.try_send(1).is_ok());
assert!(tx.try_send(2).is_ok());
assert_eq!(tx.try_send(3), Err(ChannelError::Full));
rx.try_recv().expect("recv 1");
rx.try_recv().expect("recv 2");
}
#[test]
fn test_bounded_channel_try_recv_empty() {
let (_tx, rx) = BoundedChannel::<i32>::new(2).into_split();
assert_eq!(rx.try_recv(), Err(ChannelError::Empty));
}
#[test]
fn test_bounded_channel_disconnected_on_recv_drop() {
let (tx, rx) = BoundedChannel::<i32>::new(2).into_split();
drop(rx);
assert_eq!(tx.try_send(1), Err(ChannelError::Disconnected));
}
#[test]
fn test_bounded_channel_disconnected_on_send_drop() {
let (tx, rx) = BoundedChannel::<i32>::new(2).into_split();
drop(tx);
assert_eq!(rx.try_recv(), Err(ChannelError::Disconnected));
}
#[test]
fn test_bounded_channel_drain_after_sender_drop() {
let (tx, rx) = BoundedChannel::<i32>::new(4).into_split();
tx.send(7).expect("send");
tx.send(8).expect("send");
drop(tx);
assert_eq!(rx.recv().expect("recv"), 7);
assert_eq!(rx.recv().expect("recv"), 8);
assert_eq!(rx.recv(), Err(ChannelError::Disconnected));
}
#[test]
fn test_bounded_channel_accessors() {
let (tx, rx) = BoundedChannel::<u8>::new(8).into_split();
assert_eq!(tx.capacity(), 8);
assert!(tx.is_empty());
tx.send(1).expect("send");
tx.send(2).expect("send");
assert_eq!(tx.len(), 2);
assert_eq!(rx.len(), 2);
}
#[test]
fn test_bounded_channel_threaded() {
use std::thread;
let (tx, rx) = BoundedChannel::<u32>::new(4).into_split();
let producer = thread::spawn(move || {
for i in 0..16_u32 {
tx.send(i).expect("send");
}
});
let consumer = thread::spawn(move || {
let mut out = Vec::with_capacity(16);
for _ in 0..16 {
out.push(rx.recv().expect("recv"));
}
out
});
producer.join().expect("producer");
let result = consumer.join().expect("consumer");
assert_eq!(result, (0..16).collect::<Vec<_>>());
}
#[test]
fn test_bounded_channel_clone_sender() {
let (tx, rx) = BoundedChannel::<i32>::new(4).into_split();
let tx2 = tx.clone();
tx.send(1).expect("send");
tx2.send(2).expect("send");
drop(tx);
drop(tx2);
assert_eq!(rx.recv().expect("recv 1"), 1);
assert_eq!(rx.recv().expect("recv 2"), 2);
assert_eq!(rx.recv(), Err(ChannelError::Disconnected));
}
}