use super::signaling::TestSignalingServer;
use super::utils::{
create_peer_with_vnet, create_peer_with_websocket, make_actor_id, spawn_echo_responder,
spawn_response_receiver,
};
use super::vnet::VNetPair;
use crate::lifecycle::DefaultNetworkEventProcessor;
use crate::outbound::PeerGate;
use crate::transport::{DefaultWireBuilder, DefaultWireBuilderConfig, PeerTransport};
use crate::wire::webrtc::{SignalingClient, WebRtcCoordinator};
use actr_protocol::{ActrId, RpcEnvelope};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
pub struct TestPeer {
pub id: ActrId,
pub coordinator: Arc<WebRtcCoordinator>,
pub signaling_client: Arc<dyn SignalingClient>,
pub gate: Arc<PeerGate>,
pub transport_manager: Arc<PeerTransport>,
}
impl TestPeer {
pub fn subscribe_events(
&self,
) -> tokio::sync::broadcast::Receiver<crate::transport::ConnectionEvent> {
self.coordinator.subscribe_events()
}
pub async fn pending_count(&self) -> usize {
self.gate.pending_count().await
}
pub async fn restart_ice(&self, target_serial: u64) -> anyhow::Result<()> {
let target_id = make_actor_id(target_serial);
self.coordinator
.restart_ice(&target_id)
.await
.map_err(|e| anyhow::anyhow!("ICE restart failed: {}", e))
}
pub async fn retry_failed(&self) {
self.coordinator.retry_failed_connections().await;
}
pub fn network_processor(&self) -> Arc<DefaultNetworkEventProcessor> {
Arc::new(DefaultNetworkEventProcessor::new(
self.signaling_client.clone(),
Some(self.coordinator.clone()),
))
}
pub fn spawn_request(
&self,
target_serial: u64,
request_id: &str,
timeout_ms: u32,
) -> tokio::task::JoinHandle<actr_protocol::ActorResult<actr_framework::Bytes>> {
let gate = self.gate.clone();
let target_id = make_actor_id(target_serial);
let envelope = RpcEnvelope {
request_id: request_id.to_string(),
route_key: "test.method".to_string(),
payload: Some(bytes::Bytes::from("test_payload")),
timeout_ms: timeout_ms as i64,
..Default::default()
};
tokio::spawn(async move { gate.send_request(&target_id, envelope).await })
}
pub fn send_event(&self, event: crate::transport::ConnectionEvent) {
let _ = self.coordinator.event_sender().send(event);
}
pub fn start_echo_responder(&self, name: &str) -> tokio::task::JoinHandle<()> {
spawn_echo_responder(self.coordinator.clone(), self.gate.clone(), name)
}
pub fn start_response_receiver(&self, name: &str) -> tokio::task::JoinHandle<()> {
spawn_response_receiver(self.coordinator.clone(), self.gate.clone(), name)
}
}
pub struct TestHarness {
pub server: TestSignalingServer,
pub vnet: Option<VNetPair>,
peers: HashMap<u64, TestPeer>,
_bg_tasks: Vec<tokio::task::JoinHandle<()>>,
}
impl TestHarness {
pub async fn new() -> Self {
let server = TestSignalingServer::start()
.await
.expect("Failed to start signaling server");
Self {
server,
vnet: None,
peers: HashMap::new(),
_bg_tasks: Vec::new(),
}
}
pub async fn with_vnet() -> Self {
let server = TestSignalingServer::start()
.await
.expect("Failed to start signaling server");
let vnet = VNetPair::new().await.expect("Failed to create VNet pair");
Self {
server,
vnet: Some(vnet),
peers: HashMap::new(),
_bg_tasks: Vec::new(),
}
}
pub async fn add_peer(&mut self, serial: u64) {
assert!(
!self.peers.contains_key(&serial),
"Peer with serial {} already exists",
serial
);
let id = make_actor_id(serial);
let server_url = self.server.url();
let (coordinator, signaling_client) = if let Some(ref vnet) = self.vnet {
let net = if self.peers.is_empty() {
vnet.net_offerer.clone()
} else {
vnet.net_answerer.clone()
};
create_peer_with_vnet(id.clone(), &server_url, net)
.await
.expect("Failed to create peer with vnet")
} else {
create_peer_with_websocket(id.clone(), &server_url)
.await
.expect("Failed to create peer")
};
let wire_config = DefaultWireBuilderConfig::default();
let wire_builder = Arc::new(DefaultWireBuilder::new(
Some(coordinator.clone()),
wire_config,
));
let transport_manager = Arc::new(PeerTransport::new(id.clone(), wire_builder));
let gate = Arc::new(PeerGate::new(
transport_manager.clone(),
Some(coordinator.clone()),
));
self.peers.insert(
serial,
TestPeer {
id,
coordinator,
signaling_client,
gate,
transport_manager,
},
);
tracing::info!("✅ Added test peer with serial {}", serial);
}
pub fn peer(&self, serial: u64) -> &TestPeer {
self.peers
.get(&serial)
.unwrap_or_else(|| panic!("Peer with serial {} not found", serial))
}
pub fn peer_mut(&mut self, serial: u64) -> &mut TestPeer {
self.peers
.get_mut(&serial)
.unwrap_or_else(|| panic!("Peer with serial {} not found", serial))
}
pub fn peer_serials(&self) -> Vec<u64> {
self.peers.keys().copied().collect()
}
pub fn peer_count(&self) -> usize {
self.peers.len()
}
pub async fn connect(&mut self, from_serial: u64, to_serial: u64) {
self.connect_with_timeout(from_serial, to_serial, Duration::from_secs(15))
.await;
}
pub async fn connect_with_timeout(
&mut self,
from_serial: u64,
to_serial: u64,
timeout: Duration,
) {
let (from_coord, from_gate, to_coord, to_gate, target_id) = {
let from_peer = self.peer(from_serial);
let to_peer = self.peer(to_serial);
(
from_peer.coordinator.clone(),
from_peer.gate.clone(),
to_peer.coordinator.clone(),
to_peer.gate.clone(),
to_peer.id.clone(),
)
};
tracing::info!(
"🔗 Connecting peer {} → peer {} (via gate message)...",
from_serial,
to_serial
);
let echo_handle = spawn_echo_responder(to_coord, to_gate, &format!("echo_{}", to_serial));
self._bg_tasks.push(echo_handle);
let recv_handle = spawn_response_receiver(
from_coord,
from_gate.clone(),
&format!("recv_{}", from_serial),
);
self._bg_tasks.push(recv_handle);
let request_id = format!("connect_test_{}_{}", from_serial, to_serial);
let envelope = RpcEnvelope {
request_id: request_id.clone(),
route_key: "test.ping".to_string(),
payload: Some(bytes::Bytes::from("ping")),
timeout_ms: timeout.as_millis() as i64,
..Default::default()
};
match tokio::time::timeout(timeout, from_gate.send_request(&target_id, envelope)).await {
Ok(Ok(response)) => {
tracing::info!(
"✅ Connection established and verified: {} → {} (response: {} bytes)",
from_serial,
to_serial,
response.len()
);
}
Ok(Err(e)) => panic!("Connection {} → {} failed: {}", from_serial, to_serial, e),
Err(_) => panic!(
"Connection {} → {} timed out after {:?}",
from_serial, to_serial, timeout
),
}
tokio::time::sleep(Duration::from_millis(300)).await;
}
pub fn simulate_disconnect(&self) {
let vnet = self
.vnet
.as_ref()
.expect("simulate_disconnect requires VNet (use TestHarness::with_vnet())");
tracing::warn!("🔴 Simulating full network disconnection (VNet + signaling)");
vnet.block_network();
self.server.pause_forwarding();
}
pub fn simulate_reconnect(&self) {
let vnet = self
.vnet
.as_ref()
.expect("simulate_reconnect requires VNet (use TestHarness::with_vnet())");
tracing::info!("🟢 Simulating network recovery (VNet + signaling)");
self.server.resume_forwarding();
vnet.unblock_network();
}
pub fn is_disconnected(&self) -> bool {
self.vnet.as_ref().is_some_and(|v| v.is_blocked())
}
pub async fn wait_for_ice_restart_count(&self, min_count: u32, timeout: Duration) -> u32 {
let deadline = tokio::time::Instant::now() + timeout;
loop {
let count = self.server.get_ice_restart_count();
if count >= min_count {
return count;
}
if tokio::time::Instant::now() >= deadline {
panic!(
"Timed out waiting for ICE restart count >= {} (current: {})",
min_count, count
);
}
tokio::time::sleep(Duration::from_millis(100)).await;
}
}
pub async fn wait_for_ice_restart_request_count(
&self,
min_count: u32,
timeout: Duration,
) -> u32 {
let deadline = tokio::time::Instant::now() + timeout;
loop {
let count = self.server.get_ice_restart_request_count();
if count >= min_count {
return count;
}
if tokio::time::Instant::now() >= deadline {
panic!(
"Timed out waiting for ICE restart request count >= {} (current: {})",
min_count, count
);
}
tokio::time::sleep(Duration::from_millis(100)).await;
}
}
pub fn reset_counters(&self) {
self.server.reset_counters();
}
pub fn ice_restart_count(&self) -> u32 {
self.server.get_ice_restart_count()
}
}