#![warn(
missing_docs,
trivial_casts,
trivial_numeric_casts,
unused_extern_crates,
unused_import_braces,
unused_qualifications,
)]
extern crate glib;
extern crate glib_sys;
extern crate libc;
mod source;
use std::cell::RefCell;
use std::collections::VecDeque;
use std::marker::PhantomData;
use std::rc::Rc;
use std::sync::mpsc::{self, Receiver, SendError};
use source::{SourceFuncs, new_source, source_get};
use glib::{MainContext, Source};
/// Guard that keeps an `EventStream` locked for as long as it is alive.
///
/// `EventStream::lock()` sets the stream's `locked` flag and hands out this guard. While the
/// flag is set, `EventStream::emit()` ignores the events it is given. Dropping the guard clears
/// the flag again.
#[must_use]
pub struct Lock<MSG> {
    /// Shared state of the stream this guard was created from.
    stream: Rc<RefCell<_EventStream<MSG>>>,
}
// Unlocking happens automatically when the guard goes out of scope.
impl<MSG> Drop for Lock<MSG> {
    /// Clears the `locked` flag on the shared stream state.
    fn drop(&mut self) {
        let mut state = self.stream.borrow_mut();
        state.locked = false;
    }
}
/// State behind a `Channel`: the receiving end of the queue and the handler for its messages.
struct ChannelData<MSG> {
    /// Handler invoked by `dispatch()` with each delivered message.
    callback: Box<FnMut(MSG)>,
    /// Message already pulled off `receiver` by `prepare()` and not yet handed to `callback`.
    peeked_value: Option<MSG>,
    /// Receiving half of the `mpsc` channel created in `Channel::new()`.
    receiver: Receiver<MSG>,
}
/// Sending half of a `Channel`.
///
/// A thin wrapper around the standard library's `mpsc::Sender`; one is returned by
/// `Channel::new()` and further handles can be made with `clone()`.
pub struct Sender<MSG> {
    /// The wrapped standard-library sender.
    sender: mpsc::Sender<MSG>,
}
impl<MSG> Clone for Sender<MSG> {
fn clone(&self) -> Self {
Self {
sender: self.sender.clone(),
}
}
}
impl<MSG> Sender<MSG> {
    /// Queues `msg` on the channel, then wakes up the default `MainContext`.
    ///
    /// # Errors
    ///
    /// Returns the result of the underlying `mpsc::Sender::send` call unchanged; on failure the
    /// `SendError` carries the message back to the caller.
    pub fn send(&self, msg: MSG) -> Result<(), SendError<MSG>> {
        let outcome = self.sender.send(msg);
        // The wake-up is issued whether or not the send succeeded.
        MainContext::default().wakeup();
        outcome
    }
}
/// Receiving side of a message channel whose source is attached to the default `MainContext`.
///
/// Created together with its `Sender` by `Channel::new()`.
pub struct Channel<MSG> {
    /// Handle to the source built in `new()`; stored but never read (hence the underscore).
    _source: Source,
    /// Ties the otherwise unused `MSG` parameter to the type.
    _phantom: PhantomData<MSG>,
}
impl<MSG> Channel<MSG> {
pub fn new<CALLBACK: FnMut(MSG) + 'static>(callback: CALLBACK) -> (Self, Sender<MSG>) {
let (sender, receiver) = mpsc::channel();
let source = new_source(RefCell::new(ChannelData {
callback: Box::new(callback),
peeked_value: None,
receiver,
}));
let main_context = MainContext::default();
source.attach(&main_context);
(Self {
_source: source,
_phantom: PhantomData,
}, Sender {
sender,
})
}
}
// Source callbacks for a `Channel`: move messages from the `mpsc` receiver to the callback.
impl<MSG> SourceFuncs for RefCell<ChannelData<MSG>> {
    /// Delivers at most one message to the callback.
    ///
    /// The message stashed by `prepare()` is used when there is one; otherwise the receiver is
    /// polled once. Always returns `true` (presumably "keep this source installed" — confirm
    /// against the `SourceFuncs` contract in the `source` module).
    fn dispatch(&self) -> bool {
        // Take the stashed message in a statement of its own so the mutable borrow ends here.
        // Chaining `.or_else(..)` directly onto `borrow_mut()` would keep the `RefMut` alive
        // while the closure calls `borrow()`, which panics whenever nothing was stashed.
        let stashed = self.borrow_mut().peeked_value.take();
        // Receive errors are discarded: an empty and a disconnected channel both yield `None`.
        let msg = stashed.or_else(|| self.borrow().receiver.try_recv().ok());
        if let Some(msg) = msg {
            // NOTE: the cell stays mutably borrowed for the duration of the callback, so the
            // callback must not cause `prepare()`/`dispatch()` to run again on this same data.
            let mut data = self.borrow_mut();
            let callback = &mut data.callback;
            callback(msg);
        }
        true
    }

    /// Reports whether a message is ready to be dispatched.
    ///
    /// If nothing is stashed yet, the receiver is polled once and the result is stored in
    /// `peeked_value` so that `dispatch()` can deliver it. The second tuple element is always
    /// `None` (presumably "no timeout" — confirm against the `SourceFuncs` contract).
    fn prepare(&self) -> (bool, Option<u32>) {
        let mut data = self.borrow_mut();
        if data.peeked_value.is_none() {
            let polled = data.receiver.try_recv().ok();
            data.peeked_value = polled;
        }
        (data.peeked_value.is_some(), None)
    }
}
/// Shared state of an `EventStream`.
struct _EventStream<MSG> {
    /// Events queued by `emit()` and not yet handed to the stream's callback.
    events: VecDeque<MSG>,
    /// While `true`, `emit()` ignores the events it is given.
    locked: bool,
    /// Observers registered with `observe()`; `emit()` calls them in this order.
    observers: Vec<Rc<Fn(&MSG)>>,
}
// Source callbacks for an `EventStream`: feed queued events to the stream's callback.
impl<MSG> SourceFuncs for SourceData<MSG> {
    /// Pops the oldest queued event and passes it to the callback, if one is set.
    ///
    /// An event popped while no callback is set is dropped. Always returns `true`.
    fn dispatch(&self) -> bool {
        // Pop in a statement of its own: the stream must not stay borrowed while the callback
        // runs.
        let next = self.stream.borrow_mut().events.pop_front();
        let mut slot = self.callback.borrow_mut();
        if let Some(event) = next {
            if let Some(handler) = slot.as_mut() {
                handler(event);
            }
        }
        true
    }

    /// Reports whether at least one event is queued; the second element is always `None`.
    fn prepare(&self) -> (bool, Option<u32>) {
        let has_pending = !self.stream.borrow().events.is_empty();
        (has_pending, None)
    }
}
/// Data stored inside the source of an `EventStream`.
struct SourceData<MSG> {
    /// Slot for the handler installed with `set_callback()`; `None` until one is set.
    callback: Rc<RefCell<Option<Box<FnMut(MSG)>>>>,
    /// Event queue, lock flag and observers of the stream.
    stream: Rc<RefCell<_EventStream<MSG>>>,
}
/// A stream of `MSG` events.
///
/// Emitted events are shown to the registered observers and queued for the stream's callback.
/// All state lives in the `SourceData` stored inside `source`.
pub struct EventStream<MSG> {
    /// Source that owns the stream's `SourceData`.
    source: Source,
    /// Marks the type as generic over `MSG`; the raw-pointer form also keeps the compiler from
    /// treating the stream as `Send`/`Sync` automatically.
    _phantom: PhantomData<*mut MSG>,
}
// Implemented by hand so that `MSG` does not have to be `Clone`.
impl<MSG> Clone for EventStream<MSG> {
    /// Returns a second handle built from a clone of this stream's source.
    fn clone(&self) -> Self {
        let source = self.source.clone();
        EventStream {
            source,
            _phantom: PhantomData,
        }
    }
}
// Private accessors for the `SourceData` stored inside the stream's source.
impl<MSG> EventStream<MSG> {
    /// Returns a new handle to the callback slot kept in the source's `SourceData`.
    fn get_callback(&self) -> Rc<RefCell<Option<Box<FnMut(MSG)>>>> {
        let data = source_get::<SourceData<MSG>>(&self.source);
        data.callback.clone()
    }

    /// Returns a new handle to the shared stream state kept in the source's `SourceData`.
    fn get_stream(&self) -> Rc<RefCell<_EventStream<MSG>>> {
        let data = source_get::<SourceData<MSG>>(&self.source);
        data.stream.clone()
    }
}
impl<MSG> EventStream<MSG> {
    /// Creates an empty, unlocked stream with no observers and no callback.
    ///
    /// The state is wrapped in a new source, which is attached to the default `MainContext`.
    pub fn new() -> Self {
        let state: _EventStream<MSG> = _EventStream {
            events: VecDeque::new(),
            locked: false,
            observers: Vec::new(),
        };
        let data = SourceData {
            callback: Rc::new(RefCell::new(None)),
            stream: Rc::new(RefCell::new(state)),
        };
        let source = new_source(data);
        source.attach(&MainContext::default());
        EventStream {
            source,
            _phantom: PhantomData,
        }
    }

    /// Destroys the underlying source.
    pub fn close(&self) {
        self.source.destroy();
    }

    /// Sends `event` into the stream.
    ///
    /// Does nothing while the stream is locked. Otherwise every observer is called with a
    /// reference to the event, in registration order, and the event is then queued for the
    /// stream's callback.
    pub fn emit(&self, event: MSG) {
        let stream = self.get_stream();
        if stream.borrow().locked {
            return;
        }
        // The count is taken once up front and each observer is cloned out of the cell before
        // it is called, so the stream is not borrowed while observer code runs.
        let observer_count = stream.borrow().observers.len();
        for index in 0..observer_count {
            let observer = stream.borrow().observers[index].clone();
            observer(&event);
        }
        stream.borrow_mut().events.push_back(event);
    }

    /// Locks the stream until the returned guard is dropped.
    ///
    /// While locked, `emit()` ignores events.
    pub fn lock(&self) -> Lock<MSG> {
        let stream = self.get_stream();
        stream.borrow_mut().locked = true;
        Lock { stream }
    }

    /// Registers `callback` to be called with a reference to every event passed to `emit()`
    /// while the stream is not locked.
    pub fn observe<CALLBACK: Fn(&MSG) + 'static>(&self, callback: CALLBACK) {
        let stream = self.get_stream();
        stream.borrow_mut().observers.push(Rc::new(callback));
    }

    /// Installs `callback` as the handler for queued events, replacing any previous one.
    pub fn set_callback<CALLBACK: FnMut(MSG) + 'static>(&self, callback: CALLBACK) {
        let slot = self.get_callback();
        *slot.borrow_mut() = Some(Box::new(callback));
    }
}