use std::sync::Arc;
use tokio::sync::Mutex;
use wasmtime::component::Resource;
use wasmtime_wasi::p2::DynPollable;
use crate::engine::wasm::bindings::astrid::ipc::host::{
self as ipc, ErrorCode, HostSubscription, InterceptorBinding, IpcEnvelope,
IpcMessage as WitIpcMessage, PrincipalAttribution, Subscription,
};
use crate::engine::wasm::host::util;
use crate::engine::wasm::host_state::HostState;
use astrid_events::AstridEvent;
use astrid_events::EventMetadata;
use astrid_events::RoutedEventReceiver;
use astrid_events::ipc::{IpcMessage as InternalIpcMessage, IpcPayload};
const MAX_PAYLOAD_BYTES: usize = 1024 * 1024;
const MAX_DRAIN_BYTES: usize = MAX_PAYLOAD_BYTES;
const MAX_SUBSCRIPTIONS: usize = 128;
const MAX_RECV_TIMEOUT_MS: u64 = 60_000;
const AUDIT_TOPIC: &str = "astrid.v1.audit.entry";
fn pattern_covers_audit(pattern: &str) -> bool {
let synthetic = AstridEvent::Ipc {
metadata: EventMetadata::new("audit_scope_probe"),
message: InternalIpcMessage::new(
AUDIT_TOPIC,
IpcPayload::RawJson(serde_json::Value::Null),
uuid::Uuid::nil(),
),
};
astrid_events::TopicMatcher::new(pattern).matches(&synthetic)
}
pub(super) struct SubscriptionEntry {
pub(super) receiver: Arc<Mutex<RoutedEventReceiver>>,
pub(super) topic_pattern: String,
}
fn extract_message(event: &Arc<AstridEvent>) -> Option<InternalIpcMessage> {
match &**event {
AstridEvent::Ipc { message, .. } => Some(message.clone()),
_ => None,
}
}
fn map_principal(msg: &InternalIpcMessage) -> PrincipalAttribution {
match msg.principal.clone() {
Some(p) => PrincipalAttribution::Verified(p),
None => PrincipalAttribution::System,
}
}
fn to_wit_message(msg: &InternalIpcMessage) -> WitIpcMessage {
let payload = msg
.payload
.to_guest_bytes()
.map(|b| String::from_utf8_lossy(&b).into_owned())
.unwrap_or_default();
WitIpcMessage {
topic: msg.topic.clone(),
payload,
source_id: msg.source_id.to_string(),
principal: map_principal(msg),
}
}
fn envelope_from(messages: Vec<InternalIpcMessage>, lagged: u64) -> IpcEnvelope {
IpcEnvelope {
messages: messages.iter().map(to_wit_message).collect(),
dropped: 0,
lagged,
}
}
fn audit_ipc<T, E: std::fmt::Debug>(
state: &HostState,
op: &'static str,
topic: &str,
bytes: u64,
result: &Result<T, E>,
) {
let capsule_id = state.capsule_id.as_str();
let principal = state.effective_principal();
match result {
Ok(_) => tracing::debug!(
target: "astrid.audit.ipc",
%capsule_id,
%principal,
fn = op,
topic,
bytes,
"audit",
),
Err(e) => tracing::debug!(
target: "astrid.audit.ipc",
%capsule_id,
%principal,
fn = op,
topic,
error = ?e,
"audit",
),
}
}
fn check_subscribe_acl(state: &HostState, topic_pattern: &str) -> Result<(), ErrorCode> {
if state.ipc_subscribe_patterns.is_empty() {
return Err(ErrorCode::CapabilityDenied);
}
if !state
.ipc_subscribe_patterns
.iter()
.any(|acl| astrid_events::topic_pattern_matches(acl, topic_pattern))
{
return Err(ErrorCode::CapabilityDenied);
}
Ok(())
}
fn publish_inner(
state: &mut HostState,
topic: String,
payload: String,
principal_str: String,
) -> Result<(), ErrorCode> {
if topic.len() > 256 {
return Err(ErrorCode::InvalidInput);
}
let payload_len = payload.len();
let principal = astrid_core::principal::PrincipalId::new(&principal_str)
.unwrap_or_else(|_| state.effective_principal());
let throughput_cap = usize::try_from(state.effective_profile().quotas.max_ipc_throughput_bytes)
.unwrap_or(usize::MAX);
state
.ipc_limiter
.check_quota(state.capsule_uuid, &principal, payload_len, throughput_cap)
.map_err(|_| ErrorCode::RateLimited)?;
if !crate::topic::has_valid_segments(&topic) {
return Err(ErrorCode::InvalidInput);
}
if topic.split('.').count() > 8 {
return Err(ErrorCode::InvalidInput);
}
if state.ipc_publish_patterns.is_empty() {
return Err(ErrorCode::CapabilityDenied);
}
if !state
.ipc_publish_patterns
.iter()
.any(|pattern| astrid_events::topic_pattern_matches(pattern, &topic))
{
return Err(ErrorCode::CapabilityDenied);
}
let payload_bytes = payload.as_bytes();
if payload_bytes.len() > MAX_PAYLOAD_BYTES {
return Err(ErrorCode::InvalidInput);
}
let ipc_payload = match serde_json::from_slice::<serde_json::Value>(payload_bytes) {
Ok(data) => IpcPayload::from_json_value(data),
Err(_) => return Err(ErrorCode::InvalidInput),
};
let message = InternalIpcMessage::new(topic, ipc_payload, state.capsule_uuid)
.with_principal(principal_str);
state.event_bus.publish(AstridEvent::Ipc {
metadata: EventMetadata::new("wasm_guest").with_session_id(state.capsule_uuid),
message,
});
Ok(())
}
impl ipc::Host for HostState {
fn publish(&mut self, topic: String, payload: String) -> Result<(), ErrorCode> {
let principal_str = self
.caller_context
.as_ref()
.and_then(|c| c.principal.clone())
.unwrap_or_else(|| self.principal.to_string());
let bytes = payload.len() as u64;
let topic_for_audit = topic.clone();
let result = publish_inner(self, topic, payload, principal_str);
audit_ipc(
self,
"astrid:ipc/host.publish",
&topic_for_audit,
bytes,
&result,
);
result
}
fn publish_as(
&mut self,
topic: String,
payload: String,
principal: String,
) -> Result<(), ErrorCode> {
if !self.has_uplink_capability {
return Err(ErrorCode::CapabilityDenied);
}
if astrid_core::principal::PrincipalId::new(&principal).is_err() {
return Err(ErrorCode::InvalidInput);
}
let bytes = payload.len() as u64;
let topic_for_audit = topic.clone();
let result = publish_inner(self, topic, payload, principal);
audit_ipc(
self,
"astrid:ipc/host.publish-as",
&topic_for_audit,
bytes,
&result,
);
result
}
fn subscribe(&mut self, topic_pattern: String) -> Result<Resource<Subscription>, ErrorCode> {
if topic_pattern.len() > 256 {
return Err(ErrorCode::InvalidInput);
}
if !crate::topic::has_valid_segments(&topic_pattern) {
return Err(ErrorCode::InvalidInput);
}
{
let mut segments = topic_pattern.split('.');
#[expect(clippy::search_is_some)]
if segments.position(|s| s == "*").is_some() && segments.next().is_some() {
return Err(ErrorCode::InvalidInput);
}
}
if topic_pattern.split('.').count() > 8 {
return Err(ErrorCode::InvalidInput);
}
check_subscribe_acl(self, &topic_pattern)?;
if self.subscription_count >= MAX_SUBSCRIPTIONS {
return Err(ErrorCode::Quota);
}
let covers_audit = pattern_covers_audit(&topic_pattern);
let scope: Option<astrid_events::PrincipalKey> = if covers_audit && !self.audit_firehose {
Some(Some(self.principal.to_string()))
} else {
None
};
if covers_audit {
tracing::info!(
target: "astrid.audit.ipc",
security_event = true,
capsule_id = %self.capsule_id.as_str(),
principal = %self.principal,
topic = %topic_pattern,
scoped = scope.is_some(),
firehose = self.audit_firehose,
"ipc::subscribe: audit-covering subscription scoped to owner principal unless firehose holder",
);
}
let receiver = self.event_bus.subscribe_topic_routed_scoped(
self.capsule_uuid,
topic_pattern.clone(),
self.capsule_id.as_str().to_string(),
"capsule_guest",
scope,
);
let entry = SubscriptionEntry {
receiver: Arc::new(Mutex::new(receiver)),
topic_pattern: topic_pattern.clone(),
};
let res = self
.resource_table
.push(entry)
.map_err(|e| ErrorCode::Unknown(format!("resource table: {e}")))?;
self.subscription_count += 1;
let result: Result<Resource<Subscription>, ErrorCode> = Ok(Resource::new_own(res.rep()));
audit_ipc(
self,
"astrid:ipc/host.subscribe",
&topic_pattern,
0,
&result,
);
result
}
fn get_interceptor_bindings(&mut self) -> Result<Vec<InterceptorBinding>, ErrorCode> {
Ok(self
.interceptor_handles
.iter()
.map(|h| InterceptorBinding {
handle_id: h.handle_id,
action: h.action.clone(),
topic: h.topic.clone(),
})
.collect())
}
}
impl HostSubscription for HostState {
fn poll(&mut self, self_: Resource<Subscription>) -> Result<IpcEnvelope, ErrorCode> {
let rep = self_.rep();
let entry = self
.resource_table
.get_mut::<SubscriptionEntry>(&Resource::new_borrow(rep))
.map_err(|_| ErrorCode::Closed)?;
let topic_for_audit = entry.topic_pattern.clone();
let receiver_arc = Arc::clone(&entry.receiver);
let drained = {
let mut receiver = receiver_arc
.try_lock()
.expect("Subscription receiver Arc accessed across threads");
receiver.try_drain(MAX_DRAIN_BYTES)
};
let messages: Vec<InternalIpcMessage> =
drained.iter().filter_map(extract_message).collect();
let lagged = {
let mut receiver = receiver_arc
.try_lock()
.expect("Subscription receiver Arc accessed across threads");
receiver.drain_lagged()
};
if let Some(first) = messages.first() {
self.install_recv_invocation_context(first);
}
let count = messages.len() as u64;
let result: Result<IpcEnvelope, ErrorCode> = Ok(envelope_from(messages, lagged));
audit_ipc(
self,
"astrid:ipc/host.subscription.poll",
&topic_for_audit,
count,
&result,
);
result
}
async fn recv(
&mut self,
self_: Resource<Subscription>,
timeout_ms: u64,
) -> Result<IpcEnvelope, ErrorCode> {
self.recv_yielded = true;
let timeout_ms = timeout_ms.min(MAX_RECV_TIMEOUT_MS);
let rep = self_.rep();
let (receiver_arc, topic_for_audit) = {
let entry = self
.resource_table
.get_mut::<SubscriptionEntry>(&Resource::new_borrow(rep))
.map_err(|_| ErrorCode::Closed)?;
(Arc::clone(&entry.receiver), entry.topic_pattern.clone())
};
let cancel_token = self.cancel_token.clone();
let io_semaphore = self.io_semaphore.clone();
let receiver_for_wait = Arc::clone(&receiver_arc);
let first = util::bounded_await_cancellable(&io_semaphore, &cancel_token, async move {
let mut receiver = receiver_for_wait.lock().await;
receiver
.recv(Some(std::time::Duration::from_millis(timeout_ms)))
.await
})
.await
.flatten();
let mut messages: Vec<InternalIpcMessage> = Vec::new();
let mut consumed = 0usize;
if let Some(event) = first
&& let Some(msg) = extract_message(&event)
{
consumed = msg
.payload
.to_guest_bytes()
.map(|v| v.len())
.unwrap_or(0)
.saturating_add(msg.topic.len());
messages.push(msg);
}
let drained = {
let mut receiver = receiver_arc
.try_lock()
.expect("Subscription receiver Arc accessed across threads");
receiver.try_drain(MAX_DRAIN_BYTES.saturating_sub(consumed))
};
for event in &drained {
if let Some(msg) = extract_message(event) {
messages.push(msg);
}
}
let lagged = {
let mut receiver = receiver_arc
.try_lock()
.expect("Subscription receiver Arc accessed across threads");
receiver.drain_lagged()
};
if let Some(first) = messages.first() {
self.install_recv_invocation_context(first);
}
let count = messages.len() as u64;
let result: Result<IpcEnvelope, ErrorCode> = Ok(envelope_from(messages, lagged));
audit_ipc(
self,
"astrid:ipc/host.subscription.recv",
&topic_for_audit,
count,
&result,
);
result
}
fn subscribe_readiness(&mut self, _self_: Resource<Subscription>) -> Resource<DynPollable> {
super::stubs::always_ready_pollable(&mut self.resource_table)
}
fn drop(&mut self, rep: Resource<Subscription>) -> wasmtime::Result<()> {
if self
.resource_table
.delete::<SubscriptionEntry>(Resource::new_own(rep.rep()))
.is_ok()
{
self.subscription_count = self.subscription_count.saturating_sub(1);
}
Ok(())
}
}
#[cfg(test)]
mod audit_scope_tests {
use super::*;
use crate::engine::wasm::bindings::astrid::ipc::host::Host as IpcHost;
use crate::engine::wasm::test_fixtures::minimal_host_state;
#[test]
fn audit_topic_literal_pinned() {
assert_eq!(AUDIT_TOPIC, "astrid.v1.audit.entry");
}
#[test]
fn pattern_covers_audit_via_route_matcher() {
assert!(pattern_covers_audit("astrid.v1.audit.entry"));
assert!(pattern_covers_audit("astrid.v1.audit.*"));
assert!(pattern_covers_audit("astrid.v1.*"));
assert!(pattern_covers_audit("astrid.*"));
assert!(!pattern_covers_audit("astrid.v1.request.*"));
assert!(!pattern_covers_audit("astrid.v1.session.*"));
assert!(!pattern_covers_audit("astrid.v1.audit"));
assert!(!pattern_covers_audit("user.prompt"));
}
fn publish_audit(bus: &astrid_events::EventBus, principal: &str) {
let msg = InternalIpcMessage::new(
AUDIT_TOPIC,
IpcPayload::RawJson(serde_json::json!({ "principal": principal })),
uuid::Uuid::nil(),
)
.with_principal(principal.to_string());
bus.publish(AstridEvent::Ipc {
metadata: EventMetadata::new("test_kernel"),
message: msg,
});
}
fn drained_principals(state: &mut HostState, sub: &Resource<Subscription>) -> Vec<String> {
let envelope = HostSubscription::poll(state, Resource::new_borrow(sub.rep()))
.expect("poll should succeed");
envelope
.messages
.iter()
.map(|m| match &m.principal {
PrincipalAttribution::Verified(p) | PrincipalAttribution::Claimed(p) => p.clone(),
PrincipalAttribution::System => "<system>".to_string(),
})
.collect()
}
fn host_state_for(
rt: tokio::runtime::Handle,
owner: &str,
firehose: bool,
subscribe_acl: &[&str],
) -> HostState {
let mut state = minimal_host_state(rt);
state.principal = astrid_core::PrincipalId::new(owner).expect("valid principal");
state.audit_firehose = firehose;
state.ipc_subscribe_patterns = subscribe_acl.iter().map(|s| (*s).to_string()).collect();
state
}
#[tokio::test]
async fn subscribe_audit_default_is_scoped_regression() {
let rt = tokio::runtime::Handle::current();
let mut state = host_state_for(rt, "alice", false, &["astrid.v1.audit.entry"]);
let bus = state.event_bus.clone();
let sub = IpcHost::subscribe(&mut state, AUDIT_TOPIC.to_string())
.expect("subscribe should be allowed by the ACL");
for _ in 0..5 {
publish_audit(&bus, "alice");
}
for _ in 0..5 {
publish_audit(&bus, "bob");
}
let got = drained_principals(&mut state, &sub);
assert_eq!(got.len(), 5, "only alice's five entries are delivered");
assert!(
got.iter().all(|p| p == "alice"),
"no foreign-principal audit entry may leak, got: {got:?}"
);
}
#[tokio::test]
async fn subscribe_wildcard_superset_is_scoped() {
let rt = tokio::runtime::Handle::current();
let mut state = host_state_for(rt, "alice", false, &["astrid.v1.*"]);
let bus = state.event_bus.clone();
let sub = IpcHost::subscribe(&mut state, "astrid.v1.*".to_string())
.expect("subscribe should be allowed by the ACL");
publish_audit(&bus, "alice");
publish_audit(&bus, "bob");
let got = drained_principals(&mut state, &sub);
assert!(
got.iter().all(|p| p == "alice"),
"wildcard superset must not leak bob's audit entry, got: {got:?}"
);
assert_eq!(got.len(), 1);
}
#[tokio::test]
async fn subscribe_firehose_holder_unscoped() {
let rt = tokio::runtime::Handle::current();
let mut state = host_state_for(rt, "alice", true, &["astrid.v1.audit.entry"]);
let bus = state.event_bus.clone();
let sub = IpcHost::subscribe(&mut state, AUDIT_TOPIC.to_string())
.expect("subscribe should be allowed by the ACL");
publish_audit(&bus, "alice");
publish_audit(&bus, "bob");
let got = drained_principals(&mut state, &sub);
assert_eq!(got.len(), 2, "firehose holder receives both principals");
assert!(got.iter().any(|p| p == "alice"));
assert!(got.iter().any(|p| p == "bob"));
}
#[tokio::test]
async fn subscribe_non_audit_topic_unaffected() {
let rt = tokio::runtime::Handle::current();
let mut state = host_state_for(rt, "alice", false, &["astrid.v1.session.*"]);
let bus = state.event_bus.clone();
let sub = IpcHost::subscribe(&mut state, "astrid.v1.session.*".to_string())
.expect("subscribe should be allowed by the ACL");
for who in ["alice", "bob"] {
let msg = InternalIpcMessage::new(
"astrid.v1.session.update",
IpcPayload::RawJson(serde_json::json!({})),
uuid::Uuid::nil(),
)
.with_principal(who.to_string());
bus.publish(AstridEvent::Ipc {
metadata: EventMetadata::new("test"),
message: msg,
});
}
let got = drained_principals(&mut state, &sub);
assert_eq!(got.len(), 2, "non-audit fan-in delivers all principals");
assert!(got.iter().any(|p| p == "alice"));
assert!(got.iter().any(|p| p == "bob"));
}
#[tokio::test]
async fn subscribe_rejects_non_terminal_wildcard() {
let rt = tokio::runtime::Handle::current();
let acl = &[
"astrid.v1.admin.response.*",
"astrid.v1.admin.response.*.*",
"astrid.v1.admin.response.*.*.*",
];
let mut state = host_state_for(rt, "default", false, acl);
assert!(
matches!(
IpcHost::subscribe(&mut state, "astrid.v1.admin.response.*.*".to_string()),
Err(ErrorCode::InvalidInput)
),
"a non-terminal wildcard must be rejected even when declared in the ACL",
);
assert!(
matches!(
IpcHost::subscribe(&mut state, "astrid.v1.admin.response.*.*.*".to_string()),
Err(ErrorCode::InvalidInput)
),
"a deeper multi-wildcard must be rejected too",
);
assert!(
IpcHost::subscribe(&mut state, "astrid.v1.admin.response.*".to_string()).is_ok(),
"the single trailing wildcard the fix kept must be subscribable",
);
}
}