#[cfg(all(feature = "unstable", feature = "internal", feature = "test"))]
mod runtime_state_weak_tests {
use test_case::test_matrix;
use zenoh_config::{ModeDependentValue, WhatAmI, WhatAmIMatcher};
use zenoh_link::EndPoint;
use crate::{
api::{config::Config, session::open},
net::runtime::Runtime,
Session,
};
/// Returns the result of `session.static_runtime()` unchanged.
///
/// The tests in this module treat `None` as a failure and unwrap the result.
fn get_static_runtime(session: &Session) -> Option<&Runtime> {
session.static_runtime()
}
/// Opens a session from a default [`Config`] customised by the arguments.
///
/// Multicast scouting is always disabled. `listen_endpoints` and `connect_endpoints` are
/// installed as given. Gossip scouting is enabled or disabled according to `gossip`; when
/// it is enabled, gossip autoconnect is set to an empty [`WhatAmIMatcher`].
///
/// # Panics
///
/// Panics if any configuration setter returns an error or if opening the session fails.
async fn create_session(
    mode: WhatAmI,
    listen_endpoints: Vec<EndPoint>,
    connect_endpoints: Vec<EndPoint>,
    gossip: bool,
) -> Session {
    let mut cfg = Config::default();

    // The value returned by the setter is deliberately discarded.
    let _ = cfg.set_mode(Some(mode)).unwrap();

    cfg.scouting.multicast.set_enabled(Some(false)).unwrap();
    cfg.connect.endpoints.set(connect_endpoints).unwrap();
    cfg.listen.endpoints.set(listen_endpoints).unwrap();

    cfg.scouting.gossip.set_enabled(Some(gossip)).unwrap();
    if gossip {
        // With gossip on, autoconnect is given an empty matcher.
        let autoconnect = ModeDependentValue::Unique(WhatAmIMatcher::empty());
        cfg.scouting
            .gossip
            .set_autoconnect(Some(autoconnect))
            .unwrap();
    }

    let opened = open(cfg).await;
    opened.unwrap()
}
/// Opens a session that has neither listen nor connect endpoints configured.
///
/// All other settings are those applied by `create_session`.
async fn create_silent_session(mode: WhatAmI, gossip: bool) -> Session {
    let no_listen = Vec::new();
    let no_connect = Vec::new();
    create_session(mode, no_listen, no_connect, gossip).await
}
// While the session is open, the handle returned by `state_weak()` must upgrade.
#[test_matrix(
    [WhatAmI::Peer],
    [true, false]
)]
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn alive_before_close(mode: WhatAmI, gossip: bool) {
    let session = create_silent_session(mode, gossip).await;

    let state = get_static_runtime(&session)
        .expect("runtime should be present")
        .state_weak();

    assert!(state.upgrade().is_some());
}
// Closing the session (without dropping it) must leave the runtime state upgradable.
#[test_matrix(
    [WhatAmI::Peer],
    [true, false]
)]
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn still_alive_after_close(mode: WhatAmI, gossip: bool) {
    let session = create_silent_session(mode, gossip).await;
    let state = get_static_runtime(&session).unwrap().state_weak();

    session.close().await.unwrap();

    // The session value is still in scope here, only closed.
    assert!(state.upgrade().is_some());
}
// After the session is closed and then dropped, the weak handle must no longer upgrade.
#[test_matrix(
    [WhatAmI::Peer],
    [true, false]
)]
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn dead_after_drop(mode: WhatAmI, gossip: bool) {
    let session = create_silent_session(mode, gossip).await;
    let state = get_static_runtime(&session).unwrap().state_weak();

    session.close().await.unwrap();
    drop(session);

    assert!(state.upgrade().is_none());
}
// Two sessions opened side by side: closing or dropping one must not change whether the
// other's runtime state can be upgraded.
#[test_matrix(
    [WhatAmI::Peer],
    [true, false]
)]
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn multiple_sessions_independent(mode: WhatAmI, gossip: bool) {
    let first = create_silent_session(mode, gossip).await;
    let first_state = get_static_runtime(&first).unwrap().state_weak();

    let second = create_silent_session(mode, gossip).await;
    let second_state = get_static_runtime(&second).unwrap().state_weak();

    // Both sessions open.
    assert!(first_state.upgrade().is_some());
    assert!(second_state.upgrade().is_some());

    // First closed but not yet dropped: nothing changes.
    first.close().await.unwrap();
    assert!(first_state.upgrade().is_some());
    assert!(second_state.upgrade().is_some());

    // First dropped: only its state goes away.
    drop(first);
    assert!(first_state.upgrade().is_none());
    assert!(second_state.upgrade().is_some());

    // Second closed but not yet dropped: its state is still upgradable.
    second.close().await.unwrap();
    assert!(first_state.upgrade().is_none());
    assert!(second_state.upgrade().is_some());

    // Second dropped: both states are gone.
    drop(second);
    assert!(first_state.upgrade().is_none());
    assert!(second_state.upgrade().is_none());
}
async fn create_clique(num: usize, mode: WhatAmI, gossip: bool) -> Vec<Session> {
let port_offset = calc_offset(mode, gossip);
let port = 12450 + port_offset;
let peer_endpoint = format!("tcp/127.0.0.1:{}", port);
let main = create_session(mode, vec![peer_endpoint.parse().unwrap()], vec![], gossip).await;
let mut result = vec![main];
for _ in 0..num {
let client =
create_session(mode, vec![], vec![peer_endpoint.parse().unwrap()], gossip).await;
result.push(client);
}
result
}
/// Maps a `(mode, gossip)` pair to a port offset in `0..=5`.
///
/// The mode contributes 0, 1 or 2 (router, peer, client) and disabled gossip adds 3, so
/// each combination produces a distinct offset.
fn calc_offset(mode: WhatAmI, gossip: bool) -> u16 {
    let mode_index: u16 = match mode {
        WhatAmI::Router => 0,
        WhatAmI::Peer => 1,
        WhatAmI::Client => 2,
    };
    // Gossip enabled uses the first group of three offsets, gossip disabled the second.
    let gossip_group: u16 = if gossip { 0 } else { 1 };
    gossip_group * 3 + mode_index
}
// Builds a clique of connected sessions, then closes and drops them one at a time,
// checking that each session's inner handle and runtime state stop upgrading once that
// session has been dropped.
#[test_matrix(
    [WhatAmI::Peer],
    [true, false]
)]
#[tokio::test(flavor = "multi_thread", worker_threads = 1)]
async fn dead_after_drop_many_times(mode: WhatAmI, gossip: bool) {
    const NUM_CLIENTS: usize = 10;
    let clique = create_clique(NUM_CLIENTS, mode, gossip).await;

    // Sanity check: everything is upgradable while all sessions are open.
    for member in clique.iter() {
        assert!(member.inner_weak().upgrade().is_some());
        let state = get_static_runtime(member)
            .expect("runtime should be present")
            .state_weak();
        assert!(state.upgrade().is_some());
    }

    // Tear the sessions down in creation order.
    for member in clique {
        let state = get_static_runtime(&member)
            .expect("runtime should be present")
            .state_weak();
        let inner = member.inner_weak();

        assert!(inner.upgrade().is_some());
        assert!(state.upgrade().is_some());

        member.close().await.unwrap();
        drop(member);

        assert!(inner.upgrade().is_none());
        assert!(state.upgrade().is_none());
    }
}
}