use std::time::Duration;
use zenoh_config::WhatAmI;
use zenoh_core::ztimeout;
use zenoh_link::EndPoint;
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn test_adminspace_wonly() {
const TIMEOUT: Duration = Duration::from_secs(60);
zenoh_util::init_log_from_env_or("error");
let router = {
let mut c = zenoh_config::Config::default();
c.set_mode(Some(WhatAmI::Router)).unwrap();
c.listen.endpoints.set(vec![]).unwrap();
c.scouting.multicast.set_enabled(Some(false)).unwrap();
c.adminspace.set_enabled(true).unwrap();
c.adminspace.permissions.set_read(false).unwrap();
c.adminspace.permissions.set_write(true).unwrap();
let s = ztimeout!(zenoh::open(c)).unwrap();
s
};
let zid = router.zid();
let root = router
.get(format!("@/{zid}/router"))
.await
.unwrap()
.into_iter()
.next();
assert!(root.is_none());
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn test_adminspace_read() {
const TIMEOUT: Duration = Duration::from_secs(60);
const ROUTER_ENDPOINT: &str = "tcp/localhost:31000";
const MULTICAST_ENDPOINT: &str = "udp/224.0.0.224:31000";
zenoh_util::init_log_from_env_or("error");
let router = {
let mut c = zenoh_config::Config::default();
c.set_mode(Some(WhatAmI::Router)).unwrap();
c.listen
.endpoints
.set(vec![
ROUTER_ENDPOINT.parse::<EndPoint>().unwrap(),
MULTICAST_ENDPOINT.parse::<EndPoint>().unwrap(),
])
.unwrap();
c.scouting.multicast.set_enabled(Some(false)).unwrap();
c.adminspace.set_enabled(true).unwrap();
c.adminspace.permissions.set_read(true).unwrap();
c.adminspace.permissions.set_write(false).unwrap();
c.plugins_loading.set_enabled(true).unwrap();
let plugin_search_dir = std::path::Path::new(env!("CARGO_MANIFEST_DIR"))
.parent()
.unwrap()
.join("target/debug");
c.plugins_loading
.set_search_dirs(zenoh_util::LibSearchDirs::from_paths(&[plugin_search_dir
.to_str()
.unwrap()]))
.unwrap();
c.insert_json5("plugins/rest/http_port", "\"8080\"")
.unwrap();
c.insert_json5("plugins/rest/__required__", "true").unwrap();
let s = ztimeout!(zenoh::open(c)).unwrap();
s
};
let zid = router.zid();
let router2 = {
let mut c = zenoh_config::Config::default();
c.set_mode(Some(WhatAmI::Router)).unwrap();
c.listen.endpoints.set(vec![]).unwrap();
c.connect
.endpoints
.set(vec![ROUTER_ENDPOINT.parse::<EndPoint>().unwrap()])
.unwrap();
ztimeout!(zenoh::open(c)).unwrap()
};
let zid2 = router2.zid();
let peer = {
let mut c = zenoh_config::Config::default();
c.set_mode(Some(WhatAmI::Peer)).unwrap();
c.listen
.endpoints
.set(vec![MULTICAST_ENDPOINT.parse::<EndPoint>().unwrap()])
.unwrap();
c.scouting.multicast.set_enabled(Some(false)).unwrap();
ztimeout!(zenoh::open(c)).unwrap()
};
let root = router
.get(format!("@/{zid}/router"))
.await
.unwrap()
.into_iter()
.next();
assert!(root.is_some());
let metrics = router
.get(format!("@/{zid}/router/metrics"))
.await
.unwrap()
.into_iter()
.next();
assert!(metrics.is_some());
let routers_graph = router
.get(format!("@/{zid}/router/linkstate/north"))
.await
.unwrap()
.into_iter()
.next();
assert!(routers_graph.is_some());
let subscribers: Vec<String> = router
.get(format!("@/{zid}/router/subscriber/**"))
.await
.unwrap()
.iter()
.map(|r| r.result().ok().unwrap().key_expr().to_string())
.collect();
assert_eq!(subscribers, vec![] as Vec<String>);
let subscriber = router.declare_subscriber("some/key").await.unwrap();
let subscribers: Vec<String> = router
.get(format!("@/{zid}/router/subscriber/**"))
.await
.unwrap()
.iter()
.map(|r| r.result().ok().unwrap().key_expr().to_string())
.collect();
assert_eq!(
subscribers,
vec![format!("@/{zid}/router/subscriber/some/key")]
);
subscriber.undeclare().await.unwrap();
let subscribers: Vec<String> = router
.get(format!("@/{zid}/router/subscriber/**"))
.await
.unwrap()
.iter()
.map(|r| r.result().ok().unwrap().key_expr().to_string())
.collect();
assert_eq!(subscribers, vec![] as Vec<String>);
let publishers: Vec<String> = router
.get(format!("@/{zid}/router/publisher/**"))
.await
.unwrap()
.iter()
.map(|r| r.result().ok().unwrap().key_expr().to_string())
.collect();
assert_eq!(publishers, vec![] as Vec<String>);
let publisher = router.declare_publisher("some/key").await.unwrap();
let publishers: Vec<String> = router
.get(format!("@/{zid}/router/publisher/**"))
.await
.unwrap()
.iter()
.map(|r| r.result().ok().unwrap().key_expr().to_string())
.collect();
assert_eq!(
publishers,
vec![format!("@/{zid}/router/publisher/some/key")]
);
publisher.undeclare().await.unwrap();
let publishers: Vec<String> = router
.get(format!("@/{zid}/router/publisher/**"))
.await
.unwrap()
.iter()
.map(|r| r.result().ok().unwrap().key_expr().to_string())
.collect();
assert_eq!(publishers, vec![] as Vec<String>);
let queryables: Vec<String> = router
.get(format!("@/{zid}/router/queryable/**"))
.await
.unwrap()
.iter()
.map(|r| r.result().ok().unwrap().key_expr().to_string())
.collect();
assert_eq!(queryables, vec![] as Vec<String>);
let queryable = router.declare_queryable("some/key").await.unwrap();
let queryables: Vec<String> = router
.get(format!("@/{zid}/router/queryable/**"))
.await
.unwrap()
.iter()
.map(|r| r.result().ok().unwrap().key_expr().to_string())
.collect();
assert_eq!(
queryables,
vec![format!("@/{zid}/router/queryable/some/key")]
);
queryable.undeclare().await.unwrap();
let queryables: Vec<String> = router
.get(format!("@/{zid}/router/queryable/**"))
.await
.unwrap()
.iter()
.map(|r| r.result().ok().unwrap().key_expr().to_string())
.collect();
assert_eq!(queryables, vec![] as Vec<String>);
let queriers: Vec<String> = router
.get(format!("@/{zid}/router/querier/**"))
.await
.unwrap()
.iter()
.map(|r| r.result().ok().unwrap().key_expr().to_string())
.collect();
assert_eq!(queriers, vec![] as Vec<String>);
let querier = router.declare_querier("some/key").await.unwrap();
let queriers: Vec<String> = router
.get(format!("@/{zid}/router/querier/**"))
.await
.unwrap()
.iter()
.map(|r| r.result().ok().unwrap().key_expr().to_string())
.collect();
assert_eq!(queriers, vec![format!("@/{zid}/router/querier/some/key")]);
querier.undeclare().await.unwrap();
let queriers: Vec<String> = router
.get(format!("@/{zid}/router/querier/**"))
.await
.unwrap()
.iter()
.map(|r| r.result().ok().unwrap().key_expr().to_string())
.collect();
assert_eq!(queriers, vec![] as Vec<String>);
let tokens: Vec<String> = router
.get(format!("@/{zid}/router/token/**"))
.await
.unwrap()
.iter()
.map(|r| r.result().ok().unwrap().key_expr().to_string())
.collect();
assert_eq!(tokens, vec![] as Vec<String>);
let token = router.liveliness().declare_token("some/key").await.unwrap();
let tokens: Vec<String> = router
.get(format!("@/{zid}/router/token/**"))
.await
.unwrap()
.iter()
.map(|r| r.result().ok().unwrap().key_expr().to_string())
.collect();
assert_eq!(tokens, vec![format!("@/{zid}/router/token/some/key")]);
token.undeclare().await.unwrap();
let tokens: Vec<String> = router
.get(format!("@/{zid}/router/token/**"))
.await
.unwrap()
.iter()
.map(|r| r.result().ok().unwrap().key_expr().to_string())
.collect();
assert_eq!(tokens, vec![] as Vec<String>);
let routes: Vec<String> = router
.get(format!("@/{zid}/router/route/successor/**"))
.await
.unwrap()
.iter()
.map(|r| r.result().ok().unwrap().key_expr().to_string())
.collect();
assert!(!routes.is_empty());
let route = router
.get(format!(
"@/{zid}/router/route/successor/src/{zid}/dst/{zid2}"
))
.await
.unwrap()
.into_iter()
.next();
assert!(route.is_some());
let plugins = router
.get(format!("@/{zid}/router/plugins/**"))
.await
.unwrap()
.into_iter()
.next();
#[cfg(feature = "plugins")]
assert!(plugins.is_some());
#[cfg(not(feature = "plugins"))]
assert!(plugins.is_none());
let plugins_status = router
.get(format!("@/{zid}/router/status/plugins/**"))
.await
.unwrap()
.into_iter()
.next();
#[cfg(feature = "plugins")]
assert!(plugins_status.is_some());
#[cfg(not(feature = "plugins"))]
assert!(plugins_status.is_none());
let count = router.get("@/**").await.unwrap().iter().count();
assert!(count > 0);
let count = router.get("@/*/**").await.unwrap().iter().count();
assert!(count > 0);
peer.close().await.unwrap();
router2.close().await.unwrap();
router.close().await.unwrap();
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn test_adminspace_ronly() {
const TIMEOUT: Duration = Duration::from_secs(60);
zenoh_util::init_log_from_env_or("error");
let router = {
let mut c = zenoh_config::Config::default();
c.set_mode(Some(WhatAmI::Router)).unwrap();
c.listen.endpoints.set(vec![]).unwrap();
c.scouting.multicast.set_enabled(Some(false)).unwrap();
c.adminspace.set_enabled(true).unwrap();
c.adminspace.permissions.set_read(true).unwrap();
c.adminspace.permissions.set_write(false).unwrap();
let s = ztimeout!(zenoh::open(c)).unwrap();
s
};
let zid = router.zid();
router
.put(format!("@/{zid}/router/config/zid"), "1")
.await
.unwrap();
router
.delete(format!("@/{zid}/router/config/zid"))
.await
.unwrap();
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn test_adminspace_write() {
const TIMEOUT: Duration = Duration::from_secs(60);
zenoh_util::init_log_from_env_or("error");
let router = {
let mut c = zenoh_config::Config::default();
c.set_mode(Some(WhatAmI::Router)).unwrap();
c.listen.endpoints.set(vec![]).unwrap();
c.scouting.multicast.set_enabled(Some(false)).unwrap();
c.adminspace.set_enabled(true).unwrap();
c.adminspace.permissions.set_read(true).unwrap();
c.adminspace.permissions.set_write(true).unwrap();
let s = ztimeout!(zenoh::open(c)).unwrap();
s
};
let zid = router.zid();
router
.put(format!("@/{zid}/router/config/zid"), "1")
.await
.unwrap();
router
.delete(format!("@/{zid}/router/config/zid"))
.await
.unwrap();
}
macro_rules! navigate_json_path {
($json:expr, $field_path:expr) => {{
let field_path = $field_path;
let parts: Vec<&str> = field_path.split('.').collect();
let mut current = &$json;
for (i, part) in parts.iter().enumerate() {
assert!(
current.get(part).is_some(),
"JSON field '{}' does not exist (failed at '{}')",
field_path,
parts[..=i].join(".")
);
current = ¤t[part];
}
(field_path, current)
}};
}
macro_rules! assert_json_field_eq {
($json:expr, $field:expr, $expected:expr) => {{
let (field_path, current) = navigate_json_path!($json, $field);
let expected_value = serde_json::json!($expected);
assert_eq!(
current, &expected_value,
"JSON field '{}' should equal {:?}, got: {:?}",
field_path, expected_value, current
);
}};
}
macro_rules! assert_json_field {
($json:expr, $field:expr, bool) => {{
let (field_path, current) = navigate_json_path!($json, $field);
assert!(
current.is_boolean(),
"JSON field '{}' should be a boolean, got: {:?}",
field_path,
current
);
}};
($json:expr, $field:expr, str) => {{
let (field_path, current) = navigate_json_path!($json, $field);
assert!(
current.is_string(),
"JSON field '{}' should be a string, got: {:?}",
field_path,
current
);
}};
($json:expr, $field:expr, number) => {{
let (field_path, current) = navigate_json_path!($json, $field);
assert!(
current.is_number(),
"JSON field '{}' should be a number, got: {:?}",
field_path,
current
);
}};
($json:expr, $field:expr, array) => {{
let (field_path, current) = navigate_json_path!($json, $field);
assert!(
current.is_array(),
"JSON field '{}' should be an array, got: {:?}",
field_path,
current
);
}};
($json:expr, $field:expr, object) => {{
let (field_path, current) = navigate_json_path!($json, $field);
assert!(
current.is_object(),
"JSON field '{}' should be an object, got: {:?}",
field_path,
current
);
}};
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn test_adminspace_transports_and_links() {
const TIMEOUT: Duration = Duration::from_secs(60);
const ROUTER_ENDPOINT: &str = "tcp/localhost:31001";
const ROUTER_CONNECT_ENDPOINT: &str = "tcp/localhost:31001?rel=1;prio=1-7";
zenoh_util::init_log_from_env_or("error");
let router1 = {
let mut c = zenoh_config::Config::default();
c.set_mode(Some(WhatAmI::Router)).unwrap();
c.listen
.endpoints
.set(vec![ROUTER_ENDPOINT.parse::<EndPoint>().unwrap()])
.unwrap();
c.scouting.multicast.set_enabled(Some(false)).unwrap();
c.adminspace.set_enabled(true).unwrap();
c.adminspace.permissions.set_read(true).unwrap();
c.adminspace.permissions.set_write(false).unwrap();
c.transport.unicast.qos.set_enabled(true).unwrap();
ztimeout!(zenoh::open(c)).unwrap()
};
let zid1 = router1.zid();
let transports_unicast: Vec<String> = router1
.get(format!("@/{zid1}/session/transport/unicast/**"))
.await
.unwrap()
.iter()
.map(|r| r.result().ok().unwrap().key_expr().to_string())
.collect();
assert_eq!(transports_unicast, vec![] as Vec<String>);
let (transport_tx, mut transport_rx) = tokio::sync::mpsc::unbounded_channel();
let (link_tx, mut link_rx) = tokio::sync::mpsc::unbounded_channel();
let transport_subscriber = router1
.declare_subscriber(format!("@/{zid1}/session/transport/unicast/*"))
.callback(move |sample| {
transport_tx.send(sample).unwrap();
})
.await
.unwrap();
let link_subscriber = router1
.declare_subscriber(format!("@/{zid1}/session/transport/unicast/**/link/*"))
.callback(move |sample| {
link_tx.send(sample).unwrap();
})
.await
.unwrap();
let router2 = {
let mut c = zenoh_config::Config::default();
c.set_mode(Some(WhatAmI::Router)).unwrap();
c.listen.endpoints.set(vec![]).unwrap();
c.connect
.endpoints
.set(vec![ROUTER_CONNECT_ENDPOINT.parse::<EndPoint>().unwrap()])
.unwrap();
c.scouting.multicast.set_enabled(Some(false)).unwrap();
c.transport.unicast.qos.set_enabled(true).unwrap();
ztimeout!(zenoh::open(c)).unwrap()
};
let zid2 = router2.zid();
tokio::time::sleep(Duration::from_millis(500)).await;
tokio::time::sleep(Duration::from_millis(200)).await;
let transport_sample_sub = transport_rx
.try_recv()
.expect("Should receive transport notification");
assert!(
transport_rx.try_recv().is_err(),
"Should receive exactly one transport notification"
);
assert_eq!(
transport_sample_sub.key_expr().as_str(),
format!("@/{zid1}/session/transport/unicast/{zid2}")
);
assert_eq!(
transport_sample_sub.encoding(),
&zenoh::bytes::Encoding::APPLICATION_JSON
);
let transport_bytes_sub = transport_sample_sub.payload().to_bytes();
let transport_json_sub: serde_json::Value = serde_json::from_slice(&transport_bytes_sub)
.expect("Failed to parse transport JSON from subscription");
println!(
"\nTransport JSON from subscription:\n{}",
serde_json::to_string_pretty(&transport_json_sub).unwrap()
);
assert_json_field_eq!(transport_json_sub, "zid", &zid2.to_string());
assert_json_field_eq!(transport_json_sub, "whatami", "router");
assert_json_field!(transport_json_sub, "is_qos", bool);
#[cfg(feature = "shared-memory")]
assert_json_field!(transport_json_sub, "is_shm", bool);
let link_sample_sub = link_rx
.try_recv()
.expect("Should receive link notification");
assert!(
link_rx.try_recv().is_err(),
"Should receive exactly one link notification"
);
assert!(
link_sample_sub
.key_expr()
.as_str()
.contains(&format!("@/{zid1}/session/transport/unicast/{zid2}/link/")),
"Link key expression should contain the expected transport path"
);
assert_eq!(
link_sample_sub.encoding(),
&zenoh::bytes::Encoding::APPLICATION_JSON
);
let link_bytes_sub = link_sample_sub.payload().to_bytes();
let link_json_sub: serde_json::Value = serde_json::from_slice(&link_bytes_sub)
.expect("Failed to parse link JSON from subscription");
println!(
"\nLink JSON from subscription:\n{}",
serde_json::to_string_pretty(&link_json_sub).unwrap()
);
assert_json_field!(link_json_sub, "src", str);
assert_json_field!(link_json_sub, "dst", str);
assert_json_field!(link_json_sub, "mtu", number);
assert_json_field_eq!(link_json_sub, "is_streamed", true);
assert_json_field!(link_json_sub, "interfaces", array);
assert_json_field_eq!(link_json_sub, "priorities.start", "RealTime");
assert_json_field_eq!(link_json_sub, "priorities.end", "Background");
assert_json_field_eq!(link_json_sub, "reliability", "Reliable");
let transports_unicast: Vec<String> = router1
.get(format!("@/{zid1}/session/transport/unicast/**"))
.await
.unwrap()
.iter()
.map(|r| r.result().ok().unwrap().key_expr().to_string())
.collect();
assert!(!transports_unicast.is_empty());
assert!(transports_unicast
.iter()
.any(|k| k.contains(&zid2.to_string())));
let transport_reply = router1
.get(format!("@/{zid1}/session/transport/unicast/{zid2}"))
.await
.unwrap()
.into_iter()
.next();
assert!(transport_reply.is_some());
let binding = transport_reply.unwrap();
let transport_sample = binding.result().ok().unwrap();
assert_eq!(
transport_sample.key_expr().as_str(),
format!("@/{zid1}/session/transport/unicast/{zid2}")
);
assert_eq!(
transport_sample.encoding(),
&zenoh::bytes::Encoding::APPLICATION_JSON
);
let transport_bytes = transport_sample.payload().to_bytes();
let transport_json: serde_json::Value =
serde_json::from_slice(&transport_bytes).expect("Failed to parse transport JSON");
println!(
"TransportPeer JSON:\n{}",
serde_json::to_string_pretty(&transport_json).unwrap()
);
assert_json_field_eq!(transport_json, "zid", &zid2.to_string());
assert_json_field_eq!(transport_json, "whatami", "router");
assert_json_field!(transport_json, "is_qos", bool);
#[cfg(feature = "shared-memory")]
assert_json_field!(transport_json, "is_shm", bool);
let links: Vec<String> = router1
.get(format!("@/{zid1}/session/transport/unicast/{zid2}/link/**"))
.await
.unwrap()
.iter()
.map(|r| r.result().ok().unwrap().key_expr().to_string())
.collect();
assert!(!links.is_empty());
let link_key = &links[0];
let link_id = link_key.split('/').next_back().unwrap();
let link_reply = router1
.get(format!(
"@/{zid1}/session/transport/unicast/{zid2}/link/{link_id}"
))
.await
.unwrap()
.into_iter()
.next();
assert!(link_reply.is_some());
let binding = link_reply.unwrap();
let link_sample = binding.result().ok().unwrap();
assert_eq!(
link_sample.encoding(),
&zenoh::bytes::Encoding::APPLICATION_JSON
);
let link_bytes = link_sample.payload().to_bytes();
let link_json: serde_json::Value =
serde_json::from_slice(&link_bytes).expect("Failed to parse link JSON");
println!(
"\nLink JSON:\n{}",
serde_json::to_string_pretty(&link_json).unwrap()
);
println!("\nNote: 'priorities' and 'reliability' are set via endpoint metadata (e.g., ?rel=1;prio=1-7)");
assert_json_field!(link_json, "src", str);
assert_json_field!(link_json, "dst", str);
assert_json_field!(link_json, "mtu", number);
assert_json_field_eq!(link_json, "is_streamed", true);
assert_json_field!(link_json, "interfaces", array);
assert_json_field_eq!(link_json, "priorities.start", "RealTime");
assert_json_field_eq!(link_json, "priorities.end", "Background");
assert_json_field_eq!(link_json, "reliability", "Reliable");
let all_transports: Vec<String> = router1
.get(format!("@/{zid1}/session/**"))
.await
.unwrap()
.iter()
.map(|r| r.result().ok().unwrap().key_expr().to_string())
.collect();
assert!(all_transports
.iter()
.any(|k| k.contains("transport/unicast") && k.contains(&zid2.to_string())));
transport_subscriber.undeclare().await.unwrap();
link_subscriber.undeclare().await.unwrap();
router2.close().await.unwrap();
router1.close().await.unwrap();
}
#[cfg(feature = "stats")]
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn test_adminspace_regression_1() {
const TIMEOUT: Duration = Duration::from_secs(60);
const ROUTER_ENDPOINT: &str = "tcp/localhost:31002";
const ROUTER_CONNECT_ENDPOINT: &str = "tcp/localhost:31002?rel=1;prio=1-7";
zenoh_util::init_log_from_env_or("error");
let router1 = {
let mut c = zenoh_config::Config::default();
c.set_mode(Some(WhatAmI::Router)).unwrap();
c.listen
.endpoints
.set(vec![ROUTER_ENDPOINT.parse::<EndPoint>().unwrap()])
.unwrap();
c.scouting.multicast.set_enabled(Some(false)).unwrap();
c.adminspace.set_enabled(true).unwrap();
c.adminspace.permissions.set_read(true).unwrap();
c.adminspace.permissions.set_write(false).unwrap();
c.transport.unicast.qos.set_enabled(true).unwrap();
ztimeout!(zenoh::open(c)).unwrap()
};
let zid1 = router1.zid();
let _router2 = {
let mut c = zenoh_config::Config::default();
c.set_mode(Some(WhatAmI::Router)).unwrap();
c.listen.endpoints.set(vec![]).unwrap();
c.connect
.endpoints
.set(vec![ROUTER_CONNECT_ENDPOINT.parse::<EndPoint>().unwrap()])
.unwrap();
c.scouting.multicast.set_enabled(Some(false)).unwrap();
c.transport.unicast.qos.set_enabled(true).unwrap();
ztimeout!(zenoh::open(c)).unwrap()
};
tokio::time::sleep(Duration::from_millis(500)).await;
let reply = router1
.get(format!("@/{zid1}/router?_stats=true"))
.await
.unwrap()
.into_iter()
.next();
assert!(reply.is_some());
let binding = reply.unwrap();
let sample = binding.result().ok().unwrap();
assert_eq!(sample.encoding(), &zenoh::bytes::Encoding::APPLICATION_JSON);
let bytes = sample.payload().to_bytes();
let json: serde_json::Value =
serde_json::from_slice(&bytes).expect("Failed to parse transport JSON");
let sessions = navigate_json_path!(json, "sessions").1.as_array().unwrap();
assert_eq!(sessions.len(), 1);
let rx_t_bytes = navigate_json_path!(sessions[0], "stats.rx_bytes")
.1
.as_u64()
.unwrap();
let tx_t_bytes = navigate_json_path!(sessions[0], "stats.tx_bytes")
.1
.as_u64()
.unwrap();
let links = navigate_json_path!(sessions[0], "links")
.1
.as_array()
.unwrap();
assert_eq!(links.len(), 1);
let rx_l_bytes = navigate_json_path!(links[0], "stats.rx_bytes")
.1
.as_u64()
.unwrap();
let tx_l_bytes = navigate_json_path!(links[0], "stats.tx_bytes")
.1
.as_u64()
.unwrap();
assert_ne!(rx_t_bytes, 0);
assert_ne!(tx_t_bytes, 0);
assert_ne!(rx_l_bytes, 0);
assert_ne!(tx_l_bytes, 0);
assert_eq!(rx_t_bytes, rx_l_bytes);
assert_eq!(tx_t_bytes, tx_l_bytes);
}