use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{RwLock, mpsc};
use tracing::{debug, warn};
use crate::capsule::{Capsule, CapsuleId};
use crate::registry::CapsuleRegistry;
use astrid_events::PrincipalKey;
use astrid_events::{AstridEvent, EventBus, EventReceiver};
/// Capacity of the bounded `mpsc` channel created for each `(capsule, principal)`
/// consumer in `get_or_spawn_consumer`. When a queue is full, `dispatch_single`
/// drops the event and logs a warning instead of blocking.
const CAPSULE_EVENT_QUEUE_CAPACITY: usize = 64;
/// Maximum number of queue-map entries a single capsule may own. Once this many
/// exist, `get_or_spawn_consumer` routes work for further principals to the
/// capsule's shared queue (the entry whose principal component is `None`).
const MAX_DISPATCHER_QUEUES_PER_CAPSULE: usize = 10_000;
/// Idle grace period, in milliseconds, used unless a test overrides it.
const DEFAULT_IDLE_CONSUMER_GRACE_MS: u64 = 60_000;

/// Process-wide idle grace period in milliseconds. Kept in an atomic so that
/// test builds can shorten it through `set_idle_consumer_grace_for_test`.
static IDLE_CONSUMER_GRACE_MS: std::sync::atomic::AtomicU64 =
    std::sync::atomic::AtomicU64::new(DEFAULT_IDLE_CONSUMER_GRACE_MS);

/// Returns how long a queue consumer waits for new work before it considers
/// evicting itself.
fn idle_consumer_grace() -> Duration {
    use std::sync::atomic::Ordering;

    let grace_ms = IDLE_CONSUMER_GRACE_MS.load(Ordering::Relaxed);
    Duration::from_millis(grace_ms)
}

/// Test-only hook: replaces the idle grace period with `ms` milliseconds.
///
/// The value is global, so it affects every consumer that reads it afterwards.
#[cfg(test)]
pub(crate) fn set_idle_consumer_grace_for_test(ms: u64) {
    use std::sync::atomic::Ordering;

    IDLE_CONSUMER_GRACE_MS.store(ms, Ordering::Relaxed);
}
/// Shared table of async mutexes keyed by `(capsule, principal)`.
///
/// Entries are created on demand by `acquire_chain_lock` and removed again by
/// `ChainLockGuard`'s `Drop` impl once no other holder or waiter references the mutex.
/// The outer `parking_lot::RwLock` protects only the map itself; the per-key
/// `tokio::sync::Mutex` is what callers actually hold across `.await` points.
type ChainLocks =
Arc<parking_lot::RwLock<HashMap<(CapsuleId, PrincipalKey), Arc<tokio::sync::Mutex<()>>>>>;
/// Guard returned by `acquire_chain_lock`; holds the per-key chain mutex until dropped.
///
/// Dropping the guard releases the mutex and, when nobody else references it,
/// removes the map entry so the `ChainLocks` table does not grow without bound.
struct ChainLockGuard {
/// The held lock. Wrapped in `Option` so `Drop` can release it (via `take`)
/// before inspecting the mutex's reference count.
guard: Option<tokio::sync::OwnedMutexGuard<()>>,
/// The mutex this guard locked; compared by pointer against the map entry on drop.
mutex: Arc<tokio::sync::Mutex<()>>,
/// Handle to the table the mutex was taken from, used for cleanup on drop.
chain_locks: ChainLocks,
/// Key under which `mutex` is registered in `chain_locks`.
key: (CapsuleId, PrincipalKey),
}
impl Drop for ChainLockGuard {
fn drop(&mut self) {
self.guard.take();
let mut write = self.chain_locks.write();
if let Some(entry) = write.get(&self.key)
&& Arc::ptr_eq(entry, &self.mutex)
&& Arc::strong_count(entry) == 2
{
write.remove(&self.key);
}
}
}
/// Locks the chain mutex registered for `key`, creating the mutex if needed.
///
/// The returned [`ChainLockGuard`] keeps the mutex locked until it is dropped.
/// The map lock is only held while looking up or inserting the entry, never
/// while waiting for the mutex itself.
async fn acquire_chain_lock(
    chain_locks: &ChainLocks,
    key: (CapsuleId, PrincipalKey),
) -> ChainLockGuard {
    // Fast path: look the mutex up under the read lock, which is released at
    // the end of this statement.
    let existing = chain_locks.read().get(&key).cloned();

    let mutex = match existing {
        Some(found) => found,
        None => {
            // Slow path: take the write lock and insert unless another task
            // registered the key in the meantime.
            let mut table = chain_locks.write();
            Arc::clone(
                table
                    .entry(key.clone())
                    .or_insert_with(|| Arc::new(tokio::sync::Mutex::new(()))),
            )
        }
    };

    let held = Arc::clone(&mutex).lock_owned().await;
    ChainLockGuard {
        guard: Some(held),
        mutex,
        chain_locks: Arc::clone(chain_locks),
        key,
    }
}
/// Shared map from `(capsule, principal)` to the sending half of that pair's
/// work queue. Each entry is paired with a consumer task (`run_consumer`)
/// spawned by `get_or_spawn_consumer`.
type CapsuleQueues =
Arc<parking_lot::Mutex<HashMap<(CapsuleId, PrincipalKey), mpsc::Sender<InterceptorWork>>>>;
/// One queued interceptor invocation, consumed by `run_consumer`.
struct InterceptorWork {
/// Interceptor action name passed to `Capsule::invoke_interceptor`.
action: String,
/// Serialized event payload handed to the interceptor.
payload: Arc<Vec<u8>>,
/// Topic of the originating event; the consumer uses it only in log output.
topic: Arc<String>,
/// Originating IPC message, if the event was an IPC event; passed to the
/// interceptor as the caller.
ipc_message: Option<Arc<astrid_events::ipc::IpcMessage>>,
}
/// Receives events from the event bus and forwards them to the interceptors
/// of registered capsules whose subscriptions match the event topic.
pub struct EventDispatcher {
/// Registry searched for ready capsules with matching interceptors.
registry: Arc<RwLock<CapsuleRegistry>>,
/// Bus handle; used by `run` to publish the lag notification message.
event_bus: Arc<EventBus>,
/// Subscription (registered as `"capsule_dispatcher"`) the run loop reads from.
receiver: EventReceiver,
/// Optional identity store. When present, `run` auto-provisions a home
/// directory only for the default principal.
identity_store: Option<Arc<dyn astrid_storage::IdentityStore>>,
/// Per-`(capsule, principal)` mutexes taken by multi-interceptor chains.
chain_locks: ChainLocks,
}
impl EventDispatcher {
#[must_use]
pub fn new(registry: Arc<RwLock<CapsuleRegistry>>, event_bus: Arc<EventBus>) -> Self {
let receiver = event_bus.subscribe_as("capsule_dispatcher");
Self {
registry,
event_bus,
receiver,
identity_store: None,
chain_locks: Arc::new(parking_lot::RwLock::new(HashMap::new())),
}
}
#[must_use]
pub fn with_identity_store(mut self, store: Arc<dyn astrid_storage::IdentityStore>) -> Self {
self.identity_store = Some(store);
self
}
pub async fn run(mut self) {
let mut last_lag_notification = std::time::Instant::now()
.checked_sub(std::time::Duration::from_secs(10))
.unwrap_or_else(std::time::Instant::now);
let capsule_queues: CapsuleQueues = Arc::new(parking_lot::Mutex::new(HashMap::new()));
let mut known_principals: HashSet<String> = HashSet::new();
known_principals.insert("default".to_string());
const MAX_KNOWN_PRINCIPALS: usize = 10_000;
debug!("Event dispatcher started");
while let Some(event) = self.receiver.recv().await {
let lagged = self.receiver.drain_lagged();
if lagged > 0 && last_lag_notification.elapsed() >= std::time::Duration::from_secs(10) {
warn!(
lagged_count = lagged,
"Event bus broadcast channel lagged - {lagged} messages dropped"
);
last_lag_notification = std::time::Instant::now();
let msg = astrid_events::ipc::IpcMessage::new(
"astrid.v1.event_bus.lagged",
astrid_events::ipc::IpcPayload::Custom {
data: serde_json::json!({ "lagged_count": lagged }),
},
uuid::Uuid::new_v4(),
);
self.event_bus.publish(AstridEvent::Ipc {
metadata: astrid_events::EventMetadata::new("dispatcher"),
message: msg,
});
}
let (topic, payload_bytes, ipc_message) = match &*event {
AstridEvent::Ipc { message, .. } => {
let topic = Arc::new(message.topic.clone());
match message.payload.to_guest_bytes() {
Ok(bytes) => (topic, Arc::new(bytes), Some(Arc::new(message.clone()))),
Err(e) => {
warn!(topic = %message.topic, error = %e, "Failed to serialize IPC payload");
continue;
},
}
},
other => {
let topic = Arc::new(other.event_type().to_string());
match serde_json::to_vec(other) {
Ok(bytes) => (topic, Arc::new(bytes), None),
Err(e) => {
warn!(event_type = %topic, error = %e, "Failed to serialize lifecycle event");
continue;
},
}
},
};
if let Some(ref msg) = ipc_message
&& let Some(ref principal_str) = msg.principal
&& !known_principals.contains(principal_str)
{
if let Ok(pid) = astrid_core::PrincipalId::new(principal_str) {
let should_provision =
self.identity_store.is_none() || pid == astrid_core::PrincipalId::default();
if should_provision && let Ok(home) = astrid_core::dirs::AstridHome::resolve() {
let ph = home.principal_home(&pid);
if let Err(e) = ph.ensure() {
warn!(
principal = %pid,
error = %e,
"Failed to auto-provision principal home"
);
} else {
debug!(
principal = %pid,
"Auto-provisioned principal home directory"
);
if known_principals.len() < MAX_KNOWN_PRINCIPALS {
known_principals.insert(principal_str.clone());
}
}
}
} else {
warn!(
principal = %principal_str,
"IPC message has invalid principal string, ignoring"
);
}
}
let matches = find_matching_interceptors(&self.registry, &topic).await;
dispatch_to_capsule_queues(
&capsule_queues,
&self.chain_locks,
matches,
topic,
payload_bytes,
ipc_message,
);
}
debug!("Event dispatcher stopped (event bus closed)");
}
}
/// Routes one event to every interceptor that matched its topic.
///
/// * No matches: returns immediately.
/// * Exactly one match: the work is enqueued on the matching
///   `(capsule, principal)` queue via `dispatch_single`.
/// * Several matches: a detached task runs the interceptors one after another
///   in the order given by `matches`. A non-empty `Continue` payload replaces
///   the payload passed to the next interceptor; `Final` and `Deny` stop the
///   chain; `NotSupported` and other errors are logged and the chain continues.
///
/// Each chain step holds the chain lock for its `(capsule, principal)` pair
/// while the interceptor runs. The single-match queue path does not take that
/// lock.
///
/// # Parameters
/// * `queues` - per-`(capsule, principal)` queue map used for the single-match path.
/// * `chain_locks` - mutex table used for the multi-match path.
/// * `matches` - `(capsule, action)` pairs in invocation order.
/// * `topic` - event topic, used here for logging and forwarded to the queue.
/// * `payload_bytes` - serialized event payload.
/// * `ipc_message` - originating IPC message, if any; supplies the principal
///   and is passed to interceptors as the caller.
fn dispatch_to_capsule_queues(
    queues: &CapsuleQueues,
    chain_locks: &ChainLocks,
    mut matches: Vec<(Arc<dyn Capsule>, String)>,
    topic: Arc<String>,
    payload_bytes: Arc<Vec<u8>>,
    ipc_message: Option<Arc<astrid_events::ipc::IpcMessage>>,
) {
    if matches.is_empty() {
        return;
    }
    let principal_key: PrincipalKey = ipc_message.as_deref().and_then(|m| m.principal.clone());

    if matches.len() == 1 {
        // The length check guarantees `pop` yields the single element.
        let Some((capsule, action)) = matches.pop() else {
            return;
        };
        dispatch_single(
            queues,
            capsule,
            action,
            topic,
            payload_bytes,
            ipc_message,
            principal_key,
        );
        return;
    }

    let chain_locks = Arc::clone(chain_locks);
    tokio::task::spawn(async move {
        // The chain may rewrite the payload, so it works on a private copy.
        let mut current_payload = (*payload_bytes).clone();
        for (capsule, action) in &matches {
            debug!(
                capsule_id = %capsule.id(),
                action = %action,
                topic = %topic,
                "Dispatching interceptor (chain)"
            );
            let chain_key = (capsule.id().clone(), principal_key.clone());
            // Held until the end of this iteration, i.e. for the whole invocation.
            let _chain_guard = acquire_chain_lock(&chain_locks, chain_key).await;
            let caller = ipc_message.as_deref();
            match capsule
                .invoke_interceptor(action, &current_payload, caller)
                .await
            {
                Ok(crate::capsule::InterceptResult::Continue(modified_payload)) => {
                    debug!(
                        capsule_id = %capsule.id(),
                        action = %action,
                        "Interceptor: Continue"
                    );
                    // An empty payload means "unchanged": keep the current one.
                    if !modified_payload.is_empty() {
                        current_payload = modified_payload;
                    }
                },
                Ok(crate::capsule::InterceptResult::Final(response)) => {
                    debug!(
                        capsule_id = %capsule.id(),
                        action = %action,
                        topic = %topic,
                        response_len = response.len(),
                        "Interceptor: Final — chain halted"
                    );
                    return;
                },
                Ok(crate::capsule::InterceptResult::Deny { reason }) => {
                    warn!(
                        capsule_id = %capsule.id(),
                        action = %action,
                        topic = %topic,
                        reason = %reason,
                        "Interceptor: Deny — chain halted"
                    );
                    return;
                },
                Err(crate::error::CapsuleError::NotSupported(ref msg)) => {
                    debug!(
                        capsule_id = %capsule.id(),
                        action = %action,
                        reason = %msg,
                        "Interceptor skipped (NotSupported)"
                    );
                },
                Err(e) => {
                    warn!(
                        capsule_id = %capsule.id(),
                        action = %action,
                        topic = %topic,
                        error = %e,
                        "Interceptor invocation failed — continuing chain"
                    );
                },
            }
        }
    });
}
/// Counts the entries in `queues` whose key belongs to `capsule_id`,
/// regardless of the principal component of the key.
fn queues_per_capsule(
    queues: &HashMap<(CapsuleId, PrincipalKey), mpsc::Sender<InterceptorWork>>,
    capsule_id: &CapsuleId,
) -> usize {
    let mut owned = 0;
    for (owner, _principal) in queues.keys() {
        if owner == capsule_id {
            owned += 1;
        }
    }
    owned
}
/// Returns a sender for the queue serving `key`, spawning the queue's consumer
/// task when no live queue exists yet.
///
/// A stale entry (closed sender) is removed and replaced. If the key names a
/// principal and the capsule already owns `MAX_DISPATCHER_QUEUES_PER_CAPSULE`
/// queues, an audit-level error is logged and the capsule's shared queue (the
/// key with principal `None`) is used instead.
fn get_or_spawn_consumer(
    queues: &CapsuleQueues,
    capsule: &Arc<dyn Capsule>,
    key: (CapsuleId, PrincipalKey),
) -> mpsc::Sender<InterceptorWork> {
    let mut table = queues.lock();

    // Reuse a live queue; evict one whose consumer has gone away.
    if let Some(existing) = table.get(&key) {
        if !existing.is_closed() {
            return existing.clone();
        }
        table.remove(&key);
    }

    let mut effective_key = key;
    let cap_reached = effective_key.1.is_some()
        && queues_per_capsule(&table, &effective_key.0) >= MAX_DISPATCHER_QUEUES_PER_CAPSULE;
    if cap_reached {
        tracing::error!(
            target: "astrid.audit.ipc",
            security_event = true,
            capsule = %effective_key.0,
            principal_key_count = MAX_DISPATCHER_QUEUES_PER_CAPSULE,
            "dispatcher: per-principal queue cap exceeded; degrading to shared queue"
        );
        effective_key.1 = None;

        // Same reuse-or-evict check, now against the shared queue.
        if let Some(shared) = table.get(&effective_key) {
            if !shared.is_closed() {
                return shared.clone();
            }
            table.remove(&effective_key);
        }
    }

    let (tx, rx) = mpsc::channel::<InterceptorWork>(CAPSULE_EVENT_QUEUE_CAPACITY);
    table.insert(effective_key.clone(), tx.clone());
    // Release the map before spawning so the lock is held as briefly as possible.
    drop(table);

    tokio::task::spawn(run_consumer(
        rx,
        Arc::clone(capsule),
        Arc::clone(queues),
        effective_key,
    ));
    tx
}
/// Consumer task for one `(capsule, principal)` queue.
///
/// Work items are taken from `rx` one at a time and passed to the capsule's
/// interceptor; the outcome is only logged. The task ends when either
///
/// * every sender has been dropped, or
/// * no work arrived for `idle_consumer_grace()`, the queue is empty, and the
///   only remaining sender is the one stored in `queues`. In that case the
///   task removes its own entry under the map lock and exits.
///
/// # Parameters
/// * `rx` - receiving half of this consumer's queue.
/// * `capsule` - capsule whose interceptors are invoked.
/// * `queues` - shared queue map, needed for self-eviction.
/// * `key` - this consumer's key in `queues`.
async fn run_consumer(
    mut rx: mpsc::Receiver<InterceptorWork>,
    capsule: Arc<dyn Capsule>,
    queues: CapsuleQueues,
    key: (CapsuleId, PrincipalKey),
) {
    loop {
        match tokio::time::timeout(idle_consumer_grace(), rx.recv()).await {
            Ok(Some(work)) => {
                debug!(
                    capsule_id = %capsule.id(),
                    action = %work.action,
                    topic = %work.topic,
                    "Dispatching interceptor (ordered)"
                );
                let caller = work.ipc_message.as_deref();
                match capsule
                    .invoke_interceptor(&work.action, &work.payload, caller)
                    .await
                {
                    Ok(crate::capsule::InterceptResult::Continue(_)) => {
                        debug!(
                            capsule_id = %capsule.id(),
                            action = %work.action,
                            "Interceptor completed (Continue)"
                        );
                    },
                    Ok(crate::capsule::InterceptResult::Final(_)) => {
                        debug!(
                            capsule_id = %capsule.id(),
                            action = %work.action,
                            "Interceptor completed (Final)"
                        );
                    },
                    Ok(crate::capsule::InterceptResult::Deny { reason }) => {
                        warn!(
                            capsule_id = %capsule.id(),
                            action = %work.action,
                            topic = %work.topic,
                            reason = %reason,
                            "Interceptor: Deny"
                        );
                    },
                    Err(crate::error::CapsuleError::NotSupported(ref msg)) => {
                        debug!(
                            capsule_id = %capsule.id(),
                            action = %work.action,
                            reason = %msg,
                            "Interceptor skipped (NotSupported)"
                        );
                    },
                    Err(e) => {
                        warn!(
                            capsule_id = %capsule.id(),
                            action = %work.action,
                            topic = %work.topic,
                            error = %e,
                            "Interceptor invocation failed"
                        );
                    },
                }
            },
            Ok(None) => {
                debug!(
                    capsule_id = %capsule.id(),
                    "Per-principal consumer exiting: sender dropped"
                );
                return;
            },
            Err(_elapsed) => {
                // Decide under the map lock so no new sender clone can be
                // handed out while the check runs.
                let mut guard = queues.lock();
                // `is_empty` only inspects the queue. Probing with `try_recv`
                // would dequeue, and thereby lose, a work item that arrived
                // between the timeout firing and the lock being taken.
                if rx.is_empty() && rx.sender_strong_count() == 1 {
                    guard.remove(&key);
                    drop(guard);
                    debug!(
                        capsule_id = %capsule.id(),
                        "Per-principal consumer idle-evicted after grace"
                    );
                    return;
                }
                // Work is pending or a dispatcher still holds a sender clone:
                // keep serving; the next loop iteration picks the work up.
                drop(guard);
            },
        }
    }
}
fn dispatch_single(
queues: &CapsuleQueues,
capsule: Arc<dyn Capsule>,
action: String,
topic: Arc<String>,
payload_bytes: Arc<Vec<u8>>,
ipc_message: Option<Arc<astrid_events::ipc::IpcMessage>>,
principal_key: PrincipalKey,
) {
let key = (capsule.id().clone(), principal_key);
let sender = get_or_spawn_consumer(queues, &capsule, key.clone());
let work = InterceptorWork {
action,
payload: Arc::clone(&payload_bytes),
topic: Arc::clone(&topic),
ipc_message: ipc_message.clone(),
};
match sender.try_send(work) {
Ok(()) => {},
Err(mpsc::error::TrySendError::Closed(work)) => {
let sender = get_or_spawn_consumer(queues, &capsule, key);
match sender.try_send(work) {
Ok(()) => {},
Err(e @ mpsc::error::TrySendError::Full(_)) => {
warn!(
capsule_id = %capsule.id(),
topic = %topic,
"Capsule dispatch queue full after re-spawn, dropping event (backpressure): {e}"
);
},
Err(e @ mpsc::error::TrySendError::Closed(_)) => {
warn!(
capsule_id = %capsule.id(),
topic = %topic,
security_event = true,
"BUG: capsule dispatch sender closed immediately after re-spawn; event dropped: {e}"
);
},
}
},
Err(e @ mpsc::error::TrySendError::Full(_)) => {
warn!(
capsule_id = %capsule.id(),
topic = %topic,
"Capsule dispatch queue full, dropping event (backpressure): {e}"
);
},
}
}
/// Collects the `(capsule, action)` pairs of all interceptors subscribed to `topic`.
///
/// Only capsules in the `Ready` state are considered. The result is ordered
/// by ascending interceptor priority; the sort is stable, so interceptors of
/// equal priority keep the order in which they were discovered. The registry
/// read lock is held for the duration of the scan.
async fn find_matching_interceptors(
    registry: &RwLock<CapsuleRegistry>,
    topic: &str,
) -> Vec<(Arc<dyn crate::capsule::Capsule>, String)> {
    let registry = registry.read().await;
    let mut found: Vec<(Arc<dyn crate::capsule::Capsule>, String, u32)> = Vec::new();

    for capsule_id in registry.list() {
        let Some(capsule) = registry.get(capsule_id) else {
            continue;
        };
        if !matches!(capsule.state(), crate::capsule::CapsuleState::Ready) {
            continue;
        }
        for interceptor in capsule.manifest().effective_interceptors() {
            if !crate::topic::topic_matches(topic, &interceptor.event) {
                continue;
            }
            found.push((
                Arc::clone(&capsule),
                interceptor.action,
                interceptor.priority,
            ));
        }
    }

    // Stable sort by priority (third tuple field).
    found.sort_by(|left, right| left.2.cmp(&right.2));

    let mut ordered = Vec::with_capacity(found.len());
    for (capsule, action, _priority) in found {
        ordered.push((capsule, action));
    }
    ordered
}
// Unit tests for this module live in the sibling file `dispatcher_tests.rs`
// and are compiled only in test builds.
#[cfg(test)]
#[path = "dispatcher_tests.rs"]
mod tests;