mod adapters;
mod admin;
pub(crate) mod cli;
mod client;
mod commands;
mod format;
mod integrations;
mod orchestrator;
mod platform;
mod surface;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
use clap::CommandFactory;
use clap::Parser;
use tokio_util::sync::CancellationToken;
use cli::{
CertmeshSubcommand, Cli, Command, Config, DnsSubcommand, HealthSubcommand, MdnsSubcommand,
ProxySubcommand, UdpSubcommand,
};
use commands::status::try_daemon_status;
use koi_common::types::ServiceRecord;
/// Upper bound on the whole shutdown sequence; `daemon_mode` wraps its
/// shutdown future in a `tokio::time::timeout` of this length.
pub(crate) const SHUTDOWN_TIMEOUT: Duration = Duration::from_secs(20);
/// Pause that `daemon_mode` inserts between cancelling the shared token and
/// joining the spawned tasks.
pub(crate) const SHUTDOWN_DRAIN: Duration = Duration::from_millis(500);
/// Process entry point.
///
/// Order of operations: give the Windows service dispatcher a chance to take
/// over, answer `?`-style help queries straight from the raw arguments, parse
/// the CLI and initialise logging, handle the commands that are dispatched
/// before the Tokio runtime is created (install, uninstall, version, launch,
/// factory-reset), and finally execute everything else inside a Tokio runtime
/// via `run`.
///
/// # Errors
/// Propagates failures from logging initialisation, the platform
/// install/uninstall routines, factory reset, runtime creation and `run`.
fn main() -> anyhow::Result<()> {
// On Windows the service layer gets the first look; when it reports `true`
// there is nothing left for the CLI path to do.
#[cfg(windows)]
{
if platform::windows::try_run_as_service() {
return Ok(());
}
}
// Help shorthand is resolved on the raw arguments, before clap parses them.
{
let raw_args: Vec<String> = std::env::args().skip(1).collect();
if let Some(cmd_name) = extract_help_query(&raw_args) {
if let Some(def) = surface::MANIFEST.get(&cmd_name) {
// A rendering failure is reported on stderr but still exits with Ok.
if let Err(e) = surface::print_command_detail(def) {
eprintln!("Error: {e}");
}
} else {
eprintln!("Unknown command: {cmd_name}");
eprintln!("Run koi to see available commands.");
std::process::exit(1);
}
return Ok(());
}
}
let cli = Cli::parse();
let config = Config::from_cli(&cli);
// The verbosity count overrides the configured log level: one step selects
// "debug", two or more select "trace".
let level = match cli.verbose {
0 => cli.log_level.as_str(),
1 => "debug",
_ => "trace",
};
// A filter string that fails to parse falls back to "info".
let env_filter = tracing_subscriber::EnvFilter::try_new(level)
.unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info"));
// The returned guards are bound for the rest of `main` so they are only
// dropped when the process is about to exit.
let _log_guards = init_logging(env_filter, cli.log_file.as_deref())?;
// Commands below are answered here, before a Tokio runtime is built.
if let Some(command) = &cli.command {
match command {
Command::Install => {
// Exactly one of the cfg-gated blocks survives compilation and becomes
// the value of the `return` expression.
return {
#[cfg(windows)]
{
platform::windows::install()
}
#[cfg(target_os = "linux")]
{
platform::unix::install()
}
#[cfg(target_os = "macos")]
{
platform::macos::install()
}
#[cfg(not(any(windows, target_os = "linux", target_os = "macos")))]
{
anyhow::bail!("Service install is not supported on this platform.")
}
};
}
Command::Uninstall => {
// Same cfg-selection scheme as `Install`.
return {
#[cfg(windows)]
{
platform::windows::uninstall()
}
#[cfg(target_os = "linux")]
{
platform::unix::uninstall()
}
#[cfg(target_os = "macos")]
{
platform::macos::uninstall()
}
#[cfg(not(any(windows, target_os = "linux", target_os = "macos")))]
{
anyhow::bail!("Service uninstall is not supported on this platform.")
}
};
}
Command::Version => {
if cli.json {
println!(
"{}",
serde_json::json!({
"version": env!("CARGO_PKG_VERSION"),
"platform": std::env::consts::OS,
})
);
} else {
println!("koi {}", env!("CARGO_PKG_VERSION"));
}
return Ok(());
}
Command::Launch => {
let port = cli.port;
let url = format!("http://localhost:{port}");
println!("Opening dashboard at {url}");
// Failing to open a browser is not fatal; the URL is printed so it can
// be opened manually.
if let Err(e) = open::that(&url) {
eprintln!("Failed to open browser: {e}");
eprintln!("Open manually: {url}");
}
return Ok(());
}
Command::FactoryReset => {
return commands::factory_reset::run(cli.json);
}
// Every other command is dispatched by `run` inside the runtime.
_ => {} }
}
let rt = tokio::runtime::Runtime::new()?;
rt.block_on(run(cli, config))
}
/// Async dispatcher for everything `main` did not answer before building the
/// runtime.
///
/// With a subcommand: checks the capability gate where one applies
/// (`config.require_capability`) and forwards to the matching `commands::*`
/// handler; a bare category command prints that category's catalog.
///
/// Without a subcommand, in priority order: run the daemon when `cli.daemon`
/// is set; run the piped-stdin mDNS adapter when stdin is not a terminal;
/// otherwise print the daemon status (when one could be fetched) followed by
/// the top-level catalog.
///
/// # Errors
/// Returns whatever the selected handler returns, a capability-gate failure,
/// or an error when piped mode is requested while mDNS is disabled.
async fn run(cli: Cli, config: Config) -> anyhow::Result<()> {
if let Some(command) = &cli.command {
return match command {
Command::Status => commands::status::status(&cli, &config),
Command::Mdns(mdns_cmd) => {
config.require_capability("mdns")?;
match &mdns_cmd.command {
// Bare `mdns`: show the Discovery catalog.
None => {
surface::print_category_catalog(surface::KoiCategory::Discovery, None)?;
Ok(())
}
Some(MdnsSubcommand::Admin(admin_cmd)) => match &admin_cmd.command {
Some(admin) => commands::mdns::admin(admin, &cli),
// Bare `mdns admin`: show the admin-scoped Discovery catalog.
None => {
surface::print_category_catalog(
surface::KoiCategory::Discovery,
Some(surface::KoiScope::Admin),
)?;
Ok(())
}
},
Some(MdnsSubcommand::Discover { service_type }) => {
let mode = commands::detect_mode(&cli);
commands::mdns::discover(
service_type.as_deref(),
cli.json,
cli.timeout,
mode,
)
.await
}
Some(MdnsSubcommand::Announce {
name,
service_type,
port,
ip,
txt,
}) => {
let mode = commands::detect_mode(&cli);
commands::mdns::announce(
name,
service_type,
*port,
ip.as_deref(),
txt,
cli.json,
cli.timeout,
mode,
)
.await
}
Some(MdnsSubcommand::Unregister { id }) => {
let mode = commands::detect_mode(&cli);
commands::mdns::unregister(id, cli.json, mode).await
}
Some(MdnsSubcommand::Resolve { instance }) => {
let mode = commands::detect_mode(&cli);
commands::mdns::resolve(instance, cli.json, mode).await
}
Some(MdnsSubcommand::Subscribe { service_type }) => {
let mode = commands::detect_mode(&cli);
commands::mdns::subscribe(service_type, cli.json, cli.timeout, mode).await
}
}
}
Command::Certmesh(cm_cmd) => {
config.require_capability("certmesh")?;
// Certmesh handlers receive the optional `--endpoint` override rather
// than a detected mode.
let ep = cli.endpoint.as_deref();
match &cm_cmd.command {
None => {
surface::print_category_catalog(surface::KoiCategory::Trust, None)?;
Ok(())
}
Some(CertmeshSubcommand::Create {
profile,
operator,
enrollment,
require_approval,
passphrase,
}) => commands::certmesh::create(
profile.as_deref(),
operator.as_deref(),
enrollment.as_deref(),
*require_approval,
passphrase.as_deref(),
cli.json,
ep,
),
Some(CertmeshSubcommand::Status) => commands::certmesh::status(cli.json, ep),
Some(CertmeshSubcommand::Log) => commands::certmesh::log(ep),
Some(CertmeshSubcommand::Compliance) => {
commands::certmesh::compliance(cli.json, ep)
}
Some(CertmeshSubcommand::Unlock) => commands::certmesh::unlock(ep),
Some(CertmeshSubcommand::SetHook { reload }) => {
commands::certmesh::set_hook(reload, cli.json, ep)
}
// `join` and `promote` are the only async certmesh handlers; each takes
// its own subcommand-level endpoint in addition to the global one.
Some(CertmeshSubcommand::Join { endpoint }) => {
commands::certmesh::join(endpoint.as_deref(), cli.json, ep).await
}
Some(CertmeshSubcommand::Promote { endpoint }) => {
commands::certmesh::promote(endpoint.as_deref(), cli.json, ep).await
}
Some(CertmeshSubcommand::OpenEnrollment { until }) => {
commands::certmesh::open_enrollment(until.as_deref(), cli.json, ep)
}
Some(CertmeshSubcommand::CloseEnrollment) => {
commands::certmesh::close_enrollment(cli.json, ep)
}
Some(CertmeshSubcommand::SetPolicy {
domain,
subnet,
clear,
}) => commands::certmesh::set_policy(
domain.as_deref(),
subnet.as_deref(),
*clear,
cli.json,
ep,
),
Some(CertmeshSubcommand::RotateAuth) => {
commands::certmesh::rotate_auth(cli.json, ep)
}
Some(CertmeshSubcommand::Backup { path }) => {
commands::certmesh::backup(path, cli.json, ep)
}
Some(CertmeshSubcommand::Restore { path }) => {
commands::certmesh::restore(path, cli.json, ep)
}
Some(CertmeshSubcommand::Revoke { hostname, reason }) => {
commands::certmesh::revoke(hostname, reason.as_deref(), cli.json, ep)
}
Some(CertmeshSubcommand::Destroy) => commands::certmesh::destroy(cli.json, ep),
}
}
Command::Dns(dns_cmd) => {
config.require_capability("dns")?;
let mode = commands::detect_mode(&cli);
match &dns_cmd.command {
None => {
surface::print_category_catalog(surface::KoiCategory::Dns, None)?;
Ok(())
}
Some(DnsSubcommand::Serve) => commands::dns::serve(&config, mode).await,
Some(DnsSubcommand::Stop) => commands::dns::stop(mode).await,
Some(DnsSubcommand::Status) => {
commands::dns::status(&config, mode, cli.json).await
}
Some(DnsSubcommand::Lookup { name, record_type }) => {
commands::dns::lookup(name, record_type, mode, cli.json, &config).await
}
// `add` and `remove` are synchronous and only need the zone name.
Some(DnsSubcommand::Add { name, ip, ttl }) => {
commands::dns::add(name, ip, *ttl, mode, cli.json, &config.dns_zone)
}
Some(DnsSubcommand::Remove { name }) => {
commands::dns::remove(name, mode, cli.json, &config.dns_zone)
}
Some(DnsSubcommand::List) => commands::dns::list(mode, cli.json, &config).await,
}
}
Command::Health(health_cmd) => {
config.require_capability("health")?;
let mode = commands::detect_mode(&cli);
match &health_cmd.command {
None => {
surface::print_category_catalog(surface::KoiCategory::Health, None)?;
Ok(())
}
Some(HealthSubcommand::Status) => {
commands::health::status(&config, mode, cli.json).await
}
Some(HealthSubcommand::Watch { interval }) => {
commands::health::watch(&config, mode, *interval).await
}
Some(HealthSubcommand::Add {
name,
http,
tcp,
interval,
timeout,
}) => {
commands::health::add(
name,
http.as_deref(),
tcp.as_deref(),
*interval,
*timeout,
mode,
cli.json,
&config,
)
.await
}
Some(HealthSubcommand::Remove { name }) => {
commands::health::remove(name, mode, cli.json, &config).await
}
Some(HealthSubcommand::Log) => commands::health::log(),
}
}
Command::Proxy(proxy_cmd) => {
config.require_capability("proxy")?;
let mode = commands::detect_mode(&cli);
match &proxy_cmd.command {
None => {
surface::print_category_catalog(surface::KoiCategory::Proxy, None)?;
Ok(())
}
Some(ProxySubcommand::Add {
name,
listen,
backend,
backend_remote,
}) => {
commands::proxy::add(
name,
*listen,
backend,
*backend_remote,
mode,
cli.json,
)
.await
}
Some(ProxySubcommand::Remove { name }) => {
commands::proxy::remove(name, mode, cli.json).await
}
Some(ProxySubcommand::Status) => commands::proxy::status(mode, cli.json).await,
Some(ProxySubcommand::List) => commands::proxy::list(mode, cli.json).await,
}
}
Command::Udp(udp_cmd) => {
config.require_capability("udp")?;
let mode = commands::detect_mode(&cli);
match &udp_cmd.command {
None => {
surface::print_category_catalog(surface::KoiCategory::Udp, None)?;
Ok(())
}
Some(UdpSubcommand::Bind { port, addr, lease }) => {
commands::udp::bind(*port, addr, *lease, mode, cli.json).await
}
Some(UdpSubcommand::Unbind { id }) => {
commands::udp::unbind(id, mode, cli.json).await
}
Some(UdpSubcommand::Send { id, dest, payload }) => {
commands::udp::send(id, dest, payload, mode, cli.json).await
}
Some(UdpSubcommand::Status) => commands::udp::status(mode, cli.json).await,
Some(UdpSubcommand::Heartbeat { id }) => {
commands::udp::heartbeat(id, mode, cli.json).await
}
}
}
// Token commands have no capability gate.
Command::Token(token_cmd) => commands::token::run(token_cmd, cli.json),
// These variants return from `main` before the runtime is created; the arm
// exists to keep the match exhaustive.
Command::Install
| Command::Uninstall
| Command::Version
| Command::Launch
| Command::FactoryReset => Ok(()),
};
}
// No subcommand from here on.
if cli.daemon {
return daemon_mode(config).await;
}
// Piped mode: stdin is not a terminal, so serve the CLI adapter on a private
// mDNS core and shut that core down once the adapter returns.
if is_piped_stdin() {
if config.no_mdns {
anyhow::bail!(
"Piped mode requires the mDNS capability. \
Remove --no-mdns or unset KOI_NO_MDNS to enable it."
);
}
let core = Arc::new(koi_mdns::MdnsCore::new()?);
adapters::cli::start(core.clone()).await?;
// A shutdown error is deliberately ignored; the process is exiting anyway.
let _ = core.shutdown().await;
return Ok(());
}
// Interactive, bare invocation: print the daemon status when one is returned,
// then the catalog.
if let Some(status_json) = try_daemon_status(&cli) {
if cli.json {
if let Ok(body) = serde_json::to_string_pretty(&status_json) {
println!("{body}");
}
} else {
print!("{}", format::unified_status(&status_json));
}
}
// Endpoint shown in the catalog: explicit flag, else the breadcrumb, else the
// hard-coded default.
let api_endpoint = cli
.endpoint
.clone()
.or_else(koi_config::breadcrumb::read_breadcrumb_endpoint)
.unwrap_or_else(|| "http://localhost:5641".to_string());
print_top_level_help(&api_endpoint);
Ok(())
}
/// Runs the long-lived daemon: builds every enabled capability, starts the
/// adapters, waits for a shutdown signal and then tears everything down.
///
/// Start-up is best-effort per capability: an initialisation failure is logged
/// and that capability is left out (`None`) instead of aborting the daemon.
/// The only hard error in this function is a failure to resolve the HTTP bind
/// address.
///
/// All spawned tasks share one `CancellationToken`; their join handles are
/// collected in `tasks` and awaited during shutdown, which as a whole is
/// bounded by `SHUTDOWN_TIMEOUT`.
async fn daemon_mode(config: Config) -> anyhow::Result<()> {
koi_config::dirs::ensure_data_dir();
// `None` means the HTTP adapter is disabled.
let http_bind_ip = if config.no_http {
None
} else {
Some(resolve_http_bind_ip(&config.http_bind)?)
};
startup_diagnostics(&config, http_bind_ip);
// Per-run access token: 32 random bytes, URL-safe base64 without padding.
let dat_token = {
use base64::Engine;
use rand::RngCore;
let mut token_bytes = [0u8; 32];
rand::rng().fill_bytes(&mut token_bytes);
base64::engine::general_purpose::URL_SAFE_NO_PAD.encode(token_bytes)
};
// The breadcrumb records the endpoint and token; it is deleted again at the
// end of this function.
if !config.no_http {
let endpoint = breadcrumb_endpoint(http_bind_ip, config.http_port);
koi_config::breadcrumb::write_breadcrumb(&endpoint, &dat_token);
}
let cancel = CancellationToken::new();
let mut tasks = Vec::new();
let started_at = std::time::Instant::now();
// --- Capability cores -------------------------------------------------------
let mdns_core = if !config.no_mdns {
match koi_mdns::MdnsCore::with_cancel(cancel.clone()) {
Ok(core) => Some(Arc::new(core)),
Err(e) => {
tracing::error!(error = %e, "Failed to initialize mDNS core");
None
}
}
} else {
tracing::info!("mDNS capability: disabled");
None
};
let certmesh_core = if !config.no_certmesh {
init_certmesh_core()
} else {
tracing::info!("Certmesh capability: disabled");
None
};
// Bridges wrap the cores as `koi_common::integration` trait objects so they
// can be handed to the DNS and health cores below.
let mdns_bridge: Option<Arc<dyn koi_common::integration::MdnsSnapshot>> =
if let Some(ref core) = mdns_core {
Some(integrations::MdnsBridge::spawn(core.clone()).await)
} else {
None
};
let certmesh_bridge: Option<Arc<dyn koi_common::integration::CertmeshSnapshot>> =
certmesh_core.as_ref().map(|core| {
integrations::CertmeshBridge::new(core.clone())
as Arc<dyn koi_common::integration::CertmeshSnapshot>
});
let alias_feedback: Option<Arc<dyn koi_common::integration::AliasFeedback>> =
certmesh_core.as_ref().map(|core| {
integrations::AliasFeedbackBridge::new(core.clone())
as Arc<dyn koi_common::integration::AliasFeedback>
});
let dns_runtime = if !config.no_dns {
let core = koi_dns::DnsCore::new(
config.dns_config(),
mdns_bridge.clone(),
certmesh_bridge.clone(),
alias_feedback,
)
.await;
match core {
Ok(core) => {
let runtime = Arc::new(koi_dns::DnsRuntime::new(core));
// A failed start is logged, but the runtime handle is still kept.
if let Err(e) = runtime.start().await {
tracing::error!(error = %e, "Failed to start DNS server");
}
Some(runtime)
}
Err(e) => {
tracing::error!(error = %e, "Failed to initialize DNS core");
None
}
}
} else {
tracing::info!("DNS capability: disabled");
None
};
let proxy_runtime = if !config.no_proxy {
match koi_proxy::ProxyCore::new() {
Ok(core) => {
let runtime = Arc::new(koi_proxy::ProxyRuntime::new(Arc::new(core)));
// As with DNS, a listener start failure does not discard the runtime.
if let Err(e) = runtime.start_all().await {
tracing::error!(error = %e, "Failed to start proxy listeners");
}
Some(runtime)
}
Err(e) => {
tracing::error!(error = %e, "Failed to initialize proxy core");
None
}
}
} else {
tracing::info!("Proxy capability: disabled");
None
};
let dns_bridge: Option<Arc<dyn koi_common::integration::DnsProbe>> =
dns_runtime.as_ref().map(|rt| {
integrations::DnsBridge::new(rt.clone()) as Arc<dyn koi_common::integration::DnsProbe>
});
let proxy_bridge: Option<Arc<dyn koi_common::integration::ProxySnapshot>> =
proxy_runtime.as_ref().map(|rt| {
integrations::ProxyBridge::new(rt.core())
as Arc<dyn koi_common::integration::ProxySnapshot>
});
// Health is built last among the bridged capabilities because it consumes
// the mDNS, DNS, certmesh and proxy bridges.
let health_runtime = if !config.no_health {
let core = Arc::new(
koi_health::HealthCore::new(
mdns_bridge.clone(),
dns_bridge,
certmesh_bridge,
proxy_bridge,
)
.await,
);
let runtime = Arc::new(koi_health::HealthRuntime::new(core));
if let Err(e) = runtime.start().await {
tracing::error!(error = %e, "Failed to start health checks");
}
Some(runtime)
} else {
tracing::info!("Health capability: disabled");
None
};
let udp_runtime = if !config.no_udp {
Some(Arc::new(koi_udp::UdpRuntime::new(cancel.clone())))
} else {
tracing::info!("UDP capability: disabled");
None
};
let runtime_core = if !config.no_runtime {
// An unrecognised backend name falls back to `Auto` with a warning.
let backend_kind = koi_runtime::RuntimeBackendKind::from_str_loose(&config.runtime)
.unwrap_or_else(|| {
tracing::warn!(
value = %config.runtime,
"Unknown runtime backend, falling back to auto"
);
koi_runtime::RuntimeBackendKind::Auto
});
let rt_config = koi_runtime::RuntimeConfig {
backend_kind,
socket_path: None,
};
let core = Arc::new(koi_runtime::RuntimeCore::new(rt_config));
match core.start_watching(cancel.clone()).await {
Ok(()) => Some(core),
Err(e) => {
tracing::warn!(error = %e, "Runtime adapter unavailable, continuing without it");
None
}
}
} else {
tracing::info!("Runtime capability: disabled");
None
};
// The orchestrator is only spawned when a runtime core is available.
if let Some(ref rt) = runtime_core {
tasks.push(orchestrator::spawn_orchestrator(
rt,
orchestrator::OrchestrationTargets {
mdns: mdns_core.clone(),
dns: dns_runtime.clone(),
health: health_runtime.clone(),
proxy: proxy_runtime.clone(),
},
cancel.clone(),
));
}
// --- Adapters -----------------------------------------------------------------
let cores = DaemonCores {
mdns: mdns_core.clone(),
certmesh: certmesh_core,
dns: dns_runtime.clone(),
health: health_runtime.clone(),
proxy: proxy_runtime.clone(),
udp: udp_runtime.clone(),
runtime: runtime_core.clone(),
};
let dashboard_state = adapters::dashboard::build_dashboard_state(&cores, started_at, "daemon");
tasks.push(adapters::dashboard::spawn_event_forwarder(
cores.mdns.clone(),
cores.certmesh.clone(),
cores.dns.clone(),
cores.health.clone(),
cores.proxy.clone(),
dashboard_state.event_tx.clone(),
cancel.clone(),
));
// The browser worker and its state exist only when mDNS is available.
let browser_state = if let Some(ref mdns) = mdns_core {
let adapter = adapters::mdns_browser::MdnsBrowseAdapter::new(mdns.clone(), cancel.clone());
let cache = koi_common::browser::BrowserCache::new();
let source = adapter.clone() as std::sync::Arc<dyn koi_common::browser::BrowseSource>;
let bc = cache.clone();
let token = cancel.clone();
tasks.push(tokio::spawn(async move {
koi_common::browser::worker(source, bc, token).await;
}));
Some(koi_common::browser::BrowserState {
source: adapter,
cache,
})
} else {
None
};
if !config.no_http {
let c = cores.clone();
let port = config.http_port;
// `http_bind_ip` is `Some` whenever HTTP is enabled (see the top of this
// function); the loopback fallback is only a default for `unwrap_or`.
let bind_ip = http_bind_ip.unwrap_or(std::net::IpAddr::V4(std::net::Ipv4Addr::LOCALHOST));
let cancel_token = cancel.clone();
let ds = dashboard_state.clone();
let bs = browser_state.clone();
let dat = dat_token.clone();
tasks.push(tokio::spawn(async move {
if let Err(e) =
adapters::http::start(c, bind_ip, port, cancel_token, started_at, ds, bs, dat).await
{
tracing::error!(error = %e, "HTTP adapter failed");
}
}));
}
// The mTLS adapter needs this node's own certificate material, so it is only
// started when self-enrollment succeeds.
if let Some(ref certmesh) = cores.certmesh {
match certmesh.self_enroll().await {
Ok(enrollment) => {
let cm = certmesh.clone();
let port = config.mtls_port;
let token = cancel.clone();
tasks.push(tokio::spawn(async move {
if let Err(e) = adapters::mtls::start(
port,
cm,
&enrollment.cert_pem,
&enrollment.key_pem,
&enrollment.ca_cert_pem,
token,
)
.await
{
tracing::error!(error = %e, "mTLS adapter failed");
}
}));
}
Err(e) => {
tracing::info!(
reason = %e,
"mTLS adapter: skipped (CA not available for self-enrollment)"
);
}
}
}
// The IPC adapter is backed by the mDNS core and is skipped without it.
if !config.no_ipc {
if let Some(ref mdns) = mdns_core {
let c = mdns.clone();
let path = config.pipe_path.clone();
let token = cancel.clone();
tasks.push(tokio::spawn(async move {
if let Err(e) = adapters::pipe::start(c, path, token).await {
tracing::error!(error = %e, "IPC adapter failed");
}
}));
} else {
tracing::info!("IPC adapter: skipped (mDNS disabled)");
}
}
// Optional self-announcement of the HTTP endpoint over mDNS. The
// registration id is kept so the record can be withdrawn during shutdown.
let mut http_announce_id: Option<String> = None;
if config.announce_http && !config.no_http {
if let Some(ref mdns) = mdns_core {
let hostname = hostname::get()
.ok()
.and_then(|os| os.into_string().ok())
.unwrap_or_else(|| "unknown".to_string());
let mut txt = std::collections::HashMap::new();
txt.insert("path".to_string(), "/".to_string());
txt.insert("version".to_string(), env!("CARGO_PKG_VERSION").to_string());
txt.insert("api".to_string(), "v1".to_string());
txt.insert("dashboard".to_string(), "true".to_string());
let payload = koi_mdns::protocol::RegisterPayload {
name: format!("Koi ({hostname})"),
service_type: "_http._tcp".to_string(),
port: config.http_port,
ip: None,
lease_secs: None,
txt,
};
match mdns.register(payload) {
Ok(result) => {
tracing::info!(
id = %result.id,
port = config.http_port,
"HTTP server announced via mDNS"
);
http_announce_id = Some(result.id);
}
Err(e) => {
tracing::warn!(error = %e, "Failed to announce HTTP server via mDNS");
}
}
} else {
tracing::debug!("--announce-http set but mDNS is disabled — skipping");
}
}
if let Some(ref certmesh) = cores.certmesh {
spawn_enrollment_approval_prompt(certmesh, &cancel, &mut tasks).await;
spawn_certmesh_background_tasks(
certmesh,
mdns_core.clone(),
config.http_port,
&cancel,
&mut tasks,
);
}
if let Err(e) = platform::register_service() {
tracing::warn!(error = %e, "Platform service registration failed");
}
tracing::info!("Ready.");
// Blocks until Ctrl+C arrives or the token is cancelled from elsewhere.
shutdown_signal(cancel.clone()).await;
tracing::info!("Shutting down...");
// --- Shutdown -----------------------------------------------------------------
// Order: cancel, drain pause, join tasks, withdraw the HTTP announcement,
// then stop each capability.
let shutdown = async {
cancel.cancel();
tokio::time::sleep(SHUTDOWN_DRAIN).await;
for task in tasks {
let _ = task.await;
}
if let Some(ref id) = http_announce_id {
if let Some(ref core) = mdns_core {
if let Err(e) = core.unregister(id) {
tracing::warn!(error = %e, "Failed to withdraw HTTP mDNS announcement");
}
}
}
if let Some(ref core) = mdns_core {
if let Err(e) = core.shutdown().await {
tracing::warn!(error = %e, "Error during mDNS shutdown");
}
}
if let Some(dns) = dns_runtime {
dns.stop().await;
}
if let Some(health) = health_runtime {
let _ = health.stop().await;
}
if let Some(proxy) = proxy_runtime {
proxy.stop_all().await;
}
if let Some(ref udp) = udp_runtime {
udp.shutdown().await;
}
};
// If the sequence overruns, it is abandoned with a warning and the function
// still returns normally.
if tokio::time::timeout(SHUTDOWN_TIMEOUT, shutdown)
.await
.is_err()
{
tracing::warn!(
"Shutdown timed out after {:?} - forcing exit",
SHUTDOWN_TIMEOUT
);
}
koi_config::breadcrumb::delete_breadcrumb();
Ok(())
}
/// Bundle of the shared capability handles built by `daemon_mode`.
///
/// Every field is an `Option<Arc<_>>`, so cloning the struct only clones the
/// handles. In `daemon_mode` a field is `None` when its capability is disabled
/// or, for those with fallible start-up, failed to initialise.
#[derive(Clone)]
pub(crate) struct DaemonCores {
/// mDNS core.
pub(crate) mdns: Option<Arc<koi_mdns::MdnsCore>>,
/// Certmesh core.
pub(crate) certmesh: Option<Arc<koi_certmesh::CertmeshCore>>,
/// DNS runtime.
pub(crate) dns: Option<Arc<koi_dns::DnsRuntime>>,
/// Health-check runtime.
pub(crate) health: Option<Arc<koi_health::HealthRuntime>>,
/// Proxy runtime.
pub(crate) proxy: Option<Arc<koi_proxy::ProxyRuntime>>,
/// UDP runtime.
pub(crate) udp: Option<Arc<koi_udp::UdpRuntime>>,
/// Runtime-adapter core (the `koi_runtime` backend watcher).
pub(crate) runtime: Option<Arc<koi_runtime::RuntimeCore>>,
}
/// Builds the certmesh core from whatever CA state is present on disk.
///
/// * No CA initialised yet: an uninitialised core is returned.
/// * CA present but the roster cannot be loaded: a warning is logged and an
///   uninitialised core is returned.
/// * CA present and roster loaded: a locked core is returned, built from the
///   roster and the trust profile recorded in its metadata.
///
/// The result is always `Some` on the paths visible here.
pub(crate) fn init_certmesh_core() -> Option<Arc<koi_certmesh::CertmeshCore>> {
    let paths = koi_certmesh::CertmeshPaths::default();

    let core = if paths.is_ca_initialized() {
        let roster_file = paths.roster_path();
        match koi_certmesh::roster::load_roster(&roster_file) {
            Err(e) => {
                tracing::warn!(error = %e, "Failed to load certmesh roster - using uninitialized state");
                koi_certmesh::CertmeshCore::uninitialized()
            }
            Ok(roster) => {
                // Read the profile before the roster is moved into the core.
                let trust_profile = roster.metadata.trust_profile;
                let locked = koi_certmesh::CertmeshCore::locked(roster, trust_profile);
                tracing::info!("Certmesh: CA initialized (locked, use `koi certmesh unlock` to decrypt)");
                locked
            }
        }
    } else {
        tracing::info!("Certmesh: CA not initialized - routes mounted for /create");
        koi_certmesh::CertmeshCore::uninitialized()
    };

    Some(Arc::new(core))
}
/// Spawns the four long-running certmesh maintenance tasks and records their
/// join handles in `tasks`.
///
/// 1. **Renewal**: periodically calls `renew_all_due` and logs each result.
/// 2. **Roster sync**: on standby nodes, fetches the roster manifest from the
///    endpoint in the breadcrumb and hands it to `accept_roster_sync`.
/// 3. **Health heartbeat**: on member nodes, posts the hostname and pinned CA
///    fingerprint to the endpoint in the breadcrumb.
/// 4. **Failover monitor**: browses mDNS for certmesh services, promotes or
///    demotes this node as primaries disappear or conflict, and keeps the CA
///    mDNS announcement registered or withdrawn.
///
/// Every task exits when `cancel` is cancelled. The failover monitor exits
/// immediately when `mdns` is `None` or when the browse cannot be started.
///
/// * `certmesh`: core shared by all four tasks.
/// * `mdns`: mDNS core used only by the failover monitor.
/// * `http_port`: passed to `ca_announcement` when building the CA record.
/// * `cancel`: shared shutdown token.
/// * `tasks`: receives one `JoinHandle` per spawned task.
fn spawn_certmesh_background_tasks(
certmesh: &Arc<koi_certmesh::CertmeshCore>,
mdns: Option<Arc<koi_mdns::MdnsCore>>,
http_port: u16,
cancel: &CancellationToken,
tasks: &mut Vec<tokio::task::JoinHandle<()>>,
) {
// --- Task 1: certificate renewal -------------------------------------------
let cm = Arc::clone(certmesh);
let token = cancel.clone();
tasks.push(tokio::spawn(async move {
let interval = Duration::from_secs(koi_certmesh::lifecycle::RENEWAL_CHECK_INTERVAL_SECS);
loop {
tokio::select! {
_ = token.cancelled() => break,
_ = tokio::time::sleep(interval) => {
let results = cm.renew_all_due().await;
for (hostname, result) in &results {
match result {
Ok(hook) => {
// No hook result at all counts as success.
let hook_ok = hook.as_ref().map(|h| h.success).unwrap_or(true);
if hook_ok {
tracing::info!(hostname, "Certificate renewed");
} else {
tracing::warn!(hostname, "Certificate renewed but hook failed");
}
}
Err(e) => {
tracing::error!(hostname, error = %e, "Certificate renewal failed");
}
}
}
if !results.is_empty() {
tracing::info!(count = results.len(), "Renewal check complete");
}
}
}
}
}));
// --- Task 2: roster sync (standby nodes only) ------------------------------
let cm = Arc::clone(certmesh);
let token = cancel.clone();
tasks.push(tokio::spawn(async move {
let interval = Duration::from_secs(
koi_certmesh::failover::ROSTER_SYNC_INTERVAL_SECS,
);
loop {
tokio::select! {
_ = token.cancelled() => break,
_ = tokio::time::sleep(interval) => {
if cm.node_role().await != Some(koi_certmesh::roster::MemberRole::Standby) {
continue;
}
let bc = match koi_config::breadcrumb::read_breadcrumb() {
Some(bc) => bc,
None => {
tracing::debug!("Roster sync: no primary endpoint found");
continue;
}
};
// The client call is run through `spawn_blocking` rather than awaited
// directly.
let manifest_json = tokio::task::spawn_blocking(move || {
let client = client::KoiClient::with_token(&bc.endpoint, &bc.token);
client.get_roster_manifest()
})
.await;
// Outer `Result` is the join outcome, inner `Result` is the request outcome.
let manifest_json = match manifest_json {
Ok(Ok(json)) => json,
Ok(Err(e)) => {
tracing::warn!(error = %e, "Roster sync: failed to fetch manifest");
continue;
}
Err(e) => {
tracing::warn!(error = %e, "Roster sync: blocking task panicked");
continue;
}
};
match serde_json::from_value::<koi_certmesh::protocol::RosterManifest>(manifest_json) {
Ok(manifest) => {
if let Err(e) = cm.accept_roster_sync(&manifest).await {
tracing::warn!(error = %e, "Roster sync: verification failed");
} else {
tracing::debug!("Roster synced from primary");
}
}
Err(e) => {
tracing::warn!(error = %e, "Roster sync: invalid manifest format");
}
}
}
}
}
}));
// --- Task 3: health heartbeat (member nodes only) --------------------------
let cm = Arc::clone(certmesh);
let token = cancel.clone();
tasks.push(tokio::spawn(async move {
let interval = Duration::from_secs(
koi_certmesh::health::HEARTBEAT_INTERVAL_SECS,
);
loop {
tokio::select! {
_ = token.cancelled() => break,
_ = tokio::time::sleep(interval) => {
if cm.node_role().await != Some(koi_certmesh::roster::MemberRole::Member) {
continue;
}
let hostname = match koi_certmesh::CertmeshCore::local_hostname() {
Some(h) => h,
None => continue,
};
let pinned_fp = match cm.pinned_ca_fingerprint().await {
Some(fp) => fp,
None => {
tracing::debug!("Health heartbeat: no pinned CA fingerprint");
continue;
}
};
let bc = match koi_config::breadcrumb::read_breadcrumb() {
Some(bc) => bc,
None => {
tracing::debug!("Health heartbeat: no CA endpoint found");
continue;
}
};
let endpoint = bc.endpoint;
// NOTE: this `token` (the breadcrumb token) shadows the cancellation token
// only inside this branch body; the `select!` above still sees the
// cancellation token on the next loop iteration.
let token = bc.token;
let request = serde_json::json!({
"hostname": hostname,
"pinned_ca_fingerprint": pinned_fp,
});
let result = tokio::task::spawn_blocking(move || {
let c = client::KoiClient::with_token(&endpoint, &token);
c.health_heartbeat(&request)
})
.await;
match result {
Ok(Ok(resp)) => {
// A missing or non-boolean "valid" field is treated as invalid.
let valid = resp.get("valid").and_then(|v| v.as_bool()).unwrap_or(false);
if valid {
tracing::debug!("Health heartbeat: valid");
} else {
tracing::warn!("Health heartbeat: CA fingerprint mismatch");
}
}
Ok(Err(e)) => {
tracing::warn!(error = %e, "Health heartbeat: request failed");
}
Err(e) => {
tracing::warn!(error = %e, "Health heartbeat: blocking task panicked");
}
}
}
}
}
}));
// --- Task 4: failover monitor and CA announcement --------------------------
let cm = Arc::clone(certmesh);
let mdns = mdns.clone();
let token = cancel.clone();
tasks.push(tokio::spawn(async move {
let mdns = match mdns {
Some(core) => core,
None => {
tracing::debug!("Failover monitor: mDNS disabled");
return;
}
};
let browse = match mdns.browse(koi_certmesh::CERTMESH_SERVICE_TYPE).await {
Ok(handle) => handle,
Err(e) => {
tracing::warn!(error = %e, "Failover monitor: browse failed");
return;
}
};
// Resolved certmesh services, keyed by record name.
let mut services: HashMap<String, ServiceRecord> = HashMap::new();
// Set when a standby first notices that no matching primary is visible.
let mut primary_absent_since: Option<Instant> = None;
// mDNS registration id of this node's CA announcement, when registered.
let mut announce_id: Option<String> = None;
let mut interval = tokio::time::interval(Duration::from_secs(5));
loop {
tokio::select! {
_ = token.cancelled() => break,
event = browse.recv() => {
// `None` from the browse handle ends the monitor.
let Some(event) = event else {
break;
};
match event {
koi_mdns::MdnsEvent::Resolved(record) => {
services.insert(record.name.clone(), record);
}
koi_mdns::MdnsEvent::Removed { name, .. } => {
services.remove(&name);
}
koi_mdns::MdnsEvent::Found(_) => {}
}
}
_ = interval.tick() => {
// Prefer the pinned fingerprint; otherwise try the CA fingerprint on disk.
// Without either, this tick is skipped entirely.
let pinned_fp = cm
.pinned_ca_fingerprint()
.await
.or_else(|| koi_certmesh::ca::ca_fingerprint_from_disk(&koi_certmesh::CertmeshPaths::default()).ok());
let Some(pinned_fp) = pinned_fp else {
continue;
};
let hostname = match koi_certmesh::CertmeshCore::local_hostname() {
Some(h) => h,
None => continue,
};
// Instance name this node would carry if it were the announced primary.
let expected_instance = format!("koi-ca-{hostname}");
// First visible record that claims role=primary and whose fingerprint
// matches ours.
let mut active_primary: Option<ServiceRecord> = None;
for record in services.values() {
let is_primary = record
.txt
.get("role")
.map(|r| r == "primary")
.unwrap_or(false);
let fp_matches = record
.txt
.get("fingerprint")
.map(|fp| koi_crypto::pinning::fingerprints_match(fp, &pinned_fp))
.unwrap_or(false);
if is_primary && fp_matches {
active_primary = Some(record.clone());
break;
}
}
let active_primary_is_self = active_primary
.as_ref()
.map(|record| record.name == expected_instance)
.unwrap_or(false);
let role = cm.node_role().await;
match (role, active_primary.is_some()) {
// Standby with a healthy primary: reset the absence timer.
(Some(koi_certmesh::roster::MemberRole::Standby), true) => {
primary_absent_since = None;
}
// Standby without a primary: start or continue the grace period and
// promote once it has elapsed and this node wins every tiebreak.
(Some(koi_certmesh::roster::MemberRole::Standby), false) => {
if primary_absent_since.is_none() {
primary_absent_since = Some(Instant::now());
}
let grace = Duration::from_secs(
koi_certmesh::failover::FAILOVER_GRACE_SECS,
);
if koi_certmesh::failover::should_promote(primary_absent_since, grace) {
let wins = cm
.standby_hostnames()
.await
.into_iter()
.filter(|h| h != &hostname)
.all(|other| {
koi_certmesh::failover::tiebreaker_wins(
&hostname,
&other,
)
});
if wins {
match cm.promote_self_to_primary().await {
Ok(true) => {
primary_absent_since = None;
// Audit logging is best-effort.
let _ = koi_certmesh::audit::append_entry(
"failover_promoted",
&[("hostname", &hostname)],
);
tracing::warn!(hostname, "Failover: promoted to primary");
}
Ok(false) => {}
Err(e) => {
tracing::warn!(error = %e, "Failover: promotion failed");
}
}
}
}
}
// Primary that sees a different primary on the network: step down.
(Some(koi_certmesh::roster::MemberRole::Primary), true) => {
if !active_primary_is_self {
match cm.demote_self_to_standby().await {
Ok(true) => {
primary_absent_since = None;
let _ = koi_certmesh::audit::append_entry(
"failover_demoted",
&[("hostname", &hostname)],
);
tracing::warn!(
hostname,
"Failover: detected another primary, demoting to standby"
);
}
Ok(false) => {}
Err(e) => {
tracing::warn!(error = %e, "Failover: demotion failed");
}
}
}
}
_ => {
primary_absent_since = None;
}
}
// Keep the CA announcement in step with `ca_announcement`: register once
// when it returns a record, withdraw when it returns `None`.
if let Some(ann) = cm.ca_announcement(http_port).await {
if announce_id.is_none() {
let payload = koi_mdns::protocol::RegisterPayload {
name: ann.name.clone(),
service_type: koi_certmesh::CERTMESH_SERVICE_TYPE.to_string(),
port: ann.port,
ip: None,
lease_secs: None,
txt: ann.txt,
};
match mdns.register(payload) {
Ok(result) => {
tracing::info!(
name = %ann.name,
id = %result.id,
"CA announced via mDNS",
);
announce_id = Some(result.id);
}
Err(e) => {
tracing::warn!(error = %e, "Failed to announce CA via mDNS");
}
}
}
} else if let Some(id) = announce_id.take() {
if let Err(e) = mdns.unregister(&id) {
tracing::warn!(error = %e, "Failed to withdraw CA mDNS announcement");
}
}
}
}
}
// Loop exited: withdraw any announcement still registered, best-effort.
if let Some(id) = announce_id {
let _ = mdns.unregister(&id);
}
}));
tracing::debug!("Certmesh background tasks spawned");
}
async fn spawn_enrollment_approval_prompt(
certmesh: &Arc<koi_certmesh::CertmeshCore>,
cancel: &CancellationToken,
tasks: &mut Vec<tokio::task::JoinHandle<()>>,
) {
let (tx, mut rx) = tokio::sync::mpsc::channel(8);
certmesh.set_approval_channel(tx).await;
let token = cancel.clone();
tasks.push(tokio::spawn(async move {
loop {
tokio::select! {
_ = token.cancelled() => break,
request = rx.recv() => {
let Some(request) = request else {
break;
};
let koi_certmesh::ApprovalRequest { hostname, profile, respond_to } = request;
let decision = tokio::task::spawn_blocking(move || {
prompt_enrollment_approval(&hostname, profile)
})
.await
.unwrap_or(koi_certmesh::ApprovalDecision::Denied);
let _ = respond_to.send(decision);
}
}
}
}));
}
/// Asks the operator on the terminal whether `hostname` may enroll.
///
/// Returns `Denied` when the operator does not answer yes, or when the profile
/// requires an operator name and none is entered. Otherwise returns
/// `Approved`, carrying the operator name only for profiles that require one.
fn prompt_enrollment_approval(
    hostname: &str,
    profile: koi_certmesh::profiles::TrustProfile,
) -> koi_certmesh::ApprovalDecision {
    eprintln!("Enrollment approval requested for '{hostname}' (profile: {profile})");

    if !read_yes_no("Approve enrollment? [y/N]: ") {
        return koi_certmesh::ApprovalDecision::Denied;
    }
    if !profile.requires_operator() {
        return koi_certmesh::ApprovalDecision::Approved { operator: None };
    }

    let name = read_line("Operator name: ");
    if name.is_empty() {
        koi_certmesh::ApprovalDecision::Denied
    } else {
        koi_certmesh::ApprovalDecision::Approved {
            operator: Some(name),
        }
    }
}
/// Prompts via `read_line` and reports whether the answer was affirmative.
///
/// Accepts `y` or `yes` in any ASCII letter case. The enrollment prompt
/// advertises `[y/N]`, so an operator typing `Y` or `Yes` must not be treated
/// as a denial. Anything else, including an empty line or a read failure,
/// counts as "no".
fn read_yes_no(prompt: &str) -> bool {
    let answer = read_line(prompt);
    answer.eq_ignore_ascii_case("y") || answer.eq_ignore_ascii_case("yes")
}
/// Writes `prompt` (followed by a newline) to stderr and reads one line from
/// stdin.
///
/// Returns the line with surrounding whitespace trimmed, or an empty string
/// when reading fails.
fn read_line(prompt: &str) -> String {
    eprintln!("{prompt}");
    let mut buf = String::new();
    match std::io::stdin().read_line(&mut buf) {
        Ok(_) => buf.trim().to_string(),
        Err(_) => String::new(),
    }
}
/// Returns `true` when stdin is not attached to a terminal.
fn is_piped_stdin() -> bool {
    use std::io::{stdin, IsTerminal};

    let interactive = stdin().is_terminal();
    !interactive
}
/// Prints the top-level command catalog for `api_endpoint`.
///
/// If the catalog cannot be rendered, the error is logged at debug level and
/// clap's generated help is printed instead, followed by a blank line.
fn print_top_level_help(api_endpoint: &str) {
    let Err(err) = surface::print_catalog(api_endpoint) else {
        return;
    };
    tracing::debug!(error = %err, "Failed to render catalog, falling back to clap help");
    // A failure to print the fallback help is ignored.
    let _ = Cli::command().print_help();
    println!();
}
/// Detects the `?` help shorthand in the raw command-line arguments.
///
/// Two spellings are recognised, checked in this order:
///
/// * **Suffix**: the last argument ends with `?` (`mdns discover?`). The
///   trailing `?` characters are stripped from it.
/// * **Prefix**: the first argument starts with `?` (`?mdns discover`). The
///   leading `?` characters are stripped from it.
///
/// Arguments that start with `-` are treated as flags and dropped, except for
/// the stripped first token in the prefix form, which is kept as-is. The
/// remaining tokens are joined with single spaces to form the command name.
/// A lone `?` is not a help query.
///
/// Empty tokens are discarded, so an argument consisting only of question
/// marks (for example `??`) contributes nothing. It previously produced an
/// empty or space-prefixed query that could never match a command.
///
/// Returns `None` when no help query is present or no command tokens remain.
fn extract_help_query(raw_args: &[String]) -> Option<String> {
    let (first, rest) = raw_args.split_first()?;

    // Suffix form: `<command words...>?`
    if let Some((last, leading)) = raw_args.split_last() {
        if last.len() > 1 && last.ends_with('?') {
            let query = leading
                .iter()
                .map(String::as_str)
                .chain(std::iter::once(last.trim_end_matches('?')))
                .filter(|part| !part.is_empty() && !part.starts_with('-'))
                .collect::<Vec<_>>()
                .join(" ");
            if !query.is_empty() {
                return Some(query);
            }
        }
    }

    // Prefix form: `?<command> <more words...>`
    if first.len() > 1 && first.starts_with('?') {
        let following = rest
            .iter()
            .map(String::as_str)
            .filter(|arg| !arg.starts_with('-'));
        let query = std::iter::once(first.trim_start_matches('?'))
            .chain(following)
            .filter(|part| !part.is_empty())
            .collect::<Vec<_>>()
            .join(" ");
        if !query.is_empty() {
            return Some(query);
        }
    }

    None
}
/// Completes when the process should begin shutting down.
///
/// Resolves on whichever happens first: the Ctrl+C signal future finishing
/// or `cancel` being cancelled. If listening for Ctrl+C fails, the error is
/// logged and the function still returns.
async fn shutdown_signal(cancel: CancellationToken) {
    let interrupted = tokio::signal::ctrl_c();
    let cancelled = cancel.cancelled();
    tokio::select! {
        outcome = interrupted => {
            if let Some(e) = outcome.err() {
                tracing::error!(error = %e, "Failed to listen for Ctrl+C");
            }
        }
        () = cancelled => {}
    }
}
/// Logs a summary of the daemon's configuration at startup.
///
/// Emits the version, platform and hostname, then one line per capability
/// or adapter describing whether it is enabled and, where relevant, where
/// it listens. `http_bind_ip` is the resolved HTTP bind address; `None`
/// is reported as the HTTP adapter being disabled.
///
/// On Windows the firewall check from `platform::windows` runs at the end.
pub(crate) fn startup_diagnostics(config: &Config, http_bind_ip: Option<std::net::IpAddr>) {
    // Identity of this build and host.
    tracing::info!("Koi v{} starting", env!("CARGO_PKG_VERSION"));
    tracing::info!("Platform: {}", std::env::consts::OS);
    match hostname::get() {
        Err(e) => tracing::warn!(error = %e, "Could not determine hostname"),
        Ok(h) => tracing::info!("Hostname: {}", h.to_string_lossy()),
    }

    // Capabilities, in the same order they are always reported.
    if !config.no_mdns {
        tracing::info!("mDNS engine: mdns-sd");
    } else {
        tracing::info!("mDNS capability: disabled");
    }

    if config.no_certmesh {
        tracing::info!("Certmesh capability: disabled");
    }

    if !config.no_dns {
        tracing::info!(
            "DNS: {}:{} (zone {})",
            "0.0.0.0",
            config.dns_port,
            config.dns_zone
        );
    } else {
        tracing::info!("DNS capability: disabled");
    }

    if !config.no_health {
        tracing::info!("Health: service checks enabled");
    } else {
        tracing::info!("Health capability: disabled");
    }

    if config.no_proxy {
        tracing::info!("Proxy capability: disabled");
    }

    // Adapters.
    match http_bind_ip {
        Some(bind_ip) => log_http_bind(config, bind_ip),
        None => tracing::info!("HTTP adapter: disabled"),
    }

    if config.no_ipc {
        tracing::info!("IPC adapter: disabled");
    } else {
        tracing::info!("IPC: {}", config.pipe_path.display());
    }

    #[cfg(windows)]
    platform::windows::check_firewall(config);
}
/// Logs where the HTTP adapter listens and how exposed that address is.
///
/// * Loopback addresses get a single informational line and nothing else.
/// * The unspecified address and explicit interface addresses get a warning
///   about LAN reachability followed by the "exposed" line.
/// * A specific address selected through the `bridge` bind mode gets the
///   "docker bridge" line without a warning.
///
/// Every non-loopback case ends with a hint about how containers obtain the
/// token.
fn log_http_bind(config: &Config, bind_ip: std::net::IpAddr) {
    let port = config.http_port;

    if bind_ip.is_loopback() {
        tracing::info!("HTTP: {bind_ip}:{port} (loopback only — use --http-bind to expose)");
        return;
    }

    let all_interfaces = bind_ip.is_unspecified();
    let via_bridge = !all_interfaces && config.http_bind == "bridge";

    if via_bridge {
        tracing::info!("HTTP: {bind_ip}:{port} (docker bridge) — mutations require x-koi-token");
    } else {
        if all_interfaces {
            tracing::warn!(
                "WARNING: Koi is reachable from your entire LAN. Mutations still require the \
                 daemon token; GET endpoints are readable by any device. (--http-bind 0.0.0.0)"
            );
        } else {
            tracing::warn!(
                "WARNING: Koi is reachable on interface {bind_ip}. Mutations still require the \
                 daemon token; GET endpoints are readable by any device. (--http-bind {})",
                config.http_bind
            );
        }
        tracing::info!("HTTP: {bind_ip}:{port} (exposed) — mutations require x-koi-token");
    }

    tracing::info!("hint: containers read the token from a mounted secret; see `koi token --help`");
}
/// Builds the HTTP endpoint URL for the given bind address and port.
///
/// * A specific bind address is used as-is. IPv6 addresses are wrapped in
///   brackets (`http://[::1]:5641`), as required for a valid URL authority.
/// * An unspecified address (`0.0.0.0` / `::`) or no address at all falls
///   back to `http://127.0.0.1:<port>`.
pub(crate) fn breadcrumb_endpoint(http_bind_ip: Option<std::net::IpAddr>, port: u16) -> String {
    match http_bind_ip {
        // `SocketAddr`'s `Display` renders `ip:port` for IPv4 and
        // `[ip]:port` for IPv6, which is exactly the URL authority form.
        Some(ip) if !ip.is_unspecified() => {
            format!("http://{}", std::net::SocketAddr::new(ip, port))
        }
        _ => format!("http://127.0.0.1:{port}"),
    }
}
/// Translates an `--http-bind` value into the IP address to bind to.
///
/// Accepted values:
///
/// * `loopback` — the IPv4 loopback address;
/// * `0.0.0.0` — the IPv4 unspecified address (all interfaces);
/// * `bridge` — whatever `resolve_bridge_ip` finds;
/// * anything else — parsed as a literal IPv4 or IPv6 address.
///
/// # Errors
///
/// Returns an error when the value is none of the keywords and does not
/// parse as an IP address, or when bridge resolution fails.
pub(crate) fn resolve_http_bind_ip(mode: &str) -> anyhow::Result<std::net::IpAddr> {
    use std::net::{IpAddr, Ipv4Addr};

    let ip = match mode {
        "bridge" => return resolve_bridge_ip(),
        "loopback" => IpAddr::V4(Ipv4Addr::LOCALHOST),
        "0.0.0.0" => IpAddr::V4(Ipv4Addr::UNSPECIFIED),
        other => match other.parse::<IpAddr>() {
            Ok(parsed) => parsed,
            Err(_) => anyhow::bail!(
                "invalid --http-bind value '{other}': expected loopback, bridge, \
                 an IP address, or 0.0.0.0"
            ),
        },
    };
    Ok(ip)
}
/// Finds the IPv4 address of a container bridge interface on this host.
///
/// Search order:
///
/// 1. the well-known names `docker0`, `podman0`, `cni-podman0`, in that
///    order;
/// 2. the first non-loopback interface whose name starts with `docker`,
///    `podman`, `br-` or `cni-`.
///
/// Only interfaces carrying an IPv4 address are considered.
///
/// # Errors
///
/// Fails when interfaces cannot be enumerated or when no matching
/// interface exists.
fn resolve_bridge_ip() -> anyhow::Result<std::net::IpAddr> {
    const WELL_KNOWN: [&str; 3] = ["docker0", "podman0", "cni-podman0"];
    const PREFIXES: [&str; 4] = ["docker", "podman", "br-", "cni-"];

    let ifaces = if_addrs::get_if_addrs()
        .map_err(|e| anyhow::anyhow!("could not enumerate network interfaces: {e}"))?;
    let has_v4 = |candidate: &if_addrs::Interface| candidate.addr.ip().is_ipv4();

    // Exact names win, and earlier names win over later ones.
    let by_name = WELL_KNOWN.iter().find_map(|wanted| {
        ifaces
            .iter()
            .find(|candidate| candidate.name == *wanted && has_v4(candidate))
    });

    // Otherwise take the first interface that merely looks like a bridge.
    let by_prefix = || {
        ifaces.iter().find(|candidate| {
            !candidate.is_loopback()
                && has_v4(candidate)
                && PREFIXES
                    .iter()
                    .any(|prefix| candidate.name.starts_with(*prefix))
        })
    };

    match by_name.or_else(by_prefix) {
        Some(bridge) => Ok(bridge.addr.ip()),
        None => anyhow::bail!(
            "no docker/podman bridge interface found (looked for docker0, podman0, br-*, …). \
             Use --http-bind <ip> with the host IP that containers should reach."
        ),
    }
}
/// Unit tests for HTTP bind-mode resolution and endpoint formatting.
#[cfg(test)]
mod http_bind_tests {
    use super::{breadcrumb_endpoint, resolve_http_bind_ip};
    use std::net::{IpAddr, Ipv4Addr};

    /// Parses an address literal that the test itself guarantees is valid.
    fn ip(literal: &str) -> IpAddr {
        literal.parse::<IpAddr>().unwrap()
    }

    #[test]
    fn loopback_mode_resolves_to_localhost() {
        let resolved = resolve_http_bind_ip("loopback").unwrap();
        assert_eq!(resolved, IpAddr::V4(Ipv4Addr::LOCALHOST));
    }

    #[test]
    fn unspecified_mode_resolves_to_all_interfaces() {
        let resolved = resolve_http_bind_ip("0.0.0.0").unwrap();
        assert_eq!(resolved, IpAddr::V4(Ipv4Addr::UNSPECIFIED));
    }

    #[test]
    fn explicit_ipv4_is_parsed() {
        let resolved = resolve_http_bind_ip("192.168.1.42").unwrap();
        assert_eq!(resolved, ip("192.168.1.42"));
    }

    #[test]
    fn explicit_ipv6_is_parsed() {
        let resolved = resolve_http_bind_ip("::1").unwrap();
        assert_eq!(resolved, ip("::1"));
    }

    #[test]
    fn garbage_is_rejected() {
        for bogus in ["not-an-ip", "999.999.999.999"] {
            assert!(resolve_http_bind_ip(bogus).is_err());
        }
    }

    #[test]
    fn breadcrumb_advertises_loopback_for_unspecified() {
        let endpoint = breadcrumb_endpoint(Some(IpAddr::V4(Ipv4Addr::UNSPECIFIED)), 5641);
        assert_eq!(endpoint, "http://127.0.0.1:5641");
    }

    #[test]
    fn breadcrumb_uses_specific_bind_ip() {
        let endpoint = breadcrumb_endpoint(Some(ip("172.17.0.1")), 5641);
        assert_eq!(endpoint, "http://172.17.0.1:5641");
    }
}
/// Installs the global tracing subscriber.
///
/// Log output always goes to stderr through a non-blocking writer. When
/// `log_file` is given, the same events are additionally appended to that
/// file (its parent directory is created if necessary). The file layer has
/// ANSI escape sequences turned off so the log file contains plain text
/// rather than terminal colour codes.
///
/// Returns the worker guards of the non-blocking writers; the caller must
/// keep them alive for as long as logging is needed, since the writers are
/// tied to these guards.
///
/// # Errors
///
/// Fails if the log directory cannot be created or the log file cannot be
/// opened for appending.
pub(crate) fn init_logging(
    env_filter: tracing_subscriber::EnvFilter,
    log_file: Option<&std::path::Path>,
) -> anyhow::Result<Vec<tracing_appender::non_blocking::WorkerGuard>> {
    use tracing_subscriber::prelude::*;

    let (nb_stderr, stderr_guard) = tracing_appender::non_blocking(std::io::stderr());
    let stderr_layer = tracing_subscriber::fmt::layer().with_writer(nb_stderr);

    if let Some(path) = log_file {
        if let Some(parent) = path.parent() {
            std::fs::create_dir_all(parent)?;
        }
        let file = std::fs::OpenOptions::new()
            .create(true)
            .append(true)
            .open(path)?;
        let (nb_file, file_guard) = tracing_appender::non_blocking(file);
        // Colour codes are only meaningful on a terminal; keep the file plain.
        let file_layer = tracing_subscriber::fmt::layer()
            .with_ansi(false)
            .with_writer(nb_file);
        tracing_subscriber::registry()
            .with(env_filter)
            .with(stderr_layer)
            .with(file_layer)
            .init();
        Ok(vec![stderr_guard, file_guard])
    } else {
        tracing_subscriber::registry()
            .with(env_filter)
            .with(stderr_layer)
            .init();
        Ok(vec![stderr_guard])
    }
}