1#![deny(unsafe_code)]
2#![deny(missing_docs)]
3#![deny(clippy::all)]
4#![deny(unreachable_pub)]
5#![allow(clippy::module_name_repetitions)]
6
7pub mod kernel_router;
16pub mod socket;
18
19use arc_swap::ArcSwap;
20use astrid_audit::AuditLog;
21use astrid_capabilities::{CapabilityStore, DirHandle};
22use astrid_capsule::profile_cache::PrincipalProfileCache;
23use astrid_capsule::registry::CapsuleRegistry;
24use astrid_core::SessionId;
25use astrid_core::groups::GroupConfig;
26use astrid_core::principal::PrincipalId;
27use astrid_crypto::KeyPair;
28use astrid_events::EventBus;
29use astrid_mcp::{McpClient, SecureMcpClient, ServerManager, ServersConfig};
30use astrid_vfs::{HostVfs, OverlayVfsRegistry, Vfs};
31use dashmap::DashMap;
32use std::path::{Path, PathBuf};
33use std::sync::Arc;
34use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
35use tokio::sync::{Mutex, RwLock};
36
37pub struct Kernel {
39 pub session_id: SessionId,
41 pub event_bus: Arc<EventBus>,
43 pub capsules: Arc<RwLock<CapsuleRegistry>>,
45 pub mcp: SecureMcpClient,
47 pub capabilities: Arc<CapabilityStore>,
49 pub vfs: Arc<dyn Vfs>,
56 pub overlay_registry: Arc<OverlayVfsRegistry>,
64 pub vfs_root_handle: DirHandle,
66 pub workspace_root: PathBuf,
68 pub home_root: Option<PathBuf>,
76 pub cli_socket_listener: Option<Arc<tokio::sync::Mutex<tokio::net::UnixListener>>>,
78 pub kv: Arc<astrid_storage::SurrealKvStore>,
80 pub audit_log: Arc<AuditLog>,
82 active_connections: DashMap<PrincipalId, AtomicUsize>,
89 pub ephemeral: AtomicBool,
91 pub boot_time: std::time::Instant,
93 pub shutdown_tx: tokio::sync::watch::Sender<bool>,
96 pub session_token: Arc<astrid_core::session_token::SessionToken>,
99 token_path: PathBuf,
102 pub allowance_store: Arc<astrid_approval::AllowanceStore>,
107 identity_store: Arc<dyn astrid_storage::IdentityStore>,
109 pub(crate) profile_cache: Arc<PrincipalProfileCache>,
119 pub(crate) groups: Arc<ArcSwap<GroupConfig>>,
131 pub(crate) astrid_home: astrid_core::dirs::AstridHome,
135 pub(crate) admin_write_lock: Mutex<()>,
144}
145
146impl Kernel {
147 #[expect(
158 clippy::too_many_lines,
159 reason = "boot sequence: sequential setup that does not benefit from splitting"
160 )]
161 pub async fn new(
162 session_id: SessionId,
163 workspace_root: PathBuf,
164 ) -> Result<Arc<Self>, std::io::Error> {
165 use astrid_core::dirs::AstridHome;
166
167 assert!(
168 tokio::runtime::Handle::current().runtime_flavor()
169 == tokio::runtime::RuntimeFlavor::MultiThread,
170 "Kernel requires a multi-threaded tokio runtime (block_in_place panics on \
171 single-threaded). Use #[tokio::main] or Runtime::new() instead of current_thread."
172 );
173
174 let event_bus = Arc::new(EventBus::new());
175 let capsules = Arc::new(RwLock::new(CapsuleRegistry::new()));
176
177 let home = AstridHome::resolve().map_err(|e| {
180 std::io::Error::other(format!(
181 "Failed to resolve Astrid home (set $ASTRID_HOME or $HOME): {e}"
182 ))
183 })?;
184
185 let default_principal = astrid_core::PrincipalId::default();
189 let principal_home = home.principal_home(&default_principal);
190 let home_root = Some(principal_home.root().to_path_buf());
191
192 let kv_path = home.state_db_path();
194 let kv = Arc::new(
195 astrid_storage::SurrealKvStore::open(&kv_path)
196 .map_err(|e| std::io::Error::other(format!("Failed to open KV store: {e}")))?,
197 );
198 let mcp_config = ServersConfig::load_default().unwrap_or_default();
204 let mcp_manager = ServerManager::new(mcp_config)
205 .with_workspace_root(workspace_root.clone())
206 .with_capsule_log_dir(principal_home.log_dir());
207 let mcp_client = McpClient::new(mcp_manager);
208
209 let capabilities = Arc::new(
212 CapabilityStore::with_kv_store(Arc::clone(&kv) as Arc<dyn astrid_storage::KvStore>)
213 .map_err(|e| {
214 std::io::Error::other(format!("Failed to init capability store: {e}"))
215 })?,
216 );
217 let audit_log = open_audit_log()?;
218 let mcp = SecureMcpClient::new(
219 mcp_client,
220 Arc::clone(&capabilities),
221 Arc::clone(&audit_log),
222 session_id.clone(),
223 );
224
225 let root_handle = DirHandle::new();
227
228 let kernel_host_vfs = HostVfs::new();
234 kernel_host_vfs
235 .register_dir(root_handle.clone(), workspace_root.clone())
236 .await
237 .map_err(|_| std::io::Error::other("Failed to register kernel workspace vfs"))?;
238 let overlay_registry = Arc::new(OverlayVfsRegistry::new(
239 workspace_root.clone(),
240 root_handle.clone(),
241 ));
242
243 let listener = socket::bind_session_socket()?;
248 let (session_token, token_path) = socket::generate_session_token()?;
249
250 let allowance_store = Arc::new(astrid_approval::AllowanceStore::new());
251 let identity_kv = astrid_storage::ScopedKvStore::new(
253 Arc::clone(&kv) as Arc<dyn astrid_storage::KvStore>,
254 "system:identity",
255 )
256 .map_err(|e| std::io::Error::other(format!("Failed to create identity KV: {e}")))?;
257 let identity_store: Arc<dyn astrid_storage::IdentityStore> =
258 Arc::new(astrid_storage::KvIdentityStore::new(identity_kv));
259
260 let groups_loaded = GroupConfig::load(&home)
265 .map_err(|e| std::io::Error::other(format!("Failed to load groups config: {e}")))?;
266 let groups = Arc::new(ArcSwap::from_pointee(groups_loaded));
267
268 bootstrap_cli_root_user(&identity_store, &home)
272 .await
273 .map_err(|e| {
274 std::io::Error::other(format!("Failed to bootstrap CLI root user: {e}"))
275 })?;
276
277 apply_identity_config(&identity_store, &workspace_root).await;
279
280 let kernel = Arc::new(Self {
281 session_id,
282 event_bus,
283 capsules,
284 mcp,
285 capabilities,
286 vfs: Arc::new(kernel_host_vfs) as Arc<dyn Vfs>,
287 overlay_registry,
288 vfs_root_handle: root_handle,
289 workspace_root,
290 home_root,
291 cli_socket_listener: Some(Arc::new(tokio::sync::Mutex::new(listener))),
292 kv,
293 audit_log,
294 active_connections: DashMap::new(),
295 ephemeral: AtomicBool::new(false),
296 boot_time: std::time::Instant::now(),
297 shutdown_tx: tokio::sync::watch::channel(false).0,
298 session_token: Arc::new(session_token),
299 token_path,
300 allowance_store,
301 identity_store,
302 profile_cache: Arc::new(PrincipalProfileCache::with_home(home.clone())),
303 groups,
304 astrid_home: home,
305 admin_write_lock: Mutex::new(()),
306 });
307
308 drop(kernel_router::spawn_kernel_router(Arc::clone(&kernel)));
309 drop(spawn_idle_monitor(Arc::clone(&kernel)));
310 drop(spawn_react_watchdog(Arc::clone(&kernel.event_bus)));
311 drop(spawn_capsule_health_monitor(Arc::clone(&kernel)));
312
313 let dispatcher = astrid_capsule::dispatcher::EventDispatcher::new(
316 Arc::clone(&kernel.capsules),
317 Arc::clone(&kernel.event_bus),
318 )
319 .with_identity_store(Arc::clone(&kernel.identity_store));
320 tokio::spawn(dispatcher.run());
321
322 debug_assert_eq!(
323 kernel.event_bus.subscriber_count(),
324 INTERNAL_SUBSCRIBER_COUNT,
325 "INTERNAL_SUBSCRIBER_COUNT is stale; update it when adding permanent subscribers"
326 );
327
328 Ok(kernel)
329 }
330
331 async fn load_capsule(&self, dir: PathBuf) -> Result<(), anyhow::Error> {
337 let manifest_path = dir.join("Capsule.toml");
338 let manifest = astrid_capsule::discovery::load_manifest(&manifest_path)
339 .map_err(|e| anyhow::anyhow!(e))?;
340
341 {
344 let registry = self.capsules.read().await;
345 let id = astrid_capsule::capsule::CapsuleId::from_static(&manifest.package.name);
346 if registry.get(&id).is_some() {
347 return Ok(());
348 }
349 }
350
351 let loader = astrid_capsule::loader::CapsuleLoader::new(self.mcp.clone());
352 let mut capsule = loader.create_capsule(manifest, dir.clone())?;
353
354 let principal = astrid_core::PrincipalId::default();
357 let kv = astrid_storage::ScopedKvStore::new(
358 Arc::clone(&self.kv) as Arc<dyn astrid_storage::KvStore>,
359 format!("{principal}:capsule:{}", capsule.id()),
360 )?;
361
362 let capsule_name = capsule.id().to_string();
365 let env_path = if let Ok(home) = astrid_core::dirs::AstridHome::resolve() {
366 let ph = home.principal_home(&principal);
367 let principal_env = ph.env_dir().join(format!("{capsule_name}.env.json"));
368 if principal_env.exists() {
369 principal_env
370 } else {
371 dir.join(".env.json")
372 }
373 } else {
374 dir.join(".env.json")
375 };
376 if env_path.exists()
377 && let Ok(contents) = std::fs::read_to_string(&env_path)
378 && let Ok(env_map) =
379 serde_json::from_str::<std::collections::HashMap<String, String>>(&contents)
380 {
381 for (k, v) in env_map {
382 let _ = kv.set(&k, v.into_bytes()).await;
383 }
384 }
385
386 let ctx = astrid_capsule::context::CapsuleContext::new(
387 principal.clone(),
388 self.workspace_root.clone(),
389 self.home_root.clone(),
390 kv,
391 Arc::clone(&self.event_bus),
392 self.cli_socket_listener.clone(),
393 )
394 .with_registry(Arc::clone(&self.capsules))
395 .with_session_token(Arc::clone(&self.session_token))
396 .with_allowance_store(Arc::clone(&self.allowance_store))
397 .with_identity_store(Arc::clone(&self.identity_store))
398 .with_profile_cache(Arc::clone(&self.profile_cache))
399 .with_overlay_registry(Arc::clone(&self.overlay_registry));
400
401 capsule.load(&ctx).await?;
402
403 let mut registry = self.capsules.write().await;
404 registry
405 .register(capsule)
406 .map_err(|e| anyhow::anyhow!("Failed to register capsule: {e}"))?;
407
408 Ok(())
409 }
410
411 async fn restart_capsule(
418 &self,
419 id: &astrid_capsule::capsule::CapsuleId,
420 ) -> Result<(), anyhow::Error> {
421 let source_dir = {
423 let registry = self.capsules.read().await;
424 let capsule = registry
425 .get(id)
426 .ok_or_else(|| anyhow::anyhow!("capsule '{id}' not found in registry"))?;
427 capsule
428 .source_dir()
429 .map(std::path::Path::to_path_buf)
430 .ok_or_else(|| anyhow::anyhow!("capsule '{id}' has no source directory"))?
431 };
432
433 let old_capsule = {
437 let mut registry = self.capsules.write().await;
438 registry
439 .unregister(id)
440 .map_err(|e| anyhow::anyhow!("failed to unregister capsule '{id}': {e}"))?
441 };
442 {
447 let mut old = old_capsule;
448 if let Some(capsule) = std::sync::Arc::get_mut(&mut old) {
449 if let Err(e) = capsule.unload().await {
450 tracing::warn!(
451 capsule_id = %id,
452 error = %e,
453 "Capsule unload failed during restart"
454 );
455 }
456 } else {
457 tracing::warn!(
458 capsule_id = %id,
459 "Cannot call unload during restart - Arc still held by in-flight task"
460 );
461 }
462 }
463
464 self.load_capsule(source_dir).await?;
466
467 let capsule = {
476 let registry = self.capsules.read().await;
477 registry.get(id)
478 };
479 if let Some(capsule) = capsule
480 && let Err(e) = capsule.invoke_interceptor("handle_lifecycle_restart", &[], None)
481 {
482 tracing::debug!(
483 capsule_id = %id,
484 error = %e,
485 "Capsule does not handle lifecycle restart (optional)"
486 );
487 }
488
489 Ok(())
490 }
491
492 pub async fn load_all_capsules(&self) {
501 use astrid_capsule::toposort::toposort_manifests;
502 use astrid_core::dirs::AstridHome;
503
504 let mut paths = Vec::new();
506 if let Ok(home) = AstridHome::resolve() {
507 let principal = astrid_core::PrincipalId::default();
508 paths.push(home.principal_home(&principal).capsules_dir());
509 }
510
511 let discovered = astrid_capsule::discovery::discover_manifests(Some(&paths));
512
513 let sorted = match toposort_manifests(discovered) {
517 Ok(sorted) => sorted,
518 Err((e, original)) => {
519 tracing::error!(
520 cycle = %e,
521 "Dependency cycle in capsules, falling back to discovery order"
522 );
523 original
524 },
525 };
526
527 for (manifest, _) in &sorted {
531 if manifest.capabilities.uplink && manifest.has_imports() {
532 tracing::warn!(
533 capsule = %manifest.package.name,
534 "Uplink capsule has [imports] - \
535 this should have been rejected at manifest load time"
536 );
537 }
538 }
539
540 validate_imports_exports(&sorted);
542
543 let (uplinks, others): (Vec<_>, Vec<_>) =
550 sorted.into_iter().partition(|(m, _)| m.capabilities.uplink);
551
552 let uplink_names: Vec<String> = uplinks
554 .iter()
555 .map(|(m, _)| m.package.name.clone())
556 .collect();
557 for (manifest, dir) in &uplinks {
558 if let Err(e) = self.load_capsule(dir.clone()).await {
559 tracing::warn!(
560 capsule = %manifest.package.name,
561 error = %e,
562 "Failed to load uplink capsule during discovery"
563 );
564 }
565 }
566
567 self.await_capsule_readiness(&uplink_names).await;
570
571 for (manifest, dir) in &others {
572 if let Err(e) = self.load_capsule(dir.clone()).await {
573 tracing::warn!(
574 capsule = %manifest.package.name,
575 error = %e,
576 "Failed to load capsule during discovery"
577 );
578 }
579 }
580
581 let other_names: Vec<String> = others.iter().map(|(m, _)| m.package.name.clone()).collect();
584 self.await_capsule_readiness(&other_names).await;
585
586 let msg = astrid_events::ipc::IpcMessage::new(
590 "astrid.v1.capsules_loaded",
591 astrid_events::ipc::IpcPayload::RawJson(serde_json::json!({"status": "ready"})),
592 self.session_id.0,
593 );
594 let _ = self.event_bus.publish(astrid_events::AstridEvent::Ipc {
595 metadata: astrid_events::EventMetadata::new("kernel"),
596 message: msg,
597 });
598 }
599
600 pub fn connection_opened(&self, principal: &PrincipalId) {
602 self.active_connections
603 .entry(principal.clone())
604 .or_insert_with(|| AtomicUsize::new(0))
605 .fetch_add(1, Ordering::Relaxed);
606 }
607
608 pub fn connection_closed(&self, principal: &PrincipalId) {
620 let entry = self
630 .active_connections
631 .entry(principal.clone())
632 .or_insert_with(|| AtomicUsize::new(0));
633 let result = entry.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |n| {
634 if n == 0 {
635 None
636 } else {
637 Some(n.saturating_sub(1))
638 }
639 });
640
641 if result == Ok(1) {
642 self.allowance_store.clear_session_allowances(principal);
643 if let Err(e) = self.capabilities.clear_session_for(principal) {
644 tracing::warn!(%principal, error = %e, "failed to clear capability session");
645 }
646 tracing::info!(
647 %principal,
648 "last connection for principal disconnected, session state cleared"
649 );
650 }
651 drop(entry);
654
655 if result == Ok(1) {
656 self.active_connections
657 .remove_if(principal, |_, count| count.load(Ordering::Relaxed) == 0);
658 }
659 }
660
661 pub fn set_ephemeral(&self, val: bool) {
663 self.ephemeral.store(val, Ordering::Relaxed);
664 }
665
666 pub fn total_connection_count(&self) -> usize {
671 self.active_connections
672 .iter()
673 .map(|e| e.value().load(Ordering::Relaxed))
674 .sum()
675 }
676
677 pub fn connections_by_principal(&self) -> Vec<(PrincipalId, usize)> {
690 self.active_connections
691 .iter()
692 .filter_map(|e| {
693 let count = e.value().load(Ordering::Relaxed);
694 if count == 0 {
695 None
696 } else {
697 Some((e.key().clone(), count))
698 }
699 })
700 .collect()
701 }
702
703 pub async fn shutdown(&self, reason: Option<String>) {
710 tracing::info!(reason = ?reason, "Kernel shutting down");
711
712 let _ = self
714 .event_bus
715 .publish(astrid_events::AstridEvent::KernelShutdown {
716 metadata: astrid_events::EventMetadata::new("kernel"),
717 reason: reason.clone(),
718 });
719
720 self.allowance_store.clear_all_session_allowances();
726 if let Err(e) = self.capabilities.clear_session() {
727 tracing::warn!(error = %e, "failed to clear capability session on shutdown");
728 }
729
730 let capsules = {
740 let mut reg = self.capsules.write().await;
741 reg.drain()
742 };
743 for mut arc in capsules {
744 let id = arc.id().clone();
745 let mut unloaded = false;
746
747 for retry in 0..20_u32 {
748 if let Some(capsule) = Arc::get_mut(&mut arc) {
749 if let Err(e) = capsule.unload().await {
750 tracing::warn!(
751 capsule_id = %id,
752 error = %e,
753 "Failed to unload capsule during shutdown"
754 );
755 }
756 unloaded = true;
757 break;
758 }
759 if retry < 19 {
760 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
761 }
762 }
763
764 if !unloaded {
765 tracing::warn!(
766 capsule_id = %id,
767 strong_count = Arc::strong_count(&arc),
768 "Dropping capsule without explicit unload after retries exhausted; \
769 MCP child processes may be orphaned"
770 );
771 }
772 drop(arc);
773 }
774
775 if let Err(e) = self.kv.close().await {
777 tracing::warn!(error = %e, "Failed to flush KV store during shutdown");
778 }
779
780 let socket_path = crate::socket::kernel_socket_path();
787 let _ = std::fs::remove_file(&socket_path);
788 let _ = std::fs::remove_file(&self.token_path);
789 crate::socket::remove_readiness_file();
790
791 tracing::info!("Kernel shutdown complete");
792 }
793
794 async fn await_capsule_readiness(&self, names: &[String]) {
800 use astrid_capsule::capsule::ReadyStatus;
801
802 if names.is_empty() {
803 return;
804 }
805
806 let timeout = std::time::Duration::from_millis(500);
807 let capsules: Vec<(String, std::sync::Arc<dyn astrid_capsule::capsule::Capsule>)> = {
808 let registry = self.capsules.read().await;
809 names
810 .iter()
811 .filter_map(
812 |name| match astrid_capsule::capsule::CapsuleId::new(name.clone()) {
813 Ok(capsule_id) => registry.get(&capsule_id).map(|c| (name.clone(), c)),
814 Err(e) => {
815 tracing::warn!(
816 capsule = %name,
817 error = %e,
818 "Invalid capsule ID, skipping readiness wait"
819 );
820 None
821 },
822 },
823 )
824 .collect()
825 };
826
827 let mut set = tokio::task::JoinSet::new();
830 for (name, capsule) in capsules {
831 set.spawn(async move {
832 let status = capsule.wait_ready(timeout).await;
833 (name, status)
834 });
835 }
836 while let Some(result) = set.join_next().await {
837 if let Ok((name, status)) = result {
838 match status {
839 ReadyStatus::Ready => {},
840 ReadyStatus::Timeout => {
841 tracing::warn!(
842 capsule = %name,
843 timeout_ms = timeout.as_millis(),
844 "Capsule did not signal ready within timeout"
845 );
846 },
847 ReadyStatus::Crashed => {
848 tracing::error!(
849 capsule = %name,
850 "Capsule run loop exited before signaling ready"
851 );
852 },
853 }
854 }
855 }
856 }
857}
858
859#[cfg(test)]
871pub(crate) async fn test_kernel_with_home(home: astrid_core::dirs::AstridHome) -> Arc<Kernel> {
872 use astrid_capsule::profile_cache::PrincipalProfileCache;
873
874 home.ensure()
875 .expect("test kernel: ensure astrid home dir tree");
876
877 let session_id = SessionId::SYSTEM;
878 let event_bus = Arc::new(EventBus::new());
879 let capsules = Arc::new(RwLock::new(CapsuleRegistry::new()));
880
881 let kv = Arc::new(
883 astrid_storage::SurrealKvStore::open(&home.state_db_path()).expect("test kernel: open kv"),
884 );
885 let capabilities = Arc::new(
886 CapabilityStore::with_kv_store(Arc::clone(&kv) as Arc<dyn astrid_storage::KvStore>)
887 .expect("test kernel: capability store"),
888 );
889
890 let runtime_key =
893 load_or_generate_runtime_key(&home.keys_dir()).expect("test kernel: runtime key");
894 let default_principal = astrid_core::PrincipalId::default();
895 let principal_home = home.principal_home(&default_principal);
896 principal_home
897 .ensure()
898 .expect("test kernel: ensure principal home");
899 let audit_log = Arc::new(
900 AuditLog::open(principal_home.audit_dir(), runtime_key)
901 .expect("test kernel: open audit log"),
902 );
903
904 let mcp_manager = ServerManager::new(ServersConfig::default());
907 let mcp_client = McpClient::new(mcp_manager);
908 let mcp = SecureMcpClient::new(
909 mcp_client,
910 Arc::clone(&capabilities),
911 Arc::clone(&audit_log),
912 session_id.clone(),
913 );
914
915 let root_handle = DirHandle::new();
916 let kernel_host_vfs = HostVfs::new();
917 kernel_host_vfs
918 .register_dir(root_handle.clone(), home.root().to_path_buf())
919 .await
920 .expect("test kernel: register workspace vfs");
921 let overlay_registry = Arc::new(OverlayVfsRegistry::new(
922 home.root().to_path_buf(),
923 root_handle.clone(),
924 ));
925
926 let allowance_store = Arc::new(astrid_approval::AllowanceStore::new());
927 let identity_kv = astrid_storage::ScopedKvStore::new(
928 Arc::clone(&kv) as Arc<dyn astrid_storage::KvStore>,
929 "system:identity",
930 )
931 .expect("test kernel: identity kv scope");
932 let identity_store: Arc<dyn astrid_storage::IdentityStore> =
933 Arc::new(astrid_storage::KvIdentityStore::new(identity_kv));
934
935 let groups = Arc::new(ArcSwap::from_pointee(
936 GroupConfig::load(&home).expect("test kernel: load groups"),
937 ));
938
939 let kernel = Arc::new(Kernel {
940 session_id,
941 event_bus,
942 capsules,
943 mcp,
944 capabilities,
945 vfs: Arc::new(kernel_host_vfs) as Arc<dyn Vfs>,
946 overlay_registry,
947 vfs_root_handle: root_handle,
948 workspace_root: home.root().to_path_buf(),
949 home_root: Some(principal_home.root().to_path_buf()),
950 cli_socket_listener: None,
951 kv,
952 audit_log,
953 active_connections: DashMap::new(),
954 ephemeral: AtomicBool::new(false),
955 boot_time: std::time::Instant::now(),
956 shutdown_tx: tokio::sync::watch::channel(false).0,
957 session_token: Arc::new(astrid_core::session_token::SessionToken::generate()),
958 token_path: home.token_path(),
959 allowance_store,
960 identity_store,
961 profile_cache: Arc::new(PrincipalProfileCache::with_home(home.clone())),
962 groups,
963 astrid_home: home,
964 admin_write_lock: Mutex::new(()),
965 });
966 drop(kernel_router::admin::spawn_admin_router(Arc::clone(
971 &kernel,
972 )));
973 kernel
974}
975
976fn open_audit_log() -> std::io::Result<Arc<AuditLog>> {
982 use astrid_core::dirs::AstridHome;
983
984 let home = AstridHome::resolve()
985 .map_err(|e| std::io::Error::other(format!("cannot resolve Astrid home: {e}")))?;
986 home.ensure()
987 .map_err(|e| std::io::Error::other(format!("cannot create Astrid home dirs: {e}")))?;
988
989 let runtime_key = load_or_generate_runtime_key(&home.keys_dir())?;
990 let default_principal = astrid_core::PrincipalId::default();
991 let principal_home = home.principal_home(&default_principal);
992 principal_home
993 .ensure()
994 .map_err(|e| std::io::Error::other(format!("cannot create principal home dirs: {e}")))?;
995 let audit_log = AuditLog::open(principal_home.audit_dir(), runtime_key)
996 .map_err(|e| std::io::Error::other(format!("cannot open audit log: {e}")))?;
997
998 match audit_log.verify_all() {
1000 Ok(results) => {
1001 let total_sessions = results.len();
1002 let mut tampered_sessions: usize = 0;
1003
1004 for (session_id, result) in &results {
1005 if !result.valid {
1006 tampered_sessions = tampered_sessions.saturating_add(1);
1007 for issue in &result.issues {
1008 tracing::error!(
1009 session_id = %session_id,
1010 issue = %issue,
1011 "Audit chain integrity violation detected"
1012 );
1013 }
1014 }
1015 }
1016
1017 if tampered_sessions > 0 {
1018 tracing::error!(
1019 total_sessions,
1020 tampered_sessions,
1021 "Audit chain verification found tampered sessions"
1022 );
1023 } else if total_sessions > 0 {
1024 tracing::info!(
1025 total_sessions,
1026 "Audit chain verification passed for all sessions"
1027 );
1028 }
1029 },
1030 Err(e) => {
1031 tracing::error!(error = %e, "Audit chain verification failed to run");
1032 },
1033 }
1034
1035 Ok(Arc::new(audit_log))
1036}
1037
1038fn load_or_generate_runtime_key(keys_dir: &Path) -> std::io::Result<KeyPair> {
1042 let key_path = keys_dir.join("runtime.key");
1043
1044 if key_path.exists() {
1045 let bytes = std::fs::read(&key_path)?;
1046 KeyPair::from_secret_key(&bytes).map_err(|e| {
1047 std::io::Error::other(format!(
1048 "invalid runtime key at {}: {e}",
1049 key_path.display()
1050 ))
1051 })
1052 } else {
1053 let keypair = KeyPair::generate();
1054 std::fs::create_dir_all(keys_dir)?;
1055 std::fs::write(&key_path, keypair.secret_key_bytes())?;
1056
1057 #[cfg(unix)]
1059 {
1060 use std::os::unix::fs::PermissionsExt;
1061 std::fs::set_permissions(&key_path, std::fs::Permissions::from_mode(0o600))?;
1062 }
1063
1064 tracing::info!(key_id = %keypair.key_id_hex(), "Generated new runtime signing key");
1065 Ok(keypair)
1066 }
1067}
1068
1069const INTERNAL_SUBSCRIBER_COUNT: usize = 4;
1090
1091const IDLE_INITIAL_GRACE: std::time::Duration = std::time::Duration::from_secs(5);
1093const IDLE_NON_EPHEMERAL_GRACE: std::time::Duration = std::time::Duration::from_secs(25);
1095const IDLE_EPHEMERAL_CHECK_INTERVAL: std::time::Duration = std::time::Duration::from_secs(1);
1097const IDLE_CHECK_INTERVAL: std::time::Duration = std::time::Duration::from_secs(15);
1099fn spawn_idle_monitor(kernel: Arc<Kernel>) -> tokio::task::JoinHandle<()> {
1100 tokio::spawn(async move {
1101 tokio::time::sleep(IDLE_INITIAL_GRACE).await;
1104
1105 let ephemeral = kernel.ephemeral.load(Ordering::Relaxed);
1107 let idle_timeout = if ephemeral {
1108 std::env::var("ASTRID_IDLE_TIMEOUT_SECS")
1116 .ok()
1117 .and_then(|v| v.parse().ok())
1118 .map_or(
1119 std::time::Duration::from_secs(30),
1120 std::time::Duration::from_secs,
1121 )
1122 } else {
1123 let Some(secs) = std::env::var("ASTRID_IDLE_TIMEOUT_SECS")
1130 .ok()
1131 .and_then(|v| v.parse().ok())
1132 else {
1133 tracing::debug!(
1134 "Non-ephemeral daemon: idle shutdown disabled \
1135 (set ASTRID_IDLE_TIMEOUT_SECS to enable)."
1136 );
1137 return;
1138 };
1139 std::time::Duration::from_secs(secs)
1140 };
1141 let check_interval = if ephemeral {
1142 IDLE_EPHEMERAL_CHECK_INTERVAL
1143 } else {
1144 IDLE_CHECK_INTERVAL
1145 };
1146
1147 if !ephemeral {
1149 tokio::time::sleep(IDLE_NON_EPHEMERAL_GRACE).await;
1150 }
1151 let mut idle_since: Option<tokio::time::Instant> = None;
1152
1153 loop {
1154 tokio::time::sleep(check_interval).await;
1155
1156 let connections = kernel.total_connection_count();
1157
1158 let effective_connections = connections;
1164
1165 let has_daemons = {
1166 let reg = kernel.capsules.read().await;
1167 reg.values().any(|c| {
1168 let m = c.manifest();
1169 !m.uplinks.is_empty()
1170 })
1171 };
1172
1173 if effective_connections == 0 && !has_daemons {
1174 let now = tokio::time::Instant::now();
1175 let start = *idle_since.get_or_insert(now);
1176 let elapsed = now.duration_since(start);
1177
1178 tracing::debug!(
1179 idle_secs = elapsed.as_secs(),
1180 timeout_secs = idle_timeout.as_secs(),
1181 connections,
1182 "Kernel idle, monitoring timeout"
1183 );
1184
1185 if elapsed >= idle_timeout {
1186 tracing::info!("Idle timeout reached, initiating shutdown");
1187 kernel.shutdown(Some("idle_timeout".to_string())).await;
1188 std::process::exit(0);
1189 }
1190 } else {
1191 if idle_since.is_some() {
1192 tracing::debug!(
1193 effective_connections,
1194 has_daemons,
1195 "Activity detected, resetting idle timer"
1196 );
1197 }
1198 idle_since = None;
1199 }
1200 }
1201 })
1202}
1203
1204struct RestartTracker {
1206 attempts: u32,
1207 last_attempt: std::time::Instant,
1208 backoff: std::time::Duration,
1209}
1210
1211impl RestartTracker {
1212 const MAX_ATTEMPTS: u32 = 5;
1213 const INITIAL_BACKOFF: std::time::Duration = std::time::Duration::from_secs(2);
1214 const MAX_BACKOFF: std::time::Duration = std::time::Duration::from_secs(120);
1215
1216 fn new() -> Self {
1217 Self {
1218 attempts: 0,
1219 last_attempt: std::time::Instant::now(),
1220 backoff: Self::INITIAL_BACKOFF,
1221 }
1222 }
1223
1224 fn should_restart(&self) -> bool {
1226 self.attempts < Self::MAX_ATTEMPTS && self.last_attempt.elapsed() >= self.backoff
1227 }
1228
1229 fn record_attempt(&mut self) {
1231 self.attempts = self.attempts.saturating_add(1);
1232 self.last_attempt = std::time::Instant::now();
1233 self.backoff = self.backoff.saturating_mul(2).min(Self::MAX_BACKOFF);
1234 }
1235
1236 fn exhausted(&self) -> bool {
1238 self.attempts >= Self::MAX_ATTEMPTS
1239 }
1240}
1241
1242async fn attempt_capsule_restart(
1246 kernel: &Kernel,
1247 id_str: &str,
1248 tracker: &mut RestartTracker,
1249) -> bool {
1250 if tracker.exhausted() {
1251 return false;
1252 }
1253
1254 if !tracker.should_restart() {
1255 tracing::debug!(
1256 capsule_id = %id_str,
1257 next_attempt_in = ?tracker.backoff.saturating_sub(tracker.last_attempt.elapsed()),
1258 "Waiting for backoff before next restart attempt"
1259 );
1260 return false;
1261 }
1262
1263 tracker.record_attempt();
1264 let attempt = tracker.attempts;
1265
1266 tracing::warn!(
1267 capsule_id = %id_str,
1268 attempt,
1269 max_attempts = RestartTracker::MAX_ATTEMPTS,
1270 "Attempting capsule restart"
1271 );
1272
1273 let capsule_id = astrid_capsule::capsule::CapsuleId::from_static(id_str);
1274 match kernel.restart_capsule(&capsule_id).await {
1275 Ok(()) => {
1276 tracing::info!(capsule_id = %id_str, attempt, "Capsule restarted successfully");
1277 true
1278 },
1279 Err(e) => {
1280 tracing::error!(capsule_id = %id_str, attempt, error = %e, "Capsule restart failed");
1281 if tracker.exhausted() {
1282 tracing::error!(
1283 capsule_id = %id_str,
1284 "All restart attempts exhausted - capsule will remain down"
1285 );
1286 }
1287 false
1288 },
1289 }
1290}
1291
1292fn spawn_capsule_health_monitor(kernel: Arc<Kernel>) -> tokio::task::JoinHandle<()> {
1299 tokio::spawn(async move {
1300 let mut interval = tokio::time::interval(std::time::Duration::from_secs(10));
1301 interval.tick().await; let mut restart_trackers: std::collections::HashMap<String, RestartTracker> =
1304 std::collections::HashMap::new();
1305
1306 loop {
1307 interval.tick().await;
1308
1309 let ready_capsules: Vec<std::sync::Arc<dyn astrid_capsule::capsule::Capsule>> = {
1312 let registry = kernel.capsules.read().await;
1313 registry
1314 .list()
1315 .into_iter()
1316 .filter_map(|id| {
1317 let capsule = registry.get(id)?;
1318 if capsule.state() == astrid_capsule::capsule::CapsuleState::Ready {
1319 Some(capsule)
1320 } else {
1321 None
1322 }
1323 })
1324 .collect()
1325 };
1326
1327 let mut failures: Vec<(String, String)> = Vec::new();
1331 for capsule in &ready_capsules {
1332 let health = capsule.check_health();
1333 if let astrid_capsule::capsule::CapsuleState::Failed(reason) = health {
1334 let id_str = capsule.id().to_string();
1335 tracing::error!(capsule_id = %id_str, reason = %reason, "Capsule health check failed");
1336
1337 let msg = astrid_events::ipc::IpcMessage::new(
1338 "astrid.v1.health.failed",
1339 astrid_events::ipc::IpcPayload::Custom {
1340 data: serde_json::json!({
1341 "capsule_id": &id_str,
1342 "reason": &reason,
1343 }),
1344 },
1345 uuid::Uuid::new_v4(),
1346 );
1347 let _ = kernel.event_bus.publish(astrid_events::AstridEvent::Ipc {
1348 metadata: astrid_events::EventMetadata::new("kernel"),
1349 message: msg,
1350 });
1351 failures.push((id_str, reason));
1352 }
1353 }
1354
1355 drop(ready_capsules);
1358
1359 let failed_this_tick: std::collections::HashSet<&str> =
1360 failures.iter().map(|(id, _)| id.as_str()).collect();
1361
1362 let mut restarted = Vec::new();
1363 for (id_str, _reason) in &failures {
1364 let tracker = restart_trackers
1365 .entry(id_str.clone())
1366 .or_insert_with(RestartTracker::new);
1367
1368 if attempt_capsule_restart(&kernel, id_str, tracker).await {
1369 restarted.push(id_str.clone());
1370 }
1371 }
1372
1373 for id in &restarted {
1375 restart_trackers.remove(id);
1376 }
1377
1378 restart_trackers.retain(|id, tracker| {
1383 if tracker.exhausted() {
1384 return true;
1385 }
1386 if tracker.last_attempt.elapsed() < tracker.backoff {
1389 return true;
1390 }
1391 failed_this_tick.contains(id.as_str())
1392 });
1393 }
1394 })
1395}
1396
1397fn spawn_react_watchdog(event_bus: Arc<EventBus>) -> tokio::task::JoinHandle<()> {
1403 tokio::spawn(async move {
1404 let mut interval = tokio::time::interval(std::time::Duration::from_secs(5));
1405 interval.tick().await;
1407
1408 loop {
1409 interval.tick().await;
1410
1411 let msg = astrid_events::ipc::IpcMessage::new(
1412 "astrid.v1.watchdog.tick",
1413 astrid_events::ipc::IpcPayload::Custom {
1414 data: serde_json::json!({}),
1415 },
1416 uuid::Uuid::new_v4(),
1417 );
1418 let _ = event_bus.publish(astrid_events::AstridEvent::Ipc {
1419 metadata: astrid_events::EventMetadata::new("kernel"),
1420 message: msg,
1421 });
1422 }
1423 })
1424}
1425
1426#[cfg(test)]
1427mod tests {
1428 use super::*;
1429
1430 #[test]
1431 fn test_load_or_generate_creates_new_key() {
1432 let dir = tempfile::tempdir().unwrap();
1433 let keys_dir = dir.path().join("keys");
1434
1435 let keypair = load_or_generate_runtime_key(&keys_dir).unwrap();
1436 let key_path = keys_dir.join("runtime.key");
1437
1438 assert!(key_path.exists());
1440 let bytes = std::fs::read(&key_path).unwrap();
1441 assert_eq!(bytes.len(), 32);
1442
1443 let reloaded = KeyPair::from_secret_key(&bytes).unwrap();
1445 assert_eq!(
1446 keypair.public_key_bytes(),
1447 reloaded.public_key_bytes(),
1448 "reloaded key should match generated key"
1449 );
1450 }
1451
1452 #[test]
1453 fn test_load_or_generate_is_idempotent() {
1454 let dir = tempfile::tempdir().unwrap();
1455 let keys_dir = dir.path().join("keys");
1456
1457 let first = load_or_generate_runtime_key(&keys_dir).unwrap();
1458 let second = load_or_generate_runtime_key(&keys_dir).unwrap();
1459
1460 assert_eq!(
1461 first.public_key_bytes(),
1462 second.public_key_bytes(),
1463 "loading the same key file should produce the same keypair"
1464 );
1465 }
1466
1467 #[test]
1468 fn test_load_or_generate_rejects_bad_key_length() {
1469 let dir = tempfile::tempdir().unwrap();
1470 let keys_dir = dir.path().join("keys");
1471 std::fs::create_dir_all(&keys_dir).unwrap();
1472
1473 std::fs::write(keys_dir.join("runtime.key"), [0u8; 16]).unwrap();
1475
1476 let result = load_or_generate_runtime_key(&keys_dir);
1477 assert!(result.is_err());
1478 let err = result.unwrap_err().to_string();
1479 assert!(
1480 err.contains("invalid runtime key"),
1481 "expected 'invalid runtime key' error, got: {err}"
1482 );
1483 }
1484
1485 #[test]
1486 fn test_connection_counter_increment_decrement() {
1487 let counter = AtomicUsize::new(0);
1488
1489 counter.fetch_add(1, Ordering::Relaxed);
1491 counter.fetch_add(1, Ordering::Relaxed);
1492 assert_eq!(counter.load(Ordering::Relaxed), 2);
1493
1494 for expected in [1, 0] {
1497 let _ = counter.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |n| {
1498 if n == 0 {
1499 None
1500 } else {
1501 Some(n.saturating_sub(1))
1502 }
1503 });
1504 assert_eq!(counter.load(Ordering::Relaxed), expected);
1505 }
1506 }
1507
1508 #[test]
1509 fn test_connection_counter_underflow_guard() {
1510 let counter = AtomicUsize::new(0);
1513
1514 let result = counter.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |n| {
1515 if n == 0 { None } else { Some(n - 1) }
1516 });
1517 assert!(result.is_err());
1519 assert_eq!(counter.load(Ordering::Relaxed), 0);
1520 }
1521
1522 #[test]
1527 fn test_last_disconnect_clears_session_allowances_scoped() {
1528 use astrid_approval::AllowanceStore;
1529 use astrid_approval::allowance::{Allowance, AllowanceId, AllowancePattern};
1530 use astrid_core::principal::PrincipalId;
1531 use astrid_core::types::Timestamp;
1532 use astrid_crypto::KeyPair;
1533
1534 let store = AllowanceStore::new();
1535 let keypair = KeyPair::generate();
1536 let alice = PrincipalId::new("alice").unwrap();
1537 let bob = PrincipalId::new("bob").unwrap();
1538
1539 store
1541 .add_allowance(Allowance {
1542 id: AllowanceId::new(),
1543 principal: alice.clone(),
1544 action_pattern: AllowancePattern::ServerTools {
1545 server: "alice-session".to_string(),
1546 },
1547 created_at: Timestamp::now(),
1548 expires_at: None,
1549 max_uses: None,
1550 uses_remaining: None,
1551 session_only: true,
1552 workspace_root: None,
1553 signature: keypair.sign(b"test"),
1554 })
1555 .unwrap();
1556 store
1557 .add_allowance(Allowance {
1558 id: AllowanceId::new(),
1559 principal: alice.clone(),
1560 action_pattern: AllowancePattern::ServerTools {
1561 server: "alice-persistent".to_string(),
1562 },
1563 created_at: Timestamp::now(),
1564 expires_at: None,
1565 max_uses: None,
1566 uses_remaining: None,
1567 session_only: false,
1568 workspace_root: None,
1569 signature: keypair.sign(b"test"),
1570 })
1571 .unwrap();
1572 store
1574 .add_allowance(Allowance {
1575 id: AllowanceId::new(),
1576 principal: bob.clone(),
1577 action_pattern: AllowancePattern::ServerTools {
1578 server: "bob-session".to_string(),
1579 },
1580 created_at: Timestamp::now(),
1581 expires_at: None,
1582 max_uses: None,
1583 uses_remaining: None,
1584 session_only: true,
1585 workspace_root: None,
1586 signature: keypair.sign(b"test"),
1587 })
1588 .unwrap();
1589 assert_eq!(store.count(), 3);
1590
1591 let alice_counter = AtomicUsize::new(1);
1592 let simulate_alice_disconnect = || {
1593 let result = alice_counter.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |n| {
1594 if n == 0 {
1595 None
1596 } else {
1597 Some(n.saturating_sub(1))
1598 }
1599 });
1600 if result == Ok(1) {
1601 store.clear_session_allowances(&alice);
1602 }
1603 };
1604
1605 simulate_alice_disconnect();
1606 assert_eq!(store.count(), 2);
1608 assert_eq!(store.count_for(&alice), 1);
1609 assert_eq!(store.count_for(&bob), 1);
1610 }
1611
1612 #[cfg(unix)]
1613 #[test]
1614 fn test_load_or_generate_sets_secure_permissions() {
1615 use std::os::unix::fs::PermissionsExt;
1616
1617 let dir = tempfile::tempdir().unwrap();
1618 let keys_dir = dir.path().join("keys");
1619
1620 let _ = load_or_generate_runtime_key(&keys_dir).unwrap();
1621
1622 let key_path = keys_dir.join("runtime.key");
1623 let mode = std::fs::metadata(&key_path).unwrap().permissions().mode();
1624 assert_eq!(
1625 mode & 0o777,
1626 0o600,
1627 "key file should have 0o600 permissions, got {mode:#o}"
1628 );
1629 }
1630
1631 #[test]
1632 fn restart_tracker_initial_state() {
1633 let tracker = RestartTracker::new();
1634 assert!(!tracker.exhausted());
1635 assert!(!tracker.should_restart());
1637 }
1638
1639 #[test]
1640 fn restart_tracker_allows_restart_after_backoff() {
1641 let mut tracker = RestartTracker::new();
1642 tracker.last_attempt = std::time::Instant::now()
1644 - RestartTracker::INITIAL_BACKOFF
1645 - std::time::Duration::from_millis(1);
1646 assert!(tracker.should_restart());
1647 }
1648
1649 #[test]
1650 fn restart_tracker_doubles_backoff() {
1651 let mut tracker = RestartTracker::new();
1652 assert_eq!(tracker.backoff, RestartTracker::INITIAL_BACKOFF);
1653
1654 tracker.record_attempt();
1655 assert_eq!(
1656 tracker.backoff,
1657 RestartTracker::INITIAL_BACKOFF.saturating_mul(2)
1658 );
1659 assert_eq!(tracker.attempts, 1);
1660
1661 tracker.record_attempt();
1662 assert_eq!(
1663 tracker.backoff,
1664 RestartTracker::INITIAL_BACKOFF.saturating_mul(4)
1665 );
1666 assert_eq!(tracker.attempts, 2);
1667 }
1668
1669 #[test]
1670 fn restart_tracker_backoff_caps_at_max() {
1671 let mut tracker = RestartTracker::new();
1672 for _ in 0..20 {
1673 tracker.record_attempt();
1674 }
1675 assert_eq!(tracker.backoff, RestartTracker::MAX_BACKOFF);
1676 }
1677
1678 #[test]
1679 fn restart_tracker_exhausted_at_max_attempts() {
1680 let mut tracker = RestartTracker::new();
1681 for _ in 0..RestartTracker::MAX_ATTEMPTS {
1682 assert!(!tracker.exhausted());
1683 tracker.record_attempt();
1684 }
1685 assert!(tracker.exhausted());
1686 }
1687
1688 #[test]
1689 fn restart_tracker_should_restart_false_when_exhausted() {
1690 let mut tracker = RestartTracker::new();
1691 for _ in 0..RestartTracker::MAX_ATTEMPTS {
1692 tracker.record_attempt();
1693 }
1694 tracker.last_attempt = std::time::Instant::now() - RestartTracker::MAX_BACKOFF;
1696 assert!(!tracker.should_restart());
1697 }
1698
1699 fn scratch_home() -> (tempfile::TempDir, astrid_core::dirs::AstridHome) {
1702 let dir = tempfile::tempdir().unwrap();
1703 let home = astrid_core::dirs::AstridHome::from_path(dir.path());
1704 (dir, home)
1705 }
1706
1707 #[test]
1708 fn seed_admin_writes_fresh_profile_when_missing() {
1709 let (_d, home) = scratch_home();
1710 let default = astrid_core::PrincipalId::default();
1711 let path = astrid_core::PrincipalProfile::path_for(&home, &default);
1712 assert!(!path.exists());
1713
1714 seed_default_principal_admin_profile(&home).unwrap();
1715
1716 let profile = astrid_core::PrincipalProfile::load_from_path(&path).unwrap();
1717 assert_eq!(profile.groups, vec!["admin".to_string()]);
1718 assert!(profile.grants.is_empty());
1719 assert!(profile.revokes.is_empty());
1720 }
1721
1722 #[test]
1723 fn seed_admin_is_idempotent_across_reboots() {
1724 let (_d, home) = scratch_home();
1725 let default = astrid_core::PrincipalId::default();
1726
1727 seed_default_principal_admin_profile(&home).unwrap();
1728 seed_default_principal_admin_profile(&home).unwrap();
1729 seed_default_principal_admin_profile(&home).unwrap();
1730
1731 let path = astrid_core::PrincipalProfile::path_for(&home, &default);
1732 let profile = astrid_core::PrincipalProfile::load_from_path(&path).unwrap();
1733 assert_eq!(profile.groups, vec!["admin".to_string()]);
1735 }
1736
1737 #[test]
1738 fn seed_admin_leaves_operator_configured_groups_intact() {
1739 let (_d, home) = scratch_home();
1740 let default = astrid_core::PrincipalId::default();
1741
1742 let mut existing = astrid_core::PrincipalProfile::default();
1744 existing.groups = vec!["agent".to_string()];
1745 let path = astrid_core::PrincipalProfile::path_for(&home, &default);
1746 std::fs::create_dir_all(home.profiles_dir()).unwrap();
1747 existing.save_to_path(&path).unwrap();
1748
1749 seed_default_principal_admin_profile(&home).unwrap();
1750
1751 let profile = astrid_core::PrincipalProfile::load_from_path(&path).unwrap();
1752 assert_eq!(profile.groups, vec!["agent".to_string()]);
1753 }
1754
1755 #[test]
1756 fn seed_admin_leaves_operator_configured_grants_intact() {
1757 let (_d, home) = scratch_home();
1758 let default = astrid_core::PrincipalId::default();
1759
1760 let mut existing = astrid_core::PrincipalProfile::default();
1761 existing.grants = vec!["system:status".to_string()];
1762 let path = astrid_core::PrincipalProfile::path_for(&home, &default);
1763 std::fs::create_dir_all(home.profiles_dir()).unwrap();
1764 existing.save_to_path(&path).unwrap();
1765
1766 seed_default_principal_admin_profile(&home).unwrap();
1767
1768 let profile = astrid_core::PrincipalProfile::load_from_path(&path).unwrap();
1769 assert!(profile.groups.is_empty());
1771 assert_eq!(profile.grants, vec!["system:status".to_string()]);
1772 }
1773
1774 #[test]
1775 fn seed_admin_leaves_operator_configured_revokes_intact() {
1776 let (_d, home) = scratch_home();
1777 let default = astrid_core::PrincipalId::default();
1778
1779 let mut existing = astrid_core::PrincipalProfile::default();
1780 existing.revokes = vec!["system:shutdown".to_string()];
1781 let path = astrid_core::PrincipalProfile::path_for(&home, &default);
1782 std::fs::create_dir_all(home.profiles_dir()).unwrap();
1783 existing.save_to_path(&path).unwrap();
1784
1785 seed_default_principal_admin_profile(&home).unwrap();
1786
1787 let profile = astrid_core::PrincipalProfile::load_from_path(&path).unwrap();
1788 assert!(profile.groups.is_empty());
1789 assert_eq!(profile.revokes, vec!["system:shutdown".to_string()]);
1790 }
1791
1792 #[test]
1795 fn migrate_legacy_profile_relocates_to_etc() {
1796 let (_d, home) = scratch_home();
1800 let default = astrid_core::PrincipalId::default();
1801 let legacy_path = home
1802 .principal_home(&default)
1803 .config_dir()
1804 .join("profile.toml");
1805 std::fs::create_dir_all(legacy_path.parent().unwrap()).unwrap();
1806 let mut existing = astrid_core::PrincipalProfile::default();
1807 existing.groups = vec!["operator-configured".to_string()];
1808 existing.save_to_path(&legacy_path).unwrap();
1809
1810 seed_default_principal_admin_profile(&home).unwrap();
1811
1812 assert!(!legacy_path.exists());
1814 let new_path = astrid_core::PrincipalProfile::path_for(&home, &default);
1815 let migrated = astrid_core::PrincipalProfile::load_from_path(&new_path).unwrap();
1816 assert_eq!(migrated.groups, vec!["operator-configured".to_string()]);
1817 }
1818
1819 #[test]
1820 fn migrate_legacy_profile_drops_stale_legacy_when_new_already_exists() {
1821 let (_d, home) = scratch_home();
1826 let default = astrid_core::PrincipalId::default();
1827
1828 let legacy_path = home
1830 .principal_home(&default)
1831 .config_dir()
1832 .join("profile.toml");
1833 std::fs::create_dir_all(legacy_path.parent().unwrap()).unwrap();
1834 let mut stale = astrid_core::PrincipalProfile::default();
1835 stale.groups = vec!["stale".to_string()];
1836 stale.save_to_path(&legacy_path).unwrap();
1837
1838 let new_path = astrid_core::PrincipalProfile::path_for(&home, &default);
1840 std::fs::create_dir_all(new_path.parent().unwrap()).unwrap();
1841 let mut canonical = astrid_core::PrincipalProfile::default();
1842 canonical.groups = vec!["canonical".to_string()];
1843 canonical.save_to_path(&new_path).unwrap();
1844
1845 seed_default_principal_admin_profile(&home).unwrap();
1846
1847 assert!(!legacy_path.exists());
1849 let result = astrid_core::PrincipalProfile::load_from_path(&new_path).unwrap();
1850 assert_eq!(result.groups, vec!["canonical".to_string()]);
1851 }
1852}
1853
1854fn validate_imports_exports(
1863 manifests: &[(
1864 astrid_capsule::manifest::CapsuleManifest,
1865 std::path::PathBuf,
1866 )],
1867) {
1868 let mut exports_by_interface: std::collections::HashMap<
1870 (&str, &str),
1871 Vec<(&str, &semver::Version)>,
1872 > = std::collections::HashMap::new();
1873
1874 for (m, _) in manifests {
1875 for (ns, name, ver) in m.export_triples() {
1876 exports_by_interface
1877 .entry((ns, name))
1878 .or_default()
1879 .push((&m.package.name, ver));
1880 }
1881 }
1882
1883 for ((ns, name), providers) in &exports_by_interface {
1886 if providers.len() > 1 {
1887 let names: Vec<&str> = providers.iter().map(|(n, _)| *n).collect();
1888 tracing::warn!(
1889 interface = %format!("{ns}/{name}"),
1890 providers = ?names,
1891 "Multiple capsules export the same interface — events may be double-processed. \
1892 Consider removing one with `astrid capsule remove`."
1893 );
1894 }
1895 }
1896
1897 let mut satisfied_count: u32 = 0;
1898 let mut warning_count: u32 = 0;
1899
1900 for (manifest, _) in manifests {
1901 for (ns, name, req, optional) in manifest.import_tuples() {
1902 let has_provider = exports_by_interface
1903 .get(&(ns, name))
1904 .is_some_and(|providers| providers.iter().any(|(_, v)| req.matches(v)));
1905
1906 if has_provider {
1907 satisfied_count = satisfied_count.saturating_add(1);
1908 } else if optional {
1909 tracing::info!(
1910 capsule = %manifest.package.name,
1911 import = %format!("{ns}/{name} {req}"),
1912 "Optional import not satisfied — capsule will boot with reduced functionality"
1913 );
1914 warning_count = warning_count.saturating_add(1);
1915 } else {
1916 tracing::error!(
1917 capsule = %manifest.package.name,
1918 import = %format!("{ns}/{name} {req}"),
1919 "Required import not satisfied — no loaded capsule exports this interface"
1920 );
1921 warning_count = warning_count.saturating_add(1);
1922 }
1923 }
1924 }
1925
1926 tracing::info!(
1927 capsules = manifests.len(),
1928 imports_satisfied = satisfied_count,
1929 warnings = warning_count,
1930 "Boot validation complete"
1931 );
1932}
1933
1934async fn bootstrap_cli_root_user(
1953 store: &Arc<dyn astrid_storage::IdentityStore>,
1954 home: &astrid_core::dirs::AstridHome,
1955) -> Result<(), astrid_storage::IdentityError> {
1956 if let Err(e) = seed_default_principal_admin_profile(home) {
1960 tracing::warn!(error = %e, "Failed to seed default admin profile — continuing boot");
1961 }
1962
1963 if let Some(_user) = store.resolve("cli", "local").await? {
1965 tracing::debug!("CLI root user already linked");
1966 return Ok(());
1967 }
1968
1969 let user = store.create_user(Some("root")).await?;
1971 tracing::info!(user_id = %user.id, "Created CLI root user");
1972
1973 store.link("cli", "local", user.id, "system").await?;
1975 tracing::info!(user_id = %user.id, "Linked CLI root user (cli/local)");
1976
1977 Ok(())
1978}
1979
1980fn migrate_legacy_profile_path(
1992 home: &astrid_core::dirs::AstridHome,
1993 principal: &astrid_core::PrincipalId,
1994) -> Result<(), std::io::Error> {
1995 let legacy_path = home
1996 .principal_home(principal)
1997 .config_dir()
1998 .join("profile.toml");
1999 let new_path = home.profile_path(principal);
2000 if !legacy_path.exists() {
2001 return Ok(());
2002 }
2003 if new_path.exists() {
2004 let _ = std::fs::remove_file(&legacy_path);
2008 return Ok(());
2009 }
2010 if let Some(parent) = new_path.parent() {
2011 std::fs::create_dir_all(parent)?;
2012 }
2013 std::fs::rename(&legacy_path, &new_path)?;
2014 tracing::warn!(
2015 %principal,
2016 legacy = %legacy_path.display(),
2017 new = %new_path.display(),
2018 "Migrated profile.toml out of principal home directory \
2019 (security: capsules with home:// fs_read could read the legacy file)"
2020 );
2021 Ok(())
2022}
2023
2024fn seed_default_principal_admin_profile(
2040 home: &astrid_core::dirs::AstridHome,
2041) -> Result<(), astrid_core::ProfileError> {
2042 use astrid_core::PrincipalProfile;
2043
2044 let default_principal = astrid_core::PrincipalId::default();
2045
2046 if let Err(e) = migrate_legacy_profile_path(home, &default_principal) {
2050 tracing::warn!(error = %e, "Failed to migrate legacy profile path — continuing");
2051 }
2052
2053 let path = PrincipalProfile::path_for(home, &default_principal);
2054 let profile = PrincipalProfile::load_from_path(&path)?;
2055
2056 if !profile.groups.is_empty() || !profile.grants.is_empty() || !profile.revokes.is_empty() {
2057 tracing::debug!(
2058 principal = %default_principal,
2059 "Default principal profile already has group/grant/revoke entries — leaving intact"
2060 );
2061 return Ok(());
2062 }
2063
2064 let mut updated = profile;
2065 updated
2066 .groups
2067 .push(astrid_core::groups::BUILTIN_ADMIN.to_string());
2068 updated.save_to_path(&path)?;
2069 tracing::info!(
2070 principal = %default_principal,
2071 "Seeded default principal with built-in `admin` group"
2072 );
2073 Ok(())
2074}
2075
2076async fn apply_identity_config(
2082 store: &Arc<dyn astrid_storage::IdentityStore>,
2083 workspace_root: &std::path::Path,
2084) {
2085 let config = match astrid_config::Config::load(Some(workspace_root)) {
2086 Ok(resolved) => resolved.config,
2087 Err(e) => {
2088 tracing::debug!(error = %e, "No config loaded for identity links");
2089 return;
2090 },
2091 };
2092
2093 for link_cfg in &config.identity.links {
2094 let result = apply_single_identity_link(store, link_cfg).await;
2095 if let Err(e) = result {
2096 tracing::warn!(
2097 platform = %link_cfg.platform,
2098 platform_user_id = %link_cfg.platform_user_id,
2099 astrid_user = %link_cfg.astrid_user,
2100 error = %e,
2101 "Failed to apply identity link from config"
2102 );
2103 }
2104 }
2105}
2106
2107async fn apply_single_identity_link(
2109 store: &Arc<dyn astrid_storage::IdentityStore>,
2110 link_cfg: &astrid_config::types::IdentityLinkConfig,
2111) -> Result<(), astrid_storage::IdentityError> {
2112 let user_id = if let Ok(uuid) = uuid::Uuid::parse_str(&link_cfg.astrid_user) {
2114 if store.get_user(uuid).await?.is_none() {
2118 return Err(astrid_storage::IdentityError::UserNotFound(uuid));
2119 }
2120 uuid
2121 } else {
2122 if let Some(user) = store.get_user_by_name(&link_cfg.astrid_user).await? {
2124 user.id
2125 } else {
2126 let user = store.create_user(Some(&link_cfg.astrid_user)).await?;
2127 tracing::info!(
2128 user_id = %user.id,
2129 name = %link_cfg.astrid_user,
2130 "Created user from config identity link"
2131 );
2132 user.id
2133 }
2134 };
2135
2136 let method = if link_cfg.method.is_empty() {
2137 "admin"
2138 } else {
2139 &link_cfg.method
2140 };
2141
2142 if let Some(existing) = store
2144 .resolve(&link_cfg.platform, &link_cfg.platform_user_id)
2145 .await?
2146 && existing.id == user_id
2147 {
2148 tracing::debug!(
2149 platform = %link_cfg.platform,
2150 platform_user_id = %link_cfg.platform_user_id,
2151 user_id = %user_id,
2152 "Identity link from config already exists"
2153 );
2154 return Ok(());
2155 }
2156
2157 store
2158 .link(
2159 &link_cfg.platform,
2160 &link_cfg.platform_user_id,
2161 user_id,
2162 method,
2163 )
2164 .await?;
2165
2166 tracing::info!(
2167 platform = %link_cfg.platform,
2168 platform_user_id = %link_cfg.platform_user_id,
2169 user_id = %user_id,
2170 "Applied identity link from config"
2171 );
2172
2173 Ok(())
2174}