// Crate-wide lint policy: `unsafe` code, undocumented public items, the default
// clippy lint group and unreachable `pub` items are all hard errors.
#![deny(unsafe_code)]
#![deny(missing_docs)]
#![deny(clippy::all)]
#![deny(unreachable_pub)]
// The `module_name_repetitions` lint is silenced for the whole crate.
#![allow(clippy::module_name_repetitions)]
/// Kernel routing tasks; this file uses `spawn_kernel_router` and `admin::spawn_admin_router`.
pub mod kernel_router;
/// Session socket helpers used by this file: binding the listener, generating the
/// session token, and locating/removing the socket and readiness files.
pub mod socket;
use arc_swap::ArcSwap;
use astrid_audit::AuditLog;
use astrid_capabilities::{CapabilityStore, DirHandle};
use astrid_capsule::profile_cache::PrincipalProfileCache;
use astrid_capsule::registry::CapsuleRegistry;
use astrid_core::SessionId;
use astrid_core::groups::GroupConfig;
use astrid_core::principal::PrincipalId;
use astrid_crypto::KeyPair;
use astrid_events::EventBus;
use astrid_mcp::{McpClient, SecureMcpClient, ServerManager, ServersConfig};
use astrid_vfs::{HostVfs, OverlayVfsRegistry, Vfs};
use dashmap::DashMap;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use tokio::sync::{Mutex, RwLock};
/// Central runtime state shared (behind an `Arc`) by the kernel's background tasks.
///
/// Built by [`Kernel::new`]; all fields are populated there.
pub struct Kernel {
/// Session this kernel instance was booted for; also stamped on kernel IPC messages.
pub session_id: SessionId,
/// Shared event bus the kernel publishes lifecycle and IPC events on.
pub event_bus: Arc<EventBus>,
/// Registry of loaded capsules, guarded by an async read/write lock.
pub capsules: Arc<RwLock<CapsuleRegistry>>,
/// MCP client wrapped with the capability store, audit log and session id.
pub mcp: SecureMcpClient,
/// Capability store backed by the kernel KV store.
pub capabilities: Arc<CapabilityStore>,
/// Virtual filesystem; `new` installs a `HostVfs` with the workspace registered.
pub vfs: Arc<dyn Vfs>,
/// Overlay VFS registry created from the workspace root and the root handle.
pub overlay_registry: Arc<OverlayVfsRegistry>,
/// Directory handle under which the workspace root is registered in `vfs`.
pub vfs_root_handle: DirHandle,
/// Workspace directory the kernel was started for.
pub workspace_root: PathBuf,
/// Root of the default principal's home directory (`new` always sets `Some`).
pub home_root: Option<PathBuf>,
/// Bound session socket listener; `None` in the test kernel. Cloned into capsule contexts.
pub cli_socket_listener: Option<Arc<tokio::sync::Mutex<tokio::net::UnixListener>>>,
/// Persistent key-value store opened at the Astrid home's state DB path.
pub kv: Arc<astrid_storage::SurrealKvStore>,
/// Audit log opened (and verified) during boot.
pub audit_log: Arc<AuditLog>,
// Per-principal count of currently open connections; see `connection_opened`/`connection_closed`.
active_connections: DashMap<PrincipalId, AtomicUsize>,
/// Ephemeral-mode flag; the idle monitor reads it once to choose its shutdown policy.
pub ephemeral: AtomicBool,
/// Instant at which this `Kernel` value was constructed.
pub boot_time: std::time::Instant,
/// Sender half of a `watch` channel initialised to `false`.
/// NOTE(review): presumably flipped to signal shutdown to subscribers — no use is visible in this file.
pub shutdown_tx: tokio::sync::watch::Sender<bool>,
/// Session token generated at boot and shared with capsule contexts.
pub session_token: Arc<astrid_core::session_token::SessionToken>,
// Location of the on-disk session token file; removed during `shutdown`.
token_path: PathBuf,
/// Store of approval allowances; session-scoped entries are cleared on last disconnect and on shutdown.
pub allowance_store: Arc<astrid_approval::AllowanceStore>,
// Identity store backed by the `system:identity` scope of the KV store.
identity_store: Arc<dyn astrid_storage::IdentityStore>,
// Principal profile cache rooted at the resolved Astrid home.
pub(crate) profile_cache: Arc<PrincipalProfileCache>,
// Group configuration loaded at boot, held in an `ArcSwap`.
pub(crate) groups: Arc<ArcSwap<GroupConfig>>,
// Astrid home directory resolved at boot.
pub(crate) astrid_home: astrid_core::dirs::AstridHome,
// Payload-less async mutex.
// NOTE(review): presumably serialises admin write operations — callers are outside this file.
pub(crate) admin_write_lock: Mutex<()>,
}
impl Kernel {
/// Boots a kernel for `session_id` rooted at `workspace_root` and starts its background tasks.
///
/// The sequence is: resolve the Astrid home, open the KV store, build the MCP client,
/// capability store and audit log, register the workspace in the VFS, bind the session
/// socket, generate the session token, set up identity/group state, then spawn the
/// router, idle monitor, watchdog, health monitor and event dispatcher.
///
/// # Errors
///
/// Returns `std::io::Error` when the Astrid home cannot be resolved, or when the KV
/// store, capability store, audit log, VFS registration, socket, session token,
/// identity KV scope, group config or CLI root-user bootstrap fails.
///
/// # Panics
///
/// Panics if the current tokio runtime is not multi-threaded.
#[expect(
clippy::too_many_lines,
reason = "boot sequence: sequential setup that does not benefit from splitting"
)]
pub async fn new(
session_id: SessionId,
workspace_root: PathBuf,
) -> Result<Arc<Self>, std::io::Error> {
use astrid_core::dirs::AstridHome;
// Fail fast on a current-thread runtime (see the assertion message for the reason).
assert!(
tokio::runtime::Handle::current().runtime_flavor()
== tokio::runtime::RuntimeFlavor::MultiThread,
"Kernel requires a multi-threaded tokio runtime (block_in_place panics on \
single-threaded). Use #[tokio::main] or Runtime::new() instead of current_thread."
);
let event_bus = Arc::new(EventBus::new());
let capsules = Arc::new(RwLock::new(CapsuleRegistry::new()));
let home = AstridHome::resolve().map_err(|e| {
std::io::Error::other(format!(
"Failed to resolve Astrid home (set $ASTRID_HOME or $HOME): {e}"
))
})?;
// Everything per-principal below is set up for the default principal.
let default_principal = astrid_core::PrincipalId::default();
let principal_home = home.principal_home(&default_principal);
let home_root = Some(principal_home.root().to_path_buf());
let kv_path = home.state_db_path();
let kv = Arc::new(
astrid_storage::SurrealKvStore::open(&kv_path)
.map_err(|e| std::io::Error::other(format!("Failed to open KV store: {e}")))?,
);
// A missing or unreadable MCP servers config falls back to the default config.
let mcp_config = ServersConfig::load_default().unwrap_or_default();
let mcp_manager = ServerManager::new(mcp_config)
.with_workspace_root(workspace_root.clone())
.with_capsule_log_dir(principal_home.log_dir());
let mcp_client = McpClient::new(mcp_manager);
let capabilities = Arc::new(
CapabilityStore::with_kv_store(Arc::clone(&kv) as Arc<dyn astrid_storage::KvStore>)
.map_err(|e| {
std::io::Error::other(format!("Failed to init capability store: {e}"))
})?,
);
let audit_log = open_audit_log()?;
let mcp = SecureMcpClient::new(
mcp_client,
Arc::clone(&capabilities),
Arc::clone(&audit_log),
session_id.clone(),
);
// Register the workspace directory in the host VFS under a fresh root handle.
let root_handle = DirHandle::new();
let kernel_host_vfs = HostVfs::new();
kernel_host_vfs
.register_dir(root_handle.clone(), workspace_root.clone())
.await
.map_err(|_| std::io::Error::other("Failed to register kernel workspace vfs"))?;
let overlay_registry = Arc::new(OverlayVfsRegistry::new(
workspace_root.clone(),
root_handle.clone(),
));
let listener = socket::bind_session_socket()?;
let (session_token, token_path) = socket::generate_session_token()?;
let allowance_store = Arc::new(astrid_approval::AllowanceStore::new());
// Identity data lives in its own `system:identity` scope of the shared KV store.
let identity_kv = astrid_storage::ScopedKvStore::new(
Arc::clone(&kv) as Arc<dyn astrid_storage::KvStore>,
"system:identity",
)
.map_err(|e| std::io::Error::other(format!("Failed to create identity KV: {e}")))?;
let identity_store: Arc<dyn astrid_storage::IdentityStore> =
Arc::new(astrid_storage::KvIdentityStore::new(identity_kv));
let groups_loaded = GroupConfig::load(&home)
.map_err(|e| std::io::Error::other(format!("Failed to load groups config: {e}")))?;
let groups = Arc::new(ArcSwap::from_pointee(groups_loaded));
bootstrap_cli_root_user(&identity_store, &home)
.await
.map_err(|e| {
std::io::Error::other(format!("Failed to bootstrap CLI root user: {e}"))
})?;
// Unlike the bootstrap above, this call returns nothing to propagate.
apply_identity_config(&identity_store, &workspace_root).await;
let kernel = Arc::new(Self {
session_id,
event_bus,
capsules,
mcp,
capabilities,
vfs: Arc::new(kernel_host_vfs) as Arc<dyn Vfs>,
overlay_registry,
vfs_root_handle: root_handle,
workspace_root,
home_root,
cli_socket_listener: Some(Arc::new(tokio::sync::Mutex::new(listener))),
kv,
audit_log,
active_connections: DashMap::new(),
ephemeral: AtomicBool::new(false),
boot_time: std::time::Instant::now(),
// Only the sender is kept; the initial receiver is dropped right here.
shutdown_tx: tokio::sync::watch::channel(false).0,
session_token: Arc::new(session_token),
token_path,
allowance_store,
identity_store,
profile_cache: Arc::new(PrincipalProfileCache::with_home(home.clone())),
groups,
astrid_home: home,
admin_write_lock: Mutex::new(()),
});
// The join handles are dropped on purpose: the tasks keep running detached.
drop(kernel_router::spawn_kernel_router(Arc::clone(&kernel)));
drop(spawn_idle_monitor(Arc::clone(&kernel)));
drop(spawn_react_watchdog(Arc::clone(&kernel.event_bus)));
drop(spawn_capsule_health_monitor(Arc::clone(&kernel)));
let dispatcher = astrid_capsule::dispatcher::EventDispatcher::new(
Arc::clone(&kernel.capsules),
Arc::clone(&kernel.event_bus),
)
.with_identity_store(Arc::clone(&kernel.identity_store));
tokio::spawn(dispatcher.run());
// Debug-build guard: the bus must have exactly the expected number of
// subscribers once all internal tasks have been set up.
debug_assert_eq!(
kernel.event_bus.subscriber_count(),
INTERNAL_SUBSCRIBER_COUNT,
"INTERNAL_SUBSCRIBER_COUNT is stale; update it when adding permanent subscribers"
);
Ok(kernel)
}
async fn load_capsule(&self, dir: PathBuf) -> Result<(), anyhow::Error> {
let manifest_path = dir.join("Capsule.toml");
let manifest = astrid_capsule::discovery::load_manifest(&manifest_path)
.map_err(|e| anyhow::anyhow!(e))?;
{
let registry = self.capsules.read().await;
let id = astrid_capsule::capsule::CapsuleId::from_static(&manifest.package.name);
if registry.get(&id).is_some() {
return Ok(());
}
}
let loader = astrid_capsule::loader::CapsuleLoader::new(self.mcp.clone());
let mut capsule = loader.create_capsule(manifest, dir.clone())?;
let principal = astrid_core::PrincipalId::default();
let kv = astrid_storage::ScopedKvStore::new(
Arc::clone(&self.kv) as Arc<dyn astrid_storage::KvStore>,
format!("{principal}:capsule:{}", capsule.id()),
)?;
let capsule_name = capsule.id().to_string();
let env_path = if let Ok(home) = astrid_core::dirs::AstridHome::resolve() {
let ph = home.principal_home(&principal);
let principal_env = ph.env_dir().join(format!("{capsule_name}.env.json"));
if principal_env.exists() {
principal_env
} else {
dir.join(".env.json")
}
} else {
dir.join(".env.json")
};
if env_path.exists()
&& let Ok(contents) = std::fs::read_to_string(&env_path)
&& let Ok(env_map) =
serde_json::from_str::<std::collections::HashMap<String, String>>(&contents)
{
for (k, v) in env_map {
let _ = kv.set(&k, v.into_bytes()).await;
}
}
let ctx = astrid_capsule::context::CapsuleContext::new(
principal.clone(),
self.workspace_root.clone(),
self.home_root.clone(),
kv,
Arc::clone(&self.event_bus),
self.cli_socket_listener.clone(),
)
.with_registry(Arc::clone(&self.capsules))
.with_session_token(Arc::clone(&self.session_token))
.with_allowance_store(Arc::clone(&self.allowance_store))
.with_identity_store(Arc::clone(&self.identity_store))
.with_profile_cache(Arc::clone(&self.profile_cache))
.with_overlay_registry(Arc::clone(&self.overlay_registry));
capsule.load(&ctx).await?;
let mut registry = self.capsules.write().await;
registry
.register(capsule)
.map_err(|e| anyhow::anyhow!("Failed to register capsule: {e}"))?;
Ok(())
}
/// Restarts the registered capsule `id` from its original source directory.
///
/// The capsule is unregistered, unloaded when no other `Arc` to it is alive,
/// loaded again through `load_capsule`, and finally offered the optional
/// `handle_lifecycle_restart` interceptor.
///
/// # Errors
///
/// Fails when the capsule is unknown, has no source directory, cannot be
/// unregistered, or cannot be loaded again.
async fn restart_capsule(
    &self,
    id: &astrid_capsule::capsule::CapsuleId,
) -> Result<(), anyhow::Error> {
    // Resolve the source directory under a read lock that ends with this block.
    let source_dir = {
        let registry = self.capsules.read().await;
        let Some(existing) = registry.get(id) else {
            return Err(anyhow::anyhow!("capsule '{id}' not found in registry"));
        };
        match existing.source_dir() {
            Some(path) => path.to_path_buf(),
            None => return Err(anyhow::anyhow!("capsule '{id}' has no source directory")),
        }
    };

    let mut retired = {
        let mut registry = self.capsules.write().await;
        registry
            .unregister(id)
            .map_err(|e| anyhow::anyhow!("failed to unregister capsule '{id}': {e}"))?
    };

    // `unload` needs exclusive access, which only exists when ours is the last `Arc`.
    match std::sync::Arc::get_mut(&mut retired) {
        Some(exclusive) => {
            if let Err(e) = exclusive.unload().await {
                tracing::warn!(
                    capsule_id = %id,
                    error = %e,
                    "Capsule unload failed during restart"
                );
            }
        },
        None => {
            tracing::warn!(
                capsule_id = %id,
                "Cannot call unload during restart - Arc still held by in-flight task"
            );
        },
    }
    // Release the old instance before its replacement is loaded.
    drop(retired);

    self.load_capsule(source_dir).await?;

    let reloaded = {
        let registry = self.capsules.read().await;
        registry.get(id)
    };
    if let Some(fresh) = reloaded
        && let Err(e) = fresh.invoke_interceptor("handle_lifecycle_restart", &[], None)
    {
        tracing::debug!(
            capsule_id = %id,
            error = %e,
            "Capsule does not handle lifecycle restart (optional)"
        );
    }
    Ok(())
}
/// Discovers every capsule in the default principal's capsules directory and loads them.
///
/// Manifests are ordered by `toposort_manifests` (discovery order is used when
/// that reports a dependency cycle). Uplink capsules are loaded and awaited
/// first, then all remaining capsules. Individual load failures are logged and
/// skipped. An `astrid.v1.capsules_loaded` IPC event is published at the end.
pub async fn load_all_capsules(&self) {
    use astrid_capsule::toposort::toposort_manifests;
    use astrid_core::dirs::AstridHome;

    // Zero or one search path, depending on whether the Astrid home resolves.
    let paths: Vec<_> = AstridHome::resolve()
        .ok()
        .map(|home| {
            let principal = astrid_core::PrincipalId::default();
            home.principal_home(&principal).capsules_dir()
        })
        .into_iter()
        .collect();

    let discovered = astrid_capsule::discovery::discover_manifests(Some(&paths));
    let sorted = toposort_manifests(discovered).unwrap_or_else(|(e, original)| {
        tracing::error!(
            cycle = %e,
            "Dependency cycle in capsules, falling back to discovery order"
        );
        original
    });

    for (manifest, _) in &sorted {
        if !(manifest.capabilities.uplink && manifest.has_imports()) {
            continue;
        }
        tracing::warn!(
            capsule = %manifest.package.name,
            "Uplink capsule has [imports] - \
            this should have been rejected at manifest load time"
        );
    }
    validate_imports_exports(&sorted);

    let (uplinks, others): (Vec<_>, Vec<_>) = sorted
        .into_iter()
        .partition(|(manifest, _)| manifest.capabilities.uplink);

    // Phase 1: uplink capsules.
    let mut uplink_names: Vec<String> = Vec::new();
    for (manifest, _) in &uplinks {
        uplink_names.push(manifest.package.name.clone());
    }
    for (manifest, dir) in &uplinks {
        let outcome = self.load_capsule(dir.clone()).await;
        if let Err(e) = outcome {
            tracing::warn!(
                capsule = %manifest.package.name,
                error = %e,
                "Failed to load uplink capsule during discovery"
            );
        }
    }
    self.await_capsule_readiness(&uplink_names).await;

    // Phase 2: everything else.
    for (manifest, dir) in &others {
        let outcome = self.load_capsule(dir.clone()).await;
        if let Err(e) = outcome {
            tracing::warn!(
                capsule = %manifest.package.name,
                error = %e,
                "Failed to load capsule during discovery"
            );
        }
    }
    let mut other_names: Vec<String> = Vec::new();
    for (manifest, _) in &others {
        other_names.push(manifest.package.name.clone());
    }
    self.await_capsule_readiness(&other_names).await;

    // Announce completion; the publish result is ignored.
    let payload = astrid_events::ipc::IpcPayload::RawJson(serde_json::json!({"status": "ready"}));
    let message =
        astrid_events::ipc::IpcMessage::new("astrid.v1.capsules_loaded", payload, self.session_id.0);
    let event = astrid_events::AstridEvent::Ipc {
        metadata: astrid_events::EventMetadata::new("kernel"),
        message,
    };
    let _ = self.event_bus.publish(event);
}
/// Records one more open connection for `principal`, creating its counter on first use.
pub fn connection_opened(&self, principal: &PrincipalId) {
    let counter = self
        .active_connections
        .entry(principal.clone())
        .or_insert_with(|| AtomicUsize::new(0));
    counter.fetch_add(1, Ordering::Relaxed);
}
/// Records that one connection of `principal` has closed.
///
/// The counter never goes below zero. When the call takes the counter from 1 to 0
/// (the principal's last connection), the principal's session allowances and
/// capability session are cleared and the counter entry is removed.
pub fn connection_closed(&self, principal: &PrincipalId) {
// NOTE(review): `or_insert_with` creates a zero counter for a principal that was
// never opened; that entry is not removed below because `result` is then `Err(0)`.
let entry = self
.active_connections
.entry(principal.clone())
.or_insert_with(|| AtomicUsize::new(0));
// `fetch_update` yields `Ok(previous)` on success and `Err(current)` when the
// closure returns `None`, i.e. when the counter is already zero.
let result = entry.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |n| {
if n == 0 {
None
} else {
Some(n.saturating_sub(1))
}
});
// `Ok(1)` means this call closed the last connection. The cleanup runs while
// the map entry guard is still held.
if result == Ok(1) {
self.allowance_store.clear_session_allowances(principal);
if let Err(e) = self.capabilities.clear_session_for(principal) {
tracing::warn!(%principal, error = %e, "failed to clear capability session");
}
tracing::info!(
%principal,
"last connection for principal disconnected, session state cleared"
);
}
// Release the entry guard before touching the map again.
// NOTE(review): DashMap documents that mutating the map while holding a
// reference into it can deadlock — confirm against the dashmap version in use.
drop(entry);
if result == Ok(1) {
// Re-check the count: a connection may have been opened since the guard was dropped.
self.active_connections
.remove_if(principal, |_, count| count.load(Ordering::Relaxed) == 0);
}
}
/// Stores `val` into the kernel's `ephemeral` flag.
pub fn set_ephemeral(&self, val: bool) {
    let flag = &self.ephemeral;
    flag.store(val, Ordering::Relaxed);
}
/// Returns the number of open connections summed over all principals.
pub fn total_connection_count(&self) -> usize {
    let per_principal = self
        .active_connections
        .iter()
        .map(|entry| entry.value().load(Ordering::Relaxed));
    per_principal.sum()
}
/// Lists every principal that currently has at least one open connection,
/// together with its connection count. Zero counters are left out.
pub fn connections_by_principal(&self) -> Vec<(PrincipalId, usize)> {
    let mut live = Vec::new();
    for entry in self.active_connections.iter() {
        let open = entry.value().load(Ordering::Relaxed);
        if open != 0 {
            live.push((entry.key().clone(), open));
        }
    }
    live
}
/// Shuts the kernel down: announces the shutdown, clears session state, unloads
/// all capsules, closes the KV store and removes the socket, token and readiness files.
///
/// Every step is best-effort; failures are logged (or ignored for file removal)
/// and never abort the sequence.
pub async fn shutdown(&self, reason: Option<String>) {
tracing::info!(reason = ?reason, "Kernel shutting down");
// Announce first; the publish result is deliberately ignored.
let _ = self
.event_bus
.publish(astrid_events::AstridEvent::KernelShutdown {
metadata: astrid_events::EventMetadata::new("kernel"),
reason: reason.clone(),
});
self.allowance_store.clear_all_session_allowances();
if let Err(e) = self.capabilities.clear_session() {
tracing::warn!(error = %e, "failed to clear capability session on shutdown");
}
// Empty the registry under the write lock, then unload without holding it.
let capsules = {
let mut reg = self.capsules.write().await;
reg.drain()
};
for mut arc in capsules {
let id = arc.id().clone();
let mut unloaded = false;
// `unload` needs exclusive access. Try up to 20 times, 50 ms apart, for
// other holders of the `Arc` to let go (no sleep after the final attempt).
for retry in 0..20_u32 {
if let Some(capsule) = Arc::get_mut(&mut arc) {
if let Err(e) = capsule.unload().await {
tracing::warn!(
capsule_id = %id,
error = %e,
"Failed to unload capsule during shutdown"
);
}
// A failed unload still counts as handled; it is not retried.
unloaded = true;
break;
}
if retry < 19 {
tokio::time::sleep(std::time::Duration::from_millis(50)).await;
}
}
if !unloaded {
tracing::warn!(
capsule_id = %id,
strong_count = Arc::strong_count(&arc),
"Dropping capsule without explicit unload after retries exhausted; \
MCP child processes may be orphaned"
);
}
drop(arc);
}
if let Err(e) = self.kv.close().await {
tracing::warn!(error = %e, "Failed to flush KV store during shutdown");
}
// Remove on-disk runtime artefacts; removal errors are ignored.
let socket_path = crate::socket::kernel_socket_path();
let _ = std::fs::remove_file(&socket_path);
let _ = std::fs::remove_file(&self.token_path);
crate::socket::remove_readiness_file();
tracing::info!("Kernel shutdown complete");
}
/// Waits, concurrently and for at most 500 ms each, until the named capsules report ready.
///
/// Names that are not valid capsule ids are logged and skipped; names that are
/// not registered are skipped silently. A timeout or crash is only logged, and
/// the function always returns normally.
async fn await_capsule_readiness(&self, names: &[String]) {
    use astrid_capsule::capsule::ReadyStatus;

    if names.is_empty() {
        return;
    }
    let timeout = std::time::Duration::from_millis(500);

    // Snapshot the capsules under the read lock so that no lock is held while waiting.
    let pending: Vec<(String, std::sync::Arc<dyn astrid_capsule::capsule::Capsule>)> = {
        let registry = self.capsules.read().await;
        let mut found = Vec::new();
        for name in names {
            match astrid_capsule::capsule::CapsuleId::new(name.clone()) {
                Ok(capsule_id) => {
                    if let Some(capsule) = registry.get(&capsule_id) {
                        found.push((name.clone(), capsule));
                    }
                },
                Err(e) => {
                    tracing::warn!(
                        capsule = %name,
                        error = %e,
                        "Invalid capsule ID, skipping readiness wait"
                    );
                },
            }
        }
        found
    };

    let mut waiters = tokio::task::JoinSet::new();
    for (name, capsule) in pending {
        waiters.spawn(async move {
            let status = capsule.wait_ready(timeout).await;
            (name, status)
        });
    }

    while let Some(joined) = waiters.join_next().await {
        // A waiter task that failed to join is skipped.
        let Ok((name, status)) = joined else {
            continue;
        };
        match status {
            ReadyStatus::Ready => {},
            ReadyStatus::Timeout => {
                tracing::warn!(
                    capsule = %name,
                    timeout_ms = timeout.as_millis(),
                    "Capsule did not signal ready within timeout"
                );
            },
            ReadyStatus::Crashed => {
                tracing::error!(
                    capsule = %name,
                    "Capsule run loop exited before signaling ready"
                );
            },
        }
    }
}
}
/// Test-only constructor: builds a `Kernel` rooted entirely in `home`.
///
/// Compared with `Kernel::new`, this binds no socket (`cli_socket_listener` is
/// `None`), uses a default MCP server config, uses `home.root()` as the workspace,
/// and spawns only the admin router — none of the other background tasks.
///
/// # Panics
///
/// Panics (via `expect`) if any setup step fails.
#[cfg(test)]
pub(crate) async fn test_kernel_with_home(home: astrid_core::dirs::AstridHome) -> Arc<Kernel> {
use astrid_capsule::profile_cache::PrincipalProfileCache;
home.ensure()
.expect("test kernel: ensure astrid home dir tree");
let session_id = SessionId::SYSTEM;
let event_bus = Arc::new(EventBus::new());
let capsules = Arc::new(RwLock::new(CapsuleRegistry::new()));
let kv = Arc::new(
astrid_storage::SurrealKvStore::open(&home.state_db_path()).expect("test kernel: open kv"),
);
let capabilities = Arc::new(
CapabilityStore::with_kv_store(Arc::clone(&kv) as Arc<dyn astrid_storage::KvStore>)
.expect("test kernel: capability store"),
);
let runtime_key =
load_or_generate_runtime_key(&home.keys_dir()).expect("test kernel: runtime key");
let default_principal = astrid_core::PrincipalId::default();
let principal_home = home.principal_home(&default_principal);
principal_home
.ensure()
.expect("test kernel: ensure principal home");
// Opened directly rather than via `open_audit_log`, which resolves the home from the environment.
let audit_log = Arc::new(
AuditLog::open(principal_home.audit_dir(), runtime_key)
.expect("test kernel: open audit log"),
);
let mcp_manager = ServerManager::new(ServersConfig::default());
let mcp_client = McpClient::new(mcp_manager);
let mcp = SecureMcpClient::new(
mcp_client,
Arc::clone(&capabilities),
Arc::clone(&audit_log),
session_id.clone(),
);
// The scratch home root doubles as the workspace root.
let root_handle = DirHandle::new();
let kernel_host_vfs = HostVfs::new();
kernel_host_vfs
.register_dir(root_handle.clone(), home.root().to_path_buf())
.await
.expect("test kernel: register workspace vfs");
let overlay_registry = Arc::new(OverlayVfsRegistry::new(
home.root().to_path_buf(),
root_handle.clone(),
));
let allowance_store = Arc::new(astrid_approval::AllowanceStore::new());
let identity_kv = astrid_storage::ScopedKvStore::new(
Arc::clone(&kv) as Arc<dyn astrid_storage::KvStore>,
"system:identity",
)
.expect("test kernel: identity kv scope");
let identity_store: Arc<dyn astrid_storage::IdentityStore> =
Arc::new(astrid_storage::KvIdentityStore::new(identity_kv));
let groups = Arc::new(ArcSwap::from_pointee(
GroupConfig::load(&home).expect("test kernel: load groups"),
));
let kernel = Arc::new(Kernel {
session_id,
event_bus,
capsules,
mcp,
capabilities,
vfs: Arc::new(kernel_host_vfs) as Arc<dyn Vfs>,
overlay_registry,
vfs_root_handle: root_handle,
workspace_root: home.root().to_path_buf(),
home_root: Some(principal_home.root().to_path_buf()),
// No socket is bound for tests.
cli_socket_listener: None,
kv,
audit_log,
active_connections: DashMap::new(),
ephemeral: AtomicBool::new(false),
boot_time: std::time::Instant::now(),
shutdown_tx: tokio::sync::watch::channel(false).0,
// The token is generated in memory; `token_path` is only recorded here.
session_token: Arc::new(astrid_core::session_token::SessionToken::generate()),
token_path: home.token_path(),
allowance_store,
identity_store,
profile_cache: Arc::new(PrincipalProfileCache::with_home(home.clone())),
groups,
astrid_home: home,
admin_write_lock: Mutex::new(()),
});
// Only the admin router is spawned; its join handle is dropped so it runs detached.
drop(kernel_router::admin::spawn_admin_router(Arc::clone(
&kernel,
)));
kernel
}
/// Opens the default principal's audit log and verifies its chain integrity.
///
/// Verification findings are only logged. A tampered chain, or a verification
/// run that itself fails, does not prevent the log from being returned.
///
/// # Errors
///
/// Fails when the Astrid home cannot be resolved or created, the runtime key
/// cannot be loaded or generated, or the audit log cannot be opened.
fn open_audit_log() -> std::io::Result<Arc<AuditLog>> {
    use astrid_core::dirs::AstridHome;

    let home = match AstridHome::resolve() {
        Ok(home) => home,
        Err(e) => {
            return Err(std::io::Error::other(format!(
                "cannot resolve Astrid home: {e}"
            )));
        },
    };
    if let Err(e) = home.ensure() {
        return Err(std::io::Error::other(format!(
            "cannot create Astrid home dirs: {e}"
        )));
    }
    let runtime_key = load_or_generate_runtime_key(&home.keys_dir())?;

    let default_principal = astrid_core::PrincipalId::default();
    let principal_home = home.principal_home(&default_principal);
    if let Err(e) = principal_home.ensure() {
        return Err(std::io::Error::other(format!(
            "cannot create principal home dirs: {e}"
        )));
    }
    let audit_log = AuditLog::open(principal_home.audit_dir(), runtime_key)
        .map_err(|e| std::io::Error::other(format!("cannot open audit log: {e}")))?;

    match audit_log.verify_all() {
        Err(e) => {
            tracing::error!(error = %e, "Audit chain verification failed to run");
        },
        Ok(results) => {
            let total_sessions = results.len();
            let mut tampered_sessions: usize = 0;
            for (session_id, result) in &results {
                if result.valid {
                    continue;
                }
                tampered_sessions = tampered_sessions.saturating_add(1);
                for issue in &result.issues {
                    tracing::error!(
                        session_id = %session_id,
                        issue = %issue,
                        "Audit chain integrity violation detected"
                    );
                }
            }
            // Summary line: nothing is logged when there were no sessions at all.
            match (tampered_sessions, total_sessions) {
                (0, 0) => {},
                (0, _) => {
                    tracing::info!(
                        total_sessions,
                        "Audit chain verification passed for all sessions"
                    );
                },
                _ => {
                    tracing::error!(
                        total_sessions,
                        tampered_sessions,
                        "Audit chain verification found tampered sessions"
                    );
                },
            }
        },
    }
    Ok(Arc::new(audit_log))
}
/// Loads the runtime signing key from `keys_dir/runtime.key`, generating and
/// persisting a new one when the file does not exist.
///
/// On Unix the key file is created with mode `0o600` from the start, so the
/// secret is never readable by other users, not even briefly. The mode is then
/// set again explicitly so the result does not depend on the process umask or on
/// a file that appeared after the existence check.
///
/// # Errors
///
/// Returns an error when the existing file cannot be read or does not contain a
/// valid secret key, or when the directory or key file cannot be created or written.
fn load_or_generate_runtime_key(keys_dir: &Path) -> std::io::Result<KeyPair> {
    /// Writes `contents` to `path`, creating the file owner-only on Unix.
    fn write_secret_file(path: &Path, contents: impl AsRef<[u8]>) -> std::io::Result<()> {
        use std::io::Write as _;

        let mut options = std::fs::OpenOptions::new();
        options.write(true).create(true).truncate(true);
        #[cfg(unix)]
        {
            use std::os::unix::fs::OpenOptionsExt as _;
            // Applied at creation time, closing the window between write and chmod.
            options.mode(0o600);
        }
        let mut file = options.open(path)?;
        file.write_all(contents.as_ref())
    }

    let key_path = keys_dir.join("runtime.key");
    if key_path.exists() {
        let bytes = std::fs::read(&key_path)?;
        return KeyPair::from_secret_key(&bytes).map_err(|e| {
            std::io::Error::other(format!(
                "invalid runtime key at {}: {e}",
                key_path.display()
            ))
        });
    }

    let keypair = KeyPair::generate();
    std::fs::create_dir_all(keys_dir)?;
    write_secret_file(&key_path, keypair.secret_key_bytes())?;
    #[cfg(unix)]
    {
        use std::os::unix::fs::PermissionsExt;
        // Normalise the mode in case the file pre-existed or the umask is unusual.
        std::fs::set_permissions(&key_path, std::fs::Permissions::from_mode(0o600))?;
    }
    tracing::info!(key_id = %keypair.key_id_hex(), "Generated new runtime signing key");
    Ok(keypair)
}
// Number of event-bus subscribers expected right after `Kernel::new` has spawned
// its internal tasks; checked there with a `debug_assert_eq!`.
const INTERNAL_SUBSCRIBER_COUNT: usize = 4;
// Delay before the idle monitor reads the `ephemeral` flag and picks its policy.
const IDLE_INITIAL_GRACE: std::time::Duration = std::time::Duration::from_secs(5);
// Extra delay applied only in non-ephemeral mode before idle checks begin.
const IDLE_NON_EPHEMERAL_GRACE: std::time::Duration = std::time::Duration::from_secs(25);
// Polling period of the idle monitor in ephemeral mode.
const IDLE_EPHEMERAL_CHECK_INTERVAL: std::time::Duration = std::time::Duration::from_secs(1);
// Polling period of the idle monitor in non-ephemeral mode.
const IDLE_CHECK_INTERVAL: std::time::Duration = std::time::Duration::from_secs(15);
/// Spawns the task that shuts the process down after the kernel has been idle too long.
///
/// "Idle" means no open connections and no loaded capsule whose manifest declares
/// uplinks. The timeout comes from `ASTRID_IDLE_TIMEOUT_SECS`. In ephemeral mode
/// it defaults to 30 s; in non-ephemeral mode the monitor exits immediately when
/// the variable is unset or unparsable. On timeout the task runs
/// `Kernel::shutdown` and then exits the process with status 0.
fn spawn_idle_monitor(kernel: Arc<Kernel>) -> tokio::task::JoinHandle<()> {
    tokio::spawn(async move {
        tokio::time::sleep(IDLE_INITIAL_GRACE).await;

        // The mode is sampled once, after the initial grace period.
        let ephemeral = kernel.ephemeral.load(Ordering::Relaxed);
        let configured_secs: Option<u64> = std::env::var("ASTRID_IDLE_TIMEOUT_SECS")
            .ok()
            .and_then(|raw| raw.parse().ok());

        let (idle_timeout, check_interval) = match (ephemeral, configured_secs) {
            (true, secs) => (
                std::time::Duration::from_secs(secs.unwrap_or(30)),
                IDLE_EPHEMERAL_CHECK_INTERVAL,
            ),
            (false, Some(secs)) => (std::time::Duration::from_secs(secs), IDLE_CHECK_INTERVAL),
            (false, None) => {
                tracing::debug!(
                    "Non-ephemeral daemon: idle shutdown disabled \
                    (set ASTRID_IDLE_TIMEOUT_SECS to enable)."
                );
                return;
            },
        };

        if !ephemeral {
            tokio::time::sleep(IDLE_NON_EPHEMERAL_GRACE).await;
        }

        let mut idle_since: Option<tokio::time::Instant> = None;
        loop {
            tokio::time::sleep(check_interval).await;

            let connections = kernel.total_connection_count();
            let effective_connections = connections;
            let has_daemons = {
                let registry = kernel.capsules.read().await;
                registry
                    .values()
                    .any(|capsule| !capsule.manifest().uplinks.is_empty())
            };

            if effective_connections != 0 || has_daemons {
                // Busy: forget any idle period that was in progress.
                if idle_since.take().is_some() {
                    tracing::debug!(
                        effective_connections,
                        has_daemons,
                        "Activity detected, resetting idle timer"
                    );
                }
                continue;
            }

            let now = tokio::time::Instant::now();
            let start = *idle_since.get_or_insert(now);
            let elapsed = now.duration_since(start);
            tracing::debug!(
                idle_secs = elapsed.as_secs(),
                timeout_secs = idle_timeout.as_secs(),
                connections,
                "Kernel idle, monitoring timeout"
            );
            if elapsed >= idle_timeout {
                tracing::info!("Idle timeout reached, initiating shutdown");
                kernel.shutdown(Some("idle_timeout".to_string())).await;
                std::process::exit(0);
            }
        }
    })
}
/// Restart bookkeeping for one capsule: attempts used so far, when the last
/// attempt happened, and how long to wait before the next one.
struct RestartTracker {
    /// Restart attempts recorded so far.
    attempts: u32,
    /// Time of the most recent attempt (creation time until the first attempt).
    last_attempt: std::time::Instant,
    /// Minimum wait since `last_attempt` before another attempt is allowed.
    backoff: std::time::Duration,
}

impl RestartTracker {
    /// Attempts allowed before the tracker counts as exhausted.
    const MAX_ATTEMPTS: u32 = 5;
    /// Wait before the first attempt.
    const INITIAL_BACKOFF: std::time::Duration = std::time::Duration::from_secs(2);
    /// Upper bound for the doubling backoff.
    const MAX_BACKOFF: std::time::Duration = std::time::Duration::from_secs(120);

    /// Creates a tracker with no attempts; its backoff clock starts now.
    fn new() -> Self {
        let created = std::time::Instant::now();
        Self {
            attempts: 0,
            last_attempt: created,
            backoff: Self::INITIAL_BACKOFF,
        }
    }

    /// Whether a restart may be attempted now: attempts remain and the backoff has elapsed.
    fn should_restart(&self) -> bool {
        if self.exhausted() {
            return false;
        }
        let waited = self.last_attempt.elapsed();
        waited >= self.backoff
    }

    /// Records an attempt made now and doubles the backoff, capped at `MAX_BACKOFF`.
    fn record_attempt(&mut self) {
        self.attempts = self.attempts.saturating_add(1);
        self.last_attempt = std::time::Instant::now();
        let doubled = self.backoff.saturating_mul(2);
        self.backoff = std::cmp::min(doubled, Self::MAX_BACKOFF);
    }

    /// Whether all allowed attempts have been used.
    fn exhausted(&self) -> bool {
        Self::MAX_ATTEMPTS <= self.attempts
    }
}
/// Tries to restart capsule `id_str` if `tracker` allows it.
///
/// Returns `true` only when a restart was attempted and succeeded. Returns
/// `false` when attempts are exhausted, the backoff has not elapsed yet, or the
/// restart failed. An attempt is recorded on the tracker before the restart runs.
async fn attempt_capsule_restart(
    kernel: &Kernel,
    id_str: &str,
    tracker: &mut RestartTracker,
) -> bool {
    if tracker.exhausted() {
        return false;
    }
    if !tracker.should_restart() {
        tracing::debug!(
            capsule_id = %id_str,
            next_attempt_in = ?tracker.backoff.saturating_sub(tracker.last_attempt.elapsed()),
            "Waiting for backoff before next restart attempt"
        );
        return false;
    }

    tracker.record_attempt();
    let attempt = tracker.attempts;
    tracing::warn!(
        capsule_id = %id_str,
        attempt,
        max_attempts = RestartTracker::MAX_ATTEMPTS,
        "Attempting capsule restart"
    );

    let capsule_id = astrid_capsule::capsule::CapsuleId::from_static(id_str);
    let Err(e) = kernel.restart_capsule(&capsule_id).await else {
        tracing::info!(capsule_id = %id_str, attempt, "Capsule restarted successfully");
        return true;
    };

    tracing::error!(capsule_id = %id_str, attempt, error = %e, "Capsule restart failed");
    if tracker.exhausted() {
        tracing::error!(
            capsule_id = %id_str,
            "All restart attempts exhausted - capsule will remain down"
        );
    }
    false
}
/// Spawns the task that health-checks `Ready` capsules every 10 seconds,
/// publishes an `astrid.v1.health.failed` event for each failure, and tries to
/// restart failed capsules subject to each capsule's `RestartTracker`.
fn spawn_capsule_health_monitor(kernel: Arc<Kernel>) -> tokio::task::JoinHandle<()> {
tokio::spawn(async move {
let mut interval = tokio::time::interval(std::time::Duration::from_secs(10));
// Consume one tick before entering the loop.
// NOTE(review): tokio documents that an interval's first tick completes
// immediately, so this makes the first check happen one period after start.
interval.tick().await;
// Restart bookkeeping keyed by capsule id string; kept across ticks.
let mut restart_trackers: std::collections::HashMap<String, RestartTracker> =
std::collections::HashMap::new();
loop {
interval.tick().await;
// Snapshot the `Ready` capsules so the registry lock is not held during checks.
let ready_capsules: Vec<std::sync::Arc<dyn astrid_capsule::capsule::Capsule>> = {
let registry = kernel.capsules.read().await;
registry
.list()
.into_iter()
.filter_map(|id| {
let capsule = registry.get(id)?;
if capsule.state() == astrid_capsule::capsule::CapsuleState::Ready {
Some(capsule)
} else {
None
}
})
.collect()
};
// (capsule id, failure reason) for every capsule that failed this tick.
let mut failures: Vec<(String, String)> = Vec::new();
for capsule in &ready_capsules {
let health = capsule.check_health();
if let astrid_capsule::capsule::CapsuleState::Failed(reason) = health {
let id_str = capsule.id().to_string();
tracing::error!(capsule_id = %id_str, reason = %reason, "Capsule health check failed");
let msg = astrid_events::ipc::IpcMessage::new(
"astrid.v1.health.failed",
astrid_events::ipc::IpcPayload::Custom {
data: serde_json::json!({
"capsule_id": &id_str,
"reason": &reason,
}),
},
uuid::Uuid::new_v4(),
);
let _ = kernel.event_bus.publish(astrid_events::AstridEvent::Ipc {
metadata: astrid_events::EventMetadata::new("kernel"),
message: msg,
});
failures.push((id_str, reason));
}
}
// Release the snapshot's `Arc`s before restarting: `restart_capsule` only
// calls `unload` when it holds the sole reference to the old capsule.
drop(ready_capsules);
let failed_this_tick: std::collections::HashSet<&str> =
failures.iter().map(|(id, _)| id.as_str()).collect();
let mut restarted = Vec::new();
for (id_str, _reason) in &failures {
let tracker = restart_trackers
.entry(id_str.clone())
.or_insert_with(RestartTracker::new);
if attempt_capsule_restart(&kernel, id_str, tracker).await {
restarted.push(id_str.clone());
}
}
// A successful restart resets the capsule's restart history.
for id in &restarted {
restart_trackers.remove(id);
}
// Prune trackers. Exhausted trackers are kept, so such a capsule never gets a
// fresh tracker (and thus no further attempts) from this loop. Trackers still
// inside their backoff window are kept. Any other tracker survives only if
// its capsule failed during this tick.
restart_trackers.retain(|id, tracker| {
if tracker.exhausted() {
return true;
}
if tracker.last_attempt.elapsed() < tracker.backoff {
return true;
}
failed_this_tick.contains(id.as_str())
});
}
})
}
/// Spawns the task that publishes an `astrid.v1.watchdog.tick` IPC event on
/// `event_bus` every 5 seconds. The payload is an empty JSON object and publish
/// results are ignored.
fn spawn_react_watchdog(event_bus: Arc<EventBus>) -> tokio::task::JoinHandle<()> {
    let ticker_task = async move {
        let mut ticker = tokio::time::interval(std::time::Duration::from_secs(5));
        // One tick is consumed before the loop starts.
        ticker.tick().await;
        loop {
            ticker.tick().await;
            let payload = astrid_events::ipc::IpcPayload::Custom {
                data: serde_json::json!({}),
            };
            let message = astrid_events::ipc::IpcMessage::new(
                "astrid.v1.watchdog.tick",
                payload,
                uuid::Uuid::new_v4(),
            );
            let event = astrid_events::AstridEvent::Ipc {
                metadata: astrid_events::EventMetadata::new("kernel"),
                message,
            };
            let _ = event_bus.publish(event);
        }
    };
    tokio::spawn(ticker_task)
}
#[cfg(test)]
mod tests {
use super::*;
/// A missing key file is generated, is 32 bytes long, and reloads to the same public key.
#[test]
fn test_load_or_generate_creates_new_key() {
    let tmp = tempfile::tempdir().unwrap();
    let keys_dir = tmp.path().join("keys");

    let generated = load_or_generate_runtime_key(&keys_dir).unwrap();

    let key_file = keys_dir.join("runtime.key");
    assert!(key_file.exists());
    let raw = std::fs::read(&key_file).unwrap();
    assert_eq!(raw.len(), 32);

    let from_disk = KeyPair::from_secret_key(&raw).unwrap();
    assert_eq!(
        generated.public_key_bytes(),
        from_disk.public_key_bytes(),
        "reloaded key should match generated key"
    );
}
/// Calling the loader twice on the same directory yields the same key pair.
#[test]
fn test_load_or_generate_is_idempotent() {
    let tmp = tempfile::tempdir().unwrap();
    let keys_dir = tmp.path().join("keys");

    let generated = load_or_generate_runtime_key(&keys_dir).unwrap();
    let reloaded = load_or_generate_runtime_key(&keys_dir).unwrap();

    assert_eq!(
        generated.public_key_bytes(),
        reloaded.public_key_bytes(),
        "loading the same key file should produce the same keypair"
    );
}
/// A 16-byte key file is rejected with an "invalid runtime key" error.
#[test]
fn test_load_or_generate_rejects_bad_key_length() {
    let tmp = tempfile::tempdir().unwrap();
    let keys_dir = tmp.path().join("keys");
    std::fs::create_dir_all(&keys_dir).unwrap();
    let truncated_key = [0u8; 16];
    std::fs::write(keys_dir.join("runtime.key"), truncated_key).unwrap();

    let outcome = load_or_generate_runtime_key(&keys_dir);

    assert!(outcome.is_err());
    let err = outcome.unwrap_err().to_string();
    assert!(
        err.contains("invalid runtime key"),
        "expected 'invalid runtime key' error, got: {err}"
    );
}
/// Two increments followed by two guarded decrements bring the counter back to zero.
#[test]
fn test_connection_counter_increment_decrement() {
    // Same guarded decrement the kernel uses: refuse to go below zero.
    let guarded_decrement = |n: usize| {
        if n == 0 {
            None
        } else {
            Some(n.saturating_sub(1))
        }
    };

    let counter = AtomicUsize::new(0);
    for _ in 0..2 {
        counter.fetch_add(1, Ordering::Relaxed);
    }
    assert_eq!(counter.load(Ordering::Relaxed), 2);

    let _ = counter.fetch_update(Ordering::Relaxed, Ordering::Relaxed, guarded_decrement);
    assert_eq!(counter.load(Ordering::Relaxed), 1);

    let _ = counter.fetch_update(Ordering::Relaxed, Ordering::Relaxed, guarded_decrement);
    assert_eq!(counter.load(Ordering::Relaxed), 0);
}
/// Decrementing a zero counter is refused and leaves the counter at zero.
#[test]
fn test_connection_counter_underflow_guard() {
    let counter = AtomicUsize::new(0);

    let outcome = counter.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current| {
        current.checked_sub(1)
    });

    assert!(outcome.is_err());
    assert_eq!(counter.load(Ordering::Relaxed), 0);
}
/// When alice's last connection closes, only alice's session-only allowance is
/// removed; her persistent allowance and bob's session allowance remain.
#[test]
fn test_last_disconnect_clears_session_allowances_scoped() {
    use astrid_approval::AllowanceStore;
    use astrid_approval::allowance::{Allowance, AllowanceId, AllowancePattern};
    use astrid_core::principal::PrincipalId;
    use astrid_core::types::Timestamp;
    use astrid_crypto::KeyPair;

    let store = AllowanceStore::new();
    let keypair = KeyPair::generate();
    let alice = PrincipalId::new("alice").unwrap();
    let bob = PrincipalId::new("bob").unwrap();

    // All three allowances differ only in owner, server name and session scope.
    let make_allowance = |owner: &PrincipalId, server: &str, session_only: bool| Allowance {
        id: AllowanceId::new(),
        principal: owner.clone(),
        action_pattern: AllowancePattern::ServerTools {
            server: server.to_string(),
        },
        created_at: Timestamp::now(),
        expires_at: None,
        max_uses: None,
        uses_remaining: None,
        session_only,
        workspace_root: None,
        signature: keypair.sign(b"test"),
    };

    store
        .add_allowance(make_allowance(&alice, "alice-session", true))
        .unwrap();
    store
        .add_allowance(make_allowance(&alice, "alice-persistent", false))
        .unwrap();
    store
        .add_allowance(make_allowance(&bob, "bob-session", true))
        .unwrap();
    assert_eq!(store.count(), 3);

    // Simulate alice's last disconnect: the counter goes 1 -> 0.
    let alice_counter = AtomicUsize::new(1);
    let previous = alice_counter.fetch_update(Ordering::Relaxed, Ordering::Relaxed, |n| {
        if n == 0 {
            None
        } else {
            Some(n.saturating_sub(1))
        }
    });
    if previous == Ok(1) {
        store.clear_session_allowances(&alice);
    }

    assert_eq!(store.count(), 2);
    assert_eq!(store.count_for(&alice), 1);
    assert_eq!(store.count_for(&bob), 1);
}
/// On Unix a freshly generated key file has mode `0o600`.
#[cfg(unix)]
#[test]
fn test_load_or_generate_sets_secure_permissions() {
    use std::os::unix::fs::PermissionsExt;

    let tmp = tempfile::tempdir().unwrap();
    let keys_dir = tmp.path().join("keys");
    let _ = load_or_generate_runtime_key(&keys_dir).unwrap();

    let metadata = std::fs::metadata(keys_dir.join("runtime.key")).unwrap();
    let mode = metadata.permissions().mode();
    assert_eq!(
        mode & 0o777,
        0o600,
        "key file should have 0o600 permissions, got {mode:#o}"
    );
}
/// A new tracker is neither exhausted nor immediately eligible for a restart.
#[test]
fn restart_tracker_initial_state() {
    let fresh = RestartTracker::new();
    let (exhausted, eligible) = (fresh.exhausted(), fresh.should_restart());
    assert!(!exhausted);
    assert!(!eligible);
}
#[test]
fn restart_tracker_allows_restart_after_backoff() {
    let mut tracker = RestartTracker::new();
    // Pretend the last attempt happened just over one initial backoff ago.
    let just_past_backoff =
        RestartTracker::INITIAL_BACKOFF + std::time::Duration::from_millis(1);
    tracker.last_attempt = std::time::Instant::now() - just_past_backoff;
    assert!(tracker.should_restart());
}
#[test]
fn restart_tracker_doubles_backoff() {
    let mut tracker = RestartTracker::new();
    assert_eq!(tracker.backoff, RestartTracker::INITIAL_BACKOFF);

    // (attempts recorded so far, expected multiple of the initial backoff)
    for (expected_attempts, factor) in [(1, 2), (2, 4)] {
        tracker.record_attempt();
        assert_eq!(
            tracker.backoff,
            RestartTracker::INITIAL_BACKOFF.saturating_mul(factor)
        );
        assert_eq!(tracker.attempts, expected_attempts);
    }
}
#[test]
fn restart_tracker_backoff_caps_at_max() {
    let mut tracker = RestartTracker::new();
    // Twenty recorded attempts are expected to push the backoff to its ceiling.
    (0..20).for_each(|_| {
        tracker.record_attempt();
    });
    assert_eq!(tracker.backoff, RestartTracker::MAX_BACKOFF);
}
#[test]
fn restart_tracker_exhausted_at_max_attempts() {
    let mut tracker = RestartTracker::new();
    // Before each of the first MAX_ATTEMPTS recordings the tracker must still
    // report that it is not exhausted.
    (0..RestartTracker::MAX_ATTEMPTS).for_each(|_| {
        assert!(!tracker.exhausted());
        tracker.record_attempt();
    });
    // Once MAX_ATTEMPTS have been recorded it must flip to exhausted.
    assert!(tracker.exhausted());
}
#[test]
fn restart_tracker_should_restart_false_when_exhausted() {
    let mut tracker = RestartTracker::new();
    (0..RestartTracker::MAX_ATTEMPTS).for_each(|_| {
        tracker.record_attempt();
    });

    // Backdate the last attempt by a full MAX_BACKOFF so elapsed time alone
    // cannot be the reason a restart is refused.
    let long_ago = std::time::Instant::now() - RestartTracker::MAX_BACKOFF;
    tracker.last_attempt = long_ago;
    assert!(!tracker.should_restart());
}
// Builds an `AstridHome` rooted at a fresh temporary directory.
//
// The `TempDir` is returned alongside the home so the caller can keep it
// alive for as long as the home is in use.
fn scratch_home() -> (tempfile::TempDir, astrid_core::dirs::AstridHome) {
    let tmp = tempfile::tempdir().unwrap();
    let astrid_home = astrid_core::dirs::AstridHome::from_path(tmp.path());
    (tmp, astrid_home)
}
#[test]
fn seed_admin_writes_fresh_profile_when_missing() {
    use astrid_core::PrincipalProfile;

    let (_guard, home) = scratch_home();
    let default_principal = astrid_core::PrincipalId::default();
    let profile_path = PrincipalProfile::path_for(&home, &default_principal);
    // Precondition: nothing has been written for the default principal yet.
    assert!(!profile_path.exists());

    seed_default_principal_admin_profile(&home).unwrap();

    // The seeded profile carries only the `admin` group.
    let seeded = PrincipalProfile::load_from_path(&profile_path).unwrap();
    assert_eq!(seeded.groups, vec![String::from("admin")]);
    assert!(seeded.grants.is_empty());
    assert!(seeded.revokes.is_empty());
}
#[test]
fn seed_admin_is_idempotent_across_reboots() {
    use astrid_core::PrincipalProfile;

    let (_guard, home) = scratch_home();
    // Seeding three times in a row stands in for three consecutive boots.
    for _ in 0..3 {
        seed_default_principal_admin_profile(&home).unwrap();
    }

    let default_principal = astrid_core::PrincipalId::default();
    let profile_path = PrincipalProfile::path_for(&home, &default_principal);
    let seeded = PrincipalProfile::load_from_path(&profile_path).unwrap();
    // Exactly one `admin` entry: repeated seeding must not append duplicates.
    assert_eq!(seeded.groups, vec![String::from("admin")]);
}
#[test]
fn seed_admin_leaves_operator_configured_groups_intact() {
    use astrid_core::PrincipalProfile;

    let (_guard, home) = scratch_home();
    let default_principal = astrid_core::PrincipalId::default();
    let profile_path = PrincipalProfile::path_for(&home, &default_principal);

    // Pre-populate a profile whose only customisation is a non-admin group.
    let mut preexisting = PrincipalProfile::default();
    preexisting.groups = vec![String::from("agent")];
    std::fs::create_dir_all(home.profiles_dir()).unwrap();
    preexisting.save_to_path(&profile_path).unwrap();

    seed_default_principal_admin_profile(&home).unwrap();

    // Seeding must not add `admin` on top of the operator's choice.
    let reloaded = PrincipalProfile::load_from_path(&profile_path).unwrap();
    assert_eq!(reloaded.groups, vec![String::from("agent")]);
}
#[test]
fn seed_admin_leaves_operator_configured_grants_intact() {
    use astrid_core::PrincipalProfile;

    let (_guard, home) = scratch_home();
    let default_principal = astrid_core::PrincipalId::default();
    let profile_path = PrincipalProfile::path_for(&home, &default_principal);

    // Pre-populate a profile that has a grant but no groups at all.
    let mut preexisting = PrincipalProfile::default();
    preexisting.grants = vec![String::from("system:status")];
    std::fs::create_dir_all(home.profiles_dir()).unwrap();
    preexisting.save_to_path(&profile_path).unwrap();

    seed_default_principal_admin_profile(&home).unwrap();

    // A grant alone is enough to stop the seeder from adding `admin`.
    let reloaded = PrincipalProfile::load_from_path(&profile_path).unwrap();
    assert!(reloaded.groups.is_empty());
    assert_eq!(reloaded.grants, vec![String::from("system:status")]);
}
#[test]
fn seed_admin_leaves_operator_configured_revokes_intact() {
    use astrid_core::PrincipalProfile;

    let (_guard, home) = scratch_home();
    let default_principal = astrid_core::PrincipalId::default();
    let profile_path = PrincipalProfile::path_for(&home, &default_principal);

    // Pre-populate a profile that has a revoke but no groups at all.
    let mut preexisting = PrincipalProfile::default();
    preexisting.revokes = vec![String::from("system:shutdown")];
    std::fs::create_dir_all(home.profiles_dir()).unwrap();
    preexisting.save_to_path(&profile_path).unwrap();

    seed_default_principal_admin_profile(&home).unwrap();

    // A revoke alone is enough to stop the seeder from adding `admin`.
    let reloaded = PrincipalProfile::load_from_path(&profile_path).unwrap();
    assert!(reloaded.groups.is_empty());
    assert_eq!(reloaded.revokes, vec![String::from("system:shutdown")]);
}
#[test]
fn migrate_legacy_profile_relocates_to_etc() {
    use astrid_core::PrincipalProfile;

    let (_guard, home) = scratch_home();
    let default_principal = astrid_core::PrincipalId::default();

    // Plant a profile at the legacy location inside the principal's home.
    let legacy_path = home
        .principal_home(&default_principal)
        .config_dir()
        .join("profile.toml");
    std::fs::create_dir_all(legacy_path.parent().unwrap()).unwrap();
    let mut legacy_profile = PrincipalProfile::default();
    legacy_profile.groups = vec![String::from("operator-configured")];
    legacy_profile.save_to_path(&legacy_path).unwrap();

    seed_default_principal_admin_profile(&home).unwrap();

    // The legacy file is gone and its contents now live at the new path.
    assert!(!legacy_path.exists());
    let relocated_path = PrincipalProfile::path_for(&home, &default_principal);
    let relocated = PrincipalProfile::load_from_path(&relocated_path).unwrap();
    assert_eq!(
        relocated.groups,
        vec![String::from("operator-configured")]
    );
}
#[test]
fn migrate_legacy_profile_drops_stale_legacy_when_new_already_exists() {
    use astrid_core::PrincipalProfile;

    let (_guard, home) = scratch_home();
    let default_principal = astrid_core::PrincipalId::default();

    // A leftover profile at the legacy location...
    let legacy_path = home
        .principal_home(&default_principal)
        .config_dir()
        .join("profile.toml");
    std::fs::create_dir_all(legacy_path.parent().unwrap()).unwrap();
    let mut stale_profile = PrincipalProfile::default();
    stale_profile.groups = vec![String::from("stale")];
    stale_profile.save_to_path(&legacy_path).unwrap();

    // ...next to a profile that already exists at the new location.
    let canonical_path = PrincipalProfile::path_for(&home, &default_principal);
    std::fs::create_dir_all(canonical_path.parent().unwrap()).unwrap();
    let mut canonical_profile = PrincipalProfile::default();
    canonical_profile.groups = vec![String::from("canonical")];
    canonical_profile.save_to_path(&canonical_path).unwrap();

    seed_default_principal_admin_profile(&home).unwrap();

    // The legacy copy is removed and the new-location profile is not overwritten.
    assert!(!legacy_path.exists());
    let surviving = PrincipalProfile::load_from_path(&canonical_path).unwrap();
    assert_eq!(surviving.groups, vec![String::from("canonical")]);
}
}
/// Cross-checks every manifest's imports against the exports of all supplied
/// manifests and reports the outcome through `tracing`.
///
/// The function only logs; it returns nothing and never fails:
/// * a warning for each `(namespace, name)` interface exported by more than
///   one capsule,
/// * an info line for each optional import with no matching export,
/// * an error line for each required import with no matching export,
/// * a final summary with the capsule count, satisfied imports and the number
///   of unsatisfied imports (optional and required are counted together).
///
/// An import counts as satisfied when at least one exported version of the
/// same interface matches the import's version requirement.
fn validate_imports_exports(
    manifests: &[(
        astrid_capsule::manifest::CapsuleManifest,
        std::path::PathBuf,
    )],
) {
    // (namespace, interface name) -> every (capsule name, exported version) offering it.
    let mut providers_by_interface: std::collections::HashMap<
        (&str, &str),
        Vec<(&str, &semver::Version)>,
    > = std::collections::HashMap::new();
    for (manifest, _) in manifests {
        for (ns, name, ver) in manifest.export_triples() {
            let offered = providers_by_interface.entry((ns, name)).or_default();
            offered.push((&manifest.package.name, ver));
        }
    }

    // Flag interfaces that more than one capsule claims to provide.
    let contested = providers_by_interface
        .iter()
        .filter(|(_, offered)| offered.len() > 1);
    for ((ns, name), offered) in contested {
        let names: Vec<&str> = offered.iter().map(|&(capsule, _)| capsule).collect();
        tracing::warn!(
            interface = %format!("{ns}/{name}"),
            providers = ?names,
            "Multiple capsules export the same interface — events may be double-processed. \
             Consider removing one with `astrid capsule remove`."
        );
    }

    let mut satisfied_total: u32 = 0;
    let mut unsatisfied_total: u32 = 0;
    for (manifest, _) in manifests {
        for (ns, name, req, optional) in manifest.import_tuples() {
            let satisfied = providers_by_interface
                .get(&(ns, name))
                .into_iter()
                .flatten()
                .any(|(_, version)| req.matches(version));

            match (satisfied, optional) {
                (true, _) => {
                    satisfied_total = satisfied_total.saturating_add(1);
                },
                (false, true) => {
                    tracing::info!(
                        capsule = %manifest.package.name,
                        import = %format!("{ns}/{name} {req}"),
                        "Optional import not satisfied — capsule will boot with reduced functionality"
                    );
                    unsatisfied_total = unsatisfied_total.saturating_add(1);
                },
                (false, false) => {
                    tracing::error!(
                        capsule = %manifest.package.name,
                        import = %format!("{ns}/{name} {req}"),
                        "Required import not satisfied — no loaded capsule exports this interface"
                    );
                    unsatisfied_total = unsatisfied_total.saturating_add(1);
                },
            }
        }
    }

    tracing::info!(
        capsules = manifests.len(),
        imports_satisfied = satisfied_total,
        warnings = unsatisfied_total,
        "Boot validation complete"
    );
}
async fn bootstrap_cli_root_user(
store: &Arc<dyn astrid_storage::IdentityStore>,
home: &astrid_core::dirs::AstridHome,
) -> Result<(), astrid_storage::IdentityError> {
if let Err(e) = seed_default_principal_admin_profile(home) {
tracing::warn!(error = %e, "Failed to seed default admin profile — continuing boot");
}
if let Some(_user) = store.resolve("cli", "local").await? {
tracing::debug!("CLI root user already linked");
return Ok(());
}
let user = store.create_user(Some("root")).await?;
tracing::info!(user_id = %user.id, "Created CLI root user");
store.link("cli", "local", user.id, "system").await?;
tracing::info!(user_id = %user.id, "Linked CLI root user (cli/local)");
Ok(())
}
fn migrate_legacy_profile_path(
home: &astrid_core::dirs::AstridHome,
principal: &astrid_core::PrincipalId,
) -> Result<(), std::io::Error> {
let legacy_path = home
.principal_home(principal)
.config_dir()
.join("profile.toml");
let new_path = home.profile_path(principal);
if !legacy_path.exists() {
return Ok(());
}
if new_path.exists() {
let _ = std::fs::remove_file(&legacy_path);
return Ok(());
}
if let Some(parent) = new_path.parent() {
std::fs::create_dir_all(parent)?;
}
std::fs::rename(&legacy_path, &new_path)?;
tracing::warn!(
%principal,
legacy = %legacy_path.display(),
new = %new_path.display(),
"Migrated profile.toml out of principal home directory \
(security: capsules with home:// fs_read could read the legacy file)"
);
Ok(())
}
fn seed_default_principal_admin_profile(
home: &astrid_core::dirs::AstridHome,
) -> Result<(), astrid_core::ProfileError> {
use astrid_core::PrincipalProfile;
let default_principal = astrid_core::PrincipalId::default();
if let Err(e) = migrate_legacy_profile_path(home, &default_principal) {
tracing::warn!(error = %e, "Failed to migrate legacy profile path — continuing");
}
let path = PrincipalProfile::path_for(home, &default_principal);
let profile = PrincipalProfile::load_from_path(&path)?;
if !profile.groups.is_empty() || !profile.grants.is_empty() || !profile.revokes.is_empty() {
tracing::debug!(
principal = %default_principal,
"Default principal profile already has group/grant/revoke entries — leaving intact"
);
return Ok(());
}
let mut updated = profile;
updated
.groups
.push(astrid_core::groups::BUILTIN_ADMIN.to_string());
updated.save_to_path(&path)?;
tracing::info!(
principal = %default_principal,
"Seeded default principal with built-in `admin` group"
);
Ok(())
}
async fn apply_identity_config(
store: &Arc<dyn astrid_storage::IdentityStore>,
workspace_root: &std::path::Path,
) {
let config = match astrid_config::Config::load(Some(workspace_root)) {
Ok(resolved) => resolved.config,
Err(e) => {
tracing::debug!(error = %e, "No config loaded for identity links");
return;
},
};
for link_cfg in &config.identity.links {
let result = apply_single_identity_link(store, link_cfg).await;
if let Err(e) = result {
tracing::warn!(
platform = %link_cfg.platform,
platform_user_id = %link_cfg.platform_user_id,
astrid_user = %link_cfg.astrid_user,
error = %e,
"Failed to apply identity link from config"
);
}
}
}
async fn apply_single_identity_link(
store: &Arc<dyn astrid_storage::IdentityStore>,
link_cfg: &astrid_config::types::IdentityLinkConfig,
) -> Result<(), astrid_storage::IdentityError> {
let user_id = if let Ok(uuid) = uuid::Uuid::parse_str(&link_cfg.astrid_user) {
if store.get_user(uuid).await?.is_none() {
return Err(astrid_storage::IdentityError::UserNotFound(uuid));
}
uuid
} else {
if let Some(user) = store.get_user_by_name(&link_cfg.astrid_user).await? {
user.id
} else {
let user = store.create_user(Some(&link_cfg.astrid_user)).await?;
tracing::info!(
user_id = %user.id,
name = %link_cfg.astrid_user,
"Created user from config identity link"
);
user.id
}
};
let method = if link_cfg.method.is_empty() {
"admin"
} else {
&link_cfg.method
};
if let Some(existing) = store
.resolve(&link_cfg.platform, &link_cfg.platform_user_id)
.await?
&& existing.id == user_id
{
tracing::debug!(
platform = %link_cfg.platform,
platform_user_id = %link_cfg.platform_user_id,
user_id = %user_id,
"Identity link from config already exists"
);
return Ok(());
}
store
.link(
&link_cfg.platform,
&link_cfg.platform_user_id,
user_id,
method,
)
.await?;
tracing::info!(
platform = %link_cfg.platform,
platform_user_id = %link_cfg.platform_user_id,
user_id = %user_id,
"Applied identity link from config"
);
Ok(())
}