use std::{fmt::Debug, sync::Arc, time::Duration};
use async_channel::{Receiver, Sender};
use futures::future::join_all;
use moduforge_state::{debug, state::State, Transaction};
use tokio::{signal, sync::RwLock, time::timeout};
use crate::error::{EditorResult, error_utils};
/// Events carried over the [`EventBus`] channel and passed to every
/// [`EventHandler`].
///
/// Cloning is cheap: all payloads are behind `Arc`.
#[derive(Clone)]
pub enum Event {
    /// Carries a shared `State`.
    // NOTE(review): presumably emitted once when the editor/state is first
    // created — confirm against the emitting code.
    Create(Arc<State>),
    /// Carries a shared list of transactions together with a shared `State`.
    // NOTE(review): the state is presumably the one that results from applying
    // the transactions — confirm against the emitting code.
    TrApply(Arc<Vec<Transaction>>, Arc<State>),
    /// Payload-free event; the event loop forwards it to the handlers like
    /// any other non-`Stop` event.
    Destroy,
    /// Tells the event loop to notify the handlers one last time and exit.
    Stop,
}
/// Channel-backed dispatcher that forwards [`Event`]s to registered
/// [`EventHandler`]s.
///
/// Clones share the same channel and the same handler list, because every
/// field is either a channel endpoint or an `Arc`.
#[derive(Clone)]
pub struct EventBus {
    /// Sending half of the event channel; used by `broadcast` and
    /// `broadcast_blocking`.
    tx: Sender<Event>,
    /// Receiving half of the event channel; `subscribe` hands out clones of it.
    rt: Receiver<Event>,
    /// Registered handlers, guarded by an async read/write lock.
    event_handlers: Arc<RwLock<Vec<Arc<dyn EventHandler>>>>,
}
impl Default for EventBus {
fn default() -> Self {
Self::new()
}
}
impl EventBus {
pub async fn restart(&self) -> EditorResult<()> {
self.broadcast(Event::Stop).await?;
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
self.start_event_loop();
Ok(())
}
pub async fn add_event_handler(
&mut self,
event_handler: Arc<dyn EventHandler>,
) -> EditorResult<()> {
let mut write = self.event_handlers.write().await;
write.push(event_handler);
Ok(())
}
pub async fn add_event_handlers(
&mut self,
event_handlers: Vec<Arc<dyn EventHandler>>,
) -> EditorResult<()> {
let mut write = self.event_handlers.write().await;
write.extend(event_handlers);
Ok(())
}
pub fn start_event_loop(&self) {
let rx: async_channel::Receiver<Event> = self.subscribe();
let event_handlers = self.event_handlers.clone();
tokio::spawn(async move {
let handlers_clone = {
let handlers = event_handlers.read().await;
handlers.clone()
};
loop {
tokio::select! {
event = rx.recv() => match event {
Ok(Event::Stop) => {
debug!("接收到停止事件,等待所有处理器完成...");
let mut pending_handles = Vec::new();
for handler in &handlers_clone {
let handle = handler.handle(&Event::Stop);
pending_handles.push(handle);
}
if let Err(e) = timeout(Duration::from_secs(5), join_all(pending_handles)).await {
debug!("等待处理器完成超时: {}", e);
}
break;
},
Ok(event) => {
let mut handles = Vec::new();
for handler in &handlers_clone {
let handle = handler.handle(&event);
handles.push(handle);
}
let results = join_all(handles.into_iter().map(|handle| {
timeout(Duration::from_secs(3), handle)
})).await;
for result in results {
match result {
Ok(Ok(())) => continue,
Ok(Err(e)) => debug!("事件处理错误: {}", e),
Err(e) => debug!("事件处理超时: {}", e),
}
}
},
Err(e) => {
debug!("事件接收错误: {}", e);
break;
},
},
shutdown_signal = Box::pin(signal::ctrl_c()) => {
match shutdown_signal {
Ok(()) => {
debug!("事件管理器,接收到关闭信号,正在退出...");
break;
},
Err(e) => {
debug!("事件管理器,处理关闭信号时出错: {}", e);
break;
}
}
},
}
}
});
}
pub fn new() -> Self {
let (tx, rt) = async_channel::bounded(100);
Self { tx, rt, event_handlers: Arc::new(RwLock::new(vec![])) }
}
pub fn subscribe(&self) -> Receiver<Event> {
self.rt.clone()
}
pub async fn broadcast(
&self,
event: Event,
) -> EditorResult<()> {
self.tx.send(event).await.map_err(|e| {
error_utils::event_error(format!(
"Failed to broadcast event: {}",
e
))
})
}
pub fn broadcast_blocking(
&self,
event: Event,
) -> EditorResult<()> {
self.tx.send_blocking(event).map_err(|e| {
error_utils::event_error(format!(
"Failed to broadcast event: {}",
e
))
})
}
}
impl Drop for EventBus {
fn drop(&mut self) {
let rt = tokio::runtime::Runtime::new().unwrap();
rt.block_on(async {
if let Err(e) = self.broadcast_blocking(Event::Stop) {
debug!("Failed to broadcast stop event during drop: {}", e);
}
let handlers = self.event_handlers.read().await;
let mut pending_handles = Vec::new();
for handler in handlers.iter() {
let handle = handler.handle(&Event::Stop);
pending_handles.push(handle);
}
if let Err(e) =
timeout(Duration::from_secs(5), join_all(pending_handles)).await
{
debug!(
"Timeout waiting for handlers to complete during drop: {}",
e
);
}
});
}
}
/// Receiver of [`Event`]s dispatched by the [`EventBus`].
///
/// Implementations must be `Send + Sync` because handlers are shared through
/// `Arc` and invoked from a spawned task, and `Debug` so they can be logged.
#[async_trait::async_trait]
pub trait EventHandler: Debug + Send + Sync {
    /// Processes a single event.
    ///
    /// The event loop runs this under a timeout and logs both timeouts and
    /// returned errors; neither of them stops the loop.
    ///
    /// # Errors
    /// Implementations return an error when the event could not be processed.
    async fn handle(&self, event: &Event) -> EditorResult<()>;
}
/// Context object wrapping a shared [`State`].
// NOTE(review): not referenced by anything else in this part of the file;
// presumably handed to handlers by other code — confirm.
pub struct EventContext {
    /// Shared, reference-counted state.
    pub state: Arc<State>,
}