firkin-single-node 0.0.3

Production Apple/VZ runtime composition for the firkin Rust containerization library
#![allow(missing_docs)]

use std::collections::BTreeMap;
use std::ffi::OsStr;
use std::path::{Path, PathBuf};
use std::time::{Duration, Instant};

use firkin_sandbox::{Command, DataPlaneSpec, Runtime, SandboxSpec, TemplateSpec};
use firkin_single_node::{AppleVzBackend, SingleNodeConfig};
use firkin_trace::{BenchmarkMetricKind, BenchmarkSample, BenchmarkUnit};
use futures_util::future::join_all;

const SHELL_POOL_DENSITY_BREAKPOINT_METRIC: &str =
    "debug.density.max_active_before_sandbox_shell_pool_run_completion_p95_doubles";

#[tokio::test]
#[ignore = "live VM-backed shell pool density proof; requires signed test harness"]
async fn live_sandbox_shell_pool_density_writes_repeat_samples() {
    let artifact_temp = tempfile::tempdir().expect("artifact tempdir");
    let runtime_temp = tempfile::tempdir().expect("runtime tempdir");
    let artifact = shell_pool_density_artifact_path(
        artifact_temp.path(),
        std::env::var_os("FIRKIN_LIVE_SANDBOX_SHELL_POOL_DENSITY_ARTIFACT").as_deref(),
    );
    let repeats = repeat_count(
        std::env::var_os("FIRKIN_LIVE_SANDBOX_SHELL_POOL_DENSITY_REPEATS").as_deref(),
        10,
    );
    let density_levels = density_levels(
        std::env::var_os("FIRKIN_LIVE_SANDBOX_SHELL_POOL_DENSITY_LEVELS").as_deref(),
        &[1, 2, 4, 8],
    );
    let image = std::env::var("FIRKIN_LIVE_SANDBOX_SHELL_POOL_IMAGE")
        .unwrap_or_else(|_| "debian:bookworm-slim".to_owned());

    let backend = AppleVzBackend::from_config(SingleNodeConfig::new(
        runtime_temp.path().join("state"),
        "cube.localhost",
    ));
    let runtime = Runtime::build(backend).await.expect("runtime");
    let template = runtime
        .templates()
        .prepare(TemplateSpec::oci(image).data_plane(DataPlaneSpec::none()))
        .await
        .expect("prepare template");
    let sandbox = runtime
        .sandboxes()
        .create(SandboxSpec::from_template(&template))
        .await
        .expect("create sandbox");

    let samples = collect_shell_pool_density_samples(&sandbox, &density_levels, repeats).await;
    assert_eq!(samples.len(), density_levels.len() * repeats);

    if let Some(parent) = artifact.parent() {
        std::fs::create_dir_all(parent).expect("create shell pool density artifact parent");
    }
    write_shell_pool_density_artifact(&artifact, samples);
    assert!(artifact.exists());

    let _ = sandbox.delete().await;
}

#[test]
fn shell_pool_density_artifact_tags_completion_boundary() {
    let artifact_temp = tempfile::tempdir().expect("artifact tempdir");
    let artifact = artifact_temp.path().join("shell-pool-density.json");
    write_shell_pool_density_artifact(
        &artifact,
        vec![
            shell_pool_density_sample(1, 0, 2, Duration::from_micros(500)),
            shell_pool_density_sample(1, 1, 2, Duration::from_micros(600)),
            shell_pool_density_sample(2, 0, 2, Duration::from_micros(700)),
            shell_pool_density_sample(2, 1, 2, Duration::from_micros(800)),
        ],
    );
    let value: serde_json::Value =
        serde_json::from_reader(std::fs::File::open(&artifact).expect("open artifact"))
            .expect("parse artifact");
    assert_eq!(value["kind"], "live_sandbox_shell_pool_density");
    let samples = value["samples"].as_array().expect("samples");
    assert!(samples.iter().any(|sample| {
        sample["metric"] == SHELL_POOL_DENSITY_BREAKPOINT_METRIC
            && sample["value"].as_f64() == Some(2.0)
    }));
    let sample = samples
        .iter()
        .find(|sample| sample["metric"] == "debug.exec.sandbox_shell_pool_run_completion_c1_ms")
        .expect("c1 sample");
    assert_eq!(
        sample["tags"]["measurement_boundary"],
        "sandbox_shell_pool_run_completion"
    );
    assert_eq!(sample["tags"]["shell_surface"], "firkin_sandbox_shell_pool");
    assert_eq!(sample["tags"]["output_boundary"], "command_completion");
    assert_eq!(sample["tags"]["warmup_dispatch"], "excluded");
}

async fn collect_shell_pool_density_samples(
    sandbox: &firkin_sandbox::Sandbox,
    density_levels: &[usize],
    repeats: usize,
) -> Vec<BenchmarkSample> {
    let density_level_tag = density_levels
        .iter()
        .map(usize::to_string)
        .collect::<Vec<_>>()
        .join(",");
    let mut samples = Vec::with_capacity(density_levels.len() * repeats);
    for &concurrency in density_levels {
        let pool = sandbox
            .shell_pool(concurrency)
            .await
            .unwrap_or_else(|error| panic!("open shell pool c{concurrency}: {error}"));
        let slots = pool.slots();
        let warmups = join_all(slots.iter().cloned().enumerate().map(
            |(index, shell)| async move {
                shell
                    .run(Command::shell(format!(
                        "printf shell-pool-warm-c{concurrency}-{index}"
                    )))
                    .await
                    .unwrap_or_else(|error| panic!("warm shell c{concurrency}/{index}: {error}"))
            },
        ))
        .await;
        assert_eq!(warmups.len(), concurrency);

        for repeat in 0..repeats {
            let dispatches = join_all(slots.iter().cloned().enumerate().map(
                |(index, shell)| async move {
                    let expected = format!("shell-pool-c{concurrency}-r{repeat}-{index}");
                    let started = Instant::now();
                    let output = shell
                        .run(Command::shell(format!("printf {expected}")))
                        .await
                        .unwrap_or_else(|error| {
                            panic!("run shell c{concurrency}/r{repeat}/{index}: {error}")
                        });
                    assert_eq!(output.stdout, expected.as_bytes());
                    started.elapsed()
                },
            ))
            .await;
            let slowest = dispatches
                .into_iter()
                .max()
                .unwrap_or_else(|| panic!("shell pool density c{concurrency} had no shells"));
            samples.push(
                shell_pool_density_sample(concurrency, repeat, repeats, slowest)
                    .with_dynamic_tag("concurrency_levels", density_level_tag.clone()),
            );
        }
        pool.close()
            .await
            .unwrap_or_else(|error| panic!("close shell pool c{concurrency}: {error}"));
    }
    samples
}

fn shell_pool_density_sample(
    concurrency: usize,
    repeat: usize,
    repeat_count: usize,
    elapsed: Duration,
) -> BenchmarkSample {
    BenchmarkSample::new(
        format!("debug.exec.sandbox_shell_pool_run_completion_c{concurrency}_ms"),
        BenchmarkMetricKind::LifecycleLatency,
        BenchmarkUnit::Milliseconds,
        elapsed.as_secs_f64() * 1000.0,
    )
    .with_static_tag("measurement_boundary", "sandbox_shell_pool_run_completion")
    .with_static_tag("shell_surface", "firkin_sandbox_shell_pool")
    .with_static_tag("output_boundary", "command_completion")
    .with_static_tag("command", "printf")
    .with_static_tag("warmup_dispatch", "excluded")
    .with_dynamic_tag("concurrency_level", concurrency.to_string())
    .with_dynamic_tag("repeat_index", repeat.to_string())
    .with_dynamic_tag("repeat_count", repeat_count.to_string())
}

fn write_shell_pool_density_artifact(artifact: &Path, mut samples: Vec<BenchmarkSample>) {
    if let Some(sample) = shell_pool_density_breakpoint_sample(&samples) {
        samples.push(sample);
    }
    let mut metric_counts = BTreeMap::<String, usize>::new();
    for sample in &samples {
        *metric_counts.entry(sample.metric().to_owned()).or_default() += 1;
    }
    let samples = samples
        .into_iter()
        .map(|sample| {
            let confidence = confidence_for_count(metric_counts[sample.metric()]);
            sample.with_dynamic_tag("confidence", confidence)
        })
        .collect::<Vec<_>>();
    let file = std::fs::File::create(artifact).expect("create shell pool density artifact");
    serde_json::to_writer_pretty(
        file,
        &serde_json::json!({
            "kind": "live_sandbox_shell_pool_density",
            "samples": samples,
            "traces": [],
        }),
    )
    .expect("write shell pool density artifact");
}

fn shell_pool_density_breakpoint_sample(samples: &[BenchmarkSample]) -> Option<BenchmarkSample> {
    let mut samples_by_level = BTreeMap::<usize, Vec<f64>>::new();
    let mut concurrency_levels = None;
    for sample in samples {
        if !sample
            .metric()
            .starts_with("debug.exec.sandbox_shell_pool_run_completion_c")
        {
            continue;
        }
        concurrency_levels = concurrency_levels
            .or_else(|| sample.tag_value("concurrency_levels").map(str::to_owned));
        let Some(level) = sample
            .tag_value("concurrency_level")
            .and_then(|value| value.parse::<usize>().ok())
        else {
            continue;
        };
        samples_by_level
            .entry(level)
            .or_default()
            .push(sample.value());
    }
    let baseline = samples_by_level
        .get(&1)
        .map(|values| nearest_rank(values, 95))?;
    let threshold = baseline * 2.0;
    let breakpoint = samples_by_level
        .iter()
        .filter_map(|(&level, values)| {
            let p95 = nearest_rank(values, 95);
            (p95 <= threshold).then_some(level)
        })
        .max()?;
    let mut sample = BenchmarkSample::from_static(
        SHELL_POOL_DENSITY_BREAKPOINT_METRIC,
        BenchmarkMetricKind::WorkloadResource,
        BenchmarkUnit::Count,
        breakpoint as f64,
    )
    .with_static_tag("source", "sandbox-shell-pool-run-completion-p95-threshold")
    .with_static_tag("measurement_boundary", "sandbox_shell_pool_run_completion")
    .with_static_tag("shell_surface", "firkin_sandbox_shell_pool")
    .with_static_tag("output_boundary", "command_completion")
    .with_dynamic_tag("baseline_p95_ms", format!("{baseline:.6}"))
    .with_dynamic_tag("threshold_p95_ms", format!("{threshold:.6}"));
    if let Some(levels) = concurrency_levels {
        sample = sample.with_dynamic_tag("concurrency_levels", levels);
    }
    Some(sample)
}

fn nearest_rank(values: &[f64], percentile: usize) -> f64 {
    let mut sorted = values.to_vec();
    sorted.sort_by(f64::total_cmp);
    let rank = sorted.len().saturating_mul(percentile).div_ceil(100);
    let index = rank.saturating_sub(1).min(sorted.len() - 1);
    sorted[index]
}

fn shell_pool_density_artifact_path(temp: &Path, override_path: Option<&OsStr>) -> PathBuf {
    override_path.map_or_else(
        || temp.join("live-sandbox-shell-pool-density.json"),
        PathBuf::from,
    )
}

fn repeat_count(value: Option<&OsStr>, default: usize) -> usize {
    value
        .and_then(|value| value.to_str())
        .and_then(|value| value.parse::<usize>().ok())
        .filter(|value| *value > 0)
        .unwrap_or(default)
}

fn density_levels(value: Option<&OsStr>, default: &[usize]) -> Vec<usize> {
    value
        .and_then(|value| value.to_str())
        .map(|value| {
            value
                .split(',')
                .filter_map(|item| item.trim().parse::<usize>().ok())
                .filter(|level| *level > 0)
                .collect::<Vec<_>>()
        })
        .filter(|levels| !levels.is_empty())
        .unwrap_or_else(|| default.to_vec())
}

fn confidence_for_count(count: usize) -> &'static str {
    match count {
        0 => "missing",
        1..=2 => "smoke_only",
        3..=4 => "superfast_iteration",
        5..=9 => "fast_iteration",
        10..=29 => "baseline_checkpoint",
        30..=99 => "p50_p90_decision_grade",
        100..=499 => "p95_decision_grade",
        _ => "p99_decision_grade",
    }
}