#![cfg(feature = "unstable")]
mod common;
use std::{
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
time::Duration,
};
#[cfg(all(feature = "internal", feature = "unstable"))]
use zenoh::Wait;
use zenoh::{
key_expr::KeyExpr,
qos::{CongestionControl, Reliability},
query::Querier,
sample::SampleKind,
Session,
};
use zenoh_core::ztimeout;
#[cfg(feature = "internal")]
use crate::common::close_session;
use crate::common::TestSessions;
// Upper bound for a single awaited step. Not referenced by name in this file;
// presumably picked up from the call site by the `ztimeout!` macro — TODO confirm.
const TIMEOUT: Duration = Duration::from_secs(60);
// Pause used between test phases and as the polling interval while waiting for messages.
const SLEEP: Duration = Duration::from_secs(1);
// Number of messages/queries exchanged when the scenario runs with `Reliability::Reliable`.
const MSG_COUNT: usize = 1_000;
// Payload sizes, in bytes, that every scenario iterates over.
const MSG_SIZE: [usize; 2] = [1_024, 100_000];
/// Publishes from `peer02` and checks that a subscriber on `peer01` receives everything.
///
/// For every payload size in `MSG_SIZE` the function:
/// 1. declares a callback subscriber on `peer01` for `test/session` that asserts the
///    payload length and counts the received samples,
/// 2. puts `msg_count` payloads of that size from `peer02` with `CongestionControl::Block`,
/// 3. polls the counter every `SLEEP` until it reaches `msg_count`,
/// 4. undeclares the subscriber.
///
/// `msg_count` is `MSG_COUNT` for `Reliability::Reliable` and `1` for
/// `Reliability::BestEffort`. Each awaited step goes through `ztimeout!`
/// (presumably bounded by `TIMEOUT` — confirm against the macro definition).
async fn test_session_pubsub(peer01: &Session, peer02: &Session, reliability: Reliability) {
    let key_expr = "test/session";
    let msg_count = match reliability {
        Reliability::Reliable => MSG_COUNT,
        Reliability::BestEffort => 1,
    };
    let msgs = Arc::new(AtomicUsize::new(0));

    for size in MSG_SIZE {
        // Start every payload size from a clean counter.
        msgs.store(0, Ordering::SeqCst);

        println!("[PS][01b] Subscribing on peer01 session");
        let c_msgs = msgs.clone();
        let sub = ztimeout!(peer01.declare_subscriber(key_expr).callback(move |sample| {
            assert_eq!(sample.payload().len(), size);
            c_msgs.fetch_add(1, Ordering::Relaxed);
        }))
        .unwrap();

        // Give the subscription time to settle before publishing.
        tokio::time::sleep(SLEEP).await;

        // Report the number of messages actually sent, not the `MSG_COUNT` constant:
        // the best-effort run only sends a single message.
        println!("[PS][02b] Putting on peer02 session. {msg_count} msgs of {size} bytes.");
        for _ in 0..msg_count {
            ztimeout!(peer02
                .put(key_expr, vec![0u8; size])
                .congestion_control(CongestionControl::Block))
            .unwrap();
        }

        // Poll until the subscriber callback has counted every message.
        ztimeout!(async {
            loop {
                let cnt = msgs.load(Ordering::Relaxed);
                println!("[PS][03b] Received {cnt}/{msg_count}.");
                if cnt < msg_count {
                    tokio::time::sleep(SLEEP).await;
                } else {
                    break;
                }
            }
        });

        tokio::time::sleep(SLEEP).await;

        println!("[PS][04b] Unsubscribing on peer01 session");
        ztimeout!(sub.undeclare()).unwrap();

        tokio::time::sleep(SLEEP).await;
    }
}
/// Common interface for the different ways a test can issue a query.
///
/// `get` sends one query carrying `params` and returns the FIFO channel handler
/// from which the replies are read. The implementations in this file use `params`
/// as the query parameters string.
trait HasGet {
async fn get(&self, params: &str) -> zenoh::handlers::FifoChannelHandler<zenoh::query::Reply>;
}
/// `HasGet` implementation that queries directly through a `Session`.
struct SessionGetter<'a, 'b> {
// Session on which the query is issued.
session: &'a Session,
// Key expression that is combined with the parameters to form the selector.
key_expr: &'b str,
}
impl HasGet for SessionGetter<'_, '_> {
    /// Builds the selector `<key_expr>?<params>` and issues it with `Session::get`,
    /// returning the channel on which the replies arrive.
    async fn get(&self, params: &str) -> zenoh::handlers::FifoChannelHandler<zenoh::query::Reply> {
        let key_expr = self.key_expr;
        let selector = format!("{key_expr}?{params}");
        let request = self.session.get(selector);
        ztimeout!(request).unwrap()
    }
}
/// `HasGet` implementation that queries through an already declared `Querier`.
struct QuerierGetter<'a> {
// Querier used to issue every query.
querier: Querier<'a>,
}
impl HasGet for QuerierGetter<'_> {
    /// Issues a query through the wrapped querier with `params` as its parameters,
    /// returning the channel on which the replies arrive.
    async fn get(&self, params: &str) -> zenoh::handlers::FifoChannelHandler<zenoh::query::Reply> {
        let request = self.querier.get().parameters(params);
        ztimeout!(request).unwrap()
    }
}
async fn test_session_query_reply_internal<Getter: HasGet>(
peer: &Session,
key_expr: &str,
reliability: Reliability,
log_id: &str,
getter: &Getter,
) {
let msg_count = match reliability {
Reliability::Reliable => MSG_COUNT,
Reliability::BestEffort => 1,
};
let msgs = Arc::new(AtomicUsize::new(0));
for size in MSG_SIZE {
msgs.store(0, Ordering::Relaxed);
println!("[{log_id}][01c] Queryable on peer01 session");
let c_msgs = msgs.clone();
let ke = key_expr.to_owned();
let qbl = ztimeout!(peer.declare_queryable(key_expr).callback(move |query| {
c_msgs.fetch_add(1, Ordering::Relaxed);
match query.parameters().as_str() {
"ok_put" => {
tokio::task::block_in_place(|| {
tokio::runtime::Handle::current().block_on(async {
ztimeout!(query
.reply(KeyExpr::try_from(&ke).unwrap(), vec![0u8; size].to_vec()))
.unwrap()
})
});
}
"ok_del" => {
tokio::task::block_in_place(|| {
tokio::runtime::Handle::current()
.block_on(async { ztimeout!(query.reply_del(&ke)).unwrap() })
});
}
"err" => {
let rep = vec![0u8; size];
tokio::task::block_in_place(|| {
tokio::runtime::Handle::current()
.block_on(async { ztimeout!(query.reply_err(rep)).unwrap() })
});
}
_ => panic!("Unknown query parameter"),
}
}))
.unwrap();
tokio::time::sleep(SLEEP).await;
println!("[{log_id}][02c] Getting Ok(Put) on peer02 session. {msg_count} msgs.");
let mut cnt = 0;
for _ in 0..msg_count {
let rs = getter.get("ok_put").await;
while let Ok(s) = ztimeout!(rs.recv_async()) {
#[cfg(feature = "unstable")]
assert_eq!(s.replier_id(), Some(qbl.id()));
let s = s.result().unwrap();
assert_eq!(s.kind(), SampleKind::Put);
assert_eq!(s.payload().len(), size);
cnt += 1;
}
}
println!("[{log_id}][02c] Got on peer02 session. {cnt}/{msg_count} msgs.");
assert_eq!(msgs.load(Ordering::Relaxed), msg_count);
assert_eq!(cnt, msg_count);
msgs.store(0, Ordering::Relaxed);
println!("[{log_id}][03c] Getting Ok(Delete) on peer02 session. {msg_count} msgs.");
let mut cnt = 0;
for _ in 0..msg_count {
let rs = getter.get("ok_del").await;
while let Ok(s) = ztimeout!(rs.recv_async()) {
#[cfg(feature = "unstable")]
assert_eq!(s.replier_id(), Some(qbl.id()));
let s = s.result().unwrap();
assert_eq!(s.kind(), SampleKind::Delete);
assert_eq!(s.payload().len(), 0);
cnt += 1;
}
}
println!("[{log_id}][03c] Got on peer02 session. {cnt}/{msg_count} msgs.");
assert_eq!(msgs.load(Ordering::Relaxed), msg_count);
assert_eq!(cnt, msg_count);
msgs.store(0, Ordering::Relaxed);
println!("[{log_id}][04c] Getting Err() on peer02 session. {msg_count} msgs.");
let mut cnt = 0;
for _ in 0..msg_count {
let rs = getter.get("err").await;
while let Ok(s) = ztimeout!(rs.recv_async()) {
#[cfg(feature = "unstable")]
assert_eq!(s.replier_id(), Some(qbl.id()));
let e = s.result().unwrap_err();
assert_eq!(e.payload().len(), size);
cnt += 1;
}
}
println!("[{log_id}][04c] Got on peer02 session. {cnt}/{msg_count} msgs.");
assert_eq!(msgs.load(Ordering::Relaxed), msg_count);
assert_eq!(cnt, msg_count);
println!("[{log_id}][03c] Unqueryable on peer01 session");
ztimeout!(qbl.undeclare()).unwrap();
tokio::time::sleep(SLEEP).await;
}
}
async fn test_session_getrep(peer01: &Session, peer02: &Session, reliability: Reliability) {
let key_expr = "test/session";
let querier = SessionGetter {
session: peer02,
key_expr,
};
ztimeout!(test_session_query_reply_internal(
peer01,
key_expr,
reliability,
"QR",
&querier
))
}
async fn test_session_qrrep(peer01: &Session, peer02: &Session, reliability: Reliability) {
let key_expr = "test/session";
println!("[QQ][00c] Declaring Querier on peer02 session");
let querier = QuerierGetter {
querier: ztimeout!(peer02.declare_querier(key_expr)).unwrap(),
};
ztimeout!(test_session_query_reply_internal(
peer01,
key_expr,
reliability,
"QQ",
&querier
))
}
/// Runs the pub/sub, get/reply and querier/reply scenarios with
/// `Reliability::Reliable` over the session pair from `TestSessions::open_pairs`.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn zenoh_session_unicast() {
    zenoh::init_log_from_env_or("error");

    let mut sessions = TestSessions::new();
    let (first, second) = sessions.open_pairs().await;

    test_session_pubsub(&first, &second, Reliability::Reliable).await;
    test_session_getrep(&first, &second, Reliability::Reliable).await;
    test_session_qrrep(&first, &second, Reliability::Reliable).await;

    sessions.close().await;
}
/// Runs the pub/sub scenario with `Reliability::BestEffort` over the session pair
/// from `TestSessions::open_pairs_multicast` on `udp/224.0.0.1:0`.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn zenoh_session_multicast() {
    zenoh::init_log_from_env_or("error");

    let mut sessions = TestSessions::new();
    let (first, second) = sessions.open_pairs_multicast("udp/224.0.0.1:0").await;

    test_session_pubsub(&first, &second, Reliability::BestEffort).await;

    sessions.close().await;
}
/// Creates two sessions on the first runtime and one on the second, drops the extra
/// session of the first runtime, then checks that reliable pub/sub still works between
/// the remaining sessions before closing the sessions and both runtimes.
#[cfg(feature = "internal")]
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn zenoh_2sessions_1runtime_init() {
    let mut sessions = TestSessions::new();
    let (runtime_a, runtime_b) = sessions.open_pairs_runtime().await;

    println!("[RI][02a] Creating peer01 session from runtime 1");
    let session_a = zenoh::session::init(runtime_a.clone().into()).await.unwrap();

    println!("[RI][02b] Creating peer02 session from runtime 2");
    let session_b = zenoh::session::init(runtime_b.clone().into()).await.unwrap();

    println!("[RI][02c] Creating peer01a session from runtime 1");
    let extra_session_a = zenoh::session::init(runtime_a.clone().into()).await.unwrap();

    println!("[RI][03c] Closing peer01a session");
    drop(extra_session_a);

    test_session_pubsub(&session_a, &session_b, Reliability::Reliable).await;
    close_session(session_a, session_b).await;

    println!("[  ][01e] Closing r1 runtime");
    ztimeout!(runtime_a.close()).unwrap();

    println!("[  ][02e] Closing r2 runtime");
    ztimeout!(runtime_b.close()).unwrap();
}
/// Opens a session pair and closes the test context right away.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn zenoh_session_close() {
    zenoh::init_log_from_env_or("error");

    let mut sessions = TestSessions::new();
    // Bound to named (underscore-prefixed) variables rather than `_`, so the
    // sessions are not dropped before the context is closed.
    let (_first, _second) = sessions.open_pairs().await;

    sessions.close().await;
}
/// Starts closing both sessions in the background and awaits the two resulting
/// close tasks, one after the other, under `ztimeout!`.
#[cfg(all(feature = "internal", feature = "unstable"))]
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn zenoh_session_close_in_background_async() {
    zenoh::init_log_from_env_or("error");

    let mut sessions = TestSessions::new();
    let (first, second) = sessions.open_pairs().await;

    let first_close = first.close().in_background().await;
    let second_close = second.close().in_background().await;

    ztimeout!(async move {
        first_close.await.unwrap();
        second_close.await.unwrap();
    });
}
/// Starts closing both sessions in the background and waits for the two resulting
/// close tasks synchronously through `Wait::wait`.
#[cfg(all(feature = "internal", feature = "unstable"))]
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn zenoh_session_close_in_background_sync() {
    zenoh::init_log_from_env_or("error");

    let mut sessions = TestSessions::new();
    let (first, second) = sessions.open_pairs().await;

    let first_close = first.close().in_background().await;
    let second_close = second.close().in_background().await;

    first_close.wait().unwrap();
    second_close.wait().unwrap();
}
/// Declares two subscribers on the same key expression in one session and checks
/// that both can be undeclared, in declaration order.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn test_undeclare_subscribers_same_keyexpr() {
    let key_expr = "test/undeclare/subscribers";
    let session = zenoh::open(zenoh::Config::default()).await.unwrap();

    let first = session.declare_subscriber(key_expr).await.unwrap();
    let second = session.declare_subscriber(key_expr).await.unwrap();

    tokio::time::sleep(SLEEP).await;

    for subscriber in [first, second] {
        ztimeout!(subscriber.undeclare()).unwrap();
    }
}
/// Opens two sessions from configurations derived from one shared `Config`
/// (multicast scouting disabled): one is the original value, the other a clone.
/// The first session listens on `tcp/127.0.0.1:0`; the second connects to the
/// locators reported for the first.
#[cfg(feature = "internal")]
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn test_session_from_cloned_config() {
    use zenoh::Config;

    // Shared base configuration; the publisher side gets the clone, the subscriber
    // side takes ownership of the original.
    let mut base_config = Config::default();
    base_config
        .scouting
        .multicast
        .set_enabled(Some(false))
        .unwrap();
    let mut pub_config = base_config.clone();
    let mut sub_config = base_config;

    let listen_endpoints = vec!["tcp/127.0.0.1:0".parse().unwrap()];
    sub_config.listen.endpoints.set(listen_endpoints).unwrap();
    let sub_session = zenoh::open(sub_config).await.unwrap();

    let locators = TestSessions::get_locators_from_session(&sub_session).await;
    pub_config.connect.endpoints.set(locators).unwrap();
    let _pub_session = zenoh::open(pub_config).await.unwrap();
}