Skip to main content

astrid_kernel/
lib.rs

1#![deny(unsafe_code)]
2#![deny(missing_docs)]
3#![deny(clippy::all)]
4#![deny(unreachable_pub)]
5#![allow(clippy::module_name_repetitions)]
6
7//! Astrid Kernel - The core execution engine and IPC router.
8//!
9//! The Kernel is a pure, decentralized WASM runner. It contains no business
10//! logic, no cognitive loops, and no network servers. Its sole responsibility
11//! is to instantiate `astrid_events::EventBus`, load `.capsule` files into
12//! the Extism sandbox, and route IPC bytes between them.
13
14/// Passive event-bus storm diagnostics (publish-rate monitor).
15mod bus_monitor;
16/// Persistent invite-token store (issue #756).
17pub mod invite;
18/// The Management API router listening to the `EventBus`.
19pub mod kernel_router;
20/// Persistent pair-device token store (issue #756).
21pub mod pair_token;
22/// The Unix Domain Socket manager.
23pub mod socket;
24
25use arc_swap::ArcSwap;
26use astrid_audit::AuditLog;
27use astrid_capabilities::{CapabilityStore, DirHandle};
28use astrid_capsule::profile_cache::PrincipalProfileCache;
29use astrid_capsule::registry::CapsuleRegistry;
30use astrid_core::SessionId;
31use astrid_core::groups::GroupConfig;
32use astrid_core::principal::PrincipalId;
33use astrid_crypto::KeyPair;
34use astrid_events::EventBus;
35use astrid_mcp::{McpClient, SecureMcpClient, ServerManager, ServersConfig};
36use astrid_vfs::{HostVfs, OverlayVfsRegistry, Vfs};
37use dashmap::DashMap;
38use std::path::{Path, PathBuf};
39use std::sync::Arc;
40use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
41use tokio::sync::{Mutex, RwLock};
42
43/// The core Operating System Kernel.
44pub struct Kernel {
45    /// The unique identifier for this kernel session.
46    pub session_id: SessionId,
47    /// The global IPC message bus.
48    pub event_bus: Arc<EventBus>,
49    /// The process manager (loaded WASM capsules).
50    pub capsules: Arc<RwLock<CapsuleRegistry>>,
51    /// The secure MCP client with capability-based authorization and audit logging.
52    pub mcp: SecureMcpClient,
53    /// The capability store for this session.
54    pub capabilities: Arc<CapabilityStore>,
55    /// The global Virtual File System mount.
56    ///
57    /// Points at the unmodified workspace (no overlay). Principal-scoped
58    /// overlays live in [`overlay_registry`](Self::overlay_registry) — this
59    /// field is kept for kernel-internal paths that do not know a principal
60    /// (discovery, capsule load scan).
61    pub vfs: Arc<dyn Vfs>,
62    /// Per-principal overlay registry (Layer 4, issue #668).
63    ///
64    /// Each invoking principal resolves their own
65    /// [`OverlayVfs`](astrid_vfs::OverlayVfs) from this registry on first
66    /// use — lower layer is the shared workspace, upper layer is a
67    /// principal-private tempdir. Agent A's uncommitted writes are never
68    /// visible to Agent B.
69    pub overlay_registry: Arc<OverlayVfsRegistry>,
70    /// The global physical root handle (cap-std) for the VFS.
71    pub vfs_root_handle: DirHandle,
72    /// The physical path the VFS is mounted to.
73    pub workspace_root: PathBuf,
74    /// The principal home resources directory (`~/.astrid/home/{principal}/`).
75    /// Capsules declaring `fs_read = ["home://"]` can read files under this
76    /// root. Scoped to the principal's home so that keys, databases, and
77    /// system config in `~/.astrid/` are NOT accessible.
78    ///
79    /// Always `Some` in production (boot requires `AstridHome`). Remains
80    /// `Option` for compatibility with `CapsuleContext` and test fixtures.
81    pub home_root: Option<PathBuf>,
82    /// The natively bound Unix Socket for the CLI proxy.
83    pub cli_socket_listener: Option<Arc<tokio::sync::Mutex<tokio::net::UnixListener>>>,
84    /// Exclusive advisory lock enforcing a single kernel instance, held for
85    /// the daemon's lifetime (see [`socket::bind_session_socket`]). `None` for
86    /// test kernels that don't bind a real socket. Never read — the point is
87    /// that its `Drop` (or process exit) releases the lock so a restart isn't
88    /// wedged.
89    #[expect(
90        dead_code,
91        reason = "held for the process lifetime; Drop releases the singleton flock"
92    )]
93    singleton_lock: Option<std::fs::File>,
94    /// Shared KV store backing all capsule-scoped stores and kernel state.
95    pub kv: Arc<astrid_storage::SurrealKvStore>,
96    /// Chain-linked cryptographic audit log with persistent storage.
97    pub audit_log: Arc<AuditLog>,
98    /// Per-principal active connection counters (Layer 4, issue #668).
99    ///
100    /// Keyed by [`PrincipalId`]. When a principal's counter hits zero the
101    /// kernel clears that principal's session allowances only — other
102    /// principals' state is untouched. Ephemeral shutdown still waits on
103    /// the global sum via [`total_connection_count`](Self::total_connection_count).
104    active_connections: DashMap<PrincipalId, AtomicUsize>,
105    /// Shared per-principal CPU fuel ledger, cloned into every capsule's
106    /// `WasmEngine` (via the loader) so a principal's interceptor CPU is summed
107    /// across all capsules into one per-principal total. Telemetry today; the
108    /// substrate for a per-principal CPU budget. See
109    /// [`FuelLedger`](astrid_capsule::FuelLedger).
110    fuel_ledger: astrid_capsule::FuelLedger,
111    /// Shared per-principal CPU-rate limiter (the deny side of the budget),
112    /// cloned into every capsule's `WasmEngine` (via the loader) alongside
113    /// `fuel_ledger`. A principal over its `max_cpu_fuel_per_sec` in the rolling
114    /// 1-second window is denied at interceptor entry, cross-capsule. See
115    /// [`FuelRateLimiter`](astrid_capsule::FuelRateLimiter).
116    fuel_rate: astrid_capsule::FuelRateLimiter,
117    /// Shared per-principal peak-memory ledger, the RAM analogue of
118    /// `fuel_ledger`: cloned into every capsule's `WasmEngine` (via the loader)
119    /// so a principal's linear-memory high-water mark is the max across all
120    /// capsules. Telemetry today; fills `ResourceUsage::memory_bytes_peak_total`.
121    /// See [`MemoryLedger`](astrid_capsule::MemoryLedger).
122    memory_ledger: astrid_capsule::MemoryLedger,
123    /// Host-derived (operator-overridable) concurrency ceilings for capsule
124    /// host calls, resolved once by the daemon and forwarded to every
125    /// `WasmEngine` via the loader. The kernel only stores and forwards this
126    /// `Copy` value — no resolution logic lives here. See
127    /// [`CapsuleRuntimeLimits`](astrid_capsule::CapsuleRuntimeLimits).
128    runtime_limits: astrid_capsule::CapsuleRuntimeLimits,
129    /// Ephemeral mode: shut down immediately when the last client disconnects.
130    pub ephemeral: AtomicBool,
131    /// Instant when the kernel was booted (for uptime calculation).
132    pub boot_time: std::time::Instant,
133    /// Sender for the API-initiated shutdown signal. The daemon's main loop
134    /// selects on the receiver to exit gracefully without `process::exit`.
135    pub shutdown_tx: tokio::sync::watch::Sender<bool>,
136    /// Session token for socket authentication. Generated at boot, written to
137    /// `~/.astrid/run/system.token`. CLI sends this as its first message.
138    pub session_token: Arc<astrid_core::session_token::SessionToken>,
139    /// Path where the session token was written at boot. Stored so shutdown
140    /// uses the exact same path (avoids fallback mismatch if env changes).
141    token_path: PathBuf,
142    /// Shared allowance store for capsule-level approval decisions.
143    ///
144    /// Capsules can check existing allowances and create new ones when
145    /// users approve actions with session/always scope.
146    pub allowance_store: Arc<astrid_approval::AllowanceStore>,
147    /// System-wide identity store for platform user resolution.
148    identity_store: Arc<dyn astrid_storage::IdentityStore>,
149    /// System-wide per-principal profile cache (Layer 3 quota enforcement).
150    ///
151    /// One instance per kernel boot. Every capsule load plumbs this into
152    /// [`CapsuleContext::with_profile_cache`](astrid_capsule::context::CapsuleContext::with_profile_cache),
153    /// where [`WasmEngine`](astrid_capsule::engine::wasm::WasmEngine) consumes
154    /// it to apply per-invocation memory / timeout / IPC / process caps.
155    /// Invalidation model: kernel restart. Layer 6 will add explicit
156    /// management IPC to clear entries at runtime (issue #666 tracks that
157    /// follow-up).
158    pub(crate) profile_cache: Arc<PrincipalProfileCache>,
159    /// Static group-to-capability configuration (issue #670), made
160    /// hot-reloadable in Layer 6 (issue #672).
161    ///
162    /// Loaded once at boot from `$ASTRID_HOME/etc/groups.toml`. The
163    /// enforcement preamble in [`kernel_router::handle_request`] /
164    /// `handle_admin_request` calls `groups.load_full()` on each request
165    /// — a lock-free `Arc` clone. Group admin topics
166    /// (`astrid.v1.admin.group.*`) rewrite `groups.toml` and then
167    /// `groups.store(Arc::new(new_config))` atomically; in-flight checks
168    /// holding the old `Arc` finish under the old config, the next check
169    /// sees the new one.
170    pub(crate) groups: Arc<ArcSwap<GroupConfig>>,
171    /// Home directory captured at boot — retained for the admin write
172    /// path (`groups.toml`, per-principal `profile.toml`) so handlers
173    /// don't re-resolve `$ASTRID_HOME` and risk a mid-life drift.
174    pub(crate) astrid_home: astrid_core::dirs::AstridHome,
175    /// Serializes mutating admin topics on `profile.toml` / `groups.toml`.
176    ///
177    /// Read-only admin topics (`agent.list`, `group.list`, `quota.get`)
178    /// and the hot authz path do NOT take this lock — the `ArcSwap` on
179    /// [`Kernel::groups`] and the `RwLock` on
180    /// [`PrincipalProfileCache`](astrid_capsule::profile_cache::PrincipalProfileCache)
181    /// cover reads. Tokio's `Mutex` is not poisonable — no
182    /// `PoisonError::into_inner` dance required.
183    pub(crate) admin_write_lock: Mutex<()>,
184}
185
186impl Kernel {
187    /// Boot a new Kernel instance mounted at the specified directory.
188    ///
189    /// `runtime_limits` is the resolved per-host capsule concurrency ceiling
190    /// pair (blocking vs async-I/O host calls); the daemon resolves it from
191    /// config + CLI + host defaults and the kernel forwards it, unmodified, to
192    /// every capsule's `WasmEngine`. In tests, pass
193    /// [`CapsuleRuntimeLimits::default()`](astrid_capsule::CapsuleRuntimeLimits::default).
194    ///
195    /// # Panics
196    ///
197    /// Panics if called on a single-threaded tokio runtime. The capsule
198    /// system uses `block_in_place` which requires a multi-threaded runtime.
199    ///
200    /// # Errors
201    ///
202    /// Returns an error if the VFS mount paths cannot be registered.
203    #[expect(
204        clippy::too_many_lines,
205        reason = "boot sequence: sequential setup that does not benefit from splitting"
206    )]
207    pub async fn new(
208        session_id: SessionId,
209        workspace_root: PathBuf,
210        runtime_limits: astrid_capsule::CapsuleRuntimeLimits,
211    ) -> Result<Arc<Self>, std::io::Error> {
212        use astrid_core::dirs::AstridHome;
213
214        assert!(
215            tokio::runtime::Handle::current().runtime_flavor()
216                == tokio::runtime::RuntimeFlavor::MultiThread,
217            "Kernel requires a multi-threaded tokio runtime (block_in_place panics on \
218             single-threaded). Use #[tokio::main] or Runtime::new() instead of current_thread."
219        );
220
221        let event_bus = Arc::new(EventBus::new());
222        let capsules = Arc::new(RwLock::new(CapsuleRegistry::new()));
223
224        // Resolve the Astrid home directory. Required for persistent KV store
225        // and audit log. Fails boot if neither $ASTRID_HOME nor $HOME is set.
226        let home = AstridHome::resolve().map_err(|e| {
227            std::io::Error::other(format!(
228                "Failed to resolve Astrid home (set $ASTRID_HOME or $HOME): {e}"
229            ))
230        })?;
231
232        // Resolve the home directory for the `home://` VFS scheme.
233        // Points to `~/.astrid/home/{principal}/` — NOT the full `~/.astrid/`
234        // root — so capsules cannot access keys, databases, or config.
235        let default_principal = astrid_core::PrincipalId::default();
236        let principal_home = home.principal_home(&default_principal);
237        let home_root = Some(principal_home.root().to_path_buf());
238
239        // 1. Open the persistent KV store (needed by capability store below).
240        let kv_path = home.state_db_path();
241        let kv = Arc::new(
242            astrid_storage::SurrealKvStore::open(&kv_path)
243                .map_err(|e| std::io::Error::other(format!("Failed to open KV store: {e}")))?,
244        );
245        // TODO: clear ephemeral keys (e: prefix) on boot when the key
246        // lifecycle tier convention is established.
247
248        // 2. Initialize MCP process manager with security layer.
249        //    Set workspace_root so sandboxed MCP servers have a writable directory.
250        let mcp_config = ServersConfig::load_default().unwrap_or_default();
251        let mcp_manager = ServerManager::new(mcp_config)
252            .with_workspace_root(workspace_root.clone())
253            .with_capsule_log_dir(principal_home.log_dir());
254        let mcp_client = McpClient::new(mcp_manager);
255
256        // 3. Bootstrap capability store (persistent) and audit log.
257        //    Key rotation invalidates persisted tokens (fail-secure by design).
258        let capabilities = Arc::new(
259            CapabilityStore::with_kv_store(Arc::clone(&kv) as Arc<dyn astrid_storage::KvStore>)
260                .map_err(|e| {
261                    std::io::Error::other(format!("Failed to init capability store: {e}"))
262                })?,
263        );
264        let audit_log = open_audit_log()?;
265        let mcp = SecureMcpClient::new(
266            mcp_client,
267            Arc::clone(&capabilities),
268            Arc::clone(&audit_log),
269            session_id.clone(),
270        );
271
272        // 4. Establish the physical security boundary (sandbox handle)
273        let root_handle = DirHandle::new();
274
275        // 5. Principal-scoped overlay registry: each invoking principal
276        //    gets a fresh OverlayVfs on first use (Layer 4, issue #668).
277        //    The kernel-internal `vfs` field keeps pointing at a plain
278        //    HostVfs over the workspace for paths that don't yet know a
279        //    principal (discovery, capsule load scan).
280        let kernel_host_vfs = HostVfs::new();
281        kernel_host_vfs
282            .register_dir(root_handle.clone(), workspace_root.clone())
283            .await
284            .map_err(|_| std::io::Error::other("Failed to register kernel workspace vfs"))?;
285        let overlay_registry = Arc::new(OverlayVfsRegistry::new(
286            workspace_root.clone(),
287            root_handle.clone(),
288        ));
289
290        // 6. Bind the secure Unix socket and generate session token.
291        // The socket is bound here, but not yet listened on. The token is
292        // generated before any capsule can accept connections, preventing
293        // a race where a client connects before the token file exists.
294        let (listener, singleton_lock) = socket::bind_session_socket(&home)?;
295        let (session_token, token_path) = socket::generate_session_token()?;
296
297        let allowance_store = Arc::new(astrid_approval::AllowanceStore::new());
298        // Create system-wide identity store backed by the shared KV.
299        let identity_kv = astrid_storage::ScopedKvStore::new(
300            Arc::clone(&kv) as Arc<dyn astrid_storage::KvStore>,
301            "system:identity",
302        )
303        .map_err(|e| std::io::Error::other(format!("Failed to create identity KV: {e}")))?;
304        let identity_store: Arc<dyn astrid_storage::IdentityStore> =
305            Arc::new(astrid_storage::KvIdentityStore::new(identity_kv));
306
307        // Load group config (issue #670). Boot-loaded once, then swapped
308        // atomically by Layer 6 admin topics (issue #672). Missing file
309        // → built-ins only; malformed TOML is a hard boot failure
310        // (fail-closed).
311        let groups_loaded = GroupConfig::load(&home)
312            .map_err(|e| std::io::Error::other(format!("Failed to load groups config: {e}")))?;
313        let groups = Arc::new(ArcSwap::from_pointee(groups_loaded));
314
315        // Bootstrap the CLI root user (idempotent). Also seeds the
316        // default principal's profile with `groups = ["admin"]` so
317        // single-tenant deployments get full management-API access.
318        bootstrap_cli_root_user(&identity_store, &home)
319            .await
320            .map_err(|e| {
321                std::io::Error::other(format!("Failed to bootstrap CLI root user: {e}"))
322            })?;
323
324        // Apply pre-configured identity links from config.
325        apply_identity_config(&identity_store, &workspace_root).await;
326
327        let kernel = Arc::new(Self {
328            session_id,
329            event_bus,
330            capsules,
331            mcp,
332            capabilities,
333            vfs: Arc::new(kernel_host_vfs) as Arc<dyn Vfs>,
334            overlay_registry,
335            vfs_root_handle: root_handle,
336            workspace_root,
337            home_root,
338            cli_socket_listener: Some(Arc::new(tokio::sync::Mutex::new(listener))),
339            singleton_lock: Some(singleton_lock),
340            kv,
341            audit_log,
342            active_connections: DashMap::new(),
343            fuel_ledger: astrid_capsule::FuelLedger::default(),
344            fuel_rate: astrid_capsule::FuelRateLimiter::default(),
345            memory_ledger: astrid_capsule::MemoryLedger::default(),
346            runtime_limits,
347            ephemeral: AtomicBool::new(false),
348            boot_time: std::time::Instant::now(),
349            shutdown_tx: tokio::sync::watch::channel(false).0,
350            session_token: Arc::new(session_token),
351            token_path,
352            allowance_store,
353            identity_store,
354            profile_cache: Arc::new(PrincipalProfileCache::with_home(home.clone())),
355            groups,
356            astrid_home: home,
357            admin_write_lock: Mutex::new(()),
358        });
359
360        drop(kernel_router::spawn_kernel_router(Arc::clone(&kernel)));
361        drop(spawn_idle_monitor(Arc::clone(&kernel)));
362        drop(spawn_react_watchdog(Arc::clone(&kernel.event_bus)));
363        drop(spawn_capsule_health_monitor(Arc::clone(&kernel)));
364        // Passive storm diagnostics — subscribes synchronously inside the
365        // call (before the debug-assert below) so it counts toward
366        // `INTERNAL_SUBSCRIBER_COUNT`.
367        drop(bus_monitor::spawn_bus_activity_monitor(&kernel.event_bus));
368
369        // Spawn the event dispatcher — routes EventBus events to capsule interceptors.
370        // Wire the identity store so auto-provisioning is gated.
371        let dispatcher = astrid_capsule::dispatcher::EventDispatcher::new(
372            Arc::clone(&kernel.capsules),
373            Arc::clone(&kernel.event_bus),
374        )
375        .with_identity_store(Arc::clone(&kernel.identity_store));
376        tokio::spawn(dispatcher.run());
377
378        debug_assert_eq!(
379            kernel.event_bus.subscriber_count(),
380            INTERNAL_SUBSCRIBER_COUNT,
381            "INTERNAL_SUBSCRIBER_COUNT is stale; update it when adding permanent subscribers"
382        );
383
384        Ok(kernel)
385    }
386
387    /// Load a capsule into the Kernel from a directory containing a Capsule.toml
388    ///
389    /// # Errors
390    ///
391    /// Returns an error if the manifest cannot be loaded, the capsule cannot be created, or registration fails.
392    async fn load_capsule(&self, dir: PathBuf) -> Result<(), anyhow::Error> {
393        let manifest_path = dir.join("Capsule.toml");
394        let manifest = astrid_capsule::discovery::load_manifest(&manifest_path)
395            .map_err(|e| anyhow::anyhow!(e))?;
396
397        // Skip if already registered (prevents double-load from overlapping
398        // discovery paths like principal home + workspace capsules).
399        {
400            let registry = self.capsules.read().await;
401            let id = astrid_capsule::capsule::CapsuleId::from_static(&manifest.package.name);
402            if registry.get(&id).is_some() {
403                return Ok(());
404            }
405        }
406
407        let loader = astrid_capsule::loader::CapsuleLoader::new(
408            self.mcp.clone(),
409            self.fuel_ledger.clone(),
410            self.fuel_rate.clone(),
411            self.memory_ledger.clone(),
412            self.runtime_limits,
413        );
414        let mut capsule = loader.create_capsule(manifest, dir.clone())?;
415
416        // Build the context — use the shared kernel KV so capsules can
417        // communicate state through overlapping KV namespaces.
418        let principal = astrid_core::PrincipalId::default();
419        let kv = astrid_storage::ScopedKvStore::new(
420            Arc::clone(&self.kv) as Arc<dyn astrid_storage::KvStore>,
421            format!("{principal}:capsule:{}", capsule.id()),
422        )?;
423
424        // Pre-load env config into the KV store.
425        // Check principal config first, fall back to capsule dir's .env.json.
426        let capsule_name = capsule.id().to_string();
427        let env_path = if let Ok(home) = astrid_core::dirs::AstridHome::resolve() {
428            let ph = home.principal_home(&principal);
429            let principal_env = ph.env_dir().join(format!("{capsule_name}.env.json"));
430            if principal_env.exists() {
431                principal_env
432            } else {
433                dir.join(".env.json")
434            }
435        } else {
436            dir.join(".env.json")
437        };
438        if env_path.exists()
439            && let Ok(contents) = std::fs::read_to_string(&env_path)
440            && let Ok(env_map) =
441                serde_json::from_str::<std::collections::HashMap<String, String>>(&contents)
442        {
443            for (k, v) in env_map {
444                let _ = kv.set(&k, v.into_bytes()).await;
445            }
446        }
447
448        let ctx = astrid_capsule::context::CapsuleContext::new(
449            principal.clone(),
450            self.workspace_root.clone(),
451            self.home_root.clone(),
452            kv,
453            Arc::clone(&self.event_bus),
454            self.cli_socket_listener.clone(),
455        )
456        .with_registry(Arc::clone(&self.capsules))
457        .with_session_token(Arc::clone(&self.session_token))
458        .with_allowance_store(Arc::clone(&self.allowance_store))
459        .with_identity_store(Arc::clone(&self.identity_store))
460        .with_profile_cache(Arc::clone(&self.profile_cache))
461        .with_overlay_registry(Arc::clone(&self.overlay_registry))
462        // Snapshot the live group config so the capsule load path can resolve
463        // the run-loop resource-exemption capability (CAP_RESOURCES_UNBOUNDED)
464        // against the owner principal's groups/grants/revokes.
465        .with_group_config(self.groups.load_full());
466
467        capsule.load(&ctx).await?;
468
469        let mut registry = self.capsules.write().await;
470        registry
471            .register(capsule)
472            .map_err(|e| anyhow::anyhow!("Failed to register capsule: {e}"))?;
473
474        Ok(())
475    }
476
477    /// Restart a capsule by unloading it and re-loading from its source directory.
478    ///
479    /// # Errors
480    ///
481    /// Returns an error if the capsule has no source directory, cannot be
482    /// unregistered, or fails to reload.
483    async fn restart_capsule(
484        &self,
485        id: &astrid_capsule::capsule::CapsuleId,
486    ) -> Result<(), anyhow::Error> {
487        // Get source directory before unregistering.
488        let source_dir = {
489            let registry = self.capsules.read().await;
490            let capsule = registry
491                .get(id)
492                .ok_or_else(|| anyhow::anyhow!("capsule '{id}' not found in registry"))?;
493            capsule
494                .source_dir()
495                .map(std::path::Path::to_path_buf)
496                .ok_or_else(|| anyhow::anyhow!("capsule '{id}' has no source directory"))?
497        };
498
499        // Unregister and explicitly unload. There is no Drop impl that
500        // calls unload() (it's async), so we must do it here to avoid
501        // leaking MCP subprocesses and other engine resources.
502        let old_capsule = {
503            let mut registry = self.capsules.write().await;
504            registry
505                .unregister(id)
506                .map_err(|e| anyhow::anyhow!("failed to unregister capsule '{id}': {e}"))?
507        };
508        // Explicitly unload the old capsule. There is no Drop impl that
509        // calls unload() (it's async), so we must do it here to avoid
510        // leaking MCP subprocesses and other engine resources.
511        // Arc::get_mut requires exclusive ownership (strong_count == 1).
512        {
513            let mut old = old_capsule;
514            if let Some(capsule) = std::sync::Arc::get_mut(&mut old) {
515                if let Err(e) = capsule.unload().await {
516                    tracing::warn!(
517                        capsule_id = %id,
518                        error = %e,
519                        "Capsule unload failed during restart"
520                    );
521                }
522            } else {
523                tracing::warn!(
524                    capsule_id = %id,
525                    "Cannot call unload during restart - Arc still held by in-flight task"
526                );
527            }
528        }
529
530        // Re-load from disk.
531        self.load_capsule(source_dir).await?;
532
533        // Signal the newly loaded capsule to clean up ephemeral state
534        // from the previous incarnation. Capsules that don't implement
535        // `handle_lifecycle_restart` will return an error, which is fine.
536        //
537        // Clone the capsule Arc under a brief read lock, then drop the
538        // guard before invoke_interceptor which calls block_in_place.
539        // Holding the RwLock across block_in_place parks the worker thread
540        // and starves registry writers (health monitor, capsule loading).
541        let capsule = {
542            let registry = self.capsules.read().await;
543            registry.get(id)
544        };
545        if let Some(capsule) = capsule
546            && let Err(e) = capsule
547                .invoke_interceptor("handle_lifecycle_restart", &[], None)
548                .await
549        {
550            tracing::debug!(
551                capsule_id = %id,
552                error = %e,
553                "Capsule does not handle lifecycle restart (optional)"
554            );
555        }
556
557        Ok(())
558    }
559
560    /// Auto-discover and load all capsules from the standard directories (`~/.astrid/capsules` and `.astrid/capsules`).
561    ///
562    /// Capsules are loaded in dependency order (topological sort) with
563    /// uplink/daemon capsules loaded first. Each uplink must signal
564    /// readiness before non-uplink capsules are loaded.
565    ///
566    /// After all capsules are loaded, tool schemas are injected into every
567    /// capsule's KV namespace and the `astrid.v1.capsules_loaded` event is published.
568    pub async fn load_all_capsules(&self) {
569        use astrid_capsule::toposort::toposort_manifests;
570        use astrid_core::dirs::AstridHome;
571
572        // Discovery paths in priority order: principal > workspace.
573        let mut paths = Vec::new();
574        if let Ok(home) = AstridHome::resolve() {
575            let principal = astrid_core::PrincipalId::default();
576            paths.push(home.principal_home(&principal).capsules_dir());
577        }
578
579        let discovered = astrid_capsule::discovery::discover_manifests(Some(&paths));
580
581        // Topological sort ALL capsules together so cross-partition
582        // requirements (e.g. a non-uplink requiring an uplink's capability)
583        // resolve correctly without spurious "not provided" warnings.
584        let sorted = match toposort_manifests(discovered) {
585            Ok(sorted) => sorted,
586            Err((e, original)) => {
587                tracing::error!(
588                    cycle = %e,
589                    "Dependency cycle in capsules, falling back to discovery order"
590                );
591                original
592            },
593        };
594
595        // Defence-in-depth: manifest validation in discovery.rs rejects
596        // uplinks with [imports], but warn here in case a manifest bypasses
597        // the normal load path.
598        for (manifest, _) in &sorted {
599            if manifest.capabilities.uplink && manifest.has_imports() {
600                tracing::warn!(
601                    capsule = %manifest.package.name,
602                    "Uplink capsule has [imports] - \
603                     this should have been rejected at manifest load time"
604                );
605            }
606        }
607
608        // Validate imports/exports: every required import must have a matching export.
609        validate_imports_exports(&sorted);
610
611        // Partition after sorting: uplinks first, then the rest.
612        // The relative order within each partition is preserved from the
613        // toposort, so dependency edges are still respected. Cross-partition
614        // edges (non-uplink requiring an uplink) are satisfied by construction
615        // since all uplinks load first. The inverse (uplink requiring a
616        // non-uplink) is rejected above.
617        let (uplinks, others): (Vec<_>, Vec<_>) =
618            sorted.into_iter().partition(|(m, _)| m.capabilities.uplink);
619
620        // Load uplinks first so their event bus subscriptions are ready.
621        let uplink_names: Vec<String> = uplinks
622            .iter()
623            .map(|(m, _)| m.package.name.clone())
624            .collect();
625        for (manifest, dir) in &uplinks {
626            if let Err(e) = self.load_capsule(dir.clone()).await {
627                tracing::warn!(
628                    capsule = %manifest.package.name,
629                    error = %e,
630                    "Failed to load uplink capsule during discovery"
631                );
632            }
633        }
634
635        // Wait for uplink capsules to signal readiness before loading
636        // non-uplink capsules. This ensures IPC subscriptions are active.
637        self.await_capsule_readiness(&uplink_names).await;
638
639        for (manifest, dir) in &others {
640            if let Err(e) = self.load_capsule(dir.clone()).await {
641                tracing::warn!(
642                    capsule = %manifest.package.name,
643                    error = %e,
644                    "Failed to load capsule during discovery"
645                );
646            }
647        }
648
649        // Wait for non-uplink run-loop capsules too, so any future
650        // dependency edges between them are respected.
651        let other_names: Vec<String> = others.iter().map(|(m, _)| m.package.name.clone()).collect();
652        self.await_capsule_readiness(&other_names).await;
653
654        // Signal that all capsules have been loaded so uplink capsules
655        // (like the registry) can proceed with discovery instead of
656        // polling with arbitrary timeouts.
657        let msg = astrid_events::ipc::IpcMessage::new(
658            "astrid.v1.capsules_loaded",
659            astrid_events::ipc::IpcPayload::RawJson(serde_json::json!({"status": "ready"})),
660            self.session_id.0,
661        );
662        let _ = self.event_bus.publish(astrid_events::AstridEvent::Ipc {
663            metadata: astrid_events::EventMetadata::new("kernel"),
664            message: msg,
665        });
666    }
667
668    /// Record that a new client connection for `principal` has been established.
669    pub fn connection_opened(&self, principal: &PrincipalId) {
670        self.active_connections
671            .entry(principal.clone())
672            .or_insert_with(|| AtomicUsize::new(0))
673            .fetch_add(1, Ordering::Relaxed);
674        metrics::counter!(METRIC_CONNECTIONS_OPENED_TOTAL).increment(1);
675        metrics::gauge!(METRIC_ACTIVE_CONNECTIONS).increment(1.0);
676    }
677
678    /// Record that a client connection for `principal` has been closed.
679    ///
680    /// Uses `fetch_update` for atomic saturating decrement - avoids the
681    /// TOCTOU window where `fetch_sub` wraps to `usize::MAX` before a
682    /// corrective store.
683    ///
684    /// When *this* principal's counter reaches zero, clears only that
685    /// principal's session-scoped allowances — other principals' state is
686    /// untouched. The global ephemeral-shutdown path remains gated on the
687    /// sum across every principal (see
688    /// [`total_connection_count`](Self::total_connection_count)).
689    pub fn connection_closed(&self, principal: &PrincipalId) {
690        // Hold the DashMap entry guard across the decrement AND the
691        // session-scoped clears. While we hold the guard any concurrent
692        // `connection_opened(principal)` on the same key blocks on the
693        // shard lock, so its new session allowances cannot be born and
694        // then nuked by the tail-end cleanup here (pre-Layer-4 bug
695        // surfaced more narrowly under per-principal scoping).
696        //
697        // The downstream stores do not re-enter `active_connections`, so
698        // holding this guard while calling into them cannot deadlock.
699        let entry = self
700            .active_connections
701            .entry(principal.clone())
702            .or_insert_with(|| AtomicUsize::new(0));
703        let result = entry.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |n| {
704            if n == 0 {
705                None
706            } else {
707                Some(n.saturating_sub(1))
708            }
709        });
710
711        // Only count a real close: `Err` means the counter was already 0
712        // (no connection to drop), so the gauge must not go negative.
713        if result.is_ok() {
714            metrics::counter!(METRIC_CONNECTIONS_CLOSED_TOTAL).increment(1);
715            metrics::gauge!(METRIC_ACTIVE_CONNECTIONS).decrement(1.0);
716        }
717
718        if result == Ok(1) {
719            self.allowance_store.clear_session_allowances(principal);
720            if let Err(e) = self.capabilities.clear_session_for(principal) {
721                tracing::warn!(%principal, error = %e, "failed to clear capability session");
722            }
723            tracing::info!(
724                %principal,
725                "last connection for principal disconnected, session state cleared"
726            );
727        }
728        // Release the shard lock before touching the map again — `remove_if`
729        // re-acquires it.
730        drop(entry);
731
732        if result == Ok(1) {
733            self.active_connections
734                .remove_if(principal, |_, count| count.load(Ordering::Relaxed) == 0);
735        }
736    }
737
738    /// Enable or disable ephemeral mode (immediate shutdown on last disconnect).
739    pub fn set_ephemeral(&self, val: bool) {
740        self.ephemeral.store(val, Ordering::Relaxed);
741    }
742
743    /// Total number of active client connections across all principals.
744    ///
745    /// Used by the ephemeral-shutdown gate: the kernel shuts down only
746    /// when *every* principal's counter has reached zero.
747    pub fn total_connection_count(&self) -> usize {
748        self.active_connections
749            .iter()
750            .map(|e| e.value().load(Ordering::Relaxed))
751            .sum()
752    }
753
754    /// Snapshot of `(principal, count)` for every principal with a
755    /// non-zero active connection. The `astrid who` admin surface
756    /// reads this to attribute connections to specific agents
757    /// instead of fabricating a `default`-only row from the bare
758    /// total.
759    ///
760    /// Not a hot-path call site — taken at status-RPC time. Iterating
761    /// the `DashMap` snapshots the shard guards individually, so the
762    /// total may not be perfectly consistent with a concurrent
763    /// connect/disconnect, but each entry is internally consistent
764    /// and the operator-facing accuracy bound (a flickering one-off
765    /// count) is acceptable.
766    pub fn connections_by_principal(&self) -> Vec<(PrincipalId, usize)> {
767        self.active_connections
768            .iter()
769            .filter_map(|e| {
770                let count = e.value().load(Ordering::Relaxed);
771                if count == 0 {
772                    None
773                } else {
774                    Some((e.key().clone(), count))
775                }
776            })
777            .collect()
778    }
779
780    /// Gracefully shut down the kernel.
781    ///
782    /// 1. Publish `KernelShutdown` event on the bus.
783    /// 2. Drain and unload all capsules (stops MCP child processes, WASM engines).
784    /// 3. Flush and close the persistent KV store.
785    /// 4. Remove the Unix socket file.
786    pub async fn shutdown(&self, reason: Option<String>) {
787        tracing::info!(reason = ?reason, "Kernel shutting down");
788
789        // 1. Notify all subscribers so capsules can react.
790        let _ = self
791            .event_bus
792            .publish(astrid_events::AstridEvent::KernelShutdown {
793                metadata: astrid_events::EventMetadata::new("kernel"),
794                reason: reason.clone(),
795            });
796
797        // Clear every principal's session-only state in one sweep. Belt-
798        // and-suspenders for a process that is exiting anyway, but load-
799        // bearing the moment session allowances are ever persisted
800        // (Layer 7) — without this call a persisted-allowance layer would
801        // inherit stale per-session grants from the previous process.
802        self.allowance_store.clear_all_session_allowances();
803        if let Err(e) = self.capabilities.clear_session() {
804            tracing::warn!(error = %e, "failed to clear capability session on shutdown");
805        }
806
807        // 2. Drain the registry so the dispatcher cannot hand out new Arc clones,
808        // then unload each capsule. MCP engine unload is critical - it calls
809        // `mcp_client.disconnect()` to gracefully terminate child processes.
810        // Without explicit unload, MCP child processes become orphaned.
811        //
812        // The `EventDispatcher` temporarily clones `Arc<dyn Capsule>` into
813        // spawned interceptor tasks. After draining, no new clones can be
814        // created, but in-flight tasks may still hold references. We retry
815        // `Arc::get_mut` with brief yields to let them complete.
816        let capsules = {
817            let mut reg = self.capsules.write().await;
818            reg.drain()
819        };
820        for mut arc in capsules {
821            let id = arc.id().clone();
822            let mut unloaded = false;
823
824            for retry in 0..20_u32 {
825                if let Some(capsule) = Arc::get_mut(&mut arc) {
826                    if let Err(e) = capsule.unload().await {
827                        tracing::warn!(
828                            capsule_id = %id,
829                            error = %e,
830                            "Failed to unload capsule during shutdown"
831                        );
832                    }
833                    unloaded = true;
834                    break;
835                }
836                if retry < 19 {
837                    tokio::time::sleep(std::time::Duration::from_millis(50)).await;
838                }
839            }
840
841            if !unloaded {
842                tracing::warn!(
843                    capsule_id = %id,
844                    strong_count = Arc::strong_count(&arc),
845                    "Dropping capsule without explicit unload after retries exhausted; \
846                     MCP child processes may be orphaned"
847                );
848            }
849            drop(arc);
850        }
851
852        // 3. Flush the persistent KV store.
853        if let Err(e) = self.kv.close().await {
854            tracing::warn!(error = %e, "Failed to flush KV store during shutdown");
855        }
856
857        // 4. Remove the socket and token files so stale-socket detection works
858        // on next boot and the auth token doesn't persist on disk after shutdown.
859        // This runs AFTER capsule unload, which is the correct order: MCP child
860        // processes communicate via stdio pipes (not this Unix socket), so they
861        // are already terminated by step 2. The socket is only used for
862        // CLI-to-kernel IPC.
863        let socket_path = crate::socket::kernel_socket_path();
864        let _ = std::fs::remove_file(&socket_path);
865        let _ = std::fs::remove_file(&self.token_path);
866        crate::socket::remove_readiness_file();
867
868        tracing::info!("Kernel shutdown complete");
869    }
870
871    /// Wait for a set of capsules to signal readiness, in parallel.
872    ///
873    /// Collects `Arc<dyn Capsule>` handles under a short-lived read lock,
874    /// then drops the lock before awaiting. Capsules without a run loop
875    /// return `Ready` immediately and don't contribute to wait time.
876    async fn await_capsule_readiness(&self, names: &[String]) {
877        use astrid_capsule::capsule::ReadyStatus;
878
879        if names.is_empty() {
880            return;
881        }
882
883        let timeout = std::time::Duration::from_millis(500);
884        let capsules: Vec<(String, std::sync::Arc<dyn astrid_capsule::capsule::Capsule>)> = {
885            let registry = self.capsules.read().await;
886            names
887                .iter()
888                .filter_map(
889                    |name| match astrid_capsule::capsule::CapsuleId::new(name.clone()) {
890                        Ok(capsule_id) => registry.get(&capsule_id).map(|c| (name.clone(), c)),
891                        Err(e) => {
892                            tracing::warn!(
893                                capsule = %name,
894                                error = %e,
895                                "Invalid capsule ID, skipping readiness wait"
896                            );
897                            None
898                        },
899                    },
900                )
901                .collect()
902        };
903
904        // Await all capsules concurrently - independent capsules shouldn't
905        // compound each other's timeout.
906        let mut set = tokio::task::JoinSet::new();
907        for (name, capsule) in capsules {
908            set.spawn(async move {
909                let status = capsule.wait_ready(timeout).await;
910                (name, status)
911            });
912        }
913        while let Some(result) = set.join_next().await {
914            if let Ok((name, status)) = result {
915                match status {
916                    ReadyStatus::Ready => {},
917                    ReadyStatus::Timeout => {
918                        tracing::warn!(
919                            capsule = %name,
920                            timeout_ms = timeout.as_millis(),
921                            "Capsule did not signal ready within timeout"
922                        );
923                    },
924                    ReadyStatus::Crashed => {
925                        tracing::error!(
926                            capsule = %name,
927                            "Capsule run loop exited before signaling ready"
928                        );
929                    },
930                }
931            }
932        }
933    }
934}
935
/// Test-only lightweight constructor (issue #672) that builds a
/// [`Kernel`] with just the fields the admin handlers touch:
/// `event_bus`, `session_id`, `audit_log`, `profile_cache`,
/// `identity_store`, `groups`, `astrid_home`, `admin_write_lock`, plus
/// the shared allowance / capability / kv store handles. Skips the
/// heavy boot bits that aren't load-bearing for admin-topic tests:
/// - socket bind;
/// - MCP servers (an empty `ServersConfig` is used);
/// - persisting the session token to disk;
/// - capsule discovery.
///
/// The session token itself is still generated in memory.
///
/// The `home` argument is used verbatim — tests pass a tempdir-rooted
/// [`astrid_core::dirs::AstridHome`] so every call is fully isolated
/// from the process-global `$ASTRID_HOME`.
///
/// The admin router is spawned and its task detached, so IPC-driven tests
/// can exercise the publish → response loop.
///
/// # Panics
///
/// Panics if any on-disk store under `home` cannot be created or opened,
/// which is acceptable for a test fixture.
#[cfg(test)]
pub(crate) async fn test_kernel_with_home(home: astrid_core::dirs::AstridHome) -> Arc<Kernel> {
    home.ensure()
        .expect("test kernel: ensure astrid home dir tree");

    let session_id = SessionId::SYSTEM;
    let event_bus = Arc::new(EventBus::new());
    let capsules = Arc::new(RwLock::new(CapsuleRegistry::new()));

    // Persistent KV backing capabilities + identity store.
    let kv = Arc::new(
        astrid_storage::SurrealKvStore::open(&home.state_db_path()).expect("test kernel: open kv"),
    );
    let capabilities = Arc::new(
        CapabilityStore::with_kv_store(Arc::clone(&kv) as Arc<dyn astrid_storage::KvStore>)
            .expect("test kernel: capability store"),
    );

    // Audit log at the tempdir — chain verification is trivially Ok on a
    // fresh log, no historical entries.
    let runtime_key =
        load_or_generate_runtime_key(&home.keys_dir()).expect("test kernel: runtime key");
    let default_principal = astrid_core::PrincipalId::default();
    let principal_home = home.principal_home(&default_principal);
    principal_home
        .ensure()
        .expect("test kernel: ensure principal home");
    let audit_log = Arc::new(
        AuditLog::open(principal_home.audit_dir(), runtime_key)
            .expect("test kernel: open audit log"),
    );

    // MCP: use a no-op secure client wrapped around an empty manager.
    // Admin handlers do not touch MCP.
    let mcp_manager = ServerManager::new(ServersConfig::default());
    let mcp_client = McpClient::new(mcp_manager);
    let mcp = SecureMcpClient::new(
        mcp_client,
        Arc::clone(&capabilities),
        Arc::clone(&audit_log),
        session_id.clone(),
    );

    let root_handle = DirHandle::new();
    let kernel_host_vfs = HostVfs::new();
    kernel_host_vfs
        .register_dir(root_handle.clone(), home.root().to_path_buf())
        .await
        .expect("test kernel: register workspace vfs");
    let overlay_registry = Arc::new(OverlayVfsRegistry::new(
        home.root().to_path_buf(),
        root_handle.clone(),
    ));

    let allowance_store = Arc::new(astrid_approval::AllowanceStore::new());
    let identity_kv = astrid_storage::ScopedKvStore::new(
        Arc::clone(&kv) as Arc<dyn astrid_storage::KvStore>,
        "system:identity",
    )
    .expect("test kernel: identity kv scope");
    let identity_store: Arc<dyn astrid_storage::IdentityStore> =
        Arc::new(astrid_storage::KvIdentityStore::new(identity_kv));

    let groups = Arc::new(ArcSwap::from_pointee(
        GroupConfig::load(&home).expect("test kernel: load groups"),
    ));

    let kernel = Arc::new(Kernel {
        session_id,
        event_bus,
        capsules,
        mcp,
        capabilities,
        vfs: Arc::new(kernel_host_vfs) as Arc<dyn Vfs>,
        overlay_registry,
        vfs_root_handle: root_handle,
        workspace_root: home.root().to_path_buf(),
        home_root: Some(principal_home.root().to_path_buf()),
        cli_socket_listener: None,
        singleton_lock: None,
        kv,
        audit_log,
        active_connections: DashMap::new(),
        fuel_ledger: astrid_capsule::FuelLedger::default(),
        fuel_rate: astrid_capsule::FuelRateLimiter::default(),
        memory_ledger: astrid_capsule::MemoryLedger::default(),
        runtime_limits: astrid_capsule::CapsuleRuntimeLimits::default(),
        ephemeral: AtomicBool::new(false),
        boot_time: std::time::Instant::now(),
        shutdown_tx: tokio::sync::watch::channel(false).0,
        session_token: Arc::new(astrid_core::session_token::SessionToken::generate()),
        token_path: home.token_path(),
        allowance_store,
        identity_store,
        // `PrincipalProfileCache` comes from the module-level import.
        profile_cache: Arc::new(PrincipalProfileCache::with_home(home.clone())),
        groups,
        astrid_home: home,
        admin_write_lock: Mutex::new(()),
    });
    // Spawn the Layer 6 admin dispatcher so IPC-driven tests can drive
    // the full publish → response loop. State-mutating tests that call
    // `handlers::dispatch` directly are unaffected — those messages
    // never hit the bus. Dropping the handle detaches the task.
    drop(kernel_router::admin::spawn_admin_router(Arc::clone(&kernel)));
    kernel
}
1057
1058/// Loads the runtime signing key from `~/.astrid/keys/runtime.key`, generating a
1059/// new one if it doesn't exist. Opens the `SurrealKV`-backed audit database at
1060/// `~/.astrid/audit.db` and runs `verify_all()` to detect any tampering of
1061/// historical entries. Verification failures are logged at `error!` level but
1062/// do not block boot (fail-open for availability, loud alert for integrity).
1063fn open_audit_log() -> std::io::Result<Arc<AuditLog>> {
1064    use astrid_core::dirs::AstridHome;
1065
1066    let home = AstridHome::resolve()
1067        .map_err(|e| std::io::Error::other(format!("cannot resolve Astrid home: {e}")))?;
1068    home.ensure()
1069        .map_err(|e| std::io::Error::other(format!("cannot create Astrid home dirs: {e}")))?;
1070
1071    let runtime_key = load_or_generate_runtime_key(&home.keys_dir())?;
1072    let default_principal = astrid_core::PrincipalId::default();
1073    let principal_home = home.principal_home(&default_principal);
1074    principal_home
1075        .ensure()
1076        .map_err(|e| std::io::Error::other(format!("cannot create principal home dirs: {e}")))?;
1077    let audit_log = AuditLog::open(principal_home.audit_dir(), runtime_key)
1078        .map_err(|e| std::io::Error::other(format!("cannot open audit log: {e}")))?;
1079
1080    // Verify all historical chains on boot.
1081    match audit_log.verify_all() {
1082        Ok(results) => {
1083            let total_sessions = results.len();
1084            let mut tampered_sessions: usize = 0;
1085
1086            for (session_id, result) in &results {
1087                if !result.valid {
1088                    tampered_sessions = tampered_sessions.saturating_add(1);
1089                    for issue in &result.issues {
1090                        tracing::error!(
1091                            session_id = %session_id,
1092                            issue = %issue,
1093                            "Audit chain integrity violation detected"
1094                        );
1095                    }
1096                }
1097            }
1098
1099            if tampered_sessions > 0 {
1100                tracing::error!(
1101                    total_sessions,
1102                    tampered_sessions,
1103                    "Audit chain verification found tampered sessions"
1104                );
1105            } else if total_sessions > 0 {
1106                tracing::info!(
1107                    total_sessions,
1108                    "Audit chain verification passed for all sessions"
1109                );
1110            }
1111        },
1112        Err(e) => {
1113            tracing::error!(error = %e, "Audit chain verification failed to run");
1114        },
1115    }
1116
1117    Ok(Arc::new(audit_log))
1118}
1119
1120/// Load the runtime ed25519 signing key from disk, or generate and persist a new one.
1121///
1122/// The key file is 32 bytes of raw secret key material at `{keys_dir}/runtime.key`.
1123fn load_or_generate_runtime_key(keys_dir: &Path) -> std::io::Result<KeyPair> {
1124    let key_path = keys_dir.join("runtime.key");
1125
1126    if key_path.exists() {
1127        let bytes = std::fs::read(&key_path)?;
1128        KeyPair::from_secret_key(&bytes).map_err(|e| {
1129            std::io::Error::other(format!(
1130                "invalid runtime key at {}: {e}",
1131                key_path.display()
1132            ))
1133        })
1134    } else {
1135        let keypair = KeyPair::generate();
1136        std::fs::create_dir_all(keys_dir)?;
1137        std::fs::write(&key_path, keypair.secret_key_bytes())?;
1138
1139        // Secure permissions (owner-only) on Unix.
1140        #[cfg(unix)]
1141        {
1142            use std::os::unix::fs::PermissionsExt;
1143            std::fs::set_permissions(&key_path, std::fs::Permissions::from_mode(0o600))?;
1144        }
1145
1146        tracing::info!(key_id = %keypair.key_id_hex(), "Generated new runtime signing key");
1147        Ok(keypair)
1148    }
1149}
1150
1151/// Spawns a background task that cleanly shuts down the Kernel if there is no activity.
1152///
1153/// Uses dual-signal idle detection:
1154/// - **Primary:** explicit `active_connections` counter (incremented on first IPC
1155///   message per source, decremented on `Disconnect`).
1156/// - **Secondary:** `EventBus::subscriber_count()` minus the kernel router's own
1157///   subscription. When a CLI process dies without sending `Disconnect`, its
1158///   broadcast receiver is dropped so the subscriber count falls.
1159///
1160/// Takes the minimum of both signals to handle ungraceful disconnects.
1161///
1162/// Idle shutdown is on by default in `--ephemeral` mode (30s after the
1163/// last client disconnects) and **off by default** in persistent mode
1164/// (`astrid start`). Both modes respect `ASTRID_IDLE_TIMEOUT_SECS` —
1165/// setting it in persistent mode opts the operator into auto-shutdown,
1166/// setting it in ephemeral mode overrides the 30s default.
1167/// Number of permanent internal event bus subscribers that are not client
1168/// connections: `KernelRouter` (`kernel.request.*`), `AdminRouter`
1169/// (`kernel.admin.*`), `ConnectionTracker` (`client.*`),
1170/// `EventDispatcher` (all events), and the bus activity monitor (all events,
1171/// storm diagnostics — see [`bus_monitor::spawn_bus_activity_monitor`]).
1172const INTERNAL_SUBSCRIBER_COUNT: usize = 5;
1173
1174/// Gauge: current active client connections (sum across principals).
1175/// Mirrors [`Kernel::total_connection_count`]; lets a dashboard graph
1176/// "who is connected" without polling.
1177const METRIC_ACTIVE_CONNECTIONS: &str = "astrid_daemon_active_connections";
1178/// Counter: client connections opened (cumulative).
1179const METRIC_CONNECTIONS_OPENED_TOTAL: &str = "astrid_daemon_connections_opened_total";
1180/// Counter: client connections closed (cumulative). `opened - closed`
1181/// cross-checks the gauge.
1182const METRIC_CONNECTIONS_CLOSED_TOTAL: &str = "astrid_daemon_connections_closed_total";
1183/// Counter: background monitor-loop iterations, labelled by `loop`. A
1184/// flat `rate()` is a parked loop; a runaway `rate()` is a spin loop —
1185/// the direct signal for the idle-CPU class of incident. Shared with
1186/// [`bus_monitor`], hence `pub(crate)`.
1187pub(crate) const METRIC_BACKGROUND_TICKS_TOTAL: &str = "astrid_daemon_background_ticks_total";
1188
1189/// Initial grace period before idle checking begins.
1190const IDLE_INITIAL_GRACE: std::time::Duration = std::time::Duration::from_secs(5);
1191/// Additional grace for non-ephemeral daemons to let capsules fully initialize.
1192const IDLE_NON_EPHEMERAL_GRACE: std::time::Duration = std::time::Duration::from_secs(25);
1193/// How often the idle monitor polls when running in ephemeral mode.
1194const IDLE_EPHEMERAL_CHECK_INTERVAL: std::time::Duration = std::time::Duration::from_secs(1);
1195/// How often the idle monitor polls when running in persistent mode.
1196const IDLE_CHECK_INTERVAL: std::time::Duration = std::time::Duration::from_secs(15);
1197fn spawn_idle_monitor(kernel: Arc<Kernel>) -> tokio::task::JoinHandle<()> {
1198    tokio::spawn(async move {
1199        // Initial grace period — wait for capsules to boot and first client
1200        // to connect before checking idle status.
1201        tokio::time::sleep(IDLE_INITIAL_GRACE).await;
1202
1203        // Read ephemeral flag after grace period (set by daemon after boot).
1204        let ephemeral = kernel.ephemeral.load(Ordering::Relaxed);
1205        let idle_timeout = if ephemeral {
1206            // Give the CLI time to reconnect after brief disconnects (e.g.
1207            // during tool execution when the TUI might momentarily drop
1208            // the socket). Zero timeout caused premature shutdowns.
1209            //
1210            // Operators may still override via `ASTRID_IDLE_TIMEOUT_SECS`
1211            // when they want a longer ephemeral window (e.g. headless
1212            // batch runs that pause between prompts).
1213            std::env::var("ASTRID_IDLE_TIMEOUT_SECS")
1214                .ok()
1215                .and_then(|v| v.parse().ok())
1216                .map_or(
1217                    std::time::Duration::from_secs(30),
1218                    std::time::Duration::from_secs,
1219                )
1220        } else {
1221            // Persistent (`astrid start`) mode: idle shutdown is opt-in.
1222            // The operator explicitly chose persistent — honour that.
1223            // Setting `ASTRID_IDLE_TIMEOUT_SECS` switches the monitor on
1224            // for housekeeping flows that genuinely want auto-shutdown.
1225            // Without it, the monitor task exits immediately and the
1226            // daemon stays up until SIGTERM.
1227            let Some(secs) = std::env::var("ASTRID_IDLE_TIMEOUT_SECS")
1228                .ok()
1229                .and_then(|v| v.parse().ok())
1230            else {
1231                tracing::debug!(
1232                    "Non-ephemeral daemon: idle shutdown disabled \
1233                     (set ASTRID_IDLE_TIMEOUT_SECS to enable)."
1234                );
1235                return;
1236            };
1237            std::time::Duration::from_secs(secs)
1238        };
1239        let check_interval = if ephemeral {
1240            IDLE_EPHEMERAL_CHECK_INTERVAL
1241        } else {
1242            IDLE_CHECK_INTERVAL
1243        };
1244
1245        // Non-ephemeral: additional grace to let capsules fully initialize.
1246        if !ephemeral {
1247            tokio::time::sleep(IDLE_NON_EPHEMERAL_GRACE).await;
1248        }
1249        let mut idle_since: Option<tokio::time::Instant> = None;
1250
1251        loop {
1252            tokio::time::sleep(check_interval).await;
1253            metrics::counter!(METRIC_BACKGROUND_TICKS_TOTAL, "loop" => "idle").increment(1);
1254
1255            let connections = kernel.total_connection_count();
1256
1257            // Use the explicit connection counter as the sole signal.
1258            // The previous bus_subscribers heuristic (subscriber_count minus
1259            // internal subscribers) was fragile: capsule run-loop crashes
1260            // reduce subscriber_count, causing false "0 connections" readings
1261            // that trigger premature idle shutdown while a client is active.
1262            let effective_connections = connections;
1263
1264            let has_daemons = {
1265                let reg = kernel.capsules.read().await;
1266                reg.values().any(|c| {
1267                    let m = c.manifest();
1268                    !m.uplinks.is_empty()
1269                })
1270            };
1271
1272            if effective_connections == 0 && !has_daemons {
1273                let now = tokio::time::Instant::now();
1274                let start = *idle_since.get_or_insert(now);
1275                let elapsed = now.duration_since(start);
1276
1277                tracing::debug!(
1278                    idle_secs = elapsed.as_secs(),
1279                    timeout_secs = idle_timeout.as_secs(),
1280                    connections,
1281                    "Kernel idle, monitoring timeout"
1282                );
1283
1284                if elapsed >= idle_timeout {
1285                    tracing::info!("Idle timeout reached, initiating shutdown");
1286                    kernel.shutdown(Some("idle_timeout".to_string())).await;
1287                    std::process::exit(0);
1288                }
1289            } else {
1290                if idle_since.is_some() {
1291                    tracing::debug!(
1292                        effective_connections,
1293                        has_daemons,
1294                        "Activity detected, resetting idle timer"
1295                    );
1296                }
1297                idle_since = None;
1298            }
1299        }
1300    })
1301}
1302
/// Restart bookkeeping for a single capsule.
///
/// Records how many restarts have been tried, when the last one happened,
/// and how long to wait before the next. The wait starts at
/// [`Self::INITIAL_BACKOFF`] and doubles after every recorded attempt, up
/// to [`Self::MAX_BACKOFF`]. After [`Self::MAX_ATTEMPTS`] attempts the
/// tracker is exhausted.
struct RestartTracker {
    /// Restart attempts recorded so far.
    attempts: u32,
    /// When the latest attempt was recorded (creation time before any).
    last_attempt: std::time::Instant,
    /// Minimum time since `last_attempt` before another attempt is allowed.
    backoff: std::time::Duration,
}

impl RestartTracker {
    /// Upper bound on restart attempts per capsule.
    const MAX_ATTEMPTS: u32 = 5;
    /// Delay before the first restart attempt.
    const INITIAL_BACKOFF: std::time::Duration = std::time::Duration::from_secs(2);
    /// Ceiling for the doubling backoff (two minutes).
    const MAX_BACKOFF: std::time::Duration = std::time::Duration::from_secs(120);

    /// A fresh tracker: no attempts yet, backoff clock starting now.
    fn new() -> Self {
        let now = std::time::Instant::now();
        Self {
            backoff: Self::INITIAL_BACKOFF,
            last_attempt: now,
            attempts: 0,
        }
    }

    /// `true` when attempts remain AND the current backoff window has passed.
    fn should_restart(&self) -> bool {
        if self.exhausted() {
            return false;
        }
        self.backoff <= self.last_attempt.elapsed()
    }

    /// Count one more attempt, restart the backoff clock, and double the
    /// backoff (saturating, capped at `MAX_BACKOFF`).
    fn record_attempt(&mut self) {
        let doubled = self.backoff.saturating_mul(2);
        self.attempts = self.attempts.saturating_add(1);
        self.last_attempt = std::time::Instant::now();
        self.backoff = std::cmp::min(doubled, Self::MAX_BACKOFF);
    }

    /// `true` once the retry budget has been used up.
    fn exhausted(&self) -> bool {
        Self::MAX_ATTEMPTS <= self.attempts
    }
}
1340
1341/// Attempts to restart a failed capsule, respecting backoff and max retries.
1342///
1343/// Returns `true` if the tracker should be removed (successful restart).
1344async fn attempt_capsule_restart(
1345    kernel: &Kernel,
1346    id_str: &str,
1347    tracker: &mut RestartTracker,
1348) -> bool {
1349    if tracker.exhausted() {
1350        return false;
1351    }
1352
1353    if !tracker.should_restart() {
1354        tracing::debug!(
1355            capsule_id = %id_str,
1356            next_attempt_in = ?tracker.backoff.saturating_sub(tracker.last_attempt.elapsed()),
1357            "Waiting for backoff before next restart attempt"
1358        );
1359        return false;
1360    }
1361
1362    tracker.record_attempt();
1363    let attempt = tracker.attempts;
1364
1365    tracing::warn!(
1366        capsule_id = %id_str,
1367        attempt,
1368        max_attempts = RestartTracker::MAX_ATTEMPTS,
1369        "Attempting capsule restart"
1370    );
1371
1372    let capsule_id = astrid_capsule::capsule::CapsuleId::from_static(id_str);
1373    match kernel.restart_capsule(&capsule_id).await {
1374        Ok(()) => {
1375            tracing::info!(capsule_id = %id_str, attempt, "Capsule restarted successfully");
1376            true
1377        },
1378        Err(e) => {
1379            tracing::error!(capsule_id = %id_str, attempt, error = %e, "Capsule restart failed");
1380            if tracker.exhausted() {
1381                tracing::error!(
1382                    capsule_id = %id_str,
1383                    "All restart attempts exhausted - capsule will remain down"
1384                );
1385            }
1386            false
1387        },
1388    }
1389}
1390
/// Spawns a background task that periodically probes capsule health.
///
/// Every 10 seconds, reads the capsule registry and calls `check_health()` on
/// each capsule that is currently in `Ready` state. If a capsule reports
/// `Failed`, attempts to restart it with exponential backoff (max 5 attempts).
/// Publishes `astrid.v1.health.failed` IPC events for each detected failure.
///
/// Per-tick sequence:
/// 1. Snapshot the `Ready` capsules under a short registry read lock.
/// 2. Probe each one; for every failure, publish an event and record
///    `(id, reason)`.
/// 3. Drop the snapshot, then call `attempt_capsule_restart` per failure.
/// 4. Forget trackers of restarted capsules and prune stale trackers.
///
/// NOTE(review): restarts are only attempted for capsules observed failing on
/// the *current* tick (only `failures` feeds `attempt_capsule_restart`). A
/// freshly created `RestartTracker` starts inside its initial backoff window,
/// so the first restart happens no earlier than the next tick on which the
/// capsule is again `Ready` and reporting `Failed`. A capsule that leaves the
/// `Ready` state is never retried from here — confirm this is intended.
///
/// Returns the handle of the spawned task; the loop has no exit path.
fn spawn_capsule_health_monitor(kernel: Arc<Kernel>) -> tokio::task::JoinHandle<()> {
    tokio::spawn(async move {
        let mut interval = tokio::time::interval(std::time::Duration::from_secs(10));
        interval.tick().await; // Skip the first immediate tick.

        // One backoff tracker per capsule id that has recently failed; lives
        // across ticks so attempts and backoff accumulate.
        let mut restart_trackers: std::collections::HashMap<String, RestartTracker> =
            std::collections::HashMap::new();

        loop {
            interval.tick().await;
            metrics::counter!(METRIC_BACKGROUND_TICKS_TOTAL, "loop" => "capsule_health")
                .increment(1);

            // Collect ready capsules under a brief read lock, then drop
            // the lock before calling check_health() or publishing events.
            let ready_capsules: Vec<std::sync::Arc<dyn astrid_capsule::capsule::Capsule>> = {
                let registry = kernel.capsules.read().await;
                registry
                    .list()
                    .into_iter()
                    .filter_map(|id| {
                        let capsule = registry.get(id)?;
                        if capsule.state() == astrid_capsule::capsule::CapsuleState::Ready {
                            Some(capsule)
                        } else {
                            None
                        }
                    })
                    .collect()
            };

            // Probe health once per capsule, collect failures, then drop
            // the Arc Vec before restarting. This ensures restart_capsule's
            // Arc::get_mut can succeed (no other strong references held).
            let mut failures: Vec<(String, String)> = Vec::new();
            for capsule in &ready_capsules {
                let health = capsule.check_health();
                if let astrid_capsule::capsule::CapsuleState::Failed(reason) = health {
                    let id_str = capsule.id().to_string();
                    tracing::error!(capsule_id = %id_str, reason = %reason, "Capsule health check failed");

                    // Broadcast the failure so other components can react;
                    // the publish result is deliberately ignored.
                    let msg = astrid_events::ipc::IpcMessage::new(
                        "astrid.v1.health.failed",
                        astrid_events::ipc::IpcPayload::Custom {
                            data: serde_json::json!({
                                "capsule_id": &id_str,
                                "reason": &reason,
                            }),
                        },
                        uuid::Uuid::new_v4(),
                    );
                    let _ = kernel.event_bus.publish(astrid_events::AstridEvent::Ipc {
                        metadata: astrid_events::EventMetadata::new("kernel"),
                        message: msg,
                    });
                    failures.push((id_str, reason));
                }
            }

            // Drop all Arc clones so restart_capsule's Arc::get_mut can
            // obtain exclusive access for calling unload().
            drop(ready_capsules);

            // Borrowed view of this tick's failed ids, used by the prune step.
            let failed_this_tick: std::collections::HashSet<&str> =
                failures.iter().map(|(id, _)| id.as_str()).collect();

            // Restarts run sequentially; each may await a full capsule reload.
            let mut restarted = Vec::new();
            for (id_str, _reason) in &failures {
                let tracker = restart_trackers
                    .entry(id_str.clone())
                    .or_insert_with(RestartTracker::new);

                if attempt_capsule_restart(&kernel, id_str, tracker).await {
                    restarted.push(id_str.clone());
                }
            }

            // Remove trackers for successfully restarted capsules.
            for id in &restarted {
                restart_trackers.remove(id);
            }

            // Prune trackers for capsules that recovered (healthy this tick).
            // Keep exhausted trackers and trackers still in their backoff
            // window (capsule may have been unregistered by a failed restart
            // attempt and won't appear in ready_capsules next tick).
            restart_trackers.retain(|id, tracker| {
                // Exhausted trackers are kept indefinitely so the capsule is
                // not given a fresh retry budget on a later failure.
                if tracker.exhausted() {
                    return true;
                }
                // Keep if still within backoff - the capsule may be absent
                // from the registry after a failed reload.
                if tracker.last_attempt.elapsed() < tracker.backoff {
                    return true;
                }
                failed_this_tick.contains(id.as_str())
            });
        }
    })
}
1497
1498/// Spawns a periodic watchdog that publishes `astrid.v1.watchdog.tick` events every 5 seconds.
1499///
1500/// The `ReAct` capsule (WASM guest) cannot use async timers, so this kernel-side task
1501/// drives timeout enforcement by waking the capsule on a fixed interval. Each tick
1502/// causes the capsule's `handle_watchdog_tick` interceptor to run `check_phase_timeout`.
1503fn spawn_react_watchdog(event_bus: Arc<EventBus>) -> tokio::task::JoinHandle<()> {
1504    tokio::spawn(async move {
1505        let mut interval = tokio::time::interval(std::time::Duration::from_secs(5));
1506        // The first tick fires immediately - skip it to give capsules time to load.
1507        interval.tick().await;
1508
1509        loop {
1510            interval.tick().await;
1511            metrics::counter!(METRIC_BACKGROUND_TICKS_TOTAL, "loop" => "react_watchdog")
1512                .increment(1);
1513
1514            let msg = astrid_events::ipc::IpcMessage::new(
1515                "astrid.v1.watchdog.tick",
1516                astrid_events::ipc::IpcPayload::Custom {
1517                    data: serde_json::json!({}),
1518                },
1519                uuid::Uuid::new_v4(),
1520            );
1521            let _ = event_bus.publish(astrid_events::AstridEvent::Ipc {
1522                metadata: astrid_events::EventMetadata::new("kernel"),
1523                message: msg,
1524            });
1525        }
1526    })
1527}
1528
// Unit tests for kernel-private helpers: runtime key load/generation, the
// connection counter logic, restart backoff bookkeeping, and default-profile
// seeding / legacy profile migration.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_load_or_generate_creates_new_key() {
        let dir = tempfile::tempdir().unwrap();
        let keys_dir = dir.path().join("keys");

        let keypair = load_or_generate_runtime_key(&keys_dir).unwrap();
        let key_path = keys_dir.join("runtime.key");

        // Key file should exist with 32 bytes.
        assert!(key_path.exists());
        let bytes = std::fs::read(&key_path).unwrap();
        assert_eq!(bytes.len(), 32);

        // The written bytes should reconstruct the same public key.
        let reloaded = KeyPair::from_secret_key(&bytes).unwrap();
        assert_eq!(
            keypair.public_key_bytes(),
            reloaded.public_key_bytes(),
            "reloaded key should match generated key"
        );
    }

    #[test]
    fn test_load_or_generate_is_idempotent() {
        let dir = tempfile::tempdir().unwrap();
        let keys_dir = dir.path().join("keys");

        let first = load_or_generate_runtime_key(&keys_dir).unwrap();
        let second = load_or_generate_runtime_key(&keys_dir).unwrap();

        assert_eq!(
            first.public_key_bytes(),
            second.public_key_bytes(),
            "loading the same key file should produce the same keypair"
        );
    }

    #[test]
    fn test_load_or_generate_rejects_bad_key_length() {
        let dir = tempfile::tempdir().unwrap();
        let keys_dir = dir.path().join("keys");
        std::fs::create_dir_all(&keys_dir).unwrap();

        // Write a key file with wrong length.
        std::fs::write(keys_dir.join("runtime.key"), [0u8; 16]).unwrap();

        let result = load_or_generate_runtime_key(&keys_dir);
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(
            err.contains("invalid runtime key"),
            "expected 'invalid runtime key' error, got: {err}"
        );
    }

    // The two counter tests below replicate the decrement logic on a local
    // AtomicUsize rather than calling the kernel methods directly.
    #[test]
    fn test_connection_counter_increment_decrement() {
        let counter = AtomicUsize::new(0);

        // Simulate connection_opened (fetch_add)
        counter.fetch_add(1, Ordering::Relaxed);
        counter.fetch_add(1, Ordering::Relaxed);
        assert_eq!(counter.load(Ordering::Relaxed), 2);

        // Simulate connection_closed using the same fetch_update logic
        // as the real implementation to exercise the actual code path.
        for expected in [1, 0] {
            let _ = counter.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |n| {
                if n == 0 {
                    None
                } else {
                    Some(n.saturating_sub(1))
                }
            });
            assert_eq!(counter.load(Ordering::Relaxed), expected);
        }
    }

    #[test]
    fn test_connection_counter_underflow_guard() {
        // Test the saturating behavior: decrementing from 0 should stay at 0.
        // Mirrors the fetch_update logic in connection_closed().
        let counter = AtomicUsize::new(0);

        let result = counter.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |n| {
            if n == 0 { None } else { Some(n - 1) }
        });
        // fetch_update returns Err(0) when the closure returns None (no-op).
        assert!(result.is_err());
        assert_eq!(counter.load(Ordering::Relaxed), 0);
    }

    /// Mirrors the `connection_closed(&principal)` logic: only `Ok(1)`
    /// (previous value 1, now 0) triggers `clear_session_allowances` for
    /// that principal. Update this test if `connection_closed()` is
    /// refactored.
    #[test]
    fn test_last_disconnect_clears_session_allowances_scoped() {
        use astrid_approval::AllowanceStore;
        use astrid_approval::allowance::{Allowance, AllowanceId, AllowancePattern};
        use astrid_core::principal::PrincipalId;
        use astrid_core::types::Timestamp;
        use astrid_crypto::KeyPair;

        let store = AllowanceStore::new();
        let keypair = KeyPair::generate();
        let alice = PrincipalId::new("alice").unwrap();
        let bob = PrincipalId::new("bob").unwrap();

        // Alice: session + persistent.
        store
            .add_allowance(Allowance {
                id: AllowanceId::new(),
                principal: alice.clone(),
                action_pattern: AllowancePattern::ServerTools {
                    server: "alice-session".to_string(),
                },
                created_at: Timestamp::now(),
                expires_at: None,
                max_uses: None,
                uses_remaining: None,
                session_only: true,
                workspace_root: None,
                signature: keypair.sign(b"test"),
            })
            .unwrap();
        store
            .add_allowance(Allowance {
                id: AllowanceId::new(),
                principal: alice.clone(),
                action_pattern: AllowancePattern::ServerTools {
                    server: "alice-persistent".to_string(),
                },
                created_at: Timestamp::now(),
                expires_at: None,
                max_uses: None,
                uses_remaining: None,
                session_only: false,
                workspace_root: None,
                signature: keypair.sign(b"test"),
            })
            .unwrap();
        // Bob: session (must NOT be cleared by alice disconnecting).
        store
            .add_allowance(Allowance {
                id: AllowanceId::new(),
                principal: bob.clone(),
                action_pattern: AllowancePattern::ServerTools {
                    server: "bob-session".to_string(),
                },
                created_at: Timestamp::now(),
                expires_at: None,
                max_uses: None,
                uses_remaining: None,
                session_only: true,
                workspace_root: None,
                signature: keypair.sign(b"test"),
            })
            .unwrap();
        assert_eq!(store.count(), 3);

        // Alice starts with exactly one open connection, so one disconnect
        // takes the counter 1 -> 0 and triggers the clear.
        let alice_counter = AtomicUsize::new(1);
        let simulate_alice_disconnect = || {
            let result = alice_counter.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |n| {
                if n == 0 {
                    None
                } else {
                    Some(n.saturating_sub(1))
                }
            });
            if result == Ok(1) {
                store.clear_session_allowances(&alice);
            }
        };

        simulate_alice_disconnect();
        // Alice's session gone; alice's persistent + bob's session remain.
        assert_eq!(store.count(), 2);
        assert_eq!(store.count_for(&alice), 1);
        assert_eq!(store.count_for(&bob), 1);
    }

    #[cfg(unix)]
    #[test]
    fn test_load_or_generate_sets_secure_permissions() {
        use std::os::unix::fs::PermissionsExt;

        let dir = tempfile::tempdir().unwrap();
        let keys_dir = dir.path().join("keys");

        let _ = load_or_generate_runtime_key(&keys_dir).unwrap();

        let key_path = keys_dir.join("runtime.key");
        let mode = std::fs::metadata(&key_path).unwrap().permissions().mode();
        assert_eq!(
            mode & 0o777,
            0o600,
            "key file should have 0o600 permissions, got {mode:#o}"
        );
    }

    // ── RestartTracker backoff bookkeeping ──────────────────────────
    // Tests that need "time has passed" rewind `last_attempt` instead of
    // sleeping.

    #[test]
    fn restart_tracker_initial_state() {
        let tracker = RestartTracker::new();
        assert!(!tracker.exhausted());
        // Should not restart immediately (backoff hasn't elapsed).
        assert!(!tracker.should_restart());
    }

    #[test]
    fn restart_tracker_allows_restart_after_backoff() {
        let mut tracker = RestartTracker::new();
        // Simulate time passing by setting last_attempt in the past.
        tracker.last_attempt = std::time::Instant::now()
            - RestartTracker::INITIAL_BACKOFF
            - std::time::Duration::from_millis(1);
        assert!(tracker.should_restart());
    }

    #[test]
    fn restart_tracker_doubles_backoff() {
        let mut tracker = RestartTracker::new();
        assert_eq!(tracker.backoff, RestartTracker::INITIAL_BACKOFF);

        tracker.record_attempt();
        assert_eq!(
            tracker.backoff,
            RestartTracker::INITIAL_BACKOFF.saturating_mul(2)
        );
        assert_eq!(tracker.attempts, 1);

        tracker.record_attempt();
        assert_eq!(
            tracker.backoff,
            RestartTracker::INITIAL_BACKOFF.saturating_mul(4)
        );
        assert_eq!(tracker.attempts, 2);
    }

    #[test]
    fn restart_tracker_backoff_caps_at_max() {
        let mut tracker = RestartTracker::new();
        for _ in 0..20 {
            tracker.record_attempt();
        }
        assert_eq!(tracker.backoff, RestartTracker::MAX_BACKOFF);
    }

    #[test]
    fn restart_tracker_exhausted_at_max_attempts() {
        let mut tracker = RestartTracker::new();
        for _ in 0..RestartTracker::MAX_ATTEMPTS {
            assert!(!tracker.exhausted());
            tracker.record_attempt();
        }
        assert!(tracker.exhausted());
    }

    #[test]
    fn restart_tracker_should_restart_false_when_exhausted() {
        let mut tracker = RestartTracker::new();
        for _ in 0..RestartTracker::MAX_ATTEMPTS {
            tracker.record_attempt();
        }
        // Even if backoff has elapsed, exhausted tracker should not restart.
        tracker.last_attempt = std::time::Instant::now() - RestartTracker::MAX_BACKOFF;
        assert!(!tracker.should_restart());
    }

    // ── Bootstrap admin-group seeding (issue #670) ───────────────────

    /// Builds an `AstridHome` rooted in a fresh temp dir. The returned
    /// `TempDir` must be kept alive for the test, since dropping it removes
    /// the directory.
    fn scratch_home() -> (tempfile::TempDir, astrid_core::dirs::AstridHome) {
        let dir = tempfile::tempdir().unwrap();
        let home = astrid_core::dirs::AstridHome::from_path(dir.path());
        (dir, home)
    }

    #[test]
    fn seed_admin_writes_fresh_profile_when_missing() {
        let (_d, home) = scratch_home();
        let default = astrid_core::PrincipalId::default();
        let path = astrid_core::PrincipalProfile::path_for(&home, &default);
        assert!(!path.exists());

        seed_default_principal_admin_profile(&home).unwrap();

        let profile = astrid_core::PrincipalProfile::load_from_path(&path).unwrap();
        assert_eq!(profile.groups, vec!["admin".to_string()]);
        assert!(profile.grants.is_empty());
        assert!(profile.revokes.is_empty());
    }

    #[test]
    fn seed_admin_is_idempotent_across_reboots() {
        let (_d, home) = scratch_home();
        let default = astrid_core::PrincipalId::default();

        seed_default_principal_admin_profile(&home).unwrap();
        seed_default_principal_admin_profile(&home).unwrap();
        seed_default_principal_admin_profile(&home).unwrap();

        let path = astrid_core::PrincipalProfile::path_for(&home, &default);
        let profile = astrid_core::PrincipalProfile::load_from_path(&path).unwrap();
        // Still exactly one `admin` entry — no duplication.
        assert_eq!(profile.groups, vec!["admin".to_string()]);
    }

    #[test]
    fn seed_admin_leaves_operator_configured_groups_intact() {
        let (_d, home) = scratch_home();
        let default = astrid_core::PrincipalId::default();

        // Operator wrote their own config pre-bootstrap.
        let mut existing = astrid_core::PrincipalProfile::default();
        existing.groups = vec!["agent".to_string()];
        let path = astrid_core::PrincipalProfile::path_for(&home, &default);
        std::fs::create_dir_all(home.profiles_dir()).unwrap();
        existing.save_to_path(&path).unwrap();

        seed_default_principal_admin_profile(&home).unwrap();

        let profile = astrid_core::PrincipalProfile::load_from_path(&path).unwrap();
        assert_eq!(profile.groups, vec!["agent".to_string()]);
    }

    #[test]
    fn seed_admin_leaves_operator_configured_grants_intact() {
        let (_d, home) = scratch_home();
        let default = astrid_core::PrincipalId::default();

        let mut existing = astrid_core::PrincipalProfile::default();
        existing.grants = vec!["system:status".to_string()];
        let path = astrid_core::PrincipalProfile::path_for(&home, &default);
        std::fs::create_dir_all(home.profiles_dir()).unwrap();
        existing.save_to_path(&path).unwrap();

        seed_default_principal_admin_profile(&home).unwrap();

        let profile = astrid_core::PrincipalProfile::load_from_path(&path).unwrap();
        // admin not auto-added because grants are non-empty.
        assert!(profile.groups.is_empty());
        assert_eq!(profile.grants, vec!["system:status".to_string()]);
    }

    #[test]
    fn seed_admin_leaves_operator_configured_revokes_intact() {
        let (_d, home) = scratch_home();
        let default = astrid_core::PrincipalId::default();

        let mut existing = astrid_core::PrincipalProfile::default();
        existing.revokes = vec!["system:shutdown".to_string()];
        let path = astrid_core::PrincipalProfile::path_for(&home, &default);
        std::fs::create_dir_all(home.profiles_dir()).unwrap();
        existing.save_to_path(&path).unwrap();

        seed_default_principal_admin_profile(&home).unwrap();

        let profile = astrid_core::PrincipalProfile::load_from_path(&path).unwrap();
        // admin not auto-added because revokes are non-empty.
        assert!(profile.groups.is_empty());
        assert_eq!(profile.revokes, vec!["system:shutdown".to_string()]);
    }

    // ── Legacy profile path migration (issue #672) ──────────────────

    #[test]
    fn migrate_legacy_profile_relocates_to_etc() {
        // Pre-#672 deployments wrote profile.toml under
        // home/{principal}/.config/. The migration moves it to
        // etc/profiles/{principal}.toml on first boot.
        let (_d, home) = scratch_home();
        let default = astrid_core::PrincipalId::default();
        let legacy_path = home
            .principal_home(&default)
            .config_dir()
            .join("profile.toml");
        std::fs::create_dir_all(legacy_path.parent().unwrap()).unwrap();
        let mut existing = astrid_core::PrincipalProfile::default();
        existing.groups = vec!["operator-configured".to_string()];
        existing.save_to_path(&legacy_path).unwrap();

        seed_default_principal_admin_profile(&home).unwrap();

        // Legacy path gone, new path holds the migrated content.
        assert!(!legacy_path.exists());
        let new_path = astrid_core::PrincipalProfile::path_for(&home, &default);
        let migrated = astrid_core::PrincipalProfile::load_from_path(&new_path).unwrap();
        assert_eq!(migrated.groups, vec!["operator-configured".to_string()]);
    }

    #[test]
    fn migrate_legacy_profile_drops_stale_legacy_when_new_already_exists() {
        // Operator already migrated by hand (or a prior boot did) —
        // the new path holds the canonical config. Don't clobber it
        // with the legacy file; just remove the legacy so capsules
        // can't reach it through home://.
        let (_d, home) = scratch_home();
        let default = astrid_core::PrincipalId::default();

        // Stale legacy with operator-stale content.
        let legacy_path = home
            .principal_home(&default)
            .config_dir()
            .join("profile.toml");
        std::fs::create_dir_all(legacy_path.parent().unwrap()).unwrap();
        let mut stale = astrid_core::PrincipalProfile::default();
        stale.groups = vec!["stale".to_string()];
        stale.save_to_path(&legacy_path).unwrap();

        // Fresh new-path content (migrated already).
        let new_path = astrid_core::PrincipalProfile::path_for(&home, &default);
        std::fs::create_dir_all(new_path.parent().unwrap()).unwrap();
        let mut canonical = astrid_core::PrincipalProfile::default();
        canonical.groups = vec!["canonical".to_string()];
        canonical.save_to_path(&new_path).unwrap();

        seed_default_principal_admin_profile(&home).unwrap();

        // Legacy removed, canonical preserved.
        assert!(!legacy_path.exists());
        let result = astrid_core::PrincipalProfile::load_from_path(&new_path).unwrap();
        assert_eq!(result.groups, vec!["canonical".to_string()]);
    }
}
1956
1957// ---------------------------------------------------------------------------
1958// Boot validation
1959// ---------------------------------------------------------------------------
1960
1961/// Validate that every capsule's required imports have a matching export
1962/// from another loaded capsule. Logs errors for unsatisfied required imports
1963/// and info messages for unsatisfied optional imports. Also warns about
1964/// duplicate exports of the same interface from multiple capsules.
1965fn validate_imports_exports(
1966    manifests: &[(
1967        astrid_capsule::manifest::CapsuleManifest,
1968        std::path::PathBuf,
1969    )],
1970) {
1971    // Track (namespace, interface) → list of (capsule_name, version).
1972    let mut exports_by_interface: std::collections::HashMap<
1973        (&str, &str),
1974        Vec<(&str, &semver::Version)>,
1975    > = std::collections::HashMap::new();
1976
1977    for (m, _) in manifests {
1978        for (ns, name, ver) in m.export_triples() {
1979            exports_by_interface
1980                .entry((ns, name))
1981                .or_default()
1982                .push((&m.package.name, ver));
1983        }
1984    }
1985
1986    // Warn about duplicate exports — two capsules providing the same interface
1987    // will both fire on matching events, causing double-processing.
1988    for ((ns, name), providers) in &exports_by_interface {
1989        if providers.len() > 1 {
1990            let names: Vec<&str> = providers.iter().map(|(n, _)| *n).collect();
1991            tracing::warn!(
1992                interface = %format!("{ns}/{name}"),
1993                providers = ?names,
1994                "Multiple capsules export the same interface — events may be double-processed. \
1995                 Consider removing one with `astrid capsule remove`."
1996            );
1997        }
1998    }
1999
2000    let mut satisfied_count: u32 = 0;
2001    let mut warning_count: u32 = 0;
2002
2003    for (manifest, _) in manifests {
2004        for (ns, name, req, optional) in manifest.import_tuples() {
2005            let has_provider = exports_by_interface
2006                .get(&(ns, name))
2007                .is_some_and(|providers| providers.iter().any(|(_, v)| req.matches(v)));
2008
2009            if has_provider {
2010                satisfied_count = satisfied_count.saturating_add(1);
2011            } else if optional {
2012                tracing::info!(
2013                    capsule = %manifest.package.name,
2014                    import = %format!("{ns}/{name} {req}"),
2015                    "Optional import not satisfied — capsule will boot with reduced functionality"
2016                );
2017                warning_count = warning_count.saturating_add(1);
2018            } else {
2019                tracing::error!(
2020                    capsule = %manifest.package.name,
2021                    import = %format!("{ns}/{name} {req}"),
2022                    "Required import not satisfied — no loaded capsule exports this interface"
2023                );
2024                warning_count = warning_count.saturating_add(1);
2025            }
2026        }
2027    }
2028
2029    tracing::info!(
2030        capsules = manifests.len(),
2031        imports_satisfied = satisfied_count,
2032        warnings = warning_count,
2033        "Boot validation complete"
2034    );
2035}
2036
2037// ---------------------------------------------------------------------------
2038// Identity bootstrap helpers
2039// ---------------------------------------------------------------------------
2040
2041/// Bootstrap the CLI root user identity at kernel boot.
2042///
2043/// Creates a deterministic root `AstridUserId` on first boot, or reloads it
2044/// on subsequent boots. Auto-links with `platform="cli"`,
2045/// `platform_user_id="local"`, `method="system"`.
2046///
2047/// Also seeds the default principal's profile on disk with
2048/// `groups = ["admin"]` (issue #670) so single-tenant deployments reach
2049/// the management API with full capabilities. The profile write is
2050/// **idempotent** — if the default principal already has a profile with
2051/// an `admin` group, any explicit `grants` / `revokes`, or non-empty
2052/// `groups`, we leave it untouched.
2053///
2054/// Idempotent: skips creation if the root user already exists.
2055async fn bootstrap_cli_root_user(
2056    store: &Arc<dyn astrid_storage::IdentityStore>,
2057    home: &astrid_core::dirs::AstridHome,
2058) -> Result<(), astrid_storage::IdentityError> {
2059    // Seed the default principal profile with the admin group. Runs
2060    // before the identity-link short-circuit below so a deleted profile
2061    // between boots is restored even when the identity record persists.
2062    if let Err(e) = seed_default_principal_admin_profile(home) {
2063        tracing::warn!(error = %e, "Failed to seed default admin profile — continuing boot");
2064    }
2065
2066    // Check if root user already exists by trying to resolve the CLI link.
2067    if let Some(_user) = store.resolve("cli", "local").await? {
2068        tracing::debug!("CLI root user already linked");
2069        return Ok(());
2070    }
2071
2072    // No CLI link exists. Create or find the root user.
2073    let user = store.create_user(Some("root")).await?;
2074    tracing::info!(user_id = %user.id, "Created CLI root user");
2075
2076    // Link the CLI platform identity.
2077    store.link("cli", "local", user.id, "system").await?;
2078    tracing::info!(user_id = %user.id, "Linked CLI root user (cli/local)");
2079
2080    Ok(())
2081}
2082
2083/// Migrate a legacy per-principal `profile.toml` from the pre-#672
2084/// location (`home/{principal}/.config/profile.toml`) to the
2085/// system-managed `etc/profiles/{principal}.toml`. Idempotent across
2086/// boots: if the new path exists, the old one is removed (assumed
2087/// already migrated); if neither exists, no-op.
2088///
2089/// Profile contents are 100% system policy (enabled, groups, grants,
2090/// revokes, quotas, auth public keys) and a capsule running with
2091/// `fs_read = ["home://"]` could read its own policy from the legacy
2092/// location. Moving it under `etc/` puts it outside the `home://` VFS
2093/// scheme entirely.
2094fn migrate_legacy_profile_path(
2095    home: &astrid_core::dirs::AstridHome,
2096    principal: &astrid_core::PrincipalId,
2097) -> Result<(), std::io::Error> {
2098    let legacy_path = home
2099        .principal_home(principal)
2100        .config_dir()
2101        .join("profile.toml");
2102    let new_path = home.profile_path(principal);
2103    if !legacy_path.exists() {
2104        return Ok(());
2105    }
2106    if new_path.exists() {
2107        // Operator already migrated, or a prior boot did the rename.
2108        // Drop the stale legacy file so capsules can no longer reach
2109        // it via `home://.config/profile.toml`.
2110        let _ = std::fs::remove_file(&legacy_path);
2111        return Ok(());
2112    }
2113    if let Some(parent) = new_path.parent() {
2114        std::fs::create_dir_all(parent)?;
2115    }
2116    std::fs::rename(&legacy_path, &new_path)?;
2117    tracing::warn!(
2118        %principal,
2119        legacy = %legacy_path.display(),
2120        new = %new_path.display(),
2121        "Migrated profile.toml out of principal home directory \
2122         (security: capsules with home:// fs_read could read the legacy file)"
2123    );
2124    Ok(())
2125}
2126
2127/// Idempotently ensure the default principal's profile on disk has the
2128/// built-in `admin` group, so the single-tenant CLI path carries full
2129/// management-API capabilities (issue #670).
2130///
2131/// - Missing profile → writes a fresh default with `groups = ["admin"]`.
2132/// - Existing profile with any non-empty `groups` OR any `grants` OR
2133///   any `revokes` → treated as operator-configured, left untouched.
2134/// - Existing profile with `groups = []`, `grants = []`, `revokes = []`
2135///   → adds `admin` to `groups`. This covers the fresh-default case
2136///   where a prior boot wrote a `PrincipalProfile::default()`.
2137///
2138/// Also migrates the legacy `profile.toml` location
2139/// (`home/{principal}/.config/`) to the new system-managed location
2140/// (`etc/profiles/`) on first boot post-#672, see
2141/// [`migrate_legacy_profile_path`].
2142fn seed_default_principal_admin_profile(
2143    home: &astrid_core::dirs::AstridHome,
2144) -> Result<(), astrid_core::ProfileError> {
2145    use astrid_core::PrincipalProfile;
2146
2147    let default_principal = astrid_core::PrincipalId::default();
2148
2149    // Move any legacy file in front of load — load_from_path on the new
2150    // path would otherwise return Default and clobber the operator's
2151    // existing groups/grants/revokes.
2152    if let Err(e) = migrate_legacy_profile_path(home, &default_principal) {
2153        tracing::warn!(error = %e, "Failed to migrate legacy profile path — continuing");
2154    }
2155
2156    let path = PrincipalProfile::path_for(home, &default_principal);
2157    let profile = PrincipalProfile::load_from_path(&path)?;
2158
2159    if !profile.groups.is_empty() || !profile.grants.is_empty() || !profile.revokes.is_empty() {
2160        tracing::debug!(
2161            principal = %default_principal,
2162            "Default principal profile already has group/grant/revoke entries — leaving intact"
2163        );
2164        return Ok(());
2165    }
2166
2167    let mut updated = profile;
2168    updated
2169        .groups
2170        .push(astrid_core::groups::BUILTIN_ADMIN.to_string());
2171    updated.save_to_path(&path)?;
2172    tracing::info!(
2173        principal = %default_principal,
2174        "Seeded default principal with built-in `admin` group"
2175    );
2176    Ok(())
2177}
2178
2179/// Apply pre-configured identity links from the config file.
2180///
2181/// For each `[[identity.links]]` entry, resolves or creates the referenced
2182/// Astrid user and links the platform identity. Logs warnings on failure
2183/// but does not abort boot.
2184async fn apply_identity_config(
2185    store: &Arc<dyn astrid_storage::IdentityStore>,
2186    workspace_root: &std::path::Path,
2187) {
2188    let config = match astrid_config::Config::load(Some(workspace_root)) {
2189        Ok(resolved) => resolved.config,
2190        Err(e) => {
2191            tracing::debug!(error = %e, "No config loaded for identity links");
2192            return;
2193        },
2194    };
2195
2196    for link_cfg in &config.identity.links {
2197        let result = apply_single_identity_link(store, link_cfg).await;
2198        if let Err(e) = result {
2199            tracing::warn!(
2200                platform = %link_cfg.platform,
2201                platform_user_id = %link_cfg.platform_user_id,
2202                astrid_user = %link_cfg.astrid_user,
2203                error = %e,
2204                "Failed to apply identity link from config"
2205            );
2206        }
2207    }
2208}
2209
2210/// Apply a single identity link from config.
2211async fn apply_single_identity_link(
2212    store: &Arc<dyn astrid_storage::IdentityStore>,
2213    link_cfg: &astrid_config::types::IdentityLinkConfig,
2214) -> Result<(), astrid_storage::IdentityError> {
2215    // Resolve astrid_user: try UUID first, then name lookup, then create.
2216    let user_id = if let Ok(uuid) = uuid::Uuid::parse_str(&link_cfg.astrid_user) {
2217        // Ensure user record exists. If the UUID was explicitly specified in
2218        // config but doesn't exist in the store, that's a configuration error
2219        // - don't silently create a different user.
2220        if store.get_user(uuid).await?.is_none() {
2221            return Err(astrid_storage::IdentityError::UserNotFound(uuid));
2222        }
2223        uuid
2224    } else {
2225        // Try name lookup.
2226        if let Some(user) = store.get_user_by_name(&link_cfg.astrid_user).await? {
2227            user.id
2228        } else {
2229            let user = store.create_user(Some(&link_cfg.astrid_user)).await?;
2230            tracing::info!(
2231                user_id = %user.id,
2232                name = %link_cfg.astrid_user,
2233                "Created user from config identity link"
2234            );
2235            user.id
2236        }
2237    };
2238
2239    let method = if link_cfg.method.is_empty() {
2240        "admin"
2241    } else {
2242        &link_cfg.method
2243    };
2244
2245    // Check if link already points to the correct user - skip if idempotent.
2246    if let Some(existing) = store
2247        .resolve(&link_cfg.platform, &link_cfg.platform_user_id)
2248        .await?
2249        && existing.id == user_id
2250    {
2251        tracing::debug!(
2252            platform = %link_cfg.platform,
2253            platform_user_id = %link_cfg.platform_user_id,
2254            user_id = %user_id,
2255            "Identity link from config already exists"
2256        );
2257        return Ok(());
2258    }
2259
2260    store
2261        .link(
2262            &link_cfg.platform,
2263            &link_cfg.platform_user_id,
2264            user_id,
2265            method,
2266        )
2267        .await?;
2268
2269    tracing::info!(
2270        platform = %link_cfg.platform,
2271        platform_user_id = %link_cfg.platform_user_id,
2272        user_id = %user_id,
2273        "Applied identity link from config"
2274    );
2275
2276    Ok(())
2277}