use super::e2e_test_server::E2eTestServer;
use super::rtp_utils::{RtpPacket, RtpReceiver, RtpSender, extract_media_endpoint};
use super::test_helpers;
use super::test_ua::TestUaEvent;
use crate::config::MediaProxyMode;
use anyhow::Result;
use std::sync::Arc;
use std::time::Duration;
use tokio::time::sleep;
use tracing::info;
use test_helpers::pcmu_sdp;
/// Builds an audio SDP body that advertises PCMA at `ip`:`port`.
///
/// The origin (`o=`) session id and version are hard-coded, so two calls with
/// the same arguments produce byte-identical SDP. The single media line offers
/// payload type 8 (`PCMA/8000`) and 126 (`telephone-event/8000`) as `sendrecv`.
fn pcma_sdp_fixed(ip: &str, port: u16) -> String {
    let lines = [
        String::from("v=0"),
        format!("o=- 1778630403 1778630404 IN IP4 {ip}"),
        String::from("s=-"),
        format!("c=IN IP4 {ip}"),
        String::from("t=0 0"),
        format!("m=audio {port} RTP/AVP 8 126"),
        String::from("a=rtpmap:8 PCMA/8000"),
        String::from("a=rtpmap:126 telephone-event/8000"),
        String::from("a=sendrecv"),
    ];

    // Every SDP line, including the last one, is terminated with CRLF.
    let mut sdp = String::new();
    for line in &lines {
        sdp.push_str(line);
        sdp.push_str("\r\n");
    }
    sdp
}
#[tokio::test]
async fn test_early_media_183_then_same_sdp_200ok_rtp_flow() -> Result<()> {
let _ = tracing_subscriber::fmt::try_init();
let server = Arc::new(E2eTestServer::start_with_mode(MediaProxyMode::All).await?);
let caller_receiver = RtpReceiver::bind(0).await?;
let caller_sender = RtpSender::bind().await?;
let caller_port = caller_receiver.port()?;
let callee_sender = RtpSender::bind().await?;
let callee_receiver = RtpReceiver::bind(0).await?;
let callee_port = callee_receiver.port()?;
let caller_ua = Arc::new(server.create_ua("alice").await?);
let callee_ua = server.create_ua("bob").await?;
sleep(Duration::from_millis(100)).await;
let bob_early_sdp = pcma_sdp_fixed("127.0.0.1", callee_port);
let bob_200ok_sdp = bob_early_sdp.clone();
let alice_sdp = pcmu_sdp("127.0.0.1", caller_port);
let caller_ua_clone = caller_ua.clone();
let caller_handle =
crate::utils::spawn(async move { caller_ua_clone.make_call("bob", Some(alice_sdp)).await });
let mut bob_dialog_id = None;
let mut bob_received_offer: Option<String> = None;
for _ in 0..50 {
let events = callee_ua.process_dialog_events().await?;
for event in events {
if let TestUaEvent::IncomingCall(id, offer) = event {
bob_dialog_id = Some(id.clone());
bob_received_offer = offer;
info!(dialog_id = %id, "Bob: sending 183 + early-media SDP");
callee_ua
.send_ringing(&id, Some(bob_early_sdp.clone()))
.await?;
break;
}
}
if bob_dialog_id.is_some() {
break;
}
sleep(Duration::from_millis(100)).await;
}
let bob_id =
bob_dialog_id.ok_or_else(|| anyhow::anyhow!("Bob never received INVITE from proxy"))?;
sleep(Duration::from_millis(400)).await;
info!(dialog_id = %bob_id, "Bob: sending 200 OK with SAME SDP (bug trigger)");
callee_ua.answer_call(&bob_id, Some(bob_200ok_sdp)).await?;
let caller_id = tokio::time::timeout(Duration::from_secs(8), caller_handle)
.await
.map_err(|_| anyhow::anyhow!("Alice's INVITE timed out waiting for 200 OK"))?
.map_err(|e| anyhow::anyhow!("Join error: {}", e))?
.map_err(|e| anyhow::anyhow!("Call setup failed: {}", e))?;
info!(dialog_id = %caller_id, "Call established");
let caller_answer = caller_ua
.get_negotiated_answer_sdp(&caller_id)
.await
.ok_or_else(|| anyhow::anyhow!("Alice has no negotiated answer SDP"))?;
let callee_offer = bob_received_offer
.ok_or_else(|| anyhow::anyhow!("Bob never received an offer SDP in the INVITE"))?;
let caller_target = extract_media_endpoint(&caller_answer)
.ok_or_else(|| anyhow::anyhow!("Cannot parse proxy A-leg endpoint from caller_answer"))?;
let callee_target = extract_media_endpoint(&callee_offer).ok_or_else(|| {
anyhow::anyhow!("Cannot parse proxy B-leg endpoint from callee_offer (P1)")
})?;
info!(
caller_target = %caller_target,
callee_target = %callee_target,
"Proxy media endpoints"
);
caller_receiver.start_receiving();
callee_receiver.start_receiving();
let callee_packets = RtpPacket::create_sequence(
100, 1000, 60000, 0xB2B2_B2B2u32, 8, 160, 160, );
callee_sender.start_sending(callee_target, callee_packets, 20);
let caller_packets = RtpPacket::create_sequence(
100,
2000,
50000,
0xA1A1_A1A1u32,
0, 160,
160,
);
caller_sender.start_sending(caller_target, caller_packets, 20);
sleep(Duration::from_millis(2500)).await;
callee_sender.stop();
caller_sender.stop();
sleep(Duration::from_millis(300)).await;
let caller_stats = caller_receiver.get_stats().await;
let callee_stats = callee_receiver.get_stats().await;
info!(
caller_received = caller_stats.packets_received,
callee_received = callee_stats.packets_received,
"RTP stats after 183→200OK-same-SDP call"
);
caller_ua.hangup(&caller_id).await.ok();
caller_receiver.stop();
callee_receiver.stop();
server.stop();
assert!(
caller_stats.packets_received > 0,
"BUG REPRODUCED: Alice received 0 RTP packets from Bob (rtp_to_webrtc=0). \
Root cause: after 183+SDP followed by 200-OK with identical SDP, \
rtp_pc.create_offer() allocates a new port, but Bob still sends RTP to \
the original INVITE port (P1). SSRC latching never triggers on the new port, \
so the B→A forwarder never starts."
);
assert!(
callee_stats.packets_received > 0,
"Alice→Bob direction also broken (unexpected). Got 0 packets."
);
info!("test_early_media_183_then_same_sdp_200ok_rtp_flow PASSED");
Ok(())
}