use std::path::Path;
use std::sync::Arc;
use std::time::Duration;
use tokio::task::JoinHandle;
use tokio_util::sync::CancellationToken;
use koi_common::integration::{
AliasFeedback, CertmeshSnapshot, DnsProbe, MdnsSnapshot, ProxySnapshot,
};
#[derive(Clone, Default)]
pub struct Cores {
pub mdns: Option<Arc<koi_mdns::MdnsCore>>,
pub certmesh: Option<Arc<koi_certmesh::CertmeshCore>>,
pub dns: Option<Arc<koi_dns::DnsRuntime>>,
pub health: Option<Arc<koi_health::HealthRuntime>>,
pub proxy: Option<Arc<koi_proxy::ProxyRuntime>>,
pub udp: Option<Arc<koi_udp::UdpRuntime>>,
pub runtime: Option<Arc<koi_runtime::RuntimeCore>>,
}
pub struct CoreSpec {
pub no_mdns: bool,
pub no_certmesh: bool,
pub no_dns: bool,
pub no_health: bool,
pub no_proxy: bool,
pub no_udp: bool,
pub no_runtime: bool,
pub data_dir: std::path::PathBuf,
pub dns_config: koi_dns::DnsConfig,
pub runtime: String,
pub http_port: u16,
}
pub fn init_certmesh_core(data_dir: Option<&Path>) -> Option<Arc<koi_certmesh::CertmeshCore>> {
let paths = koi_certmesh::CertmeshPaths::with_data_dir(
koi_common::paths::koi_data_dir_with_override(data_dir),
);
if !paths.is_ca_initialized() {
tracing::info!("Certmesh: CA not initialized - routes mounted for /create");
return Some(Arc::new(
koi_certmesh::CertmeshCore::uninitialized_with_paths(paths),
));
}
let roster_path = paths.roster_path();
let roster = match koi_certmesh::roster::load_roster(&roster_path) {
Ok(r) => r,
Err(e) => {
tracing::warn!(error = %e, "Failed to load certmesh roster - using uninitialized state");
return Some(Arc::new(
koi_certmesh::CertmeshCore::uninitialized_with_paths(paths),
));
}
};
let profile = roster.metadata.trust_profile;
if let Ok(Some(pp)) = koi_certmesh::CertmeshCore::read_auto_unlock_key(&paths) {
match koi_certmesh::ca::load_ca(&pp, &paths) {
Ok(ca_state) => {
if let Ok(fresh_roster) = koi_certmesh::roster::load_roster(&roster_path) {
let auth_path = paths.auth_path();
let auth = if auth_path.exists() {
std::fs::read_to_string(&auth_path)
.ok()
.and_then(|json| {
serde_json::from_str::<koi_crypto::auth::StoredAuth>(&json).ok()
})
.and_then(|stored| stored.unlock(&pp).ok())
} else {
None
};
tracing::info!("Certmesh CA auto-unlocked at init from vault");
return Some(Arc::new(koi_certmesh::CertmeshCore::new_with_paths(
ca_state,
fresh_roster,
auth,
profile,
paths,
)));
}
}
Err(e) => {
tracing::warn!(
error = %e,
"Auto-unlock key exists in vault but CA decryption failed"
);
}
}
}
tracing::info!("Certmesh: CA initialized (locked, use `koi certmesh unlock` to decrypt)");
let core = koi_certmesh::CertmeshCore::locked_with_paths(roster, profile, paths);
Some(Arc::new(core))
}
pub async fn build_cores(
spec: &CoreSpec,
cancel: &CancellationToken,
tasks: &mut Vec<JoinHandle<()>>,
) -> Cores {
let mdns_core = if !spec.no_mdns {
match koi_mdns::MdnsCore::with_cancel(cancel.clone()) {
Ok(core) => Some(Arc::new(core)),
Err(e) => {
tracing::error!(error = %e, "Failed to initialize mDNS core");
None
}
}
} else {
tracing::info!("mDNS capability: disabled");
None
};
let certmesh_core = if !spec.no_certmesh {
init_certmesh_core(Some(&spec.data_dir))
} else {
tracing::info!("Certmesh capability: disabled");
None
};
let mdns_bridge: Option<Arc<dyn MdnsSnapshot>> = if let Some(ref core) = mdns_core {
Some(crate::bridges::MdnsBridge::spawn(core.clone()).await)
} else {
None
};
let certmesh_bridge: Option<Arc<dyn CertmeshSnapshot>> = certmesh_core
.as_ref()
.map(|core| crate::bridges::CertmeshBridge::new(core.clone()) as Arc<dyn CertmeshSnapshot>);
let alias_feedback: Option<Arc<dyn AliasFeedback>> = certmesh_core.as_ref().map(|core| {
crate::bridges::AliasFeedbackBridge::new(core.clone()) as Arc<dyn AliasFeedback>
});
let dns_runtime = if !spec.no_dns {
let core = koi_dns::DnsCore::new(
spec.dns_config.clone(),
mdns_bridge.clone(),
certmesh_bridge.clone(),
alias_feedback,
)
.await;
match core {
Ok(core) => {
let runtime = Arc::new(koi_dns::DnsRuntime::new(core));
if let Err(e) = runtime.start().await {
tracing::error!(error = %e, "Failed to start DNS server");
}
Some(runtime)
}
Err(e) => {
tracing::error!(error = %e, "Failed to initialize DNS core");
None
}
}
} else {
tracing::info!("DNS capability: disabled");
None
};
let proxy_runtime = if !spec.no_proxy {
match koi_proxy::ProxyCore::new() {
Ok(core) => {
let runtime = Arc::new(koi_proxy::ProxyRuntime::new(Arc::new(core)));
if let Err(e) = runtime.start_all().await {
tracing::error!(error = %e, "Failed to start proxy listeners");
}
Some(runtime)
}
Err(e) => {
tracing::error!(error = %e, "Failed to initialize proxy core");
None
}
}
} else {
tracing::info!("Proxy capability: disabled");
None
};
let dns_bridge: Option<Arc<dyn DnsProbe>> = dns_runtime
.as_ref()
.map(|rt| crate::bridges::DnsBridge::new(rt.clone()) as Arc<dyn DnsProbe>);
let proxy_bridge: Option<Arc<dyn ProxySnapshot>> = proxy_runtime
.as_ref()
.map(|rt| crate::bridges::ProxyBridge::new(rt.core()) as Arc<dyn ProxySnapshot>);
let health_runtime = if !spec.no_health {
let core = Arc::new(
koi_health::HealthCore::new(
mdns_bridge.clone(),
dns_bridge,
certmesh_bridge,
proxy_bridge,
)
.await,
);
let runtime = Arc::new(koi_health::HealthRuntime::new(core));
if let Err(e) = runtime.start().await {
tracing::error!(error = %e, "Failed to start health checks");
}
Some(runtime)
} else {
tracing::info!("Health capability: disabled");
None
};
let udp_runtime = if !spec.no_udp {
Some(Arc::new(koi_udp::UdpRuntime::new(cancel.clone())))
} else {
tracing::info!("UDP capability: disabled");
None
};
let runtime_core = if !spec.no_runtime {
let backend_kind = koi_runtime::RuntimeBackendKind::from_str_loose(&spec.runtime)
.unwrap_or_else(|| {
tracing::warn!(
value = %spec.runtime,
"Unknown runtime backend, falling back to auto"
);
koi_runtime::RuntimeBackendKind::Auto
});
let rt_config = koi_runtime::RuntimeConfig {
backend_kind,
socket_path: None,
};
let core = Arc::new(koi_runtime::RuntimeCore::new(rt_config));
match core.start_watching(cancel.clone()).await {
Ok(()) => Some(core),
Err(e) => {
tracing::warn!(error = %e, "Runtime adapter unavailable, continuing without it");
None
}
}
} else {
tracing::info!("Runtime capability: disabled");
None
};
if let Some(ref rt) = runtime_core {
tasks.push(crate::orchestrator::spawn_orchestrator(
rt,
crate::orchestrator::OrchestrationTargets {
mdns: mdns_core.clone(),
dns: dns_runtime.clone(),
health: health_runtime.clone(),
proxy: proxy_runtime.clone(),
},
cancel.clone(),
));
}
let cores = Cores {
mdns: mdns_core,
certmesh: certmesh_core,
dns: dns_runtime,
health: health_runtime,
proxy: proxy_runtime,
udp: udp_runtime,
runtime: runtime_core,
};
if let Some(ref certmesh) = cores.certmesh {
crate::certmesh::spawn_certmesh_background_tasks(
certmesh,
cores.mdns.clone(),
spec.http_port,
cancel,
tasks,
);
}
tracing::debug!("Domain cores built");
cores
}
pub async fn ordered_shutdown(
cancel: &CancellationToken,
tasks: Vec<JoinHandle<()>>,
cores: &Cores,
http_announce_id: Option<String>,
timeout: Duration,
drain: Duration,
) {
let shutdown = async {
cancel.cancel();
tokio::time::sleep(drain).await;
for task in tasks {
let _ = task.await;
}
if let Some(ref id) = http_announce_id {
if let Some(ref core) = cores.mdns {
if let Err(e) = core.unregister(id) {
tracing::warn!(error = %e, "Failed to withdraw HTTP mDNS announcement");
}
}
}
if let Some(ref core) = cores.mdns {
if let Err(e) = core.shutdown().await {
tracing::warn!(error = %e, "Error during mDNS shutdown");
}
}
if let Some(ref dns) = cores.dns {
dns.stop().await;
}
if let Some(ref health) = cores.health {
let _ = health.stop().await;
}
if let Some(ref proxy) = cores.proxy {
let _ = proxy.stop_all().await;
}
if let Some(ref udp) = cores.udp {
udp.shutdown().await;
}
};
if tokio::time::timeout(timeout, shutdown).await.is_err() {
tracing::warn!("Shutdown timed out after {:?} - forcing exit", timeout);
}
}