use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use tokio::sync::{RwLock, mpsc};
use tracing::{debug, warn};
use crate::capsule::{Capsule, CapsuleId};
use crate::registry::CapsuleRegistry;
use astrid_events::{AstridEvent, EventBus, EventReceiver};
/// Bound of the `mpsc` channel created for each capsule in `dispatch_single`.
///
/// Events are enqueued with `try_send`, so once this many work items are
/// pending for a capsule, further events for it are dropped with a warning.
const CAPSULE_EVENT_QUEUE_CAPACITY: usize = 256;
/// One interceptor invocation queued for a capsule's worker task.
///
/// Values are sent over the per-capsule `mpsc` channel created in
/// `dispatch_single` and consumed by the task spawned there.
struct InterceptorWork {
/// Interceptor action name passed to `Capsule::invoke_interceptor`.
action: String,
/// Serialized event payload handed to the interceptor.
payload: Arc<Vec<u8>>,
/// Topic the event was dispatched under; the worker only uses it in log output.
topic: Arc<String>,
/// Originating IPC message, passed to the interceptor as the caller.
/// `None` for events that are not `AstridEvent::Ipc`.
ipc_message: Option<Arc<astrid_events::ipc::IpcMessage>>,
}
/// Reads events from an [`EventBus`] subscription and forwards each one to the
/// capsule interceptors whose event pattern matches the event's topic.
pub struct EventDispatcher {
/// Registry scanned on every event to find matching interceptors.
registry: Arc<RwLock<CapsuleRegistry>>,
/// Bus handle; `run` uses it to publish the dispatcher's own lag notifications.
event_bus: Arc<EventBus>,
/// Subscription on `event_bus`, created in `new` and drained by `run`.
receiver: EventReceiver,
/// Optional identity store. In the code visible here it is only checked for
/// presence: when it is `None`, `run` auto-provisions a home directory for
/// every newly seen principal; otherwise only for the default principal.
identity_store: Option<Arc<dyn astrid_storage::IdentityStore>>,
}
impl EventDispatcher {
#[must_use]
pub fn new(registry: Arc<RwLock<CapsuleRegistry>>, event_bus: Arc<EventBus>) -> Self {
let receiver = event_bus.subscribe();
Self {
registry,
event_bus,
receiver,
identity_store: None,
}
}
#[must_use]
pub fn with_identity_store(mut self, store: Arc<dyn astrid_storage::IdentityStore>) -> Self {
self.identity_store = Some(store);
self
}
pub async fn run(mut self) {
let mut last_lag_notification = std::time::Instant::now()
.checked_sub(std::time::Duration::from_secs(10))
.unwrap_or_else(std::time::Instant::now);
let mut capsule_queues: HashMap<CapsuleId, mpsc::Sender<InterceptorWork>> = HashMap::new();
let mut known_principals: HashSet<String> = HashSet::new();
known_principals.insert("default".to_string());
const MAX_KNOWN_PRINCIPALS: usize = 10_000;
debug!("Event dispatcher started");
while let Some(event) = self.receiver.recv().await {
let lagged = self.receiver.drain_lagged();
if lagged > 0 && last_lag_notification.elapsed() >= std::time::Duration::from_secs(10) {
warn!(
lagged_count = lagged,
"Event bus broadcast channel lagged - {lagged} messages dropped"
);
last_lag_notification = std::time::Instant::now();
let msg = astrid_events::ipc::IpcMessage::new(
"astrid.v1.event_bus.lagged",
astrid_events::ipc::IpcPayload::Custom {
data: serde_json::json!({ "lagged_count": lagged }),
},
uuid::Uuid::new_v4(),
);
self.event_bus.publish(AstridEvent::Ipc {
metadata: astrid_events::EventMetadata::new("dispatcher"),
message: msg,
});
}
let (topic, payload_bytes, ipc_message) = match &*event {
AstridEvent::Ipc { message, .. } => {
let topic = Arc::new(message.topic.clone());
match message.payload.to_guest_bytes() {
Ok(bytes) => (topic, Arc::new(bytes), Some(Arc::new(message.clone()))),
Err(e) => {
warn!(topic = %message.topic, error = %e, "Failed to serialize IPC payload");
continue;
},
}
},
other => {
let topic = Arc::new(other.event_type().to_string());
match serde_json::to_vec(other) {
Ok(bytes) => (topic, Arc::new(bytes), None),
Err(e) => {
warn!(event_type = %topic, error = %e, "Failed to serialize lifecycle event");
continue;
},
}
},
};
if let Some(ref msg) = ipc_message
&& let Some(ref principal_str) = msg.principal
&& !known_principals.contains(principal_str)
{
if let Ok(pid) = astrid_core::PrincipalId::new(principal_str) {
let should_provision =
self.identity_store.is_none() || pid == astrid_core::PrincipalId::default();
if should_provision && let Ok(home) = astrid_core::dirs::AstridHome::resolve() {
let ph = home.principal_home(&pid);
if let Err(e) = ph.ensure() {
warn!(
principal = %pid,
error = %e,
"Failed to auto-provision principal home"
);
} else {
debug!(
principal = %pid,
"Auto-provisioned principal home directory"
);
if known_principals.len() < MAX_KNOWN_PRINCIPALS {
known_principals.insert(principal_str.clone());
}
}
}
} else {
warn!(
principal = %principal_str,
"IPC message has invalid principal string, ignoring"
);
}
}
let matches = find_matching_interceptors(&self.registry, &topic).await;
dispatch_to_capsule_queues(
&mut capsule_queues,
matches,
topic,
payload_bytes,
ipc_message,
);
}
debug!("Event dispatcher stopped (event bus closed)");
}
}
/// Delivers one event to the interceptors that matched its topic.
///
/// `matches` is processed in the order given; `find_matching_interceptors`
/// returns it sorted by ascending priority.
///
/// * No match: returns immediately.
/// * One match: the event is enqueued on that capsule's queue through
///   `dispatch_single`.
/// * Several matches: a task is spawned that invokes the interceptors one
///   after another as a chain:
///   - `Continue(bytes)` replaces the payload seen by later interceptors,
///     unless `bytes` is empty, in which case the payload is kept;
///   - `Final` and `Deny` stop the chain;
///   - `NotSupported` and any other error are logged and the chain moves on.
///
/// The chain's outcome (final payload, `Final` response, `Deny` reason) is only
/// logged here; nothing is published back.
///
/// NOTE(review): `invoke_interceptor` is a synchronous call made from an async
/// task; if it can block for long, confirm whether it should run on a blocking
/// thread instead.
fn dispatch_to_capsule_queues(
    queues: &mut HashMap<CapsuleId, mpsc::Sender<InterceptorWork>>,
    matches: Vec<(Arc<dyn Capsule>, String)>,
    topic: Arc<String>,
    payload_bytes: Arc<Vec<u8>>,
    ipc_message: Option<Arc<astrid_events::ipc::IpcMessage>>,
) {
    // Zero or one interceptor: no chain needed.
    if matches.len() <= 1 {
        if let Some((capsule, action)) = matches.into_iter().next() {
            dispatch_single(queues, capsule, action, topic, payload_bytes, ipc_message);
        }
        return;
    }

    tokio::task::spawn(async move {
        // Take the buffer without copying when this is the only owner.
        let mut current_payload = Arc::unwrap_or_clone(payload_bytes);
        let caller = ipc_message.as_deref();

        for (capsule, action) in &matches {
            debug!(
                capsule_id = %capsule.id(),
                action = %action,
                topic = %topic,
                "Dispatching interceptor (chain)"
            );
            match capsule.invoke_interceptor(action, &current_payload, caller) {
                Ok(crate::capsule::InterceptResult::Continue(modified_payload)) => {
                    debug!(
                        capsule_id = %capsule.id(),
                        action = %action,
                        "Interceptor: Continue"
                    );
                    // An empty payload means "unchanged", not "replace with nothing".
                    if !modified_payload.is_empty() {
                        current_payload = modified_payload;
                    }
                },
                Ok(crate::capsule::InterceptResult::Final(response)) => {
                    debug!(
                        capsule_id = %capsule.id(),
                        action = %action,
                        topic = %topic,
                        response_len = response.len(),
                        "Interceptor: Final — chain halted"
                    );
                    return;
                },
                Ok(crate::capsule::InterceptResult::Deny { reason }) => {
                    warn!(
                        capsule_id = %capsule.id(),
                        action = %action,
                        topic = %topic,
                        reason = %reason,
                        "Interceptor: Deny — chain halted"
                    );
                    return;
                },
                Err(crate::error::CapsuleError::NotSupported(ref msg)) => {
                    debug!(
                        capsule_id = %capsule.id(),
                        action = %action,
                        reason = %msg,
                        "Interceptor skipped (NotSupported)"
                    );
                },
                Err(e) => {
                    warn!(
                        capsule_id = %capsule.id(),
                        action = %action,
                        topic = %topic,
                        error = %e,
                        "Interceptor invocation failed — continuing chain"
                    );
                },
            }
        }
    });
}
fn dispatch_single(
queues: &mut HashMap<CapsuleId, mpsc::Sender<InterceptorWork>>,
capsule: Arc<dyn Capsule>,
action: String,
topic: Arc<String>,
payload_bytes: Arc<Vec<u8>>,
ipc_message: Option<Arc<astrid_events::ipc::IpcMessage>>,
) {
let sender = queues.entry(capsule.id().clone()).or_insert_with(|| {
let (tx, mut rx) = mpsc::channel::<InterceptorWork>(CAPSULE_EVENT_QUEUE_CAPACITY);
let capsule = Arc::clone(&capsule);
tokio::task::spawn(async move {
while let Some(work) = rx.recv().await {
debug!(
capsule_id = %capsule.id(),
action = %work.action,
topic = %work.topic,
"Dispatching interceptor (ordered)"
);
let caller = work.ipc_message.as_deref();
match capsule.invoke_interceptor(&work.action, &work.payload, caller) {
Ok(crate::capsule::InterceptResult::Continue(_)) => {
debug!(
capsule_id = %capsule.id(),
action = %work.action,
"Interceptor completed (Continue)"
);
},
Ok(crate::capsule::InterceptResult::Final(_)) => {
debug!(
capsule_id = %capsule.id(),
action = %work.action,
"Interceptor completed (Final)"
);
},
Ok(crate::capsule::InterceptResult::Deny { reason }) => {
warn!(
capsule_id = %capsule.id(),
action = %work.action,
topic = %work.topic,
reason = %reason,
"Interceptor: Deny"
);
},
Err(crate::error::CapsuleError::NotSupported(ref msg)) => {
debug!(
capsule_id = %capsule.id(),
action = %work.action,
reason = %msg,
"Interceptor skipped (NotSupported)"
);
},
Err(e) => {
warn!(
capsule_id = %capsule.id(),
action = %work.action,
topic = %work.topic,
error = %e,
"Interceptor invocation failed"
);
},
}
}
});
tx
});
let work = InterceptorWork {
action,
payload: Arc::clone(&payload_bytes),
topic: Arc::clone(&topic),
ipc_message: ipc_message.clone(),
};
if let Err(e) = sender.try_send(work) {
warn!(
capsule_id = %capsule.id(),
topic = %topic,
"Capsule dispatch queue full or closed, dropping event: {e}"
);
}
}
/// Collects the interceptors that should receive an event published on `topic`.
///
/// Every capsule in the registry that is in the `Ready` state contributes one
/// `(capsule, action)` pair per interceptor whose event pattern matches
/// `topic`. The result is ordered by ascending interceptor priority; the sort
/// is stable, so interceptors with equal priority keep registry iteration order.
///
/// The registry read lock is held for the duration of the scan.
async fn find_matching_interceptors(
    registry: &RwLock<CapsuleRegistry>,
    topic: &str,
) -> Vec<(Arc<dyn crate::capsule::Capsule>, String)> {
    let guard = registry.read().await;
    let mut ranked: Vec<(Arc<dyn crate::capsule::Capsule>, String, u32)> = Vec::new();

    for id in guard.list() {
        let Some(capsule) = guard.get(id) else {
            continue;
        };
        if !matches!(capsule.state(), crate::capsule::CapsuleState::Ready) {
            continue;
        }
        ranked.extend(
            capsule
                .manifest()
                .effective_interceptors()
                .into_iter()
                .filter(|def| crate::topic::topic_matches(topic, &def.event))
                .map(|def| (Arc::clone(&capsule), def.action, def.priority)),
        );
    }

    // Stable sort: lower priority value runs first.
    ranked.sort_by(|left, right| left.2.cmp(&right.2));

    let mut ordered = Vec::with_capacity(ranked.len());
    for (capsule, action, _priority) in ranked {
        ordered.push((capsule, action));
    }
    ordered
}
#[cfg(test)]
mod tests {
    //! Tests for the event dispatcher, driven through a mock capsule that
    //! records whether (and in which order) its interceptor was invoked.

    use super::*;
    use async_trait::async_trait;
    use std::sync::Mutex;
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::time::Duration;
    use crate::capsule::{Capsule, CapsuleId, CapsuleState, InterceptResult};
    use crate::context::CapsuleContext;
    use crate::error::CapsuleResult;
    use crate::manifest::{CapabilitiesDef, CapsuleManifest, InterceptorDef, PackageDef};
    use astrid_events::ipc::IpcPayload;

    /// Capsule double with a single interceptor registered for one event pattern.
    struct MockCapsule {
        id: CapsuleId,
        manifest: CapsuleManifest,
        /// Set to `true` on every interceptor invocation.
        invoked: Arc<AtomicBool>,
        /// When present, receives the capsule id on every invocation.
        invocation_log: Option<Arc<Mutex<Vec<String>>>>,
        /// When present, returned instead of the default `Continue(vec![])`.
        result_override: Option<InterceptResult>,
    }

    impl MockCapsule {
        /// Mock with priority 100 and no shared invocation log.
        fn new(name: &str, interceptor_event: &str) -> (Self, Arc<AtomicBool>) {
            Self::with_priority(name, interceptor_event, 100, None)
        }

        /// Mock whose single interceptor (`test_action`) listens on
        /// `interceptor_event` with the given `priority`.
        ///
        /// Returns the capsule together with its "was invoked" flag.
        fn with_priority(
            name: &str,
            interceptor_event: &str,
            priority: u32,
            invocation_log: Option<Arc<Mutex<Vec<String>>>>,
        ) -> (Self, Arc<AtomicBool>) {
            let invoked = Arc::new(AtomicBool::new(false));
            let manifest = CapsuleManifest {
                package: PackageDef {
                    name: name.to_string(),
                    version: "0.0.1".to_string(),
                    description: None,
                    authors: Vec::new(),
                    repository: None,
                    homepage: None,
                    documentation: None,
                    license: None,
                    license_file: None,
                    readme: None,
                    keywords: Vec::new(),
                    categories: Vec::new(),
                    astrid_version: None,
                    publish: None,
                    include: None,
                    exclude: None,
                    metadata: None,
                },
                components: Vec::new(),
                imports: std::collections::HashMap::new(),
                exports: std::collections::HashMap::new(),
                capabilities: CapabilitiesDef::default(),
                env: std::collections::HashMap::new(),
                context_files: Vec::new(),
                commands: Vec::new(),
                mcp_servers: Vec::new(),
                skills: Vec::new(),
                uplinks: Vec::new(),
                interceptors: vec![InterceptorDef {
                    event: interceptor_event.to_string(),
                    action: "test_action".to_string(),
                    priority,
                }],
                topics: Vec::new(),
                publishes: ::std::collections::HashMap::new(),
                subscribes: ::std::collections::HashMap::new(),
                tools: ::std::vec::Vec::new(),
            };
            let capsule = Self {
                id: CapsuleId::from_static(name),
                manifest,
                invoked: Arc::clone(&invoked),
                invocation_log,
                result_override: None,
            };
            (capsule, invoked)
        }
    }

    #[async_trait]
    impl Capsule for MockCapsule {
        fn id(&self) -> &CapsuleId {
            &self.id
        }

        fn manifest(&self) -> &CapsuleManifest {
            &self.manifest
        }

        fn state(&self) -> CapsuleState {
            CapsuleState::Ready
        }

        async fn load(&mut self, _ctx: &CapsuleContext) -> CapsuleResult<()> {
            Ok(())
        }

        async fn unload(&mut self) -> CapsuleResult<()> {
            Ok(())
        }

        fn invoke_interceptor(
            &self,
            _action: &str,
            _payload: &[u8],
            _caller: Option<&astrid_events::ipc::IpcMessage>,
        ) -> CapsuleResult<InterceptResult> {
            self.invoked.store(true, Ordering::SeqCst);
            if let Some(ref log) = self.invocation_log {
                log.lock().unwrap().push(self.id.to_string());
            }
            if let Some(ref result) = self.result_override {
                return Ok(result.clone());
            }
            Ok(InterceptResult::Continue(Vec::new()))
        }
    }

    /// Publishes an IPC event with an empty custom payload on `topic`.
    fn publish_ipc(bus: &EventBus, topic: &str) {
        let msg = astrid_events::ipc::IpcMessage::new(
            topic,
            IpcPayload::Custom {
                data: serde_json::json!({}),
            },
            uuid::Uuid::nil(),
        );
        bus.publish(AstridEvent::Ipc {
            metadata: astrid_events::EventMetadata::new("test"),
            message: msg,
        });
    }

    #[tokio::test]
    async fn dispatch_routes_to_matching_interceptor() {
        let (capsule, invoked) = MockCapsule::new("test-capsule", "test.topic");
        let mut registry = CapsuleRegistry::new();
        registry.register(Box::new(capsule)).unwrap();
        let registry = Arc::new(RwLock::new(registry));
        let bus = Arc::new(EventBus::with_capacity(64));
        let dispatcher = EventDispatcher::new(Arc::clone(&registry), Arc::clone(&bus));
        let handle = tokio::spawn(dispatcher.run());
        tokio::task::yield_now().await;
        publish_ipc(&bus, "test.topic");
        tokio::time::sleep(Duration::from_millis(200)).await;
        assert!(
            invoked.load(Ordering::SeqCst),
            "interceptor should have been invoked for matching topic"
        );
        handle.abort();
    }

    #[tokio::test]
    async fn dispatch_skips_non_matching_topic() {
        let (capsule, invoked) = MockCapsule::new("test-capsule-skip", "specific.topic");
        let mut registry = CapsuleRegistry::new();
        registry.register(Box::new(capsule)).unwrap();
        let registry = Arc::new(RwLock::new(registry));
        let bus = Arc::new(EventBus::with_capacity(64));
        let dispatcher = EventDispatcher::new(Arc::clone(&registry), Arc::clone(&bus));
        let handle = tokio::spawn(dispatcher.run());
        tokio::task::yield_now().await;
        publish_ipc(&bus, "other.topic");
        tokio::time::sleep(Duration::from_millis(200)).await;
        assert!(
            !invoked.load(Ordering::SeqCst),
            "interceptor should NOT have been invoked for non-matching topic"
        );
        handle.abort();
    }

    #[tokio::test]
    async fn dispatch_concurrent_does_not_block() {
        let (cap_a, invoked_a) = MockCapsule::new("capsule-a", "topic.a");
        let (cap_b, invoked_b) = MockCapsule::new("capsule-b", "topic.b");
        let mut registry = CapsuleRegistry::new();
        registry.register(Box::new(cap_a)).unwrap();
        registry.register(Box::new(cap_b)).unwrap();
        let registry = Arc::new(RwLock::new(registry));
        let bus = Arc::new(EventBus::with_capacity(64));
        let dispatcher = EventDispatcher::new(Arc::clone(&registry), Arc::clone(&bus));
        let handle = tokio::spawn(dispatcher.run());
        tokio::task::yield_now().await;
        publish_ipc(&bus, "topic.a");
        publish_ipc(&bus, "topic.b");
        tokio::time::sleep(Duration::from_millis(200)).await;
        assert!(
            invoked_a.load(Ordering::SeqCst),
            "capsule-a interceptor should have been invoked"
        );
        assert!(
            invoked_b.load(Ordering::SeqCst),
            "capsule-b interceptor should have been invoked"
        );
        handle.abort();
    }

    #[tokio::test]
    async fn dispatch_routes_lifecycle_events() {
        let (capsule, invoked) =
            MockCapsule::new("lifecycle-capsule", "astrid.v1.lifecycle.tool_call_started");
        let mut registry = CapsuleRegistry::new();
        registry.register(Box::new(capsule)).unwrap();
        let registry = Arc::new(RwLock::new(registry));
        let bus = Arc::new(EventBus::with_capacity(64));
        let dispatcher = EventDispatcher::new(Arc::clone(&registry), Arc::clone(&bus));
        let handle = tokio::spawn(dispatcher.run());
        tokio::task::yield_now().await;
        bus.publish(AstridEvent::ToolCallStarted {
            metadata: astrid_events::EventMetadata::new("test"),
            call_id: uuid::Uuid::nil(),
            tool_name: "search".into(),
            server_name: None,
        });
        tokio::time::sleep(Duration::from_millis(200)).await;
        assert!(
            invoked.load(Ordering::SeqCst),
            "EventDispatcher should dispatch lifecycle events by event_type()"
        );
        handle.abort();
    }

    /// Floods a tiny bus; the test only checks that the dispatcher keeps
    /// running afterwards (the lag listener's flag is not asserted).
    #[tokio::test]
    async fn dispatch_publishes_lag_event_on_overflow() {
        let bus = Arc::new(EventBus::with_capacity(2));
        let (lag_capsule, _lag_invoked) =
            MockCapsule::new("lag-listener", "astrid.v1.event_bus.lagged");
        let mut registry = CapsuleRegistry::new();
        registry.register(Box::new(lag_capsule)).unwrap();
        let registry = Arc::new(RwLock::new(registry));
        let dispatcher = EventDispatcher::new(Arc::clone(&registry), Arc::clone(&bus));
        let handle = tokio::spawn(dispatcher.run());
        tokio::task::yield_now().await;
        for i in 0..20 {
            publish_ipc(&bus, &format!("flood.event.{i}"));
        }
        tokio::time::sleep(Duration::from_millis(500)).await;
        assert!(!handle.is_finished(), "dispatcher should still be running");
        handle.abort();
    }

    #[test]
    fn mock_capsule_check_health_returns_ready() {
        let (capsule, _) = MockCapsule::new("health-test", "test.topic");
        assert_eq!(capsule.check_health(), CapsuleState::Ready);
    }

    #[tokio::test]
    async fn dispatch_respects_interceptor_priority_order() {
        let order = Arc::new(Mutex::new(Vec::<String>::new()));
        let (guard, _) =
            MockCapsule::with_priority("guard", "shared.topic", 10, Some(Arc::clone(&order)));
        let (handler, _) =
            MockCapsule::with_priority("handler", "shared.topic", 100, Some(Arc::clone(&order)));
        let (transform, _) =
            MockCapsule::with_priority("transform", "shared.topic", 50, Some(Arc::clone(&order)));
        // Registered out of priority order on purpose.
        let mut registry = CapsuleRegistry::new();
        registry.register(Box::new(handler)).unwrap();
        registry.register(Box::new(guard)).unwrap();
        registry.register(Box::new(transform)).unwrap();
        let registry = Arc::new(RwLock::new(registry));
        let bus = Arc::new(EventBus::with_capacity(64));
        let dispatcher = EventDispatcher::new(Arc::clone(&registry), Arc::clone(&bus));
        let handle = tokio::spawn(dispatcher.run());
        tokio::task::yield_now().await;
        publish_ipc(&bus, "shared.topic");
        tokio::time::sleep(Duration::from_millis(300)).await;
        let recorded = order.lock().unwrap().clone();
        assert_eq!(
            recorded,
            vec!["guard", "transform", "handler"],
            "interceptors must fire in priority order (lower first)"
        );
        handle.abort();
    }

    #[tokio::test]
    async fn find_matching_interceptors_sorts_by_priority() {
        let (low, _) = MockCapsule::with_priority("low-pri", "test.event", 10, None);
        let (high, _) = MockCapsule::with_priority("high-pri", "test.event", 200, None);
        let (mid, _) = MockCapsule::with_priority("mid-pri", "test.event", 50, None);
        let mut registry = CapsuleRegistry::new();
        registry.register(Box::new(high)).unwrap();
        registry.register(Box::new(low)).unwrap();
        registry.register(Box::new(mid)).unwrap();
        let registry = Arc::new(RwLock::new(registry));
        let matches = find_matching_interceptors(&registry, "test.event").await;
        let names: Vec<&str> = matches.iter().map(|(c, _)| c.id().as_str()).collect();
        assert_eq!(
            names,
            vec!["low-pri", "mid-pri", "high-pri"],
            "find_matching_interceptors must return results sorted by priority"
        );
    }

    #[tokio::test]
    async fn deny_interceptor_short_circuits_chain() {
        let order = Arc::new(Mutex::new(Vec::<String>::new()));
        let (mut guard, _) =
            MockCapsule::with_priority("guard", "shared.topic", 10, Some(Arc::clone(&order)));
        guard.result_override = Some(InterceptResult::Deny {
            reason: "blocked by guard".into(),
        });
        let (handler, invoked_handler) =
            MockCapsule::with_priority("handler", "shared.topic", 100, Some(Arc::clone(&order)));
        let mut registry = CapsuleRegistry::new();
        registry.register(Box::new(handler)).unwrap();
        registry.register(Box::new(guard)).unwrap();
        let registry = Arc::new(RwLock::new(registry));
        let bus = Arc::new(EventBus::with_capacity(64));
        let dispatcher = EventDispatcher::new(Arc::clone(&registry), Arc::clone(&bus));
        let handle = tokio::spawn(dispatcher.run());
        tokio::task::yield_now().await;
        publish_ipc(&bus, "shared.topic");
        tokio::time::sleep(Duration::from_millis(300)).await;
        let recorded = order.lock().unwrap().clone();
        assert_eq!(
            recorded,
            vec!["guard"],
            "only the guard should have fired — handler should be short-circuited"
        );
        assert!(
            !invoked_handler.load(Ordering::SeqCst),
            "handler must NOT be invoked after Deny"
        );
        handle.abort();
    }

    #[tokio::test]
    async fn final_interceptor_short_circuits_chain() {
        let order = Arc::new(Mutex::new(Vec::<String>::new()));
        let (mut cache, _) =
            MockCapsule::with_priority("cache", "shared.topic", 30, Some(Arc::clone(&order)));
        cache.result_override = Some(InterceptResult::Final(b"cached response".to_vec()));
        let (core, invoked_core) =
            MockCapsule::with_priority("core", "shared.topic", 100, Some(Arc::clone(&order)));
        let mut registry = CapsuleRegistry::new();
        registry.register(Box::new(core)).unwrap();
        registry.register(Box::new(cache)).unwrap();
        let registry = Arc::new(RwLock::new(registry));
        let bus = Arc::new(EventBus::with_capacity(64));
        let dispatcher = EventDispatcher::new(Arc::clone(&registry), Arc::clone(&bus));
        let handle = tokio::spawn(dispatcher.run());
        tokio::task::yield_now().await;
        publish_ipc(&bus, "shared.topic");
        tokio::time::sleep(Duration::from_millis(300)).await;
        let recorded = order.lock().unwrap().clone();
        assert_eq!(
            recorded,
            vec!["cache"],
            "only the cache should have fired — core should be short-circuited"
        );
        assert!(
            !invoked_core.load(Ordering::SeqCst),
            "core must NOT be invoked after Final"
        );
        handle.abort();
    }

    /// Pins the guest-byte decoding cases asserted below: empty input, the
    /// `0x00`/`0x01`/`0x02` leading bytes, and an unrecognized leading byte.
    #[test]
    fn intercept_result_from_guest_bytes() {
        let r = InterceptResult::from_guest_bytes(vec![]);
        assert!(matches!(r, InterceptResult::Continue(ref b) if b.is_empty()));
        let r = InterceptResult::from_guest_bytes(vec![0x00, 1, 2, 3]);
        assert!(matches!(r, InterceptResult::Continue(ref b) if b == &[1, 2, 3]));
        let r = InterceptResult::from_guest_bytes(vec![0x01, 4, 5]);
        assert!(matches!(r, InterceptResult::Final(ref b) if b == &[4, 5]));
        let r = InterceptResult::from_guest_bytes(vec![0x02, b'n', b'o']);
        assert!(matches!(r, InterceptResult::Deny { ref reason } if reason == "no"));
        let r = InterceptResult::from_guest_bytes(vec![0xFF, 1]);
        assert!(matches!(r, InterceptResult::Continue(ref b) if b == &[0xFF, 1]));
    }
}