motorcortex-rust 0.5.0

Motorcortex Rust: a Rust client for the Motorcortex Core real-time control system (async + blocking).
# motorcortex-rust

[![pipeline status](https://git.vectioneer.com/pub/motorcortex-rust/badges/master/pipeline.svg)](https://git.vectioneer.com/pub/motorcortex-rust/-/commits/development)
[![coverage report](https://git.vectioneer.com/pub/motorcortex-rust/badges/master/coverage.svg?job=coverage)](https://git.vectioneer.com/pub/motorcortex-rust/-/commits/development)
[![crates.io](https://img.shields.io/crates/v/motorcortex-rust.svg)](https://crates.io/crates/motorcortex-rust)
[![docs.rs](https://img.shields.io/docsrs/motorcortex-rust)](https://docs.rs/motorcortex-rust)
[![Rust edition](https://img.shields.io/badge/edition-2024-orange.svg)](https://doc.rust-lang.org/edition-guide/rust-2024/index.html)
[![License: MIT](https://img.shields.io/badge/license-MIT-blue.svg)](LICENSE)

Rust client for Motorcortex Core. It implements the low-level wire
protocol defined by `motorcortex.proto`: Request/Reply for parameter
access and Publish/Subscribe for real-time streaming, both over secure
WebSocket (WSS, i.e. WebSocket over TLS).

Ships in two flavours:

- **`motorcortex_rust::core::*`** — async, tokio-driven. The recommended
  surface. Handles are `Clone + Send + Sync`; multiple tasks share one
  connection safely.
- **`motorcortex_rust::blocking::*`** — thin synchronous wrapper. Useful
  for scripts, tests, and embedded-adjacent code that doesn't want a
  tokio runtime in `main()`.

Both share the same actor-style driver — no duplicated protocol logic.

## Features

- Req/Rep RPCs: `login` / `logout`, `get_parameter` / `set_parameter`,
  batch variants with tuple types, `request_parameter_tree`,
  `create_group` / `remove_group`, `get_parameter_tree_hash`.
- Pub/Sub: `Subscription` handles with three independent sinks —
  sync `notify(cb)` callback, async `latest().await` (watch-style,
  lossy), async `stream(capacity)` / blocking `iter(capacity)` (bounded
  broadcast with explicit `Missed(n)` back-pressure).
- `ConnectionState` published via `tokio::sync::watch` so callers can
  observe `Connected` / `Disconnected` / `ConnectionLost` /
  `SessionExpired` transitions.
- Shared parameter-tree cache accessible via `request.parameter_tree()`.
- Compile-time message hashes via the `Hash` trait — drift between
  client and server is a `rustc` error, not a runtime exception.
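
The compile-time hash idea in the last bullet can be illustrated with a small, dependency-free sketch. Everything here is hypothetical: the trait name `MessageHash`, the FNV-1a hash function, and the message name are stand-ins chosen for illustration, not the crate's actual `Hash` trait or hashing scheme.

```rust
/// FNV-1a over the bytes of a message name, evaluated at compile time.
/// (Assumed hash function; the crate may use a different one.)
const fn fnv1a(name: &str) -> u32 {
    let bytes = name.as_bytes();
    let mut hash: u32 = 0x811c_9dc5; // FNV offset basis
    let mut i = 0;
    while i < bytes.len() {
        hash ^= bytes[i] as u32;
        hash = hash.wrapping_mul(0x0100_0193); // FNV prime
        i += 1;
    }
    hash
}

/// Hypothetical stand-in for the crate's `Hash` trait.
trait MessageHash {
    const HASH: u32;
}

/// Hypothetical message type.
struct LoginMsg;

impl MessageHash for LoginMsg {
    const HASH: u32 = fnv1a("motorcortex.LoginMsg");
}

/// Value the (imaginary) server-side schema was generated with.
const SERVER_LOGIN_HASH: u32 = fnv1a("motorcortex.LoginMsg");

// Evaluated by rustc: if the two constants ever differ, the build fails.
const _: () = assert!(LoginMsg::HASH == SERVER_LOGIN_HASH);
```

Changing either string makes the `const` assertion fail during compilation, which is the property the bullet describes.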

## Getting started

```toml
# Cargo.toml
[dependencies]
motorcortex-rust = "0.5"
tokio = { version = "1", features = ["full"] }  # optional — async callers only
```

Or pin to the `master` branch on the Vectioneer GitLab:

```toml
[dependencies]
motorcortex-rust = { git = "https://git.vectioneer.com/pub/motorcortex-rust", branch = "master" }
```

The TLS certificate the examples expect (`mcx.cert.crt`) is downloadable
from <https://docs.motorcortex.io/mcx.cert.crt>.

### Async example (recommended)

```rust
use motorcortex_rust::core::Request;
use motorcortex_rust::{ConnectionOptions, parse_url};

#[tokio::main]
async fn main() -> motorcortex_rust::Result<()> {
    // One server URL with both ports → two dial URLs.
    let (req_url, _sub_url) = parse_url("wss://127.0.0.1:5568:5567")?;
    let opts = ConnectionOptions::new("mcx.cert.crt".into(), 1000, 1000);

    let req = Request::connect_to(&req_url, opts).await?;
    req.request_parameter_tree().await?;

    // Scalar round-trip with automatic type conversion.
    req.set_parameter("root/Control/dummyDouble", 2.345).await?;
    let as_f32:    f32    = req.get_parameter("root/Control/dummyDouble").await?;
    let as_string: String = req.get_parameter("root/Control/dummyDouble").await?;
    let as_i64:    i64    = req.get_parameter("root/Control/dummyDouble").await?;
    println!("{as_f32} / {as_string} / {as_i64}");

    // Fixed-size array.
    req.set_parameter("root/Control/dummyDoubleVec", [1.0, 2.0, 3.0]).await?;
    let arr: [f64; 3] = req.get_parameter("root/Control/dummyDoubleVec").await?;
    println!("{arr:?}");

    // Batch (tuple) — one round-trip, N parameters.
    let (b, d, i): (bool, f64, i32) = req.get_parameters(&[
        "root/Control/dummyBool",
        "root/Control/dummyDouble",
        "root/Control/dummyInt32",
    ]).await?;
    println!("{b} {d} {i}");

    req.disconnect().await?;
    Ok(())
}
```
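
The example passes one URL carrying two ports and gets two dial URLs back. A minimal sketch of how such a split could work is shown below. `split_dual_port_url` is a hypothetical helper written for this illustration; the crate's `parse_url` may accept more forms and returns its own error type.

```rust
/// Splits `scheme://host:req_port:sub_port` into two dial URLs.
/// Hypothetical helper, not the crate's `parse_url`.
fn split_dual_port_url(url: &str) -> Result<(String, String), String> {
    // Peel the two ports off the right-hand side.
    let (rest, sub_port) = url.rsplit_once(':').ok_or("missing ports")?;
    let (base, req_port) = rest.rsplit_once(':').ok_or("missing second port")?;

    // Both trailing fields must be valid port numbers.
    for port in [req_port, sub_port] {
        port.parse::<u16>()
            .map_err(|e| format!("bad port {port:?}: {e}"))?;
    }
    // `base` must still carry the scheme, e.g. `wss://127.0.0.1`.
    if !base.contains("://") {
        return Err(format!("expected scheme://host:req:sub, got {url:?}"));
    }
    Ok((format!("{base}:{req_port}"), format!("{base}:{sub_port}")))
}
```

With the URL from the example, this yields `wss://127.0.0.1:5568` for the request socket and `wss://127.0.0.1:5567` for the subscribe socket.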

### Subscribe example

```rust
use futures::StreamExt;
use motorcortex_rust::core::{Request, Subscribe};
use motorcortex_rust::{ConnectionOptions, parse_url};

#[tokio::main]
async fn main() -> motorcortex_rust::Result<()> {
    let (req_url, sub_url) = parse_url("wss://127.0.0.1:5568:5567")?;
    let opts = ConnectionOptions::new("mcx.cert.crt".into(), 1000, 1000);

    let req = Request::connect_to(&req_url, opts.clone()).await?;
    req.request_parameter_tree().await?;
    let sub = Subscribe::connect_to(&sub_url, opts).await?;

    let subscription = sub.subscribe(
        &req,
        ["root/Control/dummyDouble"],
        "position-group",
        10,     // fdiv: every 10th cycle = 100 Hz on a 1 kHz server
    ).await?;

    // Option A — "what's the current value?" (lossy, latest wins).
    let (_ts, v): (_, f64) = subscription.latest().await?;
    println!("latest = {v}");

    // Option B — every sample, bounded ring buffer.
    let mut stream = Box::pin(subscription.stream::<f64>(256));
    while let Some(item) = stream.next().await {
        match item {
            Ok((_ts, v)) => println!("{v}"),
            Err(missed)  => eprintln!("{missed}"),
        }
    }
    Ok(())
}
```
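
To make the "bounded, with explicit misses" behaviour of Option B concrete, here is a single-threaded sketch of a bounded sink that drops the oldest sample when full and reports the gap to the consumer. `BoundedSink` and `Missed` are hypothetical types for illustration; the crate's stream is async and its exact drop policy and item ordering may differ.

```rust
use std::collections::VecDeque;

/// Number of samples the consumer was too slow to receive.
#[derive(Debug, PartialEq, Eq)]
struct Missed(u64);

/// Hypothetical bounded sink: when full, the oldest sample is dropped and counted.
struct BoundedSink<T> {
    buf: VecDeque<T>,
    capacity: usize,
    missed: u64,
}

impl<T> BoundedSink<T> {
    fn new(capacity: usize) -> Self {
        assert!(capacity > 0, "capacity must be non-zero");
        Self { buf: VecDeque::with_capacity(capacity), capacity, missed: 0 }
    }

    /// Producer side: never blocks, overwrites the oldest entry when full.
    fn push(&mut self, sample: T) {
        if self.buf.len() == self.capacity {
            self.buf.pop_front();
            self.missed += 1;
        }
        self.buf.push_back(sample);
    }

    /// Consumer side: reports any gap first, then the surviving samples in order.
    fn pop(&mut self) -> Option<Result<T, Missed>> {
        if self.missed > 0 {
            let n = std::mem::take(&mut self.missed);
            return Some(Err(Missed(n)));
        }
        self.buf.pop_front().map(Ok)
    }
}
```

A larger capacity trades memory for tolerance of a slow consumer; the `latest()` sink is the degenerate case where only the newest value is kept and misses are not reported.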

### Blocking example (no tokio in `main()`)

```rust
use motorcortex_rust::blocking::Request;
use motorcortex_rust::{ConnectionOptions, parse_url};

fn main() -> motorcortex_rust::Result<()> {
    let (req_url, _) = parse_url("wss://127.0.0.1:5568:5567")?;
    let opts = ConnectionOptions::new("mcx.cert.crt".into(), 1000, 1000);

    let req = Request::connect_to(&req_url, opts)?;
    req.request_parameter_tree()?;

    req.set_parameter("root/Control/dummyDouble", 2.345)?;
    let v: f64 = req.get_parameter("root/Control/dummyDouble")?;
    println!("{v}");
    Ok(())
}
```

Same method names as the async API, same error type, same semantics:
just drop the `.await`s. The handle owns a private current-thread tokio
runtime, so `main()` does not need one.
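
The wrapping pattern can be sketched without any dependencies. In the sketch below, tokio's runtime is replaced by a tiny std-only `block_on`, and `AsyncClient` / `BlockingClient` are hypothetical types; it only illustrates the shape of a blocking façade, not the crate's implementation.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Wakes the blocked thread when the future can make progress.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Tiny std-only executor standing in for a current-thread tokio runtime.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(value) => return value,
            Poll::Pending => thread::park(),
        }
    }
}

/// Hypothetical async client; a real one would talk to a server.
struct AsyncClient {
    value: f64,
}

impl AsyncClient {
    async fn get_parameter(&self, _path: &str) -> Result<f64, String> {
        Ok(self.value)
    }
}

/// Blocking façade: same method name, same result type, no `.await`.
struct BlockingClient {
    inner: AsyncClient,
}

impl BlockingClient {
    fn get_parameter(&self, path: &str) -> Result<f64, String> {
        block_on(self.inner.get_parameter(path))
    }
}
```

Because the façade only forwards to the async method, protocol logic lives in one place and both flavours return the same result type.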

### Runnable examples

Four small binaries under [`examples/`](examples/) map to the
snippets above. Each takes its URL, cert path, credentials, and a
parameter path as CLI args. The defaults point at the in-repo
`tests/mcx.cert.crt` and a local `test_server` on ports 5568/5567,
so `cargo run --example <name>` works out of the box once the server
is running (see [Running the tests](#running-the-tests) for how to
build and start it):

```bash
cargo run --example async_request
cargo run --example blocking_request
cargo run --example subscribe_latest
cargo run --example subscribe_stream

# Pointing at a real server:
cargo run --example async_request -- \
    wss://mcx.example.com:443:443 /etc/ssl/certs/mcx.cert.crt root secret root/Control/dummyDouble
```

### Observing connection state

```rust
# use motorcortex_rust::core::Request;
# async fn demo() -> motorcortex_rust::Result<()> {
# let req = Request::new();
let mut state = req.state();           // watch::Receiver<ConnectionState>
while state.changed().await.is_ok() {
    match *state.borrow() {
        motorcortex_rust::ConnectionState::Connected     => println!("up"),
        motorcortex_rust::ConnectionState::ConnectionLost => println!("flaky"),
        motorcortex_rust::ConnectionState::SessionExpired => println!("please re-login"),
        motorcortex_rust::ConnectionState::Disconnected  => println!("down"),
    }
}
# Ok(()) }
```

## Running behind nginx (or any reverse proxy)

Motorcortex's websocket sessions are long-lived. Reverse proxies
usually close a websocket that stays idle past a timeout (nginx's
`proxy_read_timeout` defaults to 60 s), which looks like a transport
drop to the client. To survive that, the driver periodically
refreshes the server-issued session token
(`ConnectionOptions::token_refresh_interval`, default 30 s). When the
pipe does drop, NNG redials and the driver hands the cached token back
via `RestoreSession`, so RPCs resume without a re-login.

For the loop to work end-to-end, nginx needs websocket upgrade
forwarding and a read timeout longer than the refresh interval (the
config below uses 2×). Minimal config:

```nginx
map $http_upgrade $connection_upgrade {
    default upgrade;
    ''      close;
}

server {
    listen 443 ssl http2;
    server_name mcx.example.com;

    ssl_certificate     /etc/ssl/certs/mcx.crt;
    ssl_certificate_key /etc/ssl/private/mcx.key;

    # REQ socket.
    location /mcx_req {
        proxy_pass         https://127.0.0.1:5568;
        proxy_http_version 1.1;
        proxy_set_header   Upgrade    $http_upgrade;
        proxy_set_header   Connection $connection_upgrade;

        # At least 2× the refresh interval. 60 s covers the 30 s default
        # with headroom; bump both if you raise `token_refresh_interval`.
        proxy_read_timeout  60s;
        proxy_send_timeout  60s;
    }

    # SUB socket — same headers, different upstream port.
    location /mcx_sub {
        proxy_pass         https://127.0.0.1:5567;
        proxy_http_version 1.1;
        proxy_set_header   Upgrade    $http_upgrade;
        proxy_set_header   Connection $connection_upgrade;
        proxy_read_timeout  60s;
        proxy_send_timeout  60s;
    }
}
```
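
The timeout rule from the config comment (proxy timeout at least 2× the refresh interval) is easy to check in a deployment script or a startup assertion. The helpers below are hypothetical and use only `std::time::Duration`; they are not part of the crate.

```rust
use std::time::Duration;

/// Smallest proxy timeout that satisfies the 2x rule of thumb.
fn min_proxy_timeout(refresh_interval: Duration) -> Duration {
    refresh_interval * 2
}

/// True when the proxy timeout leaves at least 2x headroom
/// over the token refresh interval.
fn timeout_has_headroom(proxy_timeout: Duration, refresh_interval: Duration) -> bool {
    proxy_timeout >= min_proxy_timeout(refresh_interval)
}
```

With the 30 s default, a 60 s `proxy_read_timeout` passes; raising the refresh interval to 45 s without touching nginx would not.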

If the proxy sits between the client and server and *does* drop the
pipe, you'll see `ConnectionState::ConnectionLost` on the state watch;
the driver will then quietly reconnect and transition back to
`Connected` without user code having to do anything. On a server
restart that actually loses session state, the driver publishes
`SessionExpired` instead so your code knows a fresh `login()` is
needed.
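
The reactions described above can be summarised as a small decision table. The sketch defines its own copy of the four states so that it compiles standalone; the `Action` enum and `next_action` function are hypothetical, and treating `Disconnected` as "connect again" is an assumption rather than documented behaviour.

```rust
/// Local mirror of the four states named in this README.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum ConnectionState {
    Connected,
    Disconnected,
    ConnectionLost,
    SessionExpired,
}

/// What user code might do in response (hypothetical).
#[derive(Debug, PartialEq, Eq)]
enum Action {
    Proceed,
    WaitForReconnect,
    Login,
    Connect,
}

fn next_action(state: ConnectionState) -> Action {
    match state {
        ConnectionState::Connected => Action::Proceed,
        // The driver redials and restores the session on its own.
        ConnectionState::ConnectionLost => Action::WaitForReconnect,
        // Session state is gone on the server: a fresh login is needed.
        ConnectionState::SessionExpired => Action::Login,
        // Assumption: no connection is being maintained at all.
        ConnectionState::Disconnected => Action::Connect,
    }
}
```

In a real client, a function like this would sit inside the `state.changed().await` loop from the "Observing connection state" section.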

## Running the tests

```bash
# Offline: library tests plus the unit tests that do not need a server.
cargo test --lib
cargo test --test unit

# Server-driven. Build the vendored C++ test_server once, then run.
# `--test-threads=1` is mandatory — tests share a single server instance.
cmake -S tests/server -B tests/server/build -DCMAKE_BUILD_TYPE=Release
cmake --build tests/server/build -j
cargo test --test integration -- --test-threads=1
```

See [`tests/README.md`](tests/README.md) for the full walkthrough,
including coverage (`cargo llvm-cov`) and parallelism caveats.

## Architecture

Design doc: [`ARCHITECTURE.md`](ARCHITECTURE.md) — actor-style driver,
three-sink subscription model, blocking façade as a thin `block_on`
layer.

Rollout plan for the remaining phases (reconnect + session tokens,
polish): [`TODO.md`](TODO.md).

## CI runner setup

The GitLab pipeline (`.gitlab-ci.yml`) expects a runner tagged `rust`
with `curl`, `cmake`, a C++20 `g++`, `pkg-config`, `libnng-dev`, and
the Motorcortex-proprietary `mcx-core` package already provisioned
(same stack as the `motorcortex-python` runner).

The Rust toolchain itself is **not** installed on the runner — the
pipeline downloads `rustup` into `$CI_PROJECT_DIR/.rustup` on first run
(minimal profile + `llvm-tools-preview`) and caches it under the
`cargo` cache key alongside `cargo-llvm-cov`. Subsequent runs reuse
the cached toolchain.

## Documentation

Inline rustdoc on every public item:

```bash
cargo doc --open
```

## License

MIT — see [`LICENSE`](LICENSE).