// astrid_kernel/lib.rs

1#![deny(unsafe_code)]
2#![deny(missing_docs)]
3#![deny(clippy::all)]
4#![deny(unreachable_pub)]
5#![allow(clippy::module_name_repetitions)]
6
7//! Astrid Kernel - The core execution engine and IPC router.
8//!
9//! The Kernel is a pure, decentralized WASM runner. It contains no business
10//! logic, no cognitive loops, and no network servers. Its sole responsibility
11//! is to instantiate `astrid_events::EventBus`, load `.capsule` files into
12//! the Extism sandbox, and route IPC bytes between them.
13
14/// The Management API router listening to the `EventBus`.
15pub mod kernel_router;
16/// The Unix Domain Socket manager.
17pub mod socket;
18
19use astrid_audit::AuditLog;
20use astrid_capabilities::{CapabilityStore, DirHandle};
21use astrid_capsule::registry::CapsuleRegistry;
22use astrid_core::SessionId;
23use astrid_crypto::KeyPair;
24use astrid_events::EventBus;
25use astrid_mcp::{McpClient, SecureMcpClient, ServerManager, ServersConfig};
26use astrid_vfs::{HostVfs, OverlayVfs, Vfs};
27use std::path::{Path, PathBuf};
28use std::sync::Arc;
29use std::sync::atomic::{AtomicUsize, Ordering};
30use tokio::sync::RwLock;
31
/// The core Operating System Kernel.
///
/// Owns every shared subsystem (event bus, capsule registry, secure MCP
/// client, VFS, KV store, audit log) and hands `Arc` clones of them to
/// capsules via `CapsuleContext` in `load_capsule`.
pub struct Kernel {
    /// The unique identifier for this kernel session.
    pub session_id: SessionId,
    /// The global IPC message bus.
    pub event_bus: Arc<EventBus>,
    /// The process manager (loaded WASM capsules).
    pub capsules: Arc<RwLock<CapsuleRegistry>>,
    /// The secure MCP client with capability-based authorization and audit logging.
    pub mcp: SecureMcpClient,
    /// The capability store for this session.
    pub capabilities: Arc<CapabilityStore>,
    /// The global Virtual File System mount.
    pub vfs: Arc<dyn Vfs>,
    /// Concrete reference to the [`OverlayVfs`] for commit/rollback operations.
    /// Points at the same overlay as [`Self::vfs`] (see `new`).
    pub overlay_vfs: Arc<OverlayVfs>,
    /// Ephemeral upper directory for the overlay VFS. Kept alive for the
    /// kernel session lifetime; dropped on shutdown to discard uncommitted writes.
    _upper_dir: Arc<tempfile::TempDir>,
    /// The global physical root handle (cap-std) for the VFS.
    pub vfs_root_handle: DirHandle,
    /// The physical path the VFS is mounted to.
    pub workspace_root: PathBuf,
    /// The global shared resources directory (`~/.astrid/shared/`). Capsules
    /// declaring `fs_read = ["global://"]` can read files under this root.
    /// Scoped to `shared/` so that keys, databases, and capsule .env files in
    /// `~/.astrid/` are NOT accessible. Write access is intentionally not
    /// granted to any shipped capsule.
    ///
    /// Always `Some` in production (boot requires `AstridHome`). Remains
    /// `Option` for compatibility with `CapsuleContext` and test fixtures.
    pub global_root: Option<PathBuf>,
    /// The natively bound Unix Socket for the CLI proxy.
    pub cli_socket_listener: Option<Arc<tokio::sync::Mutex<tokio::net::UnixListener>>>,
    /// Shared KV store backing all capsule-scoped stores and kernel state.
    pub kv: Arc<astrid_storage::SurrealKvStore>,
    /// Chain-linked cryptographic audit log with persistent storage.
    pub audit_log: Arc<AuditLog>,
    /// Number of active client connections (CLI sessions).
    /// Updated with relaxed atomics; see `connection_opened`/`connection_closed`.
    pub active_connections: AtomicUsize,
    /// Instant when the kernel was booted (for uptime calculation).
    pub boot_time: std::time::Instant,
    /// Sender for the API-initiated shutdown signal. The daemon's main loop
    /// selects on the receiver to exit gracefully without `process::exit`.
    pub shutdown_tx: tokio::sync::watch::Sender<bool>,
    /// Session token for socket authentication. Generated at boot, written to
    /// `~/.astrid/sessions/system.token`. CLI sends this as its first message.
    pub session_token: Arc<astrid_core::session_token::SessionToken>,
    /// Path where the session token was written at boot. Stored so shutdown
    /// uses the exact same path (avoids fallback mismatch if env changes).
    token_path: PathBuf,
    /// Shared allowance store for capsule-level approval decisions.
    ///
    /// Capsules can check existing allowances and create new ones when
    /// users approve actions with session/always scope.
    pub allowance_store: Arc<astrid_approval::AllowanceStore>,
    /// System-wide identity store for platform user resolution.
    identity_store: Arc<dyn astrid_storage::IdentityStore>,
}
91
92impl Kernel {
    /// Boot a new Kernel instance mounted at the specified directory.
    ///
    /// # Panics
    ///
    /// Panics if called on a single-threaded tokio runtime. The capsule
    /// system uses `block_in_place` which requires a multi-threaded runtime.
    ///
    /// # Errors
    ///
    /// Returns an error if the Astrid home cannot be resolved, the KV store,
    /// capability store, or audit log cannot be opened, the VFS mount paths
    /// cannot be registered, the session socket cannot be bound, or the
    /// session token cannot be generated.
    pub async fn new(
        session_id: SessionId,
        workspace_root: PathBuf,
    ) -> Result<Arc<Self>, std::io::Error> {
        use astrid_core::dirs::AstridHome;

        // Fail fast: on a current_thread runtime the block_in_place panic
        // would otherwise surface much later, deep inside a capsule call.
        assert!(
            tokio::runtime::Handle::current().runtime_flavor()
                == tokio::runtime::RuntimeFlavor::MultiThread,
            "Kernel requires a multi-threaded tokio runtime (block_in_place panics on \
             single-threaded). Use #[tokio::main] or Runtime::new() instead of current_thread."
        );

        let event_bus = Arc::new(EventBus::new());
        let capsules = Arc::new(RwLock::new(CapsuleRegistry::new()));

        // Resolve the Astrid home directory. Required for persistent KV store
        // and audit log. Fails boot if neither $ASTRID_HOME nor $HOME is set.
        let home = AstridHome::resolve().map_err(|e| {
            std::io::Error::other(format!(
                "Failed to resolve Astrid home (set $ASTRID_HOME or $HOME): {e}"
            ))
        })?;

        // Resolve the global shared directory for the `global://` VFS scheme.
        // Scoped to `~/.astrid/shared/` — NOT the full `~/.astrid/` root — so
        // capsules cannot access keys, databases, or capsule .env files.
        let global_root = Some(home.shared_dir());

        // 1. Open the persistent KV store (needed by capability store below).
        let kv_path = home.state_db_path();
        let kv = Arc::new(
            astrid_storage::SurrealKvStore::open(&kv_path)
                .map_err(|e| std::io::Error::other(format!("Failed to open KV store: {e}")))?,
        );
        // TODO: clear ephemeral keys (e: prefix) on boot when the key
        // lifecycle tier convention is established.

        // 2. Initialize MCP process manager with security layer.
        //    Set workspace_root so sandboxed MCP servers have a writable directory.
        let mcp_config = ServersConfig::load_default().unwrap_or_default();
        let mcp_manager =
            ServerManager::new(mcp_config).with_workspace_root(workspace_root.clone());
        let mcp_client = McpClient::new(mcp_manager);

        // 3. Bootstrap capability store (persistent) and audit log.
        //    Key rotation invalidates persisted tokens (fail-secure by design).
        let capabilities = Arc::new(
            CapabilityStore::with_kv_store(Arc::clone(&kv) as Arc<dyn astrid_storage::KvStore>)
                .map_err(|e| {
                    std::io::Error::other(format!("Failed to init capability store: {e}"))
                })?,
        );
        let audit_log = open_audit_log()?;
        let mcp = SecureMcpClient::new(
            mcp_client,
            Arc::clone(&capabilities),
            Arc::clone(&audit_log),
            session_id.clone(),
        );

        // 4. Establish the physical security boundary (sandbox handle)
        let root_handle = DirHandle::new();

        // 5. Initialize sandboxed overlay VFS (lower=workspace, upper=temp)
        let (overlay_vfs, upper_temp) = init_overlay_vfs(&root_handle, &workspace_root).await?;

        // 6. Bind the secure Unix socket and generate session token.
        // The socket is bound here, but not yet listened on. The token is
        // generated before any capsule can accept connections, preventing
        // a race where a client connects before the token file exists.
        let listener = socket::bind_session_socket()?;
        let (session_token, token_path) = socket::generate_session_token()?;

        let allowance_store = Arc::new(astrid_approval::AllowanceStore::new());

        // Create system-wide identity store backed by the shared KV.
        let identity_kv = astrid_storage::ScopedKvStore::new(
            Arc::clone(&kv) as Arc<dyn astrid_storage::KvStore>,
            "system:identity",
        )
        .map_err(|e| std::io::Error::other(format!("Failed to create identity KV: {e}")))?;
        let identity_store: Arc<dyn astrid_storage::IdentityStore> =
            Arc::new(astrid_storage::KvIdentityStore::new(identity_kv));

        // Bootstrap the CLI root user (idempotent).
        bootstrap_cli_root_user(&identity_store)
            .await
            .map_err(|e| {
                std::io::Error::other(format!("Failed to bootstrap CLI root user: {e}"))
            })?;

        // Apply pre-configured identity links from config.
        apply_identity_config(&identity_store, &workspace_root).await;

        let kernel = Arc::new(Self {
            session_id,
            event_bus,
            capsules,
            mcp,
            capabilities,
            vfs: Arc::clone(&overlay_vfs) as Arc<dyn Vfs>,
            overlay_vfs,
            _upper_dir: Arc::new(upper_temp),
            vfs_root_handle: root_handle,
            workspace_root,
            global_root,
            cli_socket_listener: Some(Arc::new(tokio::sync::Mutex::new(listener))),
            kv,
            audit_log,
            active_connections: AtomicUsize::new(0),
            boot_time: std::time::Instant::now(),
            // The initial watch receiver is dropped here. NOTE(review): the
            // daemon presumably obtains one via `shutdown_tx.subscribe()`
            // later — confirm against the daemon's main loop.
            shutdown_tx: tokio::sync::watch::channel(false).0,
            session_token: Arc::new(session_token),
            token_path,
            allowance_store,
            identity_store,
        });

        // Explicitly discard the JoinHandles: these background tasks are
        // intentionally detached for the lifetime of the process.
        drop(kernel_router::spawn_kernel_router(Arc::clone(&kernel)));
        drop(spawn_idle_monitor(Arc::clone(&kernel)));
        drop(spawn_react_watchdog(Arc::clone(&kernel.event_bus)));
        drop(spawn_capsule_health_monitor(Arc::clone(&kernel)));

        // Spawn the event dispatcher — routes EventBus events to capsule interceptors
        let dispatcher = astrid_capsule::dispatcher::EventDispatcher::new(
            Arc::clone(&kernel.capsules),
            Arc::clone(&kernel.event_bus),
        );
        tokio::spawn(dispatcher.run());

        debug_assert_eq!(
            kernel.event_bus.subscriber_count(),
            INTERNAL_SUBSCRIBER_COUNT,
            "INTERNAL_SUBSCRIBER_COUNT is stale; update it when adding permanent subscribers"
        );

        Ok(kernel)
    }
242
243    /// Load a capsule into the Kernel from a directory containing a Capsule.toml
244    ///
245    /// # Errors
246    ///
247    /// Returns an error if the manifest cannot be loaded, the capsule cannot be created, or registration fails.
248    async fn load_capsule(&self, dir: PathBuf) -> Result<(), anyhow::Error> {
249        let manifest_path = dir.join("Capsule.toml");
250        let manifest = astrid_capsule::discovery::load_manifest(&manifest_path)
251            .map_err(|e| anyhow::anyhow!(e))?;
252
253        let loader = astrid_capsule::loader::CapsuleLoader::new(self.mcp.clone());
254        let mut capsule = loader.create_capsule(manifest, dir.clone())?;
255
256        // Build the context — use the shared kernel KV so capsules can
257        // communicate state through overlapping KV namespaces.
258        let kv = astrid_storage::ScopedKvStore::new(
259            Arc::clone(&self.kv) as Arc<dyn astrid_storage::KvStore>,
260            format!("capsule:{}", capsule.id()),
261        )?;
262
263        // Pre-load `.env.json` into the KV store if it exists
264        let env_path = dir.join(".env.json");
265        if env_path.exists()
266            && let Ok(contents) = std::fs::read_to_string(&env_path)
267            && let Ok(env_map) =
268                serde_json::from_str::<std::collections::HashMap<String, String>>(&contents)
269        {
270            for (k, v) in env_map {
271                let _ = kv.set(&k, v.into_bytes()).await;
272            }
273        }
274
275        let ctx = astrid_capsule::context::CapsuleContext::new(
276            self.workspace_root.clone(),
277            self.global_root.clone(),
278            kv,
279            Arc::clone(&self.event_bus),
280            self.cli_socket_listener.clone(),
281        )
282        .with_registry(Arc::clone(&self.capsules))
283        .with_session_token(Arc::clone(&self.session_token))
284        .with_allowance_store(Arc::clone(&self.allowance_store))
285        .with_identity_store(Arc::clone(&self.identity_store));
286
287        capsule.load(&ctx).await?;
288
289        let mut registry = self.capsules.write().await;
290        registry
291            .register(capsule)
292            .map_err(|e| anyhow::anyhow!("Failed to register capsule: {e}"))?;
293
294        Ok(())
295    }
296
    /// Restart a capsule by unloading it and re-loading from its source directory.
    ///
    /// # Errors
    ///
    /// Returns an error if the capsule has no source directory, cannot be
    /// unregistered, or fails to reload.
    async fn restart_capsule(
        &self,
        id: &astrid_capsule::capsule::CapsuleId,
    ) -> Result<(), anyhow::Error> {
        // Get source directory before unregistering.
        let source_dir = {
            let registry = self.capsules.read().await;
            let capsule = registry
                .get(id)
                .ok_or_else(|| anyhow::anyhow!("capsule '{id}' not found in registry"))?;
            capsule
                .source_dir()
                .map(std::path::Path::to_path_buf)
                .ok_or_else(|| anyhow::anyhow!("capsule '{id}' has no source directory"))?
        };

        // Remove the old capsule from the registry under a brief write lock.
        let old_capsule = {
            let mut registry = self.capsules.write().await;
            registry
                .unregister(id)
                .map_err(|e| anyhow::anyhow!("failed to unregister capsule '{id}': {e}"))?
        };
        // Explicitly unload the old capsule. There is no Drop impl that
        // calls unload() (it's async), so we must do it here to avoid
        // leaking MCP subprocesses and other engine resources.
        // Arc::get_mut requires exclusive ownership (strong_count == 1);
        // if an in-flight task still holds a clone, we skip the unload.
        {
            let mut old = old_capsule;
            if let Some(capsule) = std::sync::Arc::get_mut(&mut old) {
                if let Err(e) = capsule.unload().await {
                    tracing::warn!(
                        capsule_id = %id,
                        error = %e,
                        "Capsule unload failed during restart"
                    );
                }
            } else {
                tracing::warn!(
                    capsule_id = %id,
                    "Cannot call unload during restart - Arc still held by in-flight task"
                );
            }
        }

        // Re-load from disk.
        self.load_capsule(source_dir).await?;

        // Signal the newly loaded capsule to clean up ephemeral state
        // from the previous incarnation. Capsules that don't implement
        // `handle_lifecycle_restart` will return an error, which is fine.
        //
        // Clone the capsule Arc under a brief read lock, then drop the
        // guard before invoke_interceptor which calls block_in_place.
        // Holding the RwLock across block_in_place parks the worker thread
        // and starves registry writers (health monitor, capsule loading).
        let capsule = {
            let registry = self.capsules.read().await;
            registry.get(id)
        };
        if let Some(capsule) = capsule
            && let Err(e) = capsule.invoke_interceptor("handle_lifecycle_restart", &[])
        {
            tracing::debug!(
                capsule_id = %id,
                error = %e,
                "Capsule does not handle lifecycle restart (optional)"
            );
        }

        Ok(())
    }
377
    /// Auto-discover and load all capsules from the standard capsule directories.
    ///
    /// NOTE(review): only `~/.astrid/capsules` is pushed into `paths` here;
    /// whether a workspace-local `.astrid/capsules` is also scanned depends
    /// on `discovery::discover_manifests` — confirm before documenting both.
    ///
    /// Capsules are loaded in dependency order (topological sort) with
    /// uplink/daemon capsules loaded first. Each uplink must signal
    /// readiness before non-uplink capsules are loaded.
    ///
    /// After all capsules are loaded, tool schemas are injected into every
    /// capsule's KV namespace and the `astrid.v1.capsules_loaded` event is published.
    pub async fn load_all_capsules(&self) {
        use astrid_capsule::toposort::toposort_manifests;
        use astrid_core::dirs::AstridHome;

        let mut paths = Vec::new();
        if let Ok(home) = AstridHome::resolve() {
            paths.push(home.capsules_dir());
        }

        let discovered = astrid_capsule::discovery::discover_manifests(Some(&paths));

        // Topological sort ALL capsules together so cross-partition
        // requirements (e.g. a non-uplink requiring an uplink's capability)
        // resolve correctly without spurious "not provided" warnings.
        // On a cycle, fall back to discovery order rather than aborting boot.
        let sorted = match toposort_manifests(discovered) {
            Ok(sorted) => sorted,
            Err((e, original)) => {
                tracing::error!(
                    cycle = %e,
                    "Dependency cycle in capsules, falling back to discovery order"
                );
                original
            },
        };

        // Defence-in-depth: manifest validation in discovery.rs rejects
        // uplinks with `requires`, but warn here in case a manifest bypasses
        // the normal load path.
        for (manifest, _) in &sorted {
            if manifest.capabilities.uplink && !manifest.dependencies.requires.is_empty() {
                tracing::warn!(
                    capsule = %manifest.package.name,
                    requires = ?manifest.dependencies.requires,
                    "Uplink capsule has [dependencies].requires - \
                     this should have been rejected at manifest load time"
                );
            }
        }

        // Partition after sorting: uplinks first, then the rest.
        // The relative order within each partition is preserved from the
        // toposort, so dependency edges are still respected. Cross-partition
        // edges (non-uplink requiring an uplink) are satisfied by construction
        // since all uplinks load first. The inverse (uplink requiring a
        // non-uplink) is rejected above.
        let (uplinks, others): (Vec<_>, Vec<_>) =
            sorted.into_iter().partition(|(m, _)| m.capabilities.uplink);

        // Load uplinks first so their event bus subscriptions are ready.
        // A failed load is logged and skipped; it does not abort the rest.
        let uplink_names: Vec<String> = uplinks
            .iter()
            .map(|(m, _)| m.package.name.clone())
            .collect();
        for (manifest, dir) in &uplinks {
            if let Err(e) = self.load_capsule(dir.clone()).await {
                tracing::warn!(
                    capsule = %manifest.package.name,
                    error = %e,
                    "Failed to load uplink capsule during discovery"
                );
            }
        }

        // Wait for uplink capsules to signal readiness before loading
        // non-uplink capsules. This ensures IPC subscriptions are active.
        self.await_capsule_readiness(&uplink_names).await;

        for (manifest, dir) in &others {
            if let Err(e) = self.load_capsule(dir.clone()).await {
                tracing::warn!(
                    capsule = %manifest.package.name,
                    error = %e,
                    "Failed to load capsule during discovery"
                );
            }
        }

        // Wait for non-uplink run-loop capsules too, so any future
        // dependency edges between them are respected.
        let other_names: Vec<String> = others.iter().map(|(m, _)| m.package.name.clone()).collect();
        self.await_capsule_readiness(&other_names).await;

        // Inject tool schemas into every capsule's KV namespace so any
        // capsule (e.g. react) can read them via kv::get_json("tool_schemas").
        self.inject_tool_schemas().await;

        // Signal that all capsules have been loaded so uplink capsules
        // (like the registry) can proceed with discovery instead of
        // polling with arbitrary timeouts.
        let msg = astrid_events::ipc::IpcMessage::new(
            "astrid.v1.capsules_loaded",
            astrid_events::ipc::IpcPayload::RawJson(serde_json::json!({"status": "ready"})),
            self.session_id.0,
        );
        let _ = self.event_bus.publish(astrid_events::AstridEvent::Ipc {
            metadata: astrid_events::EventMetadata::new("kernel"),
            message: msg,
        });
    }
485
    /// Record that a new client connection has been established.
    ///
    /// Relaxed ordering is sufficient: the counter is a statistic, not a
    /// synchronization point.
    pub fn connection_opened(&self) {
        self.active_connections.fetch_add(1, Ordering::Relaxed);
    }
490
491    /// Record that a client connection has been closed.
492    ///
493    /// Uses `fetch_update` for atomic saturating decrement - avoids the TOCTOU
494    /// window where `fetch_sub` wraps to `usize::MAX` before a corrective store.
495    ///
496    /// When the last connection closes (counter reaches 0), clears all
497    /// session-scoped allowances so they don't leak into the next CLI session.
498    pub fn connection_closed(&self) {
499        let result =
500            self.active_connections
501                .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |n| {
502                    if n == 0 {
503                        None
504                    } else {
505                        Some(n.saturating_sub(1))
506                    }
507                });
508
509        // Previous value was 1 -> now 0: last client disconnected.
510        // Clear session-scoped allowances so they don't leak into the next session.
511        if result == Ok(1) {
512            self.allowance_store.clear_session_allowances();
513            tracing::info!("last client disconnected, session allowances cleared");
514        }
515    }
516
    /// Number of active client connections.
    ///
    /// A relaxed snapshot: the value may be stale by the time the caller
    /// acts on it.
    pub fn connection_count(&self) -> usize {
        self.active_connections.load(Ordering::Relaxed)
    }
521
    /// Gracefully shut down the kernel.
    ///
    /// 1. Publish `KernelShutdown` event on the bus.
    /// 2. Drain and unload all capsules (stops MCP child processes, WASM engines).
    /// 3. Flush and close the persistent KV store.
    /// 4. Remove the Unix socket file.
    pub async fn shutdown(&self, reason: Option<String>) {
        tracing::info!(reason = ?reason, "Kernel shutting down");

        // 1. Notify all subscribers so capsules can react.
        let _ = self
            .event_bus
            .publish(astrid_events::AstridEvent::KernelShutdown {
                metadata: astrid_events::EventMetadata::new("kernel"),
                reason: reason.clone(),
            });

        // 2. Drain the registry so the dispatcher cannot hand out new Arc clones,
        // then unload each capsule. MCP engine unload is critical - it calls
        // `mcp_client.disconnect()` to gracefully terminate child processes.
        // Without explicit unload, MCP child processes become orphaned.
        //
        // The `EventDispatcher` temporarily clones `Arc<dyn Capsule>` into
        // spawned interceptor tasks. After draining, no new clones can be
        // created, but in-flight tasks may still hold references. We retry
        // `Arc::get_mut` with brief yields to let them complete.
        let capsules = {
            let mut reg = self.capsules.write().await;
            reg.drain()
        };
        for mut arc in capsules {
            let id = arc.id().clone();
            let mut unloaded = false;

            // Retry budget: 20 attempts x 50ms sleep = ~1s grace per capsule
            // for in-flight tasks to release their Arc clones.
            for retry in 0..20_u32 {
                if let Some(capsule) = Arc::get_mut(&mut arc) {
                    if let Err(e) = capsule.unload().await {
                        tracing::warn!(
                            capsule_id = %id,
                            error = %e,
                            "Failed to unload capsule during shutdown"
                        );
                    }
                    unloaded = true;
                    break;
                }
                if retry < 19 {
                    tokio::time::sleep(std::time::Duration::from_millis(50)).await;
                }
            }

            if !unloaded {
                tracing::warn!(
                    capsule_id = %id,
                    strong_count = Arc::strong_count(&arc),
                    "Dropping capsule without explicit unload after retries exhausted; \
                     MCP child processes may be orphaned"
                );
            }
            drop(arc);
        }

        // 3. Flush the persistent KV store.
        if let Err(e) = self.kv.close().await {
            tracing::warn!(error = %e, "Failed to flush KV store during shutdown");
        }

        // 4. Remove the socket and token files so stale-socket detection works
        // on next boot and the auth token doesn't persist on disk after shutdown.
        // This runs AFTER capsule unload, which is the correct order: MCP child
        // processes communicate via stdio pipes (not this Unix socket), so they
        // are already terminated by step 2. The socket is only used for
        // CLI-to-kernel IPC.
        let socket_path = crate::socket::kernel_socket_path();
        let _ = std::fs::remove_file(&socket_path);
        let _ = std::fs::remove_file(&self.token_path);
        crate::socket::remove_readiness_file();

        tracing::info!("Kernel shutdown complete");
    }
602
603    /// Wait for a set of capsules to signal readiness, in parallel.
604    ///
605    /// Collects `Arc<dyn Capsule>` handles under a short-lived read lock,
606    /// then drops the lock before awaiting. Capsules without a run loop
607    /// return `Ready` immediately and don't contribute to wait time.
608    async fn await_capsule_readiness(&self, names: &[String]) {
609        use astrid_capsule::capsule::ReadyStatus;
610
611        if names.is_empty() {
612            return;
613        }
614
615        let timeout = std::time::Duration::from_millis(500);
616        let capsules: Vec<(String, std::sync::Arc<dyn astrid_capsule::capsule::Capsule>)> = {
617            let registry = self.capsules.read().await;
618            names
619                .iter()
620                .filter_map(
621                    |name| match astrid_capsule::capsule::CapsuleId::new(name.clone()) {
622                        Ok(capsule_id) => registry.get(&capsule_id).map(|c| (name.clone(), c)),
623                        Err(e) => {
624                            tracing::warn!(
625                                capsule = %name,
626                                error = %e,
627                                "Invalid capsule ID, skipping readiness wait"
628                            );
629                            None
630                        },
631                    },
632                )
633                .collect()
634        };
635
636        // Await all capsules concurrently - independent capsules shouldn't
637        // compound each other's timeout.
638        let mut set = tokio::task::JoinSet::new();
639        for (name, capsule) in capsules {
640            set.spawn(async move {
641                let status = capsule.wait_ready(timeout).await;
642                (name, status)
643            });
644        }
645        while let Some(result) = set.join_next().await {
646            if let Ok((name, status)) = result {
647                match status {
648                    ReadyStatus::Ready => {},
649                    ReadyStatus::Timeout => {
650                        tracing::warn!(
651                            capsule = %name,
652                            timeout_ms = timeout.as_millis(),
653                            "Capsule did not signal ready within timeout"
654                        );
655                    },
656                    ReadyStatus::Crashed => {
657                        tracing::error!(
658                            capsule = %name,
659                            "Capsule run loop exited before signaling ready"
660                        );
661                    },
662                }
663            }
664        }
665    }
666
    /// Collect all tool definitions from loaded capsule manifests and write
    /// them to every capsule's scoped KV namespace as `tool_schemas`.
    async fn inject_tool_schemas(&self) {
        use astrid_events::llm::LlmToolDefinition;
        use astrid_storage::KvStore;

        // Collect tools and capsule IDs under a short-lived read lock,
        // then drop the guard before the async KV write loop. Holding
        // the RwLock across N awaits would block all registry writers.
        let (all_tools, capsule_ids) = {
            let registry = self.capsules.read().await;
            let tools: Vec<LlmToolDefinition> = registry
                .values()
                .flat_map(|capsule| {
                    capsule.manifest().tools.iter().map(|t| LlmToolDefinition {
                        name: t.name.clone(),
                        description: Some(t.description.clone()),
                        input_schema: t.input_schema.clone(),
                    })
                })
                .collect();
            let ids: Vec<String> = registry.list().iter().map(ToString::to_string).collect();
            (tools, ids)
        };

        if all_tools.is_empty() {
            return;
        }

        // Serialize once and clone the bytes per capsule instead of
        // re-serializing inside the write loop.
        let tool_bytes = match serde_json::to_vec(&all_tools) {
            Ok(b) => b,
            Err(e) => {
                tracing::error!(error = %e, "Failed to serialize tool schemas");
                return;
            },
        };

        tracing::info!(
            tool_count = all_tools.len(),
            "Injecting tool schemas into capsule KV stores"
        );

        // Write through the raw store with an explicit `capsule:<id>`
        // namespace - the same scheme `load_capsule` uses for ScopedKvStore.
        for capsule_id in &capsule_ids {
            let namespace = format!("capsule:{capsule_id}");
            if let Err(e) = self
                .kv
                .set(&namespace, "tool_schemas", tool_bytes.clone())
                .await
            {
                tracing::warn!(
                    capsule = %capsule_id,
                    error = %e,
                    "Failed to inject tool schemas"
                );
            }
        }
    }
724}
725
726/// Open (or create) the persistent audit log and verify historical chain integrity.
727///
728/// Initialize the sandboxed overlay VFS.
729///
730/// Creates a lower (read-only workspace) and upper (session-scoped temp dir)
731/// layer, returning the overlay and the `TempDir` whose lifetime keeps the
732/// upper layer alive.
733async fn init_overlay_vfs(
734    root_handle: &DirHandle,
735    workspace_root: &Path,
736) -> Result<(Arc<OverlayVfs>, tempfile::TempDir), std::io::Error> {
737    let lower_vfs = HostVfs::new();
738    lower_vfs
739        .register_dir(root_handle.clone(), workspace_root.to_path_buf())
740        .await
741        .map_err(|_| std::io::Error::other("Failed to register lower vfs dir"))?;
742
743    let upper_temp = tempfile::TempDir::new()
744        .map_err(|e| std::io::Error::other(format!("Failed to create overlay temp dir: {e}")))?;
745    let upper_vfs = HostVfs::new();
746    upper_vfs
747        .register_dir(root_handle.clone(), upper_temp.path().to_path_buf())
748        .await
749        .map_err(|_| std::io::Error::other("Failed to register upper vfs dir"))?;
750
751    let overlay = Arc::new(OverlayVfs::new(Box::new(lower_vfs), Box::new(upper_vfs)));
752    Ok((overlay, upper_temp))
753}
754
755/// Loads the runtime signing key from `~/.astrid/keys/runtime.key`, generating a
756/// new one if it doesn't exist. Opens the `SurrealKV`-backed audit database at
757/// `~/.astrid/audit.db` and runs `verify_all()` to detect any tampering of
758/// historical entries. Verification failures are logged at `error!` level but
759/// do not block boot (fail-open for availability, loud alert for integrity).
760fn open_audit_log() -> std::io::Result<Arc<AuditLog>> {
761    use astrid_core::dirs::AstridHome;
762
763    let home = AstridHome::resolve()
764        .map_err(|e| std::io::Error::other(format!("cannot resolve Astrid home: {e}")))?;
765    home.ensure()
766        .map_err(|e| std::io::Error::other(format!("cannot create Astrid home dirs: {e}")))?;
767
768    let runtime_key = load_or_generate_runtime_key(&home.keys_dir())?;
769    let audit_log = AuditLog::open(home.audit_db_path(), runtime_key)
770        .map_err(|e| std::io::Error::other(format!("cannot open audit log: {e}")))?;
771
772    // Verify all historical chains on boot.
773    match audit_log.verify_all() {
774        Ok(results) => {
775            let total_sessions = results.len();
776            let mut tampered_sessions: usize = 0;
777
778            for (session_id, result) in &results {
779                if !result.valid {
780                    tampered_sessions = tampered_sessions.saturating_add(1);
781                    for issue in &result.issues {
782                        tracing::error!(
783                            session_id = %session_id,
784                            issue = %issue,
785                            "Audit chain integrity violation detected"
786                        );
787                    }
788                }
789            }
790
791            if tampered_sessions > 0 {
792                tracing::error!(
793                    total_sessions,
794                    tampered_sessions,
795                    "Audit chain verification found tampered sessions"
796                );
797            } else if total_sessions > 0 {
798                tracing::info!(
799                    total_sessions,
800                    "Audit chain verification passed for all sessions"
801                );
802            }
803        },
804        Err(e) => {
805            tracing::error!(error = %e, "Audit chain verification failed to run");
806        },
807    }
808
809    Ok(Arc::new(audit_log))
810}
811
812/// Load the runtime ed25519 signing key from disk, or generate and persist a new one.
813///
814/// The key file is 32 bytes of raw secret key material at `{keys_dir}/runtime.key`.
815fn load_or_generate_runtime_key(keys_dir: &Path) -> std::io::Result<KeyPair> {
816    let key_path = keys_dir.join("runtime.key");
817
818    if key_path.exists() {
819        let bytes = std::fs::read(&key_path)?;
820        KeyPair::from_secret_key(&bytes).map_err(|e| {
821            std::io::Error::other(format!(
822                "invalid runtime key at {}: {e}",
823                key_path.display()
824            ))
825        })
826    } else {
827        let keypair = KeyPair::generate();
828        std::fs::create_dir_all(keys_dir)?;
829        std::fs::write(&key_path, keypair.secret_key_bytes())?;
830
831        // Secure permissions (owner-only) on Unix.
832        #[cfg(unix)]
833        {
834            use std::os::unix::fs::PermissionsExt;
835            std::fs::set_permissions(&key_path, std::fs::Permissions::from_mode(0o600))?;
836        }
837
838        tracing::info!(key_id = %keypair.key_id_hex(), "Generated new runtime signing key");
839        Ok(keypair)
840    }
841}
842
843/// Spawns a background task that cleanly shuts down the Kernel if there is no activity.
844///
845/// Uses dual-signal idle detection:
846/// - **Primary:** explicit `active_connections` counter (incremented on first IPC
847///   message per source, decremented on `Disconnect`).
848/// - **Secondary:** `EventBus::subscriber_count()` minus the kernel router's own
849///   subscription. When a CLI process dies without sending `Disconnect`, its
850///   broadcast receiver is dropped so the subscriber count falls.
851///
852/// Takes the minimum of both signals to handle ungraceful disconnects.
853///
854/// Configurable via `ASTRID_IDLE_TIMEOUT_SECS` (default 300 = 5 minutes).
855/// Number of permanent internal event bus subscribers that are not client
856/// connections: `KernelRouter` (`kernel.request.*`), `ConnectionTracker` (`client.*`),
857/// and `EventDispatcher` (all events).
858const INTERNAL_SUBSCRIBER_COUNT: usize = 3;
859
860fn spawn_idle_monitor(kernel: Arc<Kernel>) -> tokio::task::JoinHandle<()> {
861    tokio::spawn(async move {
862        let grace = std::time::Duration::from_secs(30);
863        let timeout_secs: u64 = std::env::var("ASTRID_IDLE_TIMEOUT_SECS")
864            .ok()
865            .and_then(|v| v.parse().ok())
866            .unwrap_or(300);
867        let idle_timeout = std::time::Duration::from_secs(timeout_secs);
868        let check_interval = std::time::Duration::from_secs(15);
869
870        tokio::time::sleep(grace).await;
871        let mut idle_since: Option<tokio::time::Instant> = None;
872
873        loop {
874            tokio::time::sleep(check_interval).await;
875
876            let connections = kernel.connection_count();
877
878            // Secondary signal: broadcast subscriber count. Subtract the
879            // permanent internal subscribers: KernelRouter (kernel.request.*),
880            // ConnectionTracker (client.*), and EventDispatcher (all events).
881            let bus_subscribers = kernel
882                .event_bus
883                .subscriber_count()
884                .saturating_sub(INTERNAL_SUBSCRIBER_COUNT);
885
886            // Take the minimum: if a CLI died without Disconnect, the counter
887            // stays inflated but the subscriber count drops.
888            let effective_connections = connections.min(bus_subscribers);
889
890            let has_daemons = {
891                let reg = kernel.capsules.read().await;
892                reg.values().any(|c| {
893                    let m = c.manifest();
894                    !m.uplinks.is_empty() || !m.cron_jobs.is_empty()
895                })
896            };
897
898            if effective_connections == 0 && !has_daemons {
899                let now = tokio::time::Instant::now();
900                let start = *idle_since.get_or_insert(now);
901                let elapsed = now.duration_since(start);
902
903                tracing::debug!(
904                    idle_secs = elapsed.as_secs(),
905                    timeout_secs,
906                    connections,
907                    bus_subscribers,
908                    "Kernel idle, monitoring timeout"
909                );
910
911                if elapsed >= idle_timeout {
912                    tracing::info!("Idle timeout reached, initiating shutdown");
913                    kernel.shutdown(Some("idle_timeout".to_string())).await;
914                    std::process::exit(0);
915                }
916            } else {
917                if idle_since.is_some() {
918                    tracing::debug!(
919                        effective_connections,
920                        has_daemons,
921                        "Activity detected, resetting idle timer"
922                    );
923                }
924                idle_since = None;
925            }
926        }
927    })
928}
929
/// Tracks restart attempts for a single capsule with exponential backoff.
struct RestartTracker {
    // Total attempts made so far; compared against MAX_ATTEMPTS.
    attempts: u32,
    // When the most recent attempt (or tracker creation) happened.
    last_attempt: std::time::Instant,
    // Current wait required before the next attempt; doubles per attempt.
    backoff: std::time::Duration,
}

impl RestartTracker {
    const MAX_ATTEMPTS: u32 = 5;
    const INITIAL_BACKOFF: std::time::Duration = std::time::Duration::from_secs(2);
    const MAX_BACKOFF: std::time::Duration = std::time::Duration::from_secs(120);

    /// Fresh tracker: zero attempts, clock starts now, minimal backoff.
    fn new() -> Self {
        Self {
            backoff: Self::INITIAL_BACKOFF,
            last_attempt: std::time::Instant::now(),
            attempts: 0,
        }
    }

    /// Returns `true` if a restart should be attempted now: retries remain
    /// and the current backoff window has fully elapsed.
    fn should_restart(&self) -> bool {
        !self.exhausted() && self.last_attempt.elapsed() >= self.backoff
    }

    /// Record a restart attempt: bump the counter, reset the clock, and
    /// double the backoff (capped at [`Self::MAX_BACKOFF`]).
    fn record_attempt(&mut self) {
        self.last_attempt = std::time::Instant::now();
        self.backoff = Self::MAX_BACKOFF.min(self.backoff.saturating_mul(2));
        self.attempts = self.attempts.saturating_add(1);
    }

    /// Returns `true` once every retry attempt has been used up.
    fn exhausted(&self) -> bool {
        self.attempts >= Self::MAX_ATTEMPTS
    }
}
967
968/// Attempts to restart a failed capsule, respecting backoff and max retries.
969///
970/// Returns `true` if the tracker should be removed (successful restart).
971async fn attempt_capsule_restart(
972    kernel: &Kernel,
973    id_str: &str,
974    tracker: &mut RestartTracker,
975) -> bool {
976    if tracker.exhausted() {
977        return false;
978    }
979
980    if !tracker.should_restart() {
981        tracing::debug!(
982            capsule_id = %id_str,
983            next_attempt_in = ?tracker.backoff.saturating_sub(tracker.last_attempt.elapsed()),
984            "Waiting for backoff before next restart attempt"
985        );
986        return false;
987    }
988
989    tracker.record_attempt();
990    let attempt = tracker.attempts;
991
992    tracing::warn!(
993        capsule_id = %id_str,
994        attempt,
995        max_attempts = RestartTracker::MAX_ATTEMPTS,
996        "Attempting capsule restart"
997    );
998
999    let capsule_id = astrid_capsule::capsule::CapsuleId::from_static(id_str);
1000    match kernel.restart_capsule(&capsule_id).await {
1001        Ok(()) => {
1002            tracing::info!(capsule_id = %id_str, attempt, "Capsule restarted successfully");
1003            true
1004        },
1005        Err(e) => {
1006            tracing::error!(capsule_id = %id_str, attempt, error = %e, "Capsule restart failed");
1007            if tracker.exhausted() {
1008                tracing::error!(
1009                    capsule_id = %id_str,
1010                    "All restart attempts exhausted - capsule will remain down"
1011                );
1012            }
1013            false
1014        },
1015    }
1016}
1017
/// Spawns a background task that periodically probes capsule health.
///
/// Every 10 seconds, reads the capsule registry and calls `check_health()` on
/// each capsule that is currently in `Ready` state. If a capsule reports
/// `Failed`, attempts to restart it with exponential backoff (max 5 attempts).
/// Publishes `astrid.v1.health.failed` IPC events for each detected failure.
fn spawn_capsule_health_monitor(kernel: Arc<Kernel>) -> tokio::task::JoinHandle<()> {
    tokio::spawn(async move {
        let mut interval = tokio::time::interval(std::time::Duration::from_secs(10));
        interval.tick().await; // Skip the first immediate tick.

        // Per-capsule restart state, keyed by capsule ID string. Entries are
        // created lazily on first failure and pruned once the capsule
        // restarts or recovers (see the retain() at the end of the loop).
        let mut restart_trackers: std::collections::HashMap<String, RestartTracker> =
            std::collections::HashMap::new();

        loop {
            interval.tick().await;

            // Collect ready capsules under a brief read lock, then drop
            // the lock before calling check_health() or publishing events.
            let ready_capsules: Vec<std::sync::Arc<dyn astrid_capsule::capsule::Capsule>> = {
                let registry = kernel.capsules.read().await;
                registry
                    .list()
                    .into_iter()
                    .filter_map(|id| {
                        let capsule = registry.get(id)?;
                        if capsule.state() == astrid_capsule::capsule::CapsuleState::Ready {
                            Some(capsule)
                        } else {
                            None
                        }
                    })
                    .collect()
            };

            // Probe health once per capsule, collect failures, then drop
            // the Arc Vec before restarting. This ensures restart_capsule's
            // Arc::get_mut can succeed (no other strong references held).
            let mut failures: Vec<(String, String)> = Vec::new();
            for capsule in &ready_capsules {
                let health = capsule.check_health();
                if let astrid_capsule::capsule::CapsuleState::Failed(reason) = health {
                    let id_str = capsule.id().to_string();
                    tracing::error!(capsule_id = %id_str, reason = %reason, "Capsule health check failed");

                    // Broadcast the failure on the event bus; the publish
                    // result is intentionally ignored (`let _`) so a bus
                    // error cannot stall the health loop.
                    let msg = astrid_events::ipc::IpcMessage::new(
                        "astrid.v1.health.failed",
                        astrid_events::ipc::IpcPayload::Custom {
                            data: serde_json::json!({
                                "capsule_id": &id_str,
                                "reason": &reason,
                            }),
                        },
                        uuid::Uuid::new_v4(),
                    );
                    let _ = kernel.event_bus.publish(astrid_events::AstridEvent::Ipc {
                        metadata: astrid_events::EventMetadata::new("kernel"),
                        message: msg,
                    });
                    failures.push((id_str, reason));
                }
            }

            // Drop all Arc clones so restart_capsule's Arc::get_mut can
            // obtain exclusive access for calling unload().
            drop(ready_capsules);

            // IDs that failed on this tick; used below to decide which
            // trackers represent a capsule that has since recovered.
            let failed_this_tick: std::collections::HashSet<&str> =
                failures.iter().map(|(id, _)| id.as_str()).collect();

            let mut restarted = Vec::new();
            for (id_str, _reason) in &failures {
                let tracker = restart_trackers
                    .entry(id_str.clone())
                    .or_insert_with(RestartTracker::new);

                if attempt_capsule_restart(&kernel, id_str, tracker).await {
                    restarted.push(id_str.clone());
                }
            }

            // Remove trackers for successfully restarted capsules.
            for id in &restarted {
                restart_trackers.remove(id);
            }

            // Prune trackers for capsules that recovered (healthy this tick).
            // Keep exhausted trackers and trackers still in their backoff
            // window (capsule may have been unregistered by a failed restart
            // attempt and won't appear in ready_capsules next tick).
            restart_trackers.retain(|id, tracker| {
                if tracker.exhausted() {
                    return true;
                }
                // Keep if still within backoff - the capsule may be absent
                // from the registry after a failed reload.
                if tracker.last_attempt.elapsed() < tracker.backoff {
                    return true;
                }
                failed_this_tick.contains(id.as_str())
            });
        }
    })
}
1122
1123/// Spawns a periodic watchdog that publishes `astrid.v1.watchdog.tick` events every 5 seconds.
1124///
1125/// The `ReAct` capsule (WASM guest) cannot use async timers, so this kernel-side task
1126/// drives timeout enforcement by waking the capsule on a fixed interval. Each tick
1127/// causes the capsule's `handle_watchdog_tick` interceptor to run `check_phase_timeout`.
1128fn spawn_react_watchdog(event_bus: Arc<EventBus>) -> tokio::task::JoinHandle<()> {
1129    tokio::spawn(async move {
1130        let mut interval = tokio::time::interval(std::time::Duration::from_secs(5));
1131        // The first tick fires immediately - skip it to give capsules time to load.
1132        interval.tick().await;
1133
1134        loop {
1135            interval.tick().await;
1136
1137            let msg = astrid_events::ipc::IpcMessage::new(
1138                "astrid.v1.watchdog.tick",
1139                astrid_events::ipc::IpcPayload::Custom {
1140                    data: serde_json::json!({}),
1141                },
1142                uuid::Uuid::new_v4(),
1143            );
1144            let _ = event_bus.publish(astrid_events::AstridEvent::Ipc {
1145                metadata: astrid_events::EventMetadata::new("kernel"),
1146                message: msg,
1147            });
1148        }
1149    })
1150}
1151
#[cfg(test)]
mod tests {
    use super::*;

    // --- Runtime key persistence -------------------------------------------

    #[test]
    fn test_load_or_generate_creates_new_key() {
        let dir = tempfile::tempdir().unwrap();
        let keys_dir = dir.path().join("keys");

        let keypair = load_or_generate_runtime_key(&keys_dir).unwrap();
        let key_path = keys_dir.join("runtime.key");

        // Key file should exist with 32 bytes.
        assert!(key_path.exists());
        let bytes = std::fs::read(&key_path).unwrap();
        assert_eq!(bytes.len(), 32);

        // The written bytes should reconstruct the same public key.
        let reloaded = KeyPair::from_secret_key(&bytes).unwrap();
        assert_eq!(
            keypair.public_key_bytes(),
            reloaded.public_key_bytes(),
            "reloaded key should match generated key"
        );
    }

    #[test]
    fn test_load_or_generate_is_idempotent() {
        let dir = tempfile::tempdir().unwrap();
        let keys_dir = dir.path().join("keys");

        // Second call must load the key generated by the first, not replace it.
        let first = load_or_generate_runtime_key(&keys_dir).unwrap();
        let second = load_or_generate_runtime_key(&keys_dir).unwrap();

        assert_eq!(
            first.public_key_bytes(),
            second.public_key_bytes(),
            "loading the same key file should produce the same keypair"
        );
    }

    #[test]
    fn test_load_or_generate_rejects_bad_key_length() {
        let dir = tempfile::tempdir().unwrap();
        let keys_dir = dir.path().join("keys");
        std::fs::create_dir_all(&keys_dir).unwrap();

        // Write a key file with wrong length.
        std::fs::write(keys_dir.join("runtime.key"), [0u8; 16]).unwrap();

        let result = load_or_generate_runtime_key(&keys_dir);
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(
            err.contains("invalid runtime key"),
            "expected 'invalid runtime key' error, got: {err}"
        );
    }

    // --- Connection counter semantics --------------------------------------

    #[test]
    fn test_connection_counter_increment_decrement() {
        let counter = AtomicUsize::new(0);

        // Simulate connection_opened (fetch_add)
        counter.fetch_add(1, Ordering::Relaxed);
        counter.fetch_add(1, Ordering::Relaxed);
        assert_eq!(counter.load(Ordering::Relaxed), 2);

        // Simulate connection_closed using the same fetch_update logic
        // as the real implementation to exercise the actual code path.
        for expected in [1, 0] {
            let _ = counter.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |n| {
                if n == 0 {
                    None
                } else {
                    Some(n.saturating_sub(1))
                }
            });
            assert_eq!(counter.load(Ordering::Relaxed), expected);
        }
    }

    #[test]
    fn test_connection_counter_underflow_guard() {
        // Test the saturating behavior: decrementing from 0 should stay at 0.
        // Mirrors the fetch_update logic in connection_closed().
        let counter = AtomicUsize::new(0);

        let result = counter.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |n| {
            if n == 0 { None } else { Some(n - 1) }
        });
        // fetch_update returns Err(0) when the closure returns None (no-op).
        assert!(result.is_err());
        assert_eq!(counter.load(Ordering::Relaxed), 0);
    }

    /// Mirrors the `connection_closed()` logic: only `Ok(1)` (previous value 1,
    /// now 0) triggers `clear_session_allowances`. Update this test if
    /// `connection_closed()` is refactored.
    #[test]
    fn test_last_disconnect_clears_session_allowances() {
        use astrid_approval::AllowanceStore;
        use astrid_approval::allowance::{Allowance, AllowanceId, AllowancePattern};
        use astrid_core::types::Timestamp;
        use astrid_crypto::KeyPair;

        let store = AllowanceStore::new();
        let keypair = KeyPair::generate();

        // Session-only allowance (should be cleared on last disconnect).
        store
            .add_allowance(Allowance {
                id: AllowanceId::new(),
                action_pattern: AllowancePattern::ServerTools {
                    server: "session-server".to_string(),
                },
                created_at: Timestamp::now(),
                expires_at: None,
                max_uses: None,
                uses_remaining: None,
                session_only: true,
                workspace_root: None,
                signature: keypair.sign(b"test"),
            })
            .unwrap();

        // Persistent allowance (should survive).
        store
            .add_allowance(Allowance {
                id: AllowanceId::new(),
                action_pattern: AllowancePattern::ServerTools {
                    server: "persistent-server".to_string(),
                },
                created_at: Timestamp::now(),
                expires_at: None,
                max_uses: None,
                uses_remaining: None,
                session_only: false,
                workspace_root: None,
                signature: keypair.sign(b"test"),
            })
            .unwrap();

        assert_eq!(store.count(), 2);

        let counter = AtomicUsize::new(2);
        let simulate_disconnect = || {
            let result = counter.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |n| {
                if n == 0 {
                    None
                } else {
                    Some(n.saturating_sub(1))
                }
            });
            if result == Ok(1) {
                store.clear_session_allowances();
            }
        };

        // Two connections active. First disconnect: 2 -> 1 (not last).
        simulate_disconnect();
        assert_eq!(
            store.count(),
            2,
            "both allowances should survive non-final disconnect"
        );

        // Second disconnect: 1 -> 0 (last client gone).
        simulate_disconnect();
        assert_eq!(
            store.count(),
            1,
            "session allowance should be cleared on last disconnect"
        );
    }

    #[cfg(unix)]
    #[test]
    fn test_load_or_generate_sets_secure_permissions() {
        use std::os::unix::fs::PermissionsExt;

        let dir = tempfile::tempdir().unwrap();
        let keys_dir = dir.path().join("keys");

        let _ = load_or_generate_runtime_key(&keys_dir).unwrap();

        // Secret key material must be readable by the owner only.
        let key_path = keys_dir.join("runtime.key");
        let mode = std::fs::metadata(&key_path).unwrap().permissions().mode();
        assert_eq!(
            mode & 0o777,
            0o600,
            "key file should have 0o600 permissions, got {mode:#o}"
        );
    }

    // --- RestartTracker backoff behavior -----------------------------------

    #[test]
    fn restart_tracker_initial_state() {
        let tracker = RestartTracker::new();
        assert!(!tracker.exhausted());
        // Should not restart immediately (backoff hasn't elapsed).
        assert!(!tracker.should_restart());
    }

    #[test]
    fn restart_tracker_allows_restart_after_backoff() {
        let mut tracker = RestartTracker::new();
        // Simulate time passing by setting last_attempt in the past.
        tracker.last_attempt = std::time::Instant::now()
            - RestartTracker::INITIAL_BACKOFF
            - std::time::Duration::from_millis(1);
        assert!(tracker.should_restart());
    }

    #[test]
    fn restart_tracker_doubles_backoff() {
        let mut tracker = RestartTracker::new();
        assert_eq!(tracker.backoff, RestartTracker::INITIAL_BACKOFF);

        tracker.record_attempt();
        assert_eq!(
            tracker.backoff,
            RestartTracker::INITIAL_BACKOFF.saturating_mul(2)
        );
        assert_eq!(tracker.attempts, 1);

        tracker.record_attempt();
        assert_eq!(
            tracker.backoff,
            RestartTracker::INITIAL_BACKOFF.saturating_mul(4)
        );
        assert_eq!(tracker.attempts, 2);
    }

    #[test]
    fn restart_tracker_backoff_caps_at_max() {
        let mut tracker = RestartTracker::new();
        // Far more attempts than needed to reach the cap.
        for _ in 0..20 {
            tracker.record_attempt();
        }
        assert_eq!(tracker.backoff, RestartTracker::MAX_BACKOFF);
    }

    #[test]
    fn restart_tracker_exhausted_at_max_attempts() {
        let mut tracker = RestartTracker::new();
        for _ in 0..RestartTracker::MAX_ATTEMPTS {
            assert!(!tracker.exhausted());
            tracker.record_attempt();
        }
        assert!(tracker.exhausted());
    }

    #[test]
    fn restart_tracker_should_restart_false_when_exhausted() {
        let mut tracker = RestartTracker::new();
        for _ in 0..RestartTracker::MAX_ATTEMPTS {
            tracker.record_attempt();
        }
        // Even if backoff has elapsed, exhausted tracker should not restart.
        tracker.last_attempt = std::time::Instant::now() - RestartTracker::MAX_BACKOFF;
        assert!(!tracker.should_restart());
    }
}
1415
1416// ---------------------------------------------------------------------------
1417// Identity bootstrap helpers
1418// ---------------------------------------------------------------------------
1419
1420/// Bootstrap the CLI root user identity at kernel boot.
1421///
1422/// Creates a deterministic root `AstridUserId` on first boot, or reloads it
1423/// on subsequent boots. Auto-links with `platform="cli"`,
1424/// `platform_user_id="local"`, `method="system"`.
1425///
1426/// Idempotent: skips creation if the root user already exists.
1427async fn bootstrap_cli_root_user(
1428    store: &Arc<dyn astrid_storage::IdentityStore>,
1429) -> Result<(), astrid_storage::IdentityError> {
1430    // Check if root user already exists by trying to resolve the CLI link.
1431    if let Some(_user) = store.resolve("cli", "local").await? {
1432        tracing::debug!("CLI root user already linked");
1433        return Ok(());
1434    }
1435
1436    // No CLI link exists. Create or find the root user.
1437    let user = store.create_user(Some("root")).await?;
1438    tracing::info!(user_id = %user.id, "Created CLI root user");
1439
1440    // Link the CLI platform identity.
1441    store.link("cli", "local", user.id, "system").await?;
1442    tracing::info!(user_id = %user.id, "Linked CLI root user (cli/local)");
1443
1444    Ok(())
1445}
1446
1447/// Apply pre-configured identity links from the config file.
1448///
1449/// For each `[[identity.links]]` entry, resolves or creates the referenced
1450/// Astrid user and links the platform identity. Logs warnings on failure
1451/// but does not abort boot.
1452async fn apply_identity_config(
1453    store: &Arc<dyn astrid_storage::IdentityStore>,
1454    workspace_root: &std::path::Path,
1455) {
1456    let config = match astrid_config::Config::load(Some(workspace_root)) {
1457        Ok(resolved) => resolved.config,
1458        Err(e) => {
1459            tracing::debug!(error = %e, "No config loaded for identity links");
1460            return;
1461        },
1462    };
1463
1464    for link_cfg in &config.identity.links {
1465        let result = apply_single_identity_link(store, link_cfg).await;
1466        if let Err(e) = result {
1467            tracing::warn!(
1468                platform = %link_cfg.platform,
1469                platform_user_id = %link_cfg.platform_user_id,
1470                astrid_user = %link_cfg.astrid_user,
1471                error = %e,
1472                "Failed to apply identity link from config"
1473            );
1474        }
1475    }
1476}
1477
1478/// Apply a single identity link from config.
1479async fn apply_single_identity_link(
1480    store: &Arc<dyn astrid_storage::IdentityStore>,
1481    link_cfg: &astrid_config::types::IdentityLinkConfig,
1482) -> Result<(), astrid_storage::IdentityError> {
1483    // Resolve astrid_user: try UUID first, then name lookup, then create.
1484    let user_id = if let Ok(uuid) = uuid::Uuid::parse_str(&link_cfg.astrid_user) {
1485        // Ensure user record exists. If the UUID was explicitly specified in
1486        // config but doesn't exist in the store, that's a configuration error
1487        // - don't silently create a different user.
1488        if store.get_user(uuid).await?.is_none() {
1489            return Err(astrid_storage::IdentityError::UserNotFound(uuid));
1490        }
1491        uuid
1492    } else {
1493        // Try name lookup.
1494        if let Some(user) = store.get_user_by_name(&link_cfg.astrid_user).await? {
1495            user.id
1496        } else {
1497            let user = store.create_user(Some(&link_cfg.astrid_user)).await?;
1498            tracing::info!(
1499                user_id = %user.id,
1500                name = %link_cfg.astrid_user,
1501                "Created user from config identity link"
1502            );
1503            user.id
1504        }
1505    };
1506
1507    let method = if link_cfg.method.is_empty() {
1508        "admin"
1509    } else {
1510        &link_cfg.method
1511    };
1512
1513    // Check if link already points to the correct user - skip if idempotent.
1514    if let Some(existing) = store
1515        .resolve(&link_cfg.platform, &link_cfg.platform_user_id)
1516        .await?
1517        && existing.id == user_id
1518    {
1519        tracing::debug!(
1520            platform = %link_cfg.platform,
1521            platform_user_id = %link_cfg.platform_user_id,
1522            user_id = %user_id,
1523            "Identity link from config already exists"
1524        );
1525        return Ok(());
1526    }
1527
1528    store
1529        .link(
1530            &link_cfg.platform,
1531            &link_cfg.platform_user_id,
1532            user_id,
1533            method,
1534        )
1535        .await?;
1536
1537    tracing::info!(
1538        platform = %link_cfg.platform,
1539        platform_user_id = %link_cfg.platform_user_id,
1540        user_id = %user_id,
1541        "Applied identity link from config"
1542    );
1543
1544    Ok(())
1545}