use crate::{telemetry, Error, Metrics, Result};
use chrono::{DateTime, Utc};
use futures::{
future::{BoxFuture, FutureExt},
stream::StreamExt,
};
use crate::{
defaults,
psql::{PsqlCommand, PsqlOutput},
service::reconcile_svc,
statefulset::{reconcile_sts, stateful_set_from_cdb},
};
use kube::{
api::{Api, ListParams, Patch, PatchParams, ResourceExt},
client::Client,
runtime::{
controller::{Action, Controller},
events::{Event, EventType, Recorder, Reporter},
finalizer::{finalizer, Event as Finalizer},
},
CustomResource, Resource,
};
use k8s_openapi::api::core::v1::Pod;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use serde_json::json;
use std::sync::Arc;
use tokio::{sync::RwLock, time::Duration};
use tracing::*;
pub static COREDB_FINALIZER: &str = "coredbs.coredb.io";
#[derive(CustomResource, Deserialize, Serialize, Clone, Debug, JsonSchema)]
#[cfg_attr(test, derive(Default))]
#[kube(kind = "CoreDB", group = "coredb.io", version = "v1alpha1", namespaced)]
#[kube(status = "CoreDBStatus", shortname = "cdb")]
pub struct CoreDBSpec {
#[serde(default = "defaults::default_replicas")]
pub replicas: i32,
#[serde(default = "defaults::default_image")]
pub image: String,
#[serde(default = "defaults::default_port")]
pub port: i32,
#[serde(default = "defaults::default_uid")]
pub uid: i32,
}
#[derive(Deserialize, Serialize, Clone, Default, Debug, JsonSchema)]
pub struct CoreDBStatus {
pub running: bool,
}
#[derive(Clone)]
pub struct Context {
pub client: Client,
pub diagnostics: Arc<RwLock<Diagnostics>>,
pub metrics: Metrics,
}
#[instrument(skip(ctx, cdb), fields(trace_id))]
async fn reconcile(cdb: Arc<CoreDB>, ctx: Arc<Context>) -> Result<Action> {
let trace_id = telemetry::get_trace_id();
Span::current().record("trace_id", &field::display(&trace_id));
let _timer = ctx.metrics.count_and_measure();
ctx.diagnostics.write().await.last_event = Utc::now();
let ns = cdb.namespace().unwrap(); let coredbs: Api<CoreDB> = Api::namespaced(ctx.client.clone(), &ns);
info!("Reconciling CoreDB \"{}\" in {}", cdb.name_any(), ns);
finalizer(&coredbs, COREDB_FINALIZER, cdb, |event| async {
match event {
Finalizer::Apply(cdb) => cdb.reconcile(ctx.clone()).await,
Finalizer::Cleanup(cdb) => cdb.cleanup(ctx.clone()).await,
}
})
.await
.map_err(|e| Error::FinalizerError(Box::new(e)))
}
fn error_policy(cdb: Arc<CoreDB>, error: &Error, ctx: Arc<Context>) -> Action {
warn!("reconcile failed: {:?}", error);
ctx.metrics.reconcile_failure(&cdb, error);
Action::requeue(Duration::from_secs(5 * 60))
}
impl CoreDB {
async fn reconcile(&self, ctx: Arc<Context>) -> Result<Action> {
let client = ctx.client.clone();
let _recorder = ctx.diagnostics.read().await.recorder(client.clone(), self);
let ns = self.namespace().unwrap();
let name = self.name_any();
let coredbs: Api<CoreDB> = Api::namespaced(client, &ns);
let new_status = Patch::Apply(json!({
"apiVersion": "coredb.io/v1alpha1",
"kind": "CoreDB",
"status": CoreDBStatus {
running: true,
}
}));
let ps = PatchParams::apply("cntrlr").force();
let _o = coredbs
.patch_status(&name, &ps, &new_status)
.await
.map_err(Error::KubeError)?;
reconcile_sts(self, ctx.clone())
.await
.expect("error reconciling statefulset");
reconcile_svc(self, ctx.clone())
.await
.expect("error reconciling service");
Ok(Action::requeue(Duration::from_secs(60)))
}
async fn cleanup(&self, ctx: Arc<Context>) -> Result<Action> {
let recorder = ctx.diagnostics.read().await.recorder(ctx.client.clone(), self);
recorder
.publish(Event {
type_: EventType::Normal,
reason: "DeleteCoreDB".into(),
note: Some(format!("Delete `{}`", self.name_any())),
action: "Reconciling".into(),
secondary: None,
})
.await
.map_err(Error::KubeError)?;
Ok(Action::await_change())
}
async fn primary_pod(&self, client: Client) -> Result<Pod> {
let sts = stateful_set_from_cdb(self);
let sts_name = sts.metadata.name.unwrap();
let sts_namespace = sts.metadata.namespace.unwrap();
let label_selector = format!("statefulset={}", sts_name);
let list_params = ListParams::default().labels(&label_selector);
let pods: Api<Pod> = Api::namespaced(client, &sts_namespace);
let pods = pods.list(&list_params);
let primary = pods.await.unwrap().items[0].clone();
return Ok(primary);
}
pub async fn psql(
&self,
command: String,
database: String,
client: Client,
) -> Result<PsqlOutput, kube::Error> {
let pod_name = self
.primary_pod(client.clone())
.await
.unwrap()
.metadata
.name
.unwrap();
return PsqlCommand::new(
pod_name,
self.metadata.namespace.clone().unwrap(),
command,
database,
client,
)
.execute()
.await;
}
}
#[derive(Clone, Serialize)]
pub struct Diagnostics {
#[serde(deserialize_with = "from_ts")]
pub last_event: DateTime<Utc>,
#[serde(skip)]
pub reporter: Reporter,
}
impl Default for Diagnostics {
fn default() -> Self {
Self {
last_event: Utc::now(),
reporter: "coredb-controller".into(),
}
}
}
impl Diagnostics {
fn recorder(&self, client: Client, cdb: &CoreDB) -> Recorder {
Recorder::new(client, self.reporter.clone(), cdb.object_ref(&()))
}
}
#[derive(Clone, Default)]
pub struct State {
diagnostics: Arc<RwLock<Diagnostics>>,
registry: prometheus::Registry,
}
impl State {
pub fn metrics(&self) -> Vec<prometheus::proto::MetricFamily> {
self.registry.gather()
}
pub async fn diagnostics(&self) -> Diagnostics {
self.diagnostics.read().await.clone()
}
pub fn create_context(&self, client: Client) -> Arc<Context> {
Arc::new(Context {
client,
metrics: Metrics::default().register(&self.registry).unwrap(),
diagnostics: self.diagnostics.clone(),
})
}
}
pub async fn init(client: Client) -> (BoxFuture<'static, ()>, State) {
let state = State::default();
let cdb = Api::<CoreDB>::all(client.clone());
if let Err(e) = cdb.list(&ListParams::default().limit(1)).await {
error!("CRD is not queryable; {e:?}. Is the CRD installed?");
info!("Installation: cargo run --bin crdgen | kubectl apply -f -");
std::process::exit(1);
}
let controller = Controller::new(cdb, ListParams::default())
.run(reconcile, error_policy, state.create_context(client))
.filter_map(|x| async move { std::result::Result::ok(x) })
.for_each(|_| futures::future::ready(()))
.boxed();
(controller, state)
}
#[cfg(test)]
mod test {
use super::{reconcile, Context, CoreDB};
use std::sync::Arc;
#[tokio::test]
async fn new_coredbs_without_finalizers_gets_a_finalizer() {
let (testctx, fakeserver, _) = Context::test();
let coredb = CoreDB::test();
fakeserver.handle_finalizer_creation(&coredb);
let res = reconcile(Arc::new(coredb), testctx).await;
assert!(res.is_ok(), "initial creation succeeds in adding finalizer");
}
#[tokio::test]
async fn test_patches_coredb() {
let (testctx, fakeserver, _) = Context::test();
let coredb = CoreDB::test().finalized();
fakeserver.handle_coredb_patch(&coredb);
let res = reconcile(Arc::new(coredb), testctx).await;
assert!(res.is_ok(), "finalized coredb succeeds in its reconciler");
}
}