1#![deny(unsafe_code)]
2#![deny(missing_docs)]
3#![deny(clippy::all)]
4#![deny(unreachable_pub)]
5#![allow(clippy::module_name_repetitions)]
6
7mod bus_monitor;
16pub mod invite;
18pub mod kernel_router;
20pub mod pair_token;
22pub mod socket;
24
25use arc_swap::ArcSwap;
26use astrid_audit::AuditLog;
27use astrid_capabilities::{CapabilityStore, DirHandle};
28use astrid_capsule::profile_cache::PrincipalProfileCache;
29use astrid_capsule::registry::CapsuleRegistry;
30use astrid_core::SessionId;
31use astrid_core::groups::GroupConfig;
32use astrid_core::principal::PrincipalId;
33use astrid_crypto::KeyPair;
34use astrid_events::EventBus;
35use astrid_mcp::{McpClient, SecureMcpClient, ServerManager, ServersConfig};
36use astrid_vfs::{HostVfs, OverlayVfsRegistry, Vfs};
37use dashmap::DashMap;
38use std::path::{Path, PathBuf};
39use std::sync::Arc;
40use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
41use tokio::sync::{Mutex, RwLock};
42
/// The Astrid kernel: shared state for one running daemon session.
///
/// Built by [`Kernel::new`], which returns it inside an `Arc` so the background
/// tasks it spawns can each hold a reference to it.
pub struct Kernel {
    /// Session this kernel was booted for; also handed to the secure MCP client.
    pub session_id: SessionId,
    /// Event bus shared by the kernel, its background tasks and loaded capsules.
    pub event_bus: Arc<EventBus>,
    /// Registry of loaded capsules, behind an async read/write lock.
    pub capsules: Arc<RwLock<CapsuleRegistry>>,
    /// MCP client wrapped together with the capability store and audit log.
    pub mcp: SecureMcpClient,
    /// Capability store, persisted in [`Kernel::kv`].
    pub capabilities: Arc<CapabilityStore>,
    /// Host VFS with the workspace root registered under `vfs_root_handle`.
    pub vfs: Arc<dyn Vfs>,
    /// Overlay VFS registry rooted at the workspace root and `vfs_root_handle`.
    pub overlay_registry: Arc<OverlayVfsRegistry>,
    /// Directory handle under which the workspace root is registered in `vfs`.
    pub vfs_root_handle: DirHandle,
    /// Workspace directory this kernel operates on.
    pub workspace_root: PathBuf,
    /// Home directory of the default principal (set by both constructors).
    pub home_root: Option<PathBuf>,
    /// Listener for the CLI session socket; `None` in test kernels.
    pub cli_socket_listener: Option<Arc<tokio::sync::Mutex<tokio::net::UnixListener>>>,
    /// Lock file returned by `socket::bind_session_socket`; `None` in test kernels.
    #[expect(
        dead_code,
        reason = "held for the process lifetime; Drop releases the singleton flock"
    )]
    singleton_lock: Option<std::fs::File>,
    /// Persistent KV store backing capabilities, identities and capsule state.
    pub kv: Arc<astrid_storage::SurrealKvStore>,
    /// Audit log of the default principal.
    pub audit_log: Arc<AuditLog>,
    /// Number of open connections per principal.
    active_connections: DashMap<PrincipalId, AtomicUsize>,
    /// Fuel ledger handed to every capsule loader.
    fuel_ledger: astrid_capsule::FuelLedger,
    /// Fuel rate limiter handed to every capsule loader.
    fuel_rate: astrid_capsule::FuelRateLimiter,
    /// Memory ledger handed to every capsule loader.
    memory_ledger: astrid_capsule::MemoryLedger,
    /// Runtime limits applied to every capsule this kernel loads.
    runtime_limits: astrid_capsule::CapsuleRuntimeLimits,
    /// Whether the daemon runs in ephemeral mode; read by the idle monitor to
    /// choose its shutdown policy.
    pub ephemeral: AtomicBool,
    /// When this kernel was constructed.
    pub boot_time: std::time::Instant,
    /// Sender side of a shutdown watch channel (initially `false`).
    /// NOTE(review): signalled elsewhere, not in this file's visible code.
    pub shutdown_tx: tokio::sync::watch::Sender<bool>,
    /// Session token generated at boot.
    pub session_token: Arc<astrid_core::session_token::SessionToken>,
    /// On-disk location of the session token; removed on shutdown.
    token_path: PathBuf,
    /// Approval allowances; session allowances are cleared when a principal's
    /// last connection closes and on shutdown.
    pub allowance_store: Arc<astrid_approval::AllowanceStore>,
    /// Identity store scoped to the `system:identity` KV namespace.
    identity_store: Arc<dyn astrid_storage::IdentityStore>,
    /// Per-principal profile cache handed to capsule contexts.
    pub(crate) profile_cache: Arc<PrincipalProfileCache>,
    /// Group configuration, swappable at runtime via `ArcSwap`.
    pub(crate) groups: Arc<ArcSwap<GroupConfig>>,
    /// Resolved Astrid home directory layout.
    pub(crate) astrid_home: astrid_core::dirs::AstridHome,
    /// Async mutex presumably used to serialize administrative writes — confirm
    /// against the admin router.
    pub(crate) admin_write_lock: Mutex<()>,
}
185
186impl Kernel {
187 #[expect(
204 clippy::too_many_lines,
205 reason = "boot sequence: sequential setup that does not benefit from splitting"
206 )]
207 pub async fn new(
208 session_id: SessionId,
209 workspace_root: PathBuf,
210 runtime_limits: astrid_capsule::CapsuleRuntimeLimits,
211 ) -> Result<Arc<Self>, std::io::Error> {
212 use astrid_core::dirs::AstridHome;
213
214 assert!(
215 tokio::runtime::Handle::current().runtime_flavor()
216 == tokio::runtime::RuntimeFlavor::MultiThread,
217 "Kernel requires a multi-threaded tokio runtime (block_in_place panics on \
218 single-threaded). Use #[tokio::main] or Runtime::new() instead of current_thread."
219 );
220
221 let event_bus = Arc::new(EventBus::new());
222 let capsules = Arc::new(RwLock::new(CapsuleRegistry::new()));
223
224 let home = AstridHome::resolve().map_err(|e| {
227 std::io::Error::other(format!(
228 "Failed to resolve Astrid home (set $ASTRID_HOME or $HOME): {e}"
229 ))
230 })?;
231
232 let default_principal = astrid_core::PrincipalId::default();
236 let principal_home = home.principal_home(&default_principal);
237 let home_root = Some(principal_home.root().to_path_buf());
238
239 let kv_path = home.state_db_path();
241 let kv = Arc::new(
242 astrid_storage::SurrealKvStore::open(&kv_path)
243 .map_err(|e| std::io::Error::other(format!("Failed to open KV store: {e}")))?,
244 );
245 let mcp_config = ServersConfig::load_default().unwrap_or_default();
251 let mcp_manager = ServerManager::new(mcp_config)
252 .with_workspace_root(workspace_root.clone())
253 .with_capsule_log_dir(principal_home.log_dir());
254 let mcp_client = McpClient::new(mcp_manager);
255
256 let capabilities = Arc::new(
259 CapabilityStore::with_kv_store(Arc::clone(&kv) as Arc<dyn astrid_storage::KvStore>)
260 .map_err(|e| {
261 std::io::Error::other(format!("Failed to init capability store: {e}"))
262 })?,
263 );
264 let audit_log = open_audit_log()?;
265 let mcp = SecureMcpClient::new(
266 mcp_client,
267 Arc::clone(&capabilities),
268 Arc::clone(&audit_log),
269 session_id.clone(),
270 );
271
272 let root_handle = DirHandle::new();
274
275 let kernel_host_vfs = HostVfs::new();
281 kernel_host_vfs
282 .register_dir(root_handle.clone(), workspace_root.clone())
283 .await
284 .map_err(|_| std::io::Error::other("Failed to register kernel workspace vfs"))?;
285 let overlay_registry = Arc::new(OverlayVfsRegistry::new(
286 workspace_root.clone(),
287 root_handle.clone(),
288 ));
289
290 let (listener, singleton_lock) = socket::bind_session_socket(&home)?;
295 let (session_token, token_path) = socket::generate_session_token()?;
296
297 let allowance_store = Arc::new(astrid_approval::AllowanceStore::new());
298 let identity_kv = astrid_storage::ScopedKvStore::new(
300 Arc::clone(&kv) as Arc<dyn astrid_storage::KvStore>,
301 "system:identity",
302 )
303 .map_err(|e| std::io::Error::other(format!("Failed to create identity KV: {e}")))?;
304 let identity_store: Arc<dyn astrid_storage::IdentityStore> =
305 Arc::new(astrid_storage::KvIdentityStore::new(identity_kv));
306
307 let groups_loaded = GroupConfig::load(&home)
312 .map_err(|e| std::io::Error::other(format!("Failed to load groups config: {e}")))?;
313 let groups = Arc::new(ArcSwap::from_pointee(groups_loaded));
314
315 bootstrap_cli_root_user(&identity_store, &home)
319 .await
320 .map_err(|e| {
321 std::io::Error::other(format!("Failed to bootstrap CLI root user: {e}"))
322 })?;
323
324 apply_identity_config(&identity_store, &workspace_root).await;
326
327 let kernel = Arc::new(Self {
328 session_id,
329 event_bus,
330 capsules,
331 mcp,
332 capabilities,
333 vfs: Arc::new(kernel_host_vfs) as Arc<dyn Vfs>,
334 overlay_registry,
335 vfs_root_handle: root_handle,
336 workspace_root,
337 home_root,
338 cli_socket_listener: Some(Arc::new(tokio::sync::Mutex::new(listener))),
339 singleton_lock: Some(singleton_lock),
340 kv,
341 audit_log,
342 active_connections: DashMap::new(),
343 fuel_ledger: astrid_capsule::FuelLedger::default(),
344 fuel_rate: astrid_capsule::FuelRateLimiter::default(),
345 memory_ledger: astrid_capsule::MemoryLedger::default(),
346 runtime_limits,
347 ephemeral: AtomicBool::new(false),
348 boot_time: std::time::Instant::now(),
349 shutdown_tx: tokio::sync::watch::channel(false).0,
350 session_token: Arc::new(session_token),
351 token_path,
352 allowance_store,
353 identity_store,
354 profile_cache: Arc::new(PrincipalProfileCache::with_home(home.clone())),
355 groups,
356 astrid_home: home,
357 admin_write_lock: Mutex::new(()),
358 });
359
360 drop(kernel_router::spawn_kernel_router(Arc::clone(&kernel)));
361 drop(spawn_idle_monitor(Arc::clone(&kernel)));
362 drop(spawn_react_watchdog(Arc::clone(&kernel.event_bus)));
363 drop(spawn_capsule_health_monitor(Arc::clone(&kernel)));
364 drop(bus_monitor::spawn_bus_activity_monitor(&kernel.event_bus));
368
369 let dispatcher = astrid_capsule::dispatcher::EventDispatcher::new(
372 Arc::clone(&kernel.capsules),
373 Arc::clone(&kernel.event_bus),
374 )
375 .with_identity_store(Arc::clone(&kernel.identity_store));
376 tokio::spawn(dispatcher.run());
377
378 debug_assert_eq!(
379 kernel.event_bus.subscriber_count(),
380 INTERNAL_SUBSCRIBER_COUNT,
381 "INTERNAL_SUBSCRIBER_COUNT is stale; update it when adding permanent subscribers"
382 );
383
384 Ok(kernel)
385 }
386
387 async fn load_capsule(&self, dir: PathBuf) -> Result<(), anyhow::Error> {
393 let manifest_path = dir.join("Capsule.toml");
394 let manifest = astrid_capsule::discovery::load_manifest(&manifest_path)
395 .map_err(|e| anyhow::anyhow!(e))?;
396
397 {
400 let registry = self.capsules.read().await;
401 let id = astrid_capsule::capsule::CapsuleId::from_static(&manifest.package.name);
402 if registry.get(&id).is_some() {
403 return Ok(());
404 }
405 }
406
407 let loader = astrid_capsule::loader::CapsuleLoader::new(
408 self.mcp.clone(),
409 self.fuel_ledger.clone(),
410 self.fuel_rate.clone(),
411 self.memory_ledger.clone(),
412 self.runtime_limits,
413 );
414 let mut capsule = loader.create_capsule(manifest, dir.clone())?;
415
416 let principal = astrid_core::PrincipalId::default();
419 let kv = astrid_storage::ScopedKvStore::new(
420 Arc::clone(&self.kv) as Arc<dyn astrid_storage::KvStore>,
421 format!("{principal}:capsule:{}", capsule.id()),
422 )?;
423
424 let capsule_name = capsule.id().to_string();
427 let env_path = if let Ok(home) = astrid_core::dirs::AstridHome::resolve() {
428 let ph = home.principal_home(&principal);
429 let principal_env = ph.env_dir().join(format!("{capsule_name}.env.json"));
430 if principal_env.exists() {
431 principal_env
432 } else {
433 dir.join(".env.json")
434 }
435 } else {
436 dir.join(".env.json")
437 };
438 if env_path.exists()
439 && let Ok(contents) = std::fs::read_to_string(&env_path)
440 && let Ok(env_map) =
441 serde_json::from_str::<std::collections::HashMap<String, String>>(&contents)
442 {
443 for (k, v) in env_map {
444 let _ = kv.set(&k, v.into_bytes()).await;
445 }
446 }
447
448 let ctx = astrid_capsule::context::CapsuleContext::new(
449 principal.clone(),
450 self.workspace_root.clone(),
451 self.home_root.clone(),
452 kv,
453 Arc::clone(&self.event_bus),
454 self.cli_socket_listener.clone(),
455 )
456 .with_registry(Arc::clone(&self.capsules))
457 .with_session_token(Arc::clone(&self.session_token))
458 .with_allowance_store(Arc::clone(&self.allowance_store))
459 .with_identity_store(Arc::clone(&self.identity_store))
460 .with_profile_cache(Arc::clone(&self.profile_cache))
461 .with_overlay_registry(Arc::clone(&self.overlay_registry))
462 .with_group_config(self.groups.load_full());
466
467 capsule.load(&ctx).await?;
468
469 let mut registry = self.capsules.write().await;
470 registry
471 .register(capsule)
472 .map_err(|e| anyhow::anyhow!("Failed to register capsule: {e}"))?;
473
474 Ok(())
475 }
476
477 async fn restart_capsule(
484 &self,
485 id: &astrid_capsule::capsule::CapsuleId,
486 ) -> Result<(), anyhow::Error> {
487 let source_dir = {
489 let registry = self.capsules.read().await;
490 let capsule = registry
491 .get(id)
492 .ok_or_else(|| anyhow::anyhow!("capsule '{id}' not found in registry"))?;
493 capsule
494 .source_dir()
495 .map(std::path::Path::to_path_buf)
496 .ok_or_else(|| anyhow::anyhow!("capsule '{id}' has no source directory"))?
497 };
498
499 let old_capsule = {
503 let mut registry = self.capsules.write().await;
504 registry
505 .unregister(id)
506 .map_err(|e| anyhow::anyhow!("failed to unregister capsule '{id}': {e}"))?
507 };
508 {
513 let mut old = old_capsule;
514 if let Some(capsule) = std::sync::Arc::get_mut(&mut old) {
515 if let Err(e) = capsule.unload().await {
516 tracing::warn!(
517 capsule_id = %id,
518 error = %e,
519 "Capsule unload failed during restart"
520 );
521 }
522 } else {
523 tracing::warn!(
524 capsule_id = %id,
525 "Cannot call unload during restart - Arc still held by in-flight task"
526 );
527 }
528 }
529
530 self.load_capsule(source_dir).await?;
532
533 let capsule = {
542 let registry = self.capsules.read().await;
543 registry.get(id)
544 };
545 if let Some(capsule) = capsule
546 && let Err(e) = capsule
547 .invoke_interceptor("handle_lifecycle_restart", &[], None)
548 .await
549 {
550 tracing::debug!(
551 capsule_id = %id,
552 error = %e,
553 "Capsule does not handle lifecycle restart (optional)"
554 );
555 }
556
557 Ok(())
558 }
559
560 pub async fn load_all_capsules(&self) {
569 use astrid_capsule::toposort::toposort_manifests;
570 use astrid_core::dirs::AstridHome;
571
572 let mut paths = Vec::new();
574 if let Ok(home) = AstridHome::resolve() {
575 let principal = astrid_core::PrincipalId::default();
576 paths.push(home.principal_home(&principal).capsules_dir());
577 }
578
579 let discovered = astrid_capsule::discovery::discover_manifests(Some(&paths));
580
581 let sorted = match toposort_manifests(discovered) {
585 Ok(sorted) => sorted,
586 Err((e, original)) => {
587 tracing::error!(
588 cycle = %e,
589 "Dependency cycle in capsules, falling back to discovery order"
590 );
591 original
592 },
593 };
594
595 for (manifest, _) in &sorted {
599 if manifest.capabilities.uplink && manifest.has_imports() {
600 tracing::warn!(
601 capsule = %manifest.package.name,
602 "Uplink capsule has [imports] - \
603 this should have been rejected at manifest load time"
604 );
605 }
606 }
607
608 validate_imports_exports(&sorted);
610
611 let (uplinks, others): (Vec<_>, Vec<_>) =
618 sorted.into_iter().partition(|(m, _)| m.capabilities.uplink);
619
620 let uplink_names: Vec<String> = uplinks
622 .iter()
623 .map(|(m, _)| m.package.name.clone())
624 .collect();
625 for (manifest, dir) in &uplinks {
626 if let Err(e) = self.load_capsule(dir.clone()).await {
627 tracing::warn!(
628 capsule = %manifest.package.name,
629 error = %e,
630 "Failed to load uplink capsule during discovery"
631 );
632 }
633 }
634
635 self.await_capsule_readiness(&uplink_names).await;
638
639 for (manifest, dir) in &others {
640 if let Err(e) = self.load_capsule(dir.clone()).await {
641 tracing::warn!(
642 capsule = %manifest.package.name,
643 error = %e,
644 "Failed to load capsule during discovery"
645 );
646 }
647 }
648
649 let other_names: Vec<String> = others.iter().map(|(m, _)| m.package.name.clone()).collect();
652 self.await_capsule_readiness(&other_names).await;
653
654 let msg = astrid_events::ipc::IpcMessage::new(
658 "astrid.v1.capsules_loaded",
659 astrid_events::ipc::IpcPayload::RawJson(serde_json::json!({"status": "ready"})),
660 self.session_id.0,
661 );
662 let _ = self.event_bus.publish(astrid_events::AstridEvent::Ipc {
663 metadata: astrid_events::EventMetadata::new("kernel"),
664 message: msg,
665 });
666 }
667
668 pub fn connection_opened(&self, principal: &PrincipalId) {
670 self.active_connections
671 .entry(principal.clone())
672 .or_insert_with(|| AtomicUsize::new(0))
673 .fetch_add(1, Ordering::Relaxed);
674 metrics::counter!(METRIC_CONNECTIONS_OPENED_TOTAL).increment(1);
675 metrics::gauge!(METRIC_ACTIVE_CONNECTIONS).increment(1.0);
676 }
677
678 pub fn connection_closed(&self, principal: &PrincipalId) {
690 let entry = self
700 .active_connections
701 .entry(principal.clone())
702 .or_insert_with(|| AtomicUsize::new(0));
703 let result = entry.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |n| {
704 if n == 0 {
705 None
706 } else {
707 Some(n.saturating_sub(1))
708 }
709 });
710
711 if result.is_ok() {
714 metrics::counter!(METRIC_CONNECTIONS_CLOSED_TOTAL).increment(1);
715 metrics::gauge!(METRIC_ACTIVE_CONNECTIONS).decrement(1.0);
716 }
717
718 if result == Ok(1) {
719 self.allowance_store.clear_session_allowances(principal);
720 if let Err(e) = self.capabilities.clear_session_for(principal) {
721 tracing::warn!(%principal, error = %e, "failed to clear capability session");
722 }
723 tracing::info!(
724 %principal,
725 "last connection for principal disconnected, session state cleared"
726 );
727 }
728 drop(entry);
731
732 if result == Ok(1) {
733 self.active_connections
734 .remove_if(principal, |_, count| count.load(Ordering::Relaxed) == 0);
735 }
736 }
737
738 pub fn set_ephemeral(&self, val: bool) {
740 self.ephemeral.store(val, Ordering::Relaxed);
741 }
742
743 pub fn total_connection_count(&self) -> usize {
748 self.active_connections
749 .iter()
750 .map(|e| e.value().load(Ordering::Relaxed))
751 .sum()
752 }
753
754 pub fn connections_by_principal(&self) -> Vec<(PrincipalId, usize)> {
767 self.active_connections
768 .iter()
769 .filter_map(|e| {
770 let count = e.value().load(Ordering::Relaxed);
771 if count == 0 {
772 None
773 } else {
774 Some((e.key().clone(), count))
775 }
776 })
777 .collect()
778 }
779
780 pub async fn shutdown(&self, reason: Option<String>) {
787 tracing::info!(reason = ?reason, "Kernel shutting down");
788
789 let _ = self
791 .event_bus
792 .publish(astrid_events::AstridEvent::KernelShutdown {
793 metadata: astrid_events::EventMetadata::new("kernel"),
794 reason: reason.clone(),
795 });
796
797 self.allowance_store.clear_all_session_allowances();
803 if let Err(e) = self.capabilities.clear_session() {
804 tracing::warn!(error = %e, "failed to clear capability session on shutdown");
805 }
806
807 let capsules = {
817 let mut reg = self.capsules.write().await;
818 reg.drain()
819 };
820 for mut arc in capsules {
821 let id = arc.id().clone();
822 let mut unloaded = false;
823
824 for retry in 0..20_u32 {
825 if let Some(capsule) = Arc::get_mut(&mut arc) {
826 if let Err(e) = capsule.unload().await {
827 tracing::warn!(
828 capsule_id = %id,
829 error = %e,
830 "Failed to unload capsule during shutdown"
831 );
832 }
833 unloaded = true;
834 break;
835 }
836 if retry < 19 {
837 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
838 }
839 }
840
841 if !unloaded {
842 tracing::warn!(
843 capsule_id = %id,
844 strong_count = Arc::strong_count(&arc),
845 "Dropping capsule without explicit unload after retries exhausted; \
846 MCP child processes may be orphaned"
847 );
848 }
849 drop(arc);
850 }
851
852 if let Err(e) = self.kv.close().await {
854 tracing::warn!(error = %e, "Failed to flush KV store during shutdown");
855 }
856
857 let socket_path = crate::socket::kernel_socket_path();
864 let _ = std::fs::remove_file(&socket_path);
865 let _ = std::fs::remove_file(&self.token_path);
866 crate::socket::remove_readiness_file();
867
868 tracing::info!("Kernel shutdown complete");
869 }
870
871 async fn await_capsule_readiness(&self, names: &[String]) {
877 use astrid_capsule::capsule::ReadyStatus;
878
879 if names.is_empty() {
880 return;
881 }
882
883 let timeout = std::time::Duration::from_millis(500);
884 let capsules: Vec<(String, std::sync::Arc<dyn astrid_capsule::capsule::Capsule>)> = {
885 let registry = self.capsules.read().await;
886 names
887 .iter()
888 .filter_map(
889 |name| match astrid_capsule::capsule::CapsuleId::new(name.clone()) {
890 Ok(capsule_id) => registry.get(&capsule_id).map(|c| (name.clone(), c)),
891 Err(e) => {
892 tracing::warn!(
893 capsule = %name,
894 error = %e,
895 "Invalid capsule ID, skipping readiness wait"
896 );
897 None
898 },
899 },
900 )
901 .collect()
902 };
903
904 let mut set = tokio::task::JoinSet::new();
907 for (name, capsule) in capsules {
908 set.spawn(async move {
909 let status = capsule.wait_ready(timeout).await;
910 (name, status)
911 });
912 }
913 while let Some(result) = set.join_next().await {
914 if let Ok((name, status)) = result {
915 match status {
916 ReadyStatus::Ready => {},
917 ReadyStatus::Timeout => {
918 tracing::warn!(
919 capsule = %name,
920 timeout_ms = timeout.as_millis(),
921 "Capsule did not signal ready within timeout"
922 );
923 },
924 ReadyStatus::Crashed => {
925 tracing::error!(
926 capsule = %name,
927 "Capsule run loop exited before signaling ready"
928 );
929 },
930 }
931 }
932 }
933 }
934}
935
/// Builds a [`Kernel`] rooted at `home` for unit tests.
///
/// It uses `home`'s root directory as the workspace. Compared with
/// [`Kernel::new`], it does not bind the CLI socket, take the singleton lock,
/// or read an MCP servers config from disk. Of the background tasks, only the
/// admin router is spawned.
///
/// # Panics
///
/// Panics if any part of the on-disk setup fails. This is test-only code.
#[cfg(test)]
pub(crate) async fn test_kernel_with_home(home: astrid_core::dirs::AstridHome) -> Arc<Kernel> {
    home.ensure()
        .expect("test kernel: ensure astrid home dir tree");
    let workspace = home.root().to_path_buf();

    let store = Arc::new(
        astrid_storage::SurrealKvStore::open(&home.state_db_path()).expect("test kernel: open kv"),
    );
    let capability_store = Arc::new(
        CapabilityStore::with_kv_store(Arc::clone(&store) as Arc<dyn astrid_storage::KvStore>)
            .expect("test kernel: capability store"),
    );

    let signing_key =
        load_or_generate_runtime_key(&home.keys_dir()).expect("test kernel: runtime key");
    let principal = astrid_core::PrincipalId::default();
    let principal_home = home.principal_home(&principal);
    principal_home
        .ensure()
        .expect("test kernel: ensure principal home");
    let audit = Arc::new(
        AuditLog::open(principal_home.audit_dir(), signing_key)
            .expect("test kernel: open audit log"),
    );

    let session = SessionId::SYSTEM;
    let mcp = SecureMcpClient::new(
        McpClient::new(ServerManager::new(ServersConfig::default())),
        Arc::clone(&capability_store),
        Arc::clone(&audit),
        session.clone(),
    );

    let vfs_root = DirHandle::new();
    let host_vfs = HostVfs::new();
    host_vfs
        .register_dir(vfs_root.clone(), workspace.clone())
        .await
        .expect("test kernel: register workspace vfs");
    let overlays = Arc::new(OverlayVfsRegistry::new(workspace.clone(), vfs_root.clone()));

    let scoped_identity_kv = astrid_storage::ScopedKvStore::new(
        Arc::clone(&store) as Arc<dyn astrid_storage::KvStore>,
        "system:identity",
    )
    .expect("test kernel: identity kv scope");
    let identity_store: Arc<dyn astrid_storage::IdentityStore> =
        Arc::new(astrid_storage::KvIdentityStore::new(scoped_identity_kv));

    let group_config = GroupConfig::load(&home).expect("test kernel: load groups");

    let kernel = Arc::new(Kernel {
        session_id: session,
        event_bus: Arc::new(EventBus::new()),
        capsules: Arc::new(RwLock::new(CapsuleRegistry::new())),
        mcp,
        capabilities: capability_store,
        vfs: Arc::new(host_vfs) as Arc<dyn Vfs>,
        overlay_registry: overlays,
        vfs_root_handle: vfs_root,
        workspace_root: workspace,
        home_root: Some(principal_home.root().to_path_buf()),
        cli_socket_listener: None,
        singleton_lock: None,
        kv: store,
        audit_log: audit,
        active_connections: DashMap::new(),
        fuel_ledger: astrid_capsule::FuelLedger::default(),
        fuel_rate: astrid_capsule::FuelRateLimiter::default(),
        memory_ledger: astrid_capsule::MemoryLedger::default(),
        runtime_limits: astrid_capsule::CapsuleRuntimeLimits::default(),
        ephemeral: AtomicBool::new(false),
        boot_time: std::time::Instant::now(),
        shutdown_tx: tokio::sync::watch::channel(false).0,
        session_token: Arc::new(astrid_core::session_token::SessionToken::generate()),
        token_path: home.token_path(),
        allowance_store: Arc::new(astrid_approval::AllowanceStore::new()),
        identity_store,
        profile_cache: Arc::new(PrincipalProfileCache::with_home(home.clone())),
        groups: Arc::new(ArcSwap::from_pointee(group_config)),
        astrid_home: home,
        admin_write_lock: Mutex::new(()),
    });

    // Only the admin router is needed by tests; its handle is detached.
    drop(kernel_router::admin::spawn_admin_router(Arc::clone(&kernel)));
    kernel
}
1057
1058fn open_audit_log() -> std::io::Result<Arc<AuditLog>> {
1064 use astrid_core::dirs::AstridHome;
1065
1066 let home = AstridHome::resolve()
1067 .map_err(|e| std::io::Error::other(format!("cannot resolve Astrid home: {e}")))?;
1068 home.ensure()
1069 .map_err(|e| std::io::Error::other(format!("cannot create Astrid home dirs: {e}")))?;
1070
1071 let runtime_key = load_or_generate_runtime_key(&home.keys_dir())?;
1072 let default_principal = astrid_core::PrincipalId::default();
1073 let principal_home = home.principal_home(&default_principal);
1074 principal_home
1075 .ensure()
1076 .map_err(|e| std::io::Error::other(format!("cannot create principal home dirs: {e}")))?;
1077 let audit_log = AuditLog::open(principal_home.audit_dir(), runtime_key)
1078 .map_err(|e| std::io::Error::other(format!("cannot open audit log: {e}")))?;
1079
1080 match audit_log.verify_all() {
1082 Ok(results) => {
1083 let total_sessions = results.len();
1084 let mut tampered_sessions: usize = 0;
1085
1086 for (session_id, result) in &results {
1087 if !result.valid {
1088 tampered_sessions = tampered_sessions.saturating_add(1);
1089 for issue in &result.issues {
1090 tracing::error!(
1091 session_id = %session_id,
1092 issue = %issue,
1093 "Audit chain integrity violation detected"
1094 );
1095 }
1096 }
1097 }
1098
1099 if tampered_sessions > 0 {
1100 tracing::error!(
1101 total_sessions,
1102 tampered_sessions,
1103 "Audit chain verification found tampered sessions"
1104 );
1105 } else if total_sessions > 0 {
1106 tracing::info!(
1107 total_sessions,
1108 "Audit chain verification passed for all sessions"
1109 );
1110 }
1111 },
1112 Err(e) => {
1113 tracing::error!(error = %e, "Audit chain verification failed to run");
1114 },
1115 }
1116
1117 Ok(Arc::new(audit_log))
1118}
1119
1120fn load_or_generate_runtime_key(keys_dir: &Path) -> std::io::Result<KeyPair> {
1124 let key_path = keys_dir.join("runtime.key");
1125
1126 if key_path.exists() {
1127 let bytes = std::fs::read(&key_path)?;
1128 KeyPair::from_secret_key(&bytes).map_err(|e| {
1129 std::io::Error::other(format!(
1130 "invalid runtime key at {}: {e}",
1131 key_path.display()
1132 ))
1133 })
1134 } else {
1135 let keypair = KeyPair::generate();
1136 std::fs::create_dir_all(keys_dir)?;
1137 std::fs::write(&key_path, keypair.secret_key_bytes())?;
1138
1139 #[cfg(unix)]
1141 {
1142 use std::os::unix::fs::PermissionsExt;
1143 std::fs::set_permissions(&key_path, std::fs::Permissions::from_mode(0o600))?;
1144 }
1145
1146 tracing::info!(key_id = %keypair.key_id_hex(), "Generated new runtime signing key");
1147 Ok(keypair)
1148 }
1149}
1150
/// Number of permanent event-bus subscribers expected once [`Kernel::new`]
/// finishes booting; `Kernel::new` checks it with a `debug_assert_eq!`.
/// Update it when adding a permanent subscriber.
const INTERNAL_SUBSCRIBER_COUNT: usize = 5;

/// Gauge of currently open connections (incremented by `connection_opened`,
/// decremented by `connection_closed`).
const METRIC_ACTIVE_CONNECTIONS: &str = "astrid_daemon_active_connections";
/// Counter of connections opened.
const METRIC_CONNECTIONS_OPENED_TOTAL: &str = "astrid_daemon_connections_opened_total";
/// Counter of connections closed (closes without a matching open are not counted).
const METRIC_CONNECTIONS_CLOSED_TOTAL: &str = "astrid_daemon_connections_closed_total";
/// Counter of background loop ticks, labelled by `loop` (e.g. `idle`,
/// `capsule_health`, `react_watchdog`).
pub(crate) const METRIC_BACKGROUND_TICKS_TOTAL: &str = "astrid_daemon_background_ticks_total";

/// Delay before the idle monitor first reads the ephemeral flag.
const IDLE_INITIAL_GRACE: std::time::Duration = std::time::Duration::from_secs(5);
/// Extra delay before idle polling starts, applied only to non-ephemeral daemons.
const IDLE_NON_EPHEMERAL_GRACE: std::time::Duration = std::time::Duration::from_secs(25);
/// Idle-monitor polling interval for ephemeral daemons.
const IDLE_EPHEMERAL_CHECK_INTERVAL: std::time::Duration = std::time::Duration::from_secs(1);
/// Idle-monitor polling interval for non-ephemeral daemons.
const IDLE_CHECK_INTERVAL: std::time::Duration = std::time::Duration::from_secs(15);
1197fn spawn_idle_monitor(kernel: Arc<Kernel>) -> tokio::task::JoinHandle<()> {
1198 tokio::spawn(async move {
1199 tokio::time::sleep(IDLE_INITIAL_GRACE).await;
1202
1203 let ephemeral = kernel.ephemeral.load(Ordering::Relaxed);
1205 let idle_timeout = if ephemeral {
1206 std::env::var("ASTRID_IDLE_TIMEOUT_SECS")
1214 .ok()
1215 .and_then(|v| v.parse().ok())
1216 .map_or(
1217 std::time::Duration::from_secs(30),
1218 std::time::Duration::from_secs,
1219 )
1220 } else {
1221 let Some(secs) = std::env::var("ASTRID_IDLE_TIMEOUT_SECS")
1228 .ok()
1229 .and_then(|v| v.parse().ok())
1230 else {
1231 tracing::debug!(
1232 "Non-ephemeral daemon: idle shutdown disabled \
1233 (set ASTRID_IDLE_TIMEOUT_SECS to enable)."
1234 );
1235 return;
1236 };
1237 std::time::Duration::from_secs(secs)
1238 };
1239 let check_interval = if ephemeral {
1240 IDLE_EPHEMERAL_CHECK_INTERVAL
1241 } else {
1242 IDLE_CHECK_INTERVAL
1243 };
1244
1245 if !ephemeral {
1247 tokio::time::sleep(IDLE_NON_EPHEMERAL_GRACE).await;
1248 }
1249 let mut idle_since: Option<tokio::time::Instant> = None;
1250
1251 loop {
1252 tokio::time::sleep(check_interval).await;
1253 metrics::counter!(METRIC_BACKGROUND_TICKS_TOTAL, "loop" => "idle").increment(1);
1254
1255 let connections = kernel.total_connection_count();
1256
1257 let effective_connections = connections;
1263
1264 let has_daemons = {
1265 let reg = kernel.capsules.read().await;
1266 reg.values().any(|c| {
1267 let m = c.manifest();
1268 !m.uplinks.is_empty()
1269 })
1270 };
1271
1272 if effective_connections == 0 && !has_daemons {
1273 let now = tokio::time::Instant::now();
1274 let start = *idle_since.get_or_insert(now);
1275 let elapsed = now.duration_since(start);
1276
1277 tracing::debug!(
1278 idle_secs = elapsed.as_secs(),
1279 timeout_secs = idle_timeout.as_secs(),
1280 connections,
1281 "Kernel idle, monitoring timeout"
1282 );
1283
1284 if elapsed >= idle_timeout {
1285 tracing::info!("Idle timeout reached, initiating shutdown");
1286 kernel.shutdown(Some("idle_timeout".to_string())).await;
1287 std::process::exit(0);
1288 }
1289 } else {
1290 if idle_since.is_some() {
1291 tracing::debug!(
1292 effective_connections,
1293 has_daemons,
1294 "Activity detected, resetting idle timer"
1295 );
1296 }
1297 idle_since = None;
1298 }
1299 }
1300 })
1301}
1302
1303struct RestartTracker {
1305 attempts: u32,
1306 last_attempt: std::time::Instant,
1307 backoff: std::time::Duration,
1308}
1309
1310impl RestartTracker {
1311 const MAX_ATTEMPTS: u32 = 5;
1312 const INITIAL_BACKOFF: std::time::Duration = std::time::Duration::from_secs(2);
1313 const MAX_BACKOFF: std::time::Duration = std::time::Duration::from_mins(2);
1314
1315 fn new() -> Self {
1316 Self {
1317 attempts: 0,
1318 last_attempt: std::time::Instant::now(),
1319 backoff: Self::INITIAL_BACKOFF,
1320 }
1321 }
1322
1323 fn should_restart(&self) -> bool {
1325 self.attempts < Self::MAX_ATTEMPTS && self.last_attempt.elapsed() >= self.backoff
1326 }
1327
1328 fn record_attempt(&mut self) {
1330 self.attempts = self.attempts.saturating_add(1);
1331 self.last_attempt = std::time::Instant::now();
1332 self.backoff = self.backoff.saturating_mul(2).min(Self::MAX_BACKOFF);
1333 }
1334
1335 fn exhausted(&self) -> bool {
1337 self.attempts >= Self::MAX_ATTEMPTS
1338 }
1339}
1340
1341async fn attempt_capsule_restart(
1345 kernel: &Kernel,
1346 id_str: &str,
1347 tracker: &mut RestartTracker,
1348) -> bool {
1349 if tracker.exhausted() {
1350 return false;
1351 }
1352
1353 if !tracker.should_restart() {
1354 tracing::debug!(
1355 capsule_id = %id_str,
1356 next_attempt_in = ?tracker.backoff.saturating_sub(tracker.last_attempt.elapsed()),
1357 "Waiting for backoff before next restart attempt"
1358 );
1359 return false;
1360 }
1361
1362 tracker.record_attempt();
1363 let attempt = tracker.attempts;
1364
1365 tracing::warn!(
1366 capsule_id = %id_str,
1367 attempt,
1368 max_attempts = RestartTracker::MAX_ATTEMPTS,
1369 "Attempting capsule restart"
1370 );
1371
1372 let capsule_id = astrid_capsule::capsule::CapsuleId::from_static(id_str);
1373 match kernel.restart_capsule(&capsule_id).await {
1374 Ok(()) => {
1375 tracing::info!(capsule_id = %id_str, attempt, "Capsule restarted successfully");
1376 true
1377 },
1378 Err(e) => {
1379 tracing::error!(capsule_id = %id_str, attempt, error = %e, "Capsule restart failed");
1380 if tracker.exhausted() {
1381 tracing::error!(
1382 capsule_id = %id_str,
1383 "All restart attempts exhausted - capsule will remain down"
1384 );
1385 }
1386 false
1387 },
1388 }
1389}
1390
1391fn spawn_capsule_health_monitor(kernel: Arc<Kernel>) -> tokio::task::JoinHandle<()> {
1398 tokio::spawn(async move {
1399 let mut interval = tokio::time::interval(std::time::Duration::from_secs(10));
1400 interval.tick().await; let mut restart_trackers: std::collections::HashMap<String, RestartTracker> =
1403 std::collections::HashMap::new();
1404
1405 loop {
1406 interval.tick().await;
1407 metrics::counter!(METRIC_BACKGROUND_TICKS_TOTAL, "loop" => "capsule_health")
1408 .increment(1);
1409
1410 let ready_capsules: Vec<std::sync::Arc<dyn astrid_capsule::capsule::Capsule>> = {
1413 let registry = kernel.capsules.read().await;
1414 registry
1415 .list()
1416 .into_iter()
1417 .filter_map(|id| {
1418 let capsule = registry.get(id)?;
1419 if capsule.state() == astrid_capsule::capsule::CapsuleState::Ready {
1420 Some(capsule)
1421 } else {
1422 None
1423 }
1424 })
1425 .collect()
1426 };
1427
1428 let mut failures: Vec<(String, String)> = Vec::new();
1432 for capsule in &ready_capsules {
1433 let health = capsule.check_health();
1434 if let astrid_capsule::capsule::CapsuleState::Failed(reason) = health {
1435 let id_str = capsule.id().to_string();
1436 tracing::error!(capsule_id = %id_str, reason = %reason, "Capsule health check failed");
1437
1438 let msg = astrid_events::ipc::IpcMessage::new(
1439 "astrid.v1.health.failed",
1440 astrid_events::ipc::IpcPayload::Custom {
1441 data: serde_json::json!({
1442 "capsule_id": &id_str,
1443 "reason": &reason,
1444 }),
1445 },
1446 uuid::Uuid::new_v4(),
1447 );
1448 let _ = kernel.event_bus.publish(astrid_events::AstridEvent::Ipc {
1449 metadata: astrid_events::EventMetadata::new("kernel"),
1450 message: msg,
1451 });
1452 failures.push((id_str, reason));
1453 }
1454 }
1455
1456 drop(ready_capsules);
1459
1460 let failed_this_tick: std::collections::HashSet<&str> =
1461 failures.iter().map(|(id, _)| id.as_str()).collect();
1462
1463 let mut restarted = Vec::new();
1464 for (id_str, _reason) in &failures {
1465 let tracker = restart_trackers
1466 .entry(id_str.clone())
1467 .or_insert_with(RestartTracker::new);
1468
1469 if attempt_capsule_restart(&kernel, id_str, tracker).await {
1470 restarted.push(id_str.clone());
1471 }
1472 }
1473
1474 for id in &restarted {
1476 restart_trackers.remove(id);
1477 }
1478
1479 restart_trackers.retain(|id, tracker| {
1484 if tracker.exhausted() {
1485 return true;
1486 }
1487 if tracker.last_attempt.elapsed() < tracker.backoff {
1490 return true;
1491 }
1492 failed_this_tick.contains(id.as_str())
1493 });
1494 }
1495 })
1496}
1497
1498fn spawn_react_watchdog(event_bus: Arc<EventBus>) -> tokio::task::JoinHandle<()> {
1504 tokio::spawn(async move {
1505 let mut interval = tokio::time::interval(std::time::Duration::from_secs(5));
1506 interval.tick().await;
1508
1509 loop {
1510 interval.tick().await;
1511 metrics::counter!(METRIC_BACKGROUND_TICKS_TOTAL, "loop" => "react_watchdog")
1512 .increment(1);
1513
1514 let msg = astrid_events::ipc::IpcMessage::new(
1515 "astrid.v1.watchdog.tick",
1516 astrid_events::ipc::IpcPayload::Custom {
1517 data: serde_json::json!({}),
1518 },
1519 uuid::Uuid::new_v4(),
1520 );
1521 let _ = event_bus.publish(astrid_events::AstridEvent::Ipc {
1522 metadata: astrid_events::EventMetadata::new("kernel"),
1523 message: msg,
1524 });
1525 }
1526 })
1527}
1528
/// Unit tests for the kernel's boot-time helpers: runtime key loading,
/// per-principal connection counting, restart backoff tracking, and
/// default-principal profile seeding and migration.
#[cfg(test)]
mod tests {
    use super::*;

    /// A fresh keys directory gets a newly generated runtime key on disk.
    #[test]
    fn test_load_or_generate_creates_new_key() {
        let dir = tempfile::tempdir().unwrap();
        let keys_dir = dir.path().join("keys");

        let keypair = load_or_generate_runtime_key(&keys_dir).unwrap();
        let key_path = keys_dir.join("runtime.key");

        assert!(key_path.exists());
        // The file holds exactly 32 bytes, which `KeyPair::from_secret_key` accepts below.
        let bytes = std::fs::read(&key_path).unwrap();
        assert_eq!(bytes.len(), 32);

        // Rebuilding the keypair from the persisted bytes must give the same identity.
        let reloaded = KeyPair::from_secret_key(&bytes).unwrap();
        assert_eq!(
            keypair.public_key_bytes(),
            reloaded.public_key_bytes(),
            "reloaded key should match generated key"
        );
    }

    /// A second call reuses the persisted key instead of generating a new one.
    #[test]
    fn test_load_or_generate_is_idempotent() {
        let dir = tempfile::tempdir().unwrap();
        let keys_dir = dir.path().join("keys");

        let first = load_or_generate_runtime_key(&keys_dir).unwrap();
        let second = load_or_generate_runtime_key(&keys_dir).unwrap();

        assert_eq!(
            first.public_key_bytes(),
            second.public_key_bytes(),
            "loading the same key file should produce the same keypair"
        );
    }

    /// A key file of the wrong length is rejected, not silently regenerated.
    #[test]
    fn test_load_or_generate_rejects_bad_key_length() {
        let dir = tempfile::tempdir().unwrap();
        let keys_dir = dir.path().join("keys");
        std::fs::create_dir_all(&keys_dir).unwrap();

        // 16 bytes: half the length the key file is expected to have.
        std::fs::write(keys_dir.join("runtime.key"), [0u8; 16]).unwrap();

        let result = load_or_generate_runtime_key(&keys_dir);
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(
            err.contains("invalid runtime key"),
            "expected 'invalid runtime key' error, got: {err}"
        );
    }

    /// Exercises the increment / guarded-decrement pattern on an `AtomicUsize`
    /// (presumably the same pattern used for `active_connections`; verify
    /// against the connect/disconnect code).
    #[test]
    fn test_connection_counter_increment_decrement() {
        let counter = AtomicUsize::new(0);

        counter.fetch_add(1, Ordering::Relaxed);
        counter.fetch_add(1, Ordering::Relaxed);
        assert_eq!(counter.load(Ordering::Relaxed), 2);

        // Two decrements take the counter 2 -> 1 -> 0.
        for expected in [1, 0] {
            let _ = counter.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |n| {
                if n == 0 {
                    None
                } else {
                    Some(n.saturating_sub(1))
                }
            });
            assert_eq!(counter.load(Ordering::Relaxed), expected);
        }
    }

    /// Decrementing a counter already at zero is refused rather than wrapping.
    #[test]
    fn test_connection_counter_underflow_guard() {
        let counter = AtomicUsize::new(0);

        // Returning `None` from the closure makes `fetch_update` return `Err`
        // and leaves the value untouched.
        let result = counter.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |n| {
            if n == 0 { None } else { Some(n - 1) }
        });
        assert!(result.is_err());
        assert_eq!(counter.load(Ordering::Relaxed), 0);
    }

    /// When a principal's last connection closes, only that principal's
    /// session-only allowances are cleared. Its persistent allowances and
    /// other principals' session allowances stay.
    #[test]
    fn test_last_disconnect_clears_session_allowances_scoped() {
        use astrid_approval::AllowanceStore;
        use astrid_approval::allowance::{Allowance, AllowanceId, AllowancePattern};
        use astrid_core::principal::PrincipalId;
        use astrid_core::types::Timestamp;
        use astrid_crypto::KeyPair;

        let store = AllowanceStore::new();
        let keypair = KeyPair::generate();
        let alice = PrincipalId::new("alice").unwrap();
        let bob = PrincipalId::new("bob").unwrap();

        // Alice: one session-only allowance...
        store
            .add_allowance(Allowance {
                id: AllowanceId::new(),
                principal: alice.clone(),
                action_pattern: AllowancePattern::ServerTools {
                    server: "alice-session".to_string(),
                },
                created_at: Timestamp::now(),
                expires_at: None,
                max_uses: None,
                uses_remaining: None,
                session_only: true,
                workspace_root: None,
                signature: keypair.sign(b"test"),
            })
            .unwrap();
        // ...and one persistent allowance that must survive her disconnect.
        store
            .add_allowance(Allowance {
                id: AllowanceId::new(),
                principal: alice.clone(),
                action_pattern: AllowancePattern::ServerTools {
                    server: "alice-persistent".to_string(),
                },
                created_at: Timestamp::now(),
                expires_at: None,
                max_uses: None,
                uses_remaining: None,
                session_only: false,
                workspace_root: None,
                signature: keypair.sign(b"test"),
            })
            .unwrap();
        // Bob: a session-only allowance that must not be touched by Alice's disconnect.
        store
            .add_allowance(Allowance {
                id: AllowanceId::new(),
                principal: bob.clone(),
                action_pattern: AllowancePattern::ServerTools {
                    server: "bob-session".to_string(),
                },
                created_at: Timestamp::now(),
                expires_at: None,
                max_uses: None,
                uses_remaining: None,
                session_only: true,
                workspace_root: None,
                signature: keypair.sign(b"test"),
            })
            .unwrap();
        assert_eq!(store.count(), 3);

        // Alice has exactly one live connection.
        let alice_counter = AtomicUsize::new(1);
        let simulate_alice_disconnect = || {
            let result = alice_counter.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |n| {
                if n == 0 {
                    None
                } else {
                    Some(n.saturating_sub(1))
                }
            });
            // `fetch_update` returns the previous value, so `Ok(1)` means this
            // decrement closed the principal's last connection.
            if result == Ok(1) {
                store.clear_session_allowances(&alice);
            }
        };

        simulate_alice_disconnect();
        assert_eq!(store.count(), 2);
        // Alice keeps her persistent allowance; Bob keeps his session allowance.
        assert_eq!(store.count_for(&alice), 1);
        assert_eq!(store.count_for(&bob), 1);
    }

    /// On Unix, the generated key file is readable and writable by the owner only.
    #[cfg(unix)]
    #[test]
    fn test_load_or_generate_sets_secure_permissions() {
        use std::os::unix::fs::PermissionsExt;

        let dir = tempfile::tempdir().unwrap();
        let keys_dir = dir.path().join("keys");

        let _ = load_or_generate_runtime_key(&keys_dir).unwrap();

        let key_path = keys_dir.join("runtime.key");
        let mode = std::fs::metadata(&key_path).unwrap().permissions().mode();
        // Mask off the file-type bits and compare only the permission bits.
        assert_eq!(
            mode & 0o777,
            0o600,
            "key file should have 0o600 permissions, got {mode:#o}"
        );
    }

    /// A fresh tracker is neither exhausted nor allowed to restart yet; it is
    /// still inside its initial backoff window (see the next test).
    #[test]
    fn restart_tracker_initial_state() {
        let tracker = RestartTracker::new();
        assert!(!tracker.exhausted());
        assert!(!tracker.should_restart());
    }

    /// Once `INITIAL_BACKOFF` has elapsed since the last attempt, a restart is allowed.
    #[test]
    fn restart_tracker_allows_restart_after_backoff() {
        let mut tracker = RestartTracker::new();
        // Backdate the last attempt instead of sleeping.
        tracker.last_attempt = std::time::Instant::now()
            - RestartTracker::INITIAL_BACKOFF
            - std::time::Duration::from_millis(1);
        assert!(tracker.should_restart());
    }

    /// Each recorded attempt doubles the backoff and bumps the attempt count.
    #[test]
    fn restart_tracker_doubles_backoff() {
        let mut tracker = RestartTracker::new();
        assert_eq!(tracker.backoff, RestartTracker::INITIAL_BACKOFF);

        tracker.record_attempt();
        assert_eq!(
            tracker.backoff,
            RestartTracker::INITIAL_BACKOFF.saturating_mul(2)
        );
        assert_eq!(tracker.attempts, 1);

        tracker.record_attempt();
        assert_eq!(
            tracker.backoff,
            RestartTracker::INITIAL_BACKOFF.saturating_mul(4)
        );
        assert_eq!(tracker.attempts, 2);
    }

    /// After many attempts the backoff stops growing at `MAX_BACKOFF`.
    #[test]
    fn restart_tracker_backoff_caps_at_max() {
        let mut tracker = RestartTracker::new();
        for _ in 0..20 {
            tracker.record_attempt();
        }
        assert_eq!(tracker.backoff, RestartTracker::MAX_BACKOFF);
    }

    /// The tracker reports exhaustion exactly after `MAX_ATTEMPTS` attempts.
    #[test]
    fn restart_tracker_exhausted_at_max_attempts() {
        let mut tracker = RestartTracker::new();
        for _ in 0..RestartTracker::MAX_ATTEMPTS {
            assert!(!tracker.exhausted());
            tracker.record_attempt();
        }
        assert!(tracker.exhausted());
    }

    /// An exhausted tracker refuses to restart even when its backoff has elapsed.
    #[test]
    fn restart_tracker_should_restart_false_when_exhausted() {
        let mut tracker = RestartTracker::new();
        for _ in 0..RestartTracker::MAX_ATTEMPTS {
            tracker.record_attempt();
        }
        // Backdate far enough that the backoff window is no longer the reason for refusal.
        tracker.last_attempt = std::time::Instant::now() - RestartTracker::MAX_BACKOFF;
        assert!(!tracker.should_restart());
    }

    /// Creates a temporary directory and an `AstridHome` rooted in it.
    ///
    /// Callers must keep the returned `TempDir` alive for the duration of the
    /// test, because dropping it deletes the directory.
    fn scratch_home() -> (tempfile::TempDir, astrid_core::dirs::AstridHome) {
        let dir = tempfile::tempdir().unwrap();
        let home = astrid_core::dirs::AstridHome::from_path(dir.path());
        (dir, home)
    }

    /// With no profile on disk, seeding writes one containing only the `admin` group.
    #[test]
    fn seed_admin_writes_fresh_profile_when_missing() {
        let (_d, home) = scratch_home();
        let default = astrid_core::PrincipalId::default();
        let path = astrid_core::PrincipalProfile::path_for(&home, &default);
        assert!(!path.exists());

        seed_default_principal_admin_profile(&home).unwrap();

        let profile = astrid_core::PrincipalProfile::load_from_path(&path).unwrap();
        assert_eq!(profile.groups, vec!["admin".to_string()]);
        assert!(profile.grants.is_empty());
        assert!(profile.revokes.is_empty());
    }

    /// Re-seeding on every boot must not duplicate the `admin` group.
    #[test]
    fn seed_admin_is_idempotent_across_reboots() {
        let (_d, home) = scratch_home();
        let default = astrid_core::PrincipalId::default();

        seed_default_principal_admin_profile(&home).unwrap();
        seed_default_principal_admin_profile(&home).unwrap();
        seed_default_principal_admin_profile(&home).unwrap();

        let path = astrid_core::PrincipalProfile::path_for(&home, &default);
        let profile = astrid_core::PrincipalProfile::load_from_path(&path).unwrap();
        assert_eq!(profile.groups, vec!["admin".to_string()]);
    }

    /// Operator-configured groups are left as-is; `admin` is not appended.
    #[test]
    fn seed_admin_leaves_operator_configured_groups_intact() {
        let (_d, home) = scratch_home();
        let default = astrid_core::PrincipalId::default();

        let mut existing = astrid_core::PrincipalProfile::default();
        existing.groups = vec!["agent".to_string()];
        let path = astrid_core::PrincipalProfile::path_for(&home, &default);
        std::fs::create_dir_all(home.profiles_dir()).unwrap();
        existing.save_to_path(&path).unwrap();

        seed_default_principal_admin_profile(&home).unwrap();

        let profile = astrid_core::PrincipalProfile::load_from_path(&path).unwrap();
        assert_eq!(profile.groups, vec!["agent".to_string()]);
    }

    /// A profile with only explicit grants is treated as operator-configured,
    /// so no group is added.
    #[test]
    fn seed_admin_leaves_operator_configured_grants_intact() {
        let (_d, home) = scratch_home();
        let default = astrid_core::PrincipalId::default();

        let mut existing = astrid_core::PrincipalProfile::default();
        existing.grants = vec!["system:status".to_string()];
        let path = astrid_core::PrincipalProfile::path_for(&home, &default);
        std::fs::create_dir_all(home.profiles_dir()).unwrap();
        existing.save_to_path(&path).unwrap();

        seed_default_principal_admin_profile(&home).unwrap();

        let profile = astrid_core::PrincipalProfile::load_from_path(&path).unwrap();
        assert!(profile.groups.is_empty());
        assert_eq!(profile.grants, vec!["system:status".to_string()]);
    }

    /// A profile with only explicit revokes is treated as operator-configured,
    /// so no group is added.
    #[test]
    fn seed_admin_leaves_operator_configured_revokes_intact() {
        let (_d, home) = scratch_home();
        let default = astrid_core::PrincipalId::default();

        let mut existing = astrid_core::PrincipalProfile::default();
        existing.revokes = vec!["system:shutdown".to_string()];
        let path = astrid_core::PrincipalProfile::path_for(&home, &default);
        std::fs::create_dir_all(home.profiles_dir()).unwrap();
        existing.save_to_path(&path).unwrap();

        seed_default_principal_admin_profile(&home).unwrap();

        let profile = astrid_core::PrincipalProfile::load_from_path(&path).unwrap();
        assert!(profile.groups.is_empty());
        assert_eq!(profile.revokes, vec!["system:shutdown".to_string()]);
    }

    /// A profile at the legacy location (inside the principal's home config dir)
    /// is moved to the canonical path with its contents preserved.
    #[test]
    fn migrate_legacy_profile_relocates_to_etc() {
        let (_d, home) = scratch_home();
        let default = astrid_core::PrincipalId::default();
        let legacy_path = home
            .principal_home(&default)
            .config_dir()
            .join("profile.toml");
        std::fs::create_dir_all(legacy_path.parent().unwrap()).unwrap();
        let mut existing = astrid_core::PrincipalProfile::default();
        existing.groups = vec!["operator-configured".to_string()];
        existing.save_to_path(&legacy_path).unwrap();

        seed_default_principal_admin_profile(&home).unwrap();

        // The legacy copy is gone, and the migrated groups were not replaced by `admin`.
        assert!(!legacy_path.exists());
        let new_path = astrid_core::PrincipalProfile::path_for(&home, &default);
        let migrated = astrid_core::PrincipalProfile::load_from_path(&new_path).unwrap();
        assert_eq!(migrated.groups, vec!["operator-configured".to_string()]);
    }

    /// When both locations exist, the canonical profile wins and the stale
    /// legacy file is deleted.
    #[test]
    fn migrate_legacy_profile_drops_stale_legacy_when_new_already_exists() {
        let (_d, home) = scratch_home();
        let default = astrid_core::PrincipalId::default();

        // Stale copy at the legacy location.
        let legacy_path = home
            .principal_home(&default)
            .config_dir()
            .join("profile.toml");
        std::fs::create_dir_all(legacy_path.parent().unwrap()).unwrap();
        let mut stale = astrid_core::PrincipalProfile::default();
        stale.groups = vec!["stale".to_string()];
        stale.save_to_path(&legacy_path).unwrap();

        // Authoritative copy at the canonical location.
        let new_path = astrid_core::PrincipalProfile::path_for(&home, &default);
        std::fs::create_dir_all(new_path.parent().unwrap()).unwrap();
        let mut canonical = astrid_core::PrincipalProfile::default();
        canonical.groups = vec!["canonical".to_string()];
        canonical.save_to_path(&new_path).unwrap();

        seed_default_principal_admin_profile(&home).unwrap();

        // The legacy file is removed; the canonical contents are untouched.
        assert!(!legacy_path.exists());
        let result = astrid_core::PrincipalProfile::load_from_path(&new_path).unwrap();
        assert_eq!(result.groups, vec!["canonical".to_string()]);
    }
}
1956
1957fn validate_imports_exports(
1966 manifests: &[(
1967 astrid_capsule::manifest::CapsuleManifest,
1968 std::path::PathBuf,
1969 )],
1970) {
1971 let mut exports_by_interface: std::collections::HashMap<
1973 (&str, &str),
1974 Vec<(&str, &semver::Version)>,
1975 > = std::collections::HashMap::new();
1976
1977 for (m, _) in manifests {
1978 for (ns, name, ver) in m.export_triples() {
1979 exports_by_interface
1980 .entry((ns, name))
1981 .or_default()
1982 .push((&m.package.name, ver));
1983 }
1984 }
1985
1986 for ((ns, name), providers) in &exports_by_interface {
1989 if providers.len() > 1 {
1990 let names: Vec<&str> = providers.iter().map(|(n, _)| *n).collect();
1991 tracing::warn!(
1992 interface = %format!("{ns}/{name}"),
1993 providers = ?names,
1994 "Multiple capsules export the same interface — events may be double-processed. \
1995 Consider removing one with `astrid capsule remove`."
1996 );
1997 }
1998 }
1999
2000 let mut satisfied_count: u32 = 0;
2001 let mut warning_count: u32 = 0;
2002
2003 for (manifest, _) in manifests {
2004 for (ns, name, req, optional) in manifest.import_tuples() {
2005 let has_provider = exports_by_interface
2006 .get(&(ns, name))
2007 .is_some_and(|providers| providers.iter().any(|(_, v)| req.matches(v)));
2008
2009 if has_provider {
2010 satisfied_count = satisfied_count.saturating_add(1);
2011 } else if optional {
2012 tracing::info!(
2013 capsule = %manifest.package.name,
2014 import = %format!("{ns}/{name} {req}"),
2015 "Optional import not satisfied — capsule will boot with reduced functionality"
2016 );
2017 warning_count = warning_count.saturating_add(1);
2018 } else {
2019 tracing::error!(
2020 capsule = %manifest.package.name,
2021 import = %format!("{ns}/{name} {req}"),
2022 "Required import not satisfied — no loaded capsule exports this interface"
2023 );
2024 warning_count = warning_count.saturating_add(1);
2025 }
2026 }
2027 }
2028
2029 tracing::info!(
2030 capsules = manifests.len(),
2031 imports_satisfied = satisfied_count,
2032 warnings = warning_count,
2033 "Boot validation complete"
2034 );
2035}
2036
2037async fn bootstrap_cli_root_user(
2056 store: &Arc<dyn astrid_storage::IdentityStore>,
2057 home: &astrid_core::dirs::AstridHome,
2058) -> Result<(), astrid_storage::IdentityError> {
2059 if let Err(e) = seed_default_principal_admin_profile(home) {
2063 tracing::warn!(error = %e, "Failed to seed default admin profile — continuing boot");
2064 }
2065
2066 if let Some(_user) = store.resolve("cli", "local").await? {
2068 tracing::debug!("CLI root user already linked");
2069 return Ok(());
2070 }
2071
2072 let user = store.create_user(Some("root")).await?;
2074 tracing::info!(user_id = %user.id, "Created CLI root user");
2075
2076 store.link("cli", "local", user.id, "system").await?;
2078 tracing::info!(user_id = %user.id, "Linked CLI root user (cli/local)");
2079
2080 Ok(())
2081}
2082
2083fn migrate_legacy_profile_path(
2095 home: &astrid_core::dirs::AstridHome,
2096 principal: &astrid_core::PrincipalId,
2097) -> Result<(), std::io::Error> {
2098 let legacy_path = home
2099 .principal_home(principal)
2100 .config_dir()
2101 .join("profile.toml");
2102 let new_path = home.profile_path(principal);
2103 if !legacy_path.exists() {
2104 return Ok(());
2105 }
2106 if new_path.exists() {
2107 let _ = std::fs::remove_file(&legacy_path);
2111 return Ok(());
2112 }
2113 if let Some(parent) = new_path.parent() {
2114 std::fs::create_dir_all(parent)?;
2115 }
2116 std::fs::rename(&legacy_path, &new_path)?;
2117 tracing::warn!(
2118 %principal,
2119 legacy = %legacy_path.display(),
2120 new = %new_path.display(),
2121 "Migrated profile.toml out of principal home directory \
2122 (security: capsules with home:// fs_read could read the legacy file)"
2123 );
2124 Ok(())
2125}
2126
2127fn seed_default_principal_admin_profile(
2143 home: &astrid_core::dirs::AstridHome,
2144) -> Result<(), astrid_core::ProfileError> {
2145 use astrid_core::PrincipalProfile;
2146
2147 let default_principal = astrid_core::PrincipalId::default();
2148
2149 if let Err(e) = migrate_legacy_profile_path(home, &default_principal) {
2153 tracing::warn!(error = %e, "Failed to migrate legacy profile path — continuing");
2154 }
2155
2156 let path = PrincipalProfile::path_for(home, &default_principal);
2157 let profile = PrincipalProfile::load_from_path(&path)?;
2158
2159 if !profile.groups.is_empty() || !profile.grants.is_empty() || !profile.revokes.is_empty() {
2160 tracing::debug!(
2161 principal = %default_principal,
2162 "Default principal profile already has group/grant/revoke entries — leaving intact"
2163 );
2164 return Ok(());
2165 }
2166
2167 let mut updated = profile;
2168 updated
2169 .groups
2170 .push(astrid_core::groups::BUILTIN_ADMIN.to_string());
2171 updated.save_to_path(&path)?;
2172 tracing::info!(
2173 principal = %default_principal,
2174 "Seeded default principal with built-in `admin` group"
2175 );
2176 Ok(())
2177}
2178
2179async fn apply_identity_config(
2185 store: &Arc<dyn astrid_storage::IdentityStore>,
2186 workspace_root: &std::path::Path,
2187) {
2188 let config = match astrid_config::Config::load(Some(workspace_root)) {
2189 Ok(resolved) => resolved.config,
2190 Err(e) => {
2191 tracing::debug!(error = %e, "No config loaded for identity links");
2192 return;
2193 },
2194 };
2195
2196 for link_cfg in &config.identity.links {
2197 let result = apply_single_identity_link(store, link_cfg).await;
2198 if let Err(e) = result {
2199 tracing::warn!(
2200 platform = %link_cfg.platform,
2201 platform_user_id = %link_cfg.platform_user_id,
2202 astrid_user = %link_cfg.astrid_user,
2203 error = %e,
2204 "Failed to apply identity link from config"
2205 );
2206 }
2207 }
2208}
2209
2210async fn apply_single_identity_link(
2212 store: &Arc<dyn astrid_storage::IdentityStore>,
2213 link_cfg: &astrid_config::types::IdentityLinkConfig,
2214) -> Result<(), astrid_storage::IdentityError> {
2215 let user_id = if let Ok(uuid) = uuid::Uuid::parse_str(&link_cfg.astrid_user) {
2217 if store.get_user(uuid).await?.is_none() {
2221 return Err(astrid_storage::IdentityError::UserNotFound(uuid));
2222 }
2223 uuid
2224 } else {
2225 if let Some(user) = store.get_user_by_name(&link_cfg.astrid_user).await? {
2227 user.id
2228 } else {
2229 let user = store.create_user(Some(&link_cfg.astrid_user)).await?;
2230 tracing::info!(
2231 user_id = %user.id,
2232 name = %link_cfg.astrid_user,
2233 "Created user from config identity link"
2234 );
2235 user.id
2236 }
2237 };
2238
2239 let method = if link_cfg.method.is_empty() {
2240 "admin"
2241 } else {
2242 &link_cfg.method
2243 };
2244
2245 if let Some(existing) = store
2247 .resolve(&link_cfg.platform, &link_cfg.platform_user_id)
2248 .await?
2249 && existing.id == user_id
2250 {
2251 tracing::debug!(
2252 platform = %link_cfg.platform,
2253 platform_user_id = %link_cfg.platform_user_id,
2254 user_id = %user_id,
2255 "Identity link from config already exists"
2256 );
2257 return Ok(());
2258 }
2259
2260 store
2261 .link(
2262 &link_cfg.platform,
2263 &link_cfg.platform_user_id,
2264 user_id,
2265 method,
2266 )
2267 .await?;
2268
2269 tracing::info!(
2270 platform = %link_cfg.platform,
2271 platform_user_id = %link_cfg.platform_user_id,
2272 user_id = %user_id,
2273 "Applied identity link from config"
2274 );
2275
2276 Ok(())
2277}