pub mod admin;
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use std::time::Instant;
use astrid_audit::{AuditAction, AuditOutcome, AuthorizationProof};
use astrid_capabilities::{CapabilityCheck, PermissionError};
use astrid_core::principal::PrincipalId;
use astrid_events::ipc::{IpcMessage, IpcPayload};
use astrid_events::kernel_api::{KernelRequest, KernelResponse};
use serde::Serialize;
use tracing::{debug, info, warn};
#[must_use]
pub(crate) fn spawn_kernel_router(kernel: Arc<crate::Kernel>) -> tokio::task::JoinHandle<()> {
drop(spawn_connection_tracker(Arc::clone(&kernel)));
drop(admin::spawn_admin_router(Arc::clone(&kernel)));
let mut receiver = kernel.event_bus.subscribe_topic("astrid.v1.request.*");
tokio::spawn(async move {
let mut rate_limiter = ManagementRateLimiter::new();
while let Some(event) = receiver.recv().await {
let astrid_events::AstridEvent::Ipc { message, .. } = &*event else {
continue;
};
let IpcPayload::RawJson(val) = &message.payload else {
continue;
};
match serde_json::from_value::<KernelRequest>(val.clone()) {
Ok(req) => {
let (method, limit) = rate_limit_for_request(&req);
if let Some(max) = limit
&& !rate_limiter.check(method, max)
{
warn!(
security_event = true,
method = method,
"Rate limited kernel management request"
);
let response_topic =
message.topic.replace("kernel.request.", "kernel.response.");
publish_response(
&kernel,
response_topic,
KernelResponse::Error(format!(
"Rate limited: max {max} {method} requests per minute"
)),
);
continue;
}
let caller = resolve_caller(message);
handle_request(&kernel, message.topic.clone(), caller, req).await;
},
Err(e) => {
warn!(error = %e, topic = %message.topic, "Failed to parse KernelRequest from IPC");
},
}
}
})
}
fn spawn_connection_tracker(kernel: Arc<crate::Kernel>) -> tokio::task::JoinHandle<()> {
let mut receiver = kernel.event_bus.subscribe_topic("client.v1.*");
tokio::spawn(async move {
while let Some(event) = receiver.recv().await {
let astrid_events::AstridEvent::Ipc { message, .. } = &*event else {
continue;
};
let principal = message
.principal
.as_deref()
.and_then(|p| astrid_core::principal::PrincipalId::new(p).ok())
.unwrap_or_default();
match &message.payload {
IpcPayload::Disconnect { reason } => {
kernel.connection_closed(&principal);
debug!(%principal, reason = ?reason, "Client disconnected");
},
IpcPayload::Connect => {
kernel.connection_opened(&principal);
debug!(%principal, "New client connection accepted");
},
_ => {},
}
}
})
}
#[expect(clippy::too_many_lines)]
async fn handle_request(
kernel: &Arc<crate::Kernel>,
topic: String,
caller: PrincipalId,
req: KernelRequest,
) {
let response_topic = if let Some(suffix) = topic.strip_prefix("astrid.v1.request.") {
format!("astrid.v1.response.{suffix}")
} else {
topic.clone()
};
let method = kernel_request_method(&req);
let scope = resolve_scope(&req, &caller);
let required_cap = required_capability(&req, scope);
match authorize_request(kernel, &caller, required_cap) {
Ok(()) => {
record_admin_audit(
kernel,
AdminAuditEntry {
caller: &caller,
method,
required_cap,
target_principal: None,
params: None,
authorization: AuthorizationProof::System {
reason: format!("policy allow: {caller} holds {required_cap}"),
},
outcome: AuditOutcome::success(),
},
);
},
Err(e) => {
warn!(
security_event = true,
method = method,
principal = %caller,
required = required_cap,
"Permission check denied admin request"
);
record_admin_audit(
kernel,
AdminAuditEntry {
caller: &caller,
method,
required_cap,
target_principal: None,
params: None,
authorization: AuthorizationProof::Denied {
reason: e.to_string(),
},
outcome: AuditOutcome::failure(e.to_string()),
},
);
publish_response(kernel, response_topic, KernelResponse::Error(e.to_string()));
return;
},
}
let res = match req {
KernelRequest::InstallCapsule { source, workspace } => {
info!(source = %source, workspace, "Kernel received install request");
KernelResponse::Error(
"Installation logic not yet implemented in kernel router".to_string(),
)
},
KernelRequest::ApproveCapability {
request_id,
signature: _,
} => {
info!(request_id = %request_id, "Kernel received capability approval");
KernelResponse::Error("Approval logic not yet implemented in kernel router".to_string())
},
KernelRequest::ListCapsules => {
let reg = kernel.capsules.read().await;
let mut list = Vec::new();
for c in reg.list() {
list.push(c.to_string());
}
KernelResponse::Success(serde_json::json!(list))
},
KernelRequest::GetCommands => {
let reg = kernel.capsules.read().await;
let mut commands = Vec::new();
for c in reg.values() {
for cmd in &c.manifest().commands {
commands.push(astrid_events::kernel_api::CommandInfo {
name: cmd.name.clone(),
description: cmd
.description
.clone()
.unwrap_or_else(|| "No description".to_string()),
provider_capsule: c.id().to_string(),
});
}
}
info!(
count = commands.len(),
capsules = reg.len(),
"GetCommands: returning {} commands from {} capsules",
commands.len(),
reg.len()
);
KernelResponse::Commands(commands)
},
KernelRequest::ReloadCapsules => {
{
let reg = kernel.capsules.read().await;
let failed_ids: Vec<_> = reg
.list()
.into_iter()
.filter(|id| {
reg.get(id).is_some_and(|c| {
matches!(c.state(), astrid_capsule::capsule::CapsuleState::Failed(_))
})
})
.cloned()
.collect();
drop(reg);
let mut reg = kernel.capsules.write().await;
for id in failed_ids {
let _ = reg.unregister(&id);
}
}
kernel.load_all_capsules().await;
KernelResponse::Success(serde_json::json!({"status": "reloaded"}))
},
KernelRequest::Shutdown { reason } => {
info!(
reason = reason.as_deref().unwrap_or("none"),
"Kernel received shutdown request via management API"
);
publish_response(
kernel,
response_topic.clone(),
KernelResponse::Success(serde_json::json!({"status": "shutting_down"})),
);
let _ = kernel.shutdown_tx.send(true);
return;
},
KernelRequest::GetStatus => {
let uptime = kernel.boot_time.elapsed().as_secs();
let reg = kernel.capsules.read().await;
let loaded: Vec<String> = reg.list().iter().map(ToString::to_string).collect();
let by_principal = kernel
.connections_by_principal()
.into_iter()
.map(
|(p, c)| astrid_events::kernel_api::PrincipalConnectionCount {
principal: p.to_string(),
count: u32::try_from(c).unwrap_or(u32::MAX),
},
)
.collect();
let status = astrid_events::kernel_api::DaemonStatus {
pid: std::process::id(),
uptime_secs: uptime,
version: env!("CARGO_PKG_VERSION").to_string(),
ephemeral: false, connected_clients: u32::try_from(kernel.total_connection_count())
.unwrap_or(u32::MAX),
connections_by_principal: by_principal,
loaded_capsules: loaded,
};
KernelResponse::Status(status)
},
KernelRequest::GetCapsuleMetadata => {
let reg = kernel.capsules.read().await;
let mut entries = Vec::new();
for capsule in reg.values() {
let manifest = capsule.manifest();
entries.push(astrid_events::kernel_api::CapsuleMetadataEntry {
name: manifest.package.name.clone(),
interceptor_events: manifest
.interceptors
.iter()
.map(|i| i.event.clone())
.collect(),
});
}
KernelResponse::CapsuleMetadata(entries)
},
};
publish_response(kernel, response_topic, res);
}
fn publish_response<R: Serialize>(kernel: &Arc<crate::Kernel>, response_topic: String, res: R) {
if let Ok(val) = serde_json::to_value(res) {
let msg = IpcMessage::new(
response_topic,
IpcPayload::RawJson(val),
kernel.session_id.0,
);
let _ = kernel.event_bus.publish(astrid_events::AstridEvent::Ipc {
metadata: astrid_events::EventMetadata::new("kernel_router"),
message: msg,
});
}
}
struct ManagementRateLimiter {
buckets: HashMap<&'static str, VecDeque<Instant>>,
}
impl ManagementRateLimiter {
fn new() -> Self {
Self {
buckets: HashMap::new(),
}
}
fn check(&mut self, method: &'static str, max_per_minute: u32) -> bool {
let now = Instant::now();
let window = std::time::Duration::from_secs(60);
let timestamps = self.buckets.entry(method).or_default();
while let Some(&oldest) = timestamps.front() {
if now.saturating_duration_since(oldest) >= window {
timestamps.pop_front();
} else {
break;
}
}
if timestamps.len() >= max_per_minute as usize {
return false;
}
timestamps.push_back(now);
true
}
}
fn rate_limit_for_request(req: &KernelRequest) -> (&'static str, Option<u32>) {
(kernel_request_method(req), rate_limit_max(req))
}
fn rate_limit_max(req: &KernelRequest) -> Option<u32> {
match req {
KernelRequest::ReloadCapsules => Some(5),
KernelRequest::InstallCapsule { .. } | KernelRequest::ApproveCapability { .. } => Some(10),
KernelRequest::Shutdown { .. } => Some(1),
KernelRequest::ListCapsules
| KernelRequest::GetCommands
| KernelRequest::GetCapsuleMetadata
| KernelRequest::GetStatus => None,
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AuthorityScope {
Self_,
Global,
}
#[must_use]
pub fn resolve_scope(_req: &KernelRequest, _caller: &PrincipalId) -> AuthorityScope {
AuthorityScope::Self_
}
#[must_use]
pub fn required_capability(req: &KernelRequest, scope: AuthorityScope) -> &'static str {
match (req, scope) {
(KernelRequest::Shutdown { .. }, _) => "system:shutdown",
(KernelRequest::GetStatus, _) => "system:status",
(KernelRequest::ReloadCapsules, AuthorityScope::Self_) => "self:capsule:reload",
(KernelRequest::ReloadCapsules, _) => "capsule:reload",
(KernelRequest::InstallCapsule { .. }, AuthorityScope::Self_) => "self:capsule:install",
(KernelRequest::InstallCapsule { .. }, _) => "capsule:install",
(
KernelRequest::ListCapsules
| KernelRequest::GetCommands
| KernelRequest::GetCapsuleMetadata,
AuthorityScope::Self_,
) => "self:capsule:list",
(
KernelRequest::ListCapsules
| KernelRequest::GetCommands
| KernelRequest::GetCapsuleMetadata,
_,
) => "capsule:list",
(KernelRequest::ApproveCapability { .. }, _) => "self:approval:respond",
}
}
#[must_use]
pub fn kernel_request_method(req: &KernelRequest) -> &'static str {
match req {
KernelRequest::ReloadCapsules => "ReloadCapsules",
KernelRequest::InstallCapsule { .. } => "InstallCapsule",
KernelRequest::ApproveCapability { .. } => "ApproveCapability",
KernelRequest::ListCapsules => "ListCapsules",
KernelRequest::GetCommands => "GetCommands",
KernelRequest::GetCapsuleMetadata => "GetCapsuleMetadata",
KernelRequest::Shutdown { .. } => "Shutdown",
KernelRequest::GetStatus => "GetStatus",
}
}
fn resolve_caller(message: &IpcMessage) -> PrincipalId {
message
.principal
.as_deref()
.and_then(|p| PrincipalId::new(p).ok())
.unwrap_or_default()
}
fn authorize_request(
kernel: &crate::Kernel,
caller: &PrincipalId,
required_cap: &str,
) -> Result<(), PermissionError> {
let profile = match kernel.profile_cache.resolve(caller) {
Ok(p) => p,
Err(e) => {
warn!(
security_event = true,
principal = %caller,
error = %e,
"Profile resolution failed — fail-closed deny"
);
return Err(PermissionError::MissingCapability {
principal: caller.clone(),
required: required_cap.to_string(),
});
},
};
if !profile.enabled {
warn!(
security_event = true,
principal = %caller,
required = required_cap,
"Disabled principal denied — fail-closed enforcement"
);
return Err(PermissionError::PrincipalDisabled {
principal: caller.clone(),
});
}
let groups = kernel.groups.load_full();
let check = CapabilityCheck::new(profile.as_ref(), groups.as_ref(), caller.clone());
check.require(required_cap)
}
pub(crate) struct AdminAuditEntry<'a> {
pub caller: &'a PrincipalId,
pub method: &'a str,
pub required_cap: &'a str,
pub target_principal: Option<PrincipalId>,
pub params: Option<serde_json::Value>,
pub authorization: AuthorizationProof,
pub outcome: AuditOutcome,
}
fn record_admin_audit(kernel: &crate::Kernel, entry: AdminAuditEntry<'_>) {
let AdminAuditEntry {
caller,
method,
required_cap,
target_principal,
params,
authorization,
outcome,
} = entry;
let action = AuditAction::AdminRequest {
method: method.to_string(),
required_capability: required_cap.to_string(),
target_principal,
params,
};
if let Err(e) = kernel.audit_log.append_with_principal(
kernel.session_id.clone(),
caller.clone(),
action,
authorization,
outcome,
) {
warn!(
security_event = true,
principal = %caller,
method = method,
error = %e,
"Failed to persist admin-request audit entry — continuing"
);
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn rate_limiter_allows_within_limit() {
let mut limiter = ManagementRateLimiter::new();
for _ in 0..5 {
assert!(limiter.check("ReloadCapsules", 5));
}
assert!(!limiter.check("ReloadCapsules", 5));
}
#[test]
fn rate_limiter_independent_buckets() {
let mut limiter = ManagementRateLimiter::new();
for _ in 0..5 {
assert!(limiter.check("ReloadCapsules", 5));
}
assert!(!limiter.check("ReloadCapsules", 5));
assert!(limiter.check("InstallCapsule", 10));
}
#[test]
fn rate_limiter_sliding_window_eviction() {
let mut limiter = ManagementRateLimiter::new();
for _ in 0..5 {
assert!(limiter.check("ReloadCapsules", 5));
}
assert!(!limiter.check("ReloadCapsules", 5));
if let Some(timestamps) = limiter.buckets.get_mut("ReloadCapsules") {
let past = Instant::now() - std::time::Duration::from_secs(61);
for ts in timestamps.iter_mut() {
*ts = past;
}
}
assert!(limiter.check("ReloadCapsules", 5));
}
#[test]
fn rate_limiter_sliding_window_prevents_boundary_burst() {
let mut limiter = ManagementRateLimiter::new();
for _ in 0..5 {
assert!(limiter.check("ReloadCapsules", 5));
}
if let Some(timestamps) = limiter.buckets.get_mut("ReloadCapsules") {
let past = Instant::now() - std::time::Duration::from_secs(61);
for ts in timestamps.iter_mut().take(3) {
*ts = past;
}
}
for _ in 0..3 {
assert!(limiter.check("ReloadCapsules", 5));
}
assert!(!limiter.check("ReloadCapsules", 5));
}
#[test]
fn rate_limit_for_request_returns_correct_limits() {
let (name, limit) = rate_limit_for_request(&KernelRequest::ReloadCapsules);
assert_eq!(name, "ReloadCapsules");
assert_eq!(limit, Some(5));
let (name, limit) = rate_limit_for_request(&KernelRequest::ListCapsules);
assert_eq!(name, "ListCapsules");
assert_eq!(limit, None);
}
fn all_request_variants() -> Vec<KernelRequest> {
vec![
KernelRequest::Shutdown { reason: None },
KernelRequest::GetStatus,
KernelRequest::ReloadCapsules,
KernelRequest::InstallCapsule {
source: "x".to_string(),
workspace: false,
},
KernelRequest::ListCapsules,
KernelRequest::GetCommands,
KernelRequest::GetCapsuleMetadata,
KernelRequest::ApproveCapability {
request_id: "r".to_string(),
signature: "s".to_string(),
},
]
}
#[test]
fn required_capability_every_variant_has_non_empty_mapping() {
for req in all_request_variants() {
let cap = required_capability(&req, AuthorityScope::Self_);
assert!(
!cap.is_empty(),
"required_capability returned empty for {req:?}"
);
}
}
#[test]
fn required_capability_mapping_per_variant_self_scope() {
assert_eq!(
required_capability(
&KernelRequest::Shutdown { reason: None },
AuthorityScope::Self_
),
"system:shutdown"
);
assert_eq!(
required_capability(&KernelRequest::GetStatus, AuthorityScope::Self_),
"system:status"
);
assert_eq!(
required_capability(&KernelRequest::ReloadCapsules, AuthorityScope::Self_),
"self:capsule:reload"
);
assert_eq!(
required_capability(
&KernelRequest::InstallCapsule {
source: String::new(),
workspace: false
},
AuthorityScope::Self_
),
"self:capsule:install"
);
assert_eq!(
required_capability(&KernelRequest::ListCapsules, AuthorityScope::Self_),
"self:capsule:list"
);
assert_eq!(
required_capability(&KernelRequest::GetCommands, AuthorityScope::Self_),
"self:capsule:list"
);
assert_eq!(
required_capability(&KernelRequest::GetCapsuleMetadata, AuthorityScope::Self_),
"self:capsule:list"
);
assert_eq!(
required_capability(
&KernelRequest::ApproveCapability {
request_id: String::new(),
signature: String::new(),
},
AuthorityScope::Self_
),
"self:approval:respond"
);
}
#[test]
fn required_capability_mapping_global_scope() {
assert_eq!(
required_capability(&KernelRequest::ReloadCapsules, AuthorityScope::Global),
"capsule:reload"
);
assert_eq!(
required_capability(
&KernelRequest::InstallCapsule {
source: String::new(),
workspace: false
},
AuthorityScope::Global
),
"capsule:install"
);
assert_eq!(
required_capability(&KernelRequest::ListCapsules, AuthorityScope::Global),
"capsule:list"
);
assert_eq!(
required_capability(
&KernelRequest::Shutdown { reason: None },
AuthorityScope::Global
),
"system:shutdown"
);
}
#[test]
fn resolve_scope_defaults_to_self() {
let caller = PrincipalId::new("alice").unwrap();
for req in all_request_variants() {
assert_eq!(
resolve_scope(&req, &caller),
AuthorityScope::Self_,
"scope should default to Self_ for today's variants"
);
}
}
#[test]
fn resolve_caller_uses_ipc_principal_when_present() {
let mut msg = IpcMessage::new(
"astrid.v1.request.system",
IpcPayload::RawJson(serde_json::json!({})),
uuid::Uuid::nil(),
);
msg.principal = Some("alice".to_string());
let caller = resolve_caller(&msg);
assert_eq!(caller.as_str(), "alice");
}
#[test]
fn resolve_caller_falls_back_to_default_when_missing() {
let msg = IpcMessage::new(
"astrid.v1.request.system",
IpcPayload::RawJson(serde_json::json!({})),
uuid::Uuid::nil(),
);
let caller = resolve_caller(&msg);
assert_eq!(caller, PrincipalId::default());
}
#[test]
fn resolve_caller_falls_back_to_default_on_invalid_principal() {
let mut msg = IpcMessage::new(
"astrid.v1.request.system",
IpcPayload::RawJson(serde_json::json!({})),
uuid::Uuid::nil(),
);
msg.principal = Some("alice@evil.example".to_string());
let caller = resolve_caller(&msg);
assert_eq!(caller, PrincipalId::default());
}
}