# Architecture
> **Status:** describes the shipped design on the `async-rewrite`
> branch. The rewrite replaced the earlier synchronous prototype
> entirely — there is no legacy type surface left in the crate.
This document describes how the library is structured today. The
goal is a small, honest, idiomatic Rust surface over NNG, with a
feature set comparable to motorcortex-python's but without the
python-isms that don't fit Rust's ownership model.
## Philosophy
1. **Async is the core.** Every user-visible operation that touches
the network is an `async fn`. Blocking callers use a thin façade
that hides a `tokio::runtime::Runtime`. (The reqwest model.)
2. **One coherent core, two thin frontends.** No parallel
`AsyncRequest` / `Request` hierarchies — a single
`Request` type on top of a single driver.
3. **Serialisation in the type system.** `Request` is clonable and
`Send + Sync`, but the underlying NNG socket is mutated one command
at a time by a dedicated driver thread. Callers can't cause concurrent
send/recv races because the public API never hands out the socket.
4. **Pay for what you use.** Callback-style subscriptions add no
channel of their own. Every Subscription carries a single `watch`
that backs `latest()` / `read()`; `stream()` lazily allocates a
`broadcast` on first use. Callbacks and streams are opt-in.
5. **Compile-time protocol, runtime-fallible I/O.** Message types,
hashes, and dtype dispatch are resolved at compile time via
`prost` + the `Hash` trait. Only NNG and decoding errors are
runtime failures — and those live in one enum
(`MotorcortexError`).
6. **Drop is the cleanup story.** No `atexit` hooks, no explicit
`close()` requirement. Dropping the last clone of a `Request` closes
the command channel, which lets the driver exit cleanly, which closes
the socket.
## Layered overview
```
┌──────────────────────────────────────────────────────────┐
│ blocking façade (motorcortex::blocking)                  │
│   owns a current-thread Runtime; each method is          │
│   rt.block_on(inner.method())                            │
└──────────────────────────────────────────────────────────┘
                             │
                             ▼
┌──────────────────────────────────────────────────────────┐
│ public API (async)                                       │
│   Request   Subscribe   Subscription   ConnectionState   │
└──────────────────────────────────────────────────────────┘
                             │ mpsc::UnboundedSender<Cmd>
                             ▼
┌──────────────────────────────────────────────────────────┐
│ driver thread (one dedicated std::thread per handle)     │
│   owns ConnectionManager                                 │
│   blocking_recv on the Cmd channel, calls NNG inline     │
│   publishes ConnectionState + subscription updates       │
│   Request driver also joins a token-refresh helper       │
└──────────────────────────────────────────────────────────┘
                             │
                             ▼
┌──────────────────────────────────────────────────────────┐
│ nng-c-sys (blocking FFI)                                 │
└──────────────────────────────────────────────────────────┘
```
## Public API
### `Request` — request/reply handle
```rust
pub struct Request { /* opaque; Clone + Send + Sync */ }

impl Request {
    pub fn new() -> Self;
    pub async fn connect(&self, url: &str, opts: ConnectionOptions) -> Result<()>;
    pub async fn disconnect(&self) -> Result<()>;
    pub async fn login(&self, user: &str, pass: &str) -> Result<StatusCode>;
    pub async fn logout(&self) -> Result<StatusCode>;
    pub async fn request_parameter_tree(&self) -> Result<StatusCode>;
    pub async fn get_parameter_tree_hash(&self) -> Result<u32>;
    pub async fn get_parameter<V: GetParameterValue + Default>(&self, path: &str) -> Result<V>;
    pub async fn set_parameter<V: SetParameterValue>(&self, path: &str, v: V) -> Result<StatusCode>;
    pub async fn get_parameters<T: GetParameterTuple>(&self, paths: &[&str]) -> Result<T>;
    pub async fn set_parameters<T: SetParameterTuple>(&self, paths: &[&str], v: T) -> Result<StatusCode>;
    pub async fn create_group(&self, paths: impl Parameters, alias: &str, fdiv: u32) -> Result<GroupStatusMsg>;
    pub async fn remove_group(&self, alias: &str) -> Result<StatusCode>;

    // Session tokens: feed the reconnect path, persistable across
    // process restarts.
    pub async fn get_session_token(&self) -> Result<String>;
    pub async fn restore_session(&self, token: &str) -> Result<StatusCode>;
    pub fn session_token(&self) -> Option<String>;
    pub fn session_refresh_count(&self) -> u64;

    pub fn state(&self) -> watch::Receiver<ConnectionState>;
    pub fn parameter_tree(&self) -> Arc<RwLock<ParameterTree>>;
}
```
Cloning a `Request` is cheap: it's an `mpsc::UnboundedSender` plus a
few `watch::Receiver` / `Arc` clones. Multiple tasks can hold their
own handle; commands queue at the single driver, which guarantees
req/rep ordering.
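To make the handle/driver split concrete, here is a std-only sketch (not the crate's code). `std::sync::mpsc` stands in for Tokio's unbounded channel, `mpsc::sync_channel(1)` stands in for `oneshot`, and `Handle`, `Cmd::Increment` and the counter are hypothetical. It shows the three properties the paragraph above relies on: clones share one queue, one thread owns the mutable state without a `Mutex`, and dropping the last clone ends the driver loop.

```rust
use std::sync::mpsc;
use std::thread;

/// Hypothetical command set; the real `Cmd` enum carries many more variants.
enum Cmd {
    Increment { reply: mpsc::SyncSender<u64> },
}

/// Cheap, cloneable handle: nothing but a sender into the driver's queue.
#[derive(Clone)]
pub struct Handle {
    tx: mpsc::Sender<Cmd>,
}

impl Handle {
    /// Spawns the single driver thread and returns the first handle.
    pub fn spawn() -> (Handle, thread::JoinHandle<u64>) {
        let (tx, rx) = mpsc::channel::<Cmd>();
        let driver = thread::spawn(move || {
            // State owned by the driver alone: no Mutex needed.
            let mut counter = 0u64;
            // `recv` fails once every Handle (every Sender) has been dropped.
            while let Ok(cmd) = rx.recv() {
                match cmd {
                    Cmd::Increment { reply } => {
                        counter += 1;
                        // The caller may have given up; ignore a closed reply.
                        let _ = reply.send(counter);
                    }
                }
            }
            // Channel closed: the loop ends and owned resources drop here.
            counter
        });
        (Handle { tx }, driver)
    }

    /// One round-trip: enqueue a command, then wait for its reply.
    pub fn increment(&self) -> Option<u64> {
        let (reply, rx) = mpsc::sync_channel(1); // stand-in for a oneshot
        self.tx.send(Cmd::Increment { reply }).ok()?;
        rx.recv().ok()
    }
}
```

Because `increment` only enqueues and waits, the driver thread is the single point that orders commands, no matter how many clones are in flight.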
### `Subscribe` — publish/subscribe handle
```rust
pub struct Subscribe { /* opaque; Clone + Send + Sync */ }

impl Subscribe {
    pub fn new() -> Self;
    pub async fn connect(&self, url: &str, opts: ConnectionOptions) -> Result<()>;
    pub async fn disconnect(&self) -> Result<()>;
    pub async fn subscribe(
        &self,
        req: &Request,
        paths: impl Parameters,
        alias: &str,
        fdiv: u32,
    ) -> Result<Subscription>;
    pub async fn unsubscribe(&self, req: &Request, sub: Subscription) -> Result<()>;
    /// Re-register every active subscription after a reconnect that
    /// lost server-side group state. Outstanding `Subscription`
    /// clones stay valid: the driver rebinds each handle in place
    /// with its fresh server-assigned id.
    pub async fn resubscribe(&self, req: &Request) -> Result<()>;
    pub fn state(&self) -> watch::Receiver<ConnectionState>;
}
```
`Subscribe::subscribe` takes a `&Request` because it has to issue a
`CreateGroupMsg` RPC before registering the group ID with NNG. The
two handles stay independent — they don't share a driver, only the
one create-group call.
### `Subscription` — per-group handle
```rust
pub struct Subscription { /* opaque; Clone + Send + Sync */ }

impl Subscription {
    pub fn id(&self) -> u32;    // atomic; updated by resubscribe()
    pub fn name(&self) -> &str; // fixed for life of the handle
    pub fn paths(&self) -> Vec<String>;
    pub fn fdiv(&self) -> u32;

    /// Sync read: returns the most recent payload decoded as a
    /// tuple matching the subscription's parameter shape.
    pub fn read<V: GetParameterTuple>(&self) -> Option<(TimeSpec, V)>;

    /// Sync read: same payload, flattened into a `Vec<V>`
    /// (every scalar element of every subscribed parameter).
    pub fn read_all<V: GetParameterValue + Default>(&self) -> Option<(TimeSpec, Vec<V>)>;

    /// Await the most recent payload. Lossy: intermediate samples
    /// between calls are discarded. Resolves immediately once any
    /// payload has arrived.
    pub async fn latest<V: GetParameterTuple>(&self) -> Result<(TimeSpec, V)>;

    /// Every sample, bounded ring buffer. `Err(Missed(n))` if the
    /// consumer falls behind by more than `capacity` samples.
    pub fn stream<V: GetParameterTuple + Send + 'static>(&self, capacity: usize)
        -> impl Stream<Item = StreamResult<V>> + use<V>;

    /// Fire-and-forget callback. Runs on the receive thread; don't block.
    pub fn notify<F: Fn(&Subscription) + Send + Sync + 'static>(&self, f: F);
}

pub type StreamResult<V> = Result<(TimeSpec, V), Missed>;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Missed(pub u64);
```
Three sinks; `notify` and `stream` are opt-in, while the `watch` is
always present:
- **`notify`**: no allocation, no channel, runs inline on the
receive thread.
- **`latest`** (and `read` / `read_all`): one `watch::Sender`,
present on every Subscription and overwritten on each update.
- **`stream`**: one lazily created `broadcast::Sender` with
explicit capacity.
Every `update()` pushes to all active sinks for that subscription,
so `notify` + `stream` + `latest` can coexist on the same
Subscription. See *How `stream()` works* under Internal design for
the full flow.
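The opt-in shape of the sinks can be modelled synchronously with `std` alone. In this hypothetical `Sinks` type a `Mutex<Option<Vec<u8>>>` plays the always-present `watch`, a lazily created `VecDeque` plays the broadcast ring (one shared queue here, not Tokio's per-reader cursors), and an optional `Arc<dyn Fn>` plays `notify`. It is a sketch of the fan-out idea, not the crate's implementation; note that the callback `Arc` is cloned out before it runs, so user code never executes under the lock.

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};

/// Hypothetical callback type, mirroring `notify()`'s shared closure.
type Callback = Arc<dyn Fn(&[u8]) + Send + Sync>;

/// Hypothetical synchronous model of the three sinks.
#[derive(Default)]
pub struct Sinks {
    latest: Mutex<Option<Vec<u8>>>,                  // plays the always-present watch
    ring: Mutex<Option<(usize, VecDeque<Vec<u8>>)>>, // plays the lazy broadcast ring
    callback: Mutex<Option<Callback>>,               // plays notify()
}

impl Sinks {
    pub fn notify(&self, f: impl Fn(&[u8]) + Send + Sync + 'static) {
        *self.callback.lock().unwrap() = Some(Arc::new(f));
    }

    /// The first call allocates the ring; later calls keep the first capacity.
    pub fn enable_ring(&self, capacity: usize) {
        assert!(capacity > 0, "capacity must be non-zero");
        let mut ring = self.ring.lock().unwrap();
        if ring.is_none() {
            *ring = Some((capacity, VecDeque::with_capacity(capacity)));
        }
    }

    /// Fan one payload out to every active sink.
    pub fn update(&self, buffer: Vec<u8>) {
        *self.latest.lock().unwrap() = Some(buffer.clone());
        if let Some((capacity, queue)) = self.ring.lock().unwrap().as_mut() {
            if queue.len() == *capacity {
                queue.pop_front(); // full: overwrite the oldest sample
            }
            queue.push_back(buffer.clone());
        }
        // Clone the Arc out so the lock is released before user code runs.
        let cb = self.callback.lock().unwrap().clone();
        if let Some(cb) = cb {
            cb(&buffer);
        }
    }

    pub fn latest(&self) -> Option<Vec<u8>> {
        self.latest.lock().unwrap().clone()
    }

    /// Take everything currently buffered in the ring (empty if never enabled).
    pub fn drain_ring(&self) -> Vec<Vec<u8>> {
        match self.ring.lock().unwrap().as_mut() {
            Some((_, queue)) => queue.drain(..).collect(),
            None => Vec::new(),
        }
    }
}
```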
### `ConnectionState`
```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConnectionState {
    /// Initial state + after a clean `disconnect()`.
    Disconnected,
    /// Happy path: socket is up, session is restored (if previously set).
    Connected,
    /// NNG pipe event fired REM_POST. NNG's dialer is retrying the
    /// transport in the background; we'll transition back to
    /// `Connected` (or to `SessionExpired`) once the pipe comes back.
    ConnectionLost,
    /// The socket came back, but `RestoreSession(token)` failed.
    /// Caller action required: re-login with fresh credentials or
    /// give up. We do *not* silently re-login with cached creds.
    SessionExpired,
}
```
Exposed via `Request::state()` and `Subscribe::state()` as
`watch::Receiver<ConnectionState>`. Consumers can
`state.changed().await` or poll `*state.borrow()`.
The NNG dialer's own redial loop is invisible at this level — we
don't emit `Reconnecting` for it. Only session-layer transitions
show up.
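For readers less familiar with `tokio::sync::watch`, the semantics relied on here (one slot, latest value wins, `changed()` wakes on any newer version) can be sketched with a `Mutex` and a `Condvar`. `StateCell` and its methods are hypothetical stand-ins rather than the crate's API, and they block a thread where the real receiver awaits.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::time::Duration;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConnectionState {
    Disconnected,
    Connected,
    ConnectionLost,
    SessionExpired,
}

/// Hypothetical std-only analogue of a `watch` channel: one slot plus a
/// version counter. Readers only ever see the latest value.
pub struct StateCell {
    slot: Mutex<(u64, ConnectionState)>, // (version, value)
    cond: Condvar,
}

impl StateCell {
    pub fn new(initial: ConnectionState) -> Arc<Self> {
        Arc::new(StateCell { slot: Mutex::new((0, initial)), cond: Condvar::new() })
    }

    /// Like `watch::Sender::send_replace`: overwrite, bump the version, wake waiters.
    pub fn publish(&self, state: ConnectionState) {
        let mut slot = self.slot.lock().unwrap();
        let version = slot.0 + 1;
        *slot = (version, state);
        self.cond.notify_all();
    }

    /// Like `*rx.borrow()`: read the current value without waiting.
    pub fn borrow(&self) -> ConnectionState {
        self.slot.lock().unwrap().1
    }

    /// Like `rx.changed()`, but blocking: wait until the version moves past
    /// `seen`, then return (version, value). Intermediate values are skipped.
    pub fn changed(&self, seen: u64, timeout: Duration) -> Option<(u64, ConnectionState)> {
        let guard = self.slot.lock().unwrap();
        let (guard, result) = self
            .cond
            .wait_timeout_while(guard, timeout, |slot| slot.0 <= seen)
            .unwrap();
        if result.timed_out() { None } else { Some(*guard) }
    }
}
```

A reader that was away while two transitions happened sees only the second one, which is the lossy behaviour the state API wants: consumers care about where the connection is now, not about every hop.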
### Blocking façade
```rust
pub mod blocking {
    pub struct Request { /* owns hidden current-thread runtime */ }

    impl Request {
        pub fn new() -> Result<Self>;
        pub fn connect(&self, url: &str, opts: ConnectionOptions) -> Result<()>;
        // every async method mirrored with .await dropped
    }

    pub struct Subscribe { /* ... */ }
    pub struct Subscription { /* ... */ }
}
```
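Each method is literally `self.rt.block_on(self.inner.foo(args))`.
Lifecycle: one hidden current-thread `tokio::runtime::Runtime` per
handle; dropping the handle shuts the runtime down. Because
`Runtime::block_on` panics when called from within an asynchronous
execution context, the façade is meant for code that is not already
running on a Tokio runtime.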
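The façade pattern can be sketched without Tokio by swapping in a minimal hand-written `block_on` that parks the calling thread until the future's waker unparks it. `AsyncRequest`, `BlockingRequest`, `Reply` and the hash value are hypothetical; a spawned thread plays the driver and fills the reply slot, much as the real driver answers a `oneshot`. The crate's façade uses a current-thread Tokio runtime instead of this executor.

```rust
use std::future::Future;
use std::pin::{pin, Pin};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Waker that unparks the thread blocked in `block_on`.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Minimal single-future executor standing in for `Runtime::block_on`.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker: Waker = Arc::new(ThreadWaker(thread::current())).into();
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(), // spurious wake-ups just re-poll
        }
    }
}

/// Hypothetical reply slot: (value, waker of the task waiting for it).
type Slot = Arc<Mutex<(Option<u32>, Option<Waker>)>>;

/// Future that resolves once the "driver" fills the slot, like a oneshot.
struct Reply {
    slot: Slot,
}

impl Future for Reply {
    type Output = u32;
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<u32> {
        let mut slot = self.slot.lock().unwrap();
        match slot.0.take() {
            Some(value) => Poll::Ready(value),
            None => {
                slot.1 = Some(cx.waker().clone()); // driver wakes us later
                Poll::Pending
            }
        }
    }
}

/// Hypothetical async handle; the spawned thread plays the driver.
pub struct AsyncRequest;

impl AsyncRequest {
    pub async fn get_parameter_tree_hash(&self) -> u32 {
        let slot: Slot = Arc::new(Mutex::new((None, None)));
        let driver_slot = Arc::clone(&slot);
        thread::spawn(move || {
            let mut guard = driver_slot.lock().unwrap();
            guard.0 = Some(0xC0FFEE); // placeholder answer, no real I/O
            if let Some(waker) = guard.1.take() {
                waker.wake();
            }
        });
        Reply { slot }.await
    }
}

/// Blocking façade: each method is `block_on(inner.method())`.
pub struct BlockingRequest {
    inner: AsyncRequest,
}

impl BlockingRequest {
    pub fn new() -> Self {
        BlockingRequest { inner: AsyncRequest }
    }

    pub fn get_parameter_tree_hash(&self) -> u32 {
        block_on(self.inner.get_parameter_tree_hash())
    }
}
```

The façade adds no logic of its own: it exists only to drive the async method to completion on the caller's thread, which is why mirroring every method is mechanical.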
### Free helpers
```rust
/// Split "wss://host:req_port:sub_port" into (req_url, sub_url).
pub fn parse_url(s: &str) -> Result<(String, String)>;

/// Convenience constructor: `new()` + `connect()` in one.
impl Request {
    pub async fn connect_to(url: &str, opts: ConnectionOptions) -> Result<Self>;
}

impl Subscribe {
    pub async fn connect_to(url: &str, opts: ConnectionOptions) -> Result<Self>;
}
```
No `Session` type. If three consumers end up writing the same
preamble, add one. Until then, explicit construction beats opaque
sugar.
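A minimal sketch of the URL split, assuming the `scheme://host:req_port:sub_port` format from the doc comment above. It splits from the right, so a host containing colons (e.g. a bracketed IPv6 literal) still works, and validates both ports as `u16`. Errors are plain `String`s to keep it self-contained (the crate returns `MotorcortexError`); whether the real helper fills in default ports is not covered here.

```rust
/// Hypothetical sketch of `parse_url`: split "scheme://host:req_port:sub_port"
/// into a request URL and a subscribe URL. Errors are plain Strings here.
pub fn parse_url(s: &str) -> Result<(String, String), String> {
    // rsplitn walks from the right: sub_port, then req_port, then "scheme://host".
    let mut parts = s.rsplitn(3, ':');
    let (sub_port, req_port, base) = match (parts.next(), parts.next(), parts.next()) {
        (Some(sub), Some(req), Some(base)) => (sub, req, base),
        _ => return Err(format!("expected scheme://host:req_port:sub_port, got {s:?}")),
    };
    // Validate ports as u16 so typos fail here rather than at dial time.
    let req: u16 = req_port
        .parse()
        .map_err(|_| format!("bad request port {req_port:?} in {s:?}"))?;
    let sub: u16 = sub_port
        .parse()
        .map_err(|_| format!("bad subscribe port {sub_port:?} in {s:?}"))?;
    if !base.contains("://") {
        return Err(format!("missing scheme in {s:?}"));
    }
    Ok((format!("{base}:{req}"), format!("{base}:{sub}")))
}
```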
## Internal design
### The driver
One driver thread per `Request` and one per `Subscribe`, each a
dedicated `std::thread` rather than a tokio task. The driver owns
the `ConnectionManager` (sole owner, no `Mutex`), reads `Cmd` /
`SubCmd` values from a `tokio::sync::mpsc::UnboundedReceiver` via
`blocking_recv`, and calls NNG inline:
```rust
pub(crate) enum Cmd {
    Connect { url: String, opts: ConnectionOptions, reply: oneshot::Sender<Result<()>> },
    Disconnect { reply: oneshot::Sender<Result<()>> },
    Login { user: String, pass: String, reply: oneshot::Sender<Result<StatusCode>> },
    Logout { reply: oneshot::Sender<Result<StatusCode>> },
    RequestParameterTree { reply: oneshot::Sender<Result<StatusCode>> },
    GetParameter { path: String, reply: oneshot::Sender<Result<Vec<u8>>> },
    SetParameter { path: String, value: Vec<u8>, reply: oneshot::Sender<Result<StatusCode>> },
    GetParameters { msg: GetParameterListMsg, reply: oneshot::Sender<Result<ParameterListMsg>> },
    SetParameters { msg: SetParameterListMsg, reply: oneshot::Sender<Result<StatusCode>> },
    CreateGroup { msg: CreateGroupMsg, reply: oneshot::Sender<Result<GroupStatusMsg>> },
    RemoveGroup { alias: String, reply: oneshot::Sender<Result<StatusCode>> },
    GetParameterTreeHash { reply: oneshot::Sender<Result<u32>> },
    GetSessionToken { reply: oneshot::Sender<Result<String>> },
    RestoreSession { token: String, reply: oneshot::Sender<Result<StatusCode>> },
    /// Background refresh tick: fired by the helper thread.
    RefreshTokenTick,
    /// Pipe-notify event, forwarded from an NNG callback.
    Pipe(PipeEvent),
}

fn run_request_driver(
    self_tx: mpsc::UnboundedSender<Cmd>,
    mut rx: mpsc::UnboundedReceiver<Cmd>,
    state_tx: watch::Sender<ConnectionState>,
    tree: Arc<RwLock<ParameterTree>>,
    last_token: Arc<RwLock<Option<String>>>,
    refresh_count: Arc<AtomicU64>,
) {
    let mut conn = ConnectionManager::new();
    let mut user_wants_connected = false;
    let mut pending_reconnect = false;
    let mut reconnect_enabled = true;
    let mut max_reconnect_attempts: Option<u32> = None;
    let mut consecutive_restore_failures: u32 = 0;
    let mut refresh_thread: Option<thread::JoinHandle<()>> = None;

    while let Some(cmd) = rx.blocking_recv() {
        match cmd {
            Cmd::Connect { url, opts, reply } => {
                let result = conn.connect(&url, opts, nng_c_sys::nng_req0_open, on_pipe.clone());
                // …state transitions, spawn token-refresh helper…
                let _ = reply.send(result);
            }
            Cmd::Pipe(event) => apply_pipe_event_request(event, /* … */),
            Cmd::RefreshTokenTick if *state_tx.borrow() == ConnectionState::Connected => {
                let _ = do_get_session_token(&conn, &last_token);
                refresh_count.fetch_add(1, Ordering::Relaxed);
            }
            /* …other handlers… */
            _ => {}
        }
    }
    // mpsc closed → exit; ConnectionManager::drop closes the socket.
}
```
Why a dedicated `std::thread` instead of a tokio task with
`spawn_blocking` per-RPC:
- **NNG calls are synchronous** and each blocks for the duration of
one round-trip. A task that `spawn_blocking`s per-command would
churn the blocking pool; a single owner thread is simpler.
- **Thread count stays bounded** — `Request` spawns 1 driver +
1 refresh helper; `Subscribe` spawns 1 driver + 1 receive thread.
No growth with clones, no growth with in-flight RPCs.
- **No tokio runtime required on the driver side.** Callers
`.await` the `oneshot::Receiver` from any runtime context (or
block on it via the blocking façade).
### Reconnect + session tokens
The NNG dialer is configured with `RECONNMINT` / `RECONNMAXT` so
the transport redials itself on drop. The driver layers session
restore on top:
- **Pipe callback** → forwards ADD_POST / REM_POST as `Cmd::Pipe`
/ `SubCmd::Pipe` through the mpsc channel. No work happens in
the callback itself — it runs on an NNG thread and must not
block.
- **Token refresh** — a helper `std::thread` wakes every
`ConnectionOptions::token_refresh_interval` (default 30 s) and
fires `Cmd::RefreshTokenTick`. The tick handler gates on
`state_tx.borrow() == Connected` so no RPCs land on a dead
pipe.
- **Pipe state machine** — `apply_pipe_event_request` factors the
decision out into a pure `decide_state_after_pipe_event` helper
(unit-tested in isolation). It handles:
  - REM_POST → `ConnectionLost` (if reconnect is enabled) or
    `Disconnected` (if not).
  - ADD_POST with a pending reconnect → try
    `RestoreSession(cached_token)`. On `Ok` / `ReadOnlyMode` →
    `Connected` and the failure counter resets. On any other status →
    `SessionExpired` and the counter increments; once it reaches
    `max_reconnect_attempts`, the driver disables the dialer and
    publishes `Disconnected`.
  - ADD_POST with no token cached → treated as a bare reconnect,
    `Connected`.
- **Subscribe side** has no restore logic of its own. When the
Request side successfully restores, callers can invoke
`Subscribe::resubscribe(&req)` to re-create every group against
the new server state. The driver rebinds each `Subscription`
in place (see "Subscription rebind" below).
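The pipe-event decision list lends itself to a pure function. The sketch below is hypothetical: the crate's `decide_state_after_pipe_event` has its own inputs. Here the outcome of `RestoreSession` is passed in pre-classified, so the function does no I/O and each case of the list becomes a one-line test. Two details are assumptions: an ADD_POST with no pending reconnect maps to `Connected`, and the cap triggers when the failure counter reaches `max_attempts`.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConnectionState { Disconnected, Connected, ConnectionLost, SessionExpired }

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PipeEvent { AddPost, RemPost }

/// Hypothetical, pre-classified result of `RestoreSession(cached_token)`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RestoreOutcome { Restored, ReadOnly, Rejected, NoToken }

#[derive(Debug, Clone, Copy)]
pub struct ReconnectPolicy {
    pub enabled: bool,
    pub max_attempts: Option<u32>,
}

/// New state, updated failure counter, and whether to stop the dialer.
#[derive(Debug, PartialEq, Eq)]
pub struct Decision {
    pub state: ConnectionState,
    pub failures: u32,
    pub disable_dialer: bool,
}

/// Pure: no I/O and no locks, so every case is trivially unit-testable.
pub fn decide_state_after_pipe_event(
    event: PipeEvent,
    pending_reconnect: bool,
    restore: RestoreOutcome,
    failures: u32,
    policy: ReconnectPolicy,
) -> Decision {
    use ConnectionState::*;
    let keep = |state| Decision { state, failures, disable_dialer: false };
    match (event, pending_reconnect, restore) {
        // Pipe dropped: NNG keeps redialling only when reconnect is enabled.
        (PipeEvent::RemPost, _, _) if policy.enabled => keep(ConnectionLost),
        (PipeEvent::RemPost, _, _) => keep(Disconnected),
        // Fresh pipe with nothing to restore (assumed for the non-pending case).
        (PipeEvent::AddPost, false, _) | (PipeEvent::AddPost, true, RestoreOutcome::NoToken) => {
            keep(Connected)
        }
        // Session restored (read-only counts as success): reset the counter.
        (PipeEvent::AddPost, true, RestoreOutcome::Restored | RestoreOutcome::ReadOnly) => {
            Decision { state: Connected, failures: 0, disable_dialer: false }
        }
        // Restore rejected: count it, and give up once the cap is reached.
        (PipeEvent::AddPost, true, RestoreOutcome::Rejected) => {
            let failures = failures + 1;
            let capped = policy.max_attempts.is_some_and(|max| failures >= max);
            Decision {
                state: if capped { Disconnected } else { SessionExpired },
                failures,
                disable_dialer: capped,
            }
        }
    }
}
```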
### The subscribe receive loop
A second dedicated `std::thread` (not a tokio task) runs alongside
the Subscribe driver. It blocks on `nng_recv`, parses the 3-byte
group-id prefix, looks up the Subscription in the shared table, and
calls `Subscription::update(buffer)`:
```rust
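use std::collections::HashMap;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, RwLock};
use std::time::Duration;

fn run_subscribe_receive(
    sock: nng_socket,
    subscriptions: Arc<RwLock<HashMap<u32, Subscription>>>,
    stop: Arc<AtomicBool>,
) {
    loop {
        if stop.load(Ordering::Relaxed) {
            break;
        }
        match receive_message(&sock) {
            Ok(buffer) => {
                let id = parse_group_id(&buffer);
                // Clone the handle out in its own `let` so the read guard
                // drops here, before `update()` runs user callbacks; a guard
                // held through `if let` would keep `resubscribe`'s write
                // lock waiting on slow callbacks.
                let sub = subscriptions.read().unwrap().get(&id).cloned();
                if let Some(sub) = sub {
                    sub.update(buffer);
                }
            }
            Err(_) => {
                // Receive error or timeout: re-check the stop flag, back off briefly.
                if stop.load(Ordering::Relaxed) {
                    break;
                }
                std::thread::sleep(Duration::from_millis(50));
            }
        }
    }
}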
```
Two threads per Subscribe (driver + receive), same shape as
motorcortex-python. `spawn_blocking` is avoided on the receive hot
path — a persistent blocking recv would monopolise a pool thread
anyway.
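The group-id parse and the lock scoping can be sketched on their own. The 3-byte prefix is decoded here as little-endian, which is an assumption about the wire format, and `route` is a hypothetical helper that clones the handle out of the table under a short read lock and runs the update only after the guard is gone. The test exercises exactly that: the update closure takes the write lock, which would block forever if the read guard were still alive.

```rust
use std::collections::HashMap;
use std::sync::RwLock;

/// Hypothetical: decode the 3-byte group-id prefix, assuming little-endian
/// byte order. Returns None if the buffer is shorter than the prefix.
pub fn parse_group_id(buffer: &[u8]) -> Option<u32> {
    match buffer {
        [b0, b1, b2, ..] => Some(u32::from_le_bytes([*b0, *b1, *b2, 0])),
        _ => None,
    }
}

/// Hypothetical routing step: clone the handle out under a short read lock,
/// then run `update` after the guard has been dropped.
pub fn route<S: Clone>(
    table: &RwLock<HashMap<u32, S>>,
    buffer: &[u8],
    update: impl FnOnce(S, &[u8]),
) -> bool {
    let Some(id) = parse_group_id(buffer) else {
        return false;
    };
    // The read guard is a temporary of this `let`, so it drops at the `;`.
    let sub = table.read().unwrap().get(&id).cloned();
    match sub {
        Some(sub) => {
            update(sub, buffer);
            true
        }
        None => false,
    }
}
```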
### `Subscription::update` — the fan-out point
`update()` forwards one received payload into three independent
sinks, each optional. Consumers pay only for the sinks they use:
```rust
pub(crate) fn update(&self, buffer: Vec<u8>) {
    // Watch: always present. `send_replace` stores unconditionally,
    // unlike `send`, which returns an error without storing the value
    // when no receivers are active; most subs don't keep a
    // watch::Receiver alive.
    self.inner.buffer.send_replace(buffer.clone());

    // Broadcast: created lazily by the first `stream()` call.
    if let Some(tx) = self.inner.broadcast.lock().unwrap().as_ref() {
        let _ = tx.send(buffer);
    }

    // Callback: set by `notify()`. Runs inline on the receive thread.
    let cb = self.inner.callback.read().unwrap().clone();
    if let Some(cb) = cb {
        cb(self);
    }
}
```
`SubscriptionInner` layout:
```rust
struct SubscriptionInner {
    /// Server-assigned id. Atomic so the subscribe receive loop can
    /// read it without taking the layout lock; `rebind()` swaps it
    /// in place when `Subscribe::resubscribe` gives us a new one.
    id: AtomicU32,
    alias: String, // fixed for the life of the handle
    fdiv: u32,     // frequency divider; resubscribe replays it
    /// Parameter layout: may change on resubscribe if the server
    /// reshuffled offsets / ids. Decoders take the read lock; the
    /// driver takes the write lock while swapping.
    layout: RwLock<GroupLayout>,                          // description + data_types
    buffer: watch::Sender<Vec<u8>>,                       // latest() / read() / read_all()
    broadcast: Mutex<Option<broadcast::Sender<Vec<u8>>>>, // stream()
    callback: RwLock<Option<Callback>>,                   // notify()
}
```
### Subscription rebind
`Subscribe::resubscribe(&req)` does:
1. Snapshot the shared subscriptions map (one `RwLock::read` on the
`Arc<RwLock<HashMap<u32, Subscription>>>` shared with the
driver), clone the `Subscription` handles out.
2. For each, `req.create_group(paths, alias, fdiv).await` — gives
the server a fresh group descriptor.
3. Hand the `(old_id, new_group_msg)` pairs to the driver via
`SubCmd::ApplyResubscribe`. The driver holds the map write lock
for the whole batch, unsubscribes each old NNG filter, subscribes
the new one, calls `Subscription::rebind(new_group)`, and rekeys
the map.
Outstanding `Subscription` clones the caller is holding stay valid
across the rebind because the Arc'd inner is shared; they observe
the new id/layout the moment the write lock releases.
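Why outstanding clones survive a rebind can be shown with a trimmed-down, hypothetical `SubscriptionInner` holding only an `AtomicU32` id and an `RwLock`ed layout (a `Vec<String>` standing in for `GroupLayout`). `apply_resubscribe` is a sketch of step 3, not the crate's code: it holds the map write lock for the whole batch, takes every old entry out before reinserting any (so a new id that collides with a not-yet-processed old id cannot clobber it), rebinds each handle in place, then rekeys the map. The NNG filter swap is left as a comment.

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::{Arc, RwLock};

/// Hypothetical, trimmed-down `SubscriptionInner`: only what rebind touches.
struct Inner {
    id: AtomicU32,
    layout: RwLock<Vec<String>>, // stand-in for GroupLayout
}

/// Every clone points at the same `Inner`.
#[derive(Clone)]
pub struct Subscription {
    inner: Arc<Inner>,
}

impl Subscription {
    pub fn new(id: u32, layout: Vec<String>) -> Self {
        let inner = Inner { id: AtomicU32::new(id), layout: RwLock::new(layout) };
        Subscription { inner: Arc::new(inner) }
    }

    pub fn id(&self) -> u32 {
        self.inner.id.load(Ordering::Acquire)
    }

    pub fn layout(&self) -> Vec<String> {
        self.inner.layout.read().unwrap().clone()
    }

    /// Swap the layout first, then publish the new id.
    fn rebind(&self, new_id: u32, new_layout: Vec<String>) {
        *self.inner.layout.write().unwrap() = new_layout;
        self.inner.id.store(new_id, Ordering::Release);
    }
}

/// `batch` holds (old_id, new_id, new_layout) triples.
pub fn apply_resubscribe(
    map: &RwLock<HashMap<u32, Subscription>>,
    batch: Vec<(u32, u32, Vec<String>)>,
) {
    let mut map = map.write().unwrap();
    // Take every old entry out before reinserting any.
    let rebound: Vec<Subscription> = batch
        .into_iter()
        .filter_map(|(old_id, new_id, new_layout)| {
            let sub = map.remove(&old_id)?;
            // The NNG filter swap (unsubscribe old, subscribe new) would go here.
            sub.rebind(new_id, new_layout);
            Some(sub)
        })
        .collect();
    for sub in rebound {
        map.insert(sub.id(), sub);
    }
}
```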
### How `stream()` works
`Subscription::stream<V>(capacity)` returns an `impl Stream<Item =
StreamResult<V>>` — a lossy-on-lag, broadcast-backed channel.
```rust
pub fn stream<V>(&self, capacity: usize) -> impl Stream<Item = StreamResult<V>> + use<V>
where
    V: GetParameterTuple + Send + 'static,
{
    let sender = self.ensure_broadcast(capacity); // create or reuse
    let rx = sender.subscribe();                  // fresh receiver
    let inner = Arc::clone(&self.inner);          // decode context
    unfold(rx, move |mut rx| {
        let inner = Arc::clone(&inner);
        async move {
            loop {
                match rx.recv().await {
                    Ok(buffer) => match decode_tuple::<V>(&inner, &buffer) {
                        Some(decoded) => return Some((Ok(decoded), rx)),
                        None => continue, // unsupported protocol → skip
                    },
                    Err(RecvError::Lagged(n)) => return Some((Err(Missed(n)), rx)),
                    Err(RecvError::Closed) => return None,
                }
            }
        }
    })
}
```
Lifecycle:
1. **First call** to `stream()` takes the `broadcast` `Mutex`,
allocates the ring with the requested `capacity` (Tokio rounds it up
to the next power of two and panics on `0`), stores the `Sender`, and
returns a `Sender::clone`. All subsequent calls reuse the same
channel; `capacity` is honoured only on the first.
2. **`sender.subscribe()`** — each `stream()` call hands the caller
their own `broadcast::Receiver` with its own read cursor. N
consumers → N receivers, each seeing every sample, no fan-out
cost beyond the `Receiver` struct.
3. **`Arc::clone(&self.inner)`** — the Stream needs `description`
and `data_types` to decode, so we capture an Arc clone. Together
with the `+ use<V>` precise-capturing bound, this keeps the
returned Stream free of any `&self` lifetime — callers can hold
it past the original Subscription borrow.
4. **`unfold`** — each `.next().await` runs the inner closure. The
`rx` is the unfold state; it's passed back into the closure each
round so the receiver's cursor persists across polls.
Four outcomes per iteration:
- `Ok(buffer)` → decode → `Some((Ok((ts, v)), rx))`. Stream yields.
- `Ok(buffer)` → decode returns `None` (unsupported wire protocol)
→ **skip silently** via `continue`, keep the stream alive. Only a
`Closed` channel ends the stream.
- `Err(Lagged(n))` → `Some((Err(Missed(n)), rx))`. User sees explicit
back-pressure — the ring overwrote `n` samples before this
receiver got back to them.
- `Err(Closed)` → `None`. Sender has been dropped, i.e. every
Subscription clone is gone. End of stream.
**Back-pressure behaviour.** With `capacity = 4`:
```
publish A B C D    ring = [A B C D], reader cursor at A
publish E          ring = [B C D E], A is gone
publish F G        ring = [D E F G]
reader polls     → next() surfaces Err(Missed(3)): the cursor jumps
                   over A, B, C to the oldest retained sample
reader continues → D, E, F, G
```
Tokio's broadcast counts the skip for us; we just re-wrap the
`RecvError::Lagged(n)` as `Missed(n)` — matching the vocabulary
used in our public API.
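The capacity-4 walkthrough can be reproduced with a small single-threaded model of a broadcast ring: a bounded buffer of sequence-numbered values plus independent reader cursors. `Ring` and `Cursor` are hypothetical and this is not Tokio's implementation, but `recv` follows the same rule Tokio documents for a lagging `broadcast::Receiver`: report the gap as an error first, then resume at the oldest value still held.

```rust
use std::collections::VecDeque;

/// Same shape as the public `Missed` type.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Missed(pub u64);

/// Hypothetical single-threaded model of a broadcast ring: a bounded buffer
/// of sequence-numbered values that every reader shares.
pub struct Ring<T> {
    capacity: usize,
    next_seq: u64,             // sequence number the next publish will get
    slots: VecDeque<(u64, T)>, // oldest value at the front
}

/// Each reader keeps its own position; readers never affect each other.
pub struct Cursor {
    next: u64,
}

impl<T: Clone> Ring<T> {
    pub fn new(capacity: usize) -> Self {
        assert!(capacity > 0, "capacity must be non-zero");
        Ring { capacity, next_seq: 0, slots: VecDeque::with_capacity(capacity) }
    }

    /// Once the ring is full, each publish overwrites the oldest value.
    pub fn publish(&mut self, value: T) {
        if self.slots.len() == self.capacity {
            self.slots.pop_front();
        }
        self.slots.push_back((self.next_seq, value));
        self.next_seq += 1;
    }

    /// A new reader only sees values published after it subscribed.
    pub fn subscribe(&self) -> Cursor {
        Cursor { next: self.next_seq }
    }

    /// Report a gap first, then resume at the oldest value still held;
    /// `None` means nothing new has been published.
    pub fn recv(&self, cursor: &mut Cursor) -> Option<Result<T, Missed>> {
        let oldest = self.slots.front()?.0;
        if cursor.next < oldest {
            let missed = oldest - cursor.next;
            cursor.next = oldest;
            return Some(Err(Missed(missed)));
        }
        let (_, value) = self.slots.get((cursor.next - oldest) as usize)?;
        cursor.next += 1;
        Some(Ok(value.clone()))
    }
}
```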
### Why this shape
- **Zero cost unused.** A Subscription that uses only `notify` never
allocates the broadcast ring. The `Mutex<Option<…>>` check costs one
uncontended lock/unlock per `update()`.
- **N consumers, one channel.** Multiple `stream()` callers share
one `Sender`; each holds their own cursor. Extra memory per
consumer ≈ one `Receiver` struct.
- **Explicit back-pressure.** `Missed(n)` makes lag visible to the
consumer; the library never silently grows memory to "catch up".
- **No blocking threads on the consumer side.** On the data path the
receive loop is the only blocking thread; the broadcast channel is
pure-async on every reader.
### Parameter tree cache
`Request` holds an `Arc<RwLock<ParameterTree>>` at the handle level,
and the driver holds a clone of the same `Arc`, so there is only one
tree. `request_parameter_tree()` has the driver replace its contents
under the write lock; `set_parameter` / `get_parameter` read the
dtype through the read lock with no channel round-trip.
This means every handle clone sees the same tree, and a lookup costs
at most an `Arc::clone` plus an uncontended `RwLock::read`.
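A minimal sketch of the shared-tree arrangement, with a hypothetical `ParameterTree` reduced to a path-to-dtype map and made-up dtype names. Handle clones and the driver hold the same `Arc`; the driver replaces the contents under the write lock, and any clone sees the change on its next read without a message being sent.

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

/// Hypothetical stand-in for `ParameterTree`: path -> dtype name.
#[derive(Default)]
pub struct ParameterTree {
    dtypes: HashMap<String, &'static str>,
}

impl ParameterTree {
    pub fn dtype(&self, path: &str) -> Option<&'static str> {
        self.dtypes.get(path).copied()
    }
}

/// Handle side: holds a clone of the same Arc the driver writes through.
#[derive(Clone)]
pub struct Handle {
    tree: Arc<RwLock<ParameterTree>>,
}

impl Handle {
    /// No channel round-trip: a brief read lock on the shared tree.
    pub fn dtype_of(&self, path: &str) -> Option<&'static str> {
        self.tree.read().unwrap().dtype(path)
    }
}

/// Driver side of a tree refresh: replace the contents in place,
/// so every holder of the Arc sees the new tree on its next read.
fn driver_refresh(tree: &RwLock<ParameterTree>, fresh: ParameterTree) {
    *tree.write().unwrap() = fresh;
}
```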
## Key decisions
### Why a dedicated `std::thread`, not `spawn_blocking` or `nng_aio`
NNG exposes both a blocking C API (`nng_send` / `nng_recv`) and an
async one (`nng_aio_*`). Three options were on the table:
1. **Tokio task + `spawn_blocking` per RPC** — simplest on paper
but churns the blocking pool (one pool thread per in-flight
command) and leaks tokio runtime requirements into the driver's
control flow.
2. **Dedicated `std::thread` with blocking NNG calls (chosen).**
One owner thread per handle; `mpsc::UnboundedReceiver::blocking_recv`
serialises commands; NNG is called inline. Thread count is
bounded and predictable.
3. **`nng_aio` bridge to Rust wakers.** Purest on paper but
bridging NNG's callback model to waker lifetimes without UB is
finicky. Motorcortex workloads (kHz-rate telemetry, handful of
concurrent RPCs) don't need the throughput — the driver is
fast enough blocking.
Revisit (3) only if someone measures blocking-call latency as a
bottleneck.
### Why an actor pattern, not `Arc<Mutex<Request>>`
Both enforce serialisation, but:
- `Arc<Mutex<Request>>` puts the serialisation primitive in the user's
API. Every user code snippet starts with `let req = Arc::new(Mutex::new(…))`.
- Actor pattern moves serialisation into the driver, invisible to
users. `Request` becomes a clonable handle. The type system says
"this is shareable"; the driver enforces "only one command at a
time".
The actor pattern also enables `ConnectionState` publishing and
background work (timers, reconnect) without touching the user-facing
types.
### Why no `Session`
Python's `Session` exists because:
1. Context manager (`with`) — Rust has `Drop`.
2. Token refresh timer — belongs in the driver.
3. Connect-everything sugar — a 3-line free function covers this.
See [NOTES.md](NOTES.md) for the full reasoning.
### Why keep compile-time protobuf
Python loads `.proto` files at runtime (`MessageTypes.load(...)`).
This is flexible but pays for it: every send/recv does a dict lookup
to find the serializer, hash computation is runtime, mismatches
surface as runtime exceptions.
Rust uses `prost` + a compile-time `Hash` trait. Faster, safer, and
smaller — and any new message type lands via the `.proto` → `build.rs`
pipeline that's already in place. Runtime namespace loading is not a
use case worth supporting.
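The idea behind the compile-time `Hash` trait can be sketched with an associated constant per message type, so the hash is fixed when the generic is monomorphised rather than looked up in a runtime registry. The trait shape, the stand-in message types, the hash values, the 4-byte prefix and its little-endian byte order are all assumptions for illustration.

```rust
/// Hypothetical shape of a compile-time message hash: each message type
/// carries its wire hash as an associated constant.
pub trait Hash {
    const HASH: u32;
}

// Stand-ins for prost-generated message types; the hash values are made up.
pub struct LoginMsg;
pub struct GetParameterMsg;

impl Hash for LoginMsg {
    const HASH: u32 = 0x1111_0001;
}
impl Hash for GetParameterMsg {
    const HASH: u32 = 0x1111_0002;
}

/// Prefix an already-encoded body with the type's hash (assumed: 4 bytes, LE).
pub fn encode_with_hash<M: Hash>(body: &[u8]) -> Vec<u8> {
    let mut out = Vec::with_capacity(4 + body.len());
    out.extend_from_slice(&M::HASH.to_le_bytes());
    out.extend_from_slice(body);
    out
}

/// Return the body only if the prefix matches the expected type's hash.
pub fn split_hashed<M: Hash>(frame: &[u8]) -> Option<&[u8]> {
    if frame.len() < 4 {
        return None;
    }
    let (prefix, body) = frame.split_at(4);
    (prefix == M::HASH.to_le_bytes().as_slice()).then_some(body)
}
```

A type mismatch surfaces as `None` at the decode boundary instead of a runtime dictionary miss, and there is no registry to populate at startup.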
## Differences from motorcortex-python
| Aspect | motorcortex-python | This crate |
|---|---|---|
| Concurrency | `ThreadPoolExecutor` + `Reply` futures | `async fn` handles over dedicated driver `std::thread`s |
| RPC call | submit to pool → `Reply` | `mpsc::send(Cmd)` → `oneshot.await` |
| Cleanup | `atexit` + `threading._register_atexit` dance | `Drop` |
| Connection state | enum + `StateCallbackHandler` | `tokio::sync::watch::Receiver<ConnectionState>` |
| Subscribe fan-out | callbacks on receive workers | callback + `watch` + `broadcast` |
| Message types | runtime registry (`MessageTypes`) | compile-time via `prost` |
| Sessions | `Session` context manager | *(intentionally omitted)* |
| Parse-URL helper | `parseUrl`, `makeUrl` | `parse_url` |
| Token refresh | `Timer` on `Request` | driver-owned helper `std::thread` |
| Errors | exception hierarchy | single `MotorcortexError` enum |
Most items are the same idea expressed differently. The deliberate
divergences:
- **No runtime message registry** — prost wins.
- **No `Session`** — `Drop` + a URL helper does the job.
- **Watch channel for state** — cleaner than a callback fan-out
registry.
## Testing strategy
- **Unit tests** (`tests/unit.rs` + `#[cfg(test)]` in src): protocol
round-trips, parameter tree manipulation, `Subscription` decode with
hand-crafted protocol-1 buffers, error variant formatting. Offline,
hermetic. Run with `cargo test --test unit` and `cargo test --lib`.
- **Integration tests** (`tests/integration/`): drive the vendored
C++ `test_server` over real NNG sockets. Must use
`--test-threads=1` because tests share a single server instance.
- **Coverage**: `cargo-llvm-cov` merges unit + lib + integration;
Cobertura output wired into GitLab. Target ≥ 90% regions.
See [tests/README.md](tests/README.md) for the walkthrough.
## Repository layout
```
src/
├── lib.rs                      re-exports + module wiring
├── core/                       async-first driver + handles
│   ├── mod.rs                  pub use Request, Subscribe, Subscription
│   ├── request.rs              Request handle + Cmd enum
│   ├── subscribe.rs            Subscribe handle + SubCmd enum
│   ├── subscription.rs         Subscription handle + sinks + rebind
│   ├── driver.rs               Request/Subscribe driver loops,
│   │                           pipe-event state machine, helpers
│   ├── proto.rs                encode_with_hash / decode_message
│   ├── state.rs                ConnectionState + watch plumbing
│   └── util.rs                 await_reply helper
├── blocking/                   sync façade over core
│   ├── mod.rs
│   ├── request.rs              rt.block_on wrappers
│   ├── subscribe.rs
│   └── subscription.rs
├── client/                     shared client-side types
│   ├── mod.rs
│   ├── parameter_tree.rs       ParameterTree (shared with driver)
│   ├── parameters.rs           Parameters trait
│   └── receive.rs              receive_message wrapper
├── connection/
│   ├── mod.rs
│   ├── connection_manager.rs   NNG socket + dialer + TLS + pipe notify
│   ├── connection_options.rs   reconnect / token-refresh / max-attempts
│   └── pipe_event.rs           PipeEvent enum + handler type
├── error.rs                    MotorcortexError
├── msg/
│   ├── hash.rs                 compile-time hash trait
│   └── motorcortex_msg.rs      prost-generated
├── parameter_value/
│   ├── processing.rs           encode/decode dispatch
│   ├── get_parameter_value.rs  trait impls
│   ├── set_parameter_value.rs  trait impls
│   ├── get_parameter_tuple.rs
│   └── set_parameter_tuple.rs
├── nng_init_threads.rs
├── nng_logger.rs
├── time_spec.rs                TimeSpec + chrono interop
└── url.rs                      parse_url

examples/                       runnable demos
├── async_request.rs
├── blocking_request.rs
├── subscribe_latest.rs
└── subscribe_stream.rs
```
## Current state (`async-rewrite` tip)
The design above is the shipped one. Test tally on branch tip:
- `cargo test --lib` — 80 tests (actor + state machine + proto +
subscription decode + url + parameter-value dispatch).
- `cargo test --test unit` — 53 offline tests mirroring
motorcortex-python's unit suite.
- `cargo test --test integration -- --test-threads=1` — 37 tests
(connect, parameters, subscribe, tree, session-token, 7-test
reconnect module, blocking-façade smoke, drop-order audits)
plus one `#[ignore]`d stress test
(`stress_16_clones_hot_get_parameter`) you can enable with
`--ignored`.
- Merged coverage (2026-04-20): **89.57 %** regions / **88.73 %**
lines / **90.96 %** functions. Every substantive `core/*`
module clears the 90 % gate (`driver.rs` 94 %, `subscription.rs`
99 %, `subscribe.rs` 91 %, `request.rs` 93 %, `proto.rs` 92 %,
`state.rs` 100 %).
Open follow-ups tracked in `TODO.md`:
- Per-RPC performance bench against the python client.
- Cap-exceeded end-to-end test for `max_reconnect_attempts`
(needs a server-side flag).
---
*Last reviewed: 2026-04-20.*