use anyhow::Result;
use rustrtc::PeerConnection;
use rustrtc::RtcConfiguration;
use std::sync::Arc;
use std::time::Duration;
use tokio::time::timeout;
use webrtc::api::APIBuilder;
use webrtc::api::interceptor_registry::register_default_interceptors;
use webrtc::api::media_engine::MediaEngine;
use webrtc::interceptor::registry::Registry;
use webrtc::peer_connection::configuration::RTCConfiguration as WebrtcConfiguration;
use webrtc::peer_connection::sdp::session_description::RTCSessionDescription;
#[tokio::test]
async fn interop_datachannel_test() -> Result<()> {
rustls::crypto::CryptoProvider::install_default(rustls::crypto::ring::default_provider()).ok();
let _ = env_logger::builder().is_test(true).try_init();
let rust_config = RtcConfiguration::default();
let rust_pc = PeerConnection::new(rust_config);
let rust_dc = rust_pc.create_data_channel(
"test-channel",
Some(rustrtc::transports::sctp::DataChannelConfig {
negotiated: Some(0),
..Default::default()
}),
)?;
let mut m = MediaEngine::default();
m.register_default_codecs()?;
let mut registry = Registry::new();
registry = register_default_interceptors(registry, &mut m)?;
let api = APIBuilder::new()
.with_media_engine(m)
.with_interceptor_registry(registry)
.build();
let webrtc_config = WebrtcConfiguration::default();
let webrtc_pc = api.new_peer_connection(webrtc_config).await?;
let mut dc_init = webrtc::data_channel::data_channel_init::RTCDataChannelInit::default();
dc_init.negotiated = Some(0);
let webrtc_dc = webrtc_pc
.create_data_channel("test-channel", Some(dc_init))
.await?;
let _ = rust_pc.create_offer().await?;
loop {
if rust_pc.ice_transport().gather_state()
== rustrtc::transports::ice::IceGathererState::Complete
{
break;
}
tokio::time::sleep(Duration::from_millis(100)).await;
}
let offer = rust_pc.create_offer().await?; rust_pc.set_local_description(offer.clone())?;
let webrtc_desc = RTCSessionDescription::offer(offer.to_sdp_string())?;
webrtc_pc.set_remote_description(webrtc_desc).await?;
let answer = webrtc_pc.create_answer(None).await?;
let mut gather_complete = webrtc_pc.gathering_complete_promise().await;
webrtc_pc.set_local_description(answer.clone()).await?;
let _ = gather_complete.recv().await;
let answer = webrtc_pc.local_description().await.unwrap();
let rust_answer = rustrtc::SessionDescription::parse(rustrtc::SdpType::Answer, &answer.sdp)?;
println!("Rust Answer SDP: {:?}", rust_answer);
rust_pc.set_remote_description(rust_answer).await?;
println!("Waiting for ICE Connected...");
rust_pc.wait_for_connected().await?;
println!("ICE Connected (PeerConnection Connected)");
println!("Waiting for DataChannel...");
let (open_tx, mut open_rx) = tokio::sync::mpsc::channel::<()>(1);
let open_tx = Arc::new(open_tx);
webrtc_dc.on_open(Box::new(move || {
let open_tx = open_tx.clone();
Box::pin(async move {
let _ = open_tx.send(()).await;
})
}));
let _ = timeout(Duration::from_secs(2), open_rx.recv())
.await
.map_err(|_| anyhow::anyhow!("Timeout waiting for DataChannel open"))?;
println!("WebRTC DataChannel opened");
println!("Sending data from RustRTC...");
let data = b"Hello WebRTC";
tokio::time::sleep(Duration::from_millis(500)).await;
rust_pc.send_data(0, data).await?;
let (msg_tx, mut msg_rx) = tokio::sync::mpsc::channel::<String>(1);
let msg_tx = Arc::new(msg_tx);
webrtc_dc.on_message(Box::new(
move |msg: webrtc::data_channel::data_channel_message::DataChannelMessage| {
let tx = msg_tx.clone();
Box::pin(async move {
let s = String::from_utf8_lossy(&msg.data).to_string();
println!("WebRTC received message: {}", s);
let _ = tx.send(s).await;
})
},
));
let msg = timeout(Duration::from_secs(2), msg_rx.recv())
.await?
.ok_or_else(|| anyhow::anyhow!("WebRTC did not receive message"))?;
assert_eq!(msg, "Hello WebRTC");
println!("Sending data from WebRTC...");
webrtc_dc.send_text("Hello RustRTC").await?;
let mut received_msg = false;
let start = std::time::Instant::now();
while start.elapsed() < Duration::from_secs(2) {
if let Ok(Some(event)) = timeout(Duration::from_millis(100), rust_dc.recv()).await {
match event {
rustrtc::transports::sctp::DataChannelEvent::Message(data) => {
let s = String::from_utf8_lossy(&data).to_string();
println!("RustRTC received message: {}", s);
assert_eq!(s, "Hello RustRTC");
received_msg = true;
break;
}
_ => {}
}
}
}
assert!(received_msg, "RustRTC did not receive message");
rust_pc.close();
webrtc_pc.close().await?;
Ok(())
}
#[tokio::test]
async fn interop_datachannel_dcep_test() -> Result<()> {
rustls::crypto::CryptoProvider::install_default(rustls::crypto::ring::default_provider()).ok();
let _ = env_logger::builder().is_test(true).try_init();
let rust_config = RtcConfiguration::default();
let rust_pc = PeerConnection::new(rust_config);
let rust_dc = rust_pc.create_data_channel("dcep-channel", None)?;
let mut m = MediaEngine::default();
m.register_default_codecs()?;
let mut registry = Registry::new();
registry = register_default_interceptors(registry, &mut m)?;
let api = APIBuilder::new()
.with_media_engine(m)
.with_interceptor_registry(registry)
.build();
let webrtc_config = WebrtcConfiguration::default();
let webrtc_pc = api.new_peer_connection(webrtc_config).await?;
let (dc_tx, mut dc_rx) =
tokio::sync::mpsc::channel::<Arc<webrtc::data_channel::RTCDataChannel>>(1);
let dc_tx = Arc::new(dc_tx);
webrtc_pc.on_data_channel(Box::new(
move |dc: Arc<webrtc::data_channel::RTCDataChannel>| {
let dc_tx = dc_tx.clone();
Box::pin(async move {
let _ = dc_tx.send(dc).await;
})
},
));
let _ = rust_pc.create_offer().await?;
loop {
if rust_pc.ice_transport().gather_state()
== rustrtc::transports::ice::IceGathererState::Complete
{
break;
}
tokio::time::sleep(Duration::from_millis(100)).await;
}
let offer = rust_pc.create_offer().await?;
rust_pc.set_local_description(offer.clone())?;
let webrtc_desc = RTCSessionDescription::offer(offer.to_sdp_string())?;
webrtc_pc.set_remote_description(webrtc_desc).await?;
let answer = webrtc_pc.create_answer(None).await?;
let mut gather_complete = webrtc_pc.gathering_complete_promise().await;
webrtc_pc.set_local_description(answer.clone()).await?;
let _ = gather_complete.recv().await;
let answer = webrtc_pc.local_description().await.unwrap();
let rust_answer = rustrtc::SessionDescription::parse(rustrtc::SdpType::Answer, &answer.sdp)?;
rust_pc.set_remote_description(rust_answer).await?;
rust_pc.wait_for_connected().await?;
let webrtc_dc = timeout(Duration::from_secs(5), dc_rx.recv())
.await?
.ok_or_else(|| anyhow::anyhow!("WebRTC did not receive DataChannel"))?;
assert_eq!(webrtc_dc.label(), "dcep-channel");
let (open_tx, mut open_rx) = tokio::sync::mpsc::channel::<()>(1);
let open_tx = Arc::new(open_tx);
webrtc_dc.on_open(Box::new(move || {
let open_tx = open_tx.clone();
Box::pin(async move {
let _ = open_tx.send(()).await;
})
}));
let _ = timeout(Duration::from_secs(2), open_rx.recv()).await;
let data = b"Hello DCEP";
rust_pc.send_data(rust_dc.id, data).await?;
let (msg_tx, mut msg_rx) = tokio::sync::mpsc::channel::<String>(1);
let msg_tx = Arc::new(msg_tx);
webrtc_dc.on_message(Box::new(
move |msg: webrtc::data_channel::data_channel_message::DataChannelMessage| {
let tx = msg_tx.clone();
Box::pin(async move {
let s = String::from_utf8_lossy(&msg.data).to_string();
let _ = tx.send(s).await;
})
},
));
let msg = timeout(Duration::from_secs(2), msg_rx.recv())
.await?
.ok_or_else(|| anyhow::anyhow!("WebRTC did not receive message"))?;
assert_eq!(msg, "Hello DCEP");
rust_pc.close();
webrtc_pc.close().await?;
Ok(())
}
#[tokio::test]
async fn interop_datachannel_incoming_test() -> Result<()> {
rustls::crypto::CryptoProvider::install_default(rustls::crypto::ring::default_provider()).ok();
let _ = env_logger::builder().is_test(true).try_init();
let rust_config = RtcConfiguration::default();
let rust_pc = PeerConnection::new(rust_config);
let mut m = MediaEngine::default();
m.register_default_codecs()?;
let mut registry = Registry::new();
registry = register_default_interceptors(registry, &mut m)?;
let api = APIBuilder::new()
.with_media_engine(m)
.with_interceptor_registry(registry)
.build();
let webrtc_config = WebrtcConfiguration::default();
let webrtc_pc = api.new_peer_connection(webrtc_config).await?;
let webrtc_dc = webrtc_pc
.create_data_channel("incoming-channel", None)
.await?;
let offer = webrtc_pc.create_offer(None).await?;
let mut gather_complete = webrtc_pc.gathering_complete_promise().await;
webrtc_pc.set_local_description(offer.clone()).await?;
let _ = gather_complete.recv().await;
let offer = webrtc_pc.local_description().await.unwrap();
let rust_offer = rustrtc::SessionDescription::parse(rustrtc::SdpType::Offer, &offer.sdp)?;
rust_pc.set_remote_description(rust_offer).await?;
let _answer = rust_pc.create_answer().await?;
loop {
if rust_pc.ice_transport().gather_state()
== rustrtc::transports::ice::IceGathererState::Complete
{
break;
}
tokio::time::sleep(Duration::from_millis(100)).await;
}
let answer = rust_pc.create_answer().await?; rust_pc.set_local_description(answer.clone())?;
let webrtc_answer = RTCSessionDescription::answer(answer.to_sdp_string())?;
webrtc_pc.set_remote_description(webrtc_answer).await?;
rust_pc.wait_for_connected().await?;
println!("Waiting for DataChannel on RustRTC...");
loop {
let event = timeout(Duration::from_secs(5), rust_pc.recv())
.await?
.ok_or_else(|| anyhow::anyhow!("No event received"))?;
match event {
rustrtc::PeerConnectionEvent::DataChannel(dc) => {
println!("Received DataChannel: {}", dc.label);
assert_eq!(dc.label, "incoming-channel");
rust_pc.send_data(dc.id, b"Hello from Rust").await?;
break;
}
rustrtc::PeerConnectionEvent::Track(_) => {
println!("Received Track event, waiting for DataChannel...");
}
}
}
let (msg_tx, mut msg_rx) = tokio::sync::mpsc::channel::<String>(1);
let msg_tx = Arc::new(msg_tx);
webrtc_dc.on_message(Box::new(
move |msg: webrtc::data_channel::data_channel_message::DataChannelMessage| {
let tx = msg_tx.clone();
Box::pin(async move {
let s = String::from_utf8_lossy(&msg.data).to_string();
let _ = tx.send(s).await;
})
},
));
let msg = timeout(Duration::from_secs(2), msg_rx.recv())
.await?
.ok_or_else(|| anyhow::anyhow!("WebRTC did not receive message"))?;
assert_eq!(msg, "Hello from Rust");
rust_pc.close();
webrtc_pc.close().await?;
Ok(())
}