use std::env;
use std::ops::Deref;
use std::sync::Arc;
use anyhow::{
anyhow,
bail,
};
use k8s_openapi::api::admissionregistration::v1 as admissionv1;
use k8s_openapi::api::batch::v1 as batchv1;
use kube::api::{
ListParams,
Patch,
};
use kube::runtime::controller::Action;
use serde_json::json;
use sk_api::prometheus::*;
use sk_api::v1::{
Simulation,
SimulationRoot,
SimulationState,
};
use sk_core::constants::*;
use sk_core::errors::*;
use sk_core::hooks;
use sk_core::k8s::{
LeaseState,
build_simulation_root,
is_terminal,
metrics_ns,
try_claim_lease,
};
use sk_core::prelude::*;
use tokio::runtime::Handle;
use tokio::task::block_in_place;
use tokio::time::Duration;
use tracing::*;
use crate::cert_manager;
use crate::context::SimulationContext;
use crate::errors::*;
use crate::objects::*;
/// Delay before the next reconcile when setup is waiting on another object to become ready.
pub const REQUEUE_DURATION: Duration = Duration::from_secs(RETRY_DELAY_SECONDS);
/// Delay before the next reconcile after a reconcile attempt returned a retryable error.
pub const REQUEUE_ERROR_DURATION: Duration = Duration::from_secs(ERROR_RETRY_DELAY_SECONDS);
/// `type` value of the Job condition that marks the driver Job as completed.
pub const JOB_STATUS_CONDITION_COMPLETE: &str = "Complete";
/// `type` value of the Job condition that marks the driver Job as failed.
pub const JOB_STATUS_CONDITION_FAILED: &str = "Failed";
/// JSON document sent as a merge patch against a Simulation's status.
type SimulationStatusPatch = serde_json::Value;
/// Returns the `SimulationRoot` named `ctx.metaroot_name`, creating it from `sim` when the
/// lookup finds nothing.
///
/// # Errors
///
/// Propagates any error from the lookup or the create call.
async fn setup_sim_metaroot(ctx: &SimulationContext, sim: &Simulation) -> anyhow::Result<SimulationRoot> {
    let roots_api = kube::Api::<SimulationRoot>::all(ctx.client.clone());

    // Fast path: the metaroot is already there, hand it back untouched.
    if let Some(existing) = roots_api.get_opt(&ctx.metaroot_name).await? {
        return Ok(existing);
    }

    info!("creating Simulation MetaRoot");
    let new_root = build_simulation_root(&ctx.metaroot_name, sim);
    let created = roots_api.create(&Default::default(), &new_root).await?;
    Ok(created)
}
/// Result of inspecting the driver: the computed simulation state, the status patch describing
/// it, and the wait value reported by the lease when the simulation is blocked (`0` otherwise).
type DriverState = (SimulationState, SimulationStatusPatch, u64);
/// Inspects the driver Job and the simulation lease to work out the current simulation state.
///
/// Returns the computed [`SimulationState`], a merge patch for the Simulation's `status`, and
/// the wait value handed back by the lease when the simulation is blocked (`0` otherwise).
///
/// The state is derived as follows:
/// - no driver Job exists: `Initializing`;
/// - a driver Job exists: `Running`, or `Finished`/`Failed` once the Job carries a
///   `Complete`/`Failed` condition whose status is `"True"`;
/// - the state is not terminal, the driver has been created, and `spec.paused_time` is set:
///   `Paused`;
/// - the state is not terminal and the lease claim reports `WaitingForClaim`: `Blocked`
///   (this takes precedence over `Paused`).
///
/// # Errors
///
/// Propagates failures from the Job lookup and from the lease claim, and fails with
/// "unknown lease state" when the lease state is `Unknown`.
pub(crate) async fn fetch_driver_state(
    ctx: &SimulationContext,
    sim: &Simulation,
    metaroot: &SimulationRoot,
    ctrl_ns: &str,
) -> anyhow::Result<DriverState> {
    // A Kubernetes condition only holds when its status is "True"; "False" and "Unknown"
    // entries of the same type must not be treated as the Job having finished.
    const CONDITION_STATUS_TRUE: &str = "True";

    let jobs_api = kube::Api::<batchv1::Job>::namespaced(ctx.client.clone(), &sim.spec.driver.namespace);
    let (mut state, mut start_time, mut end_time, mut completed, mut blocked_duration) =
        (SimulationState::Initializing, None, None, None, 0);

    if let Some(driver) = jobs_api.get_opt(&ctx.driver_name).await? {
        // The Job object exists, so the simulation is at least running.
        state = SimulationState::Running;
        if let Some(status) = driver.status {
            completed = status.succeeded;
            start_time = status.start_time.map(|t| t.0);
            if let Some(cond) = status.conditions.unwrap_or_default().iter().find(|cond| {
                cond.status == CONDITION_STATUS_TRUE
                    && (cond.type_ == JOB_STATUS_CONDITION_COMPLETE || cond.type_ == JOB_STATUS_CONDITION_FAILED)
            }) {
                end_time = cond.last_transition_time.as_ref().map(|t| t.0);
                state = if cond.type_ == JOB_STATUS_CONDITION_COMPLETE {
                    SimulationState::Finished
                } else {
                    SimulationState::Failed
                };
            }
        }
    }

    // Pausing and lease handling only apply while the simulation has not reached a terminal state.
    if !is_terminal(&state) {
        if state != SimulationState::Initializing && sim.spec.paused_time.is_some() {
            state = SimulationState::Paused;
        }

        match try_claim_lease(ctx.client.clone(), sim, metaroot, ctrl_ns).await? {
            LeaseState::Claimed => (),
            LeaseState::WaitingForClaim(t) => {
                state = SimulationState::Blocked;
                blocked_duration = t;
            },
            LeaseState::Unknown => bail!("unknown lease state"),
        }
    }

    let patch = json!({
    "status": {
        "observedGeneration": sim.metadata.generation.unwrap_or(1),
        "startTime": start_time,
        "endTime": end_time,
        "completedRuns": completed,
        "state": state,
    }});

    Ok((state, patch, blocked_duration))
}
pub async fn setup_simulation(
ctx: &SimulationContext,
sim: &Simulation,
metaroot: &SimulationRoot,
ctrl_ns: &str,
) -> anyhow::Result<Action> {
info!("setting up simulation");
hooks::execute(sim, hooks::Type::PreStart).await?;
let ns_api = kube::Api::<corev1::Namespace>::all(ctx.client.clone());
let metrics_ns = metrics_ns(sim);
if ns_api.get_opt(&metrics_ns).await?.is_none() {
bail!(SkControllerError::namespace_not_found(&metrics_ns));
};
if ns_api.get_opt(&sim.spec.driver.namespace).await?.is_none() {
info!("creating driver namespace {}", sim.spec.driver.namespace);
let obj = build_driver_namespace(ctx, sim);
ns_api.create(&Default::default(), &obj).await?;
};
let mut prom_ready = false;
match &sim.spec.metrics {
Some(mc) => {
let prom_api = kube::Api::<Prometheus>::namespaced(ctx.client.clone(), &metrics_ns);
match prom_api.get_opt(&ctx.prometheus_name).await? {
None => {
info!("creating Prometheus object {}/{}", metrics_ns, ctx.prometheus_name);
let obj = build_prometheus(&ctx.prometheus_name, sim, metaroot, mc);
prom_api.create(&Default::default(), &obj).await?;
},
Some(prom) => {
if let Some(PrometheusStatus { available_replicas: reps, .. }) = prom.status {
prom_ready = reps > 0;
}
},
}
},
_ => prom_ready = true,
}
if !prom_ready {
info!("waiting for prometheus to be ready");
return Ok(Action::requeue(REQUEUE_DURATION));
}
let driver_svc_api = kube::Api::<corev1::Service>::namespaced(ctx.client.clone(), &sim.spec.driver.namespace);
if driver_svc_api.get_opt(&ctx.driver_svc).await?.is_none() {
info!("creating driver service {}", &ctx.driver_svc);
let obj = build_driver_service(ctx, sim, metaroot);
driver_svc_api.create(&Default::default(), &obj).await?;
}
if ctx.opts.use_cert_manager {
cert_manager::create_certificate_if_not_present(ctx, sim, metaroot).await?;
}
let secrets_api = kube::Api::<corev1::Secret>::namespaced(ctx.client.clone(), &sim.spec.driver.namespace);
let secrets = secrets_api
.list(&ListParams {
label_selector: Some(format!("{SIMULATION_LABEL_KEY}={}", ctx.name)),
..Default::default()
})
.await?;
let driver_cert_secret_name = match secrets.items.len() {
0 => {
info!("waiting for secret to be created");
return Ok(Action::requeue(REQUEUE_DURATION));
},
x if x > 1 => bail!("found multiple secrets for experiment"),
_ => secrets.items[0].name_any(),
};
let webhook_api = kube::Api::<admissionv1::MutatingWebhookConfiguration>::all(ctx.client.clone());
let mwc_opt = webhook_api.get_opt(&ctx.webhook_name).await?;
if mwc_opt.is_none() {
info!("creating mutating webhook configuration {}", ctx.webhook_name);
let obj = build_mutating_webhook(ctx, sim, metaroot);
webhook_api.create(&Default::default(), &obj).await?;
return Ok(Action::requeue(REQUEUE_DURATION));
}
if let Some(mwc) = &mwc_opt
&& let Some(webhooks) = &mwc.webhooks
&& let Some(webhook) = &webhooks.first()
&& webhook.client_config.ca_bundle.as_ref().is_none_or(|b| b.0.is_empty())
{
info!(
"MutatingWebhookConfiguration {} exists but caBundle not yet populated, requeuing.",
ctx.webhook_name
);
return Ok(Action::requeue(REQUEUE_DURATION));
}
let jobs_api = kube::Api::<batchv1::Job>::namespaced(ctx.client.clone(), &sim.spec.driver.namespace);
if jobs_api.get_opt(&ctx.driver_name).await?.is_none() {
info!("creating simulation driver {}", ctx.driver_name);
let obj = build_driver_job(ctx, sim, &driver_cert_secret_name, ctrl_ns)?;
jobs_api.create(&Default::default(), &obj).await?;
}
Ok(Action::await_change())
}
pub async fn cleanup_simulation(ctx: &SimulationContext, sim: &Simulation) {
let roots_api: kube::Api<SimulationRoot> = kube::Api::all(ctx.client.clone());
info!("cleaning up simulation {}", ctx.name);
if let Err(e) = roots_api.delete(&ctx.metaroot_name, &Default::default()).await {
error!("Error cleaning up simulation: {e:?}");
}
if let Err(e) = hooks::execute(sim, hooks::Type::PostStop).await {
error!("Error running PostStop hooks: {e:?}");
}
}
/// Reconciles a single Simulation: ensures its metaroot exists, computes the current state
/// from the driver Job and the lease, writes that state to the Simulation's status, and then
/// acts on it.
///
/// Actions per state:
/// - `Initializing`: run [`setup_simulation`] and return whatever it asks for;
/// - `Blocked`: requeue after the wait value reported by the lease, interpreted as seconds;
/// - `Running` / `Paused`: wait for the next change;
/// - `Finished` / `Failed`: run [`cleanup_simulation`], then wait for the next change;
/// - `Retrying`: returned as an error, since [`fetch_driver_state`] never produces it.
///
/// # Errors
///
/// Fails when the controller-namespace environment variable cannot be read, and propagates
/// failures from the metaroot setup, the state lookup, the status patch, and the setup steps.
#[instrument(parent=None, skip_all, fields(simulation=sim.name_any()))]
pub async fn reconcile(sim: Arc<Simulation>, ctx: Arc<SimulationContext>) -> Result<Action, AnyhowError> {
    let sim = sim.deref();
    let ctx = ctx.with_sim(sim);

    // Name the variable in the error; the bare `VarError` text does not say which one was missing.
    let ctrl_ns = env::var(CTRL_NS_ENV_VAR)
        .map_err(|e| anyhow!("could not read environment variable {CTRL_NS_ENV_VAR}: {e}"))?;

    let metaroot = setup_sim_metaroot(&ctx, sim).await?;
    let (simulation_state, status_patch, blocked_duration) = fetch_driver_state(&ctx, sim, &metaroot, &ctrl_ns).await?;

    // Publish the observed state before acting on it.
    debug!("sending patch status update: {status_patch}");
    let sim_api: kube::Api<Simulation> = kube::Api::all(ctx.client.clone());
    sim_api
        .patch_status(&sim.name_any(), &Default::default(), &Patch::Merge(status_patch))
        .await
        .map_err(|e| anyhow!(e))?;

    match simulation_state {
        SimulationState::Initializing => setup_simulation(&ctx, sim, &metaroot, &ctrl_ns).await.map_err(|e| e.into()),
        SimulationState::Blocked => {
            info!("simulation blocked; sleeping for {blocked_duration} seconds");
            Ok(Action::requeue(Duration::from_secs(blocked_duration)))
        },
        SimulationState::Running | SimulationState::Paused => Ok(Action::await_change()),
        SimulationState::Finished | SimulationState::Failed => {
            cleanup_simulation(&ctx, sim).await;
            Ok(Action::await_change())
        },
        // Report this as an error rather than panicking inside the controller; the error
        // policy then decides how to retry.
        SimulationState::Retrying => {
            Err(anyhow!("driver state resolved to Retrying, which reconcile cannot act on").into())
        },
    }
}
/// Decides what to do after a failed reconcile and records the outcome on the Simulation.
///
/// An error that is an `SkControllerError` marks the simulation `Failed` and waits for the
/// next change; any other error marks it `Retrying` and requeues after
/// [`REQUEUE_ERROR_DURATION`]. A failure to write the status is logged and does not change
/// the returned action.
///
/// This function is synchronous, so the status patch is driven to completion with
/// `block_in_place` plus `Handle::current().block_on`.
// NOTE(review): `block_in_place` presumably needs a multi-threaded Tokio runtime — confirm
// how the controller's runtime is built.
pub fn error_policy(sim: Arc<Simulation>, err: &AnyhowError, ctx: Arc<SimulationContext>) -> Action {
    skerr!(err, "reconcile failed on simulation {}", sim.namespaced_name());

    let is_controller_error = err.is::<SkControllerError>();
    let state = if is_controller_error { SimulationState::Failed } else { SimulationState::Retrying };
    let action = if is_controller_error {
        Action::await_change()
    } else {
        Action::requeue(REQUEUE_ERROR_DURATION)
    };

    let sim_api = kube::Api::<Simulation>::all(ctx.client.clone());
    let sim_name = sim.name_any();
    let status_patch = Patch::Merge(json!({
    "status": {
        "state": state,
    }}));

    let patch_result = block_in_place(|| {
        Handle::current().block_on(sim_api.patch_status(&sim_name, &Default::default(), &status_patch))
    });
    if let Err(e) = patch_result {
        error!("failure updating simulation state for {}: {e:?}", sim.namespaced_name());
    }

    action
}