use std::collections::HashSet;
use std::sync::{Arc, Mutex};
use actr_hyper::test_support::TestHarness;
use actr_hyper::test_support::WebRtcFragmentSendEvent;
use actr_hyper::test_support::wait_for_data_channel_opened;
use actr_protocol::PayloadType;
use std::time::{Duration, Instant};
fn init_tracing() {
tracing_subscriber::fmt()
.with_max_level(tracing::Level::DEBUG)
.with_file(true)
.with_line_number(true)
.with_test_writer()
.try_init()
.ok();
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_single_pending_request_id_registration() {
init_tracing();
let mut harness = TestHarness::new().await;
harness.add_peer(100).await;
harness.add_peer(200).await;
let gate = harness.peer(100).gate.clone();
let target_id = harness.peer(200).id.clone();
let _rx = gate
.register_pending_for_test("dedup_test_1", target_id)
.await;
assert_eq!(
gate.pending_count().await,
1,
"one request_id should create exactly one pending entry"
);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_two_pending_requests_have_distinct_request_ids() {
init_tracing();
let mut harness = TestHarness::new().await;
harness.add_peer(100).await;
harness.add_peer(200).await;
let gate = harness.peer(100).gate.clone();
let target_id = harness.peer(200).id.clone();
let _rx1 = gate
.register_pending_for_test("distinct_req_1", target_id.clone())
.await;
let _rx2 = gate
.register_pending_for_test("distinct_req_2", target_id)
.await;
assert_eq!(
gate.pending_count().await,
2,
"two distinct request_ids should create two pending entries"
);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_handle_response_deduplicates_by_request_id() {
init_tracing();
let mut harness = TestHarness::new().await;
harness.add_peer(100).await;
harness.add_peer(200).await;
let gate = harness.peer(100).gate.clone();
let target_id = harness.peer(200).id.clone();
let response_rx = gate
.register_pending_for_test("dedup_response_test", target_id)
.await;
let first = gate
.handle_response("dedup_response_test", Ok(bytes::Bytes::from("pong")))
.await
.expect("handle_response should not error");
assert!(
first,
"first response for request_id should be delivered (Ok(true))"
);
let second = gate
.handle_response("dedup_response_test", Ok(bytes::Bytes::from("pong2")))
.await
.expect("handle_response should not error");
assert!(
!second,
"duplicate response for same request_id should be dropped (Ok(false))"
);
match tokio::time::timeout(Duration::from_secs(3), response_rx).await {
Ok(Ok(Ok(response))) => {
assert_eq!(
&response[..],
b"pong",
"caller should receive the first response, not the duplicate"
);
}
Ok(Ok(Err(e))) => panic!("response should succeed, got: {}", e),
Ok(Err(e)) => panic!("response sender dropped: {}", e),
Err(_) => panic!("response wait timed out"),
}
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_late_response_after_connection_closed_is_dropped() {
init_tracing();
let mut harness = TestHarness::with_vnet().await;
harness.add_peer(100).await;
harness.add_peer(200).await;
let mut event_rx = harness.peer(100).subscribe_events();
let target_id = harness.peer(200).id.clone();
harness.connect(100, 200).await;
let session_id = wait_for_data_channel_opened(
&mut event_rx,
&target_id,
PayloadType::RpcReliable,
Duration::from_secs(5),
)
.await;
let gate = harness.peer(100).gate.clone();
let response_rx = gate
.register_pending_for_test("late_response_test", target_id.clone())
.await;
assert_eq!(
harness.peer(100).pending_count().await,
1,
"request should be pending before close"
);
harness
.peer(100)
.coordinator
.close_data_channel_for_test(&target_id, PayloadType::RpcReliable)
.await
.expect("should close data channel");
actr_hyper::test_support::wait_for_data_channel_close_chain(
&mut event_rx,
&target_id,
session_id,
Duration::from_secs(10),
)
.await;
tokio::time::sleep(Duration::from_millis(300)).await;
assert_eq!(
harness.peer(100).pending_count().await,
0,
"pending request should be cleaned up after ConnectionClosed"
);
let late_result = gate
.handle_response("late_response_test", Ok(bytes::Bytes::from("late_pong")))
.await
.expect("handle_response should not error");
assert!(
!late_result,
"late response for cleaned-up request_id should be dropped (Ok(false))"
);
match tokio::time::timeout(Duration::from_secs(3), response_rx).await {
Ok(Ok(Err(e))) => {
let msg = e.to_string();
assert!(
msg.contains("Connection")
|| msg.contains("Unavailable")
|| msg.contains("recovering"),
"caller should get connection error, not late response, got: {}",
msg
);
tracing::info!("caller correctly got error, not late response: {}", msg);
}
Ok(Ok(Ok(response))) => panic!(
"caller should not receive late response after cleanup: {:?}",
response
),
Ok(Err(e)) => panic!("response sender dropped without cleanup error: {}", e),
Err(_) => panic!("response wait timed out"),
}
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_successful_send_does_not_retry_and_uses_transport_msg_id() {
init_tracing();
let mut harness = TestHarness::new().await;
harness.add_peer(100).await;
harness.add_peer(200).await;
let mut event_rx = harness.peer(100).subscribe_events();
let target_id = harness.peer(200).id.clone();
harness.connect(100, 200).await;
wait_for_data_channel_opened(
&mut event_rx,
&target_id,
PayloadType::RpcReliable,
Duration::from_secs(5),
)
.await;
let captured: Arc<Mutex<Vec<WebRtcFragmentSendEvent>>> = Arc::new(Mutex::new(Vec::new()));
let captured_clone = captured.clone();
let _guard = actr_hyper::test_support::install_webrtc_fragment_send_hook_for_test(Arc::new(
move |event: WebRtcFragmentSendEvent| {
captured_clone.lock().unwrap().push(event);
Box::pin(async {})
},
));
captured.lock().unwrap().clear();
let start = Instant::now();
let handle = harness
.peer(100)
.spawn_request(200, "no_retry_success", 10_000);
let result = tokio::time::timeout(Duration::from_secs(10), handle).await;
match result {
Ok(Ok(Ok(_))) => {}
Ok(Ok(Err(e))) => panic!("call should succeed: {}", e),
Ok(Err(e)) => panic!("task panicked: {}", e),
Err(_) => panic!("call timed out"),
}
let elapsed = start.elapsed();
let events = captured.lock().unwrap();
assert!(
!events.is_empty(),
"should have captured at least one fragment send event"
);
let msg_ids: HashSet<u32> = events.iter().map(|e| e.msg_id).collect();
assert_eq!(
msg_ids.len(),
1,
"successful call should send data exactly once (1 unique msg_id), got {} msg_ids: {:?}",
msg_ids.len(),
msg_ids
);
for event in events.iter() {
tracing::info!(
"fragment: msg_id={}, frag_index={}/total={}, payload_len={}, msg_len={}",
event.msg_id,
event.frag_index,
event.total_frags,
event.fragment_payload_len,
event.message_len,
);
}
tracing::info!(
"call completed in {:?} with {} fragments, {} unique msg_ids",
elapsed,
events.len(),
msg_ids.len()
);
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn test_tell_has_no_pending_response_channel() {
init_tracing();
let mut harness = TestHarness::new().await;
harness.add_peer(100).await;
harness.add_peer(200).await;
harness.connect(100, 200).await;
let gate = harness.peer(100).gate.clone();
let target_id = harness.peer(200).id.clone();
let envelope = actr_protocol::RpcEnvelope {
request_id: "tell_retry_test".into(),
route_key: "test.tell".into(),
payload: Some(bytes::Bytes::from("fire_and_forget")),
timeout_ms: 0,
..Default::default()
};
let result = gate.send_message(&target_id, envelope.clone()).await;
assert!(result.is_ok(), "tell should succeed");
assert_eq!(
harness.peer(100).pending_count().await,
0,
"tell should not create pending requests"
);
let late = gate
.handle_response("tell_retry_test", Ok(bytes::Bytes::from("unexpected")))
.await
.expect("handle_response should not error");
assert!(!late, "tell request_id should never be in pending_requests");
}