Skip to main content

astrid_kernel/kernel_router/admin/
mod.rs

1//! Layer 6 admin dispatcher (issue #672).
2//!
3//! Subscribes to `astrid.v1.admin.*` and routes every variant of
4//! [`AdminRequestKind`] through the same capability-enforcement
5//! preamble introduced in issue #670 (Layer 5). On allow, the mutating
6//! handlers in [`handlers`] acquire
7//! [`Kernel::admin_write_lock`](crate::Kernel::admin_write_lock) before
8//! touching `profile.toml` / `groups.toml`, then atomically replace the
9//! resolved config on the [`ArcSwap`](arc_swap::ArcSwap) backing
10//! [`Kernel::groups`](crate::Kernel::groups) and/or invalidate the
11//! matching [`PrincipalProfileCache`](astrid_capsule::profile_cache::PrincipalProfileCache)
12//! entry.
13//!
14//! # Audit trail
15//!
16//! Every admin topic — allow or deny — appends an
17//! [`AuditAction::AdminRequest`] entry. `method` is the wire name
//! (`"admin.agent.create"`, etc.); `target_principal` is `Some` for
//! variants that name a target principal (which may be the caller
//! itself) and `None` otherwise.
20//! `params` captures the full request payload (capabilities granted,
21//! quotas set, group definition) for forensic replay without diffing
22//! `profile.toml` snapshots.
23
24#[cfg(test)]
25mod enforcement_tests;
26mod handlers;
27mod invite_handlers;
28mod pair_device_handlers;
29mod quota;
30#[cfg(test)]
31mod state_tests;
32#[cfg(test)]
33mod state_tests_agent_modify;
34#[cfg(test)]
35mod state_tests_caps;
36#[cfg(test)]
37mod state_tests_usage;
38#[cfg(test)]
39mod tests;
40
41use std::sync::Arc;
42
43use astrid_audit::{AuditOutcome, AuthorizationProof};
44use astrid_core::principal::PrincipalId;
45use astrid_events::ipc::IpcPayload;
46use astrid_events::kernel_api::{
47    AdminKernelRequest, AdminKernelResponse, AdminRequestKind, AdminResponseBody,
48};
49use tracing::warn;
50
51use super::{
52    AdminAuditEntry, AuthorityScope, authorize_request, publish_response, record_admin_audit,
53    resolve_caller,
54};
55
/// Admin IPC input topic prefix.
///
/// [`admin_response_topic`] strips this prefix from a request topic to
/// derive the topic the reply is published on.
const ADMIN_TOPIC_PREFIX: &str = "astrid.v1.admin.";
/// Admin IPC response topic prefix (paired with [`ADMIN_TOPIC_PREFIX`]).
///
/// This string itself begins with [`ADMIN_TOPIC_PREFIX`], which is why
/// the dispatch loop in [`spawn_admin_router`] explicitly skips any
/// topic that starts with it.
const ADMIN_RESPONSE_PREFIX: &str = "astrid.v1.admin.response.";
60
61/// Spawn the admin dispatcher task. Mirrors [`super::spawn_kernel_router`]
62/// but listens on `astrid.v1.admin.*` and parses
63/// [`AdminKernelRequest`] payloads.
64pub(crate) fn spawn_admin_router(kernel: Arc<crate::Kernel>) -> tokio::task::JoinHandle<()> {
65    let mut receiver = kernel
66        .event_bus
67        .subscribe_topic_as("astrid.v1.admin.*", "admin_router");
68
69    tokio::spawn(async move {
70        while let Some(event) = receiver.recv().await {
71            let astrid_events::AstridEvent::Ipc { message, .. } = &*event else {
72                continue;
73            };
74
75            // Never loop back on our own response topic.
76            if message.topic.starts_with(ADMIN_RESPONSE_PREFIX) {
77                continue;
78            }
79
80            let IpcPayload::RawJson(val) = &message.payload else {
81                continue;
82            };
83
84            match serde_json::from_value::<AdminKernelRequest>(val.clone()) {
85                Ok(req) => {
86                    // Spawn a fresh task per request so reads
87                    // (AgentList, GroupList, QuotaGet, …) run in
88                    // parallel. Writes still serialize through
89                    // `kernel.admin_write_lock` inside the handler.
90                    // Without this, a single in-flight admin
91                    // request blocked every other admin request —
92                    // the dispatcher was the bottleneck pinning
93                    // gateway admin throughput at ~120 RPS even on
94                    // pure-read endpoints. (For an HTTP front that
95                    // hosts thousands of agents the serial loop is
96                    // unworkable.)
97                    let kernel = Arc::clone(&kernel);
98                    let topic = message.topic.clone();
99                    let caller = resolve_caller(message);
100                    tokio::spawn(async move {
101                        handle_admin_request(&kernel, topic, caller, req).await;
102                    });
103                },
104                Err(e) => {
105                    warn!(
106                        error = %e,
107                        topic = %message.topic,
108                        "Failed to parse AdminKernelRequest from IPC"
109                    );
110                },
111            }
112        }
113    })
114}
115
116/// Compute the response topic for an incoming admin request topic.
117fn admin_response_topic(input_topic: &str) -> String {
118    input_topic.strip_prefix(ADMIN_TOPIC_PREFIX).map_or_else(
119        || input_topic.to_string(),
120        |suffix| format!("{ADMIN_RESPONSE_PREFIX}{suffix}"),
121    )
122}
123
124/// Return the authority scope `req` exercises for `caller`.
125///
126/// Self-scoped when the target principal equals the caller
127/// ([`AdminRequestKind::QuotaGet`] / [`AdminRequestKind::QuotaSet`]
128/// / [`AdminRequestKind::AgentList`] — the last scoped as "self" so
129/// agents can see their own row). Everything else is cross-tenant,
130/// including creation / group operations that are intrinsically global.
131#[must_use]
132pub fn resolve_admin_scope(req: &AdminRequestKind, caller: &PrincipalId) -> AuthorityScope {
133    match req {
134        AdminRequestKind::QuotaGet { principal }
135        | AdminRequestKind::QuotaSet { principal, .. }
136        | AdminRequestKind::UsageGet { principal } => {
137            if principal == caller {
138                AuthorityScope::Self_
139            } else {
140                AuthorityScope::Global
141            }
142        },
143        // `GroupList` is read-only over system config and carries no
144        // target principal; every agent legitimately needs to read it
145        // to enumerate their own group-inherited capabilities (e.g.
146        // `caps check <self>` follows AgentList with GroupList to
147        // resolve `(group: agent)` → `self:agent:list`). Self-scoping
148        // makes the request match against `self:group:list`, which
149        // the `self:*` grant on the `agent` builtin already satisfies
150        // — without handing out the admin-tier `group:list` capability.
151        // The mutating group operations (`group create / delete /
152        // modify`) keep their own dedicated caps (`group:create`,
153        // `group:delete`, `group:modify`) and remain
154        // `AuthorityScope::Global` below, so this widening is read-only.
155        AdminRequestKind::AgentList
156        | AdminRequestKind::GroupList
157        | AdminRequestKind::PairDeviceIssue { .. } => AuthorityScope::Self_,
158        AdminRequestKind::AgentCreate { .. }
159        | AdminRequestKind::AgentDelete { .. }
160        | AdminRequestKind::AgentEnable { .. }
161        | AdminRequestKind::AgentDisable { .. }
162        | AdminRequestKind::AgentModify { .. }
163        | AdminRequestKind::GroupCreate { .. }
164        | AdminRequestKind::GroupDelete { .. }
165        | AdminRequestKind::GroupModify { .. }
166        | AdminRequestKind::CapsGrant { .. }
167        | AdminRequestKind::CapsRevoke { .. }
168        | AdminRequestKind::InviteIssue { .. }
169        | AdminRequestKind::InviteRedeem { .. }
170        | AdminRequestKind::InviteList
171        | AdminRequestKind::InviteRevoke { .. }
172        | AdminRequestKind::PairDeviceRedeem { .. } => AuthorityScope::Global,
173        // Note: PairDeviceIssue is intrinsically self-scoped — the
174        // kernel binds the token to the caller's own principal
175        // regardless of any wire-level hint. Folded into the Self_
176        // arm above with AgentList / GroupList.
177    }
178}
179
180/// Static capability string required to satisfy `req` under `scope`.
181///
182/// Pure function — the mapping can be unit-tested in isolation.
183/// Every variant has an entry; there is no default-allow arm.
184///
185/// `self:*` forms apply when the target principal is the caller
186/// themselves; admins operating on another principal need the
187/// unscoped `quota:set` / `caps:grant` forms. Group admin is always
188/// global — there is no "self" variant of `group:create`.
189#[must_use]
190pub fn required_capability_for_admin_request(
191    req: &AdminRequestKind,
192    scope: AuthorityScope,
193) -> &'static str {
194    match (req, scope) {
195        (AdminRequestKind::AgentCreate { .. }, _) => "agent:create",
196        (AdminRequestKind::AgentDelete { .. }, _) => "agent:delete",
197        (AdminRequestKind::AgentEnable { .. }, _) => "agent:enable",
198        (AdminRequestKind::AgentDisable { .. }, _) => "agent:disable",
199        (AdminRequestKind::AgentModify { .. }, _) => "agent:modify",
200        (AdminRequestKind::AgentList, AuthorityScope::Self_) => "self:agent:list",
201        (AdminRequestKind::AgentList, AuthorityScope::Global) => "agent:list",
202        (AdminRequestKind::QuotaSet { .. }, AuthorityScope::Self_) => "self:quota:set",
203        (AdminRequestKind::QuotaSet { .. }, AuthorityScope::Global) => "quota:set",
204        // Usage is a read over the same quota surface; reuse the quota:get
205        // capability so no new grant is minted (a principal that can read its
206        // quota can read its usage).
207        (
208            AdminRequestKind::QuotaGet { .. } | AdminRequestKind::UsageGet { .. },
209            AuthorityScope::Self_,
210        ) => "self:quota:get",
211        (
212            AdminRequestKind::QuotaGet { .. } | AdminRequestKind::UsageGet { .. },
213            AuthorityScope::Global,
214        ) => "quota:get",
215        (AdminRequestKind::GroupCreate { .. }, _) => "group:create",
216        (AdminRequestKind::GroupDelete { .. }, _) => "group:delete",
217        (AdminRequestKind::GroupModify { .. }, _) => "group:modify",
218        (AdminRequestKind::GroupList, AuthorityScope::Self_) => "self:group:list",
219        (AdminRequestKind::GroupList, AuthorityScope::Global) => "group:list",
220        (AdminRequestKind::CapsGrant { .. }, _) => "caps:grant",
221        (AdminRequestKind::CapsRevoke { .. }, _) => "caps:revoke",
222        (AdminRequestKind::InviteIssue { .. }, _) => "invite:issue",
223        // `InviteRedeem` is special-cased in `handle_admin_request`
224        // below — the dispatcher bypasses the capability preamble
225        // because the caller principal does not exist yet (the token
226        // IS the auth). The string returned here is unused for that
227        // variant but kept for completeness so audit records still
228        // carry a stable name. We pick `invite:redeem` rather than
229        // leaving it blank so the audit log reads cleanly.
230        (AdminRequestKind::InviteRedeem { .. }, _) => "invite:redeem",
231        (AdminRequestKind::InviteList, _) => "invite:list",
232        (AdminRequestKind::InviteRevoke { .. }, _) => "invite:revoke",
233        // PairDeviceIssue is self-scoped (kernel binds to caller).
234        (AdminRequestKind::PairDeviceIssue { .. }, _) => "self:auth:pair",
235        // PairDeviceRedeem mirrors InviteRedeem: dispatcher
236        // bypasses the cap-gate because the token IS the auth.
237        // String kept here for audit-log readability.
238        (AdminRequestKind::PairDeviceRedeem { .. }, _) => "auth:pair:redeem",
239    }
240}
241
242/// Stable wire-name identifier for an [`AdminRequestKind`] — used as
243/// the `method` field on every [`AuditAction::AdminRequest`] entry.
244#[must_use]
245pub fn admin_request_method(req: &AdminRequestKind) -> &'static str {
246    match req {
247        AdminRequestKind::AgentCreate { .. } => "admin.agent.create",
248        AdminRequestKind::AgentDelete { .. } => "admin.agent.delete",
249        AdminRequestKind::AgentEnable { .. } => "admin.agent.enable",
250        AdminRequestKind::AgentDisable { .. } => "admin.agent.disable",
251        AdminRequestKind::AgentModify { .. } => "admin.agent.modify",
252        AdminRequestKind::AgentList => "admin.agent.list",
253        AdminRequestKind::QuotaSet { .. } => "admin.quota.set",
254        AdminRequestKind::QuotaGet { .. } => "admin.quota.get",
255        AdminRequestKind::UsageGet { .. } => "admin.usage.get",
256        AdminRequestKind::GroupCreate { .. } => "admin.group.create",
257        AdminRequestKind::GroupDelete { .. } => "admin.group.delete",
258        AdminRequestKind::GroupModify { .. } => "admin.group.modify",
259        AdminRequestKind::GroupList => "admin.group.list",
260        AdminRequestKind::CapsGrant { .. } => "admin.caps.grant",
261        AdminRequestKind::CapsRevoke { .. } => "admin.caps.revoke",
262        AdminRequestKind::InviteIssue { .. } => "admin.invite.issue",
263        AdminRequestKind::InviteRedeem { .. } => "admin.invite.redeem",
264        AdminRequestKind::InviteList => "admin.invite.list",
265        AdminRequestKind::InviteRevoke { .. } => "admin.invite.revoke",
266        AdminRequestKind::PairDeviceIssue { .. } => "admin.auth.pair.issue",
267        AdminRequestKind::PairDeviceRedeem { .. } => "admin.auth.pair.redeem",
268    }
269}
270
271/// Serialise an [`AdminRequestKind`] for audit storage with sensitive
272/// fields redacted. Keeps the wire-name shape so audit consumers can
273/// still discriminate variants — only the secret-bearing fields are
274/// dropped or hashed.
275///
276/// Redactions:
277///
278/// * `InviteRedeem.public_key` → `public_key_fingerprint` (SHA-256 of
279///   the supplied key). Storing the raw ed25519 key in the audit log
280///   would double the system of record for authorization, which Layer
281///   5/6 treat as `AuthConfig.public_keys` alone.
282/// * `InviteRedeem.token` → `token_fingerprint` (`hex(sha256(token))`).
283///   The raw invite token is a secret that grants the right to mint a
284///   principal; persisting it in the audit log would let anyone with
285///   read access replay it on a multi-use invite. The fingerprint
286///   matches the on-disk hash in `invites.toml`, so an auditor can
287///   still correlate a redeem to the issued invite.
288/// * `InviteRevoke.token` → `token_fingerprint`. Same hazard as
289///   `InviteRedeem.token`: the caller can pass either the raw token or
290///   the already-fingerprinted form. Hash unconditionally when the
291///   input doesn't already look like a fingerprint (64 hex chars).
292fn sanitize_admin_audit_params(req: &AdminRequestKind) -> Option<serde_json::Value> {
293    let mut val = serde_json::to_value(req).ok()?;
294    let params = val
295        .as_object_mut()
296        .and_then(|m| m.get_mut("params"))
297        .and_then(|p| p.as_object_mut())?;
298    match req {
299        AdminRequestKind::InviteRedeem {
300            public_key, token, ..
301        } => {
302            let fp = invite_handlers::fingerprint_public_key(public_key);
303            params.remove("public_key");
304            params.insert(
305                "public_key_fingerprint".to_string(),
306                serde_json::Value::String(fp),
307            );
308            params.remove("token");
309            params.insert(
310                "token_fingerprint".to_string(),
311                serde_json::Value::String(crate::invite::hash_token(token)),
312            );
313        },
314        AdminRequestKind::InviteRevoke { token } => {
315            params.remove("token");
316            params.insert(
317                "token_fingerprint".to_string(),
318                serde_json::Value::String(fingerprint_revoke_input(token)),
319            );
320        },
321        AdminRequestKind::PairDeviceRedeem { token, public_key } => {
322            let fp = invite_handlers::fingerprint_public_key(public_key);
323            params.remove("public_key");
324            params.insert(
325                "public_key_fingerprint".to_string(),
326                serde_json::Value::String(fp),
327            );
328            params.remove("token");
329            params.insert(
330                "token_fingerprint".to_string(),
331                serde_json::Value::String(crate::pair_token::hash_token(token)),
332            );
333        },
334        _ => {},
335    }
336    Some(val)
337}
338
339/// Fingerprint helper for `InviteRevoke.token`, which can be supplied
340/// either as the raw token *or* as an already-fingerprinted 64-hex
341/// identifier (from `astrid invite list`). The audit row stores the
342/// fingerprint form unconditionally so an auditor can correlate
343/// against `invites.toml` without seeing the secret.
344fn fingerprint_revoke_input(token: &str) -> String {
345    if token.len() == 64 && token.chars().all(|c| c.is_ascii_hexdigit()) {
346        token.to_ascii_lowercase()
347    } else {
348        crate::invite::hash_token(token)
349    }
350}
351
352/// Borrow the target principal for audit purposes — `Some` only when the
353/// request operates on a principal distinct from the caller.
354#[must_use]
355pub fn admin_target_principal(req: &AdminRequestKind) -> Option<&PrincipalId> {
356    match req {
357        AdminRequestKind::AgentDelete { principal }
358        | AdminRequestKind::AgentEnable { principal }
359        | AdminRequestKind::AgentDisable { principal }
360        | AdminRequestKind::AgentModify { principal, .. }
361        | AdminRequestKind::QuotaSet { principal, .. }
362        | AdminRequestKind::QuotaGet { principal }
363        | AdminRequestKind::UsageGet { principal }
364        | AdminRequestKind::CapsGrant { principal, .. }
365        | AdminRequestKind::CapsRevoke { principal, .. } => Some(principal),
366        AdminRequestKind::AgentCreate { .. }
367        | AdminRequestKind::AgentList
368        | AdminRequestKind::GroupCreate { .. }
369        | AdminRequestKind::GroupDelete { .. }
370        | AdminRequestKind::GroupModify { .. }
371        | AdminRequestKind::GroupList
372        | AdminRequestKind::InviteIssue { .. }
373        | AdminRequestKind::InviteRedeem { .. }
374        | AdminRequestKind::InviteList
375        | AdminRequestKind::InviteRevoke { .. }
376        | AdminRequestKind::PairDeviceIssue { .. }
377        | AdminRequestKind::PairDeviceRedeem { .. } => None,
378    }
379}
380
381/// Map a redeem handler's response to the audit `(authorization, outcome)`
382/// pair. Redeems bypass the capability preamble (the token is the auth),
383/// so the outcome can only be known *after* the handler runs: a rejected
384/// token (`Error`) must record a `Denied` / `Failure` row so brute-force
385/// or forged-token attempts are visible in the audit log itself, not only
386/// in tracing; a mint records the `System` / `Success` row.
387fn redeem_audit_proof(body: &AdminResponseBody) -> (AuthorizationProof, AuditOutcome) {
388    match body {
389        AdminResponseBody::Error(reason) => (
390            AuthorizationProof::Denied {
391                reason: reason.clone(),
392            },
393            AuditOutcome::failure(reason.clone()),
394        ),
395        _ => (
396            AuthorizationProof::System {
397                reason: "redeem (invite or pair-device): token is the auth".to_string(),
398            },
399            AuditOutcome::success(),
400        ),
401    }
402}
403
404async fn handle_admin_request(
405    kernel: &Arc<crate::Kernel>,
406    topic: String,
407    caller: PrincipalId,
408    req: AdminKernelRequest,
409) {
410    let response_topic = admin_response_topic(&topic);
411    let request_id = req.request_id.clone();
412    let method = admin_request_method(&req.kind);
413    let scope = resolve_admin_scope(&req.kind, &caller);
414    let required_cap = required_capability_for_admin_request(&req.kind, scope);
415    let target = admin_target_principal(&req.kind).cloned();
416    // Capture the params field for the audit entry — clients submitting
417    // malformed JSON never reach this point, so serialization is
418    // infallible for shapes we accept. We strip the `public_key` field
419    // out of `InviteRedeem` payloads before storing because the audit
420    // shouldn't permanently embed an ed25519 key that a verifier might
421    // later mistake for a system-of-record entry — the canonical copy
422    // lives on `AuthConfig.public_keys`.
423    let audit_params = sanitize_admin_audit_params(&req.kind);
424
425    // `InviteRedeem` / `PairDeviceRedeem` are the two variants that
426    // bypass the capability preamble: the redeemer's principal does not
427    // exist yet at the moment the request arrives — the token IS the
428    // auth. The handler verifies the token internally and either mints a
429    // principal (success) or rejects it (invalid / expired / consumed /
430    // forged token, or an internal store error).
431    //
432    // Dispatch FIRST, then audit with the REAL outcome. Stamping
433    // `success` before dispatch (as this used to) meant a rejected token
434    // still wrote a success row, so brute-force / forged-token attempts
435    // were invisible in the audit log itself — only in tracing.
436    // Recording after dispatch makes the row's outcome match what
437    // actually happened: this is the "allow OR deny" the comment always
438    // promised. The handler still emits its own `security_event` warn on
439    // rejection; this adds the missing audit-store signal so the
440    // security team can detect token brute-forcing from audit rows alone.
441    if matches!(
442        req.kind,
443        AdminRequestKind::InviteRedeem { .. } | AdminRequestKind::PairDeviceRedeem { .. }
444    ) {
445        let body = handlers::dispatch(kernel, &caller, req.kind).await;
446        let (authorization, outcome) = redeem_audit_proof(&body);
447        record_admin_audit(
448            kernel,
449            AdminAuditEntry {
450                caller: &caller,
451                method,
452                required_cap,
453                target_principal: None,
454                params: audit_params,
455                authorization,
456                outcome,
457            },
458        );
459        publish_response(
460            kernel,
461            response_topic,
462            AdminKernelResponse::for_request(request_id, body),
463        );
464        return;
465    }
466
467    match authorize_request(kernel, &caller, required_cap) {
468        Ok(()) => {
469            record_admin_audit(
470                kernel,
471                AdminAuditEntry {
472                    caller: &caller,
473                    method,
474                    required_cap,
475                    target_principal: target.clone(),
476                    params: audit_params.clone(),
477                    authorization: AuthorizationProof::System {
478                        reason: format!("policy allow: {caller} holds {required_cap}"),
479                    },
480                    outcome: AuditOutcome::success(),
481                },
482            );
483        },
484        Err(e) => {
485            warn!(
486                security_event = true,
487                method = method,
488                principal = %caller,
489                required = required_cap,
490                error = %e,
491                "Permission check denied admin request"
492            );
493            record_admin_audit(
494                kernel,
495                AdminAuditEntry {
496                    caller: &caller,
497                    method,
498                    required_cap,
499                    target_principal: target,
500                    params: audit_params,
501                    authorization: AuthorizationProof::Denied {
502                        reason: e.to_string(),
503                    },
504                    outcome: AuditOutcome::failure(e.to_string()),
505                },
506            );
507            publish_response(
508                kernel,
509                response_topic,
510                AdminKernelResponse::for_request(
511                    request_id,
512                    AdminResponseBody::Error(e.to_string()),
513                ),
514            );
515            return;
516        },
517    }
518
519    let body = handlers::dispatch(kernel, &caller, req.kind).await;
520    publish_response(
521        kernel,
522        response_topic,
523        AdminKernelResponse::for_request(request_id, body),
524    );
525}