#![cfg(feature = "unstable")]
mod common;
use std::time::Duration;
use zenoh::{query::Reply, sample::SampleKind};
use zenoh_core::ztimeout;
use crate::common::TestSessions;
/// Ten-second time budget for the test.
///
/// NOTE(review): not referenced by name in this file's visible code; presumably
/// picked up by the `ztimeout!` macro at its expansion site — confirm.
const TIMEOUT: Duration = Duration::from_secs(10);
/// Checks the events and query replies exposed under
/// `@/<zid>/session/transport/unicast/...` when a second session connects to,
/// and later disconnects from, a listening session.
///
/// Steps:
/// 1. Open a listener session and subscribe to the transport-level and
///    link-level wildcard key expressions.
/// 2. Open a connector session and expect one `Put` sample on each subscriber.
/// 3. Query both key expressions and expect exactly one successful reply each.
/// 4. Close the connector session and expect one `Delete` sample on each
///    subscriber.
///
/// Assertions use `assert_eq!` / message-carrying `assert!` so that a failure
/// reports the actual key expression, sample kind or reply count.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn zenoh_events() {
    let mut test_context = TestSessions::new();
    let session = test_context.open_listener().await;
    let zid = session.zid();

    // Both subscribers are declared before the second session is opened.
    let sub1 =
        ztimeout!(session.declare_subscriber(format!("@/{zid}/session/transport/unicast/*")))
            .unwrap();
    let sub2 = ztimeout!(
        session.declare_subscriber(format!("@/{zid}/session/transport/unicast/*/link/*"))
    )
    .unwrap();

    let session2 = test_context.open_connector().await;
    let zid2 = session2.zid();

    // Expected keys, built once and reused by every check below.
    let transport_key = format!("@/{zid}/session/transport/unicast/{zid2}");
    let link_prefix = format!("{transport_key}/link/");

    // --- Events received after the second session connected ---------------
    let sample = ztimeout!(sub1.recv_async())
        .expect("no sample received on the transport subscriber after connect");
    assert_eq!(sample.key_expr().as_str(), transport_key);
    assert_eq!(sample.kind(), SampleKind::Put);

    let sample = ztimeout!(sub2.recv_async())
        .expect("no sample received on the link subscriber after connect");
    let key = sample.key_expr().as_str();
    assert!(
        key.starts_with(link_prefix.as_str()),
        "unexpected link key expression: {key}"
    );
    assert_eq!(sample.kind(), SampleKind::Put);

    // --- Queries on the same key expressions --------------------------------
    let replies: Vec<Reply> =
        ztimeout!(session.get(format!("@/{zid}/session/transport/unicast/*")))
            .unwrap()
            .into_iter()
            .collect();
    assert_eq!(replies.len(), 1);
    let reply_sample = replies[0]
        .result()
        .expect("transport query returned an error reply");
    assert_eq!(reply_sample.key_expr().as_str(), transport_key);

    let replies: Vec<Reply> =
        ztimeout!(session.get(format!("@/{zid}/session/transport/unicast/*/link/*")))
            .unwrap()
            .into_iter()
            .collect();
    assert_eq!(replies.len(), 1);
    let reply_sample = replies[0]
        .result()
        .expect("link query returned an error reply");
    let key = reply_sample.key_expr().as_str();
    assert!(
        key.starts_with(link_prefix.as_str()),
        "unexpected link key expression in reply: {key}"
    );

    // --- Events received after the second session closed --------------------
    ztimeout!(session2.close()).unwrap();

    let sample = ztimeout!(sub1.recv_async())
        .expect("no sample received on the transport subscriber after close");
    assert_eq!(sample.key_expr().as_str(), transport_key);
    assert_eq!(sample.kind(), SampleKind::Delete);

    let sample = ztimeout!(sub2.recv_async())
        .expect("no sample received on the link subscriber after close");
    let key = sample.key_expr().as_str();
    assert!(
        key.starts_with(link_prefix.as_str()),
        "unexpected link key expression: {key}"
    );
    assert_eq!(sample.kind(), SampleKind::Delete);

    // Tear down in the same order as the original: link subscriber,
    // transport subscriber, then the listener session.
    ztimeout!(sub2.undeclare()).unwrap();
    ztimeout!(sub1.undeclare()).unwrap();
    ztimeout!(session.close()).unwrap();
}