motorcortex-rust 0.5.0

Motorcortex Rust: a Rust client for the Motorcortex Core real-time control system (async + blocking).
# Architecture

> **Status:** describes the shipped design on the `async-rewrite`
> branch. The rewrite replaced the earlier synchronous prototype
> entirely — there is no legacy type surface left in the crate.

This document describes how the library is structured today. The
goal is a small, honest, idiomatic Rust surface over NNG, with a
feature set comparable to motorcortex-python's but without the
python-isms that don't fit Rust's ownership model.

## Philosophy

1. **Async is the core.** Every user-visible operation that talks to
   the server is an `async fn`. Blocking callers use a thin façade
   that hides a `tokio::runtime::Runtime`. (The reqwest model.)
2. **One coherent core, two thin frontends.** No parallel
   `AsyncRequest` / `Request` hierarchies — a single
   `Request` type on top of a single driver.
3. **Serialisation in the type system.** `Request` is clonable and
   `Send + Sync`, but the underlying NNG socket is mutated one command
   at a time by a driver task. Callers can't cause concurrent
   send/recv races because the public API never hands out the socket.
4. **Pay for what you use.** `notify()` callbacks add no channel of
   their own. The per-subscription `watch` behind `latest()` / `read()`
   is the only always-present sink. `stream()` allocates its
   `broadcast` lazily, on first use.
5. **Compile-time protocol, runtime-fallible I/O.** Message types,
   hashes, and dtype dispatch are resolved at compile time via
   `prost` + the `Hash` trait. Only NNG and decoding errors are
   runtime failures — and those live in one enum
   (`MotorcortexError`).
6. **Drop is the cleanup story.** No `atexit` hooks, no explicit
   `close()` requirement. Dropping the last `Request` clone closes the
   command channel. That lets the driver exit cleanly, and the driver
   exiting closes the socket.

## Layered overview

```
┌──────────────────────────────────────────────────────────┐
│                 public API (async)                       │
│   Request    Subscribe    Subscription    ConnectionState│
└──────────────────────────────────────────────────────────┘
                │ mpsc::UnboundedSender<Cmd / SubCmd>
┌──────────────────────────────────────────────────────────┐
│         driver thread (one dedicated std::thread)        │
│   owns ConnectionManager                                 │
│   blocking_recv on the Cmd channel, calls NNG inline     │
│   publishes ConnectionState + subscription updates       │
│   Request driver also joins a token-refresh helper       │
└──────────────────────────────────────────────────────────┘
                │ blocking NNG calls (FFI)
┌──────────────────────────────────────────────────────────┐
│             nng-c-sys (blocking FFI)                     │
└──────────────────────────────────────────────────────────┘

┌──────────────────────────────────────────────────────────┐
│       blocking façade (motorcortex::blocking)            │
│   owns a current-thread Runtime; each method is          │
│   rt.block_on(inner.method()).                           │
└──────────────────────────────────────────────────────────┘
```

## Public API

### `Request` — request/reply handle

```rust
pub struct Request { /* opaque; Clone + Send + Sync */ }

impl Request {
    pub fn new() -> Self;
    pub async fn connect(&self, url: &str, opts: ConnectionOptions) -> Result<()>;
    pub async fn disconnect(&self) -> Result<()>;

    pub async fn login(&self, user: &str, pass: &str) -> Result<StatusCode>;
    pub async fn logout(&self) -> Result<StatusCode>;

    pub async fn request_parameter_tree(&self) -> Result<StatusCode>;
    pub async fn get_parameter_tree_hash(&self) -> Result<u32>;

    pub async fn get_parameter<V: GetParameterValue + Default>(&self, path: &str) -> Result<V>;
    pub async fn set_parameter<V: SetParameterValue>(&self, path: &str, v: V) -> Result<StatusCode>;
    pub async fn get_parameters<T: GetParameterTuple>(&self, paths: &[&str]) -> Result<T>;
    pub async fn set_parameters<T: SetParameterTuple>(&self, paths: &[&str], v: T) -> Result<StatusCode>;

    pub async fn create_group(&self, paths: impl Parameters, alias: &str, fdiv: u32) -> Result<GroupStatusMsg>;
    pub async fn remove_group(&self, alias: &str) -> Result<StatusCode>;

    // Session tokens — feed the reconnect path, persistable across
    // process restarts.
    pub async fn get_session_token(&self) -> Result<String>;
    pub async fn restore_session(&self, token: &str) -> Result<StatusCode>;
    pub fn session_token(&self) -> Option<String>;
    pub fn session_refresh_count(&self) -> u64;

    pub fn state(&self) -> watch::Receiver<ConnectionState>;
    pub fn parameter_tree(&self) -> Arc<RwLock<ParameterTree>>;
}
```

Cloning a `Request` is cheap. A clone is an `mpsc::UnboundedSender`
plus a few `watch::Receiver` / `Arc` clones. Multiple tasks can each
hold their own handle. Commands queue at the single driver, which
keeps every request paired with its reply, in order.

### `Subscribe` — publish/subscribe handle

```rust
pub struct Subscribe { /* opaque; Clone + Send + Sync */ }

impl Subscribe {
    pub fn new() -> Self;
    pub async fn connect(&self, url: &str, opts: ConnectionOptions) -> Result<()>;
    pub async fn disconnect(&self) -> Result<()>;

    pub async fn subscribe(
        &self,
        req: &Request,
        paths: impl Parameters,
        alias: &str,
        fdiv: u32,
    ) -> Result<Subscription>;

    pub async fn unsubscribe(&self, req: &Request, sub: Subscription) -> Result<()>;

    /// Re-register every active subscription after a reconnect that
    /// lost server-side group state. Outstanding `Subscription`
    /// clones stay valid — the driver rebinds each handle in place
    /// with its fresh server-assigned id.
    pub async fn resubscribe(&self, req: &Request) -> Result<()>;

    pub fn state(&self) -> watch::Receiver<ConnectionState>;
}
```

`Subscribe::subscribe` takes a `&Request` because it has to issue a
`CreateGroupMsg` RPC before registering the group ID with NNG. The
two handles stay independent. They don't share a driver, and that
one create-group call is their only interaction.

### `Subscription` — per-group handle

```rust
pub struct Subscription { /* opaque; Clone + Send + Sync */ }

impl Subscription {
    pub fn id(&self) -> u32;           // atomic; updated by resubscribe()
    pub fn name(&self) -> &str;        // fixed for life of the handle
    pub fn paths(&self) -> Vec<String>;
    pub fn fdiv(&self) -> u32;

    /// Sync read — returns the most recent payload decoded as a
    /// tuple matching the subscription's parameter shape.
    pub fn read<V: GetParameterTuple>(&self) -> Option<(TimeSpec, V)>;

    /// Sync read — same payload, flattened into a `Vec<V>`
    /// (every scalar element of every subscribed parameter).
    pub fn read_all<V: GetParameterValue + Default>(&self) -> Option<(TimeSpec, Vec<V>)>;

    /// Await the most recent payload. Lossy: intermediate samples
    /// between calls are discarded. Resolves immediately once any
    /// payload has arrived.
    pub async fn latest<V: GetParameterTuple>(&self) -> Result<(TimeSpec, V)>;

    /// Every sample, bounded ring buffer. `Err(Missed(n))` if the
    /// consumer falls behind by more than `capacity` samples.
    pub fn stream<V: GetParameterTuple + Send + 'static>(&self, capacity: usize)
        -> impl Stream<Item = StreamResult<V>> + use<V>;

    /// Fire-and-forget callback. Runs on the receive thread; don't block.
    pub fn notify<F: Fn(&Subscription) + Send + Sync + 'static>(&self, f: F);
}

pub type StreamResult<V> = Result<(TimeSpec, V), Missed>;

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Missed(pub u64);
```

Three sinks, each opt-in:

- **`notify`** — no allocation, no channel, runs inline on the
  receive thread.
- **`latest`** (and `read` / `read_all`) — one `watch::Sender`,
  overwritten each update.
- **`stream`** — one lazily-created `broadcast::Sender` with
  explicit capacity.

Every `update()` pushes to all active sinks for that subscription,
so `notify` + `stream` + `latest` can coexist on the same
Subscription. See *How `stream()` works* under Internal design for
the full flow.

### `ConnectionState`

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConnectionState {
    /// Initial state + after a clean `disconnect()`.
    Disconnected,
    /// Happy path — socket is up, session is restored (if previously set).
    Connected,
    /// NNG pipe event fired REM_POST. NNG's dialer is retrying the
    /// transport in the background; we'll transition back to
    /// `Connected` (or to `SessionExpired`) once the pipe comes back.
    ConnectionLost,
    /// The socket came back, but `RestoreSession(token)` failed.
    /// Caller action required — re-login with fresh credentials or
    /// give up. We do *not* silently re-login with cached creds.
    SessionExpired,
}
```

Exposed via `Request::state()` and `Subscribe::state()` as
`watch::Receiver<ConnectionState>`. Consumers can
`state.changed().await` or poll `*state.borrow()`.

Individual redial attempts by the NNG dialer are invisible at this
level. There is no `Reconnecting` variant, and the state stays
`ConnectionLost` until the pipe comes back. Only session-layer
transitions show up.
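`tokio::sync::watch` keeps only the most recent value. A consumer that wakes late therefore sees the newest state, not every transition in between. The std-only model below shows those semantics, with a `Mutex` + `Condvar` standing in for the real channel. `StateWatch`, `publish`, and `wait_changed` are hypothetical names, not crate API.

```rust
use std::sync::{Arc, Condvar, Mutex};

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConnectionState { Disconnected, Connected, ConnectionLost, SessionExpired }

/// Hypothetical stand-in for a `watch` channel: one slot holding the
/// latest value and a version counter, plus a condvar for waiters.
pub struct StateWatch {
    slot: Mutex<(ConnectionState, u64)>,
    cv: Condvar,
}

impl StateWatch {
    pub fn new(initial: ConnectionState) -> Arc<Self> {
        Arc::new(StateWatch { slot: Mutex::new((initial, 0)), cv: Condvar::new() })
    }

    /// Driver side: overwrite the value, bump the version, wake all waiters.
    pub fn publish(&self, state: ConnectionState) {
        let mut slot = self.slot.lock().unwrap();
        let version = slot.1 + 1;
        *slot = (state, version);
        self.cv.notify_all();
    }

    /// Like `*state.borrow()`: read the current value without waiting.
    pub fn borrow(&self) -> ConnectionState {
        self.slot.lock().unwrap().0
    }

    /// Like `state.changed().await`: block until the version differs from
    /// `seen`, then return the newest value and its version.
    pub fn wait_changed(&self, seen: u64) -> (ConnectionState, u64) {
        let guard = self.slot.lock().unwrap();
        let guard = self.cv.wait_while(guard, |slot| slot.1 == seen).unwrap();
        *guard
    }
}
```

Suppose two states are published before the consumer looks. They collapse into one observed change: `wait_changed(0)` returns `(ConnectionLost, 2)`, and the intermediate `Connected` is never seen. This lossy-by-design behaviour is why a watch channel fits connection state.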

### Blocking façade

```rust
pub mod blocking {
    pub struct Request { /* owns hidden current-thread runtime */ }
    impl Request {
        pub fn new() -> Result<Self>;
        pub fn connect(&self, url: &str, opts: ConnectionOptions) -> Result<()>;
        // every async method mirrored with .await dropped
    }

    pub struct Subscribe { /* ... */ }
    pub struct Subscription { /* ... */ }
}
```

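Each method is literally `self.rt.block_on(self.inner.foo(args))`.
Lifecycle: one hidden current-thread `tokio::runtime::Runtime` per
handle, and dropping the handle shuts that runtime down. As with
reqwest's blocking client, don't call these methods from inside an
async context. Tokio's `Runtime::block_on` panics when invoked from
within a runtime.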
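The façade's core move is to drive one future to completion on the calling thread. In the real crate, tokio's `Runtime::block_on` does this. The std-only sketch below shows the same shape with a minimal park/unpark executor and a hand-rolled one-shot reply slot, so a blocking method can wait on a reply produced by another thread. `OneShot`, `AsyncHandle`, and `BlockingHandle` are hypothetical names, not crate API.

```rust
use std::future::Future;
use std::pin::{pin, Pin};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};

/// Wakes the blocked caller by unparking its thread.
struct ThreadWaker(Thread);
impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

/// Minimal `block_on`: poll, park until woken, poll again.
pub fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            // Spurious unparks are harmless: we just poll again.
            Poll::Pending => thread::park(),
        }
    }
}

/// Hand-rolled reply slot (plays the `oneshot::Receiver` role).
type Slot<T> = Arc<Mutex<(Option<T>, Option<Waker>)>>;
pub struct OneShot<T>(Slot<T>);

impl<T> Future for OneShot<T> {
    type Output = T;
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
        let mut slot = self.0.lock().unwrap();
        match slot.0.take() {
            Some(v) => Poll::Ready(v),
            None => {
                // Value not there yet: leave a waker for the producer.
                slot.1 = Some(cx.waker().clone());
                Poll::Pending
            }
        }
    }
}

/// Async handle: hands work to another thread and awaits the reply.
pub struct AsyncHandle;
impl AsyncHandle {
    pub async fn double(&self, x: u32) -> u32 {
        let slot: Slot<u32> = Arc::new(Mutex::new((None, None)));
        let producer = Arc::clone(&slot);
        thread::spawn(move || {
            let mut s = producer.lock().unwrap();
            s.0 = Some(x * 2);
            if let Some(w) = s.1.take() {
                w.wake();
            }
        });
        OneShot(slot).await
    }
}

/// Blocking façade: every method is `block_on(inner.method(..))`.
pub struct BlockingHandle {
    inner: AsyncHandle,
}
impl BlockingHandle {
    pub fn new() -> Self {
        BlockingHandle { inner: AsyncHandle }
    }
    pub fn double(&self, x: u32) -> u32 {
        block_on(self.inner.double(x))
    }
}
```

Spawning a thread per call only keeps the sketch short. The design described here instead routes every call through one long-lived driver thread.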

### Free helpers

```rust
/// Split "wss://host:req_port:sub_port" into (req_url, sub_url).
pub fn parse_url(s: &str) -> Result<(String, String)>;

/// Convenience constructor — `new()` + `connect()` in one.
impl Request {
    pub async fn connect_to(url: &str, opts: ConnectionOptions) -> Result<Self>;
}
impl Subscribe {
    pub async fn connect_to(url: &str, opts: ConnectionOptions) -> Result<Self>;
}
```

No `Session` type. If three consumers end up writing the same
preamble, add one. Until then, explicit construction beats opaque
sugar.
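`parse_url` packs both ports into one string. The sketch below shows the split, assuming the combined form is exactly `scheme://host:req_port:sub_port`. The crate's real `parse_url` returns `MotorcortexError` and may accept more forms. This hypothetical version returns a plain `String` error and does not handle IPv6 literals.

```rust
/// Hypothetical re-implementation:
/// "wss://host:5568:5567" -> ("wss://host:5568", "wss://host:5567").
pub fn parse_url(s: &str) -> Result<(String, String), String> {
    // The last ':' separates the subscribe port ...
    let (rest, sub_port) = s
        .rsplit_once(':')
        .ok_or_else(|| format!("no port in {s:?}"))?;
    // ... and the one before it separates the request port.
    let (base, req_port) = rest
        .rsplit_once(':')
        .ok_or_else(|| format!("expected two ports in {s:?}"))?;
    for p in [req_port, sub_port] {
        p.parse::<u16>()
            .map_err(|_| format!("invalid port {p:?} in {s:?}"))?;
    }
    if !base.contains("://") {
        return Err(format!("missing scheme in {s:?}"));
    }
    Ok((format!("{base}:{req_port}"), format!("{base}:{sub_port}")))
}
```

A single-port URL such as `wss://host:5568` is rejected. After the first split, the remainder `wss://host` yields `//host` as the "request port", which fails to parse.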

## Internal design

### The driver

One driver thread per `Request` and one per `Subscribe`, each a
dedicated `std::thread` rather than a tokio task. The driver owns
the `ConnectionManager` (sole owner, no `Mutex`), reads `Cmd` /
`SubCmd` values from a `tokio::sync::mpsc::UnboundedReceiver` via
`blocking_recv`, and calls NNG inline:

```rust
pub(crate) enum Cmd {
    Connect   { url: String, opts: ConnectionOptions, reply: oneshot::Sender<Result<()>> },
    Disconnect{ reply: oneshot::Sender<Result<()>> },
    Login     { user: String, pass: String, reply: oneshot::Sender<Result<StatusCode>> },
    Logout    { reply: oneshot::Sender<Result<StatusCode>> },
    RequestParameterTree { reply: oneshot::Sender<Result<StatusCode>> },
    GetParameter  { path: String, reply: oneshot::Sender<Result<Vec<u8>>> },
    SetParameter  { path: String, value: Vec<u8>, reply: oneshot::Sender<Result<StatusCode>> },
    GetParameters { msg: GetParameterListMsg, reply: oneshot::Sender<Result<ParameterListMsg>> },
    SetParameters { msg: SetParameterListMsg, reply: oneshot::Sender<Result<StatusCode>> },
    CreateGroup   { msg: CreateGroupMsg, reply: oneshot::Sender<Result<GroupStatusMsg>> },
    RemoveGroup   { alias: String, reply: oneshot::Sender<Result<StatusCode>> },
    GetParameterTreeHash { reply: oneshot::Sender<Result<u32>> },
    GetSessionToken      { reply: oneshot::Sender<Result<String>> },
    RestoreSession       { token: String, reply: oneshot::Sender<Result<StatusCode>> },
    /// Background refresh tick — fired by the helper thread.
    RefreshTokenTick,
    /// Pipe-notify event, forwarded from an NNG callback.
    Pipe(PipeEvent),
}

fn run_request_driver(
    self_tx: mpsc::UnboundedSender<Cmd>,
    mut rx: mpsc::UnboundedReceiver<Cmd>,
    state_tx: watch::Sender<ConnectionState>,
    tree: Arc<RwLock<ParameterTree>>,
    last_token: Arc<RwLock<Option<String>>>,
    refresh_count: Arc<AtomicU64>,
) {
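    // `on_pipe` (construction elided) is the NNG pipe callback; it
    // forwards ADD_POST / REM_POST back into this loop as `Cmd::Pipe`.
    let mut conn = ConnectionManager::new();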
    let mut user_wants_connected = false;
    let mut pending_reconnect    = false;
    let mut reconnect_enabled    = true;
    let mut max_reconnect_attempts: Option<u32> = None;
    let mut consecutive_restore_failures: u32 = 0;
    let mut refresh_thread: Option<thread::JoinHandle<()>> = None;

    while let Some(cmd) = rx.blocking_recv() {
        match cmd {
            Cmd::Connect { url, opts, reply } => {
                let result = conn.connect(&url, opts, nng_c_sys::nng_req0_open, on_pipe.clone());
                // …state transitions, spawn token-refresh helper…
                let _ = reply.send(result);
            }
            Cmd::Pipe(event) => apply_pipe_event_request(event, /* … */),
            Cmd::RefreshTokenTick if *state_tx.borrow() == ConnectionState::Connected => {
                let _ = do_get_session_token(&conn, &last_token);
                refresh_count.fetch_add(1, Ordering::Relaxed);
            }
            /* …other handlers… */
            _ => {}
        }
    }
    // mpsc closed → exit; ConnectionManager::drop closes the socket.
}
```

Why a dedicated `std::thread` instead of a tokio task with
`spawn_blocking` per-RPC:

- **NNG calls are synchronous** and each blocks for the duration of
  one round-trip. A task that `spawn_blocking`s per-command would
  churn the blocking pool; a single owner thread is simpler.
- **Thread count stays bounded.** `Request` spawns 1 driver +
  1 refresh helper; `Subscribe` spawns 1 driver + 1 receive thread.
  No growth with clones, no growth with in-flight RPCs.
- **No tokio runtime required on the driver side.** Callers
  `.await` the `oneshot::Receiver` from any runtime context (or
  block on it via the blocking façade).
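The sketch below shows the ownership pattern with std alone. It swaps tokio's `UnboundedSender` / `oneshot` for `std::sync::mpsc` channels, an assumption made to keep it dependency-free. `Handle`, `Cmd`, and `FakeSocket` are hypothetical. The driver thread is the sole owner of the connection state, and handles are cheap clones of a `Sender`. Dropping the last clone ends the loop, so the socket's `Drop` runs.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc::{self, Sender};
use std::sync::Arc;
use std::thread::{self, JoinHandle};

/// Stand-in for the NNG socket; records when it is closed.
struct FakeSocket {
    closed: Arc<AtomicBool>,
    counter: u64,
}
impl Drop for FakeSocket {
    fn drop(&mut self) {
        self.closed.store(true, Ordering::SeqCst);
    }
}

enum Cmd {
    /// Each command carries its own reply channel (the oneshot role).
    Increment { reply: Sender<u64> },
}

/// Cheap, clonable handle: just a sender into the driver.
#[derive(Clone)]
pub struct Handle {
    tx: Sender<Cmd>,
}

impl Handle {
    pub fn increment(&self) -> u64 {
        let (reply, rx) = mpsc::channel();
        self.tx.send(Cmd::Increment { reply }).expect("driver gone");
        rx.recv().expect("driver dropped the reply")
    }
}

/// Spawns the driver; returns a handle plus the thread's JoinHandle.
pub fn spawn_driver(closed: Arc<AtomicBool>) -> (Handle, JoinHandle<()>) {
    let (tx, rx) = mpsc::channel::<Cmd>();
    let join = thread::spawn(move || {
        // Sole owner of the socket: no Mutex needed.
        let mut sock = FakeSocket { closed, counter: 0 };
        // `recv` fails only once every Sender (every Handle clone) is gone.
        while let Ok(cmd) = rx.recv() {
            match cmd {
                Cmd::Increment { reply } => {
                    sock.counter += 1;
                    let _ = reply.send(sock.counter); // caller may have gone away
                }
            }
        }
        // Loop exit -> `sock` is dropped here -> "socket" closed.
    });
    (Handle { tx }, join)
}
```

Because commands from all clones funnel through one loop, the counter never races, even with several threads calling `increment` at once.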

### Reconnect + session tokens

The NNG dialer is configured with `NNG_OPT_RECONNMINT` /
`NNG_OPT_RECONNMAXT`, so the transport redials itself after a drop.
The driver layers session restore on top:

- **Pipe callback** → forwards ADD_POST / REM_POST as `Cmd::Pipe`
  / `SubCmd::Pipe` through the mpsc channel. No work happens in
  the callback itself — it runs on an NNG thread and must not
  block.
- **Token refresh** — a helper `std::thread` wakes every
  `ConnectionOptions::token_refresh_interval` (default 30 s) and
  fires `Cmd::RefreshTokenTick`. The tick handler gates on
  `state_tx.borrow() == Connected` so no RPCs land on a dead
  pipe.
- **Pipe state machine.** `apply_pipe_event_request` factors the
  decision out into a pure `decide_state_after_pipe_event` helper
  (unit-tested in isolation). It handles:
  - REM_POST → `ConnectionLost` (if reconnect enabled) or
    `Disconnected` (if not).
  - ADD_POST with a pending reconnect → try
    `RestoreSession(cached_token)`. On `Ok` / `ReadOnlyMode` →
    `Connected` + counter reset. On any other status →
    `SessionExpired` + counter++; if
    `max_reconnect_attempts` caps out, disable the dialer and
    publish `Disconnected`.
  - ADD_POST with no token cached → treated as a bare reconnect,
    `Connected`.
- **Subscribe side** has no restore logic of its own. When the
  Request side successfully restores, callers can invoke
  `Subscribe::resubscribe(&req)` to re-create every group against
  the new server state. The driver rebinds each `Subscription`
  in place (see "Subscription rebind" below).
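The decision table above maps naturally onto one pure function. This sketch is not the crate's `decide_state_after_pipe_event`. Its name, signature, and `Restore` enum are hypothetical. It also rests on three labeled assumptions: ADD_POST arrives with a reconnect pending, the no-token case leaves the failure counter alone, and the cap trips once failures reach `max_attempts`.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ConnectionState { Disconnected, Connected, ConnectionLost, SessionExpired }

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PipeEvent { AddPost, RemPost }

/// Outcome of the restore attempt when the pipe came back (hypothetical).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Restore { NoTokenCached, Ok, ReadOnlyMode, OtherStatus }

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Decision {
    pub state: ConnectionState,
    pub consecutive_failures: u32,
    /// Cap reached: the driver should stop the NNG dialer.
    pub disable_dialer: bool,
}

/// Pure decision step: no I/O, so every row of the table is unit-testable.
pub fn decide(
    event: PipeEvent,
    reconnect_enabled: bool,
    restore: Restore,
    failures: u32,
    max_attempts: Option<u32>,
) -> Decision {
    let keep = |state| Decision { state, consecutive_failures: failures, disable_dialer: false };
    match (event, restore) {
        (PipeEvent::RemPost, _) if reconnect_enabled => keep(ConnectionState::ConnectionLost),
        (PipeEvent::RemPost, _) => keep(ConnectionState::Disconnected),
        // Bare reconnect: nothing to restore (counter untouched: an assumption).
        (PipeEvent::AddPost, Restore::NoTokenCached) => keep(ConnectionState::Connected),
        (PipeEvent::AddPost, Restore::Ok | Restore::ReadOnlyMode) => Decision {
            state: ConnectionState::Connected,
            consecutive_failures: 0,
            disable_dialer: false,
        },
        (PipeEvent::AddPost, Restore::OtherStatus) => {
            let failures = failures + 1;
            // Assumption: the cap trips once failures reach max_attempts.
            let capped = matches!(max_attempts, Some(max) if failures >= max);
            Decision {
                state: if capped { ConnectionState::Disconnected } else { ConnectionState::SessionExpired },
                consecutive_failures: failures,
                disable_dialer: capped,
            }
        }
    }
}
```

Keeping I/O (the actual `RestoreSession` call) outside the function means the driver performs the RPC first and then asks the pure step what to publish.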

### The subscribe receive loop

A second dedicated `std::thread` (not a tokio task) runs alongside
the Subscribe driver. It blocks on `nng_recv`, parses the 3-byte
group-id prefix, looks up the Subscription in the shared table, and
calls `Subscription::update(buffer)`:

```rust
fn run_subscribe_receive(
    sock: nng_socket,
    subscriptions: Arc<RwLock<HashMap<u32, Subscription>>>,
    stop: Arc<AtomicBool>,
) {
    loop {
        if stop.load(Ordering::Relaxed) { break; }
        match receive_message(&sock) {
            Ok(buffer) => {
                let id = parse_group_id(&buffer);
                if let Some(sub) = subscriptions.read().unwrap().get(&id).cloned() {
                    sub.update(buffer);
                }
            }
            Err(_) => {
                if stop.load(Ordering::Relaxed) { break; }
                std::thread::sleep(Duration::from_millis(50));
            }
        }
    }
}
```

Two threads per Subscribe (driver + receive), same shape as
motorcortex-python. `spawn_blocking` is avoided on the receive hot
path — a persistent blocking recv would monopolise a pool thread
anyway.
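The prefix decode and routing step are small enough to show in full. This sketch assumes the group id is stored little-endian in the first three bytes; check that against the protocol definition. It returns `None` for a runt buffer instead of panicking, so the loop can skip malformed frames. `parse_group_id` and `route` here are illustrative, not the crate's exact helpers.

```rust
use std::collections::HashMap;

/// Hypothetical helper: decode the 24-bit group id from the first three
/// bytes, assuming little-endian order. `None` for a runt frame.
pub fn parse_group_id(buffer: &[u8]) -> Option<u32> {
    match buffer {
        [b0, b1, b2, ..] => {
            Some(u32::from(*b0) | (u32::from(*b1) << 8) | (u32::from(*b2) << 16))
        }
        _ => None,
    }
}

/// Routing step of the receive loop: pick the subscription for a frame.
/// Unknown ids and runt frames both yield `None` and are dropped.
pub fn route<'a, S>(table: &'a HashMap<u32, S>, buffer: &[u8]) -> Option<&'a S> {
    parse_group_id(buffer).and_then(|id| table.get(&id))
}
```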

### `Subscription::update` — the fan-out point

`update()` forwards one received payload into three independent
sinks, each optional. Consumers pay only for the sinks they use:

```rust
pub(crate) fn update(&self, buffer: Vec<u8>) {
    // Watch: always present. `send_replace` stores unconditionally,
    // unlike `send` which errors (and drops the value) when no
    // receivers are active — most subs don't keep a watch::Receiver
    // alive.
    self.inner.buffer.send_replace(buffer.clone());

    // Broadcast: created lazily by the first `stream()` call.
    if let Some(tx) = self.inner.broadcast.lock().unwrap().as_ref() {
        let _ = tx.send(buffer);
    }

    // Callback: set by `notify()`. Runs inline on the receive thread.
    let cb = self.inner.callback.read().unwrap().clone();
    if let Some(cb) = cb { cb(self); }
}
```

`SubscriptionInner` layout:

```rust
struct SubscriptionInner {
    /// Server-assigned id. Atomic so the subscribe receive loop can
    /// read it without taking the layout lock; `rebind()` swaps it
    /// in place when `Subscribe::resubscribe` gives us a new one.
    id: AtomicU32,
    alias: String,            // fixed for the life of the handle
    fdiv: u32,                // frequency divider; resubscribe replays it
    /// Parameter layout — may change on resubscribe if the server
    /// reshuffled offsets / ids. Decoders take the read lock; the
    /// driver takes the write lock while swapping.
    layout: RwLock<GroupLayout>,  // description + data_types
    buffer: watch::Sender<Vec<u8>>,     // latest() / read() / read_all()
    broadcast: Mutex<Option<broadcast::Sender<Vec<u8>>>>,  // stream()
    callback: RwLock<Option<Callback>>, // notify()
}
```
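The fan-out can be modelled with std types. In this model a `Mutex<Option<Vec<u8>>>` plays the watch, a lazily created list of `mpsc::Sender`s plays the broadcast, and an `Arc<dyn Fn>` plays the callback. `Fanout` and its methods are hypothetical. Unlike the real bounded broadcast ring, these `mpsc` channels are unbounded, so this model has no `Missed(n)`. The sink order matches `update()` above: latest value first, then streams, then the callback.

```rust
use std::sync::mpsc::{self, Receiver, Sender};
use std::sync::{Arc, Mutex, RwLock};

/// Hypothetical callback type: receives the raw payload.
type Callback = Arc<dyn Fn(&[u8]) + Send + Sync>;

#[derive(Default)]
pub struct Fanout {
    latest: Mutex<Option<Vec<u8>>>,               // `watch` role: always present
    streams: Mutex<Option<Vec<Sender<Vec<u8>>>>>, // `broadcast` role: created lazily
    callback: RwLock<Option<Callback>>,           // `notify` role
}

impl Fanout {
    /// One received payload, pushed to every active sink.
    pub fn update(&self, buffer: Vec<u8>) {
        // Latest value: overwritten unconditionally, like `send_replace`.
        *self.latest.lock().unwrap() = Some(buffer.clone());

        // Stream sinks: only touched if a `stream()` call created the list.
        if let Some(senders) = self.streams.lock().unwrap().as_mut() {
            // Deliver to each reader; forget readers that were dropped.
            senders.retain(|tx| tx.send(buffer.clone()).is_ok());
        }

        // Callback: cloned out so the lock is not held while it runs.
        let cb = self.callback.read().unwrap().clone();
        if let Some(cb) = cb {
            cb(buffer.as_slice());
        }
    }

    pub fn read(&self) -> Option<Vec<u8>> {
        self.latest.lock().unwrap().clone()
    }

    pub fn stream(&self) -> Receiver<Vec<u8>> {
        let (tx, rx) = mpsc::channel();
        self.streams.lock().unwrap().get_or_insert_with(Vec::new).push(tx);
        rx
    }

    pub fn notify(&self, f: impl Fn(&[u8]) + Send + Sync + 'static) {
        let cb: Callback = Arc::new(f);
        *self.callback.write().unwrap() = Some(cb);
    }

    pub fn has_stream_sink(&self) -> bool {
        self.streams.lock().unwrap().is_some()
    }
}
```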

### Subscription rebind

`Subscribe::resubscribe(&req)` does:

1. Snapshot the shared subscriptions map (one `RwLock::read` on the
   `Arc<RwLock<HashMap<u32, Subscription>>>` shared with the
   driver), clone the `Subscription` handles out.
2. For each, `req.create_group(paths, alias, fdiv).await` — gives
   the server a fresh group descriptor.
3. Hand the `(old_id, new_group_msg)` pairs to the driver via
   `SubCmd::ApplyResubscribe`. The driver holds the map write lock
   for the whole batch, unsubscribes each old NNG filter, subscribes
   the new one, calls `Subscription::rebind(new_group)`, and rekeys
   the map.

Outstanding `Subscription` clones the caller is holding stay valid
across the rebind because the Arc'd inner is shared; they observe
the new id/layout the moment the write lock releases.
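Clones observe the rebind because they all share one `Arc`'d inner. Swapping the atomic id, and the layout under its lock, is therefore visible through every clone. The reduced sketch below models this with hypothetical `Sub`, `rebind`, and `apply_resubscribe`, and a plain `Vec<String>` as the layout. It includes the map rekey step and makes one design choice explicit: every old entry is removed before any new one is inserted. Otherwise a fresh id equal to another subscription's not-yet-moved old id could clobber it mid-batch.

```rust
use std::collections::HashMap;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::{Arc, RwLock};

struct Inner {
    id: AtomicU32,
    alias: String,               // fixed for the life of the handle
    layout: RwLock<Vec<String>>, // may change on resubscribe
}

#[derive(Clone)]
pub struct Sub {
    inner: Arc<Inner>,
}

impl Sub {
    pub fn new(id: u32, alias: &str, layout: Vec<String>) -> Self {
        Sub {
            inner: Arc::new(Inner {
                id: AtomicU32::new(id),
                alias: alias.to_owned(),
                layout: RwLock::new(layout),
            }),
        }
    }
    pub fn id(&self) -> u32 {
        self.inner.id.load(Ordering::Acquire)
    }
    pub fn alias(&self) -> &str {
        &self.inner.alias
    }
    pub fn layout(&self) -> Vec<String> {
        self.inner.layout.read().unwrap().clone()
    }
    /// Swap in the server's fresh descriptor, visible to every clone.
    fn rebind(&self, new_id: u32, new_layout: Vec<String>) {
        *self.inner.layout.write().unwrap() = new_layout;
        self.inner.id.store(new_id, Ordering::Release);
    }
}

/// Driver side: apply (old_id, new_id, new_layout) triples under one write lock.
pub fn apply_resubscribe(map: &RwLock<HashMap<u32, Sub>>, batch: Vec<(u32, u32, Vec<String>)>) {
    let mut map = map.write().unwrap();
    // Pull every old entry out first, then reinsert under the new ids.
    let moved: Vec<(Sub, u32, Vec<String>)> = batch
        .into_iter()
        .filter_map(|(old_id, new_id, layout)| map.remove(&old_id).map(|s| (s, new_id, layout)))
        .collect();
    for (sub, new_id, layout) in moved {
        // (The real driver also swaps the NNG subscribe filter here.)
        sub.rebind(new_id, layout);
        map.insert(new_id, sub);
    }
}
```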

### How `stream()` works

`Subscription::stream<V>(capacity)` returns an `impl Stream<Item =
StreamResult<V>>` — a lossy-on-lag, broadcast-backed channel.

```rust
pub fn stream<V>(&self, capacity: usize) -> impl Stream<Item = StreamResult<V>> + use<V>
where V: GetParameterTuple + Send + 'static,
{
    let sender = self.ensure_broadcast(capacity);   // create or reuse
    let rx     = sender.subscribe();                 // fresh receiver
    let inner  = Arc::clone(&self.inner);            // decode context
    unfold(rx, move |mut rx| {
        let inner = Arc::clone(&inner);
        async move {
            loop {
                match rx.recv().await {
                    Ok(buffer) => match decode_tuple::<V>(&inner, &buffer) {
                        Some(decoded) => return Some((Ok(decoded), rx)),
                        None          => continue,        // unsupported protocol → skip
                    },
                    Err(RecvError::Lagged(n)) => return Some((Err(Missed(n)), rx)),
                    Err(RecvError::Closed)    => return None,
                }
            }
        }
    })
}
```

Lifecycle:

1. **First call** to `stream()` takes the `broadcast` `Mutex`,
   allocates the ring with the requested `capacity`, stores the
   `Sender`, returns a `Sender::clone`. All subsequent calls reuse
   the same channel; `capacity` is honoured only on the first.
2. **`sender.subscribe()`** — each `stream()` call hands the caller
   their own `broadcast::Receiver` with its own read cursor. N
   consumers → N receivers, each seeing every sample, no fan-out
   cost beyond the `Receiver` struct.
3. **`Arc::clone(&self.inner)`** — the Stream needs `description`
   and `data_types` to decode, so we capture an Arc clone. Together
   with the `+ use<V>` precise-capturing bound, this keeps the
   returned Stream free of any `&self` lifetime — callers can hold
   it past the original Subscription borrow.
4. **`unfold`** — each `.next().await` runs the inner closure. The
   `rx` is the unfold state; it's passed back into the closure each
   round so the receiver's cursor persists across polls.

Four outcomes per iteration:

- `Ok(buffer)` → decode → `Some((Ok((ts, v)), rx))`. Stream yields.
- `Ok(buffer)` → decode returns `None` (unsupported wire protocol)
  → **skip silently** via `continue`, keep the stream alive. Only a
  `Closed` channel ends the stream.
- `Err(Lagged(n))` → `Some((Err(Missed(n)), rx))`. User sees explicit
  back-pressure: the ring overwrote `n` samples before this
  receiver got back to them.
- `Err(Closed)` → `None`. Sender has been dropped, i.e. every
  Subscription clone is gone. End of stream.

**Back-pressure behaviour.** With `capacity = 4`:

```
publish A B C D     ring = [A B C D], reader cursor at A
publish E           ring = [B C D E], A is gone
publish F G         ring = [D E F G]
reader polls        → Err(Missed(3)): A, B, C were overwritten,
                      so the cursor jumps to the oldest kept sample, D
reader continues    → D, E, F, G
```

Tokio's broadcast counts the skip for us; we just re-wrap the
`RecvError::Lagged(n)` as `Missed(n)` — matching the vocabulary
used in our public API.
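To make the lag arithmetic concrete, here is a std-only model of the ring with a single reader cursor, using absolute sequence numbers. It is a simplification of tokio's broadcast: one reader, no async wakeups. `Ring`, `Recv`, and `try_recv` are hypothetical names. Replaying the capacity-4 scenario, the reader first gets `Missed(3)` and then D, E, F, G.

```rust
use std::collections::VecDeque;

#[derive(Debug, PartialEq, Eq)]
pub enum Recv<T> {
    Item(T),
    Missed(u64),
    Empty,
}

/// Bounded ring: keeps the newest `cap` items, each tagged with its sequence number.
pub struct Ring<T> {
    cap: usize,
    next_seq: u64,
    buf: VecDeque<(u64, T)>,
}

impl<T: Clone> Ring<T> {
    pub fn new(cap: usize) -> Self {
        assert!(cap > 0, "capacity must be non-zero");
        Ring { cap, next_seq: 0, buf: VecDeque::with_capacity(cap) }
    }

    pub fn publish(&mut self, v: T) {
        if self.buf.len() == self.cap {
            self.buf.pop_front(); // overwrite the oldest slot
        }
        self.buf.push_back((self.next_seq, v));
        self.next_seq += 1;
    }

    /// `cursor` is the next sequence number this reader expects.
    pub fn try_recv(&self, cursor: &mut u64) -> Recv<T> {
        let oldest = match self.buf.front() {
            Some((seq, _)) => *seq,
            None => return Recv::Empty,
        };
        if *cursor < oldest {
            // Cursor points at overwritten slots: report the gap, jump to the oldest.
            let missed = oldest - *cursor;
            *cursor = oldest;
            return Recv::Missed(missed);
        }
        match self.buf.get((*cursor - oldest) as usize) {
            Some((_, v)) => {
                *cursor += 1;
                Recv::Item(v.clone())
            }
            None => Recv::Empty, // caught up
        }
    }
}
```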

### Why this shape

- **Zero cost unused.** A Subscription that uses only `notify` never
  allocates the broadcast ring. The `Mutex<Option<…>>` check costs one
  uncontended lock/unlock plus a branch per `update()`.
- **N consumers, one channel.** Multiple `stream()` callers share
  one `Sender`; each holds their own cursor. Extra memory per
  consumer ≈ one `Receiver` struct.
- **Explicit back-pressure.** `Missed(n)` makes lag visible to the
  consumer; the library never silently grows memory to "catch up".
- **No blocking threads on the consumer side.** The receive thread
  is the only one that blocks waiting for samples; every reader of
  the broadcast channel is pure async.

### Parameter tree cache

`Request` holds `Arc<RwLock<ParameterTree>>` at the handle level, and
the driver holds a clone of the same `Arc`. `request_parameter_tree()`
has the driver write the fetched tree into that shared lock.
`set_parameter` / `get_parameter` read the dtype from the shared lock
with no channel round-trip.

This means every handle clone sees the same tree. A lookup costs one
uncontended `RwLock::read` rather than a trip through the driver.

## Key decisions

### Why a dedicated `std::thread`, not `spawn_blocking` or `nng_aio`

NNG exposes both a blocking C API (`nng_send` / `nng_recv`) and an
async one (`nng_aio_*`). Three options were on the table:

1. **Tokio task + `spawn_blocking` per RPC** — simplest on paper
   but churns the blocking pool (one pool thread per in-flight
   command) and leaks tokio runtime requirements into the driver's
   control flow.
2. **Dedicated `std::thread` with blocking NNG calls (chosen).**
   One owner thread per handle; `mpsc::UnboundedReceiver::blocking_recv`
   serialises commands; NNG is called inline. Thread count is
   bounded and predictable.
3. **`nng_aio` bridge to Rust wakers.** Purest on paper but
   bridging NNG's callback model to waker lifetimes without UB is
   finicky. Motorcortex workloads (kHz-rate telemetry, a handful of
   concurrent RPCs) don't need the throughput; the driver is fast
   enough with blocking calls.

Revisit (3) only if someone measures blocking-call latency as a
bottleneck.

### Why an actor pattern, not `Arc<Mutex<Request>>`

Both enforce serialisation, but:

- `Arc<Mutex<Request>>` puts the serialisation primitive in the user's
  API. Every user code snippet starts with `let req = Arc::new(Mutex::new(…))`.
- Actor pattern moves serialisation into the driver, invisible to
  users. `Request` becomes a clonable handle. The type system says
  "this is shareable"; the driver enforces "only one command at a
  time".

The actor pattern also enables `ConnectionState` publishing and
background work (timers, reconnect) without touching the user-facing
types.

### Why no `Session`

Python's `Session` exists because:

1. Context manager (`with`) — Rust has `Drop`.
2. Token refresh timer — belongs in the driver.
3. Connect-everything sugar — a 3-line free function covers this.

See [NOTES.md](NOTES.md) for the full reasoning.

### Why keep compile-time protobuf

Python loads `.proto` files at runtime (`MessageTypes.load(...)`).
This is flexible but pays for it: every send/recv does a dict lookup
to find the serializer, hash computation is runtime, mismatches
surface as runtime exceptions.

Rust uses `prost` + a compile-time `Hash` trait. Faster, safer, and
smaller — and any new message type lands via the `.proto` → `build.rs`
pipeline that's already in place. Runtime namespace loading is not a
use case worth supporting.
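The compile-time half can be sketched as an associated constant per message type. Picking the hash then becomes ordinary monomorphisation rather than a registry lookup. Everything in this sketch is hypothetical. The trait is named `MessageHash` to avoid clashing with `std::hash::Hash`, and the hash values are made up. The 4-byte little-endian prefix is an assumed wire layout. `frame` / `unframe` are illustrative helpers in the spirit of `proto.rs`, and they carry raw payload bytes where the crate would use `prost`.

```rust
/// Hypothetical stand-in for the crate's `Hash` trait: one constant per type.
pub trait MessageHash {
    const HASH: u32;
}

// Stand-ins for prost-generated message types; hash values are made up.
pub struct LoginMsg;
pub struct StatusMsg;
impl MessageHash for LoginMsg {
    const HASH: u32 = 0x1111_0001;
}
impl MessageHash for StatusMsg {
    const HASH: u32 = 0x2222_0002;
}

/// Prefix an already-encoded payload with M's hash (assumed little-endian u32).
pub fn frame<M: MessageHash>(payload: &[u8]) -> Vec<u8> {
    let mut out = Vec::with_capacity(4 + payload.len());
    out.extend_from_slice(&M::HASH.to_le_bytes());
    out.extend_from_slice(payload);
    out
}

/// Check the prefix against M's hash; on success return the payload bytes.
pub fn unframe<M: MessageHash>(buf: &[u8]) -> Result<&[u8], String> {
    if buf.len() < 4 {
        return Err(format!("buffer too short: {} bytes", buf.len()));
    }
    let (head, body) = buf.split_at(4);
    let got = u32::from_le_bytes([head[0], head[1], head[2], head[3]]);
    if got != M::HASH {
        return Err(format!("hash mismatch: got {got:#010x}, expected {:#010x}", M::HASH));
    }
    Ok(body)
}
```

A type mismatch is still a runtime error when bytes arrive. However, the set of known hashes and the choice of decoder per call site are fixed by the compiler, with no lookup table to populate or get out of sync.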

## Differences from motorcortex-python

| Concern | Python | Rust |
|---|---|---|
| Concurrency | `ThreadPoolExecutor` + `Reply` futures | `async fn` handles over one dedicated driver thread per handle |
| RPC call | submit to pool → `Reply` | `mpsc::send(Cmd)` → `oneshot.await` |
| Cleanup | `atexit` + `threading._register_atexit` dance | `Drop` |
| Connection state | enum + `StateCallbackHandler` | `tokio::sync::watch::Receiver<ConnectionState>` |
| Subscribe fan-out | callbacks on receive workers | callback + `watch` + `broadcast` |
| Message types | runtime registry (`MessageTypes`) | compile-time via `prost` |
| Sessions | `Session` context manager | *(intentionally omitted)* |
| Parse-URL helper | `parseUrl`, `makeUrl` | `parse_url` |
| Token refresh | `Timer` on `Request` | helper thread that ticks the driver |
| Errors | exception hierarchy | single `MotorcortexError` enum |

Most items are the same idea expressed differently. The deliberate
divergences:

- **No runtime message registry** — prost wins.
- **No `Session`.** `Drop` + a URL helper does the job.
- **Watch channel for state** — cleaner than a callback fan-out
  registry.

## Testing strategy

- **Unit tests** (`tests/unit.rs` + `#[cfg(test)]` in src): protocol
  round-trips, parameter tree manipulation, `Subscription` decode with
  hand-crafted protocol-1 buffers, error variant formatting. Offline,
  hermetic. Run with `cargo test --test unit` and `cargo test --lib`.
- **Integration tests** (`tests/integration/`): drive the vendored
  C++ `test_server` over real NNG sockets. Must use
  `--test-threads=1` because tests share a single server instance.
- **Coverage**: `cargo-llvm-cov` merges unit + lib + integration;
  Cobertura output wired into GitLab. Target ≥ 90% regions.

See [tests/README.md](tests/README.md) for the walkthrough.

## Repository layout

```
src/
├── lib.rs                     re-exports + module wiring
├── core/                      async-first driver + handles
│   ├── mod.rs                 pub use Request, Subscribe, Subscription
│   ├── request.rs             Request handle + Cmd enum
│   ├── subscribe.rs           Subscribe handle + SubCmd enum
│   ├── subscription.rs        Subscription handle + sinks + rebind
│   ├── driver.rs              Request/Subscribe driver loops,
│   │                          pipe-event state machine, helpers
│   ├── proto.rs               encode_with_hash / decode_message
│   ├── state.rs               ConnectionState + watch plumbing
│   └── util.rs                await_reply helper
├── blocking/                  sync façade over core
│   ├── mod.rs
│   ├── request.rs             rt.block_on wrappers
│   ├── subscribe.rs
│   └── subscription.rs
├── client/                    shared client-side types
│   ├── mod.rs
│   ├── parameter_tree.rs      ParameterTree (shared with driver)
│   ├── parameters.rs          Parameters trait
│   └── receive.rs             receive_message wrapper
├── connection/
│   ├── mod.rs
│   ├── connection_manager.rs  NNG socket + dialer + TLS + pipe notify
│   ├── connection_options.rs  reconnect / token-refresh / max-attempts
│   └── pipe_event.rs          PipeEvent enum + handler type
├── error.rs                   MotorcortexError
├── msg/
│   ├── hash.rs                compile-time hash trait
│   └── motorcortex_msg.rs     prost-generated
├── parameter_value/
│   ├── processing.rs          encode/decode dispatch
│   ├── get_parameter_value.rs trait impls
│   ├── set_parameter_value.rs trait impls
│   ├── get_parameter_tuple.rs
│   └── set_parameter_tuple.rs
├── nng_init_threads.rs
├── nng_logger.rs
├── time_spec.rs               TimeSpec + chrono interop
└── url.rs                     parse_url

examples/                       runnable demos
├── async_request.rs
├── blocking_request.rs
├── subscribe_latest.rs
└── subscribe_stream.rs
```

## Current state (`async-rewrite` tip)

The design above is the shipped one. Test tally on branch tip:

- `cargo test --lib` — 80 tests (actor + state machine + proto +
  subscription decode + url + parameter-value dispatch).
- `cargo test --test unit` — 53 offline tests mirroring
  motorcortex-python's unit suite.
- `cargo test --test integration -- --test-threads=1` — 37 tests
  (connect, parameters, subscribe, tree, session-token, 7-test
  reconnect module, blocking-façade smoke, drop-order audits)
  plus one `#[ignore]`d stress test
  (`stress_16_clones_hot_get_parameter`) you can enable with
  `--ignored`.
- Merged coverage (2026-04-20): **89.57 %** regions / **88.73 %**
  lines / **90.96 %** functions. Every substantive `core/*`
  module clears the 90 % gate (`driver.rs` 94 %, `subscription.rs`
  99 %, `subscribe.rs` 91 %, `request.rs` 93 %, `proto.rs` 92 %,
  `state.rs` 100 %).

Open follow-ups tracked in `TODO.md`:

- Per-RPC performance bench against the python client.
- Cap-exceeded end-to-end test for `max_reconnect_attempts`
  (needs a server-side flag).

---

*Last reviewed: 2026-04-20.*