motorcortex-rust 0.5.0

Motorcortex Rust: a Rust client for the Motorcortex Core real-time control system (async + blocking).
Documentation
//! `#[ignore]`d stress test — flip on with `--ignored` when you want
//! to hammer the driver.
//!
//! Spawns `N` cloned [`Request`] handles, each running a hot loop of
//! [`Request::get_parameter`] for `DURATION`. The point is to shake
//! out deadlocks, unbounded queue growth, and handler-level
//! contention regressions. Every clone shares one driver thread — if
//! the mpsc command channel or the `ParameterTree` lock ever
//! seriously backs up, the tail latency (reported here as `max`) is
//! the first thing that blows out.
//!
//! Runtime: ~30 s. Reserved for `cargo test -- --ignored`.

use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::{Duration, Instant};

use motorcortex_rust::ConnectionOptions;
use motorcortex_rust::core::Request;

use crate::{CERT_PATH, URL_REQ};

/// Number of cloned [`Request`] handles; the test spawns one task per
/// clone.
const N_CLONES: usize = 16;
/// Length of the window, measured from just before the tasks are
/// spawned, during which every task keeps issuing calls.
const DURATION: Duration = Duration::from_secs(30);
/// Parameter path that every task reads in its hot loop.
const PARAM: &str = "root/Control/dummyDouble";
/// Safety net on individual RPC latency. The server runs a 1 kHz
/// loop; any call that takes more than this almost certainly means
/// the driver wedged.
const MAX_CALL_LATENCY: Duration = Duration::from_secs(1);

/// Hammers one shared connection with `N_CLONES` concurrent
/// `get_parameter` loops and reports aggregate latency.
///
/// Connects once, requests the parameter tree, then spawns `N_CLONES`
/// tasks that each own a clone of the [`Request`] handle and read
/// `PARAM` back-to-back until a shared deadline `DURATION` from now.
/// Each task tracks its own min / max / sum / count so the hot loop
/// does no allocation; the results are merged after all tasks are
/// joined and printed to stderr.
///
/// # Panics
///
/// Panics if connecting, fetching the tree, any `get_parameter` call,
/// or disconnecting fails; if a single call takes `MAX_CALL_LATENCY`
/// or longer; if any task panics; if no call completed at all; or if
/// the shared call counter disagrees with the per-task totals.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
#[ignore]
async fn stress_16_clones_hot_get_parameter() {
    let opts = ConnectionOptions::new(CERT_PATH.to_string(), 5000, 5000);
    let req = Request::connect_to(URL_REQ, opts).await.expect("connect");
    req.request_parameter_tree().await.expect("tree");

    // Independent tally of completed calls. It is cross-checked against
    // the per-task counts once every task has been joined.
    let total_calls = Arc::new(AtomicUsize::new(0));

    let deadline = Instant::now() + DURATION;
    let mut handles = Vec::with_capacity(N_CLONES);

    for _ in 0..N_CLONES {
        let req = req.clone();
        let total = Arc::clone(&total_calls);
        handles.push(tokio::spawn(async move {
            // Per-task running statistics (min / max / sum / count).
            // Plain scalars, so the hot loop never touches the allocator.
            let mut min = Duration::MAX;
            let mut max = Duration::ZERO;
            let mut sum = Duration::ZERO;
            let mut count: u64 = 0;
            while Instant::now() < deadline {
                let t0 = Instant::now();
                let _v: f64 = req.get_parameter(PARAM).await.expect("get_parameter");
                let dt = t0.elapsed();
                assert!(
                    dt < MAX_CALL_LATENCY,
                    "single RPC exceeded {MAX_CALL_LATENCY:?} — driver likely wedged",
                );
                min = min.min(dt);
                max = max.max(dt);
                sum += dt;
                count += 1;
            }
            // One atomic add per task, after the loop, to keep the
            // shared counter out of the measured path.
            total.fetch_add(
                usize::try_from(count).expect("call count fits in usize"),
                Ordering::Relaxed,
            );
            (count, min, sum, max)
        }));
    }

    let mut overall_min = Duration::MAX;
    let mut overall_max = Duration::ZERO;
    let mut overall_sum = Duration::ZERO;
    let mut overall_count: u64 = 0;
    for (i, handle) in handles.into_iter().enumerate() {
        // Keep the join error in the message so the reason for the
        // failure (panic vs. cancellation) is not lost.
        let (count, min, sum, max) = handle
            .await
            .unwrap_or_else(|err| panic!("task {i} panicked: {err}"));
        // A task that made no calls still carries `Duration::MAX` as
        // its min, which can never lower the overall minimum.
        overall_min = overall_min.min(min);
        overall_max = overall_max.max(max);
        overall_sum += sum;
        overall_count += count;
    }

    assert!(
        overall_count > 0,
        "stress test produced zero RPCs — something is very wrong",
    );

    // Joining every task orders their `fetch_add`s before this load,
    // so a relaxed read observes the final value.
    let tallied = total_calls.load(Ordering::Relaxed);
    assert_eq!(
        u64::try_from(tallied).expect("usize fits in u64"),
        overall_count,
        "shared call counter disagrees with the per-task totals",
    );

    // Divide in nanoseconds with a full-width divisor; casting the
    // count to `u32` would silently truncate and skew the average.
    // `overall_count > 0` is asserted above, so this cannot divide by
    // zero.
    let avg_nanos = overall_sum.as_nanos() / u128::from(overall_count);
    let avg = Duration::from_nanos(u64::try_from(avg_nanos).unwrap_or(u64::MAX));
    eprintln!(
        "stress: {N_CLONES} clones × {DURATION:?} → {overall_count} calls \
         (min {overall_min:?}, avg {avg:?}, max {overall_max:?})"
    );

    req.disconnect().await.expect("disconnect");
}