use std::collections::VecDeque;
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
use std::sync::{Arc, Condvar, Mutex};
use std::time::{Duration, Instant};
use crate::error::{SynthError, SynthResult};
use crate::traits::{BackpressureStrategy, StreamEvent};
/// Point-in-time snapshot of a channel's activity counters, as returned by
/// `BoundedChannel::stats`. Counters are read with relaxed ordering, so a
/// snapshot taken while producers/consumers are active is approximate.
#[derive(Debug, Clone, Default)]
pub struct ChannelStats {
/// Total items successfully enqueued (includes items later dropped by `DropOldest`).
pub items_sent: u64,
/// Total items handed to receivers.
pub items_received: u64,
/// Items discarded by the `DropOldest`/`DropNewest` strategies.
pub items_dropped: u64,
/// Buffer length at the moment the snapshot was taken.
pub buffer_size: usize,
/// Highest buffer length ever observed by a sender.
pub max_buffer_size: usize,
/// Number of send operations that had to wait for space.
pub send_blocks: u64,
/// Number of times a receiver waited on an empty buffer (counted per wait,
/// so spurious wakeups may increment it more than once per `recv`).
pub receive_blocks: u64,
}
/// A bounded multi-producer/multi-consumer channel with a configurable
/// backpressure strategy. Cloning yields another handle to the same shared
/// buffer (see the `Clone` impl); `close()` wakes all blocked parties.
pub struct BoundedChannel<T> {
// Shared state; all clones point at the same `ChannelInner`.
inner: Arc<ChannelInner<T>>,
// Nominal buffer capacity (the `Buffer` strategy may exceed it by `max_overflow`).
capacity: usize,
// What a sender does when the buffer is full.
strategy: BackpressureStrategy,
}
/// State shared by every clone of a `BoundedChannel`.
struct ChannelInner<T> {
// The queue itself; guarded by the mutex, signaled via the two condvars.
buffer: Mutex<VecDeque<T>>,
// Signaled when space is freed (or on close) to wake blocked senders.
not_full: Condvar,
// Signaled when an item arrives (or on close) to wake blocked receivers.
not_empty: Condvar,
// Set once by `close()`; checked with SeqCst on every blocking path.
closed: AtomicBool,
// Statistics counters; updated with relaxed ordering (best-effort accuracy).
items_sent: AtomicU64,
items_received: AtomicU64,
items_dropped: AtomicU64,
send_blocks: AtomicU64,
receive_blocks: AtomicU64,
// High-water mark of the buffer length, maintained via a CAS loop.
max_buffer_size: AtomicU64,
}
impl<T> BoundedChannel<T> {
    /// Creates a channel that buffers at most `capacity` items and applies
    /// `strategy` when a producer finds the buffer full.
    pub fn new(capacity: usize, strategy: BackpressureStrategy) -> Self {
        Self {
            inner: Arc::new(ChannelInner {
                buffer: Mutex::new(VecDeque::with_capacity(capacity)),
                not_full: Condvar::new(),
                not_empty: Condvar::new(),
                closed: AtomicBool::new(false),
                items_sent: AtomicU64::new(0),
                items_received: AtomicU64::new(0),
                items_dropped: AtomicU64::new(0),
                send_blocks: AtomicU64::new(0),
                receive_blocks: AtomicU64::new(0),
                max_buffer_size: AtomicU64::new(0),
            }),
            capacity,
            strategy,
        }
    }

    /// Maximum number of buffered items the configured strategy tolerates:
    /// `Buffer { max_overflow }` allows `max_overflow` items beyond
    /// `capacity`; every other strategy caps at `capacity`.
    fn effective_limit(&self) -> usize {
        match self.strategy {
            BackpressureStrategy::Buffer { max_overflow } => self.capacity + max_overflow,
            _ => self.capacity,
        }
    }

    /// Raises the recorded high-water mark to `current_size` if it exceeds
    /// the previous maximum. CAS loop so concurrent senders cannot lose an
    /// update.
    fn record_high_water(&self, current_size: u64) {
        let mut max_size = self.inner.max_buffer_size.load(Ordering::Relaxed);
        while current_size > max_size {
            match self.inner.max_buffer_size.compare_exchange_weak(
                max_size,
                current_size,
                Ordering::SeqCst,
                Ordering::Relaxed,
            ) {
                Ok(_) => break,
                // Another sender raced us; retry against the value it stored.
                Err(observed) => max_size = observed,
            }
        }
    }

    /// Sends `item`, applying the configured backpressure strategy when the
    /// buffer is full.
    ///
    /// Returns `Ok(true)` if the item was enqueued, `Ok(false)` if it was
    /// discarded (`DropNewest` on a full buffer), and
    /// `Err(SynthError::ChannelClosed)` if the channel is closed.
    pub fn send(&self, item: T) -> SynthResult<bool> {
        if self.inner.closed.load(Ordering::SeqCst) {
            return Err(SynthError::ChannelClosed);
        }
        // Recover from a poisoned mutex: a panicking holder cannot leave the
        // VecDeque structurally invalid, so the data is still usable.
        let mut buffer = self
            .inner
            .buffer
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        if buffer.len() >= self.capacity {
            match self.strategy {
                BackpressureStrategy::Block => {
                    self.inner.send_blocks.fetch_add(1, Ordering::Relaxed);
                    // Wait until space frees up or the channel closes.
                    buffer = self
                        .inner
                        .not_full
                        .wait_while(buffer, |b| {
                            b.len() >= self.capacity && !self.inner.closed.load(Ordering::SeqCst)
                        })
                        .unwrap_or_else(std::sync::PoisonError::into_inner);
                    if self.inner.closed.load(Ordering::SeqCst) {
                        return Err(SynthError::ChannelClosed);
                    }
                }
                BackpressureStrategy::DropOldest => {
                    // Evict the head to make room for the new item.
                    buffer.pop_front();
                    self.inner.items_dropped.fetch_add(1, Ordering::Relaxed);
                }
                BackpressureStrategy::DropNewest => {
                    // Discard the incoming item instead.
                    self.inner.items_dropped.fetch_add(1, Ordering::Relaxed);
                    return Ok(false);
                }
                BackpressureStrategy::Buffer { max_overflow } => {
                    // Overflow allowance exhausted: behave like Block at the
                    // extended limit.
                    if buffer.len() >= self.capacity + max_overflow {
                        self.inner.send_blocks.fetch_add(1, Ordering::Relaxed);
                        buffer = self
                            .inner
                            .not_full
                            .wait_while(buffer, |b| {
                                b.len() >= self.capacity + max_overflow
                                    && !self.inner.closed.load(Ordering::SeqCst)
                            })
                            .unwrap_or_else(std::sync::PoisonError::into_inner);
                        if self.inner.closed.load(Ordering::SeqCst) {
                            return Err(SynthError::ChannelClosed);
                        }
                    }
                }
            }
        }
        buffer.push_back(item);
        let current_size = buffer.len() as u64;
        self.inner.items_sent.fetch_add(1, Ordering::Relaxed);
        self.record_high_water(current_size);
        // Unlock before notifying so the woken receiver can acquire the lock
        // immediately instead of blocking on it.
        drop(buffer);
        self.inner.not_empty.notify_one();
        Ok(true)
    }

    /// Like [`send`](Self::send), but gives up after `timeout`.
    ///
    /// On timeout: `Block`/`Buffer` yield a `GenerationError`; `DropNewest`
    /// returns `Ok(false)`; `DropOldest` evicts the head and enqueues the
    /// new item. The wait gives the receiver a chance to drain before an
    /// item is sacrificed.
    pub fn send_timeout(&self, item: T, timeout: Duration) -> SynthResult<bool> {
        if self.inner.closed.load(Ordering::SeqCst) {
            return Err(SynthError::ChannelClosed);
        }
        let deadline = Instant::now() + timeout;
        let mut buffer = self
            .inner
            .buffer
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        // Honor the Buffer strategy's overflow allowance, matching `send`.
        let limit = self.effective_limit();
        if buffer.len() >= limit {
            // Count the blocking send once (not per spurious wakeup), like
            // `send` does.
            self.inner.send_blocks.fetch_add(1, Ordering::Relaxed);
        }
        while buffer.len() >= limit {
            if self.inner.closed.load(Ordering::SeqCst) {
                return Err(SynthError::ChannelClosed);
            }
            let remaining = deadline.saturating_duration_since(Instant::now());
            if remaining.is_zero() {
                match self.strategy {
                    BackpressureStrategy::DropNewest => {
                        self.inner.items_dropped.fetch_add(1, Ordering::Relaxed);
                        return Ok(false);
                    }
                    BackpressureStrategy::DropOldest => {
                        buffer.pop_front();
                        self.inner.items_dropped.fetch_add(1, Ordering::Relaxed);
                        break;
                    }
                    _ => {
                        return Err(SynthError::GenerationError("send timeout".to_string()));
                    }
                }
            }
            let (new_buffer, wait_result) = self
                .inner
                .not_full
                .wait_timeout(buffer, remaining)
                .unwrap_or_else(std::sync::PoisonError::into_inner);
            buffer = new_buffer;
            if wait_result.timed_out() && buffer.len() >= limit {
                match self.strategy {
                    BackpressureStrategy::DropNewest => {
                        self.inner.items_dropped.fetch_add(1, Ordering::Relaxed);
                        return Ok(false);
                    }
                    BackpressureStrategy::DropOldest => {
                        buffer.pop_front();
                        self.inner.items_dropped.fetch_add(1, Ordering::Relaxed);
                        break;
                    }
                    _ => {
                        return Err(SynthError::GenerationError("send timeout".to_string()));
                    }
                }
            }
        }
        buffer.push_back(item);
        self.inner.items_sent.fetch_add(1, Ordering::Relaxed);
        // Keep the high-water statistic consistent with `send`.
        self.record_high_water(buffer.len() as u64);
        drop(buffer);
        self.inner.not_empty.notify_one();
        Ok(true)
    }

    /// Blocks until an item is available. Returns `None` only once the
    /// channel is closed AND drained — items sent before `close()` are still
    /// delivered.
    pub fn recv(&self) -> Option<T> {
        let mut buffer = self
            .inner
            .buffer
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        while buffer.is_empty() {
            if self.inner.closed.load(Ordering::SeqCst) {
                return None;
            }
            self.inner.receive_blocks.fetch_add(1, Ordering::Relaxed);
            buffer = self
                .inner
                .not_empty
                .wait(buffer)
                .unwrap_or_else(std::sync::PoisonError::into_inner);
        }
        let item = buffer.pop_front();
        if item.is_some() {
            self.inner.items_received.fetch_add(1, Ordering::Relaxed);
        }
        drop(buffer);
        self.inner.not_full.notify_one();
        item
    }

    /// Like [`recv`](Self::recv), but returns `None` if no item arrives
    /// within `timeout`.
    pub fn recv_timeout(&self, timeout: Duration) -> Option<T> {
        let deadline = Instant::now() + timeout;
        let mut buffer = self
            .inner
            .buffer
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        while buffer.is_empty() {
            if self.inner.closed.load(Ordering::SeqCst) {
                return None;
            }
            let remaining = deadline.saturating_duration_since(Instant::now());
            if remaining.is_zero() {
                return None;
            }
            let (new_buffer, wait_result) = self
                .inner
                .not_empty
                .wait_timeout(buffer, remaining)
                .unwrap_or_else(std::sync::PoisonError::into_inner);
            buffer = new_buffer;
            if wait_result.timed_out() && buffer.is_empty() {
                return None;
            }
        }
        let item = buffer.pop_front();
        if item.is_some() {
            self.inner.items_received.fetch_add(1, Ordering::Relaxed);
        }
        drop(buffer);
        self.inner.not_full.notify_one();
        item
    }

    /// Non-blocking receive: pops the head if present, else `None`.
    pub fn try_recv(&self) -> Option<T> {
        let mut buffer = self
            .inner
            .buffer
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner);
        let item = buffer.pop_front();
        if item.is_some() {
            self.inner.items_received.fetch_add(1, Ordering::Relaxed);
            drop(buffer);
            self.inner.not_full.notify_one();
        }
        item
    }

    /// Closes the channel and wakes every blocked sender and receiver.
    /// Subsequent sends fail; receivers drain remaining items, then get
    /// `None`.
    pub fn close(&self) {
        self.inner.closed.store(true, Ordering::SeqCst);
        self.inner.not_full.notify_all();
        self.inner.not_empty.notify_all();
    }

    /// Whether `close()` has been called on any handle.
    pub fn is_closed(&self) -> bool {
        self.inner.closed.load(Ordering::SeqCst)
    }

    /// Current number of buffered items (takes the lock).
    pub fn len(&self) -> usize {
        self.inner
            .buffer
            .lock()
            .unwrap_or_else(std::sync::PoisonError::into_inner)
            .len()
    }

    /// True if no items are currently buffered.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }

    /// Nominal capacity supplied at construction.
    pub fn capacity(&self) -> usize {
        self.capacity
    }

    /// Snapshot of the channel's counters; approximate under concurrency.
    pub fn stats(&self) -> ChannelStats {
        ChannelStats {
            items_sent: self.inner.items_sent.load(Ordering::Relaxed),
            items_received: self.inner.items_received.load(Ordering::Relaxed),
            items_dropped: self.inner.items_dropped.load(Ordering::Relaxed),
            buffer_size: self.len(),
            max_buffer_size: self.inner.max_buffer_size.load(Ordering::Relaxed) as usize,
            send_blocks: self.inner.send_blocks.load(Ordering::Relaxed),
            receive_blocks: self.inner.receive_blocks.load(Ordering::Relaxed),
        }
    }
}
impl<T> Clone for BoundedChannel<T> {
fn clone(&self) -> Self {
Self {
inner: Arc::clone(&self.inner),
capacity: self.capacity,
strategy: self.strategy,
}
}
}
/// Builds a connected sender/receiver pair sharing one `BoundedChannel`
/// of `StreamEvent`s with the given capacity and backpressure strategy.
pub fn stream_channel<T>(
    capacity: usize,
    strategy: BackpressureStrategy,
) -> (StreamSender<T>, StreamReceiver<T>) {
    let receiver_side = BoundedChannel::new(capacity, strategy);
    // Cloning the handle shares the same buffer between both ends.
    let sender_side = receiver_side.clone();
    let sender = StreamSender {
        channel: sender_side,
    };
    let receiver = StreamReceiver {
        channel: receiver_side,
    };
    (sender, receiver)
}
/// Sending half of a [`stream_channel`]; clonable for multiple producers.
pub struct StreamSender<T> {
// Shared bounded channel carrying stream events.
channel: BoundedChannel<StreamEvent<T>>,
}
impl<T> StreamSender<T> {
    /// Forwards an arbitrary stream event into the channel.
    pub fn send(&self, event: StreamEvent<T>) -> SynthResult<bool> {
        self.channel.send(event)
    }

    /// Wraps `item` in `StreamEvent::Data` and sends it.
    pub fn send_data(&self, item: T) -> SynthResult<bool> {
        let event = StreamEvent::Data(item);
        self.channel.send(event)
    }

    /// Closes the underlying channel, waking all blocked parties.
    pub fn close(&self) {
        self.channel.close()
    }

    /// Counter snapshot of the underlying channel.
    pub fn stats(&self) -> ChannelStats {
        self.channel.stats()
    }
}
impl<T> Clone for StreamSender<T> {
    /// Another producer handle feeding the same channel.
    fn clone(&self) -> Self {
        let channel = self.channel.clone();
        Self { channel }
    }
}
/// Receiving half of a [`stream_channel`]; also usable as an `Iterator`
/// that ends once the channel is closed and drained.
pub struct StreamReceiver<T> {
// Shared bounded channel carrying stream events.
channel: BoundedChannel<StreamEvent<T>>,
}
impl<T> StreamReceiver<T> {
    /// Blocks for the next event; `None` once closed and drained.
    pub fn recv(&self) -> Option<StreamEvent<T>> {
        self.channel.recv()
    }

    /// Blocks at most `timeout` for the next event.
    pub fn recv_timeout(&self, timeout: Duration) -> Option<StreamEvent<T>> {
        self.channel.recv_timeout(timeout)
    }

    /// Non-blocking receive.
    pub fn try_recv(&self) -> Option<StreamEvent<T>> {
        self.channel.try_recv()
    }

    /// Whether the channel has been closed (buffered events may remain).
    pub fn is_closed(&self) -> bool {
        self.channel.is_closed()
    }

    /// Counter snapshot of the underlying channel.
    pub fn stats(&self) -> ChannelStats {
        self.channel.stats()
    }
}
impl<T> Iterator for StreamReceiver<T> {
type Item = StreamEvent<T>;
fn next(&mut self) -> Option<Self::Item> {
self.recv()
}
}
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
use super::*;
use std::thread;
// FIFO ordering through a channel with spare capacity.
#[test]
fn test_bounded_channel_basic() {
let channel: BoundedChannel<i32> = BoundedChannel::new(10, BackpressureStrategy::Block);
channel.send(1).unwrap();
channel.send(2).unwrap();
channel.send(3).unwrap();
assert_eq!(channel.recv(), Some(1));
assert_eq!(channel.recv(), Some(2));
assert_eq!(channel.recv(), Some(3));
}
// DropOldest on a full buffer evicts the head (1) and keeps 2, 3.
#[test]
fn test_bounded_channel_drop_oldest() {
let channel: BoundedChannel<i32> = BoundedChannel::new(2, BackpressureStrategy::DropOldest);
channel.send(1).unwrap();
channel.send(2).unwrap();
channel.send(3).unwrap();
let stats = channel.stats();
assert_eq!(stats.items_dropped, 1);
assert_eq!(channel.recv(), Some(2));
assert_eq!(channel.recv(), Some(3));
}
// DropNewest on a full buffer discards the incoming item and reports Ok(false).
#[test]
fn test_bounded_channel_drop_newest() {
let channel: BoundedChannel<i32> = BoundedChannel::new(2, BackpressureStrategy::DropNewest);
channel.send(1).unwrap();
channel.send(2).unwrap();
let sent = channel.send(3).unwrap();
assert!(!sent);
let stats = channel.stats();
assert_eq!(stats.items_dropped, 1);
}
// Close semantics: buffered items still drain, then recv yields None and
// further sends fail.
#[test]
fn test_bounded_channel_close() {
let channel: BoundedChannel<i32> = BoundedChannel::new(10, BackpressureStrategy::Block);
channel.send(1).unwrap();
channel.close();
assert_eq!(channel.recv(), Some(1));
assert_eq!(channel.recv(), None);
assert!(channel.send(2).is_err());
}
// Cross-thread delivery: a blocking producer (capacity 10, 100 items) and
// this consumer must exchange every item in order.
#[test]
fn test_bounded_channel_threaded() {
let channel: BoundedChannel<i32> = BoundedChannel::new(10, BackpressureStrategy::Block);
let sender = channel.clone();
let handle = thread::spawn(move || {
for i in 0..100 {
sender.send(i).unwrap();
}
sender.close();
});
let mut received = Vec::new();
while let Some(item) = channel.recv() {
received.push(item);
}
handle.join().unwrap();
assert_eq!(received, (0..100).collect::<Vec<_>>());
}
// StreamSender/StreamReceiver wrappers: Data events round-trip and the
// iterator ends at close.
#[test]
fn test_stream_channel() {
let (sender, receiver) = stream_channel::<i32>(10, BackpressureStrategy::Block);
sender.send_data(1).unwrap();
sender.send_data(2).unwrap();
sender.close();
let events: Vec<_> = receiver.collect();
assert_eq!(events.len(), 2);
assert!(matches!(events[0], StreamEvent::Data(1)));
assert!(matches!(events[1], StreamEvent::Data(2)));
}
// Counters reflect sends, receives, and the remaining buffer size.
#[test]
fn test_channel_stats() {
let channel: BoundedChannel<i32> = BoundedChannel::new(10, BackpressureStrategy::Block);
channel.send(1).unwrap();
channel.send(2).unwrap();
channel.recv();
let stats = channel.stats();
assert_eq!(stats.items_sent, 2);
assert_eq!(stats.items_received, 1);
assert_eq!(stats.buffer_size, 1);
}
// Poison recovery: a thread panics while holding the buffer mutex; the
// channel must keep working because every lock() maps PoisonError to its
// inner guard.
#[test]
fn test_channel_recovers_from_poisoned_mutex() {
let channel: BoundedChannel<i32> = BoundedChannel::new(10, BackpressureStrategy::Block);
let poisoner = channel.clone();
let handle = thread::spawn(move || {
let _guard = poisoner
.inner
.buffer
.lock()
.unwrap_or_else(|p| p.into_inner());
panic!("intentional panic to poison mutex");
});
// The join error (panic payload) is expected and deliberately ignored.
let _ = handle.join();
assert!(channel.send(42).is_ok());
assert_eq!(channel.recv(), Some(42));
assert_eq!(channel.try_recv(), None);
let stats = channel.stats();
assert_eq!(stats.items_sent, 1);
assert_eq!(stats.items_received, 1);
}
}