use std::cell::RefCell;
use std::collections::VecDeque;
use std::mem;
use std::ptr;
use std::sync::mpsc;
use std::sync::{Arc, Condvar, Mutex};
use Continue;
use MainContext;
use Priority;
use Source;
use SourceId;
use get_thread_id;
use ffi as glib_ffi;
use translate::{mut_override, FromGlibPtrFull, FromGlibPtrNone, ToGlib, ToGlibPtr};
#[derive(Debug)]
enum ChannelSourceState {
NotAttached,
Attached(*mut glib_ffi::GSource),
Destroyed,
}
unsafe impl Send for ChannelSourceState {}
unsafe impl Sync for ChannelSourceState {}
#[derive(Debug)]
struct ChannelInner<T> {
queue: VecDeque<T>,
source: ChannelSourceState,
}
impl<T> ChannelInner<T> {
fn receiver_disconnected(&self) -> bool {
match self.source {
ChannelSourceState::Destroyed => true,
ChannelSourceState::Attached(source)
if unsafe { glib_ffi::g_source_is_destroyed(source) } != glib_ffi::GFALSE =>
{
true
}
ChannelSourceState::NotAttached => false,
ChannelSourceState::Attached(_) => false,
}
}
fn set_ready_time(&mut self, ready_time: i64) {
if let ChannelSourceState::Attached(source) = self.source {
unsafe {
glib_ffi::g_source_set_ready_time(source, ready_time);
}
}
}
fn source(&self) -> Option<Source> {
match self.source {
ChannelSourceState::Attached(source)
if unsafe { glib_ffi::g_source_is_destroyed(source) == glib_ffi::GFALSE } =>
{
Some(unsafe { Source::from_glib_none(source) })
}
_ => None,
}
}
}
#[derive(Debug)]
struct ChannelBound {
bound: usize,
cond: Condvar,
}
#[derive(Debug)]
struct Channel<T>(Arc<(Mutex<ChannelInner<T>>, Option<ChannelBound>)>);
impl<T> Clone for Channel<T> {
fn clone(&self) -> Channel<T> {
Channel(self.0.clone())
}
}
impl<T> Channel<T> {
fn new(bound: Option<usize>) -> Channel<T> {
Channel(Arc::new((
Mutex::new(ChannelInner {
queue: VecDeque::new(),
source: ChannelSourceState::NotAttached,
}),
bound.map(|bound| ChannelBound {
bound,
cond: Condvar::new(),
}),
)))
}
fn send(&self, t: T) -> Result<(), mpsc::SendError<T>> {
let mut inner = (self.0).0.lock().unwrap();
if let Some(ChannelBound { bound, ref cond }) = (self.0).1 {
while inner.queue.len() >= bound
&& !inner.queue.is_empty()
&& !inner.receiver_disconnected()
{
inner = cond.wait(inner).unwrap();
}
}
if inner.receiver_disconnected() {
return Err(mpsc::SendError(t));
}
inner.queue.push_back(t);
inner.set_ready_time(0);
if let Some(ChannelBound { bound: 0, ref cond }) = (self.0).1 {
while !inner.queue.is_empty() && !inner.receiver_disconnected() {
inner = cond.wait(inner).unwrap();
}
if inner.receiver_disconnected() {
if let Some(t) = inner.queue.pop_front() {
return Err(mpsc::SendError(t));
}
}
}
Ok(())
}
fn try_send(&self, t: T) -> Result<(), mpsc::TrySendError<T>> {
let mut inner = (self.0).0.lock().unwrap();
let ChannelBound { bound, ref cond } = (self.0)
.1
.as_ref()
.expect("called try_send() on an unbounded channel");
if inner.queue.len() >= *bound && !inner.queue.is_empty() {
return Err(mpsc::TrySendError::Full(t));
}
if inner.receiver_disconnected() {
return Err(mpsc::TrySendError::Disconnected(t));
}
inner.queue.push_back(t);
inner.set_ready_time(0);
if *bound == 0 {
while !inner.queue.is_empty() && !inner.receiver_disconnected() {
inner = cond.wait(inner).unwrap();
}
if inner.receiver_disconnected() {
if let Some(t) = inner.queue.pop_front() {
return Err(mpsc::TrySendError::Disconnected(t));
}
}
}
Ok(())
}
fn try_recv(&self) -> Result<T, mpsc::TryRecvError> {
let mut inner = (self.0).0.lock().unwrap();
if let Some(item) = inner.queue.pop_front() {
if let Some(ChannelBound { ref cond, .. }) = (self.0).1 {
cond.notify_one();
}
return Ok(item);
}
if Arc::strong_count(&self.0) == 1 {
Err(mpsc::TryRecvError::Disconnected)
} else {
Err(mpsc::TryRecvError::Empty)
}
}
}
#[repr(C)]
struct ChannelSource<T, F: FnMut(T) -> Continue + 'static> {
source: glib_ffi::GSource,
thread_id: usize,
source_funcs: Option<Box<glib_ffi::GSourceFuncs>>,
channel: Option<Channel<T>>,
callback: Option<RefCell<F>>,
}
unsafe extern "C" fn prepare<T>(
source: *mut glib_ffi::GSource,
timeout: *mut i32,
) -> glib_ffi::gboolean {
*timeout = -1;
if glib_ffi::g_source_get_ready_time(source) == 0 {
glib_ffi::GTRUE
} else {
glib_ffi::GFALSE
}
}
unsafe extern "C" fn check<T>(source: *mut glib_ffi::GSource) -> glib_ffi::gboolean {
if glib_ffi::g_source_get_ready_time(source) == 0 {
glib_ffi::GTRUE
} else {
glib_ffi::GFALSE
}
}
unsafe extern "C" fn dispatch<T, F: FnMut(T) -> Continue + 'static>(
source: *mut glib_ffi::GSource,
callback: glib_ffi::GSourceFunc,
_user_data: glib_ffi::gpointer,
) -> glib_ffi::gboolean {
let source = &mut *(source as *mut ChannelSource<T, F>);
assert!(callback.is_none());
glib_ffi::g_source_set_ready_time(&mut source.source, -1);
assert_eq!(
get_thread_id(),
source.thread_id,
"Source dispatched on a different thread than before"
);
let channel = source
.channel
.as_ref()
.expect("ChannelSource without Channel");
loop {
match channel.try_recv() {
Err(mpsc::TryRecvError::Empty) => break,
Err(mpsc::TryRecvError::Disconnected) => return glib_ffi::G_SOURCE_REMOVE,
Ok(item) => {
let callback = source
.callback
.as_mut()
.expect("ChannelSource called before Receiver was attached");
if (&mut *callback.borrow_mut())(item) == Continue(false) {
return glib_ffi::G_SOURCE_REMOVE;
}
}
}
}
glib_ffi::G_SOURCE_CONTINUE
}
unsafe extern "C" fn finalize<T, F: FnMut(T) -> Continue + 'static>(
source: *mut glib_ffi::GSource,
) {
let source = &mut *(source as *mut ChannelSource<T, F>);
let channel = source.channel.take().expect("Receiver without channel");
{
let mut inner = (channel.0).0.lock().unwrap();
inner.source = ChannelSourceState::Destroyed;
if let Some(ChannelBound { ref cond, .. }) = (channel.0).1 {
cond.notify_all();
}
}
let _ = source.callback.take();
let _ = source.source_funcs.take();
}
#[derive(Clone, Debug)]
pub struct Sender<T>(Option<Channel<T>>);
impl<T> Sender<T> {
pub fn send(&self, t: T) -> Result<(), mpsc::SendError<T>> {
self.0.as_ref().expect("Sender with no channel").send(t)
}
}
impl<T> Drop for Sender<T> {
fn drop(&mut self) {
unsafe {
let channel = self.0.take().expect("Sender with no channel");
let source = {
let inner = (channel.0).0.lock().unwrap();
match inner.source() {
None => return,
Some(source) => source,
}
};
drop(channel);
glib_ffi::g_source_set_ready_time(source.to_glib_none().0, 0);
}
}
}
#[derive(Clone, Debug)]
pub struct SyncSender<T>(Option<Channel<T>>);
impl<T> SyncSender<T> {
pub fn send(&self, t: T) -> Result<(), mpsc::SendError<T>> {
self.0.as_ref().expect("Sender with no channel").send(t)
}
pub fn try_send(&self, t: T) -> Result<(), mpsc::TrySendError<T>> {
self.0.as_ref().expect("Sender with no channel").try_send(t)
}
}
impl<T> Drop for SyncSender<T> {
fn drop(&mut self) {
unsafe {
let channel = self.0.take().expect("Sender with no channel");
let source = {
let inner = (channel.0).0.lock().unwrap();
match inner.source() {
None => return,
Some(source) => source,
}
};
drop(channel);
glib_ffi::g_source_set_ready_time(source.to_glib_none().0, 0);
}
}
}
#[derive(Debug)]
pub struct Receiver<T>(Option<Channel<T>>, Priority);
unsafe impl<T: Send> Send for Receiver<T> {}
impl<T> Drop for Receiver<T> {
fn drop(&mut self) {
if let Some(channel) = self.0.take() {
let mut inner = (channel.0).0.lock().unwrap();
inner.source = ChannelSourceState::Destroyed;
if let Some(ChannelBound { ref cond, .. }) = (channel.0).1 {
cond.notify_all();
}
}
}
}
impl<T> Receiver<T> {
pub fn attach<'a, P: Into<Option<&'a MainContext>>, F: FnMut(T) -> Continue + 'static>(
mut self,
context: P,
func: F,
) -> SourceId {
let context = context.into();
unsafe {
let channel = self.0.take().expect("Receiver without channel");
let source_funcs = Box::new(glib_ffi::GSourceFuncs {
check: Some(check::<T>),
prepare: Some(prepare::<T>),
dispatch: Some(dispatch::<T, F>),
finalize: Some(finalize::<T, F>),
closure_callback: None,
closure_marshal: None,
});
let source = glib_ffi::g_source_new(
mut_override(&*source_funcs),
mem::size_of::<ChannelSource<T, F>>() as u32,
) as *mut ChannelSource<T, F>;
assert!(!source.is_null());
{
let source = &mut *source;
let mut inner = (channel.0).0.lock().unwrap();
glib_ffi::g_source_set_priority(mut_override(&source.source), self.1.to_glib());
glib_ffi::g_source_set_ready_time(
mut_override(&source.source),
if !inner.queue.is_empty() || Arc::strong_count(&channel.0) == 1 {
0
} else {
-1
},
);
inner.source = ChannelSourceState::Attached(&mut source.source);
}
{
let source = &mut *source;
source.thread_id = get_thread_id();
ptr::write(&mut source.channel, Some(channel));
ptr::write(&mut source.callback, Some(RefCell::new(func)));
ptr::write(&mut source.source_funcs, Some(source_funcs));
}
let source = Source::from_glib_full(mut_override(&(*source).source));
let id = if let Some(context) = context {
assert!(context.is_owner());
source.attach(context)
} else {
let context = MainContext::ref_thread_default();
assert!(context.is_owner());
source.attach(&context)
};
id
}
}
}
impl MainContext {
pub fn channel<T>(priority: Priority) -> (Sender<T>, Receiver<T>) {
let channel = Channel::new(None);
let receiver = Receiver(Some(channel.clone()), priority);
let sender = Sender(Some(channel));
(sender, receiver)
}
pub fn sync_channel<T>(priority: Priority, bound: usize) -> (SyncSender<T>, Receiver<T>) {
let channel = Channel::new(Some(bound));
let receiver = Receiver(Some(channel.clone()), priority);
let sender = SyncSender(Some(channel));
(sender, receiver)
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::cell::RefCell;
use std::rc::Rc;
use std::thread;
use std::time;
use MainLoop;
#[test]
fn test_channel() {
let c = MainContext::new();
let l = MainLoop::new(Some(&c), false);
c.acquire();
let (sender, receiver) = MainContext::channel(Priority::default());
let sum = Rc::new(RefCell::new(0));
let sum_clone = sum.clone();
let l_clone = l.clone();
receiver.attach(&c, move |item| {
*sum_clone.borrow_mut() += item;
if *sum_clone.borrow() == 6 {
l_clone.quit();
Continue(false)
} else {
Continue(true)
}
});
sender.send(1).unwrap();
sender.send(2).unwrap();
sender.send(3).unwrap();
l.run();
assert_eq!(*sum.borrow(), 6);
}
#[test]
fn test_drop_sender() {
let c = MainContext::new();
let l = MainLoop::new(Some(&c), false);
c.acquire();
let (sender, receiver) = MainContext::channel::<i32>(Priority::default());
struct Helper(MainLoop);
impl Drop for Helper {
fn drop(&mut self) {
self.0.quit();
}
}
let helper = Helper(l.clone());
receiver.attach(&c, move |_| {
let _ = helper;
Continue(true)
});
drop(sender);
l.run();
}
#[test]
fn test_drop_receiver() {
let (sender, receiver) = MainContext::channel::<i32>(Priority::default());
drop(receiver);
assert_eq!(sender.send(1), Err(mpsc::SendError(1)));
}
#[test]
fn test_remove_receiver() {
let c = MainContext::new();
c.acquire();
let (sender, receiver) = MainContext::channel::<i32>(Priority::default());
let source_id = receiver.attach(&c, move |_| Continue(true));
let source = c.find_source_by_id(&source_id).unwrap();
source.destroy();
assert_eq!(sender.send(1), Err(mpsc::SendError(1)));
}
#[test]
fn test_remove_receiver_and_drop_source() {
let c = MainContext::new();
c.acquire();
let (sender, receiver) = MainContext::channel::<i32>(Priority::default());
struct Helper(Arc<Mutex<bool>>);
impl Drop for Helper {
fn drop(&mut self) {
*self.0.lock().unwrap() = true;
}
}
let dropped = Arc::new(Mutex::new(false));
let helper = Helper(dropped.clone());
let source_id = receiver.attach(&c, move |_| {
let _helper = &helper;
Continue(true)
});
let source = c.find_source_by_id(&source_id).unwrap();
source.destroy();
drop(source);
assert_eq!(*dropped.lock().unwrap(), true);
assert_eq!(sender.send(1), Err(mpsc::SendError(1)));
}
#[test]
fn test_sync_channel() {
let c = MainContext::new();
let l = MainLoop::new(Some(&c), false);
c.acquire();
let (sender, receiver) = MainContext::sync_channel(Priority::default(), 2);
let sum = Rc::new(RefCell::new(0));
let sum_clone = sum.clone();
let l_clone = l.clone();
receiver.attach(&c, move |item| {
*sum_clone.borrow_mut() += item;
if *sum_clone.borrow() == 6 {
l_clone.quit();
Continue(false)
} else {
Continue(true)
}
});
let (wait_sender, wait_receiver) = mpsc::channel();
let thread = thread::spawn(move || {
sender.try_send(1).unwrap();
sender.try_send(2).unwrap();
assert!(sender.try_send(3).is_err());
wait_sender.send(()).unwrap();
sender.send(3).unwrap();
});
let _ = wait_receiver.recv().unwrap();
thread::sleep(time::Duration::from_millis(50));
l.run();
thread.join().unwrap();
assert_eq!(*sum.borrow(), 6);
}
#[test]
fn test_sync_channel_drop_wakeup() {
let c = MainContext::new();
let l = MainLoop::new(Some(&c), false);
c.acquire();
let (sender, receiver) = MainContext::sync_channel(Priority::default(), 3);
let sum = Rc::new(RefCell::new(0));
let sum_clone = sum.clone();
let l_clone = l.clone();
receiver.attach(&c, move |item| {
*sum_clone.borrow_mut() += item;
if *sum_clone.borrow() == 6 {
l_clone.quit();
Continue(false)
} else {
Continue(true)
}
});
let (wait_sender, wait_receiver) = mpsc::channel();
let thread = thread::spawn(move || {
sender.try_send(1).unwrap();
sender.try_send(2).unwrap();
sender.try_send(3).unwrap();
wait_sender.send(()).unwrap();
for i in 4.. {
if let Err(_) = sender.send(i) {
break;
}
}
});
let _ = wait_receiver.recv().unwrap();
thread::sleep(time::Duration::from_millis(50));
l.run();
thread.join().unwrap();
assert_eq!(*sum.borrow(), 6);
}
#[test]
fn test_sync_channel_drop_receiver_wakeup() {
let c = MainContext::new();
c.acquire();
let (sender, receiver) = MainContext::sync_channel(Priority::default(), 2);
let (wait_sender, wait_receiver) = mpsc::channel();
let thread = thread::spawn(move || {
sender.try_send(1).unwrap();
sender.try_send(2).unwrap();
wait_sender.send(()).unwrap();
assert!(sender.send(3).is_err());
});
let _ = wait_receiver.recv().unwrap();
thread::sleep(time::Duration::from_millis(50));
drop(receiver);
thread.join().unwrap();
}
#[test]
fn test_sync_channel_rendezvous() {
let c = MainContext::new();
let l = MainLoop::new(Some(&c), false);
c.acquire();
let (sender, receiver) = MainContext::sync_channel(Priority::default(), 0);
let (wait_sender, wait_receiver) = mpsc::channel();
let thread = thread::spawn(move || {
wait_sender.send(()).unwrap();
sender.send(1).unwrap();
wait_sender.send(()).unwrap();
sender.send(2).unwrap();
wait_sender.send(()).unwrap();
sender.send(3).unwrap();
wait_sender.send(()).unwrap();
});
let _ = wait_receiver.recv().unwrap();
assert_eq!(
wait_receiver.recv_timeout(time::Duration::from_millis(50)),
Err(mpsc::RecvTimeoutError::Timeout)
);
let sum = Rc::new(RefCell::new(0));
let sum_clone = sum.clone();
let l_clone = l.clone();
receiver.attach(&c, move |item| {
let _ = wait_receiver.recv().unwrap();
*sum_clone.borrow_mut() += item;
if *sum_clone.borrow() == 6 {
assert_eq!(
wait_receiver.recv_timeout(time::Duration::from_millis(50)),
Err(mpsc::RecvTimeoutError::Disconnected)
);
l_clone.quit();
Continue(false)
} else {
assert_eq!(
wait_receiver.recv_timeout(time::Duration::from_millis(50)),
Err(mpsc::RecvTimeoutError::Timeout)
);
Continue(true)
}
});
l.run();
thread.join().unwrap();
assert_eq!(*sum.borrow(), 6);
}
}