#![warn(
missing_docs,
trivial_casts,
trivial_numeric_casts,
unused_extern_crates,
unused_import_braces,
unused_qualifications,
)]
mod source;
use std::cell::RefCell;
use std::collections::VecDeque;
use std::marker::PhantomData;
use std::rc::{Rc, Weak};
use std::sync::mpsc::{self, Receiver, SendError};
use self::source::{SourceFuncs, new_source, source_get};
use glib::{
MainContext,
Source,
SourceId,
};
/// A non-owning handle to the shared state of an `EventStream`.
///
/// The handle only holds a weak reference, so it does not keep the stream alive. Handles are
/// obtained from `EventStream::stream()` / `EventStream::downgrade()` or by cloning another
/// handle.
pub struct StreamHandle<MSG> {
// Weak reference to the state (queue, lock flag, observers) owned by the stream's source data.
stream: Weak<RefCell<_EventStream<MSG>>>,
}
impl<MSG> Clone for StreamHandle<MSG> {
fn clone(&self) -> Self {
Self {
stream: self.stream.clone(),
}
}
}
impl<MSG> StreamHandle<MSG> {
fn new(stream: Weak<RefCell<_EventStream<MSG>>>) -> Self {
Self {
stream,
}
}
#[must_use]
pub fn stream(&self) -> Self {
self.clone()
}
pub fn emit(&self, msg: MSG) {
if let Some(ref stream) = self.stream.upgrade() {
emit(stream, msg);
}
else {
panic!("Trying to call emit() on a dropped EventStream");
}
}
pub fn lock(&self) -> Lock<MSG> {
if let Some(ref stream) = self.stream.upgrade() {
stream.borrow_mut().locked = true;
Lock {
stream: self.clone(),
}
}
else {
panic!("Trying to call lock() on a dropped EventStream");
}
}
fn unlock(&self) {
if let Some(ref stream) = self.stream.upgrade() {
stream.borrow_mut().locked = false;
}
else {
panic!("Trying to call unlock() on a dropped EventStream");
}
}
pub fn observe<CALLBACK: Fn(&MSG) + 'static>(&self, callback: CALLBACK) {
if let Some(ref stream) = self.stream.upgrade() {
stream.borrow_mut().observers.push(Rc::new(callback));
}
else {
panic!("Trying to call observe() on a dropped EventStream");
}
}
}
/// Guard returned by the `lock()` methods.
///
/// While the stream is locked, emitted messages are ignored. Dropping the guard unlocks the
/// stream again, which is why the value must not be discarded immediately.
#[must_use]
pub struct Lock<MSG> {
// Handle used to reach the stream again when the guard is dropped.
stream: StreamHandle<MSG>,
}
impl<MSG> Drop for Lock<MSG> {
    /// Releases the lock by asking the stored handle to unlock its stream.
    fn drop(&mut self) {
        let handle = &self.stream;
        handle.unlock();
    }
}
// State stored in the source created by `Channel::new`.
struct ChannelData<MSG> {
// Called with every message taken out of the channel during `dispatch`.
callback: Box<dyn FnMut(MSG)>,
// Message already pulled from `receiver` by `prepare` and not yet dispatched.
peeked_value: Option<MSG>,
// Receiving end of the `std::sync::mpsc` channel paired with the `Sender` handed to the user.
receiver: Receiver<MSG>,
}
/// Sending half of a `Channel`.
///
/// It wraps a `std::sync::mpsc::Sender` and wakes up the default `MainContext` on every send.
pub struct Sender<MSG> {
// The wrapped standard-library sender.
sender: mpsc::Sender<MSG>,
}
impl<MSG> Clone for Sender<MSG> {
fn clone(&self) -> Self {
Self {
sender: self.sender.clone(),
}
}
}
impl<MSG> Sender<MSG> {
    /// Sends `msg` over the channel, then wakes up the default `MainContext`.
    ///
    /// The wake-up happens whether or not the send succeeded.
    ///
    /// # Errors
    ///
    /// Returns the error of the underlying `std::sync::mpsc::Sender::send` call unchanged.
    pub fn send(&self, msg: MSG) -> Result<(), SendError<MSG>> {
        let outcome = self.sender.send(msg);
        MainContext::default().wakeup();
        outcome
    }
}
/// Receiving side of a channel whose messages are handed to a callback by a GLib source.
///
/// See `Channel::new` for how the source and the matching `Sender` are created.
pub struct Channel<MSG> {
// The source created in `Channel::new`; kept here but never accessed afterwards.
_source: Source,
// Ties the otherwise unused `MSG` type parameter to the struct.
_phantom: PhantomData<MSG>,
}
impl<MSG> Channel<MSG> {
pub fn new<CALLBACK: FnMut(MSG) + 'static>(callback: CALLBACK) -> (Self, Sender<MSG>) {
let (sender, receiver) = mpsc::channel();
let source = new_source(RefCell::new(ChannelData {
callback: Box::new(callback),
peeked_value: None,
receiver,
}));
let main_context = MainContext::default();
source.attach(Some(&main_context));
(Self {
_source: source,
_phantom: PhantomData,
}, Sender {
sender,
})
}
}
impl<MSG> SourceFuncs for RefCell<ChannelData<MSG>> {
    /// Delivers at most one message to the callback and always returns `true`.
    ///
    /// The message peeked by `prepare` is used first; when there is none, the receiver is
    /// polled once without blocking. Receive errors are treated as "no message".
    ///
    /// The cell stays mutably borrowed while the callback runs, so the callback must not cause
    /// this same source to be prepared or dispatched again.
    fn dispatch(&self) -> bool {
        // A single borrow is taken for the whole method. Chaining
        // `self.borrow_mut()...or_else(|| self.borrow()...)` in one statement keeps the mutable
        // borrow alive while the closure runs and panics with a `BorrowError`.
        let mut data = self.borrow_mut();
        let peeked = data.peeked_value.take();
        let msg = peeked.or_else(|| data.receiver.try_recv().ok());
        if let Some(msg) = msg {
            let callback = &mut data.callback;
            callback(msg);
        }
        true
    }

    /// Reports whether a message is ready to be dispatched; no timeout is requested.
    ///
    /// If nothing has been peeked yet, the receiver is polled once without blocking and the
    /// result is stored so that `dispatch` can deliver it.
    fn prepare(&self) -> (bool, Option<u32>) {
        let mut data = self.borrow_mut();
        if data.peeked_value.is_none() {
            let polled = data.receiver.try_recv().ok();
            data.peeked_value = polled;
        }
        (data.peeked_value.is_some(), None)
    }
}
// Shared, single-threaded state behind an `EventStream` and its `StreamHandle`s.
struct _EventStream<MSG> {
// Messages emitted but not yet dispatched to the stream's callback, oldest first.
events: VecDeque<MSG>,
// While `true`, the module-level `emit` function ignores messages entirely.
locked: bool,
// Callbacks invoked by `emit` with a reference to each message before it is queued.
#[allow(clippy::type_complexity)]
observers: Vec<Rc<dyn Fn(&MSG)>>,
}
impl<MSG> SourceFuncs for SourceData<MSG> {
    /// Pops the oldest queued event and passes it to the callback, if one is set.
    ///
    /// An event popped while no callback is installed is discarded. Always returns `true`.
    fn dispatch(&self) -> bool {
        let next_event = self.stream.borrow_mut().events.pop_front();
        // The callback cell is borrowed even when there is no event, and stays borrowed for
        // the duration of the call.
        let mut slot = self.callback.borrow_mut();
        match (next_event, slot.as_mut()) {
            (Some(event), Some(callback)) => callback(event),
            _ => {}
        }
        true
    }

    /// Reports whether at least one event is queued; no timeout is requested.
    fn prepare(&self) -> (bool, Option<u32>) {
        let has_pending = !self.stream.borrow().events.is_empty();
        (has_pending, None)
    }
}
// Shared, replaceable slot for the stream's message callback; `None` until a callback is set.
type Callback<MSG> = Rc<RefCell<Option<Box<dyn FnMut(MSG)>>>>;
// Data stored in the source created by `EventStream::new`.
struct SourceData<MSG> {
// Callback invoked with each event popped from the queue during `dispatch`.
callback: Callback<MSG>,
// Queue, lock flag and observers shared with every `StreamHandle`.
stream: Rc<RefCell<_EventStream<MSG>>>,
}
/// Notifies every observer of `msg`, then appends it to the stream's event queue.
///
/// Nothing happens (the message is dropped) when the stream is locked. Only the observers
/// registered when the notification starts are called, and no borrow of the stream is held
/// while an observer runs, so observers may access the stream themselves.
fn emit<MSG>(stream: &Rc<RefCell<_EventStream<MSG>>>, msg: MSG) {
    if stream.borrow().locked {
        return;
    }
    let observer_count = stream.borrow().observers.len();
    for index in 0..observer_count {
        // Clone the `Rc` so the `RefCell` borrow ends before the observer is invoked.
        let observer = Rc::clone(&stream.borrow().observers[index]);
        observer(&msg);
    }
    stream.borrow_mut().events.push_back(msg);
}
/// A stream of messages dispatched to a callback by a source attached to the default
/// `MainContext`.
///
/// Dropping the stream detaches its source. `StreamHandle`s created from it only hold weak
/// references to the stream's state.
pub struct EventStream<MSG> {
// The source created in `new`; its data holds the callback and the shared stream state.
source: Source,
// Id returned when the source was attached; taken out again on drop.
source_id: Option<SourceId>,
// NOTE(review): `PhantomData<*mut MSG>` is neither `Send` nor `Sync`; presumably used to keep
// the stream on the thread that created it — confirm.
_phantom: PhantomData<*mut MSG>,
}
impl<MSG> Drop for EventStream<MSG> {
    /// Detaches the stream's source from its main context.
    fn drop(&mut self) {
        // The source is destroyed directly instead of being removed by id. Removing by id
        // fails once the source is already gone (for example after an explicit `close()`),
        // and a destructor must not panic. Destroying the source we own has the same effect
        // and is harmless when repeated.
        let _stale_id = self.source_id.take();
        self.close();
    }
}
impl<MSG> EventStream<MSG> {
    /// Returns a new shared reference to the callback slot stored in the source data.
    fn get_callback(&self) -> Callback<MSG> {
        let data = source_get::<SourceData<MSG>>(&self.source);
        data.callback.clone()
    }

    /// Borrows the shared stream state stored in the source data.
    fn get_stream(&self) -> &Rc<RefCell<_EventStream<MSG>>> {
        let data = source_get::<SourceData<MSG>>(&self.source);
        &data.stream
    }
}
impl<MSG> EventStream<MSG> {
    /// Creates an empty, unlocked stream with no observers and no callback.
    ///
    /// The stream's source is attached to the default `MainContext`.
    pub fn new() -> Self {
        let state: _EventStream<MSG> = _EventStream {
            events: VecDeque::new(),
            locked: false,
            observers: Vec::new(),
        };
        let data = SourceData {
            callback: Rc::new(RefCell::new(None)),
            stream: Rc::new(RefCell::new(state)),
        };
        let source = new_source(data);
        let source_id = source.attach(Some(&MainContext::default()));
        Self {
            source,
            source_id: Some(source_id),
            _phantom: PhantomData,
        }
    }

    /// Destroys the stream's source.
    pub fn close(&self) {
        self.source.destroy();
    }

    /// Returns a weak handle to this stream; same as `downgrade()`.
    pub fn stream(&self) -> StreamHandle<MSG> {
        self.downgrade()
    }

    /// Returns a weak handle that does not keep the stream's state alive.
    pub fn downgrade(&self) -> StreamHandle<MSG> {
        let weak = Rc::downgrade(self.get_stream());
        StreamHandle::new(weak)
    }

    /// Sends `event` to the stream through the module-level `emit` function.
    pub fn emit(&self, event: MSG) {
        emit(self.get_stream(), event)
    }

    /// Sets the stream's `locked` flag and returns a guard that clears it again when dropped.
    pub fn lock(&self) -> Lock<MSG> {
        self.get_stream().borrow_mut().locked = true;
        let handle = self.stream();
        Lock { stream: handle }
    }

    /// Registers `callback` as an observer of the stream.
    pub fn observe<CALLBACK: Fn(&MSG) + 'static>(&self, callback: CALLBACK) {
        let observer = Rc::new(callback);
        self.get_stream().borrow_mut().observers.push(observer);
    }

    /// Installs `callback` as the function receiving dispatched events, replacing any
    /// previously set callback.
    pub fn set_callback<CALLBACK: FnMut(MSG) + 'static>(&self, callback: CALLBACK) {
        let slot = self.get_callback();
        *slot.borrow_mut() = Some(Box::new(callback));
    }
}