1#![deny(unsafe_code)]
2#![deny(missing_docs)]
3#![deny(clippy::all)]
4#![deny(unreachable_pub)]
5#![allow(clippy::module_name_repetitions)]
6
7pub mod kernel_router;
16pub mod socket;
18
19use astrid_audit::AuditLog;
20use astrid_capabilities::{CapabilityStore, DirHandle};
21use astrid_capsule::registry::CapsuleRegistry;
22use astrid_core::SessionId;
23use astrid_crypto::KeyPair;
24use astrid_events::EventBus;
25use astrid_mcp::{McpClient, SecureMcpClient, ServerManager, ServersConfig};
26use astrid_vfs::{HostVfs, OverlayVfs, Vfs};
27use std::path::{Path, PathBuf};
28use std::sync::Arc;
29use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
30use tokio::sync::RwLock;
31
/// Central runtime object for one kernel session: wires together the event
/// bus, capsule registry, MCP access, storage, audit log, and the session
/// socket, and owns the counters that drive idle shutdown.
pub struct Kernel {
    /// Unique identifier of this kernel session.
    pub session_id: SessionId,
    /// Broadcast bus used for all intra-kernel event publication.
    pub event_bus: Arc<EventBus>,
    /// Registry of loaded capsules, guarded for concurrent access.
    pub capsules: Arc<RwLock<CapsuleRegistry>>,
    /// Capability-checked, audited MCP client.
    pub mcp: SecureMcpClient,
    /// Store of granted capabilities, backed by the KV store.
    pub capabilities: Arc<CapabilityStore>,
    /// Virtual filesystem exposed to callers (points at `overlay_vfs`).
    pub vfs: Arc<dyn Vfs>,
    /// Overlay VFS layering a temp dir over the workspace directory.
    pub overlay_vfs: Arc<OverlayVfs>,
    // Keeps the overlay's upper-layer temp directory alive for the kernel's
    // lifetime; the directory is deleted when this Arc is dropped.
    _upper_dir: Arc<tempfile::TempDir>,
    /// Handle registered for the VFS root directory.
    pub vfs_root_handle: DirHandle,
    /// Absolute path of the workspace this kernel serves.
    pub workspace_root: PathBuf,
    /// Principal home directory root, if one could be resolved.
    pub home_root: Option<PathBuf>,
    /// Unix socket listener for CLI clients, when bound.
    pub cli_socket_listener: Option<Arc<tokio::sync::Mutex<tokio::net::UnixListener>>>,
    /// Key-value store backing capability, identity, and capsule state.
    pub kv: Arc<astrid_storage::SurrealKvStore>,
    /// Append-only audit log with chain verification.
    pub audit_log: Arc<AuditLog>,
    /// Number of currently open client connections.
    pub active_connections: AtomicUsize,
    /// Whether this kernel runs in ephemeral mode (short idle timeout).
    pub ephemeral: AtomicBool,
    /// Instant at which the kernel was constructed.
    pub boot_time: std::time::Instant,
    /// Watch channel used to signal shutdown to background tasks.
    pub shutdown_tx: tokio::sync::watch::Sender<bool>,
    /// Token clients present to authenticate on the session socket.
    pub session_token: Arc<astrid_core::session_token::SessionToken>,
    // On-disk location of the session token; removed during shutdown.
    token_path: PathBuf,
    /// Store of user-granted approval allowances.
    pub allowance_store: Arc<astrid_approval::AllowanceStore>,
    // Identity store shared with the event dispatcher and capsule contexts.
    identity_store: Arc<dyn astrid_storage::IdentityStore>,
}
92
93impl Kernel {
    /// Constructs and starts a kernel for the given session and workspace.
    ///
    /// Initialization order: storage → MCP → capabilities → audit log →
    /// overlay VFS → session socket/token → identity bootstrap. Then the
    /// background tasks (router, idle monitor, watchdog, health monitor,
    /// event dispatcher) are spawned against the assembled kernel.
    ///
    /// # Errors
    /// Returns an [`std::io::Error`] if any subsystem fails to initialize.
    ///
    /// # Panics
    /// Panics when called on a single-threaded tokio runtime.
    pub async fn new(
        session_id: SessionId,
        workspace_root: PathBuf,
    ) -> Result<Arc<Self>, std::io::Error> {
        use astrid_core::dirs::AstridHome;

        // Fail fast with a clear message instead of panicking deep inside
        // block_in_place later.
        assert!(
            tokio::runtime::Handle::current().runtime_flavor()
                == tokio::runtime::RuntimeFlavor::MultiThread,
            "Kernel requires a multi-threaded tokio runtime (block_in_place panics on \
            single-threaded). Use #[tokio::main] or Runtime::new() instead of current_thread."
        );

        let event_bus = Arc::new(EventBus::new());
        let capsules = Arc::new(RwLock::new(CapsuleRegistry::new()));

        let home = AstridHome::resolve().map_err(|e| {
            std::io::Error::other(format!(
                "Failed to resolve Astrid home (set $ASTRID_HOME or $HOME): {e}"
            ))
        })?;

        let default_principal = astrid_core::PrincipalId::default();
        let principal_home = home.principal_home(&default_principal);
        let home_root = Some(principal_home.root().to_path_buf());

        let kv_path = home.state_db_path();
        let kv = Arc::new(
            astrid_storage::SurrealKvStore::open(&kv_path)
                .map_err(|e| std::io::Error::other(format!("Failed to open KV store: {e}")))?,
        );
        // A missing/invalid servers config falls back to defaults silently.
        let mcp_config = ServersConfig::load_default().unwrap_or_default();
        let mcp_manager = ServerManager::new(mcp_config)
            .with_workspace_root(workspace_root.clone())
            .with_capsule_log_dir(principal_home.log_dir());
        let mcp_client = McpClient::new(mcp_manager);

        let capabilities = Arc::new(
            CapabilityStore::with_kv_store(Arc::clone(&kv) as Arc<dyn astrid_storage::KvStore>)
                .map_err(|e| {
                    std::io::Error::other(format!("Failed to init capability store: {e}"))
                })?,
        );
        let audit_log = open_audit_log()?;
        // Wrap the raw MCP client with capability checks + audit logging.
        let mcp = SecureMcpClient::new(
            mcp_client,
            Arc::clone(&capabilities),
            Arc::clone(&audit_log),
            session_id.clone(),
        );

        let root_handle = DirHandle::new();

        let (overlay_vfs, upper_temp) = init_overlay_vfs(&root_handle, &workspace_root).await?;

        let listener = socket::bind_session_socket()?;
        let (session_token, token_path) = socket::generate_session_token()?;

        let allowance_store = Arc::new(astrid_approval::AllowanceStore::new());

        // Identity state lives in its own namespaced slice of the KV store.
        let identity_kv = astrid_storage::ScopedKvStore::new(
            Arc::clone(&kv) as Arc<dyn astrid_storage::KvStore>,
            "system:identity",
        )
        .map_err(|e| std::io::Error::other(format!("Failed to create identity KV: {e}")))?;
        let identity_store: Arc<dyn astrid_storage::IdentityStore> =
            Arc::new(astrid_storage::KvIdentityStore::new(identity_kv));

        bootstrap_cli_root_user(&identity_store)
            .await
            .map_err(|e| {
                std::io::Error::other(format!("Failed to bootstrap CLI root user: {e}"))
            })?;

        apply_identity_config(&identity_store, &workspace_root).await;

        let kernel = Arc::new(Self {
            session_id,
            event_bus,
            capsules,
            mcp,
            capabilities,
            vfs: Arc::clone(&overlay_vfs) as Arc<dyn Vfs>,
            overlay_vfs,
            _upper_dir: Arc::new(upper_temp),
            vfs_root_handle: root_handle,
            workspace_root,
            home_root,
            cli_socket_listener: Some(Arc::new(tokio::sync::Mutex::new(listener))),
            kv,
            audit_log,
            active_connections: AtomicUsize::new(0),
            ephemeral: AtomicBool::new(false),
            boot_time: std::time::Instant::now(),
            shutdown_tx: tokio::sync::watch::channel(false).0,
            session_token: Arc::new(session_token),
            token_path,
            allowance_store,
            identity_store,
        });

        // Background tasks run for the life of the process; their join
        // handles are intentionally dropped (detached).
        drop(kernel_router::spawn_kernel_router(Arc::clone(&kernel)));
        drop(spawn_idle_monitor(Arc::clone(&kernel)));
        drop(spawn_react_watchdog(Arc::clone(&kernel.event_bus)));
        drop(spawn_capsule_health_monitor(Arc::clone(&kernel)));

        let dispatcher = astrid_capsule::dispatcher::EventDispatcher::new(
            Arc::clone(&kernel.capsules),
            Arc::clone(&kernel.event_bus),
        )
        .with_identity_store(Arc::clone(&kernel.identity_store));
        tokio::spawn(dispatcher.run());

        // Keep the permanent-subscriber count in sync with the spawns above.
        debug_assert_eq!(
            kernel.event_bus.subscriber_count(),
            INTERNAL_SUBSCRIBER_COUNT,
            "INTERNAL_SUBSCRIBER_COUNT is stale; update it when adding permanent subscribers"
        );

        Ok(kernel)
    }
249
    /// Loads a single capsule from `dir` (which must contain `Capsule.toml`)
    /// and registers it. A no-op if a capsule with the same id is already
    /// registered.
    ///
    /// Environment entries are seeded into the capsule's scoped KV store from
    /// the principal's `<name>.env.json` if present, else the capsule dir's
    /// `.env.json`; malformed or missing env files are silently ignored.
    async fn load_capsule(&self, dir: PathBuf) -> Result<(), anyhow::Error> {
        let manifest_path = dir.join("Capsule.toml");
        let manifest = astrid_capsule::discovery::load_manifest(&manifest_path)
            .map_err(|e| anyhow::anyhow!(e))?;

        // Idempotence: skip if this capsule id is already registered.
        {
            let registry = self.capsules.read().await;
            let id = astrid_capsule::capsule::CapsuleId::from_static(&manifest.package.name);
            if registry.get(&id).is_some() {
                return Ok(());
            }
        }

        let loader = astrid_capsule::loader::CapsuleLoader::new(self.mcp.clone());
        let mut capsule = loader.create_capsule(manifest, dir.clone())?;

        // Each capsule gets its own namespaced slice of the shared KV store.
        let principal = astrid_core::PrincipalId::default();
        let kv = astrid_storage::ScopedKvStore::new(
            Arc::clone(&self.kv) as Arc<dyn astrid_storage::KvStore>,
            format!("{principal}:capsule:{}", capsule.id()),
        )?;

        // Env file precedence: principal-level override, then capsule dir.
        let capsule_name = capsule.id().to_string();
        let env_path = if let Ok(home) = astrid_core::dirs::AstridHome::resolve() {
            let ph = home.principal_home(&principal);
            let principal_env = ph.env_dir().join(format!("{capsule_name}.env.json"));
            if principal_env.exists() {
                principal_env
            } else {
                dir.join(".env.json")
            }
        } else {
            dir.join(".env.json")
        };
        // Best effort: unreadable or malformed env files are ignored.
        if env_path.exists()
            && let Ok(contents) = std::fs::read_to_string(&env_path)
            && let Ok(env_map) =
                serde_json::from_str::<std::collections::HashMap<String, String>>(&contents)
        {
            for (k, v) in env_map {
                let _ = kv.set(&k, v.into_bytes()).await;
            }
        }

        let ctx = astrid_capsule::context::CapsuleContext::new(
            principal.clone(),
            self.workspace_root.clone(),
            self.home_root.clone(),
            kv,
            Arc::clone(&self.event_bus),
            self.cli_socket_listener.clone(),
        )
        .with_registry(Arc::clone(&self.capsules))
        .with_session_token(Arc::clone(&self.session_token))
        .with_allowance_store(Arc::clone(&self.allowance_store))
        .with_identity_store(Arc::clone(&self.identity_store));

        capsule.load(&ctx).await?;

        let mut registry = self.capsules.write().await;
        registry
            .register(capsule)
            .map_err(|e| anyhow::anyhow!("Failed to register capsule: {e}"))?;

        Ok(())
    }
327
    /// Restarts a capsule: unregisters the old instance, best-effort unloads
    /// it, reloads from its original source directory, then invokes the
    /// optional `handle_lifecycle_restart` interceptor on the new instance.
    ///
    /// # Errors
    /// Fails if the capsule is unknown, has no source directory, cannot be
    /// unregistered, or fails to load again.
    async fn restart_capsule(
        &self,
        id: &astrid_capsule::capsule::CapsuleId,
    ) -> Result<(), anyhow::Error> {
        let source_dir = {
            let registry = self.capsules.read().await;
            let capsule = registry
                .get(id)
                .ok_or_else(|| anyhow::anyhow!("capsule '{id}' not found in registry"))?;
            capsule
                .source_dir()
                .map(std::path::Path::to_path_buf)
                .ok_or_else(|| anyhow::anyhow!("capsule '{id}' has no source directory"))?
        };

        let old_capsule = {
            let mut registry = self.capsules.write().await;
            registry
                .unregister(id)
                .map_err(|e| anyhow::anyhow!("failed to unregister capsule '{id}': {e}"))?
        };
        {
            // `unload` needs exclusive ownership of the Arc; if another task
            // still holds a clone we skip the unload rather than block.
            let mut old = old_capsule;
            if let Some(capsule) = std::sync::Arc::get_mut(&mut old) {
                if let Err(e) = capsule.unload().await {
                    tracing::warn!(
                        capsule_id = %id,
                        error = %e,
                        "Capsule unload failed during restart"
                    );
                }
            } else {
                tracing::warn!(
                    capsule_id = %id,
                    "Cannot call unload during restart - Arc still held by in-flight task"
                );
            }
        }

        self.load_capsule(source_dir).await?;

        // Notify the fresh instance, if it implements the lifecycle hook.
        let capsule = {
            let registry = self.capsules.read().await;
            registry.get(id)
        };
        if let Some(capsule) = capsule
            && let Err(e) = capsule.invoke_interceptor("handle_lifecycle_restart", &[], None)
        {
            tracing::debug!(
                capsule_id = %id,
                error = %e,
                "Capsule does not handle lifecycle restart (optional)"
            );
        }

        Ok(())
    }
408
    /// Discovers all capsule manifests for the default principal and loads
    /// them: dependency-sorted, uplink capsules first (waiting for their
    /// readiness before the rest). Per-capsule failures are logged and
    /// skipped rather than aborting the pass. Publishes an
    /// `astrid.v1.capsules_loaded` IPC event when finished.
    pub async fn load_all_capsules(&self) {
        use astrid_capsule::toposort::toposort_manifests;
        use astrid_core::dirs::AstridHome;

        let mut paths = Vec::new();
        if let Ok(home) = AstridHome::resolve() {
            let principal = astrid_core::PrincipalId::default();
            paths.push(home.principal_home(&principal).capsules_dir());
        }

        let discovered = astrid_capsule::discovery::discover_manifests(Some(&paths));

        // Dependency-sort; on a cycle, fall back to raw discovery order.
        let sorted = match toposort_manifests(discovered) {
            Ok(sorted) => sorted,
            Err((e, original)) => {
                tracing::error!(
                    cycle = %e,
                    "Dependency cycle in capsules, falling back to discovery order"
                );
                original
            },
        };

        // Defense in depth: uplink capsules with [imports] should already
        // have been rejected at manifest load time.
        for (manifest, _) in &sorted {
            if manifest.capabilities.uplink && manifest.has_imports() {
                tracing::warn!(
                    capsule = %manifest.package.name,
                    "Uplink capsule has [imports] - \
                    this should have been rejected at manifest load time"
                );
            }
        }

        validate_imports_exports(&sorted);

        // Load uplink capsules first so dependents find them ready.
        let (uplinks, others): (Vec<_>, Vec<_>) =
            sorted.into_iter().partition(|(m, _)| m.capabilities.uplink);

        let uplink_names: Vec<String> = uplinks
            .iter()
            .map(|(m, _)| m.package.name.clone())
            .collect();
        for (manifest, dir) in &uplinks {
            if let Err(e) = self.load_capsule(dir.clone()).await {
                tracing::warn!(
                    capsule = %manifest.package.name,
                    error = %e,
                    "Failed to load uplink capsule during discovery"
                );
            }
        }

        self.await_capsule_readiness(&uplink_names).await;

        for (manifest, dir) in &others {
            if let Err(e) = self.load_capsule(dir.clone()).await {
                tracing::warn!(
                    capsule = %manifest.package.name,
                    error = %e,
                    "Failed to load capsule during discovery"
                );
            }
        }

        let other_names: Vec<String> = others.iter().map(|(m, _)| m.package.name.clone()).collect();
        self.await_capsule_readiness(&other_names).await;

        // Signal listeners that all discovered capsules have been processed.
        let msg = astrid_events::ipc::IpcMessage::new(
            "astrid.v1.capsules_loaded",
            astrid_events::ipc::IpcPayload::RawJson(serde_json::json!({"status": "ready"})),
            self.session_id.0,
        );
        let _ = self.event_bus.publish(astrid_events::AstridEvent::Ipc {
            metadata: astrid_events::EventMetadata::new("kernel"),
            message: msg,
        });
    }
516
    /// Records that a new client connection was opened.
    pub fn connection_opened(&self) {
        self.active_connections.fetch_add(1, Ordering::Relaxed);
    }
521
522 pub fn connection_closed(&self) {
530 let result =
531 self.active_connections
532 .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |n| {
533 if n == 0 {
534 None
535 } else {
536 Some(n.saturating_sub(1))
537 }
538 });
539
540 if result == Ok(1) {
543 self.allowance_store.clear_session_allowances();
544 tracing::info!("last client disconnected, session allowances cleared");
545 }
546 }
547
    /// Marks the kernel as ephemeral (or not). Ephemeral kernels are given a
    /// much shorter idle timeout by the idle monitor.
    pub fn set_ephemeral(&self, val: bool) {
        self.ephemeral.store(val, Ordering::Relaxed);
    }
552
    /// Returns the number of currently open client connections.
    pub fn connection_count(&self) -> usize {
        self.active_connections.load(Ordering::Relaxed)
    }
557
    /// Gracefully shuts the kernel down: broadcasts `KernelShutdown`, unloads
    /// all capsules (retrying briefly for exclusive Arc access), flushes the
    /// KV store, and removes the session socket, token file, and readiness
    /// file. All steps are best-effort; failures are logged, not propagated.
    pub async fn shutdown(&self, reason: Option<String>) {
        tracing::info!(reason = ?reason, "Kernel shutting down");

        let _ = self
            .event_bus
            .publish(astrid_events::AstridEvent::KernelShutdown {
                metadata: astrid_events::EventMetadata::new("kernel"),
                reason: reason.clone(),
            });

        // Drain the registry first so no new lookups can find the capsules.
        let capsules = {
            let mut reg = self.capsules.write().await;
            reg.drain()
        };
        for mut arc in capsules {
            let id = arc.id().clone();
            let mut unloaded = false;

            // `unload` needs `&mut`: wait up to ~1s (20 x 50ms) for other
            // holders of this Arc to drop their clones.
            for retry in 0..20_u32 {
                if let Some(capsule) = Arc::get_mut(&mut arc) {
                    if let Err(e) = capsule.unload().await {
                        tracing::warn!(
                            capsule_id = %id,
                            error = %e,
                            "Failed to unload capsule during shutdown"
                        );
                    }
                    unloaded = true;
                    break;
                }
                if retry < 19 {
                    tokio::time::sleep(std::time::Duration::from_millis(50)).await;
                }
            }

            if !unloaded {
                tracing::warn!(
                    capsule_id = %id,
                    strong_count = Arc::strong_count(&arc),
                    "Dropping capsule without explicit unload after retries exhausted; \
                    MCP child processes may be orphaned"
                );
            }
            drop(arc);
        }

        if let Err(e) = self.kv.close().await {
            tracing::warn!(error = %e, "Failed to flush KV store during shutdown");
        }

        // Best-effort cleanup of on-disk session artifacts.
        let socket_path = crate::socket::kernel_socket_path();
        let _ = std::fs::remove_file(&socket_path);
        let _ = std::fs::remove_file(&self.token_path);
        crate::socket::remove_readiness_file();

        tracing::info!("Kernel shutdown complete");
    }
638
    /// Waits concurrently (up to 500ms each) for the named capsules to
    /// signal readiness, logging those that time out or crash. Names that
    /// are not valid capsule IDs or not currently registered are skipped.
    async fn await_capsule_readiness(&self, names: &[String]) {
        use astrid_capsule::capsule::ReadyStatus;

        if names.is_empty() {
            return;
        }

        let timeout = std::time::Duration::from_millis(500);
        // Resolve names to capsule handles under one read-lock acquisition.
        let capsules: Vec<(String, std::sync::Arc<dyn astrid_capsule::capsule::Capsule>)> = {
            let registry = self.capsules.read().await;
            names
                .iter()
                .filter_map(
                    |name| match astrid_capsule::capsule::CapsuleId::new(name.clone()) {
                        Ok(capsule_id) => registry.get(&capsule_id).map(|c| (name.clone(), c)),
                        Err(e) => {
                            tracing::warn!(
                                capsule = %name,
                                error = %e,
                                "Invalid capsule ID, skipping readiness wait"
                            );
                            None
                        },
                    },
                )
                .collect()
        };

        // Wait for all capsules in parallel rather than serially.
        let mut set = tokio::task::JoinSet::new();
        for (name, capsule) in capsules {
            set.spawn(async move {
                let status = capsule.wait_ready(timeout).await;
                (name, status)
            });
        }
        while let Some(result) = set.join_next().await {
            if let Ok((name, status)) = result {
                match status {
                    ReadyStatus::Ready => {},
                    ReadyStatus::Timeout => {
                        tracing::warn!(
                            capsule = %name,
                            timeout_ms = timeout.as_millis(),
                            "Capsule did not signal ready within timeout"
                        );
                    },
                    ReadyStatus::Crashed => {
                        tracing::error!(
                            capsule = %name,
                            "Capsule run loop exited before signaling ready"
                        );
                    },
                }
            }
        }
    }
702}
703
704async fn init_overlay_vfs(
712 root_handle: &DirHandle,
713 workspace_root: &Path,
714) -> Result<(Arc<OverlayVfs>, tempfile::TempDir), std::io::Error> {
715 let lower_vfs = HostVfs::new();
716 lower_vfs
717 .register_dir(root_handle.clone(), workspace_root.to_path_buf())
718 .await
719 .map_err(|_| std::io::Error::other("Failed to register lower vfs dir"))?;
720
721 let upper_temp = tempfile::TempDir::new()
722 .map_err(|e| std::io::Error::other(format!("Failed to create overlay temp dir: {e}")))?;
723 let upper_vfs = HostVfs::new();
724 upper_vfs
725 .register_dir(root_handle.clone(), upper_temp.path().to_path_buf())
726 .await
727 .map_err(|_| std::io::Error::other("Failed to register upper vfs dir"))?;
728
729 let overlay = Arc::new(OverlayVfs::new(Box::new(lower_vfs), Box::new(upper_vfs)));
730 Ok((overlay, upper_temp))
731}
732
/// Opens (creating if needed) the audit log for the default principal using
/// the runtime signing key, then runs a full chain verification over every
/// session and logs any integrity violations. Verification problems are
/// reported but never prevent the log from opening.
fn open_audit_log() -> std::io::Result<Arc<AuditLog>> {
    use astrid_core::dirs::AstridHome;

    let home = AstridHome::resolve()
        .map_err(|e| std::io::Error::other(format!("cannot resolve Astrid home: {e}")))?;
    home.ensure()
        .map_err(|e| std::io::Error::other(format!("cannot create Astrid home dirs: {e}")))?;

    let runtime_key = load_or_generate_runtime_key(&home.keys_dir())?;
    let default_principal = astrid_core::PrincipalId::default();
    let principal_home = home.principal_home(&default_principal);
    principal_home
        .ensure()
        .map_err(|e| std::io::Error::other(format!("cannot create principal home dirs: {e}")))?;
    let audit_log = AuditLog::open(principal_home.audit_dir(), runtime_key)
        .map_err(|e| std::io::Error::other(format!("cannot open audit log: {e}")))?;

    // Verify the hash chains of all recorded sessions; log-only, best-effort.
    match audit_log.verify_all() {
        Ok(results) => {
            let total_sessions = results.len();
            let mut tampered_sessions: usize = 0;

            for (session_id, result) in &results {
                if !result.valid {
                    tampered_sessions = tampered_sessions.saturating_add(1);
                    for issue in &result.issues {
                        tracing::error!(
                            session_id = %session_id,
                            issue = %issue,
                            "Audit chain integrity violation detected"
                        );
                    }
                }
            }

            if tampered_sessions > 0 {
                tracing::error!(
                    total_sessions,
                    tampered_sessions,
                    "Audit chain verification found tampered sessions"
                );
            } else if total_sessions > 0 {
                tracing::info!(
                    total_sessions,
                    "Audit chain verification passed for all sessions"
                );
            }
        },
        Err(e) => {
            tracing::error!(error = %e, "Audit chain verification failed to run");
        },
    }

    Ok(Arc::new(audit_log))
}
794
795fn load_or_generate_runtime_key(keys_dir: &Path) -> std::io::Result<KeyPair> {
799 let key_path = keys_dir.join("runtime.key");
800
801 if key_path.exists() {
802 let bytes = std::fs::read(&key_path)?;
803 KeyPair::from_secret_key(&bytes).map_err(|e| {
804 std::io::Error::other(format!(
805 "invalid runtime key at {}: {e}",
806 key_path.display()
807 ))
808 })
809 } else {
810 let keypair = KeyPair::generate();
811 std::fs::create_dir_all(keys_dir)?;
812 std::fs::write(&key_path, keypair.secret_key_bytes())?;
813
814 #[cfg(unix)]
816 {
817 use std::os::unix::fs::PermissionsExt;
818 std::fs::set_permissions(&key_path, std::fs::Permissions::from_mode(0o600))?;
819 }
820
821 tracing::info!(key_id = %keypair.key_id_hex(), "Generated new runtime signing key");
822 Ok(keypair)
823 }
824}
825
/// Number of permanent internal event-bus subscribers spawned in
/// `Kernel::new`; checked there by a debug assertion so the constant is
/// updated whenever a permanent subscriber is added or removed.
const INTERNAL_SUBSCRIBER_COUNT: usize = 3;

/// Delay before the idle monitor takes its first sample at all.
const IDLE_INITIAL_GRACE: std::time::Duration = std::time::Duration::from_secs(5);
/// Extra startup grace applied to non-ephemeral kernels before idle checks.
const IDLE_NON_EPHEMERAL_GRACE: std::time::Duration = std::time::Duration::from_secs(25);
/// Idle-poll interval for ephemeral kernels (polled aggressively).
const IDLE_EPHEMERAL_CHECK_INTERVAL: std::time::Duration = std::time::Duration::from_secs(1);
/// Idle-poll interval for regular (non-ephemeral) kernels.
const IDLE_CHECK_INTERVAL: std::time::Duration = std::time::Duration::from_secs(15);
/// Default idle timeout used when `ASTRID_IDLE_TIMEOUT_SECS` is not set.
const IDLE_DEFAULT_TIMEOUT: std::time::Duration = std::time::Duration::from_mins(5);
853
/// Spawns the idle monitor: once there are no client connections and no
/// daemon (uplink) capsules for longer than the configured idle timeout, the
/// kernel is shut down and the process exits. Ephemeral kernels use a fixed
/// 30s timeout with a 1s poll; others honor `ASTRID_IDLE_TIMEOUT_SECS`.
fn spawn_idle_monitor(kernel: Arc<Kernel>) -> tokio::task::JoinHandle<()> {
    tokio::spawn(async move {
        // Let clients connect and flags settle before sampling anything.
        tokio::time::sleep(IDLE_INITIAL_GRACE).await;

        // NOTE(review): the ephemeral flag is read once here — calls to
        // set_ephemeral after this point do not retune this task's timeout.
        let ephemeral = kernel.ephemeral.load(Ordering::Relaxed);
        let idle_timeout = if ephemeral {
            std::time::Duration::from_secs(30)
        } else {
            std::env::var("ASTRID_IDLE_TIMEOUT_SECS")
                .ok()
                .and_then(|v| v.parse().ok())
                .map_or(IDLE_DEFAULT_TIMEOUT, std::time::Duration::from_secs)
        };
        let check_interval = if ephemeral {
            IDLE_EPHEMERAL_CHECK_INTERVAL
        } else {
            IDLE_CHECK_INTERVAL
        };

        if !ephemeral {
            tokio::time::sleep(IDLE_NON_EPHEMERAL_GRACE).await;
        }
        // Instant at which the kernel was last observed to become idle.
        let mut idle_since: Option<tokio::time::Instant> = None;

        loop {
            tokio::time::sleep(check_interval).await;

            let connections = kernel.connection_count();

            let effective_connections = connections;

            // Capsules declaring uplinks act as daemons and keep the kernel
            // alive even with zero client connections.
            let has_daemons = {
                let reg = kernel.capsules.read().await;
                reg.values().any(|c| {
                    let m = c.manifest();
                    !m.uplinks.is_empty()
                })
            };

            if effective_connections == 0 && !has_daemons {
                let now = tokio::time::Instant::now();
                // Start (or continue) timing the current idle period.
                let start = *idle_since.get_or_insert(now);
                let elapsed = now.duration_since(start);

                tracing::debug!(
                    idle_secs = elapsed.as_secs(),
                    timeout_secs = idle_timeout.as_secs(),
                    connections,
                    "Kernel idle, monitoring timeout"
                );

                if elapsed >= idle_timeout {
                    tracing::info!("Idle timeout reached, initiating shutdown");
                    kernel.shutdown(Some("idle_timeout".to_string())).await;
                    std::process::exit(0);
                }
            } else {
                if idle_since.is_some() {
                    tracing::debug!(
                        effective_connections,
                        has_daemons,
                        "Activity detected, resetting idle timer"
                    );
                }
                idle_since = None;
            }
        }
    })
}
935
/// Per-capsule crash/restart bookkeeping: counts attempts and enforces a
/// capped exponential backoff between consecutive restart attempts.
struct RestartTracker {
    // How many restart attempts have been made so far.
    attempts: u32,
    // Instant of the most recent attempt (or of tracker creation).
    last_attempt: std::time::Instant,
    // Minimum delay that must elapse before the next attempt is allowed.
    backoff: std::time::Duration,
}

impl RestartTracker {
    const MAX_ATTEMPTS: u32 = 5;
    const INITIAL_BACKOFF: std::time::Duration = std::time::Duration::from_secs(2);
    const MAX_BACKOFF: std::time::Duration = std::time::Duration::from_secs(120);

    /// Fresh tracker: zero attempts, backoff at its initial value, and the
    /// clock started now (so the first attempt waits out the initial backoff).
    fn new() -> Self {
        Self {
            attempts: 0,
            backoff: Self::INITIAL_BACKOFF,
            last_attempt: std::time::Instant::now(),
        }
    }

    /// True when another attempt is permitted: the attempt budget is not yet
    /// spent and the current backoff window has fully elapsed.
    fn should_restart(&self) -> bool {
        !self.exhausted() && self.last_attempt.elapsed() >= self.backoff
    }

    /// Consumes one attempt: bumps the counter, restarts the clock, and
    /// doubles the backoff, saturating at `MAX_BACKOFF`.
    fn record_attempt(&mut self) {
        self.attempts = self.attempts.saturating_add(1);
        self.last_attempt = std::time::Instant::now();
        self.backoff = Self::MAX_BACKOFF.min(self.backoff.saturating_mul(2));
    }

    /// True once the attempt budget is fully spent.
    fn exhausted(&self) -> bool {
        self.attempts >= Self::MAX_ATTEMPTS
    }
}
973
/// Tries one restart of the capsule identified by `id_str`, honoring the
/// tracker's attempt budget and backoff window.
///
/// Returns `true` only when a restart was actually attempted *and*
/// succeeded; exhausted or still-backing-off trackers return `false`
/// without attempting anything.
async fn attempt_capsule_restart(
    kernel: &Kernel,
    id_str: &str,
    tracker: &mut RestartTracker,
) -> bool {
    if tracker.exhausted() {
        return false;
    }

    if !tracker.should_restart() {
        tracing::debug!(
            capsule_id = %id_str,
            next_attempt_in = ?tracker.backoff.saturating_sub(tracker.last_attempt.elapsed()),
            "Waiting for backoff before next restart attempt"
        );
        return false;
    }

    // Record the attempt before trying, so a failed restart still consumes
    // budget and extends the backoff.
    tracker.record_attempt();
    let attempt = tracker.attempts;

    tracing::warn!(
        capsule_id = %id_str,
        attempt,
        max_attempts = RestartTracker::MAX_ATTEMPTS,
        "Attempting capsule restart"
    );

    let capsule_id = astrid_capsule::capsule::CapsuleId::from_static(id_str);
    match kernel.restart_capsule(&capsule_id).await {
        Ok(()) => {
            tracing::info!(capsule_id = %id_str, attempt, "Capsule restarted successfully");
            true
        },
        Err(e) => {
            tracing::error!(capsule_id = %id_str, attempt, error = %e, "Capsule restart failed");
            if tracker.exhausted() {
                tracing::error!(
                    capsule_id = %id_str,
                    "All restart attempts exhausted - capsule will remain down"
                );
            }
            false
        },
    }
}
1023
/// Spawns a background task that every 10 seconds health-checks all Ready
/// capsules, publishes an `astrid.v1.health.failed` IPC event for each
/// failure, and drives restart attempts with per-capsule exponential
/// backoff via `RestartTracker`.
fn spawn_capsule_health_monitor(kernel: Arc<Kernel>) -> tokio::task::JoinHandle<()> {
    tokio::spawn(async move {
        let mut interval = tokio::time::interval(std::time::Duration::from_secs(10));
        // The first tick of a tokio interval completes immediately; consume
        // it so the first real check runs one full period after startup.
        interval.tick().await;
        let mut restart_trackers: std::collections::HashMap<String, RestartTracker> =
            std::collections::HashMap::new();

        loop {
            interval.tick().await;

            // Snapshot the capsules currently in the Ready state.
            let ready_capsules: Vec<std::sync::Arc<dyn astrid_capsule::capsule::Capsule>> = {
                let registry = kernel.capsules.read().await;
                registry
                    .list()
                    .into_iter()
                    .filter_map(|id| {
                        let capsule = registry.get(id)?;
                        if capsule.state() == astrid_capsule::capsule::CapsuleState::Ready {
                            Some(capsule)
                        } else {
                            None
                        }
                    })
                    .collect()
            };

            // Health-check each capsule and collect (id, reason) failures.
            let mut failures: Vec<(String, String)> = Vec::new();
            for capsule in &ready_capsules {
                let health = capsule.check_health();
                if let astrid_capsule::capsule::CapsuleState::Failed(reason) = health {
                    let id_str = capsule.id().to_string();
                    tracing::error!(capsule_id = %id_str, reason = %reason, "Capsule health check failed");

                    let msg = astrid_events::ipc::IpcMessage::new(
                        "astrid.v1.health.failed",
                        astrid_events::ipc::IpcPayload::Custom {
                            data: serde_json::json!({
                                "capsule_id": &id_str,
                                "reason": &reason,
                            }),
                        },
                        uuid::Uuid::new_v4(),
                    );
                    let _ = kernel.event_bus.publish(astrid_events::AstridEvent::Ipc {
                        metadata: astrid_events::EventMetadata::new("kernel"),
                        message: msg,
                    });
                    failures.push((id_str, reason));
                }
            }

            // Release our Arc clones before restarting: restart_capsule's
            // unload path needs exclusive access to each capsule Arc.
            drop(ready_capsules);

            let failed_this_tick: std::collections::HashSet<&str> =
                failures.iter().map(|(id, _)| id.as_str()).collect();

            let mut restarted = Vec::new();
            for (id_str, _reason) in &failures {
                let tracker = restart_trackers
                    .entry(id_str.clone())
                    .or_insert_with(RestartTracker::new);

                if attempt_capsule_restart(&kernel, id_str, tracker).await {
                    restarted.push(id_str.clone());
                }
            }

            // A successful restart wipes the capsule's backoff history.
            for id in &restarted {
                restart_trackers.remove(id);
            }

            // Keep trackers that are exhausted (never retried again), still
            // inside their backoff window, or failed again this tick; drop
            // stale entries for capsules that recovered on their own.
            restart_trackers.retain(|id, tracker| {
                if tracker.exhausted() {
                    return true;
                }
                if tracker.last_attempt.elapsed() < tracker.backoff {
                    return true;
                }
                failed_this_tick.contains(id.as_str())
            });
        }
    })
}
1128
1129fn spawn_react_watchdog(event_bus: Arc<EventBus>) -> tokio::task::JoinHandle<()> {
1135 tokio::spawn(async move {
1136 let mut interval = tokio::time::interval(std::time::Duration::from_secs(5));
1137 interval.tick().await;
1139
1140 loop {
1141 interval.tick().await;
1142
1143 let msg = astrid_events::ipc::IpcMessage::new(
1144 "astrid.v1.watchdog.tick",
1145 astrid_events::ipc::IpcPayload::Custom {
1146 data: serde_json::json!({}),
1147 },
1148 uuid::Uuid::new_v4(),
1149 );
1150 let _ = event_bus.publish(astrid_events::AstridEvent::Ipc {
1151 metadata: astrid_events::EventMetadata::new("kernel"),
1152 message: msg,
1153 });
1154 }
1155 })
1156}
1157
#[cfg(test)]
mod tests {
    use super::*;

    /// Generating into an empty dir creates a 32-byte `runtime.key` whose
    /// contents round-trip to the same keypair.
    #[test]
    fn test_load_or_generate_creates_new_key() {
        let dir = tempfile::tempdir().unwrap();
        let keys_dir = dir.path().join("keys");

        let keypair = load_or_generate_runtime_key(&keys_dir).unwrap();
        let key_path = keys_dir.join("runtime.key");

        assert!(key_path.exists());
        let bytes = std::fs::read(&key_path).unwrap();
        assert_eq!(bytes.len(), 32);

        let reloaded = KeyPair::from_secret_key(&bytes).unwrap();
        assert_eq!(
            keypair.public_key_bytes(),
            reloaded.public_key_bytes(),
            "reloaded key should match generated key"
        );
    }

    /// A second call loads the previously persisted key rather than
    /// generating a new one.
    #[test]
    fn test_load_or_generate_is_idempotent() {
        let dir = tempfile::tempdir().unwrap();
        let keys_dir = dir.path().join("keys");

        let first = load_or_generate_runtime_key(&keys_dir).unwrap();
        let second = load_or_generate_runtime_key(&keys_dir).unwrap();

        assert_eq!(
            first.public_key_bytes(),
            second.public_key_bytes(),
            "loading the same key file should produce the same keypair"
        );
    }

    /// A key file of the wrong length is rejected with a descriptive error
    /// instead of being silently regenerated.
    #[test]
    fn test_load_or_generate_rejects_bad_key_length() {
        let dir = tempfile::tempdir().unwrap();
        let keys_dir = dir.path().join("keys");
        std::fs::create_dir_all(&keys_dir).unwrap();

        // 16 bytes — half of a valid 32-byte secret key.
        std::fs::write(keys_dir.join("runtime.key"), [0u8; 16]).unwrap();

        let result = load_or_generate_runtime_key(&keys_dir);
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(
            err.contains("invalid runtime key"),
            "expected 'invalid runtime key' error, got: {err}"
        );
    }

    /// Exercises the same fetch_add / fetch_update pattern used by
    /// `connection_opened` / `connection_closed`.
    #[test]
    fn test_connection_counter_increment_decrement() {
        let counter = AtomicUsize::new(0);

        counter.fetch_add(1, Ordering::Relaxed);
        counter.fetch_add(1, Ordering::Relaxed);
        assert_eq!(counter.load(Ordering::Relaxed), 2);

        // Two decrements walk the counter back down to zero.
        for expected in [1, 0] {
            let _ = counter.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |n| {
                if n == 0 {
                    None
                } else {
                    Some(n.saturating_sub(1))
                }
            });
            assert_eq!(counter.load(Ordering::Relaxed), expected);
        }
    }

    /// Decrementing at zero is refused (Err) and the counter stays at zero.
    #[test]
    fn test_connection_counter_underflow_guard() {
        let counter = AtomicUsize::new(0);

        let result = counter.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |n| {
            if n == 0 { None } else { Some(n - 1) }
        });
        assert!(result.is_err());
        assert_eq!(counter.load(Ordering::Relaxed), 0);
    }

    /// Simulates `connection_closed`'s last-disconnect behavior: only the
    /// final disconnect clears session-only allowances; persistent ones stay.
    #[test]
    fn test_last_disconnect_clears_session_allowances() {
        use astrid_approval::AllowanceStore;
        use astrid_approval::allowance::{Allowance, AllowanceId, AllowancePattern};
        use astrid_core::types::Timestamp;
        use astrid_crypto::KeyPair;

        let store = AllowanceStore::new();
        let keypair = KeyPair::generate();

        // Session-scoped allowance: cleared on last disconnect.
        store
            .add_allowance(Allowance {
                id: AllowanceId::new(),
                action_pattern: AllowancePattern::ServerTools {
                    server: "session-server".to_string(),
                },
                created_at: Timestamp::now(),
                expires_at: None,
                max_uses: None,
                uses_remaining: None,
                session_only: true,
                workspace_root: None,
                signature: keypair.sign(b"test"),
            })
            .unwrap();

        // Persistent allowance: must survive the last disconnect.
        store
            .add_allowance(Allowance {
                id: AllowanceId::new(),
                action_pattern: AllowancePattern::ServerTools {
                    server: "persistent-server".to_string(),
                },
                created_at: Timestamp::now(),
                expires_at: None,
                max_uses: None,
                uses_remaining: None,
                session_only: false,
                workspace_root: None,
                signature: keypair.sign(b"test"),
            })
            .unwrap();

        assert_eq!(store.count(), 2);

        let counter = AtomicUsize::new(2);
        let simulate_disconnect = || {
            let result = counter.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |n| {
                if n == 0 {
                    None
                } else {
                    Some(n.saturating_sub(1))
                }
            });
            // Ok(1): the previous count was 1, i.e. this was the last client.
            if result == Ok(1) {
                store.clear_session_allowances();
            }
        };

        simulate_disconnect();
        assert_eq!(
            store.count(),
            2,
            "both allowances should survive non-final disconnect"
        );

        simulate_disconnect();
        assert_eq!(
            store.count(),
            1,
            "session allowance should be cleared on last disconnect"
        );
    }

    /// The generated key file must be readable/writable by the owner only.
    #[cfg(unix)]
    #[test]
    fn test_load_or_generate_sets_secure_permissions() {
        use std::os::unix::fs::PermissionsExt;

        let dir = tempfile::tempdir().unwrap();
        let keys_dir = dir.path().join("keys");

        let _ = load_or_generate_runtime_key(&keys_dir).unwrap();

        let key_path = keys_dir.join("runtime.key");
        let mode = std::fs::metadata(&key_path).unwrap().permissions().mode();
        assert_eq!(
            mode & 0o777,
            0o600,
            "key file should have 0o600 permissions, got {mode:#o}"
        );
    }

    /// A fresh tracker is not exhausted and must first wait out the initial
    /// backoff before restarting.
    #[test]
    fn restart_tracker_initial_state() {
        let tracker = RestartTracker::new();
        assert!(!tracker.exhausted());
        assert!(!tracker.should_restart());
    }

    #[test]
    fn restart_tracker_allows_restart_after_backoff() {
        let mut tracker = RestartTracker::new();
        // Backdate the last attempt just past the backoff window.
        tracker.last_attempt = std::time::Instant::now()
            - RestartTracker::INITIAL_BACKOFF
            - std::time::Duration::from_millis(1);
        assert!(tracker.should_restart());
    }

    #[test]
    fn restart_tracker_doubles_backoff() {
        let mut tracker = RestartTracker::new();
        assert_eq!(tracker.backoff, RestartTracker::INITIAL_BACKOFF);

        tracker.record_attempt();
        assert_eq!(
            tracker.backoff,
            RestartTracker::INITIAL_BACKOFF.saturating_mul(2)
        );
        assert_eq!(tracker.attempts, 1);

        tracker.record_attempt();
        assert_eq!(
            tracker.backoff,
            RestartTracker::INITIAL_BACKOFF.saturating_mul(4)
        );
        assert_eq!(tracker.attempts, 2);
    }

    #[test]
    fn restart_tracker_backoff_caps_at_max() {
        let mut tracker = RestartTracker::new();
        for _ in 0..20 {
            tracker.record_attempt();
        }
        assert_eq!(tracker.backoff, RestartTracker::MAX_BACKOFF);
    }

    #[test]
    fn restart_tracker_exhausted_at_max_attempts() {
        let mut tracker = RestartTracker::new();
        for _ in 0..RestartTracker::MAX_ATTEMPTS {
            assert!(!tracker.exhausted());
            tracker.record_attempt();
        }
        assert!(tracker.exhausted());
    }

    /// Even with the backoff window fully elapsed, an exhausted tracker
    /// never permits another restart.
    #[test]
    fn restart_tracker_should_restart_false_when_exhausted() {
        let mut tracker = RestartTracker::new();
        for _ in 0..RestartTracker::MAX_ATTEMPTS {
            tracker.record_attempt();
        }
        tracker.last_attempt = std::time::Instant::now() - RestartTracker::MAX_BACKOFF;
        assert!(!tracker.should_restart());
    }
}
1421
1422fn validate_imports_exports(
1431 manifests: &[(
1432 astrid_capsule::manifest::CapsuleManifest,
1433 std::path::PathBuf,
1434 )],
1435) {
1436 let mut exports_by_interface: std::collections::HashMap<
1438 (&str, &str),
1439 Vec<(&str, &semver::Version)>,
1440 > = std::collections::HashMap::new();
1441
1442 for (m, _) in manifests {
1443 for (ns, name, ver) in m.export_triples() {
1444 exports_by_interface
1445 .entry((ns, name))
1446 .or_default()
1447 .push((&m.package.name, ver));
1448 }
1449 }
1450
1451 for ((ns, name), providers) in &exports_by_interface {
1454 if providers.len() > 1 {
1455 let names: Vec<&str> = providers.iter().map(|(n, _)| *n).collect();
1456 tracing::warn!(
1457 interface = %format!("{ns}/{name}"),
1458 providers = ?names,
1459 "Multiple capsules export the same interface — events may be double-processed. \
1460 Consider removing one with `astrid capsule remove`."
1461 );
1462 }
1463 }
1464
1465 let mut satisfied_count: u32 = 0;
1466 let mut warning_count: u32 = 0;
1467
1468 for (manifest, _) in manifests {
1469 for (ns, name, req, optional) in manifest.import_tuples() {
1470 let has_provider = exports_by_interface
1471 .get(&(ns, name))
1472 .is_some_and(|providers| providers.iter().any(|(_, v)| req.matches(v)));
1473
1474 if has_provider {
1475 satisfied_count = satisfied_count.saturating_add(1);
1476 } else if optional {
1477 tracing::info!(
1478 capsule = %manifest.package.name,
1479 import = %format!("{ns}/{name} {req}"),
1480 "Optional import not satisfied — capsule will boot with reduced functionality"
1481 );
1482 warning_count = warning_count.saturating_add(1);
1483 } else {
1484 tracing::error!(
1485 capsule = %manifest.package.name,
1486 import = %format!("{ns}/{name} {req}"),
1487 "Required import not satisfied — no loaded capsule exports this interface"
1488 );
1489 warning_count = warning_count.saturating_add(1);
1490 }
1491 }
1492 }
1493
1494 tracing::info!(
1495 capsules = manifests.len(),
1496 imports_satisfied = satisfied_count,
1497 warnings = warning_count,
1498 "Boot validation complete"
1499 );
1500}
1501
1502async fn bootstrap_cli_root_user(
1514 store: &Arc<dyn astrid_storage::IdentityStore>,
1515) -> Result<(), astrid_storage::IdentityError> {
1516 if let Some(_user) = store.resolve("cli", "local").await? {
1518 tracing::debug!("CLI root user already linked");
1519 return Ok(());
1520 }
1521
1522 let user = store.create_user(Some("root")).await?;
1524 tracing::info!(user_id = %user.id, "Created CLI root user");
1525
1526 store.link("cli", "local", user.id, "system").await?;
1528 tracing::info!(user_id = %user.id, "Linked CLI root user (cli/local)");
1529
1530 Ok(())
1531}
1532
1533async fn apply_identity_config(
1539 store: &Arc<dyn astrid_storage::IdentityStore>,
1540 workspace_root: &std::path::Path,
1541) {
1542 let config = match astrid_config::Config::load(Some(workspace_root)) {
1543 Ok(resolved) => resolved.config,
1544 Err(e) => {
1545 tracing::debug!(error = %e, "No config loaded for identity links");
1546 return;
1547 },
1548 };
1549
1550 for link_cfg in &config.identity.links {
1551 let result = apply_single_identity_link(store, link_cfg).await;
1552 if let Err(e) = result {
1553 tracing::warn!(
1554 platform = %link_cfg.platform,
1555 platform_user_id = %link_cfg.platform_user_id,
1556 astrid_user = %link_cfg.astrid_user,
1557 error = %e,
1558 "Failed to apply identity link from config"
1559 );
1560 }
1561 }
1562}
1563
1564async fn apply_single_identity_link(
1566 store: &Arc<dyn astrid_storage::IdentityStore>,
1567 link_cfg: &astrid_config::types::IdentityLinkConfig,
1568) -> Result<(), astrid_storage::IdentityError> {
1569 let user_id = if let Ok(uuid) = uuid::Uuid::parse_str(&link_cfg.astrid_user) {
1571 if store.get_user(uuid).await?.is_none() {
1575 return Err(astrid_storage::IdentityError::UserNotFound(uuid));
1576 }
1577 uuid
1578 } else {
1579 if let Some(user) = store.get_user_by_name(&link_cfg.astrid_user).await? {
1581 user.id
1582 } else {
1583 let user = store.create_user(Some(&link_cfg.astrid_user)).await?;
1584 tracing::info!(
1585 user_id = %user.id,
1586 name = %link_cfg.astrid_user,
1587 "Created user from config identity link"
1588 );
1589 user.id
1590 }
1591 };
1592
1593 let method = if link_cfg.method.is_empty() {
1594 "admin"
1595 } else {
1596 &link_cfg.method
1597 };
1598
1599 if let Some(existing) = store
1601 .resolve(&link_cfg.platform, &link_cfg.platform_user_id)
1602 .await?
1603 && existing.id == user_id
1604 {
1605 tracing::debug!(
1606 platform = %link_cfg.platform,
1607 platform_user_id = %link_cfg.platform_user_id,
1608 user_id = %user_id,
1609 "Identity link from config already exists"
1610 );
1611 return Ok(());
1612 }
1613
1614 store
1615 .link(
1616 &link_cfg.platform,
1617 &link_cfg.platform_user_id,
1618 user_id,
1619 method,
1620 )
1621 .await?;
1622
1623 tracing::info!(
1624 platform = %link_cfg.platform,
1625 platform_user_id = %link_cfg.platform_user_id,
1626 user_id = %user_id,
1627 "Applied identity link from config"
1628 );
1629
1630 Ok(())
1631}