# motorcortex-rust
[](https://git.vectioneer.com/pub/motorcortex-rust/-/commits/development)
[](https://git.vectioneer.com/pub/motorcortex-rust/-/commits/development)
[](https://crates.io/crates/motorcortex-rust)
[](https://docs.rs/motorcortex-rust)
[](https://doc.rust-lang.org/edition-guide/rust-2024/index.html)
[](LICENSE)
Rust client for the Motorcortex Core. Implements the low-level wire
protocol defined by `motorcortex.proto`: Request/Reply for parameter
access, Publish/Subscribe for real-time streaming, all over WebSocket
Security (TLS).
Ships in two flavours:
- **`motorcortex_rust::core::*`** — async, tokio-driven. The recommended
surface. Handles are `Clone + Send + Sync`; multiple tasks share one
connection safely.
- **`motorcortex_rust::blocking::*`** — thin synchronous wrapper. Useful
for scripts, tests, and embedded-adjacent code that doesn't want a
tokio runtime in `main()`.
Both share the same actor-style driver — no duplicated protocol logic.
## Features
- Req/Rep RPCs: `login` / `logout`, `get_parameter` / `set_parameter`,
batch variants with tuple types, `request_parameter_tree`,
`create_group` / `remove_group`, `get_parameter_tree_hash`.
- Pub/Sub: `Subscription` handles with three independent sinks —
sync `notify(cb)` callback, async `latest().await` (watch-style,
lossy), async `stream(capacity)` / blocking `iter(capacity)` (bounded
broadcast with explicit `Missed(n)` back-pressure).
- `ConnectionState` published via `tokio::sync::watch` so callers can
observe `Connected` / `Disconnected` / `ConnectionLost` /
`SessionExpired` transitions.
- Shared parameter-tree cache accessible via `request.parameter_tree()`.
- Compile-time message hashes via the `Hash` trait — drift between
client and server is a `rustc` error, not a runtime exception.
## Getting started
```toml
# Cargo.toml
[dependencies]
motorcortex-rust = "0.5"
tokio = { version = "1", features = ["full"] } # optional — async callers only
```
Or pin to the `master` branch on the Vectioneer GitLab:
```toml
[dependencies]
motorcortex-rust = { git = "https://git.vectioneer.com/pub/motorcortex-rust", branch = "master" }
```
The TLS certificate the examples expect (`mcx.cert.crt`) is downloadable
from <https://docs.motorcortex.io/mcx.cert.crt>.
### Async example (recommended)
```rust
use motorcortex_rust::core::Request;
use motorcortex_rust::{ConnectionOptions, parse_url};
#[tokio::main]
async fn main() -> motorcortex_rust::Result<()> {
// One server URL with both ports → two dial URLs.
let (req_url, _sub_url) = parse_url("wss://127.0.0.1:5568:5567")?;
let opts = ConnectionOptions::new("mcx.cert.crt".into(), 1000, 1000);
let req = Request::connect_to(&req_url, opts).await?;
req.request_parameter_tree().await?;
// Scalar round-trip with automatic type conversion.
req.set_parameter("root/Control/dummyDouble", 2.345).await?;
let as_f32: f32 = req.get_parameter("root/Control/dummyDouble").await?;
let as_string: String = req.get_parameter("root/Control/dummyDouble").await?;
let as_i64: i64 = req.get_parameter("root/Control/dummyDouble").await?;
println!("{as_f32} / {as_string} / {as_i64}");
// Fixed-size array.
req.set_parameter("root/Control/dummyDoubleVec", [1.0, 2.0, 3.0]).await?;
let arr: [f64; 3] = req.get_parameter("root/Control/dummyDoubleVec").await?;
println!("{arr:?}");
// Batch (tuple) — one round-trip, N parameters.
let (b, d, i): (bool, f64, i32) = req.get_parameters(&[
"root/Control/dummyBool",
"root/Control/dummyDouble",
"root/Control/dummyInt32",
]).await?;
println!("{b} {d} {i}");
req.disconnect().await?;
Ok(())
}
```
### Subscribe example
```rust
use futures::StreamExt;
use motorcortex_rust::core::{Request, Subscribe};
use motorcortex_rust::{ConnectionOptions, parse_url};
#[tokio::main]
async fn main() -> motorcortex_rust::Result<()> {
let (req_url, sub_url) = parse_url("wss://127.0.0.1:5568:5567")?;
let opts = ConnectionOptions::new("mcx.cert.crt".into(), 1000, 1000);
let req = Request::connect_to(&req_url, opts.clone()).await?;
req.request_parameter_tree().await?;
let sub = Subscribe::connect_to(&sub_url, opts).await?;
let subscription = sub.subscribe(
&req,
["root/Control/dummyDouble"],
"position-group",
10, // fdiv: every 10th cycle = 100 Hz on a 1 kHz server
).await?;
// Option A — "what's the current value?" (lossy, latest wins).
let (_ts, v): (_, f64) = subscription.latest().await?;
println!("latest = {v}");
// Option B — every sample, bounded ring buffer.
let mut stream = Box::pin(subscription.stream::<f64>(256));
while let Some(item) = stream.next().await {
match item {
Ok((_ts, v)) => println!("{v}"),
Err(missed) => eprintln!("{missed}"),
}
}
Ok(())
}
```
### Blocking example (no tokio in `main()`)
```rust
use motorcortex_rust::blocking::Request;
use motorcortex_rust::{ConnectionOptions, parse_url};
fn main() -> motorcortex_rust::Result<()> {
let (req_url, _) = parse_url("wss://127.0.0.1:5568:5567")?;
let opts = ConnectionOptions::new("mcx.cert.crt".into(), 1000, 1000);
let req = Request::connect_to(&req_url, opts)?;
req.request_parameter_tree()?;
req.set_parameter("root/Control/dummyDouble", 2.345)?;
let v: f64 = req.get_parameter("root/Control/dummyDouble")?;
println!("{v}");
Ok(())
}
```
Same method names as the async API, same error type, same semantics —
just drop the `.await`s. The handle owns a hidden current-thread tokio
runtime internally.
### Runnable examples
Four small binaries under [`examples/`](examples/) that map to the
snippets above. Each takes its URL, cert path, credentials, and a
parameter path as CLI args; the defaults point at the in-repo
`tests/mcx.cert.crt` and a local `test_server` on ports 5568/5567,
so `cargo run --example foo` works out of the box once the server
is running (see [Running the tests](#running-the-tests) for how to
build and start it):
```bash
cargo run --example async_request
cargo run --example blocking_request
cargo run --example subscribe_latest
cargo run --example subscribe_stream
# Pointing at a real server:
cargo run --example async_request -- \
wss://mcx.example.com:443:443 /etc/ssl/certs/mcx.cert.crt root secret root/Control/dummyDouble
```
### Observing connection state
```rust
# use motorcortex_rust::core::Request;
# async fn demo() -> motorcortex_rust::Result<()> {
# let req = Request::new();
let mut state = req.state(); // watch::Receiver<ConnectionState>
while state.changed().await.is_ok() {
match *state.borrow() {
motorcortex_rust::ConnectionState::Connected => println!("up"),
motorcortex_rust::ConnectionState::ConnectionLost => println!("flaky"),
motorcortex_rust::ConnectionState::SessionExpired => println!("please re-login"),
motorcortex_rust::ConnectionState::Disconnected => println!("down"),
}
}
# Ok(()) }
```
## Running behind nginx (or any reverse proxy)
Motorcortex's websocket sessions are long-lived. Reverse proxies
default to closing idle websockets after ~60 s, which looks like a
transport drop to the client. To survive that, the driver periodically
refreshes the server-issued session token
(`ConnectionOptions::token_refresh_interval`, default 30 s). When the
pipe does drop, NNG redials and the driver hands the cached token back
via `RestoreSession`, so RPCs resume without a re-login.
For the loop to work end-to-end, nginx needs websocket upgrade
forwarding and a read timeout that's at least as long as the refresh
interval. Minimal config:
```nginx
map $http_upgrade $connection_upgrade {
default upgrade;
'' close;
}
server {
listen 443 ssl http2;
server_name mcx.example.com;
ssl_certificate /etc/ssl/certs/mcx.crt;
ssl_certificate_key /etc/ssl/private/mcx.key;
# REQ socket.
location /mcx_req {
proxy_pass https://127.0.0.1:5568;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection $connection_upgrade;
# At least 2× the refresh interval. 60 s covers the 30 s default
# with headroom; bump both if you raise `token_refresh_interval`.
proxy_read_timeout 60s;
proxy_send_timeout 60s;
}
# SUB socket — same headers, different upstream port.
location /mcx_sub {
proxy_pass https://127.0.0.1:5567;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection $connection_upgrade;
proxy_read_timeout 60s;
proxy_send_timeout 60s;
}
}
```
If the proxy sits between the client and server and *does* drop the
pipe, you'll see `ConnectionState::ConnectionLost` on the state watch;
the driver will then quietly reconnect and transition back to
`Connected` without user code having to do anything. On a server
restart that actually loses session state, the driver publishes
`SessionExpired` instead so your code knows a fresh `login()` is
needed.
## Running the tests
```bash
# Offline — lib tests + integration-test-independent unit tests.
cargo test --lib
cargo test --test unit
# Server-driven. Build the vendored C++ test_server once, then run.
# `--test-threads=1` is mandatory — tests share a single server instance.
cmake -S tests/server -B tests/server/build -DCMAKE_BUILD_TYPE=Release
cmake --build tests/server/build -j
cargo test --test integration -- --test-threads=1
```
See [`tests/README.md`](tests/README.md) for the full walkthrough,
including coverage (`cargo llvm-cov`) and parallelism caveats.
## Architecture
Design doc: [`ARCHITECTURE.md`](ARCHITECTURE.md) — actor-style driver,
three-sink subscription model, blocking façade as a thin `block_on`
layer.
Rollout plan for the remaining phases (reconnect + session tokens,
polish): [`TODO.md`](TODO.md).
## CI runner setup
The GitLab pipeline (`.gitlab-ci.yml`) expects a runner tagged `rust`
with `curl`, `cmake`, a C++20 `g++`, `pkg-config`, `libnng-dev`, and
the Motorcortex-proprietary `mcx-core` package already provisioned
(same stack as the `motorcortex-python` runner).
The Rust toolchain itself is **not** installed on the runner — the
pipeline downloads `rustup` into `$CI_PROJECT_DIR/.rustup` on first run
(minimal profile + `llvm-tools-preview`) and caches it under the
`cargo` cache key alongside `cargo-llvm-cov`. Subsequent runs reuse
the cached toolchain.
## Documentation
Inline rustdoc on every public item:
```bash
cargo doc --open
```
## License
MIT — see [`LICENSE`](LICENSE).