use std::collections::{BTreeMap, BTreeSet};
use std::sync::Arc;
use async_trait::async_trait;
use engenho_store::StoreMesh;
use serde::{Deserialize, Serialize};
use thiserror::Error;
use tokio::sync::Mutex;
use crate::controller::{Controller, ReconcileReport};
use crate::error::ControllerError;
/// Routing state for a single service, as handed to a [`ServiceRouter`].
///
/// [`ServiceRoutingController`] builds one of these per Service object and compares it
/// by equality with what the backend reports, so every field takes part in change
/// detection.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct ServiceRoute {
/// Key identifying the route; the controller formats it as `{namespace}/{name}`.
pub service_id: String,
/// Value of `spec.clusterIP`; the controller stores an empty string when the field
/// is absent or not a string.
pub cluster_ip: String,
/// Port mappings parsed from the entries of `spec.ports`.
pub ports: Vec<PortMap>,
/// Backend IP addresses, gathered by the controller from
/// `subsets[].addresses[].ip` of the matching Endpoints object.
pub endpoints: BTreeSet<String>,
}
/// One service-port to target-port mapping inside a [`ServiceRoute`].
///
/// The defaults mentioned below are the ones applied by
/// `ServiceRoutingController::build_route` when it parses a `spec.ports` entry.
#[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct PortMap {
/// Port name; `"default"` when the entry has no string `name`.
pub name: String,
/// Value of the entry's `port` field.
pub service_port: u16,
/// Value of the entry's numeric `targetPort`; falls back to `service_port` when
/// that field is absent or not a number.
pub target_port: u16,
/// Protocol string; `"TCP"` when the entry has no string `protocol`.
pub protocol: String,
}
/// Errors a [`ServiceRouter`] implementation can return.
///
/// Each variant carries a free-form message; [`RouterError::kind`] gives a fixed label
/// for the variant.
#[derive(Debug, Clone, Error)]
pub enum RouterError {
/// Failure attributed to the routing backend; displayed as `backend: <message>`.
#[error("backend: {0}")]
Backend(String),
/// The route itself was rejected; displayed as `invalid route: <message>`.
/// `FakeRouter::upsert` returns this for an empty `service_id`.
#[error("invalid route: {0}")]
InvalidRoute(String),
}
impl RouterError {
#[must_use]
pub fn kind(&self) -> &'static str {
match self {
Self::Backend(_) => "backend",
Self::InvalidRoute(_) => "invalid_route",
}
}
}
/// Backend that installs and removes [`ServiceRoute`]s.
///
/// [`ServiceRoutingController`] drives an implementation of this trait: it reads the
/// installed set with [`ServiceRouter::list`], then calls `upsert`/`remove` to converge
/// on the desired set.
#[async_trait]
pub trait ServiceRouter: Send + Sync {
/// Static name identifying the implementation.
fn name(&self) -> &'static str;
/// Installs `route`, or replaces the route already stored for the same service.
async fn upsert(&self, route: &ServiceRoute) -> Result<(), RouterError>;
/// Removes the route stored for `service_id`.
// NOTE(review): FakeRouter treats removing an unknown id as success — confirm other
// backends are expected to do the same.
async fn remove(&self, service_id: &str) -> Result<(), RouterError>;
/// Returns the currently installed routes. The controller looks entries up by
/// service id, so the map is expected to be keyed by `ServiceRoute::service_id`.
async fn list(&self) -> Result<BTreeMap<String, ServiceRoute>, RouterError>;
}
/// In-memory [`ServiceRouter`] that stores routes in a map and records every
/// upsert/remove call as a [`FakeRouterEvent`].
///
/// The state sits behind an `Arc`, so clones of a `FakeRouter` observe and mutate the
/// same routes and events.
#[derive(Default, Clone)]
pub struct FakeRouter {
// Shared state, guarded by an async (tokio) mutex.
inner: Arc<Mutex<FakeRouterState>>,
}
/// Mutable state shared by all clones of a [`FakeRouter`].
#[derive(Default)]
struct FakeRouterState {
// Installed routes, keyed by `ServiceRoute::service_id`.
routes: BTreeMap<String, ServiceRoute>,
// Successful upsert/remove calls, in the order they were applied.
events: Vec<FakeRouterEvent>,
}
/// Record of one mutating call made against a [`FakeRouter`].
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum FakeRouterEvent {
/// `upsert` stored a route; carries the route's service id.
Upsert(String),
/// `remove` was called; carries the requested service id, whether or not a route
/// existed for it.
Remove(String),
}
impl FakeRouter {
#[must_use]
pub fn new() -> Self {
Self::default()
}
pub async fn events(&self) -> Vec<FakeRouterEvent> {
self.inner.lock().await.events.clone()
}
pub async fn route_count(&self) -> usize {
self.inner.lock().await.routes.len()
}
}
#[async_trait]
impl ServiceRouter for FakeRouter {
    /// Always `"fake"`.
    fn name(&self) -> &'static str {
        "fake"
    }

    /// Stores a copy of `route` under its service id and records an `Upsert` event.
    ///
    /// # Errors
    /// Returns [`RouterError::InvalidRoute`] when `route.service_id` is empty; nothing
    /// is stored or recorded in that case.
    async fn upsert(&self, route: &ServiceRoute) -> Result<(), RouterError> {
        let id = &route.service_id;
        // Validate before taking the lock so a rejected route leaves no trace.
        if id.is_empty() {
            return Err(RouterError::InvalidRoute("empty service_id".into()));
        }

        let mut guard = self.inner.lock().await;
        guard.routes.insert(id.clone(), route.clone());
        guard.events.push(FakeRouterEvent::Upsert(id.clone()));
        Ok(())
    }

    /// Drops the route for `service_id`, if any, and records a `Remove` event.
    ///
    /// The event is recorded even when no such route was installed; this never fails.
    async fn remove(&self, service_id: &str) -> Result<(), RouterError> {
        let mut guard = self.inner.lock().await;
        let _ = guard.routes.remove(service_id);
        guard
            .events
            .push(FakeRouterEvent::Remove(service_id.to_string()));
        Ok(())
    }

    /// Returns a copy of all installed routes keyed by service id; this never fails.
    async fn list(&self) -> Result<BTreeMap<String, ServiceRoute>, RouterError> {
        let guard = self.inner.lock().await;
        Ok(guard.routes.clone())
    }
}
/// Controller that keeps a [`ServiceRouter`] backend in step with the Service and
/// Endpoints objects found in the store.
pub struct ServiceRoutingController {
// Source of Service and Endpoints objects.
store: Arc<StoreMesh>,
// Backend that receives upsert/remove calls.
backend: Arc<dyn ServiceRouter>,
// Passed to every store list call.
// NOTE(review): `None` presumably means "all namespaces" — confirm against StoreMesh::list.
namespace: Option<String>,
}
impl ServiceRoutingController {
#[must_use]
pub fn new(
store: Arc<StoreMesh>,
backend: Arc<dyn ServiceRouter>,
namespace: Option<String>,
) -> Self {
Self {
store,
backend,
namespace,
}
}
fn build_route(
service: &serde_json::Value,
endpoints: Option<&serde_json::Value>,
service_id: &str,
) -> Option<ServiceRoute> {
let cluster_ip = service
.get("spec")
.and_then(|s| s.get("clusterIP"))
.and_then(|c| c.as_str())
.unwrap_or("")
.to_string();
let ports = service
.get("spec")
.and_then(|s| s.get("ports"))
.and_then(|p| p.as_array())
.map(|arr| {
arr.iter()
.filter_map(|p| {
let name = p
.get("name")
.and_then(|n| n.as_str())
.unwrap_or("default")
.to_string();
let service_port = p.get("port").and_then(|n| n.as_u64())? as u16;
let target_port = p
.get("targetPort")
.and_then(|n| n.as_u64())
.map(|n| n as u16)
.unwrap_or(service_port);
let protocol = p
.get("protocol")
.and_then(|n| n.as_str())
.unwrap_or("TCP")
.to_string();
Some(PortMap {
name,
service_port,
target_port,
protocol,
})
})
.collect::<Vec<_>>()
})
.unwrap_or_default();
let endpoints_set: BTreeSet<String> = endpoints
.and_then(|e| e.get("subsets"))
.and_then(|s| s.as_array())
.map(|arr| {
arr.iter()
.flat_map(|subset| {
subset
.get("addresses")
.and_then(|a| a.as_array())
.into_iter()
.flatten()
})
.filter_map(|addr| {
addr.get("ip").and_then(|i| i.as_str()).map(String::from)
})
.collect()
})
.unwrap_or_default();
Some(ServiceRoute {
service_id: service_id.to_string(),
cluster_ip,
ports,
endpoints: endpoints_set,
})
}
fn service_id(namespace: &str, name: &str) -> String {
format!("{namespace}/{name}")
}
}
#[async_trait]
impl Controller for ServiceRoutingController {
fn name(&self) -> &'static str {
"service_router"
}
async fn tick(&self) -> Result<ReconcileReport, ControllerError> {
let services = self
.store
.list("", "v1", "Service", self.namespace.as_deref())
.await;
let endpoints_list = self
.store
.list("", "v1", "Endpoints", self.namespace.as_deref())
.await;
let endpoints_by_id: BTreeMap<String, serde_json::Value> = endpoints_list
.into_iter()
.map(|(k, v)| (Self::service_id(k.namespace.as_deref().unwrap_or("default"), &k.name), v))
.collect();
let mut report = ReconcileReport::default();
report.objects_examined = services.len();
let desired: BTreeMap<String, ServiceRoute> = services
.into_iter()
.filter_map(|(k, svc)| {
let id = Self::service_id(k.namespace.as_deref().unwrap_or("default"), &k.name);
let eps = endpoints_by_id.get(&id);
Self::build_route(&svc, eps, &id).map(|r| (id, r))
})
.collect();
let installed = self
.backend
.list()
.await
.map_err(|e| ControllerError::Internal(e.to_string()))?;
for (id, route) in &desired {
if installed.get(id) != Some(route) {
self.backend
.upsert(route)
.await
.map_err(|e| ControllerError::Internal(e.to_string()))?;
report.objects_changed += 1;
}
}
for id in installed.keys() {
if !desired.contains_key(id) {
self.backend
.remove(id)
.await
.map_err(|e| ControllerError::Internal(e.to_string()))?;
report.objects_changed += 1;
}
}
Ok(report)
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Service manifest in namespace `default` with a single `http` port whose target
    /// port equals `port`.
    fn make_service(name: &str, cluster_ip: &str, port: u16) -> serde_json::Value {
        json!({
            "kind": "Service",
            "metadata": {"name": name, "namespace": "default"},
            "spec": {
                "clusterIP": cluster_ip,
                "ports": [{"name": "http", "port": port, "targetPort": port}]
            }
        })
    }

    /// Endpoints manifest with one subset listing `ips` as its addresses.
    fn make_endpoints(ips: &[&str]) -> serde_json::Value {
        let addresses = ips
            .iter()
            .map(|ip| json!({"ip": ip}))
            .collect::<Vec<serde_json::Value>>();
        json!({"subsets": [{"addresses": addresses}]})
    }

    /// Route with the given id and cluster IP, and no ports or endpoints.
    fn bare_route(service_id: &str, cluster_ip: &str) -> ServiceRoute {
        ServiceRoute {
            service_id: service_id.into(),
            cluster_ip: cluster_ip.into(),
            ports: vec![],
            endpoints: BTreeSet::new(),
        }
    }

    #[test]
    fn build_route_extracts_cluster_ip_and_ports() {
        let service = make_service("podinfo", "10.96.5.1", 80);
        let endpoints = make_endpoints(&["10.42.0.1", "10.42.0.2"]);

        let route =
            ServiceRoutingController::build_route(&service, Some(&endpoints), "default/podinfo")
                .unwrap();

        assert_eq!(route.cluster_ip, "10.96.5.1");
        assert_eq!(route.ports[0].service_port, 80);
        assert_eq!(route.endpoints.len(), 2);
        for ip in ["10.42.0.1", "10.42.0.2"] {
            assert!(route.endpoints.contains(ip));
        }
    }

    #[test]
    fn build_route_handles_missing_endpoints() {
        let service = make_service("x", "10.96.0.1", 8080);
        let route = ServiceRoutingController::build_route(&service, None, "default/x").unwrap();
        assert!(route.endpoints.is_empty());
    }

    #[test]
    fn build_route_default_target_port_equals_service_port() {
        // No `targetPort` key at all: the service port must be reused.
        let service = json!({
            "spec": {
                "clusterIP": "10.0.0.1",
                "ports": [{"name": "metrics", "port": 9090}]
            }
        });
        let route = ServiceRoutingController::build_route(&service, None, "x/y").unwrap();
        let port = &route.ports[0];
        assert_eq!(port.service_port, 9090);
        assert_eq!(port.target_port, 9090);
    }

    #[test]
    fn build_route_default_protocol_is_tcp() {
        // `make_service` never sets `protocol`, so the default applies.
        let service = make_service("x", "10.0.0.1", 80);
        let route = ServiceRoutingController::build_route(&service, None, "x/y").unwrap();
        assert_eq!(route.ports[0].protocol, "TCP");
    }

    #[tokio::test]
    async fn fake_router_upsert_inserts_route() {
        let router = FakeRouter::new();
        router
            .upsert(&bare_route("default/x", "10.0.0.1"))
            .await
            .unwrap();

        assert_eq!(router.route_count().await, 1);
        let recorded = router.events().await;
        assert_eq!(recorded, vec![FakeRouterEvent::Upsert("default/x".into())]);
    }

    #[tokio::test]
    async fn fake_router_rejects_empty_service_id() {
        let router = FakeRouter::new();
        let rejected = router.upsert(&bare_route("", "x")).await.unwrap_err();
        assert_eq!(rejected.kind(), "invalid_route");
    }

    #[tokio::test]
    async fn fake_router_remove_clears_route() {
        let router = FakeRouter::new();
        router.upsert(&bare_route("x", "10.0.0.1")).await.unwrap();
        router.remove("x").await.unwrap();
        assert_eq!(router.route_count().await, 0);
    }

    #[tokio::test]
    async fn fake_router_list_returns_installed() {
        let router = FakeRouter::new();
        router.upsert(&bare_route("a", "10.0.0.1")).await.unwrap();

        let installed = router.list().await.unwrap();
        assert_eq!(installed.len(), 1);
        assert!(installed.contains_key("a"));
    }

    #[test]
    fn error_kinds_are_stable() {
        let backend = RouterError::Backend("x".into());
        let invalid = RouterError::InvalidRoute("x".into());
        assert_eq!(backend.kind(), "backend");
        assert_eq!(invalid.kind(), "invalid_route");
    }

    #[test]
    fn controller_name_is_stable() {
        // NOTE(review): this checks a local stand-in, not ServiceRoutingController
        // itself, so it only pins the expected string.
        struct Stub;

        #[async_trait]
        impl Controller for Stub {
            fn name(&self) -> &'static str {
                "service_router"
            }

            async fn tick(&self) -> Result<ReconcileReport, ControllerError> {
                Ok(ReconcileReport::default())
            }
        }

        assert_eq!(Stub.name(), "service_router");
    }
}