use crate::{apis::coredb_types::CoreDB, ingress_route_crd::IngressRouteRoutes, Context, Error, Result};
use k8s_openapi::{
api::{
apps::v1::{Deployment, DeploymentSpec},
core::v1::{
Capabilities, Container, ContainerPort, EnvVar, EnvVarSource, HTTPGetAction, PodSpec,
PodTemplateSpec, Probe, SecretKeySelector, SecurityContext, Service, ServicePort, ServiceSpec,
},
},
apimachinery::pkg::{
apis::meta::v1::{LabelSelector, OwnerReference},
util::intstr::IntOrString,
},
};
use kube::{
api::{Api, ListParams, ObjectMeta, Patch, PatchParams, ResourceExt},
runtime::controller::Action,
Client, Resource,
};
use std::{collections::BTreeMap, sync::Arc, time::Duration};
use tracing::{debug, error, warn};
use super::{
ingress::{generate_ingress_routes, reconcile_ingress},
types::{AppService, EnvVarRef, Middleware, COMPONENT_NAME},
};
#[derive(Clone, Debug)]
struct AppServiceResources {
deployment: Deployment,
name: String,
service: Option<Service>,
ingress_routes: Option<Vec<IngressRouteRoutes>>,
}
fn generate_resource(
appsvc: &AppService,
coredb_name: &str,
namespace: &str,
oref: OwnerReference,
domain: String,
) -> AppServiceResources {
let resource_name = format!("{}-{}", coredb_name, appsvc.name.clone());
let service = appsvc
.routing
.as_ref()
.map(|_| generate_service(appsvc, coredb_name, &resource_name, namespace, oref.clone()));
let deployment = generate_deployment(appsvc, coredb_name, &resource_name, namespace, oref);
let host_matcher = format!(
"Host(`{subdomain}.{domain}`)",
subdomain = coredb_name,
domain = domain
);
let ingress_routes =
generate_ingress_routes(appsvc, &resource_name, namespace, host_matcher, coredb_name);
AppServiceResources {
deployment,
name: resource_name,
service,
ingress_routes,
}
}
fn generate_service(
appsvc: &AppService,
coredb_name: &str,
resource_name: &str,
namespace: &str,
oref: OwnerReference,
) -> Service {
let mut selector_labels: BTreeMap<String, String> = BTreeMap::new();
selector_labels.insert("app".to_owned(), resource_name.to_string());
selector_labels.insert("component".to_owned(), COMPONENT_NAME.to_string());
selector_labels.insert("coredb.io/name".to_owned(), coredb_name.to_string());
let mut labels = selector_labels.clone();
labels.insert("component".to_owned(), COMPONENT_NAME.to_owned());
let ports = match appsvc.routing.as_ref() {
Some(routing) => {
let ports: Vec<ServicePort> = routing
.iter()
.map(|r| ServicePort {
port: r.port as i32,
name: Some(format!("http-{}", r.port)),
target_port: None,
..ServicePort::default()
})
.collect();
Some(ports)
}
None => None,
};
Service {
metadata: ObjectMeta {
name: Some(resource_name.to_owned()),
namespace: Some(namespace.to_owned()),
labels: Some(labels.clone()),
owner_references: Some(vec![oref]),
..ObjectMeta::default()
},
spec: Some(ServiceSpec {
ports,
selector: Some(selector_labels.clone()),
..ServiceSpec::default()
}),
..Service::default()
}
}
fn generate_deployment(
appsvc: &AppService,
coredb_name: &str,
resource_name: &str,
namespace: &str,
oref: OwnerReference,
) -> Deployment {
let mut labels: BTreeMap<String, String> = BTreeMap::new();
labels.insert("app".to_owned(), resource_name.to_string());
labels.insert("component".to_owned(), COMPONENT_NAME.to_string());
labels.insert("coredb.io/name".to_owned(), coredb_name.to_string());
let deployment_metadata = ObjectMeta {
name: Some(resource_name.to_string()),
namespace: Some(namespace.to_owned()),
labels: Some(labels.clone()),
owner_references: Some(vec![oref]),
..ObjectMeta::default()
};
let (readiness_probe, liveness_probe) = match appsvc.probes.clone() {
Some(probes) => {
let readiness_probe = Probe {
http_get: Some(HTTPGetAction {
path: Some(probes.readiness.path),
port: IntOrString::String(probes.readiness.port),
..HTTPGetAction::default()
}),
initial_delay_seconds: Some(probes.readiness.initial_delay_seconds as i32),
..Probe::default()
};
let liveness_probe = Probe {
http_get: Some(HTTPGetAction {
path: Some(probes.liveness.path),
port: IntOrString::String(probes.liveness.port),
..HTTPGetAction::default()
}),
initial_delay_seconds: Some(probes.liveness.initial_delay_seconds as i32),
..Probe::default()
};
(Some(readiness_probe), Some(liveness_probe))
}
None => (None, None),
};
let container_ports: Option<Vec<ContainerPort>> = match appsvc.routing.as_ref() {
Some(ports) => {
let container_ports: Vec<ContainerPort> = ports
.iter()
.map(|pm| ContainerPort {
container_port: pm.port as i32,
protocol: Some("TCP".to_string()),
..ContainerPort::default()
})
.collect();
Some(container_ports)
}
None => None,
};
let security_context = SecurityContext {
run_as_user: Some(65534),
allow_privilege_escalation: Some(false),
capabilities: Some(Capabilities {
drop: Some(vec!["ALL".to_string()]),
..Capabilities::default()
}),
privileged: Some(false),
run_as_non_root: Some(true),
read_only_root_filesystem: Some(true),
..SecurityContext::default()
};
let cdb_name_env = coredb_name.to_uppercase().replace('-', "_");
let r_conn = format!("{}_R_CONNECTION", cdb_name_env);
let ro_conn = format!("{}_RO_CONNECTION", cdb_name_env);
let rw_conn = format!("{}_RW_CONNECTION", cdb_name_env);
let secret_envs = vec![
EnvVar {
name: r_conn,
value_from: Some(EnvVarSource {
secret_key_ref: Some(SecretKeySelector {
name: Some(format!("{}-connection", coredb_name)),
key: "r_uri".to_string(),
..SecretKeySelector::default()
}),
..EnvVarSource::default()
}),
..EnvVar::default()
},
EnvVar {
name: ro_conn,
value_from: Some(EnvVarSource {
secret_key_ref: Some(SecretKeySelector {
name: Some(format!("{}-connection", coredb_name)),
key: "rw_uri".to_string(),
..SecretKeySelector::default()
}),
..EnvVarSource::default()
}),
..EnvVar::default()
},
EnvVar {
name: rw_conn,
value_from: Some(EnvVarSource {
secret_key_ref: Some(SecretKeySelector {
name: Some(format!("{}-connection", coredb_name)),
key: "ro_uri".to_string(),
..SecretKeySelector::default()
}),
..EnvVarSource::default()
}),
..EnvVar::default()
},
];
let mut env_vars: Vec<EnvVar> = Vec::new();
if let Some(envs) = appsvc.env.clone() {
for env in envs {
let evar: Option<EnvVar> = match (env.value, env.value_from_platform) {
(Some(e), _) => Some(EnvVar {
name: env.name,
value: Some(e),
..EnvVar::default()
}),
(None, Some(e)) => {
let secret_key = match e {
EnvVarRef::ReadOnlyConnection => "ro_uri",
EnvVarRef::ReadWriteConnection => "rw_uri",
};
Some(EnvVar {
name: env.name,
value_from: Some(EnvVarSource {
secret_key_ref: Some(SecretKeySelector {
name: Some(format!("{}-connection", coredb_name)),
key: secret_key.to_string(),
..SecretKeySelector::default()
}),
..EnvVarSource::default()
}),
..EnvVar::default()
})
}
_ => {
error!(
"ns: {}, AppService: {}, env var: {} is missing value or valueFromPlatform",
namespace, resource_name, env.name
);
None
}
};
if let Some(e) = evar {
env_vars.push(e);
}
}
}
env_vars.extend(secret_envs);
let pod_spec = PodSpec {
containers: vec![Container {
args: appsvc.args.clone(),
command: appsvc.command.clone(),
env: Some(env_vars),
image: Some(appsvc.image.clone()),
name: appsvc.name.clone(),
ports: container_ports,
resources: appsvc.resources.clone(),
readiness_probe,
liveness_probe,
security_context: Some(security_context),
..Container::default()
}],
..PodSpec::default()
};
let pod_template_spec = PodTemplateSpec {
metadata: Some(deployment_metadata.clone()),
spec: Some(pod_spec),
};
let deployment_spec = DeploymentSpec {
selector: LabelSelector {
match_labels: Some(labels.clone()),
..LabelSelector::default()
},
template: pod_template_spec,
..DeploymentSpec::default()
};
Deployment {
metadata: deployment_metadata,
spec: Some(deployment_spec),
..Deployment::default()
}
}
async fn get_appservice_deployments(
client: &Client,
namespace: &str,
coredb_name: &str,
) -> Result<Vec<String>, Error> {
let label_selector = format!("component={},coredb.io/name={}", COMPONENT_NAME, coredb_name);
let deployent_api: Api<Deployment> = Api::namespaced(client.clone(), namespace);
let lp = ListParams::default().labels(&label_selector).timeout(10);
let deployments = deployent_api.list(&lp).await.map_err(Error::KubeError)?;
Ok(deployments
.items
.iter()
.map(|d| d.metadata.name.to_owned().expect("no name on resource"))
.collect())
}
async fn get_appservice_services(
client: &Client,
namespace: &str,
coredb_name: &str,
) -> Result<Vec<String>, Error> {
let label_selector = format!("component={},coredb.io/name={}", COMPONENT_NAME, coredb_name);
let deployent_api: Api<Service> = Api::namespaced(client.clone(), namespace);
let lp = ListParams::default().labels(&label_selector).timeout(10);
let services = deployent_api.list(&lp).await.map_err(Error::KubeError)?;
Ok(services
.items
.iter()
.map(|d| d.metadata.name.to_owned().expect("no name on resource"))
.collect())
}
pub fn to_delete(desired: Vec<String>, actual: Vec<String>) -> Option<Vec<String>> {
let mut to_delete: Vec<String> = Vec::new();
for a in actual {
if !desired.contains(&a) {
to_delete.push(a);
}
}
if to_delete.is_empty() {
None
} else {
Some(to_delete)
}
}
async fn apply_resources(resources: Vec<AppServiceResources>, client: &Client, ns: &str) -> bool {
let deployment_api: Api<Deployment> = Api::namespaced(client.clone(), ns);
let service_api: Api<Service> = Api::namespaced(client.clone(), ns);
let ps = PatchParams::apply("cntrlr").force();
let mut has_errors: bool = false;
for res in resources {
match deployment_api
.patch(&res.name, &ps, &Patch::Apply(&res.deployment))
.await
.map_err(Error::KubeError)
{
Ok(_) => {
debug!("ns: {}, applied AppService Deployment: {}", ns, res.name);
}
Err(e) => {
has_errors = true;
error!(
"ns: {}, failed to apply AppService Deployment: {}, error: {}",
ns, res.name, e
);
}
}
if res.service.is_none() {
continue;
}
match service_api
.patch(&res.name, &ps, &Patch::Apply(&res.service))
.await
.map_err(Error::KubeError)
{
Ok(_) => {
debug!("ns: {}, applied AppService Service: {}", ns, res.name);
}
Err(e) => {
has_errors = true;
error!(
"ns: {}, failed to apply AppService Service: {}, error: {}",
ns, res.name, e
);
}
}
}
has_errors
}
pub async fn reconcile_app_services(cdb: &CoreDB, ctx: Arc<Context>) -> Result<(), Action> {
let client = ctx.client.clone();
let ns = cdb.namespace().unwrap();
let coredb_name = cdb.name_any();
let oref = cdb.controller_owner_ref(&()).unwrap();
let deployment_api: Api<Deployment> = Api::namespaced(client.clone(), &ns);
let service_api: Api<Service> = Api::namespaced(client.clone(), &ns);
let desired_deployments = match cdb.spec.app_services.clone() {
Some(appsvcs) => appsvcs
.iter()
.map(|a| format!("{}-{}", coredb_name, a.name.clone()))
.collect(),
None => {
debug!("No AppServices found in Instance: {}", ns);
vec![]
}
};
let desired_services = match cdb.spec.app_services.clone() {
Some(appsvcs) => {
let mut desired_svc: Vec<String> = Vec::new();
for appsvc in appsvcs.iter() {
if appsvc.routing.as_ref().is_some() {
let svc_name = format!("{}-{}", coredb_name, appsvc.name);
desired_svc.push(svc_name.clone());
}
}
desired_svc
}
None => {
vec![]
}
};
let mut has_errors: bool = false;
let actual_deployments = match get_appservice_deployments(&client, &ns, &coredb_name).await {
Ok(deployments) => deployments,
Err(e) => {
has_errors = true;
error!("ns: {}, failed to get AppService Deployments: {}", ns, e);
vec![]
}
};
let actual_services = match get_appservice_services(&client, &ns, &coredb_name).await {
Ok(services) => services,
Err(e) => {
has_errors = true;
error!("ns: {}, failed to get AppService Services: {}", ns, e);
vec![]
}
};
if let Some(to_delete) = to_delete(desired_deployments, actual_deployments) {
for d in to_delete {
match deployment_api.delete(&d, &Default::default()).await {
Ok(_) => {
debug!("ns: {}, successfully deleted AppService: {}", ns, d);
}
Err(e) => {
has_errors = true;
error!("ns: {}, Failed to delete AppService: {}, error: {}", ns, d, e);
}
}
}
}
if let Some(to_delete) = to_delete(desired_services, actual_services) {
for d in to_delete {
match service_api.delete(&d, &Default::default()).await {
Ok(_) => {
debug!("ns: {}, successfully deleted AppService: {}", ns, d);
}
Err(e) => {
has_errors = true;
error!("ns: {}, Failed to delete AppService: {}, error: {}", ns, d, e);
}
}
}
}
let appsvcs = match cdb.spec.app_services.clone() {
Some(appsvcs) => appsvcs,
None => {
debug!("ns: {}, No AppServices found in spec", ns);
vec![]
}
};
let domain = match std::env::var("DATA_PLANE_BASEDOMAIN") {
Ok(domain) => domain,
Err(_) => {
warn!("`DATA_PLANE_BASEDOMAIN` not set -- assuming `localhost`");
"localhost".to_string()
}
};
let resources: Vec<AppServiceResources> = appsvcs
.iter()
.map(|appsvc| generate_resource(appsvc, &coredb_name, &ns, oref.clone(), domain.to_owned()))
.collect();
let apply_errored = apply_resources(resources.clone(), &client, &ns).await;
let desired_routes: Vec<IngressRouteRoutes> = resources
.iter()
.filter_map(|r| r.ingress_routes.clone())
.flatten()
.collect();
let desired_middlewares = appsvcs
.iter()
.filter_map(|appsvc| appsvc.middlewares.clone())
.flatten()
.collect::<Vec<Middleware>>();
match reconcile_ingress(
client.clone(),
&coredb_name,
&ns,
oref.clone(),
desired_routes,
desired_middlewares,
)
.await
{
Ok(_) => {
debug!("Updated/applied ingress for {}.{}", ns, coredb_name,);
}
Err(e) => {
error!(
"Failed to update/apply IngressRoute {}.{}: {}",
ns, coredb_name, e
);
has_errors = true;
}
}
if has_errors || apply_errored {
return Err(Action::requeue(Duration::from_secs(300)));
}
Ok(())
}