use crate::error::{Error, Result};
use crate::event::Event;
use crate::hook::{EventHandler, GrabHandler};
use crate::platform;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{self, Receiver, Sender, SyncSender};
use std::thread::{self, JoinHandle};
pub struct ChannelHookHandle {
running: Arc<AtomicBool>,
thread_handle: Option<JoinHandle<()>>,
}
impl ChannelHookHandle {
pub fn stop(mut self) -> Result<()> {
self.stop_inner()
}
pub fn is_running(&self) -> bool {
self.running.load(Ordering::SeqCst)
}
fn stop_inner(&mut self) -> Result<()> {
if !self.running.swap(false, Ordering::SeqCst) {
return Ok(()); }
platform::stop_hook()?;
if let Some(handle) = self.thread_handle.take() {
handle
.join()
.map_err(|_| Error::ThreadError("failed to join hook thread".into()))?;
}
Ok(())
}
}
impl Drop for ChannelHookHandle {
fn drop(&mut self) {
let _ = self.stop_inner();
}
}
struct ChannelHandler {
sender: SyncSender<Event>,
}
impl EventHandler for ChannelHandler {
fn handle_event(&self, event: &Event) {
let _ = self.sender.try_send(event.clone());
}
}
struct UnboundedChannelHandler {
sender: Sender<Event>,
}
impl EventHandler for UnboundedChannelHandler {
fn handle_event(&self, event: &Event) {
let _ = self.sender.send(event.clone());
}
}
pub fn listen_channel(capacity: usize) -> Result<(ChannelHookHandle, Receiver<Event>)> {
let (sender, receiver) = mpsc::sync_channel(capacity);
let running = Arc::new(AtomicBool::new(true));
let running_clone = running.clone();
crate::state::reset_mask();
let thread_handle = thread::spawn(move || {
let handler = ChannelHandler { sender };
let _ = platform::run_hook(&running_clone, handler);
running_clone.store(false, Ordering::SeqCst);
});
let handle = ChannelHookHandle {
running,
thread_handle: Some(thread_handle),
};
Ok((handle, receiver))
}
pub fn listen_unbounded_channel() -> Result<(ChannelHookHandle, Receiver<Event>)> {
let (sender, receiver) = mpsc::channel();
let running = Arc::new(AtomicBool::new(true));
let running_clone = running.clone();
crate::state::reset_mask();
let thread_handle = thread::spawn(move || {
let handler = UnboundedChannelHandler { sender };
let _ = platform::run_hook(&running_clone, handler);
running_clone.store(false, Ordering::SeqCst);
});
let handle = ChannelHookHandle {
running,
thread_handle: Some(thread_handle),
};
Ok((handle, receiver))
}
struct GrabChannelHandler<F>
where
F: Fn(&Event) -> bool + Send + Sync,
{
sender: SyncSender<Event>,
filter: F,
}
impl<F> GrabHandler for GrabChannelHandler<F>
where
F: Fn(&Event) -> bool + Send + Sync,
{
fn handle_event(&self, event: &Event) -> Option<Event> {
let _ = self.sender.try_send(event.clone());
if (self.filter)(event) {
Some(event.clone())
} else {
None }
}
}
pub fn grab_channel<F>(capacity: usize, filter: F) -> Result<(ChannelHookHandle, Receiver<Event>)>
where
F: Fn(&Event) -> bool + Send + Sync + 'static,
{
let (sender, receiver) = mpsc::sync_channel(capacity);
let running = Arc::new(AtomicBool::new(true));
let running_clone = running.clone();
crate::state::reset_mask();
let thread_handle = thread::spawn(move || {
let handler = GrabChannelHandler { sender, filter };
let _ = platform::run_grab_hook(&running_clone, handler);
running_clone.store(false, Ordering::SeqCst);
});
let handle = ChannelHookHandle {
running,
thread_handle: Some(thread_handle),
};
Ok((handle, receiver))
}
#[cfg(feature = "tokio")]
pub use tokio_channel::*;
#[cfg(feature = "tokio")]
mod tokio_channel {
use super::*;
use tokio::sync::mpsc as tokio_mpsc;
struct TokioChannelHandler {
sender: tokio_mpsc::Sender<Event>,
}
impl EventHandler for TokioChannelHandler {
fn handle_event(&self, event: &Event) {
let _ = self.sender.try_send(event.clone());
}
}
pub fn listen_async_channel(
capacity: usize,
) -> Result<(ChannelHookHandle, tokio_mpsc::Receiver<Event>)> {
let (sender, receiver) = tokio_mpsc::channel(capacity);
let running = Arc::new(AtomicBool::new(true));
let running_clone = running.clone();
crate::state::reset_mask();
let thread_handle = thread::spawn(move || {
let handler = TokioChannelHandler { sender };
let _ = platform::run_hook(&running_clone, handler);
running_clone.store(false, Ordering::SeqCst);
});
let handle = ChannelHookHandle {
running,
thread_handle: Some(thread_handle),
};
Ok((handle, receiver))
}
struct TokioGrabChannelHandler<F>
where
F: Fn(&Event) -> bool + Send + Sync,
{
sender: tokio_mpsc::Sender<Event>,
filter: F,
}
impl<F> GrabHandler for TokioGrabChannelHandler<F>
where
F: Fn(&Event) -> bool + Send + Sync,
{
fn handle_event(&self, event: &Event) -> Option<Event> {
let _ = self.sender.try_send(event.clone());
if (self.filter)(event) {
Some(event.clone())
} else {
None
}
}
}
pub fn grab_async_channel<F>(
capacity: usize,
filter: F,
) -> Result<(ChannelHookHandle, tokio_mpsc::Receiver<Event>)>
where
F: Fn(&Event) -> bool + Send + Sync + 'static,
{
let (sender, receiver) = tokio_mpsc::channel(capacity);
let running = Arc::new(AtomicBool::new(true));
let running_clone = running.clone();
crate::state::reset_mask();
let thread_handle = thread::spawn(move || {
let handler = TokioGrabChannelHandler { sender, filter };
let _ = platform::run_grab_hook(&running_clone, handler);
running_clone.store(false, Ordering::SeqCst);
});
let handle = ChannelHookHandle {
running,
thread_handle: Some(thread_handle),
};
Ok((handle, receiver))
}
}