use bytes::Bytes;
use lvqr_cluster::{FederationConnectState, FederationLink};
use lvqr_moq::{Origin, Track};
use lvqr_test_utils::{TestServer, TestServerConfig};
use std::time::{Duration, Instant};
const CONNECT_TIMEOUT: Duration = Duration::from_secs(10);
const PROPAGATION_TIMEOUT: Duration = Duration::from_secs(30);
#[cfg_attr(
target_os = "macos",
ignore = "federation forward_track empirically blocks on macos-latest CI -- see tracking/TEST_AUDIT_2026_04_28.md P4.5"
)]
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn federation_link_propagates_broadcast_between_two_clusters() {
let _ = tracing_subscriber::fmt()
.with_env_filter("lvqr=debug,moq_lite=info")
.with_test_writer()
.try_init();
let server_a = TestServer::start(TestServerConfig::default())
.await
.expect("start server A");
let relay_a = server_a.relay_addr();
let url_a = format!("https://127.0.0.1:{}/", relay_a.port());
let link = FederationLink::new(url_a.clone(), "", vec!["live/room1".into()]).with_disable_tls_verify(true);
let server_b = TestServer::start(TestServerConfig::default().with_federation_link(link))
.await
.expect("start server B");
let relay_b = server_b.relay_addr();
let runner = server_b
.federation_runner()
.expect("B must have installed a FederationRunner for the configured link");
let status_handle = runner.status_handle();
let connect_deadline = Instant::now() + CONNECT_TIMEOUT;
loop {
let snap = status_handle.snapshot();
if snap.iter().any(|s| s.state == FederationConnectState::Connected) {
break;
}
if Instant::now() >= connect_deadline {
panic!(
"federation link on B did not reach Connected within {:?}; latest snapshot: {:?}",
CONNECT_TIMEOUT, snap
);
}
tokio::time::sleep(Duration::from_millis(25)).await;
}
let mut broadcast_a = server_a
.origin()
.create_broadcast("live/room1")
.expect("create broadcast on server A");
let mut track_a = broadcast_a
.create_track(Track::new("0.mp4"))
.expect("create 0.mp4 track on A");
let mut group_a = track_a.append_group().expect("append first group on A");
group_a
.write_frame(Bytes::from_static(b"hello-federation"))
.expect("write frame on A");
group_a.finish().expect("finish group on A");
let mut client_config = moq_native::ClientConfig::default();
client_config.tls.disable_verify = Some(true);
let client = client_config.init().expect("init moq client");
let sub_origin = Origin::produce();
let mut announcements = sub_origin.consume();
let client = client.with_consume(sub_origin);
let url_b: url::Url = format!("https://127.0.0.1:{}/", relay_b.port())
.parse()
.expect("valid url for B");
let _session = tokio::time::timeout(CONNECT_TIMEOUT, client.connect(url_b))
.await
.expect("client connect to B timed out")
.expect("client connect to B failed");
let (path, bc) = tokio::time::timeout(PROPAGATION_TIMEOUT, announcements.announced())
.await
.expect("announcement timeout on B")
.expect("announcement stream on B closed");
assert_eq!(
path.as_str(),
"live/room1",
"B must announce the federated broadcast under the same name as A"
);
let bc = bc.expect("expected B announce, got unannounce");
let frame: Bytes = {
let deadline = Instant::now() + PROPAGATION_TIMEOUT;
loop {
let attempt: Result<Bytes, &'static str> = async {
let mut track_sub = bc.subscribe_track(&Track::new("0.mp4")).expect("subscribe 0.mp4 on B");
let mut group = match tokio::time::timeout(Duration::from_millis(500), track_sub.next_group()).await {
Ok(Ok(Some(g))) => g,
Ok(Ok(None)) => return Err("track closed before a group landed"),
Ok(Err(_)) => return Err("next_group remote error"),
Err(_) => return Err("next_group inner timeout (500ms)"),
};
match tokio::time::timeout(Duration::from_millis(500), group.read_frame()).await {
Ok(Ok(Some(f))) => Ok(f),
Ok(Ok(None)) => Err("group closed before the federated frame arrived"),
Ok(Err(_)) => Err("read_frame remote error"),
Err(_) => Err("read_frame inner timeout (500ms)"),
}
}
.await;
match attempt {
Ok(f) => break f,
Err(attempt_label) => {
if Instant::now() >= deadline {
panic!(
"0.mp4 frame on B never arrived within {:?}; last attempt: {}",
PROPAGATION_TIMEOUT, attempt_label
);
}
tokio::time::sleep(Duration::from_millis(50)).await;
}
}
}
};
assert_eq!(
&*frame, b"hello-federation",
"federated frame bytes must equal the source"
);
drop(broadcast_a);
drop(track_a);
server_a.shutdown().await.expect("shutdown A");
server_b.shutdown().await.expect("shutdown B");
}