1#![deny(unsafe_code)]
2#![deny(missing_docs)]
3#![deny(clippy::all)]
4#![deny(unreachable_pub)]
5#![allow(clippy::module_name_repetitions)]
6
7pub mod kernel_router;
16pub mod socket;
18
19use astrid_audit::AuditLog;
20use astrid_capabilities::{CapabilityStore, DirHandle};
21use astrid_capsule::registry::CapsuleRegistry;
22use astrid_core::SessionId;
23use astrid_crypto::KeyPair;
24use astrid_events::EventBus;
25use astrid_mcp::{McpClient, SecureMcpClient, ServerManager, ServersConfig};
26use astrid_vfs::{HostVfs, OverlayVfs, Vfs};
27use std::path::{Path, PathBuf};
28use std::sync::Arc;
29use std::sync::atomic::{AtomicUsize, Ordering};
30use tokio::sync::RwLock;
31
/// Central runtime object for one kernel session: wires together the event
/// bus, capsule registry, capability-checked MCP client, overlay VFS,
/// persistent storage, audit log, and session transport state.
pub struct Kernel {
    /// Unique identifier of this kernel session.
    pub session_id: SessionId,
    /// Shared pub/sub bus for kernel and capsule events.
    pub event_bus: Arc<EventBus>,
    /// Registry of loaded capsules, guarded for concurrent access.
    pub capsules: Arc<RwLock<CapsuleRegistry>>,
    /// MCP client wrapped with capability checks and audit logging.
    pub mcp: SecureMcpClient,
    /// Store of granted capabilities, backed by the KV store.
    pub capabilities: Arc<CapabilityStore>,
    /// Virtual filesystem presented to capsules (the same overlay as
    /// `overlay_vfs`, behind the `Vfs` trait object).
    pub vfs: Arc<dyn Vfs>,
    /// Overlay VFS: the workspace as lower layer, a temp dir as upper layer.
    pub overlay_vfs: Arc<OverlayVfs>,
    // Guard keeping the overlay's upper temp dir alive; the directory is
    // removed when this is dropped.
    _upper_dir: Arc<tempfile::TempDir>,
    /// Handle under which the workspace root is registered in the VFS.
    pub vfs_root_handle: DirHandle,
    /// Root directory of the workspace this kernel serves.
    pub workspace_root: PathBuf,
    /// Shared global directory (`AstridHome::shared_dir`), when resolved.
    pub global_root: Option<PathBuf>,
    /// Unix socket listener for CLI clients, shared with capsule contexts.
    pub cli_socket_listener: Option<Arc<tokio::sync::Mutex<tokio::net::UnixListener>>>,
    /// Persistent key/value store shared by kernel subsystems and capsules.
    pub kv: Arc<astrid_storage::SurrealKvStore>,
    /// Append-only audit log with per-session hash chains.
    pub audit_log: Arc<AuditLog>,
    /// Number of currently connected clients.
    pub active_connections: AtomicUsize,
    /// Token clients present to authenticate against this session.
    pub session_token: Arc<astrid_core::session_token::SessionToken>,
    // On-disk location of the session token; removed during shutdown.
    token_path: PathBuf,
    /// In-memory store of approval allowances.
    pub allowance_store: Arc<astrid_approval::AllowanceStore>,
    // Identity store used for CLI user bootstrap and config-driven links.
    identity_store: Arc<dyn astrid_storage::IdentityStore>,
}
86
87impl Kernel {
    /// Build a fully wired kernel for `workspace_root` and spawn its
    /// background tasks (kernel router, idle monitor, watchdog, capsule
    /// health monitor, and the capsule event dispatcher).
    ///
    /// # Errors
    /// Returns `std::io::Error` when any subsystem fails to initialize:
    /// Astrid home resolution, KV store, capability store, audit log,
    /// overlay VFS, session socket/token, or CLI identity bootstrap.
    ///
    /// # Panics
    /// Panics if called on a single-threaded tokio runtime.
    pub async fn new(
        session_id: SessionId,
        workspace_root: PathBuf,
    ) -> Result<Arc<Self>, std::io::Error> {
        use astrid_core::dirs::AstridHome;

        // Fail fast: downstream code relies on block_in_place, which panics
        // on a current-thread runtime.
        assert!(
            tokio::runtime::Handle::current().runtime_flavor()
                == tokio::runtime::RuntimeFlavor::MultiThread,
            "Kernel requires a multi-threaded tokio runtime (block_in_place panics on \
             single-threaded). Use #[tokio::main] or Runtime::new() instead of current_thread."
        );

        let event_bus = Arc::new(EventBus::new());
        let capsules = Arc::new(RwLock::new(CapsuleRegistry::new()));

        // Resolve the Astrid home — all persistent state lives under it.
        let home = AstridHome::resolve().map_err(|e| {
            std::io::Error::other(format!(
                "Failed to resolve Astrid home (set $ASTRID_HOME or $HOME): {e}"
            ))
        })?;

        let global_root = Some(home.shared_dir());

        // Persistent KV store backing capabilities, capsule state, identity.
        let kv_path = home.state_db_path();
        let kv = Arc::new(
            astrid_storage::SurrealKvStore::open(&kv_path)
                .map_err(|e| std::io::Error::other(format!("Failed to open KV store: {e}")))?,
        );
        // MCP stack: server manager + raw client, then wrapped so every call
        // is capability-checked and audit-logged.
        let mcp_config = ServersConfig::load_default().unwrap_or_default();
        let mcp_manager =
            ServerManager::new(mcp_config).with_workspace_root(workspace_root.clone());
        let mcp_client = McpClient::new(mcp_manager);

        let capabilities = Arc::new(
            CapabilityStore::with_kv_store(Arc::clone(&kv) as Arc<dyn astrid_storage::KvStore>)
                .map_err(|e| {
                    std::io::Error::other(format!("Failed to init capability store: {e}"))
                })?,
        );
        let audit_log = open_audit_log()?;
        let mcp = SecureMcpClient::new(
            mcp_client,
            Arc::clone(&capabilities),
            Arc::clone(&audit_log),
            session_id.clone(),
        );

        // Overlay VFS rooted at the workspace, writes going to a temp upper dir.
        let root_handle = DirHandle::new();

        let (overlay_vfs, upper_temp) = init_overlay_vfs(&root_handle, &workspace_root).await?;

        // Session transport: Unix socket plus an auth token persisted to disk.
        let listener = socket::bind_session_socket()?;
        let (session_token, token_path) = socket::generate_session_token()?;

        let allowance_store = Arc::new(astrid_approval::AllowanceStore::new());

        // Identity store in its own KV namespace ("system:identity").
        let identity_kv = astrid_storage::ScopedKvStore::new(
            Arc::clone(&kv) as Arc<dyn astrid_storage::KvStore>,
            "system:identity",
        )
        .map_err(|e| std::io::Error::other(format!("Failed to create identity KV: {e}")))?;
        let identity_store: Arc<dyn astrid_storage::IdentityStore> =
            Arc::new(astrid_storage::KvIdentityStore::new(identity_kv));

        // Ensure the local CLI root user exists, then apply any identity
        // links declared in the workspace config.
        bootstrap_cli_root_user(&identity_store)
            .await
            .map_err(|e| {
                std::io::Error::other(format!("Failed to bootstrap CLI root user: {e}"))
            })?;

        apply_identity_config(&identity_store, &workspace_root).await;

        let kernel = Arc::new(Self {
            session_id,
            event_bus,
            capsules,
            mcp,
            capabilities,
            vfs: Arc::clone(&overlay_vfs) as Arc<dyn Vfs>,
            overlay_vfs,
            _upper_dir: Arc::new(upper_temp),
            vfs_root_handle: root_handle,
            workspace_root,
            global_root,
            cli_socket_listener: Some(Arc::new(tokio::sync::Mutex::new(listener))),
            kv,
            audit_log,
            active_connections: AtomicUsize::new(0),
            session_token: Arc::new(session_token),
            token_path,
            allowance_store,
            identity_store,
        });

        // Detach the long-lived background tasks; their JoinHandles are
        // intentionally dropped.
        drop(kernel_router::spawn_kernel_router(Arc::clone(&kernel)));
        drop(spawn_idle_monitor(Arc::clone(&kernel)));
        drop(spawn_react_watchdog(Arc::clone(&kernel.event_bus)));
        drop(spawn_capsule_health_monitor(Arc::clone(&kernel)));

        // Dispatcher routes bus events into the registered capsules.
        let dispatcher = astrid_capsule::dispatcher::EventDispatcher::new(
            Arc::clone(&kernel.capsules),
            Arc::clone(&kernel.event_bus),
        );
        tokio::spawn(dispatcher.run());

        debug_assert_eq!(
            kernel.event_bus.subscriber_count(),
            INTERNAL_SUBSCRIBER_COUNT,
            "INTERNAL_SUBSCRIBER_COUNT is stale; update it when adding permanent subscribers"
        );

        Ok(kernel)
    }
235
    /// Load a single capsule from `dir` (which must contain `Capsule.toml`)
    /// and register it with the kernel.
    ///
    /// Creates a capsule-scoped KV namespace (`capsule:<id>`), optionally
    /// seeds it from a `.env.json` map in the capsule directory, builds the
    /// capsule context, runs the capsule's `load` hook, then registers it.
    ///
    /// # Errors
    /// Fails if the manifest cannot be loaded, the capsule cannot be
    /// created or loaded, the KV namespace cannot be created, or
    /// registration fails.
    async fn load_capsule(&self, dir: PathBuf) -> Result<(), anyhow::Error> {
        let manifest_path = dir.join("Capsule.toml");
        let manifest = astrid_capsule::discovery::load_manifest(&manifest_path)
            .map_err(|e| anyhow::anyhow!(e))?;

        let loader = astrid_capsule::loader::CapsuleLoader::new(self.mcp.clone());
        let mut capsule = loader.create_capsule(manifest, dir.clone())?;

        // Each capsule gets an isolated KV namespace keyed by its id.
        let kv = astrid_storage::ScopedKvStore::new(
            Arc::clone(&self.kv) as Arc<dyn astrid_storage::KvStore>,
            format!("capsule:{}", capsule.id()),
        )?;

        // Best-effort seeding from `.env.json`: malformed or missing files
        // and individual set failures are silently ignored.
        let env_path = dir.join(".env.json");
        if env_path.exists()
            && let Ok(contents) = std::fs::read_to_string(&env_path)
            && let Ok(env_map) =
                serde_json::from_str::<std::collections::HashMap<String, String>>(&contents)
        {
            for (k, v) in env_map {
                let _ = kv.set(&k, v.into_bytes()).await;
            }
        }

        let ctx = astrid_capsule::context::CapsuleContext::new(
            self.workspace_root.clone(),
            self.global_root.clone(),
            kv,
            Arc::clone(&self.event_bus),
            self.cli_socket_listener.clone(),
        )
        .with_registry(Arc::clone(&self.capsules))
        .with_session_token(Arc::clone(&self.session_token))
        .with_allowance_store(Arc::clone(&self.allowance_store))
        .with_identity_store(Arc::clone(&self.identity_store));

        capsule.load(&ctx).await?;

        let mut registry = self.capsules.write().await;
        registry
            .register(capsule)
            .map_err(|e| anyhow::anyhow!("Failed to register capsule: {e}"))?;

        Ok(())
    }
289
    /// Restart the capsule identified by `id` from its recorded source
    /// directory: unregister, best-effort unload, reload, then invoke the
    /// optional `handle_lifecycle_restart` interceptor.
    ///
    /// # Errors
    /// Fails if the capsule is not registered, has no source directory,
    /// cannot be unregistered, or fails to load again.
    async fn restart_capsule(
        &self,
        id: &astrid_capsule::capsule::CapsuleId,
    ) -> Result<(), anyhow::Error> {
        // Capture the source dir before tearing the capsule down.
        let source_dir = {
            let registry = self.capsules.read().await;
            let capsule = registry
                .get(id)
                .ok_or_else(|| anyhow::anyhow!("capsule '{id}' not found in registry"))?;
            capsule
                .source_dir()
                .map(std::path::Path::to_path_buf)
                .ok_or_else(|| anyhow::anyhow!("capsule '{id}' has no source directory"))?
        };

        let old_capsule = {
            let mut registry = self.capsules.write().await;
            registry
                .unregister(id)
                .map_err(|e| anyhow::anyhow!("failed to unregister capsule '{id}': {e}"))?
        };
        {
            // `unload` needs exclusive ownership of the Arc. If an in-flight
            // task still holds a clone we skip the unload instead of
            // blocking — the old instance is simply dropped below.
            let mut old = old_capsule;
            if let Some(capsule) = std::sync::Arc::get_mut(&mut old) {
                if let Err(e) = capsule.unload().await {
                    tracing::warn!(
                        capsule_id = %id,
                        error = %e,
                        "Capsule unload failed during restart"
                    );
                }
            } else {
                tracing::warn!(
                    capsule_id = %id,
                    "Cannot call unload during restart - Arc still held by in-flight task"
                );
            }
        }

        self.load_capsule(source_dir).await?;

        // Let the fresh instance react to the restart; the interceptor is
        // optional, so failure is only logged at debug level.
        let capsule = {
            let registry = self.capsules.read().await;
            registry.get(id)
        };
        if let Some(capsule) = capsule
            && let Err(e) = capsule.invoke_interceptor("handle_lifecycle_restart", &[])
        {
            tracing::debug!(
                capsule_id = %id,
                error = %e,
                "Capsule does not handle lifecycle restart (optional)"
            );
        }

        Ok(())
    }
370
    /// Discover and load every capsule from the Astrid home capsules
    /// directory.
    ///
    /// Manifests are topologically sorted by dependency (falling back to
    /// discovery order on a cycle). Uplink capsules load first and are
    /// awaited for readiness before the rest; afterwards tool schemas are
    /// injected and an `astrid.v1.capsules_loaded` IPC event is published.
    /// Individual load failures are logged and skipped.
    pub async fn load_all_capsules(&self) {
        use astrid_capsule::toposort::toposort_manifests;
        use astrid_core::dirs::AstridHome;

        let mut paths = Vec::new();
        if let Ok(home) = AstridHome::resolve() {
            paths.push(home.capsules_dir());
        }

        let discovered = astrid_capsule::discovery::discover_manifests(Some(&paths));

        let sorted = match toposort_manifests(discovered) {
            Ok(sorted) => sorted,
            Err((e, original)) => {
                tracing::error!(
                    cycle = %e,
                    "Dependency cycle in capsules, falling back to discovery order"
                );
                original
            },
        };

        // Defense in depth: uplink capsules with dependencies should already
        // have been rejected when their manifests were loaded.
        for (manifest, _) in &sorted {
            if manifest.capabilities.uplink && !manifest.dependencies.requires.is_empty() {
                tracing::warn!(
                    capsule = %manifest.package.name,
                    requires = ?manifest.dependencies.requires,
                    "Uplink capsule has [dependencies].requires - \
                     this should have been rejected at manifest load time"
                );
            }
        }

        // Uplink capsules load first so dependents find them ready.
        let (uplinks, others): (Vec<_>, Vec<_>) =
            sorted.into_iter().partition(|(m, _)| m.capabilities.uplink);

        let uplink_names: Vec<String> = uplinks
            .iter()
            .map(|(m, _)| m.package.name.clone())
            .collect();
        for (manifest, dir) in &uplinks {
            if let Err(e) = self.load_capsule(dir.clone()).await {
                tracing::warn!(
                    capsule = %manifest.package.name,
                    error = %e,
                    "Failed to load uplink capsule during discovery"
                );
            }
        }

        self.await_capsule_readiness(&uplink_names).await;

        for (manifest, dir) in &others {
            if let Err(e) = self.load_capsule(dir.clone()).await {
                tracing::warn!(
                    capsule = %manifest.package.name,
                    error = %e,
                    "Failed to load capsule during discovery"
                );
            }
        }

        let other_names: Vec<String> = others.iter().map(|(m, _)| m.package.name.clone()).collect();
        self.await_capsule_readiness(&other_names).await;

        self.inject_tool_schemas().await;

        // Announce completion so clients know the tool surface is in place.
        let msg = astrid_events::ipc::IpcMessage::new(
            "astrid.v1.capsules_loaded",
            astrid_events::ipc::IpcPayload::RawJson(serde_json::json!({"status": "ready"})),
            self.session_id.0,
        );
        let _ = self.event_bus.publish(astrid_events::AstridEvent::Ipc {
            metadata: astrid_events::EventMetadata::new("kernel"),
            message: msg,
        });
    }
478
    /// Record a newly connected client by bumping the connection counter.
    pub fn connection_opened(&self) {
        self.active_connections.fetch_add(1, Ordering::Relaxed);
    }
483
484 pub fn connection_closed(&self) {
492 let result =
493 self.active_connections
494 .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |n| {
495 if n == 0 {
496 None
497 } else {
498 Some(n.saturating_sub(1))
499 }
500 });
501
502 if result == Ok(1) {
505 self.allowance_store.clear_session_allowances();
506 tracing::info!("last client disconnected, session allowances cleared");
507 }
508 }
509
    /// Current number of connected clients.
    pub fn connection_count(&self) -> usize {
        self.active_connections.load(Ordering::Relaxed)
    }
514
    /// Gracefully shut the kernel down.
    ///
    /// Publishes a `KernelShutdown` event, unloads every capsule (retrying
    /// while other tasks still hold references), flushes the KV store, and
    /// removes the session socket, token file, and readiness marker.
    pub async fn shutdown(&self, reason: Option<String>) {
        tracing::info!(reason = ?reason, "Kernel shutting down");

        let _ = self
            .event_bus
            .publish(astrid_events::AstridEvent::KernelShutdown {
                metadata: astrid_events::EventMetadata::new("kernel"),
                reason: reason.clone(),
            });

        // Drain the registry first so nothing new can reach the capsules.
        let capsules = {
            let mut reg = self.capsules.write().await;
            reg.drain()
        };
        for mut arc in capsules {
            let id = arc.id().clone();
            let mut unloaded = false;

            // `unload` needs exclusive ownership of the Arc; retry for up to
            // ~1s (20 x 50ms) while in-flight tasks release their clones.
            for retry in 0..20_u32 {
                if let Some(capsule) = Arc::get_mut(&mut arc) {
                    if let Err(e) = capsule.unload().await {
                        tracing::warn!(
                            capsule_id = %id,
                            error = %e,
                            "Failed to unload capsule during shutdown"
                        );
                    }
                    unloaded = true;
                    break;
                }
                if retry < 19 {
                    tokio::time::sleep(std::time::Duration::from_millis(50)).await;
                }
            }

            if !unloaded {
                tracing::warn!(
                    capsule_id = %id,
                    strong_count = Arc::strong_count(&arc),
                    "Dropping capsule without explicit unload after retries exhausted; \
                     MCP child processes may be orphaned"
                );
            }
            drop(arc);
        }

        if let Err(e) = self.kv.close().await {
            tracing::warn!(error = %e, "Failed to flush KV store during shutdown");
        }

        // Best-effort cleanup of session artifacts on disk.
        let socket_path = crate::socket::kernel_socket_path();
        let _ = std::fs::remove_file(&socket_path);
        let _ = std::fs::remove_file(&self.token_path);
        crate::socket::remove_readiness_file();

        tracing::info!("Kernel shutdown complete");
    }
595
    /// Wait (bounded) for the named capsules to signal readiness.
    ///
    /// All capsules are polled concurrently with a 500ms per-capsule
    /// timeout, so total wall time is roughly one timeout. Timeouts and
    /// crashes are logged but never fail the caller.
    async fn await_capsule_readiness(&self, names: &[String]) {
        use astrid_capsule::capsule::ReadyStatus;

        if names.is_empty() {
            return;
        }

        let timeout = std::time::Duration::from_millis(500);
        // Snapshot the capsule Arcs up front so the registry lock is not
        // held while waiting.
        let capsules: Vec<(String, std::sync::Arc<dyn astrid_capsule::capsule::Capsule>)> = {
            let registry = self.capsules.read().await;
            names
                .iter()
                .filter_map(
                    |name| match astrid_capsule::capsule::CapsuleId::new(name.clone()) {
                        Ok(capsule_id) => registry.get(&capsule_id).map(|c| (name.clone(), c)),
                        Err(e) => {
                            tracing::warn!(
                                capsule = %name,
                                error = %e,
                                "Invalid capsule ID, skipping readiness wait"
                            );
                            None
                        },
                    },
                )
                .collect()
        };

        // Fan out the waits; collect results as they complete.
        let mut set = tokio::task::JoinSet::new();
        for (name, capsule) in capsules {
            set.spawn(async move {
                let status = capsule.wait_ready(timeout).await;
                (name, status)
            });
        }
        while let Some(result) = set.join_next().await {
            if let Ok((name, status)) = result {
                match status {
                    ReadyStatus::Ready => {},
                    ReadyStatus::Timeout => {
                        tracing::warn!(
                            capsule = %name,
                            timeout_ms = timeout.as_millis(),
                            "Capsule did not signal ready within timeout"
                        );
                    },
                    ReadyStatus::Crashed => {
                        tracing::error!(
                            capsule = %name,
                            "Capsule run loop exited before signaling ready"
                        );
                    },
                }
            }
        }
    }
659
    /// Serialize the tool definitions of every registered capsule and write
    /// the combined list into each capsule's KV namespace under the
    /// `tool_schemas` key, so every capsule sees the full tool surface.
    /// All failures are logged; this method never fails the caller.
    async fn inject_tool_schemas(&self) {
        use astrid_events::llm::LlmToolDefinition;
        use astrid_storage::KvStore;

        // Collect tools and capsule ids under a single registry read.
        let (all_tools, capsule_ids) = {
            let registry = self.capsules.read().await;
            let tools: Vec<LlmToolDefinition> = registry
                .values()
                .flat_map(|capsule| {
                    capsule.manifest().tools.iter().map(|t| LlmToolDefinition {
                        name: t.name.clone(),
                        description: Some(t.description.clone()),
                        input_schema: t.input_schema.clone(),
                    })
                })
                .collect();
            let ids: Vec<String> = registry.list().iter().map(ToString::to_string).collect();
            (tools, ids)
        };

        if all_tools.is_empty() {
            return;
        }

        // Serialize once; the same blob is fanned out to every namespace.
        let tool_bytes = match serde_json::to_vec(&all_tools) {
            Ok(b) => b,
            Err(e) => {
                tracing::error!(error = %e, "Failed to serialize tool schemas");
                return;
            },
        };

        tracing::info!(
            tool_count = all_tools.len(),
            "Injecting tool schemas into capsule KV stores"
        );

        for capsule_id in &capsule_ids {
            let namespace = format!("capsule:{capsule_id}");
            if let Err(e) = self
                .kv
                .set(&namespace, "tool_schemas", tool_bytes.clone())
                .await
            {
                tracing::warn!(
                    capsule = %capsule_id,
                    error = %e,
                    "Failed to inject tool schemas"
                );
            }
        }
    }
717}
718
/// Build the kernel's overlay VFS: `workspace_root` as the lower layer and
/// a fresh temporary directory as the upper layer, both registered under
/// the same `root_handle`.
///
/// Returns the overlay together with the `TempDir` guard — the caller must
/// keep the guard alive, since dropping it removes the upper directory.
///
/// # Errors
/// Fails if either layer cannot be registered or the temp dir cannot be
/// created.
async fn init_overlay_vfs(
    root_handle: &DirHandle,
    workspace_root: &Path,
) -> Result<(Arc<OverlayVfs>, tempfile::TempDir), std::io::Error> {
    let lower_vfs = HostVfs::new();
    lower_vfs
        .register_dir(root_handle.clone(), workspace_root.to_path_buf())
        .await
        .map_err(|_| std::io::Error::other("Failed to register lower vfs dir"))?;

    let upper_temp = tempfile::TempDir::new()
        .map_err(|e| std::io::Error::other(format!("Failed to create overlay temp dir: {e}")))?;
    let upper_vfs = HostVfs::new();
    upper_vfs
        .register_dir(root_handle.clone(), upper_temp.path().to_path_buf())
        .await
        .map_err(|_| std::io::Error::other("Failed to register upper vfs dir"))?;

    let overlay = Arc::new(OverlayVfs::new(Box::new(lower_vfs), Box::new(upper_vfs)));
    Ok((overlay, upper_temp))
}
747
/// Open (or create) the signed audit log under the Astrid home, then verify
/// the hash chain of every recorded session.
///
/// Integrity violations are logged loudly but do not block startup.
///
/// # Errors
/// Fails if the Astrid home cannot be resolved or created, the runtime
/// signing key is unreadable or invalid, or the audit database cannot be
/// opened.
fn open_audit_log() -> std::io::Result<Arc<AuditLog>> {
    use astrid_core::dirs::AstridHome;

    let home = AstridHome::resolve()
        .map_err(|e| std::io::Error::other(format!("cannot resolve Astrid home: {e}")))?;
    home.ensure()
        .map_err(|e| std::io::Error::other(format!("cannot create Astrid home dirs: {e}")))?;

    let runtime_key = load_or_generate_runtime_key(&home.keys_dir())?;
    let audit_log = AuditLog::open(home.audit_db_path(), runtime_key)
        .map_err(|e| std::io::Error::other(format!("cannot open audit log: {e}")))?;

    // Best-effort integrity sweep: report every per-session chain issue.
    match audit_log.verify_all() {
        Ok(results) => {
            let total_sessions = results.len();
            let mut tampered_sessions: usize = 0;

            for (session_id, result) in &results {
                if !result.valid {
                    tampered_sessions = tampered_sessions.saturating_add(1);
                    for issue in &result.issues {
                        tracing::error!(
                            session_id = %session_id,
                            issue = %issue,
                            "Audit chain integrity violation detected"
                        );
                    }
                }
            }

            if tampered_sessions > 0 {
                tracing::error!(
                    total_sessions,
                    tampered_sessions,
                    "Audit chain verification found tampered sessions"
                );
            } else if total_sessions > 0 {
                tracing::info!(
                    total_sessions,
                    "Audit chain verification passed for all sessions"
                );
            }
        },
        Err(e) => {
            tracing::error!(error = %e, "Audit chain verification failed to run");
        },
    }

    Ok(Arc::new(audit_log))
}
804
805fn load_or_generate_runtime_key(keys_dir: &Path) -> std::io::Result<KeyPair> {
809 let key_path = keys_dir.join("runtime.key");
810
811 if key_path.exists() {
812 let bytes = std::fs::read(&key_path)?;
813 KeyPair::from_secret_key(&bytes).map_err(|e| {
814 std::io::Error::other(format!(
815 "invalid runtime key at {}: {e}",
816 key_path.display()
817 ))
818 })
819 } else {
820 let keypair = KeyPair::generate();
821 std::fs::create_dir_all(keys_dir)?;
822 std::fs::write(&key_path, keypair.secret_key_bytes())?;
823
824 #[cfg(unix)]
826 {
827 use std::os::unix::fs::PermissionsExt;
828 std::fs::set_permissions(&key_path, std::fs::Permissions::from_mode(0o600))?;
829 }
830
831 tracing::info!(key_id = %keypair.key_id_hex(), "Generated new runtime signing key");
832 Ok(keypair)
833 }
834}
835
/// Number of permanent kernel-internal event-bus subscribers (the tasks
/// spawned by `Kernel::new` that stay subscribed for the kernel's
/// lifetime). The idle monitor subtracts this from the live subscriber
/// count to estimate external activity, and `Kernel::new` debug-asserts
/// the value so it cannot silently go stale.
const INTERNAL_SUBSCRIBER_COUNT: usize = 3;
852
/// Spawn the idle monitor task.
///
/// After a 30s grace period it checks every 15s whether any client is
/// effectively connected (or any capsule declares uplinks/cron jobs) and,
/// after `ASTRID_IDLE_TIMEOUT_SECS` (default 300) of continuous idleness,
/// shuts the kernel down and exits the process.
fn spawn_idle_monitor(kernel: Arc<Kernel>) -> tokio::task::JoinHandle<()> {
    tokio::spawn(async move {
        let grace = std::time::Duration::from_secs(30);
        let timeout_secs: u64 = std::env::var("ASTRID_IDLE_TIMEOUT_SECS")
            .ok()
            .and_then(|v| v.parse().ok())
            .unwrap_or(300);
        let idle_timeout = std::time::Duration::from_secs(timeout_secs);
        let check_interval = std::time::Duration::from_secs(15);

        tokio::time::sleep(grace).await;
        // Start of the current idle stretch; None while there is activity.
        let mut idle_since: Option<tokio::time::Instant> = None;

        loop {
            tokio::time::sleep(check_interval).await;

            let connections = kernel.connection_count();

            // Discount the kernel's own permanent subscribers so only
            // external listeners count as activity.
            let bus_subscribers = kernel
                .event_bus
                .subscriber_count()
                .saturating_sub(INTERNAL_SUBSCRIBER_COUNT);

            // Take the smaller of the two signals so neither a stale
            // connection count nor a stale subscription keeps us alive.
            let effective_connections = connections.min(bus_subscribers);

            // Capsules with uplinks or cron jobs must keep the kernel up
            // even with no clients attached.
            let has_daemons = {
                let reg = kernel.capsules.read().await;
                reg.values().any(|c| {
                    let m = c.manifest();
                    !m.uplinks.is_empty() || !m.cron_jobs.is_empty()
                })
            };

            if effective_connections == 0 && !has_daemons {
                let now = tokio::time::Instant::now();
                let start = *idle_since.get_or_insert(now);
                let elapsed = now.duration_since(start);

                tracing::debug!(
                    idle_secs = elapsed.as_secs(),
                    timeout_secs,
                    connections,
                    bus_subscribers,
                    "Kernel idle, monitoring timeout"
                );

                if elapsed >= idle_timeout {
                    tracing::info!("Idle timeout reached, initiating shutdown");
                    kernel.shutdown(Some("idle_timeout".to_string())).await;
                    // Graceful shutdown already completed; terminate.
                    std::process::exit(0);
                }
            } else {
                if idle_since.is_some() {
                    tracing::debug!(
                        effective_connections,
                        has_daemons,
                        "Activity detected, resetting idle timer"
                    );
                }
                idle_since = None;
            }
        }
    })
}
922
/// Per-capsule bookkeeping for automatic restarts with exponential backoff.
struct RestartTracker {
    /// Restart attempts made so far.
    attempts: u32,
    /// When the most recent attempt was made.
    last_attempt: std::time::Instant,
    /// Wait required before the next attempt; doubled after each attempt.
    backoff: std::time::Duration,
}
929
930impl RestartTracker {
931 const MAX_ATTEMPTS: u32 = 5;
932 const INITIAL_BACKOFF: std::time::Duration = std::time::Duration::from_secs(2);
933 const MAX_BACKOFF: std::time::Duration = std::time::Duration::from_secs(120);
934
935 fn new() -> Self {
936 Self {
937 attempts: 0,
938 last_attempt: std::time::Instant::now(),
939 backoff: Self::INITIAL_BACKOFF,
940 }
941 }
942
943 fn should_restart(&self) -> bool {
945 self.attempts < Self::MAX_ATTEMPTS && self.last_attempt.elapsed() >= self.backoff
946 }
947
948 fn record_attempt(&mut self) {
950 self.attempts = self.attempts.saturating_add(1);
951 self.last_attempt = std::time::Instant::now();
952 self.backoff = self.backoff.saturating_mul(2).min(Self::MAX_BACKOFF);
953 }
954
955 fn exhausted(&self) -> bool {
957 self.attempts >= Self::MAX_ATTEMPTS
958 }
959}
960
/// Try to restart the failed capsule `id_str`, honoring the tracker's
/// attempt budget and exponential backoff.
///
/// Returns `true` only when a restart was attempted and succeeded; `false`
/// when the tracker is exhausted, still within backoff, or the restart
/// itself failed.
async fn attempt_capsule_restart(
    kernel: &Kernel,
    id_str: &str,
    tracker: &mut RestartTracker,
) -> bool {
    if tracker.exhausted() {
        return false;
    }

    if !tracker.should_restart() {
        tracing::debug!(
            capsule_id = %id_str,
            next_attempt_in = ?tracker.backoff.saturating_sub(tracker.last_attempt.elapsed()),
            "Waiting for backoff before next restart attempt"
        );
        return false;
    }

    // Count the attempt before trying, so a failed restart still consumes
    // budget and grows the backoff.
    tracker.record_attempt();
    let attempt = tracker.attempts;

    tracing::warn!(
        capsule_id = %id_str,
        attempt,
        max_attempts = RestartTracker::MAX_ATTEMPTS,
        "Attempting capsule restart"
    );

    let capsule_id = astrid_capsule::capsule::CapsuleId::from_static(id_str);
    match kernel.restart_capsule(&capsule_id).await {
        Ok(()) => {
            tracing::info!(capsule_id = %id_str, attempt, "Capsule restarted successfully");
            true
        },
        Err(e) => {
            tracing::error!(capsule_id = %id_str, attempt, error = %e, "Capsule restart failed");
            if tracker.exhausted() {
                tracing::error!(
                    capsule_id = %id_str,
                    "All restart attempts exhausted - capsule will remain down"
                );
            }
            false
        },
    }
}
1010
/// Spawn the capsule health monitor task.
///
/// Every 10 seconds it health-checks all `Ready` capsules, publishes an
/// `astrid.v1.health.failed` IPC event for each failure, and drives
/// automatic restarts through per-capsule `RestartTracker`s (bounded
/// attempts with exponential backoff).
fn spawn_capsule_health_monitor(kernel: Arc<Kernel>) -> tokio::task::JoinHandle<()> {
    tokio::spawn(async move {
        let mut interval = tokio::time::interval(std::time::Duration::from_secs(10));
        // A tokio interval's first tick completes immediately; consume it so
        // the first health check happens after one full period.
        interval.tick().await;
        let mut restart_trackers: std::collections::HashMap<String, RestartTracker> =
            std::collections::HashMap::new();

        loop {
            interval.tick().await;

            // Snapshot the Ready capsules so the registry lock is released
            // before running health checks.
            let ready_capsules: Vec<std::sync::Arc<dyn astrid_capsule::capsule::Capsule>> = {
                let registry = kernel.capsules.read().await;
                registry
                    .list()
                    .into_iter()
                    .filter_map(|id| {
                        let capsule = registry.get(id)?;
                        if capsule.state() == astrid_capsule::capsule::CapsuleState::Ready {
                            Some(capsule)
                        } else {
                            None
                        }
                    })
                    .collect()
            };

            // Health-check each capsule; broadcast failures on the bus.
            let mut failures: Vec<(String, String)> = Vec::new();
            for capsule in &ready_capsules {
                let health = capsule.check_health();
                if let astrid_capsule::capsule::CapsuleState::Failed(reason) = health {
                    let id_str = capsule.id().to_string();
                    tracing::error!(capsule_id = %id_str, reason = %reason, "Capsule health check failed");

                    let msg = astrid_events::ipc::IpcMessage::new(
                        "astrid.v1.health.failed",
                        astrid_events::ipc::IpcPayload::Custom {
                            data: serde_json::json!({
                                "capsule_id": &id_str,
                                "reason": &reason,
                            }),
                        },
                        uuid::Uuid::new_v4(),
                    );
                    let _ = kernel.event_bus.publish(astrid_events::AstridEvent::Ipc {
                        metadata: astrid_events::EventMetadata::new("kernel"),
                        message: msg,
                    });
                    failures.push((id_str, reason));
                }
            }

            // Release the capsule Arcs before restarting — restart needs
            // exclusive ownership to call unload.
            drop(ready_capsules);

            let failed_this_tick: std::collections::HashSet<&str> =
                failures.iter().map(|(id, _)| id.as_str()).collect();

            let mut restarted = Vec::new();
            for (id_str, _reason) in &failures {
                let tracker = restart_trackers
                    .entry(id_str.clone())
                    .or_insert_with(RestartTracker::new);

                if attempt_capsule_restart(&kernel, id_str, tracker).await {
                    restarted.push(id_str.clone());
                }
            }

            // A successful restart wipes the capsule's attempt history.
            for id in &restarted {
                restart_trackers.remove(id);
            }

            // Prune trackers for capsules that recovered on their own: keep
            // exhausted ones (so they stay marked down), keep those still
            // inside their backoff window, and keep those that failed again
            // on this tick.
            restart_trackers.retain(|id, tracker| {
                if tracker.exhausted() {
                    return true;
                }
                if tracker.last_attempt.elapsed() < tracker.backoff {
                    return true;
                }
                failed_this_tick.contains(id.as_str())
            });
        }
    })
}
1115
/// Spawn the watchdog task: publishes an `astrid.v1.watchdog.tick` IPC
/// event on the bus every 5 seconds.
fn spawn_react_watchdog(event_bus: Arc<EventBus>) -> tokio::task::JoinHandle<()> {
    tokio::spawn(async move {
        let mut interval = tokio::time::interval(std::time::Duration::from_secs(5));
        // Consume the interval's immediate first tick so the first event
        // fires after a full period.
        interval.tick().await;

        loop {
            interval.tick().await;

            let msg = astrid_events::ipc::IpcMessage::new(
                "astrid.v1.watchdog.tick",
                astrid_events::ipc::IpcPayload::Custom {
                    data: serde_json::json!({}),
                },
                uuid::Uuid::new_v4(),
            );
            let _ = event_bus.publish(astrid_events::AstridEvent::Ipc {
                metadata: astrid_events::EventMetadata::new("kernel"),
                message: msg,
            });
        }
    })
}
1144
#[cfg(test)]
mod tests {
    use super::*;

    // A missing key file is generated, persisted as 32 raw bytes, and
    // reloads into the same keypair.
    #[test]
    fn test_load_or_generate_creates_new_key() {
        let dir = tempfile::tempdir().unwrap();
        let keys_dir = dir.path().join("keys");

        let keypair = load_or_generate_runtime_key(&keys_dir).unwrap();
        let key_path = keys_dir.join("runtime.key");

        assert!(key_path.exists());
        let bytes = std::fs::read(&key_path).unwrap();
        assert_eq!(bytes.len(), 32);

        let reloaded = KeyPair::from_secret_key(&bytes).unwrap();
        assert_eq!(
            keypair.public_key_bytes(),
            reloaded.public_key_bytes(),
            "reloaded key should match generated key"
        );
    }

    // A second call must load the persisted key, not generate a new one.
    #[test]
    fn test_load_or_generate_is_idempotent() {
        let dir = tempfile::tempdir().unwrap();
        let keys_dir = dir.path().join("keys");

        let first = load_or_generate_runtime_key(&keys_dir).unwrap();
        let second = load_or_generate_runtime_key(&keys_dir).unwrap();

        assert_eq!(
            first.public_key_bytes(),
            second.public_key_bytes(),
            "loading the same key file should produce the same keypair"
        );
    }

    // A key file with the wrong length must be rejected with the
    // "invalid runtime key" error, not silently regenerated.
    #[test]
    fn test_load_or_generate_rejects_bad_key_length() {
        let dir = tempfile::tempdir().unwrap();
        let keys_dir = dir.path().join("keys");
        std::fs::create_dir_all(&keys_dir).unwrap();

        std::fs::write(keys_dir.join("runtime.key"), [0u8; 16]).unwrap();

        let result = load_or_generate_runtime_key(&keys_dir);
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(
            err.contains("invalid runtime key"),
            "expected 'invalid runtime key' error, got: {err}"
        );
    }

    // The open/close counter logic used by connection_opened/closed:
    // increments and guarded decrements behave as expected.
    #[test]
    fn test_connection_counter_increment_decrement() {
        let counter = AtomicUsize::new(0);

        counter.fetch_add(1, Ordering::Relaxed);
        counter.fetch_add(1, Ordering::Relaxed);
        assert_eq!(counter.load(Ordering::Relaxed), 2);

        // Two disconnects walk the counter back down through 1 to 0.
        for expected in [1, 0] {
            let _ = counter.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |n| {
                if n == 0 {
                    None
                } else {
                    Some(n.saturating_sub(1))
                }
            });
            assert_eq!(counter.load(Ordering::Relaxed), expected);
        }
    }

    // Decrementing at zero must be refused (Err) and leave the counter
    // untouched — the underflow guard in connection_closed.
    #[test]
    fn test_connection_counter_underflow_guard() {
        let counter = AtomicUsize::new(0);

        let result = counter.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |n| {
            if n == 0 { None } else { Some(n - 1) }
        });
        assert!(result.is_err());
        assert_eq!(counter.load(Ordering::Relaxed), 0);
    }

    // Session-only allowances are cleared only when the LAST client
    // disconnects (counter transition 1 -> 0); persistent ones survive.
    #[test]
    fn test_last_disconnect_clears_session_allowances() {
        use astrid_approval::AllowanceStore;
        use astrid_approval::allowance::{Allowance, AllowanceId, AllowancePattern};
        use astrid_core::types::Timestamp;
        use astrid_crypto::KeyPair;

        let store = AllowanceStore::new();
        let keypair = KeyPair::generate();

        // One session-scoped allowance…
        store
            .add_allowance(Allowance {
                id: AllowanceId::new(),
                action_pattern: AllowancePattern::ServerTools {
                    server: "session-server".to_string(),
                },
                created_at: Timestamp::now(),
                expires_at: None,
                max_uses: None,
                uses_remaining: None,
                session_only: true,
                workspace_root: None,
                signature: keypair.sign(b"test"),
            })
            .unwrap();

        // …and one persistent allowance.
        store
            .add_allowance(Allowance {
                id: AllowanceId::new(),
                action_pattern: AllowancePattern::ServerTools {
                    server: "persistent-server".to_string(),
                },
                created_at: Timestamp::now(),
                expires_at: None,
                max_uses: None,
                uses_remaining: None,
                session_only: false,
                workspace_root: None,
                signature: keypair.sign(b"test"),
            })
            .unwrap();

        assert_eq!(store.count(), 2);

        // Mirror of Kernel::connection_closed's decrement-and-clear logic.
        let counter = AtomicUsize::new(2);
        let simulate_disconnect = || {
            let result = counter.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |n| {
                if n == 0 {
                    None
                } else {
                    Some(n.saturating_sub(1))
                }
            });
            if result == Ok(1) {
                store.clear_session_allowances();
            }
        };

        simulate_disconnect();
        assert_eq!(
            store.count(),
            2,
            "both allowances should survive non-final disconnect"
        );

        simulate_disconnect();
        assert_eq!(
            store.count(),
            1,
            "session allowance should be cleared on last disconnect"
        );
    }

    // Generated key files must be owner-readable only (0o600).
    #[cfg(unix)]
    #[test]
    fn test_load_or_generate_sets_secure_permissions() {
        use std::os::unix::fs::PermissionsExt;

        let dir = tempfile::tempdir().unwrap();
        let keys_dir = dir.path().join("keys");

        let _ = load_or_generate_runtime_key(&keys_dir).unwrap();

        let key_path = keys_dir.join("runtime.key");
        let mode = std::fs::metadata(&key_path).unwrap().permissions().mode();
        assert_eq!(
            mode & 0o777,
            0o600,
            "key file should have 0o600 permissions, got {mode:#o}"
        );
    }

    // A fresh tracker is not exhausted, and is still inside its initial
    // backoff window (so no immediate restart).
    #[test]
    fn restart_tracker_initial_state() {
        let tracker = RestartTracker::new();
        assert!(!tracker.exhausted());
        assert!(!tracker.should_restart());
    }

    // Backdating last_attempt past the backoff window allows a restart.
    #[test]
    fn restart_tracker_allows_restart_after_backoff() {
        let mut tracker = RestartTracker::new();
        tracker.last_attempt = std::time::Instant::now()
            - RestartTracker::INITIAL_BACKOFF
            - std::time::Duration::from_millis(1);
        assert!(tracker.should_restart());
    }

    // Each recorded attempt doubles the backoff and bumps the counter.
    #[test]
    fn restart_tracker_doubles_backoff() {
        let mut tracker = RestartTracker::new();
        assert_eq!(tracker.backoff, RestartTracker::INITIAL_BACKOFF);

        tracker.record_attempt();
        assert_eq!(
            tracker.backoff,
            RestartTracker::INITIAL_BACKOFF.saturating_mul(2)
        );
        assert_eq!(tracker.attempts, 1);

        tracker.record_attempt();
        assert_eq!(
            tracker.backoff,
            RestartTracker::INITIAL_BACKOFF.saturating_mul(4)
        );
        assert_eq!(tracker.attempts, 2);
    }

    // Backoff growth is clamped at MAX_BACKOFF regardless of attempts.
    #[test]
    fn restart_tracker_backoff_caps_at_max() {
        let mut tracker = RestartTracker::new();
        for _ in 0..20 {
            tracker.record_attempt();
        }
        assert_eq!(tracker.backoff, RestartTracker::MAX_BACKOFF);
    }

    // The tracker flips to exhausted exactly at MAX_ATTEMPTS.
    #[test]
    fn restart_tracker_exhausted_at_max_attempts() {
        let mut tracker = RestartTracker::new();
        for _ in 0..RestartTracker::MAX_ATTEMPTS {
            assert!(!tracker.exhausted());
            tracker.record_attempt();
        }
        assert!(tracker.exhausted());
    }

    // Once exhausted, no elapsed backoff can re-enable restarts.
    #[test]
    fn restart_tracker_should_restart_false_when_exhausted() {
        let mut tracker = RestartTracker::new();
        for _ in 0..RestartTracker::MAX_ATTEMPTS {
            tracker.record_attempt();
        }
        tracker.last_attempt = std::time::Instant::now() - RestartTracker::MAX_BACKOFF;
        assert!(!tracker.should_restart());
    }
}
1408
/// Ensure the local CLI has a root user: if no identity is linked for
/// platform "cli" / id "local", create a user named "root" and link it
/// with method "system". Idempotent across restarts.
///
/// # Errors
/// Propagates identity-store failures from resolve, create, or link.
async fn bootstrap_cli_root_user(
    store: &Arc<dyn astrid_storage::IdentityStore>,
) -> Result<(), astrid_storage::IdentityError> {
    // Already bootstrapped on a previous run — nothing to do.
    if let Some(_user) = store.resolve("cli", "local").await? {
        tracing::debug!("CLI root user already linked");
        return Ok(());
    }

    let user = store.create_user(Some("root")).await?;
    tracing::info!(user_id = %user.id, "Created CLI root user");

    store.link("cli", "local", user.id, "system").await?;
    tracing::info!(user_id = %user.id, "Linked CLI root user (cli/local)");

    Ok(())
}
1439
/// Apply the `[identity.links]` entries from the resolved workspace config.
///
/// A missing/unloadable config and individual link failures are logged and
/// skipped — this never fails the kernel.
async fn apply_identity_config(
    store: &Arc<dyn astrid_storage::IdentityStore>,
    workspace_root: &std::path::Path,
) {
    let config = match astrid_config::Config::load(Some(workspace_root)) {
        Ok(resolved) => resolved.config,
        Err(e) => {
            tracing::debug!(error = %e, "No config loaded for identity links");
            return;
        },
    };

    for link_cfg in &config.identity.links {
        let result = apply_single_identity_link(store, link_cfg).await;
        if let Err(e) = result {
            tracing::warn!(
                platform = %link_cfg.platform,
                platform_user_id = %link_cfg.platform_user_id,
                astrid_user = %link_cfg.astrid_user,
                error = %e,
                "Failed to apply identity link from config"
            );
        }
    }
}
1470
/// Apply one configured identity link.
///
/// `astrid_user` is interpreted as a user UUID when it parses as one (the
/// user must already exist), otherwise as a user name (created on demand).
/// An empty link method defaults to "admin". When the platform identity
/// already resolves to the same user the link is left untouched.
///
/// # Errors
/// Fails when a UUID references an unknown user or when the store rejects
/// a lookup, create, or link operation.
async fn apply_single_identity_link(
    store: &Arc<dyn astrid_storage::IdentityStore>,
    link_cfg: &astrid_config::types::IdentityLinkConfig,
) -> Result<(), astrid_storage::IdentityError> {
    let user_id = if let Ok(uuid) = uuid::Uuid::parse_str(&link_cfg.astrid_user) {
        // Explicit UUIDs are never auto-created; a typo should fail loudly.
        if store.get_user(uuid).await?.is_none() {
            return Err(astrid_storage::IdentityError::UserNotFound(uuid));
        }
        uuid
    } else {
        // Names are resolved if present, otherwise created on the fly.
        if let Some(user) = store.get_user_by_name(&link_cfg.astrid_user).await? {
            user.id
        } else {
            let user = store.create_user(Some(&link_cfg.astrid_user)).await?;
            tracing::info!(
                user_id = %user.id,
                name = %link_cfg.astrid_user,
                "Created user from config identity link"
            );
            user.id
        }
    };

    let method = if link_cfg.method.is_empty() {
        "admin"
    } else {
        &link_cfg.method
    };

    // Skip if this exact link (same platform identity -> same user) exists.
    if let Some(existing) = store
        .resolve(&link_cfg.platform, &link_cfg.platform_user_id)
        .await?
        && existing.id == user_id
    {
        tracing::debug!(
            platform = %link_cfg.platform,
            platform_user_id = %link_cfg.platform_user_id,
            user_id = %user_id,
            "Identity link from config already exists"
        );
        return Ok(());
    }

    store
        .link(
            &link_cfg.platform,
            &link_cfg.platform_user_id,
            user_id,
            method,
        )
        .await?;

    tracing::info!(
        platform = %link_cfg.platform,
        platform_user_id = %link_cfg.platform_user_id,
        user_id = %user_id,
        "Applied identity link from config"
    );

    Ok(())
}