astrid_kernel/lib.rs
1#![deny(unsafe_code)]
2#![deny(missing_docs)]
3#![deny(clippy::all)]
4#![deny(unreachable_pub)]
5#![allow(clippy::module_name_repetitions)]
6
7//! Astrid Kernel - The core execution engine and IPC router.
8//!
9//! The Kernel is a pure, decentralized WASM runner. It contains no business
10//! logic, no cognitive loops, and no network servers. Its sole responsibility
11//! is to instantiate `astrid_events::EventBus`, load `.capsule` files into
12//! the Extism sandbox, and route IPC bytes between them.
13
14/// The Management API router listening to the `EventBus`.
15pub mod kernel_router;
16/// The Unix Domain Socket manager.
17pub mod socket;
18
19use astrid_audit::AuditLog;
20use astrid_capabilities::{CapabilityStore, DirHandle};
21use astrid_capsule::registry::CapsuleRegistry;
22use astrid_core::SessionId;
23use astrid_crypto::KeyPair;
24use astrid_events::EventBus;
25use astrid_mcp::{McpClient, SecureMcpClient, ServerManager, ServersConfig};
26use astrid_vfs::{HostVfs, OverlayVfs, Vfs};
27use std::path::{Path, PathBuf};
28use std::sync::Arc;
29use std::sync::atomic::{AtomicUsize, Ordering};
30use tokio::sync::RwLock;
31
/// The core Operating System Kernel.
///
/// Owns every session-scoped resource: the IPC event bus, the capsule
/// registry, the sandboxed overlay VFS, the persistent KV store, and the
/// security state (capabilities, audit log, session token, allowances).
/// Constructed once per session via [`Kernel::new`] and shared as an `Arc`.
pub struct Kernel {
    /// The unique identifier for this kernel session.
    pub session_id: SessionId,
    /// The global IPC message bus.
    pub event_bus: Arc<EventBus>,
    /// The process manager (loaded WASM capsules).
    pub capsules: Arc<RwLock<CapsuleRegistry>>,
    /// The secure MCP client with capability-based authorization and audit logging.
    pub mcp: SecureMcpClient,
    /// The capability store for this session.
    pub capabilities: Arc<CapabilityStore>,
    /// The global Virtual File System mount.
    pub vfs: Arc<dyn Vfs>,
    /// Concrete reference to the [`OverlayVfs`] for commit/rollback operations.
    /// Same object as `vfs`, kept separately so callers can reach the
    /// overlay-specific API without downcasting.
    pub overlay_vfs: Arc<OverlayVfs>,
    /// Ephemeral upper directory for the overlay VFS. Kept alive for the
    /// kernel session lifetime; dropped on shutdown to discard uncommitted writes.
    _upper_dir: Arc<tempfile::TempDir>,
    /// The global physical root handle (cap-std) for the VFS.
    pub vfs_root_handle: DirHandle,
    /// The physical path the VFS is mounted to.
    pub workspace_root: PathBuf,
    /// The global shared resources directory (`~/.astrid/shared/`). Capsules
    /// declaring `fs_read = ["global://"]` can read files under this root.
    /// Scoped to `shared/` so that keys, databases, and capsule .env files in
    /// `~/.astrid/` are NOT accessible. Write access is intentionally not
    /// granted to any shipped capsule.
    ///
    /// Always `Some` in production (boot requires `AstridHome`). Remains
    /// `Option` for compatibility with `CapsuleContext` and test fixtures.
    pub global_root: Option<PathBuf>,
    /// The natively bound Unix Socket for the CLI proxy.
    pub cli_socket_listener: Option<Arc<tokio::sync::Mutex<tokio::net::UnixListener>>>,
    /// Shared KV store backing all capsule-scoped stores and kernel state.
    pub kv: Arc<astrid_storage::SurrealKvStore>,
    /// Chain-linked cryptographic audit log with persistent storage.
    pub audit_log: Arc<AuditLog>,
    /// Number of active client connections (CLI sessions).
    /// Relaxed atomics only — pure bookkeeping, never used for synchronization.
    pub active_connections: AtomicUsize,
    /// Session token for socket authentication. Generated at boot, written to
    /// `~/.astrid/sessions/system.token`. CLI sends this as its first message.
    pub session_token: Arc<astrid_core::session_token::SessionToken>,
    /// Path where the session token was written at boot. Stored so shutdown
    /// uses the exact same path (avoids fallback mismatch if env changes).
    token_path: PathBuf,
    /// Shared allowance store for capsule-level approval decisions.
    ///
    /// Capsules can check existing allowances and create new ones when
    /// users approve actions with session/always scope.
    pub allowance_store: Arc<astrid_approval::AllowanceStore>,
    /// System-wide identity store for platform user resolution.
    identity_store: Arc<dyn astrid_storage::IdentityStore>,
}
86
87impl Kernel {
    /// Boot a new Kernel instance mounted at the specified directory.
    ///
    /// Boot order: resolve home → KV store → MCP manager → capability store
    /// and audit log → sandbox root handle → overlay VFS → socket + session
    /// token → identity store → construct `Kernel` → spawn background tasks.
    ///
    /// # Panics
    ///
    /// Panics if called on a single-threaded tokio runtime. The capsule
    /// system uses `block_in_place` which requires a multi-threaded runtime.
    ///
    /// # Errors
    ///
    /// Returns an error if the Astrid home cannot be resolved, the KV store,
    /// capability store, identity store, or audit log cannot be opened, the
    /// session socket or token cannot be created, the CLI root user cannot be
    /// bootstrapped, or the VFS mount paths cannot be registered.
    pub async fn new(
        session_id: SessionId,
        workspace_root: PathBuf,
    ) -> Result<Arc<Self>, std::io::Error> {
        use astrid_core::dirs::AstridHome;

        // Fail fast with an actionable message rather than panicking deep
        // inside block_in_place later.
        assert!(
            tokio::runtime::Handle::current().runtime_flavor()
                == tokio::runtime::RuntimeFlavor::MultiThread,
            "Kernel requires a multi-threaded tokio runtime (block_in_place panics on \
             single-threaded). Use #[tokio::main] or Runtime::new() instead of current_thread."
        );

        let event_bus = Arc::new(EventBus::new());
        let capsules = Arc::new(RwLock::new(CapsuleRegistry::new()));

        // Resolve the Astrid home directory. Required for persistent KV store
        // and audit log. Fails boot if neither $ASTRID_HOME nor $HOME is set.
        let home = AstridHome::resolve().map_err(|e| {
            std::io::Error::other(format!(
                "Failed to resolve Astrid home (set $ASTRID_HOME or $HOME): {e}"
            ))
        })?;

        // Resolve the global shared directory for the `global://` VFS scheme.
        // Scoped to `~/.astrid/shared/` — NOT the full `~/.astrid/` root — so
        // capsules cannot access keys, databases, or capsule .env files.
        let global_root = Some(home.shared_dir());

        // 1. Open the persistent KV store (needed by capability store below).
        let kv_path = home.state_db_path();
        let kv = Arc::new(
            astrid_storage::SurrealKvStore::open(&kv_path)
                .map_err(|e| std::io::Error::other(format!("Failed to open KV store: {e}")))?,
        );
        // TODO: clear ephemeral keys (e: prefix) on boot when the key
        // lifecycle tier convention is established.

        // 2. Initialize MCP process manager with security layer.
        //    Set workspace_root so sandboxed MCP servers have a writable directory.
        let mcp_config = ServersConfig::load_default().unwrap_or_default();
        let mcp_manager =
            ServerManager::new(mcp_config).with_workspace_root(workspace_root.clone());
        let mcp_client = McpClient::new(mcp_manager);

        // 3. Bootstrap capability store (persistent) and audit log.
        //    Key rotation invalidates persisted tokens (fail-secure by design).
        let capabilities = Arc::new(
            CapabilityStore::with_kv_store(Arc::clone(&kv) as Arc<dyn astrid_storage::KvStore>)
                .map_err(|e| {
                    std::io::Error::other(format!("Failed to init capability store: {e}"))
                })?,
        );
        let audit_log = open_audit_log()?;
        let mcp = SecureMcpClient::new(
            mcp_client,
            Arc::clone(&capabilities),
            Arc::clone(&audit_log),
            session_id.clone(),
        );

        // 4. Establish the physical security boundary (sandbox handle)
        let root_handle = DirHandle::new();

        // 5. Initialize sandboxed overlay VFS (lower=workspace, upper=temp)
        let (overlay_vfs, upper_temp) = init_overlay_vfs(&root_handle, &workspace_root).await?;

        // 6. Bind the secure Unix socket and generate session token.
        // The socket is bound here, but not yet listened on. The token is
        // generated before any capsule can accept connections, preventing
        // a race where a client connects before the token file exists.
        let listener = socket::bind_session_socket()?;
        let (session_token, token_path) = socket::generate_session_token()?;

        let allowance_store = Arc::new(astrid_approval::AllowanceStore::new());

        // Create system-wide identity store backed by the shared KV.
        let identity_kv = astrid_storage::ScopedKvStore::new(
            Arc::clone(&kv) as Arc<dyn astrid_storage::KvStore>,
            "system:identity",
        )
        .map_err(|e| std::io::Error::other(format!("Failed to create identity KV: {e}")))?;
        let identity_store: Arc<dyn astrid_storage::IdentityStore> =
            Arc::new(astrid_storage::KvIdentityStore::new(identity_kv));

        // Bootstrap the CLI root user (idempotent).
        bootstrap_cli_root_user(&identity_store)
            .await
            .map_err(|e| {
                std::io::Error::other(format!("Failed to bootstrap CLI root user: {e}"))
            })?;

        // Apply pre-configured identity links from config.
        apply_identity_config(&identity_store, &workspace_root).await;

        let kernel = Arc::new(Self {
            session_id,
            event_bus,
            capsules,
            mcp,
            capabilities,
            vfs: Arc::clone(&overlay_vfs) as Arc<dyn Vfs>,
            overlay_vfs,
            _upper_dir: Arc::new(upper_temp),
            vfs_root_handle: root_handle,
            workspace_root,
            global_root,
            cli_socket_listener: Some(Arc::new(tokio::sync::Mutex::new(listener))),
            kv,
            audit_log,
            active_connections: AtomicUsize::new(0),
            session_token: Arc::new(session_token),
            token_path,
            allowance_store,
            identity_store,
        });

        // Discard the returned handles: these background tasks are
        // intentionally detached for the lifetime of the kernel process.
        drop(kernel_router::spawn_kernel_router(Arc::clone(&kernel)));
        drop(spawn_idle_monitor(Arc::clone(&kernel)));
        drop(spawn_react_watchdog(Arc::clone(&kernel.event_bus)));
        drop(spawn_capsule_health_monitor(Arc::clone(&kernel)));

        // Spawn the event dispatcher — routes EventBus events to capsule interceptors
        let dispatcher = astrid_capsule::dispatcher::EventDispatcher::new(
            Arc::clone(&kernel.capsules),
            Arc::clone(&kernel.event_bus),
        );
        tokio::spawn(dispatcher.run());

        debug_assert_eq!(
            kernel.event_bus.subscriber_count(),
            INTERNAL_SUBSCRIBER_COUNT,
            "INTERNAL_SUBSCRIBER_COUNT is stale; update it when adding permanent subscribers"
        );

        Ok(kernel)
    }
235
236    /// Load a capsule into the Kernel from a directory containing a Capsule.toml
237    ///
238    /// # Errors
239    ///
240    /// Returns an error if the manifest cannot be loaded, the capsule cannot be created, or registration fails.
241    async fn load_capsule(&self, dir: PathBuf) -> Result<(), anyhow::Error> {
242        let manifest_path = dir.join("Capsule.toml");
243        let manifest = astrid_capsule::discovery::load_manifest(&manifest_path)
244            .map_err(|e| anyhow::anyhow!(e))?;
245
246        let loader = astrid_capsule::loader::CapsuleLoader::new(self.mcp.clone());
247        let mut capsule = loader.create_capsule(manifest, dir.clone())?;
248
249        // Build the context — use the shared kernel KV so capsules can
250        // communicate state through overlapping KV namespaces.
251        let kv = astrid_storage::ScopedKvStore::new(
252            Arc::clone(&self.kv) as Arc<dyn astrid_storage::KvStore>,
253            format!("capsule:{}", capsule.id()),
254        )?;
255
256        // Pre-load `.env.json` into the KV store if it exists
257        let env_path = dir.join(".env.json");
258        if env_path.exists()
259            && let Ok(contents) = std::fs::read_to_string(&env_path)
260            && let Ok(env_map) =
261                serde_json::from_str::<std::collections::HashMap<String, String>>(&contents)
262        {
263            for (k, v) in env_map {
264                let _ = kv.set(&k, v.into_bytes()).await;
265            }
266        }
267
268        let ctx = astrid_capsule::context::CapsuleContext::new(
269            self.workspace_root.clone(),
270            self.global_root.clone(),
271            kv,
272            Arc::clone(&self.event_bus),
273            self.cli_socket_listener.clone(),
274        )
275        .with_registry(Arc::clone(&self.capsules))
276        .with_session_token(Arc::clone(&self.session_token))
277        .with_allowance_store(Arc::clone(&self.allowance_store))
278        .with_identity_store(Arc::clone(&self.identity_store));
279
280        capsule.load(&ctx).await?;
281
282        let mut registry = self.capsules.write().await;
283        registry
284            .register(capsule)
285            .map_err(|e| anyhow::anyhow!("Failed to register capsule: {e}"))?;
286
287        Ok(())
288    }
289
290    /// Restart a capsule by unloading it and re-loading from its source directory.
291    ///
292    /// # Errors
293    ///
294    /// Returns an error if the capsule has no source directory, cannot be
295    /// unregistered, or fails to reload.
296    async fn restart_capsule(
297        &self,
298        id: &astrid_capsule::capsule::CapsuleId,
299    ) -> Result<(), anyhow::Error> {
300        // Get source directory before unregistering.
301        let source_dir = {
302            let registry = self.capsules.read().await;
303            let capsule = registry
304                .get(id)
305                .ok_or_else(|| anyhow::anyhow!("capsule '{id}' not found in registry"))?;
306            capsule
307                .source_dir()
308                .map(std::path::Path::to_path_buf)
309                .ok_or_else(|| anyhow::anyhow!("capsule '{id}' has no source directory"))?
310        };
311
312        // Unregister and explicitly unload. There is no Drop impl that
313        // calls unload() (it's async), so we must do it here to avoid
314        // leaking MCP subprocesses and other engine resources.
315        let old_capsule = {
316            let mut registry = self.capsules.write().await;
317            registry
318                .unregister(id)
319                .map_err(|e| anyhow::anyhow!("failed to unregister capsule '{id}': {e}"))?
320        };
321        // Explicitly unload the old capsule. There is no Drop impl that
322        // calls unload() (it's async), so we must do it here to avoid
323        // leaking MCP subprocesses and other engine resources.
324        // Arc::get_mut requires exclusive ownership (strong_count == 1).
325        {
326            let mut old = old_capsule;
327            if let Some(capsule) = std::sync::Arc::get_mut(&mut old) {
328                if let Err(e) = capsule.unload().await {
329                    tracing::warn!(
330                        capsule_id = %id,
331                        error = %e,
332                        "Capsule unload failed during restart"
333                    );
334                }
335            } else {
336                tracing::warn!(
337                    capsule_id = %id,
338                    "Cannot call unload during restart - Arc still held by in-flight task"
339                );
340            }
341        }
342
343        // Re-load from disk.
344        self.load_capsule(source_dir).await?;
345
346        // Signal the newly loaded capsule to clean up ephemeral state
347        // from the previous incarnation. Capsules that don't implement
348        // `handle_lifecycle_restart` will return an error, which is fine.
349        //
350        // Clone the capsule Arc under a brief read lock, then drop the
351        // guard before invoke_interceptor which calls block_in_place.
352        // Holding the RwLock across block_in_place parks the worker thread
353        // and starves registry writers (health monitor, capsule loading).
354        let capsule = {
355            let registry = self.capsules.read().await;
356            registry.get(id)
357        };
358        if let Some(capsule) = capsule
359            && let Err(e) = capsule.invoke_interceptor("handle_lifecycle_restart", &[])
360        {
361            tracing::debug!(
362                capsule_id = %id,
363                error = %e,
364                "Capsule does not handle lifecycle restart (optional)"
365            );
366        }
367
368        Ok(())
369    }
370
    /// Auto-discover and load all capsules from the user capsules directory (`~/.astrid/capsules`).
    ///
    /// NOTE(review): only `AstridHome::capsules_dir()` is scanned here; a
    /// workspace-local `.astrid/capsules` directory is NOT searched — confirm
    /// whether workspace-local discovery is still intended.
    ///
    /// Capsules are loaded in dependency order (topological sort) with
    /// uplink/daemon capsules loaded first. Each uplink must signal
    /// readiness before non-uplink capsules are loaded.
    ///
    /// After all capsules are loaded, tool schemas are injected into every
    /// capsule's KV namespace and the `astrid.v1.capsules_loaded` event is published.
    pub async fn load_all_capsules(&self) {
        use astrid_capsule::toposort::toposort_manifests;
        use astrid_core::dirs::AstridHome;

        let mut paths = Vec::new();
        if let Ok(home) = AstridHome::resolve() {
            paths.push(home.capsules_dir());
        }

        let discovered = astrid_capsule::discovery::discover_manifests(Some(&paths));

        // Topological sort ALL capsules together so cross-partition
        // requirements (e.g. a non-uplink requiring an uplink's capability)
        // resolve correctly without spurious "not provided" warnings.
        let sorted = match toposort_manifests(discovered) {
            Ok(sorted) => sorted,
            Err((e, original)) => {
                tracing::error!(
                    cycle = %e,
                    "Dependency cycle in capsules, falling back to discovery order"
                );
                original
            },
        };

        // Defence-in-depth: manifest validation in discovery.rs rejects
        // uplinks with `requires`, but warn here in case a manifest bypasses
        // the normal load path.
        for (manifest, _) in &sorted {
            if manifest.capabilities.uplink && !manifest.dependencies.requires.is_empty() {
                tracing::warn!(
                    capsule = %manifest.package.name,
                    requires = ?manifest.dependencies.requires,
                    "Uplink capsule has [dependencies].requires - \
                     this should have been rejected at manifest load time"
                );
            }
        }

        // Partition after sorting: uplinks first, then the rest.
        // The relative order within each partition is preserved from the
        // toposort, so dependency edges are still respected. Cross-partition
        // edges (non-uplink requiring an uplink) are satisfied by construction
        // since all uplinks load first. The inverse (uplink requiring a
        // non-uplink) is rejected above.
        let (uplinks, others): (Vec<_>, Vec<_>) =
            sorted.into_iter().partition(|(m, _)| m.capabilities.uplink);

        // Load uplinks first so their event bus subscriptions are ready.
        let uplink_names: Vec<String> = uplinks
            .iter()
            .map(|(m, _)| m.package.name.clone())
            .collect();
        for (manifest, dir) in &uplinks {
            if let Err(e) = self.load_capsule(dir.clone()).await {
                tracing::warn!(
                    capsule = %manifest.package.name,
                    error = %e,
                    "Failed to load uplink capsule during discovery"
                );
            }
        }

        // Wait for uplink capsules to signal readiness before loading
        // non-uplink capsules. This ensures IPC subscriptions are active.
        self.await_capsule_readiness(&uplink_names).await;

        for (manifest, dir) in &others {
            if let Err(e) = self.load_capsule(dir.clone()).await {
                tracing::warn!(
                    capsule = %manifest.package.name,
                    error = %e,
                    "Failed to load capsule during discovery"
                );
            }
        }

        // Wait for non-uplink run-loop capsules too, so any future
        // dependency edges between them are respected.
        let other_names: Vec<String> = others.iter().map(|(m, _)| m.package.name.clone()).collect();
        self.await_capsule_readiness(&other_names).await;

        // Inject tool schemas into every capsule's KV namespace so any
        // capsule (e.g. react) can read them via kv::get_json("tool_schemas").
        self.inject_tool_schemas().await;

        // Signal that all capsules have been loaded so uplink capsules
        // (like the registry) can proceed with discovery instead of
        // polling with arbitrary timeouts.
        let msg = astrid_events::ipc::IpcMessage::new(
            "astrid.v1.capsules_loaded",
            astrid_events::ipc::IpcPayload::RawJson(serde_json::json!({"status": "ready"})),
            self.session_id.0,
        );
        let _ = self.event_bus.publish(astrid_events::AstridEvent::Ipc {
            metadata: astrid_events::EventMetadata::new("kernel"),
            message: msg,
        });
    }
478
    /// Record that a new client connection has been established.
    ///
    /// Relaxed ordering is sufficient: the counter is pure bookkeeping and
    /// is never used to synchronize other memory operations.
    pub fn connection_opened(&self) {
        self.active_connections.fetch_add(1, Ordering::Relaxed);
    }
483
484    /// Record that a client connection has been closed.
485    ///
486    /// Uses `fetch_update` for atomic saturating decrement - avoids the TOCTOU
487    /// window where `fetch_sub` wraps to `usize::MAX` before a corrective store.
488    ///
489    /// When the last connection closes (counter reaches 0), clears all
490    /// session-scoped allowances so they don't leak into the next CLI session.
491    pub fn connection_closed(&self) {
492        let result =
493            self.active_connections
494                .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |n| {
495                    if n == 0 {
496                        None
497                    } else {
498                        Some(n.saturating_sub(1))
499                    }
500                });
501
502        // Previous value was 1 -> now 0: last client disconnected.
503        // Clear session-scoped allowances so they don't leak into the next session.
504        if result == Ok(1) {
505            self.allowance_store.clear_session_allowances();
506            tracing::info!("last client disconnected, session allowances cleared");
507        }
508    }
509
    /// Number of active client connections.
    ///
    /// Snapshot only — the value may change immediately after the load.
    pub fn connection_count(&self) -> usize {
        self.active_connections.load(Ordering::Relaxed)
    }
514
    /// Gracefully shut down the kernel.
    ///
    /// 1. Publish `KernelShutdown` event on the bus.
    /// 2. Drain and unload all capsules (stops MCP child processes, WASM engines).
    /// 3. Flush and close the persistent KV store.
    /// 4. Remove the Unix socket file.
    ///
    /// Best-effort throughout: individual failures are logged, never
    /// propagated, so shutdown always runs to completion.
    pub async fn shutdown(&self, reason: Option<String>) {
        tracing::info!(reason = ?reason, "Kernel shutting down");

        // 1. Notify all subscribers so capsules can react.
        let _ = self
            .event_bus
            .publish(astrid_events::AstridEvent::KernelShutdown {
                metadata: astrid_events::EventMetadata::new("kernel"),
                reason: reason.clone(),
            });

        // 2. Drain the registry so the dispatcher cannot hand out new Arc clones,
        // then unload each capsule. MCP engine unload is critical - it calls
        // `mcp_client.disconnect()` to gracefully terminate child processes.
        // Without explicit unload, MCP child processes become orphaned.
        //
        // The `EventDispatcher` temporarily clones `Arc<dyn Capsule>` into
        // spawned interceptor tasks. After draining, no new clones can be
        // created, but in-flight tasks may still hold references. We retry
        // `Arc::get_mut` with brief yields to let them complete.
        let capsules = {
            let mut reg = self.capsules.write().await;
            reg.drain()
        };
        for mut arc in capsules {
            let id = arc.id().clone();
            let mut unloaded = false;

            // Retry budget: 20 attempts x 50ms ≈ 1s per capsule for
            // in-flight tasks to drop their clones.
            for retry in 0..20_u32 {
                if let Some(capsule) = Arc::get_mut(&mut arc) {
                    if let Err(e) = capsule.unload().await {
                        tracing::warn!(
                            capsule_id = %id,
                            error = %e,
                            "Failed to unload capsule during shutdown"
                        );
                    }
                    unloaded = true;
                    break;
                }
                // Skip the sleep after the final attempt.
                if retry < 19 {
                    tokio::time::sleep(std::time::Duration::from_millis(50)).await;
                }
            }

            if !unloaded {
                tracing::warn!(
                    capsule_id = %id,
                    strong_count = Arc::strong_count(&arc),
                    "Dropping capsule without explicit unload after retries exhausted; \
                     MCP child processes may be orphaned"
                );
            }
            drop(arc);
        }

        // 3. Flush the persistent KV store.
        if let Err(e) = self.kv.close().await {
            tracing::warn!(error = %e, "Failed to flush KV store during shutdown");
        }

        // 4. Remove the socket and token files so stale-socket detection works
        // on next boot and the auth token doesn't persist on disk after shutdown.
        // This runs AFTER capsule unload, which is the correct order: MCP child
        // processes communicate via stdio pipes (not this Unix socket), so they
        // are already terminated by step 2. The socket is only used for
        // CLI-to-kernel IPC.
        let socket_path = crate::socket::kernel_socket_path();
        let _ = std::fs::remove_file(&socket_path);
        let _ = std::fs::remove_file(&self.token_path);
        crate::socket::remove_readiness_file();

        tracing::info!("Kernel shutdown complete");
    }
595
596    /// Wait for a set of capsules to signal readiness, in parallel.
597    ///
598    /// Collects `Arc<dyn Capsule>` handles under a short-lived read lock,
599    /// then drops the lock before awaiting. Capsules without a run loop
600    /// return `Ready` immediately and don't contribute to wait time.
601    async fn await_capsule_readiness(&self, names: &[String]) {
602        use astrid_capsule::capsule::ReadyStatus;
603
604        if names.is_empty() {
605            return;
606        }
607
608        let timeout = std::time::Duration::from_millis(500);
609        let capsules: Vec<(String, std::sync::Arc<dyn astrid_capsule::capsule::Capsule>)> = {
610            let registry = self.capsules.read().await;
611            names
612                .iter()
613                .filter_map(
614                    |name| match astrid_capsule::capsule::CapsuleId::new(name.clone()) {
615                        Ok(capsule_id) => registry.get(&capsule_id).map(|c| (name.clone(), c)),
616                        Err(e) => {
617                            tracing::warn!(
618                                capsule = %name,
619                                error = %e,
620                                "Invalid capsule ID, skipping readiness wait"
621                            );
622                            None
623                        },
624                    },
625                )
626                .collect()
627        };
628
629        // Await all capsules concurrently - independent capsules shouldn't
630        // compound each other's timeout.
631        let mut set = tokio::task::JoinSet::new();
632        for (name, capsule) in capsules {
633            set.spawn(async move {
634                let status = capsule.wait_ready(timeout).await;
635                (name, status)
636            });
637        }
638        while let Some(result) = set.join_next().await {
639            if let Ok((name, status)) = result {
640                match status {
641                    ReadyStatus::Ready => {},
642                    ReadyStatus::Timeout => {
643                        tracing::warn!(
644                            capsule = %name,
645                            timeout_ms = timeout.as_millis(),
646                            "Capsule did not signal ready within timeout"
647                        );
648                    },
649                    ReadyStatus::Crashed => {
650                        tracing::error!(
651                            capsule = %name,
652                            "Capsule run loop exited before signaling ready"
653                        );
654                    },
655                }
656            }
657        }
658    }
659
660    /// Collect all tool definitions from loaded capsule manifests and write
661    /// them to every capsule's scoped KV namespace as `tool_schemas`.
662    async fn inject_tool_schemas(&self) {
663        use astrid_events::llm::LlmToolDefinition;
664        use astrid_storage::KvStore;
665
666        // Collect tools and capsule IDs under a short-lived read lock,
667        // then drop the guard before the async KV write loop. Holding
668        // the RwLock across N awaits would block all registry writers.
669        let (all_tools, capsule_ids) = {
670            let registry = self.capsules.read().await;
671            let tools: Vec<LlmToolDefinition> = registry
672                .values()
673                .flat_map(|capsule| {
674                    capsule.manifest().tools.iter().map(|t| LlmToolDefinition {
675                        name: t.name.clone(),
676                        description: Some(t.description.clone()),
677                        input_schema: t.input_schema.clone(),
678                    })
679                })
680                .collect();
681            let ids: Vec<String> = registry.list().iter().map(ToString::to_string).collect();
682            (tools, ids)
683        };
684
685        if all_tools.is_empty() {
686            return;
687        }
688
689        let tool_bytes = match serde_json::to_vec(&all_tools) {
690            Ok(b) => b,
691            Err(e) => {
692                tracing::error!(error = %e, "Failed to serialize tool schemas");
693                return;
694            },
695        };
696
697        tracing::info!(
698            tool_count = all_tools.len(),
699            "Injecting tool schemas into capsule KV stores"
700        );
701
702        for capsule_id in &capsule_ids {
703            let namespace = format!("capsule:{capsule_id}");
704            if let Err(e) = self
705                .kv
706                .set(&namespace, "tool_schemas", tool_bytes.clone())
707                .await
708            {
709                tracing::warn!(
710                    capsule = %capsule_id,
711                    error = %e,
712                    "Failed to inject tool schemas"
713                );
714            }
715        }
716    }
717}
718
719/// Open (or create) the persistent audit log and verify historical chain integrity.
720///
721/// Initialize the sandboxed overlay VFS.
722///
723/// Creates a lower (read-only workspace) and upper (session-scoped temp dir)
724/// layer, returning the overlay and the `TempDir` whose lifetime keeps the
725/// upper layer alive.
726async fn init_overlay_vfs(
727    root_handle: &DirHandle,
728    workspace_root: &Path,
729) -> Result<(Arc<OverlayVfs>, tempfile::TempDir), std::io::Error> {
730    let lower_vfs = HostVfs::new();
731    lower_vfs
732        .register_dir(root_handle.clone(), workspace_root.to_path_buf())
733        .await
734        .map_err(|_| std::io::Error::other("Failed to register lower vfs dir"))?;
735
736    let upper_temp = tempfile::TempDir::new()
737        .map_err(|e| std::io::Error::other(format!("Failed to create overlay temp dir: {e}")))?;
738    let upper_vfs = HostVfs::new();
739    upper_vfs
740        .register_dir(root_handle.clone(), upper_temp.path().to_path_buf())
741        .await
742        .map_err(|_| std::io::Error::other("Failed to register upper vfs dir"))?;
743
744    let overlay = Arc::new(OverlayVfs::new(Box::new(lower_vfs), Box::new(upper_vfs)));
745    Ok((overlay, upper_temp))
746}
747
/// Open (or create) the persistent audit log and verify historical chain integrity.
///
/// Loads the runtime signing key from `~/.astrid/keys/runtime.key`, generating a
/// new one if it doesn't exist. Opens the `SurrealKV`-backed audit database at
/// `~/.astrid/audit.db` and runs `verify_all()` to detect any tampering of
/// historical entries. Verification failures are logged at `error!` level but
/// do not block boot (fail-open for availability, loud alert for integrity).
fn open_audit_log() -> std::io::Result<Arc<AuditLog>> {
    use astrid_core::dirs::AstridHome;

    let home = AstridHome::resolve()
        .map_err(|e| std::io::Error::other(format!("cannot resolve Astrid home: {e}")))?;
    home.ensure()
        .map_err(|e| std::io::Error::other(format!("cannot create Astrid home dirs: {e}")))?;

    let runtime_key = load_or_generate_runtime_key(&home.keys_dir())?;
    let audit_log = AuditLog::open(home.audit_db_path(), runtime_key)
        .map_err(|e| std::io::Error::other(format!("cannot open audit log: {e}")))?;

    // Verify all historical chains on boot.
    match audit_log.verify_all() {
        Ok(results) => {
            let total_sessions = results.len();
            let mut tampered_sessions: usize = 0;

            // One log line per integrity issue so each violation is visible.
            for (session_id, result) in &results {
                if !result.valid {
                    tampered_sessions = tampered_sessions.saturating_add(1);
                    for issue in &result.issues {
                        tracing::error!(
                            session_id = %session_id,
                            issue = %issue,
                            "Audit chain integrity violation detected"
                        );
                    }
                }
            }

            if tampered_sessions > 0 {
                tracing::error!(
                    total_sessions,
                    tampered_sessions,
                    "Audit chain verification found tampered sessions"
                );
            } else if total_sessions > 0 {
                tracing::info!(
                    total_sessions,
                    "Audit chain verification passed for all sessions"
                );
            }
        },
        Err(e) => {
            // Fail-open: a verification *run* failure is logged but does not
            // prevent the kernel from booting.
            tracing::error!(error = %e, "Audit chain verification failed to run");
        },
    }

    Ok(Arc::new(audit_log))
}
804
805/// Load the runtime ed25519 signing key from disk, or generate and persist a new one.
806///
807/// The key file is 32 bytes of raw secret key material at `{keys_dir}/runtime.key`.
808fn load_or_generate_runtime_key(keys_dir: &Path) -> std::io::Result<KeyPair> {
809    let key_path = keys_dir.join("runtime.key");
810
811    if key_path.exists() {
812        let bytes = std::fs::read(&key_path)?;
813        KeyPair::from_secret_key(&bytes).map_err(|e| {
814            std::io::Error::other(format!(
815                "invalid runtime key at {}: {e}",
816                key_path.display()
817            ))
818        })
819    } else {
820        let keypair = KeyPair::generate();
821        std::fs::create_dir_all(keys_dir)?;
822        std::fs::write(&key_path, keypair.secret_key_bytes())?;
823
824        // Secure permissions (owner-only) on Unix.
825        #[cfg(unix)]
826        {
827            use std::os::unix::fs::PermissionsExt;
828            std::fs::set_permissions(&key_path, std::fs::Permissions::from_mode(0o600))?;
829        }
830
831        tracing::info!(key_id = %keypair.key_id_hex(), "Generated new runtime signing key");
832        Ok(keypair)
833    }
834}
835
836/// Spawns a background task that cleanly shuts down the Kernel if there is no activity.
837///
838/// Uses dual-signal idle detection:
839/// - **Primary:** explicit `active_connections` counter (incremented on first IPC
840///   message per source, decremented on `Disconnect`).
841/// - **Secondary:** `EventBus::subscriber_count()` minus the kernel router's own
842///   subscription. When a CLI process dies without sending `Disconnect`, its
843///   broadcast receiver is dropped so the subscriber count falls.
844///
845/// Takes the minimum of both signals to handle ungraceful disconnects.
846///
847/// Configurable via `ASTRID_IDLE_TIMEOUT_SECS` (default 300 = 5 minutes).
848/// Number of permanent internal event bus subscribers that are not client
849/// connections: `KernelRouter` (`kernel.request.*`), `ConnectionTracker` (`client.*`),
850/// and `EventDispatcher` (all events).
851const INTERNAL_SUBSCRIBER_COUNT: usize = 3;
852
853fn spawn_idle_monitor(kernel: Arc<Kernel>) -> tokio::task::JoinHandle<()> {
854    tokio::spawn(async move {
855        let grace = std::time::Duration::from_secs(30);
856        let timeout_secs: u64 = std::env::var("ASTRID_IDLE_TIMEOUT_SECS")
857            .ok()
858            .and_then(|v| v.parse().ok())
859            .unwrap_or(300);
860        let idle_timeout = std::time::Duration::from_secs(timeout_secs);
861        let check_interval = std::time::Duration::from_secs(15);
862
863        tokio::time::sleep(grace).await;
864        let mut idle_since: Option<tokio::time::Instant> = None;
865
866        loop {
867            tokio::time::sleep(check_interval).await;
868
869            let connections = kernel.connection_count();
870
871            // Secondary signal: broadcast subscriber count. Subtract the
872            // permanent internal subscribers: KernelRouter (kernel.request.*),
873            // ConnectionTracker (client.*), and EventDispatcher (all events).
874            let bus_subscribers = kernel
875                .event_bus
876                .subscriber_count()
877                .saturating_sub(INTERNAL_SUBSCRIBER_COUNT);
878
879            // Take the minimum: if a CLI died without Disconnect, the counter
880            // stays inflated but the subscriber count drops.
881            let effective_connections = connections.min(bus_subscribers);
882
883            let has_daemons = {
884                let reg = kernel.capsules.read().await;
885                reg.values().any(|c| {
886                    let m = c.manifest();
887                    !m.uplinks.is_empty() || !m.cron_jobs.is_empty()
888                })
889            };
890
891            if effective_connections == 0 && !has_daemons {
892                let now = tokio::time::Instant::now();
893                let start = *idle_since.get_or_insert(now);
894                let elapsed = now.duration_since(start);
895
896                tracing::debug!(
897                    idle_secs = elapsed.as_secs(),
898                    timeout_secs,
899                    connections,
900                    bus_subscribers,
901                    "Kernel idle, monitoring timeout"
902                );
903
904                if elapsed >= idle_timeout {
905                    tracing::info!("Idle timeout reached, initiating shutdown");
906                    kernel.shutdown(Some("idle_timeout".to_string())).await;
907                    std::process::exit(0);
908                }
909            } else {
910                if idle_since.is_some() {
911                    tracing::debug!(
912                        effective_connections,
913                        has_daemons,
914                        "Activity detected, resetting idle timer"
915                    );
916                }
917                idle_since = None;
918            }
919        }
920    })
921}
922
/// Tracks restart attempts for a single capsule with exponential backoff.
struct RestartTracker {
    attempts: u32,
    last_attempt: std::time::Instant,
    backoff: std::time::Duration,
}

impl RestartTracker {
    const MAX_ATTEMPTS: u32 = 5;
    const INITIAL_BACKOFF: std::time::Duration = std::time::Duration::from_secs(2);
    const MAX_BACKOFF: std::time::Duration = std::time::Duration::from_secs(120);

    /// Fresh tracker: zero attempts, initial backoff, clock starting now.
    fn new() -> Self {
        Self {
            attempts: 0,
            backoff: Self::INITIAL_BACKOFF,
            last_attempt: std::time::Instant::now(),
        }
    }

    /// Returns `true` if a restart should be attempted now.
    ///
    /// Requires both an unspent retry budget and an elapsed backoff window.
    fn should_restart(&self) -> bool {
        !self.exhausted() && self.last_attempt.elapsed() >= self.backoff
    }

    /// Record a restart attempt and advance the backoff.
    ///
    /// Doubles the backoff (saturating), capped at [`Self::MAX_BACKOFF`].
    fn record_attempt(&mut self) {
        self.attempts = self.attempts.saturating_add(1);
        self.last_attempt = std::time::Instant::now();
        self.backoff = Self::MAX_BACKOFF.min(self.backoff.saturating_mul(2));
    }

    /// Returns `true` if all retry attempts have been exhausted.
    fn exhausted(&self) -> bool {
        Self::MAX_ATTEMPTS <= self.attempts
    }
}
960
961/// Attempts to restart a failed capsule, respecting backoff and max retries.
962///
963/// Returns `true` if the tracker should be removed (successful restart).
964async fn attempt_capsule_restart(
965    kernel: &Kernel,
966    id_str: &str,
967    tracker: &mut RestartTracker,
968) -> bool {
969    if tracker.exhausted() {
970        return false;
971    }
972
973    if !tracker.should_restart() {
974        tracing::debug!(
975            capsule_id = %id_str,
976            next_attempt_in = ?tracker.backoff.saturating_sub(tracker.last_attempt.elapsed()),
977            "Waiting for backoff before next restart attempt"
978        );
979        return false;
980    }
981
982    tracker.record_attempt();
983    let attempt = tracker.attempts;
984
985    tracing::warn!(
986        capsule_id = %id_str,
987        attempt,
988        max_attempts = RestartTracker::MAX_ATTEMPTS,
989        "Attempting capsule restart"
990    );
991
992    let capsule_id = astrid_capsule::capsule::CapsuleId::from_static(id_str);
993    match kernel.restart_capsule(&capsule_id).await {
994        Ok(()) => {
995            tracing::info!(capsule_id = %id_str, attempt, "Capsule restarted successfully");
996            true
997        },
998        Err(e) => {
999            tracing::error!(capsule_id = %id_str, attempt, error = %e, "Capsule restart failed");
1000            if tracker.exhausted() {
1001                tracing::error!(
1002                    capsule_id = %id_str,
1003                    "All restart attempts exhausted - capsule will remain down"
1004                );
1005            }
1006            false
1007        },
1008    }
1009}
1010
/// Spawns a background task that periodically probes capsule health.
///
/// Every 10 seconds, reads the capsule registry and calls `check_health()` on
/// each capsule that is currently in `Ready` state. If a capsule reports
/// `Failed`, attempts to restart it with exponential backoff (max 5 attempts).
/// Publishes `astrid.v1.health.failed` IPC events for each detected failure.
fn spawn_capsule_health_monitor(kernel: Arc<Kernel>) -> tokio::task::JoinHandle<()> {
    tokio::spawn(async move {
        let mut interval = tokio::time::interval(std::time::Duration::from_secs(10));
        interval.tick().await; // Skip the first immediate tick.

        // Keyed by capsule ID string; entries persist across ticks so the
        // exponential backoff state survives between health-check rounds.
        let mut restart_trackers: std::collections::HashMap<String, RestartTracker> =
            std::collections::HashMap::new();

        loop {
            interval.tick().await;

            // Collect ready capsules under a brief read lock, then drop
            // the lock before calling check_health() or publishing events.
            let ready_capsules: Vec<std::sync::Arc<dyn astrid_capsule::capsule::Capsule>> = {
                let registry = kernel.capsules.read().await;
                registry
                    .list()
                    .into_iter()
                    .filter_map(|id| {
                        let capsule = registry.get(id)?;
                        if capsule.state() == astrid_capsule::capsule::CapsuleState::Ready {
                            Some(capsule)
                        } else {
                            None
                        }
                    })
                    .collect()
            };

            // Probe health once per capsule, collect failures, then drop
            // the Arc Vec before restarting. This ensures restart_capsule's
            // Arc::get_mut can succeed (no other strong references held).
            let mut failures: Vec<(String, String)> = Vec::new();
            for capsule in &ready_capsules {
                let health = capsule.check_health();
                if let astrid_capsule::capsule::CapsuleState::Failed(reason) = health {
                    let id_str = capsule.id().to_string();
                    tracing::error!(capsule_id = %id_str, reason = %reason, "Capsule health check failed");

                    // Notify bus listeners about the failure before any
                    // restart is attempted.
                    let msg = astrid_events::ipc::IpcMessage::new(
                        "astrid.v1.health.failed",
                        astrid_events::ipc::IpcPayload::Custom {
                            data: serde_json::json!({
                                "capsule_id": &id_str,
                                "reason": &reason,
                            }),
                        },
                        uuid::Uuid::new_v4(),
                    );
                    // Publish result is deliberately ignored (best-effort).
                    let _ = kernel.event_bus.publish(astrid_events::AstridEvent::Ipc {
                        metadata: astrid_events::EventMetadata::new("kernel"),
                        message: msg,
                    });
                    failures.push((id_str, reason));
                }
            }

            // Drop all Arc clones so restart_capsule's Arc::get_mut can
            // obtain exclusive access for calling unload().
            drop(ready_capsules);

            let failed_this_tick: std::collections::HashSet<&str> =
                failures.iter().map(|(id, _)| id.as_str()).collect();

            let mut restarted = Vec::new();
            for (id_str, _reason) in &failures {
                let tracker = restart_trackers
                    .entry(id_str.clone())
                    .or_insert_with(RestartTracker::new);

                if attempt_capsule_restart(&kernel, id_str, tracker).await {
                    restarted.push(id_str.clone());
                }
            }

            // Remove trackers for successfully restarted capsules.
            for id in &restarted {
                restart_trackers.remove(id);
            }

            // Prune trackers for capsules that recovered (healthy this tick).
            // Keep exhausted trackers and trackers still in their backoff
            // window (capsule may have been unregistered by a failed restart
            // attempt and won't appear in ready_capsules next tick).
            restart_trackers.retain(|id, tracker| {
                if tracker.exhausted() {
                    return true;
                }
                // Keep if still within backoff - the capsule may be absent
                // from the registry after a failed reload.
                if tracker.last_attempt.elapsed() < tracker.backoff {
                    return true;
                }
                failed_this_tick.contains(id.as_str())
            });
        }
    })
}
1115
1116/// Spawns a periodic watchdog that publishes `astrid.v1.watchdog.tick` events every 5 seconds.
1117///
1118/// The `ReAct` capsule (WASM guest) cannot use async timers, so this kernel-side task
1119/// drives timeout enforcement by waking the capsule on a fixed interval. Each tick
1120/// causes the capsule's `handle_watchdog_tick` interceptor to run `check_phase_timeout`.
1121fn spawn_react_watchdog(event_bus: Arc<EventBus>) -> tokio::task::JoinHandle<()> {
1122    tokio::spawn(async move {
1123        let mut interval = tokio::time::interval(std::time::Duration::from_secs(5));
1124        // The first tick fires immediately - skip it to give capsules time to load.
1125        interval.tick().await;
1126
1127        loop {
1128            interval.tick().await;
1129
1130            let msg = astrid_events::ipc::IpcMessage::new(
1131                "astrid.v1.watchdog.tick",
1132                astrid_events::ipc::IpcPayload::Custom {
1133                    data: serde_json::json!({}),
1134                },
1135                uuid::Uuid::new_v4(),
1136            );
1137            let _ = event_bus.publish(astrid_events::AstridEvent::Ipc {
1138                metadata: astrid_events::EventMetadata::new("kernel"),
1139                message: msg,
1140            });
1141        }
1142    })
1143}
1144
// Unit tests for key management, connection counting, and restart backoff.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_load_or_generate_creates_new_key() {
        let dir = tempfile::tempdir().unwrap();
        let keys_dir = dir.path().join("keys");

        let keypair = load_or_generate_runtime_key(&keys_dir).unwrap();
        let key_path = keys_dir.join("runtime.key");

        // Key file should exist with 32 bytes.
        assert!(key_path.exists());
        let bytes = std::fs::read(&key_path).unwrap();
        assert_eq!(bytes.len(), 32);

        // The written bytes should reconstruct the same public key.
        let reloaded = KeyPair::from_secret_key(&bytes).unwrap();
        assert_eq!(
            keypair.public_key_bytes(),
            reloaded.public_key_bytes(),
            "reloaded key should match generated key"
        );
    }

    #[test]
    fn test_load_or_generate_is_idempotent() {
        let dir = tempfile::tempdir().unwrap();
        let keys_dir = dir.path().join("keys");

        // Second call must load the persisted key, not generate a fresh one.
        let first = load_or_generate_runtime_key(&keys_dir).unwrap();
        let second = load_or_generate_runtime_key(&keys_dir).unwrap();

        assert_eq!(
            first.public_key_bytes(),
            second.public_key_bytes(),
            "loading the same key file should produce the same keypair"
        );
    }

    #[test]
    fn test_load_or_generate_rejects_bad_key_length() {
        let dir = tempfile::tempdir().unwrap();
        let keys_dir = dir.path().join("keys");
        std::fs::create_dir_all(&keys_dir).unwrap();

        // Write a key file with wrong length.
        std::fs::write(keys_dir.join("runtime.key"), [0u8; 16]).unwrap();

        let result = load_or_generate_runtime_key(&keys_dir);
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(
            err.contains("invalid runtime key"),
            "expected 'invalid runtime key' error, got: {err}"
        );
    }

    #[test]
    fn test_connection_counter_increment_decrement() {
        let counter = AtomicUsize::new(0);

        // Simulate connection_opened (fetch_add)
        counter.fetch_add(1, Ordering::Relaxed);
        counter.fetch_add(1, Ordering::Relaxed);
        assert_eq!(counter.load(Ordering::Relaxed), 2);

        // Simulate connection_closed using the same fetch_update logic
        // as the real implementation to exercise the actual code path.
        for expected in [1, 0] {
            let _ = counter.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |n| {
                if n == 0 {
                    None
                } else {
                    Some(n.saturating_sub(1))
                }
            });
            assert_eq!(counter.load(Ordering::Relaxed), expected);
        }
    }

    #[test]
    fn test_connection_counter_underflow_guard() {
        // Test the saturating behavior: decrementing from 0 should stay at 0.
        // Mirrors the fetch_update logic in connection_closed().
        let counter = AtomicUsize::new(0);

        let result = counter.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |n| {
            if n == 0 { None } else { Some(n - 1) }
        });
        // fetch_update returns Err(0) when the closure returns None (no-op).
        assert!(result.is_err());
        assert_eq!(counter.load(Ordering::Relaxed), 0);
    }

    /// Mirrors the `connection_closed()` logic: only `Ok(1)` (previous value 1,
    /// now 0) triggers `clear_session_allowances`. Update this test if
    /// `connection_closed()` is refactored.
    #[test]
    fn test_last_disconnect_clears_session_allowances() {
        use astrid_approval::AllowanceStore;
        use astrid_approval::allowance::{Allowance, AllowanceId, AllowancePattern};
        use astrid_core::types::Timestamp;
        use astrid_crypto::KeyPair;

        let store = AllowanceStore::new();
        let keypair = KeyPair::generate();

        // Session-only allowance (should be cleared on last disconnect).
        store
            .add_allowance(Allowance {
                id: AllowanceId::new(),
                action_pattern: AllowancePattern::ServerTools {
                    server: "session-server".to_string(),
                },
                created_at: Timestamp::now(),
                expires_at: None,
                max_uses: None,
                uses_remaining: None,
                session_only: true,
                workspace_root: None,
                signature: keypair.sign(b"test"),
            })
            .unwrap();

        // Persistent allowance (should survive).
        store
            .add_allowance(Allowance {
                id: AllowanceId::new(),
                action_pattern: AllowancePattern::ServerTools {
                    server: "persistent-server".to_string(),
                },
                created_at: Timestamp::now(),
                expires_at: None,
                max_uses: None,
                uses_remaining: None,
                session_only: false,
                workspace_root: None,
                signature: keypair.sign(b"test"),
            })
            .unwrap();

        assert_eq!(store.count(), 2);

        // Closure reproduces connection_closed(): decrement, and clear
        // session allowances only when the previous count was exactly 1.
        let counter = AtomicUsize::new(2);
        let simulate_disconnect = || {
            let result = counter.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |n| {
                if n == 0 {
                    None
                } else {
                    Some(n.saturating_sub(1))
                }
            });
            if result == Ok(1) {
                store.clear_session_allowances();
            }
        };

        // Two connections active. First disconnect: 2 -> 1 (not last).
        simulate_disconnect();
        assert_eq!(
            store.count(),
            2,
            "both allowances should survive non-final disconnect"
        );

        // Second disconnect: 1 -> 0 (last client gone).
        simulate_disconnect();
        assert_eq!(
            store.count(),
            1,
            "session allowance should be cleared on last disconnect"
        );
    }

    #[cfg(unix)]
    #[test]
    fn test_load_or_generate_sets_secure_permissions() {
        use std::os::unix::fs::PermissionsExt;

        let dir = tempfile::tempdir().unwrap();
        let keys_dir = dir.path().join("keys");

        let _ = load_or_generate_runtime_key(&keys_dir).unwrap();

        // Owner-only read/write; no group or other access.
        let key_path = keys_dir.join("runtime.key");
        let mode = std::fs::metadata(&key_path).unwrap().permissions().mode();
        assert_eq!(
            mode & 0o777,
            0o600,
            "key file should have 0o600 permissions, got {mode:#o}"
        );
    }

    #[test]
    fn restart_tracker_initial_state() {
        let tracker = RestartTracker::new();
        assert!(!tracker.exhausted());
        // Should not restart immediately (backoff hasn't elapsed).
        assert!(!tracker.should_restart());
    }

    #[test]
    fn restart_tracker_allows_restart_after_backoff() {
        let mut tracker = RestartTracker::new();
        // Simulate time passing by setting last_attempt in the past.
        tracker.last_attempt = std::time::Instant::now()
            - RestartTracker::INITIAL_BACKOFF
            - std::time::Duration::from_millis(1);
        assert!(tracker.should_restart());
    }

    #[test]
    fn restart_tracker_doubles_backoff() {
        // Each recorded attempt doubles the backoff window.
        let mut tracker = RestartTracker::new();
        assert_eq!(tracker.backoff, RestartTracker::INITIAL_BACKOFF);

        tracker.record_attempt();
        assert_eq!(
            tracker.backoff,
            RestartTracker::INITIAL_BACKOFF.saturating_mul(2)
        );
        assert_eq!(tracker.attempts, 1);

        tracker.record_attempt();
        assert_eq!(
            tracker.backoff,
            RestartTracker::INITIAL_BACKOFF.saturating_mul(4)
        );
        assert_eq!(tracker.attempts, 2);
    }

    #[test]
    fn restart_tracker_backoff_caps_at_max() {
        let mut tracker = RestartTracker::new();
        for _ in 0..20 {
            tracker.record_attempt();
        }
        assert_eq!(tracker.backoff, RestartTracker::MAX_BACKOFF);
    }

    #[test]
    fn restart_tracker_exhausted_at_max_attempts() {
        let mut tracker = RestartTracker::new();
        for _ in 0..RestartTracker::MAX_ATTEMPTS {
            assert!(!tracker.exhausted());
            tracker.record_attempt();
        }
        assert!(tracker.exhausted());
    }

    #[test]
    fn restart_tracker_should_restart_false_when_exhausted() {
        let mut tracker = RestartTracker::new();
        for _ in 0..RestartTracker::MAX_ATTEMPTS {
            tracker.record_attempt();
        }
        // Even if backoff has elapsed, exhausted tracker should not restart.
        tracker.last_attempt = std::time::Instant::now() - RestartTracker::MAX_BACKOFF;
        assert!(!tracker.should_restart());
    }
}
1408
1409// ---------------------------------------------------------------------------
1410// Identity bootstrap helpers
1411// ---------------------------------------------------------------------------
1412
1413/// Bootstrap the CLI root user identity at kernel boot.
1414///
1415/// Creates a deterministic root `AstridUserId` on first boot, or reloads it
1416/// on subsequent boots. Auto-links with `platform="cli"`,
1417/// `platform_user_id="local"`, `method="system"`.
1418///
1419/// Idempotent: skips creation if the root user already exists.
1420async fn bootstrap_cli_root_user(
1421    store: &Arc<dyn astrid_storage::IdentityStore>,
1422) -> Result<(), astrid_storage::IdentityError> {
1423    // Check if root user already exists by trying to resolve the CLI link.
1424    if let Some(_user) = store.resolve("cli", "local").await? {
1425        tracing::debug!("CLI root user already linked");
1426        return Ok(());
1427    }
1428
1429    // No CLI link exists. Create or find the root user.
1430    let user = store.create_user(Some("root")).await?;
1431    tracing::info!(user_id = %user.id, "Created CLI root user");
1432
1433    // Link the CLI platform identity.
1434    store.link("cli", "local", user.id, "system").await?;
1435    tracing::info!(user_id = %user.id, "Linked CLI root user (cli/local)");
1436
1437    Ok(())
1438}
1439
1440/// Apply pre-configured identity links from the config file.
1441///
1442/// For each `[[identity.links]]` entry, resolves or creates the referenced
1443/// Astrid user and links the platform identity. Logs warnings on failure
1444/// but does not abort boot.
1445async fn apply_identity_config(
1446    store: &Arc<dyn astrid_storage::IdentityStore>,
1447    workspace_root: &std::path::Path,
1448) {
1449    let config = match astrid_config::Config::load(Some(workspace_root)) {
1450        Ok(resolved) => resolved.config,
1451        Err(e) => {
1452            tracing::debug!(error = %e, "No config loaded for identity links");
1453            return;
1454        },
1455    };
1456
1457    for link_cfg in &config.identity.links {
1458        let result = apply_single_identity_link(store, link_cfg).await;
1459        if let Err(e) = result {
1460            tracing::warn!(
1461                platform = %link_cfg.platform,
1462                platform_user_id = %link_cfg.platform_user_id,
1463                astrid_user = %link_cfg.astrid_user,
1464                error = %e,
1465                "Failed to apply identity link from config"
1466            );
1467        }
1468    }
1469}
1470
1471/// Apply a single identity link from config.
1472async fn apply_single_identity_link(
1473    store: &Arc<dyn astrid_storage::IdentityStore>,
1474    link_cfg: &astrid_config::types::IdentityLinkConfig,
1475) -> Result<(), astrid_storage::IdentityError> {
1476    // Resolve astrid_user: try UUID first, then name lookup, then create.
1477    let user_id = if let Ok(uuid) = uuid::Uuid::parse_str(&link_cfg.astrid_user) {
1478        // Ensure user record exists. If the UUID was explicitly specified in
1479        // config but doesn't exist in the store, that's a configuration error
1480        // - don't silently create a different user.
1481        if store.get_user(uuid).await?.is_none() {
1482            return Err(astrid_storage::IdentityError::UserNotFound(uuid));
1483        }
1484        uuid
1485    } else {
1486        // Try name lookup.
1487        if let Some(user) = store.get_user_by_name(&link_cfg.astrid_user).await? {
1488            user.id
1489        } else {
1490            let user = store.create_user(Some(&link_cfg.astrid_user)).await?;
1491            tracing::info!(
1492                user_id = %user.id,
1493                name = %link_cfg.astrid_user,
1494                "Created user from config identity link"
1495            );
1496            user.id
1497        }
1498    };
1499
1500    let method = if link_cfg.method.is_empty() {
1501        "admin"
1502    } else {
1503        &link_cfg.method
1504    };
1505
1506    // Check if link already points to the correct user - skip if idempotent.
1507    if let Some(existing) = store
1508        .resolve(&link_cfg.platform, &link_cfg.platform_user_id)
1509        .await?
1510        && existing.id == user_id
1511    {
1512        tracing::debug!(
1513            platform = %link_cfg.platform,
1514            platform_user_id = %link_cfg.platform_user_id,
1515            user_id = %user_id,
1516            "Identity link from config already exists"
1517        );
1518        return Ok(());
1519    }
1520
1521    store
1522        .link(
1523            &link_cfg.platform,
1524            &link_cfg.platform_user_id,
1525            user_id,
1526            method,
1527        )
1528        .await?;
1529
1530    tracing::info!(
1531        platform = %link_cfg.platform,
1532        platform_user_id = %link_cfg.platform_user_id,
1533        user_id = %user_id,
1534        "Applied identity link from config"
1535    );
1536
1537    Ok(())
1538}