use crate::ingress_route_tcp_crd::{
IngressRouteTCP, IngressRouteTCPRoutes, IngressRouteTCPRoutesServices, IngressRouteTCPSpec,
IngressRouteTCPTls,
};
use k8s_openapi::apimachinery::pkg::{
apis::meta::v1::{ObjectMeta, OwnerReference},
util::intstr::IntOrString,
};
use kube::{
api::{DeleteParams, Patch, PatchParams},
Api, Resource, ResourceExt,
};
use std::sync::Arc;
use crate::{apis::coredb_types::CoreDB, errors::OperatorError, Context};
use tracing::log::{debug, error, info, warn};
/// Assembles the `IngressRouteTCP` object for a Postgres endpoint.
///
/// The resulting object:
/// * is named `name` in `namespace` and carries `owner_reference` as its only
///   owner reference,
/// * listens on the `postgresql` entry point,
/// * has exactly one route whose match rule is `matcher` and whose only
///   backend is `service_name` on `port`,
/// * has TLS passthrough enabled; every other optional field is left unset.
fn postgres_ingress_route_tcp(
    name: String,
    namespace: String,
    owner_reference: OwnerReference,
    matcher: String,
    service_name: String,
    port: IntOrString,
) -> IngressRouteTCP {
    let metadata = ObjectMeta {
        name: Some(name),
        namespace: Some(namespace),
        owner_references: Some(vec![owner_reference]),
        ..ObjectMeta::default()
    };

    // The single backend service that receives the routed traffic.
    let backend = IngressRouteTCPRoutesServices {
        name: service_name,
        port,
        namespace: None,
        proxy_protocol: None,
        termination_delay: None,
        weight: None,
    };

    let route = IngressRouteTCPRoutes {
        r#match: matcher,
        services: Some(vec![backend]),
        middlewares: None,
        priority: None,
    };

    // Only passthrough is configured; no certificate-related option is set.
    let tls = IngressRouteTCPTls {
        passthrough: Some(true),
        cert_resolver: None,
        domains: None,
        options: None,
        secret_name: None,
        store: None,
    };

    let spec = IngressRouteTCPSpec {
        entry_points: Some(vec![String::from("postgresql")]),
        routes: vec![route],
        tls: Some(tls),
    };

    IngressRouteTCP { metadata, spec }
}
pub async fn reconcile_extra_postgres_ing_route_tcp(
cdb: &CoreDB,
ctx: Arc<Context>,
namespace: &str,
service_name_read_write: &str,
port: IntOrString,
) -> Result<(), OperatorError> {
let mut extra_domain_names = cdb.spec.extra_domains_rw.clone().unwrap_or_default();
extra_domain_names.sort();
let matchers = extra_domain_names
.iter()
.map(|domain_name| format!("HostSNI(`{}`)", domain_name))
.collect::<Vec<String>>();
let matcher_actual = matchers.join(" || ");
let ingress_route_tcp_name = format!("extra-{}-rw", cdb.name_any());
let owner_reference = cdb.controller_owner_ref(&()).unwrap();
let ingress_route_tcp_to_apply = postgres_ingress_route_tcp(
ingress_route_tcp_name.clone(),
namespace.to_string(),
owner_reference.clone(),
matcher_actual.clone(),
service_name_read_write.to_string(),
port.clone(),
);
let ingress_route_tcp_api: Api<IngressRouteTCP> = Api::namespaced(ctx.client.clone(), namespace);
if !extra_domain_names.is_empty() {
apply_ingress_route_tcp(
ingress_route_tcp_api,
namespace,
&ingress_route_tcp_name,
&ingress_route_tcp_to_apply,
)
.await
} else {
delete_ingress_route_tcp(ingress_route_tcp_api, namespace, &ingress_route_tcp_name).await
}
}
/// Creates or updates an IngressRouteTCP via `Patch::Apply`, using the
/// `cntrlr` field manager with `force()` set.
///
/// `namespace` is only used for log messages; the namespace actually targeted
/// is whatever `ingress_route_tcp_api` was constructed with.
///
/// # Errors
///
/// Any failure of the patch call is logged and reported as
/// `OperatorError::IngressRouteTcpError`.
async fn apply_ingress_route_tcp(
    ingress_route_tcp_api: Api<IngressRouteTCP>,
    namespace: &str,
    ingress_route_tcp_name: &str,
    ingress_route_tcp_to_apply: &IngressRouteTCP,
) -> Result<(), OperatorError> {
    // `ingress_route_tcp_to_apply` is already a reference; wrap it directly.
    let patch = Patch::Apply(ingress_route_tcp_to_apply);
    let patch_parameters = PatchParams::apply("cntrlr").force();

    ingress_route_tcp_api
        .patch(ingress_route_tcp_name, &patch_parameters, &patch)
        .await
        .map_err(|e| {
            error!(
                "Failed to update postgres read and write IngressRouteTCP {}.{}: {}",
                ingress_route_tcp_name, namespace, e
            );
            OperatorError::IngressRouteTcpError
        })?;

    info!(
        "Updated postgres read and write IngressRouteTCP {}.{}",
        ingress_route_tcp_name, namespace
    );
    Ok(())
}
async fn delete_ingress_route_tcp(
ingress_route_tcp_api: Api<IngressRouteTCP>,
namespace: &str,
ingress_route_tcp_name: &String,
) -> Result<(), OperatorError> {
if ingress_route_tcp_api
.get(&ingress_route_tcp_name.clone())
.await
.is_ok()
{
let delete_parameters = DeleteParams::default();
match ingress_route_tcp_api
.delete(&ingress_route_tcp_name.clone(), &delete_parameters)
.await
{
Ok(_) => {
info!(
"Deleted IngressRouteTCP {}.{}",
ingress_route_tcp_name.clone(),
namespace
);
}
Err(e) => {
error!(
"Failed to delete IngressRouteTCP {}.{}: {}",
ingress_route_tcp_name, namespace, e
);
return Err(OperatorError::IngressRouteTcpError);
}
}
} else {
debug!(
"IngressRouteTCP {}.{} was not found. Assuming it's already deleted.",
ingress_route_tcp_name, namespace
);
}
Ok(())
}
/// Reconciles the read-write IngressRouteTCP objects of a CoreDB instance.
///
/// Steps:
/// 1. List every IngressRouteTCP in `namespace` and keep those whose name
///    starts with `<coredb name>-rw-` or equals the CoreDB name.
/// 2. For each kept object, compare the first service of its first route with
///    `service_name_read_write` / `port`. On any difference (including a
///    missing service) the object is re-applied with the expected service and
///    port while its existing match rule is preserved.
/// 3. If none of the kept objects already uses the match rule
///    ``HostSNI(`<subdomain>.<basedomain>`)``, a new object is created under
///    the first unused name `<coredb name>-rw-<index>` (index counted from 0).
///
/// # Errors
///
/// * The error converted from the list call's failure (via `?`).
/// * `OperatorError::IngressRouteTCPName` when a listed object has no name.
/// * `OperatorError::IngressRouteTcpError` when an apply fails, when a kept
///   object has no routes (its match rule cannot be determined), or when no
///   controller owner reference can be derived from `cdb`.
pub async fn reconcile_postgres_ing_route_tcp(
    cdb: &CoreDB,
    ctx: Arc<Context>,
    subdomain: &str,
    basedomain: &str,
    namespace: &str,
    service_name_read_write: &str,
    port: IntOrString,
) -> Result<(), OperatorError> {
    let client = ctx.client.clone();
    let ingress_route_tcp_api: Api<IngressRouteTCP> = Api::namespaced(client, namespace);
    let cdb_name = cdb.name_any();

    // `controller_owner_ref` yields `None` when it cannot build a reference;
    // surface that as a reconcile error instead of panicking.
    let owner_reference = cdb.controller_owner_ref(&()).ok_or_else(|| {
        error!(
            "Failed to build a controller owner reference for CoreDB {}",
            cdb_name
        );
        OperatorError::IngressRouteTcpError
    })?;

    let ingress_route_tcps = ingress_route_tcp_api.list(&Default::default()).await?;

    let ingress_route_tcp_name_prefix_rw = format!("{}-rw-", cdb_name);
    let ingress_route_tcp_name_prefix_rw = ingress_route_tcp_name_prefix_rw.as_str();

    let mut present_matchers_list: Vec<String> = vec![];
    let mut present_ing_route_tcp_names_list: Vec<String> = vec![];

    for ingress_route_tcp in ingress_route_tcps {
        let ingress_route_tcp_name = match ingress_route_tcp.metadata.name.clone() {
            Some(ingress_route_tcp_name) => ingress_route_tcp_name,
            None => {
                error!(
                    "IngressRouteTCP {}.{}, does not have a name.",
                    subdomain, basedomain
                );
                return Err(OperatorError::IngressRouteTCPName);
            }
        };

        // Only objects following this instance's naming scheme are managed here.
        let is_postgres_rw = ingress_route_tcp_name.starts_with(ingress_route_tcp_name_prefix_rw)
            || ingress_route_tcp_name == cdb_name;
        if !is_postgres_rw {
            debug!(
                "Skipping non postgres-rw ingress route tcp: {}",
                ingress_route_tcp_name
            );
            continue;
        }

        debug!(
            "Detected ingress route tcp read write endpoint {}.{}",
            ingress_route_tcp_name, namespace
        );

        // Checked access: a listed object with an empty `routes` array must
        // not be able to panic the reconciler.
        let route = match ingress_route_tcp.spec.routes.first() {
            Some(route) => route,
            None => {
                error!(
                    "IngressRouteTCP {}.{} has no routes, cannot determine its match rule.",
                    ingress_route_tcp_name, namespace
                );
                return Err(OperatorError::IngressRouteTcpError);
            }
        };
        let matcher_actual = route.r#match.clone();

        // A route without any service is treated like a service/port mismatch
        // so that it gets repaired below.
        let service_matches = route
            .services
            .as_ref()
            .and_then(|services| services.first())
            .map_or(false, |service| {
                service.name == service_name_read_write && service.port == port
            });

        if !service_matches {
            warn!(
                "Postgres read and write IngressRouteTCP {}.{}, does not match the service name or port. Updating service or port and leaving the match rule the same.",
                ingress_route_tcp_name, namespace
            );
            let ingress_route_tcp_to_apply = postgres_ingress_route_tcp(
                ingress_route_tcp_name.clone(),
                namespace.to_string(),
                owner_reference.clone(),
                matcher_actual.clone(),
                service_name_read_write.to_string(),
                port.clone(),
            );
            apply_ingress_route_tcp(
                ingress_route_tcp_api.clone(),
                namespace,
                &ingress_route_tcp_name,
                &ingress_route_tcp_to_apply,
            )
            .await?;
        }

        present_ing_route_tcp_names_list.push(ingress_route_tcp_name);
        present_matchers_list.push(matcher_actual);
    }

    let newest_matcher = format!("HostSNI(`{subdomain}.{basedomain}`)");
    if present_matchers_list.contains(&newest_matcher) {
        debug!(
            "There is already an IngressRouteTCP for this matcher, so we don't need to create a new one: {}",
            newest_matcher
        );
        return Ok(());
    }

    // Pick the first `<prefix><index>` name that is not already taken.
    let mut index = 0;
    let mut ingress_route_tcp_name_new = format!("{}{}", ingress_route_tcp_name_prefix_rw, index);
    while present_ing_route_tcp_names_list.contains(&ingress_route_tcp_name_new) {
        index += 1;
        ingress_route_tcp_name_new = format!("{}{}", ingress_route_tcp_name_prefix_rw, index);
    }

    let ingress_route_tcp_to_apply = postgres_ingress_route_tcp(
        ingress_route_tcp_name_new.clone(),
        namespace.to_string(),
        owner_reference,
        newest_matcher,
        service_name_read_write.to_string(),
        port,
    );
    apply_ingress_route_tcp(
        ingress_route_tcp_api,
        namespace,
        &ingress_route_tcp_name_new,
        &ingress_route_tcp_to_apply,
    )
    .await
}