lean-rs-worker 0.1.2

Worker-process boundary for lean-rs host workloads.
Documentation
#![allow(clippy::expect_used)]

use std::path::{Path, PathBuf};
use std::sync::Mutex;

use lean_rs_worker::{
    LeanWorkerCapabilityBuilder, LeanWorkerPool, LeanWorkerPoolConfig, LeanWorkerStreamingCommand,
    LeanWorkerTypedDataRow, LeanWorkerTypedDataSink,
};
use serde::{Deserialize, Serialize};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut pool = LeanWorkerPool::new(LeanWorkerPoolConfig::new(2));
    let builder = LeanWorkerCapabilityBuilder::new(
        interop_root(),
        "lean_rs_interop_consumer",
        "LeanRsInteropConsumer",
        ["LeanRsInteropConsumer.Callback"],
    );

    let command =
        LeanWorkerStreamingCommand::<Request, Row, Summary>::new("lean_rs_interop_consumer_worker_data_stream");
    let rows = RecordingRows::default();

    {
        let mut lease = pool.acquire_lease(builder.clone())?;
        let summary = lease.run_streaming_command(
            &command,
            &Request {
                source: "worker-pool-example".to_owned(),
            },
            &rows,
            None,
            None,
            None,
        )?;
        if let Some(metadata) = summary.metadata {
            println!(
                "first lease rows={} fixture={} ok={}",
                summary.total_rows, metadata.fixture, metadata.ok
            );
        }
    }

    {
        let mut lease = pool.acquire_lease(builder)?;
        lease.cycle()?;
    }

    println!(
        "pool workers={} captured_rows={:?}",
        pool.snapshot().workers,
        rows.rows()
    );
    Ok(())
}

fn workspace_root() -> PathBuf {
    let manifest_dir = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
    manifest_dir
        .parent()
        .and_then(Path::parent)
        .expect("crates/<name> lives two directories below the workspace root")
        .to_path_buf()
}

fn interop_root() -> PathBuf {
    workspace_root().join("fixtures").join("interop-shims")
}

#[derive(Debug, Serialize)]
struct Request {
    source: String,
}

#[derive(Clone, Debug, Deserialize)]
struct Row {
    kind: String,
    ordinal: u64,
}

#[derive(Debug, Deserialize)]
struct Summary {
    fixture: String,
    ok: bool,
}

#[derive(Default)]
struct RecordingRows {
    rows: Mutex<Vec<LeanWorkerTypedDataRow<Row>>>,
}

impl RecordingRows {
    fn rows(&self) -> Vec<String> {
        self.rows
            .lock()
            .expect("rows lock is not poisoned")
            .iter()
            .map(|row| {
                format!(
                    "{}#{}:{}:{}",
                    row.stream, row.sequence, row.payload.kind, row.payload.ordinal
                )
            })
            .collect()
    }
}

impl LeanWorkerTypedDataSink<Row> for RecordingRows {
    fn report(&self, row: LeanWorkerTypedDataRow<Row>) {
        self.rows.lock().expect("rows lock is not poisoned").push(row);
    }
}