#[cfg(feature = "__lk-e2e-test")]
use {
anyhow::{anyhow, Ok, Result},
common::{test_rooms, test_rooms_with_options, TestRoomOptions},
futures_util::StreamExt,
livekit::{prelude::*, SimulateScenario},
livekit_api::access_token::VideoGrants,
std::time::{Duration, Instant},
test_case::test_case,
tokio::{
time::{self, timeout},
try_join,
},
};
mod common;
#[cfg(feature = "__lk-e2e-test")]
#[test_case(120., 8_192 ; "high_fps_single_packet")]
#[test_case(10., 196_608 ; "low_fps_multi_packet")]
#[test_log::test(tokio::test)]
async fn test_data_track(publish_fps: f64, payload_len: usize) {
const PUBLISH_DURATION: Duration = Duration::from_secs(10);
const MIN_PERCENTAGE: f32 = 0.9;
let mut rooms = test_rooms(2).await.unwrap();
let (pub_room, _) = rooms.pop().unwrap();
let (_, mut sub_room_event_rx) = rooms.pop().unwrap();
let pub_identity = pub_room.local_participant().identity();
let frame_count = (PUBLISH_DURATION.as_secs_f64() * publish_fps).round() as u64;
log::info!("Publishing {} frames", frame_count);
let local_track = pub_room.local_participant().publish_data_track("my_track").await.unwrap();
log::info!("Track published");
let remote_track = wait_for_remote_track(&mut sub_room_event_rx).await.unwrap();
log::info!("Got remote track: {}", remote_track.info().sid());
let publish = async {
assert!(local_track.is_published());
assert!(!local_track.info().uses_e2ee());
assert_eq!(local_track.info().name(), "my_track");
let sleep_duration = Duration::from_secs_f64(1.0 / publish_fps as f64);
for index in 0..frame_count {
local_track.try_push(vec![index as u8; payload_len].into()).unwrap();
time::sleep(sleep_duration).await;
}
Ok(())
};
let mut recv_count = 0;
let recv_min = (frame_count as f32 * MIN_PERCENTAGE) as u64;
let subscribe = async {
assert!(remote_track.is_published());
assert!(!remote_track.info().uses_e2ee());
assert_eq!(remote_track.info().name(), "my_track");
assert_eq!(remote_track.publisher_identity(), pub_identity.as_str());
let mut subscription = remote_track.subscribe().await.unwrap();
while let Some(frame) = subscription.next().await {
let payload = frame.payload();
if let Some(first_byte) = payload.first() {
assert!(payload.iter().all(|byte| byte == first_byte));
}
assert_eq!(frame.user_timestamp(), None);
recv_count += 1;
if recv_count >= recv_min {
break;
}
}
assert!(remote_track.is_published());
Ok(())
};
let result = timeout(PUBLISH_DURATION + Duration::from_secs(25), async {
try_join!(publish, subscribe)
})
.await;
let recv_percent = recv_count as f32 / frame_count as f32;
log::info!("Received {}/{} frames ({:.2}%)", recv_count, frame_count, recv_percent * 100.);
if result.is_err() {
panic!("Not enough frames received before timeout");
}
}
#[cfg(feature = "__lk-e2e-test")]
#[test_log::test(tokio::test)]
async fn test_publish_many_tracks() -> Result<()> {
const TRACK_COUNT: usize = 256;
let (room, _) = test_rooms(1).await?.pop().unwrap();
let publish_tracks = async {
let mut tracks = Vec::with_capacity(TRACK_COUNT);
let start = Instant::now();
for idx in 0..TRACK_COUNT {
let name = format!("track_{}", idx);
let track = room.local_participant().publish_data_track(name.clone()).await?;
assert!(track.is_published());
assert_eq!(track.info().name(), name);
tracks.push(track);
}
let elapsed = start.elapsed();
log::info!(
"Publishing {} tracks took {:.2?} (average {:.2?} per track)",
TRACK_COUNT,
elapsed,
elapsed / TRACK_COUNT as u32
);
Ok(tracks)
};
let tracks = timeout(Duration::from_secs(5), publish_tracks).await??;
for track in &tracks {
track.try_push(vec![0xFA; 196_608].into())?;
}
Ok(())
}
#[cfg(feature = "__lk-e2e-test")]
#[test_log::test(tokio::test)]
async fn test_publish_unauthorized() -> Result<()> {
let (room, _) = test_rooms_with_options([TestRoomOptions {
grants: VideoGrants { room_join: true, can_publish_data: false, ..Default::default() },
..Default::default()
}])
.await?
.pop()
.unwrap();
let result = room.local_participant().publish_data_track("my_track").await;
assert!(matches!(result, Err(PublishError::NotAllowed)));
Ok(())
}
#[cfg(feature = "__lk-e2e-test")]
#[test_log::test(tokio::test)]
async fn test_publish_duplicate_name() -> Result<()> {
let (room, _) = test_rooms(1).await?.pop().unwrap();
#[allow(unused)]
let first = room.local_participant().publish_data_track("first").await?;
let second_result = room.local_participant().publish_data_track("first").await;
assert!(matches!(second_result, Err(PublishError::DuplicateName)));
Ok(())
}
#[cfg(feature = "__lk-e2e-test")]
#[test_log::test(tokio::test)]
async fn test_e2ee() -> Result<()> {
use livekit::e2ee::{
key_provider::{KeyProvider, KeyProviderOptions},
EncryptionType,
};
use livekit::E2eeOptions;
const SHARED_SECRET: &[u8] = b"password";
let key_provider1 =
KeyProvider::with_shared_key(KeyProviderOptions::default(), SHARED_SECRET.to_vec());
let mut options1 = RoomOptions::default();
options1.encryption =
Some(E2eeOptions { key_provider: key_provider1, encryption_type: EncryptionType::Gcm });
let key_provider2 =
KeyProvider::with_shared_key(KeyProviderOptions::default(), SHARED_SECRET.to_vec());
let mut options2 = RoomOptions::default();
options2.encryption =
Some(E2eeOptions { key_provider: key_provider2, encryption_type: EncryptionType::Gcm });
let mut rooms = test_rooms_with_options([options1.into(), options2.into()]).await?;
let (pub_room, _) = rooms.pop().unwrap();
let (sub_room, mut sub_room_event_rx) = rooms.pop().unwrap();
pub_room.e2ee_manager().set_enabled(true);
sub_room.e2ee_manager().set_enabled(true);
let publish = async move {
let track = pub_room.local_participant().publish_data_track("my_track").await?;
assert!(track.info().uses_e2ee());
for index in 0..5 {
track.try_push(vec![index as u8; 196_608].into())?;
time::sleep(Duration::from_millis(25)).await;
}
Ok(())
};
let subscribe = async move {
let track = wait_for_remote_track(&mut sub_room_event_rx).await?;
assert!(track.info().uses_e2ee());
let mut subscription = track.subscribe().await?;
while let Some(frame) = subscription.next().await {
let payload = frame.payload();
if let Some(first_byte) = payload.first() {
assert!(payload.iter().all(|byte| byte == first_byte));
}
}
Ok(())
};
timeout(Duration::from_secs(5), async { try_join!(publish, subscribe) }).await??;
Ok(())
}
#[cfg(feature = "__lk-e2e-test")]
#[test_log::test(tokio::test)]
async fn test_published_state() -> Result<()> {
const PUBLISH_DURATION: Duration = Duration::from_millis(500);
let mut rooms = test_rooms(2).await?;
let (pub_room, _) = rooms.pop().unwrap();
let (_, mut sub_room_event_rx) = rooms.pop().unwrap();
let publish = async move {
let track = pub_room.local_participant().publish_data_track("my_track").await?;
assert!(track.is_published());
time::sleep(PUBLISH_DURATION).await;
track.unpublish();
Ok(())
};
let subscribe = async move {
let track = wait_for_remote_track(&mut sub_room_event_rx).await?;
assert!(track.is_published());
let elapsed = {
let start = Instant::now();
track.wait_for_unpublish().await;
start.elapsed()
};
assert!(elapsed.abs_diff(PUBLISH_DURATION) <= Duration::from_millis(20));
assert!(!track.is_published());
Ok(())
};
timeout(Duration::from_secs(5), async { try_join!(publish, subscribe) }).await??;
Ok(())
}
#[cfg(feature = "__lk-e2e-test")]
#[test_log::test(tokio::test)]
async fn test_resubscribe() -> Result<()> {
const ITERATIONS: usize = 10;
let mut rooms = test_rooms(2).await?;
let (pub_room, _) = rooms.pop().unwrap();
let (_, mut sub_room_event_rx) = rooms.pop().unwrap();
let publish = async move {
let track = pub_room.local_participant().publish_data_track("my_track").await.unwrap();
loop {
_ = track.try_push(vec![0xFA; 64].into());
time::sleep(Duration::from_millis(50)).await;
}
};
let subscribe = async move {
let track = wait_for_remote_track(&mut sub_room_event_rx).await.unwrap();
let mut successful_subscriptions = 0;
for _ in 0..ITERATIONS {
let mut stream = track.subscribe().await.unwrap();
while let Some(frame) = stream.next().await {
assert!(!frame.payload().is_empty());
successful_subscriptions += 1;
break;
}
std::mem::drop(stream);
time::sleep(Duration::from_millis(50)).await;
}
assert_eq!(successful_subscriptions, ITERATIONS);
};
let _ = timeout(Duration::from_secs(5), async {
tokio::select! { _ = publish => (), _ = subscribe => () };
})
.await?;
Ok(())
}
#[cfg(feature = "__lk-e2e-test")]
#[test_log::test(tokio::test)]
async fn test_frame_with_user_timestamp() -> Result<()> {
let mut rooms = test_rooms(2).await?;
let (pub_room, _) = rooms.pop().unwrap();
let (_, mut sub_room_event_rx) = rooms.pop().unwrap();
let publish = async move {
let track = pub_room.local_participant().publish_data_track("my_track").await.unwrap();
loop {
let frame = DataTrackFrame::new(vec![0xFA; 64]).with_user_timestamp_now();
_ = track.try_push(frame);
time::sleep(Duration::from_millis(50)).await;
}
};
let subscribe = async move {
let track = wait_for_remote_track(&mut sub_room_event_rx).await.unwrap();
let mut stream = track.subscribe().await.unwrap();
let mut got_frame = false;
while let Some(frame) = stream.next().await {
assert!(!frame.payload().is_empty());
let duration = frame.duration_since_timestamp().expect("Missing timestamp");
assert!(duration.as_millis() < 1000);
got_frame = true;
break;
}
if !got_frame {
panic!("No frame received");
}
};
let _ = timeout(Duration::from_secs(5), async {
tokio::select! { _ = publish => (), _ = subscribe => () };
})
.await?;
Ok(())
}
#[cfg(feature = "__lk-e2e-test")]
#[test_case(SimulateScenario::SignalReconnect; "signal_reconnect")]
#[test_case(SimulateScenario::ForceTcp; "full_reconnect")]
#[test_log::test(tokio::test)]
async fn test_subscriber_side_fault(scenario: SimulateScenario) -> Result<()> {
let mut rooms = test_rooms(2).await?;
let (pub_room, _) = rooms.pop().unwrap();
let (sub_room, mut sub_room_event_rx) = rooms.pop().unwrap();
let publish = async move {
let track = pub_room.local_participant().publish_data_track("my_track").await.unwrap();
loop {
_ = track.try_push(vec![0xFA; 64].into());
time::sleep(Duration::from_millis(50)).await;
}
};
let subscribe = async move {
let track = wait_for_remote_track(&mut sub_room_event_rx).await.unwrap();
let mut stream = track.subscribe().await.unwrap();
sub_room.simulate_scenario(scenario).await.unwrap();
assert!(track.is_published());
let mut got_frame = false;
while let Some(frame) = stream.next().await {
assert!(!frame.payload().is_empty());
got_frame = true;
break;
}
if !got_frame {
panic!("No frame received");
}
};
let _ = timeout(Duration::from_secs(15), async {
tokio::select! { _ = publish => (), _ = subscribe => () };
})
.await?;
Ok(())
}
#[cfg(feature = "__lk-e2e-test")]
#[test_case(SimulateScenario::SignalReconnect; "signal_reconnect")]
#[test_case(SimulateScenario::ForceTcp; "full_reconnect")]
#[test_log::test(tokio::test)]
async fn test_publisher_side_fault(scenario: SimulateScenario) -> Result<()> {
let mut rooms = test_rooms(1).await?;
let (pub_room, _) = rooms.pop().unwrap();
let publish = async move {
let track = pub_room.local_participant().publish_data_track("my_track").await.unwrap();
let initial_sid = track.info().sid().clone();
pub_room.simulate_scenario(scenario).await.unwrap();
assert!(track.is_published(), "Should still be reported as published");
if scenario == SimulateScenario::ForceTcp {
time::sleep(Duration::from_millis(2000)).await;
assert_ne!(initial_sid, track.info().sid(), "Should have new SID");
}
assert!(track.is_published(), "Should still be reported as published");
track.try_push(vec![0xFA; 64].into()).expect("Should be able to push frame");
};
let _ = timeout(Duration::from_secs(10), publish).await?;
Ok(())
}
#[cfg(feature = "__lk-e2e-test")]
async fn wait_for_remote_track(
rx: &mut tokio::sync::mpsc::UnboundedReceiver<RoomEvent>,
) -> Result<RemoteDataTrack> {
while let Some(event) = rx.recv().await {
if let RoomEvent::DataTrackPublished(track) = event {
return Ok(track);
}
}
Err(anyhow!("No track published"))
}