1use crate::{telemetry, Error, Metrics, Result};
2use chrono::{DateTime, Utc};
3use futures::{
4 future::{BoxFuture, FutureExt},
5 stream::StreamExt,
6};
7
8use crate::{
9 config::Config,
10 cronjob::reconcile_cronjob,
11 exec::{ExecCommand, ExecOutput},
12 psql::{PsqlCommand, PsqlOutput},
13 rbac::reconcile_rbac,
14 service::reconcile_svc,
15 statefulset::{reconcile_sts, stateful_set_from_cdb},
16};
17use kube::{
18 api::{Api, ListParams, Patch, PatchParams, ResourceExt},
19 client::Client,
20 runtime::{
21 controller::{Action, Controller},
22 events::{Event, EventType, Recorder, Reporter},
23 finalizer::{finalizer, Event as Finalizer},
24 },
25 Resource,
26};
27
28use crate::{
29 apis::coredb_types::{CoreDB, CoreDBStatus},
30 extensions::{reconcile_extensions, Extension},
31 postgres_exporter::{create_postgres_exporter_role, reconcile_prom_configmap},
32 secret::reconcile_secret,
33};
34use k8s_openapi::api::core::v1::{Namespace, Pod};
35use kube::runtime::wait::Condition;
36use serde::Serialize;
37use serde_json::json;
38use std::sync::Arc;
39use tokio::{sync::RwLock, time::Duration};
40use tracing::*;
41
/// Finalizer name registered on every CoreDB so cleanup runs before deletion.
pub static COREDB_FINALIZER: &str = "coredbs.coredb.io";
/// Annotation key; setting it to "false" opts an object out of reconciliation.
pub static COREDB_ANNOTATION: &str = "coredbs.coredb.io/watch";
44
/// Shared state handed to every reconcile invocation.
#[derive(Clone)]
pub struct Context {
    /// Kubernetes client used for all API calls.
    pub client: Client,
    /// Diagnostics read by the web server, updated by the reconciler.
    pub diagnostics: Arc<RwLock<Diagnostics>>,
    /// Prometheus metrics for reconcile counts/durations/failures.
    pub metrics: Metrics,
}
55
56#[instrument(skip(ctx, cdb), fields(trace_id))]
57async fn reconcile(cdb: Arc<CoreDB>, ctx: Arc<Context>) -> Result<Action> {
58 let cfg = Config::default();
59 let trace_id = telemetry::get_trace_id();
60 Span::current().record("trace_id", &field::display(&trace_id));
61 let _timer = ctx.metrics.count_and_measure();
62 ctx.diagnostics.write().await.last_event = Utc::now();
63 let ns = cdb.namespace().unwrap(); let coredbs: Api<CoreDB> = Api::namespaced(ctx.client.clone(), &ns);
65 let metadata = cdb.meta().clone();
67 let annotations = metadata.annotations.clone().unwrap_or_default();
69
70 if let Some(value) = annotations.get(COREDB_ANNOTATION) {
72 if value == "false" {
74 info!(
75 "Skipping reconciliation for CoreDB \"{}\" in {}",
76 cdb.name_any(),
77 ns
78 );
79 return Ok(Action::await_change());
80 }
81 }
82
83 info!("Reconciling CoreDB \"{}\" in {}", cdb.name_any(), ns);
84 finalizer(&coredbs, COREDB_FINALIZER, cdb, |event| async {
85 match event {
86 Finalizer::Apply(cdb) => match cdb.reconcile(ctx.clone(), &cfg).await {
87 Ok(action) => Ok(action),
88 Err(requeue_action) => Ok(requeue_action),
89 },
90 Finalizer::Cleanup(cdb) => cdb.cleanup(ctx.clone()).await,
91 }
92 })
93 .await
94 .map_err(|e| Error::FinalizerError(Box::new(e)))
95}
96
97fn error_policy(cdb: Arc<CoreDB>, error: &Error, ctx: Arc<Context>) -> Action {
98 warn!("reconcile failed: {:?}", error);
99 ctx.metrics.reconcile_failure(&cdb, error);
100 Action::requeue(Duration::from_secs(5 * 60))
101}
102
103impl CoreDB {
104 async fn reconcile(&self, ctx: Arc<Context>, cfg: &Config) -> Result<Action, Action> {
106 let client = ctx.client.clone();
107 let _recorder = ctx.diagnostics.read().await.recorder(client.clone(), self);
108 let ns = self.namespace().unwrap();
109 let name = self.name_any();
110 let coredbs: Api<CoreDB> = Api::namespaced(client.clone(), &ns);
111
112 if self.spec.postgresExporterEnabled {
114 reconcile_prom_configmap(self, client.clone(), &ns)
115 .await
116 .map_err(|e| {
117 error!("Error reconciling prometheus configmap: {:?}", e);
118 Action::requeue(Duration::from_secs(10))
119 })?;
120 }
121
122 reconcile_rbac(self, ctx.clone()).await.map_err(|e| {
124 error!("Error reconciling service account: {:?}", e);
125 Action::requeue(Duration::from_secs(300))
126 })?;
127
128 reconcile_secret(self, ctx.clone()).await.map_err(|e| {
130 error!("Error reconciling secret: {:?}", e);
131 Action::requeue(Duration::from_secs(10))
132 })?;
133
134 reconcile_cronjob(self, ctx.clone()).await.map_err(|e| {
136 error!("Error reconciling cronjob: {:?}", e);
137 Action::requeue(Duration::from_secs(10))
138 })?;
139
140 reconcile_sts(self, ctx.clone()).await.map_err(|e| {
142 error!("Error reconciling statefulset: {:?}", e);
143 Action::requeue(Duration::from_secs(10))
144 })?;
145
146 reconcile_svc(self, ctx.clone()).await.map_err(|e| {
148 error!("Error reconciling service: {:?}", e);
149 Action::requeue(Duration::from_secs(10))
150 })?;
151
152 let new_status = match self.spec.stop {
153 false => {
154 let primary_pod = self.primary_pod(ctx.client.clone()).await;
155 if primary_pod.is_err() {
156 debug!("Did not find primary pod");
157 return Ok(Action::requeue(Duration::from_secs(1)));
158 }
159 let primary_pod = primary_pod.unwrap();
160
161 if !is_postgres_ready().matches_object(Some(&primary_pod)) {
162 debug!("Postgres is not ready");
163 return Ok(Action::requeue(Duration::from_secs(1)));
164 }
165 create_postgres_exporter_role(self, ctx.clone())
167 .await
168 .map_err(|e| {
169 error!(
170 "Error creating postgres_exporter on CoreDB {}, {}",
171 self.metadata.name.clone().unwrap(),
172 e
173 );
174 Action::requeue(Duration::from_secs(5))
175 })?;
176
177 if !is_pod_ready().matches_object(Some(&primary_pod)) {
178 debug!("Did not find primary pod");
179 return Ok(Action::requeue(Duration::from_secs(1)));
180 }
181
182 let extensions: Vec<Extension> = reconcile_extensions(self, ctx.clone(), &coredbs, &name)
183 .await
184 .map_err(|e| {
185 error!("Error reconciling extensions: {:?}", e);
186 Action::requeue(Duration::from_secs(10))
187 })?;
188
189 if cfg.enable_initial_backup {
192 let backup_command = vec![
193 "/bin/sh".to_string(),
194 "-c".to_string(),
195 "/usr/bin/wal-g backup-push /var/lib/postgresql/data --full --verify".to_string(),
196 ];
197
198 let _backup_result = self
199 .exec(primary_pod.name_any(), client, &backup_command)
200 .await
201 .map_err(|e| {
202 error!("Error running backup: {:?}", e);
203 Action::requeue(Duration::from_secs(10))
204 })?;
205 }
206
207 CoreDBStatus {
208 running: true,
209 extensionsUpdating: false,
210 storage: self.spec.storage.clone(),
211 sharedirStorage: self.spec.sharedirStorage.clone(),
212 pkglibdirStorage: self.spec.pkglibdirStorage.clone(),
213 extensions: Some(extensions),
214 }
215 }
216 true => CoreDBStatus {
217 running: false,
218 extensionsUpdating: false,
219 storage: self.spec.storage.clone(),
220 sharedirStorage: self.spec.sharedirStorage.clone(),
221 pkglibdirStorage: self.spec.pkglibdirStorage.clone(),
222 extensions: self.status.clone().and_then(|f| f.extensions),
223 },
224 };
225
226 let patch_status = json!({
227 "apiVersion": "coredb.io/v1alpha1",
228 "kind": "CoreDB",
229 "status": new_status
230 });
231
232 patch_cdb_status_force(&coredbs, &name, patch_status).await?;
233
234 Ok(Action::requeue(Duration::from_secs(60)))
236 }
237
238 async fn cleanup(&self, ctx: Arc<Context>) -> Result<Action> {
240 let ns_api: Api<Namespace> = Api::all(ctx.client.clone());
244 let ns_status = ns_api
245 .get_status(self.metadata.namespace.as_ref().unwrap())
246 .await
247 .map_err(Error::KubeError);
248 let phase = ns_status.unwrap().status.unwrap().phase;
249 if phase == Some("Terminating".to_string()) {
250 return Ok(Action::await_change());
251 }
252 let recorder = ctx.diagnostics.read().await.recorder(ctx.client.clone(), self);
253 recorder
255 .publish(Event {
256 type_: EventType::Normal,
257 reason: "DeleteCoreDB".into(),
258 note: Some(format!("Delete `{}`", self.name_any())),
259 action: "Reconciling".into(),
260 secondary: None,
261 })
262 .await
263 .map_err(Error::KubeError)?;
264 Ok(Action::await_change())
265 }
266
267 pub async fn primary_pod(&self, client: Client) -> Result<Pod, Error> {
268 let sts = stateful_set_from_cdb(self);
269 let sts_name = sts.metadata.name.unwrap();
270 let sts_namespace = sts.metadata.namespace.unwrap();
271 let label_selector = format!("statefulset={sts_name}");
272 let list_params = ListParams::default().labels(&label_selector);
273 let pods: Api<Pod> = Api::namespaced(client, &sts_namespace);
274 let pods = pods.list(&list_params);
275 let pod_list = pods.await.map_err(Error::KubeError)?;
277 if pod_list.items.is_empty() {
279 return Err(Error::KubeError(kube::Error::Api(kube::error::ErrorResponse {
280 status: "404".to_string(),
281 message: "No pods found".to_string(),
282 reason: "Not Found".to_string(),
283 code: 404,
284 })));
285 }
286 let primary = pod_list.items[0].clone();
287 Ok(primary)
288 }
289
    /// Run a psql command string against `database` on the primary pod.
    ///
    /// NOTE(review): `primary_pod(...).unwrap()` panics when no primary pod
    /// exists yet; callers should ensure the pod is running first (the
    /// reconciler does). Propagating the error instead would require
    /// converting crate `Error` into this function's `kube::Error` return
    /// type — TODO consider unifying the error types.
    pub async fn psql(
        &self,
        command: String,
        database: String,
        client: Client,
    ) -> Result<PsqlOutput, kube::Error> {
        let pod_name = self
            .primary_pod(client.clone())
            .await
            .unwrap()
            .metadata
            .name
            .unwrap();

        // Delegate to PsqlCommand in this CoreDB's namespace.
        PsqlCommand::new(
            pod_name,
            self.metadata.namespace.clone().unwrap(),
            command,
            database,
            client,
        )
        .execute()
        .await
    }
314
315 pub async fn exec(
316 &self,
317 pod_name: String,
318 client: Client,
319 command: &[String],
320 ) -> Result<ExecOutput, Error> {
321 ExecCommand::new(pod_name, self.metadata.namespace.clone().unwrap(), client)
322 .execute(command)
323 .await
324 }
325}
326
327pub fn is_pod_ready() -> impl Condition<Pod> + 'static {
328 move |obj: Option<&Pod>| {
329 if let Some(pod) = &obj {
330 if let Some(status) = &pod.status {
331 if let Some(conds) = &status.conditions {
332 if let Some(pcond) = conds.iter().find(|c| c.type_ == "ContainersReady") {
333 return pcond.status == "True";
334 }
335 }
336 }
337 }
338 false
339 }
340}
341
342pub fn is_postgres_ready() -> impl Condition<Pod> + 'static {
343 move |obj: Option<&Pod>| {
344 if let Some(pod) = &obj {
345 if let Some(status) = &pod.status {
346 if let Some(container_statuses) = &status.container_statuses {
347 for container in container_statuses {
348 if container.name == "postgres" {
349 return container.ready;
350 }
351 }
352 }
353 }
354 }
355 false
356 }
357}
358
359pub async fn patch_cdb_status_force(
360 cdb: &Api<CoreDB>,
361 name: &str,
362 patch: serde_json::Value,
363) -> Result<(), Action> {
364 let ps = PatchParams::apply("cntrlr").force();
365 let patch_status = Patch::Apply(patch);
366 let _o = cdb.patch_status(name, &ps, &patch_status).await.map_err(|e| {
367 error!("Error updating CoreDB status: {:?}", e);
368 Action::requeue(Duration::from_secs(10))
369 })?;
370 Ok(())
371}
372
373pub async fn patch_cdb_status_merge(
374 cdb: &Api<CoreDB>,
375 name: &str,
376 patch: serde_json::Value,
377) -> Result<(), Action> {
378 let pp = PatchParams::default();
379 let patch_status = Patch::Merge(patch);
380 let _o = cdb.patch_status(name, &pp, &patch_status).await.map_err(|e| {
381 error!("Error updating CoreDB status: {:?}", e);
382 Action::requeue(Duration::from_secs(10))
383 })?;
384 Ok(())
385}
386
387#[derive(Clone, Serialize)]
389pub struct Diagnostics {
390 #[serde(deserialize_with = "from_ts")]
391 pub last_event: DateTime<Utc>,
392 #[serde(skip)]
393 pub reporter: Reporter,
394}
395impl Default for Diagnostics {
396 fn default() -> Self {
397 Self {
398 last_event: Utc::now(),
399 reporter: "coredb-controller".into(),
400 }
401 }
402}
403impl Diagnostics {
404 fn recorder(&self, client: Client, cdb: &CoreDB) -> Recorder {
405 Recorder::new(client, self.reporter.clone(), cdb.object_ref(&()))
406 }
407}
408
/// State shared between the controller future and the web server.
#[derive(Clone, Default)]
pub struct State {
    /// Diagnostics populated by the reconciler, served over HTTP.
    diagnostics: Arc<RwLock<Diagnostics>>,
    /// Prometheus registry backing the metrics endpoint.
    registry: prometheus::Registry,
}
417
418impl State {
420 pub fn metrics(&self) -> Vec<prometheus::proto::MetricFamily> {
422 self.registry.gather()
423 }
424
425 pub async fn diagnostics(&self) -> Diagnostics {
427 self.diagnostics.read().await.clone()
428 }
429
430 pub fn create_context(&self, client: Client) -> Arc<Context> {
432 Arc::new(Context {
433 client,
434 metrics: Metrics::default().register(&self.registry).unwrap(),
435 diagnostics: self.diagnostics.clone(),
436 })
437 }
438}
439
/// Initialize the controller and its shared state.
///
/// Verifies the CoreDB CRD is queryable — exiting the whole process with
/// status 1 if it is not — then returns the boxed controller future (to be
/// awaited by the caller) alongside the `State` handle for the web server.
pub async fn init(client: Client) -> (BoxFuture<'static, ()>, State) {
    let state = State::default();
    let cdb = Api::<CoreDB>::all(client.clone());
    if let Err(e) = cdb.list(&ListParams::default().limit(1)).await {
        error!("CRD is not queryable; {e:?}. Is the CRD installed?");
        info!("Installation: cargo run --bin crdgen | kubectl apply -f -");
        std::process::exit(1);
    }
    let controller = Controller::new(cdb, ListParams::default())
        .shutdown_on_signal()
        .run(reconcile, error_policy, state.create_context(client))
        // Drop per-object reconcile errors; error_policy already handled them.
        .filter_map(|x| async move { Result::ok(x) })
        .for_each(|_| futures::future::ready(()))
        .boxed();
    (controller, state)
}
457
// Unit tests exercising the reconciler against a mock API server.
// `Context::test`, `CoreDB::test` and the fakeserver helpers are provided by
// test-only impls elsewhere in the crate.
#[cfg(test)]
mod test {
    use super::{reconcile, Context, CoreDB};
    use std::sync::Arc;

    #[tokio::test]
    async fn new_coredbs_without_finalizers_gets_a_finalizer() {
        let (testctx, fakeserver, _) = Context::test();
        let coredb = CoreDB::test();
        // Mock server expects (and answers) the finalizer-adding patch.
        fakeserver.handle_finalizer_creation(&coredb);
        let res = reconcile(Arc::new(coredb), testctx).await;
        assert!(res.is_ok(), "initial creation succeeds in adding finalizer");
    }

    #[tokio::test]
    async fn test_patches_coredb() {
        let (testctx, fakeserver, _) = Context::test();
        let coredb = CoreDB::test().finalized();
        // Mock server expects the status patch issued by reconcile.
        fakeserver.handle_coredb_patch(&coredb);
        let res = reconcile(Arc::new(coredb), testctx).await;
        assert!(res.is_ok(), "finalized coredb succeeds in its reconciler");
    }
}