#[cfg(feature = "unstable")]
#[path = "common/mod.rs"]
mod common;
#[cfg(feature = "unstable")]
mod tests {
use std::{
fmt::Debug,
sync::{atomic::AtomicUsize, Arc},
time::Duration,
};
use zenoh::sample::SampleKind;
use crate::common::TestSessions;
/// Drain `events` until no new event arrives within `timeout`, returning
/// everything received in order.
///
/// Each event is logged via its `Debug` representation as it arrives.
/// Panics if the channel closes while draining.
async fn collect_events<T: Debug>(events: &flume::Receiver<T>, timeout: Duration) -> Vec<T> {
    let mut drained = Vec::new();
    loop {
        match tokio::time::timeout(timeout, events.recv_async()).await {
            Ok(received) => {
                let event = received.expect("Channel closed");
                println!("{:?}", event);
                drained.push(event);
            }
            // No event within `timeout`: the stream has gone quiet.
            Err(_) => break,
        }
    }
    drained
}
const SLEEP: Duration = Duration::from_millis(100);
/// `info().transports()` must report a peer transport (with a non-empty ZID)
/// on both sides of a freshly opened session pair.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn test_info_transports() {
    zenoh_util::init_log_from_env_or("error");
    let mut ctx = TestSessions::new();
    let (peer_a, peer_b) = ctx.open_pairs().await;
    // Let the transports establish before querying them.
    tokio::time::sleep(SLEEP).await;
    for transport in peer_a.info().transports().await {
        println!(
            "Transport from session1: zid={}, whatami={:?}",
            transport.zid(),
            transport.whatami()
        );
        assert_ne!(
            transport.zid().to_string(),
            "",
            "Transport ZID should not be empty"
        );
    }
    let peer_b_transports = peer_b.info().transports().await.count();
    assert!(
        peer_b_transports > 0,
        "Session2 should have at least one transport"
    );
    ctx.close().await;
}
/// `info().links()` must report links with non-empty src/dst endpoints on
/// both sides of a session pair.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn test_info_links() {
    zenoh_util::init_log_from_env_or("error");
    let mut ctx = TestSessions::new();
    let (peer_a, peer_b) = ctx.open_pairs().await;
    // Let link state settle before querying it.
    tokio::time::sleep(SLEEP).await;
    for link in peer_a.info().links().await {
        println!("Link from session1: {} -> {}", link.src(), link.dst());
        assert_ne!(
            link.src().to_string(),
            "",
            "Link source should not be empty"
        );
        assert_ne!(
            link.dst().to_string(),
            "",
            "Link destination should not be empty"
        );
    }
    let peer_b_links = peer_b.info().links().await.count();
    assert!(
        peer_b_links > 0,
        "Session2 should have at least one link"
    );
    ctx.close().await;
}
/// A transport-events listener on the listening session must observe exactly
/// one Put when a connector opens and exactly one Delete when it closes.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn test_transport_events() {
    zenoh_util::init_log_from_env_or("error");
    let mut ctx = TestSessions::new();
    let listener = ctx.open_listener().await;
    let events = listener
        .info()
        .transport_events_listener()
        .history(true)
        .with(flume::bounded(32))
        .await
        .expect("Failed to declare transport events listener");
    let connector = ctx.open_connector().await;
    tokio::time::sleep(SLEEP).await;
    // Exactly one Put event is expected for the newly opened transport.
    let put_events = collect_events(&events, Duration::from_millis(200)).await;
    assert!(
        matches!(put_events.as_slice(), [e] if e.kind() == SampleKind::Put),
        "Expected exactly 1 Put event, got {} events",
        put_events.len()
    );
    connector.close().await.unwrap();
    tokio::time::sleep(SLEEP).await;
    // ...and exactly one Delete once the connector goes away.
    let delete_events = collect_events(&events, Duration::from_millis(200)).await;
    assert!(
        matches!(delete_events.as_slice(), [e] if e.kind() == SampleKind::Delete),
        "Expected exactly 1 Delete event, got {} events",
        delete_events.len()
    );
    listener.close().await.unwrap();
}
/// A link-events listener must observe one Put per connector link and one
/// Delete per closed connector, in sequence.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn test_link_events() {
    zenoh_util::init_log_from_env_or("error");
    let mut test_context = TestSessions::new();
    let session1 = test_context.open_listener().await;
    let events = session1
        .info()
        .link_events_listener()
        .history(true)
        .with(flume::bounded(32))
        .await
        .expect("Failed to declare link events listener");
    let session2 = test_context.open_connector().await;
    let session3 = test_context.open_connector().await;
    tokio::time::sleep(SLEEP).await;
    // Two connectors => two link Put events.
    let put_events = collect_events(&events, Duration::from_millis(200)).await;
    assert!(
        put_events.len() == 2 && put_events.iter().all(|e| e.kind() == SampleKind::Put),
        "Expected exactly 2 Put events, got {} events",
        put_events.len()
    );
    session2.close().await.unwrap();
    tokio::time::sleep(SLEEP).await;
    let delete_events = collect_events(&events, Duration::from_millis(200)).await;
    assert!(
        delete_events.len() == 1 && delete_events[0].kind() == SampleKind::Delete,
        // FIX: was `{:?}` on a usize count; use `{}` like every sibling message.
        "First close: expected exactly 1 Delete event, got {} events",
        delete_events.len()
    );
    session3.close().await.unwrap();
    tokio::time::sleep(SLEEP).await;
    let delete_events = collect_events(&events, Duration::from_millis(200)).await;
    assert!(
        delete_events.len() == 1 && delete_events[0].kind() == SampleKind::Delete,
        "Second close: expected exactly 1 Delete event, got {} events",
        delete_events.len()
    );
    session1.close().await.unwrap();
}
/// With a multilink transport (two links over a single transport), closing
/// the peer must emit one link Delete event per link.
#[cfg(feature = "transport_multilink")]
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn test_link_events_multilink() {
    zenoh_util::init_log_from_env_or("error");
    let mut ctx = TestSessions::new();
    let listener_cfg = ctx.get_listener_config("tcp/127.0.0.1:0", 2);
    let listener = ctx.open_listener_with_cfg(listener_cfg).await;
    let connector = ctx.open_connector().await;
    tokio::time::sleep(SLEEP).await;
    // Sanity check: one transport carrying two links.
    let transports: Vec<_> = listener.info().transports().await.collect();
    assert_eq!(transports.len(), 1, "Should have exactly 1 transport");
    let link_count = listener.info().links().await.count();
    assert_eq!(
        link_count,
        2,
        "Should have exactly 2 links in multilink transport"
    );
    let events = listener
        .info()
        .link_events_listener()
        .history(false)
        .with(flume::bounded(32))
        .await
        .expect("Failed to declare link events listener");
    connector.close().await.unwrap();
    tokio::time::sleep(SLEEP).await;
    let delete_events = collect_events(&events, Duration::from_millis(200)).await;
    let all_deletes = delete_events.iter().all(|e| e.kind() == SampleKind::Delete);
    assert!(
        delete_events.len() == 2 && all_deletes,
        "Expected exactly 2 Delete events (one per link), got {} events",
        delete_events.len()
    );
    listener.close().await.unwrap();
}
/// Listeners declared with `history(true)` must replay the already-existing
/// transport and link as `Put` events, even though both were established
/// before the listeners were declared.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn test_event_history() {
    zenoh_util::init_log_from_env_or("error");
    let mut test_context = TestSessions::new();
    let (session1, _session2) = test_context.open_pairs().await;
    tokio::time::sleep(SLEEP).await;
    let transport_events_listener = session1
        .info()
        .transport_events_listener()
        .history(true)
        .with(flume::bounded(32))
        .await
        .expect("Failed to declare transport events listener");
    let event = tokio::time::timeout(
        Duration::from_secs(5),
        transport_events_listener.recv_async(),
    )
    .await
    .expect("Timeout waiting for history transport event")
    .expect("Channel closed");
    assert!(
        event.kind() == SampleKind::Put,
        "History event should be Put (opened)"
    );
    println!("History: Transport {}", event.transport().zid());
    let links_events_listener = session1
        .info()
        .link_events_listener()
        .history(true)
        .with(flume::bounded(32))
        .await
        .expect("Failed to declare link events listener");
    let event =
        tokio::time::timeout(Duration::from_secs(5), links_events_listener.recv_async())
            .await
            .expect("Timeout waiting for history link event")
            .expect("Channel closed");
    // FIX: the link history event's kind was previously not checked; verify it
    // is a Put, mirroring the transport history assertion above.
    assert!(
        event.kind() == SampleKind::Put,
        "History link event should be Put (opened)"
    );
    println!(
        "History: Link {} -> {}",
        event.link().src(),
        event.link().dst()
    );
    test_context.close().await;
}
/// `links()` filtered by a specific transport must yield only links that
/// belong to that transport (checked against the transport's ZID).
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn test_links_filter_by_transport() {
    zenoh_util::init_log_from_env_or("error");
    let mut ctx = TestSessions::new();
    let listener = ctx.open_listener().await;
    let _connector_a = ctx.open_connector().await;
    let _connector_b = ctx.open_connector().await;
    tokio::time::sleep(SLEEP).await;
    let transports: Vec<_> = listener.info().transports().await.collect();
    assert_eq!(
        transports.len(),
        2,
        "Should have 2 transports (one for each peer)"
    );
    assert_eq!(
        listener.info().links().await.count(),
        2,
        "Should have 2 links in total"
    );
    // Filtering by each transport in turn must return a link owned by it.
    for transport in &transports {
        let link = listener
            .info()
            .links()
            .transport(transport.clone())
            .await
            .next()
            .unwrap();
        assert_eq!(
            link.zid(),
            transport.zid(),
            "Filtered link should belong to specified transport"
        );
    }
    println!("Successfully verified links() filtering by transport");
    ctx.close().await;
}
/// A link-events listener filtered to one transport must ignore link events
/// from other transports and fire only for the targeted one.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn test_link_events_filter_by_transport() {
    zenoh_util::init_log_from_env_or("error");
    let mut test_context = TestSessions::new();
    let session1 = test_context.open_listener().await;
    let session2 = test_context.open_connector().await;
    tokio::time::sleep(SLEEP).await;
    let target_transport = session1.info().transports().await.next().unwrap();
    let events_received = Arc::new(AtomicUsize::new(0));
    let events_received_clone = events_received.clone();
    // FIX: the declaration result used to be dropped unchecked (`.await;`).
    // If the declaration failed, the zero-event assertion below would have
    // passed vacuously. Fail fast instead, like every other declaration here.
    let _events = session1
        .info()
        .link_events_listener()
        .transport(target_transport)
        .history(false)
        .callback(move |_event| {
            events_received_clone.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
        })
        .await
        .expect("Failed to declare link events listener");
    // session3 connects over a *different* transport: the filtered callback
    // must not fire for it.
    let session3 = test_context.open_connector().await;
    tokio::time::sleep(SLEEP).await;
    let count = events_received.load(std::sync::atomic::Ordering::SeqCst);
    assert_eq!(
        count, 0,
        "Should not receive events for different transport"
    );
    session2.close().await.unwrap();
    tokio::time::sleep(SLEEP).await;
    let _session2_new = test_context.open_connector().await;
    // Closing session2 tears down the targeted transport's link, which must
    // be reported; poll until the callback fires or we time out.
    let start = std::time::Instant::now();
    let mut final_count;
    loop {
        final_count = events_received.load(std::sync::atomic::Ordering::SeqCst);
        if final_count > 0 {
            break;
        }
        if start.elapsed() > Duration::from_secs(5) {
            panic!("Did not receive filtered link events within timeout");
        }
        tokio::time::sleep(Duration::from_millis(10)).await;
    }
    println!(
        "Successfully verified links_events_listener() filtering by transport (received {} events)",
        final_count
    );
    session1.close().await.unwrap();
    session3.close().await.unwrap();
}
/// A background transport-events listener (callback only, no handle kept)
/// must keep firing for transport opens and closes across the session's life.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn test_transport_events_background() {
    zenoh_util::init_log_from_env_or("error");
    let mut ctx = TestSessions::new();
    let listener = ctx.open_listener().await;
    let opened_count = Arc::new(AtomicUsize::new(0));
    let closed_count = Arc::new(AtomicUsize::new(0));
    let opened_tx = opened_count.clone();
    let closed_tx = closed_count.clone();
    listener
        .info()
        .transport_events_listener()
        .history(false)
        .callback(move |event| match event.kind() {
            SampleKind::Put => {
                opened_tx.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
                println!("Background: Transport opened: {}", event.transport().zid());
            }
            _ => {
                closed_tx.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
                println!("Background: Transport closed");
            }
        })
        .background()
        .await
        .unwrap();
    let connector = ctx.open_connector().await;
    tokio::time::sleep(SLEEP * 2).await;
    let opened = opened_count.load(std::sync::atomic::Ordering::SeqCst);
    assert!(
        opened > 0,
        "Should have received at least one transport opened event, got {}",
        opened
    );
    println!("Received {} transport opened events", opened);
    connector.close().await.unwrap();
    tokio::time::sleep(SLEEP * 2).await;
    let closed = closed_count.load(std::sync::atomic::Ordering::SeqCst);
    assert!(
        closed > 0,
        "Should have received at least one transport closed event, got {}",
        closed
    );
    println!("Received {} transport closed events", closed);
    // The background listener must still be alive for a later connection.
    let late_connector = ctx.open_connector().await;
    tokio::time::sleep(SLEEP * 2).await;
    let opened_final = opened_count.load(std::sync::atomic::Ordering::SeqCst);
    assert!(
        opened_final > opened,
        "Should have received additional transport opened event after new connection"
    );
    println!(
        "Total transport opened events: {} (initial: {})",
        opened_final, opened
    );
    listener.close().await.unwrap();
    late_connector.close().await.unwrap();
}
/// Closing the *local* session must not be reported as transport/link Delete
/// events on that session's own listeners.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn test_transport_events_not_triggered_on_local_session_close() {
    zenoh_util::init_log_from_env_or("error");
    let mut ctx = TestSessions::new();
    let (local, remote) = ctx.open_pairs().await;
    tokio::time::sleep(SLEEP).await;
    let transport_delete_count = Arc::new(AtomicUsize::new(0));
    let link_delete_count = Arc::new(AtomicUsize::new(0));
    let transport_deletes_tx = transport_delete_count.clone();
    let link_deletes_tx = link_delete_count.clone();
    local
        .info()
        .transport_events_listener()
        .history(false)
        .callback(move |event| {
            if matches!(event.kind(), SampleKind::Delete) {
                transport_deletes_tx.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
                println!("Unexpected transport Delete event received on local session close");
            }
        })
        .background()
        .await
        .unwrap();
    local
        .info()
        .link_events_listener()
        .history(false)
        .callback(move |event| {
            if matches!(event.kind(), SampleKind::Delete) {
                link_deletes_tx.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
                println!("Unexpected link Delete event received on local session close");
            }
        })
        .background()
        .await
        .unwrap();
    // Closing the local session should tear everything down without
    // triggering the Delete callbacks above.
    local.close().await.unwrap();
    tokio::time::sleep(SLEEP * 2).await;
    let transport_deletes = transport_delete_count.load(std::sync::atomic::Ordering::SeqCst);
    assert_eq!(
        transport_deletes, 0,
        "Transport Delete event should NOT be triggered when the local session closes, got {} events",
        transport_deletes
    );
    let link_deletes = link_delete_count.load(std::sync::atomic::Ordering::SeqCst);
    assert_eq!(
        link_deletes, 0,
        "Link Delete event should NOT be triggered when the local session closes, got {} events",
        link_deletes
    );
    println!("Verified: no Delete events fired when local session closes");
    remote.close().await.unwrap();
}
}