motorcortex-rust 0.5.0

Motorcortex Rust: a Rust client for the Motorcortex Core real-time control system (async + blocking).
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
//! Reconnect integration tests — mirror of motorcortex-python's
//! `test_reconnect.py`.
//!
//! Each test spins up its own server on ports 5578/5577 (separate from
//! the global server on 5568/5567). Most tests give that server a short
//! lifetime, let it die, optionally bring up a replacement on the same
//! ports, and verify that the client state machine transitions correctly.
//!
//! The server binary must be built before running:
//!   cd tests/server && cmake -B build && cmake --build build
//!
//! These tests manage their own server processes and are intentionally
//! sequential (run under `--test-threads=1`).

use std::process::{Child, Command};
use std::sync::Arc;
use std::time::Duration;
use std::{env, thread};

use tokio::sync::Notify;

use motorcortex_rust::core::{Request, Subscribe};
use motorcortex_rust::{ConnectionOptions, ConnectionState};

use crate::CERT_PATH;

/// Endpoint the `Request` client connects to on the reconnect test server.
const URL_REQ_R: &str = "wss://localhost:5578";
/// Endpoint the `Subscribe` client connects to on the reconnect test server.
const URL_SUB_R: &str = "wss://localhost:5577";

/// Connection options shared by most reconnect tests.
///
/// Reconnect backoff is bounded by 100 ms and 5 s (presumably the initial
/// and maximum retry delay — confirm against `with_reconnect_backoff`).
/// Periodic token refresh is switched off so each test fully controls when
/// session tokens are requested.
fn reconnect_opts() -> ConnectionOptions {
    let backoff_floor = Duration::from_millis(100);
    let backoff_ceiling = Duration::from_secs(5);
    let with_backoff = ConnectionOptions::new(CERT_PATH.to_string(), 5000, 5000)
        .with_reconnect_backoff(backoff_floor, backoff_ceiling);
    with_backoff.with_token_refresh_interval(Duration::ZERO)
}

/// Same reconnect backoff as `reconnect_opts`, but with the periodic
/// session-token refresh running every `interval`.
fn reconnect_opts_with_refresh(interval: Duration) -> ConnectionOptions {
    let backoff_floor = Duration::from_millis(100);
    let backoff_ceiling = Duration::from_secs(5);
    let base = ConnectionOptions::new(CERT_PATH.to_string(), 5000, 5000);
    base.with_reconnect_backoff(backoff_floor, backoff_ceiling)
        .with_token_refresh_interval(interval)
}

/// Absolute path of the test-server executable produced by the CMake build
/// in `tests/server/build` (see the module docs for the build command).
fn server_bin() -> String {
    concat!(env!("CARGO_MANIFEST_DIR"), "/tests/server/build/test_server").to_owned()
}

/// Absolute path of the JSON config passed to the reconnect test server
/// via `-c`.
fn server_config() -> String {
    concat!(
        env!("CARGO_MANIFEST_DIR"),
        "/tests/server/config/reconnect_config.json"
    )
    .to_owned()
}

/// Spawn the test server on ports 5578/5577 and block until both accept
/// TCP connections.
///
/// `lifetime_secs`: if `Some(n)`, the server self-terminates after n seconds
/// via the `MCX_TEST_SERVER_LIFETIME_SEC` watchdog added to main.cpp.
///
/// Readiness is detected by probing the ports instead of sleeping a fixed
/// amount. A fixed sleep is racy on slow machines, and it uses up most of a
/// short watchdog lifetime before the test even connects.
///
/// # Panics
/// - If the process cannot be spawned.
/// - If it exits before its ports open.
/// - If the ports are not open within `READY_TIMEOUT`; the child is killed
///   and reaped before panicking.
fn start_reconnect_server(lifetime_secs: Option<u32>) -> Child {
    const READY_TIMEOUT: Duration = Duration::from_secs(10);
    const POLL_INTERVAL: Duration = Duration::from_millis(50);
    // Must match the ports in `URL_REQ_R` / `URL_SUB_R`.
    const PORTS: [u16; 2] = [5578, 5577];

    let mut cmd = Command::new(server_bin());
    cmd.arg("-c").arg(server_config()).arg("-s");
    if let Some(secs) = lifetime_secs {
        cmd.env("MCX_TEST_SERVER_LIFETIME_SEC", secs.to_string());
    }
    let mut child = cmd.spawn().expect("failed to start reconnect test server");

    let started = std::time::Instant::now();
    loop {
        // Fail fast with a clear message if the server died during startup
        // (bad config, port already taken, ...) instead of surfacing later
        // as an opaque connect error.
        if let Some(status) = child
            .try_wait()
            .expect("failed to poll reconnect test server")
        {
            panic!("reconnect test server exited before becoming ready: {status}");
        }
        // "localhost" matches the host the clients dial; the probe
        // connection is dropped immediately.
        if PORTS
            .iter()
            .all(|&port| std::net::TcpStream::connect(("localhost", port)).is_ok())
        {
            break;
        }
        if started.elapsed() >= READY_TIMEOUT {
            let _ = child.kill();
            let _ = child.wait();
            panic!("reconnect test server did not open ports {PORTS:?} within {READY_TIMEOUT:?}");
        }
        thread::sleep(POLL_INTERVAL);
    }
    // Short grace period in case the listeners bind slightly before the
    // server finishes initialising (assumption; the original code waited a
    // flat 3 s).
    thread::sleep(Duration::from_millis(200));
    child
}

/// Wait until the `Request`'s connection state equals `want`.
///
/// Returns `true` as soon as `want` is observed. Returns `false` in two
/// cases while a different state is held:
/// - `deadline` elapses first;
/// - the state channel closes (its sender was dropped, so the value can
///   never change again).
async fn wait_for_state(
    req: &Request,
    want: ConnectionState,
    deadline: Duration,
) -> bool {
    let give_up_at = std::time::Instant::now() + deadline;
    let mut rx = req.state();
    loop {
        if *rx.borrow() == want {
            return true;
        }
        let remaining = give_up_at.saturating_duration_since(std::time::Instant::now());
        if remaining.is_zero() {
            return false;
        }
        // Park until the next state change, a short poll tick, or the
        // deadline, whichever comes first. Clamping the tick keeps us from
        // overshooting the caller's budget.
        let tick = remaining.min(Duration::from_millis(200));
        match tokio::time::timeout(tick, rx.changed()).await {
            // New value published, or poll tick elapsed: re-check at the top.
            Ok(Ok(())) | Err(_) => {}
            // Sender dropped: the state is frozen. Without this exit,
            // `changed()` resolves immediately on every iteration and the
            // loop spins hot until the deadline.
            Ok(Err(_)) => return *rx.borrow() == want,
        }
    }
}

// ---------------------------------------------------------------------------

/// A server that disappears underneath a logged-in `Request` must drive the
/// client into `ConnectionLost`.
#[tokio::test]
async fn test_connection_lost_emits_state() {
    // The watchdog stops this server 4 s after launch.
    let mut doomed = start_reconnect_server(Some(4));

    let client = Request::connect_to(URL_REQ_R, reconnect_opts())
        .await
        .expect("connect");
    client.login("root", "vectioneer").await.expect("login");

    let saw_lost =
        wait_for_state(&client, ConnectionState::ConnectionLost, Duration::from_secs(12)).await;
    assert!(
        saw_lost,
        "expected ConnectionLost after server shutdown, got {:?}",
        *client.state().borrow()
    );

    // The process has already exited on its own; reaping it avoids a zombie.
    let _ = doomed.wait();
}

/// The server dies and a replacement comes up on the same ports. The same
/// `Request` must then return to `Connected` and serve RPCs again.
///
/// A session token is fetched before the crash so the driver has something
/// to restore the session with once the transport is re-established.
#[tokio::test]
async fn test_reconnect_restores_session() {
    // Start a short-lived server (4 s lifetime).
    let mut first = start_reconnect_server(Some(4));

    let req = Request::connect_to(URL_REQ_R, reconnect_opts())
        .await
        .expect("connect");
    req.login("root", "vectioneer").await.expect("login");
    // Fetch a token so the driver can restore the session after reconnect.
    req.get_session_token().await.expect("get_session_token");

    // Wait for the first server to die.
    assert!(
        wait_for_state(&req, ConnectionState::ConnectionLost, Duration::from_secs(12)).await,
        "expected ConnectionLost"
    );
    // Reap the exited first server before its ports are reused below.
    let _ = first.wait();

    // Bring up a replacement server on the same ports (no lifetime limit).
    let mut second = start_reconnect_server(None);

    // NNG's dialer retries the transport; the driver's pipe-ADD handler
    // calls RestoreSession and moves to Connected.
    assert!(
        wait_for_state(&req, ConnectionState::Connected, Duration::from_secs(15)).await,
        "expected Connected after reconnect, got {:?}",
        *req.state().borrow()
    );

    // Verify RPCs work on the restored session.
    req.request_parameter_tree()
        .await
        .expect("request_parameter_tree must succeed after reconnect");

    // The replacement has no lifetime limit, so it must be killed explicitly.
    req.disconnect().await.expect("disconnect");
    let _ = second.kill();
    let _ = second.wait();
}

/// The server must reject a bogus session token. That rejection is what
/// makes the driver report `SessionExpired` on reconnect.
#[tokio::test]
async fn test_session_expired_emits_state() {
    use motorcortex_rust::StatusCode;

    // The full "crash, restart, present a stale token" path is covered by
    // end-to-end tests. Here we probe the building block directly, without
    // restarting the server: while connected, hand `restore_session` a
    // garbage token. Any answer other than Ok/ReadOnlyMode is the outcome
    // that would make the driver emit SessionExpired on reconnect.
    let mut server = start_reconnect_server(None);

    let client = Request::connect_to(URL_REQ_R, reconnect_opts())
        .await
        .expect("connect");
    client.login("root", "vectioneer").await.expect("login");

    let status = client
        .restore_session("totally-invalid-token")
        .await
        .expect("restore_session RPC completes even for bad tokens");
    let accepted = matches!(status, StatusCode::Ok | StatusCode::ReadOnlyMode);
    assert!(
        !accepted,
        "expected non-Ok status for invalid token, got {status:?}"
    );

    // No watchdog lifetime was set, so tear the server down explicitly.
    client.disconnect().await.expect("disconnect");
    let _ = server.kill();
    let _ = server.wait();
}

/// With reconnect disabled, losing the server is terminal: the state must
/// land in `Disconnected`.
#[tokio::test]
async fn test_reconnect_disabled_emits_disconnected_on_drop() {
    // `.with_reconnect(false)` pins NNG's dialer to zero backoff, and the
    // driver treats REM_POST as final. The expected end state is therefore
    // `Disconnected` rather than `ConnectionLost`.
    let mut short_lived = start_reconnect_server(Some(4));

    let no_retry = ConnectionOptions::new(CERT_PATH.to_string(), 5000, 5000)
        .with_reconnect(false)
        .with_token_refresh_interval(Duration::ZERO);
    let client = Request::connect_to(URL_REQ_R, no_retry)
        .await
        .expect("connect");
    client.login("root", "vectioneer").await.expect("login");

    // The watchdog stops the server after 4 s.
    let saw_disconnected =
        wait_for_state(&client, ConnectionState::Disconnected, Duration::from_secs(12)).await;
    assert!(
        saw_disconnected,
        "expected Disconnected after server shutdown with reconnect=false, got {:?}",
        *client.state().borrow()
    );

    let _ = short_lived.kill();
    let _ = short_lived.wait();
}

/// Setting `max_reconnect_attempts` must leave the normal
/// server-restart → restore-session recovery intact.
///
/// The flow mirrors `test_reconnect_restores_session`; only the options
/// differ.
#[tokio::test]
async fn test_max_reconnect_attempts_allows_happy_path() {
    // Smoke test for the opt: setting `max_reconnect_attempts` must
    // not disturb the ordinary server-restart → restore-session
    // recovery. Exercises the counter-reset path: a successful
    // restore clears the consecutive-failure count so the guard
    // never trips.
    // Same backoff/refresh settings as `reconnect_opts()`, plus a cap of
    // 3 attempts.
    let opts = ConnectionOptions::new(CERT_PATH.to_string(), 5000, 5000)
        .with_reconnect_backoff(Duration::from_millis(100), Duration::from_secs(5))
        .with_token_refresh_interval(Duration::ZERO)
        .with_max_reconnect_attempts(Some(3));

    // First server self-terminates after 4 s.
    let mut first = start_reconnect_server(Some(4));
    let req = Request::connect_to(URL_REQ_R, opts).await.expect("connect");
    req.login("root", "vectioneer").await.expect("login");
    // Token the driver can use to restore the session after reconnect.
    req.get_session_token().await.expect("token");

    assert!(
        wait_for_state(&req, ConnectionState::ConnectionLost, Duration::from_secs(12)).await,
        "expected ConnectionLost"
    );
    // Reap the exited first server before its ports are reused below.
    let _ = first.wait();

    // Replacement on the same ports, no lifetime limit.
    let mut second = start_reconnect_server(None);
    assert!(
        wait_for_state(&req, ConnectionState::Connected, Duration::from_secs(15)).await,
        "expected Connected after restore with max_reconnect_attempts set, got {:?}",
        *req.state().borrow()
    );

    req.request_parameter_tree()
        .await
        .expect("RPC must work on the restored session");

    req.disconnect().await.expect("disconnect");
    let _ = second.kill();
    let _ = second.wait();
}

/// End-to-end check that a `Subscription` handle survives a full server
/// restart once `Subscribe::resubscribe` re-registers it.
#[tokio::test]
async fn test_resubscribe_after_server_restart_delivers_payloads() {
    // End-to-end: subscribe to a parameter, kill the server, bring
    // up a replacement (state fully gone → restore_session will
    // fail), re-login on the Request, then call
    // `Subscribe::resubscribe` to re-register the group against the
    // fresh server. The same `Subscription` handle must resume
    // delivering callbacks.
    let mut first = start_reconnect_server(Some(5));

    let req = Request::connect_to(URL_REQ_R, reconnect_opts())
        .await
        .expect("connect req");
    req.login("root", "vectioneer").await.expect("login");
    // Fetch a token so the driver's reconnect handler has something
    // to try — against the fresh server it'll fail, landing us in
    // `SessionExpired` (which is the state the user code below
    // reacts to by re-logging in).
    req.get_session_token().await.expect("token");
    req.request_parameter_tree().await.expect("tree");
    let sub = Subscribe::connect_to(URL_SUB_R, reconnect_opts())
        .await
        .expect("connect sub");

    let subscription = sub
        .subscribe(&req, ["root/Control/dummyDouble"], "resub-e2e", 1000)
        .await
        .expect("subscribe");
    let original_alias = subscription.name().to_string();

    let notify = Arc::new(Notify::new());
    let notify_cb = Arc::clone(&notify);
    subscription.notify(move |_| {
        notify_cb.notify_one();
    });

    // Verify we're receiving before the server dies.
    req.set_parameter("root/Control/dummyDouble", 1.5f64)
        .await
        .expect("set");
    tokio::time::timeout(Duration::from_secs(3), notify.notified())
        .await
        .expect("pre-crash callback must fire");

    // Wait for the server to exit and the Request to notice.
    assert!(
        wait_for_state(&req, ConnectionState::ConnectionLost, Duration::from_secs(12)).await,
        "expected ConnectionLost after server shutdown"
    );
    let _ = first.wait();

    // Bring up a fresh server — it has no memory of the previous
    // group assignments.
    let mut second = start_reconnect_server(None);

    // Wait for the driver's reconnect handler to settle. Whether it
    // lands on `Connected` (token accepted by the fresh server) or
    // `SessionExpired` (token rejected) is server-implementation-
    // dependent; either is a legitimate place to call
    // `resubscribe()` from. Treat both as the "ready for replay"
    // signal.
    let ready_state = tokio::time::timeout(Duration::from_secs(15), async {
        let mut rx = req.state();
        loop {
            let s = *rx.borrow();
            if matches!(s, ConnectionState::Connected | ConnectionState::SessionExpired) {
                return s;
            }
            // A closed channel means the state can never change again.
            // Silently ignoring the error would make `changed()` resolve
            // immediately forever. That hot loop never yields, so the
            // surrounding timeout could never fire and the test would hang.
            if rx.changed().await.is_err() {
                panic!(
                    "state channel closed before settling; last state {:?}",
                    *rx.borrow()
                );
            }
        }
    })
    .await
    .expect("state must settle to Connected or SessionExpired after reconnect");

    if ready_state == ConnectionState::SessionExpired {
        req.login("root", "vectioneer")
            .await
            .expect("re-login after SessionExpired");
    }
    req.request_parameter_tree()
        .await
        .expect("tree on fresh server");
    sub.resubscribe(&req)
        .await
        .expect("resubscribe after server restart");

    // The subscription handle is the same one the caller still holds;
    // alias is preserved, a fresh server-assigned id may be in place.
    assert_eq!(subscription.name(), original_alias);

    // Callbacks resume on the same handle.
    req.set_parameter("root/Control/dummyDouble", 2.5f64)
        .await
        .expect("set");
    tokio::time::timeout(Duration::from_secs(5), notify.notified())
        .await
        .expect("post-resubscribe callback must fire against fresh server");

    sub.unsubscribe(&req, subscription)
        .await
        .expect("unsubscribe");
    sub.disconnect().await.expect("sub disconnect");
    req.disconnect().await.expect("req disconnect");
    let _ = second.kill();
    let _ = second.wait();
}

/// The periodic session-token refresh must tick while the pipe is alive and
/// stop ticking once the connection is lost.
///
/// Both properties are checked by reading `session_refresh_count()` at
/// three points: after a live window, at the moment `ConnectionLost` is
/// observed, and after a dead-pipe window.
#[tokio::test]
async fn test_token_refresh_paused_while_disconnected() {
    // With a live pipe the driver fires `GetSessionToken` on every
    // `token_refresh_interval` tick. When the pipe goes down the
    // handler must short-circuit: nothing to send, nothing to wait
    // on. `session_refresh_count()` exposes a counter the driver
    // bumps only on the fire-path, so observing its delta across a
    // dead-pipe window tells us the pause is honoured.
    let mut server = start_reconnect_server(Some(4));

    let refresh_interval = Duration::from_millis(200);
    let req = Request::connect_to(URL_REQ_R, reconnect_opts_with_refresh(refresh_interval))
        .await
        .expect("connect");
    req.login("root", "vectioneer").await.expect("login");

    // Live pipe: the refresh helper should tick several times in a
    // second at a 200 ms interval.
    // NOTE(review): this window has to fit inside the server's 4 s
    // watchdog lifetime together with startup, connect and login.
    tokio::time::sleep(Duration::from_millis(1200)).await;
    let live_count = req.session_refresh_count();
    assert!(
        live_count >= 3,
        "refresh helper must tick while connected, got count = {live_count}"
    );

    // Server dies → state flips to ConnectionLost. Refresh ticks
    // that land during the dead-pipe window must be skipped.
    assert!(
        wait_for_state(&req, ConnectionState::ConnectionLost, Duration::from_secs(12)).await,
        "expected ConnectionLost after server shutdown"
    );
    let baseline = req.session_refresh_count();

    // Sleep > 5× the refresh interval. If the pause were broken,
    // the counter would climb by roughly this many ticks.
    tokio::time::sleep(Duration::from_millis(1500)).await;
    let after_dead_pipe = req.session_refresh_count();
    assert_eq!(
        after_dead_pipe, baseline,
        "no refresh ticks should fire while the pipe is dead (baseline={baseline}, after={after_dead_pipe})"
    );

    // Best-effort cleanup: the watchdog should already have stopped the
    // server; kill() errors are ignored in case it has.
    let _ = server.kill();
    let _ = server.wait();
}