Skip to main content

astrid_kernel/kernel_router/
mod.rs

1/// Admin management API dispatcher (issue #672, Layer 6).
2pub mod admin;
3/// `KernelRequest::InstallCapsule` handler — delegates to the
4/// `astrid-capsule-install` library so the daemon and the CLI reach
5/// disk through the same code path.
6mod install;
7
8use std::collections::{HashMap, VecDeque};
9use std::sync::Arc;
10use std::time::Instant;
11
12use astrid_audit::{AuditAction, AuditOutcome, AuthorizationProof};
13use astrid_capabilities::{CapabilityCheck, PermissionError};
14use astrid_core::principal::PrincipalId;
15use astrid_events::ipc::{IpcMessage, IpcPayload};
16use astrid_events::kernel_api::{KernelRequest, KernelResponse};
17use serde::Serialize;
18use tracing::{debug, info, warn};
19
20#[cfg(test)]
21mod capability_catalog_tests;
22#[cfg(test)]
23mod connection_tracker_tests;
24
25/// Spawns background tasks for the kernel management API and connection tracking.
26///
27/// Two listeners:
28/// 1. `astrid.v1.request.*` - handles management commands (list capsules, reload, etc.)
29/// 2. `client.v1.*` - tracks the active connection count per principal.
30///
31/// Uplink capsules (e.g. the CLI proxy) publish `client.v1.connect` /
32/// `client.v1.disconnect` carrying the authenticated principal as a socket is
33/// accepted / closed; the tracker adjusts `active_connections` accordingly.
34/// Because the SDK exposes no typed-payload publish (only JSON), the tracker
35/// keys off the **topic** as well as the typed `IpcPayload::Connect` /
36/// `Disconnect` that native producers emit — see [`connection_signal`].
37#[must_use]
38pub(crate) fn spawn_kernel_router(kernel: Arc<crate::Kernel>) -> tokio::task::JoinHandle<()> {
39    // Spawn the connection tracker as a sibling task.
40    drop(spawn_connection_tracker(Arc::clone(&kernel)));
41    // Spawn the Layer 6 admin dispatcher as a sibling task (issue #672).
42    drop(admin::spawn_admin_router(Arc::clone(&kernel)));
43
44    // Broadcast-path subscriber. Routed demux
45    // (`EventBus::subscribe_topic_routed`) is reserved for guest
46    // subscriptions where per-principal isolation matters; kernel-
47    // internal consumers see every event by design (no synthetic
48    // capsule_uuid).
49    let mut receiver = kernel
50        .event_bus
51        .subscribe_topic_as("astrid.v1.request.*", "kernel_router");
52
53    tokio::spawn(async move {
54        let mut rate_limiter = ManagementRateLimiter::new();
55
56        while let Some(event) = receiver.recv().await {
57            let astrid_events::AstridEvent::Ipc { message, .. } = &*event else {
58                continue;
59            };
60
61            // Only process standard IPC messages that contain JSON payloads.
62            let IpcPayload::RawJson(val) = &message.payload else {
63                continue;
64            };
65
66            match serde_json::from_value::<KernelRequest>(val.clone()) {
67                Ok(req) => {
68                    let (method, limit) = rate_limit_for_request(&req);
69                    if let Some(max) = limit
70                        && !rate_limiter.check(method, max)
71                    {
72                        warn!(
73                            security_event = true,
74                            method = method,
75                            "Rate limited kernel management request"
76                        );
77                        let response_topic =
78                            message.topic.replace("kernel.request.", "kernel.response.");
79                        publish_response(
80                            &kernel,
81                            response_topic,
82                            KernelResponse::Error(format!(
83                                "Rate limited: max {max} {method} requests per minute"
84                            )),
85                        );
86                        continue;
87                    }
88                    let caller = resolve_caller(message);
89                    handle_request(&kernel, message.topic.clone(), caller, req).await;
90                },
91                Err(e) => {
92                    warn!(error = %e, topic = %message.topic, "Failed to parse KernelRequest from IPC");
93                },
94            }
95        }
96    })
97}
98
/// Result of classifying a `client.v1.*` message as a connection lifecycle change.
#[derive(Debug, PartialEq, Eq)]
enum ConnectionSignal {
    /// A client connection was accepted.
    Opened,
    /// A client connection ended.
    ///
    /// `reason` is taken from the typed `IpcPayload::Disconnect { reason }`
    /// or from a `"reason"` string in a JSON payload, when either is present,
    /// so the tracker can include it in its diagnostic log.
    Closed { reason: Option<String> },
}
110
111/// Classifies a `client.v1.*` message as a connection open/close.
112///
113/// Recognises **both** the typed [`IpcPayload::Connect`]/[`IpcPayload::Disconnect`]
114/// that native producers emit, **and** the `client.v1.connect` /
115/// `client.v1.disconnect` topics carrying any payload. Uplink capsules can only
116/// reach the bus through the JSON-only SDK publish surface (no typed-payload
117/// publish exists), so the topic is the only signal they can produce — without
118/// the topic arm, the per-principal connection counter is never populated and
119/// the idle monitor / `astrid who` see zero connections regardless of reality.
120///
121/// Typed payloads take precedence over the topic, so a mismatched topic can
122/// never suppress a real connection event.
123fn connection_signal(topic: &str, payload: &IpcPayload) -> Option<ConnectionSignal> {
124    match payload {
125        IpcPayload::Disconnect { reason } => Some(ConnectionSignal::Closed {
126            reason: reason.clone(),
127        }),
128        IpcPayload::Connect => Some(ConnectionSignal::Opened),
129        // Uplink capsules can only publish JSON; the topic is the signal, and
130        // the reason (if any) rides along under the `"reason"` key.
131        IpcPayload::RawJson(val) if topic == "client.v1.disconnect" => {
132            let reason = val.get("reason").and_then(|r| r.as_str().map(String::from));
133            Some(ConnectionSignal::Closed { reason })
134        },
135        _ if topic == "client.v1.disconnect" => Some(ConnectionSignal::Closed { reason: None }),
136        _ if topic == "client.v1.connect" => Some(ConnectionSignal::Opened),
137        _ => None,
138    }
139}
140
141/// Tracks client connection lifecycle events.
142///
143/// Listens on `client.v1.*` topics and adjusts the per-principal connection
144/// count via [`connection_signal`] (typed payload or topic).
145fn spawn_connection_tracker(kernel: Arc<crate::Kernel>) -> tokio::task::JoinHandle<()> {
146    // Broadcast-path subscriber. See `spawn_kernel_router` for the
147    // rationale on staying on the untargeted subscribe path.
148    let mut receiver = kernel
149        .event_bus
150        .subscribe_topic_as("client.v1.*", "connection_tracker");
151
152    tokio::spawn(async move {
153        while let Some(event) = receiver.recv().await {
154            let astrid_events::AstridEvent::Ipc { message, .. } = &*event else {
155                continue;
156            };
157            // Derive the connecting principal from the IPC message. Today's
158            // CLI socket always sets this to the default principal
159            // (bootstrapped in `bootstrap_cli_root_user`), but as per-agent
160            // socket auth lands (#658) the same plumbing will carry the
161            // real invoking principal.
162            let principal = message
163                .principal
164                .as_deref()
165                .and_then(|p| astrid_core::principal::PrincipalId::new(p).ok())
166                .unwrap_or_default();
167            match connection_signal(&message.topic, &message.payload) {
168                Some(ConnectionSignal::Closed { reason }) => {
169                    kernel.connection_closed(&principal);
170                    debug!(%principal, topic = %message.topic, ?reason, "Client disconnected");
171                },
172                Some(ConnectionSignal::Opened) => {
173                    kernel.connection_opened(&principal);
174                    debug!(%principal, topic = %message.topic, "New client connection accepted");
175                },
176                None => {},
177            }
178        }
179    })
180}
181
182#[expect(clippy::too_many_lines)]
183async fn handle_request(
184    kernel: &Arc<crate::Kernel>,
185    topic: String,
186    caller: PrincipalId,
187    req: KernelRequest,
188) {
189    let response_topic = if let Some(suffix) = topic.strip_prefix("astrid.v1.request.") {
190        format!("astrid.v1.response.{suffix}")
191    } else {
192        topic.clone()
193    };
194
195    // Capability enforcement preamble (issue #670). Resolve the caller's
196    // profile, compute the required capability for this request, and
197    // reject with an audited `Denied` entry on failure. No match arm
198    // below is reached without `authorize_request` returning Ok.
199    let method = kernel_request_method(&req);
200    let scope = resolve_scope(&req, &caller);
201    let required_cap = required_capability(&req, scope);
202    match authorize_request(kernel, &caller, required_cap) {
203        Ok(()) => {
204            record_admin_audit(
205                kernel,
206                AdminAuditEntry {
207                    caller: &caller,
208                    method,
209                    required_cap,
210                    target_principal: None,
211                    params: None,
212                    authorization: AuthorizationProof::System {
213                        reason: format!("policy allow: {caller} holds {required_cap}"),
214                    },
215                    outcome: AuditOutcome::success(),
216                },
217            );
218        },
219        Err(e) => {
220            warn!(
221                security_event = true,
222                method = method,
223                principal = %caller,
224                required = required_cap,
225                "Permission check denied admin request"
226            );
227            record_admin_audit(
228                kernel,
229                AdminAuditEntry {
230                    caller: &caller,
231                    method,
232                    required_cap,
233                    target_principal: None,
234                    params: None,
235                    authorization: AuthorizationProof::Denied {
236                        reason: e.to_string(),
237                    },
238                    outcome: AuditOutcome::failure(e.to_string()),
239                },
240            );
241            publish_response(kernel, response_topic, KernelResponse::Error(e.to_string()));
242            return;
243        },
244    }
245
246    let res = match req {
247        KernelRequest::InstallCapsule { source, workspace } => {
248            info!(source = %source, workspace, "Kernel received install request");
249            install::handle_install_capsule(kernel, &source, workspace).await
250        },
251        KernelRequest::ApproveCapability {
252            request_id,
253            signature: _,
254        } => {
255            info!(request_id = %request_id, "Kernel received capability approval");
256            KernelResponse::Error("Approval logic not yet implemented in kernel router".to_string())
257        },
258        KernelRequest::ListCapsules => {
259            let reg = kernel.capsules.read().await;
260            let mut list = Vec::new();
261            for c in reg.list() {
262                list.push(c.to_string());
263            }
264            KernelResponse::Success(serde_json::json!(list))
265        },
266        KernelRequest::GetCommands => {
267            let reg = kernel.capsules.read().await;
268            let mut commands = Vec::new();
269            for c in reg.values() {
270                for cmd in &c.manifest().commands {
271                    commands.push(astrid_events::kernel_api::CommandInfo {
272                        name: cmd.name.clone(),
273                        description: cmd
274                            .description
275                            .clone()
276                            .unwrap_or_else(|| "No description".to_string()),
277                        provider_capsule: c.id().to_string(),
278                    });
279                }
280            }
281            info!(
282                count = commands.len(),
283                capsules = reg.len(),
284                "GetCommands: returning {} commands from {} capsules",
285                commands.len(),
286                reg.len()
287            );
288            KernelResponse::Commands(commands)
289        },
290        KernelRequest::ReloadCapsules => {
291            // Unregister capsules in a Failed state so they can be re-loaded
292            // with fresh configuration (e.g. after onboarding writes .env.json).
293            {
294                let reg = kernel.capsules.read().await;
295                let failed_ids: Vec<_> = reg
296                    .list()
297                    .into_iter()
298                    .filter(|id| {
299                        reg.get(id).is_some_and(|c| {
300                            matches!(c.state(), astrid_capsule::capsule::CapsuleState::Failed(_))
301                        })
302                    })
303                    .cloned()
304                    .collect();
305                drop(reg);
306
307                let mut reg = kernel.capsules.write().await;
308                for id in failed_ids {
309                    let _ = reg.unregister(&id);
310                }
311            }
312
313            kernel.load_all_capsules().await;
314            KernelResponse::Success(serde_json::json!({"status": "reloaded"}))
315        },
316        KernelRequest::Shutdown { reason } => {
317            info!(
318                reason = reason.as_deref().unwrap_or("none"),
319                "Kernel received shutdown request via management API"
320            );
321            // Publish response before signaling shutdown so the client gets confirmation.
322            publish_response(
323                kernel,
324                response_topic.clone(),
325                KernelResponse::Success(serde_json::json!({"status": "shutting_down"})),
326            );
327            // Signal the daemon's main loop to exit gracefully.
328            let _ = kernel.shutdown_tx.send(true);
329            // Return early — the daemon will call kernel.shutdown() from its main loop.
330            return;
331        },
332        KernelRequest::GetStatus => {
333            let uptime = kernel.boot_time.elapsed().as_secs();
334            let reg = kernel.capsules.read().await;
335            let loaded: Vec<String> = reg.list().iter().map(ToString::to_string).collect();
336            let by_principal = kernel
337                .connections_by_principal()
338                .into_iter()
339                .map(
340                    |(p, c)| astrid_events::kernel_api::PrincipalConnectionCount {
341                        principal: p.to_string(),
342                        count: u32::try_from(c).unwrap_or(u32::MAX),
343                    },
344                )
345                .collect();
346            let status = astrid_events::kernel_api::DaemonStatus {
347                pid: std::process::id(),
348                uptime_secs: uptime,
349                version: env!("CARGO_PKG_VERSION").to_string(),
350                ephemeral: false, // The kernel doesn't know; daemon sets this via response override if needed
351                connected_clients: u32::try_from(kernel.total_connection_count())
352                    .unwrap_or(u32::MAX),
353                connections_by_principal: by_principal,
354                loaded_capsules: loaded,
355            };
356            KernelResponse::Status(status)
357        },
358        KernelRequest::GetCapsuleMetadata => {
359            let reg = kernel.capsules.read().await;
360            let mut entries = Vec::new();
361            for capsule in reg.values() {
362                let manifest = capsule.manifest();
363                entries.push(astrid_events::kernel_api::CapsuleMetadataEntry {
364                    name: manifest.package.name.clone(),
365                    interceptor_events: manifest
366                        .subscribes
367                        .iter()
368                        .filter(|(_, def)| def.handler.is_some())
369                        .map(|(topic, _)| topic.clone())
370                        .collect(),
371                });
372            }
373            KernelResponse::CapsuleMetadata(entries)
374        },
375    };
376
377    publish_response(kernel, response_topic, res);
378}
379
380fn publish_response<R: Serialize>(kernel: &Arc<crate::Kernel>, response_topic: String, res: R) {
381    if let Ok(val) = serde_json::to_value(res) {
382        let msg = IpcMessage::new(
383            response_topic,
384            IpcPayload::RawJson(val),
385            kernel.session_id.0,
386        );
387        let _ = kernel.event_bus.publish(astrid_events::AstridEvent::Ipc {
388            metadata: astrid_events::EventMetadata::new("kernel_router"),
389            message: msg,
390        });
391    }
392}
393
394// ---------------------------------------------------------------------------
395// Management API rate limiting
396// ---------------------------------------------------------------------------
397
/// Per-method sliding-window limiter for management API requests.
///
/// Each method label owns a queue of admission timestamps. Entries at least
/// 60 seconds old are discarded before the budget is checked, so requests
/// cannot be doubled up across a window boundary the way a fixed-window
/// counter allows. The router task is the sole owner (`&mut self`), so no
/// synchronisation is involved.
struct ManagementRateLimiter {
    buckets: HashMap<&'static str, VecDeque<Instant>>,
}

impl ManagementRateLimiter {
    fn new() -> Self {
        Self {
            buckets: HashMap::default(),
        }
    }

    /// Admits a `method` request when fewer than `max_per_minute` requests of
    /// that method were admitted during the last 60 seconds.
    ///
    /// Returns `true` (and records the admission) when allowed, `false` when
    /// rate-limited. Rejections are not recorded.
    fn check(&mut self, method: &'static str, max_per_minute: u32) -> bool {
        const WINDOW: std::time::Duration = std::time::Duration::from_secs(60);

        let now = Instant::now();
        let admitted = self.buckets.entry(method).or_default();

        // Timestamps are appended in order, so every expired one sits at the front.
        while admitted
            .front()
            .is_some_and(|&stamp| now.saturating_duration_since(stamp) >= WINDOW)
        {
            admitted.pop_front();
        }

        let has_room = admitted.len() < max_per_minute as usize;
        if has_room {
            admitted.push_back(now);
        }
        has_room
    }
}
436
437/// Return the rate limit label and max-per-minute for a request type.
438/// Returns `None` for the limit if the request type is not rate-limited.
439fn rate_limit_for_request(req: &KernelRequest) -> (&'static str, Option<u32>) {
440    (kernel_request_method(req), rate_limit_max(req))
441}
442
443/// Return the max-per-minute rate limit for a request type, if any.
444fn rate_limit_max(req: &KernelRequest) -> Option<u32> {
445    match req {
446        KernelRequest::ReloadCapsules => Some(5),
447        KernelRequest::InstallCapsule { .. } | KernelRequest::ApproveCapability { .. } => Some(10),
448        KernelRequest::Shutdown { .. } => Some(1),
449        KernelRequest::ListCapsules
450        | KernelRequest::GetCommands
451        | KernelRequest::GetCapsuleMetadata
452        | KernelRequest::GetStatus => None,
453    }
454}
455
456// ---------------------------------------------------------------------------
457// Management API capability enforcement (issue #670)
458// ---------------------------------------------------------------------------
459
/// Which slice of authority a [`KernelRequest`] exercises.
///
/// No `KernelRequest` variant names a target principal yet, so
/// [`resolve_scope`] currently only ever produces [`AuthorityScope::Self_`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AuthorityScope {
    /// The request acts on the caller's own principal (its own home).
    Self_,
    /// The request acts on global, system-wide state (for example, shutdown).
    Global,
}
472
473/// Return the authority scope the caller is exercising for `req`.
474///
475/// Currently always returns [`AuthorityScope::Self_`] because no
476/// `KernelRequest` variant carries a `target_principal` field yet.
477#[must_use]
478pub fn resolve_scope(_req: &KernelRequest, _caller: &PrincipalId) -> AuthorityScope {
479    AuthorityScope::Self_
480}
481
482/// Return the static capability string required to satisfy `req` under
483/// `scope`.
484///
485/// Pure function so the capability mapping can be unit-tested in
486/// isolation. Every `KernelRequest` variant is covered; there is no
487/// default-allow branch.
488#[must_use]
489pub fn required_capability(req: &KernelRequest, scope: AuthorityScope) -> &'static str {
490    match (req, scope) {
491        (KernelRequest::Shutdown { .. }, _) => "system:shutdown",
492        (KernelRequest::GetStatus, _) => "system:status",
493        (KernelRequest::ReloadCapsules, AuthorityScope::Self_) => "self:capsule:reload",
494        (KernelRequest::ReloadCapsules, _) => "capsule:reload",
495        (KernelRequest::InstallCapsule { .. }, AuthorityScope::Self_) => "self:capsule:install",
496        (KernelRequest::InstallCapsule { .. }, _) => "capsule:install",
497        (
498            KernelRequest::ListCapsules
499            | KernelRequest::GetCommands
500            | KernelRequest::GetCapsuleMetadata,
501            AuthorityScope::Self_,
502        ) => "self:capsule:list",
503        (
504            KernelRequest::ListCapsules
505            | KernelRequest::GetCommands
506            | KernelRequest::GetCapsuleMetadata,
507            _,
508        ) => "capsule:list",
509        (KernelRequest::ApproveCapability { .. }, _) => "self:approval:respond",
510    }
511}
512
513/// Short identifier for a [`KernelRequest`] variant, used for rate-limit
514/// labels and audit method names.
515#[must_use]
516pub fn kernel_request_method(req: &KernelRequest) -> &'static str {
517    match req {
518        KernelRequest::ReloadCapsules => "ReloadCapsules",
519        KernelRequest::InstallCapsule { .. } => "InstallCapsule",
520        KernelRequest::ApproveCapability { .. } => "ApproveCapability",
521        KernelRequest::ListCapsules => "ListCapsules",
522        KernelRequest::GetCommands => "GetCommands",
523        KernelRequest::GetCapsuleMetadata => "GetCapsuleMetadata",
524        KernelRequest::Shutdown { .. } => "Shutdown",
525        KernelRequest::GetStatus => "GetStatus",
526    }
527}
528
529/// Resolve the caller [`PrincipalId`] from an incoming [`IpcMessage`].
530///
531/// Pre-#658 single-token socket traffic arrives without a principal
532/// field set; we fall back to [`PrincipalId::default`] — the default
533/// principal is bootstrapped with the built-in `admin` group, matching
534/// today's single-tenant behaviour.
535fn resolve_caller(message: &IpcMessage) -> PrincipalId {
536    message
537        .principal
538        .as_deref()
539        .and_then(|p| PrincipalId::new(p).ok())
540        .unwrap_or_default()
541}
542
543/// Evaluate the capability check for `caller` against the kernel's
544/// resolved group config and the caller's profile.
545///
546/// Returns `Ok(())` on success, or the policy reason on denial. Profile
547/// resolution failures (malformed TOML, IO error) are themselves treated
548/// as deny — fail-closed — with a synthesized `MissingCapability` so the
549/// deny path has a single shape in the audit log.
550fn authorize_request(
551    kernel: &crate::Kernel,
552    caller: &PrincipalId,
553    required_cap: &str,
554) -> Result<(), PermissionError> {
555    let profile = match kernel.profile_cache.resolve(caller) {
556        Ok(p) => p,
557        Err(e) => {
558            warn!(
559                security_event = true,
560                principal = %caller,
561                error = %e,
562                "Profile resolution failed — fail-closed deny"
563            );
564            return Err(PermissionError::MissingCapability {
565                principal: caller.clone(),
566                required: required_cap.to_string(),
567            });
568        },
569    };
570    // Enabled gate runs BEFORE the capability check so a disabled
571    // principal cannot exercise any management API surface — even one
572    // they would otherwise be authorized for. The `default` principal
573    // is bootstrap-managed and `caps.revoke`/`agent.disable` against
574    // it are rejected up front, so this check cannot lock the
575    // single-tenant path.
576    if !profile.enabled {
577        warn!(
578            security_event = true,
579            principal = %caller,
580            required = required_cap,
581            "Disabled principal denied — fail-closed enforcement"
582        );
583        return Err(PermissionError::PrincipalDisabled {
584            principal: caller.clone(),
585        });
586    }
587    let groups = kernel.groups.load_full();
588    let check = CapabilityCheck::new(profile.as_ref(), groups.as_ref(), caller.clone());
589    check.require(required_cap)
590}
591
592/// Bundled inputs for [`record_admin_audit`] — keeps the call site
593/// readable and the function under clippy's `too_many_arguments` cap.
594pub(crate) struct AdminAuditEntry<'a> {
595    /// Caller principal making the request.
596    pub caller: &'a PrincipalId,
597    /// Wire-name identifier for the request variant.
598    pub method: &'a str,
599    /// Capability string evaluated for this request.
600    pub required_cap: &'a str,
601    /// `None` when the request operates on the caller's own principal
602    /// (Layer 5) and `Some` when the request mutates another principal
603    /// (Layer 6 admin topics like `admin.quota.set`).
604    pub target_principal: Option<PrincipalId>,
605    /// Request payload for forensic replay (issue #672) — `None` for
606    /// [`KernelRequest`] entries that have no params struct, `Some` with
607    /// the wire payload for [`AdminKernelRequest`].
608    pub params: Option<serde_json::Value>,
609    /// Authorization proof (allow / deny).
610    pub authorization: AuthorizationProof,
611    /// Success or failure outcome.
612    pub outcome: AuditOutcome,
613}
614
/// IPC topic on which the kernel broadcasts structured audit entries to live
/// subscribers, such as the HTTP gateway's SSE stream.
///
/// This feed is fire-and-forget, intended for dashboards and monitoring
/// tools. The persistent audit log at `~/.astrid/audit.db` remains the system
/// of record.
///
/// Consumers narrow the view themselves: operators holding `audit:read_all`
/// see every entry, while agents see only entries whose `principal` field
/// matches their own.
pub const AUDIT_TOPIC: &str = "astrid.v1.audit.entry";
625
626/// Append an `AdminRequest` audit entry for the given outcome.
627/// Persists to the on-disk log AND publishes a live event on
628/// [`AUDIT_TOPIC`]. Failures to persist are logged but do not abort
629/// the request — the audit log degrades to "continue + alert" by
630/// design. A bus-publish failure is similarly best-effort.
631fn record_admin_audit(kernel: &crate::Kernel, entry: AdminAuditEntry<'_>) {
632    let AdminAuditEntry {
633        caller,
634        method,
635        required_cap,
636        target_principal,
637        params,
638        authorization,
639        outcome,
640    } = entry;
641    let action = AuditAction::AdminRequest {
642        method: method.to_string(),
643        required_capability: required_cap.to_string(),
644        target_principal: target_principal.clone(),
645        params: params.clone(),
646    };
647    if let Err(e) = kernel.audit_log.append_with_principal(
648        kernel.session_id.clone(),
649        caller.clone(),
650        action,
651        authorization.clone(),
652        outcome.clone(),
653    ) {
654        warn!(
655            security_event = true,
656            principal = %caller,
657            method = method,
658            error = %e,
659            "Failed to persist admin-request audit entry — continuing"
660        );
661    }
662
663    // Live broadcast. Subscribers filter at the consumer end (the
664    // `principal` field is what the gateway's SSE handler uses).
665    // The payload is intentionally a flat JSON shape so SSE
666    // consumers don't have to reify the kernel-side enum types.
667    let event = serde_json::json!({
668        "ts_epoch": std::time::SystemTime::now()
669            .duration_since(std::time::UNIX_EPOCH)
670            .map_or(0, |d| d.as_secs()),
671        "method": method,
672        "required_capability": required_cap,
673        "principal": caller.to_string(),
674        "target_principal": target_principal.as_ref().map(ToString::to_string),
675        "params": params,
676        "outcome": match &outcome {
677            AuditOutcome::Success { .. } => "success",
678            AuditOutcome::Failure { .. } => "failure",
679        },
680    });
681    let msg = IpcMessage::new(AUDIT_TOPIC, IpcPayload::RawJson(event), uuid::Uuid::nil())
682        .with_principal(caller.to_string());
683    let _ = kernel.event_bus.publish(astrid_events::AstridEvent::Ipc {
684        metadata: astrid_events::EventMetadata::new("kernel_router::audit"),
685        message: msg,
686    });
687}
688
/// Unit tests for the kernel router's management rate limiter, capability
/// mapping, authority-scope resolution and caller resolution helpers.
#[cfg(test)]
mod tests {
    use super::*;

    /// How far in the past to push a rate-limiter timestamp so it falls outside
    /// the limiter's 60 s sliding window, with a one-second safety margin.
    const EXPIRED_AGE: std::time::Duration = std::time::Duration::from_secs(61);

    /// Rewrites the first `count` timestamps (in the bucket's iteration order)
    /// of `bucket` to [`EXPIRED_AGE`] ago, simulating the passage of time
    /// without sleeping. Pass `usize::MAX` to backdate every entry.
    ///
    /// # Panics
    ///
    /// Panics if `bucket` has never been populated. That is a test-setup bug
    /// which would otherwise silently skip the backdating and surface later as
    /// a misleading assertion failure. Also panics if the monotonic clock is
    /// younger than [`EXPIRED_AGE`], so the past instant is not representable.
    fn backdate_timestamps(limiter: &mut ManagementRateLimiter, bucket: &str, count: usize) {
        // `Instant - Duration` panics with an opaque overflow message when the
        // result would precede the clock's origin (e.g. a freshly booted host);
        // `checked_sub` lets the failure explain itself.
        let past = Instant::now()
            .checked_sub(EXPIRED_AGE)
            .expect("monotonic clock is too young to backdate rate-limiter timestamps");
        let timestamps = limiter
            .buckets
            .get_mut(bucket)
            .unwrap_or_else(|| panic!("rate-limiter bucket {bucket:?} was never populated"));
        for ts in timestamps.iter_mut().take(count) {
            *ts = past;
        }
    }

    #[test]
    fn rate_limiter_allows_within_limit() {
        let mut limiter = ManagementRateLimiter::new();
        for _ in 0..5 {
            assert!(limiter.check("ReloadCapsules", 5));
        }
        // 6th should be rejected
        assert!(!limiter.check("ReloadCapsules", 5));
    }

    #[test]
    fn rate_limiter_independent_buckets() {
        let mut limiter = ManagementRateLimiter::new();
        // Fill ReloadCapsules
        for _ in 0..5 {
            assert!(limiter.check("ReloadCapsules", 5));
        }
        assert!(!limiter.check("ReloadCapsules", 5));

        // InstallCapsule should still be allowed
        assert!(limiter.check("InstallCapsule", 10));
    }

    #[test]
    fn rate_limiter_sliding_window_eviction() {
        let mut limiter = ManagementRateLimiter::new();
        // Fill the bucket
        for _ in 0..5 {
            assert!(limiter.check("ReloadCapsules", 5));
        }
        assert!(!limiter.check("ReloadCapsules", 5));

        // Age every recorded timestamp past the window to simulate expiry.
        backdate_timestamps(&mut limiter, "ReloadCapsules", usize::MAX);

        // Should be allowed again after old entries are evicted
        assert!(limiter.check("ReloadCapsules", 5));
    }

    #[test]
    fn rate_limiter_sliding_window_prevents_boundary_burst() {
        let mut limiter = ManagementRateLimiter::new();
        // Fill 5 requests
        for _ in 0..5 {
            assert!(limiter.check("ReloadCapsules", 5));
        }

        // Move only 3 of the 5 timestamps past the 60s window. This simulates
        // partial window expiry: only 3 slots should free up.
        backdate_timestamps(&mut limiter, "ReloadCapsules", 3);

        // Should allow exactly 3 more (the evicted slots), not 5
        for _ in 0..3 {
            assert!(limiter.check("ReloadCapsules", 5));
        }
        assert!(!limiter.check("ReloadCapsules", 5));
    }

    #[test]
    fn rate_limit_for_request_returns_correct_limits() {
        let (name, limit) = rate_limit_for_request(&KernelRequest::ReloadCapsules);
        assert_eq!(name, "ReloadCapsules");
        assert_eq!(limit, Some(5));

        let (name, limit) = rate_limit_for_request(&KernelRequest::ListCapsules);
        assert_eq!(name, "ListCapsules");
        assert_eq!(limit, None);
    }

    // ── Capability mapping (issue #670) ──────────────────────────────

    /// One representative value per `KernelRequest` variant.
    ///
    /// Keep this in sync with `KernelRequest`: a variant missing here is not
    /// covered by the capability/scope mapping tests below.
    fn all_request_variants() -> Vec<KernelRequest> {
        vec![
            KernelRequest::Shutdown { reason: None },
            KernelRequest::GetStatus,
            KernelRequest::ReloadCapsules,
            KernelRequest::InstallCapsule {
                source: "x".to_string(),
                workspace: false,
            },
            KernelRequest::ListCapsules,
            KernelRequest::GetCommands,
            KernelRequest::GetCapsuleMetadata,
            KernelRequest::ApproveCapability {
                request_id: "r".to_string(),
                signature: "s".to_string(),
            },
        ]
    }

    #[test]
    fn required_capability_every_variant_has_non_empty_mapping() {
        for req in all_request_variants() {
            let cap = required_capability(&req, AuthorityScope::Self_);
            assert!(
                !cap.is_empty(),
                "required_capability returned empty for {req:?}"
            );
        }
    }

    #[test]
    fn required_capability_mapping_per_variant_self_scope() {
        assert_eq!(
            required_capability(
                &KernelRequest::Shutdown { reason: None },
                AuthorityScope::Self_
            ),
            "system:shutdown"
        );
        assert_eq!(
            required_capability(&KernelRequest::GetStatus, AuthorityScope::Self_),
            "system:status"
        );
        assert_eq!(
            required_capability(&KernelRequest::ReloadCapsules, AuthorityScope::Self_),
            "self:capsule:reload"
        );
        assert_eq!(
            required_capability(
                &KernelRequest::InstallCapsule {
                    source: String::new(),
                    workspace: false
                },
                AuthorityScope::Self_
            ),
            "self:capsule:install"
        );
        assert_eq!(
            required_capability(&KernelRequest::ListCapsules, AuthorityScope::Self_),
            "self:capsule:list"
        );
        assert_eq!(
            required_capability(&KernelRequest::GetCommands, AuthorityScope::Self_),
            "self:capsule:list"
        );
        assert_eq!(
            required_capability(&KernelRequest::GetCapsuleMetadata, AuthorityScope::Self_),
            "self:capsule:list"
        );
        assert_eq!(
            required_capability(
                &KernelRequest::ApproveCapability {
                    request_id: String::new(),
                    signature: String::new(),
                },
                AuthorityScope::Self_
            ),
            "self:approval:respond"
        );
    }

    #[test]
    fn required_capability_mapping_global_scope() {
        // Global scope strips the `self:` prefix from capsule operations
        // (Layer 6 will start using this when cross-agent variants land).
        assert_eq!(
            required_capability(&KernelRequest::ReloadCapsules, AuthorityScope::Global),
            "capsule:reload"
        );
        assert_eq!(
            required_capability(
                &KernelRequest::InstallCapsule {
                    source: String::new(),
                    workspace: false
                },
                AuthorityScope::Global
            ),
            "capsule:install"
        );
        assert_eq!(
            required_capability(&KernelRequest::ListCapsules, AuthorityScope::Global),
            "capsule:list"
        );
        // system:* variants are scope-invariant.
        assert_eq!(
            required_capability(
                &KernelRequest::Shutdown { reason: None },
                AuthorityScope::Global
            ),
            "system:shutdown"
        );
    }

    #[test]
    fn resolve_scope_defaults_to_self() {
        let caller = PrincipalId::new("alice").unwrap();
        for req in all_request_variants() {
            assert_eq!(
                resolve_scope(&req, &caller),
                AuthorityScope::Self_,
                "scope should default to Self_ for today's variants"
            );
        }
    }

    // ── Caller resolution ────────────────────────────────────────────

    /// Builds an `astrid.v1.request.system` message with an empty JSON body.
    ///
    /// `principal` is assigned only when `Some`. With `None` the message keeps
    /// whatever `IpcMessage::new` initialised the field to.
    fn system_request(principal: Option<&str>) -> IpcMessage {
        let mut msg = IpcMessage::new(
            "astrid.v1.request.system",
            IpcPayload::RawJson(serde_json::json!({})),
            uuid::Uuid::nil(),
        );
        if let Some(principal) = principal {
            msg.principal = Some(principal.to_string());
        }
        msg
    }

    #[test]
    fn resolve_caller_uses_ipc_principal_when_present() {
        let msg = system_request(Some("alice"));
        let caller = resolve_caller(&msg);
        assert_eq!(caller.as_str(), "alice");
    }

    #[test]
    fn resolve_caller_falls_back_to_default_when_missing() {
        let msg = system_request(None);
        let caller = resolve_caller(&msg);
        assert_eq!(caller, PrincipalId::default());
    }

    #[test]
    fn resolve_caller_falls_back_to_default_on_invalid_principal() {
        // Invalid principal chars → PrincipalId::new fails → fall back.
        let msg = system_request(Some("alice@evil.example"));
        let caller = resolve_caller(&msg);
        assert_eq!(caller, PrincipalId::default());
    }
}