use std::any::TypeId;
use std::sync::{
LazyLock, RwLock,
atomic::{AtomicU64, Ordering},
};
use std::time::Instant;
use crate::drain;
use crate::error::{BarkerError, Result};
use crate::handler::{Handler, HandlerType, MessageHandler};
use crate::message::{Envelope, Message};
/// Process-wide bus instance handed out by `MessageBus::global`.
///
/// Wrapped in `LazyLock`, so the bus is built with `MessageBus::new` on first
/// access rather than at program start.
static GLOBAL_BUS: LazyLock<MessageBus> = LazyLock::new(MessageBus::new);
/// A message bus that queues `Envelope`s on a `flume` channel and keeps a
/// lock-protected list of registered handlers.
///
/// The bus owns both ends of its channel: `send` pushes onto `sender`, and
/// `process_messages` hands `receiver` (plus the handler list) to
/// `drain::process`.
pub struct MessageBus {
/// Producer side of the channel; used by `send` to enqueue envelopes.
sender: flume::Sender<Envelope>,
/// Consumer side of the channel; passed to `drain::process`.
receiver: flume::Receiver<Envelope>,
/// Registered handlers, appended to by `register_handler`.
handlers: RwLock<Vec<Handler>>,
/// Counter backing the ids returned from `send`; starts at 0.
next_message_id: AtomicU64,
}
impl MessageBus {
pub fn new() -> Self {
let (sender, receiver) = flume::unbounded();
Self {
sender,
receiver,
handlers: RwLock::new(Vec::new()),
next_message_id: AtomicU64::new(0),
}
}
pub fn bounded(capacity: usize) -> Self {
let (sender, receiver) = flume::bounded(capacity);
Self {
sender,
receiver,
handlers: RwLock::new(Vec::new()),
next_message_id: AtomicU64::new(0),
}
}
pub fn global() -> &'static MessageBus {
&GLOBAL_BUS
}
pub fn send<M: Message>(&self, message: M) -> Result<u64> {
let id = self.next_message_id.fetch_add(1, Ordering::SeqCst);
let envelope = Envelope {
message: Box::new(message),
timestamp: Instant::now(),
};
self.sender.send(envelope).map_err(|_| BarkerError::Send)?;
Ok(id)
}
pub fn register_handler(
&self,
handler: Box<dyn MessageHandler>,
msg_type: Option<TypeId>,
) -> Result<()> {
let type_id = match msg_type {
Some(t) => HandlerType::Typed(t),
None => HandlerType::Generic,
};
let mut handlers = self
.handlers
.write()
.map_err(|_| BarkerError::HandlerLockPoisoned)?;
handlers.push(Handler {
inner: handler,
type_id,
});
Ok(())
}
pub fn process_messages(&self, limit: Option<usize>) -> Result<()> {
drain::process(&self.receiver, &self.handlers, limit)
}
}
impl Default for MessageBus {
fn default() -> Self {
Self::new()
}
}