1#![deny(unsafe_code)]
2#![deny(missing_docs)]
3#![deny(clippy::all)]
4#![deny(unreachable_pub)]
5#![allow(clippy::module_name_repetitions)]
6
7pub mod kernel_router;
16pub mod socket;
18
19use astrid_audit::AuditLog;
20use astrid_capabilities::{CapabilityStore, DirHandle};
21use astrid_capsule::registry::CapsuleRegistry;
22use astrid_core::SessionId;
23use astrid_crypto::KeyPair;
24use astrid_events::EventBus;
25use astrid_mcp::{McpClient, SecureMcpClient, ServerManager, ServersConfig};
26use astrid_vfs::{HostVfs, OverlayVfs, Vfs};
27use std::path::{Path, PathBuf};
28use std::sync::Arc;
29use std::sync::atomic::{AtomicUsize, Ordering};
30use tokio::sync::RwLock;
31
/// Central runtime object owning all long-lived kernel state.
///
/// Constructed once per session via [`Kernel::new`] and shared behind an
/// `Arc`; the background monitor tasks spawned at boot hold clones of it.
pub struct Kernel {
    /// Identifier of the session this kernel instance serves.
    pub session_id: SessionId,
    /// Broadcast bus carrying all intra-kernel events.
    pub event_bus: Arc<EventBus>,
    /// Registry of currently loaded capsules.
    pub capsules: Arc<RwLock<CapsuleRegistry>>,
    /// Capability-checked, audited MCP client.
    pub mcp: SecureMcpClient,
    /// Capability store, backed by the shared KV store.
    pub capabilities: Arc<CapabilityStore>,
    /// Overlay VFS exposed through the object-safe `Vfs` trait.
    pub vfs: Arc<dyn Vfs>,
    /// Concrete overlay VFS (workspace lower layer, temp-dir upper layer).
    pub overlay_vfs: Arc<OverlayVfs>,
    // Keeps the overlay's upper temp directory alive for the kernel's
    // lifetime; dropping it would delete the upper layer on disk.
    _upper_dir: Arc<tempfile::TempDir>,
    /// Handle under which the VFS root directory is registered.
    pub vfs_root_handle: DirHandle,
    /// Root of the workspace this kernel operates on.
    pub workspace_root: PathBuf,
    /// Optional shared/global data root resolved from the Astrid home.
    pub global_root: Option<PathBuf>,
    /// Unix socket listener handed to capsule contexts for CLI traffic.
    pub cli_socket_listener: Option<Arc<tokio::sync::Mutex<tokio::net::UnixListener>>>,
    /// Persistent key/value store shared by kernel subsystems and capsules.
    pub kv: Arc<astrid_storage::SurrealKvStore>,
    /// Tamper-evident audit log.
    pub audit_log: Arc<AuditLog>,
    /// Number of currently connected clients (see `connection_opened`/`_closed`).
    pub active_connections: AtomicUsize,
    /// Instant the kernel was constructed.
    pub boot_time: std::time::Instant,
    /// Watch channel used to signal shutdown to interested tasks.
    pub shutdown_tx: tokio::sync::watch::Sender<bool>,
    /// Token clients must present when connecting to the session socket.
    pub session_token: Arc<astrid_core::session_token::SessionToken>,
    // On-disk location of the session token; removed during shutdown.
    token_path: PathBuf,
    /// Store of user-granted approval allowances.
    pub allowance_store: Arc<astrid_approval::AllowanceStore>,
    // Maps platform identities (e.g. cli/local) to Astrid users.
    identity_store: Arc<dyn astrid_storage::IdentityStore>,
}
91
92impl Kernel {
    /// Builds a fully wired kernel for `session_id` rooted at `workspace_root`.
    ///
    /// Opens the KV store, capability store and audit log, mounts the overlay
    /// VFS, binds the session socket, bootstraps identities, then spawns the
    /// kernel router, idle monitor, react watchdog, capsule health monitor
    /// and event dispatcher before returning the shared handle.
    ///
    /// # Errors
    /// Returns `std::io::Error` if any backing store, socket, or VFS layer
    /// fails to initialize (all subsystem errors are wrapped into I/O errors).
    ///
    /// # Panics
    /// Panics if called on a single-threaded tokio runtime.
    pub async fn new(
        session_id: SessionId,
        workspace_root: PathBuf,
    ) -> Result<Arc<Self>, std::io::Error> {
        use astrid_core::dirs::AstridHome;

        // Fail fast with a clear message instead of panicking later inside
        // block_in_place on a current_thread runtime.
        assert!(
            tokio::runtime::Handle::current().runtime_flavor()
                == tokio::runtime::RuntimeFlavor::MultiThread,
            "Kernel requires a multi-threaded tokio runtime (block_in_place panics on \
             single-threaded). Use #[tokio::main] or Runtime::new() instead of current_thread."
        );

        let event_bus = Arc::new(EventBus::new());
        let capsules = Arc::new(RwLock::new(CapsuleRegistry::new()));

        let home = AstridHome::resolve().map_err(|e| {
            std::io::Error::other(format!(
                "Failed to resolve Astrid home (set $ASTRID_HOME or $HOME): {e}"
            ))
        })?;

        let global_root = Some(home.shared_dir());

        let kv_path = home.state_db_path();
        let kv = Arc::new(
            astrid_storage::SurrealKvStore::open(&kv_path)
                .map_err(|e| std::io::Error::other(format!("Failed to open KV store: {e}")))?,
        );
        // Missing/invalid MCP server config falls back to defaults.
        let mcp_config = ServersConfig::load_default().unwrap_or_default();
        let mcp_manager =
            ServerManager::new(mcp_config).with_workspace_root(workspace_root.clone());
        let mcp_client = McpClient::new(mcp_manager);

        let capabilities = Arc::new(
            CapabilityStore::with_kv_store(Arc::clone(&kv) as Arc<dyn astrid_storage::KvStore>)
                .map_err(|e| {
                    std::io::Error::other(format!("Failed to init capability store: {e}"))
                })?,
        );
        let audit_log = open_audit_log()?;
        // Wrap the raw MCP client so every call is capability-checked and audited.
        let mcp = SecureMcpClient::new(
            mcp_client,
            Arc::clone(&capabilities),
            Arc::clone(&audit_log),
            session_id.clone(),
        );

        let root_handle = DirHandle::new();

        let (overlay_vfs, upper_temp) = init_overlay_vfs(&root_handle, &workspace_root).await?;

        let listener = socket::bind_session_socket()?;
        let (session_token, token_path) = socket::generate_session_token()?;

        let allowance_store = Arc::new(astrid_approval::AllowanceStore::new());

        // Identity data lives in its own KV namespace.
        let identity_kv = astrid_storage::ScopedKvStore::new(
            Arc::clone(&kv) as Arc<dyn astrid_storage::KvStore>,
            "system:identity",
        )
        .map_err(|e| std::io::Error::other(format!("Failed to create identity KV: {e}")))?;
        let identity_store: Arc<dyn astrid_storage::IdentityStore> =
            Arc::new(astrid_storage::KvIdentityStore::new(identity_kv));

        bootstrap_cli_root_user(&identity_store)
            .await
            .map_err(|e| {
                std::io::Error::other(format!("Failed to bootstrap CLI root user: {e}"))
            })?;

        // Best-effort: config-declared identity links are applied but never fatal.
        apply_identity_config(&identity_store, &workspace_root).await;

        let kernel = Arc::new(Self {
            session_id,
            event_bus,
            capsules,
            mcp,
            capabilities,
            vfs: Arc::clone(&overlay_vfs) as Arc<dyn Vfs>,
            overlay_vfs,
            _upper_dir: Arc::new(upper_temp),
            vfs_root_handle: root_handle,
            workspace_root,
            global_root,
            cli_socket_listener: Some(Arc::new(tokio::sync::Mutex::new(listener))),
            kv,
            audit_log,
            active_connections: AtomicUsize::new(0),
            boot_time: std::time::Instant::now(),
            shutdown_tx: tokio::sync::watch::channel(false).0,
            session_token: Arc::new(session_token),
            token_path,
            allowance_store,
            identity_store,
        });

        // Background tasks run for the kernel's lifetime; their JoinHandles
        // are intentionally dropped (detached).
        drop(kernel_router::spawn_kernel_router(Arc::clone(&kernel)));
        drop(spawn_idle_monitor(Arc::clone(&kernel)));
        drop(spawn_react_watchdog(Arc::clone(&kernel.event_bus)));
        drop(spawn_capsule_health_monitor(Arc::clone(&kernel)));

        let dispatcher = astrid_capsule::dispatcher::EventDispatcher::new(
            Arc::clone(&kernel.capsules),
            Arc::clone(&kernel.event_bus),
        );
        tokio::spawn(dispatcher.run());

        // Keep INTERNAL_SUBSCRIBER_COUNT in sync with the permanent
        // subscriptions created above (checked in debug builds only).
        debug_assert_eq!(
            kernel.event_bus.subscriber_count(),
            INTERNAL_SUBSCRIBER_COUNT,
            "INTERNAL_SUBSCRIBER_COUNT is stale; update it when adding permanent subscribers"
        );

        Ok(kernel)
    }
242
    /// Loads one capsule from `dir` (which must contain `Capsule.toml`),
    /// seeds its scoped KV store from an optional `.env.json`, builds its
    /// context, calls its `load` hook, and registers it.
    ///
    /// # Errors
    /// Fails on manifest or loader errors, KV scoping errors, capsule `load`
    /// failures, or registration conflicts.
    async fn load_capsule(&self, dir: PathBuf) -> Result<(), anyhow::Error> {
        let manifest_path = dir.join("Capsule.toml");
        let manifest = astrid_capsule::discovery::load_manifest(&manifest_path)
            .map_err(|e| anyhow::anyhow!(e))?;

        let loader = astrid_capsule::loader::CapsuleLoader::new(self.mcp.clone());
        let mut capsule = loader.create_capsule(manifest, dir.clone())?;

        // Each capsule gets its own KV namespace keyed by its ID.
        let kv = astrid_storage::ScopedKvStore::new(
            Arc::clone(&self.kv) as Arc<dyn astrid_storage::KvStore>,
            format!("capsule:{}", capsule.id()),
        )?;

        // Best-effort seeding: a missing or malformed .env.json is ignored,
        // as are individual KV write failures.
        let env_path = dir.join(".env.json");
        if env_path.exists()
            && let Ok(contents) = std::fs::read_to_string(&env_path)
            && let Ok(env_map) =
                serde_json::from_str::<std::collections::HashMap<String, String>>(&contents)
        {
            for (k, v) in env_map {
                let _ = kv.set(&k, v.into_bytes()).await;
            }
        }

        let ctx = astrid_capsule::context::CapsuleContext::new(
            self.workspace_root.clone(),
            self.global_root.clone(),
            kv,
            Arc::clone(&self.event_bus),
            self.cli_socket_listener.clone(),
        )
        .with_registry(Arc::clone(&self.capsules))
        .with_session_token(Arc::clone(&self.session_token))
        .with_allowance_store(Arc::clone(&self.allowance_store))
        .with_identity_store(Arc::clone(&self.identity_store));

        capsule.load(&ctx).await?;

        let mut registry = self.capsules.write().await;
        registry
            .register(capsule)
            .map_err(|e| anyhow::anyhow!("Failed to register capsule: {e}"))?;

        Ok(())
    }
296
    /// Restarts a capsule in place: unregisters it, best-effort unloads the
    /// old instance, reloads it from its recorded source directory, then
    /// fires the optional `handle_lifecycle_restart` interceptor.
    ///
    /// # Errors
    /// Fails if the capsule is unknown, has no source directory, cannot be
    /// unregistered, or fails to load again.
    async fn restart_capsule(
        &self,
        id: &astrid_capsule::capsule::CapsuleId,
    ) -> Result<(), anyhow::Error> {
        // Capture the source dir under a short-lived read lock.
        let source_dir = {
            let registry = self.capsules.read().await;
            let capsule = registry
                .get(id)
                .ok_or_else(|| anyhow::anyhow!("capsule '{id}' not found in registry"))?;
            capsule
                .source_dir()
                .map(std::path::Path::to_path_buf)
                .ok_or_else(|| anyhow::anyhow!("capsule '{id}' has no source directory"))?
        };

        let old_capsule = {
            let mut registry = self.capsules.write().await;
            registry
                .unregister(id)
                .map_err(|e| anyhow::anyhow!("failed to unregister capsule '{id}': {e}"))?
        };
        {
            // `unload` needs `&mut`, i.e. exclusive ownership of the Arc; if
            // an in-flight task still holds a clone, skip the unload rather
            // than block the restart.
            let mut old = old_capsule;
            if let Some(capsule) = std::sync::Arc::get_mut(&mut old) {
                if let Err(e) = capsule.unload().await {
                    tracing::warn!(
                        capsule_id = %id,
                        error = %e,
                        "Capsule unload failed during restart"
                    );
                }
            } else {
                tracing::warn!(
                    capsule_id = %id,
                    "Cannot call unload during restart - Arc still held by in-flight task"
                );
            }
        }

        self.load_capsule(source_dir).await?;

        // Notify the freshly loaded instance; the interceptor is optional,
        // so failure is only logged at debug level.
        let capsule = {
            let registry = self.capsules.read().await;
            registry.get(id)
        };
        if let Some(capsule) = capsule
            && let Err(e) = capsule.invoke_interceptor("handle_lifecycle_restart", &[])
        {
            tracing::debug!(
                capsule_id = %id,
                error = %e,
                "Capsule does not handle lifecycle restart (optional)"
            );
        }

        Ok(())
    }
377
    /// Discovers and loads every installed capsule: uplink capsules first
    /// (waiting for their readiness), then the rest in dependency order, then
    /// injects tool schemas and announces `capsules_loaded` on the bus.
    ///
    /// Individual capsule failures are logged and skipped; this method never
    /// fails as a whole.
    pub async fn load_all_capsules(&self) {
        use astrid_capsule::toposort::toposort_manifests;
        use astrid_core::dirs::AstridHome;

        let mut paths = Vec::new();
        if let Ok(home) = AstridHome::resolve() {
            paths.push(home.capsules_dir());
        }

        let discovered = astrid_capsule::discovery::discover_manifests(Some(&paths));

        // Topologically sort by dependencies; on a cycle, fall back to raw
        // discovery order instead of refusing to load anything.
        let sorted = match toposort_manifests(discovered) {
            Ok(sorted) => sorted,
            Err((e, original)) => {
                tracing::error!(
                    cycle = %e,
                    "Dependency cycle in capsules, falling back to discovery order"
                );
                original
            },
        };

        // Defense-in-depth check: this combination should already have been
        // rejected when the manifest was loaded.
        for (manifest, _) in &sorted {
            if manifest.capabilities.uplink && !manifest.dependencies.requires.is_empty() {
                tracing::warn!(
                    capsule = %manifest.package.name,
                    requires = ?manifest.dependencies.requires,
                    "Uplink capsule has [dependencies].requires - \
                     this should have been rejected at manifest load time"
                );
            }
        }

        // Uplink capsules load (and become ready) before everything else.
        let (uplinks, others): (Vec<_>, Vec<_>) =
            sorted.into_iter().partition(|(m, _)| m.capabilities.uplink);

        let uplink_names: Vec<String> = uplinks
            .iter()
            .map(|(m, _)| m.package.name.clone())
            .collect();
        for (manifest, dir) in &uplinks {
            if let Err(e) = self.load_capsule(dir.clone()).await {
                tracing::warn!(
                    capsule = %manifest.package.name,
                    error = %e,
                    "Failed to load uplink capsule during discovery"
                );
            }
        }

        self.await_capsule_readiness(&uplink_names).await;

        for (manifest, dir) in &others {
            if let Err(e) = self.load_capsule(dir.clone()).await {
                tracing::warn!(
                    capsule = %manifest.package.name,
                    error = %e,
                    "Failed to load capsule during discovery"
                );
            }
        }

        let other_names: Vec<String> = others.iter().map(|(m, _)| m.package.name.clone()).collect();
        self.await_capsule_readiness(&other_names).await;

        self.inject_tool_schemas().await;

        // Announce readiness so clients know capsule loading is complete.
        let msg = astrid_events::ipc::IpcMessage::new(
            "astrid.v1.capsules_loaded",
            astrid_events::ipc::IpcPayload::RawJson(serde_json::json!({"status": "ready"})),
            self.session_id.0,
        );
        let _ = self.event_bus.publish(astrid_events::AstridEvent::Ipc {
            metadata: astrid_events::EventMetadata::new("kernel"),
            message: msg,
        });
    }
485
486 pub fn connection_opened(&self) {
488 self.active_connections.fetch_add(1, Ordering::Relaxed);
489 }
490
491 pub fn connection_closed(&self) {
499 let result =
500 self.active_connections
501 .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |n| {
502 if n == 0 {
503 None
504 } else {
505 Some(n.saturating_sub(1))
506 }
507 });
508
509 if result == Ok(1) {
512 self.allowance_store.clear_session_allowances();
513 tracing::info!("last client disconnected, session allowances cleared");
514 }
515 }
516
    /// Returns the current number of connected clients.
    pub fn connection_count(&self) -> usize {
        self.active_connections.load(Ordering::Relaxed)
    }
521
    /// Gracefully shuts the kernel down: announces the shutdown on the event
    /// bus, unloads all capsules, flushes the KV store, and removes the
    /// session socket, token file, and readiness marker.
    pub async fn shutdown(&self, reason: Option<String>) {
        tracing::info!(reason = ?reason, "Kernel shutting down");

        let _ = self
            .event_bus
            .publish(astrid_events::AstridEvent::KernelShutdown {
                metadata: astrid_events::EventMetadata::new("kernel"),
                reason: reason.clone(),
            });

        // Drain the registry first so no new references can be handed out
        // while we unload.
        let capsules = {
            let mut reg = self.capsules.write().await;
            reg.drain()
        };
        for mut arc in capsules {
            let id = arc.id().clone();
            let mut unloaded = false;

            // `unload` needs `&mut`, i.e. sole Arc ownership; poll for
            // exclusivity (20 x 50ms ≈ 1s) while in-flight tasks drop their
            // clones.
            for retry in 0..20_u32 {
                if let Some(capsule) = Arc::get_mut(&mut arc) {
                    if let Err(e) = capsule.unload().await {
                        tracing::warn!(
                            capsule_id = %id,
                            error = %e,
                            "Failed to unload capsule during shutdown"
                        );
                    }
                    unloaded = true;
                    break;
                }
                if retry < 19 {
                    tokio::time::sleep(std::time::Duration::from_millis(50)).await;
                }
            }

            if !unloaded {
                tracing::warn!(
                    capsule_id = %id,
                    strong_count = Arc::strong_count(&arc),
                    "Dropping capsule without explicit unload after retries exhausted; \
                     MCP child processes may be orphaned"
                );
            }
            drop(arc);
        }

        if let Err(e) = self.kv.close().await {
            tracing::warn!(error = %e, "Failed to flush KV store during shutdown");
        }

        // Best-effort removal of on-disk session artifacts.
        let socket_path = crate::socket::kernel_socket_path();
        let _ = std::fs::remove_file(&socket_path);
        let _ = std::fs::remove_file(&self.token_path);
        crate::socket::remove_readiness_file();

        tracing::info!("Kernel shutdown complete");
    }
602
    /// Waits (bounded) for the named capsules to signal readiness, logging
    /// any that time out or crash. Invalid or unknown names are skipped.
    async fn await_capsule_readiness(&self, names: &[String]) {
        use astrid_capsule::capsule::ReadyStatus;

        if names.is_empty() {
            return;
        }

        let timeout = std::time::Duration::from_millis(500);
        // Grab capsule handles under a short-lived read lock, then release it
        // before awaiting so the registry isn't locked for the whole wait.
        let capsules: Vec<(String, std::sync::Arc<dyn astrid_capsule::capsule::Capsule>)> = {
            let registry = self.capsules.read().await;
            names
                .iter()
                .filter_map(
                    |name| match astrid_capsule::capsule::CapsuleId::new(name.clone()) {
                        Ok(capsule_id) => registry.get(&capsule_id).map(|c| (name.clone(), c)),
                        Err(e) => {
                            tracing::warn!(
                                capsule = %name,
                                error = %e,
                                "Invalid capsule ID, skipping readiness wait"
                            );
                            None
                        },
                    },
                )
                .collect()
        };

        // Wait on all capsules concurrently; total wall time is bounded by a
        // single timeout rather than timeout * capsule count.
        let mut set = tokio::task::JoinSet::new();
        for (name, capsule) in capsules {
            set.spawn(async move {
                let status = capsule.wait_ready(timeout).await;
                (name, status)
            });
        }
        while let Some(result) = set.join_next().await {
            if let Ok((name, status)) = result {
                match status {
                    ReadyStatus::Ready => {},
                    ReadyStatus::Timeout => {
                        tracing::warn!(
                            capsule = %name,
                            timeout_ms = timeout.as_millis(),
                            "Capsule did not signal ready within timeout"
                        );
                    },
                    ReadyStatus::Crashed => {
                        tracing::error!(
                            capsule = %name,
                            "Capsule run loop exited before signaling ready"
                        );
                    },
                }
            }
        }
    }
666
    /// Aggregates every loaded capsule's tool definitions into one JSON blob
    /// and writes it into each capsule's KV namespace under `tool_schemas`.
    ///
    /// Failures are logged per capsule; nothing is written when no capsule
    /// declares any tools.
    async fn inject_tool_schemas(&self) {
        use astrid_events::llm::LlmToolDefinition;
        use astrid_storage::KvStore;

        // Collect tools and capsule IDs in one pass under the read lock.
        let (all_tools, capsule_ids) = {
            let registry = self.capsules.read().await;
            let tools: Vec<LlmToolDefinition> = registry
                .values()
                .flat_map(|capsule| {
                    capsule.manifest().tools.iter().map(|t| LlmToolDefinition {
                        name: t.name.clone(),
                        description: Some(t.description.clone()),
                        input_schema: t.input_schema.clone(),
                    })
                })
                .collect();
            let ids: Vec<String> = registry.list().iter().map(ToString::to_string).collect();
            (tools, ids)
        };

        if all_tools.is_empty() {
            return;
        }

        // Serialize once; the same bytes are fanned out to every namespace.
        let tool_bytes = match serde_json::to_vec(&all_tools) {
            Ok(b) => b,
            Err(e) => {
                tracing::error!(error = %e, "Failed to serialize tool schemas");
                return;
            },
        };

        tracing::info!(
            tool_count = all_tools.len(),
            "Injecting tool schemas into capsule KV stores"
        );

        for capsule_id in &capsule_ids {
            let namespace = format!("capsule:{capsule_id}");
            if let Err(e) = self
                .kv
                .set(&namespace, "tool_schemas", tool_bytes.clone())
                .await
            {
                tracing::warn!(
                    capsule = %capsule_id,
                    error = %e,
                    "Failed to inject tool schemas"
                );
            }
        }
    }
724}
725
/// Builds the overlay VFS: the workspace as the lower layer and a fresh temp
/// directory as the upper layer, both registered under `root_handle`.
///
/// Returns the overlay plus the `TempDir` guard; the caller must keep the
/// guard alive, or the upper layer's backing directory is deleted.
///
/// # Errors
/// Fails if either layer cannot be registered or the temp dir cannot be
/// created.
async fn init_overlay_vfs(
    root_handle: &DirHandle,
    workspace_root: &Path,
) -> Result<(Arc<OverlayVfs>, tempfile::TempDir), std::io::Error> {
    let lower_vfs = HostVfs::new();
    lower_vfs
        .register_dir(root_handle.clone(), workspace_root.to_path_buf())
        .await
        .map_err(|_| std::io::Error::other("Failed to register lower vfs dir"))?;

    let upper_temp = tempfile::TempDir::new()
        .map_err(|e| std::io::Error::other(format!("Failed to create overlay temp dir: {e}")))?;
    let upper_vfs = HostVfs::new();
    upper_vfs
        .register_dir(root_handle.clone(), upper_temp.path().to_path_buf())
        .await
        .map_err(|_| std::io::Error::other("Failed to register upper vfs dir"))?;

    let overlay = Arc::new(OverlayVfs::new(Box::new(lower_vfs), Box::new(upper_vfs)));
    Ok((overlay, upper_temp))
}
754
/// Opens the audit log under the Astrid home, keyed by the runtime signing
/// key, then runs an integrity check over all recorded sessions.
///
/// Verification problems are logged loudly but never prevent startup.
///
/// # Errors
/// Fails if the home directory, key material, or audit database cannot be
/// opened.
fn open_audit_log() -> std::io::Result<Arc<AuditLog>> {
    use astrid_core::dirs::AstridHome;

    let home = AstridHome::resolve()
        .map_err(|e| std::io::Error::other(format!("cannot resolve Astrid home: {e}")))?;
    home.ensure()
        .map_err(|e| std::io::Error::other(format!("cannot create Astrid home dirs: {e}")))?;

    let runtime_key = load_or_generate_runtime_key(&home.keys_dir())?;
    let audit_log = AuditLog::open(home.audit_db_path(), runtime_key)
        .map_err(|e| std::io::Error::other(format!("cannot open audit log: {e}")))?;

    // Best-effort chain verification: report tampering, keep booting.
    match audit_log.verify_all() {
        Ok(results) => {
            let total_sessions = results.len();
            let mut tampered_sessions: usize = 0;

            for (session_id, result) in &results {
                if !result.valid {
                    tampered_sessions = tampered_sessions.saturating_add(1);
                    for issue in &result.issues {
                        tracing::error!(
                            session_id = %session_id,
                            issue = %issue,
                            "Audit chain integrity violation detected"
                        );
                    }
                }
            }

            if tampered_sessions > 0 {
                tracing::error!(
                    total_sessions,
                    tampered_sessions,
                    "Audit chain verification found tampered sessions"
                );
            } else if total_sessions > 0 {
                tracing::info!(
                    total_sessions,
                    "Audit chain verification passed for all sessions"
                );
            }
        },
        Err(e) => {
            tracing::error!(error = %e, "Audit chain verification failed to run");
        },
    }

    Ok(Arc::new(audit_log))
}
811
812fn load_or_generate_runtime_key(keys_dir: &Path) -> std::io::Result<KeyPair> {
816 let key_path = keys_dir.join("runtime.key");
817
818 if key_path.exists() {
819 let bytes = std::fs::read(&key_path)?;
820 KeyPair::from_secret_key(&bytes).map_err(|e| {
821 std::io::Error::other(format!(
822 "invalid runtime key at {}: {e}",
823 key_path.display()
824 ))
825 })
826 } else {
827 let keypair = KeyPair::generate();
828 std::fs::create_dir_all(keys_dir)?;
829 std::fs::write(&key_path, keypair.secret_key_bytes())?;
830
831 #[cfg(unix)]
833 {
834 use std::os::unix::fs::PermissionsExt;
835 std::fs::set_permissions(&key_path, std::fs::Permissions::from_mode(0o600))?;
836 }
837
838 tracing::info!(key_id = %keypair.key_id_hex(), "Generated new runtime signing key");
839 Ok(keypair)
840 }
841}
842
// Number of permanent event-bus subscriptions created during `Kernel::new`
// (presumably the kernel router, event dispatcher, and one other boot task —
// verify against the spawn calls there). `Kernel::new` debug-asserts this
// matches the actual subscriber count; the idle monitor subtracts it to
// estimate live client subscribers. Update when adding/removing permanent
// subscribers.
const INTERNAL_SUBSCRIBER_COUNT: usize = 3;
859
/// Spawns a task that shuts the kernel down (and exits the process) after a
/// sustained period with no clients and no daemon-like capsules.
///
/// The timeout defaults to 300s and can be overridden via the
/// `ASTRID_IDLE_TIMEOUT_SECS` environment variable.
fn spawn_idle_monitor(kernel: Arc<Kernel>) -> tokio::task::JoinHandle<()> {
    tokio::spawn(async move {
        // Grace period after boot before idle tracking begins.
        let grace = std::time::Duration::from_secs(30);
        let timeout_secs: u64 = std::env::var("ASTRID_IDLE_TIMEOUT_SECS")
            .ok()
            .and_then(|v| v.parse().ok())
            .unwrap_or(300);
        let idle_timeout = std::time::Duration::from_secs(timeout_secs);
        let check_interval = std::time::Duration::from_secs(15);

        tokio::time::sleep(grace).await;
        // Set while idle; None whenever activity is observed.
        let mut idle_since: Option<tokio::time::Instant> = None;

        loop {
            tokio::time::sleep(check_interval).await;

            let connections = kernel.connection_count();

            // Subscribers beyond the kernel's own permanent ones suggest
            // live clients attached to the bus.
            let bus_subscribers = kernel
                .event_bus
                .subscriber_count()
                .saturating_sub(INTERNAL_SUBSCRIBER_COUNT);

            // Take the smaller signal so neither a stale connection count nor
            // a stale subscription alone keeps the kernel alive.
            let effective_connections = connections.min(bus_subscribers);

            // Capsules with uplinks or cron jobs must keep the kernel running
            // even with no clients connected.
            let has_daemons = {
                let reg = kernel.capsules.read().await;
                reg.values().any(|c| {
                    let m = c.manifest();
                    !m.uplinks.is_empty() || !m.cron_jobs.is_empty()
                })
            };

            if effective_connections == 0 && !has_daemons {
                let now = tokio::time::Instant::now();
                let start = *idle_since.get_or_insert(now);
                let elapsed = now.duration_since(start);

                tracing::debug!(
                    idle_secs = elapsed.as_secs(),
                    timeout_secs,
                    connections,
                    bus_subscribers,
                    "Kernel idle, monitoring timeout"
                );

                if elapsed >= idle_timeout {
                    tracing::info!("Idle timeout reached, initiating shutdown");
                    kernel.shutdown(Some("idle_timeout".to_string())).await;
                    std::process::exit(0);
                }
            } else {
                if idle_since.is_some() {
                    tracing::debug!(
                        effective_connections,
                        has_daemons,
                        "Activity detected, resetting idle timer"
                    );
                }
                idle_since = None;
            }
        }
    })
}
929
/// Per-capsule bookkeeping for exponential-backoff restart attempts.
struct RestartTracker {
    // How many restarts have been attempted so far.
    attempts: u32,
    // When the most recent attempt (or construction) happened.
    last_attempt: std::time::Instant,
    // Current wait required before the next attempt is allowed.
    backoff: std::time::Duration,
}

impl RestartTracker {
    const MAX_ATTEMPTS: u32 = 5;
    const INITIAL_BACKOFF: std::time::Duration = std::time::Duration::from_secs(2);
    const MAX_BACKOFF: std::time::Duration = std::time::Duration::from_secs(120);

    /// Starts a fresh tracker. The first restart becomes eligible only after
    /// the initial backoff has elapsed since construction.
    fn new() -> Self {
        Self {
            attempts: 0,
            last_attempt: std::time::Instant::now(),
            backoff: Self::INITIAL_BACKOFF,
        }
    }

    /// True when attempts remain and the current backoff window has elapsed.
    fn should_restart(&self) -> bool {
        let has_budget = self.attempts < Self::MAX_ATTEMPTS;
        let waited_enough = self.last_attempt.elapsed() >= self.backoff;
        has_budget && waited_enough
    }

    /// Consumes one attempt: bump the counter, reset the clock, and double
    /// the backoff (saturating, capped at `MAX_BACKOFF`).
    fn record_attempt(&mut self) {
        self.attempts = self.attempts.saturating_add(1);
        self.last_attempt = std::time::Instant::now();
        let doubled = self.backoff.saturating_mul(2);
        self.backoff = doubled.min(Self::MAX_BACKOFF);
    }

    /// True once the attempt budget is fully used up.
    fn exhausted(&self) -> bool {
        self.attempts >= Self::MAX_ATTEMPTS
    }
}
967
/// Tries one restart of capsule `id_str`, honoring the tracker's attempt cap
/// and exponential backoff. Returns `true` only on a successful restart.
async fn attempt_capsule_restart(
    kernel: &Kernel,
    id_str: &str,
    tracker: &mut RestartTracker,
) -> bool {
    if tracker.exhausted() {
        return false;
    }

    // Still inside the backoff window: skip this tick.
    if !tracker.should_restart() {
        tracing::debug!(
            capsule_id = %id_str,
            next_attempt_in = ?tracker.backoff.saturating_sub(tracker.last_attempt.elapsed()),
            "Waiting for backoff before next restart attempt"
        );
        return false;
    }

    // Count the attempt before trying, so failures also consume budget.
    tracker.record_attempt();
    let attempt = tracker.attempts;

    tracing::warn!(
        capsule_id = %id_str,
        attempt,
        max_attempts = RestartTracker::MAX_ATTEMPTS,
        "Attempting capsule restart"
    );

    let capsule_id = astrid_capsule::capsule::CapsuleId::from_static(id_str);
    match kernel.restart_capsule(&capsule_id).await {
        Ok(()) => {
            tracing::info!(capsule_id = %id_str, attempt, "Capsule restarted successfully");
            true
        },
        Err(e) => {
            tracing::error!(capsule_id = %id_str, attempt, error = %e, "Capsule restart failed");
            if tracker.exhausted() {
                tracing::error!(
                    capsule_id = %id_str,
                    "All restart attempts exhausted - capsule will remain down"
                );
            }
            false
        },
    }
}
1017
/// Spawns a task that checks capsule health every 10 seconds, publishes
/// failure events, and drives automatic restarts with per-capsule backoff.
fn spawn_capsule_health_monitor(kernel: Arc<Kernel>) -> tokio::task::JoinHandle<()> {
    tokio::spawn(async move {
        let mut interval = tokio::time::interval(std::time::Duration::from_secs(10));
        // An interval's first tick completes immediately; consume it so the
        // first real check happens after a full period.
        interval.tick().await;
        let mut restart_trackers: std::collections::HashMap<String, RestartTracker> =
            std::collections::HashMap::new();

        loop {
            interval.tick().await;

            // Snapshot capsules currently in the Ready state, releasing the
            // registry lock before running health checks.
            let ready_capsules: Vec<std::sync::Arc<dyn astrid_capsule::capsule::Capsule>> = {
                let registry = kernel.capsules.read().await;
                registry
                    .list()
                    .into_iter()
                    .filter_map(|id| {
                        let capsule = registry.get(id)?;
                        if capsule.state() == astrid_capsule::capsule::CapsuleState::Ready {
                            Some(capsule)
                        } else {
                            None
                        }
                    })
                    .collect()
            };

            // (capsule id, failure reason) pairs for this tick.
            let mut failures: Vec<(String, String)> = Vec::new();
            for capsule in &ready_capsules {
                let health = capsule.check_health();
                if let astrid_capsule::capsule::CapsuleState::Failed(reason) = health {
                    let id_str = capsule.id().to_string();
                    tracing::error!(capsule_id = %id_str, reason = %reason, "Capsule health check failed");

                    // Broadcast the failure so interested parties can react.
                    let msg = astrid_events::ipc::IpcMessage::new(
                        "astrid.v1.health.failed",
                        astrid_events::ipc::IpcPayload::Custom {
                            data: serde_json::json!({
                                "capsule_id": &id_str,
                                "reason": &reason,
                            }),
                        },
                        uuid::Uuid::new_v4(),
                    );
                    let _ = kernel.event_bus.publish(astrid_events::AstridEvent::Ipc {
                        metadata: astrid_events::EventMetadata::new("kernel"),
                        message: msg,
                    });
                    failures.push((id_str, reason));
                }
            }

            // Drop our capsule Arcs before restarting: restart_capsule needs
            // exclusive ownership to unload the old instance.
            drop(ready_capsules);

            let failed_this_tick: std::collections::HashSet<&str> =
                failures.iter().map(|(id, _)| id.as_str()).collect();

            let mut restarted = Vec::new();
            for (id_str, _reason) in &failures {
                let tracker = restart_trackers
                    .entry(id_str.clone())
                    .or_insert_with(RestartTracker::new);

                if attempt_capsule_restart(&kernel, id_str, tracker).await {
                    restarted.push(id_str.clone());
                }
            }

            // A successful restart clears the capsule's backoff state.
            for id in &restarted {
                restart_trackers.remove(id);
            }

            // Keep trackers that are exhausted (so the attempt cap stays
            // enforced), still waiting out backoff, or failed again this
            // tick; drop the rest so a future failure starts fresh.
            restart_trackers.retain(|id, tracker| {
                if tracker.exhausted() {
                    return true;
                }
                if tracker.last_attempt.elapsed() < tracker.backoff {
                    return true;
                }
                failed_this_tick.contains(id.as_str())
            });
        }
    })
}
1122
1123fn spawn_react_watchdog(event_bus: Arc<EventBus>) -> tokio::task::JoinHandle<()> {
1129 tokio::spawn(async move {
1130 let mut interval = tokio::time::interval(std::time::Duration::from_secs(5));
1131 interval.tick().await;
1133
1134 loop {
1135 interval.tick().await;
1136
1137 let msg = astrid_events::ipc::IpcMessage::new(
1138 "astrid.v1.watchdog.tick",
1139 astrid_events::ipc::IpcPayload::Custom {
1140 data: serde_json::json!({}),
1141 },
1142 uuid::Uuid::new_v4(),
1143 );
1144 let _ = event_bus.publish(astrid_events::AstridEvent::Ipc {
1145 metadata: astrid_events::EventMetadata::new("kernel"),
1146 message: msg,
1147 });
1148 }
1149 })
1150}
1151
#[cfg(test)]
mod tests {
    use super::*;

    // Generating a key creates a 32-byte file that round-trips to the same
    // public key.
    #[test]
    fn test_load_or_generate_creates_new_key() {
        let dir = tempfile::tempdir().unwrap();
        let keys_dir = dir.path().join("keys");

        let keypair = load_or_generate_runtime_key(&keys_dir).unwrap();
        let key_path = keys_dir.join("runtime.key");

        assert!(key_path.exists());
        let bytes = std::fs::read(&key_path).unwrap();
        assert_eq!(bytes.len(), 32);

        let reloaded = KeyPair::from_secret_key(&bytes).unwrap();
        assert_eq!(
            keypair.public_key_bytes(),
            reloaded.public_key_bytes(),
            "reloaded key should match generated key"
        );
    }

    // A second call loads the persisted key instead of generating a new one.
    #[test]
    fn test_load_or_generate_is_idempotent() {
        let dir = tempfile::tempdir().unwrap();
        let keys_dir = dir.path().join("keys");

        let first = load_or_generate_runtime_key(&keys_dir).unwrap();
        let second = load_or_generate_runtime_key(&keys_dir).unwrap();

        assert_eq!(
            first.public_key_bytes(),
            second.public_key_bytes(),
            "loading the same key file should produce the same keypair"
        );
    }

    // A key file of the wrong length is rejected with a descriptive error.
    #[test]
    fn test_load_or_generate_rejects_bad_key_length() {
        let dir = tempfile::tempdir().unwrap();
        let keys_dir = dir.path().join("keys");
        std::fs::create_dir_all(&keys_dir).unwrap();

        // 16 bytes instead of the expected 32.
        std::fs::write(keys_dir.join("runtime.key"), [0u8; 16]).unwrap();

        let result = load_or_generate_runtime_key(&keys_dir);
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(
            err.contains("invalid runtime key"),
            "expected 'invalid runtime key' error, got: {err}"
        );
    }

    // Mirrors the counter logic used by connection_opened/connection_closed.
    #[test]
    fn test_connection_counter_increment_decrement() {
        let counter = AtomicUsize::new(0);

        counter.fetch_add(1, Ordering::Relaxed);
        counter.fetch_add(1, Ordering::Relaxed);
        assert_eq!(counter.load(Ordering::Relaxed), 2);

        // Decrement twice with the same guarded update used in production.
        for expected in [1, 0] {
            let _ = counter.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |n| {
                if n == 0 {
                    None
                } else {
                    Some(n.saturating_sub(1))
                }
            });
            assert_eq!(counter.load(Ordering::Relaxed), expected);
        }
    }

    // Decrementing at zero must be rejected, leaving the counter at zero.
    #[test]
    fn test_connection_counter_underflow_guard() {
        let counter = AtomicUsize::new(0);

        let result = counter.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |n| {
            if n == 0 { None } else { Some(n - 1) }
        });
        assert!(result.is_err());
        assert_eq!(counter.load(Ordering::Relaxed), 0);
    }

    // Only the final disconnect (counter 1 -> 0) clears session-only
    // allowances; persistent allowances survive.
    #[test]
    fn test_last_disconnect_clears_session_allowances() {
        use astrid_approval::AllowanceStore;
        use astrid_approval::allowance::{Allowance, AllowanceId, AllowancePattern};
        use astrid_core::types::Timestamp;
        use astrid_crypto::KeyPair;

        let store = AllowanceStore::new();
        let keypair = KeyPair::generate();

        // Session-scoped allowance: should be cleared on last disconnect.
        store
            .add_allowance(Allowance {
                id: AllowanceId::new(),
                action_pattern: AllowancePattern::ServerTools {
                    server: "session-server".to_string(),
                },
                created_at: Timestamp::now(),
                expires_at: None,
                max_uses: None,
                uses_remaining: None,
                session_only: true,
                workspace_root: None,
                signature: keypair.sign(b"test"),
            })
            .unwrap();

        // Persistent allowance: must survive.
        store
            .add_allowance(Allowance {
                id: AllowanceId::new(),
                action_pattern: AllowancePattern::ServerTools {
                    server: "persistent-server".to_string(),
                },
                created_at: Timestamp::now(),
                expires_at: None,
                max_uses: None,
                uses_remaining: None,
                session_only: false,
                workspace_root: None,
                signature: keypair.sign(b"test"),
            })
            .unwrap();

        assert_eq!(store.count(), 2);

        // Two clients connected; replicate connection_closed's behavior.
        let counter = AtomicUsize::new(2);
        let simulate_disconnect = || {
            let result = counter.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |n| {
                if n == 0 {
                    None
                } else {
                    Some(n.saturating_sub(1))
                }
            });
            if result == Ok(1) {
                store.clear_session_allowances();
            }
        };

        simulate_disconnect();
        assert_eq!(
            store.count(),
            2,
            "both allowances should survive non-final disconnect"
        );

        simulate_disconnect();
        assert_eq!(
            store.count(),
            1,
            "session allowance should be cleared on last disconnect"
        );
    }

    // Freshly generated key files must be owner-read/write only.
    #[cfg(unix)]
    #[test]
    fn test_load_or_generate_sets_secure_permissions() {
        use std::os::unix::fs::PermissionsExt;

        let dir = tempfile::tempdir().unwrap();
        let keys_dir = dir.path().join("keys");

        let _ = load_or_generate_runtime_key(&keys_dir).unwrap();

        let key_path = keys_dir.join("runtime.key");
        let mode = std::fs::metadata(&key_path).unwrap().permissions().mode();
        assert_eq!(
            mode & 0o777,
            0o600,
            "key file should have 0o600 permissions, got {mode:#o}"
        );
    }

    // A new tracker is not exhausted but must first wait out the initial
    // backoff before the first restart.
    #[test]
    fn restart_tracker_initial_state() {
        let tracker = RestartTracker::new();
        assert!(!tracker.exhausted());
        assert!(!tracker.should_restart());
    }

    #[test]
    fn restart_tracker_allows_restart_after_backoff() {
        let mut tracker = RestartTracker::new();
        // Pretend the initial backoff has just elapsed.
        tracker.last_attempt = std::time::Instant::now()
            - RestartTracker::INITIAL_BACKOFF
            - std::time::Duration::from_millis(1);
        assert!(tracker.should_restart());
    }

    // Each recorded attempt doubles the backoff.
    #[test]
    fn restart_tracker_doubles_backoff() {
        let mut tracker = RestartTracker::new();
        assert_eq!(tracker.backoff, RestartTracker::INITIAL_BACKOFF);

        tracker.record_attempt();
        assert_eq!(
            tracker.backoff,
            RestartTracker::INITIAL_BACKOFF.saturating_mul(2)
        );
        assert_eq!(tracker.attempts, 1);

        tracker.record_attempt();
        assert_eq!(
            tracker.backoff,
            RestartTracker::INITIAL_BACKOFF.saturating_mul(4)
        );
        assert_eq!(tracker.attempts, 2);
    }

    #[test]
    fn restart_tracker_backoff_caps_at_max() {
        let mut tracker = RestartTracker::new();
        for _ in 0..20 {
            tracker.record_attempt();
        }
        assert_eq!(tracker.backoff, RestartTracker::MAX_BACKOFF);
    }

    #[test]
    fn restart_tracker_exhausted_at_max_attempts() {
        let mut tracker = RestartTracker::new();
        for _ in 0..RestartTracker::MAX_ATTEMPTS {
            assert!(!tracker.exhausted());
            tracker.record_attempt();
        }
        assert!(tracker.exhausted());
    }

    // Exhaustion overrides elapsed backoff: no more restarts, ever.
    #[test]
    fn restart_tracker_should_restart_false_when_exhausted() {
        let mut tracker = RestartTracker::new();
        for _ in 0..RestartTracker::MAX_ATTEMPTS {
            tracker.record_attempt();
        }
        tracker.last_attempt = std::time::Instant::now() - RestartTracker::MAX_BACKOFF;
        assert!(!tracker.should_restart());
    }
}
1415
/// Ensures a root user exists and is linked to the `cli`/`local` identity.
///
/// Idempotent: if the link already resolves, nothing is created.
///
/// # Errors
/// Propagates any `IdentityError` from the underlying store.
async fn bootstrap_cli_root_user(
    store: &Arc<dyn astrid_storage::IdentityStore>,
) -> Result<(), astrid_storage::IdentityError> {
    if let Some(_user) = store.resolve("cli", "local").await? {
        tracing::debug!("CLI root user already linked");
        return Ok(());
    }

    let user = store.create_user(Some("root")).await?;
    tracing::info!(user_id = %user.id, "Created CLI root user");

    store.link("cli", "local", user.id, "system").await?;
    tracing::info!(user_id = %user.id, "Linked CLI root user (cli/local)");

    Ok(())
}
1446
/// Applies identity links declared in the workspace config.
///
/// A missing or unreadable config is treated as "no links". Per-link
/// failures are logged and skipped so one bad entry can't block the rest.
async fn apply_identity_config(
    store: &Arc<dyn astrid_storage::IdentityStore>,
    workspace_root: &std::path::Path,
) {
    let config = match astrid_config::Config::load(Some(workspace_root)) {
        Ok(resolved) => resolved.config,
        Err(e) => {
            tracing::debug!(error = %e, "No config loaded for identity links");
            return;
        },
    };

    for link_cfg in &config.identity.links {
        let result = apply_single_identity_link(store, link_cfg).await;
        if let Err(e) = result {
            tracing::warn!(
                platform = %link_cfg.platform,
                platform_user_id = %link_cfg.platform_user_id,
                astrid_user = %link_cfg.astrid_user,
                error = %e,
                "Failed to apply identity link from config"
            );
        }
    }
}
1477
1478async fn apply_single_identity_link(
1480 store: &Arc<dyn astrid_storage::IdentityStore>,
1481 link_cfg: &astrid_config::types::IdentityLinkConfig,
1482) -> Result<(), astrid_storage::IdentityError> {
1483 let user_id = if let Ok(uuid) = uuid::Uuid::parse_str(&link_cfg.astrid_user) {
1485 if store.get_user(uuid).await?.is_none() {
1489 return Err(astrid_storage::IdentityError::UserNotFound(uuid));
1490 }
1491 uuid
1492 } else {
1493 if let Some(user) = store.get_user_by_name(&link_cfg.astrid_user).await? {
1495 user.id
1496 } else {
1497 let user = store.create_user(Some(&link_cfg.astrid_user)).await?;
1498 tracing::info!(
1499 user_id = %user.id,
1500 name = %link_cfg.astrid_user,
1501 "Created user from config identity link"
1502 );
1503 user.id
1504 }
1505 };
1506
1507 let method = if link_cfg.method.is_empty() {
1508 "admin"
1509 } else {
1510 &link_cfg.method
1511 };
1512
1513 if let Some(existing) = store
1515 .resolve(&link_cfg.platform, &link_cfg.platform_user_id)
1516 .await?
1517 && existing.id == user_id
1518 {
1519 tracing::debug!(
1520 platform = %link_cfg.platform,
1521 platform_user_id = %link_cfg.platform_user_id,
1522 user_id = %user_id,
1523 "Identity link from config already exists"
1524 );
1525 return Ok(());
1526 }
1527
1528 store
1529 .link(
1530 &link_cfg.platform,
1531 &link_cfg.platform_user_id,
1532 user_id,
1533 method,
1534 )
1535 .await?;
1536
1537 tracing::info!(
1538 platform = %link_cfg.platform,
1539 platform_user_id = %link_cfg.platform_user_id,
1540 user_id = %user_id,
1541 "Applied identity link from config"
1542 );
1543
1544 Ok(())
1545}