use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Notify;
use motorcortex_rust::ConnectionOptions;
use motorcortex_rust::core::{ConnectionState, Request, Subscribe};
use crate::{CERT_PATH, URL_REQ, URL_SUB};
#[tokio::test]
async fn test_async_subscribe_drop_without_disconnect_does_not_hang() {
let opts = ConnectionOptions::new(CERT_PATH.to_string(), 5000, 5000);
let sub = Subscribe::connect_to(URL_SUB, opts).await.expect("connect");
tokio::time::sleep(Duration::from_millis(50)).await;
tokio::time::timeout(Duration::from_secs(3), async move {
tokio::task::spawn_blocking(move || {
drop(sub);
})
.await
.unwrap();
})
.await
.expect("Subscribe drop without disconnect must not hang");
}
#[tokio::test]
async fn test_async_subscribe_round_trips_connect_and_disconnect() {
let sub = Subscribe::new();
assert_eq!(*sub.state().borrow(), ConnectionState::Disconnected);
let opts = ConnectionOptions::new(CERT_PATH.to_string(), 5000, 5000);
sub.connect(URL_SUB, opts).await.expect("connect");
assert_eq!(*sub.state().borrow(), ConnectionState::Connected);
sub.disconnect().await.expect("disconnect");
assert_eq!(*sub.state().borrow(), ConnectionState::Disconnected);
}
#[tokio::test]
async fn test_async_subscribe_connect_to_convenience() {
let opts = ConnectionOptions::new(CERT_PATH.to_string(), 5000, 5000);
let sub = Subscribe::connect_to(URL_SUB, opts)
.await
.expect("connect_to");
assert_eq!(*sub.state().borrow(), ConnectionState::Connected);
sub.disconnect().await.expect("disconnect");
}
#[tokio::test]
async fn test_async_subscribe_receives_callbacks() {
let opts = ConnectionOptions::new(CERT_PATH.to_string(), 5000, 5000);
let req = Request::connect_to(URL_REQ, opts.clone())
.await
.expect("request connect");
req.request_parameter_tree().await.expect("tree");
let sub = Subscribe::connect_to(URL_SUB, opts)
.await
.expect("subscribe connect");
let subscription = sub
.subscribe(
&req,
["root/Control/dummyDouble"],
"async-cb",
1000,
)
.await
.expect("subscribe");
let notify = Arc::new(Notify::new());
let notify_cb = Arc::clone(¬ify);
subscription.notify(move |_s| {
notify_cb.notify_one();
});
req.set_parameter("root/Control/dummyDouble", 4.2f64)
.await
.expect("set");
tokio::time::timeout(Duration::from_secs(3), notify.notified())
.await
.expect("callback must fire within 3 s");
let (_ts, value) = subscription.read::<f64>().expect("decode");
assert!(value.is_finite(), "callback must deliver a finite f64, got {value}");
sub.unsubscribe(&req, subscription)
.await
.expect("unsubscribe");
sub.disconnect().await.expect("sub disconnect");
req.disconnect().await.expect("req disconnect");
}
#[tokio::test]
async fn test_async_subscription_latest_decodes_current_sample() {
let opts = ConnectionOptions::new(CERT_PATH.to_string(), 5000, 5000);
let req = Request::connect_to(URL_REQ, opts.clone())
.await
.expect("request connect");
req.request_parameter_tree().await.expect("tree");
let sub = Subscribe::connect_to(URL_SUB, opts)
.await
.expect("subscribe connect");
let subscription = sub
.subscribe(&req, ["root/Control/dummyDouble"], "async-latest", 1000)
.await
.expect("subscribe");
req.set_parameter("root/Control/dummyDouble", 3.25f64)
.await
.expect("set");
let (_ts, v) = tokio::time::timeout(Duration::from_secs(3), subscription.latest::<f64>())
.await
.expect("latest() must resolve within 3 s")
.expect("decode ok");
assert!(v.is_finite(), "latest must decode a finite f64, got {v}");
sub.unsubscribe(&req, subscription)
.await
.expect("unsubscribe");
sub.disconnect().await.expect("sub disconnect");
req.disconnect().await.expect("req disconnect");
}
#[tokio::test]
async fn test_async_subscription_stream_observes_newly_set_value() {
use futures::StreamExt;
let opts = ConnectionOptions::new(CERT_PATH.to_string(), 5000, 5000);
let req = Request::connect_to(URL_REQ, opts.clone())
.await
.expect("request connect");
req.request_parameter_tree().await.expect("tree");
let sub = Subscribe::connect_to(URL_SUB, opts)
.await
.expect("subscribe connect");
let subscription = sub
.subscribe(&req, ["root/Control/dummyDouble"], "async-stream", 1000)
.await
.expect("subscribe");
let mut stream = Box::pin(subscription.stream::<f64>(256));
req.set_parameter("root/Control/dummyDouble", 42.0f64)
.await
.expect("set");
let deadline = tokio::time::Instant::now() + Duration::from_secs(3);
let mut saw_value = false;
while tokio::time::Instant::now() < deadline {
match tokio::time::timeout_at(deadline, stream.next()).await {
Ok(Some(Ok((_ts, v)))) if (v - 42.0).abs() < 1e-9 => {
saw_value = true;
break;
}
Ok(Some(_)) => continue, Ok(None) => break, Err(_) => break, }
}
assert!(saw_value, "stream must eventually surface the value we wrote");
sub.unsubscribe(&req, subscription)
.await
.expect("unsubscribe");
sub.disconnect().await.expect("sub disconnect");
req.disconnect().await.expect("req disconnect");
}
#[tokio::test]
async fn test_async_subscribe_clone_sees_shared_state() {
let opts = ConnectionOptions::new(CERT_PATH.to_string(), 5000, 5000);
let req = Request::connect_to(URL_REQ, opts.clone()).await.expect("req");
req.request_parameter_tree().await.expect("tree");
let sub = Subscribe::connect_to(URL_SUB, opts).await.expect("sub");
let primary = sub
.subscribe(&req, ["root/Control/dummyDouble"], "async-shared", 1000)
.await
.expect("subscribe");
let secondary = primary.clone();
assert_eq!(primary.id(), secondary.id());
let notify = Arc::new(Notify::new());
let notify_cb = Arc::clone(¬ify);
secondary.notify(move |_s| {
notify_cb.notify_one();
});
req.set_parameter("root/Control/dummyDouble", 9.9f64)
.await
.expect("set");
tokio::time::timeout(Duration::from_secs(3), notify.notified())
.await
.expect("callback on clone must fire");
sub.unsubscribe(&req, primary).await.expect("unsubscribe");
sub.disconnect().await.expect("sub disconnect");
req.disconnect().await.expect("req disconnect");
}
#[tokio::test]
async fn test_async_subscribe_resubscribe_keeps_delivering() {
let opts = ConnectionOptions::new(CERT_PATH.to_string(), 5000, 5000);
let req = Request::connect_to(URL_REQ, opts.clone())
.await
.expect("request connect");
req.request_parameter_tree().await.expect("tree");
let sub = Subscribe::connect_to(URL_SUB, opts)
.await
.expect("subscribe connect");
let subscription = sub
.subscribe(&req, ["root/Control/dummyDouble"], "async-resubscribe", 1000)
.await
.expect("subscribe");
let notify = Arc::new(Notify::new());
let notify_cb = Arc::clone(¬ify);
subscription.notify(move |_| {
notify_cb.notify_one();
});
req.set_parameter("root/Control/dummyDouble", 1.5f64)
.await
.expect("set");
tokio::time::timeout(Duration::from_secs(3), notify.notified())
.await
.expect("pre-resubscribe callback must fire");
let old_alias = subscription.name().to_string();
let old_paths = subscription.paths();
let old_fdiv = subscription.fdiv();
sub.resubscribe(&req).await.expect("resubscribe");
assert_eq!(subscription.name(), old_alias);
assert_eq!(subscription.paths(), old_paths);
assert_eq!(subscription.fdiv(), old_fdiv);
req.set_parameter("root/Control/dummyDouble", 2.5f64)
.await
.expect("set");
tokio::time::timeout(Duration::from_secs(3), notify.notified())
.await
.expect("post-resubscribe callback must fire");
sub.unsubscribe(&req, subscription)
.await
.expect("unsubscribe");
sub.disconnect().await.expect("sub disconnect");
req.disconnect().await.expect("req disconnect");
}