use super::e2e_test_server::E2eTestServer;
use super::test_helpers;
use super::test_ua::TestUaEvent;
use crate::call::domain::{CallCommand, LegId};
use crate::config::MediaProxyMode;
use anyhow::{Result, anyhow};
use futures::{SinkExt, StreamExt};
use std::sync::Arc;
use std::time::Duration;
use tokio::net::TcpListener;
use tokio::sync::Mutex;
use tokio::time::sleep;
use tokio_tungstenite::accept_async;
use tracing::info;
struct WsEchoServer {
pub addr: std::net::SocketAddr,
pub connections: Arc<Mutex<u32>>,
stop: tokio_util::sync::CancellationToken,
}
impl WsEchoServer {
async fn start() -> Self {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
let connections = Arc::new(Mutex::new(0u32));
let conn_count = connections.clone();
let stop = tokio_util::sync::CancellationToken::new();
let stop_token = stop.clone();
crate::utils::spawn(async move {
loop {
tokio::select! {
_ = stop_token.cancelled() => break,
result = listener.accept() => {
if let Ok((stream, _)) = result {
*conn_count.lock().await += 1;
let ws = accept_async(stream).await.unwrap();
let (mut ws_write, mut ws_read) = ws.split();
crate::utils::spawn(async move {
while let Some(msg) = ws_read.next().await {
match msg {
Ok(tokio_tungstenite::tungstenite::Message::Binary(data)) => {
if ws_write.send(tokio_tungstenite::tungstenite::Message::Binary(data)).await.is_err() {
break;
}
}
Ok(tokio_tungstenite::tungstenite::Message::Close(_)) => break,
Err(_) => break,
_ => {}
}
}
});
}
}
}
}
});
Self {
addr,
connections,
stop,
}
}
async fn connection_count(&self) -> u32 {
*self.connections.lock().await
}
}
impl Drop for WsEchoServer {
fn drop(&mut self) {
self.stop.cancel();
}
}
use test_helpers::pcmu_sdp;
#[tokio::test]
async fn test_voip_bridge_e2e_transfer_command() -> Result<()> {
let _ = tracing_subscriber::fmt()
.with_file(true)
.with_line_number(true)
.try_init();
let server = Arc::new(E2eTestServer::start_with_mode(MediaProxyMode::All).await?);
let registry = server.registry.clone();
let alice = Arc::new(server.create_ua("alice").await?);
let bob = server.create_ua("bob").await?;
sleep(Duration::from_millis(100)).await;
let alice_sdp = pcmu_sdp("127.0.0.1", 23000);
let bob_sdp = pcmu_sdp("127.0.0.1", 23010);
let caller_handle = {
let alice = alice.clone();
crate::utils::spawn(async move { alice.make_call("bob", Some(alice_sdp)).await })
};
let (bob_dialog_id, _offer_sdp) = {
let mut bob_dialog = None;
let mut offer = None;
for _ in 0..50 {
let events = bob.process_dialog_events().await?;
for event in events {
if let TestUaEvent::IncomingCall(id, sdp) = event {
bob_dialog = Some(id.clone());
offer = sdp.clone();
bob.answer_call(&id, Some(bob_sdp.clone())).await?;
info!(%id, "Bob answered");
break;
}
}
if bob_dialog.is_some() {
break;
}
sleep(Duration::from_millis(100)).await;
}
(
bob_dialog.ok_or_else(|| anyhow!("Bob never got INVITE"))?,
offer,
)
};
let _ = bob_dialog_id;
let _alice_dialog_id = tokio::time::timeout(Duration::from_secs(5), caller_handle)
.await
.map_err(|_| anyhow!("Call setup timeout"))?
.map_err(|e| anyhow!("Caller task error: {}", e))?
.map_err(|e| anyhow!("Call failed: {}", e))?;
for _ in 0..20 {
let _ = alice.process_dialog_events().await;
let _ = bob.process_dialog_events().await;
sleep(Duration::from_millis(50)).await;
}
let session_id = {
let mut sid = None;
for _ in 0..80 {
let sessions = registry.list_recent(10);
for entry in &sessions {
let _ = alice.process_dialog_events().await;
sid = Some(entry.session_id.clone());
break;
}
if sid.is_some() {
break;
}
sleep(Duration::from_millis(100)).await;
}
sid.ok_or_else(|| anyhow!("No active session found"))?
};
info!(%session_id, "Call established");
sleep(Duration::from_millis(500)).await;
let ws_server = WsEchoServer::start().await;
let ws_url = format!("ws://127.0.0.1:{}", ws_server.addr.port());
let handle = registry
.get_handle(&session_id)
.ok_or_else(|| anyhow!("No handle for session"))?;
let caller_leg = LegId::new("caller");
let target = format!("voip_bridge:{ws_url}?samplerate=8000&codec=pcm&_hdr_X-Test=e2e");
info!(%target, "Sending VoipBridge transfer command");
handle
.send_command(CallCommand::Transfer {
target: target.clone(),
attended: false,
leg_id: caller_leg.clone(),
})
.map_err(|e| anyhow!("Failed to send Transfer command: {}", e))?;
sleep(Duration::from_millis(1500)).await;
let conn_count = ws_server.connection_count().await;
assert!(
conn_count >= 1,
"Expected at least 1 WS connection from VoipBridge, got {conn_count}"
);
info!("VoipBridge WS connection confirmed ({conn_count})");
let (test_ws, _) = tokio_tungstenite::connect_async(&ws_url)
.await
.expect("Test client should connect to echo server");
let (mut test_write, mut test_read) = test_ws.split();
let pcm: Vec<i16> = (0..80).map(|i| (i as i16) << 8).collect();
let mut pcm_bytes = Vec::with_capacity(pcm.len() * 2);
for s in &pcm {
pcm_bytes.extend_from_slice(&s.to_ne_bytes());
}
test_write
.send(tokio_tungstenite::tungstenite::Message::Binary(
pcm_bytes.clone().into(),
))
.await
.expect("Test client send PCM");
let echoed = tokio::time::timeout(Duration::from_secs(3), test_read.next())
.await
.expect("Timeout waiting for PCM echo")
.expect("WS stream ended")
.expect("WS error");
let got_bytes = match echoed {
tokio_tungstenite::tungstenite::Message::Binary(data) => data,
other => panic!("Expected Binary, got {other:?}"),
};
assert_eq!(
got_bytes.len(),
pcm_bytes.len(),
"Echo should preserve PCM byte count"
);
let got_pcm: Vec<i16> = got_bytes
.chunks_exact(2)
.map(|c| i16::from_ne_bytes([c[0], c[1]]))
.collect();
assert_eq!(got_pcm, pcm, "Echoed PCM should match original");
info!("VoipBridge E2E test succeeded: WS connected + PCM echo verified");
test_write.close().await.ok();
sleep(Duration::from_millis(200)).await;
Ok(())
}