#![cfg(feature = "unstable")]
mod common;
use core::time::Duration;
use std::sync::{atomic::AtomicBool, Arc};
use zenoh::handlers::CallbackDrop;
use zenoh_core::ztimeout;
use crate::common::TestSessions;
const TIMEOUT: Duration = Duration::from_secs(60);
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn test_cancellation_get() {
zenoh::init_log_from_env_or("error");
let mut test_context = TestSessions::new();
let (session1, session2) = test_context.open_pairs_client().await;
let queryable = ztimeout!(session1.declare_queryable("test/query_cancellation")).unwrap();
tokio::time::sleep(Duration::from_secs(1)).await;
let cancellation_token = zenoh::cancellation::CancellationToken::default();
let replies = ztimeout!(session2
.get("test/query_cancellation")
.cancellation_token(cancellation_token.clone()))
.unwrap();
tokio::time::sleep(Duration::from_secs(1)).await;
cancellation_token.cancel().await.unwrap();
assert!(replies.is_disconnected());
ztimeout!(queryable
.recv()
.unwrap()
.reply("test/query_cancellation", "ok"))
.unwrap();
tokio::time::sleep(Duration::from_secs(1)).await;
let cancellation_token = zenoh::cancellation::CancellationToken::default();
let n = Arc::new(AtomicBool::new(false));
let n_clone = n.clone();
ztimeout!(session2
.get("test/query_cancellation")
.cancellation_token(cancellation_token.clone())
.callback(move |_| {
std::thread::sleep(Duration::from_secs(5));
n_clone.fetch_or(true, std::sync::atomic::Ordering::SeqCst);
}))
.unwrap();
ztimeout!(queryable
.recv()
.unwrap()
.reply("test/query_cancellation", "ok"))
.unwrap();
tokio::time::sleep(Duration::from_secs(3)).await;
cancellation_token.cancel().await.unwrap();
assert!(n.load(std::sync::atomic::Ordering::SeqCst));
let n = Arc::new(AtomicBool::new(false));
let n_clone = n.clone();
let cb = CallbackDrop {
callback: |_| {},
drop: move || {
std::thread::sleep(Duration::from_secs(5));
n_clone.fetch_or(true, std::sync::atomic::Ordering::SeqCst);
},
};
assert!(cancellation_token.is_cancelled());
assert!(ztimeout!(session2
.get("test/query_cancellation")
.cancellation_token(cancellation_token.clone())
.with(cb))
.is_err());
assert!(n.load(std::sync::atomic::Ordering::SeqCst));
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn test_cancellation_liveliness_get() {
zenoh::init_log_from_env_or("error");
let mut test_context = TestSessions::new();
let (session1, session2) = test_context.open_pairs_client().await;
let _token = ztimeout!(session1
.liveliness()
.declare_token("test/liveliness_query_cancellation"))
.unwrap();
tokio::time::sleep(Duration::from_secs(1)).await;
let cancellation_token = zenoh::cancellation::CancellationToken::default();
let n = Arc::new(AtomicBool::new(false));
let n_clone = n.clone();
ztimeout!(session2
.liveliness()
.get("test/liveliness_query_cancellation")
.cancellation_token(cancellation_token.clone())
.callback(move |_| {
std::thread::sleep(Duration::from_secs(5));
n_clone.fetch_or(true, std::sync::atomic::Ordering::SeqCst);
}))
.unwrap();
tokio::time::sleep(Duration::from_secs(3)).await;
cancellation_token.cancel().await.unwrap();
assert!(n.load(std::sync::atomic::Ordering::SeqCst));
let n = Arc::new(AtomicBool::new(false));
let n_clone = n.clone();
let cb = CallbackDrop {
callback: |_| {},
drop: move || {
std::thread::sleep(Duration::from_secs(5));
n_clone.fetch_or(true, std::sync::atomic::Ordering::SeqCst);
},
};
assert!(cancellation_token.is_cancelled());
assert!(ztimeout!(session2
.liveliness()
.get("test/liveliness_query_cancellation")
.cancellation_token(cancellation_token.clone())
.with(cb))
.is_err());
assert!(n.load(std::sync::atomic::Ordering::SeqCst));
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn test_cancellation_querier_get() {
zenoh::init_log_from_env_or("error");
let mut test_context = TestSessions::new();
let (session1, session2) = test_context.open_pairs_client().await;
let queryable = ztimeout!(session1.declare_queryable("test/querier_cancellation")).unwrap();
let querier = ztimeout!(session2.declare_querier("test/querier_cancellation")).unwrap();
tokio::time::sleep(Duration::from_secs(1)).await;
let cancellation_token = zenoh::cancellation::CancellationToken::default();
let replies = ztimeout!(querier.get().cancellation_token(cancellation_token.clone())).unwrap();
tokio::time::sleep(Duration::from_secs(1)).await;
cancellation_token.cancel().await.unwrap();
assert!(replies.is_disconnected());
ztimeout!(queryable
.recv()
.unwrap()
.reply("test/querier_cancellation", "ok"))
.unwrap();
tokio::time::sleep(Duration::from_secs(1)).await;
let cancellation_token = zenoh::cancellation::CancellationToken::default();
let n = Arc::new(AtomicBool::new(false));
let n_clone = n.clone();
ztimeout!(querier
.get()
.cancellation_token(cancellation_token.clone())
.callback(move |_| {
std::thread::sleep(Duration::from_secs(5));
n_clone.fetch_or(true, std::sync::atomic::Ordering::SeqCst);
}))
.unwrap();
ztimeout!(queryable
.recv()
.unwrap()
.reply("test/querier_cancellation", "ok"))
.unwrap();
tokio::time::sleep(Duration::from_secs(3)).await;
cancellation_token.cancel().await.unwrap();
assert!(n.load(std::sync::atomic::Ordering::SeqCst));
assert!(cancellation_token.is_cancelled());
let n = Arc::new(AtomicBool::new(false));
let n_clone = n.clone();
let cb = CallbackDrop {
callback: |_| {},
drop: move || {
std::thread::sleep(Duration::from_secs(5));
n_clone.fetch_or(true, std::sync::atomic::Ordering::SeqCst);
},
};
assert!(ztimeout!(querier
.get()
.cancellation_token(cancellation_token.clone())
.with(cb))
.is_err());
assert!(n.load(std::sync::atomic::Ordering::SeqCst));
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn test_cancellation_does_not_prevent_session_from_close() {
zenoh::init_log_from_env_or("error");
let mut test_context = TestSessions::new();
let (session1, session2) = test_context.open_pairs_client().await;
let cancellation_token = zenoh::cancellation::CancellationToken::default();
let ke = "test/query_cancellation_does_not_prevent_session_from_close";
let _queryable = ztimeout!(session1.declare_queryable(ke)).unwrap();
let querier = ztimeout!(session2.declare_querier(ke)).unwrap();
tokio::time::sleep(Duration::from_secs(1)).await;
let replies = ztimeout!(session2
.get(ke)
.cancellation_token(cancellation_token.clone()))
.unwrap();
let replies2 = ztimeout!(querier.get().cancellation_token(cancellation_token.clone())).unwrap();
ztimeout!(session2.close()).unwrap();
assert!(replies.is_disconnected());
assert!(replies2.is_disconnected());
ztimeout!(cancellation_token.cancel()).unwrap();
}