use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, Mutex};
use tokio::sync::mpsc;
use super::coalesce::{CoalescePushOutcome, CoalesceState};
use super::envelope::{EventEnvelope, StreamId};
use super::policy::{
DEFAULT_MAX_EVENT_SIZE_BYTES, PublishError, PublishResult, SlowConsumerPolicy, SubscribeOpts,
};
use super::replay::{DEFAULT_REPLAY_CAPACITY, StreamReplayCache};
use super::sequencer::StreamRegistry;
use super::topic::TopicPattern;
/// Value placed in [`SlowConsumerNotice::reason`] when the bus disconnects a
/// subscriber because its outbound queue was full.
pub const REASON_SLOW_CONSUMER: &str = "slow_consumer";
/// Notice sent on a subscription's `slow_consumer_rx` channel when the bus
/// removes the subscription under the `Disconnect` slow-consumer policy.
#[derive(Debug, Clone)]
pub struct SlowConsumerNotice {
/// Stream of the last event that was successfully queued for the subscriber,
/// or `None` if nothing was queued before the disconnect.
pub stream_id: Option<StreamId>,
/// Sequence number of that last queued event, if any.
pub last_delivered_sequence: Option<u64>,
/// Machine-readable reason; the bus sets this to [`REASON_SLOW_CONSUMER`].
pub reason: &'static str,
}
/// Identifier the bus assigns to each subscription. Ids come from an atomic
/// counter that starts at 1 and is incremented on every `subscribe` call.
pub type SubscriptionId = u64;
/// Handle returned by [`EventBus::subscribe`]; owns the receiving side of one
/// subscription.
pub struct Subscription {
/// Identifier to pass to [`EventBus::unsubscribe`].
pub id: SubscriptionId,
/// The pattern this subscription was registered with.
pub topic: TopicPattern,
/// Receiver for events that match `topic`.
pub rx: SubscriptionRx,
/// Receives a [`SlowConsumerNotice`] if the bus disconnects this subscription
/// under the `Disconnect` policy. The sender is dropped without sending when
/// the subscription is removed for any other reason.
pub slow_consumer_rx: tokio::sync::oneshot::Receiver<SlowConsumerNotice>,
}
/// Receiving side of a subscription. Hides whether events arrive through a
/// bounded mpsc channel or through a shared coalescing queue.
pub struct SubscriptionRx {
// Concrete transport, chosen by `EventBus::subscribe`.
inner: SubscriptionRxInner,
}
/// Transport behind a [`SubscriptionRx`].
enum SubscriptionRxInner {
/// Bounded tokio mpsc receiver; used for every policy except `Coalesce`.
Mpsc(mpsc::Receiver<EventEnvelope>),
/// Coalescing queue shared (via `Arc`) with the bus-side `Outbound::Coalesce`.
Coalesce(Arc<CoalesceState>),
}
impl SubscriptionRx {
pub async fn recv(&mut self) -> Option<EventEnvelope> {
match &mut self.inner {
SubscriptionRxInner::Mpsc(rx) => rx.recv().await,
SubscriptionRxInner::Coalesce(state) => state.recv().await,
}
}
pub fn try_recv(&mut self) -> Result<EventEnvelope, TryRecvError> {
match &mut self.inner {
SubscriptionRxInner::Mpsc(rx) => rx.try_recv().map_err(|e| match e {
mpsc::error::TryRecvError::Empty => TryRecvError::Empty,
mpsc::error::TryRecvError::Disconnected => TryRecvError::Disconnected,
}),
SubscriptionRxInner::Coalesce(state) => state.try_recv(),
}
}
}
impl Drop for SubscriptionRx {
    /// Tells the shared coalescing queue that its receiver is gone.
    ///
    /// The mpsc transport needs no explicit action here: dropping the
    /// receiver field closes the channel on its own.
    fn drop(&mut self) {
        match &self.inner {
            SubscriptionRxInner::Coalesce(state) => {
                state.mark_receiver_dropped();
            }
            SubscriptionRxInner::Mpsc(_) => {}
        }
    }
}
/// Error returned by [`SubscriptionRx::try_recv`]. The variants mirror
/// tokio's `mpsc::error::TryRecvError`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TryRecvError {
/// No event is queued right now.
Empty,
/// The sending side has been disconnected.
Disconnected,
}
/// Bus-side sending half of a subscription; the counterpart of
/// `SubscriptionRxInner`.
enum Outbound {
/// Sender of the bounded tokio mpsc channel.
Mpsc(mpsc::Sender<EventEnvelope>),
/// Coalescing queue shared with the subscriber's `SubscriptionRx`.
Coalesce(Arc<CoalesceState>),
}
/// Per-subscription bookkeeping kept by the bus inside `Inner::subs`.
struct SubscriptionInner {
// Pattern (and optional data filter) each published event is matched against.
topic: TopicPattern,
// Where matching events are sent.
outbound: Outbound,
// Deliveries left before the subscription is auto-removed; `None` means unlimited.
// The async `publish` decrements this up front and restores it if the send does
// not happen.
remaining: Option<u64>,
// What to do when the outbound queue is full.
slow_consumer_policy: SlowConsumerPolicy,
// Number of async `publish` calls that have claimed this subscription and have
// not yet recorded an outcome.
in_flight: u64,
// (stream id, sequence) of the most recent event successfully queued.
last_delivered: Option<(StreamId, u64)>,
// One-shot sender for the slow-consumer notice; `take()`n when the notice is sent.
slow_consumer_notify: Option<tokio::sync::oneshot::Sender<SlowConsumerNotice>>,
}
/// In-process publish/subscribe bus. Cloning is cheap: all clones share the
/// same state through an `Arc`, including the subscription table and the
/// maximum event size.
#[derive(Clone)]
pub struct EventBus {
inner: Arc<Inner>,
}
/// State shared by every clone of an [`EventBus`].
struct Inner {
// Next subscription id to hand out; initialised to 1.
next_id: AtomicU64,
// Live subscriptions keyed by id. Guarded by a std (blocking) mutex, so it must
// never be held across an `.await`.
subs: Mutex<HashMap<SubscriptionId, SubscriptionInner>>,
// Stream registry; the publish paths call `evict` on it when an event is rejected.
registry: StreamRegistry,
// Every event that passes the size check is recorded here before fan-out.
replay_cache: StreamReplayCache,
// Upper bound, in bytes, on the JSON-serialized size of a published envelope.
max_event_size: AtomicUsize,
}
impl EventBus {
/// Creates a bus on top of an existing stream registry, using
/// `DEFAULT_REPLAY_CAPACITY` for the replay cache.
pub fn with_registry(registry: StreamRegistry) -> Self {
    let replay_capacity = DEFAULT_REPLAY_CAPACITY;
    Self::with_registry_and_replay_capacity(registry, replay_capacity)
}
/// Creates a bus on top of `registry` with a replay cache built for
/// `replay_capacity`.
///
/// The new bus has no subscriptions, hands out subscription ids starting at
/// 1, and limits events to `DEFAULT_MAX_EVENT_SIZE_BYTES`.
pub fn with_registry_and_replay_capacity(
    registry: StreamRegistry,
    replay_capacity: usize,
) -> Self {
    let shared = Inner {
        next_id: AtomicU64::new(1),
        subs: Mutex::new(HashMap::new()),
        // Built from a clone before `registry` itself is moved into the struct below.
        replay_cache: StreamReplayCache::new(replay_capacity, registry.clone()),
        registry,
        max_event_size: AtomicUsize::new(DEFAULT_MAX_EVENT_SIZE_BYTES),
    };
    Self {
        inner: Arc::new(shared),
    }
}
/// Creates a bus with a fresh [`StreamRegistry`] and default settings.
pub fn new() -> Self {
    let registry = StreamRegistry::new();
    Self::with_registry(registry)
}
pub fn stream_registry(&self) -> &StreamRegistry {
&self.inner.registry
}
pub fn replay_cache(&self) -> &StreamReplayCache {
&self.inner.replay_cache
}
/// Builder-style setter for the maximum serialized event size, in bytes.
///
/// The limit lives in state shared through an `Arc`, so it also takes
/// effect for any existing clones of this bus.
pub fn with_max_event_size(self, limit: usize) -> Self {
    let cap = &self.inner.max_event_size;
    cap.store(limit, Ordering::Relaxed);
    self
}
pub fn max_event_size(&self) -> usize {
self.inner.max_event_size.load(Ordering::Relaxed)
}
/// Registers a new subscription for `topic` and returns its receiving handle.
///
/// The transport depends on the slow-consumer policy in `opts`: `Coalesce`
/// gets a shared coalescing queue, every other policy gets a bounded tokio
/// mpsc channel. Both are sized by `opts.resolved_outbound_capacity()`.
/// `opts.limit` becomes the number of deliveries after which the bus
/// removes the subscription on its own.
///
/// # Panics
///
/// Panics if the subscription table mutex is poisoned.
pub fn subscribe(&self, topic: TopicPattern, opts: SubscribeOpts) -> Subscription {
    let id = self.inner.next_id.fetch_add(1, Ordering::Relaxed);
    let capacity = opts.resolved_outbound_capacity();
    let (notify_tx, notify_rx) = tokio::sync::oneshot::channel::<SlowConsumerNotice>();

    let (outbound, rx_inner) =
        if let SlowConsumerPolicy::Coalesce { key_path } = &opts.slow_consumer_policy {
            // Bus and subscriber share one queue; each side holds an `Arc` to it.
            let shared = Arc::new(CoalesceState::new(capacity, key_path.clone()));
            let bus_side = Outbound::Coalesce(shared.clone());
            (bus_side, SubscriptionRxInner::Coalesce(shared))
        } else {
            let (tx, rx) = mpsc::channel::<EventEnvelope>(capacity);
            (Outbound::Mpsc(tx), SubscriptionRxInner::Mpsc(rx))
        };

    let mut subs = self.inner.subs.lock().unwrap();
    let entry = SubscriptionInner {
        topic: topic.clone(),
        outbound,
        remaining: opts.limit,
        slow_consumer_policy: opts.slow_consumer_policy.clone(),
        in_flight: 0,
        last_delivered: None,
        slow_consumer_notify: Some(notify_tx),
    };
    subs.insert(id, entry);

    let rx = SubscriptionRx { inner: rx_inner };
    Subscription {
        id,
        topic,
        rx,
        slow_consumer_rx: notify_rx,
    }
}
/// Removes the subscription with the given id.
///
/// Returns `true` if a subscription was removed and `false` if the id was
/// unknown (for example because it was already auto-removed). A coalescing
/// queue is closed explicitly; an mpsc channel closes when its sender is
/// dropped together with the removed entry.
///
/// # Panics
///
/// Panics if the subscription table mutex is poisoned.
pub fn unsubscribe(&self, id: SubscriptionId) -> bool {
    let mut subs = self.inner.subs.lock().unwrap();
    let Some(removed) = subs.remove(&id) else {
        return false;
    };
    if let Outbound::Coalesce(state) = removed.outbound {
        state.close();
    }
    true
}
pub fn try_publish(&self, envelope: EventEnvelope) -> Result<PublishResult, PublishError> {
let limit = self.inner.max_event_size.load(Ordering::Relaxed);
let size = serde_json::to_vec(&envelope).map(|v| v.len()).unwrap_or(0);
if size > limit {
tracing::warn!(
limit,
size,
stream = %envelope.stream_id.as_str(),
"event exceeds max size; rejected"
);
self.inner.registry.evict(&envelope.event_id);
return Err(PublishError::TooLarge { limit });
}
self.inner.replay_cache.record(&envelope);
let mut to_remove: Vec<SubscriptionId> = Vec::new();
let mut result = PublishResult::default();
let topic = envelope.topic();
{
let mut subs = self.inner.subs.lock().unwrap();
for (id, sub) in subs.iter_mut() {
if !sub.topic.matches_event(&topic, &envelope.data) {
continue;
}
match &sub.outbound {
Outbound::Mpsc(tx) => match tx.try_send(envelope.clone()) {
Ok(()) => {
result.delivered += 1;
sub.last_delivered =
Some((envelope.stream_id.clone(), envelope.sequence));
if let Some(rem) = sub.remaining.as_mut() {
*rem = rem.saturating_sub(1);
if *rem == 0 {
to_remove.push(*id);
}
}
}
Err(mpsc::error::TrySendError::Full(_)) => {
match &sub.slow_consumer_policy {
SlowConsumerPolicy::Disconnect => {
disconnect_slow_consumer(sub, *id, &mut to_remove, &mut result);
}
SlowConsumerPolicy::Backpressure
| SlowConsumerPolicy::DropNewest => {
result.dropped += 1;
}
SlowConsumerPolicy::Coalesce { .. } => unreachable!(
"subscribe() invariant: Coalesce uses Outbound::Coalesce"
),
}
}
Err(mpsc::error::TrySendError::Closed(_)) => {
to_remove.push(*id);
}
},
Outbound::Coalesce(state) => {
match state.push(envelope.clone()) {
CoalescePushOutcome::Pushed => {
result.delivered += 1;
sub.last_delivered =
Some((envelope.stream_id.clone(), envelope.sequence));
if let Some(rem) = sub.remaining.as_mut() {
*rem = rem.saturating_sub(1);
if *rem == 0 {
to_remove.push(*id);
}
}
}
CoalescePushOutcome::Replaced => {
result.coalesced += 1;
}
CoalescePushOutcome::Dropped => {
result.dropped += 1;
}
CoalescePushOutcome::ReceiverGone => {
to_remove.push(*id);
}
}
}
}
}
for id in &to_remove {
if let Some(sub) = subs.remove(id)
&& let Outbound::Coalesce(state) = sub.outbound
{
state.close();
}
}
}
Ok(result)
}
/// Returns how many subscriptions are currently registered.
///
/// # Panics
///
/// Panics if the subscription table mutex is poisoned.
pub fn active_subscriptions(&self) -> usize {
    let subs = self.inner.subs.lock().unwrap();
    subs.len()
}
}
/// Marks a subscription as disconnected for being a slow consumer.
///
/// Sends the one-shot [`SlowConsumerNotice`] (at most once, since the sender
/// is taken out of the subscription), schedules the subscription for removal
/// via `to_remove`, and records its id in
/// `result.disconnected_slow_consumers`. A failed notice send (receiver
/// already dropped) is deliberately ignored.
fn disconnect_slow_consumer(
    sub: &mut SubscriptionInner,
    id: SubscriptionId,
    to_remove: &mut Vec<SubscriptionId>,
    result: &mut PublishResult,
) {
    let last = sub.last_delivered.clone();
    if let Some(notify) = sub.slow_consumer_notify.take() {
        // Split the (stream, sequence) pair into the notice's two optional fields.
        let (stream_id, last_delivered_sequence) = match last {
            Some((stream, sequence)) => (Some(stream), Some(sequence)),
            None => (None, None),
        };
        let notice = SlowConsumerNotice {
            stream_id,
            last_delivered_sequence,
            reason: REASON_SLOW_CONSUMER,
        };
        let _ = notify.send(notice);
    }
    result.disconnected_slow_consumers.push(id);
    to_remove.push(id);
}
impl EventBus {
pub async fn publish(&self, envelope: EventEnvelope) -> Result<PublishResult, PublishError> {
let limit = self.inner.max_event_size.load(Ordering::Relaxed);
let size = serde_json::to_vec(&envelope).map(|v| v.len()).unwrap_or(0);
if size > limit {
tracing::warn!(
limit,
size,
stream = %envelope.stream_id.as_str(),
"event exceeds max size; rejected"
);
self.inner.registry.evict(&envelope.event_id);
return Err(PublishError::TooLarge { limit });
}
self.inner.replay_cache.record(&envelope);
struct ClaimGuard {
inner: Arc<Inner>,
id: SubscriptionId,
active: bool,
}
impl Drop for ClaimGuard {
fn drop(&mut self) {
if self.active {
let mut subs = self.inner.subs.lock().unwrap();
if let Some(sub) = subs.get_mut(&self.id) {
sub.in_flight = sub.in_flight.saturating_sub(1);
if let Some(rem) = sub.remaining.as_mut() {
*rem += 1;
}
}
}
}
}
struct Match {
id: SubscriptionId,
tx: mpsc::Sender<EventEnvelope>,
slow_consumer_policy: SlowConsumerPolicy,
}
let topic = envelope.topic();
let mut matches: Vec<Match> = Vec::new();
let mut guards: Vec<ClaimGuard> = Vec::new();
let mut result = PublishResult::default();
let mut to_remove: Vec<SubscriptionId> = Vec::new();
{
let mut subs = self.inner.subs.lock().unwrap();
for (id, sub) in subs.iter_mut() {
if !sub.topic.matches_event(&topic, &envelope.data) {
continue;
}
match &sub.outbound {
Outbound::Coalesce(state) => match state.push(envelope.clone()) {
CoalescePushOutcome::Pushed => {
result.delivered += 1;
sub.last_delivered =
Some((envelope.stream_id.clone(), envelope.sequence));
if let Some(rem) = sub.remaining.as_mut() {
*rem = rem.saturating_sub(1);
if *rem == 0 {
to_remove.push(*id);
}
}
}
CoalescePushOutcome::Replaced => {
result.coalesced += 1;
}
CoalescePushOutcome::Dropped => {
result.dropped += 1;
}
CoalescePushOutcome::ReceiverGone => {
to_remove.push(*id);
}
},
Outbound::Mpsc(tx) => {
if let Some(rem) = sub.remaining.as_mut() {
if *rem == 0 {
continue;
}
*rem -= 1;
}
sub.in_flight += 1;
matches.push(Match {
id: *id,
tx: tx.clone(),
slow_consumer_policy: sub.slow_consumer_policy.clone(),
});
guards.push(ClaimGuard {
inner: self.inner.clone(),
id: *id,
active: true,
});
}
}
}
for id in &to_remove {
if let Some(sub) = subs.remove(id)
&& let Outbound::Coalesce(state) = sub.outbound
{
state.close();
}
}
}
enum SendResult {
Delivered,
Dropped,
DisconnectSlowConsumer,
Closed,
}
for (i, m) in matches.iter().enumerate() {
let outcome = match m.tx.try_send(envelope.clone()) {
Ok(()) => SendResult::Delivered,
Err(mpsc::error::TrySendError::Closed(_)) => SendResult::Closed,
Err(mpsc::error::TrySendError::Full(_)) => match &m.slow_consumer_policy {
SlowConsumerPolicy::Backpressure => {
if m.tx.send(envelope.clone()).await.is_ok() {
SendResult::Delivered
} else {
SendResult::Closed
}
}
SlowConsumerPolicy::DropNewest => SendResult::Dropped,
SlowConsumerPolicy::Coalesce { .. } => {
unreachable!("subscribe() invariant: Coalesce uses Outbound::Coalesce")
}
SlowConsumerPolicy::Disconnect => SendResult::DisconnectSlowConsumer,
},
};
{
let mut subs = self.inner.subs.lock().unwrap();
match outcome {
SendResult::Delivered => {
result.delivered += 1;
let exhausted = if let Some(sub) = subs.get_mut(&m.id) {
sub.last_delivered =
Some((envelope.stream_id.clone(), envelope.sequence));
sub.in_flight = sub.in_flight.saturating_sub(1);
sub.remaining == Some(0) && sub.in_flight == 0
} else {
false
};
if exhausted {
subs.remove(&m.id);
}
}
SendResult::Dropped => {
result.dropped += 1;
if let Some(sub) = subs.get_mut(&m.id) {
sub.in_flight = sub.in_flight.saturating_sub(1);
if let Some(rem) = sub.remaining.as_mut() {
*rem += 1;
}
}
}
SendResult::DisconnectSlowConsumer => {
if let Some(sub) = subs.get_mut(&m.id) {
let last = sub.last_delivered.clone();
if let Some(notify) = sub.slow_consumer_notify.take() {
let _ = notify.send(SlowConsumerNotice {
stream_id: last.as_ref().map(|(s, _)| s.clone()),
last_delivered_sequence: last.as_ref().map(|(_, n)| *n),
reason: REASON_SLOW_CONSUMER,
});
}
}
subs.remove(&m.id);
result.disconnected_slow_consumers.push(m.id);
}
SendResult::Closed => {
subs.remove(&m.id);
}
}
}
guards[i].active = false;
}
Ok(result)
}
}
impl Default for EventBus {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
use serde_json::json;
use super::*;
use crate::events::envelope::{ENVELOPE_VERSION, EventId, NodeId, StreamId};
/// Builds a minimal envelope for tests.
///
/// `topic_parts` is `(node, resource kind, resource id, stream)`. Every
/// other field is fixed: event id `"test-1"`, sequence 1, Unix-epoch
/// timestamp, payload kind `"resource.state.changed"`.
fn test_envelope(
    topic_parts: (&str, &str, &str, &str),
    data: serde_json::Value,
) -> EventEnvelope {
    let node_id = NodeId::new(topic_parts.0);
    let resource_kind = topic_parts.1;
    let resource_id = topic_parts.2;
    let stream = topic_parts.3;
    let stream_id = StreamId::for_resource(&node_id, resource_id, stream);
    EventEnvelope {
        envelope_version: ENVELOPE_VERSION,
        event_id: EventId::from_raw("test-1"),
        node_id,
        resource_id: resource_id.into(),
        resource_kind: resource_kind.into(),
        resource_version: 1,
        stream_id,
        stream: stream.into(),
        sequence: 1,
        timestamp: time::OffsetDateTime::UNIX_EPOCH,
        payload_kind: "resource.state.changed".into(),
        payload_version: 1,
        payload_schema: None,
        correlation_id: None,
        causation_id: None,
        trace_context: None,
        data,
    }
}
// A wildcard subscription receives an event whose topic matches, and the
// publish result reports exactly one delivery.
#[tokio::test]
async fn try_publish_to_matching_subscriber_returns_delivered_one() {
let bus = EventBus::new();
let pattern = TopicPattern::parse("hub/led/*/state").unwrap();
let mut sub = bus.subscribe(pattern, SubscribeOpts::default());
let env = test_envelope(("hub", "led", "abc", "state"), json!("on"));
let res = bus.try_publish(env).expect("publish ok");
assert_eq!(res.delivered, 1);
let got = sub.rx.recv().await.unwrap();
assert_eq!(got.topic(), "hub/led/abc/state");
}
// An event on a different stream ("temperature" vs "state") is not delivered.
#[tokio::test]
async fn try_publish_returns_delivered_zero_on_topic_mismatch() {
let bus = EventBus::new();
let pattern = TopicPattern::parse("hub/led/*/state").unwrap();
// Keep the subscription alive for the duration of the test.
let _sub = bus.subscribe(pattern, SubscribeOpts::default());
let env = test_envelope(("hub", "led", "abc", "temperature"), json!(1));
let res = bus.try_publish(env).expect("publish ok");
assert_eq!(res.delivered, 0);
}
// With `limit: Some(2)` the first two events are delivered, after which the
// bus removes the subscription itself, so the third event reaches nobody.
#[tokio::test]
async fn try_publish_respects_subscription_limit_and_auto_unsubscribes() {
let bus = EventBus::new();
let pattern = TopicPattern::parse("hub/led/abc/state").unwrap();
let _sub = bus.subscribe(
pattern,
SubscribeOpts {
limit: Some(2),
..Default::default()
},
);
let one = test_envelope(("hub", "led", "abc", "state"), json!("on"));
let two = test_envelope(("hub", "led", "abc", "state"), json!("off"));
let three = test_envelope(("hub", "led", "abc", "state"), json!("on"));
assert_eq!(bus.try_publish(one).unwrap().delivered, 1);
assert_eq!(bus.try_publish(two).unwrap().delivered, 1);
assert_eq!(bus.try_publish(three).unwrap().delivered, 0);
assert_eq!(bus.active_subscriptions(), 0);
}
// The bus must not rewrite identifying fields: event id, sequence and stream
// id arrive at the subscriber exactly as published.
#[tokio::test]
async fn try_publish_passes_envelope_through_intact() {
let bus = EventBus::new();
let pattern = TopicPattern::parse("hub/led/*/state").unwrap();
let mut sub = bus.subscribe(pattern, SubscribeOpts::default());
let env = test_envelope(("hub", "led", "abc", "state"), json!("on"));
// Capture the expected values before `env` is moved into the bus.
let expected_event_id = env.event_id.clone();
let expected_sequence = env.sequence;
let expected_stream_id = env.stream_id.clone();
bus.try_publish(env).unwrap();
let got = sub.rx.recv().await.unwrap();
assert_eq!(got.event_id, expected_event_id);
assert_eq!(got.sequence, expected_sequence);
assert_eq!(got.stream_id, expected_stream_id);
}
// Matching works on the topic derived from the envelope's own fields
// (node / kind / id / stream); those fields are then checked on the
// received envelope.
#[tokio::test]
async fn topic_pattern_still_matches_via_topic_derivation() {
let bus = EventBus::new();
let pattern = TopicPattern::parse("hub/led/*/state").unwrap();
let mut sub = bus.subscribe(pattern, SubscribeOpts::default());
let env = test_envelope(("hub", "led", "abc", "state"), json!("on"));
let res = bus.try_publish(env).unwrap();
assert_eq!(res.delivered, 1);
let got = sub.rx.recv().await.unwrap();
assert_eq!(got.node_id.as_str(), "hub");
assert_eq!(got.resource_kind, "led");
assert_eq!(got.resource_id, "abc");
assert_eq!(got.stream, "state");
}
// A `?ql=` filter in the pattern is evaluated against the envelope's `data`:
// only the event with `data > 85` is delivered.
#[tokio::test]
async fn caql_filter_on_envelope_data() {
let bus = EventBus::new();
let pattern = TopicPattern::parse("hub/sensor/*/temp?ql=where data > 85").unwrap();
let mut sub = bus.subscribe(pattern, SubscribeOpts::default());
let lo = test_envelope(("hub", "sensor", "abc", "temp"), json!({"data": 50}));
let hi = test_envelope(("hub", "sensor", "abc", "temp"), json!({"data": 90}));
assert_eq!(bus.try_publish(lo).unwrap().delivered, 0);
assert_eq!(bus.try_publish(hi).unwrap().delivered, 1);
let got = sub.rx.recv().await.unwrap();
assert_eq!(got.data["data"], 90);
}
// Without an explicit `outbound_capacity`, an undrained subscriber accepts
// exactly 64 events; the 65th is dropped under `DropNewest`.
#[tokio::test]
async fn bounded_subscriber_default_capacity_is_64() {
let bus = EventBus::new();
let pattern = TopicPattern::parse("hub/led/abc/state").unwrap();
// Never read from: the queue can only fill up.
let _sub = bus.subscribe(
pattern,
SubscribeOpts {
slow_consumer_policy: SlowConsumerPolicy::DropNewest,
..Default::default()
},
);
for i in 0..64 {
let env = test_envelope(("hub", "led", "abc", "state"), json!({"i": i}));
let res = bus.try_publish(env).unwrap();
assert_eq!(
res.delivered, 1,
"publish {i} should succeed (default cap 64)"
);
assert_eq!(res.dropped, 0);
}
let res = bus
.try_publish(test_envelope(
("hub", "led", "abc", "state"),
json!("overflow"),
))
.unwrap();
assert_eq!(res.delivered, 0);
assert_eq!(res.dropped, 1);
}
// `outbound_capacity: Some(4)` bounds the queue at four events; the fifth
// publish to the undrained subscriber is dropped.
#[tokio::test]
async fn bounded_subscriber_custom_capacity_honored() {
let bus = EventBus::new();
let pattern = TopicPattern::parse("hub/led/abc/state").unwrap();
let _sub = bus.subscribe(
pattern,
SubscribeOpts {
outbound_capacity: Some(4),
slow_consumer_policy: SlowConsumerPolicy::DropNewest,
..Default::default()
},
);
for i in 0..4 {
let res = bus
.try_publish(test_envelope(
("hub", "led", "abc", "state"),
json!({"i": i}),
))
.unwrap();
assert_eq!(res.delivered, 1);
}
let res = bus
.try_publish(test_envelope(
("hub", "led", "abc", "state"),
json!("overflow"),
))
.unwrap();
assert_eq!(res.delivered, 0);
assert_eq!(res.dropped, 1);
}
// With capacity 1 and `DropNewest`, every publish after the first reports
// `dropped == 1` and `delivered == 0` in its own result.
#[tokio::test]
async fn bounded_subscriber_drops_count_in_publish_result_for_drop_newest() {
let bus = EventBus::new();
let pattern = TopicPattern::parse("hub/led/abc/state").unwrap();
let _sub = bus.subscribe(
pattern,
SubscribeOpts {
outbound_capacity: Some(1),
slow_consumer_policy: SlowConsumerPolicy::DropNewest,
..Default::default()
},
);
let res = bus
.try_publish(test_envelope(("hub", "led", "abc", "state"), json!("on")))
.unwrap();
assert_eq!(res.delivered, 1);
for _ in 0..3 {
let res = bus
.try_publish(test_envelope(("hub", "led", "abc", "state"), json!("more")))
.unwrap();
assert_eq!(res.delivered, 0);
assert_eq!(res.dropped, 1);
}
}
// With the default slow-consumer policy, overflowing a capacity-2 queue
// removes the subscriber and lists its id in `disconnected_slow_consumers`
// instead of counting a drop.
#[tokio::test]
async fn disconnect_policy_full_queue_disconnects_subscriber_in_publish_result() {
let bus = EventBus::new();
let pattern = TopicPattern::parse("hub/led/abc/state").unwrap();
let sub = bus.subscribe(
pattern,
SubscribeOpts {
outbound_capacity: Some(2),
..Default::default()
},
);
let id = sub.id;
// Keep the receiver alive so the overflow is seen as "full", not "closed".
let _hold = sub;
for _ in 0..2 {
let res = bus
.try_publish(test_envelope(("hub", "led", "abc", "state"), json!("x")))
.unwrap();
assert_eq!(res.delivered, 1);
}
let res = bus
.try_publish(test_envelope(("hub", "led", "abc", "state"), json!("y")))
.unwrap();
assert_eq!(res.delivered, 0);
assert_eq!(res.dropped, 0);
assert_eq!(res.disconnected_slow_consumers, vec![id]);
assert_eq!(bus.active_subscriptions(), 0);
}
// Under `DropNewest`, overflow only drops events: nothing is reported as
// disconnected and the subscription stays registered.
#[tokio::test]
async fn drop_newest_full_queue_does_not_disconnect() {
let bus = EventBus::new();
let pattern = TopicPattern::parse("hub/led/abc/state").unwrap();
let _sub = bus.subscribe(
pattern,
SubscribeOpts {
outbound_capacity: Some(2),
slow_consumer_policy: SlowConsumerPolicy::DropNewest,
..Default::default()
},
);
// Fill the queue.
for _ in 0..2 {
let res = bus
.try_publish(test_envelope(("hub", "led", "abc", "state"), json!("x")))
.unwrap();
assert_eq!(res.delivered, 1);
}
// Overflow three times.
let mut total_dropped = 0usize;
for _ in 0..3 {
let res = bus
.try_publish(test_envelope(("hub", "led", "abc", "state"), json!("y")))
.unwrap();
total_dropped += res.dropped;
assert!(res.disconnected_slow_consumers.is_empty());
}
assert_eq!(total_dropped, 3);
assert_eq!(bus.active_subscriptions(), 1);
}
// A ~300 kB payload is rejected with `TooLarge`, and the reported limit is
// the default of 256 KiB.
#[tokio::test]
async fn try_publish_rejects_oversized_envelope() {
let bus = EventBus::new();
let pattern = TopicPattern::parse("hub/led/abc/state").unwrap();
let _sub = bus.subscribe(pattern, SubscribeOpts::default());
let env = test_envelope(("hub", "led", "abc", "state"), json!("x".repeat(300_000)));
match bus.try_publish(env) {
Err(PublishError::TooLarge { limit }) => {
assert_eq!(limit, 256 * 1024)
}
other => panic!("expected TooLarge, got {other:?}"),
}
}
// A small envelope passes the size check and is delivered.
// NOTE(review): despite the name, the 100-byte payload is far below the
// default limit; this does not probe the boundary itself.
#[tokio::test]
async fn try_publish_accepts_envelope_just_under_limit() {
let bus = EventBus::new();
let pattern = TopicPattern::parse("hub/led/abc/state").unwrap();
let _sub = bus.subscribe(pattern, SubscribeOpts::default());
let env = test_envelope(("hub", "led", "abc", "state"), json!("x".repeat(100)));
let res = bus.try_publish(env).expect("under-cap publish ok");
assert_eq!(res.delivered, 1);
}
// When an event is rejected for size, the event-id -> stream entry created
// by `allocate` must be evicted from the stream registry.
#[tokio::test]
async fn try_publish_too_large_evicts_reverse_index_entry() {
// A tiny limit so a 200-byte payload is guaranteed to be rejected.
let bus = EventBus::new().with_max_event_size(64);
let pattern = TopicPattern::parse("hub/led/abc/state").unwrap();
let _sub = bus.subscribe(pattern, SubscribeOpts::default());
let alloc = bus.stream_registry().allocate(&StreamId::for_resource(
&NodeId::new("hub"),
"abc",
"state",
));
let event_id = alloc.event_id.clone();
let stream_id = StreamId::for_resource(&NodeId::new("hub"), "abc", "state");
// Precondition: the registry can map the allocated event id back to its stream.
assert_eq!(
bus.stream_registry().stream_for(&event_id),
Some(stream_id.clone())
);
let env = EventEnvelope {
envelope_version: ENVELOPE_VERSION,
event_id: event_id.clone(),
node_id: NodeId::new("hub"),
resource_id: "abc".into(),
resource_kind: "led".into(),
resource_version: 1,
stream_id,
stream: "state".into(),
sequence: alloc.sequence,
timestamp: time::OffsetDateTime::UNIX_EPOCH,
payload_kind: "resource.state.changed".into(),
payload_version: 1,
payload_schema: None,
correlation_id: None,
causation_id: None,
trace_context: None,
data: json!("x".repeat(200)),
};
assert!(matches!(
bus.try_publish(env),
Err(PublishError::TooLarge { .. })
));
assert!(
bus.stream_registry().stream_for(&event_id).is_none(),
"reverse-index entry must be evicted when try_publish rejects on size"
);
}
// `with_max_event_size` overrides the default limit, and the error reports
// the configured value.
#[tokio::test]
async fn event_bus_with_custom_max_event_size() {
let bus = EventBus::new().with_max_event_size(1024);
let pattern = TopicPattern::parse("hub/led/abc/state").unwrap();
let _sub = bus.subscribe(pattern, SubscribeOpts::default());
let env = test_envelope(("hub", "led", "abc", "state"), json!("x".repeat(2_000)));
match bus.try_publish(env) {
Err(PublishError::TooLarge { limit }) => assert_eq!(limit, 1024),
other => panic!("expected TooLarge {{ limit: 1024 }}, got {other:?}"),
}
}
// Five publishes into an undrained capacity-1 queue: the first is queued,
// the other four are counted as dropped.
#[tokio::test]
async fn drop_newest_counts_dropped_events() {
let bus = EventBus::new();
let pattern = TopicPattern::parse("hub/led/abc/state").unwrap();
let _sub = bus.subscribe(
pattern,
SubscribeOpts {
outbound_capacity: Some(1),
slow_consumer_policy: SlowConsumerPolicy::DropNewest,
..Default::default()
},
);
let mut total_dropped = 0usize;
for i in 0..5 {
let res = bus
.try_publish(test_envelope(
("hub", "led", "abc", "state"),
json!({"i": i}),
))
.unwrap();
total_dropped += res.dropped;
}
assert_eq!(total_dropped, 4);
}
// Repeated overflow under `DropNewest` never disconnects the subscriber.
#[tokio::test]
async fn drop_newest_subscription_stays_alive_after_drop() {
let bus = EventBus::new();
let pattern = TopicPattern::parse("hub/led/abc/state").unwrap();
let _sub = bus.subscribe(
pattern,
SubscribeOpts {
outbound_capacity: Some(1),
slow_consumer_policy: SlowConsumerPolicy::DropNewest,
..Default::default()
},
);
for _ in 0..5 {
let res = bus
.try_publish(test_envelope(("hub", "led", "abc", "state"), json!("x")))
.unwrap();
assert!(res.disconnected_slow_consumers.is_empty());
}
assert_eq!(bus.active_subscriptions(), 1);
}
// The synchronous `try_publish` cannot wait for queue space, so a
// `Backpressure` subscriber with a full queue has events dropped, exactly
// as under `DropNewest`.
#[tokio::test]
async fn try_publish_backpressure_behaves_like_drop_newest() {
let bus = EventBus::new();
let pattern = TopicPattern::parse("hub/led/abc/state").unwrap();
let _sub = bus.subscribe(
pattern,
SubscribeOpts {
outbound_capacity: Some(1),
slow_consumer_policy: SlowConsumerPolicy::Backpressure,
..Default::default()
},
);
let mut total_dropped = 0usize;
for _ in 0..5 {
let res = bus
.try_publish(test_envelope(("hub", "led", "abc", "state"), json!("x")))
.unwrap();
total_dropped += res.dropped;
}
assert_eq!(total_dropped, 4);
}
// When the default policy disconnects a subscriber, `slow_consumer_rx`
// resolves with the reason plus the stream id and sequence of the last
// event that was queued before the overflow.
#[tokio::test]
async fn disconnect_policy_fires_slow_consumer_notice() {
let bus = EventBus::new();
let pattern = TopicPattern::parse("hub/led/abc/state").unwrap();
let mut sub = bus.subscribe(
pattern,
SubscribeOpts {
outbound_capacity: Some(1),
..Default::default()
},
);
let env1 = test_envelope(("hub", "led", "abc", "state"), json!("on"));
let stream_id = env1.stream_id.clone();
// First publish fills the capacity-1 queue; the second overflows it.
bus.try_publish(env1).unwrap();
bus.try_publish(test_envelope(("hub", "led", "abc", "state"), json!("off")))
.unwrap();
let notice = (&mut sub.slow_consumer_rx)
.await
.expect("slow_consumer_rx resolves");
assert_eq!(notice.reason, REASON_SLOW_CONSUMER);
assert_eq!(notice.stream_id.as_ref(), Some(&stream_id));
assert_eq!(notice.last_delivered_sequence, Some(1));
}
// After the consumer drains a full queue, new events are delivered again:
// dropping under `DropNewest` is transient, not a permanent state.
#[tokio::test]
async fn bounded_subscriber_recovers_after_consumer_drains() {
let bus = EventBus::new();
let pattern = TopicPattern::parse("hub/led/abc/state").unwrap();
let mut sub = bus.subscribe(
pattern,
SubscribeOpts {
outbound_capacity: Some(2),
slow_consumer_policy: SlowConsumerPolicy::DropNewest,
..Default::default()
},
);
// Fill the queue.
for _ in 0..2 {
assert_eq!(
bus.try_publish(test_envelope(("hub", "led", "abc", "state"), json!("x")))
.unwrap()
.delivered,
1
);
}
// Overflow once.
let res = bus
.try_publish(test_envelope(("hub", "led", "abc", "state"), json!("y")))
.unwrap();
assert_eq!(res.dropped, 1);
// Drain both queued events to free capacity.
sub.rx.recv().await.unwrap();
sub.rx.recv().await.unwrap();
let res = bus
.try_publish(test_envelope(
("hub", "led", "abc", "state"),
json!("recovered"),
))
.unwrap();
assert_eq!(res.delivered, 1);
assert_eq!(res.dropped, 0);
}
}