motorcortex-rust 0.5.0

Motorcortex Rust: a Rust client for the Motorcortex Core real-time control system (async + blocking).
Documentation
//! End-to-end tests for the new `core::Subscribe` handle against the
//! vendored `test_server`.

use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Notify;

use motorcortex_rust::ConnectionOptions;
use motorcortex_rust::core::{ConnectionState, Request, Subscribe};

use crate::{CERT_PATH, URL_REQ, URL_SUB};

#[tokio::test]
async fn test_async_subscribe_drop_without_disconnect_does_not_hang() {
    // Regression test: dropping the last `Subscribe` handle while still
    // connected must return promptly.
    //
    // The receive thread is blocked on `nng_recv` while connected;
    // dropping the last `Subscribe` handle without a prior
    // `disconnect()` used to hang the driver shutdown forever (join
    // waited on a recv that had no reason to return). The driver
    // now closes the socket on shutdown so the recv unblocks with
    // `Err(Closed)` and the receive thread exits cleanly.
    let opts = ConnectionOptions::new(CERT_PATH.to_string(), 5000, 5000);
    let sub = Subscribe::connect_to(URL_SUB, opts).await.expect("connect");
    // Let the receive thread actually block inside nng_recv so the
    // shutdown path has something to unblock.
    tokio::time::sleep(Duration::from_millis(50)).await;

    // Run the drop on a detached OS thread rather than via
    // `spawn_blocking`: dropping a tokio runtime waits indefinitely for
    // blocking-pool tasks that have started, so a hung drop inside
    // `spawn_blocking` would stall the runtime teardown after the
    // timeout panic and block the test runner anyway. A detached
    // `std::thread` is not waited on, so a regression fails within the
    // timeout instead.
    let (done_tx, done_rx) = tokio::sync::oneshot::channel::<()>();
    let runtime = tokio::runtime::Handle::current();
    std::thread::spawn(move || {
        // Keep a runtime context available during the drop, as it was
        // on the blocking pool.
        let _ctx = runtime.enter();
        drop(sub);
        // The receiver may already be gone if the timeout fired.
        let _ = done_tx.send(());
    });

    // Wait up to 3 s for the drop to complete. The outer `expect`
    // reports a hang; the inner one reports a drop that panicked (the
    // sender is then dropped without sending).
    tokio::time::timeout(Duration::from_secs(3), done_rx)
        .await
        .expect("Subscribe drop without disconnect must not hang")
        .expect("Subscribe drop must complete without panicking");
}

#[tokio::test]
async fn test_async_subscribe_round_trips_connect_and_disconnect() {
    // Walk a single handle through new -> connect -> disconnect and
    // check the value behind `state()` after every step.
    let handle = Subscribe::new();
    {
        let state = handle.state();
        assert_eq!(*state.borrow(), ConnectionState::Disconnected);
    }

    let options = ConnectionOptions::new(CERT_PATH.to_string(), 5000, 5000);
    handle.connect(URL_SUB, options).await.expect("connect");
    {
        let state = handle.state();
        assert_eq!(*state.borrow(), ConnectionState::Connected);
    }

    handle.disconnect().await.expect("disconnect");
    {
        let state = handle.state();
        assert_eq!(*state.borrow(), ConnectionState::Disconnected);
    }
}

#[tokio::test]
async fn test_async_subscribe_connect_to_convenience() {
    // `connect_to` must hand back a handle that already reports
    // `Connected`, with no separate `connect` call.
    let options = ConnectionOptions::new(CERT_PATH.to_string(), 5000, 5000);
    let handle = Subscribe::connect_to(URL_SUB, options).await.expect("connect_to");
    {
        let state = handle.state();
        assert_eq!(*state.borrow(), ConnectionState::Connected);
    }

    handle.disconnect().await.expect("disconnect");
}

#[tokio::test]
async fn test_async_subscribe_receives_callbacks() {
    // End-to-end callback path: open both connections, create a group,
    // install a callback, then wait until it has been invoked at least
    // once.
    let opts = ConnectionOptions::new(CERT_PATH.to_string(), 5000, 5000);

    let request = Request::connect_to(URL_REQ, opts.clone())
        .await
        .expect("request connect");
    request.request_parameter_tree().await.expect("tree");

    let subscriber = Subscribe::connect_to(URL_SUB, opts)
        .await
        .expect("subscribe connect");

    let paths = ["root/Control/dummyDouble"];
    let group = subscriber
        .subscribe(&request, paths, "async-cb", 1000)
        .await
        .expect("subscribe");

    // The callback signals a `tokio::sync::Notify`, so the test can
    // await the wake-up instead of polling.
    let wake = Arc::new(Notify::new());
    let wake_cb = Arc::clone(&wake);
    group.notify(move |_| wake_cb.notify_one());

    // Write a fresh value so the publisher has something new to send.
    request
        .set_parameter("root/Control/dummyDouble", 4.2f64)
        .await
        .expect("set");

    // Allow up to 3 s; the publisher cadence is bounded by
    // fdiv * base_period.
    let patience = Duration::from_secs(3);
    tokio::time::timeout(patience, wake.notified())
        .await
        .expect("callback must fire within 3 s");

    // Only check that the buffer decodes to a finite number. The
    // callback path is lossy, so the sample read here is not
    // guaranteed to be the 4.2 written above; the stream-based test
    // covers the exact-value check.
    let (_ts, value) = group.read::<f64>().expect("decode");
    assert!(value.is_finite(), "callback must deliver a finite f64, got {value}");

    // Tear down in reverse order of setup.
    subscriber
        .unsubscribe(&request, group)
        .await
        .expect("unsubscribe");
    subscriber.disconnect().await.expect("sub disconnect");
    request.disconnect().await.expect("req disconnect");
}

#[tokio::test]
async fn test_async_subscription_latest_decodes_current_sample() {
    // `latest()` is lossy: the sample it resolves with may come from a
    // publisher tick other than the one carrying the value written
    // below. So this test checks only the plumbing: `latest()`
    // resolves in time, decoding succeeds, and the number is finite.
    // The exact-value check belongs to the stream-based test.
    let opts = ConnectionOptions::new(CERT_PATH.to_string(), 5000, 5000);

    let request = Request::connect_to(URL_REQ, opts.clone())
        .await
        .expect("request connect");
    request.request_parameter_tree().await.expect("tree");

    let subscriber = Subscribe::connect_to(URL_SUB, opts)
        .await
        .expect("subscribe connect");

    let group = subscriber
        .subscribe(&request, ["root/Control/dummyDouble"], "async-latest", 1000)
        .await
        .expect("subscribe");

    request
        .set_parameter("root/Control/dummyDouble", 3.25f64)
        .await
        .expect("set");

    // Outer result: did `latest()` resolve within 3 s?
    // Inner result: did the payload decode?
    let pending = group.latest::<f64>();
    let resolved = tokio::time::timeout(Duration::from_secs(3), pending)
        .await
        .expect("latest() must resolve within 3 s");
    let (_ts, v) = resolved.expect("decode ok");
    assert!(v.is_finite(), "latest must decode a finite f64, got {v}");

    subscriber
        .unsubscribe(&request, group)
        .await
        .expect("unsubscribe");
    subscriber.disconnect().await.expect("sub disconnect");
    request.disconnect().await.expect("req disconnect");
}

#[tokio::test]
async fn test_async_subscription_stream_observes_newly_set_value() {
    // `stream()` is lossless within its ring, so a value written
    // *after* the stream was created must show up in a later item.
    // The ring holds 256 items and is drained promptly, so lagging is
    // not expected here.
    use futures::StreamExt;

    let opts = ConnectionOptions::new(CERT_PATH.to_string(), 5000, 5000);

    let request = Request::connect_to(URL_REQ, opts.clone())
        .await
        .expect("request connect");
    request.request_parameter_tree().await.expect("tree");

    let subscriber = Subscribe::connect_to(URL_SUB, opts)
        .await
        .expect("subscribe connect");

    let group = subscriber
        .subscribe(&request, ["root/Control/dummyDouble"], "async-stream", 1000)
        .await
        .expect("subscribe");

    let mut samples = Box::pin(group.stream::<f64>(256));

    // The stream exists before the write, so the sample carrying the
    // new value cannot be lost to a subscribe-then-set race.
    request
        .set_parameter("root/Control/dummyDouble", 42.0f64)
        .await
        .expect("set");

    // Drain items until the written value appears, the stream ends,
    // or the 3 s budget is used up.
    let deadline = tokio::time::Instant::now() + Duration::from_secs(3);
    let mut saw_value = false;
    while tokio::time::Instant::now() < deadline {
        let item = match tokio::time::timeout_at(deadline, samples.next()).await {
            Ok(Some(item)) => item,
            // Stream closed, or the deadline was reached while waiting.
            Ok(None) | Err(_) => break,
        };
        // Items that failed to decode or carry another value are skipped.
        if let Ok((_ts, v)) = item {
            if (v - 42.0).abs() < 1e-9 {
                saw_value = true;
                break;
            }
        }
    }
    assert!(saw_value, "stream must eventually surface the value we wrote");

    subscriber
        .unsubscribe(&request, group)
        .await
        .expect("unsubscribe");
    subscriber.disconnect().await.expect("sub disconnect");
    request.disconnect().await.expect("req disconnect");
}

#[tokio::test]
async fn test_async_subscribe_clone_sees_shared_state() {
    // A cloned Subscription is expected to share state with the
    // handle it was cloned from. Here that is checked two ways: both
    // handles report the same id, and a callback installed on the
    // clone fires for data arriving on the shared subscription.
    let opts = ConnectionOptions::new(CERT_PATH.to_string(), 5000, 5000);
    let request = Request::connect_to(URL_REQ, opts.clone()).await.expect("req");
    request.request_parameter_tree().await.expect("tree");
    let subscriber = Subscribe::connect_to(URL_SUB, opts).await.expect("sub");

    let original = subscriber
        .subscribe(&request, ["root/Control/dummyDouble"], "async-shared", 1000)
        .await
        .expect("subscribe");
    let copy = original.clone();
    assert_eq!(original.id(), copy.id());

    // The callback goes on the clone only.
    let wake = Arc::new(Notify::new());
    let wake_cb = Arc::clone(&wake);
    copy.notify(move |_| wake_cb.notify_one());

    request
        .set_parameter("root/Control/dummyDouble", 9.9f64)
        .await
        .expect("set");

    let patience = Duration::from_secs(3);
    tokio::time::timeout(patience, wake.notified())
        .await
        .expect("callback on clone must fire");

    // Unsubscribe through the original handle; the clone is simply
    // dropped at the end of the test.
    subscriber
        .unsubscribe(&request, original)
        .await
        .expect("unsubscribe");
    subscriber.disconnect().await.expect("sub disconnect");
    request.disconnect().await.expect("req disconnect");
}

#[tokio::test]
async fn test_async_subscribe_resubscribe_keeps_delivering() {
    use std::sync::atomic::{AtomicUsize, Ordering};

    // Smoke test for Subscribe::resubscribe without a server restart:
    // it should re-register every active group (same alias, same
    // paths, same fdiv), rebind the Subscription handles in place,
    // and callbacks keep firing without the caller touching the
    // handle. Exercises the rebind path even when the server returns
    // the same id — the NNG unsubscribe/subscribe still runs.
    let opts = ConnectionOptions::new(CERT_PATH.to_string(), 5000, 5000);
    let req = Request::connect_to(URL_REQ, opts.clone())
        .await
        .expect("request connect");
    req.request_parameter_tree().await.expect("tree");
    let sub = Subscribe::connect_to(URL_SUB, opts)
        .await
        .expect("subscribe connect");

    let subscription = sub
        .subscribe(&req, ["root/Control/dummyDouble"], "async-resubscribe", 1000)
        .await
        .expect("subscribe");

    // The callback bumps a counter and then signals the `Notify`. The
    // counter is what lets the second wait tell a delivery that came
    // after `resubscribe` apart from one that came before it.
    // `Notify` alone cannot: `notify_one` stores a permit when nobody
    // is waiting, so a callback firing between the first wait and the
    // end of `resubscribe` would satisfy the next `notified()` at once.
    let notify = Arc::new(Notify::new());
    let deliveries = Arc::new(AtomicUsize::new(0));
    let notify_cb = Arc::clone(&notify);
    let deliveries_cb = Arc::clone(&deliveries);
    subscription.notify(move |_| {
        // Count first, then wake: a woken waiter always sees the bump.
        deliveries_cb.fetch_add(1, Ordering::SeqCst);
        notify_cb.notify_one();
    });

    // First callback before resubscribe.
    req.set_parameter("root/Control/dummyDouble", 1.5f64)
        .await
        .expect("set");
    tokio::time::timeout(Duration::from_secs(3), notify.notified())
        .await
        .expect("pre-resubscribe callback must fire");

    // Resubscribe — metadata (alias, paths, fdiv) stays the same.
    let old_alias = subscription.name().to_string();
    let old_paths = subscription.paths();
    let old_fdiv = subscription.fdiv();
    sub.resubscribe(&req).await.expect("resubscribe");
    assert_eq!(subscription.name(), old_alias);
    assert_eq!(subscription.paths(), old_paths);
    assert_eq!(subscription.fdiv(), old_fdiv);

    // Deliveries counted up to here happened before `resubscribe`
    // returned; only later ones show that the handle still delivers.
    let baseline = deliveries.load(Ordering::SeqCst);

    // Callbacks resume on the same handle.
    req.set_parameter("root/Control/dummyDouble", 2.5f64)
        .await
        .expect("set");
    tokio::time::timeout(Duration::from_secs(3), async {
        // A stale permit wakes this loop once; the counter check then
        // sends it back to waiting until a new delivery is counted.
        while deliveries.load(Ordering::SeqCst) <= baseline {
            notify.notified().await;
        }
    })
    .await
    .expect("post-resubscribe callback must fire");

    sub.unsubscribe(&req, subscription)
        .await
        .expect("unsubscribe");
    sub.disconnect().await.expect("sub disconnect");
    req.disconnect().await.expect("req disconnect");
}