// astrid_kernel — lib.rs
1#![deny(unsafe_code)]
2#![deny(missing_docs)]
3#![deny(clippy::all)]
4#![deny(unreachable_pub)]
5#![allow(clippy::module_name_repetitions)]
6
7//! Astrid Kernel - The core execution engine and IPC router.
8//!
9//! The Kernel is a pure, decentralized WASM runner. It contains no business
10//! logic, no cognitive loops, and no network servers. Its sole responsibility
11//! is to instantiate `astrid_events::EventBus`, load `.capsule` files into
12//! the Extism sandbox, and route IPC bytes between them.
13
14/// The Management API router listening to the `EventBus`.
15pub mod kernel_router;
16/// The Unix Domain Socket manager.
17pub mod socket;
18
19use astrid_audit::AuditLog;
20use astrid_capabilities::{CapabilityStore, DirHandle};
21use astrid_capsule::registry::CapsuleRegistry;
22use astrid_core::SessionId;
23use astrid_crypto::KeyPair;
24use astrid_events::EventBus;
25use astrid_mcp::{McpClient, SecureMcpClient, ServerManager, ServersConfig};
26use astrid_vfs::{HostVfs, OverlayVfs, Vfs};
27use std::path::{Path, PathBuf};
28use std::sync::Arc;
29use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
30use tokio::sync::RwLock;
31
/// The core Operating System Kernel.
///
/// Constructed via [`Kernel::new`]; torn down via [`Kernel::shutdown`].
pub struct Kernel {
    /// The unique identifier for this kernel session.
    pub session_id: SessionId,
    /// The global IPC message bus.
    pub event_bus: Arc<EventBus>,
    /// The process manager (loaded WASM capsules).
    pub capsules: Arc<RwLock<CapsuleRegistry>>,
    /// The secure MCP client with capability-based authorization and audit logging.
    pub mcp: SecureMcpClient,
    /// The capability store for this session.
    pub capabilities: Arc<CapabilityStore>,
    /// The global Virtual File System mount.
    pub vfs: Arc<dyn Vfs>,
    /// Concrete reference to the [`OverlayVfs`] for commit/rollback operations.
    pub overlay_vfs: Arc<OverlayVfs>,
    /// Ephemeral upper directory for the overlay VFS. Kept alive for the
    /// kernel session lifetime; dropped on shutdown to discard uncommitted writes.
    _upper_dir: Arc<tempfile::TempDir>,
    /// The global physical root handle (cap-std) for the VFS.
    pub vfs_root_handle: DirHandle,
    /// The physical path the VFS is mounted to.
    pub workspace_root: PathBuf,
    /// The principal home resources directory (`~/.astrid/home/{principal}/`).
    /// Capsules declaring `fs_read = ["home://"]` can read files under this
    /// root. Scoped to the principal's home so that keys, databases, and
    /// system config in `~/.astrid/` are NOT accessible.
    ///
    /// Always `Some` in production (boot requires `AstridHome`). Remains
    /// `Option` for compatibility with `CapsuleContext` and test fixtures.
    pub home_root: Option<PathBuf>,
    /// The natively bound Unix Socket for the CLI proxy.
    pub cli_socket_listener: Option<Arc<tokio::sync::Mutex<tokio::net::UnixListener>>>,
    /// Shared KV store backing all capsule-scoped stores and kernel state.
    pub kv: Arc<astrid_storage::SurrealKvStore>,
    /// Chain-linked cryptographic audit log with persistent storage.
    pub audit_log: Arc<AuditLog>,
    /// Number of active client connections (CLI sessions).
    /// Maintained by [`Kernel::connection_opened`] / [`Kernel::connection_closed`].
    pub active_connections: AtomicUsize,
    /// Ephemeral mode: shut down immediately when the last client disconnects.
    pub ephemeral: AtomicBool,
    /// Instant when the kernel was booted (for uptime calculation).
    pub boot_time: std::time::Instant,
    /// Sender for the API-initiated shutdown signal. The daemon's main loop
    /// selects on the receiver to exit gracefully without `process::exit`.
    pub shutdown_tx: tokio::sync::watch::Sender<bool>,
    /// Session token for socket authentication. Generated at boot, written to
    /// `~/.astrid/run/system.token`. CLI sends this as its first message.
    pub session_token: Arc<astrid_core::session_token::SessionToken>,
    /// Path where the session token was written at boot. Stored so shutdown
    /// uses the exact same path (avoids fallback mismatch if env changes).
    token_path: PathBuf,
    /// Shared allowance store for capsule-level approval decisions.
    ///
    /// Capsules can check existing allowances and create new ones when
    /// users approve actions with session/always scope.
    pub allowance_store: Arc<astrid_approval::AllowanceStore>,
    /// System-wide identity store for platform user resolution.
    identity_store: Arc<dyn astrid_storage::IdentityStore>,
}
92
93impl Kernel {
    /// Boot a new Kernel instance mounted at the specified directory.
    ///
    /// Boot order matters: the KV store opens first (the capability store and
    /// identity store are backed by it), then the MCP security layer and audit
    /// log, then the sandboxed overlay VFS, then the Unix socket + session
    /// token, and finally the background router/monitor tasks.
    ///
    /// # Panics
    ///
    /// Panics if called on a single-threaded tokio runtime. The capsule
    /// system uses `block_in_place` which requires a multi-threaded runtime.
    ///
    /// # Errors
    ///
    /// Returns an error if the Astrid home cannot be resolved, the KV store,
    /// capability store, audit log, or identity store fail to open, the VFS
    /// mount paths cannot be registered, or the session socket/token cannot
    /// be created.
    pub async fn new(
        session_id: SessionId,
        workspace_root: PathBuf,
    ) -> Result<Arc<Self>, std::io::Error> {
        use astrid_core::dirs::AstridHome;

        assert!(
            tokio::runtime::Handle::current().runtime_flavor()
                == tokio::runtime::RuntimeFlavor::MultiThread,
            "Kernel requires a multi-threaded tokio runtime (block_in_place panics on \
             single-threaded). Use #[tokio::main] or Runtime::new() instead of current_thread."
        );

        let event_bus = Arc::new(EventBus::new());
        let capsules = Arc::new(RwLock::new(CapsuleRegistry::new()));

        // Resolve the Astrid home directory. Required for persistent KV store
        // and audit log. Fails boot if neither $ASTRID_HOME nor $HOME is set.
        let home = AstridHome::resolve().map_err(|e| {
            std::io::Error::other(format!(
                "Failed to resolve Astrid home (set $ASTRID_HOME or $HOME): {e}"
            ))
        })?;

        // Resolve the home directory for the `home://` VFS scheme.
        // Points to `~/.astrid/home/{principal}/` — NOT the full `~/.astrid/`
        // root — so capsules cannot access keys, databases, or config.
        let default_principal = astrid_core::PrincipalId::default();
        let principal_home = home.principal_home(&default_principal);
        let home_root = Some(principal_home.root().to_path_buf());

        // 1. Open the persistent KV store (needed by capability store below).
        let kv_path = home.state_db_path();
        let kv = Arc::new(
            astrid_storage::SurrealKvStore::open(&kv_path)
                .map_err(|e| std::io::Error::other(format!("Failed to open KV store: {e}")))?,
        );
        // TODO: clear ephemeral keys (e: prefix) on boot when the key
        // lifecycle tier convention is established.

        // 2. Initialize MCP process manager with security layer.
        //    Set workspace_root so sandboxed MCP servers have a writable directory.
        // NOTE(review): a failed config load silently falls back to defaults —
        // consider logging the load error so misconfiguration is visible.
        let mcp_config = ServersConfig::load_default().unwrap_or_default();
        let mcp_manager = ServerManager::new(mcp_config)
            .with_workspace_root(workspace_root.clone())
            .with_capsule_log_dir(principal_home.log_dir());
        let mcp_client = McpClient::new(mcp_manager);

        // 3. Bootstrap capability store (persistent) and audit log.
        //    Key rotation invalidates persisted tokens (fail-secure by design).
        let capabilities = Arc::new(
            CapabilityStore::with_kv_store(Arc::clone(&kv) as Arc<dyn astrid_storage::KvStore>)
                .map_err(|e| {
                    std::io::Error::other(format!("Failed to init capability store: {e}"))
                })?,
        );
        let audit_log = open_audit_log()?;
        let mcp = SecureMcpClient::new(
            mcp_client,
            Arc::clone(&capabilities),
            Arc::clone(&audit_log),
            session_id.clone(),
        );

        // 4. Establish the physical security boundary (sandbox handle)
        let root_handle = DirHandle::new();

        // 5. Initialize sandboxed overlay VFS (lower=workspace, upper=temp)
        let (overlay_vfs, upper_temp) = init_overlay_vfs(&root_handle, &workspace_root).await?;

        // 6. Bind the secure Unix socket and generate session token.
        // The socket is bound here, but not yet listened on. The token is
        // generated before any capsule can accept connections, preventing
        // a race where a client connects before the token file exists.
        let listener = socket::bind_session_socket()?;
        let (session_token, token_path) = socket::generate_session_token()?;

        let allowance_store = Arc::new(astrid_approval::AllowanceStore::new());

        // Create system-wide identity store backed by the shared KV.
        let identity_kv = astrid_storage::ScopedKvStore::new(
            Arc::clone(&kv) as Arc<dyn astrid_storage::KvStore>,
            "system:identity",
        )
        .map_err(|e| std::io::Error::other(format!("Failed to create identity KV: {e}")))?;
        let identity_store: Arc<dyn astrid_storage::IdentityStore> =
            Arc::new(astrid_storage::KvIdentityStore::new(identity_kv));

        // Bootstrap the CLI root user (idempotent).
        bootstrap_cli_root_user(&identity_store)
            .await
            .map_err(|e| {
                std::io::Error::other(format!("Failed to bootstrap CLI root user: {e}"))
            })?;

        // Apply pre-configured identity links from config.
        apply_identity_config(&identity_store, &workspace_root).await;

        let kernel = Arc::new(Self {
            session_id,
            event_bus,
            capsules,
            mcp,
            capabilities,
            vfs: Arc::clone(&overlay_vfs) as Arc<dyn Vfs>,
            overlay_vfs,
            _upper_dir: Arc::new(upper_temp),
            vfs_root_handle: root_handle,
            workspace_root,
            home_root,
            cli_socket_listener: Some(Arc::new(tokio::sync::Mutex::new(listener))),
            kv,
            audit_log,
            active_connections: AtomicUsize::new(0),
            ephemeral: AtomicBool::new(false),
            boot_time: std::time::Instant::now(),
            // The initial receiver is dropped; listeners subscribe later via
            // `shutdown_tx.subscribe()`.
            shutdown_tx: tokio::sync::watch::channel(false).0,
            session_token: Arc::new(session_token),
            token_path,
            allowance_store,
            identity_store,
        });

        // Background tasks: the JoinHandles are intentionally dropped — these
        // tasks run for the kernel's lifetime and die with the runtime.
        drop(kernel_router::spawn_kernel_router(Arc::clone(&kernel)));
        drop(spawn_idle_monitor(Arc::clone(&kernel)));
        drop(spawn_react_watchdog(Arc::clone(&kernel.event_bus)));
        drop(spawn_capsule_health_monitor(Arc::clone(&kernel)));

        // Spawn the event dispatcher — routes EventBus events to capsule interceptors.
        // Wire the identity store so auto-provisioning is gated.
        let dispatcher = astrid_capsule::dispatcher::EventDispatcher::new(
            Arc::clone(&kernel.capsules),
            Arc::clone(&kernel.event_bus),
        )
        .with_identity_store(Arc::clone(&kernel.identity_store));
        tokio::spawn(dispatcher.run());

        debug_assert_eq!(
            kernel.event_bus.subscriber_count(),
            INTERNAL_SUBSCRIBER_COUNT,
            "INTERNAL_SUBSCRIBER_COUNT is stale; update it when adding permanent subscribers"
        );

        Ok(kernel)
    }
249
    /// Load a capsule into the Kernel from a directory containing a Capsule.toml
    ///
    /// # Errors
    ///
    /// Returns an error if the manifest cannot be loaded, the capsule cannot be created, or registration fails.
    async fn load_capsule(&self, dir: PathBuf) -> Result<(), anyhow::Error> {
        let manifest_path = dir.join("Capsule.toml");
        let manifest = astrid_capsule::discovery::load_manifest(&manifest_path)
            .map_err(|e| anyhow::anyhow!(e))?;

        // Skip if already registered (prevents double-load from overlapping
        // discovery paths like principal home + workspace capsules).
        {
            let registry = self.capsules.read().await;
            let id = astrid_capsule::capsule::CapsuleId::from_static(&manifest.package.name);
            if registry.get(&id).is_some() {
                return Ok(());
            }
        }

        let loader = astrid_capsule::loader::CapsuleLoader::new(self.mcp.clone());
        let mut capsule = loader.create_capsule(manifest, dir.clone())?;

        // Build the context — use the shared kernel KV so capsules can
        // communicate state through overlapping KV namespaces.
        let principal = astrid_core::PrincipalId::default();
        let kv = astrid_storage::ScopedKvStore::new(
            Arc::clone(&self.kv) as Arc<dyn astrid_storage::KvStore>,
            format!("{principal}:capsule:{}", capsule.id()),
        )?;

        // Pre-load env config into the KV store.
        // Check principal config first, fall back to capsule dir's .env.json.
        // NOTE(review): `exists()` and `read_to_string` below are blocking
        // std::fs calls inside an async fn — acceptable for small config
        // files, but confirm this never runs on a latency-sensitive path.
        let capsule_name = capsule.id().to_string();
        let env_path = if let Ok(home) = astrid_core::dirs::AstridHome::resolve() {
            let ph = home.principal_home(&principal);
            let principal_env = ph.env_dir().join(format!("{capsule_name}.env.json"));
            if principal_env.exists() {
                principal_env
            } else {
                dir.join(".env.json")
            }
        } else {
            dir.join(".env.json")
        };
        if env_path.exists()
            && let Ok(contents) = std::fs::read_to_string(&env_path)
            && let Ok(env_map) =
                serde_json::from_str::<std::collections::HashMap<String, String>>(&contents)
        {
            // Best-effort: a single failed set must not abort the capsule load.
            for (k, v) in env_map {
                let _ = kv.set(&k, v.into_bytes()).await;
            }
        }

        let ctx = astrid_capsule::context::CapsuleContext::new(
            principal.clone(),
            self.workspace_root.clone(),
            self.home_root.clone(),
            kv,
            Arc::clone(&self.event_bus),
            self.cli_socket_listener.clone(),
        )
        .with_registry(Arc::clone(&self.capsules))
        .with_session_token(Arc::clone(&self.session_token))
        .with_allowance_store(Arc::clone(&self.allowance_store))
        .with_identity_store(Arc::clone(&self.identity_store));

        capsule.load(&ctx).await?;

        let mut registry = self.capsules.write().await;
        registry
            .register(capsule)
            .map_err(|e| anyhow::anyhow!("Failed to register capsule: {e}"))?;

        Ok(())
    }
327
    /// Restart a capsule by unloading it and re-loading from its source directory.
    ///
    /// # Errors
    ///
    /// Returns an error if the capsule has no source directory, cannot be
    /// unregistered, or fails to reload.
    async fn restart_capsule(
        &self,
        id: &astrid_capsule::capsule::CapsuleId,
    ) -> Result<(), anyhow::Error> {
        // Get source directory before unregistering.
        let source_dir = {
            let registry = self.capsules.read().await;
            let capsule = registry
                .get(id)
                .ok_or_else(|| anyhow::anyhow!("capsule '{id}' not found in registry"))?;
            capsule
                .source_dir()
                .map(std::path::Path::to_path_buf)
                .ok_or_else(|| anyhow::anyhow!("capsule '{id}' has no source directory"))?
        };

        // Remove the capsule from the registry under a short write lock.
        let old_capsule = {
            let mut registry = self.capsules.write().await;
            registry
                .unregister(id)
                .map_err(|e| anyhow::anyhow!("failed to unregister capsule '{id}': {e}"))?
        };
        // Explicitly unload the old capsule. There is no Drop impl that
        // calls unload() (it's async), so we must do it here to avoid
        // leaking MCP subprocesses and other engine resources.
        // Arc::get_mut requires exclusive ownership (strong_count == 1).
        {
            let mut old = old_capsule;
            if let Some(capsule) = std::sync::Arc::get_mut(&mut old) {
                if let Err(e) = capsule.unload().await {
                    tracing::warn!(
                        capsule_id = %id,
                        error = %e,
                        "Capsule unload failed during restart"
                    );
                }
            } else {
                tracing::warn!(
                    capsule_id = %id,
                    "Cannot call unload during restart - Arc still held by in-flight task"
                );
            }
        }

        // Re-load from disk.
        self.load_capsule(source_dir).await?;

        // Signal the newly loaded capsule to clean up ephemeral state
        // from the previous incarnation. Capsules that don't implement
        // `handle_lifecycle_restart` will return an error, which is fine.
        //
        // Clone the capsule Arc under a brief read lock, then drop the
        // guard before invoke_interceptor which calls block_in_place.
        // Holding the RwLock across block_in_place parks the worker thread
        // and starves registry writers (health monitor, capsule loading).
        let capsule = {
            let registry = self.capsules.read().await;
            registry.get(id)
        };
        if let Some(capsule) = capsule
            && let Err(e) = capsule.invoke_interceptor("handle_lifecycle_restart", &[], None)
        {
            tracing::debug!(
                capsule_id = %id,
                error = %e,
                "Capsule does not handle lifecycle restart (optional)"
            );
        }

        Ok(())
    }
408
409    /// Auto-discover and load all capsules from the standard directories (`~/.astrid/capsules` and `.astrid/capsules`).
410    ///
411    /// Capsules are loaded in dependency order (topological sort) with
412    /// uplink/daemon capsules loaded first. Each uplink must signal
413    /// readiness before non-uplink capsules are loaded.
414    ///
415    /// After all capsules are loaded, tool schemas are injected into every
416    /// capsule's KV namespace and the `astrid.v1.capsules_loaded` event is published.
417    pub async fn load_all_capsules(&self) {
418        use astrid_capsule::toposort::toposort_manifests;
419        use astrid_core::dirs::AstridHome;
420
421        // Discovery paths in priority order: principal > workspace.
422        let mut paths = Vec::new();
423        if let Ok(home) = AstridHome::resolve() {
424            let principal = astrid_core::PrincipalId::default();
425            paths.push(home.principal_home(&principal).capsules_dir());
426        }
427
428        let discovered = astrid_capsule::discovery::discover_manifests(Some(&paths));
429
430        // Topological sort ALL capsules together so cross-partition
431        // requirements (e.g. a non-uplink requiring an uplink's capability)
432        // resolve correctly without spurious "not provided" warnings.
433        let sorted = match toposort_manifests(discovered) {
434            Ok(sorted) => sorted,
435            Err((e, original)) => {
436                tracing::error!(
437                    cycle = %e,
438                    "Dependency cycle in capsules, falling back to discovery order"
439                );
440                original
441            },
442        };
443
444        // Defence-in-depth: manifest validation in discovery.rs rejects
445        // uplinks with [imports], but warn here in case a manifest bypasses
446        // the normal load path.
447        for (manifest, _) in &sorted {
448            if manifest.capabilities.uplink && manifest.has_imports() {
449                tracing::warn!(
450                    capsule = %manifest.package.name,
451                    "Uplink capsule has [imports] - \
452                     this should have been rejected at manifest load time"
453                );
454            }
455        }
456
457        // Validate imports/exports: every required import must have a matching export.
458        validate_imports_exports(&sorted);
459
460        // Partition after sorting: uplinks first, then the rest.
461        // The relative order within each partition is preserved from the
462        // toposort, so dependency edges are still respected. Cross-partition
463        // edges (non-uplink requiring an uplink) are satisfied by construction
464        // since all uplinks load first. The inverse (uplink requiring a
465        // non-uplink) is rejected above.
466        let (uplinks, others): (Vec<_>, Vec<_>) =
467            sorted.into_iter().partition(|(m, _)| m.capabilities.uplink);
468
469        // Load uplinks first so their event bus subscriptions are ready.
470        let uplink_names: Vec<String> = uplinks
471            .iter()
472            .map(|(m, _)| m.package.name.clone())
473            .collect();
474        for (manifest, dir) in &uplinks {
475            if let Err(e) = self.load_capsule(dir.clone()).await {
476                tracing::warn!(
477                    capsule = %manifest.package.name,
478                    error = %e,
479                    "Failed to load uplink capsule during discovery"
480                );
481            }
482        }
483
484        // Wait for uplink capsules to signal readiness before loading
485        // non-uplink capsules. This ensures IPC subscriptions are active.
486        self.await_capsule_readiness(&uplink_names).await;
487
488        for (manifest, dir) in &others {
489            if let Err(e) = self.load_capsule(dir.clone()).await {
490                tracing::warn!(
491                    capsule = %manifest.package.name,
492                    error = %e,
493                    "Failed to load capsule during discovery"
494                );
495            }
496        }
497
498        // Wait for non-uplink run-loop capsules too, so any future
499        // dependency edges between them are respected.
500        let other_names: Vec<String> = others.iter().map(|(m, _)| m.package.name.clone()).collect();
501        self.await_capsule_readiness(&other_names).await;
502
503        // Signal that all capsules have been loaded so uplink capsules
504        // (like the registry) can proceed with discovery instead of
505        // polling with arbitrary timeouts.
506        let msg = astrid_events::ipc::IpcMessage::new(
507            "astrid.v1.capsules_loaded",
508            astrid_events::ipc::IpcPayload::RawJson(serde_json::json!({"status": "ready"})),
509            self.session_id.0,
510        );
511        let _ = self.event_bus.publish(astrid_events::AstridEvent::Ipc {
512            metadata: astrid_events::EventMetadata::new("kernel"),
513            message: msg,
514        });
515    }
516
517    /// Record that a new client connection has been established.
518    pub fn connection_opened(&self) {
519        self.active_connections.fetch_add(1, Ordering::Relaxed);
520    }
521
522    /// Record that a client connection has been closed.
523    ///
524    /// Uses `fetch_update` for atomic saturating decrement - avoids the TOCTOU
525    /// window where `fetch_sub` wraps to `usize::MAX` before a corrective store.
526    ///
527    /// When the last connection closes (counter reaches 0), clears all
528    /// session-scoped allowances so they don't leak into the next CLI session.
529    pub fn connection_closed(&self) {
530        let result =
531            self.active_connections
532                .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |n| {
533                    if n == 0 {
534                        None
535                    } else {
536                        Some(n.saturating_sub(1))
537                    }
538                });
539
540        // Previous value was 1 -> now 0: last client disconnected.
541        // Clear session-scoped allowances so they don't leak into the next session.
542        if result == Ok(1) {
543            self.allowance_store.clear_session_allowances();
544            tracing::info!("last client disconnected, session allowances cleared");
545        }
546    }
547
    /// Enable or disable ephemeral mode (immediate shutdown on last disconnect).
    pub fn set_ephemeral(&self, val: bool) {
        // Relaxed: the flag is advisory and carries no other memory with it.
        self.ephemeral.store(val, Ordering::Relaxed);
    }
552
    /// Number of active client connections.
    ///
    /// This is a point-in-time snapshot; it may be stale by the time the
    /// caller acts on it.
    pub fn connection_count(&self) -> usize {
        self.active_connections.load(Ordering::Relaxed)
    }
557
    /// Gracefully shut down the kernel.
    ///
    /// 1. Publish `KernelShutdown` event on the bus.
    /// 2. Drain and unload all capsules (stops MCP child processes, WASM engines).
    /// 3. Flush and close the persistent KV store.
    /// 4. Remove the Unix socket file.
    pub async fn shutdown(&self, reason: Option<String>) {
        tracing::info!(reason = ?reason, "Kernel shutting down");

        // 1. Notify all subscribers so capsules can react.
        let _ = self
            .event_bus
            .publish(astrid_events::AstridEvent::KernelShutdown {
                metadata: astrid_events::EventMetadata::new("kernel"),
                reason: reason.clone(),
            });

        // 2. Drain the registry so the dispatcher cannot hand out new Arc clones,
        // then unload each capsule. MCP engine unload is critical - it calls
        // `mcp_client.disconnect()` to gracefully terminate child processes.
        // Without explicit unload, MCP child processes become orphaned.
        //
        // The `EventDispatcher` temporarily clones `Arc<dyn Capsule>` into
        // spawned interceptor tasks. After draining, no new clones can be
        // created, but in-flight tasks may still hold references. We retry
        // `Arc::get_mut` with brief yields to let them complete.
        let capsules = {
            let mut reg = self.capsules.write().await;
            reg.drain()
        };
        for mut arc in capsules {
            let id = arc.id().clone();
            let mut unloaded = false;

            // Retry budget: 20 attempts × 50 ms ≈ 1 s per capsule.
            for retry in 0..20_u32 {
                if let Some(capsule) = Arc::get_mut(&mut arc) {
                    if let Err(e) = capsule.unload().await {
                        tracing::warn!(
                            capsule_id = %id,
                            error = %e,
                            "Failed to unload capsule during shutdown"
                        );
                    }
                    unloaded = true;
                    break;
                }
                if retry < 19 {
                    tokio::time::sleep(std::time::Duration::from_millis(50)).await;
                }
            }

            if !unloaded {
                tracing::warn!(
                    capsule_id = %id,
                    strong_count = Arc::strong_count(&arc),
                    "Dropping capsule without explicit unload after retries exhausted; \
                     MCP child processes may be orphaned"
                );
            }
            drop(arc);
        }

        // 3. Flush the persistent KV store.
        if let Err(e) = self.kv.close().await {
            tracing::warn!(error = %e, "Failed to flush KV store during shutdown");
        }

        // 4. Remove the socket and token files so stale-socket detection works
        // on next boot and the auth token doesn't persist on disk after shutdown.
        // This runs AFTER capsule unload, which is the correct order: MCP child
        // processes communicate via stdio pipes (not this Unix socket), so they
        // are already terminated by step 2. The socket is only used for
        // CLI-to-kernel IPC.
        let socket_path = crate::socket::kernel_socket_path();
        let _ = std::fs::remove_file(&socket_path);
        let _ = std::fs::remove_file(&self.token_path);
        crate::socket::remove_readiness_file();

        tracing::info!("Kernel shutdown complete");
    }
638
639    /// Wait for a set of capsules to signal readiness, in parallel.
640    ///
641    /// Collects `Arc<dyn Capsule>` handles under a short-lived read lock,
642    /// then drops the lock before awaiting. Capsules without a run loop
643    /// return `Ready` immediately and don't contribute to wait time.
644    async fn await_capsule_readiness(&self, names: &[String]) {
645        use astrid_capsule::capsule::ReadyStatus;
646
647        if names.is_empty() {
648            return;
649        }
650
651        let timeout = std::time::Duration::from_millis(500);
652        let capsules: Vec<(String, std::sync::Arc<dyn astrid_capsule::capsule::Capsule>)> = {
653            let registry = self.capsules.read().await;
654            names
655                .iter()
656                .filter_map(
657                    |name| match astrid_capsule::capsule::CapsuleId::new(name.clone()) {
658                        Ok(capsule_id) => registry.get(&capsule_id).map(|c| (name.clone(), c)),
659                        Err(e) => {
660                            tracing::warn!(
661                                capsule = %name,
662                                error = %e,
663                                "Invalid capsule ID, skipping readiness wait"
664                            );
665                            None
666                        },
667                    },
668                )
669                .collect()
670        };
671
672        // Await all capsules concurrently - independent capsules shouldn't
673        // compound each other's timeout.
674        let mut set = tokio::task::JoinSet::new();
675        for (name, capsule) in capsules {
676            set.spawn(async move {
677                let status = capsule.wait_ready(timeout).await;
678                (name, status)
679            });
680        }
681        while let Some(result) = set.join_next().await {
682            if let Ok((name, status)) = result {
683                match status {
684                    ReadyStatus::Ready => {},
685                    ReadyStatus::Timeout => {
686                        tracing::warn!(
687                            capsule = %name,
688                            timeout_ms = timeout.as_millis(),
689                            "Capsule did not signal ready within timeout"
690                        );
691                    },
692                    ReadyStatus::Crashed => {
693                        tracing::error!(
694                            capsule = %name,
695                            "Capsule run loop exited before signaling ready"
696                        );
697                    },
698                }
699            }
700        }
701    }
702}
703
704/// Open (or create) the persistent audit log and verify historical chain integrity.
705///
706/// Initialize the sandboxed overlay VFS.
707///
708/// Creates a lower (read-only workspace) and upper (session-scoped temp dir)
709/// layer, returning the overlay and the `TempDir` whose lifetime keeps the
710/// upper layer alive.
711async fn init_overlay_vfs(
712    root_handle: &DirHandle,
713    workspace_root: &Path,
714) -> Result<(Arc<OverlayVfs>, tempfile::TempDir), std::io::Error> {
715    let lower_vfs = HostVfs::new();
716    lower_vfs
717        .register_dir(root_handle.clone(), workspace_root.to_path_buf())
718        .await
719        .map_err(|_| std::io::Error::other("Failed to register lower vfs dir"))?;
720
721    let upper_temp = tempfile::TempDir::new()
722        .map_err(|e| std::io::Error::other(format!("Failed to create overlay temp dir: {e}")))?;
723    let upper_vfs = HostVfs::new();
724    upper_vfs
725        .register_dir(root_handle.clone(), upper_temp.path().to_path_buf())
726        .await
727        .map_err(|_| std::io::Error::other("Failed to register upper vfs dir"))?;
728
729    let overlay = Arc::new(OverlayVfs::new(Box::new(lower_vfs), Box::new(upper_vfs)));
730    Ok((overlay, upper_temp))
731}
732
/// Open (or create) the persistent audit log and verify historical chain integrity.
///
/// Loads the runtime signing key from `~/.astrid/keys/runtime.key`, generating a
/// new one if it doesn't exist. Opens the `SurrealKV`-backed audit database at
/// `~/.astrid/audit.db` and runs `verify_all()` to detect any tampering of
/// historical entries. Verification failures are logged at `error!` level but
/// do not block boot (fail-open for availability, loud alert for integrity).
fn open_audit_log() -> std::io::Result<Arc<AuditLog>> {
    use astrid_core::dirs::AstridHome;

    // Resolve and materialize the Astrid home layout before touching keys or DB.
    let home = AstridHome::resolve()
        .map_err(|e| std::io::Error::other(format!("cannot resolve Astrid home: {e}")))?;
    home.ensure()
        .map_err(|e| std::io::Error::other(format!("cannot create Astrid home dirs: {e}")))?;

    let runtime_key = load_or_generate_runtime_key(&home.keys_dir())?;
    // The audit log is scoped to the default principal's home directory.
    let default_principal = astrid_core::PrincipalId::default();
    let principal_home = home.principal_home(&default_principal);
    principal_home
        .ensure()
        .map_err(|e| std::io::Error::other(format!("cannot create principal home dirs: {e}")))?;
    let audit_log = AuditLog::open(principal_home.audit_dir(), runtime_key)
        .map_err(|e| std::io::Error::other(format!("cannot open audit log: {e}")))?;

    // Verify all historical chains on boot.
    match audit_log.verify_all() {
        Ok(results) => {
            let total_sessions = results.len();
            let mut tampered_sessions: usize = 0;

            // One error line per issue, so each violation is individually visible.
            for (session_id, result) in &results {
                if !result.valid {
                    tampered_sessions = tampered_sessions.saturating_add(1);
                    for issue in &result.issues {
                        tracing::error!(
                            session_id = %session_id,
                            issue = %issue,
                            "Audit chain integrity violation detected"
                        );
                    }
                }
            }

            if tampered_sessions > 0 {
                tracing::error!(
                    total_sessions,
                    tampered_sessions,
                    "Audit chain verification found tampered sessions"
                );
            } else if total_sessions > 0 {
                tracing::info!(
                    total_sessions,
                    "Audit chain verification passed for all sessions"
                );
            }
        },
        Err(e) => {
            // Fail-open: verification trouble is loud but never blocks boot.
            tracing::error!(error = %e, "Audit chain verification failed to run");
        },
    }

    Ok(Arc::new(audit_log))
}
794
795/// Load the runtime ed25519 signing key from disk, or generate and persist a new one.
796///
797/// The key file is 32 bytes of raw secret key material at `{keys_dir}/runtime.key`.
798fn load_or_generate_runtime_key(keys_dir: &Path) -> std::io::Result<KeyPair> {
799    let key_path = keys_dir.join("runtime.key");
800
801    if key_path.exists() {
802        let bytes = std::fs::read(&key_path)?;
803        KeyPair::from_secret_key(&bytes).map_err(|e| {
804            std::io::Error::other(format!(
805                "invalid runtime key at {}: {e}",
806                key_path.display()
807            ))
808        })
809    } else {
810        let keypair = KeyPair::generate();
811        std::fs::create_dir_all(keys_dir)?;
812        std::fs::write(&key_path, keypair.secret_key_bytes())?;
813
814        // Secure permissions (owner-only) on Unix.
815        #[cfg(unix)]
816        {
817            use std::os::unix::fs::PermissionsExt;
818            std::fs::set_permissions(&key_path, std::fs::Permissions::from_mode(0o600))?;
819        }
820
821        tracing::info!(key_id = %keypair.key_id_hex(), "Generated new runtime signing key");
822        Ok(keypair)
823    }
824}
825
// ---------------------------------------------------------------------------
// Tuning constants for the kernel idle monitor (`spawn_idle_monitor` below).
// The persistent-mode timeout can be overridden at runtime via the
// `ASTRID_IDLE_TIMEOUT_SECS` environment variable.
// ---------------------------------------------------------------------------

/// Number of permanent internal event bus subscribers that are not client
/// connections: `KernelRouter` (`kernel.request.*`), `ConnectionTracker` (`client.*`),
/// and `EventDispatcher` (all events).
const INTERNAL_SUBSCRIBER_COUNT: usize = 3;

/// Initial grace period before idle checking begins.
const IDLE_INITIAL_GRACE: std::time::Duration = std::time::Duration::from_secs(5);
/// Additional grace for non-ephemeral daemons to let capsules fully initialize.
const IDLE_NON_EPHEMERAL_GRACE: std::time::Duration = std::time::Duration::from_secs(25);
/// How often the idle monitor polls when running in ephemeral mode.
const IDLE_EPHEMERAL_CHECK_INTERVAL: std::time::Duration = std::time::Duration::from_secs(1);
/// How often the idle monitor polls when running in persistent mode.
const IDLE_CHECK_INTERVAL: std::time::Duration = std::time::Duration::from_secs(15);
/// Default idle timeout for non-ephemeral daemons (5 minutes).
///
/// `from_secs(5 * 60)` is used instead of `Duration::from_mins` to stay on
/// long-stable std API.
const IDLE_DEFAULT_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(5 * 60);
853
854fn spawn_idle_monitor(kernel: Arc<Kernel>) -> tokio::task::JoinHandle<()> {
855    tokio::spawn(async move {
856        // Initial grace period — wait for capsules to boot and first client
857        // to connect before checking idle status.
858        tokio::time::sleep(IDLE_INITIAL_GRACE).await;
859
860        // Read ephemeral flag after grace period (set by daemon after boot).
861        let ephemeral = kernel.ephemeral.load(Ordering::Relaxed);
862        let idle_timeout = if ephemeral {
863            // Give the CLI time to reconnect after brief disconnects (e.g.
864            // during tool execution when the TUI might momentarily drop
865            // the socket). Zero timeout caused premature shutdowns.
866            std::time::Duration::from_secs(30)
867        } else {
868            std::env::var("ASTRID_IDLE_TIMEOUT_SECS")
869                .ok()
870                .and_then(|v| v.parse().ok())
871                .map_or(IDLE_DEFAULT_TIMEOUT, std::time::Duration::from_secs)
872        };
873        let check_interval = if ephemeral {
874            IDLE_EPHEMERAL_CHECK_INTERVAL
875        } else {
876            IDLE_CHECK_INTERVAL
877        };
878
879        // Non-ephemeral: additional grace to let capsules fully initialize.
880        if !ephemeral {
881            tokio::time::sleep(IDLE_NON_EPHEMERAL_GRACE).await;
882        }
883        let mut idle_since: Option<tokio::time::Instant> = None;
884
885        loop {
886            tokio::time::sleep(check_interval).await;
887
888            let connections = kernel.connection_count();
889
890            // Use the explicit connection counter as the sole signal.
891            // The previous bus_subscribers heuristic (subscriber_count minus
892            // internal subscribers) was fragile: capsule run-loop crashes
893            // reduce subscriber_count, causing false "0 connections" readings
894            // that trigger premature idle shutdown while a client is active.
895            let effective_connections = connections;
896
897            let has_daemons = {
898                let reg = kernel.capsules.read().await;
899                reg.values().any(|c| {
900                    let m = c.manifest();
901                    !m.uplinks.is_empty()
902                })
903            };
904
905            if effective_connections == 0 && !has_daemons {
906                let now = tokio::time::Instant::now();
907                let start = *idle_since.get_or_insert(now);
908                let elapsed = now.duration_since(start);
909
910                tracing::debug!(
911                    idle_secs = elapsed.as_secs(),
912                    timeout_secs = idle_timeout.as_secs(),
913                    connections,
914                    "Kernel idle, monitoring timeout"
915                );
916
917                if elapsed >= idle_timeout {
918                    tracing::info!("Idle timeout reached, initiating shutdown");
919                    kernel.shutdown(Some("idle_timeout".to_string())).await;
920                    std::process::exit(0);
921                }
922            } else {
923                if idle_since.is_some() {
924                    tracing::debug!(
925                        effective_connections,
926                        has_daemons,
927                        "Activity detected, resetting idle timer"
928                    );
929                }
930                idle_since = None;
931            }
932        }
933    })
934}
935
/// Tracks restart attempts for a single capsule with exponential backoff.
struct RestartTracker {
    /// Restart attempts recorded so far.
    attempts: u32,
    /// When the most recent attempt was recorded (or when the tracker was created).
    last_attempt: std::time::Instant,
    /// Delay that must elapse after `last_attempt` before the next attempt.
    backoff: std::time::Duration,
}

impl RestartTracker {
    const MAX_ATTEMPTS: u32 = 5;
    const INITIAL_BACKOFF: std::time::Duration = std::time::Duration::from_secs(2);
    const MAX_BACKOFF: std::time::Duration = std::time::Duration::from_secs(120);

    /// Fresh tracker: zero attempts, clock started now, minimum backoff.
    fn new() -> Self {
        Self {
            attempts: 0,
            last_attempt: std::time::Instant::now(),
            backoff: Self::INITIAL_BACKOFF,
        }
    }

    /// Returns `true` if a restart should be attempted now.
    fn should_restart(&self) -> bool {
        let backoff_elapsed = self.last_attempt.elapsed() >= self.backoff;
        !self.exhausted() && backoff_elapsed
    }

    /// Record a restart attempt and advance the backoff.
    fn record_attempt(&mut self) {
        self.last_attempt = std::time::Instant::now();
        self.attempts = self.attempts.saturating_add(1);
        // Double the delay, capped at the maximum.
        self.backoff = Self::MAX_BACKOFF.min(self.backoff.saturating_mul(2));
    }

    /// Returns `true` if all retry attempts have been exhausted.
    fn exhausted(&self) -> bool {
        self.attempts >= Self::MAX_ATTEMPTS
    }
}
973
974/// Attempts to restart a failed capsule, respecting backoff and max retries.
975///
976/// Returns `true` if the tracker should be removed (successful restart).
977async fn attempt_capsule_restart(
978    kernel: &Kernel,
979    id_str: &str,
980    tracker: &mut RestartTracker,
981) -> bool {
982    if tracker.exhausted() {
983        return false;
984    }
985
986    if !tracker.should_restart() {
987        tracing::debug!(
988            capsule_id = %id_str,
989            next_attempt_in = ?tracker.backoff.saturating_sub(tracker.last_attempt.elapsed()),
990            "Waiting for backoff before next restart attempt"
991        );
992        return false;
993    }
994
995    tracker.record_attempt();
996    let attempt = tracker.attempts;
997
998    tracing::warn!(
999        capsule_id = %id_str,
1000        attempt,
1001        max_attempts = RestartTracker::MAX_ATTEMPTS,
1002        "Attempting capsule restart"
1003    );
1004
1005    let capsule_id = astrid_capsule::capsule::CapsuleId::from_static(id_str);
1006    match kernel.restart_capsule(&capsule_id).await {
1007        Ok(()) => {
1008            tracing::info!(capsule_id = %id_str, attempt, "Capsule restarted successfully");
1009            true
1010        },
1011        Err(e) => {
1012            tracing::error!(capsule_id = %id_str, attempt, error = %e, "Capsule restart failed");
1013            if tracker.exhausted() {
1014                tracing::error!(
1015                    capsule_id = %id_str,
1016                    "All restart attempts exhausted - capsule will remain down"
1017                );
1018            }
1019            false
1020        },
1021    }
1022}
1023
/// Spawns a background task that periodically probes capsule health.
///
/// Every 10 seconds, reads the capsule registry and calls `check_health()` on
/// each capsule that is currently in `Ready` state. If a capsule reports
/// `Failed`, attempts to restart it with exponential backoff (max 5 attempts).
/// Publishes `astrid.v1.health.failed` IPC events for each detected failure.
fn spawn_capsule_health_monitor(kernel: Arc<Kernel>) -> tokio::task::JoinHandle<()> {
    tokio::spawn(async move {
        let mut interval = tokio::time::interval(std::time::Duration::from_secs(10));
        interval.tick().await; // Skip the first immediate tick.

        // Per-capsule restart state, keyed by capsule id string. Entries are
        // pruned below once a capsule recovers or restarts successfully.
        let mut restart_trackers: std::collections::HashMap<String, RestartTracker> =
            std::collections::HashMap::new();

        loop {
            interval.tick().await;

            // Collect ready capsules under a brief read lock, then drop
            // the lock before calling check_health() or publishing events.
            let ready_capsules: Vec<std::sync::Arc<dyn astrid_capsule::capsule::Capsule>> = {
                let registry = kernel.capsules.read().await;
                registry
                    .list()
                    .into_iter()
                    .filter_map(|id| {
                        let capsule = registry.get(id)?;
                        if capsule.state() == astrid_capsule::capsule::CapsuleState::Ready {
                            Some(capsule)
                        } else {
                            None
                        }
                    })
                    .collect()
            };

            // Probe health once per capsule, collect failures, then drop
            // the Arc Vec before restarting. This ensures restart_capsule's
            // Arc::get_mut can succeed (no other strong references held).
            let mut failures: Vec<(String, String)> = Vec::new();
            for capsule in &ready_capsules {
                let health = capsule.check_health();
                if let astrid_capsule::capsule::CapsuleState::Failed(reason) = health {
                    let id_str = capsule.id().to_string();
                    tracing::error!(capsule_id = %id_str, reason = %reason, "Capsule health check failed");

                    // Broadcast the failure so other components can observe it.
                    let msg = astrid_events::ipc::IpcMessage::new(
                        "astrid.v1.health.failed",
                        astrid_events::ipc::IpcPayload::Custom {
                            data: serde_json::json!({
                                "capsule_id": &id_str,
                                "reason": &reason,
                            }),
                        },
                        uuid::Uuid::new_v4(),
                    );
                    // Best-effort publish: a publish error must not stall the monitor.
                    let _ = kernel.event_bus.publish(astrid_events::AstridEvent::Ipc {
                        metadata: astrid_events::EventMetadata::new("kernel"),
                        message: msg,
                    });
                    failures.push((id_str, reason));
                }
            }

            // Drop all Arc clones so restart_capsule's Arc::get_mut can
            // obtain exclusive access for calling unload().
            drop(ready_capsules);

            // Ids that failed this tick — consulted when pruning trackers below.
            let failed_this_tick: std::collections::HashSet<&str> =
                failures.iter().map(|(id, _)| id.as_str()).collect();

            // Drive restart attempts, respecting each capsule's backoff window.
            let mut restarted = Vec::new();
            for (id_str, _reason) in &failures {
                let tracker = restart_trackers
                    .entry(id_str.clone())
                    .or_insert_with(RestartTracker::new);

                if attempt_capsule_restart(&kernel, id_str, tracker).await {
                    restarted.push(id_str.clone());
                }
            }

            // Remove trackers for successfully restarted capsules.
            for id in &restarted {
                restart_trackers.remove(id);
            }

            // Prune trackers for capsules that recovered (healthy this tick).
            // Keep exhausted trackers and trackers still in their backoff
            // window (capsule may have been unregistered by a failed restart
            // attempt and won't appear in ready_capsules next tick).
            restart_trackers.retain(|id, tracker| {
                if tracker.exhausted() {
                    return true;
                }
                // Keep if still within backoff - the capsule may be absent
                // from the registry after a failed reload.
                if tracker.last_attempt.elapsed() < tracker.backoff {
                    return true;
                }
                failed_this_tick.contains(id.as_str())
            });
        }
    })
}
1128
1129/// Spawns a periodic watchdog that publishes `astrid.v1.watchdog.tick` events every 5 seconds.
1130///
1131/// The `ReAct` capsule (WASM guest) cannot use async timers, so this kernel-side task
1132/// drives timeout enforcement by waking the capsule on a fixed interval. Each tick
1133/// causes the capsule's `handle_watchdog_tick` interceptor to run `check_phase_timeout`.
1134fn spawn_react_watchdog(event_bus: Arc<EventBus>) -> tokio::task::JoinHandle<()> {
1135    tokio::spawn(async move {
1136        let mut interval = tokio::time::interval(std::time::Duration::from_secs(5));
1137        // The first tick fires immediately - skip it to give capsules time to load.
1138        interval.tick().await;
1139
1140        loop {
1141            interval.tick().await;
1142
1143            let msg = astrid_events::ipc::IpcMessage::new(
1144                "astrid.v1.watchdog.tick",
1145                astrid_events::ipc::IpcPayload::Custom {
1146                    data: serde_json::json!({}),
1147                },
1148                uuid::Uuid::new_v4(),
1149            );
1150            let _ = event_bus.publish(astrid_events::AstridEvent::Ipc {
1151                metadata: astrid_events::EventMetadata::new("kernel"),
1152                message: msg,
1153            });
1154        }
1155    })
1156}
1157
#[cfg(test)]
mod tests {
    use super::*;

    // --- Runtime key management -------------------------------------------

    #[test]
    fn test_load_or_generate_creates_new_key() {
        let dir = tempfile::tempdir().unwrap();
        let keys_dir = dir.path().join("keys");

        let keypair = load_or_generate_runtime_key(&keys_dir).unwrap();
        let key_path = keys_dir.join("runtime.key");

        // Key file should exist with 32 bytes.
        assert!(key_path.exists());
        let bytes = std::fs::read(&key_path).unwrap();
        assert_eq!(bytes.len(), 32);

        // The written bytes should reconstruct the same public key.
        let reloaded = KeyPair::from_secret_key(&bytes).unwrap();
        assert_eq!(
            keypair.public_key_bytes(),
            reloaded.public_key_bytes(),
            "reloaded key should match generated key"
        );
    }

    #[test]
    fn test_load_or_generate_is_idempotent() {
        let dir = tempfile::tempdir().unwrap();
        let keys_dir = dir.path().join("keys");

        // Second call must take the load path and return the identical key.
        let first = load_or_generate_runtime_key(&keys_dir).unwrap();
        let second = load_or_generate_runtime_key(&keys_dir).unwrap();

        assert_eq!(
            first.public_key_bytes(),
            second.public_key_bytes(),
            "loading the same key file should produce the same keypair"
        );
    }

    #[test]
    fn test_load_or_generate_rejects_bad_key_length() {
        let dir = tempfile::tempdir().unwrap();
        let keys_dir = dir.path().join("keys");
        std::fs::create_dir_all(&keys_dir).unwrap();

        // Write a key file with wrong length.
        std::fs::write(keys_dir.join("runtime.key"), [0u8; 16]).unwrap();

        let result = load_or_generate_runtime_key(&keys_dir);
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(
            err.contains("invalid runtime key"),
            "expected 'invalid runtime key' error, got: {err}"
        );
    }

    // --- Connection accounting --------------------------------------------

    #[test]
    fn test_connection_counter_increment_decrement() {
        let counter = AtomicUsize::new(0);

        // Simulate connection_opened (fetch_add)
        counter.fetch_add(1, Ordering::Relaxed);
        counter.fetch_add(1, Ordering::Relaxed);
        assert_eq!(counter.load(Ordering::Relaxed), 2);

        // Simulate connection_closed using the same fetch_update logic
        // as the real implementation to exercise the actual code path.
        for expected in [1, 0] {
            let _ = counter.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |n| {
                if n == 0 {
                    None
                } else {
                    Some(n.saturating_sub(1))
                }
            });
            assert_eq!(counter.load(Ordering::Relaxed), expected);
        }
    }

    #[test]
    fn test_connection_counter_underflow_guard() {
        // Test the saturating behavior: decrementing from 0 should stay at 0.
        // Mirrors the fetch_update logic in connection_closed().
        let counter = AtomicUsize::new(0);

        let result = counter.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |n| {
            if n == 0 { None } else { Some(n - 1) }
        });
        // fetch_update returns Err(0) when the closure returns None (no-op).
        assert!(result.is_err());
        assert_eq!(counter.load(Ordering::Relaxed), 0);
    }

    /// Mirrors the `connection_closed()` logic: only `Ok(1)` (previous value 1,
    /// now 0) triggers `clear_session_allowances`. Update this test if
    /// `connection_closed()` is refactored.
    #[test]
    fn test_last_disconnect_clears_session_allowances() {
        use astrid_approval::AllowanceStore;
        use astrid_approval::allowance::{Allowance, AllowanceId, AllowancePattern};
        use astrid_core::types::Timestamp;
        use astrid_crypto::KeyPair;

        let store = AllowanceStore::new();
        let keypair = KeyPair::generate();

        // Session-only allowance (should be cleared on last disconnect).
        store
            .add_allowance(Allowance {
                id: AllowanceId::new(),
                action_pattern: AllowancePattern::ServerTools {
                    server: "session-server".to_string(),
                },
                created_at: Timestamp::now(),
                expires_at: None,
                max_uses: None,
                uses_remaining: None,
                session_only: true,
                workspace_root: None,
                signature: keypair.sign(b"test"),
            })
            .unwrap();

        // Persistent allowance (should survive).
        store
            .add_allowance(Allowance {
                id: AllowanceId::new(),
                action_pattern: AllowancePattern::ServerTools {
                    server: "persistent-server".to_string(),
                },
                created_at: Timestamp::now(),
                expires_at: None,
                max_uses: None,
                uses_remaining: None,
                session_only: false,
                workspace_root: None,
                signature: keypair.sign(b"test"),
            })
            .unwrap();

        assert_eq!(store.count(), 2);

        let counter = AtomicUsize::new(2);
        let simulate_disconnect = || {
            let result = counter.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |n| {
                if n == 0 {
                    None
                } else {
                    Some(n.saturating_sub(1))
                }
            });
            if result == Ok(1) {
                store.clear_session_allowances();
            }
        };

        // Two connections active. First disconnect: 2 -> 1 (not last).
        simulate_disconnect();
        assert_eq!(
            store.count(),
            2,
            "both allowances should survive non-final disconnect"
        );

        // Second disconnect: 1 -> 0 (last client gone).
        simulate_disconnect();
        assert_eq!(
            store.count(),
            1,
            "session allowance should be cleared on last disconnect"
        );
    }

    #[cfg(unix)]
    #[test]
    fn test_load_or_generate_sets_secure_permissions() {
        use std::os::unix::fs::PermissionsExt;

        let dir = tempfile::tempdir().unwrap();
        let keys_dir = dir.path().join("keys");

        let _ = load_or_generate_runtime_key(&keys_dir).unwrap();

        // Key material must end up owner-only readable/writable.
        let key_path = keys_dir.join("runtime.key");
        let mode = std::fs::metadata(&key_path).unwrap().permissions().mode();
        assert_eq!(
            mode & 0o777,
            0o600,
            "key file should have 0o600 permissions, got {mode:#o}"
        );
    }

    // --- Restart backoff tracker ------------------------------------------

    #[test]
    fn restart_tracker_initial_state() {
        let tracker = RestartTracker::new();
        assert!(!tracker.exhausted());
        // Should not restart immediately (backoff hasn't elapsed).
        assert!(!tracker.should_restart());
    }

    #[test]
    fn restart_tracker_allows_restart_after_backoff() {
        let mut tracker = RestartTracker::new();
        // Simulate time passing by setting last_attempt in the past.
        tracker.last_attempt = std::time::Instant::now()
            - RestartTracker::INITIAL_BACKOFF
            - std::time::Duration::from_millis(1);
        assert!(tracker.should_restart());
    }

    #[test]
    fn restart_tracker_doubles_backoff() {
        let mut tracker = RestartTracker::new();
        assert_eq!(tracker.backoff, RestartTracker::INITIAL_BACKOFF);

        tracker.record_attempt();
        assert_eq!(
            tracker.backoff,
            RestartTracker::INITIAL_BACKOFF.saturating_mul(2)
        );
        assert_eq!(tracker.attempts, 1);

        tracker.record_attempt();
        assert_eq!(
            tracker.backoff,
            RestartTracker::INITIAL_BACKOFF.saturating_mul(4)
        );
        assert_eq!(tracker.attempts, 2);
    }

    #[test]
    fn restart_tracker_backoff_caps_at_max() {
        let mut tracker = RestartTracker::new();
        for _ in 0..20 {
            tracker.record_attempt();
        }
        assert_eq!(tracker.backoff, RestartTracker::MAX_BACKOFF);
    }

    #[test]
    fn restart_tracker_exhausted_at_max_attempts() {
        let mut tracker = RestartTracker::new();
        for _ in 0..RestartTracker::MAX_ATTEMPTS {
            assert!(!tracker.exhausted());
            tracker.record_attempt();
        }
        assert!(tracker.exhausted());
    }

    #[test]
    fn restart_tracker_should_restart_false_when_exhausted() {
        let mut tracker = RestartTracker::new();
        for _ in 0..RestartTracker::MAX_ATTEMPTS {
            tracker.record_attempt();
        }
        // Even if backoff has elapsed, exhausted tracker should not restart.
        tracker.last_attempt = std::time::Instant::now() - RestartTracker::MAX_BACKOFF;
        assert!(!tracker.should_restart());
    }
}
1421
1422// ---------------------------------------------------------------------------
1423// Boot validation
1424// ---------------------------------------------------------------------------
1425
1426/// Validate that every capsule's required imports have a matching export
1427/// from another loaded capsule. Logs errors for unsatisfied required imports
1428/// and info messages for unsatisfied optional imports. Also warns about
1429/// duplicate exports of the same interface from multiple capsules.
1430fn validate_imports_exports(
1431    manifests: &[(
1432        astrid_capsule::manifest::CapsuleManifest,
1433        std::path::PathBuf,
1434    )],
1435) {
1436    // Track (namespace, interface) → list of (capsule_name, version).
1437    let mut exports_by_interface: std::collections::HashMap<
1438        (&str, &str),
1439        Vec<(&str, &semver::Version)>,
1440    > = std::collections::HashMap::new();
1441
1442    for (m, _) in manifests {
1443        for (ns, name, ver) in m.export_triples() {
1444            exports_by_interface
1445                .entry((ns, name))
1446                .or_default()
1447                .push((&m.package.name, ver));
1448        }
1449    }
1450
1451    // Warn about duplicate exports — two capsules providing the same interface
1452    // will both fire on matching events, causing double-processing.
1453    for ((ns, name), providers) in &exports_by_interface {
1454        if providers.len() > 1 {
1455            let names: Vec<&str> = providers.iter().map(|(n, _)| *n).collect();
1456            tracing::warn!(
1457                interface = %format!("{ns}/{name}"),
1458                providers = ?names,
1459                "Multiple capsules export the same interface — events may be double-processed. \
1460                 Consider removing one with `astrid capsule remove`."
1461            );
1462        }
1463    }
1464
1465    let mut satisfied_count: u32 = 0;
1466    let mut warning_count: u32 = 0;
1467
1468    for (manifest, _) in manifests {
1469        for (ns, name, req, optional) in manifest.import_tuples() {
1470            let has_provider = exports_by_interface
1471                .get(&(ns, name))
1472                .is_some_and(|providers| providers.iter().any(|(_, v)| req.matches(v)));
1473
1474            if has_provider {
1475                satisfied_count = satisfied_count.saturating_add(1);
1476            } else if optional {
1477                tracing::info!(
1478                    capsule = %manifest.package.name,
1479                    import = %format!("{ns}/{name} {req}"),
1480                    "Optional import not satisfied — capsule will boot with reduced functionality"
1481                );
1482                warning_count = warning_count.saturating_add(1);
1483            } else {
1484                tracing::error!(
1485                    capsule = %manifest.package.name,
1486                    import = %format!("{ns}/{name} {req}"),
1487                    "Required import not satisfied — no loaded capsule exports this interface"
1488                );
1489                warning_count = warning_count.saturating_add(1);
1490            }
1491        }
1492    }
1493
1494    tracing::info!(
1495        capsules = manifests.len(),
1496        imports_satisfied = satisfied_count,
1497        warnings = warning_count,
1498        "Boot validation complete"
1499    );
1500}
1501
1502// ---------------------------------------------------------------------------
1503// Identity bootstrap helpers
1504// ---------------------------------------------------------------------------
1505
1506/// Bootstrap the CLI root user identity at kernel boot.
1507///
1508/// Creates a deterministic root `AstridUserId` on first boot, or reloads it
1509/// on subsequent boots. Auto-links with `platform="cli"`,
1510/// `platform_user_id="local"`, `method="system"`.
1511///
1512/// Idempotent: skips creation if the root user already exists.
1513async fn bootstrap_cli_root_user(
1514    store: &Arc<dyn astrid_storage::IdentityStore>,
1515) -> Result<(), astrid_storage::IdentityError> {
1516    // Check if root user already exists by trying to resolve the CLI link.
1517    if let Some(_user) = store.resolve("cli", "local").await? {
1518        tracing::debug!("CLI root user already linked");
1519        return Ok(());
1520    }
1521
1522    // No CLI link exists. Create or find the root user.
1523    let user = store.create_user(Some("root")).await?;
1524    tracing::info!(user_id = %user.id, "Created CLI root user");
1525
1526    // Link the CLI platform identity.
1527    store.link("cli", "local", user.id, "system").await?;
1528    tracing::info!(user_id = %user.id, "Linked CLI root user (cli/local)");
1529
1530    Ok(())
1531}
1532
1533/// Apply pre-configured identity links from the config file.
1534///
1535/// For each `[[identity.links]]` entry, resolves or creates the referenced
1536/// Astrid user and links the platform identity. Logs warnings on failure
1537/// but does not abort boot.
1538async fn apply_identity_config(
1539    store: &Arc<dyn astrid_storage::IdentityStore>,
1540    workspace_root: &std::path::Path,
1541) {
1542    let config = match astrid_config::Config::load(Some(workspace_root)) {
1543        Ok(resolved) => resolved.config,
1544        Err(e) => {
1545            tracing::debug!(error = %e, "No config loaded for identity links");
1546            return;
1547        },
1548    };
1549
1550    for link_cfg in &config.identity.links {
1551        let result = apply_single_identity_link(store, link_cfg).await;
1552        if let Err(e) = result {
1553            tracing::warn!(
1554                platform = %link_cfg.platform,
1555                platform_user_id = %link_cfg.platform_user_id,
1556                astrid_user = %link_cfg.astrid_user,
1557                error = %e,
1558                "Failed to apply identity link from config"
1559            );
1560        }
1561    }
1562}
1563
1564/// Apply a single identity link from config.
1565async fn apply_single_identity_link(
1566    store: &Arc<dyn astrid_storage::IdentityStore>,
1567    link_cfg: &astrid_config::types::IdentityLinkConfig,
1568) -> Result<(), astrid_storage::IdentityError> {
1569    // Resolve astrid_user: try UUID first, then name lookup, then create.
1570    let user_id = if let Ok(uuid) = uuid::Uuid::parse_str(&link_cfg.astrid_user) {
1571        // Ensure user record exists. If the UUID was explicitly specified in
1572        // config but doesn't exist in the store, that's a configuration error
1573        // - don't silently create a different user.
1574        if store.get_user(uuid).await?.is_none() {
1575            return Err(astrid_storage::IdentityError::UserNotFound(uuid));
1576        }
1577        uuid
1578    } else {
1579        // Try name lookup.
1580        if let Some(user) = store.get_user_by_name(&link_cfg.astrid_user).await? {
1581            user.id
1582        } else {
1583            let user = store.create_user(Some(&link_cfg.astrid_user)).await?;
1584            tracing::info!(
1585                user_id = %user.id,
1586                name = %link_cfg.astrid_user,
1587                "Created user from config identity link"
1588            );
1589            user.id
1590        }
1591    };
1592
1593    let method = if link_cfg.method.is_empty() {
1594        "admin"
1595    } else {
1596        &link_cfg.method
1597    };
1598
1599    // Check if link already points to the correct user - skip if idempotent.
1600    if let Some(existing) = store
1601        .resolve(&link_cfg.platform, &link_cfg.platform_user_id)
1602        .await?
1603        && existing.id == user_id
1604    {
1605        tracing::debug!(
1606            platform = %link_cfg.platform,
1607            platform_user_id = %link_cfg.platform_user_id,
1608            user_id = %user_id,
1609            "Identity link from config already exists"
1610        );
1611        return Ok(());
1612    }
1613
1614    store
1615        .link(
1616            &link_cfg.platform,
1617            &link_cfg.platform_user_id,
1618            user_id,
1619            method,
1620        )
1621        .await?;
1622
1623    tracing::info!(
1624        platform = %link_cfg.platform,
1625        platform_user_id = %link_cfg.platform_user_id,
1626        user_id = %user_id,
1627        "Applied identity link from config"
1628    );
1629
1630    Ok(())
1631}