#![cfg(feature = "__lk-e2e-test")]
#[cfg(feature = "__lk-e2e-test")]
use {
anyhow::{anyhow, bail, Result},
common::test_rooms,
libwebrtc::native::create_random_uuid,
livekit::{ConnectionState, Room, RoomEvent, RoomOptions, SimulateScenario},
livekit_api::access_token::{AccessToken, VideoGrants},
std::{env, net::SocketAddr, time::Duration},
tokio::{
net::{TcpListener, TcpStream},
sync::{mpsc::UnboundedReceiver, watch},
time::timeout,
},
};
mod common;
#[cfg(feature = "__lk-e2e-test")]
async fn assert_recovers(
room: Room,
mut events: UnboundedReceiver<RoomEvent>,
scenario: SimulateScenario,
) -> Result<()> {
assert_eq!(room.connection_state(), ConnectionState::Connected);
room.simulate_scenario(scenario)
.await
.map_err(|e| anyhow!("simulate_scenario failed: {e:?}"))?;
let observe = async {
let mut saw_reconnecting = false;
while let Some(event) = events.recv().await {
match event {
RoomEvent::Reconnecting => saw_reconnecting = true,
RoomEvent::Reconnected => {
if !saw_reconnecting {
bail!("received Reconnected without a preceding Reconnecting");
}
return Ok(());
}
RoomEvent::Disconnected { reason } => {
bail!("room disconnected during reconnection: {reason:?}");
}
_ => {}
}
}
bail!("event stream ended before the room reconnected");
};
timeout(Duration::from_secs(30), observe).await??;
assert_eq!(
room.connection_state(),
ConnectionState::Connected,
"room should be Connected after recovery"
);
Ok(())
}
#[cfg(feature = "__lk-e2e-test")]
#[test_log::test(tokio::test)]
async fn test_signal_reconnect_resumes() -> Result<()> {
let (room, events) = test_rooms(1).await?.pop().unwrap();
assert_recovers(room, events, SimulateScenario::SignalReconnect).await
}
#[cfg(feature = "__lk-e2e-test")]
#[test_log::test(tokio::test)]
async fn test_full_reconnect_recovers() -> Result<()> {
let (room, events) = test_rooms(1).await?.pop().unwrap();
assert_recovers(room, events, SimulateScenario::FullReconnect).await
}
#[cfg(feature = "__lk-e2e-test")]
#[test_log::test(tokio::test)]
async fn test_resume_failure_escalates_to_full_reconnect() -> Result<()> {
let (room, events) = test_rooms(1).await?.pop().unwrap();
assert_recovers(room, events, SimulateScenario::DisconnectSignalOnResume).await
}
#[cfg(feature = "__lk-e2e-test")]
async fn start_killable_proxy(target_host_port: String) -> (SocketAddr, watch::Sender<bool>) {
let listener = TcpListener::bind("127.0.0.1:0").await.expect("bind proxy");
let addr = listener.local_addr().expect("proxy addr");
let (kill_tx, kill_rx) = watch::channel(false);
tokio::spawn(async move {
loop {
let mut kr = kill_rx.clone();
tokio::select! {
_ = kr.changed() => break, accepted = listener.accept() => {
let Ok((mut inbound, _)) = accepted else { break };
let target = target_host_port.clone();
let mut kr2 = kill_rx.clone();
tokio::spawn(async move {
if let Ok(mut outbound) = TcpStream::connect(&target).await {
tokio::select! {
_ = kr2.changed() => {} _ = tokio::io::copy_bidirectional(&mut inbound, &mut outbound) => {}
}
}
});
}
}
}
});
(addr, kill_tx)
}
#[cfg(feature = "__lk-e2e-test")]
#[test_log::test(tokio::test)]
async fn test_reconnect_exhaustion_disconnects() -> Result<()> {
let api_key = env::var("LIVEKIT_API_KEY").unwrap_or_else(|_| "devkey".into());
let api_secret = env::var("LIVEKIT_API_SECRET").unwrap_or_else(|_| "secret".into());
let server_url = env::var("LIVEKIT_URL").unwrap_or_else(|_| "ws://localhost:7880".into());
let target = server_url
.split("://")
.last()
.and_then(|rest| rest.split('/').next())
.unwrap_or("localhost:7880")
.to_string();
let (proxy_addr, kill) = start_killable_proxy(target).await;
let proxy_url = format!("ws://{proxy_addr}");
let room_name = format!("test_room_{}", create_random_uuid());
let token = AccessToken::with_api_key(&api_key, &api_secret)
.with_ttl(Duration::from_secs(30 * 60))
.with_grants(VideoGrants { room_join: true, room: room_name, ..Default::default() })
.with_identity("p0")
.with_name("Participant 0")
.to_jwt()?;
let (room, mut events) = Room::connect(&proxy_url, &token, RoomOptions::default()).await?;
assert_eq!(room.connection_state(), ConnectionState::Connected);
kill.send(true).ok();
let observe = async {
let mut saw_reconnecting = false;
while let Some(event) = events.recv().await {
match event {
RoomEvent::Reconnecting => saw_reconnecting = true,
RoomEvent::Disconnected { reason } => {
if !saw_reconnecting {
bail!("disconnected without attempting reconnection first");
}
return Ok(reason);
}
_ => {}
}
}
bail!("event stream ended before the room reported Disconnected");
};
let _reason = timeout(Duration::from_secs(90), observe).await??;
assert_eq!(
room.connection_state(),
ConnectionState::Disconnected,
"room must reach Disconnected after reconnection is exhausted, not hang in Reconnecting"
);
Ok(())
}