use crate::bufferpool::Chunk;
use crate::flow::Message;
use tokio::sync::mpsc;
use std::any::Any;
use std::fmt::Debug;
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex, Weak};
/// A dynamically typed event that can be shared across threads.
///
/// Implementors must be `'static` (through [`Any`]), [`Debug`], [`Send`] and
/// [`Sync`], which is what allows events to be passed around as
/// `Arc<dyn Event>`.
pub trait Event: Any + Debug + Send + Sync {
/// Reports whether this event counts as an interrupt; `false` unless overridden.
// NOTE(review): how consumers react to an "interrupt" is not visible in this file.
fn is_interrupt(&self) -> bool {
false
}
/// Reports whether this event counts as a flush; `false` unless overridden.
// NOTE(review): how consumers react to a "flush" is not visible in this file.
fn is_flush(&self) -> bool {
false
}
/// Exposes the event as a [`dyn Any`](Any) so callers can downcast it to a
/// concrete event type.
fn as_any(&self) -> &(dyn Any + Send + Sync);
}
/// Unit event type; its [`Event`] implementation reports itself as an interrupt.
///
/// It is also the event wrapped by `Signal`'s `Message::disconnection`.
#[derive(Clone, Debug)]
pub struct Disconnection;
impl Event for Disconnection {
fn is_interrupt(&self) -> bool {
true
}
fn as_any(&self) -> &(dyn std::any::Any + Send + Sync) {
self
}
}
/// Heap-allocated, type-erased event handler: a `Send` closure that is called
/// with a reference to each shared event and may mutate its own state.
type BoxedCallback = Box<dyn FnMut(&Arc<dyn Event>) + Send>;
/// A registered callback together with the identifier used to find and remove it.
struct IdentifiedCallback {
// The handler to run for each event.
callback: BoxedCallback,
// Identifier assigned at registration; `EventHandlerGuard` removes by this value.
id: u64,
}
/// Mutable state behind [`EventHandlers`]: the callbacks plus an id counter.
#[derive(Default)]
struct CallbackRegistry {
// Registered callbacks, in registration order (new entries are pushed to the end).
callbacks: Vec<IdentifiedCallback>,
// Identifier that the next registration will receive; incremented on each registration.
next_id: u64,
}
/// Shared, mutex-protected collection of event callbacks.
///
/// Cloning produces another handle to the same underlying registry (the state
/// lives behind an [`Arc`]), not an independent copy.
#[derive(Clone, Default)]
pub struct EventHandlers(Arc<Mutex<CallbackRegistry>>);
impl EventHandlers {
    /// Creates an empty set of event handlers (equivalent to `Default::default()`).
    pub fn new() -> Self {
        Self::default()
    }
}
/// Handle returned when a callback is registered.
///
/// Dropping the guard removes the associated callback from its registry unless
/// the guard was disarmed through `forget`. It is marked `#[must_use]` because
/// discarding it immediately would unregister the callback right away.
#[must_use]
pub struct EventHandlerGuard {
// Weak reference to the registry, so a guard alone does not keep the registry alive.
callbacks: Weak<Mutex<CallbackRegistry>>,
// Identifier of the callback this guard is responsible for.
id: u64,
// When `true`, dropping the guard removes the callback; `forget` sets this to `false`.
auto: bool,
}
impl Drop for EventHandlerGuard {
fn drop(&mut self) {
if self.auto {
if let Some(callbacks) = Weak::upgrade(&self.callbacks) {
callbacks
.lock()
.unwrap()
.callbacks
.retain(|x| x.id != self.id);
}
}
}
}
impl EventHandlerGuard {
    /// Consumes the guard, removing the callback now.
    ///
    /// The removal itself is done by the `Drop` implementation; this method
    /// only makes the intent explicit at the call site.
    pub fn unregister(self) {
        drop(self);
    }

    /// Consumes the guard without removing the callback, which then stays
    /// registered for as long as the registry exists.
    pub fn forget(mut self) {
        // Disarm first: the `Drop` implementation skips removal when `auto` is false.
        self.auto = false;
        drop(self);
    }
}
impl EventHandlers {
    /// Adds `func` to the registry and returns a guard that removes it again
    /// when dropped.
    ///
    /// Callbacks are stored in registration order and each receives a fresh
    /// identifier.
    ///
    /// # Panics
    ///
    /// Panics if the registry mutex is poisoned.
    pub fn register<F>(&self, func: F) -> EventHandlerGuard
    where
        F: FnMut(&Arc<dyn Event>) + Send + 'static,
    {
        // Keep the lock confined to this block so it is released before the
        // guard is constructed.
        let id = {
            let mut registry = self.0.lock().unwrap();
            let assigned = registry.next_id;
            registry.next_id += 1;
            registry.callbacks.push(IdentifiedCallback {
                callback: Box::new(func),
                id: assigned,
            });
            assigned
        };
        EventHandlerGuard {
            callbacks: Arc::downgrade(&self.0),
            id,
            auto: true,
        }
    }

    /// Calls every registered callback with `event`, in registration order.
    ///
    /// The registry lock is held for the whole iteration. A callback must
    /// therefore not register, invoke, or drop a guard on the same
    /// `EventHandlers`: re-locking a `std::sync::Mutex` on the same thread
    /// deadlocks or panics.
    ///
    /// # Panics
    ///
    /// Panics if the registry mutex is poisoned.
    pub fn invoke(&self, event: &Arc<dyn Event>) {
        let mut registry = self.0.lock().unwrap();
        registry
            .callbacks
            .iter_mut()
            .for_each(|entry| (entry.callback)(event));
    }
}
/// Implemented by types that let callers subscribe to [`Event`]s.
pub trait EventHandling {
    /// Registers `func` to be called for events and returns the guard that
    /// controls the registration.
    fn on_event<F: FnMut(&Arc<dyn Event>) + Send + 'static>(&self, func: F) -> EventHandlerGuard;

    /// Returns a future that completes once `func` has returned `true` for an
    /// event.
    ///
    /// The handler is registered immediately, when this method is called, not
    /// when the future is first polled. It is unregistered when the future
    /// completes. Because the returned future owns the guard, dropping the
    /// future early also drops the guard.
    ///
    /// # Panics
    ///
    /// The future panics if the notification channel closes before a match is
    /// signalled, i.e. if the registered callback (which owns the sender) is
    /// dropped first.
    fn wait_for_event<F>(&self, mut func: F) -> Pin<Box<dyn Future<Output = ()> + Send>>
    where
        F: FnMut(&Arc<dyn Event>) -> bool + Send + 'static,
    {
        let (notify, mut notified) = mpsc::unbounded_channel::<()>();
        let registration = self.on_event(move |event| {
            if func(event) {
                // A send error only means the waiting future is already gone.
                let _ = notify.send(());
            }
        });
        Box::pin(async move {
            notified.recv().await.unwrap();
            registration.unregister();
        })
    }
}
/// A unit of data passed along a signal path: either a chunk of samples or an
/// out-of-band [`Event`].
#[derive(Clone, Debug)]
pub enum Signal<T> {
/// A chunk of samples of type `T` together with its sample rate.
Samples {
// `duration()` divides the chunk length by this value, so it is presumably
// samples per second — TODO confirm the unit.
sample_rate: f64,
// The sample data itself.
chunk: Chunk<T>,
},
/// A shared, dynamically typed event.
Event(
Arc<dyn Event>,
),
}
impl<T> Signal<T> {
    /// Wraps `event` in an [`Arc`] and returns it as a [`Signal::Event`].
    pub fn new_event<E: Event>(event: E) -> Self {
        let shared: Arc<dyn Event> = Arc::new(event);
        Self::Event(shared)
    }

    /// Returns `true` for [`Signal::Event`] and `false` for [`Signal::Samples`].
    pub fn is_event(&self) -> bool {
        matches!(self, Self::Event(_))
    }

    /// Returns the chunk length divided by the sample rate for
    /// [`Signal::Samples`], and `0.0` for anything else.
    pub fn duration(&self) -> f64 {
        if let Self::Samples { sample_rate, chunk } = self {
            chunk.len() as f64 / sample_rate
        } else {
            0.0
        }
    }
}
/// Lets [`Signal`] act as a `Message` whenever its sample type is `Clone`.
impl<T: Clone> Message for Signal<T> {
    /// Returns a [`Signal::Event`] carrying a [`Disconnection`].
    // NOTE(review): when `Message::disconnection` is called is defined in
    // `crate::flow`, not in this file — confirm the contract there.
    fn disconnection() -> Option<Self> {
        let farewell = Self::new_event(Disconnection);
        Some(farewell)
    }
}