use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::Arc;
use tokio::sync::{Semaphore, mpsc, watch};
use tokio_util::sync::CancellationToken;
use crate::capsule::CapsuleId;
use astrid_core::uplink::{InboundMessage, MAX_UPLINKS_PER_CAPSULE, UplinkDescriptor};
use astrid_storage::ScopedKvStore;
use astrid_storage::secret::SecretStore;
#[derive(Debug, Clone)]
pub enum NetStream {
Unix(Arc<tokio::sync::Mutex<tokio::net::UnixStream>>),
Tcp(TcpStreamSlot),
}
#[derive(Debug, Clone)]
pub struct TcpStreamSlot {
pub stream: Arc<tokio::sync::Mutex<tokio::net::TcpStream>>,
pub read_timeout: Option<std::time::Duration>,
pub write_timeout: Option<std::time::Duration>,
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum LifecyclePhase {
Install,
Upgrade,
}
#[derive(Debug, Clone, serde::Serialize)]
pub struct InterceptorHandle {
pub handle_id: u64,
pub action: String,
pub topic: String,
}
use crate::engine::wasm::host::process::ProcessTracker;
use crate::security::CapsuleSecurityGate;
pub struct PrincipalMount {
pub root: PathBuf,
pub vfs: Arc<dyn astrid_vfs::Vfs>,
pub handle: astrid_capabilities::DirHandle,
}
impl std::fmt::Debug for PrincipalMount {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("PrincipalMount")
.field("root", &self.root)
.field("handle", &self.handle)
.finish_non_exhaustive()
}
}
pub struct HostState {
pub wasi_ctx: wasmtime_wasi::WasiCtx,
pub resource_table: wasmtime::component::ResourceTable,
pub store_limits: wasmtime::StoreLimits,
pub principal: astrid_core::principal::PrincipalId,
pub capsule_id: CapsuleId,
pub capsule_log: Option<Arc<std::sync::Mutex<std::fs::File>>>,
pub caller_context: Option<astrid_events::ipc::IpcMessage>,
pub capsule_uuid: uuid::Uuid,
pub workspace_root: PathBuf,
pub vfs: Arc<dyn astrid_vfs::Vfs>,
pub vfs_root_handle: astrid_capabilities::DirHandle,
pub home: Option<PrincipalMount>,
pub tmp: Option<PrincipalMount>,
pub overlay_vfs: Option<Arc<astrid_vfs::OverlayVfs>>,
pub upper_dir: Option<Arc<tempfile::TempDir>>,
pub kv: ScopedKvStore,
pub invocation_kv: Option<ScopedKvStore>,
pub invocation_home: Option<PrincipalMount>,
pub invocation_tmp: Option<PrincipalMount>,
pub invocation_secret_store: Option<Arc<dyn SecretStore>>,
pub invocation_capsule_log: Option<Arc<std::sync::Mutex<std::fs::File>>>,
pub invocation_profile: Option<Arc<astrid_core::profile::PrincipalProfile>>,
pub event_bus: astrid_events::EventBus,
pub ipc_limiter: astrid_events::ipc::IpcRateLimiter,
pub config: HashMap<String, serde_json::Value>,
pub secret_env: std::collections::HashSet<String>,
pub ipc_publish_patterns: Vec<String>,
pub ipc_subscribe_patterns: Vec<String>,
pub security: Option<Arc<dyn CapsuleSecurityGate>>,
pub hook_manager: Option<Arc<dyn std::any::Any + Send + Sync>>,
pub capsule_registry: Option<Arc<tokio::sync::RwLock<crate::registry::CapsuleRegistry>>>,
pub runtime_handle: tokio::runtime::Handle,
pub has_uplink_capability: bool,
pub inbound_tx: Option<mpsc::Sender<InboundMessage>>,
pub registered_uplinks: Vec<UplinkDescriptor>,
pub cli_socket_listener: Option<Arc<tokio::sync::Mutex<tokio::net::UnixListener>>>,
pub lifecycle_phase: Option<LifecyclePhase>,
pub secret_store: Arc<dyn SecretStore>,
pub ready_tx: Option<watch::Sender<bool>>,
pub host_semaphore: Arc<Semaphore>,
pub cancel_token: CancellationToken,
pub session_token: Option<std::sync::Arc<astrid_core::session_token::SessionToken>>,
pub interceptor_handles: Vec<InterceptorHandle>,
pub allowance_store: Option<std::sync::Arc<astrid_approval::AllowanceStore>>,
pub identity_store: Option<std::sync::Arc<dyn astrid_storage::IdentityStore>>,
pub active_http_streams: HashMap<u64, crate::engine::wasm::host::http::ActiveHttpStream>,
pub next_http_stream_id: u64,
pub process_tracker: Arc<ProcessTracker>,
pub net_stream_count: usize,
pub subscription_count: usize,
pub process_count_total: usize,
pub process_count_by_principal:
std::collections::HashMap<astrid_core::principal::PrincipalId, usize>,
}
impl wasmtime_wasi::WasiView for HostState {
fn ctx(&mut self) -> wasmtime_wasi::WasiCtxView<'_> {
wasmtime_wasi::WasiCtxView {
ctx: &mut self.wasi_ctx,
table: &mut self.resource_table,
}
}
}
impl HostState {
pub fn register_uplink(&mut self, descriptor: UplinkDescriptor) -> Result<(), &'static str> {
if self.registered_uplinks.len() >= MAX_UPLINKS_PER_CAPSULE {
return Err("uplink registration limit reached");
}
let duplicate = self
.registered_uplinks
.iter()
.any(|c| c.name == descriptor.name && c.platform == descriptor.platform);
if duplicate {
return Err("duplicate uplink name and platform");
}
self.registered_uplinks.push(descriptor);
Ok(())
}
#[must_use]
pub fn uplinks(&self) -> &[UplinkDescriptor] {
&self.registered_uplinks
}
#[must_use]
pub fn default_host_semaphore() -> Arc<Semaphore> {
Arc::new(Semaphore::new(
std::thread::available_parallelism()
.map(|n| n.get().saturating_sub(2).max(2))
.unwrap_or(2),
))
}
pub fn set_inbound_tx(&mut self, tx: mpsc::Sender<InboundMessage>) {
self.inbound_tx = Some(tx);
}
#[must_use]
pub fn principal_kv_namespace(&self) -> String {
format!("{}:capsule:{}", self.principal, self.capsule_id)
}
#[must_use]
pub fn effective_kv(&self) -> &ScopedKvStore {
#[cfg(debug_assertions)]
self.debug_assert_invocation_field_set(self.invocation_kv.is_some(), "invocation_kv");
self.invocation_kv.as_ref().unwrap_or(&self.kv)
}
#[cfg(debug_assertions)]
fn debug_assert_invocation_field_set(&self, is_set: bool, field_name: &str) {
let principal_mismatches = self
.caller_context
.as_ref()
.and_then(|m| m.principal.as_deref())
.and_then(|p| astrid_core::PrincipalId::new(p).ok())
.is_some_and(|p| p != self.principal);
if principal_mismatches && !is_set {
debug_assert!(
false,
"invocation principal differs from capsule owner ({owner}) but {field_name} is None — \
effective_* accessor would fall back to the owner's resource, leaking reads/writes",
owner = self.principal,
);
}
}
#[must_use]
pub fn effective_home(&self) -> Option<&PrincipalMount> {
self.invocation_home.as_ref().or(self.home.as_ref())
}
#[must_use]
pub fn effective_tmp(&self) -> Option<&PrincipalMount> {
self.invocation_tmp.as_ref().or(self.tmp.as_ref())
}
#[must_use]
pub fn effective_home_root_buf(&self) -> Option<PathBuf> {
self.effective_home().map(|m| m.root.clone())
}
#[must_use]
pub fn effective_secret_store(&self) -> &Arc<dyn SecretStore> {
#[cfg(debug_assertions)]
self.debug_assert_invocation_field_set(
self.invocation_secret_store.is_some(),
"invocation_secret_store",
);
self.invocation_secret_store
.as_ref()
.unwrap_or(&self.secret_store)
}
#[must_use]
pub fn effective_capsule_log(&self) -> Option<&Arc<std::sync::Mutex<std::fs::File>>> {
self.invocation_capsule_log
.as_ref()
.or(self.capsule_log.as_ref())
}
#[must_use]
pub fn effective_principal(&self) -> astrid_core::principal::PrincipalId {
self.caller_context
.as_ref()
.and_then(|m| m.principal.as_deref())
.and_then(|p| astrid_core::principal::PrincipalId::new(p).ok())
.unwrap_or_else(|| self.principal.clone())
}
#[must_use]
pub fn effective_profile(&self) -> &astrid_core::profile::PrincipalProfile {
match self.invocation_profile.as_deref() {
Some(p) => p,
None => astrid_core::profile::PrincipalProfile::default_ref(),
}
}
pub(crate) fn install_recv_invocation_context(&mut self, msg: &astrid_events::ipc::IpcMessage) {
let new_principal = msg.principal.clone();
let existing_principal = self
.caller_context
.as_ref()
.and_then(|c| c.principal.clone());
if new_principal == existing_principal {
self.caller_context = Some(msg.clone());
return;
}
self.caller_context = Some(msg.clone());
let invocation_principal: Option<astrid_core::PrincipalId> = msg
.principal
.as_deref()
.and_then(|p| astrid_core::PrincipalId::new(p).ok())
.filter(|p| *p != self.principal);
let Some(p) = invocation_principal else {
self.invocation_kv = None;
self.invocation_capsule_log = None;
return;
};
let ns = format!("{}:capsule:{}", p, self.capsule_id);
self.invocation_kv = match self.kv.with_namespace(&ns) {
Ok(kv) => Some(kv),
Err(e) => {
tracing::warn!(
principal = %p,
error = %e,
"Failed to create invocation KV scope on ipc::recv path"
);
None
},
};
self.invocation_capsule_log = super::open_capsule_log(&p, self.capsule_id.as_str(), false);
}
pub(crate) fn clear_recv_invocation_context(&mut self) {
self.caller_context = None;
self.invocation_kv = None;
self.invocation_capsule_log = None;
}
}
impl std::fmt::Debug for HostState {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("HostState")
.field("capsule_id", &self.capsule_id)
.field("workspace_root", &self.workspace_root)
.field("vfs_root_handle", &self.vfs_root_handle)
.field("has_home", &self.home.is_some())
.field("has_tmp", &self.tmp.is_some())
.field("has_security", &self.security.is_some())
.field("has_uplink_capability", &self.has_uplink_capability)
.field("has_inbound_tx", &self.inbound_tx.is_some())
.field("registered_uplinks", &self.registered_uplinks.len())
.field(
"host_semaphore_permits",
&self.host_semaphore.available_permits(),
)
.field("cancel_token_cancelled", &self.cancel_token.is_cancelled())
.field("has_identity_store", &self.identity_store.is_some())
.field("active_http_streams", &self.active_http_streams.len())
.field("process_tracker", &self.process_tracker)
.finish_non_exhaustive()
}
}
#[cfg(test)]
#[path = "host_state_tests.rs"]
mod tests;