lean-rs-worker 0.1.5

Worker-process boundary for lean-rs host workloads.
Documentation
#![allow(clippy::expect_used)]

use std::path::{Path, PathBuf};
use std::sync::Mutex;
use std::time::{Duration, Instant};

use criterion::{BenchmarkId, Criterion, Throughput, criterion_group, criterion_main};
use lean_rs_worker::{
    LeanWorkerCancellationToken, LeanWorkerCapabilityBuilder, LeanWorkerError, LeanWorkerPool, LeanWorkerPoolConfig,
    LeanWorkerStreamingCommand, LeanWorkerTypedDataRow, LeanWorkerTypedDataSink, LeanWorkerTypedStreamSummary,
};
use serde::{Deserialize, Serialize};

fn workspace_root() -> PathBuf {
    let manifest_dir = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
    manifest_dir
        .parent()
        .and_then(Path::parent)
        .expect("crates/<name> lives two directories below the workspace root")
        .to_path_buf()
}

fn worker_binary() -> PathBuf {
    workspace_root()
        .join("target")
        .join("release")
        .join("lean-rs-worker-child")
}

fn interop_root() -> PathBuf {
    workspace_root().join("fixtures").join("interop-shims")
}

fn capability_builder() -> LeanWorkerCapabilityBuilder {
    LeanWorkerCapabilityBuilder::new(
        interop_root(),
        "lean_rs_interop_consumer",
        "LeanRsInteropConsumer",
        ["LeanRsInteropConsumer.Callback"],
    )
    .worker_executable(worker_binary())
    .validate_metadata(
        "lean_rs_interop_consumer_worker_shape_metadata",
        serde_json::json!({"source": "worker-capability-bench"}),
    )
}

#[derive(Clone, Debug, Serialize)]
struct ShapeRequest {
    workspace: String,
    modules: Vec<String>,
    limit: u64,
}

impl Default for ShapeRequest {
    fn default() -> Self {
        Self {
            workspace: "bench-workspace".to_owned(),
            modules: vec!["Fixture.Basic".to_owned()],
            limit: 512,
        }
    }
}

#[derive(Clone, Debug, Deserialize)]
#[serde(tag = "kind")]
enum ShapeRow {
    #[serde(rename = "declaration")]
    Declaration {
        #[allow(dead_code)]
        module: String,
        #[allow(dead_code)]
        name: String,
        ordinal: u64,
    },
    #[serde(rename = "feature")]
    Feature {
        #[allow(dead_code)]
        module: String,
        #[allow(dead_code)]
        name: String,
        #[allow(dead_code)]
        feature: String,
        score: u64,
        ordinal: u64,
    },
    #[serde(rename = "probe")]
    Probe {
        #[allow(dead_code)]
        left: String,
        #[allow(dead_code)]
        right: String,
        #[allow(dead_code)]
        relation: String,
        ordinal: u64,
    },
}

impl ShapeRow {
    fn checksum(&self) -> u64 {
        match self {
            Self::Declaration { ordinal, .. } | Self::Probe { ordinal, .. } => *ordinal,
            Self::Feature { score, ordinal, .. } => score.saturating_add(*ordinal),
        }
    }
}

#[derive(Clone, Debug, Deserialize)]
struct ShapeSummary {
    #[allow(dead_code)]
    fixture: String,
    #[allow(dead_code)]
    command: String,
    ok: bool,
    rows: u64,
}

#[derive(Default)]
struct CountingSink {
    rows: Mutex<SinkMetrics>,
}

#[derive(Default)]
struct SinkMetrics {
    count: u64,
    checksum: u64,
}

impl CountingSink {
    fn count(&self) -> u64 {
        self.rows.lock().expect("row lock is not poisoned").count
    }
}

impl LeanWorkerTypedDataSink<ShapeRow> for CountingSink {
    fn report(&self, row: LeanWorkerTypedDataRow<ShapeRow>) {
        let mut metrics = self.rows.lock().expect("row lock is not poisoned");
        metrics.count = metrics.count.saturating_add(1);
        metrics.checksum = metrics.checksum.saturating_add(row.payload.checksum());
        metrics.checksum = metrics.checksum.saturating_add(row.sequence);
    }
}

struct CancelAfterFirst<'a> {
    token: &'a LeanWorkerCancellationToken,
    rows: Mutex<u64>,
}

impl LeanWorkerTypedDataSink<ShapeRow> for CancelAfterFirst<'_> {
    fn report(&self, _row: LeanWorkerTypedDataRow<ShapeRow>) {
        let mut rows = self.rows.lock().expect("row lock is not poisoned");
        *rows = rows.saturating_add(1);
        drop(rows);
        self.token.cancel();
    }
}

fn skip_if_missing_worker() -> bool {
    let worker = worker_binary();
    if worker.is_file() {
        false
    } else {
        eprintln!(
            "skipping worker capability bench: {} is missing; run `cargo build --release -p lean-rs-worker --bin lean-rs-worker-child` first",
            worker.display(),
        );
        true
    }
}

fn run_stream(export: &'static str) -> Result<LeanWorkerTypedStreamSummary<ShapeSummary>, LeanWorkerError> {
    let mut capability = capability_builder().open()?;
    let mut session = capability.open_session(None, None)?;
    let command = LeanWorkerStreamingCommand::<ShapeRequest, ShapeRow, ShapeSummary>::new(export);
    let sink = CountingSink::default();
    let summary = session.run_streaming_command(&command, &ShapeRequest::default(), &sink, None, None, None)?;
    assert_eq!(summary.total_rows, sink.count());
    Ok(summary)
}

fn run_pool_stream(
    max_workers: usize,
    export: &'static str,
) -> Result<LeanWorkerTypedStreamSummary<ShapeSummary>, LeanWorkerError> {
    let mut pool = LeanWorkerPool::new(LeanWorkerPoolConfig::new(max_workers).max_total_child_rss_kib(u64::MAX));
    let mut lease = pool.acquire_lease(capability_builder())?;
    let command = LeanWorkerStreamingCommand::<ShapeRequest, ShapeRow, ShapeSummary>::new(export);
    let sink = CountingSink::default();
    let summary = lease.run_streaming_command(&command, &ShapeRequest::default(), &sink, None, None, None)?;
    assert_eq!(summary.total_rows, sink.count());
    Ok(summary)
}

fn bench_operational_shape(c: &mut Criterion) {
    if skip_if_missing_worker() {
        return;
    }

    let mut group = c.benchmark_group("worker::capability_shape");
    group.sample_size(10);
    group.measurement_time(Duration::from_secs(3));

    group.bench_function("cold_startup_builder_open", |b| {
        b.iter(|| {
            let capability = capability_builder().open().expect("capability opens");
            let metadata_len = capability
                .validated_metadata()
                .map_or(0, |metadata| metadata.commands.len());
            std::hint::black_box(metadata_len);
        });
    });

    group.bench_function("first_import_open_session", |b| {
        b.iter(|| {
            let mut capability = capability_builder().open().expect("capability opens");
            let session = capability.open_session(None, None).expect("session opens");
            std::hint::black_box(session.request_timeout());
        });
    });

    group.throughput(Throughput::Elements(4));
    group.bench_with_input(BenchmarkId::new("import_once_stream", "index"), &(), |b, ()| {
        b.iter(|| {
            let summary = run_stream("lean_rs_interop_consumer_worker_shape_index").expect("index stream succeeds");
            std::hint::black_box(summary.total_rows);
            std::hint::black_box(summary.metadata.map(|metadata| (metadata.ok, metadata.rows)));
        });
    });

    group.throughput(Throughput::Elements(2));
    group.bench_with_input(BenchmarkId::new("row_throughput", "extract"), &(), |b, ()| {
        b.iter(|| {
            let summary = run_stream("lean_rs_interop_consumer_worker_shape_extract").expect("extract stream succeeds");
            std::hint::black_box(summary.total_rows);
        });
    });

    group.bench_function("worker_cycle", |b| {
        b.iter(|| {
            let mut capability = capability_builder().open().expect("capability opens");
            capability.worker_mut().cycle().expect("worker cycle succeeds");
            std::hint::black_box(capability.worker().stats().restarts);
        });
    });

    group.bench_function("fatal_exit_recovery", |b| {
        b.iter(|| {
            let mut capability = capability_builder().open().expect("capability opens");
            let sink = CountingSink::default();
            let err = {
                let mut session = capability.open_session(None, None).expect("session opens");
                let command = LeanWorkerStreamingCommand::<ShapeRequest, ShapeRow, ShapeSummary>::new(
                    "lean_rs_interop_consumer_worker_shape_panic_after_row",
                );
                session
                    .run_streaming_command(&command, &ShapeRequest::default(), &sink, None, None, None)
                    .expect_err("panic stream should fail")
            };
            std::hint::black_box(matches!(err, LeanWorkerError::ChildPanicOrAbort { .. }));
        });
    });

    group.bench_function("cancellation_latency", |b| {
        b.iter_custom(|iters| {
            let started = Instant::now();
            for _ in 0..iters {
                let mut capability = capability_builder().open().expect("capability opens");
                let token = LeanWorkerCancellationToken::new();
                let sink = CancelAfterFirst {
                    token: &token,
                    rows: Mutex::new(0),
                };
                let mut session = capability.open_session(None, None).expect("session opens");
                let command = LeanWorkerStreamingCommand::<ShapeRequest, ShapeRow, ShapeSummary>::new(
                    "lean_rs_interop_consumer_worker_shape_extract",
                );
                let err = session
                    .run_streaming_command(&command, &ShapeRequest::default(), &sink, None, Some(&token), None)
                    .expect_err("cancelled stream should fail");
                assert!(matches!(err, LeanWorkerError::Cancelled { .. }));
            }
            started.elapsed()
        });
    });

    group.throughput(Throughput::Elements(47));
    group.bench_function("mathlib_scale_single_worker_pool", |b| {
        b.iter(|| {
            let summary = run_pool_stream(1, "lean_rs_interop_consumer_worker_shape_mathlib_scale_index")
                .expect("mathlib-scale pool stream succeeds");
            std::hint::black_box(summary.total_rows);
        });
    });

    group.throughput(Throughput::Elements(47));
    group.bench_function("mathlib_scale_pool_max_2", |b| {
        b.iter(|| {
            let summary = run_pool_stream(2, "lean_rs_interop_consumer_worker_shape_mathlib_scale_index")
                .expect("mathlib-scale pool stream succeeds");
            std::hint::black_box(summary.total_rows);
        });
    });

    group.finish();
}

criterion_group!(benches, bench_operational_shape);
criterion_main!(benches);