use super::*;
use crate::event::EventMetadata;
#[tokio::test]
async fn test_event_bus_creation() {
let bus = EventBus::new();
assert_eq!(bus.capacity(), DEFAULT_CHANNEL_CAPACITY);
assert_eq!(bus.subscriber_count(), 0);
}
#[tokio::test]
async fn test_event_bus_with_capacity() {
let bus = EventBus::with_capacity(100);
assert_eq!(bus.capacity(), 100);
}
#[tokio::test]
async fn test_publish_and_receive() {
let bus = EventBus::new();
let mut receiver = bus.subscribe();
let event = AstridEvent::RuntimeStarted {
metadata: EventMetadata::new("test"),
version: "0.1.0".to_string(),
};
let count = bus.publish(event);
assert_eq!(count, 1);
let msg = receiver.recv().await.unwrap();
assert_eq!(msg.event_type(), "astrid.v1.lifecycle.runtime_started");
}
#[tokio::test]
async fn test_multiple_subscribers() {
let bus = EventBus::new();
let mut receiver1 = bus.subscribe();
let mut receiver2 = bus.subscribe();
let event = AstridEvent::RuntimeStarted {
metadata: EventMetadata::new("test"),
version: "0.1.0".to_string(),
};
let count = bus.publish(event);
assert_eq!(count, 2);
let obj1 = receiver1.recv().await.unwrap();
let obj2 = receiver2.recv().await.unwrap();
assert_eq!(obj1.event_type(), "astrid.v1.lifecycle.runtime_started");
assert_eq!(obj2.event_type(), "astrid.v1.lifecycle.runtime_started");
}
#[tokio::test]
async fn test_no_subscribers() {
let bus = EventBus::new();
let event = AstridEvent::RuntimeStarted {
metadata: EventMetadata::new("test"),
version: "0.1.0".to_string(),
};
let count = bus.publish(event);
assert_eq!(count, 0);
}
#[tokio::test]
async fn test_try_recv_empty() {
let bus = EventBus::new();
let mut receiver = bus.subscribe();
let result = receiver.try_recv();
assert!(result.is_none());
}
#[tokio::test]
async fn test_try_recv_with_event() {
let bus = EventBus::new();
let mut receiver = bus.subscribe();
let event = AstridEvent::RuntimeStarted {
metadata: EventMetadata::new("test"),
version: "0.1.0".to_string(),
};
bus.publish(event);
let result = receiver.try_recv();
assert!(result.is_some());
}
#[tokio::test]
async fn test_subscriber_count() {
let bus = EventBus::new();
assert_eq!(bus.subscriber_count(), 0);
let receiver1 = bus.subscribe();
assert_eq!(bus.subscriber_count(), 1);
let _receiver2 = bus.subscribe();
assert_eq!(bus.subscriber_count(), 2);
drop(receiver1);
}
#[tokio::test]
async fn test_cloned_bus_synchronous_subscriber() {
use crate::subscriber::FilterSubscriber;
use std::sync::atomic::{AtomicUsize, Ordering};
let bus = EventBus::new();
let cloned_bus = bus.clone();
let counter = Arc::new(AtomicUsize::new(0));
let counter_clone = Arc::clone(&counter);
let subscriber = FilterSubscriber::new("test_sync", move |_| {
counter_clone.fetch_add(1, Ordering::SeqCst);
});
cloned_bus.registry().register(Arc::new(subscriber));
let event = AstridEvent::RuntimeStarted {
metadata: EventMetadata::new("test"),
version: "0.1.0".to_string(),
};
bus.publish(event);
assert_eq!(counter.load(Ordering::SeqCst), 1);
}
#[tokio::test]
async fn test_event_bus_drop_cleans_up_registry() {
use crate::subscriber::FilterSubscriber;
use std::sync::atomic::{AtomicUsize, Ordering};
struct DropNotify(Arc<AtomicUsize>);
impl Drop for DropNotify {
fn drop(&mut self) {
self.0.fetch_add(1, Ordering::SeqCst);
}
}
let drop_count = Arc::new(AtomicUsize::new(0));
let drop_count_clone = Arc::clone(&drop_count);
let notifier = DropNotify(drop_count_clone);
let bus = EventBus::new();
let subscriber = FilterSubscriber::new("test_drop", move |_| {
let _ = ¬ifier; });
bus.registry().register(Arc::new(subscriber));
assert_eq!(drop_count.load(Ordering::SeqCst), 0);
drop(bus);
assert_eq!(drop_count.load(Ordering::SeqCst), 1);
}
#[tokio::test]
async fn test_reentrancy_unregister_from_on_event() {
use crate::subscriber::{EventSubscriber, SubscriberId};
use std::sync::Mutex;
struct UnregisteringSubscriber {
my_id: Mutex<Option<SubscriberId>>,
}
impl EventSubscriber for UnregisteringSubscriber {
fn on_event(&self, _event: &AstridEvent, bus: &EventBus) {
let id = self.my_id.lock().unwrap().expect("id not set");
bus.registry().unregister(id);
}
}
let bus = EventBus::new();
let subscriber = Arc::new(UnregisteringSubscriber {
my_id: Mutex::new(None),
});
let id = bus
.registry()
.register(Arc::clone(&subscriber) as Arc<dyn EventSubscriber>);
*subscriber.my_id.lock().unwrap() = Some(id);
let event = AstridEvent::RuntimeStarted {
metadata: EventMetadata::new("test"),
version: "0.1.0".to_string(),
};
bus.publish(event);
assert_eq!(bus.registry().len(), 0);
}
#[tokio::test]
async fn test_drop_deadlock_publish_from_drop() {
use crate::subscriber::EventSubscriber;
struct DroppingSubscriber {
bus: EventBus,
}
impl EventSubscriber for DroppingSubscriber {
fn on_event(&self, _event: &AstridEvent, _bus: &EventBus) {}
}
impl Drop for DroppingSubscriber {
fn drop(&mut self) {
let event = AstridEvent::RuntimeStarted {
metadata: EventMetadata::new("test"),
version: "0.1.0".to_string(),
};
self.bus.publish(event);
}
}
let bus = EventBus::new();
let id = bus
.registry()
.register(Arc::new(DroppingSubscriber { bus: bus.clone() }));
bus.registry().unregister(id);
}
#[tokio::test]
async fn test_topic_subscription_exact() {
let bus = EventBus::new();
let mut all_receiver = bus.subscribe();
let mut specific_receiver = bus.subscribe_topic("astrid.cli.input");
let msg = crate::ipc::IpcMessage::new(
"astrid.cli.input",
crate::ipc::IpcPayload::UserInput {
text: "hello".into(),
session_id: "default".into(),
context: None,
},
uuid::Uuid::new_v4(),
);
let event = AstridEvent::Ipc {
metadata: EventMetadata::new("test"),
message: msg,
};
bus.publish(event);
assert!(all_receiver.try_recv().is_some());
assert!(specific_receiver.try_recv().is_some());
let msg2 = crate::ipc::IpcMessage::new(
"astrid.telegram.input",
crate::ipc::IpcPayload::UserInput {
text: "hello".into(),
session_id: "default".into(),
context: None,
},
uuid::Uuid::new_v4(),
);
let event2 = AstridEvent::Ipc {
metadata: EventMetadata::new("test"),
message: msg2,
};
bus.publish(event2);
assert!(all_receiver.try_recv().is_some());
assert!(specific_receiver.try_recv().is_none());
}
#[tokio::test]
async fn test_topic_subscription_wildcard() {
let bus = EventBus::new();
let mut wildcard_receiver = bus.subscribe_topic("astrid.*");
let msg1 = crate::ipc::IpcMessage::new(
"astrid.cli.input",
crate::ipc::IpcPayload::UserInput {
text: "hello".into(),
session_id: "default".into(),
context: None,
},
uuid::Uuid::new_v4(),
);
let event1 = AstridEvent::Ipc {
metadata: EventMetadata::new("test"),
message: msg1,
};
let msg2 = crate::ipc::IpcMessage::new(
"system.log",
crate::ipc::IpcPayload::UserInput {
text: "hello".into(),
session_id: "default".into(),
context: None,
},
uuid::Uuid::new_v4(),
);
let event2 = AstridEvent::Ipc {
metadata: EventMetadata::new("test"),
message: msg2,
};
bus.publish(event1);
bus.publish(event2);
let received = wildcard_receiver.try_recv().unwrap();
if let AstridEvent::Ipc { message, .. } = &*received {
assert_eq!(message.topic, "astrid.cli.input");
} else {
panic!("Expected IPC event");
}
assert!(wildcard_receiver.try_recv().is_none());
}
#[tokio::test]
async fn test_topic_subscription_ignores_non_ipc() {
let bus = EventBus::new();
let mut specific_receiver = bus.subscribe_topic("astrid.cli.input");
let event = AstridEvent::RuntimeStarted {
metadata: EventMetadata::new("test"),
version: "0.1.0".into(),
};
bus.publish(event);
assert!(specific_receiver.try_recv().is_none());
}
fn ipc_event(topic: &str) -> AstridEvent {
AstridEvent::Ipc {
metadata: EventMetadata::new("test"),
message: crate::ipc::IpcMessage::new(
topic,
crate::ipc::IpcPayload::UserInput {
text: "x".into(),
session_id: "default".into(),
context: None,
},
uuid::Uuid::new_v4(),
),
}
}
#[tokio::test]
async fn test_wildcard_matches_multiple_depths() {
let bus = EventBus::new();
let mut receiver = bus.subscribe_topic("astrid.v1.request.*");
bus.publish(ipc_event("astrid.v1.request.list_capsules"));
assert!(receiver.try_recv().is_some());
bus.publish(ipc_event("astrid.v1.request.foo.bar"));
assert!(receiver.try_recv().is_some());
bus.publish(ipc_event("astrid.v1.request"));
assert!(receiver.try_recv().is_none());
bus.publish(ipc_event("system.v1.request.foo"));
assert!(receiver.try_recv().is_none());
}
#[tokio::test]
async fn test_wildcard_rejects_deep_topics() {
let bus = EventBus::new();
let mut receiver = bus.subscribe_topic("a.*");
let deep = (0..21)
.map(|i| format!("s{i}"))
.collect::<Vec<_>>()
.join(".");
let topic = format!("a.{deep}");
bus.publish(ipc_event(&topic));
assert!(receiver.try_recv().is_none());
}
#[tokio::test]
async fn test_middle_wildcard_matches_one_segment() {
let bus = EventBus::new();
let mut receiver = bus.subscribe_topic("astrid.*.input");
bus.publish(ipc_event("astrid.cli.input"));
assert!(receiver.try_recv().is_some());
bus.publish(ipc_event("astrid.telegram.input"));
assert!(receiver.try_recv().is_some());
bus.publish(ipc_event("astrid.cli.output"));
assert!(receiver.try_recv().is_none());
bus.publish(ipc_event("astrid.cli.sub.input"));
assert!(receiver.try_recv().is_none());
}
#[tokio::test]
async fn test_drain_lagged_initially_zero() {
let bus = EventBus::new();
let mut receiver = bus.subscribe();
assert_eq!(receiver.drain_lagged(), 0);
}
#[tokio::test]
async fn test_drain_lagged_resets_after_read() {
let bus = EventBus::with_capacity(2);
let mut receiver = bus.subscribe();
for i in 0..5 {
let event = AstridEvent::RuntimeStarted {
metadata: EventMetadata::new("test"),
version: format!("{i}"),
};
bus.publish(event);
}
let _ = receiver.try_recv();
let lagged = receiver.drain_lagged();
assert!(lagged > 0, "expected lag count > 0, got {lagged}");
assert_eq!(receiver.drain_lagged(), 0);
}
#[tokio::test]
async fn test_drain_lagged_accumulates_across_calls() {
let bus = EventBus::with_capacity(2);
let mut receiver = bus.subscribe();
for _ in 0..4 {
bus.publish(AstridEvent::RuntimeStarted {
metadata: EventMetadata::new("test"),
version: "v1".into(),
});
}
while receiver.try_recv().is_some() {}
let lag1 = receiver.drain_lagged();
for _ in 0..4 {
bus.publish(AstridEvent::RuntimeStarted {
metadata: EventMetadata::new("test"),
version: "v2".into(),
});
}
while receiver.try_recv().is_some() {}
let lag2 = receiver.drain_lagged();
assert!(lag1 > 0, "first burst should lag");
assert!(lag2 > 0, "second burst should lag");
}
#[tokio::test]
async fn test_recv_blocking_with_timeout() {
let bus = EventBus::new();
let mut receiver = bus.subscribe();
let result = tokio::time::timeout(std::time::Duration::from_millis(50), receiver.recv()).await;
assert!(result.is_err(), "expected timeout, got a message");
}
#[tokio::test]
async fn test_recv_blocking_wakes_on_message() {
let bus = EventBus::new();
let mut receiver = bus.subscribe();
let bus_clone = bus.clone();
tokio::spawn(async move {
tokio::time::sleep(std::time::Duration::from_millis(10)).await;
bus_clone.publish(AstridEvent::RuntimeStarted {
metadata: EventMetadata::new("test"),
version: "wake".into(),
});
});
let result = tokio::time::timeout(std::time::Duration::from_secs(5), receiver.recv()).await;
assert!(result.is_ok(), "recv should have woken up");
let event = result.unwrap().unwrap();
assert_eq!(event.event_type(), "astrid.v1.lifecycle.runtime_started");
}
#[tokio::test]
async fn test_try_recv_drains_burst() {
let bus = EventBus::new();
let mut receiver = bus.subscribe();
for i in 0..10 {
bus.publish(AstridEvent::RuntimeStarted {
metadata: EventMetadata::new("test"),
version: format!("{i}"),
});
}
let mut count = 0;
while receiver.try_recv().is_some() {
count += 1;
}
assert_eq!(count, 10);
assert!(receiver.try_recv().is_none());
}
#[test]
fn lag_and_publish_metrics_record_with_labels() {
use metrics_util::debugging::DebuggingRecorder;
let recorder = DebuggingRecorder::new();
let snapshotter = recorder.snapshotter();
metrics::with_local_recorder(&recorder, || {
let bus = EventBus::with_capacity(2);
let mut rx = bus.subscribe_as("test_subscriber");
for _ in 0..5 {
bus.publish(AstridEvent::RuntimeStarted {
metadata: EventMetadata::new("test"),
version: "0.1.0".to_string(),
});
}
while rx.try_recv().is_some() {}
});
let mut published = false;
let mut lag_labelled = false;
for (composite, _unit, _desc, _value) in snapshotter.snapshot().into_vec() {
let (_kind, key) = composite.into_parts();
let name = key.name();
if name == METRIC_BUS_EVENTS_PUBLISHED_TOTAL {
published = true;
} else if name == METRIC_BUS_RECEIVER_LAGGED_TOTAL {
lag_labelled = key
.labels()
.any(|l| l.key() == "subscriber" && l.value() == "test_subscriber");
}
}
assert!(published, "publish counter not recorded");
assert!(
lag_labelled,
"lag counter missing or not labelled with the subscriber tag"
);
}
fn ipc_evt(topic: &str, principal: Option<&str>) -> AstridEvent {
let mut msg = crate::ipc::IpcMessage::new(
topic,
crate::ipc::IpcPayload::RawJson(serde_json::json!({})),
uuid::Uuid::nil(),
);
msg.principal = principal.map(String::from);
AstridEvent::Ipc {
metadata: EventMetadata::new("test"),
message: msg,
}
}
#[tokio::test]
async fn routed_demux_no_broadcast_storm() {
let bus = EventBus::new();
let mut subs = Vec::new();
for _ in 0..5 {
subs.push(bus.subscribe_topic_routed(uuid::Uuid::new_v4(), "t.*", "capsule-x", "test_sub"));
}
assert_eq!(bus.routed_subscription_count(), 5);
for i in 0..10 {
bus.publish(ipc_evt(&format!("t.x{i}"), Some("alice")));
}
for sub in &mut subs {
let drained = sub.try_drain(super::MAX_SUBSCRIPTION_BUDGET_BYTES);
assert_eq!(
drained.len(),
10,
"each routed sub should receive 10 events"
);
}
}
#[tokio::test]
async fn routed_subscription_isolates_principals_under_burst() {
let bus = EventBus::new();
let cap_uuid = uuid::Uuid::new_v4();
let mut sub = bus.subscribe_topic_routed(cap_uuid, "t.*", "capsule-y", "test_sub");
for _ in 0..200 {
bus.publish(ipc_evt("t.x", Some("alice")));
}
assert_eq!(sub.active_principals(), 1);
let drained = sub.try_drain(super::MAX_SUBSCRIPTION_BUDGET_BYTES);
assert!(!drained.is_empty(), "alice's burst should drain");
for ev in &drained {
if let AstridEvent::Ipc { message, .. } = &**ev {
assert_eq!(message.principal.as_deref(), Some("alice"));
}
}
}
#[tokio::test]
async fn routed_subscription_dropped_on_drop() {
let bus = EventBus::new();
let sub = bus.subscribe_topic_routed(uuid::Uuid::new_v4(), "t.*", "capsule-z", "test_sub");
assert_eq!(bus.routed_subscription_count(), 1);
drop(sub);
assert_eq!(bus.routed_subscription_count(), 0);
}
#[tokio::test]
async fn routed_recv_wakes_on_publish() {
let bus = EventBus::new();
let cap_uuid = uuid::Uuid::new_v4();
let mut sub = bus.subscribe_topic_routed(cap_uuid, "t.*", "capsule-w", "test_sub");
let bus2 = bus.clone();
let publisher = tokio::spawn(async move {
tokio::time::sleep(std::time::Duration::from_millis(20)).await;
bus2.publish(ipc_evt("t.wake", Some("alice")));
});
let got = sub
.recv(Some(std::time::Duration::from_secs(2)))
.await
.expect("recv should wake");
if let AstridEvent::Ipc { message, .. } = &*got {
assert_eq!(message.topic, "t.wake");
} else {
panic!("expected IPC event");
}
publisher.await.expect("publisher task");
}
#[tokio::test]
async fn routed_recv_timeout_returns_none_when_idle() {
let bus = EventBus::new();
let cap_uuid = uuid::Uuid::new_v4();
let mut sub = bus.subscribe_topic_routed(cap_uuid, "t.*", "capsule-w", "test_sub");
let got = sub.recv(Some(std::time::Duration::from_millis(20))).await;
assert!(
got.is_none(),
"timeout should expire when no publish arrives"
);
}
#[tokio::test]
async fn routed_path_does_not_disturb_broadcast_subscribers() {
let bus = EventBus::new();
let mut broad = bus.subscribe_as("broad_sub");
let mut routed =
bus.subscribe_topic_routed(uuid::Uuid::new_v4(), "t.*", "capsule-a", "test_sub");
bus.publish(ipc_evt("t.x", Some("alice")));
let b = broad.try_recv().expect("broadcast sub should receive");
assert!(matches!(&*b, AstridEvent::Ipc { .. }));
let drained = routed.try_drain(super::MAX_SUBSCRIPTION_BUDGET_BYTES);
assert_eq!(drained.len(), 1);
}
#[tokio::test]
async fn routed_5000_principals_demand_allocate() {
let bus = EventBus::new();
let mut sub =
bus.subscribe_topic_routed(uuid::Uuid::new_v4(), "t.*", "capsule-load", "test_sub");
for i in 0..5000 {
bus.publish(ipc_evt("t.x", Some(&format!("p{i}"))));
}
assert_eq!(sub.active_principals(), 5000);
let drained = sub.try_drain(super::MAX_SUBSCRIPTION_BUDGET_BYTES);
assert_eq!(drained.len(), 5000);
assert_eq!(sub.active_principals(), 0);
}
#[tokio::test]
async fn routed_receiver_drain_under_n1000_fanin_via_try_drain() {
let bus = EventBus::new();
let mut sub =
bus.subscribe_topic_routed(uuid::Uuid::new_v4(), "t.*", "capsule-fanin", "test_sub");
for i in 0..1000 {
bus.publish(ipc_evt("t.x", Some(&format!("p{i}"))));
}
assert_eq!(sub.active_principals(), 1000);
let drained = sub.try_drain(super::MAX_SUBSCRIPTION_BUDGET_BYTES);
let mut seen = std::collections::HashSet::new();
for ev in &drained {
if let AstridEvent::Ipc { message, .. } = &**ev
&& let Some(p) = &message.principal
{
seen.insert(p.clone());
}
}
assert_eq!(seen.len(), 1000, "every distinct principal should drain");
}
#[tokio::test]
async fn routed_receiver_isolation_between_subscriptions() {
let bus = EventBus::new();
let mut a = bus.subscribe_topic_routed(uuid::Uuid::new_v4(), "t.*", "capsule-a", "test_sub");
let mut b = bus.subscribe_topic_routed(uuid::Uuid::new_v4(), "t.*", "capsule-b", "test_sub");
for _ in 0..10 {
bus.publish(ipc_evt("t.x", Some("alice")));
}
let drained_a = a.try_drain(super::MAX_SUBSCRIPTION_BUDGET_BYTES);
assert_eq!(drained_a.len(), 10, "a should see all 10");
let drained_b = b.try_drain(super::MAX_SUBSCRIPTION_BUDGET_BYTES);
assert_eq!(drained_b.len(), 10, "b independently sees all 10");
}
const AUDIT_TOPIC: &str = "astrid.v1.audit.entry";
#[tokio::test]
async fn cross_principal_audit_isolation_at_bus() {
let bus = EventBus::new();
let mut sub = bus.subscribe_topic_routed_scoped(
uuid::Uuid::new_v4(),
AUDIT_TOPIC,
"audit-consumer",
"test_sub",
Some(Some("alice".into())),
);
for _ in 0..7 {
bus.publish(ipc_evt(AUDIT_TOPIC, Some("alice")));
}
for _ in 0..4 {
bus.publish(ipc_evt(AUDIT_TOPIC, Some("bob")));
}
bus.publish(ipc_evt(AUDIT_TOPIC, None));
assert_eq!(sub.active_principals(), 1);
let drained = sub.try_drain(super::MAX_SUBSCRIPTION_BUDGET_BYTES);
assert_eq!(drained.len(), 7, "only alice's seven entries are delivered");
for ev in &drained {
if let AstridEvent::Ipc { message, .. } = &**ev {
assert_eq!(message.principal.as_deref(), Some("alice"));
} else {
panic!("expected IPC event");
}
}
}
#[tokio::test]
async fn unscoped_route_unchanged_regression() {
let bus = EventBus::new();
let mut sub = bus.subscribe_topic_routed(
uuid::Uuid::new_v4(),
AUDIT_TOPIC,
"audit-firehose",
"test_sub",
);
for _ in 0..7 {
bus.publish(ipc_evt(AUDIT_TOPIC, Some("alice")));
}
for _ in 0..4 {
bus.publish(ipc_evt(AUDIT_TOPIC, Some("bob")));
}
bus.publish(ipc_evt(AUDIT_TOPIC, None));
assert_eq!(sub.active_principals(), 3);
let drained = sub.try_drain(super::MAX_SUBSCRIPTION_BUDGET_BYTES);
assert_eq!(drained.len(), 12, "firehose delivers all 7 + 4 + 1 events");
}
#[tokio::test]
async fn scoped_drop_no_spurious_notify() {
let bus = EventBus::new();
let mut sub = bus.subscribe_topic_routed_scoped(
uuid::Uuid::new_v4(),
AUDIT_TOPIC,
"audit-consumer",
"test_sub",
Some(Some("alice".into())),
);
for _ in 0..50 {
bus.publish(ipc_evt(AUDIT_TOPIC, Some("bob")));
}
let got = sub.recv(Some(std::time::Duration::from_millis(50))).await;
assert!(got.is_none(), "foreign-only burst must not wake the owner");
assert_eq!(sub.active_principals(), 0, "no bucket ever allocated");
}