motorcortex-rust 0.5.0

Motorcortex Rust: a Rust client for the Motorcortex Core real-time control system (async + blocking).

motorcortex-rust

Rust client for Motorcortex Core. Implements the low-level wire protocol defined by motorcortex.proto: Request/Reply for parameter access and Publish/Subscribe for real-time streaming, both over WebSocket Secure (wss://, TLS).

Ships in two flavours:

  • motorcortex_rust::core::* — async, tokio-driven. The recommended surface. Handles are Clone + Send + Sync; multiple tasks share one connection safely.
  • motorcortex_rust::blocking::* — thin synchronous wrapper. Useful for scripts, tests, and embedded-adjacent code that doesn't want a tokio runtime in main().

Both share the same actor-style driver — no duplicated protocol logic.

Features

  • Req/Rep RPCs: login / logout, get_parameter / set_parameter, batch variants with tuple types, request_parameter_tree, create_group / remove_group, get_parameter_tree_hash.
  • Pub/Sub: Subscription handles with three independent sinks — sync notify(cb) callback, async latest().await (watch-style, lossy), async stream(capacity) / blocking iter(capacity) (bounded broadcast with explicit Missed(n) back-pressure).
  • ConnectionState published via tokio::sync::watch so callers can observe Connected / Disconnected / ConnectionLost / SessionExpired transitions.
  • Shared parameter-tree cache accessible via request.parameter_tree().
  • Compile-time message hashes via the Hash trait: drift between client and server is a rustc error, not a runtime failure.
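
The compile-time hash idea from the last bullet can be sketched with an associated constant and a `const` assertion. This is a std-only illustration, not the crate's actual `Hash` trait: `WireHash`, `fnv1a`, `StatusMsg`, `SERVER_STATUS_HASH`, and the schema string are all hypothetical names invented for the example, and FNV-1a is simply a convenient hash to demonstrate with.

```rust
/// FNV-1a over a byte string, evaluated at compile time.
const fn fnv1a(bytes: &[u8]) -> u32 {
    let mut hash: u32 = 0x811c_9dc5; // FNV offset basis
    let mut i = 0;
    while i < bytes.len() {
        hash ^= bytes[i] as u32;
        hash = hash.wrapping_mul(0x0100_0193); // FNV prime
        i += 1;
    }
    hash
}

/// Hypothetical stand-in for a per-message hash trait.
trait WireHash {
    const HASH: u32;
}

/// Hypothetical message type.
struct StatusMsg;

impl WireHash for StatusMsg {
    // Hash of the schema the client was compiled against.
    const HASH: u32 = fnv1a(b"StatusMsg{code:i32,text:string}");
}

// Hash of the schema the server side is assumed to use (hypothetical).
const SERVER_STATUS_HASH: u32 = fnv1a(b"StatusMsg{code:i32,text:string}");

// If the two schema strings ever diverge, this line fails the build.
const _: () = assert!(StatusMsg::HASH == SERVER_STATUS_HASH);
```

Changing either schema string makes the `const` assertion fail during compilation, which is the property the bullet describes: a mismatch is caught by rustc rather than at run time.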

Getting started

# Cargo.toml
[dependencies]
motorcortex-rust = "0.5"
tokio = { version = "1", features = ["full"] }  # optional — async callers only

Or pin to the master branch on the Vectioneer GitLab:

[dependencies]
motorcortex-rust = { git = "https://git.vectioneer.com/pub/motorcortex-rust", branch = "master" }

The TLS certificate the examples expect (mcx.cert.crt) is downloadable from https://docs.motorcortex.io/mcx.cert.crt.

Async example (recommended)

use motorcortex_rust::core::Request;
use motorcortex_rust::{ConnectionOptions, parse_url};

#[tokio::main]
async fn main() -> motorcortex_rust::Result<()> {
    // One server URL with both ports → two dial URLs.
    let (req_url, _sub_url) = parse_url("wss://127.0.0.1:5568:5567")?;
    let opts = ConnectionOptions::new("mcx.cert.crt".into(), 1000, 1000);

    let req = Request::connect_to(&req_url, opts).await?;
    req.request_parameter_tree().await?;

    // Scalar round-trip with automatic type conversion.
    req.set_parameter("root/Control/dummyDouble", 2.345).await?;
    let as_f32:    f32    = req.get_parameter("root/Control/dummyDouble").await?;
    let as_string: String = req.get_parameter("root/Control/dummyDouble").await?;
    let as_i64:    i64    = req.get_parameter("root/Control/dummyDouble").await?;
    println!("{as_f32} / {as_string} / {as_i64}");

    // Fixed-size array.
    req.set_parameter("root/Control/dummyDoubleVec", [1.0, 2.0, 3.0]).await?;
    let arr: [f64; 3] = req.get_parameter("root/Control/dummyDoubleVec").await?;
    println!("{arr:?}");

    // Batch (tuple) — one round-trip, N parameters.
    let (b, d, i): (bool, f64, i32) = req.get_parameters(&[
        "root/Control/dummyBool",
        "root/Control/dummyDouble",
        "root/Control/dummyInt32",
    ]).await?;
    println!("{b} {d} {i}");

    req.disconnect().await?;
    Ok(())
}
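
The "one URL with both ports, two dial URLs" convention used by parse_url can be pictured with a small std-only helper. This is a sketch under assumptions: `split_dual_port_url` is a hypothetical function, the crate's real parse_url may validate or format its output differently, and the request-port-first order is inferred from the examples in this README (5568 for requests, 5567 for subscriptions). IPv6 literals are not handled.

```rust
/// Split "scheme://host:req_port:sub_port" into two dial URLs.
/// Hypothetical helper; not the crate's `parse_url`.
fn split_dual_port_url(url: &str) -> Result<(String, String), String> {
    // Peel off the last port first, then the one before it.
    let (rest, sub_port) = url.rsplit_once(':').ok_or("missing subscribe port")?;
    let (base, req_port) = rest.rsplit_once(':').ok_or("missing request port")?;

    // Both trailing segments must be valid TCP ports.
    for port in [req_port, sub_port] {
        port.parse::<u16>()
            .map_err(|e| format!("bad port {port:?}: {e}"))?;
    }
    // What remains must still carry the scheme.
    if !base.contains("://") {
        return Err(format!("missing scheme in {base:?}"));
    }

    Ok((format!("{base}:{req_port}"), format!("{base}:{sub_port}")))
}
```

With this sketch, "wss://127.0.0.1:5568:5567" yields "wss://127.0.0.1:5568" for requests and "wss://127.0.0.1:5567" for subscriptions, while a URL with only one port is rejected.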

Subscribe example

use futures::StreamExt;
use motorcortex_rust::core::{Request, Subscribe};
use motorcortex_rust::{ConnectionOptions, parse_url};

#[tokio::main]
async fn main() -> motorcortex_rust::Result<()> {
    let (req_url, sub_url) = parse_url("wss://127.0.0.1:5568:5567")?;
    let opts = ConnectionOptions::new("mcx.cert.crt".into(), 1000, 1000);

    let req = Request::connect_to(&req_url, opts.clone()).await?;
    req.request_parameter_tree().await?;
    let sub = Subscribe::connect_to(&sub_url, opts).await?;

    let subscription = sub.subscribe(
        &req,
        ["root/Control/dummyDouble"],
        "position-group",
        10,     // fdiv: every 10th cycle = 100 Hz on a 1 kHz server
    ).await?;

    // Option A — "what's the current value?" (lossy, latest wins).
    let (_ts, v): (_, f64) = subscription.latest().await?;
    println!("latest = {v}");

    // Option B — every sample, bounded ring buffer.
    let mut stream = Box::pin(subscription.stream::<f64>(256));
    while let Some(item) = stream.next().await {
        match item {
            Ok((_ts, v)) => println!("{v}"),
            Err(missed)  => eprintln!("{missed}"),
        }
    }
    Ok(())
}
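
To make the "bounded buffer with explicit Missed(n)" behaviour of Option B concrete, here is a std-only model of such a buffer. It is an illustration, not the crate's implementation: `BoundedFeed` and `Item` are hypothetical names, and the drop-oldest policy is an assumption about how a bounded stream might account for samples a slow consumer never saw.

```rust
use std::collections::VecDeque;

/// What the consumer sees: a sample, or a report of how many were dropped.
#[derive(Debug, PartialEq)]
enum Item<T> {
    Sample(T),
    Missed(u64),
}

/// Hypothetical bounded feed; the producer never blocks.
struct BoundedFeed<T> {
    buf: VecDeque<T>,
    capacity: usize,
    missed: u64,
}

impl<T> BoundedFeed<T> {
    fn new(capacity: usize) -> Self {
        assert!(capacity > 0, "capacity must be non-zero");
        Self { buf: VecDeque::with_capacity(capacity), capacity, missed: 0 }
    }

    /// Producer side: a full buffer drops its oldest sample and counts it.
    fn push(&mut self, sample: T) {
        if self.buf.len() == self.capacity {
            self.buf.pop_front();
            self.missed += 1;
        }
        self.buf.push_back(sample);
    }

    /// Consumer side: report any gap first, then hand out surviving samples.
    fn pop(&mut self) -> Option<Item<T>> {
        if self.missed > 0 {
            let n = std::mem::take(&mut self.missed);
            return Some(Item::Missed(n));
        }
        self.buf.pop_front().map(Item::Sample)
    }
}
```

With a capacity of 2 and four pushes before the first read, the consumer in this model receives one Missed(2) followed by the two newest samples. The gap is reported explicitly instead of being silently skipped, which is the role the Err(missed) arm plays in the example above.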

Blocking example (no tokio in main())

use motorcortex_rust::blocking::Request;
use motorcortex_rust::{ConnectionOptions, parse_url};

fn main() -> motorcortex_rust::Result<()> {
    let (req_url, _) = parse_url("wss://127.0.0.1:5568:5567")?;
    let opts = ConnectionOptions::new("mcx.cert.crt".into(), 1000, 1000);

    let req = Request::connect_to(&req_url, opts)?;
    req.request_parameter_tree()?;

    req.set_parameter("root/Control/dummyDouble", 2.345)?;
    let v: f64 = req.get_parameter("root/Control/dummyDouble")?;
    println!("{v}");
    Ok(())
}

Same method names as the async API, same error type, same semantics — just drop the .awaits. The handle owns a hidden current-thread tokio runtime internally.
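
The "thin synchronous wrapper" pattern can be sketched without any dependencies. The crate itself is described as driving a hidden tokio runtime; the version below swaps in a hand-rolled std-only `block_on` purely so the example stands alone. `AsyncClient`, `BlockingClient`, and their methods are hypothetical stand-ins, not the crate's types.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Wakes the thread that is parked inside `block_on`.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Minimal executor: poll the future, park until woken, repeat.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(value) => return value,
            Poll::Pending => thread::park(),
        }
    }
}

/// Hypothetical async client holding a single parameter value.
struct AsyncClient {
    value: f64,
}

impl AsyncClient {
    async fn get(&self) -> f64 {
        self.value
    }
    async fn set(&mut self, v: f64) {
        self.value = v;
    }
}

/// Blocking wrapper: same method names, each one a single `block_on` call.
struct BlockingClient {
    inner: AsyncClient,
}

impl BlockingClient {
    fn new() -> Self {
        Self { inner: AsyncClient { value: 0.0 } }
    }
    fn get(&self) -> f64 {
        block_on(self.inner.get())
    }
    fn set(&mut self, v: f64) {
        block_on(self.inner.set(v))
    }
}
```

Because every blocking method is one `block_on` around its async twin, protocol logic lives in a single place and the two surfaces cannot drift apart.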

Runnable examples

Four small binaries under examples/ map to the snippets above. Each takes its URL, cert path, credentials, and a parameter path as CLI args. The defaults point at the in-repo tests/mcx.cert.crt and a local test_server on ports 5568/5567, so each cargo run --example command below works out of the box once the server is running (see Running the tests for how to build and start it):

cargo run --example async_request
cargo run --example blocking_request
cargo run --example subscribe_latest
cargo run --example subscribe_stream

# Pointing at a real server:
cargo run --example async_request -- \
    wss://mcx.example.com:443:443 /etc/ssl/certs/mcx.cert.crt root secret root/Control/dummyDouble

Observing connection state

# use motorcortex_rust::core::Request;
# async fn demo() -> motorcortex_rust::Result<()> {
# let req = Request::new();
let mut state = req.state();           // watch::Receiver<ConnectionState>
while state.changed().await.is_ok() {
    match *state.borrow() {
        motorcortex_rust::ConnectionState::Connected     => println!("up"),
        motorcortex_rust::ConnectionState::ConnectionLost => println!("flaky"),
        motorcortex_rust::ConnectionState::SessionExpired => println!("please re-login"),
        motorcortex_rust::ConnectionState::Disconnected  => println!("down"),
    }
}
# Ok(()) }

Running behind nginx (or any reverse proxy)

Motorcortex's websocket sessions are long-lived. Reverse proxies default to closing idle websockets after ~60 s, which looks like a transport drop to the client. To survive that, the driver periodically refreshes the server-issued session token (ConnectionOptions::token_refresh_interval, default 30 s). When the pipe does drop, NNG redials and the driver hands the cached token back via RestoreSession, so RPCs resume without a re-login.

For the loop to work end-to-end, nginx needs websocket upgrade forwarding and read/send timeouts comfortably longer than the refresh interval (the config below uses 2×). Minimal config:

map $http_upgrade $connection_upgrade {
    default upgrade;
    ''      close;
}

server {
    listen 443 ssl http2;
    server_name mcx.example.com;

    ssl_certificate     /etc/ssl/certs/mcx.crt;
    ssl_certificate_key /etc/ssl/private/mcx.key;

    # REQ socket.
    location /mcx_req {
        proxy_pass         https://127.0.0.1:5568;
        proxy_http_version 1.1;
        proxy_set_header   Upgrade    $http_upgrade;
        proxy_set_header   Connection $connection_upgrade;

        # At least 2× the refresh interval. 60 s covers the 30 s default
        # with headroom; bump both if you raise `token_refresh_interval`.
        proxy_read_timeout  60s;
        proxy_send_timeout  60s;
    }

    # SUB socket — same headers, different upstream port.
    location /mcx_sub {
        proxy_pass         https://127.0.0.1:5567;
        proxy_http_version 1.1;
        proxy_set_header   Upgrade    $http_upgrade;
        proxy_set_header   Connection $connection_upgrade;
        proxy_read_timeout  60s;
        proxy_send_timeout  60s;
    }
}
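
The sizing rule in the config comments (timeouts of at least 2× the refresh interval) is plain arithmetic, and a small sketch makes it checkable when either value changes. The function names here are hypothetical and not part of motorcortex-rust.

```rust
use std::time::Duration;

/// Rule of thumb from the config comment: proxy timeout >= 2x refresh interval.
fn min_proxy_timeout(token_refresh_interval: Duration) -> Duration {
    token_refresh_interval * 2
}

/// True when the proxy timeout leaves the recommended headroom.
fn timeout_is_safe(proxy_timeout: Duration, token_refresh_interval: Duration) -> bool {
    proxy_timeout >= min_proxy_timeout(token_refresh_interval)
}
```

For the 30 s default this gives the 60 s used above. Raising the refresh interval to 45 s, for example, would call for proxy timeouts of at least 90 s under the same rule.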

If a proxy between the client and the server does drop the pipe, the state watch reports ConnectionState::ConnectionLost; the driver then reconnects on its own and transitions back to Connected without any action from user code. A server restart that actually loses session state is published as SessionExpired instead, so your code knows a fresh login() is needed.
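
One way to read the paragraph above is as a small decision table: which states the caller can ignore and which require action. The sketch below uses a local `State` enum that mirrors the four state names in this README; it is not the crate's ConnectionState type. `Action` and `caller_action` are hypothetical, and the handling of Disconnected is an assumption.

```rust
/// Local mirror of the four states named in this README (not the crate's type).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum State {
    Connected,
    Disconnected,
    ConnectionLost,
    SessionExpired,
}

/// What user code is expected to do next (hypothetical).
#[derive(Debug, PartialEq, Eq)]
enum Action {
    Proceed,
    Wait,
    Login,
    Connect,
}

fn caller_action(state: State) -> Action {
    match state {
        State::Connected => Action::Proceed,
        // The driver redials on its own; just wait for `Connected`.
        State::ConnectionLost => Action::Wait,
        // Session state is gone on the server; a fresh login is needed.
        State::SessionExpired => Action::Login,
        // Assumption: no connection is being maintained, so the caller connects.
        State::Disconnected => Action::Connect,
    }
}
```

Only SessionExpired (and, under the stated assumption, Disconnected) asks anything of user code; ConnectionLost is informational because recovery is automatic.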

Running the tests

# Offline: lib tests plus the unit tests that do not need a running server.
cargo test --lib
cargo test --test unit

# Server-driven. Build the vendored C++ test_server once, then run.
# `--test-threads=1` is mandatory — tests share a single server instance.
cmake -S tests/server -B tests/server/build -DCMAKE_BUILD_TYPE=Release
cmake --build tests/server/build -j
cargo test --test integration -- --test-threads=1

See tests/README.md for the full walkthrough, including coverage (cargo llvm-cov) and parallelism caveats.

Architecture

Design doc: ARCHITECTURE.md — actor-style driver, three-sink subscription model, blocking façade as a thin block_on layer.

Rollout plan for the remaining phases (reconnect + session tokens, polish): TODO.md.

CI runner setup

The GitLab pipeline (.gitlab-ci.yml) expects a runner tagged rust with curl, cmake, a C++20 g++, pkg-config, libnng-dev, and the Motorcortex-proprietary mcx-core package already provisioned (same stack as the motorcortex-python runner).

The Rust toolchain itself is not installed on the runner — the pipeline downloads rustup into $CI_PROJECT_DIR/.rustup on first run (minimal profile + llvm-tools-preview) and caches it under the cargo cache key alongside cargo-llvm-cov. Subsequent runs reuse the cached toolchain.

Documentation

Inline rustdoc on every public item:

cargo doc --open

License

MIT — see LICENSE.