use std::sync::Arc;
use axum::body::Body;
use axum::extract::Extension;
use axum::http::{header, HeaderName, Request, StatusCode};
use axum::middleware::{self, Next};
use axum::response::{IntoResponse, Json, Response};
use axum::routing::{get, post};
use axum::Router;
use serde::Serialize;
use subtle::ConstantTimeEq;
use tokio_util::sync::CancellationToken;
use tower_http::cors::CorsLayer;
use utoipa::OpenApi;
use utoipa::ToSchema;
use utoipa_scalar::{Scalar, Servable};
use crate::DaemonCores;
use koi_dashboard::browser::BrowserState;
use koi_dashboard::dashboard::DashboardState;
use koi_dashboard::meta_browse::LazyMetaBrowse;
const DAT_HEADER: &str = "x-koi-token";
pub mod paths {
pub const HEALTHZ: &str = "/healthz";
pub const UNIFIED_STATUS: &str = "/v1/status";
pub const SHUTDOWN: &str = "/v1/admin/shutdown";
pub const HOST: &str = "/v1/host";
}
#[derive(Clone)]
struct AppState {
mdns: Option<Arc<koi_mdns::MdnsCore>>,
certmesh: Option<Arc<koi_certmesh::CertmeshCore>>,
dns: Option<Arc<koi_dns::DnsRuntime>>,
health: Option<Arc<koi_health::HealthRuntime>>,
proxy: Option<Arc<koi_proxy::ProxyRuntime>>,
udp: Option<Arc<koi_udp::UdpRuntime>>,
runtime: Option<Arc<koi_runtime::RuntimeCore>>,
started_at: std::time::Instant,
cancel: CancellationToken,
http_bind: String,
mdns_browse: Option<Arc<LazyMetaBrowse>>,
}
#[allow(clippy::too_many_arguments)]
pub async fn start(
cores: DaemonCores,
bind_ip: std::net::IpAddr,
port: u16,
cancel: CancellationToken,
started_at: std::time::Instant,
dashboard_state: DashboardState,
browser_state: Option<BrowserState>,
dat_token: String,
) -> anyhow::Result<()> {
let app_state = AppState {
mdns: cores.mdns.clone(),
certmesh: cores.certmesh.clone(),
dns: cores.dns.clone(),
health: cores.health.clone(),
proxy: cores.proxy.clone(),
udp: cores.udp.clone(),
runtime: cores.runtime.clone(),
started_at,
cancel: cancel.clone(),
http_bind: bind_ip.to_string(),
mdns_browse: browser_state.as_ref().map(|b| b.meta.clone()),
};
let mut app = Router::new()
.route(paths::HEALTHZ, get(health))
.route(paths::UNIFIED_STATUS, get(unified_status_handler))
.route(paths::SHUTDOWN, post(shutdown_handler))
.route(paths::HOST, get(host_handler))
.route("/", get(koi_dashboard::dashboard::get_dashboard))
.route(
"/v1/dashboard/snapshot",
get(koi_dashboard::dashboard::get_snapshot),
)
.route(
"/v1/dashboard/events",
get(koi_dashboard::dashboard::get_events),
);
if let Some(bs) = browser_state {
app = app
.route("/mdns-browser", get(koi_dashboard::browser::get_page))
.nest("/v1/mdns/browser", koi_dashboard::browser::routes(bs));
} else {
app = app.nest("/v1/mdns/browser", disabled_fallback_router("mdns-browser"));
}
if let Some(ref mdns_core) = cores.mdns {
app = app.nest(
koi_mdns::http::paths::PREFIX,
koi_mdns::http::routes(mdns_core.clone()),
);
} else {
app = app.nest(
koi_mdns::http::paths::PREFIX,
disabled_fallback_router("mdns"),
);
}
if let Some(ref certmesh_core) = cores.certmesh {
app = app.nest(koi_certmesh::http::paths::PREFIX, certmesh_core.routes());
} else {
app = app.nest(
koi_certmesh::http::paths::PREFIX,
disabled_fallback_router("certmesh"),
);
}
if let Some(ref dns_runtime) = cores.dns {
app = app.nest(
koi_dns::http::paths::PREFIX,
koi_dns::http::routes(dns_runtime.clone()),
);
} else {
app = app.nest(
koi_dns::http::paths::PREFIX,
disabled_fallback_router("dns"),
);
}
if let Some(ref health_runtime) = cores.health {
app = app.nest(
koi_health::http::paths::PREFIX,
koi_health::http::routes(health_runtime.core()),
);
} else {
app = app.nest(
koi_health::http::paths::PREFIX,
disabled_fallback_router("health"),
);
}
if let Some(ref proxy_runtime) = cores.proxy {
app = app.nest(
koi_proxy::http::paths::PREFIX,
koi_proxy::http::routes(proxy_runtime.clone()),
);
} else {
app = app.nest(
koi_proxy::http::paths::PREFIX,
disabled_fallback_router("proxy"),
);
}
if let Some(ref udp_runtime) = cores.udp {
app = app.nest(
koi_udp::http::paths::PREFIX,
koi_udp::http::routes(udp_runtime.clone()),
);
} else {
app = app.nest(
koi_udp::http::paths::PREFIX,
disabled_fallback_router("udp"),
);
}
if let Some(ref runtime_core) = cores.runtime {
app = app.nest(koi_runtime::http::paths::PREFIX, runtime_core.routes());
} else {
app = app.nest(
koi_runtime::http::paths::PREFIX,
disabled_fallback_router("runtime"),
);
}
let openapi = build_openapi();
app = app.merge(Scalar::with_url("/docs", openapi.clone()));
let spec_json = match openapi.to_pretty_json() {
Ok(json) => json,
Err(e) => {
tracing::error!(error = %e, "OpenAPI JSON serialization failed");
String::from(r#"{"error":"OpenAPI serialization failed"}"#)
}
};
app = app.route(
"/openapi.json",
get(move || {
let json = spec_json.clone();
async move {
(
[(axum::http::header::CONTENT_TYPE, "application/json")],
json,
)
}
}),
);
app = app.layer(Extension(app_state));
app = app.layer(Extension(dashboard_state));
let shared_token = Arc::new(dat_token);
app = app.layer(middleware::from_fn(move |req, next| {
let token = Arc::clone(&shared_token);
dat_auth_middleware(req, next, token)
}));
let cors = CorsLayer::new()
.allow_methods([
axum::http::Method::GET,
axum::http::Method::POST,
axum::http::Method::PUT,
axum::http::Method::DELETE,
axum::http::Method::OPTIONS,
])
.allow_headers([header::CONTENT_TYPE, HeaderName::from_static("x-koi-token")])
.allow_origin(tower_http::cors::AllowOrigin::predicate(|origin, _| {
let s = origin.to_str().unwrap_or("");
s.starts_with("http://localhost") || s.starts_with("http://127.0.0.1")
}));
app = app.layer(cors);
let listener = tokio::net::TcpListener::bind((bind_ip, port)).await?;
tracing::info!("HTTP adapter listening on {}:{}", bind_ip, port);
axum::serve(listener, app)
.with_graceful_shutdown(async move {
cancel.cancelled().await;
})
.await?;
tracing::debug!("HTTP adapter stopped");
Ok(())
}
#[derive(Debug, Serialize, ToSchema)]
struct UnifiedStatusResponse {
version: String,
platform: String,
uptime_secs: u64,
daemon: bool,
http_bind: String,
capabilities: Vec<koi_common::capability::CapabilityStatus>,
}
#[derive(Debug, Serialize, ToSchema)]
struct ShutdownResponse {
status: String,
}
#[derive(Debug, Serialize, ToSchema)]
struct HostInfoResponse {
hostname: String,
hostname_fqdn: String,
os: String,
arch: String,
interfaces: HostInterfaces,
}
#[derive(Debug, Serialize, ToSchema)]
struct HostInterfaces {
lan: Vec<NetworkInterface>,
}
#[derive(Debug, Serialize, ToSchema)]
struct NetworkInterface {
name: String,
ip: String,
}
#[derive(OpenApi)]
#[openapi(
paths(health, unified_status_handler, shutdown_handler, host_handler),
components(schemas(
UnifiedStatusResponse,
ShutdownResponse,
HostInfoResponse,
HostInterfaces,
NetworkInterface,
koi_common::capability::CapabilityStatus,
koi_common::error::ErrorCode,
koi_common::api::ErrorBody,
))
)]
struct KoiSchemas;
pub fn build_openapi() -> utoipa::openapi::OpenApi {
use utoipa::openapi::external_docs::ExternalDocs;
use utoipa::openapi::tag::TagBuilder;
use utoipa::openapi::{InfoBuilder, LicenseBuilder};
let openapi = KoiSchemas::openapi()
.nest(
koi_mdns::http::paths::PREFIX,
koi_mdns::http::MdnsApiDoc::openapi(),
)
.nest(
koi_certmesh::http::paths::PREFIX,
koi_certmesh::http::CertmeshApiDoc::openapi(),
)
.nest(
koi_dns::http::paths::PREFIX,
koi_dns::http::DnsApiDoc::openapi(),
)
.nest(
koi_health::http::paths::PREFIX,
koi_health::http::HealthApiDoc::openapi(),
)
.nest(
koi_proxy::http::paths::PREFIX,
koi_proxy::http::ProxyApiDoc::openapi(),
)
.nest(
koi_udp::http::paths::PREFIX,
koi_udp::http::UdpApiDoc::openapi(),
)
.nest(
koi_runtime::http::paths::PREFIX,
koi_runtime::http::RuntimeApiDoc::openapi(),
);
let info = InfoBuilder::new()
.title("Koi Network Toolkit API")
.version(env!("CARGO_PKG_VERSION"))
.description(Some(
"Local network toolkit: service discovery, DNS, health monitoring, \
TLS proxy, and certificate mesh.",
))
.license(Some(
LicenseBuilder::new().name("Apache-2.0 OR MIT").build(),
))
.build();
let base = "https://github.com/sylin-org/koi/blob/main/docs";
let tags = vec![
TagBuilder::new()
.name("system")
.description(Some(
"Core daemon lifecycle - status, version, health probes, \
and graceful shutdown.",
))
.external_docs(Some(ExternalDocs::new(format!("{base}/guide-system.md"))))
.build(),
TagBuilder::new()
.name("mdns")
.description(Some(
"Multicast DNS service discovery - announce, discover, \
and manage services on the local network. Includes \
admin operations for inspecting and controlling \
individual registrations.",
))
.external_docs(Some(ExternalDocs::new(format!("{base}/guide-mdns.md"))))
.build(),
TagBuilder::new()
.name("certmesh")
.description(Some(
"Zero-config TLS certificate mesh - automatic CA \
bootstrapping, certificate enrollment, renewal, \
revocation, and cluster-wide trust distribution.",
))
.external_docs(Some(ExternalDocs::new(format!("{base}/guide-certmesh.md"))))
.build(),
TagBuilder::new()
.name("dns")
.description(Some(
"Local DNS server - custom record management, \
upstream forwarding, and split-horizon resolution \
for development environments.",
))
.external_docs(Some(ExternalDocs::new(format!("{base}/guide-dns.md"))))
.build(),
TagBuilder::new()
.name("health")
.description(Some(
"Endpoint health monitoring - configure checks, \
view live status, and receive real-time health \
change events via SSE.",
))
.external_docs(Some(ExternalDocs::new(format!("{base}/guide-health.md"))))
.build(),
TagBuilder::new()
.name("proxy")
.description(Some(
"TLS-terminating reverse proxy - route traffic \
to local services with automatic certificate \
provisioning from the certmesh CA.",
))
.external_docs(Some(ExternalDocs::new(format!("{base}/guide-proxy.md"))))
.build(),
TagBuilder::new()
.name("udp")
.description(Some(
"UDP datagram bridging - bind host sockets, send \
and receive datagrams over HTTP/SSE.",
))
.external_docs(Some(ExternalDocs::new(format!("{base}/guide-udp.md"))))
.build(),
TagBuilder::new()
.name("runtime")
.description(Some(
"Runtime adapter - container and service lifecycle \
integration (Docker, Podman, systemd, Incus, Kubernetes).",
))
.external_docs(Some(ExternalDocs::new(format!("{base}/guide-runtime.md"))))
.build(),
];
let mut openapi = openapi;
openapi.info = info;
openapi.tags = Some(tags);
openapi
}
async fn dat_auth_middleware(
req: Request<Body>,
next: Next,
expected_token: Arc<String>,
) -> Response {
let method = req.method().clone();
if method == axum::http::Method::GET
|| method == axum::http::Method::HEAD
|| method == axum::http::Method::OPTIONS
{
return next.run(req).await;
}
let authenticated = req
.headers()
.get(DAT_HEADER)
.and_then(|val| val.to_str().ok())
.map(|val| bool::from(val.as_bytes().ct_eq(expected_token.as_bytes())))
.unwrap_or(false);
if !authenticated {
return (
StatusCode::UNAUTHORIZED,
axum::Json(serde_json::json!({
"error": "unauthorized",
"message": "Missing or invalid x-koi-token header"
})),
)
.into_response();
}
next.run(req).await
}
#[utoipa::path(get, path = "/healthz", tag = "system",
summary = "Basic liveness probe",
responses((status = 200, description = "Daemon is alive")))]
async fn health() -> &'static str {
"OK"
}
#[utoipa::path(get, path = "/v1/status", tag = "system",
summary = "Unified capability status",
responses((status = 200, body = UnifiedStatusResponse)))]
async fn unified_status_handler(Extension(state): Extension<AppState>) -> Json<serde_json::Value> {
let cores = crate::DaemonCores {
mdns: state.mdns.clone(),
certmesh: state.certmesh.clone(),
dns: state.dns.clone(),
health: state.health.clone(),
proxy: state.proxy.clone(),
udp: state.udp.clone(),
runtime: state.runtime.clone(),
};
let capabilities: Vec<koi_common::capability::CapabilityStatus> =
koi_compose::status::assemble_capabilities(&cores)
.await
.into_iter()
.map(|c| c.status)
.collect();
let uptime_secs = state.started_at.elapsed().as_secs();
Json(serde_json::json!({
"version": env!("CARGO_PKG_VERSION"),
"platform": std::env::consts::OS,
"uptime_secs": uptime_secs,
"daemon": true,
"http_bind": state.http_bind,
"mdns_browse_active": state.mdns_browse.as_ref().map(|m| m.is_active()),
"capabilities": capabilities,
}))
}
fn default_lan_interfaces() -> Vec<NetworkInterface> {
let all = if_addrs::get_if_addrs().unwrap_or_default();
if let Some(ip) = default_route_ipv4() {
if let Some(iface) = all.iter().find(|i| i.addr.ip() == std::net::IpAddr::V4(ip)) {
return vec![NetworkInterface {
name: iface.name.clone(),
ip: ip.to_string(),
}];
}
}
all.into_iter()
.filter(|iface| !iface.is_loopback())
.filter_map(|iface| match iface.addr.ip() {
std::net::IpAddr::V4(v4) if !v4.is_link_local() => Some(NetworkInterface {
name: iface.name,
ip: v4.to_string(),
}),
_ => None,
})
.collect()
}
fn default_route_ipv4() -> Option<std::net::Ipv4Addr> {
let sock = std::net::UdpSocket::bind(("0.0.0.0", 0)).ok()?;
sock.connect(("8.8.8.8", 80)).ok()?;
match sock.local_addr().ok()?.ip() {
std::net::IpAddr::V4(v4) if !v4.is_unspecified() => Some(v4),
_ => None,
}
}
#[utoipa::path(get, path = "/v1/host", tag = "system",
summary = "Host identity and network interfaces",
responses((status = 200, body = HostInfoResponse)))]
async fn host_handler() -> Json<HostInfoResponse> {
let raw = hostname::get()
.ok()
.and_then(|os| os.into_string().ok())
.unwrap_or_else(|| "unknown".to_string());
let fqdn = format!("{}.local", raw);
let lan = default_lan_interfaces();
Json(HostInfoResponse {
hostname: raw,
hostname_fqdn: fqdn,
os: std::env::consts::OS.to_string(),
arch: std::env::consts::ARCH.to_string(),
interfaces: HostInterfaces { lan },
})
}
#[utoipa::path(post, path = "/v1/admin/shutdown", tag = "system",
summary = "Request graceful daemon shutdown",
responses((status = 200, body = ShutdownResponse)))]
async fn shutdown_handler(Extension(state): Extension<AppState>) -> Json<serde_json::Value> {
tracing::info!("Shutdown requested via admin endpoint");
state.cancel.cancel();
Json(serde_json::json!({ "status": "shutting_down" }))
}
fn disabled_fallback_router(capability_name: &'static str) -> Router {
Router::new().fallback(move || async move {
let body = serde_json::json!({
"error": "capability_disabled",
"message": format!(
"The '{}' capability is disabled on this daemon.",
capability_name
),
});
(
axum::http::StatusCode::SERVICE_UNAVAILABLE,
axum::Json(body),
)
})
}
#[cfg(test)]
mod tests {
use super::*;
use axum::body::Body;
use axum::http::Request;
use tower::ServiceExt;
#[tokio::test]
async fn disabled_fallback_returns_503() {
let app = disabled_fallback_router("mdns");
let req = Request::get("/browse").body(Body::empty()).unwrap();
let resp = app.oneshot(req).await.unwrap();
assert_eq!(resp.status(), axum::http::StatusCode::SERVICE_UNAVAILABLE);
}
#[tokio::test]
async fn disabled_fallback_body_has_error_field() {
let app = disabled_fallback_router("certmesh");
let req = Request::get("/status").body(Body::empty()).unwrap();
let resp = app.oneshot(req).await.unwrap();
let body = axum::body::to_bytes(resp.into_body(), usize::MAX)
.await
.unwrap();
let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
assert_eq!(json.get("error").unwrap(), "capability_disabled");
}
#[tokio::test]
async fn disabled_fallback_message_includes_capability_name() {
let app = disabled_fallback_router("mdns");
let req = Request::get("/any").body(Body::empty()).unwrap();
let resp = app.oneshot(req).await.unwrap();
let body = axum::body::to_bytes(resp.into_body(), usize::MAX)
.await
.unwrap();
let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
let msg = json.get("message").unwrap().as_str().unwrap();
assert!(
msg.contains("mdns"),
"message should contain capability name: {msg}"
);
}
#[tokio::test]
async fn disabled_fallback_works_for_post() {
let app = disabled_fallback_router("certmesh");
let req = Request::post("/join").body(Body::empty()).unwrap();
let resp = app.oneshot(req).await.unwrap();
assert_eq!(resp.status(), axum::http::StatusCode::SERVICE_UNAVAILABLE);
}
fn dat_test_router(token: &str) -> Router {
let expected = Arc::new(token.to_string());
Router::new()
.route("/probe", get(|| async { "ok" }).post(|| async { "ok" }))
.layer(middleware::from_fn(move |req, next| {
let expected = expected.clone();
dat_auth_middleware(req, next, expected)
}))
}
#[tokio::test]
async fn get_is_exempt_from_dat_auth() {
let app = dat_test_router("secret-token");
let req = Request::get("/probe").body(Body::empty()).unwrap();
let resp = app.oneshot(req).await.unwrap();
assert_eq!(resp.status(), axum::http::StatusCode::OK);
}
#[tokio::test]
async fn post_without_token_is_rejected() {
let app = dat_test_router("secret-token");
let req = Request::post("/probe").body(Body::empty()).unwrap();
let resp = app.oneshot(req).await.unwrap();
assert_eq!(resp.status(), axum::http::StatusCode::UNAUTHORIZED);
}
#[tokio::test]
async fn post_with_valid_token_is_accepted() {
let app = dat_test_router("secret-token");
let req = Request::post("/probe")
.header(DAT_HEADER, "secret-token")
.body(Body::empty())
.unwrap();
let resp = app.oneshot(req).await.unwrap();
assert_eq!(resp.status(), axum::http::StatusCode::OK);
}
#[tokio::test]
async fn post_with_wrong_token_is_rejected() {
let app = dat_test_router("secret-token");
let req = Request::post("/probe")
.header(DAT_HEADER, "wrong-token")
.body(Body::empty())
.unwrap();
let resp = app.oneshot(req).await.unwrap();
assert_eq!(resp.status(), axum::http::StatusCode::UNAUTHORIZED);
}
#[test]
fn openapi_spec_contains_system_paths() {
let spec = build_openapi();
let paths: Vec<&str> = spec.paths.paths.keys().map(|k| k.as_str()).collect();
assert!(paths.contains(&"/healthz"), "missing /healthz: {paths:?}");
assert!(
paths.contains(&"/v1/status"),
"missing /v1/status: {paths:?}"
);
assert!(paths.contains(&"/v1/host"), "missing /v1/host: {paths:?}");
assert!(
paths.contains(&"/v1/admin/shutdown"),
"missing /v1/admin/shutdown: {paths:?}"
);
}
#[test]
fn openapi_spec_contains_domain_paths() {
let spec = build_openapi();
let paths: Vec<&str> = spec.paths.paths.keys().map(|k| k.as_str()).collect();
assert!(
paths.contains(&"/v1/mdns/discover"),
"missing /v1/mdns/discover: {paths:?}"
);
assert!(
paths.contains(&"/v1/dns/status"),
"missing /v1/dns/status: {paths:?}"
);
assert!(
paths.contains(&"/v1/health/status"),
"missing /v1/health/status: {paths:?}"
);
assert!(
paths.contains(&"/v1/proxy/status"),
"missing /v1/proxy/status: {paths:?}"
);
assert!(
paths.contains(&"/v1/certmesh/status"),
"missing /v1/certmesh/status: {paths:?}"
);
assert!(
paths.contains(&"/v1/udp/status"),
"missing /v1/udp/status: {paths:?}"
);
}
#[tokio::test]
async fn host_handler_returns_default_interface_only() {
let Json(resp) = host_handler().await;
assert!(!resp.hostname.is_empty(), "hostname should not be empty");
assert!(
resp.hostname_fqdn.ends_with(".local"),
"FQDN should end with .local: {}",
resp.hostname_fqdn
);
assert!(
!resp.interfaces.lan.is_empty(),
"lan interfaces should not be empty on a machine with network"
);
for iface in &resp.interfaces.lan {
let ip: std::net::IpAddr = iface.ip.parse().expect("should be a valid IP");
assert!(!ip.is_loopback(), "LAN should not contain loopback");
}
println!(
"host_handler returned {} LAN interface(s):",
resp.interfaces.lan.len()
);
for iface in &resp.interfaces.lan {
println!(" {} -> {}", iface.name, iface.ip);
}
}
}