use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use send::TrySender;
/// Builds two `SplitSender`s that forward to the same underlying sender.
///
/// The first wrapper receives a clone of `send` and is limited to `cap_one`
/// outstanding messages; the second takes ownership of `send` and is limited
/// to `cap_two`. Each wrapper is created through `SplitSender::new`, so the
/// two do not share a counter.
pub fn split_sender<S, T>(send: S, cap_one: usize, cap_two: usize) -> (SplitSender<S>, SplitSender<S>)
    where S: TrySender<T> + Clone,
          T: Send
{
    // Clone first so the original can be moved into the second wrapper.
    let first = SplitSender::new(send.clone(), cap_one);
    let second = SplitSender::new(send, cap_two);

    (first, second)
}
/// Sender wrapper that refuses to forward messages once a fixed number of
/// slots are reserved.
///
/// A slot is reserved for every send attempt that passes the capacity check
/// and is given back through a `SplitSenderAck` obtained from `sender_ack`.
pub struct SplitSender<S> {
    /// Underlying sender that accepted messages are forwarded to.
    send: S,
    /// Number of currently reserved slots; shared with clones of this sender
    /// and with every ack handle created from it.
    count: Arc<AtomicUsize>,
    /// Upper bound on `count`; sends are refused once it is reached.
    capacity: usize,
}
impl<S> Clone for SplitSender<S>
where S: Clone
{
fn clone(&self) -> SplitSender<S> {
SplitSender {
send: self.send.clone(),
count: self.count.clone(),
capacity: self.capacity,
}
}
}
// SAFETY: with `S: Sync` every field of `SplitSender<S>` is `Sync` (`S` by the
// bound, `Arc<AtomicUsize>` and `usize` unconditionally), so sharing a
// reference across threads exposes nothing that is not already thread-safe.
// NOTE(review): the auto trait should already be derived by the compiler under
// this same bound, which would make the manual `unsafe impl` redundant —
// confirm and consider dropping it.
unsafe impl<S> Sync for SplitSender<S> where S: Sync {}
impl<S> SplitSender<S> {
    /// Wraps `send` so that at most `capacity` slots can be reserved at once.
    ///
    /// The new sender starts with a fresh counter set to zero.
    pub fn new(send: S, capacity: usize) -> SplitSender<S> {
        let counter = Arc::new(AtomicUsize::new(0));

        SplitSender {
            send: send,
            count: counter,
            capacity: capacity,
        }
    }

    /// Returns a handle that shares this sender's counter and can release
    /// reserved slots via `SplitSenderAck::ack`.
    pub fn sender_ack(&self) -> SplitSenderAck {
        let counter = Arc::clone(&self.count);

        SplitSenderAck { count: counter }
    }

    /// Tries to reserve one slot, returning `true` on success.
    ///
    /// The counter is bumped optimistically; if the value seen before the
    /// bump had already reached `capacity`, the bump is undone and `false`
    /// is returned.
    fn try_count_increment(&self) -> bool {
        let previous = self.count.fetch_add(1, Ordering::SeqCst);

        if previous >= self.capacity {
            // Over budget: roll back our optimistic increment.
            self.count.fetch_sub(1, Ordering::SeqCst);
            return false;
        }

        true
    }
}
impl<S, T> TrySender<T> for SplitSender<S>
    where S: TrySender<T>,
          T: Send
{
    /// Forwards `data` to the wrapped sender if a capacity slot is available.
    ///
    /// Returns `None` when the wrapped sender took the message, and
    /// `Some(data)` when it was not sent — either because this sender is at
    /// capacity or because the wrapped sender handed the message back.
    ///
    /// A slot stays reserved only for messages that were actually forwarded;
    /// it is expected to be released later through a `SplitSenderAck`.
    fn try_send(&self, data: T) -> Option<T> {
        if !self.try_count_increment() {
            return Some(data);
        }

        // Assumes the wrapped sender follows the same convention as this
        // impl: `Some(data)` means the message was not delivered.
        let rejected = self.send.try_send(data);
        if rejected.is_some() {
            // Nothing was delivered, so no ack will ever arrive for this
            // message; release the slot here or it would leak permanently.
            self.count.fetch_sub(1, Ordering::SeqCst);
        }

        rejected
    }
}
/// Handle used to give back a slot that a `SplitSender` reserved.
pub struct SplitSenderAck {
    /// Counter shared with the `SplitSender` this handle was created from.
    count: Arc<AtomicUsize>,
}
impl SplitSenderAck {
    /// Releases one reserved slot by decrementing the shared counter.
    ///
    /// The decrement is unconditional and `fetch_sub` wraps on underflow, so
    /// this should be called at most once per message that was sent.
    pub fn ack(&self) {
        let _previous = self.count.fetch_sub(1, Ordering::SeqCst);
    }
}
#[cfg(test)]
mod tests {
    use std::sync::mpsc;
    use send::TrySender;
    use super::SplitSender;

    /// A sender with no capacity hands the message back and forwards nothing.
    #[test]
    fn positive_send_zero_capacity() {
        let (tx, rx) = mpsc::channel();
        let limited = SplitSender::new(tx, 0);

        let returned = limited.try_send(());
        assert!(returned.is_some());

        let received = rx.try_recv();
        assert!(received.is_err());
    }

    /// A sender with one free slot forwards the message to the channel.
    #[test]
    fn positive_send_one_capacity() {
        let (tx, rx) = mpsc::channel();
        let limited = SplitSender::new(tx, 1);

        let returned = limited.try_send(());
        assert!(returned.is_none());

        let received = rx.try_recv();
        assert!(received.is_ok());
    }
}