use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::mpsc;
use actr_hyper::lifecycle::{
DefaultNetworkEventProcessor, NetworkEvent, NetworkEventHandle, NetworkEventProcessor,
NetworkEventResult,
};
use actr_hyper::test_support::{TestSignalingServer, create_peer_with_websocket, make_actor_id};
#[tokio::test]
async fn test_network_available_triggers_recovery() {
tracing_subscriber::fmt()
.with_max_level(tracing::Level::DEBUG)
.with_file(true)
.with_line_number(true)
.with_test_writer()
.try_init()
.ok();
tracing::info!("๐งช Test: Network available triggers recovery");
let server = TestSignalingServer::start().await.unwrap();
let id_peer_a = make_actor_id(100);
let id_peer_b = make_actor_id(200);
let (coordinator_a, signaling_client_a) =
create_peer_with_websocket(id_peer_a.clone(), &server.url())
.await
.unwrap();
let (_coordinator_b, _signaling_client_b) =
create_peer_with_websocket(id_peer_b.clone(), &server.url())
.await
.unwrap();
tracing::info!("๐ Establishing initial peer connection...");
let ready_rx = coordinator_a
.initiate_connection(&id_peer_b)
.await
.expect("initiate failed");
match tokio::time::timeout(Duration::from_secs(10), ready_rx).await {
Ok(Ok(_)) => {
tracing::info!("โ
Initial peer connection established!");
}
Ok(Err(_)) => panic!("Connection failed (channel closed)"),
Err(_) => panic!("Connection timed out"),
}
tokio::time::sleep(Duration::from_millis(500)).await;
let processor = Arc::new(DefaultNetworkEventProcessor::new(
signaling_client_a.clone(),
Some(coordinator_a.clone()),
));
let (event_tx, mut event_rx) = mpsc::channel(10);
let (result_tx, result_rx) = mpsc::channel(10);
let network_handle = NetworkEventHandle::new(event_tx, result_rx);
let processor_clone = processor.clone();
let shutdown_token = tokio_util::sync::CancellationToken::new();
let shutdown_clone = shutdown_token.clone();
tokio::spawn(async move {
loop {
tokio::select! {
Some(event) = event_rx.recv() => {
tracing::info!("๐ฅ Processing event: {:?}", event);
let start = Instant::now();
let result = match &event {
NetworkEvent::Available => processor_clone.process_network_available().await,
NetworkEvent::Lost => processor_clone.process_network_lost().await,
NetworkEvent::TypeChanged { is_wifi, is_cellular } => {
processor_clone.process_network_type_changed(*is_wifi, *is_cellular).await
},
NetworkEvent::CleanupConnections => {
processor_clone.cleanup_connections().await
},
};
let duration_ms = start.elapsed().as_millis() as u64;
let event_result = match result {
Ok(_) => NetworkEventResult::success(event, duration_ms),
Err(e) => NetworkEventResult::failure(event, e, duration_ms),
};
let _ = result_tx.send(event_result).await;
}
_ = shutdown_clone.cancelled() => break,
}
}
});
server.reset_counters();
let initial_ice_restart_count = server.get_ice_restart_count();
tracing::info!("๐ฑ Triggering network available event...");
let result = network_handle
.handle_network_available()
.await
.expect("Failed to handle network available");
tracing::info!(
"๐ Result: success={}, duration={}ms",
result.success,
result.duration_ms
);
assert!(
result.success,
"Network available processing should succeed"
);
tokio::time::sleep(Duration::from_millis(1500)).await;
let new_ice_restart_count = server.get_ice_restart_count();
tracing::info!(
"๐ ICE restart offers: {} -> {}",
initial_ice_restart_count,
new_ice_restart_count
);
assert!(
new_ice_restart_count > initial_ice_restart_count,
"Should have triggered ICE restart"
);
shutdown_token.cancel();
}
#[tokio::test]
async fn test_network_lost_cleanup() {
tracing_subscriber::fmt()
.with_max_level(tracing::Level::DEBUG)
.with_file(true)
.with_line_number(true)
.with_test_writer()
.try_init()
.ok();
tracing::info!("๐งช Test: Network lost cleanup");
let server = TestSignalingServer::start().await.unwrap();
let id_peer_a = make_actor_id(300);
let (coordinator_a, signaling_client_a) =
create_peer_with_websocket(id_peer_a.clone(), &server.url())
.await
.unwrap();
let processor = Arc::new(DefaultNetworkEventProcessor::new(
signaling_client_a.clone(),
Some(coordinator_a.clone()),
));
let (event_tx, mut event_rx) = mpsc::channel(10);
let (result_tx, result_rx) = mpsc::channel(10);
let network_handle = NetworkEventHandle::new(event_tx, result_rx);
let processor_clone = processor.clone();
let shutdown_token = tokio_util::sync::CancellationToken::new();
let shutdown_clone = shutdown_token.clone();
tokio::spawn(async move {
loop {
tokio::select! {
Some(event) = event_rx.recv() => {
let start = Instant::now();
let result = match &event {
NetworkEvent::Available => processor_clone.process_network_available().await,
NetworkEvent::Lost => processor_clone.process_network_lost().await,
NetworkEvent::TypeChanged { is_wifi, is_cellular } => {
processor_clone.process_network_type_changed(*is_wifi, *is_cellular).await
},
NetworkEvent::CleanupConnections => {
processor_clone.cleanup_connections().await
},
};
let duration_ms = start.elapsed().as_millis() as u64;
let event_result = match result {
Ok(_) => NetworkEventResult::success(event, duration_ms),
Err(e) => NetworkEventResult::failure(event, e, duration_ms),
};
let _ = result_tx.send(event_result).await;
}
_ = shutdown_clone.cancelled() => break,
}
}
});
assert!(signaling_client_a.is_connected());
tracing::info!("๐ฑ Triggering network lost event...");
let result = network_handle
.handle_network_lost()
.await
.expect("Failed to handle network lost");
tracing::info!("๐ Result: success={}", result.success);
assert!(result.success);
tokio::time::sleep(Duration::from_millis(50)).await;
let is_connected = signaling_client_a.is_connected();
tracing::info!("๏ฟฝ Is connected: {}", is_connected);
assert!(
!is_connected,
"Client should be disconnected after network lost"
);
shutdown_token.cancel();
}
#[tokio::test]
async fn test_result_feedback_mechanism() {
tracing_subscriber::fmt()
.with_max_level(tracing::Level::DEBUG)
.with_file(true)
.with_line_number(true)
.with_test_writer()
.try_init()
.ok();
tracing::info!("๐งช Test: Result feedback mechanism");
let (event_tx, mut event_rx) = mpsc::channel(10);
let (result_tx, result_rx) = mpsc::channel(10);
let network_handle = NetworkEventHandle::new(event_tx, result_rx);
let shutdown_token = tokio_util::sync::CancellationToken::new();
let shutdown_clone = shutdown_token.clone();
tokio::spawn(async move {
loop {
tokio::select! {
Some(event) = event_rx.recv() => {
tokio::time::sleep(Duration::from_millis(50)).await;
let result = NetworkEventResult::success(event, 50);
let _ = result_tx.send(result).await;
}
_ = shutdown_clone.cancelled() => break,
}
}
});
tracing::info!("๐ฑ Sending event and waiting for result...");
let result = network_handle
.handle_network_available()
.await
.expect("Failed to get result");
tracing::info!("๐ Got result: {:?}", result);
assert!(matches!(result.event, NetworkEvent::Available));
assert!(result.success);
assert!(result.duration_ms >= 50);
shutdown_token.cancel();
tracing::info!("โ
Result feedback test passed");
}
#[tokio::test]
async fn test_network_repeatedly_changing() {
tracing_subscriber::fmt()
.with_max_level(tracing::Level::DEBUG)
.with_file(true)
.with_line_number(true)
.with_test_writer()
.try_init()
.ok();
tracing::info!("๐งช Test: Network repeatedly changing");
let server = TestSignalingServer::start().await.unwrap();
let id_peer_a = make_actor_id(600);
let id_peer_b = make_actor_id(700);
let (coordinator_a, signaling_client_a) =
create_peer_with_websocket(id_peer_a.clone(), &server.url())
.await
.unwrap();
let (_coordinator_b, _signaling_client_b) =
create_peer_with_websocket(id_peer_b.clone(), &server.url())
.await
.unwrap();
tracing::info!("๐ Establishing initial peer connection...");
let ready_rx = coordinator_a
.initiate_connection(&id_peer_b)
.await
.expect("initiate failed");
match tokio::time::timeout(Duration::from_secs(10), ready_rx).await {
Ok(Ok(_)) => {
tracing::info!("โ
Initial peer connection established!");
}
Ok(Err(_)) => panic!("Connection failed (channel closed)"),
Err(_) => panic!("Connection timed out"),
}
tokio::time::sleep(Duration::from_millis(500)).await;
let processor = Arc::new(DefaultNetworkEventProcessor::new(
signaling_client_a.clone(),
Some(coordinator_a.clone()),
));
let (event_tx, mut event_rx) = mpsc::channel(10);
let (result_tx, result_rx) = mpsc::channel(10);
let network_handle = NetworkEventHandle::new(event_tx, result_rx);
let processor_clone = processor.clone();
let shutdown_token = tokio_util::sync::CancellationToken::new();
let shutdown_clone = shutdown_token.clone();
tokio::spawn(async move {
loop {
tokio::select! {
Some(event) = event_rx.recv() => {
let start = Instant::now();
let result = match &event {
NetworkEvent::Available => processor_clone.process_network_available().await,
NetworkEvent::Lost => processor_clone.process_network_lost().await,
NetworkEvent::TypeChanged { is_wifi, is_cellular } => {
processor_clone.process_network_type_changed(*is_wifi, *is_cellular).await
},
NetworkEvent::CleanupConnections => {
processor_clone.cleanup_connections().await
}
};
let duration_ms = start.elapsed().as_millis() as u64;
let event_result = match result {
Ok(_) => NetworkEventResult::success(event, duration_ms),
Err(e) => NetworkEventResult::failure(event, e, duration_ms),
};
let _ = result_tx.send(event_result).await;
}
_ = shutdown_clone.cancelled() => break,
}
}
});
tokio::time::sleep(Duration::from_millis(200)).await;
server.reset_counters();
const CYCLES: usize = 3;
let initial_count = server.get_ice_restart_count();
for cycle in 1..=CYCLES {
tracing::info!("๐ Network change cycle {}/{}", cycle, CYCLES);
tracing::info!("๐ฑ Cycle {}: Triggering network lost event...", cycle);
let result = network_handle
.handle_network_lost()
.await
.expect("Failed to handle network lost");
tracing::info!(
"๐ Cycle {}: Lost result: success={}, duration={}ms",
cycle,
result.success,
result.duration_ms
);
assert!(
result.success,
"Network lost should succeed in cycle {}",
cycle
);
tokio::time::sleep(Duration::from_millis(300)).await;
tracing::info!("๐ฑ Cycle {}: Triggering network available event...", cycle);
let result = network_handle
.handle_network_available()
.await
.expect("Failed to handle network available");
tracing::info!(
"๐ Cycle {}: Available result: success={}, duration={}ms",
cycle,
result.success,
result.duration_ms
);
assert!(
result.success,
"Network available should succeed in cycle {}",
cycle
);
tokio::time::sleep(Duration::from_millis(2000)).await;
}
let final_count = server.get_ice_restart_count();
let delta = final_count - initial_count;
tracing::info!(
"๐ ICE restart offers: {} -> {} (delta: {})",
initial_count,
final_count,
delta
);
assert!(
delta >= 1,
"Should have at least 1 ICE restart offer, got {}",
delta
);
shutdown_token.cancel();
tracing::info!("โ
Network repeatedly changing test completed successfully");
}
#[tokio::test]
async fn test_manual_cleanup_connections() {
tracing_subscriber::fmt()
.with_max_level(tracing::Level::DEBUG)
.with_file(true)
.with_line_number(true)
.with_test_writer()
.try_init()
.ok();
tracing::info!("๐งช Test: Manual cleanup_connections");
let server = TestSignalingServer::start().await.unwrap();
let id_peer_a = make_actor_id(800);
let (coordinator_a, signaling_client_a) =
create_peer_with_websocket(id_peer_a.clone(), &server.url())
.await
.unwrap();
let processor = Arc::new(DefaultNetworkEventProcessor::new(
signaling_client_a.clone(),
Some(coordinator_a.clone()),
));
assert!(signaling_client_a.is_connected());
tracing::info!("๐งน Calling cleanup_connections() directly...");
let result = processor.cleanup_connections().await;
tracing::info!("๐ Result: {:?}", result);
assert!(result.is_ok(), "cleanup_connections should succeed");
tokio::time::sleep(Duration::from_millis(100)).await;
let is_connected = signaling_client_a.is_connected();
tracing::info!("๐ Is connected after cleanup: {}", is_connected);
assert!(
is_connected,
"Client should be disconnected after cleanup_connections"
);
tracing::info!("๐ Testing that cleanup_connections is NOT debounced...");
signaling_client_a.connect().await.unwrap();
tokio::time::sleep(Duration::from_millis(100)).await;
assert!(signaling_client_a.is_connected(), "Should be reconnected");
let start = Instant::now();
let result1 = processor.cleanup_connections().await;
let result2 = processor.cleanup_connections().await;
let elapsed = start.elapsed();
tracing::info!("๐ First cleanup: {:?}", result1);
tracing::info!("๐ Second cleanup: {:?}", result2);
tracing::info!("๐ Elapsed time for both calls: {:?}", elapsed);
assert!(result1.is_ok(), "First cleanup should succeed");
assert!(result2.is_ok(), "Second cleanup should succeed");
assert!(
elapsed < Duration::from_millis(500),
"cleanup_connections should not be debounced"
);
tracing::info!("โ
Manual cleanup_connections test passed!");
}
#[tokio::test]
async fn test_cleanup_then_network_events() {
tracing_subscriber::fmt()
.with_max_level(tracing::Level::DEBUG)
.with_file(true)
.with_line_number(true)
.with_test_writer()
.try_init()
.ok();
tracing::info!("๐งช Test: Cleanup then network events (recovery after manual cleanup)");
let server = TestSignalingServer::start().await.unwrap();
let id_peer_a = make_actor_id(900);
let id_peer_b = make_actor_id(1000);
let (coordinator_a, signaling_client_a) =
create_peer_with_websocket(id_peer_a.clone(), &server.url())
.await
.unwrap();
let (_coordinator_b, _signaling_client_b) =
create_peer_with_websocket(id_peer_b.clone(), &server.url())
.await
.unwrap();
tracing::info!("๐ Establishing initial peer connection...");
let ready_rx = coordinator_a
.initiate_connection(&id_peer_b)
.await
.expect("initiate failed");
match tokio::time::timeout(Duration::from_secs(10), ready_rx).await {
Ok(Ok(_)) => {
tracing::info!("โ
Initial peer connection established!");
}
Ok(Err(_)) => panic!("Connection failed (channel closed)"),
Err(_) => panic!("Connection timed out"),
}
tokio::time::sleep(Duration::from_millis(500)).await;
let processor = Arc::new(DefaultNetworkEventProcessor::new(
signaling_client_a.clone(),
Some(coordinator_a.clone()),
));
tracing::info!("๐ฑ Simulating app going to background - calling cleanup_connections()...");
let result = processor.cleanup_connections().await;
assert!(result.is_ok(), "cleanup should succeed");
tokio::time::sleep(Duration::from_millis(100)).await;
assert!(
signaling_client_a.is_connected(),
"Should be disconnected after cleanup"
);
tracing::info!("๐ฑ Simulating app returning from background - triggering network available...");
server.reset_counters();
let result = processor.process_network_available().await;
tracing::info!("๐ Network available result: {:?}", result);
assert!(
result.is_ok(),
"Network available should succeed after cleanup"
);
tokio::time::sleep(Duration::from_millis(200)).await;
assert!(
signaling_client_a.is_connected(),
"Should be reconnected after network available"
);
tracing::info!("๐ Re-establishing connection after cleanup...");
let ready_rx = coordinator_a
.initiate_connection(&id_peer_b)
.await
.expect("re-initiate failed");
match tokio::time::timeout(Duration::from_secs(10), ready_rx).await {
Ok(Ok(_)) => {
tracing::info!("โ
Connection re-established successfully!");
}
Ok(Err(_)) => panic!("Re-connection failed (channel closed)"),
Err(_) => panic!("Re-connection timed out"),
}
tracing::info!("โ
Cleanup then network events test passed!");
}