use std::sync::Arc;
use std::time::Duration;
use tokio::sync::broadcast;
use tokio::time::timeout;
use crate::{
config::ClawDBConfig,
error::{ClawDBError, ClawDBResult},
events::types::ClawEvent,
};
/// Channel capacity used when the caller does not choose one: it is what
/// `EventBus::default` and `EventBus::from_config` pass to `EventBus::new`.
const DEFAULT_CAPACITY: usize = 1_024;
/// Broadcast-style event bus that fans each `ClawEvent` out to every subscriber.
///
/// Events are wrapped in an `Arc`, so each subscriber receives a shared handle
/// to the same event rather than its own copy. Cloning the bus clones the
/// underlying broadcast sender, so all clones publish to the same channel.
#[derive(Debug, Clone)]
pub struct EventBus {
// Sending half of the Tokio broadcast channel; receivers are created from it
// on demand via `subscribe`.
sender: broadcast::Sender<Arc<ClawEvent>>,
// Capacity the channel was created with, as reported by `capacity()`.
capacity: usize,
}
impl EventBus {
pub fn new(capacity: usize) -> Self {
let (sender, _) = broadcast::channel(capacity);
Self { sender, capacity }
}
pub fn from_config(config: &ClawDBConfig) -> Self {
let _ = config; Self::new(DEFAULT_CAPACITY)
}
#[inline]
pub fn emit(&self, event: ClawEvent) {
let _ = self.sender.send(Arc::new(event));
}
#[inline]
pub fn publish(&self, event: ClawEvent) -> usize {
self.sender.send(Arc::new(event)).unwrap_or(0)
}
#[inline]
pub fn subscribe(&self) -> broadcast::Receiver<Arc<ClawEvent>> {
self.sender.subscribe()
}
pub async fn emit_and_wait(
&self,
event: ClawEvent,
deadline: Duration,
) -> ClawDBResult<()> {
if self.sender.receiver_count() == 0 {
let _ = self.sender.send(Arc::new(event));
return Ok(());
}
let mut rx = self.sender.subscribe();
let arc = Arc::new(event);
let _ = self.sender.send(arc.clone());
timeout(deadline, async move {
loop {
match rx.recv().await {
Ok(_) => return,
Err(broadcast::error::RecvError::Lagged(_)) => continue,
Err(broadcast::error::RecvError::Closed) => return,
}
}
})
.await
.map_err(|_| {
ClawDBError::EventBusError(format!(
"emit_and_wait timed out after {}ms",
deadline.as_millis()
))
})
}
#[inline]
pub fn subscriber_count(&self) -> usize {
self.sender.receiver_count()
}
#[inline]
pub fn capacity(&self) -> usize {
self.capacity
}
}
impl Default for EventBus {
fn default() -> Self {
Self::new(DEFAULT_CAPACITY)
}
}