use parking_lot::RwLock;
use std::any::Any;
use std::collections::HashMap;
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use tokio::sync::broadcast;
pub mod events {
pub const SOFT_DELETE: &str = "onSoftDelete";
pub const SERVER_READY: &str = "onServerReady";
pub const READY: &str = "onReady";
pub const ES_DATA_CHANGE: &str = "esDataChange";
}
pub type EventData = Arc<dyn Any + Send + Sync>;
pub type EventHandler =
Arc<dyn Fn(EventData) -> Pin<Box<dyn Future<Output = ()> + Send>> + Send + Sync>;
pub struct EventManager {
handlers: RwLock<HashMap<String, Vec<EventHandler>>>,
sender: broadcast::Sender<(String, EventData)>,
}
impl EventManager {
pub fn new() -> Self {
let (sender, _) = broadcast::channel(1024);
Self {
handlers: RwLock::new(HashMap::new()),
sender,
}
}
pub fn on<F, Fut>(&self, event: &str, handler: F)
where
F: Fn(EventData) -> Fut + Send + Sync + 'static,
Fut: Future<Output = ()> + Send + 'static,
{
let handler: EventHandler = Arc::new(move |data: EventData| {
let fut = handler(data);
Box::pin(fut) as Pin<Box<dyn Future<Output = ()> + Send>>
});
let mut handlers = self.handlers.write();
handlers.entry(event.to_string()).or_default().push(handler);
}
pub async fn emit<T: Any + Send + Sync>(&self, event: &str, data: T) {
let data: EventData = Arc::new(data);
let handlers_vec: Vec<EventHandler> = {
let handlers = self.handlers.read();
handlers.get(event).cloned().unwrap_or_default()
};
for handler in handlers_vec {
let data_clone = Arc::clone(&data);
handler(data_clone).await;
}
let _ = self.sender.send((event.to_string(), data));
}
pub fn subscribe(&self) -> broadcast::Receiver<(String, EventData)> {
self.sender.subscribe()
}
pub fn off(&self, event: &str) {
let mut handlers = self.handlers.write();
handlers.remove(event);
}
pub fn clear(&self) {
let mut handlers = self.handlers.write();
handlers.clear();
}
}
impl Default for EventManager {
fn default() -> Self {
Self::new()
}
}
static GLOBAL_EVENT_MANAGER: once_cell::sync::Lazy<EventManager> =
once_cell::sync::Lazy::new(EventManager::new);
pub fn global_event_manager() -> &'static EventManager {
&GLOBAL_EVENT_MANAGER
}
#[derive(Debug, Clone)]
pub struct SoftDeleteEvent {
pub entity: String,
pub ids: Vec<i64>,
pub tenant_id: Option<i64>,
}
#[derive(Debug, Clone)]
pub struct ServerReadyEvent {
pub address: String,
pub port: u16,
}
#[derive(Debug, Clone)]
pub struct EsDataChangeEvent {
pub index: String,
pub operation: String,
pub doc_id: Option<String>,
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn test_event_manager() {
let manager = EventManager::new();
let received = Arc::new(RwLock::new(false));
let received_clone = Arc::clone(&received);
manager.on("test", move |_data| {
let received = Arc::clone(&received_clone);
async move {
*received.write() = true;
}
});
manager.emit("test", "hello").await;
assert!(*received.read());
}
}