use actr_hyper::test_support::TestHarness;
use actr_hyper::test_support::{
expect_request_eventually_ok, wait_for_data_channel_close_chain, wait_for_data_channel_opened,
wait_for_peer_state,
};
use actr_hyper::transport::{ConnectionEvent, ConnectionState};
use actr_protocol::PayloadType;
use std::time::{Duration, Instant};
/// Installs a DEBUG-level `tracing` subscriber for the tests.
///
/// The subscriber writes through the test writer and tags each event with its
/// source file and line number. Every test calls this. `try_init` reports an
/// error once a global subscriber is already installed, and that error is
/// deliberately discarded.
fn init_tracing() {
    let builder = tracing_subscriber::fmt()
        .with_max_level(tracing::Level::DEBUG)
        .with_file(true)
        .with_line_number(true)
        .with_test_writer();
    // Only the first caller in the process installs the subscriber.
    let _ = builder.try_init();
}
/// Verifies that a request succeeds once WirePool has rebuilt the connection
/// after a network outage.
///
/// Flow:
/// 1. Connect 100 -> 200 and wait for the `RpcReliable` data channel to open.
/// 2. Block the virtual network until peer 100 observes the target as
///    `Disconnected` or `Failed` (up to 12 s).
/// 3. Unblock the network and keep issuing requests through
///    `expect_request_eventually_ok` until one succeeds (20 s budget).
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_call_succeeds_after_wirepool_reconnect() {
    init_tracing();
    let mut harness = TestHarness::with_vnet().await;
    harness.add_peer(100).await;
    harness.add_peer(200).await;

    // Subscribe before `connect` so events raised while connecting are observed
    // (assumes a broadcast-style subscription that does not replay history).
    let mut event_rx = harness.peer(100).subscribe_events();
    let target_id = harness.peer(200).id.clone();
    harness.connect(100, 200).await;
    harness.reset_counters();

    let _initial_session = wait_for_data_channel_opened(
        &mut event_rx,
        &target_id,
        PayloadType::RpcReliable,
        Duration::from_secs(5),
    )
    .await;

    // Resolve the VNet once, so blocking and unblocking use the same handle and
    // fail with the same diagnostic if the harness was built without one.
    let vnet = harness.vnet.as_ref().expect("test requires VNet");
    vnet.block_network();

    let (_session_id, _state) = wait_for_peer_state(
        &mut event_rx,
        &target_id,
        &[ConnectionState::Disconnected, ConnectionState::Failed],
        Duration::from_secs(12),
    )
    .await;

    vnet.unblock_network();

    let response = expect_request_eventually_ok(
        &harness,
        100,
        200,
        "wirepool_reconnect",
        Duration::from_secs(20),
        5_000,
    )
    .await;
    tracing::info!(
        "call succeeded after WirePool reconnection: {} bytes",
        response.len()
    );
}
/// Exercises a simulated outage from start to finish.
///
/// A request issued while the link is down must settle within 5 s, either with
/// an error or with a success from buffered data. After the link is restored
/// and peer 100 retries its failed connections, requests must succeed again.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_call_fails_during_outage_then_succeeds_after_recovery() {
    init_tracing();
    let mut harness = TestHarness::with_vnet().await;
    for id in [100, 200] {
        harness.add_peer(id).await;
    }
    harness.connect(100, 200).await;
    harness.reset_counters();

    // Take the link down and give both sides time to notice.
    harness.simulate_disconnect();
    tokio::time::sleep(Duration::from_secs(3)).await;

    let outage_call = harness.peer(100).spawn_request(200, "outage_call", 3_000);
    let settled = tokio::time::timeout(Duration::from_secs(5), outage_call)
        .await
        .unwrap_or_else(|_| panic!("call timed out without error"));
    match settled {
        Err(join_err) => panic!("task panicked: {}", join_err),
        Ok(Ok(_)) => tracing::info!("call unexpectedly succeeded (buffered data)"),
        Ok(Err(_)) => tracing::info!("call correctly failed during outage"),
    }

    // Bring the link back and let peer 100 retry its failed connections.
    harness.simulate_reconnect();
    harness.peer(100).retry_failed().await;

    let response = expect_request_eventually_ok(
        &harness,
        100,
        200,
        "post_recovery_call",
        Duration::from_secs(30),
        5_000,
    )
    .await;
    let byte_count = response.len();
    tracing::info!("call succeeded after recovery: {} bytes", byte_count);
}
/// Fires two requests from peer 200 to peer 100 at the same time.
///
/// Asserts that both requests succeed and that peer 200's transport manager
/// ends up with exactly one `DestTransport`, meaning the concurrent calls
/// shared it. No explicit `connect` is made first, so presumably both calls
/// race to create the destination transport on demand.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_concurrent_calls_share_dest_transport() {
    init_tracing();
    let mut harness = TestHarness::new().await;
    harness.add_peer(100).await;
    harness.add_peer(200).await;
    let peer_100 = harness.peer(100);
    let peer_200 = harness.peer(200);

    // Bind the responder handles to a named variable so they live until the end
    // of the test. `let _ = ..` would drop them immediately, and that stops the
    // responders if these handles cancel their task on drop.
    let _responders = (
        peer_100.start_echo_responder("echo_100"),
        peer_200.start_response_receiver("recv_200"),
    );

    let target_id = peer_100.id.clone();
    let gate_200 = peer_200.gate.clone();

    // Spawns one request from peer 200 to the target on its own task.
    let spawn_call = |request_id: &'static str, body: &'static str| {
        let gate = gate_200.clone();
        let target = target_id.clone();
        tokio::spawn(async move {
            gate.send_request(
                &target,
                actr_protocol::RpcEnvelope {
                    request_id: request_id.into(),
                    route_key: "test.method".into(),
                    payload: Some(bytes::Bytes::from(body)),
                    timeout_ms: 15_000,
                    ..Default::default()
                },
            )
            .await
        })
    };
    let handle_1 = spawn_call("concurrent_1", "req1");
    let handle_2 = spawn_call("concurrent_2", "req2");

    // Wait on both calls concurrently. The watchdog must outlast the 15 s
    // request timeout. Otherwise a stuck call races the watchdog and can fail
    // with an opaque "should complete" panic instead of its own timeout error.
    let watchdog = Duration::from_secs(20);
    let (result_1, result_2) = tokio::join!(
        tokio::time::timeout(watchdog, handle_1),
        tokio::time::timeout(watchdog, handle_2)
    );

    let r1 = result_1
        .expect("call_1 should complete")
        .expect("call_1 no panic");
    let r2 = result_2
        .expect("call_2 should complete")
        .expect("call_2 no panic");
    assert!(r1.is_ok(), "call_1 should succeed, got: {:?}", r1);
    assert!(r2.is_ok(), "call_2 should succeed, got: {:?}", r2);

    let dest_count = peer_200.transport_manager.dest_count().await;
    assert_eq!(
        dest_count, 1,
        "only one DestTransport should exist for the target, got {}",
        dest_count
    );
}
/// Checks that a request in flight when its `RpcReliable` data channel is
/// closed does not hang.
///
/// After the close/cleanup event chain has been observed, the request has
/// 8 s to resolve. A success (data buffered before the close) is tolerated.
/// A failure must carry a connection-style error and arrive within 5 s.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_call_returns_promptly_after_connection_closed_cleanup() {
    init_tracing();
    let mut harness = TestHarness::with_vnet().await;
    harness.add_peer(100).await;
    harness.add_peer(200).await;

    let mut events = harness.peer(100).subscribe_events();
    let remote = harness.peer(200).id.clone();
    harness.connect(100, 200).await;

    let opened_session = wait_for_data_channel_opened(
        &mut events,
        &remote,
        PayloadType::RpcReliable,
        Duration::from_secs(5),
    )
    .await;

    let caller = harness.peer(100);
    // Issue a long-lived (30 s) request, then close the channel beneath it.
    let in_flight = caller.spawn_request(200, "cleanup_race_test", 30_000);
    caller
        .coordinator
        .close_data_channel_for_test(&remote, PayloadType::RpcReliable)
        .await
        .expect("should close data channel");
    wait_for_data_channel_close_chain(
        &mut events,
        &remote,
        opened_session,
        Duration::from_secs(10),
    )
    .await;

    // The latency budget is measured from the moment cleanup was observed.
    let cleanup_observed = Instant::now();
    let Ok(joined) = tokio::time::timeout(Duration::from_secs(8), in_flight).await else {
        panic!("call did not return within 8s — cleanup may not have fired");
    };
    let outcome = joined.unwrap_or_else(|join_err| panic!("task panicked: {}", join_err));

    match outcome {
        Ok(_) => tracing::info!("call succeeded (data was buffered before close)"),
        Err(err) => {
            let elapsed = cleanup_observed.elapsed();
            let msg = err.to_string();
            let is_connection_error = ["Connection", "Unavailable", "recovering"]
                .iter()
                .any(|&needle| msg.contains(needle));
            assert!(is_connection_error, "expected connection error, got: {}", msg);
            assert!(
                elapsed < Duration::from_secs(5),
                "call should return promptly after cleanup, took {:?}",
                elapsed
            );
            tracing::info!("call returned promptly in {:?} with: {}", elapsed, msg);
        }
    }
}
/// Checks that a stale `ConnectionClosed` event does not tear down requests
/// pending on the current connection.
///
/// The injected event carries a bogus session id (99999, presumably never
/// assigned to the live session). Within 8 s the pending request must either
/// succeed, or fail for a reason other than "Connection closed".
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_stale_close_event_does_not_kill_pending_requests() {
    init_tracing();
    let mut harness = TestHarness::new().await;
    harness.add_peer(100).await;
    harness.add_peer(200).await;
    harness.connect(100, 200).await;
    let target_id = harness.peer(200).id.clone();
    let caller = harness.peer(100);

    // Warm-up: prove the connection works before injecting the stale event.
    let baseline = caller.spawn_request(200, "baseline_stale", 5_000);
    let _ = tokio::time::timeout(Duration::from_secs(5), baseline)
        .await
        .expect("baseline should complete")
        .expect("no panic")
        .expect("baseline ok");

    let pending = caller.spawn_request(200, "stale_event_test", 5_000);
    let stale_close = ConnectionEvent::ConnectionClosed {
        peer_id: target_id.clone(),
        // Deliberately does not match the live session.
        session_id: 99999,
    };
    caller.send_event(stale_close);

    let outcome = tokio::time::timeout(Duration::from_secs(8), pending).await;
    match outcome {
        Err(_) => panic!("call timed out — may have been killed by stale event"),
        Ok(Err(join_err)) => panic!("task panicked: {}", join_err),
        Ok(Ok(Ok(response))) => {
            tracing::info!("call survived stale close event: {} bytes", response.len());
        }
        Ok(Ok(Err(err))) => {
            let msg = err.to_string();
            assert!(
                !msg.contains("Connection closed"),
                "call was killed by stale close event: {}",
                msg
            );
            tracing::warn!("call failed for unrelated reason: {}", msg);
        }
    }
}
/// Checks that a fire-and-forget `send_message` ("tell") succeeds on a healthy
/// connection without adding entries to peer 100's pending-request count.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_tell_does_not_register_pending_requests() {
    init_tracing();
    let mut harness = TestHarness::new().await;
    harness.add_peer(100).await;
    harness.add_peer(200).await;
    harness.connect(100, 200).await;
    let target_id = harness.peer(200).id.clone();
    let gate = harness.peer(100).gate.clone();

    assert_eq!(
        harness.peer(100).pending_count().await,
        0,
        "no pending requests before tell"
    );

    let envelope = actr_protocol::RpcEnvelope {
        request_id: "tell_no_pending".into(),
        route_key: "test.tell".into(),
        payload: Some(bytes::Bytes::from("fire_and_forget")),
        // Zero timeout: a tell is not expected to wait for a reply
        // (NOTE(review): confirm how the transport interprets 0).
        timeout_ms: 0,
        ..Default::default()
    };

    // Report the underlying error on failure rather than a bare `is_ok()` miss.
    if let Err(err) = gate.send_message(&target_id, envelope).await {
        panic!("tell should succeed on a healthy connection, got: {:?}", err);
    }

    assert_eq!(
        harness.peer(100).pending_count().await,
        0,
        "tell should not register pending requests"
    );
}
/// Checks that `send_data_stream` is rejected immediately while the
/// coordinator is in a network-recovery window.
///
/// A stream send first succeeds on the healthy connection. After
/// `begin_network_recovery`, the same send must fail in under 2 s with an
/// error mentioning "recovering" or "Unavailable", i.e. without retrying.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_send_data_stream_rejected_during_recovery() {
    init_tracing();
    let mut harness = TestHarness::with_vnet().await;
    harness.add_peer(100).await;
    harness.add_peer(200).await;
    harness.connect(100, 200).await;
    let target_id = harness.peer(200).id.clone();
    let gate = harness.peer(100).gate.clone();

    let healthy_result = gate
        .send_data_stream(
            &target_id,
            PayloadType::StreamReliable,
            "healthy_stream",
            bytes::Bytes::from("test"),
        )
        .await;
    // Report the underlying error on failure rather than a bare `is_ok()` miss.
    if let Err(err) = healthy_result {
        panic!(
            "send_data_stream should succeed on healthy connection, got: {:?}",
            err
        );
    }

    harness
        .peer(100)
        .coordinator
        .begin_network_recovery("test stream recovery")
        .await;
    // Brief pause so the recovery state is in effect before the next send.
    tokio::time::sleep(Duration::from_millis(50)).await;

    let start = Instant::now();
    let result = gate
        .send_data_stream(
            &target_id,
            PayloadType::StreamReliable,
            "recovery_stream",
            bytes::Bytes::from("test"),
        )
        .await;
    let elapsed = start.elapsed();

    let msg = match result {
        Ok(_) => panic!("send_data_stream should fail during recovery window"),
        Err(err) => err.to_string(),
    };
    assert!(
        msg.contains("recovering") || msg.contains("Unavailable"),
        "expected recovery/unavailable error, got: {}",
        msg
    );
    assert!(
        elapsed < Duration::from_secs(2),
        "send_data_stream should fail fast without retry, took {:?}",
        elapsed
    );
    tracing::info!(
        "send_data_stream correctly rejected without retry in {:?}: {}",
        elapsed,
        msg
    );
}
/// Checks that the envelope timeout caps retry backoff while the link is down.
///
/// The request's 3 s envelope timeout is far shorter than the full retry
/// schedule (the log line cites 17 s). The request must fail with a
/// timeout/unavailable error no earlier than 3 s and no later than 5 s.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_call_envelope_timeout_truncates_retry_backoff() {
    init_tracing();
    let mut harness = TestHarness::with_vnet().await;
    for id in [100, 200] {
        harness.add_peer(id).await;
    }
    harness.connect(100, 200).await;
    harness.reset_counters();

    harness.simulate_disconnect();
    tokio::time::sleep(Duration::from_secs(3)).await;

    let started = Instant::now();
    let call = harness
        .peer(100)
        .spawn_request(200, "short_timeout_test", 3_000);
    let joined = match tokio::time::timeout(Duration::from_secs(6), call).await {
        Ok(joined) => joined,
        Err(_) => panic!("outer timeout — call did not respect envelope timeout"),
    };
    let err = match joined {
        Ok(Err(err)) => err,
        Ok(Ok(_)) => panic!("call should timeout when network is down"),
        Err(join_err) => panic!("task panicked: {}", join_err),
    };

    let elapsed = started.elapsed();
    let msg = err.to_string();
    assert!(
        msg.contains("timeout") || msg.contains("Unavailable"),
        "expected timeout/unavailable error, got: {}",
        msg
    );
    assert!(
        elapsed >= Duration::from_secs(3),
        "should wait at least the 3s timeout, took {:?}",
        elapsed
    );
    assert!(
        elapsed < Duration::from_secs(5),
        "should not exceed envelope timeout by much, took {:?}",
        elapsed
    );
    tracing::info!(
        "call correctly timed out in {:?} (3s envelope < 17s full retry)",
        elapsed
    );
}
/// Runs a complete outage cycle and expects requests to succeed afterwards
/// within a 30 s budget.
///
/// The cycle is: disconnect, stay down for 8 s, reconnect, then have peer 100
/// retry its failed connections.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_call_succeeds_after_full_disconnect_reconnect_cycle() {
    init_tracing();
    let mut harness = TestHarness::with_vnet().await;
    for id in [100, 200] {
        harness.add_peer(id).await;
    }
    harness.connect(100, 200).await;
    harness.reset_counters();

    let outage = Duration::from_secs(8);
    harness.simulate_disconnect();
    tokio::time::sleep(outage).await;
    harness.simulate_reconnect();
    harness.peer(100).retry_failed().await;

    let budget = Duration::from_secs(30);
    let response =
        expect_request_eventually_ok(&harness, 100, 200, "full_cycle_retry", budget, 5_000).await;
    let byte_count = response.len();
    tracing::info!(
        "call succeeded after full disconnect/reconnect: {} bytes",
        byte_count
    );
}
/// Checks that a request issued during a network-recovery window fails fast.
///
/// `preflight_send` must reject the request up front with a
/// "Connection recovering" error in under 2 s, rather than letting it wait out
/// its 30 s timeout.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_call_fast_fails_during_recovery_window() {
    init_tracing();
    let mut harness = TestHarness::new().await;
    for id in [100, 200] {
        harness.add_peer(id).await;
    }
    harness.connect(100, 200).await;

    let caller = harness.peer(100);
    caller
        .coordinator
        .begin_network_recovery("test recovery window")
        .await;
    // Brief pause so the recovery state is in effect before the request.
    tokio::time::sleep(Duration::from_millis(50)).await;

    let started = Instant::now();
    let call = caller.spawn_request(200, "recovery_window_fast_fail", 30_000);
    let Ok(joined) = tokio::time::timeout(Duration::from_secs(5), call).await else {
        panic!("call timed out — preflight_send did not fast-fail");
    };
    let err = match joined {
        Err(join_err) => panic!("task panicked: {}", join_err),
        Ok(Ok(_)) => panic!("call should be blocked during recovery window"),
        Ok(Err(err)) => err,
    };

    let elapsed = started.elapsed();
    let msg = err.to_string();
    assert!(
        msg.contains("Connection recovering"),
        "expected 'Connection recovering' error, got: {}",
        msg
    );
    assert!(
        elapsed < Duration::from_secs(2),
        "preflight_send should fast-fail, took {:?}",
        elapsed
    );
    tracing::info!(
        "preflight_send correctly fast-failed in {:?}: {}",
        elapsed,
        msg
    );
}