use chrono::{DateTime, Utc};
use futures::stream::StreamExt;

use crate::{
    apis::coredb_types::{CoreDB, CoreDBStatus, VolumeSnapshot},
    app_service::manager::reconcile_app_services,
    cloudnativepg::{
        archive::wal::reconcile_last_archive_status,
        backups::Backup,
        cnpg::{
            cnpg_cluster_from_cdb, reconcile_cnpg, reconcile_cnpg_scheduled_backup,
            reconcile_pooler,
        },
        placement::cnpg_placement::PlacementConfig,
        retention::snapshots::cleanup_old_volume_snapshots,
        VOLUME_SNAPSHOT_CLASS_NAME,
    },
    config::Config,
    dedicated_networking::reconcile_dedicated_networking,
    exec::{ExecCommand, ExecOutput},
    extensions::database_queries::is_not_restarting,
    heartbeat::reconcile_heartbeat,
    ingress::reconcile_postgres_ing_route_tcp,
    postgres_certificates::reconcile_certificates,
    psql::{PsqlCommand, PsqlOutput},
    secret::{reconcile_postgres_role_secret, reconcile_secret},
    telemetry, Error, Metrics, Result,
};
use k8s_openapi::{api::core::v1::Pod, apimachinery::pkg::util::intstr::IntOrString};
use kube::{
    api::{Api, ListParams, Patch, PatchParams, ResourceExt},
    client::Client,
    runtime::{
        controller::{Action, Controller},
        events::{Event, EventType, Recorder, Reporter},
        finalizer::{finalizer, Event as Finalizer},
        wait::Condition,
        watcher::Config as watcherConfig,
    },
    Resource,
};

use crate::cloudnativepg::hibernate::reconcile_cluster_hibernation;
use crate::{
    apis::postgres_parameters::PgConfig,
    configmap::reconcile_generic_metrics_configmap,
    extensions::{database_queries::list_config_params, reconcile_extensions},
    ingress::{reconcile_extra_postgres_ing_route_tcp, reconcile_ip_allowlist_middleware},
    network_policies::reconcile_network_policies,
    postgres_exporter::reconcile_metrics_configmap,
    trunk::{extensions_that_require_load, reconcile_trunk_configmap},
};
use k8s_openapi::api::core::v1::Secret;
use serde::Serialize;
use serde_json::json;
use std::sync::Arc;
use tokio::{sync::RwLock, time::Duration};
use tracing::*;

pub static COREDB_FINALIZER: &str = "coredbs.coredb.io";
pub static COREDB_ANNOTATION: &str = "coredbs.coredb.io/watch";

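/// Shared per-controller state handed to every reconciliation: the Kubernetes
/// client, an event recorder, diagnostics, and Prometheus metrics.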
#[derive(Clone)]
pub struct Context {
    pub client: Client,
    pub recorder: Recorder,
    pub diagnostics: Arc<RwLock<Diagnostics>>,
    pub metrics: Arc<Metrics>,
}

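/// Requeue after the configured reconcile TTL plus up to 60 seconds of random
/// jitter, so instances do not all reconcile at the same instant.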
pub fn requeue_normal_with_jitter() -> Action {
    let cfg = Config::default();
    let jitter = rand::Rng::random_range(&mut rand::rng(), 0..60);
    Action::requeue(Duration::from_secs(cfg.reconcile_ttl + jitter))
}

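/// Controller entry point for every CoreDB change. Instances annotated with
/// `coredbs.coredb.io/watch: "false"` are skipped; otherwise the finalizer
/// wrapper dispatches to `CoreDB::reconcile` or `CoreDB::cleanup`.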
#[instrument(skip(ctx, cdb), fields(trace_id))]
async fn reconcile(cdb: Arc<CoreDB>, ctx: Arc<Context>) -> Result<Action> {
    let trace_id = telemetry::get_trace_id();
    Span::current().record("trace_id", field::display(&trace_id));
    let cfg = Config::default();
    let _timer = ctx.metrics.reconcile.count_and_measure(&trace_id);
    ctx.diagnostics.write().await.last_event = Utc::now();
    let ns = cdb.namespace().unwrap();
    let coredbs: Api<CoreDB> = Api::namespaced(ctx.client.clone(), &ns);
    let metadata = cdb.meta();
    let annotations = metadata.annotations.clone().unwrap_or_default();

    if let Some(value) = annotations.get(COREDB_ANNOTATION) {
        if value == "false" {
            info!(
                "Skipping reconciliation for CoreDB \"{}\" in {}",
                cdb.name_any(),
                ns
            );
            return Ok(Action::await_change());
        }
    }

    debug!("Reconciling CoreDB \"{}\" in {}", cdb.name_any(), ns);
    finalizer(&coredbs, COREDB_FINALIZER, cdb, |event| async {
        match event {
            Finalizer::Apply(cdb) => match cdb.reconcile(ctx.clone(), &cfg).await {
                Ok(action) => Ok(action),
                Err(requeue_action) => Ok(requeue_action),
            },
            Finalizer::Cleanup(cdb) => cdb.cleanup(ctx.clone()).await,
        }
    })
    .await
    .map_err(|e| Error::FinalizerError(Box::new(e)))
}

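/// Decide how to requeue after a failed reconciliation: HTTP 429 responses back
/// off for 60 seconds plus up to 120 seconds of jitter, everything else retries
/// after five minutes.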
pub(crate) fn error_policy(cdb: Arc<CoreDB>, error: &Error, ctx: Arc<Context>) -> Action {
    warn!("reconcile failed: {:?}", error);
    ctx.metrics.reconcile.set_failure(&cdb, error);

    match error {
        Error::KubeError(kube_error) => match kube_error {
            kube::Error::Api(api_error) if api_error.code == 429 => {
                let backoff: u64 = 60;
                let max_jitter: u64 = 120;
                let jitter = rand::Rng::random_range(&mut rand::rng(), 0..=max_jitter);
                let backoff_with_jitter = Duration::from_secs(backoff + jitter);
                warn!(
                    "Received HTTP 429 Too Many Requests. Requeuing after {} seconds.",
                    backoff_with_jitter.as_secs()
                );
                Action::requeue(backoff_with_jitter)
            }
            _ => Action::requeue(Duration::from_secs(5 * 60)),
        },
        _ => Action::requeue(Duration::from_secs(5 * 60)),
    }
}

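/// Build the merge patch that toggles `spec.backup.volumeSnapshot`, setting the
/// snapshot class only when volume snapshots are enabled.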
fn create_volume_snapshot_patch(cfg: &Config) -> serde_json::Value {
    json!({
        "spec": {
            "backup": {
                "volumeSnapshot": {
                    "enabled": cfg.enable_volume_snapshot,
                    "snapshotClass": if cfg.enable_volume_snapshot {
                        Some(VOLUME_SNAPSHOT_CLASS_NAME.to_string())
                    } else {
                        None
                    }
                }
            }
        }
    })
}

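/// Returns true when the desired volume snapshot setting differs from what is
/// currently on the spec (a missing block counts as disabled).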
fn is_volume_snapshot_update_needed(
    volume_snapshot: Option<&VolumeSnapshot>,
    enable_volume_snapshot: bool,
) -> bool {
    let current_enabled = volume_snapshot.map(|vs| vs.enabled).unwrap_or(false);
    current_enabled != enable_volume_snapshot
}

impl CoreDB {
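    /// Reconcile a single CoreDB instance: hibernation, networking, certificates,
    /// secrets, the CNPG cluster and pooler, extensions, and finally the status
    /// patch. Errors are surfaced as requeue `Action`s.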
    #[instrument(skip(self, ctx, cfg))]
    async fn reconcile(&self, ctx: Arc<Context>, cfg: &Config) -> Result<Action, Action> {
        let client = ctx.client.clone();
        let ns = self.namespace().unwrap();
        let name = self.name_any();
        let coredbs: Api<CoreDB> = Api::namespaced(client.clone(), &ns);

        reconcile_cluster_hibernation(self, &ctx).await?;

        let placement_config = PlacementConfig::new(self);

        reconcile_network_policies(ctx.client.clone(), &ns).await?;

        reconcile_trunk_configmap(ctx.client.clone(), &ns).await?;

        reconcile_certificates(ctx.client.clone(), self, &ns).await?;

        let delete = self.spec.replicas < 1 || self.spec.stop || self.spec.disable_ingress;

        match std::env::var("DATA_PLANE_BASEDOMAIN") {
            Ok(basedomain) => {
                debug!(
                    "DATA_PLANE_BASEDOMAIN is set to {}, reconciling IngressRouteTCP and MiddlewareTCP for {}",
                    basedomain, name.clone()
                );

                let middleware_name = reconcile_ip_allowlist_middleware(self, ctx.clone())
                    .await
                    .map_err(|e| {
                        error!("Error reconciling MiddlewareTCP for {}: {:?}", name, e);
                        Action::requeue(Duration::from_secs(300))
                    })?;

                let service_name_read_only = format!("{}-ro", self.name_any().as_str());
                let prefix_read_only = format!("{}-ro-", self.name_any().as_str());
                let read_only_subdomain = format!("{}-ro", self.name_any().as_str());
                reconcile_postgres_ing_route_tcp(
                    self,
                    ctx.clone(),
                    &read_only_subdomain,
                    basedomain.as_str(),
                    ns.as_str(),
                    prefix_read_only.as_str(),
                    service_name_read_only.as_str(),
                    IntOrString::Int(5432),
                    vec![middleware_name.clone()],
                    delete,
                )
                .await
                .map_err(|e| {
                    error!("Error reconciling postgres ingress route: {:?}", e);
                    Action::requeue(Duration::from_secs(300))
                })?;

                let service_name_read_write = format!("{}-rw", self.name_any().as_str());
                let prefix_read_write = format!("{}-rw-", self.name_any().as_str());
                reconcile_postgres_ing_route_tcp(
                    self,
                    ctx.clone(),
                    self.name_any().as_str(),
                    basedomain.as_str(),
                    ns.as_str(),
                    prefix_read_write.as_str(),
                    service_name_read_write.as_str(),
                    IntOrString::Int(5432),
                    vec![middleware_name.clone()],
                    delete,
                )
                .await
                .map_err(|e| {
                    error!("Error reconciling postgres ingress route: {:?}", e);
                    Action::requeue(Duration::from_secs(300))
                })?;

                reconcile_extra_postgres_ing_route_tcp(
                    self,
                    ctx.clone(),
                    ns.as_str(),
                    service_name_read_write.as_str(),
                    IntOrString::Int(5432),
                    vec![middleware_name.clone()],
                )
                .await
                .map_err(|e| {
                    error!("Error reconciling extra postgres ingress route: {:?}", e);
                    Action::requeue(Duration::from_secs(300))
                })?;

                reconcile_dedicated_networking(self, ctx.clone(), basedomain.as_str())
                    .await
                    .map_err(|e| {
                        error!("Error reconciling dedicated networking: {:?}", e);
                        Action::requeue(Duration::from_secs(300))
                    })?;

                let name_pooler = format!("{}-pooler", self.name_any().as_str());
                let prefix_pooler = format!("{}-pooler-", self.name_any().as_str());
                let delete_pooler = delete || !self.spec.connectionPooler.enabled;
                reconcile_postgres_ing_route_tcp(
                    self,
                    ctx.clone(),
                    name_pooler.as_str(),
                    basedomain.as_str(),
                    ns.as_str(),
                    prefix_pooler.as_str(),
                    name_pooler.as_str(),
                    IntOrString::Int(5432),
                    vec![middleware_name.clone()],
                    delete_pooler,
                )
                .await
                .map_err(|e| {
                    error!("Error reconciling pooler ingress route: {:?}", e);
                    Action::requeue(Duration::from_secs(300))
                })?;
            }
            Err(_e) => {
                warn!(
                    "DATA_PLANE_BASEDOMAIN is not set, skipping reconciliation of IngressRouteTCP"
                );
            }
        };

        debug!("Reconciling secret");
        reconcile_secret(self, ctx.clone()).await?;
        reconcile_app_services(self, ctx.clone(), placement_config.clone()).await?;

        if self
            .spec
            .metrics
            .as_ref()
            .and_then(|m| m.queries.as_ref())
            .is_some()
        {
            debug!("Reconciling prometheus configmap");
            reconcile_metrics_configmap(self, client.clone(), &ns)
                .await
                .map_err(|e| {
                    error!("Error reconciling prometheus configmap: {:?}", e);
                    Action::requeue(Duration::from_secs(300))
                })?;
        }

        let _ = reconcile_postgres_role_secret(
            self,
            ctx.clone(),
            "readonly",
            &format!("{}-ro", name.clone()),
        )
        .await
        .map_err(|e| {
            error!("Error reconciling postgres exporter secret: {:?}", e);
            Action::requeue(Duration::from_secs(300))
        })?;

        reconcile_generic_metrics_configmap(self, ctx.clone()).await?;

        self.enable_volume_snapshot(cfg, ctx.clone()).await?;

        reconcile_cnpg(self, ctx.clone()).await?;
        if cfg.enable_backup {
            reconcile_cnpg_scheduled_backup(self, ctx.clone()).await?;
        }

        crate::deployment_postgres_exporter::cleanup_postgres_exporter(self, ctx.clone())
            .await
            .map_err(|e| {
                error!("Error reconciling prometheus exporter deployment: {:?}", e);
                Action::requeue(Duration::from_secs(300))
            })?;

        reconcile_pooler(self, ctx.clone(), placement_config.clone()).await?;

        let pg_postmaster_start_time = is_not_restarting(self, ctx.clone(), "postgres").await?;

        let patch_status = json!({
            "apiVersion": "coredb.io/v1alpha1",
            "kind": "CoreDB",
            "status": {
                "running": true,
                "pg_postmaster_start_time": pg_postmaster_start_time,
            }
        });
        patch_cdb_status_merge(&coredbs, &name, patch_status).await?;
        let (trunk_installs, extensions) =
            reconcile_extensions(self, ctx.clone(), &coredbs, &name).await?;

        let recovery_time = self
            .get_recovery_time(ctx.clone(), cfg.enable_volume_snapshot)
            .await?;
        let last_archiver_status = reconcile_last_archive_status(self, ctx.clone()).await?;

        let current_config_values = get_current_config_values(self, ctx.clone()).await?;

        #[allow(deprecated)]
        let new_status = CoreDBStatus {
            running: true,
            extensionsUpdating: false,
            storage: Some(self.spec.storage.clone()),
            extensions: Some(extensions),
            trunk_installs: Some(trunk_installs),
            resources: Some(self.spec.resources.clone()),
            runtime_config: Some(current_config_values),
            first_recoverability_time: recovery_time,
            last_fully_reconciled_at: None,
            pg_postmaster_start_time,
            last_archiver_status,
        };

        debug!("Updating CoreDB status to {:?} for {name}", new_status);

        let patch_status = json!({
            "apiVersion": "coredb.io/v1alpha1",
            "kind": "CoreDB",
            "status": new_status
        });

        patch_cdb_status_merge(&coredbs, &name, patch_status).await?;

        reconcile_heartbeat(self, ctx.clone()).await?;

        if cfg.enable_volume_snapshot {
            match cleanup_old_volume_snapshots(
                self,
                client,
                cfg.volume_snapshot_retention_period_days,
            )
            .await
            {
                Ok(_) => {
                    info!(
                        "Successfully cleaned up old volume snapshots for instance: {}",
                        self.name_any()
                    );
                }
                Err(action) => {
                    return Err(action);
                }
            }
        }

        info!("Fully reconciled {}", self.name_any());
        Ok(requeue_normal_with_jitter())
    }

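    /// Patch the CoreDB spec so that `backup.volumeSnapshot` matches the operator
    /// configuration, skipping the API call when nothing would change.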
    #[instrument(skip(self, ctx))]
    async fn enable_volume_snapshot(&self, cfg: &Config, ctx: Arc<Context>) -> Result<(), Action> {
        let client = ctx.client.clone();
        let name = self.name_any();
        let namespace = self.metadata.namespace.as_ref().ok_or_else(|| {
            error!("CoreDB namespace is empty for instance: {}.", name);
            Action::requeue(tokio::time::Duration::from_secs(300))
        })?;

        let coredbs: Api<CoreDB> = Api::namespaced(client.clone(), namespace);

        if !is_volume_snapshot_update_needed(
            self.spec.backup.volume_snapshot.as_ref(),
            cfg.enable_volume_snapshot,
        ) {
            return Ok(());
        }

        let patch = create_volume_snapshot_patch(cfg);
        let patch_params = PatchParams {
            field_manager: Some("cntrlr".to_string()),
            ..PatchParams::default()
        };
        let patch_status = Patch::Merge(patch.clone());
        match coredbs.patch(&name, &patch_params, &patch_status).await {
            Ok(_) => {
                debug!("Successfully updated CoreDB status for {}", name);
                Ok(())
            }
            Err(e) => {
                error!("Error updating CoreDB status for {}: {:?}", name, e);
                Err(Action::requeue(Duration::from_secs(10)))
            }
        }
    }

    #[instrument(skip(self, ctx))]
    async fn cleanup(&self, ctx: Arc<Context>) -> Result<Action> {
        let oref = self.object_ref(&());
        ctx.recorder
            .publish(
                &Event {
                    type_: EventType::Normal,
                    reason: "DeleteCoreDB".into(),
                    note: Some(format!("Delete `{}`", self.name_any())),
                    action: "Deleting".into(),
                    secondary: None,
                },
                &oref,
            )
            .await
            .map_err(Error::KubeError)?;
        Ok(Action::await_change())
    }

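    /// Find the CNPG primary pod for this instance, optionally requiring the
    /// `postgres` container to be ready before returning it.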
    #[instrument(skip(self, client))]
    async fn primary_pod_cnpg_conditional_readiness(
        &self,
        client: Client,
        wait_for_ready: bool,
    ) -> Result<Pod, Action> {
        let requires_load =
            extensions_that_require_load(client.clone(), &self.metadata.namespace.clone().unwrap())
                .await?;
        let cluster = cnpg_cluster_from_cdb(self, None, requires_load);
        let cluster_name = cluster.metadata.name.as_ref().ok_or_else(|| {
            error!(
                "CNPG Cluster name is empty for instance: {}.",
                self.name_any()
            );
            Action::requeue(tokio::time::Duration::from_secs(300))
        })?;
        let namespace = self.metadata.namespace.as_ref().ok_or_else(|| {
            error!(
                "CoreDB namespace is empty for instance: {}.",
                self.name_any()
            );
            Action::requeue(tokio::time::Duration::from_secs(300))
        })?;
        let cluster_selector = format!("cnpg.io/cluster={}", cluster_name);
        let role_selector = "role=primary";
        let list_params = ListParams::default()
            .labels(&cluster_selector)
            .labels(role_selector);
        let pods: Api<Pod> = Api::namespaced(client, namespace);
        let pods = pods.list(&list_params);
        let pod_list = pods.await.map_err(|_e| {
            error!(
                "Failed to query for CNPG primary pod of {}",
                &self.name_any()
            );
            Action::requeue(Duration::from_secs(300))
        })?;
        if pod_list.items.is_empty() {
            warn!("Failed to find CNPG primary pod of {}, this can be expected if the pod is restarting for some reason", &self.name_any());
            return Err(Action::requeue(Duration::from_secs(5)));
        }
        let primary = pod_list.items[0].clone();

        if wait_for_ready && !is_postgres_ready().matches_object(Some(&primary)) {
            warn!(
                "Found CNPG primary pod of {}, but it is not ready",
                &self.name_any()
            );
            return Err(Action::requeue(Duration::from_secs(5)));
        }

        Ok(primary)
    }

    #[instrument(skip(self, client))]
    pub async fn primary_pod_cnpg(&self, client: Client) -> Result<Pod, Action> {
        self.primary_pod_cnpg_conditional_readiness(client, true)
            .await
    }

    #[instrument(skip(self, client))]
    pub async fn primary_pod_cnpg_ready_or_not(&self, client: Client) -> Result<Pod, Action> {
        self.primary_pod_cnpg_conditional_readiness(client, false)
            .await
    }

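    /// List all pods belonging to the CNPG cluster (primary and replicas),
    /// optionally filtering down to pods whose `Ready` condition is `True`.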
    #[instrument(skip(self, client))]
    async fn pods_by_cluster_conditional_readiness(
        &self,
        client: Client,
        wait_for_ready: bool,
    ) -> Result<Vec<Pod>, Action> {
        let requires_load =
            extensions_that_require_load(client.clone(), &self.metadata.namespace.clone().unwrap())
                .await?;
        let cluster = cnpg_cluster_from_cdb(self, None, requires_load);
        let cluster_name = cluster.metadata.name.as_ref().ok_or_else(|| {
            error!(
                "CNPG Cluster name is empty for instance: {}.",
                self.name_any()
            );
            Action::requeue(tokio::time::Duration::from_secs(300))
        })?;
        let namespace = self.metadata.namespace.as_ref().ok_or_else(|| {
            error!(
                "CoreDB namespace is empty for instance: {}.",
                self.name_any()
            );
            Action::requeue(tokio::time::Duration::from_secs(300))
        })?;

        let cluster_selector =
            format!("cnpg.io/cluster={cluster_name},cnpg.io/podRole=instance,role=primary");
        let replica_selector = format!("cnpg.io/cluster={cluster_name},role=replica");

        let list_params_cluster = ListParams::default().labels(&cluster_selector);
        let list_params_replica = ListParams::default().labels(&replica_selector);

        let pods: Api<Pod> = Api::namespaced(client, namespace);
        let primary_pods = pods.list(&list_params_cluster);
        let replica_pods = pods.list(&list_params_replica);

        let primary_pod_list = primary_pods.await.map_err(|_e| {
            error!(
                "Failed to query for CNPG primary pods of {}",
                &self.name_any()
            );
            Action::requeue(Duration::from_secs(300))
        })?;

        let replica_pod_list = replica_pods.await.map_err(|_e| {
            error!(
                "Failed to query for CNPG replica pods of {}",
                &self.name_any()
            );
            Action::requeue(Duration::from_secs(300))
        })?;

        let pod_list = [primary_pod_list.items, replica_pod_list.items].concat();

        if pod_list.is_empty() {
            warn!("Failed to find CNPG pods of {}", &self.name_any());
            return Err(Action::requeue(Duration::from_secs(30)));
        }

        let ready_pods: Vec<Pod> = pod_list
            .into_iter()
            .filter(|pod| {
                if let Some(conditions) = &pod.status.as_ref().and_then(|s| s.conditions.as_ref()) {
                    conditions
                        .iter()
                        .any(|c| c.type_ == "Ready" && c.status == "True")
                } else {
                    false
                }
            })
            .collect();

        if wait_for_ready && ready_pods.is_empty() {
            warn!("Failed to find ready CNPG pods of {}", &self.name_any());
            return Err(Action::requeue(Duration::from_secs(30)));
        }

        Ok(ready_pods)
    }

    #[instrument(skip(self, client))]
    pub async fn pods_by_cluster(&self, client: Client) -> Result<Vec<Pod>, Action> {
        self.pods_by_cluster_conditional_readiness(client, true)
            .await
    }

    #[instrument(skip(self, client))]
    pub async fn pods_by_cluster_ready_or_not(&self, client: Client) -> Result<Vec<Pod>, Action> {
        self.pods_by_cluster_conditional_readiness(client, false)
            .await
    }

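    /// Requeue until the number of running pods matches `spec.replicas`.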
    #[instrument(skip(self, client))]
    async fn check_replica_count_matches_pods(&self, client: Client) -> Result<(), Action> {
        let desired_replica_count = self.spec.replicas;
        debug!(
            "Instance {} has a desired replica count: {}",
            self.name_any(),
            desired_replica_count
        );

        let current_pods = self.pods_by_cluster(client.clone()).await?;
        let pod_names: Vec<String> = current_pods.iter().map(|pod| pod.name_any()).collect();
        debug!(
            "Found {} pods, {:?} for {}",
            current_pods.len(),
            pod_names,
            self.name_any()
        );

        if current_pods.len() != desired_replica_count as usize {
            warn!(
                "Number of running pods ({}) does not match desired replica count ({}) for ({}). Requeuing.",
                current_pods.len(),
                desired_replica_count,
                self.name_any()
            );
            return Err(Action::requeue(Duration::from_secs(10)));
        }

        info!(
            "Number of running pods ({}) matches desired replica count ({}) for ({}).",
            current_pods.len(),
            desired_replica_count,
            self.name_any()
        );
        Ok(())
    }

    pub async fn log_pod_status(&self, client: Client, pod_name: &str) -> Result<(), kube::Error> {
        let namespace = self
            .metadata
            .namespace
            .clone()
            .expect("CoreDB should have a namespace");
        let pods: Api<Pod> = Api::namespaced(client.clone(), &namespace);
        match pods.get(pod_name).await {
            Ok(pod) => {
                let status = pod
                    .status
                    .as_ref()
                    .map(|s| format!("{:?}", s))
                    .unwrap_or_else(|| "Unknown".to_string());
                debug!(
                    "Status of instance {} pod {} in namespace {}: {}",
                    self.metadata
                        .name
                        .clone()
                        .expect("CoreDB should have a name"),
                    pod_name,
                    namespace,
                    status
                );
                Ok(())
            }
            Err(e) => Err(e),
        }
    }

    #[instrument(skip(self, context))]
    pub async fn psql(
        &self,
        command: String,
        database: String,
        context: Arc<Context>,
    ) -> Result<PsqlOutput, Action> {
        let pod = self.primary_pod_cnpg(context.client.clone()).await?;
        let pod_name_cnpg = pod.metadata.name.as_ref().ok_or_else(|| {
            error!("Pod name is empty for instance: {}.", self.name_any());
            Action::requeue(tokio::time::Duration::from_secs(300))
        })?;

        let cnpg_psql_command = PsqlCommand::new(
            pod_name_cnpg.clone(),
            self.metadata.namespace.clone().unwrap(),
            command,
            database,
            context.clone(),
        );
        debug!("Running exec command in {}", pod_name_cnpg);
        cnpg_psql_command.execute().await
    }

    pub async fn exec(
        &self,
        pod_name: String,
        client: Client,
        command: &[String],
    ) -> Result<ExecOutput, Error> {
        ExecCommand::new(pod_name, self.metadata.namespace.clone().unwrap(), client)
            .execute(command)
            .await
    }

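    /// From a list of CNPG `Backup` objects, return the earliest `stopped_at`
    /// timestamp among completed backups.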
    fn process_backups(&self, backup_list: Vec<Backup>) -> Option<DateTime<Utc>> {
        let backup = backup_list
            .iter()
            .filter_map(|backup| backup.status.as_ref())
            .filter(|status| status.phase.as_deref() == Some("completed"))
            .filter_map(|status| status.stopped_at.as_ref())
            .filter_map(|stopped_at_str| DateTime::parse_from_rfc3339(stopped_at_str).ok())
            .map(|dt_with_offset| dt_with_offset.with_timezone(&Utc))
            .min();

        backup
    }

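    /// Look up the Backup objects for this cluster (the `{name}-snap` scheduled
    /// backup when volume snapshots are enabled, the default one otherwise) and
    /// return the oldest completion time, used as the first recoverability time.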
    #[instrument(skip(self, context))]
    pub async fn get_recovery_time(
        &self,
        context: Arc<Context>,
        enable_volume_snapshot: bool,
    ) -> Result<Option<DateTime<Utc>>, Action> {
        let client = context.client.clone();
        let namespace = self.metadata.namespace.as_ref().ok_or_else(|| {
            error!(
                "CoreDB namespace is empty for instance: {}.",
                self.name_any()
            );
            Action::requeue(tokio::time::Duration::from_secs(300))
        })?;
        let cluster_name = self.name_any();
        let backup: Api<Backup> = Api::namespaced(client, namespace);

        let scheduled_backup_name = if enable_volume_snapshot {
            format!("{}-snap", cluster_name)
        } else {
            cluster_name.clone()
        };

        let label_selector = format!(
            "cnpg.io/cluster={},cnpg.io/scheduled-backup={}",
            cluster_name, scheduled_backup_name
        );

        let lp = ListParams::default().labels(&label_selector);
        let backup_list = backup.list(&lp).await.map_err(|e| {
            error!("Error getting backups: {:?}", e);
            Action::requeue(Duration::from_secs(300))
        })?;

        let oldest_backup_time = self.process_backups(backup_list.items);

        Ok(oldest_backup_time)
    }
}

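/// `Condition` that is met once the pod reports `ContainersReady == True`.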
pub fn is_pod_ready() -> impl Condition<Pod> + 'static {
    move |obj: Option<&Pod>| {
        if let Some(pod) = &obj {
            if let Some(status) = &pod.status {
                if let Some(conds) = &status.conditions {
                    if let Some(pcond) = conds.iter().find(|c| c.type_ == "ContainersReady") {
                        return pcond.status == "True";
                    }
                }
            }
        }
        false
    }
}

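/// `Condition` that is met once the `postgres` container reports ready.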
pub fn is_postgres_ready() -> impl Condition<Pod> + 'static {
    move |obj: Option<&Pod>| {
        if let Some(pod) = &obj {
            if let Some(status) = &pod.status {
                if let Some(container_statuses) = &status.container_statuses {
                    for container in container_statuses {
                        if container.name == "postgres" {
                            return container.ready;
                        }
                    }
                }
            }
        }
        false
    }
}

#[instrument(skip(ctx, cdb))]
pub async fn get_current_coredb_resource(
    cdb: &CoreDB,
    ctx: Arc<Context>,
) -> Result<CoreDB, Action> {
    let coredb_name = cdb.name_any();
    let namespace = cdb.metadata.namespace.as_ref().ok_or_else(|| {
        error!("Namespace is empty for instance: {}.", &coredb_name);
        Action::requeue(tokio::time::Duration::from_secs(300))
    })?;
    let coredb_api: Api<CoreDB> = Api::namespaced(ctx.client.clone(), namespace);
    let coredb = coredb_api.get(&coredb_name).await.map_err(|e| {
        error!("Error getting CoreDB resource: {:?}", e);
        Action::requeue(Duration::from_secs(10))
    })?;
    Ok(coredb)
}

pub async fn get_current_config_values(
    cdb: &CoreDB,
    ctx: Arc<Context>,
) -> Result<Vec<PgConfig>, Action> {
    let cfg = list_config_params(cdb, ctx.clone()).await?;
    Ok(cfg)
}

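/// Apply a merge patch to the CoreDB status subresource, requeueing after ten
/// seconds if the patch fails.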
pub async fn patch_cdb_status_merge(
    cdb: &Api<CoreDB>,
    name: &str,
    patch: serde_json::Value,
) -> Result<(), Action> {
    let pp = PatchParams {
        field_manager: Some("cntrlr".to_string()),
        ..PatchParams::default()
    };
    let patch_status = Patch::Merge(patch.clone());
    match cdb.patch_status(name, &pp, &patch_status).await {
        Ok(_) => {
            debug!("Successfully updated CoreDB status for {}", name);
            Ok(())
        }
        Err(e) => {
            error!("Error updating CoreDB status for {}: {:?}", name, e);
            Err(Action::requeue(Duration::from_secs(10)))
        }
    }
}

#[derive(Clone, Serialize)]
pub struct Diagnostics {
    #[serde(deserialize_with = "from_ts")]
    pub last_event: DateTime<Utc>,
    #[serde(skip)]
    pub reporter: Reporter,
}
impl Default for Diagnostics {
    fn default() -> Self {
        Self {
            last_event: Utc::now(),
            reporter: "tembo-controller".into(),
        }
    }
}
impl Diagnostics {
    fn recorder(&self, client: Client) -> Recorder {
        Recorder::new(client, self.reporter.clone())
    }
}

#[derive(Clone, Default)]
pub struct State {
    diagnostics: Arc<RwLock<Diagnostics>>,
    metrics: Arc<Metrics>,
}

impl State {
    pub fn metrics(&self) -> String {
        let mut buffer = String::new();
        let registry = &*self.metrics.registry;
        prometheus_client::encoding::text::encode(&mut buffer, registry).unwrap();
        buffer
    }

    pub async fn diagnostics(&self) -> Diagnostics {
        self.diagnostics.read().await.clone()
    }

    pub async fn to_context(&self, client: Client) -> Arc<Context> {
        Arc::new(Context {
            client: client.clone(),
            recorder: self.diagnostics.read().await.recorder(client),
            metrics: self.metrics.clone(),
            diagnostics: self.diagnostics.clone(),
        })
    }
}

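/// Initialize the controller: verify the CRD is installed, then watch CoreDB
/// objects (and owned Secrets) and run `reconcile` with `error_policy`.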
pub async fn run(state: State) {
    let client = Client::try_default()
        .await
        .expect("failed to create kube Client");
    let coredb = Api::<CoreDB>::all(client.clone());
    if let Err(e) = coredb.list(&ListParams::default().limit(1)).await {
        error!("CRD is not queryable; {e:?}. Is the CRD installed?");
        info!("Installation: cargo run --bin crdgen | kubectl apply -f -");
        std::process::exit(1);
    }
    let secret_api = Api::<Secret>::all(client.clone());
    Controller::new(coredb, watcherConfig::default().any_semantic())
        .owns(secret_api, watcherConfig::default().any_semantic())
        .shutdown_on_signal()
        .run(reconcile, error_policy, state.to_context(client).await)
        .filter_map(|x| async move { std::result::Result::ok(x) })
        .for_each(|_| futures::future::ready(()))
        .await;
}

#[cfg(test)]
mod test {
    use super::{reconcile, Backup, Context, CoreDB};
    use crate::apis::coredb_types::VolumeSnapshot;
    use crate::cloudnativepg::{
        backups::{BackupCluster, BackupSpec, BackupStatus},
        VOLUME_SNAPSHOT_CLASS_NAME,
    };
    use crate::config::Config;
    use crate::controller::{create_volume_snapshot_patch, is_volume_snapshot_update_needed};
    use crate::fixtures::{timeout_after_1s, Scenario};
    use chrono::{DateTime, NaiveDate, Utc};
    use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
    use std::sync::Arc;

    #[tokio::test]
    async fn new_coredbs_without_finalizers_gets_a_finalizer() {
        let (testctx, fakeserver) = Context::test();
        let coredb = CoreDB::test();
        let mocksrv = fakeserver.run(Scenario::FinalizerCreation(coredb.clone()));
        reconcile(Arc::new(coredb), testctx)
            .await
            .expect("reconciler");
        timeout_after_1s(mocksrv).await;
    }

    #[tokio::test]
    async fn test_process_backups() {
        let coredb = CoreDB::test();
        let backup_name = "test-backup-1".to_string();
        let namespace = "test".to_string();

        let backup_list = vec![Backup {
            metadata: ObjectMeta {
                name: Some(backup_name.clone()),
                namespace: Some(namespace),
                ..Default::default()
            },
            spec: BackupSpec {
                cluster: BackupCluster {
                    name: backup_name.clone(),
                },
                ..Default::default()
            },
            status: Some(BackupStatus {
                phase: Some("completed".to_string()),
                stopped_at: Some("2023-09-19T23:14:00Z".to_string()),
                ..Default::default()
            }),
        }];

        let oldest_backup_time = coredb.process_backups(backup_list);

        let expected_time = NaiveDate::from_ymd_opt(2023, 9, 19)
            .and_then(|date| date.and_hms_opt(23, 14, 0))
            .map(|naive_dt| DateTime::from_naive_utc_and_offset(naive_dt, Utc));
        assert_eq!(oldest_backup_time, expected_time);
    }

    #[tokio::test]
    async fn test_process_backups_multiple_backups() {
        let coredb = CoreDB::test();

        let backup_list = vec![
            Backup {
                metadata: ObjectMeta {
                    name: Some("backup-1".to_string()),
                    namespace: Some("test".to_string()),
                    ..Default::default()
                },
                spec: BackupSpec {
                    cluster: BackupCluster {
                        name: "backup-1".to_string(),
                    },
                    ..Default::default()
                },
                status: Some(BackupStatus {
                    phase: Some("completed".to_string()),
                    stopped_at: Some("2023-09-19T23:14:00Z".to_string()),
                    ..Default::default()
                }),
            },
            Backup {
                metadata: ObjectMeta {
                    name: Some("backup-2".to_string()),
                    namespace: Some("test".to_string()),
                    ..Default::default()
                },
                spec: BackupSpec {
                    cluster: BackupCluster {
                        name: "backup-2".to_string(),
                    },
                    ..Default::default()
                },
                status: Some(BackupStatus {
                    phase: Some("completed".to_string()),
                    stopped_at: Some("2023-09-18T22:12:00Z".to_string()),
                    ..Default::default()
                }),
            },
            Backup {
                metadata: ObjectMeta {
                    name: Some("backup-3".to_string()),
                    namespace: Some("test".to_string()),
                    ..Default::default()
                },
                spec: BackupSpec {
                    cluster: BackupCluster {
                        name: "backup-3".to_string(),
                    },
                    ..Default::default()
                },
                status: Some(BackupStatus {
                    phase: Some("completed".to_string()),
                    stopped_at: Some("2023-09-19T21:11:00Z".to_string()),
                    ..Default::default()
                }),
            },
            Backup {
                metadata: ObjectMeta {
                    name: Some("backup-4".to_string()),
                    namespace: Some("test".to_string()),
                    ..Default::default()
                },
                spec: BackupSpec {
                    cluster: BackupCluster {
                        name: "backup-4".to_string(),
                    },
                    ..Default::default()
                },
                status: Some(BackupStatus {
                    phase: Some("failed".to_string()),
                    stopped_at: Some("2023-09-19T21:11:00Z".to_string()),
                    ..Default::default()
                }),
            },
        ];

        let oldest_backup_time = coredb.process_backups(backup_list);

        let expected_time = NaiveDate::from_ymd_opt(2023, 9, 18)
            .and_then(|date| date.and_hms_opt(22, 12, 0))
            .map(|naive_dt| DateTime::from_naive_utc_and_offset(naive_dt, Utc));

        assert_eq!(oldest_backup_time, expected_time);
    }

    #[tokio::test]
    async fn test_process_backups_no_backup() {
        let coredb = CoreDB::test();

        let backup_list: Vec<Backup> = vec![];

        let oldest_backup_time = coredb.process_backups(backup_list);

        assert_eq!(oldest_backup_time, None);
    }

    #[test]
    fn test_create_volume_snapshot_patch_enabled() {
        let cfg = Config {
            enable_volume_snapshot: true,
            ..Config::default()
        };

        let expected_volume_snapshot = VolumeSnapshot {
            enabled: true,
            snapshot_class: Some(VOLUME_SNAPSHOT_CLASS_NAME.to_string()),
        };

        let actual_patch = create_volume_snapshot_patch(&cfg);

        let actual_volume_snapshot: VolumeSnapshot =
            serde_json::from_value(actual_patch["spec"]["backup"]["volumeSnapshot"].clone())
                .expect("Failed to deserialize actual_patch into VolumeSnapshot");

        assert_eq!(actual_volume_snapshot, expected_volume_snapshot);
    }

    #[test]
    fn test_create_volume_snapshot_patch_disabled() {
        let cfg = Config {
            enable_volume_snapshot: false,
            ..Config::default()
        };

        let expected_volume_snapshot = VolumeSnapshot {
            enabled: false,
            snapshot_class: None,
        };

        let actual_patch = create_volume_snapshot_patch(&cfg);

        let actual_volume_snapshot: VolumeSnapshot =
            serde_json::from_value(actual_patch["spec"]["backup"]["volumeSnapshot"].clone())
                .expect("Failed to deserialize actual_patch into VolumeSnapshot");

        assert_eq!(actual_volume_snapshot, expected_volume_snapshot);
    }

    #[test]
    fn test_is_volume_snapshot_update_needed() {
        let volume_snapshot_enabled = Some(VolumeSnapshot {
            enabled: true,
            snapshot_class: Some(VOLUME_SNAPSHOT_CLASS_NAME.to_string()),
        });
        let volume_snapshot_disabled = Some(VolumeSnapshot {
            enabled: false,
            snapshot_class: None,
        });

        assert!(!is_volume_snapshot_update_needed(
            volume_snapshot_enabled.as_ref(),
            true
        ));
        assert!(!is_volume_snapshot_update_needed(
            volume_snapshot_disabled.as_ref(),
            false
        ));
        assert!(!is_volume_snapshot_update_needed(None, false));

        assert!(is_volume_snapshot_update_needed(
            volume_snapshot_enabled.as_ref(),
            false
        ));
        assert!(is_volume_snapshot_update_needed(
            volume_snapshot_disabled.as_ref(),
            true
        ));
        assert!(is_volume_snapshot_update_needed(None, true));
    }

    use crate::{error_policy, Error};
    use bytes::Bytes;
    use futures::pin_mut;
    use http::{Request, Response, StatusCode};
    use http_body_util::Full;
    use k8s_openapi::api::core::v1::Pod;
    use kube::runtime::events::Recorder;
    use kube::{api::Api, client::Body, Client};
    use serde_json::json;
    use tower_test::mock;

    #[tokio::test]
    async fn test_error_policy_429() {
        let coredb = CoreDB::test();

        let (mock_service, handle) = mock::pair::<Request<Body>, Response<Full<Bytes>>>();
        let client = Client::new(mock_service, "default".to_string());
        let ctx = Arc::new(Context {
            client: client.clone(),
            metrics: Default::default(),
            diagnostics: Default::default(),
            recorder: Recorder::new(client.clone(), "tembo-controller".into()),
        });

        let spawned = tokio::spawn(async move {
            pin_mut!(handle);
            if let Some((_request, send)) = handle.next_request().await {
                send.send_response(
                    Response::builder()
                        .status(StatusCode::TOO_MANY_REQUESTS)
                        .body(Full::new(Bytes::from(
                            json!({
                                "kind": "Status",
                                "apiVersion": "v1",
                                "metadata": {},
                                "status": "Failure",
                                "message": "Too Many Requests",
                                "reason": "TooManyRequests",
                                "code": 429
                            })
                            .to_string(),
                        )))
                        .unwrap(),
                );
            }
        });

        let pod_api: Api<Pod> = Api::namespaced(ctx.client.clone(), "default");
        let err = pod_api.get("test-pod").await.err().unwrap();

        let custom_error = Error::from(err);

        let action = error_policy(Arc::new(coredb), &custom_error, ctx);
        let action_str = format!("{:?}", action);

        println!("Action: {:?}", action);

        let re = regex::Regex::new(r"requeue_after: Some\((\d+)s\)").unwrap();
        if let Some(captures) = re.captures(&action_str) {
            let duration_secs = captures[1].parse::<u64>().unwrap();
            assert!((60..=180).contains(&duration_secs));
        } else {
            panic!("Unexpected action format: {}", action_str);
        }

        spawned.await.unwrap();
    }

    #[tokio::test]
    async fn test_error_policy_non_429() {
        let coredb = CoreDB::test();

        let (mock_service, handle) = mock::pair::<Request<Body>, Response<Full<Bytes>>>();
        let client = Client::new(mock_service, "default".to_string());
        let ctx = Arc::new(Context {
            client: client.clone(),
            metrics: Default::default(),
            diagnostics: Default::default(),
            recorder: Recorder::new(client.clone(), "tembo-controller".into()),
        });

        let spawned = tokio::spawn(async move {
            pin_mut!(handle);
            if let Some((_request, send)) = handle.next_request().await {
                send.send_response(
                    Response::builder()
                        .status(StatusCode::TOO_MANY_REQUESTS)
                        .body(Full::new(Bytes::from(
                            json!({
                                "kind": "Status",
                                "apiVersion": "v1",
                                "metadata": {},
                                "status": "Failure",
                                "message": "Not Found",
                                "reason": "Not Found",
                                "code": 404
                            })
                            .to_string(),
                        )))
                        .unwrap(),
                );
            }
        });

        let pod_api: Api<Pod> = Api::namespaced(ctx.client.clone(), "default");
        let err = pod_api.get("test-pod").await.err().unwrap();

        let custom_error = Error::from(err);

        let action = error_policy(Arc::new(coredb), &custom_error, ctx);
        let action_str = format!("{:?}", action);

        println!("Action: {:?}", action);

        let re = regex::Regex::new(r"requeue_after: Some\((\d+)s\)").unwrap();
        if let Some(captures) = re.captures(&action_str) {
            let duration_secs = captures[1].parse::<u64>().unwrap();
            assert_eq!(duration_secs, 300);
        } else {
            panic!("Unexpected action format: {}", action_str);
        }

        spawned.await.unwrap();
    }
}