1pub mod admin;
3
4use std::collections::{HashMap, VecDeque};
5use std::sync::Arc;
6use std::time::Instant;
7
8use astrid_audit::{AuditAction, AuditOutcome, AuthorizationProof};
9use astrid_capabilities::{CapabilityCheck, PermissionError};
10use astrid_core::principal::PrincipalId;
11use astrid_events::ipc::{IpcMessage, IpcPayload};
12use astrid_events::kernel_api::{KernelRequest, KernelResponse};
13use serde::Serialize;
14use tracing::{debug, info, warn};
15
16#[must_use]
26pub(crate) fn spawn_kernel_router(kernel: Arc<crate::Kernel>) -> tokio::task::JoinHandle<()> {
27 drop(spawn_connection_tracker(Arc::clone(&kernel)));
29 drop(admin::spawn_admin_router(Arc::clone(&kernel)));
31
32 let mut receiver = kernel.event_bus.subscribe_topic("astrid.v1.request.*");
33
34 tokio::spawn(async move {
35 let mut rate_limiter = ManagementRateLimiter::new();
36
37 while let Some(event) = receiver.recv().await {
38 let astrid_events::AstridEvent::Ipc { message, .. } = &*event else {
39 continue;
40 };
41
42 let IpcPayload::RawJson(val) = &message.payload else {
44 continue;
45 };
46
47 match serde_json::from_value::<KernelRequest>(val.clone()) {
48 Ok(req) => {
49 let (method, limit) = rate_limit_for_request(&req);
50 if let Some(max) = limit
51 && !rate_limiter.check(method, max)
52 {
53 warn!(
54 security_event = true,
55 method = method,
56 "Rate limited kernel management request"
57 );
58 let response_topic =
59 message.topic.replace("kernel.request.", "kernel.response.");
60 publish_response(
61 &kernel,
62 response_topic,
63 KernelResponse::Error(format!(
64 "Rate limited: max {max} {method} requests per minute"
65 )),
66 );
67 continue;
68 }
69 let caller = resolve_caller(message);
70 handle_request(&kernel, message.topic.clone(), caller, req).await;
71 },
72 Err(e) => {
73 warn!(error = %e, topic = %message.topic, "Failed to parse KernelRequest from IPC");
74 },
75 }
76 }
77 })
78}
79
80fn spawn_connection_tracker(kernel: Arc<crate::Kernel>) -> tokio::task::JoinHandle<()> {
86 let mut receiver = kernel.event_bus.subscribe_topic("client.v1.*");
87
88 tokio::spawn(async move {
89 while let Some(event) = receiver.recv().await {
90 let astrid_events::AstridEvent::Ipc { message, .. } = &*event else {
91 continue;
92 };
93 let principal = message
99 .principal
100 .as_deref()
101 .and_then(|p| astrid_core::principal::PrincipalId::new(p).ok())
102 .unwrap_or_default();
103 match &message.payload {
104 IpcPayload::Disconnect { reason } => {
105 kernel.connection_closed(&principal);
106 debug!(%principal, reason = ?reason, "Client disconnected");
107 },
108 IpcPayload::Connect => {
109 kernel.connection_opened(&principal);
110 debug!(%principal, "New client connection accepted");
111 },
112 _ => {},
113 }
114 }
115 })
116}
117
118#[expect(clippy::too_many_lines)]
119async fn handle_request(
120 kernel: &Arc<crate::Kernel>,
121 topic: String,
122 caller: PrincipalId,
123 req: KernelRequest,
124) {
125 let response_topic = if let Some(suffix) = topic.strip_prefix("astrid.v1.request.") {
126 format!("astrid.v1.response.{suffix}")
127 } else {
128 topic.clone()
129 };
130
131 let method = kernel_request_method(&req);
136 let scope = resolve_scope(&req, &caller);
137 let required_cap = required_capability(&req, scope);
138 match authorize_request(kernel, &caller, required_cap) {
139 Ok(()) => {
140 record_admin_audit(
141 kernel,
142 AdminAuditEntry {
143 caller: &caller,
144 method,
145 required_cap,
146 target_principal: None,
147 params: None,
148 authorization: AuthorizationProof::System {
149 reason: format!("policy allow: {caller} holds {required_cap}"),
150 },
151 outcome: AuditOutcome::success(),
152 },
153 );
154 },
155 Err(e) => {
156 warn!(
157 security_event = true,
158 method = method,
159 principal = %caller,
160 required = required_cap,
161 "Permission check denied admin request"
162 );
163 record_admin_audit(
164 kernel,
165 AdminAuditEntry {
166 caller: &caller,
167 method,
168 required_cap,
169 target_principal: None,
170 params: None,
171 authorization: AuthorizationProof::Denied {
172 reason: e.to_string(),
173 },
174 outcome: AuditOutcome::failure(e.to_string()),
175 },
176 );
177 publish_response(kernel, response_topic, KernelResponse::Error(e.to_string()));
178 return;
179 },
180 }
181
182 let res = match req {
183 KernelRequest::InstallCapsule { source, workspace } => {
184 info!(source = %source, workspace, "Kernel received install request");
185 KernelResponse::Error(
188 "Installation logic not yet implemented in kernel router".to_string(),
189 )
190 },
191 KernelRequest::ApproveCapability {
192 request_id,
193 signature: _,
194 } => {
195 info!(request_id = %request_id, "Kernel received capability approval");
196 KernelResponse::Error("Approval logic not yet implemented in kernel router".to_string())
197 },
198 KernelRequest::ListCapsules => {
199 let reg = kernel.capsules.read().await;
200 let mut list = Vec::new();
201 for c in reg.list() {
202 list.push(c.to_string());
203 }
204 KernelResponse::Success(serde_json::json!(list))
205 },
206 KernelRequest::GetCommands => {
207 let reg = kernel.capsules.read().await;
208 let mut commands = Vec::new();
209 for c in reg.values() {
210 for cmd in &c.manifest().commands {
211 commands.push(astrid_events::kernel_api::CommandInfo {
212 name: cmd.name.clone(),
213 description: cmd
214 .description
215 .clone()
216 .unwrap_or_else(|| "No description".to_string()),
217 provider_capsule: c.id().to_string(),
218 });
219 }
220 }
221 info!(
222 count = commands.len(),
223 capsules = reg.len(),
224 "GetCommands: returning {} commands from {} capsules",
225 commands.len(),
226 reg.len()
227 );
228 KernelResponse::Commands(commands)
229 },
230 KernelRequest::ReloadCapsules => {
231 {
234 let reg = kernel.capsules.read().await;
235 let failed_ids: Vec<_> = reg
236 .list()
237 .into_iter()
238 .filter(|id| {
239 reg.get(id).is_some_and(|c| {
240 matches!(c.state(), astrid_capsule::capsule::CapsuleState::Failed(_))
241 })
242 })
243 .cloned()
244 .collect();
245 drop(reg);
246
247 let mut reg = kernel.capsules.write().await;
248 for id in failed_ids {
249 let _ = reg.unregister(&id);
250 }
251 }
252
253 kernel.load_all_capsules().await;
254 KernelResponse::Success(serde_json::json!({"status": "reloaded"}))
255 },
256 KernelRequest::Shutdown { reason } => {
257 info!(
258 reason = reason.as_deref().unwrap_or("none"),
259 "Kernel received shutdown request via management API"
260 );
261 publish_response(
263 kernel,
264 response_topic.clone(),
265 KernelResponse::Success(serde_json::json!({"status": "shutting_down"})),
266 );
267 let _ = kernel.shutdown_tx.send(true);
269 return;
271 },
272 KernelRequest::GetStatus => {
273 let uptime = kernel.boot_time.elapsed().as_secs();
274 let reg = kernel.capsules.read().await;
275 let loaded: Vec<String> = reg.list().iter().map(ToString::to_string).collect();
276 let by_principal = kernel
277 .connections_by_principal()
278 .into_iter()
279 .map(
280 |(p, c)| astrid_events::kernel_api::PrincipalConnectionCount {
281 principal: p.to_string(),
282 count: u32::try_from(c).unwrap_or(u32::MAX),
283 },
284 )
285 .collect();
286 let status = astrid_events::kernel_api::DaemonStatus {
287 pid: std::process::id(),
288 uptime_secs: uptime,
289 version: env!("CARGO_PKG_VERSION").to_string(),
290 ephemeral: false, connected_clients: u32::try_from(kernel.total_connection_count())
292 .unwrap_or(u32::MAX),
293 connections_by_principal: by_principal,
294 loaded_capsules: loaded,
295 };
296 KernelResponse::Status(status)
297 },
298 KernelRequest::GetCapsuleMetadata => {
299 let reg = kernel.capsules.read().await;
300 let mut entries = Vec::new();
301 for capsule in reg.values() {
302 let manifest = capsule.manifest();
303 entries.push(astrid_events::kernel_api::CapsuleMetadataEntry {
304 name: manifest.package.name.clone(),
305 interceptor_events: manifest
306 .interceptors
307 .iter()
308 .map(|i| i.event.clone())
309 .collect(),
310 });
311 }
312 KernelResponse::CapsuleMetadata(entries)
313 },
314 };
315
316 publish_response(kernel, response_topic, res);
317}
318
319fn publish_response<R: Serialize>(kernel: &Arc<crate::Kernel>, response_topic: String, res: R) {
320 if let Ok(val) = serde_json::to_value(res) {
321 let msg = IpcMessage::new(
322 response_topic,
323 IpcPayload::RawJson(val),
324 kernel.session_id.0,
325 );
326 let _ = kernel.event_bus.publish(astrid_events::AstridEvent::Ipc {
327 metadata: astrid_events::EventMetadata::new("kernel_router"),
328 message: msg,
329 });
330 }
331}
332
/// Sliding-window rate limiter keyed by management method name.
///
/// Each bucket stores the timestamps of the calls accepted for one method.
/// New timestamps are appended at the back and expired ones are removed from
/// the front.
struct ManagementRateLimiter {
    buckets: HashMap<&'static str, VecDeque<Instant>>,
}

impl ManagementRateLimiter {
    /// Creates a limiter with no recorded calls.
    fn new() -> Self {
        let buckets = HashMap::new();
        Self { buckets }
    }

    /// Decides whether a call to `method` is allowed right now.
    ///
    /// Timestamps that are 60 seconds old or older are evicted first. If
    /// fewer than `max_per_minute` timestamps remain, the call is recorded
    /// and `true` is returned; otherwise nothing is recorded and `false` is
    /// returned. A limit of `0` therefore rejects every call.
    fn check(&mut self, method: &'static str, max_per_minute: u32) -> bool {
        let window = std::time::Duration::from_secs(60);
        let now = Instant::now();
        let timestamps = self.buckets.entry(method).or_default();

        // Only the front needs checking: once it is inside the window the
        // loop stops.
        while timestamps
            .front()
            .is_some_and(|&oldest| now.saturating_duration_since(oldest) >= window)
        {
            timestamps.pop_front();
        }

        let has_capacity = timestamps.len() < max_per_minute as usize;
        if has_capacity {
            timestamps.push_back(now);
        }
        has_capacity
    }
}
375
376fn rate_limit_for_request(req: &KernelRequest) -> (&'static str, Option<u32>) {
379 (kernel_request_method(req), rate_limit_max(req))
380}
381
382fn rate_limit_max(req: &KernelRequest) -> Option<u32> {
384 match req {
385 KernelRequest::ReloadCapsules => Some(5),
386 KernelRequest::InstallCapsule { .. } | KernelRequest::ApproveCapability { .. } => Some(10),
387 KernelRequest::Shutdown { .. } => Some(1),
388 KernelRequest::ListCapsules
389 | KernelRequest::GetCommands
390 | KernelRequest::GetCapsuleMetadata
391 | KernelRequest::GetStatus => None,
392 }
393}
394
/// Scope under which a kernel request is authorized.
///
/// The scope selects which capability name `required_capability` demands for
/// the scope-sensitive requests (reload, install, list).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum AuthorityScope {
    /// Selects the `self:`-prefixed capability names.
    Self_,
    /// Selects the unprefixed capability names.
    Global,
}
411
412#[must_use]
417pub fn resolve_scope(_req: &KernelRequest, _caller: &PrincipalId) -> AuthorityScope {
418 AuthorityScope::Self_
419}
420
421#[must_use]
428pub fn required_capability(req: &KernelRequest, scope: AuthorityScope) -> &'static str {
429 match (req, scope) {
430 (KernelRequest::Shutdown { .. }, _) => "system:shutdown",
431 (KernelRequest::GetStatus, _) => "system:status",
432 (KernelRequest::ReloadCapsules, AuthorityScope::Self_) => "self:capsule:reload",
433 (KernelRequest::ReloadCapsules, _) => "capsule:reload",
434 (KernelRequest::InstallCapsule { .. }, AuthorityScope::Self_) => "self:capsule:install",
435 (KernelRequest::InstallCapsule { .. }, _) => "capsule:install",
436 (
437 KernelRequest::ListCapsules
438 | KernelRequest::GetCommands
439 | KernelRequest::GetCapsuleMetadata,
440 AuthorityScope::Self_,
441 ) => "self:capsule:list",
442 (
443 KernelRequest::ListCapsules
444 | KernelRequest::GetCommands
445 | KernelRequest::GetCapsuleMetadata,
446 _,
447 ) => "capsule:list",
448 (KernelRequest::ApproveCapability { .. }, _) => "self:approval:respond",
449 }
450}
451
452#[must_use]
455pub fn kernel_request_method(req: &KernelRequest) -> &'static str {
456 match req {
457 KernelRequest::ReloadCapsules => "ReloadCapsules",
458 KernelRequest::InstallCapsule { .. } => "InstallCapsule",
459 KernelRequest::ApproveCapability { .. } => "ApproveCapability",
460 KernelRequest::ListCapsules => "ListCapsules",
461 KernelRequest::GetCommands => "GetCommands",
462 KernelRequest::GetCapsuleMetadata => "GetCapsuleMetadata",
463 KernelRequest::Shutdown { .. } => "Shutdown",
464 KernelRequest::GetStatus => "GetStatus",
465 }
466}
467
468fn resolve_caller(message: &IpcMessage) -> PrincipalId {
475 message
476 .principal
477 .as_deref()
478 .and_then(|p| PrincipalId::new(p).ok())
479 .unwrap_or_default()
480}
481
482fn authorize_request(
490 kernel: &crate::Kernel,
491 caller: &PrincipalId,
492 required_cap: &str,
493) -> Result<(), PermissionError> {
494 let profile = match kernel.profile_cache.resolve(caller) {
495 Ok(p) => p,
496 Err(e) => {
497 warn!(
498 security_event = true,
499 principal = %caller,
500 error = %e,
501 "Profile resolution failed — fail-closed deny"
502 );
503 return Err(PermissionError::MissingCapability {
504 principal: caller.clone(),
505 required: required_cap.to_string(),
506 });
507 },
508 };
509 if !profile.enabled {
516 warn!(
517 security_event = true,
518 principal = %caller,
519 required = required_cap,
520 "Disabled principal denied — fail-closed enforcement"
521 );
522 return Err(PermissionError::PrincipalDisabled {
523 principal: caller.clone(),
524 });
525 }
526 let groups = kernel.groups.load_full();
527 let check = CapabilityCheck::new(profile.as_ref(), groups.as_ref(), caller.clone());
528 check.require(required_cap)
529}
530
531pub(crate) struct AdminAuditEntry<'a> {
534 pub caller: &'a PrincipalId,
536 pub method: &'a str,
538 pub required_cap: &'a str,
540 pub target_principal: Option<PrincipalId>,
544 pub params: Option<serde_json::Value>,
548 pub authorization: AuthorizationProof,
550 pub outcome: AuditOutcome,
552}
553
554fn record_admin_audit(kernel: &crate::Kernel, entry: AdminAuditEntry<'_>) {
558 let AdminAuditEntry {
559 caller,
560 method,
561 required_cap,
562 target_principal,
563 params,
564 authorization,
565 outcome,
566 } = entry;
567 let action = AuditAction::AdminRequest {
568 method: method.to_string(),
569 required_capability: required_cap.to_string(),
570 target_principal,
571 params,
572 };
573 if let Err(e) = kernel.audit_log.append_with_principal(
574 kernel.session_id.clone(),
575 caller.clone(),
576 action,
577 authorization,
578 outcome,
579 ) {
580 warn!(
581 security_event = true,
582 principal = %caller,
583 method = method,
584 error = %e,
585 "Failed to persist admin-request audit entry — continuing"
586 );
587 }
588}
589
#[cfg(test)]
mod tests {
    use super::*;

    /// Bucket key used by the rate-limiter tests.
    const RELOAD: &str = "ReloadCapsules";
    /// Per-minute limit used by the rate-limiter tests.
    const RELOAD_LIMIT: u32 = 5;

    /// Returns a limiter whose `ReloadCapsules` bucket has just been filled
    /// to its limit, asserting that every one of those calls was accepted.
    fn saturated_limiter() -> ManagementRateLimiter {
        let mut limiter = ManagementRateLimiter::new();
        for _ in 0..RELOAD_LIMIT {
            assert!(limiter.check(RELOAD, RELOAD_LIMIT));
        }
        limiter
    }

    /// Moves the first `count` timestamps of the `ReloadCapsules` bucket 61
    /// seconds into the past, i.e. just outside the 60-second window.
    fn backdate_oldest(limiter: &mut ManagementRateLimiter, count: usize) {
        if let Some(timestamps) = limiter.buckets.get_mut(RELOAD) {
            let past = Instant::now() - std::time::Duration::from_secs(61);
            for ts in timestamps.iter_mut().take(count) {
                *ts = past;
            }
        }
    }

    /// Builds a request message, setting the principal only when one is
    /// given.
    fn request_message(principal: Option<&str>) -> IpcMessage {
        let mut msg = IpcMessage::new(
            "astrid.v1.request.system",
            IpcPayload::RawJson(serde_json::json!({})),
            uuid::Uuid::nil(),
        );
        if let Some(name) = principal {
            msg.principal = Some(name.to_string());
        }
        msg
    }

    /// One instance of every [`KernelRequest`] variant.
    fn all_request_variants() -> Vec<KernelRequest> {
        vec![
            KernelRequest::Shutdown { reason: None },
            KernelRequest::GetStatus,
            KernelRequest::ReloadCapsules,
            KernelRequest::InstallCapsule {
                source: "x".to_string(),
                workspace: false,
            },
            KernelRequest::ListCapsules,
            KernelRequest::GetCommands,
            KernelRequest::GetCapsuleMetadata,
            KernelRequest::ApproveCapability {
                request_id: "r".to_string(),
                signature: "s".to_string(),
            },
        ]
    }

    #[test]
    fn rate_limiter_allows_within_limit() {
        let mut limiter = saturated_limiter();
        assert!(!limiter.check(RELOAD, RELOAD_LIMIT));
    }

    #[test]
    fn rate_limiter_independent_buckets() {
        let mut limiter = saturated_limiter();
        assert!(!limiter.check(RELOAD, RELOAD_LIMIT));

        // A saturated bucket must not affect a different method.
        assert!(limiter.check("InstallCapsule", 10));
    }

    #[test]
    fn rate_limiter_sliding_window_eviction() {
        let mut limiter = saturated_limiter();
        assert!(!limiter.check(RELOAD, RELOAD_LIMIT));

        // Expire every recorded call.
        backdate_oldest(&mut limiter, usize::MAX);

        assert!(limiter.check(RELOAD, RELOAD_LIMIT));
    }

    #[test]
    fn rate_limiter_sliding_window_prevents_boundary_burst() {
        let mut limiter = saturated_limiter();

        // Expire only three of the five calls: exactly three slots free up.
        backdate_oldest(&mut limiter, 3);

        for _ in 0..3 {
            assert!(limiter.check(RELOAD, RELOAD_LIMIT));
        }
        assert!(!limiter.check(RELOAD, RELOAD_LIMIT));
    }

    #[test]
    fn rate_limit_for_request_returns_correct_limits() {
        assert_eq!(
            rate_limit_for_request(&KernelRequest::ReloadCapsules),
            ("ReloadCapsules", Some(5))
        );
        assert_eq!(
            rate_limit_for_request(&KernelRequest::ListCapsules),
            ("ListCapsules", None)
        );
    }

    #[test]
    fn required_capability_every_variant_has_non_empty_mapping() {
        for req in all_request_variants() {
            let cap = required_capability(&req, AuthorityScope::Self_);
            assert!(
                !cap.is_empty(),
                "required_capability returned empty for {req:?}"
            );
        }
    }

    #[test]
    fn required_capability_mapping_per_variant_self_scope() {
        let expectations = [
            (KernelRequest::Shutdown { reason: None }, "system:shutdown"),
            (KernelRequest::GetStatus, "system:status"),
            (KernelRequest::ReloadCapsules, "self:capsule:reload"),
            (
                KernelRequest::InstallCapsule {
                    source: String::new(),
                    workspace: false,
                },
                "self:capsule:install",
            ),
            (KernelRequest::ListCapsules, "self:capsule:list"),
            (KernelRequest::GetCommands, "self:capsule:list"),
            (KernelRequest::GetCapsuleMetadata, "self:capsule:list"),
            (
                KernelRequest::ApproveCapability {
                    request_id: String::new(),
                    signature: String::new(),
                },
                "self:approval:respond",
            ),
        ];

        for (req, expected) in expectations {
            assert_eq!(
                required_capability(&req, AuthorityScope::Self_),
                expected,
                "unexpected self-scope capability for {req:?}"
            );
        }
    }

    #[test]
    fn required_capability_mapping_global_scope() {
        let expectations = [
            (KernelRequest::ReloadCapsules, "capsule:reload"),
            (
                KernelRequest::InstallCapsule {
                    source: String::new(),
                    workspace: false,
                },
                "capsule:install",
            ),
            (KernelRequest::ListCapsules, "capsule:list"),
            // Scope-independent: identical under both scopes.
            (KernelRequest::Shutdown { reason: None }, "system:shutdown"),
        ];

        for (req, expected) in expectations {
            assert_eq!(
                required_capability(&req, AuthorityScope::Global),
                expected,
                "unexpected global-scope capability for {req:?}"
            );
        }
    }

    #[test]
    fn resolve_scope_defaults_to_self() {
        let caller = PrincipalId::new("alice").unwrap();
        for req in all_request_variants() {
            assert_eq!(
                resolve_scope(&req, &caller),
                AuthorityScope::Self_,
                "scope should default to Self_ for today's variants"
            );
        }
    }

    #[test]
    fn resolve_caller_uses_ipc_principal_when_present() {
        let msg = request_message(Some("alice"));
        let caller = resolve_caller(&msg);
        assert_eq!(caller.as_str(), "alice");
    }

    #[test]
    fn resolve_caller_falls_back_to_default_when_missing() {
        let msg = request_message(None);
        let caller = resolve_caller(&msg);
        assert_eq!(caller, PrincipalId::default());
    }

    #[test]
    fn resolve_caller_falls_back_to_default_on_invalid_principal() {
        let msg = request_message(Some("alice@evil.example"));
        let caller = resolve_caller(&msg);
        assert_eq!(caller, PrincipalId::default());
    }
}