use tokio::sync::broadcast;
#[derive(Debug, Clone)]
pub struct UserChanged {
pub user_id: u64,
}
pub struct UserChangeBus {
tx: broadcast::Sender<UserChanged>,
}
pub const USER_CHANGE_CHANNEL_CAPACITY: usize = 1_024;
impl UserChangeBus {
pub fn new() -> (Self, broadcast::Receiver<UserChanged>) {
let (tx, rx) = broadcast::channel(USER_CHANGE_CHANNEL_CAPACITY);
(Self { tx }, rx)
}
pub fn from_existing(tx: broadcast::Sender<UserChanged>) -> Self {
Self { tx }
}
pub fn publish(&self, event: UserChanged) -> usize {
self.tx.send(event).unwrap_or(0)
}
pub fn subscribe(&self) -> broadcast::Receiver<UserChanged> {
self.tx.subscribe()
}
pub fn sender(&self) -> broadcast::Sender<UserChanged> {
self.tx.clone()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn user_change_bus_produce_consume() {
let (bus, mut rx) = UserChangeBus::new();
bus.publish(UserChanged { user_id: 7 });
let event = rx.recv().await.expect("should receive event");
assert_eq!(event.user_id, 7);
}
#[tokio::test]
async fn multiple_events_ordered() {
let (bus, mut rx) = UserChangeBus::new();
for id in [1u64, 2, 3] {
bus.publish(UserChanged { user_id: id });
}
for expected in [1u64, 2, 3] {
let ev = rx.recv().await.expect("event present");
assert_eq!(ev.user_id, expected);
}
}
}