1use crate::{
2 apis::coredb_types::CoreDB,
3 cloudnativepg::cnpg::{get_fenced_pods, unfence_pod},
4 extensions::{
5 kubernetes_queries::{add_trunk_install_to_status, remove_trunk_installs_from_status},
6 types::{TrunkInstall, TrunkInstallStatus},
7 },
8 trunk::get_latest_trunk_project_version,
9 Context,
10};
11use k8s_openapi::{api::core::v1::Pod, apimachinery::pkg::apis::meta::v1::ObjectMeta};
12use kube::{runtime::controller::Action, Api, ResourceExt};
13use std::{collections::HashSet, sync::Arc, time::Duration};
14use tracing::{debug, error, info, instrument, warn};
15
16use crate::apis::coredb_types::CoreDBStatus;
17
18#[instrument(skip(non_fenced_pods, fenced_names) fields(trace_id))]
20fn merge_and_deduplicate_pods(
21 non_fenced_pods: Vec<Pod>,
22 fenced_names: Option<Vec<String>>,
23) -> Vec<Pod> {
24 let mut all_pods: Vec<Pod> = Vec::new();
25 let mut unique_pod_names: HashSet<String> = HashSet::new();
26
27 for pod in non_fenced_pods {
29 if let Some(pod_name) = &pod.metadata.name {
30 if unique_pod_names.insert(pod_name.clone()) {
31 all_pods.push(pod);
32 }
33 }
34 }
35
36 if let Some(fenced_names) = fenced_names {
38 for fenced_name in fenced_names {
39 if unique_pod_names.insert(fenced_name.clone()) {
40 let new_pod = Pod {
41 metadata: ObjectMeta {
42 name: Some(fenced_name),
43 ..Default::default()
44 },
45 ..Default::default()
46 };
47 all_pods.push(new_pod);
48 }
49 }
50 }
51
52 all_pods
53}
54
55#[instrument(skip(ctx, cdb) fields(trace_id))]
57pub async fn all_fenced_and_non_fenced_pods(
58 cdb: &CoreDB,
59 ctx: Arc<Context>,
60) -> Result<Vec<Pod>, Action> {
61 let name = cdb.name_any();
62
63 let pods_fenced = get_fenced_pods(cdb, ctx.clone()).await?;
65
66 let non_fenced_pods = cdb.pods_by_cluster_ready_or_not(ctx.client.clone()).await?;
68
69 let all_pods = merge_and_deduplicate_pods(non_fenced_pods, pods_fenced);
71
72 debug!(
73 "After appending fenced instances for {}, pod count: {}",
74 &name,
75 all_pods.len()
76 );
77
78 Ok(all_pods)
79}
80
81#[instrument(skip(cdb) fields(trace_id))]
83fn find_trunk_installs_to_remove_from_status(cdb: &CoreDB) -> Vec<String> {
84 let name = cdb.name_any();
85 debug!(
86 "Checking for trunk installs to remove from status for {}",
87 &name
88 );
89
90 let mut trunk_installs_to_remove_from_status = Vec::new();
91
92 match &cdb.status {
96 None => {
97 return trunk_installs_to_remove_from_status;
98 }
99 Some(status) => match &status.trunk_installs {
100 None => {
101 return trunk_installs_to_remove_from_status;
102 }
103 Some(trunk_installs) => {
104 for ext_status in trunk_installs {
105 if !cdb
106 .spec
107 .trunk_installs
108 .iter()
109 .any(|ext| ext.name == ext_status.name)
110 {
111 trunk_installs_to_remove_from_status.push(ext_status.name.clone());
112 }
113 }
114 }
115 },
116 };
117
118 trunk_installs_to_remove_from_status
119}
120
121#[instrument(skip(cdb, pod_name) fields(trace_id))]
125pub fn find_trunk_installs_to_pod<'a>(cdb: &'a CoreDB, pod_name: &str) -> Vec<&'a TrunkInstall> {
126 debug!(
127 "Checking for trunk installs to install on pod {} for {}",
128 pod_name,
129 cdb.name_any()
130 );
131
132 if cdb.spec.uses_postgres_image() {
133 return cdb.spec.trunk_installs.iter().collect();
136 }
137
138 let pod_name = pod_name.to_owned();
139 let mut trunk_installs_to_install = Vec::new();
140
141 let trunk_install_statuses = cdb
143 .status
144 .as_ref()
145 .and_then(|status| status.trunk_installs.as_deref())
146 .unwrap_or_default();
147
148 for ext in &cdb.spec.trunk_installs {
150 if !trunk_install_statuses.iter().any(|ext_status| {
151 ext.name == ext_status.name
152 && !ext_status.error
153 && ext_status
154 .installed_to_pods
155 .as_deref()
156 .unwrap_or_default()
157 .contains(&pod_name)
158 }) {
159 trunk_installs_to_install.push(ext);
160 }
161 }
162
163 trunk_installs_to_install
164}
165
166#[instrument(skip(cdb, ctx, pod_name) fields(trace_id, pod_name))]
168async fn is_pod_fenced(cdb: &CoreDB, ctx: Arc<Context>, pod_name: &str) -> Result<bool, Action> {
169 let coredb_name = cdb.metadata.name.as_deref().unwrap_or_default();
170
171 debug!(
172 "Checking if pod {} is fenced for instance {}",
173 pod_name, coredb_name
174 );
175
176 let fenced_pods = get_fenced_pods(cdb, ctx.clone()).await?;
177
178 if let Some(fenced_pods) = fenced_pods {
179 if fenced_pods.contains(&pod_name.to_string()) {
181 debug!("Instance {} pod {} is fenced", coredb_name, pod_name);
182 return Ok(true);
183 }
184 }
185
186 Ok(false)
187}
188
189#[instrument(skip(ctx, cdb))]
190pub async fn reconcile_trunk_installs(
191 cdb: &CoreDB,
192 ctx: Arc<Context>,
193) -> Result<Vec<TrunkInstallStatus>, Action> {
194 let instance_name = cdb.name_any();
195 let namespace = cdb.metadata.namespace.as_ref().ok_or_else(|| {
196 error!(
197 "CoreDB namespace is empty for instance: {}.",
198 &instance_name
199 );
200 Action::requeue(tokio::time::Duration::from_secs(300))
201 })?;
202
203 debug!("Starting to reconcile trunk installs for {}", instance_name);
204
205 let coredb_api: Api<CoreDB> = Api::namespaced(ctx.client.clone(), namespace);
206
207 let trunk_installs_to_remove_from_status = find_trunk_installs_to_remove_from_status(cdb);
212
213 remove_trunk_installs_from_status(
215 &coredb_api,
216 &instance_name,
217 trunk_installs_to_remove_from_status,
218 )
219 .await?;
220
221 let mut all_results = Vec::new();
223
224 let all_pods = all_fenced_and_non_fenced_pods(cdb, ctx.clone()).await?;
226
227 for pod in all_pods {
229 let pod_name = pod.metadata.name.expect("Pod should always have a name");
230
231 let trunk_installs_to_pod = find_trunk_installs_to_pod(cdb, &pod_name);
233
234 if trunk_installs_to_pod.is_empty() {
235 debug!(
236 "Unfencing any pod that does not require trunk installs, pod {} for {}",
237 pod_name, instance_name
238 );
239 if is_pod_fenced(cdb, ctx.clone(), &pod_name).await? {
241 unfence_pod(cdb, ctx.clone(), &pod_name.clone()).await?;
243 }
244 continue;
245 }
246
247 match install_extensions_to_pod(cdb, trunk_installs_to_pod, &ctx, pod_name.clone()).await {
249 Ok(result) => {
250 all_results = result;
251 }
252 Err(err) => return Err(err),
253 };
254 }
255
256 info!(
257 "Completed trunk install reconciliation for instance {}",
258 instance_name
259 );
260
261 if all_results.is_empty() {
264 debug!("No trunk installs to reconcile for {}", instance_name);
265 all_results = cdb
266 .status
267 .clone()
268 .unwrap_or_default()
269 .trunk_installs
270 .clone()
271 .unwrap_or_default();
272 }
273 Ok(all_results)
274}
275
276#[instrument(skip(cdb, coredb_name) fields(trace_id))]
278fn initialize_trunk_install_statuses(cdb: &CoreDB, coredb_name: &str) -> Vec<TrunkInstallStatus> {
279 cdb.status
280 .clone()
281 .unwrap_or_else(|| {
282 debug!("No current status on {}, initializing default", coredb_name);
283 CoreDBStatus::default()
284 })
285 .trunk_installs
286 .unwrap_or_else(|| {
287 debug!(
288 "No current trunk installs on {}, initializing empty list",
289 coredb_name
290 );
291 vec![]
292 })
293}
294
295#[instrument(skip(cdb, ctx, coredb_name, ext, pod_name) fields(trace_id))]
298async fn execute_extension_install_command(
299 cdb: &CoreDB,
300 ctx: Arc<Context>,
301 coredb_name: &str,
302 ext: &TrunkInstall,
303 pod_name: &str,
304) -> Result<TrunkInstallStatus, bool> {
305 let client = ctx.client.clone();
306
307 let version = match &ext.version {
309 None => {
310 warn!(
311 "Version for extension {} not specified in {}, will fetch latest version",
312 ext.name, coredb_name
313 );
314
315 match get_latest_trunk_project_version(&ext.name).await {
316 Ok(latest_version) => latest_version,
317 Err(_) => {
318 error!(
319 "Failed to get latest version for extension {} in {}",
320 ext.name, coredb_name
321 );
322
323 return Ok(TrunkInstallStatus {
324 name: ext.name.clone(),
325 version: None,
326 error: true,
327 loading: false,
328 error_message: Some("Missing version".to_string()),
329 installed_to_pods: Some(vec![pod_name.to_string()]),
330 });
331 }
332 }
333 }
334 Some(version) => version.clone(),
335 };
336
337 let cmd = vec![
338 "trunk".to_owned(),
339 "install".to_owned(),
340 "-r https://registry.pgtrunk.io".to_owned(),
341 ext.name.clone(),
342 "--version".to_owned(),
343 version,
344 ];
347
348 let result = cdb.exec(pod_name.to_string(), client.clone(), &cmd).await;
349
350 match result {
354 Ok(result) => {
355 let output = format!(
356 "{}\n{}",
357 result
358 .stdout
359 .unwrap_or_else(|| "Nothing in stdout".to_string()),
360 result
361 .stderr
362 .unwrap_or_else(|| "Nothing in stderr".to_string())
363 );
364
365 let trunk_install_status = if result.success {
366 info!(
367 "Installed extension {} into {} for {}",
368 &ext.name, pod_name, coredb_name
369 );
370 TrunkInstallStatus {
371 name: ext.name.clone(),
372 version: ext.version.clone(),
373 error: false,
374 loading: false,
375 error_message: None,
376 installed_to_pods: Some(vec![pod_name.to_string()]),
377 }
378 } else {
379 error!(
380 "Failed to install extension {} into {}:\n{}",
381 &ext.name, pod_name, output
382 );
383 TrunkInstallStatus {
384 name: ext.name.clone(),
385 version: ext.version.clone(),
386 error: true,
387 error_message: Some(output),
388 loading: false,
389 installed_to_pods: Some(vec![pod_name.to_string()]),
390 }
391 };
392
393 Ok(trunk_install_status)
394 }
395 Err(_) => {
396 error!(
397 "Kube exec error installing extension {} into {}: {}",
398 &ext.name, coredb_name, "Kube exec error"
399 );
400 Err(true)
401 }
402 }
403}
404
405#[instrument(skip(cdb, ctx, pod_name) fields(trace_id))]
407pub async fn check_for_so_files(
408 cdb: &CoreDB,
409 ctx: Arc<Context>,
410 pod_name: &str,
411 extension_name: String,
412) -> Result<bool, Action> {
413 let coredb_name = cdb.metadata.name.as_deref().unwrap_or_default();
414
415 info!(
416 "Checking for {}.so in filesystem for instance {}",
417 extension_name, coredb_name
418 );
419
420 let client = ctx.client.clone();
421
422 if let Err(e) = cdb.log_pod_status(client.clone(), pod_name).await {
424 warn!(
425 "Could not fetch or log pod status for instance {}: {:?}",
426 coredb_name, e
427 );
428 return Err(Action::requeue(Duration::from_secs(10)));
429 }
430
431 let so = format!("{}/{}.so", cdb.spec.module_dir(), extension_name);
432 let cmd = vec![
433 "/bin/bash".to_string(),
434 "-c".to_string(),
435 format!("if [ -f '{so}' ]; then echo '{so}'; fi"),
436 ];
437
438 let result = cdb.exec(pod_name.to_string(), client.clone(), &cmd).await;
439
440 match result {
441 Ok(result) => {
442 let output = format!(
443 "{}\n{}",
444 result
445 .stdout
446 .unwrap_or_else(|| "Nothing in stdout".to_string()),
447 result
448 .stderr
449 .unwrap_or_else(|| "Nothing in stderr".to_string())
450 );
451
452 if result.success {
453 if output.contains(format!("{}.so", extension_name).as_str()) {
455 info!(
456 "Found {}.so file in filesystem for instance {}",
457 extension_name, coredb_name
458 );
459 return Ok(true);
460 }
461 info!(
462 "No {}.so found in filesystem for instance {}",
463 extension_name, coredb_name
464 );
465 return Ok(false);
466 }
467 error!(
468 "Failed to check for {}.so in filesystem for instance {}:\n{}",
469 extension_name, coredb_name, output
470 );
471 Err(Action::requeue(Duration::from_secs(10)))
472 }
473 Err(_) => {
474 error!(
475 "Kube exec error checking for {}.so file in filesystem for instance for {}",
476 extension_name, coredb_name
477 );
478 Err(Action::requeue(Duration::from_secs(10)))
479 }
480 }
481}
482
483#[instrument(skip(ctx, cdb) fields(trace_id))]
485pub async fn install_extensions_to_pod(
486 cdb: &CoreDB,
487 trunk_installs: Vec<&TrunkInstall>,
488 ctx: &Arc<Context>,
489 pod_name: String,
490) -> Result<Vec<TrunkInstallStatus>, Action> {
491 let coredb_name = cdb.name_any();
492 let namespace = cdb.metadata.namespace.as_ref().ok_or_else(|| {
493 error!("CoreDB namespace is empty for instance: {}.", &coredb_name);
494 Action::requeue(tokio::time::Duration::from_secs(300))
495 })?;
496
497 let coredb_api: Api<CoreDB> = Api::namespaced(ctx.client.clone(), namespace);
498
499 let mut current_trunk_install_statuses = initialize_trunk_install_statuses(cdb, &coredb_name);
501
502 if trunk_installs.is_empty() {
503 debug!("No extensions to install into {}", coredb_name);
504 return Ok(current_trunk_install_statuses);
505 }
506 info!(
507 "Installing extensions into {}: {:?}",
508 coredb_name, trunk_installs
509 );
510
511 if let Err(e) = cdb.log_pod_status(ctx.client.clone(), &pod_name).await {
513 warn!(
514 "Could not fetch or log pod status for instance {}: {:?}",
515 coredb_name, e
516 );
517 warn!("Requeueing due to errors for instance {}", coredb_name);
518 return Err(Action::requeue(Duration::from_secs(10)));
519 }
520
521 let mut requeue = false;
522 for ext in trunk_installs.iter() {
523 info!(
524 "Attempting to install extension: {} on {}",
525 ext.name, coredb_name
526 );
527
528 match execute_extension_install_command(cdb, ctx.clone(), &coredb_name, ext, &pod_name)
530 .await
531 {
532 Ok(trunk_install_status) => {
533 if trunk_install_status.error {
534 warn!(
536 "Error occurred during installation: {:?}",
537 trunk_install_status.error_message
538 );
539 current_trunk_install_statuses = add_trunk_install_to_status(
540 &coredb_api,
541 &coredb_name,
542 &trunk_install_status,
543 )
544 .await?;
545 continue;
546 }
547 current_trunk_install_statuses =
548 add_trunk_install_to_status(&coredb_api, &coredb_name, &trunk_install_status)
549 .await?;
550 }
551 Err(should_requeue) => {
552 requeue = should_requeue;
553 }
554 }
555 }
556 if requeue {
557 warn!("Requeueing due to errors for instance {}", coredb_name);
558 return Err(Action::requeue(Duration::from_secs(10)));
559 }
560 info!("Successfully installed all extensions to {}", pod_name);
561
562 let fenced_pods = get_fenced_pods(cdb, ctx.clone()).await?;
564 if let Some(fenced_pods) = fenced_pods {
565 if fenced_pods.contains(&pod_name) {
567 unfence_pod(cdb, ctx.clone(), &pod_name.clone()).await?;
569 }
570 }
571
572 Ok(current_trunk_install_statuses)
573}
574
#[cfg(test)]
mod tests {
    use super::*;
    use crate::apis::coredb_types::CoreDBSpec;

    /// Pod name recorded in every status fixture.
    const POD_NAME: &str = "test-coredb-24631-1";

    /// Builds a pod that has only its name set.
    fn named_pod(name: &str) -> Pod {
        Pod {
            metadata: ObjectMeta {
                name: Some(name.to_string()),
                ..Default::default()
            },
            ..Default::default()
        }
    }

    /// Builds a spec entry for `name` at version 1.0.
    fn trunk_install(name: &str) -> TrunkInstall {
        TrunkInstall {
            name: name.to_string(),
            version: Some("1.0".to_string()),
        }
    }

    /// Builds an error-free status entry for `name` installed on `POD_NAME`.
    fn installed_status(name: &str) -> TrunkInstallStatus {
        TrunkInstallStatus {
            name: name.to_string(),
            version: Some("1.0".to_string()),
            error: false,
            error_message: None,
            loading: false,
            installed_to_pods: Some(vec![POD_NAME.to_string()]),
        }
    }

    /// Builds a spec that requests exactly `installs`.
    fn spec_with(installs: Vec<TrunkInstall>) -> CoreDBSpec {
        CoreDBSpec {
            trunk_installs: installs,
            ..Default::default()
        }
    }

    /// Builds a status that records exactly `statuses`.
    fn status_with(statuses: Vec<TrunkInstallStatus>) -> CoreDBStatus {
        CoreDBStatus {
            trunk_installs: Some(statuses),
            ..Default::default()
        }
    }

    /// Builds a CoreDB named "coredb1" from the given spec and status.
    fn coredb(spec: CoreDBSpec, status: Option<CoreDBStatus>) -> CoreDB {
        CoreDB {
            metadata: ObjectMeta {
                name: Some("coredb1".to_string()),
                ..Default::default()
            },
            spec,
            status,
        }
    }

    #[test]
    fn test_merge_and_deduplicate_pods() {
        let non_fenced_pods = vec![named_pod("pod1"), named_pod("pod2")];
        let fenced_names = Some(vec!["pod2".to_string(), "pod3".to_string()]);

        let result = merge_and_deduplicate_pods(non_fenced_pods, fenced_names);

        // "pod2" is in both inputs and must appear once; "pod3" is fenced only.
        let deduplicated_names: Vec<&str> = result
            .iter()
            .filter_map(|pod| pod.metadata.name.as_deref())
            .collect();
        assert_eq!(deduplicated_names, vec!["pod1", "pod2", "pod3"]);
    }

    #[test]
    fn test_find_trunk_installs_to_remove_from_status() {
        let cdb = coredb(
            spec_with(vec![trunk_install("install1"), trunk_install("install2")]),
            Some(status_with(vec![
                installed_status("install1"),
                installed_status("install2"),
                installed_status("install3"),
            ])),
        );

        let result = find_trunk_installs_to_remove_from_status(&cdb);

        // Only "install3" is in the status without a matching spec entry.
        assert_eq!(result, vec!["install3".to_string()]);
    }

    #[test]
    fn test_find_trunk_installs_to_pod() {
        let mut cdb = coredb(
            spec_with(vec![
                trunk_install("install1"),
                trunk_install("install2"),
                trunk_install("install3"),
            ]),
            Some(status_with(vec![installed_status("install1")])),
        );

        // "install1" is already recorded for the pod, so only the other two
        // remain.
        let pending: Vec<&str> = find_trunk_installs_to_pod(&cdb, POD_NAME)
            .iter()
            .map(|install| install.name.as_str())
            .collect();
        assert_eq!(pending, vec!["install2", "install3"]);

        // With this image every spec entry is returned, whatever the status
        // says.
        cdb.spec.image = "quay.io/tembo/postgres:17-noble".to_string();
        let pending: Vec<&str> = find_trunk_installs_to_pod(&cdb, POD_NAME)
            .iter()
            .map(|install| install.name.as_str())
            .collect();
        assert_eq!(pending, vec!["install1", "install2", "install3"]);
    }

    #[test]
    fn test_initialize_trunk_install_statuses() {
        // Status with two recorded installs.
        let cdb_with_status = coredb(
            spec_with(vec![trunk_install("install1"), trunk_install("install2")]),
            Some(status_with(vec![
                installed_status("install1"),
                installed_status("install2"),
            ])),
        );
        let result = initialize_trunk_install_statuses(&cdb_with_status, "cdb_with_status");
        assert_eq!(result.len(), 2);

        // Status present, but with an empty install list.
        let cdb_with_empty_status = coredb(spec_with(vec![]), Some(status_with(vec![])));
        let result =
            initialize_trunk_install_statuses(&cdb_with_empty_status, "cdb_with_empty_status");
        assert!(result.is_empty());

        // No status at all.
        let cdb_without_status = coredb(CoreDBSpec::default(), None);
        let result = initialize_trunk_install_statuses(&cdb_without_status, "cdb_without_status");
        assert!(result.is_empty());
    }
}