use std::env;
use std::ops::Deref;
use std::sync::Arc;
use anyhow::{
anyhow,
bail,
};
use k8s_openapi::api::admissionregistration::v1 as admissionv1;
use k8s_openapi::api::batch::v1 as batchv1;
use kube::api::{
ListParams,
Patch,
};
use kube::runtime::controller::Action;
use serde_json::json;
use sk_api::prometheus::*;
use sk_api::v1::{
Simulation,
SimulationRoot,
SimulationState,
};
use sk_core::constants::*;
use sk_core::errors::*;
use sk_core::hooks;
use sk_core::k8s::{
LeaseState,
build_simulation_root,
is_terminal,
metrics_ns,
try_claim_lease,
};
use sk_core::prelude::*;
use tokio::runtime::Handle;
use tokio::task::block_in_place;
use tokio::time::Duration;
use tracing::*;
use crate::errors::*;
use crate::objects::*;
use crate::{
GlobalContext,
cert_manager,
};
pub const REQUEUE_DURATION: Duration = Duration::from_secs(RETRY_DELAY_SECONDS);
pub const REQUEUE_ERROR_DURATION: Duration = Duration::from_secs(ERROR_RETRY_DELAY_SECONDS);
pub const JOB_STATUS_CONDITION_COMPLETE: &str = "Complete";
pub const JOB_STATUS_CONDITION_FAILED: &str = "Failed";
type SimulationStatusPatch = serde_json::Value;
#[derive(Clone)]
pub struct ReconcileContext {
pub client: kube::Client,
pub recorder: SkEventRecorder,
pub name: String,
pub metaroot_name: String,
pub driver_name: String,
pub driver_svc: String,
pub prometheus_name: String,
pub webhook_name: String,
}
impl ReconcileContext {
pub fn new(sim: &Simulation, client: kube::Client, recorder: SkEventRecorder) -> ReconcileContext {
let name = sim.name_any();
ReconcileContext {
client,
recorder,
name: name.clone(),
metaroot_name: format!("sk-{}-metaroot", name),
driver_name: format!("sk-{}-driver", name),
driver_svc: format!("sk-{}-driver-svc", name),
prometheus_name: format!("sk-{}-prom", name),
webhook_name: format!("sk-{}-mutatepods", name),
}
}
}
async fn setup_sim_metaroot(ctx: &ReconcileContext, sim: &Simulation) -> anyhow::Result<SimulationRoot> {
let roots_api = kube::Api::<SimulationRoot>::all(ctx.client.clone());
match roots_api.get_opt(&ctx.metaroot_name).await? {
None => {
info!("creating Simulation MetaRoot");
let metaroot = build_simulation_root(&ctx.metaroot_name, sim);
roots_api.create(&Default::default(), &metaroot).await.map_err(|e| e.into())
},
Some(metaroot) => Ok(metaroot),
}
}
type DriverState = (SimulationState, SimulationStatusPatch, u64);
pub(crate) async fn fetch_driver_state(
ctx: &ReconcileContext,
sim: &Simulation,
metaroot: &SimulationRoot,
ctrl_ns: &str,
) -> anyhow::Result<DriverState> {
let jobs_api = kube::Api::<batchv1::Job>::namespaced(ctx.client.clone(), &sim.spec.driver.namespace);
let (mut state, mut start_time, mut end_time, mut completed, mut blocked_duration) =
(SimulationState::Initializing, None, None, None, 0);
if let Some(driver) = jobs_api.get_opt(&ctx.driver_name).await? {
state = SimulationState::Running;
if let Some(status) = driver.status {
completed = status.succeeded;
start_time = status.start_time.map(|t| t.0);
if let Some(cond) =
status.conditions.unwrap_or_default().iter().find(|cond| {
cond.type_ == JOB_STATUS_CONDITION_COMPLETE || cond.type_ == JOB_STATUS_CONDITION_FAILED
})
{
end_time = cond.last_transition_time.as_ref().map(|t| t.0);
state = if cond.type_ == JOB_STATUS_CONDITION_COMPLETE {
SimulationState::Finished
} else {
SimulationState::Failed
};
}
}
}
if !is_terminal(&state) {
if state != SimulationState::Initializing && sim.spec.paused_time.is_some() {
state = SimulationState::Paused;
}
match try_claim_lease(ctx.client.clone(), sim, metaroot, ctrl_ns).await? {
LeaseState::Claimed => (),
LeaseState::WaitingForClaim(t) => {
state = SimulationState::Blocked;
blocked_duration = t;
},
LeaseState::Unknown => bail!("unknown lease state"),
}
}
let patch = json!({
"status": {
"observedGeneration": sim.metadata.generation.unwrap_or(1),
"startTime": start_time,
"endTime": end_time,
"completedRuns": completed,
"state": state,
}});
Ok((state, patch, blocked_duration))
}
pub async fn setup_simulation(
ctx: &ReconcileContext,
sim: &Simulation,
metaroot: &SimulationRoot,
ctrl_ns: &str,
) -> anyhow::Result<Action> {
info!("setting up simulation");
debug!("simulation object: {sim:#?}");
if sim.status.is_none() {
ctx.recorder.send_sim_started_event(&sim.name_any()).await?;
}
hooks::execute(sim, hooks::Type::PreStart, &ctx.recorder).await?;
let ns_api = kube::Api::<corev1::Namespace>::all(ctx.client.clone());
let metrics_ns = metrics_ns(sim);
if ns_api.get_opt(&metrics_ns).await?.is_none() {
bail!(SkControllerError::namespace_not_found(&metrics_ns));
};
if ns_api.get_opt(&sim.spec.driver.namespace).await?.is_none() {
info!("creating driver namespace {}", sim.spec.driver.namespace);
let obj = build_driver_namespace(ctx, sim);
ns_api.create(&Default::default(), &obj).await?;
};
let mut prom_ready = false;
match &sim.spec.metrics {
Some(mc) => {
let prom_api = kube::Api::<Prometheus>::namespaced(ctx.client.clone(), &metrics_ns);
match prom_api.get_opt(&ctx.prometheus_name).await? {
None => {
info!("creating Prometheus object {}/{}", metrics_ns, ctx.prometheus_name);
let obj = build_prometheus(&ctx.prometheus_name, sim, metaroot, mc);
prom_api.create(&Default::default(), &obj).await?;
},
Some(prom) => {
if let Some(PrometheusStatus { available_replicas: reps, .. }) = prom.status {
prom_ready = reps > 0;
}
},
}
},
_ => prom_ready = true,
}
if !prom_ready {
info!("waiting for prometheus to be ready");
ctx.recorder.send_waiting_for_metrics_event().await?;
return Ok(Action::requeue(REQUEUE_DURATION));
}
let driver_svc_api = kube::Api::<corev1::Service>::namespaced(ctx.client.clone(), &sim.spec.driver.namespace);
if driver_svc_api.get_opt(&ctx.driver_svc).await?.is_none() {
info!("creating driver service {}", &ctx.driver_svc);
let obj = build_driver_service(ctx, sim, metaroot);
driver_svc_api.create(&Default::default(), &obj).await?;
}
cert_manager::create_certificate_if_not_present(ctx, sim, metaroot).await?;
let secrets_api = kube::Api::<corev1::Secret>::namespaced(ctx.client.clone(), &sim.spec.driver.namespace);
let secrets = secrets_api
.list(&ListParams {
label_selector: Some(format!("{SIMULATION_LABEL_KEY}={}", ctx.name)),
..Default::default()
})
.await?;
let driver_cert_secret_name = match secrets.items.len() {
0 => {
info!("waiting for secret to be created");
ctx.recorder.send_waiting_for_secret_event().await?;
return Ok(Action::requeue(REQUEUE_DURATION));
},
x if x > 1 => bail!("found multiple secrets for experiment"),
_ => secrets.items[0].name_any(),
};
let webhook_api = kube::Api::<admissionv1::MutatingWebhookConfiguration>::all(ctx.client.clone());
let mwc_opt = webhook_api.get_opt(&ctx.webhook_name).await?;
if mwc_opt.is_none() {
info!("creating mutating webhook configuration {}", ctx.webhook_name);
let obj = build_mutating_webhook(ctx, sim, metaroot);
webhook_api.create(&Default::default(), &obj).await?;
return Ok(Action::requeue(REQUEUE_DURATION));
}
if let Some(mwc) = &mwc_opt
&& let Some(webhooks) = &mwc.webhooks
&& let Some(webhook) = &webhooks.first()
&& webhook.client_config.ca_bundle.as_ref().is_none_or(|b| b.0.is_empty())
{
info!(
"MutatingWebhookConfiguration {} exists but caBundle not yet populated, requeuing.",
ctx.webhook_name
);
ctx.recorder.send_waiting_for_webhook_cert_event().await?;
return Ok(Action::requeue(REQUEUE_DURATION));
}
let jobs_api = kube::Api::<batchv1::Job>::namespaced(ctx.client.clone(), &sim.spec.driver.namespace);
if jobs_api.get_opt(&ctx.driver_name).await?.is_none() {
info!("creating simulation driver {}", ctx.driver_name);
let obj = build_driver_job(ctx, sim, &driver_cert_secret_name, ctrl_ns)?;
let job = jobs_api.create(&Default::default(), &obj).await?;
ctx.recorder.send_driver_created_event(&job).await?;
}
Ok(Action::await_change())
}
pub async fn cleanup_simulation(ctx: &ReconcileContext, sim: &Simulation) {
let roots_api: kube::Api<SimulationRoot> = kube::Api::all(ctx.client.clone());
info!("cleaning up simulation {}", ctx.name);
if let Err(e) = roots_api.delete(&ctx.metaroot_name, &Default::default()).await {
error!("Error cleaning up simulation: {e:?}");
let _ = ctx .recorder
.send_sim_cleanup_failed_event()
.await;
}
if let Err(e) = hooks::execute(sim, hooks::Type::PostStop, &ctx.recorder).await {
error!("Error running PostStop hooks: {e:?}");
}
}
#[instrument(parent=None, skip_all, fields(simulation=sim.name_any()))]
pub async fn reconcile(sim: Arc<Simulation>, global_ctx: Arc<GlobalContext>) -> Result<Action, AnyhowError> {
let sim = sim.deref();
let ctx = ReconcileContext::new(sim, global_ctx.client.clone(), global_ctx.recorder.with_sim(sim));
let ctrl_ns = env::var(CTRL_NS_ENV_VAR).map_err(|e| anyhow!(e))?;
let metaroot = setup_sim_metaroot(&ctx, sim).await?;
let (simulation_state, status_patch, blocked_duration) = fetch_driver_state(&ctx, sim, &metaroot, &ctrl_ns).await?;
debug!("sending patch status update: {status_patch}");
let sim_api: kube::Api<Simulation> = kube::Api::all(ctx.client.clone());
sim_api
.patch_status(&sim.name_any(), &Default::default(), &Patch::Merge(status_patch))
.await
.map_err(|e| anyhow!(e))?;
match simulation_state {
SimulationState::Initializing => setup_simulation(&ctx, sim, &metaroot, &ctrl_ns).await.map_err(|e| e.into()),
SimulationState::Blocked => {
info!("simulation blocked; sleeping for {blocked_duration} seconds");
ctx.recorder.send_sim_blocked_event().await?;
Ok(Action::requeue(Duration::from_secs(blocked_duration)))
},
SimulationState::Running | SimulationState::Paused => Ok(Action::await_change()),
SimulationState::Finished | SimulationState::Failed => {
cleanup_simulation(&ctx, sim).await;
Ok(Action::await_change())
},
SimulationState::Retrying => unimplemented!(),
}
}
pub fn error_policy(sim: Arc<Simulation>, err: &AnyhowError, global_ctx: Arc<GlobalContext>) -> Action {
skerr!(err, "reconcile failed on simulation {}", sim.namespaced_name());
let (action, state) = if err.is::<SkControllerError>() {
(Action::await_change(), SimulationState::Failed)
} else {
(Action::requeue(REQUEUE_ERROR_DURATION), SimulationState::Retrying)
};
let sim_api: kube::Api<Simulation> = kube::Api::all(global_ctx.client.clone());
if let Err(e) = block_in_place(|| {
Handle::current().block_on(sim_api.patch_status(
&sim.name_any(),
&Default::default(),
&Patch::Merge(json!({
"status": {
"state": state,
}})),
))
}) {
error!("failure updating simulation state for {}: {e:?}", sim.namespaced_name());
}
action
}