controller/app_service/
ingress.rs

1use crate::{
2    ingress_route_crd::{
3        IngressRoute, IngressRouteRoutes, IngressRouteRoutesKind, IngressRouteRoutesMiddlewares,
4        IngressRouteRoutesServices, IngressRouteRoutesServicesKind, IngressRouteSpec,
5        IngressRouteTls,
6    },
7    traefik::middlewares_crd::{
8        Middleware as TraefikMiddleware, MiddlewareHeaders, MiddlewareReplacePathRegex,
9        MiddlewareSpec, MiddlewareStripPrefix,
10    },
11    Result,
12};
13use k8s_openapi::apimachinery::pkg::{apis::meta::v1::OwnerReference, util::intstr::IntOrString};
14use kube::{
15    api::{Api, ListParams, ObjectMeta, Patch, PatchParams},
16    Client,
17};
18
19use std::collections::BTreeMap;
20
21use tracing::{debug, error};
22
23use super::{
24    manager::to_delete,
25    types::{AppService, Middleware, COMPONENT_NAME},
26};
27
28use crate::traefik::ingress_route_tcp_crd::{
29    IngressRouteTCP, IngressRouteTCPRoutes, IngressRouteTCPRoutesMiddlewares,
30    IngressRouteTCPRoutesServices, IngressRouteTCPSpec, IngressRouteTCPTls,
31};
32
33use crate::app_service::types::IngressType;
34
35#[derive(Clone, Debug)]
36pub struct MiddleWareWrapper {
37    pub name: String,
38    pub mw: TraefikMiddleware,
39}
40
41fn generate_ingress(
42    coredb_name: &str,
43    namespace: &str,
44    oref: OwnerReference,
45    routes: Vec<IngressRouteRoutes>,
46    entry_points: Vec<String>,
47) -> IngressRoute {
48    let mut labels: BTreeMap<String, String> = BTreeMap::new();
49    labels.insert("component".to_owned(), COMPONENT_NAME.to_string());
50    labels.insert("coredb.io/name".to_owned(), coredb_name.to_string());
51
52    IngressRoute {
53        metadata: ObjectMeta {
54            // using coredb name, since we'll have 1x ingress per coredb
55            name: Some(coredb_name.to_owned()),
56            namespace: Some(namespace.to_owned()),
57            owner_references: Some(vec![oref]),
58            labels: Some(labels.clone()),
59            ..ObjectMeta::default()
60        },
61        spec: IngressRouteSpec {
62            entry_points: Some(entry_points),
63            routes,
64            tls: Some(IngressRouteTls::default()),
65        },
66    }
67}
68
69fn generate_ingress_tcp(
70    name: &str,
71    namespace: &str,
72    oref: OwnerReference,
73    routes: Vec<IngressRouteTCPRoutes>,
74    entry_points: Vec<String>,
75) -> IngressRouteTCP {
76    let mut labels: BTreeMap<String, String> = BTreeMap::new();
77    labels.insert("component".to_owned(), COMPONENT_NAME.to_string());
78    labels.insert("coredb.io/name".to_owned(), name.to_string());
79
80    IngressRouteTCP {
81        metadata: ObjectMeta {
82            // using coredb name, since we'll have 1x ingress per coredb
83            name: Some(name.to_string()),
84            namespace: Some(namespace.to_owned()),
85            owner_references: Some(vec![oref]),
86            labels: Some(labels.clone()),
87            ..ObjectMeta::default()
88        },
89        spec: IngressRouteTCPSpec {
90            entry_points: Some(entry_points),
91            routes,
92            tls: Some(IngressRouteTCPTls {
93                passthrough: Some(true),
94                ..IngressRouteTCPTls::default()
95            }),
96        },
97    }
98}
99
100// creates traefik middleware objects
101// named `<coredb-name>-<specified-middleware-name>`
102fn generate_middlewares(
103    coredb_name: &str,
104    namespace: &str,
105    oref: OwnerReference,
106    middlewares: Vec<Middleware>,
107) -> Vec<MiddleWareWrapper> {
108    let mut traefik_middlwares: Vec<MiddleWareWrapper> = Vec::new();
109    let mut labels: BTreeMap<String, String> = BTreeMap::new();
110    labels.insert("component".to_owned(), COMPONENT_NAME.to_string());
111    labels.insert("coredb.io/name".to_owned(), coredb_name.to_string());
112
113    for mw in middlewares {
114        let traefik_mw = match mw {
115            Middleware::CustomRequestHeaders(mw) => {
116                let mw_name = format!("{}-{}", coredb_name, mw.name);
117                let mwh = MiddlewareHeaders {
118                    custom_request_headers: Some(mw.config),
119                    ..MiddlewareHeaders::default()
120                };
121                let tmw = TraefikMiddleware {
122                    metadata: ObjectMeta {
123                        name: Some(mw_name.clone()),
124                        namespace: Some(namespace.to_owned()),
125                        owner_references: Some(vec![oref.clone()]),
126                        labels: Some(labels.clone()),
127                        ..ObjectMeta::default()
128                    },
129                    spec: MiddlewareSpec {
130                        headers: Some(mwh),
131                        ..MiddlewareSpec::default()
132                    },
133                };
134                MiddleWareWrapper {
135                    name: mw_name,
136                    mw: tmw,
137                }
138            }
139            Middleware::StripPrefix(mw) => {
140                let mw_name = format!("{}-{}", coredb_name, mw.name);
141                let mwsp = MiddlewareStripPrefix {
142                    force_slash: None,
143                    prefixes: Some(mw.config),
144                };
145                let tmw = TraefikMiddleware {
146                    metadata: ObjectMeta {
147                        name: Some(mw_name.clone()),
148                        namespace: Some(namespace.to_owned()),
149                        owner_references: Some(vec![oref.clone()]),
150                        labels: Some(labels.clone()),
151                        ..ObjectMeta::default()
152                    },
153                    spec: MiddlewareSpec {
154                        strip_prefix: Some(mwsp),
155                        ..MiddlewareSpec::default()
156                    },
157                };
158                MiddleWareWrapper {
159                    name: mw_name,
160                    mw: tmw,
161                }
162            }
163            Middleware::ReplacePathRegex(mw) => {
164                let mw_name = format!("{}-{}", coredb_name, mw.name);
165                let mwrpr = MiddlewareReplacePathRegex {
166                    regex: Some(mw.config.regex),
167                    replacement: Some(mw.config.replacement),
168                };
169                let tmw = TraefikMiddleware {
170                    metadata: ObjectMeta {
171                        name: Some(mw_name.clone()),
172                        namespace: Some(namespace.to_owned()),
173                        owner_references: Some(vec![oref.clone()]),
174                        labels: Some(labels.clone()),
175                        ..ObjectMeta::default()
176                    },
177                    spec: MiddlewareSpec {
178                        replace_path_regex: Some(mwrpr),
179                        ..MiddlewareSpec::default()
180                    },
181                };
182                MiddleWareWrapper {
183                    name: mw_name,
184                    mw: tmw,
185                }
186            }
187        };
188        traefik_middlwares.push(traefik_mw);
189    }
190    traefik_middlwares
191}
192
193// generates Kubernetes IngressRoute template for an appService
194// maps the specified
195pub fn generate_ingress_routes(
196    appsvc: &AppService,
197    resource_name: &str,
198    namespace: &str,
199    host_matcher: String,
200    coredb_name: &str,
201) -> Option<Vec<IngressRouteRoutes>> {
202    match appsvc.routing.clone() {
203        Some(routings) => {
204            let mut routes: Vec<IngressRouteRoutes> = Vec::new();
205            for route in routings.iter() {
206                match route.ingress_path.clone() {
207                    Some(path) => {
208                        if route.ingress_type.clone()?.eq(&IngressType::tcp) {
209                            // Do not create IngressRouteRoutes for TCP ingress type
210                            debug!("Skipping IngressRouteRoutes for TCP ingress type");
211                            continue;
212                        }
213
214                        let matcher = format!("{host_matcher} && PathPrefix(`{}`)", path);
215                        let middlewares: Option<Vec<IngressRouteRoutesMiddlewares>> =
216                            route.middlewares.clone().map(|names| {
217                                names
218                                    .into_iter()
219                                    .map(|m| IngressRouteRoutesMiddlewares {
220                                        name: format!("{}-{}", &coredb_name, m),
221                                        namespace: Some(namespace.to_owned()),
222                                    })
223                                    .collect()
224                            });
225                        let route = IngressRouteRoutes {
226                            kind: IngressRouteRoutesKind::Rule,
227                            r#match: matcher.clone(),
228                            services: Some(vec![IngressRouteRoutesServices {
229                                name: resource_name.to_string(),
230                                port: Some(IntOrString::Int(route.port as i32)),
231                                // namespace attribute is NOT a kubernetes namespace
232                                // it is the Traefik provider namespace: https://doc.traefik.io/traefik/v3.0/providers/overview/#provider-namespace
233                                // https://doc.traefik.io/traefik/v3.0/routing/providers/kubernetes-crd/#kind-middleware
234                                namespace: None,
235                                kind: Some(IngressRouteRoutesServicesKind::Service),
236                                ..IngressRouteRoutesServices::default()
237                            }]),
238                            middlewares,
239                            priority: None,
240                            syntax: None,
241                        };
242                        routes.push(route);
243                    }
244                    None => {
245                        // do not create ingress when there is no path provided
246                        continue;
247                    }
248                }
249            }
250            Some(routes)
251        }
252        None => None,
253    }
254}
255
256pub fn generate_ingress_tcp_routes(
257    appsvc: &AppService,
258    resource_name: &str,
259    namespace: &str,
260    host_matcher_tcp: String,
261    coredb_name: &str,
262) -> Option<Vec<IngressRouteTCPRoutes>> {
263    match appsvc.routing.clone() {
264        Some(routings) => {
265            let mut routes: Vec<IngressRouteTCPRoutes> = Vec::new();
266            for route in routings.iter() {
267                match route.ingress_path.clone() {
268                    Some(_path) => {
269                        if !route.ingress_type.clone()?.eq(&IngressType::tcp) {
270                            // Do not create IngressRouteTCPRoutes for non-TCP ingress type
271                            debug!("Skipping IngressRouteTCPRoutes for non-TCP ingress type");
272                            continue;
273                        }
274
275                        let middlewares: Option<Vec<IngressRouteTCPRoutesMiddlewares>> =
276                            route.middlewares.clone().map(|names| {
277                                names
278                                    .into_iter()
279                                    .map(|m| IngressRouteTCPRoutesMiddlewares {
280                                        name: format!("{}-{}", &coredb_name, m),
281                                        namespace: Some(namespace.to_owned()),
282                                    })
283                                    .collect()
284                            });
285                        let route = IngressRouteTCPRoutes {
286                            r#match: host_matcher_tcp.clone(),
287                            services: Some(vec![IngressRouteTCPRoutesServices {
288                                name: resource_name.to_string(),
289                                port: IntOrString::Int(route.port as i32),
290                                namespace: None,
291                                ..IngressRouteTCPRoutesServices::default()
292                            }]),
293                            middlewares,
294                            priority: None,
295                            syntax: None,
296                        };
297                        routes.push(route);
298                    }
299                    None => {
300                        // do not create ingress when there is no path provided
301                        continue;
302                    }
303                }
304            }
305            Some(routes)
306        }
307        None => None,
308    }
309}
310
311pub async fn reconcile_ingress(
312    client: Client,
313    coredb_name: &str,
314    ns: &str,
315    oref: OwnerReference,
316    desired_routes: Vec<IngressRouteRoutes>,
317    desired_middlewares: Vec<Middleware>,
318    entry_points: Vec<String>,
319) -> Result<(), kube::Error> {
320    let ingress_api: Api<IngressRoute> = Api::namespaced(client.clone(), ns);
321
322    let middleware_api: Api<TraefikMiddleware> = Api::namespaced(client.clone(), ns);
323    let desired_middlewares =
324        generate_middlewares(coredb_name, ns, oref.clone(), desired_middlewares);
325    let actual_mw_names = get_middlewares(client.clone(), ns, coredb_name).await?;
326    let desired_mw_names = desired_middlewares
327        .iter()
328        .map(|mw| mw.name.clone())
329        .collect::<Vec<String>>();
330    if let Some(to_delete) = to_delete(desired_mw_names, actual_mw_names) {
331        for d in to_delete {
332            match middleware_api.delete(&d, &Default::default()).await {
333                Ok(_) => {
334                    debug!("ns: {}, successfully deleted Middleware: {}", ns, d);
335                }
336                Err(e) => {
337                    error!(
338                        "ns: {}, Failed to delete Middleware: {}, error: {}",
339                        ns, d, e
340                    );
341                }
342            }
343        }
344    }
345    for desired_mw in desired_middlewares {
346        match apply_middleware(middleware_api.clone(), &desired_mw.name, &desired_mw.mw).await {
347            Ok(_) => {
348                debug!(
349                    "ns: {}, successfully applied Middleware: {}",
350                    ns, desired_mw.name
351                );
352            }
353            Err(e) => {
354                error!(
355                    "ns: {}, Failed to apply Middleware: {}, error: {}",
356                    ns, desired_mw.name, e
357                );
358            }
359        }
360    }
361
362    let ingress = generate_ingress(coredb_name, ns, oref, desired_routes.clone(), entry_points);
363    if desired_routes.is_empty() {
364        // we don't need an IngressRoute when there are no routes
365        let lp = ListParams::default().labels("component=appService");
366        // Check if there are any IngressRoute objects with the label component=appService and delete them
367        let ingress_routes = ingress_api.list(&lp).await?;
368        if let Some(ingress_route) = ingress_routes.into_iter().next() {
369            match ingress_api
370                .delete(&ingress_route.metadata.name.unwrap(), &Default::default())
371                .await
372            {
373                Ok(_) => {
374                    debug!(
375                        "ns: {}, successfully deleted IngressRoute: {}",
376                        ns, coredb_name
377                    );
378                    return Ok(());
379                }
380                Err(e) => {
381                    error!(
382                        "ns: {}, Failed to delete IngressRoute: {}, error: {}",
383                        ns, coredb_name, e
384                    );
385                    return Err(e);
386                }
387            }
388        }
389        return Ok(());
390    }
391    match apply_ingress_route(ingress_api, coredb_name, &ingress).await {
392        Ok(_) => {
393            debug!("Updated/applied IngressRoute for {}.{}", ns, coredb_name,);
394            Ok(())
395        }
396        Err(e) => {
397            error!(
398                "Failed to update/apply IngressRoute {}.{}: {}",
399                ns, coredb_name, e
400            );
401            Err(e)
402        }
403    }
404}
405
406pub async fn reconcile_ingress_tcp(
407    client: Client,
408    coredb_name: &str,
409    ns: &str,
410    oref: OwnerReference,
411    desired_routes: Vec<IngressRouteTCPRoutes>,
412    // TODO: this should be a MiddlewareTCP
413    desired_middlewares: Vec<Middleware>,
414    entry_points_tcp: Vec<String>,
415    app_name: &str,
416) -> Result<(), kube::Error> {
417    let ingress_api: Api<IngressRouteTCP> = Api::namespaced(client.clone(), ns);
418    let name = format!("{}-{}", coredb_name, app_name);
419
420    let middleware_api: Api<TraefikMiddleware> = Api::namespaced(client.clone(), ns);
421    let desired_middlewares = generate_middlewares(&name, ns, oref.clone(), desired_middlewares);
422    let actual_mw_names = get_middlewares(client.clone(), ns, &name).await?;
423    let desired_mw_names = desired_middlewares
424        .iter()
425        .map(|mw| mw.name.clone())
426        .collect::<Vec<String>>();
427    if let Some(to_delete) = to_delete(desired_mw_names, actual_mw_names) {
428        for d in to_delete {
429            match middleware_api.delete(&d, &Default::default()).await {
430                Ok(_) => {
431                    debug!("ns: {}, successfully deleted Middleware: {}", ns, d);
432                }
433                Err(e) => {
434                    error!(
435                        "ns: {}, Failed to delete Middleware: {}, error: {}",
436                        ns, d, e
437                    );
438                }
439            }
440        }
441    }
442    for desired_mw in desired_middlewares {
443        match apply_middleware(middleware_api.clone(), &desired_mw.name, &desired_mw.mw).await {
444            Ok(_) => {
445                debug!(
446                    "ns: {}, successfully applied Middleware: {}",
447                    ns, desired_mw.name
448                );
449            }
450            Err(e) => {
451                error!(
452                    "ns: {}, Failed to apply Middleware: {}, error: {}",
453                    ns, desired_mw.name, e
454                );
455            }
456        }
457    }
458
459    if desired_routes.is_empty() {
460        // we don't need an IngressRouteTCP when there are no routes
461        let lp = ListParams::default().labels("component=appService");
462        // Check if there are any IngressRouteTCP objects with the label component=appService and delete them
463        let ingress_tcp_routes = ingress_api.list(&lp).await?;
464        if let Some(ingress_tcp_route) = ingress_tcp_routes.into_iter().next() {
465            match ingress_api
466                .delete(
467                    &ingress_tcp_route.metadata.name.unwrap(),
468                    &Default::default(),
469                )
470                .await
471            {
472                Ok(_) => {
473                    debug!(
474                        "ns: {}, successfully deleted IngressRouteTCP: {}",
475                        ns, &name
476                    );
477                    return Ok(());
478                }
479                Err(e) => {
480                    error!(
481                        "ns: {}, Failed to delete IngressRouteTCP: {}, error: {}",
482                        ns, &name, e
483                    );
484                    return Err(e);
485                }
486            }
487        }
488        return Ok(());
489    }
490
491    // Only create IngressRouteTCP if there are routes for apps that need them. Check if the app
492    // name is in the desired_routes.
493    if desired_routes.iter().any(|r| {
494        r.services
495            .as_ref()
496            .unwrap_or(&vec![])
497            .iter()
498            .any(|s| s.name == name)
499    }) {
500        let ingress_tcp =
501            generate_ingress_tcp(&name, ns, oref, desired_routes.clone(), entry_points_tcp);
502        match apply_ingress_route_tcp(ingress_api, &name, &ingress_tcp).await {
503            Ok(_) => {
504                debug!("Updated/applied IngressRouteTCP for {}.{}", ns, &name,);
505                Ok(())
506            }
507            Err(e) => {
508                error!(
509                    "Failed to update/apply IngressRouteTCP {}.{}: {}",
510                    ns, &name, e
511                );
512                Err(e)
513            }
514        }
515    } else {
516        debug!("No IngressRouteTCP routes match the app name: {}", app_name);
517        Ok(())
518    }
519}
520
521async fn apply_middleware(
522    mw_api: Api<TraefikMiddleware>,
523    mw_name: &str,
524    mw: &TraefikMiddleware,
525) -> Result<TraefikMiddleware, kube::Error> {
526    let patch_parameters = PatchParams::apply("cntrlr").force();
527    mw_api
528        .patch(mw_name, &patch_parameters, &Patch::Apply(&mw))
529        .await
530}
531
532async fn apply_ingress_route(
533    ingress_api: Api<IngressRoute>,
534    ingress_name: &str,
535    ingress_route: &IngressRoute,
536) -> Result<IngressRoute, kube::Error> {
537    let patch_parameters = PatchParams::apply("cntrlr").force();
538    ingress_api
539        .patch(
540            ingress_name,
541            &patch_parameters,
542            &Patch::Apply(&ingress_route),
543        )
544        .await
545}
546
547async fn apply_ingress_route_tcp(
548    ingress_api: Api<IngressRouteTCP>,
549    ingress_name: &str,
550    ingress_route_tcp: &IngressRouteTCP,
551) -> Result<IngressRouteTCP, kube::Error> {
552    let patch_parameters = PatchParams::apply("cntrlr").force();
553    ingress_api
554        .patch(
555            ingress_name,
556            &patch_parameters,
557            &Patch::Apply(&ingress_route_tcp),
558        )
559        .await
560}
561
562async fn get_middlewares(
563    client: Client,
564    namespace: &str,
565    coredb_name: &str,
566) -> Result<Vec<String>, kube::Error> {
567    let label_selector = format!(
568        "component={},coredb.io/name={}",
569        COMPONENT_NAME, coredb_name
570    );
571    let deployent_api: Api<TraefikMiddleware> = Api::namespaced(client, namespace);
572    let lp = ListParams::default().labels(&label_selector).timeout(10);
573    let deployments = deployent_api.list(&lp).await?;
574    Ok(deployments
575        .items
576        .iter()
577        .map(|d| d.metadata.name.to_owned().expect("no name on resource"))
578        .collect())
579}