Skip to main content

astrid_kernel/kernel_router/
mod.rs

1/// Admin management API dispatcher (issue #672, Layer 6).
2pub mod admin;
3
4use std::collections::{HashMap, VecDeque};
5use std::sync::Arc;
6use std::time::Instant;
7
8use astrid_audit::{AuditAction, AuditOutcome, AuthorizationProof};
9use astrid_capabilities::{CapabilityCheck, PermissionError};
10use astrid_core::principal::PrincipalId;
11use astrid_events::ipc::{IpcMessage, IpcPayload};
12use astrid_events::kernel_api::{KernelRequest, KernelResponse};
13use serde::Serialize;
14use tracing::{debug, info, warn};
15
16/// Spawns background tasks for the kernel management API and connection tracking.
17///
18/// Two listeners:
19/// 1. `astrid.v1.request.*` - handles management commands (list capsules, reload, etc.)
20/// 2. `client.v1.disconnect` - decrements the active connection counter on graceful disconnect.
21///
22/// Connection *increment* happens when the WASM proxy capsule accepts a socket
23/// connection (it publishes a `client.v1.connected` event). For ungraceful disconnects,
24/// the idle monitor uses `EventBus::subscriber_count()` as a secondary signal.
25#[must_use]
26pub(crate) fn spawn_kernel_router(kernel: Arc<crate::Kernel>) -> tokio::task::JoinHandle<()> {
27    // Spawn the connection tracker as a sibling task.
28    drop(spawn_connection_tracker(Arc::clone(&kernel)));
29    // Spawn the Layer 6 admin dispatcher as a sibling task (issue #672).
30    drop(admin::spawn_admin_router(Arc::clone(&kernel)));
31
32    let mut receiver = kernel.event_bus.subscribe_topic("astrid.v1.request.*");
33
34    tokio::spawn(async move {
35        let mut rate_limiter = ManagementRateLimiter::new();
36
37        while let Some(event) = receiver.recv().await {
38            let astrid_events::AstridEvent::Ipc { message, .. } = &*event else {
39                continue;
40            };
41
42            // Only process standard IPC messages that contain JSON payloads.
43            let IpcPayload::RawJson(val) = &message.payload else {
44                continue;
45            };
46
47            match serde_json::from_value::<KernelRequest>(val.clone()) {
48                Ok(req) => {
49                    let (method, limit) = rate_limit_for_request(&req);
50                    if let Some(max) = limit
51                        && !rate_limiter.check(method, max)
52                    {
53                        warn!(
54                            security_event = true,
55                            method = method,
56                            "Rate limited kernel management request"
57                        );
58                        let response_topic =
59                            message.topic.replace("kernel.request.", "kernel.response.");
60                        publish_response(
61                            &kernel,
62                            response_topic,
63                            KernelResponse::Error(format!(
64                                "Rate limited: max {max} {method} requests per minute"
65                            )),
66                        );
67                        continue;
68                    }
69                    let caller = resolve_caller(message);
70                    handle_request(&kernel, message.topic.clone(), caller, req).await;
71                },
72                Err(e) => {
73                    warn!(error = %e, topic = %message.topic, "Failed to parse KernelRequest from IPC");
74                },
75            }
76        }
77    })
78}
79
80/// Tracks client connection lifecycle events.
81///
82/// Listens on `client.v1.*` topics:
83/// - `client.v1.connected` - a new socket connection was accepted.
84/// - `client.v1.disconnect` - a client sent a graceful disconnect.
85fn spawn_connection_tracker(kernel: Arc<crate::Kernel>) -> tokio::task::JoinHandle<()> {
86    let mut receiver = kernel.event_bus.subscribe_topic("client.v1.*");
87
88    tokio::spawn(async move {
89        while let Some(event) = receiver.recv().await {
90            let astrid_events::AstridEvent::Ipc { message, .. } = &*event else {
91                continue;
92            };
93            // Derive the connecting principal from the IPC message. Today's
94            // CLI socket always sets this to the default principal
95            // (bootstrapped in `bootstrap_cli_root_user`), but as per-agent
96            // socket auth lands (#658) the same plumbing will carry the
97            // real invoking principal.
98            let principal = message
99                .principal
100                .as_deref()
101                .and_then(|p| astrid_core::principal::PrincipalId::new(p).ok())
102                .unwrap_or_default();
103            match &message.payload {
104                IpcPayload::Disconnect { reason } => {
105                    kernel.connection_closed(&principal);
106                    debug!(%principal, reason = ?reason, "Client disconnected");
107                },
108                IpcPayload::Connect => {
109                    kernel.connection_opened(&principal);
110                    debug!(%principal, "New client connection accepted");
111                },
112                _ => {},
113            }
114        }
115    })
116}
117
118#[expect(clippy::too_many_lines)]
119async fn handle_request(
120    kernel: &Arc<crate::Kernel>,
121    topic: String,
122    caller: PrincipalId,
123    req: KernelRequest,
124) {
125    let response_topic = if let Some(suffix) = topic.strip_prefix("astrid.v1.request.") {
126        format!("astrid.v1.response.{suffix}")
127    } else {
128        topic.clone()
129    };
130
131    // Capability enforcement preamble (issue #670). Resolve the caller's
132    // profile, compute the required capability for this request, and
133    // reject with an audited `Denied` entry on failure. No match arm
134    // below is reached without `authorize_request` returning Ok.
135    let method = kernel_request_method(&req);
136    let scope = resolve_scope(&req, &caller);
137    let required_cap = required_capability(&req, scope);
138    match authorize_request(kernel, &caller, required_cap) {
139        Ok(()) => {
140            record_admin_audit(
141                kernel,
142                AdminAuditEntry {
143                    caller: &caller,
144                    method,
145                    required_cap,
146                    target_principal: None,
147                    params: None,
148                    authorization: AuthorizationProof::System {
149                        reason: format!("policy allow: {caller} holds {required_cap}"),
150                    },
151                    outcome: AuditOutcome::success(),
152                },
153            );
154        },
155        Err(e) => {
156            warn!(
157                security_event = true,
158                method = method,
159                principal = %caller,
160                required = required_cap,
161                "Permission check denied admin request"
162            );
163            record_admin_audit(
164                kernel,
165                AdminAuditEntry {
166                    caller: &caller,
167                    method,
168                    required_cap,
169                    target_principal: None,
170                    params: None,
171                    authorization: AuthorizationProof::Denied {
172                        reason: e.to_string(),
173                    },
174                    outcome: AuditOutcome::failure(e.to_string()),
175                },
176            );
177            publish_response(kernel, response_topic, KernelResponse::Error(e.to_string()));
178            return;
179        },
180    }
181
182    let res = match req {
183        KernelRequest::InstallCapsule { source, workspace } => {
184            info!(source = %source, workspace, "Kernel received install request");
185            // Here the kernel would verify identity, parse the capsule, and potentially
186            // return ApprovalRequired if it needs dangerous capabilities!
187            KernelResponse::Error(
188                "Installation logic not yet implemented in kernel router".to_string(),
189            )
190        },
191        KernelRequest::ApproveCapability {
192            request_id,
193            signature: _,
194        } => {
195            info!(request_id = %request_id, "Kernel received capability approval");
196            KernelResponse::Error("Approval logic not yet implemented in kernel router".to_string())
197        },
198        KernelRequest::ListCapsules => {
199            let reg = kernel.capsules.read().await;
200            let mut list = Vec::new();
201            for c in reg.list() {
202                list.push(c.to_string());
203            }
204            KernelResponse::Success(serde_json::json!(list))
205        },
206        KernelRequest::GetCommands => {
207            let reg = kernel.capsules.read().await;
208            let mut commands = Vec::new();
209            for c in reg.values() {
210                for cmd in &c.manifest().commands {
211                    commands.push(astrid_events::kernel_api::CommandInfo {
212                        name: cmd.name.clone(),
213                        description: cmd
214                            .description
215                            .clone()
216                            .unwrap_or_else(|| "No description".to_string()),
217                        provider_capsule: c.id().to_string(),
218                    });
219                }
220            }
221            info!(
222                count = commands.len(),
223                capsules = reg.len(),
224                "GetCommands: returning {} commands from {} capsules",
225                commands.len(),
226                reg.len()
227            );
228            KernelResponse::Commands(commands)
229        },
230        KernelRequest::ReloadCapsules => {
231            // Unregister capsules in a Failed state so they can be re-loaded
232            // with fresh configuration (e.g. after onboarding writes .env.json).
233            {
234                let reg = kernel.capsules.read().await;
235                let failed_ids: Vec<_> = reg
236                    .list()
237                    .into_iter()
238                    .filter(|id| {
239                        reg.get(id).is_some_and(|c| {
240                            matches!(c.state(), astrid_capsule::capsule::CapsuleState::Failed(_))
241                        })
242                    })
243                    .cloned()
244                    .collect();
245                drop(reg);
246
247                let mut reg = kernel.capsules.write().await;
248                for id in failed_ids {
249                    let _ = reg.unregister(&id);
250                }
251            }
252
253            kernel.load_all_capsules().await;
254            KernelResponse::Success(serde_json::json!({"status": "reloaded"}))
255        },
256        KernelRequest::Shutdown { reason } => {
257            info!(
258                reason = reason.as_deref().unwrap_or("none"),
259                "Kernel received shutdown request via management API"
260            );
261            // Publish response before signaling shutdown so the client gets confirmation.
262            publish_response(
263                kernel,
264                response_topic.clone(),
265                KernelResponse::Success(serde_json::json!({"status": "shutting_down"})),
266            );
267            // Signal the daemon's main loop to exit gracefully.
268            let _ = kernel.shutdown_tx.send(true);
269            // Return early — the daemon will call kernel.shutdown() from its main loop.
270            return;
271        },
272        KernelRequest::GetStatus => {
273            let uptime = kernel.boot_time.elapsed().as_secs();
274            let reg = kernel.capsules.read().await;
275            let loaded: Vec<String> = reg.list().iter().map(ToString::to_string).collect();
276            let by_principal = kernel
277                .connections_by_principal()
278                .into_iter()
279                .map(
280                    |(p, c)| astrid_events::kernel_api::PrincipalConnectionCount {
281                        principal: p.to_string(),
282                        count: u32::try_from(c).unwrap_or(u32::MAX),
283                    },
284                )
285                .collect();
286            let status = astrid_events::kernel_api::DaemonStatus {
287                pid: std::process::id(),
288                uptime_secs: uptime,
289                version: env!("CARGO_PKG_VERSION").to_string(),
290                ephemeral: false, // The kernel doesn't know; daemon sets this via response override if needed
291                connected_clients: u32::try_from(kernel.total_connection_count())
292                    .unwrap_or(u32::MAX),
293                connections_by_principal: by_principal,
294                loaded_capsules: loaded,
295            };
296            KernelResponse::Status(status)
297        },
298        KernelRequest::GetCapsuleMetadata => {
299            let reg = kernel.capsules.read().await;
300            let mut entries = Vec::new();
301            for capsule in reg.values() {
302                let manifest = capsule.manifest();
303                entries.push(astrid_events::kernel_api::CapsuleMetadataEntry {
304                    name: manifest.package.name.clone(),
305                    interceptor_events: manifest
306                        .interceptors
307                        .iter()
308                        .map(|i| i.event.clone())
309                        .collect(),
310                });
311            }
312            KernelResponse::CapsuleMetadata(entries)
313        },
314    };
315
316    publish_response(kernel, response_topic, res);
317}
318
319fn publish_response<R: Serialize>(kernel: &Arc<crate::Kernel>, response_topic: String, res: R) {
320    if let Ok(val) = serde_json::to_value(res) {
321        let msg = IpcMessage::new(
322            response_topic,
323            IpcPayload::RawJson(val),
324            kernel.session_id.0,
325        );
326        let _ = kernel.event_bus.publish(astrid_events::AstridEvent::Ipc {
327            metadata: astrid_events::EventMetadata::new("kernel_router"),
328            message: msg,
329        });
330    }
331}
332
333// ---------------------------------------------------------------------------
334// Management API rate limiting
335// ---------------------------------------------------------------------------
336
/// Sliding-window rate limiter for management API requests.
///
/// Each method label owns a queue of admission timestamps. Entries that are
/// 60 seconds old or older are evicted on every check, which avoids the 2x
/// burst that fixed-window designs allow at a window boundary.
/// Owned by the single router task, so no synchronization is involved.
struct ManagementRateLimiter {
    buckets: HashMap<&'static str, VecDeque<Instant>>,
}

impl ManagementRateLimiter {
    /// Create a limiter with no recorded requests.
    fn new() -> Self {
        let buckets = HashMap::new();
        Self { buckets }
    }

    /// Decide whether one more `method` request fits in the current window.
    ///
    /// Returns `true` and records the request when fewer than
    /// `max_per_minute` requests were admitted during the last 60 seconds;
    /// returns `false` (recording nothing) when the limit is already reached.
    fn check(&mut self, method: &'static str, max_per_minute: u32) -> bool {
        const WINDOW: std::time::Duration = std::time::Duration::from_secs(60);

        let now = Instant::now();
        let bucket = self.buckets.entry(method).or_default();

        // Timestamps are appended in order, so expired ones sit at the front.
        while bucket
            .front()
            .is_some_and(|&oldest| now.saturating_duration_since(oldest) >= WINDOW)
        {
            bucket.pop_front();
        }

        let admitted = bucket.len() < max_per_minute as usize;
        if admitted {
            bucket.push_back(now);
        }
        admitted
    }
}
375
376/// Return the rate limit label and max-per-minute for a request type.
377/// Returns `None` for the limit if the request type is not rate-limited.
378fn rate_limit_for_request(req: &KernelRequest) -> (&'static str, Option<u32>) {
379    (kernel_request_method(req), rate_limit_max(req))
380}
381
382/// Return the max-per-minute rate limit for a request type, if any.
383fn rate_limit_max(req: &KernelRequest) -> Option<u32> {
384    match req {
385        KernelRequest::ReloadCapsules => Some(5),
386        KernelRequest::InstallCapsule { .. } | KernelRequest::ApproveCapability { .. } => Some(10),
387        KernelRequest::Shutdown { .. } => Some(1),
388        KernelRequest::ListCapsules
389        | KernelRequest::GetCommands
390        | KernelRequest::GetCapsuleMetadata
391        | KernelRequest::GetStatus => None,
392    }
393}
394
395// ---------------------------------------------------------------------------
396// Management API capability enforcement (issue #670)
397// ---------------------------------------------------------------------------
398
/// Which authority surface a [`KernelRequest`] is exercised over.
///
/// No `KernelRequest` variant carries a target-principal field today, so
/// [`resolve_scope`] currently always yields [`AuthorityScope::Self_`]:
/// the request acts on the caller's own home.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum AuthorityScope {
    /// The request acts on the caller's own principal.
    Self_,
    /// The request acts on global/system-wide state (e.g. shutdown).
    Global,
}
411
412/// Return the authority scope the caller is exercising for `req`.
413///
414/// Currently always returns [`AuthorityScope::Self_`] because no
415/// `KernelRequest` variant carries a `target_principal` field yet.
416#[must_use]
417pub fn resolve_scope(_req: &KernelRequest, _caller: &PrincipalId) -> AuthorityScope {
418    AuthorityScope::Self_
419}
420
421/// Return the static capability string required to satisfy `req` under
422/// `scope`.
423///
424/// Pure function so the capability mapping can be unit-tested in
425/// isolation. Every `KernelRequest` variant is covered; there is no
426/// default-allow branch.
427#[must_use]
428pub fn required_capability(req: &KernelRequest, scope: AuthorityScope) -> &'static str {
429    match (req, scope) {
430        (KernelRequest::Shutdown { .. }, _) => "system:shutdown",
431        (KernelRequest::GetStatus, _) => "system:status",
432        (KernelRequest::ReloadCapsules, AuthorityScope::Self_) => "self:capsule:reload",
433        (KernelRequest::ReloadCapsules, _) => "capsule:reload",
434        (KernelRequest::InstallCapsule { .. }, AuthorityScope::Self_) => "self:capsule:install",
435        (KernelRequest::InstallCapsule { .. }, _) => "capsule:install",
436        (
437            KernelRequest::ListCapsules
438            | KernelRequest::GetCommands
439            | KernelRequest::GetCapsuleMetadata,
440            AuthorityScope::Self_,
441        ) => "self:capsule:list",
442        (
443            KernelRequest::ListCapsules
444            | KernelRequest::GetCommands
445            | KernelRequest::GetCapsuleMetadata,
446            _,
447        ) => "capsule:list",
448        (KernelRequest::ApproveCapability { .. }, _) => "self:approval:respond",
449    }
450}
451
452/// Short identifier for a [`KernelRequest`] variant, used for rate-limit
453/// labels and audit method names.
454#[must_use]
455pub fn kernel_request_method(req: &KernelRequest) -> &'static str {
456    match req {
457        KernelRequest::ReloadCapsules => "ReloadCapsules",
458        KernelRequest::InstallCapsule { .. } => "InstallCapsule",
459        KernelRequest::ApproveCapability { .. } => "ApproveCapability",
460        KernelRequest::ListCapsules => "ListCapsules",
461        KernelRequest::GetCommands => "GetCommands",
462        KernelRequest::GetCapsuleMetadata => "GetCapsuleMetadata",
463        KernelRequest::Shutdown { .. } => "Shutdown",
464        KernelRequest::GetStatus => "GetStatus",
465    }
466}
467
468/// Resolve the caller [`PrincipalId`] from an incoming [`IpcMessage`].
469///
470/// Pre-#658 single-token socket traffic arrives without a principal
471/// field set; we fall back to [`PrincipalId::default`] — the default
472/// principal is bootstrapped with the built-in `admin` group, matching
473/// today's single-tenant behaviour.
474fn resolve_caller(message: &IpcMessage) -> PrincipalId {
475    message
476        .principal
477        .as_deref()
478        .and_then(|p| PrincipalId::new(p).ok())
479        .unwrap_or_default()
480}
481
482/// Evaluate the capability check for `caller` against the kernel's
483/// resolved group config and the caller's profile.
484///
485/// Returns `Ok(())` on success, or the policy reason on denial. Profile
486/// resolution failures (malformed TOML, IO error) are themselves treated
487/// as deny — fail-closed — with a synthesized `MissingCapability` so the
488/// deny path has a single shape in the audit log.
489fn authorize_request(
490    kernel: &crate::Kernel,
491    caller: &PrincipalId,
492    required_cap: &str,
493) -> Result<(), PermissionError> {
494    let profile = match kernel.profile_cache.resolve(caller) {
495        Ok(p) => p,
496        Err(e) => {
497            warn!(
498                security_event = true,
499                principal = %caller,
500                error = %e,
501                "Profile resolution failed — fail-closed deny"
502            );
503            return Err(PermissionError::MissingCapability {
504                principal: caller.clone(),
505                required: required_cap.to_string(),
506            });
507        },
508    };
509    // Enabled gate runs BEFORE the capability check so a disabled
510    // principal cannot exercise any management API surface — even one
511    // they would otherwise be authorized for. The `default` principal
512    // is bootstrap-managed and `caps.revoke`/`agent.disable` against
513    // it are rejected up front, so this check cannot lock the
514    // single-tenant path.
515    if !profile.enabled {
516        warn!(
517            security_event = true,
518            principal = %caller,
519            required = required_cap,
520            "Disabled principal denied — fail-closed enforcement"
521        );
522        return Err(PermissionError::PrincipalDisabled {
523            principal: caller.clone(),
524        });
525    }
526    let groups = kernel.groups.load_full();
527    let check = CapabilityCheck::new(profile.as_ref(), groups.as_ref(), caller.clone());
528    check.require(required_cap)
529}
530
531/// Bundled inputs for [`record_admin_audit`] — keeps the call site
532/// readable and the function under clippy's `too_many_arguments` cap.
533pub(crate) struct AdminAuditEntry<'a> {
534    /// Caller principal making the request.
535    pub caller: &'a PrincipalId,
536    /// Wire-name identifier for the request variant.
537    pub method: &'a str,
538    /// Capability string evaluated for this request.
539    pub required_cap: &'a str,
540    /// `None` when the request operates on the caller's own principal
541    /// (Layer 5) and `Some` when the request mutates another principal
542    /// (Layer 6 admin topics like `admin.quota.set`).
543    pub target_principal: Option<PrincipalId>,
544    /// Request payload for forensic replay (issue #672) — `None` for
545    /// [`KernelRequest`] entries that have no params struct, `Some` with
546    /// the wire payload for [`AdminKernelRequest`].
547    pub params: Option<serde_json::Value>,
548    /// Authorization proof (allow / deny).
549    pub authorization: AuthorizationProof,
550    /// Success or failure outcome.
551    pub outcome: AuditOutcome,
552}
553
554/// Append an `AdminRequest` audit entry for the given outcome. Failures
555/// to persist are logged but do not abort the request — the audit log
556/// degrades to "continue + alert" by design.
557fn record_admin_audit(kernel: &crate::Kernel, entry: AdminAuditEntry<'_>) {
558    let AdminAuditEntry {
559        caller,
560        method,
561        required_cap,
562        target_principal,
563        params,
564        authorization,
565        outcome,
566    } = entry;
567    let action = AuditAction::AdminRequest {
568        method: method.to_string(),
569        required_capability: required_cap.to_string(),
570        target_principal,
571        params,
572    };
573    if let Err(e) = kernel.audit_log.append_with_principal(
574        kernel.session_id.clone(),
575        caller.clone(),
576        action,
577        authorization,
578        outcome,
579    ) {
580        warn!(
581            security_event = true,
582            principal = %caller,
583            method = method,
584            error = %e,
585            "Failed to persist admin-request audit entry — continuing"
586        );
587    }
588}
589
#[cfg(test)]
mod tests {
    use super::*;

    // ── Rate limiter ─────────────────────────────────────────────────

    /// Age that places a timestamp just outside the 60-second window.
    const EXPIRED_AGE: std::time::Duration = std::time::Duration::from_secs(61);

    /// Issue `count` checks for `method` and assert that each is admitted.
    fn admit(limiter: &mut ManagementRateLimiter, method: &'static str, max: u32, count: usize) {
        for _ in 0..count {
            assert!(limiter.check(method, max));
        }
    }

    /// Backdate the oldest `count` timestamps of `method` past the window
    /// to simulate expiry.
    fn backdate(limiter: &mut ManagementRateLimiter, method: &'static str, count: usize) {
        if let Some(timestamps) = limiter.buckets.get_mut(method) {
            let past = Instant::now() - EXPIRED_AGE;
            for ts in timestamps.iter_mut().take(count) {
                *ts = past;
            }
        }
    }

    #[test]
    fn rate_limiter_allows_within_limit() {
        let mut limiter = ManagementRateLimiter::new();
        admit(&mut limiter, "ReloadCapsules", 5, 5);
        // The 6th request exceeds the budget.
        assert!(!limiter.check("ReloadCapsules", 5));
    }

    #[test]
    fn rate_limiter_independent_buckets() {
        let mut limiter = ManagementRateLimiter::new();
        // Exhaust ReloadCapsules.
        admit(&mut limiter, "ReloadCapsules", 5, 5);
        assert!(!limiter.check("ReloadCapsules", 5));

        // InstallCapsule has its own bucket and is unaffected.
        assert!(limiter.check("InstallCapsule", 10));
    }

    #[test]
    fn rate_limiter_sliding_window_eviction() {
        let mut limiter = ManagementRateLimiter::new();
        // Exhaust the bucket.
        admit(&mut limiter, "ReloadCapsules", 5, 5);
        assert!(!limiter.check("ReloadCapsules", 5));

        // Push every timestamp 61 seconds into the past.
        backdate(&mut limiter, "ReloadCapsules", usize::MAX);

        // The expired entries are evicted, so the request is admitted again.
        assert!(limiter.check("ReloadCapsules", 5));
    }

    #[test]
    fn rate_limiter_sliding_window_prevents_boundary_burst() {
        let mut limiter = ManagementRateLimiter::new();
        admit(&mut limiter, "ReloadCapsules", 5, 5);

        // Expire only 3 of the 5 timestamps: a partial window expiry that
        // must free exactly 3 slots.
        backdate(&mut limiter, "ReloadCapsules", 3);

        // Exactly the 3 evicted slots are available again, not all 5.
        admit(&mut limiter, "ReloadCapsules", 5, 3);
        assert!(!limiter.check("ReloadCapsules", 5));
    }

    #[test]
    fn rate_limit_for_request_returns_correct_limits() {
        assert_eq!(
            rate_limit_for_request(&KernelRequest::ReloadCapsules),
            ("ReloadCapsules", Some(5))
        );
        assert_eq!(
            rate_limit_for_request(&KernelRequest::ListCapsules),
            ("ListCapsules", None)
        );
    }

    // ── Capability mapping (issue #670) ──────────────────────────────

    /// An `InstallCapsule` request with placeholder field values.
    fn install_request(source: &str) -> KernelRequest {
        KernelRequest::InstallCapsule {
            source: source.to_string(),
            workspace: false,
        }
    }

    /// An `ApproveCapability` request with placeholder field values.
    fn approve_request(request_id: &str, signature: &str) -> KernelRequest {
        KernelRequest::ApproveCapability {
            request_id: request_id.to_string(),
            signature: signature.to_string(),
        }
    }

    fn all_request_variants() -> Vec<KernelRequest> {
        vec![
            KernelRequest::Shutdown { reason: None },
            KernelRequest::GetStatus,
            KernelRequest::ReloadCapsules,
            install_request("x"),
            KernelRequest::ListCapsules,
            KernelRequest::GetCommands,
            KernelRequest::GetCapsuleMetadata,
            approve_request("r", "s"),
        ]
    }

    #[test]
    fn required_capability_every_variant_has_non_empty_mapping() {
        for req in all_request_variants() {
            let cap = required_capability(&req, AuthorityScope::Self_);
            assert!(
                !cap.is_empty(),
                "required_capability returned empty for {req:?}"
            );
        }
    }

    #[test]
    fn required_capability_mapping_per_variant_self_scope() {
        let expectations = [
            (KernelRequest::Shutdown { reason: None }, "system:shutdown"),
            (KernelRequest::GetStatus, "system:status"),
            (KernelRequest::ReloadCapsules, "self:capsule:reload"),
            (install_request(""), "self:capsule:install"),
            (KernelRequest::ListCapsules, "self:capsule:list"),
            (KernelRequest::GetCommands, "self:capsule:list"),
            (KernelRequest::GetCapsuleMetadata, "self:capsule:list"),
            (approve_request("", ""), "self:approval:respond"),
        ];
        for (req, expected) in &expectations {
            assert_eq!(required_capability(req, AuthorityScope::Self_), *expected);
        }
    }

    #[test]
    fn required_capability_mapping_global_scope() {
        let expectations = [
            // Global scope strips the `self:` prefix from capsule operations
            // (Layer 6 will start using this when cross-agent variants land).
            (KernelRequest::ReloadCapsules, "capsule:reload"),
            (install_request(""), "capsule:install"),
            (KernelRequest::ListCapsules, "capsule:list"),
            // system:* variants are scope-invariant.
            (KernelRequest::Shutdown { reason: None }, "system:shutdown"),
        ];
        for (req, expected) in &expectations {
            assert_eq!(required_capability(req, AuthorityScope::Global), *expected);
        }
    }

    #[test]
    fn resolve_scope_defaults_to_self() {
        let caller = PrincipalId::new("alice").unwrap();
        for req in all_request_variants() {
            assert_eq!(
                resolve_scope(&req, &caller),
                AuthorityScope::Self_,
                "scope should default to Self_ for today's variants"
            );
        }
    }

    // ── Caller resolution ────────────────────────────────────────────

    /// Build an empty-JSON request message, setting the principal field
    /// only when one is supplied.
    fn request_message(principal: Option<&str>) -> IpcMessage {
        let mut msg = IpcMessage::new(
            "astrid.v1.request.system",
            IpcPayload::RawJson(serde_json::json!({})),
            uuid::Uuid::nil(),
        );
        if let Some(name) = principal {
            msg.principal = Some(name.to_string());
        }
        msg
    }

    #[test]
    fn resolve_caller_uses_ipc_principal_when_present() {
        let caller = resolve_caller(&request_message(Some("alice")));
        assert_eq!(caller.as_str(), "alice");
    }

    #[test]
    fn resolve_caller_falls_back_to_default_when_missing() {
        let caller = resolve_caller(&request_message(None));
        assert_eq!(caller, PrincipalId::default());
    }

    #[test]
    fn resolve_caller_falls_back_to_default_on_invalid_principal() {
        // Invalid principal chars → PrincipalId::new fails → fall back.
        let caller = resolve_caller(&request_message(Some("alice@evil.example")));
        assert_eq!(caller, PrincipalId::default());
    }
}