running-process 4.5.5

Subprocess and PTY runtime for the running-process project
Documentation
#![cfg(feature = "client")]

use std::sync::{Arc, Barrier};
use std::thread;

use prost::Message;
use running_process::broker::protocol::{
    hello_reply::Result as HelloReplyResult, BrokerIsolation, Frame, FrameKind, Hello,
    PayloadEncoding, ServiceDefinition,
};
use running_process::broker::server::{HelloHandler, PeerIdentity, RegisteredBackend};

const CLIENTS: usize = 100;

fn service_definition() -> ServiceDefinition {
    ServiceDefinition {
        service_name: "zccache".into(),
        binary_path: "/usr/local/bin/zccache".into(),
        isolation: BrokerIsolation::SharedBroker as i32,
        explicit_instance: String::new(),
        per_version_binary_dir: "/opt/zccache/versions".into(),
        min_version: "1.10.0".into(),
        version_allow_list: vec!["1.11.20".into()],
        labels: Default::default(),
    }
}

fn handler() -> HelloHandler {
    HelloHandler::new()
        .with_backend(RegisteredBackend {
            service_definition: service_definition(),
            daemon_version: "1.11.20".into(),
            backend_pipe: r"\\.\pipe\rpb-v1-test-backend".into(),
            server_capabilities: 0x01,
        })
        .unwrap()
}

fn hello(index: usize) -> Hello {
    Hello {
        client_min_protocol: 1,
        client_max_protocol: 1,
        service_name: "zccache".into(),
        wanted_version: "1.11.20".into(),
        client_version: "zccache-cli/1.11.20".into(),
        client_capabilities: 0,
        auth_token: Vec::new(),
        request_id: format!("req-concurrent-{index}"),
        connection_id: 0,
        peer_pid: 0,
        client_lib_name: "running-process".into(),
        client_lib_version: "4.0.3".into(),
        peer_attestation_nonce: Vec::new(),
        capability_token: Vec::new(),
        client_keepalive_secs: 60,
    }
}

fn peer() -> PeerIdentity {
    PeerIdentity {
        pid: std::process::id(),
        uid_or_sid: "test-peer".into(),
    }
}

fn frame_for_hello(index: usize, hello: &Hello) -> Frame {
    Frame {
        envelope_version: 1,
        kind: FrameKind::Request as i32,
        payload_protocol: 0,
        payload: hello.encode_to_vec(),
        request_id: (index + 1) as u64,
        payload_encoding: PayloadEncoding::None as i32,
        deadline_unix_ms: 0,
        traceparent: String::new(),
        tracestate: String::new(),
    }
}

#[test]
fn handler_assigns_deterministic_unique_connection_ids_under_concurrency() {
    let handler = Arc::new(handler());
    let barrier = Arc::new(Barrier::new(CLIENTS));
    let mut clients = Vec::with_capacity(CLIENTS);

    for index in 0..CLIENTS {
        let handler = Arc::clone(&handler);
        let barrier = Arc::clone(&barrier);
        clients.push(thread::spawn(move || {
            let request = hello(index);
            let frame = frame_for_hello(index, &request);
            barrier.wait();
            match handler.handle_frame(frame, peer()).result.unwrap() {
                HelloReplyResult::Negotiated(negotiated) => {
                    assert_eq!(negotiated.daemon_version, "1.11.20");
                    negotiated.connection_id
                }
                HelloReplyResult::Refused(refused) => {
                    panic!("unexpected refusal for client {index}: {refused:?}")
                }
            }
        }));
    }

    let mut connection_ids = clients
        .into_iter()
        .map(|client| client.join().unwrap())
        .collect::<Vec<_>>();
    connection_ids.sort_unstable();

    assert_eq!(connection_ids, (1..=CLIENTS as u64).collect::<Vec<_>>());
}