use std::process::{Child, Command};
use std::sync::Arc;
use std::time::Duration;
use std::{env, thread};
use tokio::sync::Notify;
use motorcortex_rust::core::{Request, Subscribe};
use motorcortex_rust::{ConnectionOptions, ConnectionState};
use crate::CERT_PATH;
/// Secure WebSocket URL that the tests in this file pass to `Request::connect_to`.
const URL_REQ_R: &str = "wss://localhost:5578";
/// Secure WebSocket URL that the tests in this file pass to `Subscribe::connect_to`.
const URL_SUB_R: &str = "wss://localhost:5577";
/// Builds the connection options used by most tests in this file.
///
/// Starts from `ConnectionOptions::new(CERT_PATH, 5000, 5000)`, configures the
/// reconnect back-off with the pair (100 ms, 5 s) and passes `Duration::ZERO`
/// as the token refresh interval.
///
/// NOTE(review): a zero interval presumably disables the periodic token
/// refresh — confirm against the library documentation.
fn reconnect_opts() -> ConnectionOptions {
    let backoff_lo = Duration::from_millis(100);
    let backoff_hi = Duration::from_secs(5);

    let base = ConnectionOptions::new(CERT_PATH.to_string(), 5000, 5000);
    base.with_reconnect_backoff(backoff_lo, backoff_hi)
        .with_token_refresh_interval(Duration::ZERO)
}
/// Builds connection options with a caller-chosen token refresh interval.
///
/// Uses the same certificate path, the same two `5000` arguments and the same
/// (100 ms, 5 s) reconnect back-off pair as the other option builders in this
/// file; only `interval` is forwarded to `with_token_refresh_interval`.
fn reconnect_opts_with_refresh(interval: Duration) -> ConnectionOptions {
    let backoff_lo = Duration::from_millis(100);
    let backoff_hi = Duration::from_secs(5);

    let with_backoff = ConnectionOptions::new(CERT_PATH.to_string(), 5000, 5000)
        .with_reconnect_backoff(backoff_lo, backoff_hi);
    with_backoff.with_token_refresh_interval(interval)
}
/// Returns the path of the test server executable.
///
/// The path is the compile-time value of `CARGO_MANIFEST_DIR` followed by
/// `/tests/server/build/test_server`.
fn server_bin() -> String {
    let manifest_dir: &str = env!("CARGO_MANIFEST_DIR");
    [manifest_dir, "/tests/server/build/test_server"].concat()
}
/// Returns the path of the JSON configuration handed to the test server.
///
/// The path is the compile-time value of `CARGO_MANIFEST_DIR` followed by
/// `/tests/server/config/reconnect_config.json`.
fn server_config() -> String {
    let mut path = String::from(env!("CARGO_MANIFEST_DIR"));
    path.push_str("/tests/server/config/reconnect_config.json");
    path
}
/// Spawns the test server as a child process and returns its handle.
///
/// The server is launched as `<server_bin> -c <server_config> -s`. When
/// `lifetime_secs` is `Some`, its value is exported to the child through the
/// `MCX_TEST_SERVER_LIFETIME_SEC` environment variable.
///
/// NOTE(review): presumably the server exits on its own after that many
/// seconds — the tests below rely on a shutdown happening; confirm against the
/// server sources.
///
/// After spawning, the calling thread is blocked for three seconds (this is a
/// plain `thread::sleep`, even though the callers are async tests). The caller
/// owns the returned `Child`; nothing here kills or reaps it.
///
/// # Panics
///
/// Panics if the process cannot be spawned.
fn start_reconnect_server(lifetime_secs: Option<u32>) -> Child {
    let config_path = server_config();

    let mut launcher = Command::new(server_bin());
    launcher.args(["-c", config_path.as_str(), "-s"]);
    // An `Option` iterates over zero or one item, so the variable is only set
    // when a lifetime was requested.
    launcher.envs(lifetime_secs.map(|secs| ("MCX_TEST_SERVER_LIFETIME_SEC", secs.to_string())));

    let server = launcher
        .spawn()
        .expect("failed to start reconnect test server");

    // Fixed pause after spawning, presumably to let the server come up.
    let startup_pause = Duration::from_secs(3);
    thread::sleep(startup_pause);

    server
}
/// Waits until the connection state observed through `req.state()` equals `want`.
///
/// Returns `true` as soon as the observed state matches. Returns `false` when
/// `deadline` (measured from the moment of the call) elapses without a match,
/// or when the state channel is closed while the state still differs — a
/// closed channel can never deliver another update, so waiting longer is
/// pointless.
///
/// The state is re-read at least every 200 ms, and a single wait never extends
/// past the remaining time budget.
async fn wait_for_state(
    req: &Request,
    want: ConnectionState,
    deadline: Duration,
) -> bool {
    // Upper bound for one wait on a change notification.
    const POLL_INTERVAL: Duration = Duration::from_millis(200);

    let start = std::time::Instant::now();
    let mut rx = req.state();
    loop {
        if *rx.borrow() == want {
            return true;
        }

        let remaining = deadline.saturating_sub(start.elapsed());
        if remaining.is_zero() {
            return false;
        }

        // Cap the wait so the function does not overshoot the caller's deadline.
        let wait = remaining.min(POLL_INTERVAL);
        if let Ok(Err(_)) = tokio::time::timeout(wait, rx.changed()).await {
            // The sending side is gone: `changed()` would now fail immediately
            // on every call, turning this loop into a busy spin. Report the
            // final state instead.
            return *rx.borrow() == want;
        }
    }
}
/// Checks that the client reports `ConnectionLost` once its server goes away.
///
/// The server is started with a lifetime setting of 4; after connecting and
/// logging in, the test allows up to 12 s for the state to become
/// `ConnectionState::ConnectionLost`.
#[tokio::test]
async fn test_connection_lost_emits_state() {
    let lost_timeout = Duration::from_secs(12);
    let mut server_proc = start_reconnect_server(Some(4));

    let req = Request::connect_to(URL_REQ_R, reconnect_opts())
        .await
        .expect("connect");
    req.login("root", "vectioneer").await.expect("login");

    let lost = wait_for_state(&req, ConnectionState::ConnectionLost, lost_timeout).await;
    assert!(
        lost,
        "expected ConnectionLost after server shutdown, got {:?}",
        *req.state().borrow()
    );

    // Reap the child process; the outcome of the wait is deliberately ignored.
    let _ = server_proc.wait();
}
/// Checks that the client reconnects and can issue requests again after its
/// server has been replaced by a fresh process.
///
/// Flow: start a server with a lifetime setting of 4, connect, log in and
/// fetch a session token; wait for `ConnectionLost`; start a second server
/// without a lifetime setting; wait for `Connected`; then require
/// `request_parameter_tree` to succeed.
#[tokio::test]
async fn test_reconnect_restores_session() {
    let lost_timeout = Duration::from_secs(12);
    let restore_timeout = Duration::from_secs(15);

    // Phase 1: a live session against the short-lived server.
    let mut short_lived = start_reconnect_server(Some(4));
    let req = Request::connect_to(URL_REQ_R, reconnect_opts())
        .await
        .expect("connect");
    req.login("root", "vectioneer").await.expect("login");
    req.get_session_token().await.expect("get_session_token");

    // Phase 2: the client must notice that the server is gone.
    let lost = wait_for_state(&req, ConnectionState::ConnectionLost, lost_timeout).await;
    assert!(lost, "expected ConnectionLost");
    let _ = short_lived.wait();

    // Phase 3: a replacement server appears and the client must come back.
    let mut replacement = start_reconnect_server(None);
    let restored = wait_for_state(&req, ConnectionState::Connected, restore_timeout).await;
    assert!(
        restored,
        "expected Connected after reconnect, got {:?}",
        *req.state().borrow()
    );
    req.request_parameter_tree()
        .await
        .expect("request_parameter_tree must succeed after reconnect");

    // Teardown; kill/wait results are deliberately ignored.
    req.disconnect().await.expect("disconnect");
    let _ = replacement.kill();
    let _ = replacement.wait();
}
/// Checks that restoring a session with a bogus token is rejected.
///
/// After a normal login the test calls `restore_session` with an invalid
/// token. The call itself must complete, and the returned status must be
/// neither `StatusCode::Ok` nor `StatusCode::ReadOnlyMode`.
///
/// Note: despite its name, this test does not inspect the connection state;
/// it only checks the status returned by `restore_session`.
#[tokio::test]
async fn test_session_expired_emits_state() {
    use motorcortex_rust::StatusCode;

    let mut server_proc = start_reconnect_server(None);

    let req = Request::connect_to(URL_REQ_R, reconnect_opts())
        .await
        .expect("connect");
    req.login("root", "vectioneer").await.expect("login");

    let status = req
        .restore_session("totally-invalid-token")
        .await
        .expect("restore_session RPC completes even for bad tokens");
    let accepted = matches!(status, StatusCode::Ok | StatusCode::ReadOnlyMode);
    assert!(
        !accepted,
        "expected non-Ok status for invalid token, got {status:?}"
    );

    // Teardown; kill/wait results are deliberately ignored.
    req.disconnect().await.expect("disconnect");
    let _ = server_proc.kill();
    let _ = server_proc.wait();
}
/// Checks that, with reconnect switched off, losing the server leads to the
/// `Disconnected` state.
///
/// The options are built with `with_reconnect(false)` and a zero token refresh
/// interval. The server is started with a lifetime setting of 4, and the test
/// allows up to 12 s for the state to become `ConnectionState::Disconnected`.
#[tokio::test]
async fn test_reconnect_disabled_emits_disconnected_on_drop() {
    let disconnect_timeout = Duration::from_secs(12);
    let mut server_proc = start_reconnect_server(Some(4));

    let no_reconnect = ConnectionOptions::new(CERT_PATH.to_string(), 5000, 5000)
        .with_reconnect(false)
        .with_token_refresh_interval(Duration::ZERO);
    let req = Request::connect_to(URL_REQ_R, no_reconnect)
        .await
        .expect("connect");
    req.login("root", "vectioneer").await.expect("login");

    let disconnected =
        wait_for_state(&req, ConnectionState::Disconnected, disconnect_timeout).await;
    assert!(
        disconnected,
        "expected Disconnected after server shutdown with reconnect=false, got {:?}",
        *req.state().borrow()
    );

    // Teardown; kill/wait results are deliberately ignored.
    let _ = server_proc.kill();
    let _ = server_proc.wait();
}
/// Checks that setting `with_max_reconnect_attempts(Some(3))` does not prevent
/// a normal reconnect.
///
/// Same flow as the plain reconnect test: connect to a server started with a
/// lifetime setting of 4, wait for `ConnectionLost`, start a replacement
/// server, wait for `Connected`, then require `request_parameter_tree` to
/// succeed.
#[tokio::test]
async fn test_max_reconnect_attempts_allows_happy_path() {
    let lost_timeout = Duration::from_secs(12);
    let restore_timeout = Duration::from_secs(15);

    let capped_opts = ConnectionOptions::new(CERT_PATH.to_string(), 5000, 5000)
        .with_reconnect_backoff(Duration::from_millis(100), Duration::from_secs(5))
        .with_token_refresh_interval(Duration::ZERO)
        .with_max_reconnect_attempts(Some(3));

    // Phase 1: a live session against the short-lived server.
    let mut short_lived = start_reconnect_server(Some(4));
    let req = Request::connect_to(URL_REQ_R, capped_opts)
        .await
        .expect("connect");
    req.login("root", "vectioneer").await.expect("login");
    req.get_session_token().await.expect("token");

    // Phase 2: the client must notice that the server is gone.
    let lost = wait_for_state(&req, ConnectionState::ConnectionLost, lost_timeout).await;
    assert!(lost, "expected ConnectionLost");
    let _ = short_lived.wait();

    // Phase 3: a replacement server appears and the client must come back.
    let mut replacement = start_reconnect_server(None);
    let restored = wait_for_state(&req, ConnectionState::Connected, restore_timeout).await;
    assert!(
        restored,
        "expected Connected after restore with max_reconnect_attempts set, got {:?}",
        *req.state().borrow()
    );
    req.request_parameter_tree()
        .await
        .expect("RPC must work on the restored session");

    // Teardown; kill/wait results are deliberately ignored.
    req.disconnect().await.expect("disconnect");
    let _ = replacement.kill();
    let _ = replacement.wait();
}
/// End-to-end check that a subscription keeps delivering payloads after the
/// server has been restarted and `resubscribe` has been called.
///
/// Flow:
/// 1. Start a server with a lifetime setting of 5, connect the request and
///    subscribe channels, subscribe to `root/Control/dummyDouble` and install
///    a callback that signals a `Notify`.
/// 2. Set the parameter and require the callback to fire within 3 s.
/// 3. Wait for `ConnectionLost`, then start a replacement server.
/// 4. Wait until the request channel settles on `Connected` or
///    `SessionExpired` (logging in again in the latter case), reload the
///    parameter tree and call `resubscribe`.
/// 5. Require that the subscription kept its name, set the parameter again and
///    require the callback to fire within 5 s.
#[tokio::test]
async fn test_resubscribe_after_server_restart_delivers_payloads() {
    let mut first = start_reconnect_server(Some(5));

    let req = Request::connect_to(URL_REQ_R, reconnect_opts())
        .await
        .expect("connect req");
    req.login("root", "vectioneer").await.expect("login");
    req.get_session_token().await.expect("token");
    req.request_parameter_tree().await.expect("tree");

    let sub = Subscribe::connect_to(URL_SUB_R, reconnect_opts())
        .await
        .expect("connect sub");
    let subscription = sub
        .subscribe(&req, ["root/Control/dummyDouble"], "resub-e2e", 1000)
        .await
        .expect("subscribe");
    let original_alias = subscription.name().to_string();

    // The callback only signals; the test body awaits the signal.
    let notify = Arc::new(Notify::new());
    let notify_cb = Arc::clone(&notify);
    subscription.notify(move |_| {
        notify_cb.notify_one();
    });

    req.set_parameter("root/Control/dummyDouble", 1.5f64)
        .await
        .expect("set");
    tokio::time::timeout(Duration::from_secs(3), notify.notified())
        .await
        .expect("pre-crash callback must fire");

    assert!(
        wait_for_state(&req, ConnectionState::ConnectionLost, Duration::from_secs(12)).await,
        "expected ConnectionLost after server shutdown"
    );
    let _ = first.wait();

    let mut second = start_reconnect_server(None);
    let ready_state = tokio::time::timeout(Duration::from_secs(15), async {
        let mut rx = req.state();
        loop {
            let s = *rx.borrow();
            if matches!(s, ConnectionState::Connected | ConnectionState::SessionExpired) {
                return s;
            }
            rx.changed().await.ok();
        }
    })
    .await
    .expect("state must settle to Connected or SessionExpired after reconnect");
    if ready_state == ConnectionState::SessionExpired {
        req.login("root", "vectioneer")
            .await
            .expect("re-login after SessionExpired");
    }
    req.request_parameter_tree()
        .await
        .expect("tree on fresh server");

    // `Notify::notify_one` stores a permit when nobody is waiting. Any callback
    // that fired after the pre-crash wait above would therefore satisfy the
    // final wait instantly, without a single payload from the fresh server.
    // Consume such a stale permit (if there is one) before resubscribing.
    let _ = tokio::time::timeout(Duration::ZERO, notify.notified()).await;

    sub.resubscribe(&req)
        .await
        .expect("resubscribe after server restart");
    assert_eq!(subscription.name(), original_alias);

    req.set_parameter("root/Control/dummyDouble", 2.5f64)
        .await
        .expect("set");
    tokio::time::timeout(Duration::from_secs(5), notify.notified())
        .await
        .expect("post-resubscribe callback must fire against fresh server");

    // Teardown; kill/wait results are deliberately ignored.
    sub.unsubscribe(&req, subscription)
        .await
        .expect("unsubscribe");
    sub.disconnect().await.expect("sub disconnect");
    req.disconnect().await.expect("req disconnect");
    let _ = second.kill();
    let _ = second.wait();
}
/// Checks that the session refresh counter advances while connected and stops
/// advancing once the connection is lost.
///
/// With a 200 ms refresh interval the test first sleeps 1.2 s and requires
/// `session_refresh_count()` to be at least 3. After `ConnectionLost` is
/// observed it samples the counter, sleeps another 1.5 s and requires the
/// counter to be unchanged.
#[tokio::test]
async fn test_token_refresh_paused_while_disconnected() {
    let refresh_interval = Duration::from_millis(200);
    let connected_window = Duration::from_millis(1200);
    let dead_window = Duration::from_millis(1500);
    let lost_timeout = Duration::from_secs(12);

    let mut server_proc = start_reconnect_server(Some(4));
    let req = Request::connect_to(URL_REQ_R, reconnect_opts_with_refresh(refresh_interval))
        .await
        .expect("connect");
    req.login("root", "vectioneer").await.expect("login");

    // While the connection is up the counter has to move.
    tokio::time::sleep(connected_window).await;
    let live_count = req.session_refresh_count();
    assert!(
        live_count >= 3,
        "refresh helper must tick while connected, got count = {live_count}"
    );

    let lost = wait_for_state(&req, ConnectionState::ConnectionLost, lost_timeout).await;
    assert!(lost, "expected ConnectionLost after server shutdown");

    // Once the connection is lost the counter has to stay put.
    let baseline = req.session_refresh_count();
    tokio::time::sleep(dead_window).await;
    let after_dead_pipe = req.session_refresh_count();
    assert_eq!(
        after_dead_pipe, baseline,
        "no refresh ticks should fire while the pipe is dead (baseline={baseline}, after={after_dead_pipe})"
    );

    // Teardown; kill/wait results are deliberately ignored.
    let _ = server_proc.kill();
    let _ = server_proc.wait();
}