use std::collections::BTreeMap;
use std::str::FromStr;
use crate::backend::{Backend, LbPolicy};
use crate::error::{Error, ErrorContext};
use crate::{Name, Service};
use k8s_openapi::api::core::v1 as core_v1;
use kube::api::ObjectMeta;
use kube::{Resource, ResourceExt};
/// Prefix of the annotation keys under which a Backend's load-balancing
/// policy is stored on a Kubernetes `Service`.
const LB_ANNOTATION: &str = "junctionlabs.io/backend.lb";

/// Returns the annotation key for the load-balancing policy of `port`: the
/// [`LB_ANNOTATION`] prefix, a `.`, and the port number in decimal.
fn lb_policy_annotation(port: u16) -> String {
    let port_suffix = port.to_string();
    [LB_ANNOTATION, port_suffix.as_str()].join(".")
}
impl Backend {
pub fn to_service_patch(&self) -> core_v1::Service {
let mut svc = core_v1::Service {
metadata: ObjectMeta {
annotations: Some(BTreeMap::new()),
..Default::default()
},
..Default::default()
};
let lb_annotation = lb_policy_annotation(self.id.port);
let lb_json = serde_json::to_string(&self.lb)
.expect("Failed to serialize Backend. this is a bug in Junction, not your code");
svc.annotations_mut()
.insert(lb_annotation.to_string(), lb_json);
match &self.id.service {
Service::Dns(dns) => {
svc.spec = Some(core_v1::ServiceSpec {
type_: Some("ExternalName".to_string()),
external_name: Some(dns.hostname.to_string()),
..Default::default()
})
}
Service::Kube(service) => {
let meta = svc.meta_mut();
meta.name = Some(service.name.to_string());
meta.namespace = Some(service.namespace.to_string());
svc.spec = Some(core_v1::ServiceSpec {
type_: Some("ClusterIP".to_string()),
ports: Some(vec![core_v1::ServicePort {
port: self.id.port as i32,
protocol: Some("TCP".to_string()),
..Default::default()
}]),
..Default::default()
})
}
};
svc
}
pub fn from_service(svc: &core_v1::Service) -> Result<Vec<Self>, Error> {
let (namespace, name) = (
as_ref_or_else(&svc.meta().namespace, "missing namespace")
.with_fields("meta", "name")?,
as_ref_or_else(&svc.meta().name, "missing name").with_fields("meta", "name")?,
);
let spec = as_ref_or_else(&svc.spec, "missing spec").with_field("spec")?;
let svc_type = spec
.type_
.as_deref()
.ok_or_else(|| Error::new_static("missing type"))
.with_fields("spec", "type")?;
let mut backends = vec![];
let (service, svc_ports) = match svc_type {
"ClusterIP" => {
let name = Name::from_str(name).with_fields("meta", "name")?;
let namespace = Name::from_str(namespace).with_fields("meta", "namespace")?;
let service = Service::kube(&namespace, &name)?;
let svc_ports =
as_ref_or_else(&spec.ports, "missing ports").with_fields("spec", "ports")?;
let mut ports = Vec::with_capacity(svc_ports.len());
for (i, svc_port) in svc_ports.iter().enumerate() {
let port: u16 = convert_port(svc_port.port)
.with_field("port")
.with_field_index("ports", i)?;
ports.push(port);
}
(service, ports)
}
"ExternalName" => {
let external_name = as_ref_or_else(&spec.external_name, "missing externalName")
.with_fields("spec", "externalName")?;
let service = Service::dns(external_name).with_fields("spec", "externalName")?;
let svc_ports = spec.ports.as_deref().unwrap_or_default();
let mut ports = Vec::with_capacity(svc_ports.len());
for (i, svc_port) in svc_ports.iter().enumerate() {
let port: u16 = convert_port(svc_port.port)
.with_field("port")
.with_field_index("ports", i)?;
ports.push(port);
}
if ports.is_empty() {
ports.extend([80, 443]);
}
(service, ports)
}
svc_type => return Err(Error::new(format!("{svc_type} Services are unsupported"))),
};
for port in svc_ports {
let lb =
get_lb_policy(svc.annotations(), &lb_policy_annotation(port))?.unwrap_or_default();
backends.push(Backend {
id: crate::BackendId {
service: service.clone(),
port,
},
lb,
})
}
Ok(backends)
}
}
fn get_lb_policy(
annotations: &BTreeMap<String, String>,
key: &str,
) -> Result<Option<LbPolicy>, Error> {
match annotations.get(key) {
Some(s) => {
let lb_policy = serde_json::from_str(s)
.map_err(|e| Error::new(format!("failed to deserialize {key}: {e}")))?;
Ok(Some(lb_policy))
}
None => Ok(None),
}
}
#[inline]
fn convert_port(port: i32) -> Result<u16, Error> {
port.try_into()
.map_err(|_| Error::new(format!("port value '{port}' is out of range")))
}
#[inline]
fn as_ref_or_else<'a, T>(f: &'a Option<T>, message: &'static str) -> Result<&'a T, Error> {
f.as_ref().ok_or_else(|| Error::new_static(message))
}
#[cfg(test)]
mod test {
    use k8s_openapi::api::core::v1 as core_v1;
    use kube::api::ObjectMeta;

    use crate::backend::{RequestHashPolicy, RequestHasher, RingHashParams};

    use super::*;

    /// Builds a `BTreeMap<String, String>` from `key => value` pairs, calling
    /// `to_string()` on both sides of every pair.
    macro_rules! annotations {
        ($($key:expr => $value:expr),* $(,)*) => {{
            let mut map = BTreeMap::new();
            $(
                map.insert($key.to_string(), $value.to_string());
            )*
            map
        }}
    }

    const CLUSTER_IP: Option<&str> = Some("ClusterIP");
    const EXTERNAL_NAME: Option<&str> = Some("ExternalName");

    /// Annotation value describing a round-robin policy.
    const ROUND_ROBIN_JSON: &str = r#"{"type":"RoundRobin"}"#;

    /// Metadata for the `bar/foo` Service used by most fixtures.
    fn foo_meta(annotations: Option<BTreeMap<String, String>>) -> ObjectMeta {
        ObjectMeta {
            namespace: Some("bar".to_string()),
            name: Some("foo".to_string()),
            annotations,
            ..Default::default()
        }
    }

    /// A TCP `ServicePort`, optionally named.
    fn tcp_port(name: Option<&str>, port: i32) -> core_v1::ServicePort {
        core_v1::ServicePort {
            name: name.map(str::to_string),
            port,
            protocol: Some("TCP".to_string()),
            ..Default::default()
        }
    }

    /// A `ClusterIP` Service with the given metadata and ports.
    fn cluster_ip(
        metadata: ObjectMeta,
        ports: Option<Vec<core_v1::ServicePort>>,
    ) -> core_v1::Service {
        core_v1::Service {
            metadata,
            spec: Some(core_v1::ServiceSpec {
                type_: CLUSTER_IP.map(str::to_string),
                ports,
                ..Default::default()
            }),
            status: None,
        }
    }

    /// An `ExternalName` Service pointing at `hostname`.
    fn external_name(
        metadata: ObjectMeta,
        hostname: &str,
        ports: Option<Vec<core_v1::ServicePort>>,
    ) -> core_v1::Service {
        core_v1::Service {
            metadata,
            spec: Some(core_v1::ServiceSpec {
                type_: EXTERNAL_NAME.map(str::to_string),
                external_name: Some(hostname.to_string()),
                ports,
                ..Default::default()
            }),
            status: None,
        }
    }

    /// A Backend for the Kube service `bar/foo` on `port`.
    fn kube_backend(port: u16, lb: LbPolicy) -> Backend {
        Backend {
            id: Service::kube("bar", "foo").unwrap().as_backend_id(port),
            lb,
        }
    }

    /// A Backend for the DNS service `hostname` on `port`.
    fn dns_backend(hostname: &str, port: u16, lb: LbPolicy) -> Backend {
        Backend {
            id: Service::dns(hostname).unwrap().as_backend_id(port),
            lb,
        }
    }

    #[test]
    fn test_to_service_patch() {
        // A Kube backend becomes a named ClusterIP Service with one TCP port.
        let kube_patch = kube_backend(1212, LbPolicy::RoundRobin).to_service_patch();
        let expected_kube = cluster_ip(
            foo_meta(Some(
                annotations! { "junctionlabs.io/backend.lb.1212" => ROUND_ROBIN_JSON },
            )),
            Some(vec![tcp_port(None, 1212)]),
        );
        assert_eq!(kube_patch, expected_kube);

        // A DNS backend becomes an unnamed ExternalName Service without ports.
        let dns_patch = dns_backend("example.com", 4430, LbPolicy::RoundRobin).to_service_patch();
        let expected_dns = external_name(
            ObjectMeta {
                annotations: Some(
                    annotations! { "junctionlabs.io/backend.lb.4430" => ROUND_ROBIN_JSON },
                ),
                ..Default::default()
            },
            "example.com",
            None,
        );
        assert_eq!(dns_patch, expected_dns);
    }

    #[test]
    fn test_from_clusterip() {
        // One port, no annotations: a single backend with an unspecified policy.
        let single_port = cluster_ip(foo_meta(None), Some(vec![tcp_port(None, 8910)]));
        assert_eq!(
            Backend::from_service(&single_port).unwrap(),
            vec![kube_backend(8910, LbPolicy::Unspecified)]
        );

        // A ClusterIP Service without ports is rejected.
        let no_ports = cluster_ip(foo_meta(None), None);
        assert!(Backend::from_service(&no_ports).is_err());

        // Several ports: each one picks up the annotation for its own port number.
        let multi_port = cluster_ip(
            foo_meta(Some(annotations! {
                "junctionlabs.io/backend.lb.443" => r#"{"type":"RingHash", "min_ring_size": 1024, "hash_params": [{"type": "Header", "name": "x-user"}]}"#,
                "junctionlabs.io/backend.lb.4430" => ROUND_ROBIN_JSON,
            })),
            Some(vec![
                tcp_port(Some("http"), 80),
                tcp_port(Some("https"), 443),
                tcp_port(Some("health"), 4430),
            ]),
        );
        let ring_hash = LbPolicy::RingHash(RingHashParams {
            min_ring_size: 1024,
            hash_params: vec![RequestHashPolicy {
                terminal: false,
                hasher: RequestHasher::Header {
                    name: "x-user".to_string(),
                },
            }],
        });
        assert_eq!(
            Backend::from_service(&multi_port).unwrap(),
            vec![
                kube_backend(80, LbPolicy::Unspecified),
                kube_backend(443, ring_hash),
                kube_backend(4430, LbPolicy::RoundRobin),
            ]
        )
    }

    #[test]
    fn test_from_external_name() {
        let hostname = "www.junctionlabs.io";

        // No declared ports: the defaults 80 and 443 are used.
        let default_ports = external_name(
            foo_meta(Some(annotations! {
                "junctionlabs.io/backend.lb.443" => ROUND_ROBIN_JSON,
            })),
            hostname,
            None,
        );
        assert_eq!(
            Backend::from_service(&default_ports).unwrap(),
            vec![
                dns_backend(hostname, 80, LbPolicy::Unspecified),
                dns_backend(hostname, 443, LbPolicy::RoundRobin),
            ]
        );

        // Declared ports replace the defaults.
        let explicit_port = external_name(
            foo_meta(Some(annotations! {
                "junctionlabs.io/backend.lb.7777" => ROUND_ROBIN_JSON,
            })),
            hostname,
            Some(vec![tcp_port(None, 7777)]),
        );
        assert_eq!(
            Backend::from_service(&explicit_port).unwrap(),
            vec![dns_backend(hostname, 7777, LbPolicy::RoundRobin)]
        )
    }

    #[test]
    fn test_svc_patch_roundtrip() {
        let original = kube_backend(8888, LbPolicy::RoundRobin);
        let recovered = Backend::from_service(&original.to_service_patch()).unwrap();
        assert_eq!(recovered, vec![original.clone()])
    }
}