1use crate::{
2 ingress_route_crd::{
3 IngressRoute, IngressRouteRoutes, IngressRouteRoutesKind, IngressRouteRoutesMiddlewares,
4 IngressRouteRoutesServices, IngressRouteRoutesServicesKind, IngressRouteSpec,
5 IngressRouteTls,
6 },
7 traefik::middlewares_crd::{
8 Middleware as TraefikMiddleware, MiddlewareHeaders, MiddlewareReplacePathRegex,
9 MiddlewareSpec, MiddlewareStripPrefix,
10 },
11 Result,
12};
13use k8s_openapi::apimachinery::pkg::{apis::meta::v1::OwnerReference, util::intstr::IntOrString};
14use kube::{
15 api::{Api, ListParams, ObjectMeta, Patch, PatchParams},
16 Client,
17};
18
19use std::collections::BTreeMap;
20
21use tracing::{debug, error};
22
23use super::{
24 manager::to_delete,
25 types::{AppService, Middleware, COMPONENT_NAME},
26};
27
28use crate::traefik::ingress_route_tcp_crd::{
29 IngressRouteTCP, IngressRouteTCPRoutes, IngressRouteTCPRoutesMiddlewares,
30 IngressRouteTCPRoutesServices, IngressRouteTCPSpec, IngressRouteTCPTls,
31};
32
33use crate::app_service::types::IngressType;
34
35#[derive(Clone, Debug)]
36pub struct MiddleWareWrapper {
37 pub name: String,
38 pub mw: TraefikMiddleware,
39}
40
41fn generate_ingress(
42 coredb_name: &str,
43 namespace: &str,
44 oref: OwnerReference,
45 routes: Vec<IngressRouteRoutes>,
46 entry_points: Vec<String>,
47) -> IngressRoute {
48 let mut labels: BTreeMap<String, String> = BTreeMap::new();
49 labels.insert("component".to_owned(), COMPONENT_NAME.to_string());
50 labels.insert("coredb.io/name".to_owned(), coredb_name.to_string());
51
52 IngressRoute {
53 metadata: ObjectMeta {
54 name: Some(coredb_name.to_owned()),
56 namespace: Some(namespace.to_owned()),
57 owner_references: Some(vec![oref]),
58 labels: Some(labels.clone()),
59 ..ObjectMeta::default()
60 },
61 spec: IngressRouteSpec {
62 entry_points: Some(entry_points),
63 routes,
64 tls: Some(IngressRouteTls::default()),
65 },
66 }
67}
68
69fn generate_ingress_tcp(
70 name: &str,
71 namespace: &str,
72 oref: OwnerReference,
73 routes: Vec<IngressRouteTCPRoutes>,
74 entry_points: Vec<String>,
75) -> IngressRouteTCP {
76 let mut labels: BTreeMap<String, String> = BTreeMap::new();
77 labels.insert("component".to_owned(), COMPONENT_NAME.to_string());
78 labels.insert("coredb.io/name".to_owned(), name.to_string());
79
80 IngressRouteTCP {
81 metadata: ObjectMeta {
82 name: Some(name.to_string()),
84 namespace: Some(namespace.to_owned()),
85 owner_references: Some(vec![oref]),
86 labels: Some(labels.clone()),
87 ..ObjectMeta::default()
88 },
89 spec: IngressRouteTCPSpec {
90 entry_points: Some(entry_points),
91 routes,
92 tls: Some(IngressRouteTCPTls {
93 passthrough: Some(true),
94 ..IngressRouteTCPTls::default()
95 }),
96 },
97 }
98}
99
100fn generate_middlewares(
103 coredb_name: &str,
104 namespace: &str,
105 oref: OwnerReference,
106 middlewares: Vec<Middleware>,
107) -> Vec<MiddleWareWrapper> {
108 let mut traefik_middlwares: Vec<MiddleWareWrapper> = Vec::new();
109 let mut labels: BTreeMap<String, String> = BTreeMap::new();
110 labels.insert("component".to_owned(), COMPONENT_NAME.to_string());
111 labels.insert("coredb.io/name".to_owned(), coredb_name.to_string());
112
113 for mw in middlewares {
114 let traefik_mw = match mw {
115 Middleware::CustomRequestHeaders(mw) => {
116 let mw_name = format!("{}-{}", coredb_name, mw.name);
117 let mwh = MiddlewareHeaders {
118 custom_request_headers: Some(mw.config),
119 ..MiddlewareHeaders::default()
120 };
121 let tmw = TraefikMiddleware {
122 metadata: ObjectMeta {
123 name: Some(mw_name.clone()),
124 namespace: Some(namespace.to_owned()),
125 owner_references: Some(vec![oref.clone()]),
126 labels: Some(labels.clone()),
127 ..ObjectMeta::default()
128 },
129 spec: MiddlewareSpec {
130 headers: Some(mwh),
131 ..MiddlewareSpec::default()
132 },
133 };
134 MiddleWareWrapper {
135 name: mw_name,
136 mw: tmw,
137 }
138 }
139 Middleware::StripPrefix(mw) => {
140 let mw_name = format!("{}-{}", coredb_name, mw.name);
141 let mwsp = MiddlewareStripPrefix {
142 force_slash: None,
143 prefixes: Some(mw.config),
144 };
145 let tmw = TraefikMiddleware {
146 metadata: ObjectMeta {
147 name: Some(mw_name.clone()),
148 namespace: Some(namespace.to_owned()),
149 owner_references: Some(vec![oref.clone()]),
150 labels: Some(labels.clone()),
151 ..ObjectMeta::default()
152 },
153 spec: MiddlewareSpec {
154 strip_prefix: Some(mwsp),
155 ..MiddlewareSpec::default()
156 },
157 };
158 MiddleWareWrapper {
159 name: mw_name,
160 mw: tmw,
161 }
162 }
163 Middleware::ReplacePathRegex(mw) => {
164 let mw_name = format!("{}-{}", coredb_name, mw.name);
165 let mwrpr = MiddlewareReplacePathRegex {
166 regex: Some(mw.config.regex),
167 replacement: Some(mw.config.replacement),
168 };
169 let tmw = TraefikMiddleware {
170 metadata: ObjectMeta {
171 name: Some(mw_name.clone()),
172 namespace: Some(namespace.to_owned()),
173 owner_references: Some(vec![oref.clone()]),
174 labels: Some(labels.clone()),
175 ..ObjectMeta::default()
176 },
177 spec: MiddlewareSpec {
178 replace_path_regex: Some(mwrpr),
179 ..MiddlewareSpec::default()
180 },
181 };
182 MiddleWareWrapper {
183 name: mw_name,
184 mw: tmw,
185 }
186 }
187 };
188 traefik_middlwares.push(traefik_mw);
189 }
190 traefik_middlwares
191}
192
193pub fn generate_ingress_routes(
196 appsvc: &AppService,
197 resource_name: &str,
198 namespace: &str,
199 host_matcher: String,
200 coredb_name: &str,
201) -> Option<Vec<IngressRouteRoutes>> {
202 match appsvc.routing.clone() {
203 Some(routings) => {
204 let mut routes: Vec<IngressRouteRoutes> = Vec::new();
205 for route in routings.iter() {
206 match route.ingress_path.clone() {
207 Some(path) => {
208 if route.ingress_type.clone()?.eq(&IngressType::tcp) {
209 debug!("Skipping IngressRouteRoutes for TCP ingress type");
211 continue;
212 }
213
214 let matcher = format!("{host_matcher} && PathPrefix(`{}`)", path);
215 let middlewares: Option<Vec<IngressRouteRoutesMiddlewares>> =
216 route.middlewares.clone().map(|names| {
217 names
218 .into_iter()
219 .map(|m| IngressRouteRoutesMiddlewares {
220 name: format!("{}-{}", &coredb_name, m),
221 namespace: Some(namespace.to_owned()),
222 })
223 .collect()
224 });
225 let route = IngressRouteRoutes {
226 kind: IngressRouteRoutesKind::Rule,
227 r#match: matcher.clone(),
228 services: Some(vec![IngressRouteRoutesServices {
229 name: resource_name.to_string(),
230 port: Some(IntOrString::Int(route.port as i32)),
231 namespace: None,
235 kind: Some(IngressRouteRoutesServicesKind::Service),
236 ..IngressRouteRoutesServices::default()
237 }]),
238 middlewares,
239 priority: None,
240 syntax: None,
241 };
242 routes.push(route);
243 }
244 None => {
245 continue;
247 }
248 }
249 }
250 Some(routes)
251 }
252 None => None,
253 }
254}
255
256pub fn generate_ingress_tcp_routes(
257 appsvc: &AppService,
258 resource_name: &str,
259 namespace: &str,
260 host_matcher_tcp: String,
261 coredb_name: &str,
262) -> Option<Vec<IngressRouteTCPRoutes>> {
263 match appsvc.routing.clone() {
264 Some(routings) => {
265 let mut routes: Vec<IngressRouteTCPRoutes> = Vec::new();
266 for route in routings.iter() {
267 match route.ingress_path.clone() {
268 Some(_path) => {
269 if !route.ingress_type.clone()?.eq(&IngressType::tcp) {
270 debug!("Skipping IngressRouteTCPRoutes for non-TCP ingress type");
272 continue;
273 }
274
275 let middlewares: Option<Vec<IngressRouteTCPRoutesMiddlewares>> =
276 route.middlewares.clone().map(|names| {
277 names
278 .into_iter()
279 .map(|m| IngressRouteTCPRoutesMiddlewares {
280 name: format!("{}-{}", &coredb_name, m),
281 namespace: Some(namespace.to_owned()),
282 })
283 .collect()
284 });
285 let route = IngressRouteTCPRoutes {
286 r#match: host_matcher_tcp.clone(),
287 services: Some(vec![IngressRouteTCPRoutesServices {
288 name: resource_name.to_string(),
289 port: IntOrString::Int(route.port as i32),
290 namespace: None,
291 ..IngressRouteTCPRoutesServices::default()
292 }]),
293 middlewares,
294 priority: None,
295 syntax: None,
296 };
297 routes.push(route);
298 }
299 None => {
300 continue;
302 }
303 }
304 }
305 Some(routes)
306 }
307 None => None,
308 }
309}
310
311pub async fn reconcile_ingress(
312 client: Client,
313 coredb_name: &str,
314 ns: &str,
315 oref: OwnerReference,
316 desired_routes: Vec<IngressRouteRoutes>,
317 desired_middlewares: Vec<Middleware>,
318 entry_points: Vec<String>,
319) -> Result<(), kube::Error> {
320 let ingress_api: Api<IngressRoute> = Api::namespaced(client.clone(), ns);
321
322 let middleware_api: Api<TraefikMiddleware> = Api::namespaced(client.clone(), ns);
323 let desired_middlewares =
324 generate_middlewares(coredb_name, ns, oref.clone(), desired_middlewares);
325 let actual_mw_names = get_middlewares(client.clone(), ns, coredb_name).await?;
326 let desired_mw_names = desired_middlewares
327 .iter()
328 .map(|mw| mw.name.clone())
329 .collect::<Vec<String>>();
330 if let Some(to_delete) = to_delete(desired_mw_names, actual_mw_names) {
331 for d in to_delete {
332 match middleware_api.delete(&d, &Default::default()).await {
333 Ok(_) => {
334 debug!("ns: {}, successfully deleted Middleware: {}", ns, d);
335 }
336 Err(e) => {
337 error!(
338 "ns: {}, Failed to delete Middleware: {}, error: {}",
339 ns, d, e
340 );
341 }
342 }
343 }
344 }
345 for desired_mw in desired_middlewares {
346 match apply_middleware(middleware_api.clone(), &desired_mw.name, &desired_mw.mw).await {
347 Ok(_) => {
348 debug!(
349 "ns: {}, successfully applied Middleware: {}",
350 ns, desired_mw.name
351 );
352 }
353 Err(e) => {
354 error!(
355 "ns: {}, Failed to apply Middleware: {}, error: {}",
356 ns, desired_mw.name, e
357 );
358 }
359 }
360 }
361
362 let ingress = generate_ingress(coredb_name, ns, oref, desired_routes.clone(), entry_points);
363 if desired_routes.is_empty() {
364 let lp = ListParams::default().labels("component=appService");
366 let ingress_routes = ingress_api.list(&lp).await?;
368 if let Some(ingress_route) = ingress_routes.into_iter().next() {
369 match ingress_api
370 .delete(&ingress_route.metadata.name.unwrap(), &Default::default())
371 .await
372 {
373 Ok(_) => {
374 debug!(
375 "ns: {}, successfully deleted IngressRoute: {}",
376 ns, coredb_name
377 );
378 return Ok(());
379 }
380 Err(e) => {
381 error!(
382 "ns: {}, Failed to delete IngressRoute: {}, error: {}",
383 ns, coredb_name, e
384 );
385 return Err(e);
386 }
387 }
388 }
389 return Ok(());
390 }
391 match apply_ingress_route(ingress_api, coredb_name, &ingress).await {
392 Ok(_) => {
393 debug!("Updated/applied IngressRoute for {}.{}", ns, coredb_name,);
394 Ok(())
395 }
396 Err(e) => {
397 error!(
398 "Failed to update/apply IngressRoute {}.{}: {}",
399 ns, coredb_name, e
400 );
401 Err(e)
402 }
403 }
404}
405
406pub async fn reconcile_ingress_tcp(
407 client: Client,
408 coredb_name: &str,
409 ns: &str,
410 oref: OwnerReference,
411 desired_routes: Vec<IngressRouteTCPRoutes>,
412 desired_middlewares: Vec<Middleware>,
414 entry_points_tcp: Vec<String>,
415 app_name: &str,
416) -> Result<(), kube::Error> {
417 let ingress_api: Api<IngressRouteTCP> = Api::namespaced(client.clone(), ns);
418 let name = format!("{}-{}", coredb_name, app_name);
419
420 let middleware_api: Api<TraefikMiddleware> = Api::namespaced(client.clone(), ns);
421 let desired_middlewares = generate_middlewares(&name, ns, oref.clone(), desired_middlewares);
422 let actual_mw_names = get_middlewares(client.clone(), ns, &name).await?;
423 let desired_mw_names = desired_middlewares
424 .iter()
425 .map(|mw| mw.name.clone())
426 .collect::<Vec<String>>();
427 if let Some(to_delete) = to_delete(desired_mw_names, actual_mw_names) {
428 for d in to_delete {
429 match middleware_api.delete(&d, &Default::default()).await {
430 Ok(_) => {
431 debug!("ns: {}, successfully deleted Middleware: {}", ns, d);
432 }
433 Err(e) => {
434 error!(
435 "ns: {}, Failed to delete Middleware: {}, error: {}",
436 ns, d, e
437 );
438 }
439 }
440 }
441 }
442 for desired_mw in desired_middlewares {
443 match apply_middleware(middleware_api.clone(), &desired_mw.name, &desired_mw.mw).await {
444 Ok(_) => {
445 debug!(
446 "ns: {}, successfully applied Middleware: {}",
447 ns, desired_mw.name
448 );
449 }
450 Err(e) => {
451 error!(
452 "ns: {}, Failed to apply Middleware: {}, error: {}",
453 ns, desired_mw.name, e
454 );
455 }
456 }
457 }
458
459 if desired_routes.is_empty() {
460 let lp = ListParams::default().labels("component=appService");
462 let ingress_tcp_routes = ingress_api.list(&lp).await?;
464 if let Some(ingress_tcp_route) = ingress_tcp_routes.into_iter().next() {
465 match ingress_api
466 .delete(
467 &ingress_tcp_route.metadata.name.unwrap(),
468 &Default::default(),
469 )
470 .await
471 {
472 Ok(_) => {
473 debug!(
474 "ns: {}, successfully deleted IngressRouteTCP: {}",
475 ns, &name
476 );
477 return Ok(());
478 }
479 Err(e) => {
480 error!(
481 "ns: {}, Failed to delete IngressRouteTCP: {}, error: {}",
482 ns, &name, e
483 );
484 return Err(e);
485 }
486 }
487 }
488 return Ok(());
489 }
490
491 if desired_routes.iter().any(|r| {
494 r.services
495 .as_ref()
496 .unwrap_or(&vec![])
497 .iter()
498 .any(|s| s.name == name)
499 }) {
500 let ingress_tcp =
501 generate_ingress_tcp(&name, ns, oref, desired_routes.clone(), entry_points_tcp);
502 match apply_ingress_route_tcp(ingress_api, &name, &ingress_tcp).await {
503 Ok(_) => {
504 debug!("Updated/applied IngressRouteTCP for {}.{}", ns, &name,);
505 Ok(())
506 }
507 Err(e) => {
508 error!(
509 "Failed to update/apply IngressRouteTCP {}.{}: {}",
510 ns, &name, e
511 );
512 Err(e)
513 }
514 }
515 } else {
516 debug!("No IngressRouteTCP routes match the app name: {}", app_name);
517 Ok(())
518 }
519}
520
521async fn apply_middleware(
522 mw_api: Api<TraefikMiddleware>,
523 mw_name: &str,
524 mw: &TraefikMiddleware,
525) -> Result<TraefikMiddleware, kube::Error> {
526 let patch_parameters = PatchParams::apply("cntrlr").force();
527 mw_api
528 .patch(mw_name, &patch_parameters, &Patch::Apply(&mw))
529 .await
530}
531
532async fn apply_ingress_route(
533 ingress_api: Api<IngressRoute>,
534 ingress_name: &str,
535 ingress_route: &IngressRoute,
536) -> Result<IngressRoute, kube::Error> {
537 let patch_parameters = PatchParams::apply("cntrlr").force();
538 ingress_api
539 .patch(
540 ingress_name,
541 &patch_parameters,
542 &Patch::Apply(&ingress_route),
543 )
544 .await
545}
546
547async fn apply_ingress_route_tcp(
548 ingress_api: Api<IngressRouteTCP>,
549 ingress_name: &str,
550 ingress_route_tcp: &IngressRouteTCP,
551) -> Result<IngressRouteTCP, kube::Error> {
552 let patch_parameters = PatchParams::apply("cntrlr").force();
553 ingress_api
554 .patch(
555 ingress_name,
556 &patch_parameters,
557 &Patch::Apply(&ingress_route_tcp),
558 )
559 .await
560}
561
562async fn get_middlewares(
563 client: Client,
564 namespace: &str,
565 coredb_name: &str,
566) -> Result<Vec<String>, kube::Error> {
567 let label_selector = format!(
568 "component={},coredb.io/name={}",
569 COMPONENT_NAME, coredb_name
570 );
571 let deployent_api: Api<TraefikMiddleware> = Api::namespaced(client, namespace);
572 let lp = ListParams::default().labels(&label_selector).timeout(10);
573 let deployments = deployent_api.list(&lp).await?;
574 Ok(deployments
575 .items
576 .iter()
577 .map(|d| d.metadata.name.to_owned().expect("no name on resource"))
578 .collect())
579}