lean-rs-worker 0.1.5

Worker-process boundary for lean-rs host workloads.
Documentation
#![allow(clippy::expect_used, clippy::panic, clippy::wildcard_enum_match_arm)]

use std::path::{Path, PathBuf};
use std::sync::Mutex;
use std::thread;
use std::time::Duration;

use lean_rs_worker::{
    LeanWorkerCancellationToken, LeanWorkerError, LeanWorkerPool, LeanWorkerPoolConfig, LeanWorkerStreamingCommand,
    LeanWorkerTypedDataRow, LeanWorkerTypedDataSink,
};
use serde::{Deserialize, Serialize};
use serde_json::Value;

fn worker_binary() -> PathBuf {
    PathBuf::from(env!("CARGO_BIN_EXE_lean-rs-worker-child"))
}

fn workspace_root() -> PathBuf {
    let manifest_dir = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
    manifest_dir
        .parent()
        .and_then(Path::parent)
        .expect("crates/<name> lives two directories below the workspace root")
        .to_path_buf()
}

fn interop_root() -> PathBuf {
    workspace_root().join("fixtures").join("interop-shims")
}

fn builder() -> lean_rs_worker::LeanWorkerCapabilityBuilder {
    lean_rs_worker::LeanWorkerCapabilityBuilder::new(
        interop_root(),
        "lean_rs_interop_consumer",
        "LeanRsInteropConsumer",
        ["LeanRsInteropConsumer.Callback"],
    )
    .worker_executable(worker_binary())
}

#[derive(Debug, Serialize)]
struct FixtureRequest {
    source: String,
}

fn request(source: &str) -> FixtureRequest {
    FixtureRequest {
        source: source.to_owned(),
    }
}

#[derive(Clone, Debug, Deserialize, Eq, PartialEq)]
struct ManyRow {
    i: u64,
}

fn many_command(export: &str) -> LeanWorkerStreamingCommand<FixtureRequest, ManyRow, Value> {
    LeanWorkerStreamingCommand::new(export)
}

#[derive(Default)]
struct RecordingRows {
    rows: Mutex<Vec<LeanWorkerTypedDataRow<ManyRow>>>,
}

impl RecordingRows {
    fn len(&self) -> usize {
        self.rows.lock().expect("rows lock is not poisoned").len()
    }
}

impl LeanWorkerTypedDataSink<ManyRow> for RecordingRows {
    fn report(&self, row: LeanWorkerTypedDataRow<ManyRow>) {
        self.rows.lock().expect("rows lock is not poisoned").push(row);
    }
}

struct SlowRows {
    rows: Mutex<Vec<LeanWorkerTypedDataRow<ManyRow>>>,
    delay: Duration,
}

impl SlowRows {
    fn new(delay: Duration) -> Self {
        Self {
            rows: Mutex::new(Vec::new()),
            delay,
        }
    }

    fn len(&self) -> usize {
        self.rows.lock().expect("rows lock is not poisoned").len()
    }
}

impl LeanWorkerTypedDataSink<ManyRow> for SlowRows {
    fn report(&self, row: LeanWorkerTypedDataRow<ManyRow>) {
        self.rows.lock().expect("rows lock is not poisoned").push(row);
        thread::sleep(self.delay);
    }
}

struct SlowCancelRows<'a> {
    token: &'a LeanWorkerCancellationToken,
    rows: Mutex<u64>,
    delay: Duration,
    cancel_after: u64,
}

impl LeanWorkerTypedDataSink<ManyRow> for SlowCancelRows<'_> {
    fn report(&self, _row: LeanWorkerTypedDataRow<ManyRow>) {
        let mut rows = self.rows.lock().expect("rows lock is not poisoned");
        *rows = rows.saturating_add(1);
        if *rows >= self.cancel_after {
            self.token.cancel();
        }
        drop(rows);
        thread::sleep(self.delay);
    }
}

#[test]
fn pool_and_lease_snapshots_record_stream_throughput() {
    let mut pool = LeanWorkerPool::new(LeanWorkerPoolConfig::new(1));
    let mut lease = pool.acquire_lease(builder()).expect("pool opens lease");
    let initial = lease.snapshot();
    assert_eq!(initial.active_workers, 1);
    assert_eq!(initial.warm_leases, 0);

    let rows = RecordingRows::default();
    let summary = lease
        .run_streaming_command(
            &many_command("lean_rs_interop_consumer_worker_data_stream_many"),
            &request("observability-throughput"),
            &rows,
            None,
            None,
            None,
        )
        .expect("many-row stream succeeds");

    assert_eq!(summary.total_rows, 512);
    assert_eq!(rows.len(), 512);
    let lease_snapshot = lease.snapshot();
    assert_eq!(lease_snapshot.stream_requests, 1);
    assert_eq!(lease_snapshot.stream_successes, 1);
    assert_eq!(lease_snapshot.stream_failures, 0);
    assert_eq!(lease_snapshot.data_rows_delivered, summary.total_rows);
    assert!(lease_snapshot.data_row_payload_bytes > 0);
    assert!(lease_snapshot.stream_elapsed > Duration::ZERO);
    drop(lease);

    let pool_snapshot = pool.snapshot();
    assert_eq!(pool_snapshot.active_workers, 0);
    assert_eq!(pool_snapshot.warm_leases, 1);
    assert_eq!(pool_snapshot.data_rows_delivered, summary.total_rows);
    assert_eq!(pool_snapshot.queue_depth, 0);
}

#[test]
fn slow_sink_applies_bounded_backpressure_without_losing_rows() {
    let mut pool = LeanWorkerPool::new(LeanWorkerPoolConfig::new(1));
    let mut lease = pool.acquire_lease(builder()).expect("pool opens lease");
    let rows = SlowRows::new(Duration::from_millis(2));
    let summary = lease
        .run_streaming_command(
            &many_command("lean_rs_interop_consumer_worker_data_stream_many"),
            &request("observability-slow-sink"),
            &rows,
            None,
            None,
            None,
        )
        .expect("slow sink stream succeeds under bounded backpressure");

    let snapshot = lease.snapshot();
    assert_eq!(summary.total_rows, 512);
    assert_eq!(rows.len(), 512);
    assert_eq!(snapshot.data_rows_delivered, 512);
    assert!(
        snapshot.backpressure_waits > 0,
        "slow sink should make the bounded reader wait at least once",
    );
    assert_eq!(snapshot.backpressure_failures, 0);
}

#[test]
fn cancellation_during_backpressure_invalidates_lease() {
    let mut pool = LeanWorkerPool::new(LeanWorkerPoolConfig::new(1));
    let mut lease = pool.acquire_lease(builder()).expect("pool opens lease");
    let token = LeanWorkerCancellationToken::new();
    let rows = SlowCancelRows {
        token: &token,
        rows: Mutex::new(0),
        delay: Duration::from_millis(2),
        cancel_after: 96,
    };

    let err = lease
        .run_streaming_command(
            &many_command("lean_rs_interop_consumer_worker_data_stream_many"),
            &request("observability-cancel"),
            &rows,
            None,
            Some(&token),
            None,
        )
        .expect_err("sink cancellation should stop a backpressured stream");

    match err {
        LeanWorkerError::Cancelled { operation } => assert_eq!(operation, "worker_run_data_stream"),
        other => panic!("expected cancellation, got {other:?}"),
    }
    let snapshot = lease.snapshot();
    assert!(!lease.is_valid());
    assert_eq!(snapshot.stream_failures, 1);
    assert_eq!(snapshot.cancelled_restarts, 1);
    assert!(snapshot.data_rows_delivered >= 96);
    assert!(snapshot.backpressure_waits > 0);
}

#[test]
fn fatal_exit_after_buffered_rows_records_failure_without_commit() {
    let mut pool = LeanWorkerPool::new(LeanWorkerPoolConfig::new(1));
    let mut lease = pool.acquire_lease(builder()).expect("pool opens lease");
    let rows = RecordingRows::default();

    let err = lease
        .run_streaming_command(
            &many_command("lean_rs_interop_consumer_worker_data_stream_many_then_panic"),
            &request("observability-fatal"),
            &rows,
            None,
            None,
            None,
        )
        .expect_err("fatal child exit should fail the stream");

    match err {
        LeanWorkerError::ChildPanicOrAbort { exit } => assert!(!exit.success),
        other => panic!("expected child panic/abort, got {other:?}"),
    }
    let snapshot = lease.snapshot();
    assert!(!lease.is_valid());
    assert!(rows.len() > 0, "rows before fatal exit remain tentative");
    assert_eq!(snapshot.stream_successes, 0);
    assert_eq!(snapshot.stream_failures, 1);
    assert_eq!(snapshot.data_rows_delivered as usize, rows.len());
}