Skip to main content

astrid_kernel/
lib.rs

1#![deny(unsafe_code)]
2#![deny(missing_docs)]
3#![deny(clippy::all)]
4#![deny(unreachable_pub)]
5#![allow(clippy::module_name_repetitions)]
6
7//! Astrid Kernel - The core execution engine and IPC router.
8//!
9//! The Kernel is a pure, decentralized WASM runner. It contains no business
10//! logic, no cognitive loops, and no network servers. Its sole responsibility
11//! is to instantiate `astrid_events::EventBus`, load `.capsule` files into
12//! the Extism sandbox, and route IPC bytes between them.
13
14/// The Management API router listening to the `EventBus`.
15pub mod kernel_router;
16/// The Unix Domain Socket manager.
17pub mod socket;
18
19use arc_swap::ArcSwap;
20use astrid_audit::AuditLog;
21use astrid_capabilities::{CapabilityStore, DirHandle};
22use astrid_capsule::profile_cache::PrincipalProfileCache;
23use astrid_capsule::registry::CapsuleRegistry;
24use astrid_core::SessionId;
25use astrid_core::groups::GroupConfig;
26use astrid_core::principal::PrincipalId;
27use astrid_crypto::KeyPair;
28use astrid_events::EventBus;
29use astrid_mcp::{McpClient, SecureMcpClient, ServerManager, ServersConfig};
30use astrid_vfs::{HostVfs, OverlayVfsRegistry, Vfs};
31use dashmap::DashMap;
32use std::path::{Path, PathBuf};
33use std::sync::Arc;
34use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
35use tokio::sync::{Mutex, RwLock};
36
37/// The core Operating System Kernel.
38pub struct Kernel {
39    /// The unique identifier for this kernel session.
40    pub session_id: SessionId,
41    /// The global IPC message bus.
42    pub event_bus: Arc<EventBus>,
43    /// The process manager (loaded WASM capsules).
44    pub capsules: Arc<RwLock<CapsuleRegistry>>,
45    /// The secure MCP client with capability-based authorization and audit logging.
46    pub mcp: SecureMcpClient,
47    /// The capability store for this session.
48    pub capabilities: Arc<CapabilityStore>,
49    /// The global Virtual File System mount.
50    ///
51    /// Points at the unmodified workspace (no overlay). Principal-scoped
52    /// overlays live in [`overlay_registry`](Self::overlay_registry) — this
53    /// field is kept for kernel-internal paths that do not know a principal
54    /// (discovery, capsule load scan).
55    pub vfs: Arc<dyn Vfs>,
56    /// Per-principal overlay registry (Layer 4, issue #668).
57    ///
58    /// Each invoking principal resolves their own
59    /// [`OverlayVfs`](astrid_vfs::OverlayVfs) from this registry on first
60    /// use — lower layer is the shared workspace, upper layer is a
61    /// principal-private tempdir. Agent A's uncommitted writes are never
62    /// visible to Agent B.
63    pub overlay_registry: Arc<OverlayVfsRegistry>,
64    /// The global physical root handle (cap-std) for the VFS.
65    pub vfs_root_handle: DirHandle,
66    /// The physical path the VFS is mounted to.
67    pub workspace_root: PathBuf,
68    /// The principal home resources directory (`~/.astrid/home/{principal}/`).
69    /// Capsules declaring `fs_read = ["home://"]` can read files under this
70    /// root. Scoped to the principal's home so that keys, databases, and
71    /// system config in `~/.astrid/` are NOT accessible.
72    ///
73    /// Always `Some` in production (boot requires `AstridHome`). Remains
74    /// `Option` for compatibility with `CapsuleContext` and test fixtures.
75    pub home_root: Option<PathBuf>,
76    /// The natively bound Unix Socket for the CLI proxy.
77    pub cli_socket_listener: Option<Arc<tokio::sync::Mutex<tokio::net::UnixListener>>>,
78    /// Shared KV store backing all capsule-scoped stores and kernel state.
79    pub kv: Arc<astrid_storage::SurrealKvStore>,
80    /// Chain-linked cryptographic audit log with persistent storage.
81    pub audit_log: Arc<AuditLog>,
82    /// Per-principal active connection counters (Layer 4, issue #668).
83    ///
84    /// Keyed by [`PrincipalId`]. When a principal's counter hits zero the
85    /// kernel clears that principal's session allowances only — other
86    /// principals' state is untouched. Ephemeral shutdown still waits on
87    /// the global sum via [`total_connection_count`](Self::total_connection_count).
88    active_connections: DashMap<PrincipalId, AtomicUsize>,
89    /// Ephemeral mode: shut down immediately when the last client disconnects.
90    pub ephemeral: AtomicBool,
91    /// Instant when the kernel was booted (for uptime calculation).
92    pub boot_time: std::time::Instant,
93    /// Sender for the API-initiated shutdown signal. The daemon's main loop
94    /// selects on the receiver to exit gracefully without `process::exit`.
95    pub shutdown_tx: tokio::sync::watch::Sender<bool>,
96    /// Session token for socket authentication. Generated at boot, written to
97    /// `~/.astrid/run/system.token`. CLI sends this as its first message.
98    pub session_token: Arc<astrid_core::session_token::SessionToken>,
99    /// Path where the session token was written at boot. Stored so shutdown
100    /// uses the exact same path (avoids fallback mismatch if env changes).
101    token_path: PathBuf,
102    /// Shared allowance store for capsule-level approval decisions.
103    ///
104    /// Capsules can check existing allowances and create new ones when
105    /// users approve actions with session/always scope.
106    pub allowance_store: Arc<astrid_approval::AllowanceStore>,
107    /// System-wide identity store for platform user resolution.
108    identity_store: Arc<dyn astrid_storage::IdentityStore>,
109    /// System-wide per-principal profile cache (Layer 3 quota enforcement).
110    ///
111    /// One instance per kernel boot. Every capsule load plumbs this into
112    /// [`CapsuleContext::with_profile_cache`](astrid_capsule::context::CapsuleContext::with_profile_cache),
113    /// where [`WasmEngine`](astrid_capsule::engine::wasm::WasmEngine) consumes
114    /// it to apply per-invocation memory / timeout / IPC / process caps.
115    /// Invalidation model: kernel restart. Layer 6 will add explicit
116    /// management IPC to clear entries at runtime (issue #666 tracks that
117    /// follow-up).
118    pub(crate) profile_cache: Arc<PrincipalProfileCache>,
119    /// Static group-to-capability configuration (issue #670), made
120    /// hot-reloadable in Layer 6 (issue #672).
121    ///
122    /// Loaded once at boot from `$ASTRID_HOME/etc/groups.toml`. The
123    /// enforcement preamble in [`kernel_router::handle_request`] /
124    /// `handle_admin_request` calls `groups.load_full()` on each request
125    /// — a lock-free `Arc` clone. Group admin topics
126    /// (`astrid.v1.admin.group.*`) rewrite `groups.toml` and then
127    /// `groups.store(Arc::new(new_config))` atomically; in-flight checks
128    /// holding the old `Arc` finish under the old config, the next check
129    /// sees the new one.
130    pub(crate) groups: Arc<ArcSwap<GroupConfig>>,
131    /// Home directory captured at boot — retained for the admin write
132    /// path (`groups.toml`, per-principal `profile.toml`) so handlers
133    /// don't re-resolve `$ASTRID_HOME` and risk a mid-life drift.
134    pub(crate) astrid_home: astrid_core::dirs::AstridHome,
135    /// Serializes mutating admin topics on `profile.toml` / `groups.toml`.
136    ///
137    /// Read-only admin topics (`agent.list`, `group.list`, `quota.get`)
138    /// and the hot authz path do NOT take this lock — the `ArcSwap` on
139    /// [`Kernel::groups`] and the `RwLock` on
140    /// [`PrincipalProfileCache`](astrid_capsule::profile_cache::PrincipalProfileCache)
141    /// cover reads. Tokio's `Mutex` is not poisonable — no
142    /// `PoisonError::into_inner` dance required.
143    pub(crate) admin_write_lock: Mutex<()>,
144}
145
146impl Kernel {
147    /// Boot a new Kernel instance mounted at the specified directory.
148    ///
149    /// # Panics
150    ///
151    /// Panics if called on a single-threaded tokio runtime. The capsule
152    /// system uses `block_in_place` which requires a multi-threaded runtime.
153    ///
154    /// # Errors
155    ///
156    /// Returns an error if the VFS mount paths cannot be registered.
157    #[expect(
158        clippy::too_many_lines,
159        reason = "boot sequence: sequential setup that does not benefit from splitting"
160    )]
161    pub async fn new(
162        session_id: SessionId,
163        workspace_root: PathBuf,
164    ) -> Result<Arc<Self>, std::io::Error> {
165        use astrid_core::dirs::AstridHome;
166
167        assert!(
168            tokio::runtime::Handle::current().runtime_flavor()
169                == tokio::runtime::RuntimeFlavor::MultiThread,
170            "Kernel requires a multi-threaded tokio runtime (block_in_place panics on \
171             single-threaded). Use #[tokio::main] or Runtime::new() instead of current_thread."
172        );
173
174        let event_bus = Arc::new(EventBus::new());
175        let capsules = Arc::new(RwLock::new(CapsuleRegistry::new()));
176
177        // Resolve the Astrid home directory. Required for persistent KV store
178        // and audit log. Fails boot if neither $ASTRID_HOME nor $HOME is set.
179        let home = AstridHome::resolve().map_err(|e| {
180            std::io::Error::other(format!(
181                "Failed to resolve Astrid home (set $ASTRID_HOME or $HOME): {e}"
182            ))
183        })?;
184
185        // Resolve the home directory for the `home://` VFS scheme.
186        // Points to `~/.astrid/home/{principal}/` — NOT the full `~/.astrid/`
187        // root — so capsules cannot access keys, databases, or config.
188        let default_principal = astrid_core::PrincipalId::default();
189        let principal_home = home.principal_home(&default_principal);
190        let home_root = Some(principal_home.root().to_path_buf());
191
192        // 1. Open the persistent KV store (needed by capability store below).
193        let kv_path = home.state_db_path();
194        let kv = Arc::new(
195            astrid_storage::SurrealKvStore::open(&kv_path)
196                .map_err(|e| std::io::Error::other(format!("Failed to open KV store: {e}")))?,
197        );
198        // TODO: clear ephemeral keys (e: prefix) on boot when the key
199        // lifecycle tier convention is established.
200
201        // 2. Initialize MCP process manager with security layer.
202        //    Set workspace_root so sandboxed MCP servers have a writable directory.
203        let mcp_config = ServersConfig::load_default().unwrap_or_default();
204        let mcp_manager = ServerManager::new(mcp_config)
205            .with_workspace_root(workspace_root.clone())
206            .with_capsule_log_dir(principal_home.log_dir());
207        let mcp_client = McpClient::new(mcp_manager);
208
209        // 3. Bootstrap capability store (persistent) and audit log.
210        //    Key rotation invalidates persisted tokens (fail-secure by design).
211        let capabilities = Arc::new(
212            CapabilityStore::with_kv_store(Arc::clone(&kv) as Arc<dyn astrid_storage::KvStore>)
213                .map_err(|e| {
214                    std::io::Error::other(format!("Failed to init capability store: {e}"))
215                })?,
216        );
217        let audit_log = open_audit_log()?;
218        let mcp = SecureMcpClient::new(
219            mcp_client,
220            Arc::clone(&capabilities),
221            Arc::clone(&audit_log),
222            session_id.clone(),
223        );
224
225        // 4. Establish the physical security boundary (sandbox handle)
226        let root_handle = DirHandle::new();
227
228        // 5. Principal-scoped overlay registry: each invoking principal
229        //    gets a fresh OverlayVfs on first use (Layer 4, issue #668).
230        //    The kernel-internal `vfs` field keeps pointing at a plain
231        //    HostVfs over the workspace for paths that don't yet know a
232        //    principal (discovery, capsule load scan).
233        let kernel_host_vfs = HostVfs::new();
234        kernel_host_vfs
235            .register_dir(root_handle.clone(), workspace_root.clone())
236            .await
237            .map_err(|_| std::io::Error::other("Failed to register kernel workspace vfs"))?;
238        let overlay_registry = Arc::new(OverlayVfsRegistry::new(
239            workspace_root.clone(),
240            root_handle.clone(),
241        ));
242
243        // 6. Bind the secure Unix socket and generate session token.
244        // The socket is bound here, but not yet listened on. The token is
245        // generated before any capsule can accept connections, preventing
246        // a race where a client connects before the token file exists.
247        let listener = socket::bind_session_socket()?;
248        let (session_token, token_path) = socket::generate_session_token()?;
249
250        let allowance_store = Arc::new(astrid_approval::AllowanceStore::new());
251        // Create system-wide identity store backed by the shared KV.
252        let identity_kv = astrid_storage::ScopedKvStore::new(
253            Arc::clone(&kv) as Arc<dyn astrid_storage::KvStore>,
254            "system:identity",
255        )
256        .map_err(|e| std::io::Error::other(format!("Failed to create identity KV: {e}")))?;
257        let identity_store: Arc<dyn astrid_storage::IdentityStore> =
258            Arc::new(astrid_storage::KvIdentityStore::new(identity_kv));
259
260        // Load group config (issue #670). Boot-loaded once, then swapped
261        // atomically by Layer 6 admin topics (issue #672). Missing file
262        // → built-ins only; malformed TOML is a hard boot failure
263        // (fail-closed).
264        let groups_loaded = GroupConfig::load(&home)
265            .map_err(|e| std::io::Error::other(format!("Failed to load groups config: {e}")))?;
266        let groups = Arc::new(ArcSwap::from_pointee(groups_loaded));
267
268        // Bootstrap the CLI root user (idempotent). Also seeds the
269        // default principal's profile with `groups = ["admin"]` so
270        // single-tenant deployments get full management-API access.
271        bootstrap_cli_root_user(&identity_store, &home)
272            .await
273            .map_err(|e| {
274                std::io::Error::other(format!("Failed to bootstrap CLI root user: {e}"))
275            })?;
276
277        // Apply pre-configured identity links from config.
278        apply_identity_config(&identity_store, &workspace_root).await;
279
280        let kernel = Arc::new(Self {
281            session_id,
282            event_bus,
283            capsules,
284            mcp,
285            capabilities,
286            vfs: Arc::new(kernel_host_vfs) as Arc<dyn Vfs>,
287            overlay_registry,
288            vfs_root_handle: root_handle,
289            workspace_root,
290            home_root,
291            cli_socket_listener: Some(Arc::new(tokio::sync::Mutex::new(listener))),
292            kv,
293            audit_log,
294            active_connections: DashMap::new(),
295            ephemeral: AtomicBool::new(false),
296            boot_time: std::time::Instant::now(),
297            shutdown_tx: tokio::sync::watch::channel(false).0,
298            session_token: Arc::new(session_token),
299            token_path,
300            allowance_store,
301            identity_store,
302            profile_cache: Arc::new(PrincipalProfileCache::with_home(home.clone())),
303            groups,
304            astrid_home: home,
305            admin_write_lock: Mutex::new(()),
306        });
307
308        drop(kernel_router::spawn_kernel_router(Arc::clone(&kernel)));
309        drop(spawn_idle_monitor(Arc::clone(&kernel)));
310        drop(spawn_react_watchdog(Arc::clone(&kernel.event_bus)));
311        drop(spawn_capsule_health_monitor(Arc::clone(&kernel)));
312
313        // Spawn the event dispatcher — routes EventBus events to capsule interceptors.
314        // Wire the identity store so auto-provisioning is gated.
315        let dispatcher = astrid_capsule::dispatcher::EventDispatcher::new(
316            Arc::clone(&kernel.capsules),
317            Arc::clone(&kernel.event_bus),
318        )
319        .with_identity_store(Arc::clone(&kernel.identity_store));
320        tokio::spawn(dispatcher.run());
321
322        debug_assert_eq!(
323            kernel.event_bus.subscriber_count(),
324            INTERNAL_SUBSCRIBER_COUNT,
325            "INTERNAL_SUBSCRIBER_COUNT is stale; update it when adding permanent subscribers"
326        );
327
328        Ok(kernel)
329    }
330
331    /// Load a capsule into the Kernel from a directory containing a Capsule.toml
332    ///
333    /// # Errors
334    ///
335    /// Returns an error if the manifest cannot be loaded, the capsule cannot be created, or registration fails.
336    async fn load_capsule(&self, dir: PathBuf) -> Result<(), anyhow::Error> {
337        let manifest_path = dir.join("Capsule.toml");
338        let manifest = astrid_capsule::discovery::load_manifest(&manifest_path)
339            .map_err(|e| anyhow::anyhow!(e))?;
340
341        // Skip if already registered (prevents double-load from overlapping
342        // discovery paths like principal home + workspace capsules).
343        {
344            let registry = self.capsules.read().await;
345            let id = astrid_capsule::capsule::CapsuleId::from_static(&manifest.package.name);
346            if registry.get(&id).is_some() {
347                return Ok(());
348            }
349        }
350
351        let loader = astrid_capsule::loader::CapsuleLoader::new(self.mcp.clone());
352        let mut capsule = loader.create_capsule(manifest, dir.clone())?;
353
354        // Build the context — use the shared kernel KV so capsules can
355        // communicate state through overlapping KV namespaces.
356        let principal = astrid_core::PrincipalId::default();
357        let kv = astrid_storage::ScopedKvStore::new(
358            Arc::clone(&self.kv) as Arc<dyn astrid_storage::KvStore>,
359            format!("{principal}:capsule:{}", capsule.id()),
360        )?;
361
362        // Pre-load env config into the KV store.
363        // Check principal config first, fall back to capsule dir's .env.json.
364        let capsule_name = capsule.id().to_string();
365        let env_path = if let Ok(home) = astrid_core::dirs::AstridHome::resolve() {
366            let ph = home.principal_home(&principal);
367            let principal_env = ph.env_dir().join(format!("{capsule_name}.env.json"));
368            if principal_env.exists() {
369                principal_env
370            } else {
371                dir.join(".env.json")
372            }
373        } else {
374            dir.join(".env.json")
375        };
376        if env_path.exists()
377            && let Ok(contents) = std::fs::read_to_string(&env_path)
378            && let Ok(env_map) =
379                serde_json::from_str::<std::collections::HashMap<String, String>>(&contents)
380        {
381            for (k, v) in env_map {
382                let _ = kv.set(&k, v.into_bytes()).await;
383            }
384        }
385
386        let ctx = astrid_capsule::context::CapsuleContext::new(
387            principal.clone(),
388            self.workspace_root.clone(),
389            self.home_root.clone(),
390            kv,
391            Arc::clone(&self.event_bus),
392            self.cli_socket_listener.clone(),
393        )
394        .with_registry(Arc::clone(&self.capsules))
395        .with_session_token(Arc::clone(&self.session_token))
396        .with_allowance_store(Arc::clone(&self.allowance_store))
397        .with_identity_store(Arc::clone(&self.identity_store))
398        .with_profile_cache(Arc::clone(&self.profile_cache))
399        .with_overlay_registry(Arc::clone(&self.overlay_registry));
400
401        capsule.load(&ctx).await?;
402
403        let mut registry = self.capsules.write().await;
404        registry
405            .register(capsule)
406            .map_err(|e| anyhow::anyhow!("Failed to register capsule: {e}"))?;
407
408        Ok(())
409    }
410
411    /// Restart a capsule by unloading it and re-loading from its source directory.
412    ///
413    /// # Errors
414    ///
415    /// Returns an error if the capsule has no source directory, cannot be
416    /// unregistered, or fails to reload.
417    async fn restart_capsule(
418        &self,
419        id: &astrid_capsule::capsule::CapsuleId,
420    ) -> Result<(), anyhow::Error> {
421        // Get source directory before unregistering.
422        let source_dir = {
423            let registry = self.capsules.read().await;
424            let capsule = registry
425                .get(id)
426                .ok_or_else(|| anyhow::anyhow!("capsule '{id}' not found in registry"))?;
427            capsule
428                .source_dir()
429                .map(std::path::Path::to_path_buf)
430                .ok_or_else(|| anyhow::anyhow!("capsule '{id}' has no source directory"))?
431        };
432
433        // Unregister and explicitly unload. There is no Drop impl that
434        // calls unload() (it's async), so we must do it here to avoid
435        // leaking MCP subprocesses and other engine resources.
436        let old_capsule = {
437            let mut registry = self.capsules.write().await;
438            registry
439                .unregister(id)
440                .map_err(|e| anyhow::anyhow!("failed to unregister capsule '{id}': {e}"))?
441        };
442        // Explicitly unload the old capsule. There is no Drop impl that
443        // calls unload() (it's async), so we must do it here to avoid
444        // leaking MCP subprocesses and other engine resources.
445        // Arc::get_mut requires exclusive ownership (strong_count == 1).
446        {
447            let mut old = old_capsule;
448            if let Some(capsule) = std::sync::Arc::get_mut(&mut old) {
449                if let Err(e) = capsule.unload().await {
450                    tracing::warn!(
451                        capsule_id = %id,
452                        error = %e,
453                        "Capsule unload failed during restart"
454                    );
455                }
456            } else {
457                tracing::warn!(
458                    capsule_id = %id,
459                    "Cannot call unload during restart - Arc still held by in-flight task"
460                );
461            }
462        }
463
464        // Re-load from disk.
465        self.load_capsule(source_dir).await?;
466
467        // Signal the newly loaded capsule to clean up ephemeral state
468        // from the previous incarnation. Capsules that don't implement
469        // `handle_lifecycle_restart` will return an error, which is fine.
470        //
471        // Clone the capsule Arc under a brief read lock, then drop the
472        // guard before invoke_interceptor which calls block_in_place.
473        // Holding the RwLock across block_in_place parks the worker thread
474        // and starves registry writers (health monitor, capsule loading).
475        let capsule = {
476            let registry = self.capsules.read().await;
477            registry.get(id)
478        };
479        if let Some(capsule) = capsule
480            && let Err(e) = capsule.invoke_interceptor("handle_lifecycle_restart", &[], None)
481        {
482            tracing::debug!(
483                capsule_id = %id,
484                error = %e,
485                "Capsule does not handle lifecycle restart (optional)"
486            );
487        }
488
489        Ok(())
490    }
491
492    /// Auto-discover and load all capsules from the standard directories (`~/.astrid/capsules` and `.astrid/capsules`).
493    ///
494    /// Capsules are loaded in dependency order (topological sort) with
495    /// uplink/daemon capsules loaded first. Each uplink must signal
496    /// readiness before non-uplink capsules are loaded.
497    ///
498    /// After all capsules are loaded, tool schemas are injected into every
499    /// capsule's KV namespace and the `astrid.v1.capsules_loaded` event is published.
500    pub async fn load_all_capsules(&self) {
501        use astrid_capsule::toposort::toposort_manifests;
502        use astrid_core::dirs::AstridHome;
503
504        // Discovery paths in priority order: principal > workspace.
505        let mut paths = Vec::new();
506        if let Ok(home) = AstridHome::resolve() {
507            let principal = astrid_core::PrincipalId::default();
508            paths.push(home.principal_home(&principal).capsules_dir());
509        }
510
511        let discovered = astrid_capsule::discovery::discover_manifests(Some(&paths));
512
513        // Topological sort ALL capsules together so cross-partition
514        // requirements (e.g. a non-uplink requiring an uplink's capability)
515        // resolve correctly without spurious "not provided" warnings.
516        let sorted = match toposort_manifests(discovered) {
517            Ok(sorted) => sorted,
518            Err((e, original)) => {
519                tracing::error!(
520                    cycle = %e,
521                    "Dependency cycle in capsules, falling back to discovery order"
522                );
523                original
524            },
525        };
526
527        // Defence-in-depth: manifest validation in discovery.rs rejects
528        // uplinks with [imports], but warn here in case a manifest bypasses
529        // the normal load path.
530        for (manifest, _) in &sorted {
531            if manifest.capabilities.uplink && manifest.has_imports() {
532                tracing::warn!(
533                    capsule = %manifest.package.name,
534                    "Uplink capsule has [imports] - \
535                     this should have been rejected at manifest load time"
536                );
537            }
538        }
539
540        // Validate imports/exports: every required import must have a matching export.
541        validate_imports_exports(&sorted);
542
543        // Partition after sorting: uplinks first, then the rest.
544        // The relative order within each partition is preserved from the
545        // toposort, so dependency edges are still respected. Cross-partition
546        // edges (non-uplink requiring an uplink) are satisfied by construction
547        // since all uplinks load first. The inverse (uplink requiring a
548        // non-uplink) is rejected above.
549        let (uplinks, others): (Vec<_>, Vec<_>) =
550            sorted.into_iter().partition(|(m, _)| m.capabilities.uplink);
551
552        // Load uplinks first so their event bus subscriptions are ready.
553        let uplink_names: Vec<String> = uplinks
554            .iter()
555            .map(|(m, _)| m.package.name.clone())
556            .collect();
557        for (manifest, dir) in &uplinks {
558            if let Err(e) = self.load_capsule(dir.clone()).await {
559                tracing::warn!(
560                    capsule = %manifest.package.name,
561                    error = %e,
562                    "Failed to load uplink capsule during discovery"
563                );
564            }
565        }
566
567        // Wait for uplink capsules to signal readiness before loading
568        // non-uplink capsules. This ensures IPC subscriptions are active.
569        self.await_capsule_readiness(&uplink_names).await;
570
571        for (manifest, dir) in &others {
572            if let Err(e) = self.load_capsule(dir.clone()).await {
573                tracing::warn!(
574                    capsule = %manifest.package.name,
575                    error = %e,
576                    "Failed to load capsule during discovery"
577                );
578            }
579        }
580
581        // Wait for non-uplink run-loop capsules too, so any future
582        // dependency edges between them are respected.
583        let other_names: Vec<String> = others.iter().map(|(m, _)| m.package.name.clone()).collect();
584        self.await_capsule_readiness(&other_names).await;
585
586        // Signal that all capsules have been loaded so uplink capsules
587        // (like the registry) can proceed with discovery instead of
588        // polling with arbitrary timeouts.
589        let msg = astrid_events::ipc::IpcMessage::new(
590            "astrid.v1.capsules_loaded",
591            astrid_events::ipc::IpcPayload::RawJson(serde_json::json!({"status": "ready"})),
592            self.session_id.0,
593        );
594        let _ = self.event_bus.publish(astrid_events::AstridEvent::Ipc {
595            metadata: astrid_events::EventMetadata::new("kernel"),
596            message: msg,
597        });
598    }
599
600    /// Record that a new client connection for `principal` has been established.
601    pub fn connection_opened(&self, principal: &PrincipalId) {
602        self.active_connections
603            .entry(principal.clone())
604            .or_insert_with(|| AtomicUsize::new(0))
605            .fetch_add(1, Ordering::Relaxed);
606    }
607
608    /// Record that a client connection for `principal` has been closed.
609    ///
610    /// Uses `fetch_update` for atomic saturating decrement - avoids the
611    /// TOCTOU window where `fetch_sub` wraps to `usize::MAX` before a
612    /// corrective store.
613    ///
614    /// When *this* principal's counter reaches zero, clears only that
615    /// principal's session-scoped allowances — other principals' state is
616    /// untouched. The global ephemeral-shutdown path remains gated on the
617    /// sum across every principal (see
618    /// [`total_connection_count`](Self::total_connection_count)).
619    pub fn connection_closed(&self, principal: &PrincipalId) {
620        // Hold the DashMap entry guard across the decrement AND the
621        // session-scoped clears. While we hold the guard any concurrent
622        // `connection_opened(principal)` on the same key blocks on the
623        // shard lock, so its new session allowances cannot be born and
624        // then nuked by the tail-end cleanup here (pre-Layer-4 bug
625        // surfaced more narrowly under per-principal scoping).
626        //
627        // The downstream stores do not re-enter `active_connections`, so
628        // holding this guard while calling into them cannot deadlock.
629        let entry = self
630            .active_connections
631            .entry(principal.clone())
632            .or_insert_with(|| AtomicUsize::new(0));
633        let result = entry.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |n| {
634            if n == 0 {
635                None
636            } else {
637                Some(n.saturating_sub(1))
638            }
639        });
640
641        if result == Ok(1) {
642            self.allowance_store.clear_session_allowances(principal);
643            if let Err(e) = self.capabilities.clear_session_for(principal) {
644                tracing::warn!(%principal, error = %e, "failed to clear capability session");
645            }
646            tracing::info!(
647                %principal,
648                "last connection for principal disconnected, session state cleared"
649            );
650        }
651        // Release the shard lock before touching the map again — `remove_if`
652        // re-acquires it.
653        drop(entry);
654
655        if result == Ok(1) {
656            self.active_connections
657                .remove_if(principal, |_, count| count.load(Ordering::Relaxed) == 0);
658        }
659    }
660
661    /// Enable or disable ephemeral mode (immediate shutdown on last disconnect).
662    pub fn set_ephemeral(&self, val: bool) {
663        self.ephemeral.store(val, Ordering::Relaxed);
664    }
665
666    /// Total number of active client connections across all principals.
667    ///
668    /// Used by the ephemeral-shutdown gate: the kernel shuts down only
669    /// when *every* principal's counter has reached zero.
670    pub fn total_connection_count(&self) -> usize {
671        self.active_connections
672            .iter()
673            .map(|e| e.value().load(Ordering::Relaxed))
674            .sum()
675    }
676
677    /// Snapshot of `(principal, count)` for every principal with a
678    /// non-zero active connection. The `astrid who` admin surface
679    /// reads this to attribute connections to specific agents
680    /// instead of fabricating a `default`-only row from the bare
681    /// total.
682    ///
683    /// Not a hot-path call site — taken at status-RPC time. Iterating
684    /// the `DashMap` snapshots the shard guards individually, so the
685    /// total may not be perfectly consistent with a concurrent
686    /// connect/disconnect, but each entry is internally consistent
687    /// and the operator-facing accuracy bound (a flickering one-off
688    /// count) is acceptable.
689    pub fn connections_by_principal(&self) -> Vec<(PrincipalId, usize)> {
690        self.active_connections
691            .iter()
692            .filter_map(|e| {
693                let count = e.value().load(Ordering::Relaxed);
694                if count == 0 {
695                    None
696                } else {
697                    Some((e.key().clone(), count))
698                }
699            })
700            .collect()
701    }
702
703    /// Gracefully shut down the kernel.
704    ///
705    /// 1. Publish `KernelShutdown` event on the bus.
706    /// 2. Drain and unload all capsules (stops MCP child processes, WASM engines).
707    /// 3. Flush and close the persistent KV store.
708    /// 4. Remove the Unix socket file.
709    pub async fn shutdown(&self, reason: Option<String>) {
710        tracing::info!(reason = ?reason, "Kernel shutting down");
711
712        // 1. Notify all subscribers so capsules can react.
713        let _ = self
714            .event_bus
715            .publish(astrid_events::AstridEvent::KernelShutdown {
716                metadata: astrid_events::EventMetadata::new("kernel"),
717                reason: reason.clone(),
718            });
719
720        // Clear every principal's session-only state in one sweep. Belt-
721        // and-suspenders for a process that is exiting anyway, but load-
722        // bearing the moment session allowances are ever persisted
723        // (Layer 7) — without this call a persisted-allowance layer would
724        // inherit stale per-session grants from the previous process.
725        self.allowance_store.clear_all_session_allowances();
726        if let Err(e) = self.capabilities.clear_session() {
727            tracing::warn!(error = %e, "failed to clear capability session on shutdown");
728        }
729
730        // 2. Drain the registry so the dispatcher cannot hand out new Arc clones,
731        // then unload each capsule. MCP engine unload is critical - it calls
732        // `mcp_client.disconnect()` to gracefully terminate child processes.
733        // Without explicit unload, MCP child processes become orphaned.
734        //
735        // The `EventDispatcher` temporarily clones `Arc<dyn Capsule>` into
736        // spawned interceptor tasks. After draining, no new clones can be
737        // created, but in-flight tasks may still hold references. We retry
738        // `Arc::get_mut` with brief yields to let them complete.
739        let capsules = {
740            let mut reg = self.capsules.write().await;
741            reg.drain()
742        };
743        for mut arc in capsules {
744            let id = arc.id().clone();
745            let mut unloaded = false;
746
747            for retry in 0..20_u32 {
748                if let Some(capsule) = Arc::get_mut(&mut arc) {
749                    if let Err(e) = capsule.unload().await {
750                        tracing::warn!(
751                            capsule_id = %id,
752                            error = %e,
753                            "Failed to unload capsule during shutdown"
754                        );
755                    }
756                    unloaded = true;
757                    break;
758                }
759                if retry < 19 {
760                    tokio::time::sleep(std::time::Duration::from_millis(50)).await;
761                }
762            }
763
764            if !unloaded {
765                tracing::warn!(
766                    capsule_id = %id,
767                    strong_count = Arc::strong_count(&arc),
768                    "Dropping capsule without explicit unload after retries exhausted; \
769                     MCP child processes may be orphaned"
770                );
771            }
772            drop(arc);
773        }
774
775        // 3. Flush the persistent KV store.
776        if let Err(e) = self.kv.close().await {
777            tracing::warn!(error = %e, "Failed to flush KV store during shutdown");
778        }
779
780        // 4. Remove the socket and token files so stale-socket detection works
781        // on next boot and the auth token doesn't persist on disk after shutdown.
782        // This runs AFTER capsule unload, which is the correct order: MCP child
783        // processes communicate via stdio pipes (not this Unix socket), so they
784        // are already terminated by step 2. The socket is only used for
785        // CLI-to-kernel IPC.
786        let socket_path = crate::socket::kernel_socket_path();
787        let _ = std::fs::remove_file(&socket_path);
788        let _ = std::fs::remove_file(&self.token_path);
789        crate::socket::remove_readiness_file();
790
791        tracing::info!("Kernel shutdown complete");
792    }
793
794    /// Wait for a set of capsules to signal readiness, in parallel.
795    ///
796    /// Collects `Arc<dyn Capsule>` handles under a short-lived read lock,
797    /// then drops the lock before awaiting. Capsules without a run loop
798    /// return `Ready` immediately and don't contribute to wait time.
799    async fn await_capsule_readiness(&self, names: &[String]) {
800        use astrid_capsule::capsule::ReadyStatus;
801
802        if names.is_empty() {
803            return;
804        }
805
806        let timeout = std::time::Duration::from_millis(500);
807        let capsules: Vec<(String, std::sync::Arc<dyn astrid_capsule::capsule::Capsule>)> = {
808            let registry = self.capsules.read().await;
809            names
810                .iter()
811                .filter_map(
812                    |name| match astrid_capsule::capsule::CapsuleId::new(name.clone()) {
813                        Ok(capsule_id) => registry.get(&capsule_id).map(|c| (name.clone(), c)),
814                        Err(e) => {
815                            tracing::warn!(
816                                capsule = %name,
817                                error = %e,
818                                "Invalid capsule ID, skipping readiness wait"
819                            );
820                            None
821                        },
822                    },
823                )
824                .collect()
825        };
826
827        // Await all capsules concurrently - independent capsules shouldn't
828        // compound each other's timeout.
829        let mut set = tokio::task::JoinSet::new();
830        for (name, capsule) in capsules {
831            set.spawn(async move {
832                let status = capsule.wait_ready(timeout).await;
833                (name, status)
834            });
835        }
836        while let Some(result) = set.join_next().await {
837            if let Ok((name, status)) = result {
838                match status {
839                    ReadyStatus::Ready => {},
840                    ReadyStatus::Timeout => {
841                        tracing::warn!(
842                            capsule = %name,
843                            timeout_ms = timeout.as_millis(),
844                            "Capsule did not signal ready within timeout"
845                        );
846                    },
847                    ReadyStatus::Crashed => {
848                        tracing::error!(
849                            capsule = %name,
850                            "Capsule run loop exited before signaling ready"
851                        );
852                    },
853                }
854            }
855        }
856    }
857}
858
/// Test-only lightweight constructor (issue #672) that builds a
/// [`Kernel`] with just the fields the admin handlers touch:
/// `event_bus`, `session_id`, `audit_log`, `profile_cache`,
/// `identity_store`, `groups`, `astrid_home`, `admin_write_lock`, plus
/// the shared allowance / capability / kv store handles. Skips the
/// heavy boot bits (socket bind, MCP init, token generation, capsule
/// discovery) that aren't load-bearing for admin-topic tests.
///
/// The `home` argument is used verbatim — tests pass a tempdir-rooted
/// [`astrid_core::dirs::AstridHome`] so every call is fully isolated
/// from the process-global `$ASTRID_HOME`.
///
/// The admin router is spawned as a detached task before returning.
///
/// # Panics
///
/// Panics (via `expect`) if any on-disk component under `home` cannot be
/// created or opened. That is acceptable in a test-only helper.
#[cfg(test)]
pub(crate) async fn test_kernel_with_home(home: astrid_core::dirs::AstridHome) -> Arc<Kernel> {
    home.ensure()
        .expect("test kernel: ensure astrid home dir tree");

    // The VFS mount, overlay registry and `workspace_root` field all share
    // the tempdir root.
    let workspace_root = home.root().to_path_buf();

    let session_id = SessionId::SYSTEM;
    let event_bus = Arc::new(EventBus::new());
    let capsules = Arc::new(RwLock::new(CapsuleRegistry::new()));

    // Persistent KV backing capabilities + identity store.
    let kv = Arc::new(
        astrid_storage::SurrealKvStore::open(&home.state_db_path()).expect("test kernel: open kv"),
    );
    let capabilities = Arc::new(
        CapabilityStore::with_kv_store(Arc::clone(&kv) as Arc<dyn astrid_storage::KvStore>)
            .expect("test kernel: capability store"),
    );

    // Audit log at the tempdir — chain verification is trivially Ok on a
    // fresh log, no historical entries.
    let runtime_key =
        load_or_generate_runtime_key(&home.keys_dir()).expect("test kernel: runtime key");
    let default_principal = astrid_core::PrincipalId::default();
    let principal_home = home.principal_home(&default_principal);
    principal_home
        .ensure()
        .expect("test kernel: ensure principal home");
    let audit_log = Arc::new(
        AuditLog::open(principal_home.audit_dir(), runtime_key)
            .expect("test kernel: open audit log"),
    );

    // MCP: use a no-op secure client wrapped around an empty manager.
    // Admin handlers do not touch MCP.
    let mcp_manager = ServerManager::new(ServersConfig::default());
    let mcp_client = McpClient::new(mcp_manager);
    let mcp = SecureMcpClient::new(
        mcp_client,
        Arc::clone(&capabilities),
        Arc::clone(&audit_log),
        session_id.clone(),
    );

    let root_handle = DirHandle::new();
    let kernel_host_vfs = HostVfs::new();
    kernel_host_vfs
        .register_dir(root_handle.clone(), workspace_root.clone())
        .await
        .expect("test kernel: register workspace vfs");
    let overlay_registry = Arc::new(OverlayVfsRegistry::new(
        workspace_root.clone(),
        root_handle.clone(),
    ));

    let allowance_store = Arc::new(astrid_approval::AllowanceStore::new());
    let identity_kv = astrid_storage::ScopedKvStore::new(
        Arc::clone(&kv) as Arc<dyn astrid_storage::KvStore>,
        "system:identity",
    )
    .expect("test kernel: identity kv scope");
    let identity_store: Arc<dyn astrid_storage::IdentityStore> =
        Arc::new(astrid_storage::KvIdentityStore::new(identity_kv));

    let groups = Arc::new(ArcSwap::from_pointee(
        GroupConfig::load(&home).expect("test kernel: load groups"),
    ));

    let kernel = Arc::new(Kernel {
        session_id,
        event_bus,
        capsules,
        mcp,
        capabilities,
        vfs: Arc::new(kernel_host_vfs) as Arc<dyn Vfs>,
        overlay_registry,
        vfs_root_handle: root_handle,
        workspace_root,
        home_root: Some(principal_home.root().to_path_buf()),
        cli_socket_listener: None,
        kv,
        audit_log,
        active_connections: DashMap::new(),
        ephemeral: AtomicBool::new(false),
        boot_time: std::time::Instant::now(),
        shutdown_tx: tokio::sync::watch::channel(false).0,
        session_token: Arc::new(astrid_core::session_token::SessionToken::generate()),
        token_path: home.token_path(),
        allowance_store,
        identity_store,
        profile_cache: Arc::new(PrincipalProfileCache::with_home(home.clone())),
        groups,
        astrid_home: home,
        admin_write_lock: Mutex::new(()),
    });
    // Spawn the Layer 6 admin dispatcher so IPC-driven tests can drive
    // the full publish → response loop. State-mutating tests that call
    // `handlers::dispatch` directly are unaffected — those messages
    // never hit the bus. Dropping the JoinHandle detaches the task.
    drop(kernel_router::admin::spawn_admin_router(Arc::clone(
        &kernel,
    )));
    kernel
}
975
976/// Loads the runtime signing key from `~/.astrid/keys/runtime.key`, generating a
977/// new one if it doesn't exist. Opens the `SurrealKV`-backed audit database at
978/// `~/.astrid/audit.db` and runs `verify_all()` to detect any tampering of
979/// historical entries. Verification failures are logged at `error!` level but
980/// do not block boot (fail-open for availability, loud alert for integrity).
981fn open_audit_log() -> std::io::Result<Arc<AuditLog>> {
982    use astrid_core::dirs::AstridHome;
983
984    let home = AstridHome::resolve()
985        .map_err(|e| std::io::Error::other(format!("cannot resolve Astrid home: {e}")))?;
986    home.ensure()
987        .map_err(|e| std::io::Error::other(format!("cannot create Astrid home dirs: {e}")))?;
988
989    let runtime_key = load_or_generate_runtime_key(&home.keys_dir())?;
990    let default_principal = astrid_core::PrincipalId::default();
991    let principal_home = home.principal_home(&default_principal);
992    principal_home
993        .ensure()
994        .map_err(|e| std::io::Error::other(format!("cannot create principal home dirs: {e}")))?;
995    let audit_log = AuditLog::open(principal_home.audit_dir(), runtime_key)
996        .map_err(|e| std::io::Error::other(format!("cannot open audit log: {e}")))?;
997
998    // Verify all historical chains on boot.
999    match audit_log.verify_all() {
1000        Ok(results) => {
1001            let total_sessions = results.len();
1002            let mut tampered_sessions: usize = 0;
1003
1004            for (session_id, result) in &results {
1005                if !result.valid {
1006                    tampered_sessions = tampered_sessions.saturating_add(1);
1007                    for issue in &result.issues {
1008                        tracing::error!(
1009                            session_id = %session_id,
1010                            issue = %issue,
1011                            "Audit chain integrity violation detected"
1012                        );
1013                    }
1014                }
1015            }
1016
1017            if tampered_sessions > 0 {
1018                tracing::error!(
1019                    total_sessions,
1020                    tampered_sessions,
1021                    "Audit chain verification found tampered sessions"
1022                );
1023            } else if total_sessions > 0 {
1024                tracing::info!(
1025                    total_sessions,
1026                    "Audit chain verification passed for all sessions"
1027                );
1028            }
1029        },
1030        Err(e) => {
1031            tracing::error!(error = %e, "Audit chain verification failed to run");
1032        },
1033    }
1034
1035    Ok(Arc::new(audit_log))
1036}
1037
1038/// Load the runtime ed25519 signing key from disk, or generate and persist a new one.
1039///
1040/// The key file is 32 bytes of raw secret key material at `{keys_dir}/runtime.key`.
1041fn load_or_generate_runtime_key(keys_dir: &Path) -> std::io::Result<KeyPair> {
1042    let key_path = keys_dir.join("runtime.key");
1043
1044    if key_path.exists() {
1045        let bytes = std::fs::read(&key_path)?;
1046        KeyPair::from_secret_key(&bytes).map_err(|e| {
1047            std::io::Error::other(format!(
1048                "invalid runtime key at {}: {e}",
1049                key_path.display()
1050            ))
1051        })
1052    } else {
1053        let keypair = KeyPair::generate();
1054        std::fs::create_dir_all(keys_dir)?;
1055        std::fs::write(&key_path, keypair.secret_key_bytes())?;
1056
1057        // Secure permissions (owner-only) on Unix.
1058        #[cfg(unix)]
1059        {
1060            use std::os::unix::fs::PermissionsExt;
1061            std::fs::set_permissions(&key_path, std::fs::Permissions::from_mode(0o600))?;
1062        }
1063
1064        tracing::info!(key_id = %keypair.key_id_hex(), "Generated new runtime signing key");
1065        Ok(keypair)
1066    }
1067}
1068
1069/// Spawns a background task that cleanly shuts down the Kernel if there is no activity.
1070///
1071/// Uses dual-signal idle detection:
1072/// - **Primary:** explicit `active_connections` counter (incremented on first IPC
1073///   message per source, decremented on `Disconnect`).
1074/// - **Secondary:** `EventBus::subscriber_count()` minus the kernel router's own
1075///   subscription. When a CLI process dies without sending `Disconnect`, its
1076///   broadcast receiver is dropped so the subscriber count falls.
1077///
1078/// Takes the minimum of both signals to handle ungraceful disconnects.
1079///
1080/// Idle shutdown is on by default in `--ephemeral` mode (30s after the
1081/// last client disconnects) and **off by default** in persistent mode
1082/// (`astrid start`). Both modes respect `ASTRID_IDLE_TIMEOUT_SECS` —
1083/// setting it in persistent mode opts the operator into auto-shutdown,
1084/// setting it in ephemeral mode overrides the 30s default.
1085/// Number of permanent internal event bus subscribers that are not client
1086/// connections: `KernelRouter` (`kernel.request.*`), `AdminRouter`
1087/// (`kernel.admin.*`), `ConnectionTracker` (`client.*`), and
1088/// `EventDispatcher` (all events).
1089const INTERNAL_SUBSCRIBER_COUNT: usize = 4;
1090
1091/// Initial grace period before idle checking begins.
1092const IDLE_INITIAL_GRACE: std::time::Duration = std::time::Duration::from_secs(5);
1093/// Additional grace for non-ephemeral daemons to let capsules fully initialize.
1094const IDLE_NON_EPHEMERAL_GRACE: std::time::Duration = std::time::Duration::from_secs(25);
1095/// How often the idle monitor polls when running in ephemeral mode.
1096const IDLE_EPHEMERAL_CHECK_INTERVAL: std::time::Duration = std::time::Duration::from_secs(1);
1097/// How often the idle monitor polls when running in persistent mode.
1098const IDLE_CHECK_INTERVAL: std::time::Duration = std::time::Duration::from_secs(15);
1099fn spawn_idle_monitor(kernel: Arc<Kernel>) -> tokio::task::JoinHandle<()> {
1100    tokio::spawn(async move {
1101        // Initial grace period — wait for capsules to boot and first client
1102        // to connect before checking idle status.
1103        tokio::time::sleep(IDLE_INITIAL_GRACE).await;
1104
1105        // Read ephemeral flag after grace period (set by daemon after boot).
1106        let ephemeral = kernel.ephemeral.load(Ordering::Relaxed);
1107        let idle_timeout = if ephemeral {
1108            // Give the CLI time to reconnect after brief disconnects (e.g.
1109            // during tool execution when the TUI might momentarily drop
1110            // the socket). Zero timeout caused premature shutdowns.
1111            //
1112            // Operators may still override via `ASTRID_IDLE_TIMEOUT_SECS`
1113            // when they want a longer ephemeral window (e.g. headless
1114            // batch runs that pause between prompts).
1115            std::env::var("ASTRID_IDLE_TIMEOUT_SECS")
1116                .ok()
1117                .and_then(|v| v.parse().ok())
1118                .map_or(
1119                    std::time::Duration::from_secs(30),
1120                    std::time::Duration::from_secs,
1121                )
1122        } else {
1123            // Persistent (`astrid start`) mode: idle shutdown is opt-in.
1124            // The operator explicitly chose persistent — honour that.
1125            // Setting `ASTRID_IDLE_TIMEOUT_SECS` switches the monitor on
1126            // for housekeeping flows that genuinely want auto-shutdown.
1127            // Without it, the monitor task exits immediately and the
1128            // daemon stays up until SIGTERM.
1129            let Some(secs) = std::env::var("ASTRID_IDLE_TIMEOUT_SECS")
1130                .ok()
1131                .and_then(|v| v.parse().ok())
1132            else {
1133                tracing::debug!(
1134                    "Non-ephemeral daemon: idle shutdown disabled \
1135                     (set ASTRID_IDLE_TIMEOUT_SECS to enable)."
1136                );
1137                return;
1138            };
1139            std::time::Duration::from_secs(secs)
1140        };
1141        let check_interval = if ephemeral {
1142            IDLE_EPHEMERAL_CHECK_INTERVAL
1143        } else {
1144            IDLE_CHECK_INTERVAL
1145        };
1146
1147        // Non-ephemeral: additional grace to let capsules fully initialize.
1148        if !ephemeral {
1149            tokio::time::sleep(IDLE_NON_EPHEMERAL_GRACE).await;
1150        }
1151        let mut idle_since: Option<tokio::time::Instant> = None;
1152
1153        loop {
1154            tokio::time::sleep(check_interval).await;
1155
1156            let connections = kernel.total_connection_count();
1157
1158            // Use the explicit connection counter as the sole signal.
1159            // The previous bus_subscribers heuristic (subscriber_count minus
1160            // internal subscribers) was fragile: capsule run-loop crashes
1161            // reduce subscriber_count, causing false "0 connections" readings
1162            // that trigger premature idle shutdown while a client is active.
1163            let effective_connections = connections;
1164
1165            let has_daemons = {
1166                let reg = kernel.capsules.read().await;
1167                reg.values().any(|c| {
1168                    let m = c.manifest();
1169                    !m.uplinks.is_empty()
1170                })
1171            };
1172
1173            if effective_connections == 0 && !has_daemons {
1174                let now = tokio::time::Instant::now();
1175                let start = *idle_since.get_or_insert(now);
1176                let elapsed = now.duration_since(start);
1177
1178                tracing::debug!(
1179                    idle_secs = elapsed.as_secs(),
1180                    timeout_secs = idle_timeout.as_secs(),
1181                    connections,
1182                    "Kernel idle, monitoring timeout"
1183                );
1184
1185                if elapsed >= idle_timeout {
1186                    tracing::info!("Idle timeout reached, initiating shutdown");
1187                    kernel.shutdown(Some("idle_timeout".to_string())).await;
1188                    std::process::exit(0);
1189                }
1190            } else {
1191                if idle_since.is_some() {
1192                    tracing::debug!(
1193                        effective_connections,
1194                        has_daemons,
1195                        "Activity detected, resetting idle timer"
1196                    );
1197                }
1198                idle_since = None;
1199            }
1200        }
1201    })
1202}
1203
/// Per-capsule restart bookkeeping with exponential backoff.
///
/// Holds how many restarts have been tried, when the latest one happened,
/// and how long to wait before the next one.
struct RestartTracker {
    /// Restart attempts recorded so far.
    attempts: u32,
    /// Creation time, or the time of the most recently recorded attempt.
    last_attempt: std::time::Instant,
    /// Minimum delay after `last_attempt` before another attempt is allowed.
    backoff: std::time::Duration,
}

impl RestartTracker {
    const MAX_ATTEMPTS: u32 = 5;
    const INITIAL_BACKOFF: std::time::Duration = std::time::Duration::from_secs(2);
    const MAX_BACKOFF: std::time::Duration = std::time::Duration::from_secs(120);

    /// Fresh tracker. The backoff clock starts now, so the first attempt
    /// is allowed only after `INITIAL_BACKOFF` has passed.
    fn new() -> Self {
        let started = std::time::Instant::now();
        Self {
            attempts: 0,
            last_attempt: started,
            backoff: Self::INITIAL_BACKOFF,
        }
    }

    /// Whether the attempt budget remains and the backoff window has passed.
    fn should_restart(&self) -> bool {
        if self.exhausted() {
            return false;
        }
        self.last_attempt.elapsed() >= self.backoff
    }

    /// Count one attempt, restart the backoff clock, and double the backoff
    /// (capped at `MAX_BACKOFF`).
    fn record_attempt(&mut self) {
        self.attempts = self.attempts.saturating_add(1);
        self.last_attempt = std::time::Instant::now();
        let doubled = self.backoff.saturating_mul(2);
        self.backoff = std::cmp::min(doubled, Self::MAX_BACKOFF);
    }

    /// Whether the attempt budget has been used up.
    fn exhausted(&self) -> bool {
        Self::MAX_ATTEMPTS <= self.attempts
    }
}
1241
1242/// Attempts to restart a failed capsule, respecting backoff and max retries.
1243///
1244/// Returns `true` if the tracker should be removed (successful restart).
1245async fn attempt_capsule_restart(
1246    kernel: &Kernel,
1247    id_str: &str,
1248    tracker: &mut RestartTracker,
1249) -> bool {
1250    if tracker.exhausted() {
1251        return false;
1252    }
1253
1254    if !tracker.should_restart() {
1255        tracing::debug!(
1256            capsule_id = %id_str,
1257            next_attempt_in = ?tracker.backoff.saturating_sub(tracker.last_attempt.elapsed()),
1258            "Waiting for backoff before next restart attempt"
1259        );
1260        return false;
1261    }
1262
1263    tracker.record_attempt();
1264    let attempt = tracker.attempts;
1265
1266    tracing::warn!(
1267        capsule_id = %id_str,
1268        attempt,
1269        max_attempts = RestartTracker::MAX_ATTEMPTS,
1270        "Attempting capsule restart"
1271    );
1272
1273    let capsule_id = astrid_capsule::capsule::CapsuleId::from_static(id_str);
1274    match kernel.restart_capsule(&capsule_id).await {
1275        Ok(()) => {
1276            tracing::info!(capsule_id = %id_str, attempt, "Capsule restarted successfully");
1277            true
1278        },
1279        Err(e) => {
1280            tracing::error!(capsule_id = %id_str, attempt, error = %e, "Capsule restart failed");
1281            if tracker.exhausted() {
1282                tracing::error!(
1283                    capsule_id = %id_str,
1284                    "All restart attempts exhausted - capsule will remain down"
1285                );
1286            }
1287            false
1288        },
1289    }
1290}
1291
/// Spawns a background task that periodically probes capsule health.
///
/// Every 10 seconds, reads the capsule registry and calls `check_health()` on
/// each capsule that is currently in `Ready` state. If a capsule reports
/// `Failed`, attempts to restart it with exponential backoff (max 5 attempts).
/// Publishes `astrid.v1.health.failed` IPC events for each detected failure.
///
/// Restart state is kept in a `RestartTracker` per capsule ID string. Only
/// capsules that are `Ready` and report `Failed` on a given tick are passed
/// to `attempt_capsule_restart`. A capsule that is absent from the registry,
/// or not in `Ready` state, on later ticks is not probed. No further restart
/// is attempted for it from this loop.
fn spawn_capsule_health_monitor(kernel: Arc<Kernel>) -> tokio::task::JoinHandle<()> {
    tokio::spawn(async move {
        // NOTE(review): tokio's default `MissedTickBehavior` is `Burst`. If a
        // round of restarts below takes longer than 10s, the missed ticks fire
        // back-to-back. Consider `MissedTickBehavior::Delay` if that matters.
        let mut interval = tokio::time::interval(std::time::Duration::from_secs(10));
        interval.tick().await; // Skip the first immediate tick.

        // Keyed by `CapsuleId::to_string()`; survives across ticks.
        let mut restart_trackers: std::collections::HashMap<String, RestartTracker> =
            std::collections::HashMap::new();

        loop {
            interval.tick().await;

            // Collect ready capsules under a brief read lock, then drop
            // the lock before calling check_health() or publishing events.
            let ready_capsules: Vec<std::sync::Arc<dyn astrid_capsule::capsule::Capsule>> = {
                let registry = kernel.capsules.read().await;
                registry
                    .list()
                    .into_iter()
                    .filter_map(|id| {
                        let capsule = registry.get(id)?;
                        if capsule.state() == astrid_capsule::capsule::CapsuleState::Ready {
                            Some(capsule)
                        } else {
                            None
                        }
                    })
                    .collect()
            };

            // Probe health once per capsule, collect failures, then drop
            // the Arc Vec before restarting. This ensures restart_capsule's
            // Arc::get_mut can succeed (no other strong references held).
            let mut failures: Vec<(String, String)> = Vec::new();
            for capsule in &ready_capsules {
                let health = capsule.check_health();
                if let astrid_capsule::capsule::CapsuleState::Failed(reason) = health {
                    let id_str = capsule.id().to_string();
                    tracing::error!(capsule_id = %id_str, reason = %reason, "Capsule health check failed");

                    // Broadcast the failure on the bus before any restart is attempted.
                    let msg = astrid_events::ipc::IpcMessage::new(
                        "astrid.v1.health.failed",
                        astrid_events::ipc::IpcPayload::Custom {
                            data: serde_json::json!({
                                "capsule_id": &id_str,
                                "reason": &reason,
                            }),
                        },
                        uuid::Uuid::new_v4(),
                    );
                    let _ = kernel.event_bus.publish(astrid_events::AstridEvent::Ipc {
                        metadata: astrid_events::EventMetadata::new("kernel"),
                        message: msg,
                    });
                    failures.push((id_str, reason));
                }
            }

            // Drop all Arc clones so restart_capsule's Arc::get_mut can
            // obtain exclusive access for calling unload().
            drop(ready_capsules);

            // IDs that reported Failed this tick; used by the prune step below.
            let failed_this_tick: std::collections::HashSet<&str> =
                failures.iter().map(|(id, _)| id.as_str()).collect();

            // Restarts run sequentially; each is awaited before the next.
            let mut restarted = Vec::new();
            for (id_str, _reason) in &failures {
                let tracker = restart_trackers
                    .entry(id_str.clone())
                    .or_insert_with(RestartTracker::new);

                if attempt_capsule_restart(&kernel, id_str, tracker).await {
                    restarted.push(id_str.clone());
                }
            }

            // Remove trackers for successfully restarted capsules.
            for id in &restarted {
                restart_trackers.remove(id);
            }

            // Prune trackers for capsules that recovered (healthy this tick).
            // Keep exhausted trackers and trackers still in their backoff
            // window (capsule may have been unregistered by a failed restart
            // attempt and won't appear in ready_capsules next tick).
            restart_trackers.retain(|id, tracker| {
                if tracker.exhausted() {
                    return true;
                }
                // Keep if still within backoff - the capsule may be absent
                // from the registry after a failed reload.
                if tracker.last_attempt.elapsed() < tracker.backoff {
                    return true;
                }
                failed_this_tick.contains(id.as_str())
            });
        }
    })
}
1396
1397/// Spawns a periodic watchdog that publishes `astrid.v1.watchdog.tick` events every 5 seconds.
1398///
1399/// The `ReAct` capsule (WASM guest) cannot use async timers, so this kernel-side task
1400/// drives timeout enforcement by waking the capsule on a fixed interval. Each tick
1401/// causes the capsule's `handle_watchdog_tick` interceptor to run `check_phase_timeout`.
1402fn spawn_react_watchdog(event_bus: Arc<EventBus>) -> tokio::task::JoinHandle<()> {
1403    tokio::spawn(async move {
1404        let mut interval = tokio::time::interval(std::time::Duration::from_secs(5));
1405        // The first tick fires immediately - skip it to give capsules time to load.
1406        interval.tick().await;
1407
1408        loop {
1409            interval.tick().await;
1410
1411            let msg = astrid_events::ipc::IpcMessage::new(
1412                "astrid.v1.watchdog.tick",
1413                astrid_events::ipc::IpcPayload::Custom {
1414                    data: serde_json::json!({}),
1415                },
1416                uuid::Uuid::new_v4(),
1417            );
1418            let _ = event_bus.publish(astrid_events::AstridEvent::Ipc {
1419                metadata: astrid_events::EventMetadata::new("kernel"),
1420                message: msg,
1421            });
1422        }
1423    })
1424}
1425
1426#[cfg(test)]
1427mod tests {
1428    use super::*;
1429
1430    #[test]
1431    fn test_load_or_generate_creates_new_key() {
1432        let dir = tempfile::tempdir().unwrap();
1433        let keys_dir = dir.path().join("keys");
1434
1435        let keypair = load_or_generate_runtime_key(&keys_dir).unwrap();
1436        let key_path = keys_dir.join("runtime.key");
1437
1438        // Key file should exist with 32 bytes.
1439        assert!(key_path.exists());
1440        let bytes = std::fs::read(&key_path).unwrap();
1441        assert_eq!(bytes.len(), 32);
1442
1443        // The written bytes should reconstruct the same public key.
1444        let reloaded = KeyPair::from_secret_key(&bytes).unwrap();
1445        assert_eq!(
1446            keypair.public_key_bytes(),
1447            reloaded.public_key_bytes(),
1448            "reloaded key should match generated key"
1449        );
1450    }
1451
1452    #[test]
1453    fn test_load_or_generate_is_idempotent() {
1454        let dir = tempfile::tempdir().unwrap();
1455        let keys_dir = dir.path().join("keys");
1456
1457        let first = load_or_generate_runtime_key(&keys_dir).unwrap();
1458        let second = load_or_generate_runtime_key(&keys_dir).unwrap();
1459
1460        assert_eq!(
1461            first.public_key_bytes(),
1462            second.public_key_bytes(),
1463            "loading the same key file should produce the same keypair"
1464        );
1465    }
1466
1467    #[test]
1468    fn test_load_or_generate_rejects_bad_key_length() {
1469        let dir = tempfile::tempdir().unwrap();
1470        let keys_dir = dir.path().join("keys");
1471        std::fs::create_dir_all(&keys_dir).unwrap();
1472
1473        // Write a key file with wrong length.
1474        std::fs::write(keys_dir.join("runtime.key"), [0u8; 16]).unwrap();
1475
1476        let result = load_or_generate_runtime_key(&keys_dir);
1477        assert!(result.is_err());
1478        let err = result.unwrap_err().to_string();
1479        assert!(
1480            err.contains("invalid runtime key"),
1481            "expected 'invalid runtime key' error, got: {err}"
1482        );
1483    }
1484
1485    #[test]
1486    fn test_connection_counter_increment_decrement() {
1487        let counter = AtomicUsize::new(0);
1488
1489        // Simulate connection_opened (fetch_add)
1490        counter.fetch_add(1, Ordering::Relaxed);
1491        counter.fetch_add(1, Ordering::Relaxed);
1492        assert_eq!(counter.load(Ordering::Relaxed), 2);
1493
1494        // Simulate connection_closed using the same fetch_update logic
1495        // as the real implementation to exercise the actual code path.
1496        for expected in [1, 0] {
1497            let _ = counter.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |n| {
1498                if n == 0 {
1499                    None
1500                } else {
1501                    Some(n.saturating_sub(1))
1502                }
1503            });
1504            assert_eq!(counter.load(Ordering::Relaxed), expected);
1505        }
1506    }
1507
1508    #[test]
1509    fn test_connection_counter_underflow_guard() {
1510        // Test the saturating behavior: decrementing from 0 should stay at 0.
1511        // Mirrors the fetch_update logic in connection_closed().
1512        let counter = AtomicUsize::new(0);
1513
1514        let result = counter.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |n| {
1515            if n == 0 { None } else { Some(n - 1) }
1516        });
1517        // fetch_update returns Err(0) when the closure returns None (no-op).
1518        assert!(result.is_err());
1519        assert_eq!(counter.load(Ordering::Relaxed), 0);
1520    }
1521
1522    /// Mirrors the `connection_closed(&principal)` logic: only `Ok(1)`
1523    /// (previous value 1, now 0) triggers `clear_session_allowances` for
1524    /// that principal. Update this test if `connection_closed()` is
1525    /// refactored.
1526    #[test]
1527    fn test_last_disconnect_clears_session_allowances_scoped() {
1528        use astrid_approval::AllowanceStore;
1529        use astrid_approval::allowance::{Allowance, AllowanceId, AllowancePattern};
1530        use astrid_core::principal::PrincipalId;
1531        use astrid_core::types::Timestamp;
1532        use astrid_crypto::KeyPair;
1533
1534        let store = AllowanceStore::new();
1535        let keypair = KeyPair::generate();
1536        let alice = PrincipalId::new("alice").unwrap();
1537        let bob = PrincipalId::new("bob").unwrap();
1538
1539        // Alice: session + persistent.
1540        store
1541            .add_allowance(Allowance {
1542                id: AllowanceId::new(),
1543                principal: alice.clone(),
1544                action_pattern: AllowancePattern::ServerTools {
1545                    server: "alice-session".to_string(),
1546                },
1547                created_at: Timestamp::now(),
1548                expires_at: None,
1549                max_uses: None,
1550                uses_remaining: None,
1551                session_only: true,
1552                workspace_root: None,
1553                signature: keypair.sign(b"test"),
1554            })
1555            .unwrap();
1556        store
1557            .add_allowance(Allowance {
1558                id: AllowanceId::new(),
1559                principal: alice.clone(),
1560                action_pattern: AllowancePattern::ServerTools {
1561                    server: "alice-persistent".to_string(),
1562                },
1563                created_at: Timestamp::now(),
1564                expires_at: None,
1565                max_uses: None,
1566                uses_remaining: None,
1567                session_only: false,
1568                workspace_root: None,
1569                signature: keypair.sign(b"test"),
1570            })
1571            .unwrap();
1572        // Bob: session (must NOT be cleared by alice disconnecting).
1573        store
1574            .add_allowance(Allowance {
1575                id: AllowanceId::new(),
1576                principal: bob.clone(),
1577                action_pattern: AllowancePattern::ServerTools {
1578                    server: "bob-session".to_string(),
1579                },
1580                created_at: Timestamp::now(),
1581                expires_at: None,
1582                max_uses: None,
1583                uses_remaining: None,
1584                session_only: true,
1585                workspace_root: None,
1586                signature: keypair.sign(b"test"),
1587            })
1588            .unwrap();
1589        assert_eq!(store.count(), 3);
1590
1591        let alice_counter = AtomicUsize::new(1);
1592        let simulate_alice_disconnect = || {
1593            let result = alice_counter.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |n| {
1594                if n == 0 {
1595                    None
1596                } else {
1597                    Some(n.saturating_sub(1))
1598                }
1599            });
1600            if result == Ok(1) {
1601                store.clear_session_allowances(&alice);
1602            }
1603        };
1604
1605        simulate_alice_disconnect();
1606        // Alice's session gone; alice's persistent + bob's session remain.
1607        assert_eq!(store.count(), 2);
1608        assert_eq!(store.count_for(&alice), 1);
1609        assert_eq!(store.count_for(&bob), 1);
1610    }
1611
1612    #[cfg(unix)]
1613    #[test]
1614    fn test_load_or_generate_sets_secure_permissions() {
1615        use std::os::unix::fs::PermissionsExt;
1616
1617        let dir = tempfile::tempdir().unwrap();
1618        let keys_dir = dir.path().join("keys");
1619
1620        let _ = load_or_generate_runtime_key(&keys_dir).unwrap();
1621
1622        let key_path = keys_dir.join("runtime.key");
1623        let mode = std::fs::metadata(&key_path).unwrap().permissions().mode();
1624        assert_eq!(
1625            mode & 0o777,
1626            0o600,
1627            "key file should have 0o600 permissions, got {mode:#o}"
1628        );
1629    }
1630
1631    #[test]
1632    fn restart_tracker_initial_state() {
1633        let tracker = RestartTracker::new();
1634        assert!(!tracker.exhausted());
1635        // Should not restart immediately (backoff hasn't elapsed).
1636        assert!(!tracker.should_restart());
1637    }
1638
1639    #[test]
1640    fn restart_tracker_allows_restart_after_backoff() {
1641        let mut tracker = RestartTracker::new();
1642        // Simulate time passing by setting last_attempt in the past.
1643        tracker.last_attempt = std::time::Instant::now()
1644            - RestartTracker::INITIAL_BACKOFF
1645            - std::time::Duration::from_millis(1);
1646        assert!(tracker.should_restart());
1647    }
1648
1649    #[test]
1650    fn restart_tracker_doubles_backoff() {
1651        let mut tracker = RestartTracker::new();
1652        assert_eq!(tracker.backoff, RestartTracker::INITIAL_BACKOFF);
1653
1654        tracker.record_attempt();
1655        assert_eq!(
1656            tracker.backoff,
1657            RestartTracker::INITIAL_BACKOFF.saturating_mul(2)
1658        );
1659        assert_eq!(tracker.attempts, 1);
1660
1661        tracker.record_attempt();
1662        assert_eq!(
1663            tracker.backoff,
1664            RestartTracker::INITIAL_BACKOFF.saturating_mul(4)
1665        );
1666        assert_eq!(tracker.attempts, 2);
1667    }
1668
1669    #[test]
1670    fn restart_tracker_backoff_caps_at_max() {
1671        let mut tracker = RestartTracker::new();
1672        for _ in 0..20 {
1673            tracker.record_attempt();
1674        }
1675        assert_eq!(tracker.backoff, RestartTracker::MAX_BACKOFF);
1676    }
1677
1678    #[test]
1679    fn restart_tracker_exhausted_at_max_attempts() {
1680        let mut tracker = RestartTracker::new();
1681        for _ in 0..RestartTracker::MAX_ATTEMPTS {
1682            assert!(!tracker.exhausted());
1683            tracker.record_attempt();
1684        }
1685        assert!(tracker.exhausted());
1686    }
1687
1688    #[test]
1689    fn restart_tracker_should_restart_false_when_exhausted() {
1690        let mut tracker = RestartTracker::new();
1691        for _ in 0..RestartTracker::MAX_ATTEMPTS {
1692            tracker.record_attempt();
1693        }
1694        // Even if backoff has elapsed, exhausted tracker should not restart.
1695        tracker.last_attempt = std::time::Instant::now() - RestartTracker::MAX_BACKOFF;
1696        assert!(!tracker.should_restart());
1697    }
1698
1699    // ── Bootstrap admin-group seeding (issue #670) ───────────────────
1700
1701    fn scratch_home() -> (tempfile::TempDir, astrid_core::dirs::AstridHome) {
1702        let dir = tempfile::tempdir().unwrap();
1703        let home = astrid_core::dirs::AstridHome::from_path(dir.path());
1704        (dir, home)
1705    }
1706
1707    #[test]
1708    fn seed_admin_writes_fresh_profile_when_missing() {
1709        let (_d, home) = scratch_home();
1710        let default = astrid_core::PrincipalId::default();
1711        let path = astrid_core::PrincipalProfile::path_for(&home, &default);
1712        assert!(!path.exists());
1713
1714        seed_default_principal_admin_profile(&home).unwrap();
1715
1716        let profile = astrid_core::PrincipalProfile::load_from_path(&path).unwrap();
1717        assert_eq!(profile.groups, vec!["admin".to_string()]);
1718        assert!(profile.grants.is_empty());
1719        assert!(profile.revokes.is_empty());
1720    }
1721
1722    #[test]
1723    fn seed_admin_is_idempotent_across_reboots() {
1724        let (_d, home) = scratch_home();
1725        let default = astrid_core::PrincipalId::default();
1726
1727        seed_default_principal_admin_profile(&home).unwrap();
1728        seed_default_principal_admin_profile(&home).unwrap();
1729        seed_default_principal_admin_profile(&home).unwrap();
1730
1731        let path = astrid_core::PrincipalProfile::path_for(&home, &default);
1732        let profile = astrid_core::PrincipalProfile::load_from_path(&path).unwrap();
1733        // Still exactly one `admin` entry — no duplication.
1734        assert_eq!(profile.groups, vec!["admin".to_string()]);
1735    }
1736
1737    #[test]
1738    fn seed_admin_leaves_operator_configured_groups_intact() {
1739        let (_d, home) = scratch_home();
1740        let default = astrid_core::PrincipalId::default();
1741
1742        // Operator wrote their own config pre-bootstrap.
1743        let mut existing = astrid_core::PrincipalProfile::default();
1744        existing.groups = vec!["agent".to_string()];
1745        let path = astrid_core::PrincipalProfile::path_for(&home, &default);
1746        std::fs::create_dir_all(home.profiles_dir()).unwrap();
1747        existing.save_to_path(&path).unwrap();
1748
1749        seed_default_principal_admin_profile(&home).unwrap();
1750
1751        let profile = astrid_core::PrincipalProfile::load_from_path(&path).unwrap();
1752        assert_eq!(profile.groups, vec!["agent".to_string()]);
1753    }
1754
1755    #[test]
1756    fn seed_admin_leaves_operator_configured_grants_intact() {
1757        let (_d, home) = scratch_home();
1758        let default = astrid_core::PrincipalId::default();
1759
1760        let mut existing = astrid_core::PrincipalProfile::default();
1761        existing.grants = vec!["system:status".to_string()];
1762        let path = astrid_core::PrincipalProfile::path_for(&home, &default);
1763        std::fs::create_dir_all(home.profiles_dir()).unwrap();
1764        existing.save_to_path(&path).unwrap();
1765
1766        seed_default_principal_admin_profile(&home).unwrap();
1767
1768        let profile = astrid_core::PrincipalProfile::load_from_path(&path).unwrap();
1769        // admin not auto-added because grants are non-empty.
1770        assert!(profile.groups.is_empty());
1771        assert_eq!(profile.grants, vec!["system:status".to_string()]);
1772    }
1773
1774    #[test]
1775    fn seed_admin_leaves_operator_configured_revokes_intact() {
1776        let (_d, home) = scratch_home();
1777        let default = astrid_core::PrincipalId::default();
1778
1779        let mut existing = astrid_core::PrincipalProfile::default();
1780        existing.revokes = vec!["system:shutdown".to_string()];
1781        let path = astrid_core::PrincipalProfile::path_for(&home, &default);
1782        std::fs::create_dir_all(home.profiles_dir()).unwrap();
1783        existing.save_to_path(&path).unwrap();
1784
1785        seed_default_principal_admin_profile(&home).unwrap();
1786
1787        let profile = astrid_core::PrincipalProfile::load_from_path(&path).unwrap();
1788        assert!(profile.groups.is_empty());
1789        assert_eq!(profile.revokes, vec!["system:shutdown".to_string()]);
1790    }
1791
1792    // ── Legacy profile path migration (issue #672) ──────────────────
1793
1794    #[test]
1795    fn migrate_legacy_profile_relocates_to_etc() {
1796        // Pre-#672 deployments wrote profile.toml under
1797        // home/{principal}/.config/. The migration moves it to
1798        // etc/profiles/{principal}.toml on first boot.
1799        let (_d, home) = scratch_home();
1800        let default = astrid_core::PrincipalId::default();
1801        let legacy_path = home
1802            .principal_home(&default)
1803            .config_dir()
1804            .join("profile.toml");
1805        std::fs::create_dir_all(legacy_path.parent().unwrap()).unwrap();
1806        let mut existing = astrid_core::PrincipalProfile::default();
1807        existing.groups = vec!["operator-configured".to_string()];
1808        existing.save_to_path(&legacy_path).unwrap();
1809
1810        seed_default_principal_admin_profile(&home).unwrap();
1811
1812        // Legacy path gone, new path holds the migrated content.
1813        assert!(!legacy_path.exists());
1814        let new_path = astrid_core::PrincipalProfile::path_for(&home, &default);
1815        let migrated = astrid_core::PrincipalProfile::load_from_path(&new_path).unwrap();
1816        assert_eq!(migrated.groups, vec!["operator-configured".to_string()]);
1817    }
1818
1819    #[test]
1820    fn migrate_legacy_profile_drops_stale_legacy_when_new_already_exists() {
1821        // Operator already migrated by hand (or a prior boot did) —
1822        // the new path holds the canonical config. Don't clobber it
1823        // with the legacy file; just remove the legacy so capsules
1824        // can't reach it through home://.
1825        let (_d, home) = scratch_home();
1826        let default = astrid_core::PrincipalId::default();
1827
1828        // Stale legacy with operator-stale content.
1829        let legacy_path = home
1830            .principal_home(&default)
1831            .config_dir()
1832            .join("profile.toml");
1833        std::fs::create_dir_all(legacy_path.parent().unwrap()).unwrap();
1834        let mut stale = astrid_core::PrincipalProfile::default();
1835        stale.groups = vec!["stale".to_string()];
1836        stale.save_to_path(&legacy_path).unwrap();
1837
1838        // Fresh new-path content (migrated already).
1839        let new_path = astrid_core::PrincipalProfile::path_for(&home, &default);
1840        std::fs::create_dir_all(new_path.parent().unwrap()).unwrap();
1841        let mut canonical = astrid_core::PrincipalProfile::default();
1842        canonical.groups = vec!["canonical".to_string()];
1843        canonical.save_to_path(&new_path).unwrap();
1844
1845        seed_default_principal_admin_profile(&home).unwrap();
1846
1847        // Legacy removed, canonical preserved.
1848        assert!(!legacy_path.exists());
1849        let result = astrid_core::PrincipalProfile::load_from_path(&new_path).unwrap();
1850        assert_eq!(result.groups, vec!["canonical".to_string()]);
1851    }
1852}
1853
1854// ---------------------------------------------------------------------------
1855// Boot validation
1856// ---------------------------------------------------------------------------
1857
1858/// Validate that every capsule's required imports have a matching export
1859/// from another loaded capsule. Logs errors for unsatisfied required imports
1860/// and info messages for unsatisfied optional imports. Also warns about
1861/// duplicate exports of the same interface from multiple capsules.
1862fn validate_imports_exports(
1863    manifests: &[(
1864        astrid_capsule::manifest::CapsuleManifest,
1865        std::path::PathBuf,
1866    )],
1867) {
1868    // Track (namespace, interface) → list of (capsule_name, version).
1869    let mut exports_by_interface: std::collections::HashMap<
1870        (&str, &str),
1871        Vec<(&str, &semver::Version)>,
1872    > = std::collections::HashMap::new();
1873
1874    for (m, _) in manifests {
1875        for (ns, name, ver) in m.export_triples() {
1876            exports_by_interface
1877                .entry((ns, name))
1878                .or_default()
1879                .push((&m.package.name, ver));
1880        }
1881    }
1882
1883    // Warn about duplicate exports — two capsules providing the same interface
1884    // will both fire on matching events, causing double-processing.
1885    for ((ns, name), providers) in &exports_by_interface {
1886        if providers.len() > 1 {
1887            let names: Vec<&str> = providers.iter().map(|(n, _)| *n).collect();
1888            tracing::warn!(
1889                interface = %format!("{ns}/{name}"),
1890                providers = ?names,
1891                "Multiple capsules export the same interface — events may be double-processed. \
1892                 Consider removing one with `astrid capsule remove`."
1893            );
1894        }
1895    }
1896
1897    let mut satisfied_count: u32 = 0;
1898    let mut warning_count: u32 = 0;
1899
1900    for (manifest, _) in manifests {
1901        for (ns, name, req, optional) in manifest.import_tuples() {
1902            let has_provider = exports_by_interface
1903                .get(&(ns, name))
1904                .is_some_and(|providers| providers.iter().any(|(_, v)| req.matches(v)));
1905
1906            if has_provider {
1907                satisfied_count = satisfied_count.saturating_add(1);
1908            } else if optional {
1909                tracing::info!(
1910                    capsule = %manifest.package.name,
1911                    import = %format!("{ns}/{name} {req}"),
1912                    "Optional import not satisfied — capsule will boot with reduced functionality"
1913                );
1914                warning_count = warning_count.saturating_add(1);
1915            } else {
1916                tracing::error!(
1917                    capsule = %manifest.package.name,
1918                    import = %format!("{ns}/{name} {req}"),
1919                    "Required import not satisfied — no loaded capsule exports this interface"
1920                );
1921                warning_count = warning_count.saturating_add(1);
1922            }
1923        }
1924    }
1925
1926    tracing::info!(
1927        capsules = manifests.len(),
1928        imports_satisfied = satisfied_count,
1929        warnings = warning_count,
1930        "Boot validation complete"
1931    );
1932}
1933
1934// ---------------------------------------------------------------------------
1935// Identity bootstrap helpers
1936// ---------------------------------------------------------------------------
1937
1938/// Bootstrap the CLI root user identity at kernel boot.
1939///
1940/// Creates a deterministic root `AstridUserId` on first boot, or reloads it
1941/// on subsequent boots. Auto-links with `platform="cli"`,
1942/// `platform_user_id="local"`, `method="system"`.
1943///
1944/// Also seeds the default principal's profile on disk with
1945/// `groups = ["admin"]` (issue #670) so single-tenant deployments reach
1946/// the management API with full capabilities. The profile write is
1947/// **idempotent** — if the default principal already has a profile with
1948/// an `admin` group, any explicit `grants` / `revokes`, or non-empty
1949/// `groups`, we leave it untouched.
1950///
1951/// Idempotent: skips creation if the root user already exists.
1952async fn bootstrap_cli_root_user(
1953    store: &Arc<dyn astrid_storage::IdentityStore>,
1954    home: &astrid_core::dirs::AstridHome,
1955) -> Result<(), astrid_storage::IdentityError> {
1956    // Seed the default principal profile with the admin group. Runs
1957    // before the identity-link short-circuit below so a deleted profile
1958    // between boots is restored even when the identity record persists.
1959    if let Err(e) = seed_default_principal_admin_profile(home) {
1960        tracing::warn!(error = %e, "Failed to seed default admin profile — continuing boot");
1961    }
1962
1963    // Check if root user already exists by trying to resolve the CLI link.
1964    if let Some(_user) = store.resolve("cli", "local").await? {
1965        tracing::debug!("CLI root user already linked");
1966        return Ok(());
1967    }
1968
1969    // No CLI link exists. Create or find the root user.
1970    let user = store.create_user(Some("root")).await?;
1971    tracing::info!(user_id = %user.id, "Created CLI root user");
1972
1973    // Link the CLI platform identity.
1974    store.link("cli", "local", user.id, "system").await?;
1975    tracing::info!(user_id = %user.id, "Linked CLI root user (cli/local)");
1976
1977    Ok(())
1978}
1979
1980/// Migrate a legacy per-principal `profile.toml` from the pre-#672
1981/// location (`home/{principal}/.config/profile.toml`) to the
1982/// system-managed `etc/profiles/{principal}.toml`. Idempotent across
1983/// boots: if the new path exists, the old one is removed (assumed
1984/// already migrated); if neither exists, no-op.
1985///
1986/// Profile contents are 100% system policy (enabled, groups, grants,
1987/// revokes, quotas, auth public keys) and a capsule running with
1988/// `fs_read = ["home://"]` could read its own policy from the legacy
1989/// location. Moving it under `etc/` puts it outside the `home://` VFS
1990/// scheme entirely.
1991fn migrate_legacy_profile_path(
1992    home: &astrid_core::dirs::AstridHome,
1993    principal: &astrid_core::PrincipalId,
1994) -> Result<(), std::io::Error> {
1995    let legacy_path = home
1996        .principal_home(principal)
1997        .config_dir()
1998        .join("profile.toml");
1999    let new_path = home.profile_path(principal);
2000    if !legacy_path.exists() {
2001        return Ok(());
2002    }
2003    if new_path.exists() {
2004        // Operator already migrated, or a prior boot did the rename.
2005        // Drop the stale legacy file so capsules can no longer reach
2006        // it via `home://.config/profile.toml`.
2007        let _ = std::fs::remove_file(&legacy_path);
2008        return Ok(());
2009    }
2010    if let Some(parent) = new_path.parent() {
2011        std::fs::create_dir_all(parent)?;
2012    }
2013    std::fs::rename(&legacy_path, &new_path)?;
2014    tracing::warn!(
2015        %principal,
2016        legacy = %legacy_path.display(),
2017        new = %new_path.display(),
2018        "Migrated profile.toml out of principal home directory \
2019         (security: capsules with home:// fs_read could read the legacy file)"
2020    );
2021    Ok(())
2022}
2023
2024/// Idempotently ensure the default principal's profile on disk has the
2025/// built-in `admin` group, so the single-tenant CLI path carries full
2026/// management-API capabilities (issue #670).
2027///
2028/// - Missing profile → writes a fresh default with `groups = ["admin"]`.
2029/// - Existing profile with any non-empty `groups` OR any `grants` OR
2030///   any `revokes` → treated as operator-configured, left untouched.
2031/// - Existing profile with `groups = []`, `grants = []`, `revokes = []`
2032///   → adds `admin` to `groups`. This covers the fresh-default case
2033///   where a prior boot wrote a `PrincipalProfile::default()`.
2034///
2035/// Also migrates the legacy `profile.toml` location
2036/// (`home/{principal}/.config/`) to the new system-managed location
2037/// (`etc/profiles/`) on first boot post-#672, see
2038/// [`migrate_legacy_profile_path`].
2039fn seed_default_principal_admin_profile(
2040    home: &astrid_core::dirs::AstridHome,
2041) -> Result<(), astrid_core::ProfileError> {
2042    use astrid_core::PrincipalProfile;
2043
2044    let default_principal = astrid_core::PrincipalId::default();
2045
2046    // Move any legacy file in front of load — load_from_path on the new
2047    // path would otherwise return Default and clobber the operator's
2048    // existing groups/grants/revokes.
2049    if let Err(e) = migrate_legacy_profile_path(home, &default_principal) {
2050        tracing::warn!(error = %e, "Failed to migrate legacy profile path — continuing");
2051    }
2052
2053    let path = PrincipalProfile::path_for(home, &default_principal);
2054    let profile = PrincipalProfile::load_from_path(&path)?;
2055
2056    if !profile.groups.is_empty() || !profile.grants.is_empty() || !profile.revokes.is_empty() {
2057        tracing::debug!(
2058            principal = %default_principal,
2059            "Default principal profile already has group/grant/revoke entries — leaving intact"
2060        );
2061        return Ok(());
2062    }
2063
2064    let mut updated = profile;
2065    updated
2066        .groups
2067        .push(astrid_core::groups::BUILTIN_ADMIN.to_string());
2068    updated.save_to_path(&path)?;
2069    tracing::info!(
2070        principal = %default_principal,
2071        "Seeded default principal with built-in `admin` group"
2072    );
2073    Ok(())
2074}
2075
2076/// Apply pre-configured identity links from the config file.
2077///
2078/// For each `[[identity.links]]` entry, resolves or creates the referenced
2079/// Astrid user and links the platform identity. Logs warnings on failure
2080/// but does not abort boot.
2081async fn apply_identity_config(
2082    store: &Arc<dyn astrid_storage::IdentityStore>,
2083    workspace_root: &std::path::Path,
2084) {
2085    let config = match astrid_config::Config::load(Some(workspace_root)) {
2086        Ok(resolved) => resolved.config,
2087        Err(e) => {
2088            tracing::debug!(error = %e, "No config loaded for identity links");
2089            return;
2090        },
2091    };
2092
2093    for link_cfg in &config.identity.links {
2094        let result = apply_single_identity_link(store, link_cfg).await;
2095        if let Err(e) = result {
2096            tracing::warn!(
2097                platform = %link_cfg.platform,
2098                platform_user_id = %link_cfg.platform_user_id,
2099                astrid_user = %link_cfg.astrid_user,
2100                error = %e,
2101                "Failed to apply identity link from config"
2102            );
2103        }
2104    }
2105}
2106
2107/// Apply a single identity link from config.
2108async fn apply_single_identity_link(
2109    store: &Arc<dyn astrid_storage::IdentityStore>,
2110    link_cfg: &astrid_config::types::IdentityLinkConfig,
2111) -> Result<(), astrid_storage::IdentityError> {
2112    // Resolve astrid_user: try UUID first, then name lookup, then create.
2113    let user_id = if let Ok(uuid) = uuid::Uuid::parse_str(&link_cfg.astrid_user) {
2114        // Ensure user record exists. If the UUID was explicitly specified in
2115        // config but doesn't exist in the store, that's a configuration error
2116        // - don't silently create a different user.
2117        if store.get_user(uuid).await?.is_none() {
2118            return Err(astrid_storage::IdentityError::UserNotFound(uuid));
2119        }
2120        uuid
2121    } else {
2122        // Try name lookup.
2123        if let Some(user) = store.get_user_by_name(&link_cfg.astrid_user).await? {
2124            user.id
2125        } else {
2126            let user = store.create_user(Some(&link_cfg.astrid_user)).await?;
2127            tracing::info!(
2128                user_id = %user.id,
2129                name = %link_cfg.astrid_user,
2130                "Created user from config identity link"
2131            );
2132            user.id
2133        }
2134    };
2135
2136    let method = if link_cfg.method.is_empty() {
2137        "admin"
2138    } else {
2139        &link_cfg.method
2140    };
2141
2142    // Check if link already points to the correct user - skip if idempotent.
2143    if let Some(existing) = store
2144        .resolve(&link_cfg.platform, &link_cfg.platform_user_id)
2145        .await?
2146        && existing.id == user_id
2147    {
2148        tracing::debug!(
2149            platform = %link_cfg.platform,
2150            platform_user_id = %link_cfg.platform_user_id,
2151            user_id = %user_id,
2152            "Identity link from config already exists"
2153        );
2154        return Ok(());
2155    }
2156
2157    store
2158        .link(
2159            &link_cfg.platform,
2160            &link_cfg.platform_user_id,
2161            user_id,
2162            method,
2163        )
2164        .await?;
2165
2166    tracing::info!(
2167        platform = %link_cfg.platform,
2168        platform_user_id = %link_cfg.platform_user_id,
2169        user_id = %user_id,
2170        "Applied identity link from config"
2171    );
2172
2173    Ok(())
2174}