sk-ctrl 2.6.0

Kubernetes controller for replaying traces in a simulated cluster
use std::env;
use std::ops::Deref;
use std::sync::Arc;

use anyhow::{
    anyhow,
    bail,
};
use k8s_openapi::api::admissionregistration::v1 as admissionv1;
use k8s_openapi::api::batch::v1 as batchv1;
use kube::api::{
    ListParams,
    Patch,
};
use kube::runtime::controller::Action;
use serde_json::json;
use sk_api::prometheus::*;
use sk_api::v1::{
    Simulation,
    SimulationRoot,
    SimulationState,
};
use sk_core::constants::*;
use sk_core::errors::*;
use sk_core::hooks;
use sk_core::k8s::{
    LeaseState,
    build_simulation_root,
    is_terminal,
    metrics_ns,
    try_claim_lease,
};
use sk_core::prelude::*;
use tokio::runtime::Handle;
use tokio::task::block_in_place;
use tokio::time::Duration;
use tracing::*;

use crate::errors::*;
use crate::objects::*;
use crate::{
    GlobalContext,
    cert_manager,
};

pub const REQUEUE_DURATION: Duration = Duration::from_secs(RETRY_DELAY_SECONDS);
pub const REQUEUE_ERROR_DURATION: Duration = Duration::from_secs(ERROR_RETRY_DELAY_SECONDS);
pub const JOB_STATUS_CONDITION_COMPLETE: &str = "Complete";
pub const JOB_STATUS_CONDITION_FAILED: &str = "Failed";

type SimulationStatusPatch = serde_json::Value;

#[derive(Clone)]
pub struct ReconcileContext {
    pub client: kube::Client,
    pub recorder: SkEventRecorder,

    pub name: String,
    pub metaroot_name: String,
    pub driver_name: String,
    pub driver_svc: String,
    pub prometheus_name: String,
    pub webhook_name: String,
}

impl ReconcileContext {
    pub fn new(sim: &Simulation, client: kube::Client, recorder: SkEventRecorder) -> ReconcileContext {
        let name = sim.name_any();

        ReconcileContext {
            client,
            recorder,

            name: name.clone(),
            metaroot_name: format!("sk-{}-metaroot", name),
            driver_name: format!("sk-{}-driver", name),
            driver_svc: format!("sk-{}-driver-svc", name),
            prometheus_name: format!("sk-{}-prom", name),
            webhook_name: format!("sk-{}-mutatepods", name),
        }
    }
}

async fn setup_sim_metaroot(ctx: &ReconcileContext, sim: &Simulation) -> anyhow::Result<SimulationRoot> {
    let roots_api = kube::Api::<SimulationRoot>::all(ctx.client.clone());
    match roots_api.get_opt(&ctx.metaroot_name).await? {
        None => {
            info!("creating Simulation MetaRoot");
            let metaroot = build_simulation_root(&ctx.metaroot_name, sim);
            roots_api.create(&Default::default(), &metaroot).await.map_err(|e| e.into())
        },
        Some(metaroot) => Ok(metaroot),
    }
}

// The "left" driver state contains an actual status from the driver job, along with its start and
// end times (if they exist).  The "right" driver state contains an "inferred" state, such as
// "blocked" (i.e., the driver hasn't even been created yet because we couldn't claim the lease).
type DriverState = (SimulationState, SimulationStatusPatch, u64);

pub(crate) async fn fetch_driver_state(
    ctx: &ReconcileContext,
    sim: &Simulation,
    metaroot: &SimulationRoot,
    ctrl_ns: &str,
) -> anyhow::Result<DriverState> {
    let jobs_api = kube::Api::<batchv1::Job>::namespaced(ctx.client.clone(), &sim.spec.driver.namespace);
    let (mut state, mut start_time, mut end_time, mut completed, mut blocked_duration) =
        (SimulationState::Initializing, None, None, None, 0);

    if let Some(driver) = jobs_api.get_opt(&ctx.driver_name).await? {
        state = SimulationState::Running;
        if let Some(status) = driver.status {
            completed = status.succeeded;
            start_time = status.start_time.map(|t| t.0);
            if let Some(cond) =
                status.conditions.unwrap_or_default().iter().find(|cond| {
                    cond.type_ == JOB_STATUS_CONDITION_COMPLETE || cond.type_ == JOB_STATUS_CONDITION_FAILED
                })
            {
                end_time = cond.last_transition_time.as_ref().map(|t| t.0);
                state = if cond.type_ == JOB_STATUS_CONDITION_COMPLETE {
                    SimulationState::Finished
                } else {
                    SimulationState::Failed
                };
            }
        }
    }

    // State processing if the simulation hasn't completed
    if !is_terminal(&state) {
        // If the simulation hasn't started yet, we don't want to set the state to paused; this way
        // we can still bring up the driver pod and do something with it before the simulation
        // proper starts.  Once the driver pod comes up, the reconciler will get re-triggered and
        // update the state.
        if state != SimulationState::Initializing && sim.spec.paused_time.is_some() {
            state = SimulationState::Paused;
        }

        // It's a little weird that we're trying to claim a lease inside the "fetch_driver_state" function,
        // but whatever, there are worse things.
        //
        // It would be cool to check the error and return Blocked if something else nabbed the lease first.
        // But it's not actually that important, because it will just requeue and on the next time through
        // it will correctly determine the Blocked status, so I'm not sure it's worth the increased
        // complexity.
        match try_claim_lease(ctx.client.clone(), sim, metaroot, ctrl_ns).await? {
            LeaseState::Claimed => (),
            LeaseState::WaitingForClaim(t) => {
                state = SimulationState::Blocked;
                blocked_duration = t;
            },
            LeaseState::Unknown => bail!("unknown lease state"),
        }
    }

    let patch = json!({
    "status": {
        "observedGeneration": sim.metadata.generation.unwrap_or(1),
        "startTime": start_time,
        "endTime": end_time,
        "completedRuns": completed,
        "state": state,
    }});

    Ok((state, patch, blocked_duration))
}

pub async fn setup_simulation(
    ctx: &ReconcileContext,
    sim: &Simulation,
    metaroot: &SimulationRoot,
    ctrl_ns: &str,
) -> anyhow::Result<Action> {
    info!("setting up simulation");
    debug!("simulation object: {sim:#?}");

    // The very first time we see a simulation object, there is no status that has
    // been set (unless the user has intentionally written to the Status field
    // when they created the CR, which would be dumb, don't do that), so this is a hack
    // to only send the "SimulationStarted" event once.
    if sim.status.is_none() {
        ctx.recorder.send_sim_started_event(&sim.name_any()).await?;
    }

    // TODO (SK-205): these are not idempotent right now, it will run the PreStart hooks every time
    // we call setup.  It's on the user to make sure the hooks are idempotent (which is also not
    // noted in the documentation).  We should improve this situation somewhat.
    hooks::execute(sim, hooks::Type::PreStart, &ctx.recorder).await?;

    // Validate the input before doing anything
    let ns_api = kube::Api::<corev1::Namespace>::all(ctx.client.clone());
    let metrics_ns = metrics_ns(sim);
    if ns_api.get_opt(&metrics_ns).await?.is_none() {
        bail!(SkControllerError::namespace_not_found(&metrics_ns));
    };

    // Create the namespaces
    if ns_api.get_opt(&sim.spec.driver.namespace).await?.is_none() {
        info!("creating driver namespace {}", sim.spec.driver.namespace);
        let obj = build_driver_namespace(ctx, sim);
        ns_api.create(&Default::default(), &obj).await?;
    };

    // Set up the metrics collector
    let mut prom_ready = false;
    match &sim.spec.metrics {
        Some(mc) => {
            // if async closures ever become a thing, you could simplify this logic with .unwrap_or_else;
            // you might be able to hack something currently with futures.then(...), but I couldn't figure
            // out a good way to do so.
            let prom_api = kube::Api::<Prometheus>::namespaced(ctx.client.clone(), &metrics_ns);
            match prom_api.get_opt(&ctx.prometheus_name).await? {
                None => {
                    info!("creating Prometheus object {}/{}", metrics_ns, ctx.prometheus_name);
                    let obj = build_prometheus(&ctx.prometheus_name, sim, metaroot, mc);
                    prom_api.create(&Default::default(), &obj).await?;
                },
                Some(prom) => {
                    if let Some(PrometheusStatus { available_replicas: reps, .. }) = prom.status {
                        prom_ready = reps > 0;
                    }
                },
            }
        },
        _ => prom_ready = true,
    }

    if !prom_ready {
        info!("waiting for prometheus to be ready");
        ctx.recorder.send_waiting_for_metrics_event().await?;
        return Ok(Action::requeue(REQUEUE_DURATION));
    }

    // Set up the webhook
    let driver_svc_api = kube::Api::<corev1::Service>::namespaced(ctx.client.clone(), &sim.spec.driver.namespace);
    if driver_svc_api.get_opt(&ctx.driver_svc).await?.is_none() {
        info!("creating driver service {}", &ctx.driver_svc);
        let obj = build_driver_service(ctx, sim, metaroot);
        driver_svc_api.create(&Default::default(), &obj).await?;
    }

    // CertManager creates two parts, a CA Bundle that is injected directly into
    // the mutating webhook configuration, and a secret that needs to be provided
    // to the driver pod for it to act as a webhook.  We need to wait for both of
    // these to be ready before we can start the simulation.
    cert_manager::create_certificate_if_not_present(ctx, sim, metaroot).await?;

    let secrets_api = kube::Api::<corev1::Secret>::namespaced(ctx.client.clone(), &sim.spec.driver.namespace);
    let secrets = secrets_api
        .list(&ListParams {
            label_selector: Some(format!("{SIMULATION_LABEL_KEY}={}", ctx.name)),
            ..Default::default()
        })
        .await?;
    let driver_cert_secret_name = match secrets.items.len() {
        0 => {
            info!("waiting for secret to be created");
            ctx.recorder.send_waiting_for_secret_event().await?;
            return Ok(Action::requeue(REQUEUE_DURATION));
        },
        x if x > 1 => bail!("found multiple secrets for experiment"),
        _ => secrets.items[0].name_any(),
    };

    let webhook_api = kube::Api::<admissionv1::MutatingWebhookConfiguration>::all(ctx.client.clone());
    let mwc_opt = webhook_api.get_opt(&ctx.webhook_name).await?;
    if mwc_opt.is_none() {
        info!("creating mutating webhook configuration {}", ctx.webhook_name);
        let obj = build_mutating_webhook(ctx, sim, metaroot);
        webhook_api.create(&Default::default(), &obj).await?;
        return Ok(Action::requeue(REQUEUE_DURATION));
    }

    if let Some(mwc) = &mwc_opt
        && let Some(webhooks) = &mwc.webhooks
        // We create one webhook in this configuration but webhooks is a Vec<MutatingWebhooks>
        && let Some(webhook) = &webhooks.first()
        // ca_bundle is a ByteString, a tuple struct where .0 is the inner Vec<u8>.
        // If the ca_bundle is None or empty, it has not been populated by cert-manager yet.
        && webhook.client_config.ca_bundle.as_ref().is_none_or(|b| b.0.is_empty())
    {
        info!(
            "MutatingWebhookConfiguration {} exists but caBundle not yet populated, requeuing.",
            ctx.webhook_name
        );
        ctx.recorder.send_waiting_for_webhook_cert_event().await?;
        return Ok(Action::requeue(REQUEUE_DURATION));
    }

    // Create the actual driver
    let jobs_api = kube::Api::<batchv1::Job>::namespaced(ctx.client.clone(), &sim.spec.driver.namespace);
    if jobs_api.get_opt(&ctx.driver_name).await?.is_none() {
        info!("creating simulation driver {}", ctx.driver_name);
        let obj = build_driver_job(ctx, sim, &driver_cert_secret_name, ctrl_ns)?;
        let job = jobs_api.create(&Default::default(), &obj).await?;
        ctx.recorder.send_driver_created_event(&job).await?;
    }
    Ok(Action::await_change())
}

pub async fn cleanup_simulation(ctx: &ReconcileContext, sim: &Simulation) {
    let roots_api: kube::Api<SimulationRoot> = kube::Api::all(ctx.client.clone());

    info!("cleaning up simulation {}", ctx.name);
    if let Err(e) = roots_api.delete(&ctx.metaroot_name, &Default::default()).await {
        error!("Error cleaning up simulation: {e:?}");
        let _ = ctx // if we fail to send the event, it's fine
            .recorder
            .send_sim_cleanup_failed_event()
            .await;
    }

    if let Err(e) = hooks::execute(sim, hooks::Type::PostStop, &ctx.recorder).await {
        error!("Error running PostStop hooks: {e:?}");
    }
}

#[instrument(parent=None, skip_all, fields(simulation=sim.name_any()))]
pub async fn reconcile(sim: Arc<Simulation>, global_ctx: Arc<GlobalContext>) -> Result<Action, AnyhowError> {
    let sim = sim.deref();
    let ctx = ReconcileContext::new(sim, global_ctx.client.clone(), global_ctx.recorder.with_sim(sim));
    let ctrl_ns = env::var(CTRL_NS_ENV_VAR).map_err(|e| anyhow!(e))?;

    let metaroot = setup_sim_metaroot(&ctx, sim).await?;
    let (simulation_state, status_patch, blocked_duration) = fetch_driver_state(&ctx, sim, &metaroot, &ctrl_ns).await?;

    debug!("sending patch status update: {status_patch}");
    let sim_api: kube::Api<Simulation> = kube::Api::all(ctx.client.clone());
    sim_api
        .patch_status(&sim.name_any(), &Default::default(), &Patch::Merge(status_patch))
        .await
        .map_err(|e| anyhow!(e))?;

    match simulation_state {
        SimulationState::Initializing => setup_simulation(&ctx, sim, &metaroot, &ctrl_ns).await.map_err(|e| e.into()),
        SimulationState::Blocked => {
            info!("simulation blocked; sleeping for {blocked_duration} seconds");
            ctx.recorder.send_sim_blocked_event().await?;
            Ok(Action::requeue(Duration::from_secs(blocked_duration)))
        },
        SimulationState::Running | SimulationState::Paused => Ok(Action::await_change()),
        SimulationState::Finished | SimulationState::Failed => {
            // This action should never return an error, we want to try cleaning up once and if it
            // doesn't work, just abort (may revisit this in the future)
            cleanup_simulation(&ctx, sim).await;
            Ok(Action::await_change())
        },

        // The driver itself can never return "Retrying", this is only set by the controller's
        // error_policy (see below).  If the driver were to return Retrying that would be very
        // weird indeed and we should panic.
        //
        // I have some qualms about having a simulation state that doesn't match 1-1 with the
        // driver state, but then also using the same enum for both... but I think in this specific
        // circumstance it's OK.
        SimulationState::Retrying => unimplemented!(),
    }
}

pub fn error_policy(sim: Arc<Simulation>, err: &AnyhowError, global_ctx: Arc<GlobalContext>) -> Action {
    skerr!(err, "reconcile failed on simulation {}", sim.namespaced_name());
    let (action, state) = if err.is::<SkControllerError>() {
        (Action::await_change(), SimulationState::Failed)
    } else {
        (Action::requeue(REQUEUE_ERROR_DURATION), SimulationState::Retrying)
    };

    let sim_api: kube::Api<Simulation> = kube::Api::all(global_ctx.client.clone());
    if let Err(e) = block_in_place(|| {
        Handle::current().block_on(sim_api.patch_status(
            &sim.name_any(),
            &Default::default(),
            &Patch::Merge(json!({
            "status": {
                "state": state,
            }})),
        ))
    }) {
        error!("failure updating simulation state for {}: {e:?}", sim.namespaced_name());
    }

    action
}