use actr_hyper::test_support::{TestHarness, make_actor_id};
use std::time::Duration;
/// Installs a DEBUG-level `tracing` fmt subscriber for the tests in this file.
///
/// Events are annotated with source file and line number and are written
/// through the subscriber's test writer. Installation goes through
/// `try_init`, and its result is discarded, so a failed installation
/// (presumably because another test already installed a subscriber) is not
/// treated as an error.
fn init_tracing() {
    let builder = tracing_subscriber::fmt()
        .with_max_level(tracing::Level::DEBUG)
        .with_file(true)
        .with_line_number(true)
        .with_test_writer();

    // Best-effort: the outcome of installation is intentionally ignored.
    let _ = builder.try_init();
}
#[tokio::test]
async fn test_stale_callbacks_do_not_kill_new_connection() {
init_tracing();
let mut harness = TestHarness::with_vnet().await;
harness.add_peer(100).await;
harness.add_peer(200).await;
tracing::info!("🔗 Phase 1: Establishing connection 100 → 200...");
harness.connect(100, 200).await;
tracing::info!("✅ Phase 1 complete: Connection established and verified");
let mut event_rx = harness.peer(100).subscribe_events();
harness.reset_counters();
tracing::info!("🔴 Phase 2: Simulating full network outage...");
tracing::info!(" Old DataChannel callbacks will fire asynchronously");
harness.simulate_disconnect();
tracing::info!("⏳ Waiting 10s for ICE disconnection + DC close callbacks...");
tokio::time::sleep(Duration::from_secs(10)).await;
let mut stale_event_count = 0;
while let Ok(event) = event_rx.try_recv() {
tracing::info!("📨 Stale event during outage: {:?}", event);
stale_event_count += 1;
}
tracing::info!(
"📊 Received {} events during outage (will become stale after reconnect)",
stale_event_count
);
tracing::info!("🟢 Phase 3: Restoring network...");
harness.simulate_reconnect();
tokio::time::sleep(Duration::from_millis(500)).await;
tracing::info!("📱 Triggering retry_failed_connections()...");
harness.peer(100).retry_failed().await;
tracing::info!("⏳ Waiting for ICE restart + new connection...");
tokio::time::sleep(Duration::from_secs(10)).await;
tracing::info!("📤 Phase 4: Verifying new connection is alive...");
tracing::info!(" If stale callbacks killed the new connection, this will FAIL");
let peer_a = harness.peer(100);
let handle1 = peer_a.spawn_request(200, "stale_guard_verify_1", 15000);
match tokio::time::timeout(Duration::from_secs(15), handle1).await {
Ok(Ok(Ok(response))) => {
tracing::info!(
"✅ Message 1 succeeded on NEW connection! ({} bytes)",
response.len()
);
}
Ok(Ok(Err(e))) => {
let err_str = format!("{:?}", e);
if err_str.to_lowercase().contains("closed") {
panic!(
"❌ BUG: Stale callback killed new connection! Error: {}",
err_str
);
}
panic!("❌ Message 1 failed: {}", err_str);
}
Ok(Err(e)) => panic!("Task panicked: {}", e),
Err(_) => panic!("❌ Message 1 timed out — new connection may be dead"),
}
tracing::info!("📤 Phase 5: Sending 3 more messages to verify stability...");
for i in 2..=4 {
let peer_a = harness.peer(100);
let req_id = format!("stale_guard_verify_{}", i);
let handle = peer_a.spawn_request(200, &req_id, 10000);
match tokio::time::timeout(Duration::from_secs(10), handle).await {
Ok(Ok(Ok(response))) => {
tracing::info!(" ✅ Message {} succeeded ({} bytes)", i, response.len());
}
Ok(Ok(Err(e))) => {
panic!("❌ Message {} failed: {:?}", i, e);
}
Ok(Err(e)) => panic!("Task {} panicked: {}", i, e),
Err(_) => panic!("❌ Message {} timed out", i),
}
tokio::time::sleep(Duration::from_millis(200)).await;
}
let total_restarts = harness.ice_restart_count();
tracing::info!("╔══════════════════════════════════════════════════════╗");
tracing::info!("║ Stale Callback Guard Test — PASSED ║");
tracing::info!("╠══════════════════════════════════════════════════════╣");
tracing::info!("║ • Old connection's stale callbacks were SILENCED ║");
tracing::info!("║ • New connection survived and handled 4 messages ║");
tracing::info!(
"║ • ICE restarts: {} ║",
total_restarts
);
tracing::info!("║ • Bug 'stale callbacks killing new conns' is FIXED ║");
tracing::info!("╚══════════════════════════════════════════════════════╝");
tracing::info!("✅ test_stale_callbacks_do_not_kill_new_connection passed!");
}
/// Checks that an injected `ConnectionClosed` event carrying `session_id: 0`
/// does not break an established connection.
///
/// The test connects peer 100 to peer 200, pushes the event for peer 200
/// through peer 100's `send_event`, waits 500 ms, and then requires a request
/// from 100 to 200 to succeed within 10 s. Events observed on the subscription
/// afterwards are only counted and logged, not asserted.
#[tokio::test]
async fn test_session_id_filtering_ignores_stale_events() {
    use actr_hyper::transport::ConnectionEvent;
    use tokio::time::{sleep, timeout};

    init_tracing();

    let mut harness = TestHarness::new().await;
    harness.add_peer(100).await;
    harness.add_peer(200).await;

    tracing::info!("🔗 Establishing connection 100 → 200...");
    harness.connect(100, 200).await;
    tracing::info!("✅ Connection established");

    // Subscribe before injecting so anything published afterwards is visible.
    let mut observed = harness.peer(100).subscribe_events();

    tracing::info!("🧪 Injecting stale ConnectionClosed event (session_id=0)...");
    // Built before the call so the event is constructed first, then sent.
    let injected = ConnectionEvent::ConnectionClosed {
        peer_id: make_actor_id(200),
        session_id: 0,
    };
    harness.peer(100).send_event(injected);
    sleep(Duration::from_millis(500)).await;

    tracing::info!("📤 Verifying connection still works after stale event injection...");
    let requester = harness.peer(100);
    let pending = requester.spawn_request(200, "after_stale_inject", 10000);

    // Peel the result one layer at a time: timeout, then task join, then reply.
    let joined = match timeout(Duration::from_secs(10), pending).await {
        Ok(joined) => joined,
        Err(_) => panic!("❌ Connection dead after stale event injection!"),
    };
    let reply = match joined {
        Ok(reply) => reply,
        Err(e) => panic!("Task panicked: {}", e),
    };
    match reply {
        Ok(response) => {
            tracing::info!(
                "✅ Connection survived stale event! Response: {} bytes",
                response.len()
            );
        }
        Err(e) => {
            panic!("❌ BUG: Stale event killed the connection! Error: {:?}", e);
        }
    }

    // Non-blocking drain: the iterator ends at the first `try_recv` error.
    let events_seen = std::iter::from_fn(|| observed.try_recv().ok())
        .inspect(|event| {
            tracing::debug!(" Event observed: {:?}", event);
        })
        .count();
    tracing::info!("📊 Events processed after injection: {}", events_seen);

    tracing::info!("╔══════════════════════════════════════════════════════╗");
    tracing::info!("║ Session ID Filtering Test — PASSED ║");
    tracing::info!("╠══════════════════════════════════════════════════════╣");
    tracing::info!("║ • Stale ConnectionClosed (session_id=0) was ignored ║");
    tracing::info!("║ • Current connection survived and handled messages ║");
    tracing::info!("║ • Coordinator event listener filtering works ║");
    tracing::info!("╚══════════════════════════════════════════════════════╝");
    tracing::info!("✅ test_session_id_filtering_ignores_stale_events passed!");
}