controller/extensions/
install.rs

1use crate::{
2    apis::coredb_types::CoreDB,
3    cloudnativepg::cnpg::{get_fenced_pods, unfence_pod},
4    extensions::{
5        kubernetes_queries::{add_trunk_install_to_status, remove_trunk_installs_from_status},
6        types::{TrunkInstall, TrunkInstallStatus},
7    },
8    trunk::get_latest_trunk_project_version,
9    Context,
10};
11use k8s_openapi::{api::core::v1::Pod, apimachinery::pkg::apis::meta::v1::ObjectMeta};
12use kube::{runtime::controller::Action, Api, ResourceExt};
13use std::{collections::HashSet, sync::Arc, time::Duration};
14use tracing::{debug, error, info, instrument, warn};
15
16use crate::apis::coredb_types::CoreDBStatus;
17
18// Syncroniously merge and deduplicate pods
19#[instrument(skip(non_fenced_pods, fenced_names) fields(trace_id))]
20fn merge_and_deduplicate_pods(
21    non_fenced_pods: Vec<Pod>,
22    fenced_names: Option<Vec<String>>,
23) -> Vec<Pod> {
24    let mut all_pods: Vec<Pod> = Vec::new();
25    let mut unique_pod_names: HashSet<String> = HashSet::new();
26
27    // Add non-fenced pods and update the HashSet with their names
28    for pod in non_fenced_pods {
29        if let Some(pod_name) = &pod.metadata.name {
30            if unique_pod_names.insert(pod_name.clone()) {
31                all_pods.push(pod);
32            }
33        }
34    }
35
36    // Add fenced pods and update the HashSet with their names
37    if let Some(fenced_names) = fenced_names {
38        for fenced_name in fenced_names {
39            if unique_pod_names.insert(fenced_name.clone()) {
40                let new_pod = Pod {
41                    metadata: ObjectMeta {
42                        name: Some(fenced_name),
43                        ..Default::default()
44                    },
45                    ..Default::default()
46                };
47                all_pods.push(new_pod);
48            }
49        }
50    }
51
52    all_pods
53}
54
55// Collect any fenced pods and add them to the list of pods to install extensions into
56#[instrument(skip(ctx, cdb) fields(trace_id))]
57pub async fn all_fenced_and_non_fenced_pods(
58    cdb: &CoreDB,
59    ctx: Arc<Context>,
60) -> Result<Vec<Pod>, Action> {
61    let name = cdb.name_any();
62
63    // Get fenced pods
64    let pods_fenced = get_fenced_pods(cdb, ctx.clone()).await?;
65
66    // Get all non-fenced pods
67    let non_fenced_pods = cdb.pods_by_cluster_ready_or_not(ctx.client.clone()).await?;
68
69    // Merge and deduplicate pod names
70    let all_pods = merge_and_deduplicate_pods(non_fenced_pods, pods_fenced);
71
72    debug!(
73        "After appending fenced instances for {}, pod count: {}",
74        &name,
75        all_pods.len()
76    );
77
78    Ok(all_pods)
79}
80
81/// Find all trunk installs to remove and return a list of strings
82#[instrument(skip(cdb) fields(trace_id))]
83fn find_trunk_installs_to_remove_from_status(cdb: &CoreDB) -> Vec<String> {
84    let name = cdb.name_any();
85    debug!(
86        "Checking for trunk installs to remove from status for {}",
87        &name
88    );
89
90    let mut trunk_installs_to_remove_from_status = Vec::new();
91
92    // Get extensions in status.trunk_install that are not in spec
93    // Deleting them from status allows for retrying installation
94    // by first removing the extension from the spec, then adding it back
95    match &cdb.status {
96        None => {
97            return trunk_installs_to_remove_from_status;
98        }
99        Some(status) => match &status.trunk_installs {
100            None => {
101                return trunk_installs_to_remove_from_status;
102            }
103            Some(trunk_installs) => {
104                for ext_status in trunk_installs {
105                    if !cdb
106                        .spec
107                        .trunk_installs
108                        .iter()
109                        .any(|ext| ext.name == ext_status.name)
110                    {
111                        trunk_installs_to_remove_from_status.push(ext_status.name.clone());
112                    }
113                }
114            }
115        },
116    };
117
118    trunk_installs_to_remove_from_status
119}
120
121/// Find all trunk installs to install on a pod and return a Vec of TrunkInstall
122/// This function also needs to define a lifetime, since we are only returning a reference to
123/// TrunkInstall, which is owned by CoreDB we only need to define a lifetime for CoreDB
124#[instrument(skip(cdb, pod_name) fields(trace_id))]
125pub fn find_trunk_installs_to_pod<'a>(cdb: &'a CoreDB, pod_name: &str) -> Vec<&'a TrunkInstall> {
126    debug!(
127        "Checking for trunk installs to install on pod {} for {}",
128        pod_name,
129        cdb.name_any()
130    );
131
132    if cdb.spec.uses_postgres_image() {
133        // Install all extensions on the Postgres image.
134        // XXX Do we need to exclude those already on the pod but in an error state?
135        return cdb.spec.trunk_installs.iter().collect();
136    }
137
138    let pod_name = pod_name.to_owned();
139    let mut trunk_installs_to_install = Vec::new();
140
141    // All TrunkInstallStatus in CDB spec
142    let trunk_install_statuses = cdb
143        .status
144        .as_ref()
145        .and_then(|status| status.trunk_installs.as_deref())
146        .unwrap_or_default();
147
148    // Get extensions in spec.trunk_install that are not in status.trunk_install
149    for ext in &cdb.spec.trunk_installs {
150        if !trunk_install_statuses.iter().any(|ext_status| {
151            ext.name == ext_status.name
152                && !ext_status.error
153                && ext_status
154                    .installed_to_pods
155                    .as_deref()
156                    .unwrap_or_default()
157                    .contains(&pod_name)
158        }) {
159            trunk_installs_to_install.push(ext);
160        }
161    }
162
163    trunk_installs_to_install
164}
165
166// is_pod_fenced function checks if a pod is fenced and returns a bool or requeue action
167#[instrument(skip(cdb, ctx, pod_name) fields(trace_id, pod_name))]
168async fn is_pod_fenced(cdb: &CoreDB, ctx: Arc<Context>, pod_name: &str) -> Result<bool, Action> {
169    let coredb_name = cdb.metadata.name.as_deref().unwrap_or_default();
170
171    debug!(
172        "Checking if pod {} is fenced for instance {}",
173        pod_name, coredb_name
174    );
175
176    let fenced_pods = get_fenced_pods(cdb, ctx.clone()).await?;
177
178    if let Some(fenced_pods) = fenced_pods {
179        // Check if pod_name is in fenced_pods
180        if fenced_pods.contains(&pod_name.to_string()) {
181            debug!("Instance {} pod {} is fenced", coredb_name, pod_name);
182            return Ok(true);
183        }
184    }
185
186    Ok(false)
187}
188
189#[instrument(skip(ctx, cdb))]
190pub async fn reconcile_trunk_installs(
191    cdb: &CoreDB,
192    ctx: Arc<Context>,
193) -> Result<Vec<TrunkInstallStatus>, Action> {
194    let instance_name = cdb.name_any();
195    let namespace = cdb.metadata.namespace.as_ref().ok_or_else(|| {
196        error!(
197            "CoreDB namespace is empty for instance: {}.",
198            &instance_name
199        );
200        Action::requeue(tokio::time::Duration::from_secs(300))
201    })?;
202
203    debug!("Starting to reconcile trunk installs for {}", instance_name);
204
205    let coredb_api: Api<CoreDB> = Api::namespaced(ctx.client.clone(), namespace);
206
207    // Get extensions in status.trunk_install that are not in spec
208    // Deleting them from status allows for retrying installation
209    // by first removing the extension from the spec, then adding it back
210    // Get trunk installs to remove from status
211    let trunk_installs_to_remove_from_status = find_trunk_installs_to_remove_from_status(cdb);
212
213    // Remove extensions from status
214    remove_trunk_installs_from_status(
215        &coredb_api,
216        &instance_name,
217        trunk_installs_to_remove_from_status,
218    )
219    .await?;
220
221    // Get extensions in spec.trunk_install that are not in status.trunk_install
222    let mut all_results = Vec::new();
223
224    // Get all fenced and non-fenced pods for this instance
225    let all_pods = all_fenced_and_non_fenced_pods(cdb, ctx.clone()).await?;
226
227    // Loop through all pods and install missing trunk installs
228    for pod in all_pods {
229        let pod_name = pod.metadata.name.expect("Pod should always have a name");
230
231        // Filter trunk installs that are not yet installed on this instance
232        let trunk_installs_to_pod = find_trunk_installs_to_pod(cdb, &pod_name);
233
234        if trunk_installs_to_pod.is_empty() {
235            debug!(
236                "Unfencing any pod that does not require trunk installs, pod {} for {}",
237                pod_name, instance_name
238            );
239            // Check if pod is fenced and if so unfence it otherwise continue
240            if is_pod_fenced(cdb, ctx.clone(), &pod_name).await? {
241                // Unfence pod_name
242                unfence_pod(cdb, ctx.clone(), &pod_name.clone()).await?;
243            }
244            continue;
245        }
246
247        // Install missing trunk installs
248        match install_extensions_to_pod(cdb, trunk_installs_to_pod, &ctx, pod_name.clone()).await {
249            Ok(result) => {
250                all_results = result;
251            }
252            Err(err) => return Err(err),
253        };
254    }
255
256    info!(
257        "Completed trunk install reconciliation for instance {}",
258        instance_name
259    );
260
261    // Check if all_results is empty, if so use status.trunk_installs to make sure we don't end up
262    // in a reconcile loop and re-install loop
263    if all_results.is_empty() {
264        debug!("No trunk installs to reconcile for {}", instance_name);
265        all_results = cdb
266            .status
267            .clone()
268            .unwrap_or_default()
269            .trunk_installs
270            .clone()
271            .unwrap_or_default();
272    }
273    Ok(all_results)
274}
275
276// initializing current_trunk_install_statuses from CoreDB status and return a Vec of TrunkInstallStatus
277#[instrument(skip(cdb, coredb_name) fields(trace_id))]
278fn initialize_trunk_install_statuses(cdb: &CoreDB, coredb_name: &str) -> Vec<TrunkInstallStatus> {
279    cdb.status
280        .clone()
281        .unwrap_or_else(|| {
282            debug!("No current status on {}, initializing default", coredb_name);
283            CoreDBStatus::default()
284        })
285        .trunk_installs
286        .unwrap_or_else(|| {
287            debug!(
288                "No current trunk installs on {}, initializing empty list",
289                coredb_name
290            );
291            vec![]
292        })
293}
294
295/// execute_extension_install_command function executes the trunk install command and returns a
296/// TrunkInstallStatus or bool
297#[instrument(skip(cdb, ctx, coredb_name, ext, pod_name) fields(trace_id))]
298async fn execute_extension_install_command(
299    cdb: &CoreDB,
300    ctx: Arc<Context>,
301    coredb_name: &str,
302    ext: &TrunkInstall,
303    pod_name: &str,
304) -> Result<TrunkInstallStatus, bool> {
305    let client = ctx.client.clone();
306
307    // Handle the case where version is None
308    let version = match &ext.version {
309        None => {
310            warn!(
311                "Version for extension {} not specified in {}, will fetch latest version",
312                ext.name, coredb_name
313            );
314
315            match get_latest_trunk_project_version(&ext.name).await {
316                Ok(latest_version) => latest_version,
317                Err(_) => {
318                    error!(
319                        "Failed to get latest version for extension {} in {}",
320                        ext.name, coredb_name
321                    );
322
323                    return Ok(TrunkInstallStatus {
324                        name: ext.name.clone(),
325                        version: None,
326                        error: true,
327                        loading: false,
328                        error_message: Some("Missing version".to_string()),
329                        installed_to_pods: Some(vec![pod_name.to_string()]),
330                    });
331                }
332            }
333        }
334        Some(version) => version.clone(),
335    };
336
337    let cmd = vec![
338        "trunk".to_owned(),
339        "install".to_owned(),
340        "-r https://registry.pgtrunk.io".to_owned(),
341        ext.name.clone(),
342        "--version".to_owned(),
343        version,
344        // "--pkglibdir".to_owned(),
345        // cdb.spec.module_dir(),
346    ];
347
348    let result = cdb.exec(pod_name.to_string(), client.clone(), &cmd).await;
349
350    // Check if the exec command was successful
351    // keep in mind that installed_to_pods can be merged with existing pods in the list where
352    // the extension was already installed
353    match result {
354        Ok(result) => {
355            let output = format!(
356                "{}\n{}",
357                result
358                    .stdout
359                    .unwrap_or_else(|| "Nothing in stdout".to_string()),
360                result
361                    .stderr
362                    .unwrap_or_else(|| "Nothing in stderr".to_string())
363            );
364
365            let trunk_install_status = if result.success {
366                info!(
367                    "Installed extension {} into {} for {}",
368                    &ext.name, pod_name, coredb_name
369                );
370                TrunkInstallStatus {
371                    name: ext.name.clone(),
372                    version: ext.version.clone(),
373                    error: false,
374                    loading: false,
375                    error_message: None,
376                    installed_to_pods: Some(vec![pod_name.to_string()]),
377                }
378            } else {
379                error!(
380                    "Failed to install extension {} into {}:\n{}",
381                    &ext.name, pod_name, output
382                );
383                TrunkInstallStatus {
384                    name: ext.name.clone(),
385                    version: ext.version.clone(),
386                    error: true,
387                    error_message: Some(output),
388                    loading: false,
389                    installed_to_pods: Some(vec![pod_name.to_string()]),
390                }
391            };
392
393            Ok(trunk_install_status)
394        }
395        Err(_) => {
396            error!(
397                "Kube exec error installing extension {} into {}: {}",
398                &ext.name, coredb_name, "Kube exec error"
399            );
400            Err(true)
401        }
402    }
403}
404
405// Check if <extension_name>.so file exists for a given extension in `cdb.module_dir()`.
406#[instrument(skip(cdb, ctx, pod_name) fields(trace_id))]
407pub async fn check_for_so_files(
408    cdb: &CoreDB,
409    ctx: Arc<Context>,
410    pod_name: &str,
411    extension_name: String,
412) -> Result<bool, Action> {
413    let coredb_name = cdb.metadata.name.as_deref().unwrap_or_default();
414
415    info!(
416        "Checking for {}.so in filesystem for instance {}",
417        extension_name, coredb_name
418    );
419
420    let client = ctx.client.clone();
421
422    // Check if the pod is up yet
423    if let Err(e) = cdb.log_pod_status(client.clone(), pod_name).await {
424        warn!(
425            "Could not fetch or log pod status for instance {}: {:?}",
426            coredb_name, e
427        );
428        return Err(Action::requeue(Duration::from_secs(10)));
429    }
430
431    let so = format!("{}/{}.so", cdb.spec.module_dir(), extension_name);
432    let cmd = vec![
433        "/bin/bash".to_string(),
434        "-c".to_string(),
435        format!("if [ -f '{so}' ]; then echo '{so}'; fi"),
436    ];
437
438    let result = cdb.exec(pod_name.to_string(), client.clone(), &cmd).await;
439
440    match result {
441        Ok(result) => {
442            let output = format!(
443                "{}\n{}",
444                result
445                    .stdout
446                    .unwrap_or_else(|| "Nothing in stdout".to_string()),
447                result
448                    .stderr
449                    .unwrap_or_else(|| "Nothing in stderr".to_string())
450            );
451
452            if result.success {
453                // Check if .so files exist in output
454                if output.contains(format!("{}.so", extension_name).as_str()) {
455                    info!(
456                        "Found {}.so file in filesystem for instance {}",
457                        extension_name, coredb_name
458                    );
459                    return Ok(true);
460                }
461                info!(
462                    "No {}.so found in filesystem for instance {}",
463                    extension_name, coredb_name
464                );
465                return Ok(false);
466            }
467            error!(
468                "Failed to check for {}.so in filesystem for instance {}:\n{}",
469                extension_name, coredb_name, output
470            );
471            Err(Action::requeue(Duration::from_secs(10)))
472        }
473        Err(_) => {
474            error!(
475                "Kube exec error checking for {}.so file in filesystem for instance for {}",
476                extension_name, coredb_name
477            );
478            Err(Action::requeue(Duration::from_secs(10)))
479        }
480    }
481}
482
483/// handles installing extensions
484#[instrument(skip(ctx, cdb) fields(trace_id))]
485pub async fn install_extensions_to_pod(
486    cdb: &CoreDB,
487    trunk_installs: Vec<&TrunkInstall>,
488    ctx: &Arc<Context>,
489    pod_name: String,
490) -> Result<Vec<TrunkInstallStatus>, Action> {
491    let coredb_name = cdb.name_any();
492    let namespace = cdb.metadata.namespace.as_ref().ok_or_else(|| {
493        error!("CoreDB namespace is empty for instance: {}.", &coredb_name);
494        Action::requeue(tokio::time::Duration::from_secs(300))
495    })?;
496
497    let coredb_api: Api<CoreDB> = Api::namespaced(ctx.client.clone(), namespace);
498
499    // Lookup current status for trunk installs
500    let mut current_trunk_install_statuses = initialize_trunk_install_statuses(cdb, &coredb_name);
501
502    if trunk_installs.is_empty() {
503        debug!("No extensions to install into {}", coredb_name);
504        return Ok(current_trunk_install_statuses);
505    }
506    info!(
507        "Installing extensions into {}: {:?}",
508        coredb_name, trunk_installs
509    );
510
511    // If the pod is not up yet, do not try and install the extension
512    if let Err(e) = cdb.log_pod_status(ctx.client.clone(), &pod_name).await {
513        warn!(
514            "Could not fetch or log pod status for instance {}: {:?}",
515            coredb_name, e
516        );
517        warn!("Requeueing due to errors for instance {}", coredb_name);
518        return Err(Action::requeue(Duration::from_secs(10)));
519    }
520
521    let mut requeue = false;
522    for ext in trunk_installs.iter() {
523        info!(
524            "Attempting to install extension: {} on {}",
525            ext.name, coredb_name
526        );
527
528        // Execute trunk install command
529        match execute_extension_install_command(cdb, ctx.clone(), &coredb_name, ext, &pod_name)
530            .await
531        {
532            Ok(trunk_install_status) => {
533                if trunk_install_status.error {
534                    // Log and continue to the next iteration
535                    warn!(
536                        "Error occurred during installation: {:?}",
537                        trunk_install_status.error_message
538                    );
539                    current_trunk_install_statuses = add_trunk_install_to_status(
540                        &coredb_api,
541                        &coredb_name,
542                        &trunk_install_status,
543                    )
544                    .await?;
545                    continue;
546                }
547                current_trunk_install_statuses =
548                    add_trunk_install_to_status(&coredb_api, &coredb_name, &trunk_install_status)
549                        .await?;
550            }
551            Err(should_requeue) => {
552                requeue = should_requeue;
553            }
554        }
555    }
556    if requeue {
557        warn!("Requeueing due to errors for instance {}", coredb_name);
558        return Err(Action::requeue(Duration::from_secs(10)));
559    }
560    info!("Successfully installed all extensions to {}", pod_name);
561
562    // Check for fenced pods and unfence it
563    let fenced_pods = get_fenced_pods(cdb, ctx.clone()).await?;
564    if let Some(fenced_pods) = fenced_pods {
565        // Check if pod_name is in fenced_pods
566        if fenced_pods.contains(&pod_name) {
567            // Unfence pod_name
568            unfence_pod(cdb, ctx.clone(), &pod_name.clone()).await?;
569        }
570    }
571
572    Ok(current_trunk_install_statuses)
573}
574
#[cfg(test)]
mod tests {
    use super::*;
    use crate::apis::coredb_types::CoreDBSpec;

    /// Pod name used by every status fixture.
    const TEST_POD: &str = "test-coredb-24631-1";

    /// A pod whose only populated field is its name.
    fn named_pod(name: &str) -> Pod {
        Pod {
            metadata: ObjectMeta {
                name: Some(name.to_string()),
                ..Default::default()
            },
            ..Default::default()
        }
    }

    /// A trunk install request for `name` at version "1.0".
    fn install(name: &str) -> TrunkInstall {
        TrunkInstall {
            name: name.to_string(),
            version: Some("1.0".to_string()),
        }
    }

    /// A clean (non-error) status for `name` at version "1.0", installed to `TEST_POD`.
    fn installed_status(name: &str) -> TrunkInstallStatus {
        TrunkInstallStatus {
            name: name.to_string(),
            version: Some("1.0".to_string()),
            error: false,
            error_message: None,
            loading: false,
            installed_to_pods: Some(vec![TEST_POD.to_string()]),
        }
    }

    /// A spec that requests the given installs and is otherwise default.
    fn spec_with(trunk_installs: Vec<TrunkInstall>) -> CoreDBSpec {
        CoreDBSpec {
            trunk_installs,
            ..Default::default()
        }
    }

    /// A status that records the given installs and is otherwise default.
    fn status_with(trunk_installs: Vec<TrunkInstallStatus>) -> CoreDBStatus {
        CoreDBStatus {
            trunk_installs: Some(trunk_installs),
            ..Default::default()
        }
    }

    /// A CoreDB named "coredb1" with the given spec and status.
    fn coredb(spec: CoreDBSpec, status: Option<CoreDBStatus>) -> CoreDB {
        CoreDB {
            metadata: ObjectMeta {
                name: Some("coredb1".to_string()),
                ..Default::default()
            },
            spec,
            status,
        }
    }

    #[test]
    fn test_merge_and_deduplicate_pods() {
        let non_fenced_pods = vec![named_pod("pod1"), named_pod("pod2")];
        let fenced_names = Some(vec!["pod2".to_string(), "pod3".to_string()]);

        let result = merge_and_deduplicate_pods(non_fenced_pods, fenced_names);

        // Deduplicated names should be ["pod1", "pod2", "pod3"]
        let deduplicated_names: Vec<String> = result
            .iter()
            .filter_map(|pod| pod.metadata.name.clone())
            .collect();
        assert_eq!(
            deduplicated_names,
            vec!["pod1".to_string(), "pod2".to_string(), "pod3".to_string()]
        );
    }

    #[test]
    fn test_find_trunk_installs_to_remove_from_status() {
        // Arrange: the status records one install more than the spec requests.
        let cdb = coredb(
            spec_with(vec![install("install1"), install("install2")]),
            Some(status_with(vec![
                installed_status("install1"),
                installed_status("install2"),
                installed_status("install3"),
            ])),
        );

        // Act
        let result = find_trunk_installs_to_remove_from_status(&cdb);

        // Assert
        assert_eq!(result.len(), 1);
        assert_eq!(result[0], "install3");
    }

    #[test]
    fn test_find_trunk_installs_to_pod() {
        // Arrange: only the first of three requested installs is recorded on the pod.
        let mut cdb = coredb(
            spec_with(vec![
                install("install1"),
                install("install2"),
                install("install3"),
            ]),
            Some(status_with(vec![installed_status("install1")])),
        );
        let pod_name = TEST_POD;

        // Act
        let result = find_trunk_installs_to_pod(&cdb, pod_name);

        // Assert
        assert_eq!(result.len(), 2);
        assert_eq!(result[0].name, "install2");
        assert_eq!(result[1].name, "install3");

        // Test with Postgres image.
        cdb.spec.image = "quay.io/tembo/postgres:17-noble".to_string();
        let result = find_trunk_installs_to_pod(&cdb, pod_name);
        assert_eq!(result.len(), 3);
        assert_eq!(result[0].name, "install1");
        assert_eq!(result[1].name, "install2");
        assert_eq!(result[2].name, "install3");
    }

    #[test]
    fn test_initialize_trunk_install_statuses() {
        // Test when the status holds 2 TrunkInstallStatus objects
        let cdb_with_status = coredb(
            spec_with(vec![install("install1"), install("install2")]),
            Some(status_with(vec![
                installed_status("install1"),
                installed_status("install2"),
            ])),
        );
        let result = initialize_trunk_install_statuses(&cdb_with_status, "cdb_with_status");
        assert_eq!(result.len(), 2);

        // Test when CoreDB has a status but no trunk_installs
        let cdb_with_empty_status = coredb(spec_with(vec![]), Some(status_with(vec![])));
        let result =
            initialize_trunk_install_statuses(&cdb_with_empty_status, "cdb_with_empty_status");
        assert!(result.is_empty());

        // Test when CoreDB has no status
        let cdb_without_status = coredb(CoreDBSpec::default(), None);
        let result = initialize_trunk_install_statuses(&cdb_without_status, "cdb_without_status");
        assert!(result.is_empty());
    }
}