use bmux_plugin_sdk::PluginEventKind;
use serde::Serialize;
use serde_json::Value as JsonValue;
use std::any::{Any, TypeId};
use std::collections::HashMap;
use std::sync::{Arc, RwLock};
use tokio::sync::{broadcast, watch};
/// Buffer capacity used for broadcast channels that are registered without an
/// explicit capacity, and for the JSON broadcast side of state projections.
pub const DEFAULT_EVENT_BUS_CAPACITY: usize = 1024;
/// How a channel delivers its payloads to subscribers.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DeliveryMode {
/// Event-stream delivery backed by a `tokio::sync::broadcast` channel.
Broadcast,
/// Latest-value delivery backed by a `tokio::sync::watch` channel.
State,
}
/// Type-erased JSON view of a payload published on the bus.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct JsonPluginEvent {
/// Interface (channel key) the payload was published on.
pub interface: PluginEventKind,
/// Delivery mode of the channel that produced the payload.
pub delivery: DeliveryMode,
/// Serialized payload; `JsonValue::Null` is used when no encoded value is available.
pub payload: JsonValue,
}
/// Controls whether a channel also exposes a JSON projection of its payloads.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum JsonProjectionPolicy {
/// No JSON projection is created for the channel (the default).
#[default]
Disabled,
/// A JSON projection is created; payloads are serialized only while a JSON
/// subscriber is observed.
Lazy,
}
/// Per-channel registration options.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct EventBusChannelOptions {
/// Whether (and how) the channel exposes a JSON projection.
pub json_projection: JsonProjectionPolicy,
}
/// Renders the delivery mode as its lowercase name (`broadcast` / `state`).
impl std::fmt::Display for DeliveryMode {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Pick the label first so the formatter is written to in exactly one place.
        let label = match *self {
            Self::Broadcast => "broadcast",
            Self::State => "state",
        };
        f.write_str(label)
    }
}
/// Errors returned by the typed and JSON APIs of the event bus.
#[derive(Debug)]
pub enum EventBusError {
/// No channel has been registered under the requested interface.
ChannelNotRegistered {
interface: String,
},
/// The channel exists but carries a different payload type than the caller asked for.
PayloadTypeMismatch {
interface: String,
/// Type name recorded when the channel was registered.
expected: &'static str,
/// Type name requested by the caller.
actual: &'static str,
},
/// The channel exists but uses a different delivery mode than the called API.
ChannelDeliveryMismatch {
interface: String,
/// Delivery mode required by the API that was called.
expected: DeliveryMode,
/// Delivery mode of the registered channel.
actual: DeliveryMode,
},
/// The channel was registered without a JSON projection.
JsonProjectionUnavailable {
interface: String,
},
}
/// Human-readable messages for every [`EventBusError`] variant.
impl std::fmt::Display for EventBusError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            // Lookup failures that only need the interface name.
            Self::ChannelNotRegistered { interface } => {
                write!(f, "no event channel registered for interface `{interface}`")
            }
            Self::JsonProjectionUnavailable { interface } => {
                write!(
                    f,
                    "event channel for `{interface}` was registered without a JSON projection"
                )
            }
            // `actual` is what the channel really is; `expected` is what the called API needs.
            Self::ChannelDeliveryMismatch {
                interface,
                expected,
                actual,
            } => {
                write!(
                    f,
                    "event channel for `{interface}` is a {actual} channel; \
                     caller's API is {expected}"
                )
            }
            // `expected` is the registered payload type; `actual` is the caller's type.
            Self::PayloadTypeMismatch {
                interface,
                expected,
                actual,
            } => {
                write!(
                    f,
                    "event channel for `{interface}` has payload type `{expected}`; \
                     caller requested `{actual}`"
                )
            }
        }
    }
}
// Marker impl: `EventBusError` has no underlying source error, so the default methods suffice.
impl std::error::Error for EventBusError {}
/// Result alias used by the event bus APIs that fail with [`EventBusError`].
pub type EventBusResult<T> = std::result::Result<T, EventBusError>;
/// Type-erased sender stored for a registered channel.
enum ChannelKind {
/// Holds a `broadcast::Sender<Arc<E>>` behind `dyn Any`.
Broadcast(Arc<dyn Any + Send + Sync>),
/// Holds a `watch::Sender<Arc<T>>` behind `dyn Any`.
State(Arc<dyn Any + Send + Sync>),
}
/// Serializes a type-erased broadcast event to JSON; yields `None` when the
/// event cannot be encoded.
type JsonEventEncoder =
Arc<dyn Fn(&(dyn Any + Send + Sync)) -> Option<JsonValue> + Send + Sync + 'static>;
/// Serializes the current value held by a state channel to JSON; yields `None`
/// when the value cannot be encoded.
type JsonStateEncoder = Arc<dyn Fn(&ChannelKind) -> Option<JsonValue> + Send + Sync + 'static>;
/// JSON side-channel attached to a typed channel.
struct JsonProjection {
/// Broadcast stream of JSON events.
broadcast: broadcast::Sender<Arc<JsonPluginEvent>>,
/// Latest JSON value; set only for state channels.
state: Option<watch::Sender<Arc<JsonPluginEvent>>>,
/// Encoder for broadcast events; set only for broadcast channels.
event_encoder: Option<JsonEventEncoder>,
/// Encoder for the current state value; set only for state channels.
state_encoder: Option<JsonStateEncoder>,
}
/// Everything the bus stores for one registered interface.
struct ChannelEntry {
/// Type-erased typed sender.
kind: ChannelKind,
/// `TypeId` of the payload type given at registration; checked on every typed access.
payload_type_id: TypeId,
/// Payload type name, used in `PayloadTypeMismatch` errors.
payload_type_name: &'static str,
/// Optional wire-bytes decoder used by `emit_from_bytes`.
decoder: Option<BytesDecoder>,
/// Optional JSON projection.
json: Option<JsonProjection>,
}
/// Errors produced when publishing from raw wire bytes.
#[derive(Debug, thiserror::Error)]
pub enum EventBusBytesError {
/// The payload could not be decoded (also used when the bus lock is poisoned
/// or the bus has been dropped).
#[error("failed to decode wire payload: {0}")]
Decode(String),
/// The decoded value could not be published on the bus.
#[error(transparent)]
Bus(#[from] EventBusError),
}
/// Decodes a wire payload and publishes the result on the owning channel.
type BytesDecoder = Arc<dyn Fn(&[u8]) -> Result<(), EventBusBytesError> + Send + Sync + 'static>;
/// Builds an encoder that downcasts a type-erased event to `T` and serializes
/// it to JSON.
///
/// The encoder returns `None` when the event is not a `T` or when
/// serialization fails.
fn event_json_encoder<T>() -> JsonEventEncoder
where
    T: Any + Send + Sync + Serialize + 'static,
{
    Arc::new(|event| {
        let typed = event.downcast_ref::<T>()?;
        serde_json::to_value(typed).ok()
    })
}
/// Builds an encoder that reads the current value of a `watch::Sender<Arc<T>>`
/// state channel and serializes it to JSON.
///
/// The encoder returns `None` for broadcast channels, when the stored sender
/// is not a `watch::Sender<Arc<T>>`, or when serialization fails.
fn state_json_encoder<T>() -> JsonStateEncoder
where
    T: Any + Send + Sync + Serialize + 'static,
{
    Arc::new(|kind| match kind {
        ChannelKind::State(erased) => {
            // Borrow through the `Arc` instead of cloning it just to downcast.
            let typed = (**erased).downcast_ref::<watch::Sender<Arc<T>>>()?;
            let snapshot = typed.borrow();
            serde_json::to_value(snapshot.as_ref()).ok()
        }
        ChannelKind::Broadcast(_) => None,
    })
}
/// Registry of typed event channels keyed by plugin interface.
#[derive(Default)]
pub struct EventBus {
/// Registered channels, guarded by a standard-library `RwLock`.
entries: RwLock<HashMap<PluginEventKind, ChannelEntry>>,
}
/// Debug output shows only the number of registered channels.
impl std::fmt::Debug for EventBus {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // A poisoned lock is reported as zero channels rather than panicking
        // inside a `Debug` implementation.
        let channels = match self.entries.read() {
            Ok(guard) => guard.len(),
            Err(_) => 0,
        };
        let mut out = f.debug_struct("EventBus");
        out.field("channels", &channels);
        out.finish()
    }
}
impl EventBus {
#[must_use]
pub fn new() -> Self {
Self::default()
}
/// Registers a broadcast channel for `interface` with
/// [`DEFAULT_EVENT_BUS_CAPACITY`] and no JSON projection.
///
/// Returns the typed sender. An existing entry under the same interface is
/// replaced.
///
/// # Panics
///
/// Panics if the bus lock is poisoned.
pub fn register_channel<E>(&self, interface: PluginEventKind) -> broadcast::Sender<Arc<E>>
where
    E: Any + Send + Sync + 'static,
{
    // Same path `register_channel_with_capacity` takes, with the default capacity.
    self.register_channel_with_capacity_and_json_encoder::<E>(
        interface,
        DEFAULT_EVENT_BUS_CAPACITY,
        None,
    )
}
/// Registers a broadcast channel for `interface` with
/// [`DEFAULT_EVENT_BUS_CAPACITY`] and a lazy JSON projection.
///
/// Returns the typed sender. An existing entry under the same interface is
/// replaced.
///
/// # Panics
///
/// Panics if the bus lock is poisoned.
pub fn register_channel_with_json_projection<E>(
    &self,
    interface: PluginEventKind,
) -> broadcast::Sender<Arc<E>>
where
    E: Any + Send + Sync + Serialize + 'static,
{
    let lazy_json = EventBusChannelOptions {
        json_projection: JsonProjectionPolicy::Lazy,
    };
    self.register_channel_with_capacity_and_options::<E>(
        interface,
        DEFAULT_EVENT_BUS_CAPACITY,
        lazy_json,
    )
}
pub fn register_channel_with_capacity<E>(
&self,
interface: PluginEventKind,
capacity: usize,
) -> broadcast::Sender<Arc<E>>
where
E: Any + Send + Sync + 'static,
{
self.register_channel_with_capacity_and_json_encoder::<E>(interface, capacity, None)
}
/// Registers a broadcast channel for `interface` with an explicit `capacity`
/// and the given `options`.
///
/// A JSON event encoder is attached only when `options.json_projection` is
/// [`JsonProjectionPolicy::Lazy`].
///
/// # Panics
///
/// Panics if the bus lock is poisoned. Per tokio's documentation,
/// `broadcast::channel` also panics when `capacity` is zero.
pub fn register_channel_with_capacity_and_options<E>(
    &self,
    interface: PluginEventKind,
    capacity: usize,
    options: EventBusChannelOptions,
) -> broadcast::Sender<Arc<E>>
where
    E: Any + Send + Sync + Serialize + 'static,
{
    let event_encoder = match options.json_projection {
        JsonProjectionPolicy::Lazy => Some(event_json_encoder::<E>()),
        JsonProjectionPolicy::Disabled => None,
    };
    self.register_channel_with_capacity_and_json_encoder::<E>(interface, capacity, event_encoder)
}
/// Registers a state (latest-value) channel for `interface`, seeded with
/// `initial`, with no bytes decoder and no JSON projection.
///
/// Returns the typed `watch` sender. An existing entry under the same
/// interface is replaced.
///
/// # Panics
///
/// Panics if the bus lock is poisoned.
pub fn register_state_channel<T>(
    &self,
    interface: PluginEventKind,
    initial: T,
) -> watch::Sender<Arc<T>>
where
    T: Any + Send + Sync + 'static,
{
    // The initial receiver is dropped right away; subscribers obtain their own
    // receivers from the stored sender.
    let (sender, _) = watch::channel::<Arc<T>>(Arc::new(initial));
    let erased: Arc<dyn Any + Send + Sync> = Arc::new(sender.clone());
    let entry = ChannelEntry {
        kind: ChannelKind::State(erased),
        payload_type_id: TypeId::of::<T>(),
        payload_type_name: std::any::type_name::<T>(),
        decoder: None,
        json: None,
    };
    self.entries
        .write()
        .expect("event bus lock poisoned")
        .insert(interface, entry);
    sender
}
/// Registers a state channel for `interface`, seeded with `initial`, with a
/// lazy JSON projection.
///
/// # Panics
///
/// Panics if the bus lock is poisoned.
pub fn register_state_channel_with_json_projection<T>(
    &self,
    interface: PluginEventKind,
    initial: T,
) -> watch::Sender<Arc<T>>
where
    T: Any + Send + Sync + Serialize + 'static,
{
    let lazy_json = EventBusChannelOptions {
        json_projection: JsonProjectionPolicy::Lazy,
    };
    self.register_state_channel_with_options(interface, initial, lazy_json)
}
/// Registers a state channel for `interface`, seeded with `initial`, using
/// the given `options`.
///
/// With [`JsonProjectionPolicy::Lazy`] the entry also gets a JSON `watch`
/// channel, a JSON broadcast channel of [`DEFAULT_EVENT_BUS_CAPACITY`], and a
/// state encoder for `T`. The JSON `watch` channel starts with a `Null`
/// payload; `initial` is not serialized at registration time.
///
/// # Panics
///
/// Panics if the bus lock is poisoned.
pub fn register_state_channel_with_options<T>(
    &self,
    interface: PluginEventKind,
    initial: T,
    options: EventBusChannelOptions,
) -> watch::Sender<Arc<T>>
where
    T: Any + Send + Sync + Serialize + 'static,
{
    let (sender, _) = watch::channel::<Arc<T>>(Arc::new(initial));

    let json = match options.json_projection {
        JsonProjectionPolicy::Disabled => None,
        JsonProjectionPolicy::Lazy => {
            // Placeholder value: the real payload is encoded lazily later.
            let placeholder = Arc::new(JsonPluginEvent {
                interface: interface.clone(),
                delivery: DeliveryMode::State,
                payload: JsonValue::Null,
            });
            let (state_tx, _) = watch::channel::<Arc<JsonPluginEvent>>(placeholder);
            let (broadcast_tx, _) =
                broadcast::channel::<Arc<JsonPluginEvent>>(DEFAULT_EVENT_BUS_CAPACITY);
            Some(JsonProjection {
                broadcast: broadcast_tx,
                state: Some(state_tx),
                event_encoder: None,
                state_encoder: Some(state_json_encoder::<T>()),
            })
        }
    };

    let erased: Arc<dyn Any + Send + Sync> = Arc::new(sender.clone());
    let entry = ChannelEntry {
        kind: ChannelKind::State(erased),
        payload_type_id: TypeId::of::<T>(),
        payload_type_name: std::any::type_name::<T>(),
        decoder: None,
        json,
    };
    self.entries
        .write()
        .expect("event bus lock poisoned")
        .insert(interface, entry);
    sender
}
#[allow(clippy::needless_pass_by_value)] pub fn register_state_channel_with_decoder<T>(
self: &Arc<Self>,
interface: PluginEventKind,
initial: T,
) -> watch::Sender<Arc<T>>
where
T: Any + Send + Sync + 'static + serde::de::DeserializeOwned,
{
self.register_state_channel_with_bytes_decoder(interface, initial, |bytes| {
serde_json::from_slice(bytes).map_err(|err| EventBusBytesError::Decode(err.to_string()))
})
}
/// Registers a state channel for `interface` and attaches `decode` as its
/// wire-bytes decoder, so [`Self::emit_from_bytes`] can publish decoded values.
///
/// Registration and decoder installation take the bus lock separately.
///
/// # Panics
///
/// Panics if the bus lock is poisoned during registration.
#[allow(clippy::needless_pass_by_value)]
pub fn register_state_channel_with_bytes_decoder<T, F>(
    self: &Arc<Self>,
    interface: PluginEventKind,
    initial: T,
    decode: F,
) -> watch::Sender<Arc<T>>
where
    T: Any + Send + Sync + 'static,
    F: Fn(&[u8]) -> Result<T, EventBusBytesError> + Send + Sync + 'static,
{
    // Registration consumes the key, so keep a copy for the decoder install.
    let decoder_key = interface.clone();
    let sender = self.register_state_channel::<T>(interface, initial);
    self.install_state_bytes_decoder::<T, F>(&decoder_key, decode);
    sender
}
/// Wraps `decode` into a [`BytesDecoder`] that publishes the decoded value on
/// `interface`, and stores it on the channel entry.
///
/// The decoder holds only a weak reference to the bus; once the bus is gone
/// it fails with `Decode("event bus dropped")`.
fn install_state_bytes_decoder<T, F>(self: &Arc<Self>, interface: &PluginEventKind, decode: F)
where
    T: Any + Send + Sync + 'static,
    F: Fn(&[u8]) -> Result<T, EventBusBytesError> + Send + Sync + 'static,
{
    let weak_bus = Arc::downgrade(self);
    let target = interface.clone();
    let decoder: BytesDecoder = Arc::new(move |bytes: &[u8]| -> Result<(), EventBusBytesError> {
        let bus = weak_bus
            .upgrade()
            .ok_or_else(|| EventBusBytesError::Decode("event bus dropped".to_string()))?;
        let value = decode(bytes)?;
        bus.publish_state::<T>(&target, value)
            .map_err(EventBusBytesError::from)
    });
    self.set_bytes_decoder(interface, decoder);
}
fn set_bytes_decoder(&self, interface: &PluginEventKind, decoder: BytesDecoder) {
if let Ok(mut guard) = self.entries.write()
&& let Some(entry) = guard.get_mut(interface)
{
entry.decoder = Some(decoder);
}
}
pub fn emit_from_bytes(
&self,
interface: &PluginEventKind,
payload: &[u8],
) -> Result<bool, EventBusBytesError> {
let decoder = self
.entries
.read()
.map_err(|_| EventBusBytesError::Decode("event bus lock poisoned".to_string()))?
.get(interface)
.and_then(|entry| entry.decoder.as_ref().map(Arc::clone));
let Some(decoder) = decoder else {
return Ok(false);
};
decoder(payload)?;
Ok(true)
}
/// Sends `event` to the typed subscribers of the broadcast channel
/// `interface`.
///
/// If the channel has a JSON projection with at least one JSON subscriber,
/// the event is also serialized and published there. The returned count is
/// the number of typed receivers the event was sent to (`0` when there are
/// none); JSON subscribers are not counted.
///
/// # Errors
///
/// Fails when the interface is not registered, is a state channel, or was
/// registered with a payload type other than `E`.
///
/// # Panics
///
/// Panics if the bus lock is poisoned.
pub fn emit<E>(&self, interface: &PluginEventKind, event: E) -> EventBusResult<usize>
where
    E: Any + Send + Sync + 'static,
{
    let sender = self.broadcast_sender::<E>(interface)?;
    let event = Arc::new(event);

    // Encode before the typed send consumes the `Arc`, and only when someone
    // is listening on the JSON side.
    let observed = self.json_projection_observed(interface, DeliveryMode::Broadcast);
    let json_payload = observed
        .then(|| self.project_json_event(interface, &*event))
        .flatten();

    // `send` fails only when there are no receivers; report that as zero.
    let delivered = match sender.send(event) {
        Ok(receivers) => receivers,
        Err(_) => 0,
    };

    if let Some(payload) = json_payload {
        self.publish_json_projection(interface, DeliveryMode::Broadcast, payload);
    }
    Ok(delivered)
}
/// Replaces the current value of the state channel `interface` with `value`,
/// then refreshes the JSON projection if it is being observed.
///
/// # Errors
///
/// Fails when the interface is not registered, is a broadcast channel, or was
/// registered with a payload type other than `T`.
///
/// # Panics
///
/// Panics if the bus lock is poisoned.
pub fn publish_state<T>(&self, interface: &PluginEventKind, value: T) -> EventBusResult<()>
where
    T: Any + Send + Sync + 'static,
{
    let sender = self.state_sender::<T>(interface)?;
    // `send_replace` stores the value even when there are no receivers; the
    // previous value is discarded immediately.
    drop(sender.send_replace(Arc::new(value)));
    self.publish_json_projection_from_state_if_observed(interface);
    Ok(())
}
/// Subscribes to the typed broadcast channel registered for `interface`.
///
/// # Errors
///
/// Fails when the interface is not registered, is a state channel, or was
/// registered with a payload type other than `E`.
///
/// # Panics
///
/// Panics if the bus lock is poisoned.
pub fn subscribe<E>(
    &self,
    interface: &PluginEventKind,
) -> EventBusResult<broadcast::Receiver<Arc<E>>>
where
    E: Any + Send + Sync + 'static,
{
    self.broadcast_sender::<E>(interface)
        .map(|sender| sender.subscribe())
}
/// Subscribes to the typed state channel registered for `interface`.
///
/// Returns the value held at subscription time together with a receiver for
/// later updates.
///
/// # Errors
///
/// Fails when the interface is not registered, is a broadcast channel, or was
/// registered with a payload type other than `T`.
///
/// # Panics
///
/// Panics if the bus lock is poisoned.
pub fn subscribe_state<T>(
    &self,
    interface: &PluginEventKind,
) -> EventBusResult<(Arc<T>, watch::Receiver<Arc<T>>)>
where
    T: Any + Send + Sync + 'static,
{
    let rx = self.state_sender::<T>(interface)?.subscribe();
    // Snapshot through the receiver so the returned value is the one the
    // receiver currently points at.
    let current: Arc<T> = Arc::clone(&rx.borrow());
    Ok((current, rx))
}
/// Subscribes to the JSON projection of the broadcast channel `interface`.
///
/// # Errors
///
/// - [`EventBusError::ChannelNotRegistered`] when the interface is unknown.
/// - [`EventBusError::ChannelDeliveryMismatch`] when it is a state channel.
/// - [`EventBusError::JsonProjectionUnavailable`] when the channel has no
///   JSON projection.
///
/// # Panics
///
/// Panics if the bus lock is poisoned.
#[allow(clippy::significant_drop_tightening)]
pub fn subscribe_json(
    &self,
    interface: &PluginEventKind,
) -> EventBusResult<broadcast::Receiver<Arc<JsonPluginEvent>>> {
    let guard = self.entries.read().expect("event bus lock poisoned");
    let Some(entry) = guard.get(interface) else {
        return Err(EventBusError::ChannelNotRegistered {
            interface: interface.as_str().to_string(),
        });
    };
    // The delivery-mode check takes precedence over the projection check.
    match (&entry.kind, entry.json.as_ref()) {
        (ChannelKind::State(_), _) => Err(EventBusError::ChannelDeliveryMismatch {
            interface: interface.as_str().to_string(),
            expected: DeliveryMode::Broadcast,
            actual: DeliveryMode::State,
        }),
        (ChannelKind::Broadcast(_), None) => Err(EventBusError::JsonProjectionUnavailable {
            interface: interface.as_str().to_string(),
        }),
        (ChannelKind::Broadcast(_), Some(json)) => Ok(json.broadcast.subscribe()),
    }
}
/// Subscribes to the JSON projection of the state channel `interface`.
///
/// The current typed value is serialized on demand (falling back to `Null`
/// when encoding fails) and returned together with a receiver for later JSON
/// updates. The JSON `watch` slot is refreshed only when the freshly encoded
/// event differs from what it already holds, so subscribers that are already
/// up to date are not woken just because someone else subscribed.
///
/// # Errors
///
/// - [`EventBusError::ChannelNotRegistered`] when the interface is unknown.
/// - [`EventBusError::JsonProjectionUnavailable`] when the channel has no
///   JSON projection.
/// - [`EventBusError::ChannelDeliveryMismatch`] when the projection belongs
///   to a broadcast channel (it has no state sender or state encoder).
///
/// # Panics
///
/// Panics if the bus lock is poisoned.
#[allow(clippy::significant_drop_tightening)]
pub fn subscribe_state_json(
    &self,
    interface: &PluginEventKind,
) -> EventBusResult<(Arc<JsonPluginEvent>, watch::Receiver<Arc<JsonPluginEvent>>)> {
    let (sender, current) = {
        let guard = self.entries.read().expect("event bus lock poisoned");
        let entry =
            guard
                .get(interface)
                .ok_or_else(|| EventBusError::ChannelNotRegistered {
                    interface: interface.as_str().to_string(),
                })?;
        let Some(json) = entry.json.as_ref() else {
            return Err(EventBusError::JsonProjectionUnavailable {
                interface: interface.as_str().to_string(),
            });
        };
        // A state projection always carries both a watch sender and a state
        // encoder; a projection missing either belongs to a broadcast channel.
        let (Some(sender), Some(encoder)) = (json.state.as_ref(), json.state_encoder.as_ref())
        else {
            return Err(EventBusError::ChannelDeliveryMismatch {
                interface: interface.as_str().to_string(),
                expected: DeliveryMode::State,
                actual: DeliveryMode::Broadcast,
            });
        };
        let payload = encoder(&entry.kind).unwrap_or(JsonValue::Null);
        let current = Arc::new(JsonPluginEvent {
            interface: interface.clone(),
            delivery: DeliveryMode::State,
            payload,
        });
        // The projection is lazy, so the slot may be stale (or still hold the
        // initial `Null` placeholder). Update it only when the value changed,
        // so existing subscribers do not get a spurious change notification.
        let _ = sender.send_if_modified(|slot| {
            if **slot == *current {
                return false;
            }
            *slot = Arc::clone(&current);
            true
        });
        (sender.clone(), current)
    };
    // Subscribing after the refresh means the new receiver starts out having
    // already seen the value returned alongside it.
    let rx = sender.subscribe();
    Ok((current, rx))
}
/// Number of registered channels.
///
/// # Panics
///
/// Panics if the bus lock is poisoned.
#[must_use]
pub fn len(&self) -> usize {
    let guard = self.entries.read().expect("event bus lock poisoned");
    guard.len()
}
/// Returns `true` when no channel is registered.
///
/// # Panics
///
/// Panics if the bus lock is poisoned.
#[must_use]
pub fn is_empty(&self) -> bool {
    let channels = self.len();
    channels == 0
}
/// Looks up the typed broadcast sender registered for `interface`.
///
/// Checks run in this order: the interface is registered, the channel is a
/// broadcast channel, and the registered payload type is `E`.
///
/// # Panics
///
/// Panics if the bus lock is poisoned.
#[allow(clippy::significant_drop_tightening)]
fn broadcast_sender<E>(
    &self,
    interface: &PluginEventKind,
) -> EventBusResult<broadcast::Sender<Arc<E>>>
where
    E: Any + Send + Sync + 'static,
{
    let guard = self.entries.read().expect("event bus lock poisoned");
    let Some(entry) = guard.get(interface) else {
        return Err(EventBusError::ChannelNotRegistered {
            interface: interface.as_str().to_string(),
        });
    };
    let ChannelKind::Broadcast(erased) = &entry.kind else {
        return Err(EventBusError::ChannelDeliveryMismatch {
            interface: interface.as_str().to_string(),
            expected: DeliveryMode::Broadcast,
            actual: DeliveryMode::State,
        });
    };

    // Both the `TypeId` check and the downcast report the same error.
    let type_mismatch = || EventBusError::PayloadTypeMismatch {
        interface: interface.as_str().to_string(),
        expected: entry.payload_type_name,
        actual: std::any::type_name::<E>(),
    };
    if entry.payload_type_id != TypeId::of::<E>() {
        return Err(type_mismatch());
    }
    (**erased)
        .downcast_ref::<broadcast::Sender<Arc<E>>>()
        .cloned()
        .ok_or_else(type_mismatch)
}
/// Shared implementation behind all broadcast-channel registrations.
///
/// Creates the typed broadcast channel and, when `event_encoder` is given, a
/// JSON broadcast channel of the same `capacity`. Any previous entry for
/// `interface` is replaced.
///
/// # Panics
///
/// Panics if the bus lock is poisoned. Per tokio's documentation,
/// `broadcast::channel` also panics when `capacity` is zero.
fn register_channel_with_capacity_and_json_encoder<E>(
    &self,
    interface: PluginEventKind,
    capacity: usize,
    event_encoder: Option<JsonEventEncoder>,
) -> broadcast::Sender<Arc<E>>
where
    E: Any + Send + Sync + 'static,
{
    let (sender, _) = broadcast::channel::<Arc<E>>(capacity);

    let json = match event_encoder {
        Some(encoder) => {
            let (json_tx, _) = broadcast::channel::<Arc<JsonPluginEvent>>(capacity);
            Some(JsonProjection {
                broadcast: json_tx,
                state: None,
                event_encoder: Some(encoder),
                state_encoder: None,
            })
        }
        None => None,
    };

    let erased: Arc<dyn Any + Send + Sync> = Arc::new(sender.clone());
    let entry = ChannelEntry {
        kind: ChannelKind::Broadcast(erased),
        payload_type_id: TypeId::of::<E>(),
        payload_type_name: std::any::type_name::<E>(),
        decoder: None,
        json,
    };
    self.entries
        .write()
        .expect("event bus lock poisoned")
        .insert(interface, entry);
    sender
}
/// Reports whether the JSON projection of `interface` has a live subscriber.
///
/// For `Broadcast` only the JSON broadcast receivers count; for `State` the
/// JSON watch receivers count as well. Returns `false` when the lock is
/// poisoned, the interface is unknown, or it has no JSON projection.
fn json_projection_observed(
    &self,
    interface: &PluginEventKind,
    delivery: DeliveryMode,
) -> bool {
    let Ok(guard) = self.entries.read() else {
        return false;
    };
    let Some(json) = guard.get(interface).and_then(|entry| entry.json.as_ref()) else {
        return false;
    };

    let broadcast_observed = json.broadcast.receiver_count() > 0;
    match delivery {
        DeliveryMode::Broadcast => broadcast_observed,
        DeliveryMode::State => {
            broadcast_observed
                || json
                    .state
                    .as_ref()
                    .is_some_and(|sender| sender.receiver_count() > 0)
        }
    }
}
fn project_json_event(
&self,
interface: &PluginEventKind,
event: &(dyn Any + Send + Sync),
) -> Option<JsonValue> {
let encoder = {
let guard = self.entries.read().ok()?;
guard.get(interface)?.json.as_ref()?.event_encoder.clone()?
};
encoder(event)
}
/// Re-encodes the current state value of `interface` and publishes it on the
/// JSON projection, but only if a JSON subscriber is present.
///
/// Does nothing when the lock is poisoned, the interface has no JSON
/// projection, nobody observes it, or no state encoder is installed. An
/// encoder failure is published as a `Null` payload.
fn publish_json_projection_from_state_if_observed(&self, interface: &PluginEventKind) {
    let payload = {
        let Ok(guard) = self.entries.read() else {
            return;
        };
        let Some((kind, json)) = guard
            .get(interface)
            .and_then(|entry| Some((&entry.kind, entry.json.as_ref()?)))
        else {
            return;
        };

        let watched_by_broadcast = json.broadcast.receiver_count() > 0;
        let watched_by_state = json
            .state
            .as_ref()
            .is_some_and(|sender| sender.receiver_count() > 0);
        if !(watched_by_broadcast || watched_by_state) {
            return;
        }

        match json.state_encoder.as_ref() {
            Some(encode) => encode(kind).unwrap_or(JsonValue::Null),
            None => return,
        }
    };
    // The read lock is released above; publishing takes it again.
    self.publish_json_projection(interface, DeliveryMode::State, payload);
}
/// Publishes `payload` on the JSON projection of `interface`.
///
/// The event always goes to the JSON broadcast channel (send errors from
/// having no receivers are ignored) and, when present, replaces the value of
/// the JSON watch channel. Does nothing when the lock is poisoned or the
/// interface has no JSON projection.
fn publish_json_projection(
    &self,
    interface: &PluginEventKind,
    delivery: DeliveryMode,
    payload: JsonValue,
) {
    let event = Arc::new(JsonPluginEvent {
        interface: interface.clone(),
        delivery,
        payload,
    });
    let Ok(guard) = self.entries.read() else {
        return;
    };
    if let Some(json) = guard.get(interface).and_then(|entry| entry.json.as_ref()) {
        let _ = json.broadcast.send(Arc::clone(&event));
        if let Some(state_tx) = json.state.as_ref() {
            state_tx.send_replace(event);
        }
    }
}
/// Looks up the typed state (`watch`) sender registered for `interface`.
///
/// Checks run in this order: the interface is registered, the channel is a
/// state channel, and the registered payload type is `T`.
///
/// # Panics
///
/// Panics if the bus lock is poisoned.
#[allow(clippy::significant_drop_tightening)]
fn state_sender<T>(&self, interface: &PluginEventKind) -> EventBusResult<watch::Sender<Arc<T>>>
where
    T: Any + Send + Sync + 'static,
{
    let guard = self.entries.read().expect("event bus lock poisoned");
    let Some(entry) = guard.get(interface) else {
        return Err(EventBusError::ChannelNotRegistered {
            interface: interface.as_str().to_string(),
        });
    };
    let ChannelKind::State(erased) = &entry.kind else {
        return Err(EventBusError::ChannelDeliveryMismatch {
            interface: interface.as_str().to_string(),
            expected: DeliveryMode::State,
            actual: DeliveryMode::Broadcast,
        });
    };

    // Both the `TypeId` check and the downcast report the same error.
    let type_mismatch = || EventBusError::PayloadTypeMismatch {
        interface: interface.as_str().to_string(),
        expected: entry.payload_type_name,
        actual: std::any::type_name::<T>(),
    };
    if entry.payload_type_id != TypeId::of::<T>() {
        return Err(type_mismatch());
    }
    (**erased)
        .downcast_ref::<watch::Sender<Arc<T>>>()
        .cloned()
        .ok_or_else(type_mismatch)
}
}
/// Returns a handle to the process-wide event bus, creating it on first use.
///
/// Every call returns a clone of the same `Arc`.
#[must_use]
pub fn global_event_bus() -> Arc<EventBus> {
    static GLOBAL: std::sync::OnceLock<Arc<EventBus>> = std::sync::OnceLock::new();
    let bus = GLOBAL.get_or_init(|| Arc::new(EventBus::new()));
    Arc::clone(bus)
}
// Unit tests for the event bus: typed broadcast and state channels, lazy JSON
// projections, and the error paths for unregistered, mismatched-type and
// mismatched-delivery access.
#[cfg(test)]
mod tests {
use super::*;
use bmux_plugin_sdk::PluginEventKind;
use serde::Serializer;
use std::sync::atomic::{AtomicUsize, Ordering};
// Serializable broadcast payload used by most tests.
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
struct SampleEvent {
payload: u32,
}
// Second payload type, used to provoke type mismatches and for the second interface.
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
struct OtherEvent {
value: String,
}
// Payload without `Serialize`, showing that plain channels do not need it.
#[derive(Debug, Clone, PartialEq, Eq)]
struct NonSerializeEvent {
payload: u32,
}
// State-channel payload.
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
struct FocusSnapshot {
focused: Option<u64>,
revision: u64,
}
// Counters incremented by the hand-written `Serialize` impls below, so tests
// can observe how often serialization actually ran.
static JSON_EVENT_SERIALIZE_COUNT: AtomicUsize = AtomicUsize::new(0);
static JSON_STATE_SERIALIZE_COUNT: AtomicUsize = AtomicUsize::new(0);
// Broadcast payload whose serialization is counted.
#[derive(Debug, Clone, PartialEq, Eq)]
struct CountingEvent {
value: u64,
}
impl Serialize for CountingEvent {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
JSON_EVENT_SERIALIZE_COUNT.fetch_add(1, Ordering::SeqCst);
serializer.serialize_u64(self.value)
}
}
// State payload whose serialization is counted.
#[derive(Debug, Clone, PartialEq, Eq)]
struct CountingSnapshot {
value: u64,
}
impl Serialize for CountingSnapshot {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
JSON_STATE_SERIALIZE_COUNT.fetch_add(1, Ordering::SeqCst);
serializer.serialize_u64(self.value)
}
}
// Interface keys shared by the tests.
const TEST_IFACE: PluginEventKind = PluginEventKind::from_static("test.plugin/test-events");
const OTHER_IFACE: PluginEventKind = PluginEventKind::from_static("test.plugin/other-events");
const STATE_IFACE: PluginEventKind = PluginEventKind::from_static("test.plugin/focus-state");
// An emitted event reaches a subscriber of the same interface and type.
#[tokio::test]
async fn register_emit_subscribe_round_trip() {
let bus = EventBus::new();
let _sender = bus.register_channel::<SampleEvent>(TEST_IFACE);
let mut subscriber = bus.subscribe::<SampleEvent>(&TEST_IFACE).unwrap();
bus.emit(&TEST_IFACE, SampleEvent { payload: 42 }).unwrap();
let received = subscriber.recv().await.expect("should receive event");
assert_eq!(received.as_ref(), &SampleEvent { payload: 42 });
}
// Channels without a JSON projection accept payloads that are not `Serialize`.
#[tokio::test]
async fn non_json_broadcast_channels_do_not_require_serialize() {
let bus = EventBus::new();
bus.register_channel::<NonSerializeEvent>(TEST_IFACE);
let mut subscriber = bus.subscribe::<NonSerializeEvent>(&TEST_IFACE).unwrap();
bus.emit(&TEST_IFACE, NonSerializeEvent { payload: 42 })
.unwrap();
let received = subscriber.recv().await.expect("should receive event");
assert_eq!(received.as_ref(), &NonSerializeEvent { payload: 42 });
}
// `emit` reports the number of typed subscribers and each of them gets the event.
#[tokio::test]
async fn multiple_subscribers_receive_fanout() {
let bus = EventBus::new();
bus.register_channel::<SampleEvent>(TEST_IFACE);
let mut s1 = bus.subscribe::<SampleEvent>(&TEST_IFACE).unwrap();
let mut s2 = bus.subscribe::<SampleEvent>(&TEST_IFACE).unwrap();
let count = bus.emit(&TEST_IFACE, SampleEvent { payload: 7 }).unwrap();
assert_eq!(count, 2, "both subscribers should be counted");
assert_eq!(s1.recv().await.unwrap().payload, 7);
assert_eq!(s2.recv().await.unwrap().payload, 7);
}
// A JSON subscriber on a broadcast channel receives the serialized payload.
#[tokio::test]
async fn broadcast_json_subscriber_receives_serialized_payload() {
let bus = EventBus::new();
bus.register_channel_with_json_projection::<SampleEvent>(TEST_IFACE);
let mut rx = bus.subscribe_json(&TEST_IFACE).unwrap();
bus.emit(&TEST_IFACE, SampleEvent { payload: 99 }).unwrap();
let event = rx.recv().await.unwrap();
assert_eq!(event.interface, TEST_IFACE);
assert_eq!(event.delivery, DeliveryMode::Broadcast);
assert_eq!(event.payload["payload"], 99);
}
// Emitting on an unknown interface is an error.
#[test]
fn emit_on_unregistered_interface_errors() {
let bus = EventBus::new();
let result = bus.emit(&TEST_IFACE, SampleEvent { payload: 1 });
assert!(matches!(
result,
Err(EventBusError::ChannelNotRegistered { .. })
));
}
// Subscribing to an unknown interface is an error.
#[test]
fn subscribe_on_unregistered_interface_errors() {
let bus = EventBus::new();
let result = bus.subscribe::<SampleEvent>(&TEST_IFACE);
assert!(matches!(
result,
Err(EventBusError::ChannelNotRegistered { .. })
));
}
// Subscribing with the wrong payload type is rejected.
#[test]
fn payload_type_mismatch_is_detected() {
let bus = EventBus::new();
bus.register_channel::<SampleEvent>(TEST_IFACE);
let result = bus.subscribe::<OtherEvent>(&TEST_IFACE);
assert!(matches!(
result,
Err(EventBusError::PayloadTypeMismatch { .. })
));
}
// With no subscribers `emit` succeeds and reports zero deliveries.
#[tokio::test]
async fn emit_with_no_subscribers_returns_zero() {
let bus = EventBus::new();
bus.register_channel::<SampleEvent>(TEST_IFACE);
let count = bus.emit(&TEST_IFACE, SampleEvent { payload: 0 }).unwrap();
assert_eq!(count, 0);
}
// Events on one interface are not delivered to subscribers of another.
#[tokio::test]
async fn independent_interfaces_do_not_interfere() {
let bus = EventBus::new();
bus.register_channel::<SampleEvent>(TEST_IFACE);
bus.register_channel::<OtherEvent>(OTHER_IFACE);
let mut sub = bus.subscribe::<OtherEvent>(&OTHER_IFACE).unwrap();
bus.emit(&TEST_IFACE, SampleEvent { payload: 1 }).unwrap();
bus.emit(
&OTHER_IFACE,
OtherEvent {
value: "hello".to_string(),
},
)
.unwrap();
let received = sub.recv().await.unwrap();
assert_eq!(received.value, "hello");
}
// The global accessor always hands out the same bus instance.
#[tokio::test]
async fn global_bus_returns_same_instance() {
let a = global_event_bus();
let b = global_event_bus();
assert!(Arc::ptr_eq(&a, &b));
}
// A state subscriber sees the registration value when nothing was published yet.
#[tokio::test]
async fn state_channel_subscribe_returns_initial_value_before_any_publish() {
let bus = EventBus::new();
bus.register_state_channel::<FocusSnapshot>(
STATE_IFACE,
FocusSnapshot {
focused: None,
revision: 0,
},
);
let (initial, _rx) = bus.subscribe_state::<FocusSnapshot>(&STATE_IFACE).unwrap();
assert_eq!(
initial.as_ref(),
&FocusSnapshot {
focused: None,
revision: 0,
},
);
}
// A late state subscriber sees only the most recently published value.
#[tokio::test]
async fn state_channel_replays_latest_value_to_late_subscribers() {
let bus = EventBus::new();
bus.register_state_channel::<FocusSnapshot>(
STATE_IFACE,
FocusSnapshot {
focused: None,
revision: 0,
},
);
bus.publish_state(
&STATE_IFACE,
FocusSnapshot {
focused: Some(7),
revision: 1,
},
)
.unwrap();
bus.publish_state(
&STATE_IFACE,
FocusSnapshot {
focused: Some(8),
revision: 2,
},
)
.unwrap();
let (initial, _rx) = bus.subscribe_state::<FocusSnapshot>(&STATE_IFACE).unwrap();
assert_eq!(
initial.as_ref(),
&FocusSnapshot {
focused: Some(8),
revision: 2,
},
);
}
// JSON subscription fails on a state channel registered without a projection.
#[tokio::test]
async fn state_channel_without_json_projection_rejects_json_subscriber() {
let bus = EventBus::new();
bus.register_state_channel::<FocusSnapshot>(
STATE_IFACE,
FocusSnapshot {
focused: None,
revision: 0,
},
);
let err = bus.subscribe_state_json(&STATE_IFACE).unwrap_err();
assert!(matches!(
err,
EventBusError::JsonProjectionUnavailable { .. }
));
}
// State values are serialized only once a JSON subscriber exists: once on
// subscribe and once per later publish.
#[tokio::test]
async fn lazy_json_projection_serializes_state_only_when_observed() {
JSON_STATE_SERIALIZE_COUNT.store(0, Ordering::SeqCst);
let bus = EventBus::new();
bus.register_state_channel_with_json_projection::<CountingSnapshot>(
STATE_IFACE,
CountingSnapshot { value: 0 },
);
assert_eq!(JSON_STATE_SERIALIZE_COUNT.load(Ordering::SeqCst), 0);
bus.publish_state(&STATE_IFACE, CountingSnapshot { value: 1 })
.unwrap();
assert_eq!(JSON_STATE_SERIALIZE_COUNT.load(Ordering::SeqCst), 0);
let (initial, mut rx) = bus.subscribe_state_json(&STATE_IFACE).unwrap();
assert_eq!(initial.payload, JsonValue::from(1));
assert_eq!(JSON_STATE_SERIALIZE_COUNT.load(Ordering::SeqCst), 1);
bus.publish_state(&STATE_IFACE, CountingSnapshot { value: 2 })
.unwrap();
rx.changed().await.unwrap();
assert_eq!(rx.borrow().payload, JsonValue::from(2));
assert_eq!(JSON_STATE_SERIALIZE_COUNT.load(Ordering::SeqCst), 2);
}
// Broadcast events are serialized only while a JSON subscriber exists.
#[tokio::test]
async fn lazy_json_projection_serializes_broadcast_only_when_observed() {
JSON_EVENT_SERIALIZE_COUNT.store(0, Ordering::SeqCst);
let bus = EventBus::new();
bus.register_channel_with_json_projection::<CountingEvent>(TEST_IFACE);
bus.emit(&TEST_IFACE, CountingEvent { value: 1 }).unwrap();
assert_eq!(JSON_EVENT_SERIALIZE_COUNT.load(Ordering::SeqCst), 0);
let mut rx = bus.subscribe_json(&TEST_IFACE).unwrap();
bus.emit(&TEST_IFACE, CountingEvent { value: 2 }).unwrap();
let event = rx.recv().await.unwrap();
assert_eq!(event.payload, JsonValue::from(2));
assert_eq!(JSON_EVENT_SERIALIZE_COUNT.load(Ordering::SeqCst), 1);
}
// A JSON state subscriber gets the current value on subscribe and later updates.
#[tokio::test]
async fn state_json_subscriber_receives_initial_and_live_payloads() {
let bus = EventBus::new();
bus.register_state_channel_with_json_projection::<FocusSnapshot>(
STATE_IFACE,
FocusSnapshot {
focused: None,
revision: 0,
},
);
let (initial, mut rx) = bus.subscribe_state_json(&STATE_IFACE).unwrap();
assert_eq!(initial.delivery, DeliveryMode::State);
assert_eq!(initial.payload["revision"], 0);
bus.publish_state(
&STATE_IFACE,
FocusSnapshot {
focused: Some(12),
revision: 1,
},
)
.unwrap();
rx.changed().await.unwrap();
let event = rx.borrow().clone();
assert_eq!(event.payload["focused"], 12);
assert_eq!(event.payload["revision"], 1);
}
// An existing typed state subscriber is notified of a new publish.
#[tokio::test]
async fn state_channel_pushes_live_updates_to_existing_subscribers() {
let bus = EventBus::new();
bus.register_state_channel::<FocusSnapshot>(
STATE_IFACE,
FocusSnapshot {
focused: None,
revision: 0,
},
);
let (_initial, mut rx) = bus.subscribe_state::<FocusSnapshot>(&STATE_IFACE).unwrap();
bus.publish_state(
&STATE_IFACE,
FocusSnapshot {
focused: Some(1),
revision: 1,
},
)
.unwrap();
rx.changed().await.expect("watch should fire");
let snapshot = rx.borrow().clone();
assert_eq!(
snapshot.as_ref(),
&FocusSnapshot {
focused: Some(1),
revision: 1,
},
);
}
// `emit` on a state channel reports expected=Broadcast, actual=State.
#[test]
fn emit_on_state_channel_errors_with_delivery_mismatch() {
let bus = EventBus::new();
bus.register_state_channel::<FocusSnapshot>(
STATE_IFACE,
FocusSnapshot {
focused: None,
revision: 0,
},
);
let err = bus
.emit(
&STATE_IFACE,
FocusSnapshot {
focused: Some(1),
revision: 1,
},
)
.expect_err("emit on state channel must fail");
match err {
EventBusError::ChannelDeliveryMismatch {
expected, actual, ..
} => {
assert_eq!(expected, DeliveryMode::Broadcast);
assert_eq!(actual, DeliveryMode::State);
}
other => panic!("expected delivery mismatch, got {other:?}"),
}
}
// `publish_state` on a broadcast channel reports expected=State, actual=Broadcast.
#[test]
fn publish_state_on_broadcast_channel_errors_with_delivery_mismatch() {
let bus = EventBus::new();
bus.register_channel::<SampleEvent>(TEST_IFACE);
let err = bus
.publish_state(&TEST_IFACE, SampleEvent { payload: 1 })
.expect_err("publish_state on broadcast channel must fail");
match err {
EventBusError::ChannelDeliveryMismatch {
expected, actual, ..
} => {
assert_eq!(expected, DeliveryMode::State);
assert_eq!(actual, DeliveryMode::Broadcast);
}
other => panic!("expected delivery mismatch, got {other:?}"),
}
}
// `subscribe_state` on a broadcast channel is a delivery mismatch.
#[test]
fn subscribe_state_on_broadcast_channel_errors_with_delivery_mismatch() {
let bus = EventBus::new();
bus.register_channel::<SampleEvent>(TEST_IFACE);
let err = bus
.subscribe_state::<SampleEvent>(&TEST_IFACE)
.expect_err("subscribe_state on broadcast channel must fail");
assert!(matches!(err, EventBusError::ChannelDeliveryMismatch { .. }));
}
// `subscribe` on a state channel is a delivery mismatch.
#[test]
fn subscribe_on_state_channel_errors_with_delivery_mismatch() {
let bus = EventBus::new();
bus.register_state_channel::<FocusSnapshot>(
STATE_IFACE,
FocusSnapshot {
focused: None,
revision: 0,
},
);
let err = bus
.subscribe::<FocusSnapshot>(&STATE_IFACE)
.expect_err("subscribe on state channel must fail");
assert!(matches!(err, EventBusError::ChannelDeliveryMismatch { .. }));
}
// Subscribing to a state channel with the wrong payload type is rejected.
#[test]
fn state_channel_payload_type_mismatch_is_detected() {
let bus = EventBus::new();
bus.register_state_channel::<FocusSnapshot>(
STATE_IFACE,
FocusSnapshot {
focused: None,
revision: 0,
},
);
let err = bus
.subscribe_state::<SampleEvent>(&STATE_IFACE)
.expect_err("wrong payload type should fail");
assert!(matches!(err, EventBusError::PayloadTypeMismatch { .. }));
}
}