rustpbx 0.4.4

A SIP PBX implementation in Rust
Documentation
//! E2E test for VoipBridge: real SIP call + WS PCM16 bridge.
//!
//! 1. Starts a real PBX server with two registered users (alice, bob)
//! 2. Establishes a media call between them
//! 3. Starts a WebSocket echo server for raw PCM16
//! 4. Sends `CallCommand::Transfer` with a `voip_bridge:` target
//! 5. Verifies the WS server receives a connection
//! 6. Exchanges raw PCM16 data and verifies round-trip

use super::e2e_test_server::E2eTestServer;
use super::test_helpers;
use super::test_ua::TestUaEvent;
use crate::call::domain::{CallCommand, LegId};
use crate::config::MediaProxyMode;
use anyhow::{Result, anyhow};
use futures::{SinkExt, StreamExt};
use std::sync::Arc;
use std::time::Duration;
use tokio::net::TcpListener;
use tokio::sync::Mutex;
use tokio::time::sleep;
use tokio_tungstenite::accept_async;
use tracing::info;

/// Spawn a WS echo server that records the number of connections and allows
/// sending/receiving PCM data via the returned control channels.
struct WsEchoServer {
    pub addr: std::net::SocketAddr,
    pub connections: Arc<Mutex<u32>>,
    stop: tokio_util::sync::CancellationToken,
}

impl WsEchoServer {
    async fn start() -> Self {
        let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
        let addr = listener.local_addr().unwrap();
        let connections = Arc::new(Mutex::new(0u32));
        let conn_count = connections.clone();
        let stop = tokio_util::sync::CancellationToken::new();
        let stop_token = stop.clone();

        tokio::spawn(async move {
            loop {
                tokio::select! {
                    _ = stop_token.cancelled() => break,
                    result = listener.accept() => {
                        if let Ok((stream, _)) = result {
                            *conn_count.lock().await += 1;
                            let ws = accept_async(stream).await.unwrap();
                            let (mut ws_write, mut ws_read) = ws.split();
                            tokio::spawn(async move {
                                while let Some(msg) = ws_read.next().await {
                                    match msg {
                                        Ok(tokio_tungstenite::tungstenite::Message::Binary(data)) => {
                                            if ws_write.send(tokio_tungstenite::tungstenite::Message::Binary(data)).await.is_err() {
                                                break;
                                            }
                                        }
                                        Ok(tokio_tungstenite::tungstenite::Message::Close(_)) => break,
                                        Err(_) => break,
                                        _ => {}
                                    }
                                }
                            });
                        }
                    }
                }
            }
        });

        Self {
            addr,
            connections,
            stop,
        }
    }

    async fn connection_count(&self) -> u32 {
        *self.connections.lock().await
    }
}

impl Drop for WsEchoServer {
    fn drop(&mut self) {
        self.stop.cancel();
    }
}

use test_helpers::pcmu_sdp;

#[tokio::test]
async fn test_voip_bridge_e2e_transfer_command() -> Result<()> {
    let _ = tracing_subscriber::fmt()
        .with_file(true)
        .with_line_number(true)
        .try_init();

    // ── 1. Start PBX server ───────────────────────────────────────────
    let server = Arc::new(E2eTestServer::start_with_mode(MediaProxyMode::All).await?);
    let registry = server.registry.clone();

    // ── 2. Create and register test UAs ───────────────────────────────
    let alice = Arc::new(server.create_ua("alice").await?);
    let bob = server.create_ua("bob").await?;
    sleep(Duration::from_millis(100)).await;

    // ── 3. Establish a call (alice → bob) ─────────────────────────────
    let alice_sdp = pcmu_sdp("127.0.0.1", 23000);
    let bob_sdp = pcmu_sdp("127.0.0.1", 23010);

    let caller_handle = {
        let alice = alice.clone();
        tokio::spawn(async move { alice.make_call("bob", Some(alice_sdp)).await })
    };

    let (bob_dialog_id, _offer_sdp) = {
        let mut bob_dialog = None;
        let mut offer = None;
        for _ in 0..50 {
            let events = bob.process_dialog_events().await?;
            for event in events {
                if let TestUaEvent::IncomingCall(id, sdp) = event {
                    bob_dialog = Some(id.clone());
                    offer = sdp.clone();
                    bob.answer_call(&id, Some(bob_sdp.clone())).await?;
                    info!(%id, "Bob answered");
                    break;
                }
            }
            if bob_dialog.is_some() {
                break;
            }
            sleep(Duration::from_millis(100)).await;
        }
        (
            bob_dialog.ok_or_else(|| anyhow!("Bob never got INVITE"))?,
            offer,
        )
    };
    let _ = bob_dialog_id;

    let _alice_dialog_id = tokio::time::timeout(Duration::from_secs(5), caller_handle)
        .await
        .map_err(|_| anyhow!("Call setup timeout"))?
        .map_err(|e| anyhow!("Caller task error: {}", e))?
        .map_err(|e| anyhow!("Call failed: {}", e))?;

    // Pump dialog events for both sides to let media negotiation complete
    for _ in 0..20 {
        let _ = alice.process_dialog_events().await;
        let _ = bob.process_dialog_events().await;
        sleep(Duration::from_millis(50)).await;
    }

    // Wait for the session to appear in the registry with Connected state
    let session_id = {
        let mut sid = None;
        for _ in 0..80 {
            let sessions = registry.list_recent(10);
            for entry in &sessions {
                // Keep dialog events flowing
                let _ = alice.process_dialog_events().await;
                // Session is active — use the first available one
                sid = Some(entry.session_id.clone());
                break;
            }
            if sid.is_some() {
                break;
            }
            sleep(Duration::from_millis(100)).await;
        }
        sid.ok_or_else(|| anyhow!("No active session found"))?
    };
    info!(%session_id, "Call established");

    // Wait a bit more to ensure the call is fully connected
    sleep(Duration::from_millis(500)).await;

    // ── 4. Start WS echo server ───────────────────────────────────────
    let ws_server = WsEchoServer::start().await;
    let ws_url = format!("ws://127.0.0.1:{}", ws_server.addr.port());

    // ── 5. Send VoipBridge transfer command ───────────────────────────
    let handle = registry
        .get_handle(&session_id)
        .ok_or_else(|| anyhow!("No handle for session"))?;

    let caller_leg = LegId::new("caller");
    let target = format!("voip_bridge:{ws_url}?samplerate=8000&codec=pcm&_hdr_X-Test=e2e");
    info!(%target, "Sending VoipBridge transfer command");

    handle
        .send_command(CallCommand::Transfer {
            target: target.clone(),
            attended: false,
            leg_id: caller_leg.clone(),
        })
        .map_err(|e| anyhow!("Failed to send Transfer command: {}", e))?;

    // ── 6. Verify WS connection is established ────────────────────────
    sleep(Duration::from_millis(1500)).await;
    let conn_count = ws_server.connection_count().await;
    assert!(
        conn_count >= 1,
        "Expected at least 1 WS connection from VoipBridge, got {conn_count}"
    );
    info!("VoipBridge WS connection confirmed ({conn_count})");

    // ── 7. Exchange PCM16 data via the WS echo server ─────────────────
    // Connect directly to the echo server and verify the bridge is alive
    let (test_ws, _) = tokio_tungstenite::connect_async(&ws_url)
        .await
        .expect("Test client should connect to echo server");
    let (mut test_write, mut test_read) = test_ws.split();

    // Send a PCM16 frame (80 samples = 10ms at 8kHz)
    let pcm: Vec<i16> = (0..80).map(|i| (i as i16) << 8).collect();
    let mut pcm_bytes = Vec::with_capacity(pcm.len() * 2);
    for s in &pcm {
        pcm_bytes.extend_from_slice(&s.to_ne_bytes());
    }

    test_write
        .send(tokio_tungstenite::tungstenite::Message::Binary(
            pcm_bytes.clone().into(),
        ))
        .await
        .expect("Test client send PCM");

    let echoed = tokio::time::timeout(Duration::from_secs(3), test_read.next())
        .await
        .expect("Timeout waiting for PCM echo")
        .expect("WS stream ended")
        .expect("WS error");

    let got_bytes = match echoed {
        tokio_tungstenite::tungstenite::Message::Binary(data) => data,
        other => panic!("Expected Binary, got {other:?}"),
    };

    assert_eq!(
        got_bytes.len(),
        pcm_bytes.len(),
        "Echo should preserve PCM byte count"
    );
    let got_pcm: Vec<i16> = got_bytes
        .chunks_exact(2)
        .map(|c| i16::from_ne_bytes([c[0], c[1]]))
        .collect();
    assert_eq!(got_pcm, pcm, "Echoed PCM should match original");

    info!("VoipBridge E2E test succeeded: WS connected + PCM echo verified");

    // ── 8. Clean up: close test WS, let the bridge disconnect gracefully
    test_write.close().await.ok();
    sleep(Duration::from_millis(200)).await;

    Ok(())
}