use anyhow::{Context, Result};
use chrono::Utc;
use futures_util::future::try_join_all;
use libwebrtc::native::create_random_uuid;
use livekit::{Room, RoomEvent, RoomOptions};
use livekit_api::access_token::{AccessToken, VideoGrants};
use std::{env, time::Duration};
use tokio::{
sync::mpsc::UnboundedReceiver,
time::{self, timeout},
};
pub mod audio;
pub mod video;
/// Connection settings for the LiveKit server used by the test helpers.
struct TestEnvironment {
    /// API key used to sign access tokens.
    api_key: String,
    /// API secret used to sign access tokens.
    api_secret: String,
    /// URL of the LiveKit server the test rooms connect to.
    server_url: String,
}

impl TestEnvironment {
    /// Builds the environment from `LIVEKIT_API_KEY`, `LIVEKIT_API_SECRET` and
    /// `LIVEKIT_URL`, substituting `devkey`, `secret` and
    /// `http://localhost:7880` respectively for any variable that cannot be read.
    pub fn from_env_or_defaults() -> Self {
        Self {
            api_key: Self::var_or("LIVEKIT_API_KEY", "devkey"),
            api_secret: Self::var_or("LIVEKIT_API_SECRET", "secret"),
            server_url: Self::var_or("LIVEKIT_URL", "http://localhost:7880"),
        }
    }

    /// Reads the environment variable `key`, returning `default` when it is
    /// unset or not valid Unicode.
    ///
    /// The fallback string is only allocated when it is actually needed.
    fn var_or(key: &str, default: &str) -> String {
        env::var(key).unwrap_or_else(|_| default.to_owned())
    }
}
/// Per-participant settings accepted by `test_rooms_with_options`.
///
/// Can be built from just a `RoomOptions` or just a `VideoGrants` through the
/// `From` impls below; the other field then takes its `Default` value.
#[derive(Debug, Clone)]
pub struct TestRoomOptions {
/// Grants embedded in the participant's access token. The `room` field is
/// overwritten with the generated test room name before the token is signed.
pub grants: VideoGrants,
/// Options handed to `Room::connect` for this participant.
pub room: RoomOptions,
}
impl Default for TestRoomOptions {
fn default() -> Self {
Self {
grants: VideoGrants { room_join: true, ..Default::default() },
room: Default::default(),
}
}
}
impl From<RoomOptions> for TestRoomOptions {
fn from(room: RoomOptions) -> Self {
Self { room, ..Default::default() }
}
}
impl From<VideoGrants> for TestRoomOptions {
fn from(grants: VideoGrants) -> Self {
Self { grants, ..Default::default() }
}
}
/// Returns `true` when `url` contains one of the local dev-server endpoints
/// (`localhost:7880` or `127.0.0.1:7880`) anywhere in the string.
fn is_local_server_url(url: &str) -> bool {
    const LOCAL_ENDPOINTS: [&str; 2] = ["localhost:7880", "127.0.0.1:7880"];
    LOCAL_ENDPOINTS.iter().any(|&endpoint| url.contains(endpoint))
}
/// Connects `count` participants, each with default [`TestRoomOptions`], to a
/// single test room.
///
/// Thin wrapper around [`test_rooms_with_options`]; see that function for the
/// returned value and failure conditions.
pub async fn test_rooms(count: usize) -> Result<Vec<(Room, UnboundedReceiver<RoomEvent>)>> {
    let default_options = std::iter::repeat_with(TestRoomOptions::default).take(count);
    test_rooms_with_options(default_options).await
}
/// Connects one participant per entry in `options` to a freshly named test room.
///
/// All participants join the same room, named `test_room_<uuid>`. The
/// participant at position `i` in `options` gets identity `p<i>`, display name
/// `Participant <i>`, and an access token valid for 30 minutes. The `room`
/// field of each entry's grants is overwritten with the generated room name.
///
/// When the configured server URL looks like the local dev server (see
/// `is_local_server_url`), `single_peer_connection` is forced to `false` on
/// every participant's `RoomOptions`.
///
/// After all connections succeed, the function polls every 10 ms until each
/// room reports `count - 1` remote participants, for at most 5 seconds.
///
/// # Returns
///
/// One `(Room, UnboundedReceiver<RoomEvent>)` pair per entry in `options`.
///
/// # Errors
///
/// Returns an error if a token cannot be generated, if any connection fails,
/// or if the participants do not all see each other within the 5 second limit.
pub async fn test_rooms_with_options(
    options: impl IntoIterator<Item = TestRoomOptions>,
) -> Result<Vec<(Room, UnboundedReceiver<RoomEvent>)>> {
    let test_env = TestEnvironment::from_env_or_defaults();
    let force_v0 = is_local_server_url(&test_env.server_url);
    let room_name = format!("test_room_{}", create_random_uuid());

    if force_v0 {
        log::info!("Using localhost test server: forcing single_peer_connection=false for E2E");
    }

    let tokens = options
        .into_iter()
        .enumerate()
        .map(|(id, mut options)| -> Result<(String, RoomOptions)> {
            if force_v0 {
                options.room.single_peer_connection = false;
            }
            options.grants.room = room_name.clone();

            let token = AccessToken::with_api_key(&test_env.api_key, &test_env.api_secret)
                .with_ttl(Duration::from_secs(30 * 60))
                .with_grants(options.grants)
                .with_identity(&format!("p{}", id))
                .with_name(&format!("Participant {}", id))
                .to_jwt()
                .context("Failed to generate JWT")?;

            Ok((token, options.room))
        })
        .collect::<Result<Vec<_>>>()?;

    let count = tokens.len();

    // Borrow the URL once; each connection future only needs a `&str`, so there
    // is no reason to clone the `String` per participant.
    let server_url = test_env.server_url.as_str();
    let rooms = try_join_all(tokens.into_iter().map(|(token, options)| async move {
        Room::connect(server_url, &token, options).await.context("Failed to connect to room")
    }))
    .await?;

    let all_connected_time = Utc::now();

    // Each participant should see everyone but itself. Computed once, and with
    // `saturating_sub` so an empty `options` cannot underflow `usize`.
    let expected_remote_count = count.saturating_sub(1);

    let wait_participant_visibility = async {
        while rooms
            .iter()
            .any(|(room, _)| room.remote_participants().len() != expected_remote_count)
        {
            time::sleep(Duration::from_millis(10)).await;
        }
        log::info!("All participants visible after {}", Utc::now() - all_connected_time);
    };

    timeout(Duration::from_secs(5), wait_participant_visibility)
        .await
        .context("Not all participants became visible")?;

    Ok(rooms)
}