use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::Mutex;
use tokio_util::sync::CancellationToken;
use koi_runtime::heuristics;
use koi_runtime::instance::{Instance, PortProtocol};
use koi_runtime::{RuntimeCore, RuntimeEvent};
/// Record of what the orchestrator registered on behalf of one runtime
/// instance, kept so the same registrations can be undone later.
///
/// A field is only populated when the corresponding registration call
/// succeeded; failed or skipped steps leave it empty / `None`.
#[derive(Debug, Default)]
struct OrchestratedResources {
/// Ids returned by successful mDNS registrations (one per announced TCP port).
mdns_ids: Vec<String>,
/// Name of the DNS entry that was added, if any.
dns_name: Option<String>,
/// Name of the health check that was added, if any.
health_name: Option<String>,
/// Name of the proxy entry that was upserted, if any.
proxy_name: Option<String>,
}
/// Handles to the subsystems the orchestrator registers instances with.
///
/// Every target is optional: when a field is `None` the orchestrator skips
/// the corresponding registration and teardown step entirely.
pub(crate) struct OrchestrationTargets {
/// mDNS core used to announce / unregister services.
pub mdns: Option<Arc<koi_mdns::MdnsCore>>,
/// DNS runtime whose core receives added / removed entries.
pub dns: Option<Arc<koi_dns::DnsRuntime>>,
/// Health runtime whose core receives added / removed checks.
pub health: Option<Arc<koi_health::HealthRuntime>>,
/// Proxy runtime whose core receives upserted / removed entries.
pub proxy: Option<Arc<koi_proxy::ProxyRuntime>>,
}
pub(crate) fn spawn_orchestrator(
runtime: &Arc<RuntimeCore>,
targets: OrchestrationTargets,
cancel: CancellationToken,
) -> tokio::task::JoinHandle<()> {
let mut rx = runtime.subscribe();
let resources: Arc<Mutex<HashMap<String, OrchestratedResources>>> =
Arc::new(Mutex::new(HashMap::new()));
let resources_clone = Arc::clone(&resources);
let targets = Arc::new(targets);
tokio::spawn(async move {
loop {
tokio::select! {
_ = cancel.cancelled() => {
tracing::info!("Orchestrator shutting down, cleaning up resources");
cleanup_all(&resources_clone, &targets).await;
break;
}
event = rx.recv() => {
match event {
Ok(RuntimeEvent::Started(instance)) => {
handle_start(&instance, &resources_clone, &targets).await;
}
Ok(RuntimeEvent::Stopped { id, name }) => {
handle_stop(&id, &name, &resources_clone, &targets).await;
}
Ok(RuntimeEvent::Updated(instance)) => {
handle_stop(&instance.id, &instance.name, &resources_clone, &targets).await;
handle_start(&instance, &resources_clone, &targets).await;
}
Ok(RuntimeEvent::BackendDisconnected { backend, reason }) => {
tracing::warn!(
backend,
reason,
"Runtime backend disconnected — keeping registrations alive"
);
}
Ok(RuntimeEvent::BackendReconnected { backend }) => {
tracing::info!(backend, "Runtime backend reconnected");
}
Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
tracing::warn!(missed = n, "Orchestrator lagged behind runtime events");
}
Err(tokio::sync::broadcast::error::RecvError::Closed) => {
tracing::info!("Runtime event channel closed");
break;
}
}
}
}
}
})
}
async fn handle_start(
instance: &Instance,
resources: &Arc<Mutex<HashMap<String, OrchestratedResources>>>,
targets: &Arc<OrchestrationTargets>,
) {
if !should_orchestrate(instance) {
return;
}
{
let map = resources.lock().await;
if map.contains_key(&instance.id) {
drop(map);
handle_stop(&instance.id, &instance.name, resources, targets).await;
}
}
let mut res = OrchestratedResources::default();
let service_name = effective_name(instance);
if let Some(ref mdns) = targets.mdns {
for port in &instance.ports {
if port.protocol != PortProtocol::Tcp {
continue;
}
let service_type = heuristics::resolve_service_type(
port.container_port,
false,
instance.metadata.service_type.as_deref(),
);
let payload = koi_mdns::protocol::RegisterPayload {
name: service_name.clone(),
service_type: service_type.to_string(),
port: port.host_port,
ip: non_loopback_ip(instance, port),
lease_secs: None,
txt: instance.metadata.txt.clone(),
};
match mdns.register(payload) {
Ok(result) => {
tracing::info!(
name = %service_name,
service_type,
port = port.host_port,
id = %result.id,
"Orchestrator: mDNS announced"
);
res.mdns_ids.push(result.id);
}
Err(e) => {
tracing::warn!(
name = %service_name,
port = port.host_port,
error = %e,
"Orchestrator: mDNS announce failed"
);
}
}
}
}
if let Some(ref dns) = targets.dns {
let dns_name = instance
.metadata
.dns_name
.as_deref()
.unwrap_or(&service_name);
let ip = resolve_host_ip(instance);
let entry = koi_config::state::DnsEntry {
name: dns_name.to_string(),
ip,
ttl: None,
};
match dns.core().add_entry(entry) {
Ok(_) => {
tracing::info!(name = dns_name, "Orchestrator: DNS entry added");
res.dns_name = Some(dns_name.to_string());
}
Err(e) => {
tracing::warn!(name = dns_name, error = %e, "Orchestrator: DNS add failed");
}
}
}
if let Some(ref health) = targets.health {
if let Some(check) = build_health_check(instance, &service_name) {
let check_name = check.name.clone();
let _ = health.core().remove_check(&check_name).await;
match health.core().add_check(check).await {
Ok(()) => {
tracing::info!(name = %check_name, "Orchestrator: health check added");
res.health_name = Some(check_name);
}
Err(e) => {
tracing::warn!(
name = %check_name,
error = %e,
"Orchestrator: health check add failed"
);
}
}
}
}
if let Some(ref proxy) = targets.proxy {
if let Some(entry) = build_proxy_entry(instance, &service_name) {
let proxy_name = entry.name.clone();
match proxy.core().upsert(entry).await {
Ok(_) => {
tracing::info!(name = %proxy_name, "Orchestrator: proxy entry added");
res.proxy_name = Some(proxy_name);
}
Err(e) => {
tracing::warn!(
name = %proxy_name,
error = %e,
"Orchestrator: proxy upsert failed"
);
}
}
}
}
resources.lock().await.insert(instance.id.clone(), res);
}
async fn handle_stop(
id: &str,
name: &str,
resources: &Arc<Mutex<HashMap<String, OrchestratedResources>>>,
targets: &Arc<OrchestrationTargets>,
) {
let res = resources.lock().await.remove(id);
let Some(res) = res else { return };
if let Some(ref mdns) = targets.mdns {
for mdns_id in &res.mdns_ids {
if let Err(e) = mdns.unregister(mdns_id) {
tracing::warn!(id = mdns_id, error = %e, "Orchestrator: mDNS unregister failed");
} else {
tracing::info!(id = mdns_id, name, "Orchestrator: mDNS unregistered");
}
}
}
if let Some(ref dns) = targets.dns {
if let Some(ref dns_name) = res.dns_name {
if let Err(e) = dns.core().remove_entry(dns_name) {
tracing::warn!(name = dns_name, error = %e, "Orchestrator: DNS remove failed");
} else {
tracing::info!(name = dns_name, "Orchestrator: DNS entry removed");
}
}
}
if let Some(ref health) = targets.health {
if let Some(ref check_name) = res.health_name {
if let Err(e) = health.core().remove_check(check_name).await {
tracing::warn!(name = check_name, error = %e, "Orchestrator: health remove failed");
} else {
tracing::info!(name = check_name, "Orchestrator: health check removed");
}
}
}
if let Some(ref proxy) = targets.proxy {
if let Some(ref proxy_name) = res.proxy_name {
if let Err(e) = proxy.core().remove(proxy_name).await {
tracing::warn!(name = proxy_name, error = %e, "Orchestrator: proxy remove failed");
} else {
tracing::info!(name = proxy_name, "Orchestrator: proxy entry removed");
}
}
}
}
async fn cleanup_all(
resources: &Arc<Mutex<HashMap<String, OrchestratedResources>>>,
targets: &Arc<OrchestrationTargets>,
) {
let entries: Vec<(String, OrchestratedResources)> = resources.lock().await.drain().collect();
for (id, res) in entries {
tracing::debug!(id, "Cleaning up orchestrated resources");
if let Some(ref mdns) = targets.mdns {
for mdns_id in &res.mdns_ids {
let _ = mdns.unregister(mdns_id);
}
}
if let Some(ref dns) = targets.dns {
if let Some(ref dns_name) = res.dns_name {
let _ = dns.core().remove_entry(dns_name);
}
}
if let Some(ref health) = targets.health {
if let Some(ref check_name) = res.health_name {
let _ = health.core().remove_check(check_name).await;
}
}
if let Some(ref proxy) = targets.proxy {
if let Some(ref proxy_name) = res.proxy_name {
let _ = proxy.core().remove(proxy_name).await;
}
}
}
}
/// Decides whether an instance takes part in orchestration.
///
/// An instance qualifies only when its metadata is not disabled *and*
/// `metadata.enable` is explicitly `Some(true)`; an absent `enable` value
/// means "do not orchestrate".
fn should_orchestrate(instance: &Instance) -> bool {
    let meta = &instance.metadata;
    !meta.is_disabled() && meta.enable == Some(true)
}
/// Returns the service name used for registrations: the name override from
/// the instance metadata when present, otherwise the instance's own name.
fn effective_name(instance: &Instance) -> String {
    match &instance.metadata.name {
        Some(override_name) => override_name.clone(),
        None => instance.name.clone(),
    }
}
/// Picks the address under which the instance should be advertised.
///
/// Resolution order:
/// 1. The first port mapping whose host IP is a concrete, non-loopback
///    address. A wildcard binding (`0.0.0.0` / `::`) is not an address a
///    client can connect to, so it is substituted by the first
///    non-loopback instance IP when one exists.
/// 2. The first non-loopback instance IP.
/// 3. `127.0.0.1` as the last resort.
///
/// A wildcard address is never returned: when a port is wildcard-bound and
/// the instance has no usable IP, the search continues with the remaining
/// ports and ultimately falls back to `127.0.0.1`.
fn resolve_host_ip(instance: &Instance) -> String {
    let is_loopback = |ip: &str| ip == "127.0.0.1" || ip == "::1";
    let is_wildcard = |ip: &str| ip == "0.0.0.0" || ip == "::";

    // Computed once; used both as the wildcard substitute and as the
    // fallback when no port mapping yields an address.
    let instance_ip = instance.ips.iter().find(|ip| !is_loopback(ip.as_str()));

    for port in &instance.ports {
        let host_ip = port.host_ip.as_str();
        if host_ip.is_empty() || is_loopback(host_ip) {
            continue;
        }
        if is_wildcard(host_ip) {
            if let Some(ip) = instance_ip {
                return ip.clone();
            }
            // Wildcard with nothing to substitute: keep looking rather
            // than advertising "0.0.0.0" / "::".
            continue;
        }
        return port.host_ip.clone();
    }

    match instance_ip {
        Some(ip) => ip.clone(),
        None => "127.0.0.1".to_string(),
    }
}
/// Returns the port's host IP when it is a concrete, non-loopback address.
///
/// Empty, wildcard (`0.0.0.0`, `::`) and loopback (`127.0.0.1`, `::1`)
/// host IPs yield `None`. `_instance` is currently unused.
fn non_loopback_ip(_instance: &Instance, port: &koi_runtime::PortMapping) -> Option<String> {
    match port.host_ip.as_str() {
        "" | "0.0.0.0" | "::" | "127.0.0.1" | "::1" => None,
        concrete => Some(concrete.to_owned()),
    }
}
/// Builds the health check for an instance, targeting its first TCP port.
///
/// Returns `None` when the instance exposes no TCP port. The check is named
/// `runtime:<service_name>`. When `metadata.health_path` is set an HTTP
/// check against `http://<host>:<port><path>` is produced, otherwise a TCP
/// check against `<host>:<port>`. Interval and timeout default to 30 and 5
/// seconds when the metadata does not specify them.
///
/// IPv6 hosts are wrapped in brackets (`[::1]:80`), as required for both
/// URL authorities and `host:port` socket addresses; without the brackets
/// the port separator is ambiguous and the target cannot be parsed.
fn build_health_check(instance: &Instance, service_name: &str) -> Option<koi_health::HealthCheck> {
    let first_tcp_port = instance
        .ports
        .iter()
        .find(|p| p.protocol == PortProtocol::Tcp)?;
    let check_name = format!("runtime:{}", service_name);

    let host_ip = resolve_host_ip(instance);
    // A ':' can only appear in an IPv6 literal; bracket it unless it already is.
    let host = if host_ip.contains(':') && !host_ip.starts_with('[') {
        format!("[{}]", host_ip)
    } else {
        host_ip
    };

    let (check_kind, target) = match instance.metadata.health_path.as_deref() {
        Some(path) => (
            koi_health::ServiceCheckKind::Http,
            format!("http://{}:{}{}", host, first_tcp_port.host_port, path),
        ),
        None => (
            koi_health::ServiceCheckKind::Tcp,
            format!("{}:{}", host, first_tcp_port.host_port),
        ),
    };

    Some(koi_health::HealthCheck {
        name: check_name,
        kind: check_kind,
        target,
        interval_secs: instance.metadata.health_interval.unwrap_or(30),
        timeout_secs: instance.metadata.health_timeout.unwrap_or(5),
    })
}
/// Builds the proxy entry for an instance, forwarding to its first TCP port.
///
/// Returns `None` when `metadata.proxy_port` is not set or the instance
/// exposes no TCP port. The entry listens on `metadata.proxy_port` and
/// forwards to `http://<host>:<port>`; remote access is allowed only when
/// `metadata.proxy_remote` is `Some(true)`.
///
/// IPv6 hosts are wrapped in brackets (`http://[::1]:80`) so the backend is
/// a valid URL; unbracketed, the address colons are indistinguishable from
/// the port separator.
fn build_proxy_entry(instance: &Instance, service_name: &str) -> Option<koi_proxy::ProxyEntry> {
    let listen_port = instance.metadata.proxy_port?;
    let first_tcp_port = instance
        .ports
        .iter()
        .find(|p| p.protocol == PortProtocol::Tcp)?;

    let host_ip = resolve_host_ip(instance);
    // A ':' can only appear in an IPv6 literal; bracket it unless it already is.
    let host = if host_ip.contains(':') && !host_ip.starts_with('[') {
        format!("[{}]", host_ip)
    } else {
        host_ip
    };

    Some(koi_proxy::ProxyEntry {
        name: service_name.to_string(),
        listen_port,
        backend: format!("http://{}:{}", host, first_tcp_port.host_port),
        allow_remote: instance.metadata.proxy_remote.unwrap_or(false),
    })
}