use tokio::io::AsyncReadExt;
use zenoh::config::ZenohId;
use zenoh::key_expr::format::KeFormat;
use zenoh::key_expr::KeyExpr;
use zrpc::prelude::*;
use std::collections::HashSet;
use std::str::FromStr;
use std::time::Duration;
use zenoh::Session;
/// Locates the servers of one RPC service over a Zenoh session.
///
/// Combines a liveliness query (to learn which server ids are present) with a
/// metadata request over the RPC client channel (to filter them by label).
// NOTE(review): the reason for `allow(unused)` is not visible in this file —
// confirm it is still required.
#[allow(unused)]
#[derive(Clone, Debug)]
pub struct ServerChecker {
/// RPC client channel, created from the session and the service name; used to
/// request the metadata of the discovered servers.
pub(crate) ch: RPCClientChannel,
/// Zenoh session on which the liveliness query is issued.
pub(crate) z: Session,
/// Timeout handed to the metadata request (`new` sets it to 60 seconds).
pub(crate) tout: Duration,
/// Labels a server's metadata must contain (as a superset) to be returned by
/// `find_servers`; `new` starts with an empty set.
pub(crate) labels: HashSet<String>,
/// Name of the service, interpolated into the liveliness key expression.
pub(crate) service_name: String,
/// Key-expression format string (`@rpc/${zid:*}/service/<service_name>`) used
/// to parse the `zid` field out of a reply's key expression.
kef: String,
}
impl ServerChecker {
pub fn new(session: Session, service_name: String) -> Self {
let kef = format!("@rpc/${{zid:*}}/service/{service_name}");
Self {
kef,
ch: RPCClientChannel::new(session.clone(), &service_name),
z: session,
tout: std::time::Duration::from_secs(60u16 as u64),
labels: HashSet::new(),
service_name: service_name.clone(),
}
}
async fn find_servers_livelines(&self) -> Result<Vec<ZenohId>, Status> {
let res = self
.z
.liveliness()
.get(format!("@rpc/*/service/{}", self.service_name))
.await
.map_err(|e| {
Status::unavailable(format!("Unable to perform liveliness query: {e:?}"))
})?;
let ids = res
.into_iter()
.filter_map(|e| e.into_result().ok())
.map(|e| self.extract_id_from_ke(e.key_expr()))
.collect::<Result<Vec<ZenohId>, Status>>()?;
Ok(ids)
}
async fn get_servers_metadata(&self) -> Result<Vec<ServerMetadata>, Status> {
let ids = self.find_servers_livelines().await?;
self.ch.get_servers_metadata(&ids, self.tout).await
}
async fn find_servers(&self) -> Result<Vec<ZenohId>, Status> {
let metadatas = self.get_servers_metadata().await?;
let ids: Vec<ZenohId> = metadatas
.into_iter()
.filter(|m| m.labels.is_superset(&self.labels))
.map(|m| m.id)
.collect();
Ok(ids)
}
fn extract_id_from_ke(&self, ke: &KeyExpr) -> Result<ZenohId, Status> {
let ke_format = KeFormat::new(&self.kef)
.map_err(|e| Status::internal_error(format!("Cannot create KE format: {e:?}")))?;
let id_str = ke_format
.parse(ke)
.map_err(|e| Status::internal_error(format!("Unable to parse key expression: {e:?}")))?
.get("zid")
.map_err(|e| {
Status::internal_error(format!(
"Unable to get server id from key expression: {e:?}"
))
})?;
ZenohId::from_str(id_str)
.map_err(|e| Status::internal_error(format!("Unable to convert str to ZenohId: {e:?}")))
}
}
/// Manual smoke test: opens a Zenoh session from `./zenoh.json`, then runs the
/// three discovery steps of `ServerChecker` for the `K8sController` service,
/// printing each result and waiting for ENTER between steps.
///
/// Every failure panics via `unwrap`.
#[tokio::main]
async fn main() {
    let config = zenoh::config::Config::from_file("./zenoh.json").unwrap();
    let zsession = zenoh::open(config).await.unwrap();
    // The checker gets its own handle; `zsession` stays alive until `main` returns.
    let server_checker = ServerChecker::new(zsession.clone(), "K8sController".into());

    // Step 1: ids seen by the liveliness query.
    println!("Checking server liveliness");
    let res = server_checker.find_servers_livelines().await.unwrap();
    println!("Res is: {res:?}");
    press_to_continue().await;

    // Step 2: metadata of those servers.
    println!("Checking server metadata");
    let res = server_checker.get_servers_metadata().await.unwrap();
    println!("Res is: {res:?}");
    press_to_continue().await;

    // Step 3: ids remaining after the label filter.
    println!("Checking server ids");
    let res = server_checker.find_servers().await.unwrap();
    println!("Res is: {res:?}");
    press_to_continue().await;
}
/// Prints a prompt and waits until a whole line has been entered on stdin.
///
/// Input is consumed up to and including the newline, so extra characters
/// typed before ENTER (or the `\r` of a CRLF line ending) are not left behind
/// to satisfy the next prompt. End of input (stdin closed or exhausted) is
/// treated as "continue" instead of a failure.
///
/// # Panics
///
/// Panics if reading from stdin fails.
async fn press_to_continue() {
    println!("Press ENTER to continue...");
    let mut stdin = tokio::io::stdin();
    let mut byte = [0u8; 1];
    loop {
        let read = stdin
            .read(&mut byte)
            .await
            .expect("failed to read from stdin");
        // `read == 0` means EOF; otherwise stop once the line terminator is consumed.
        if read == 0 || byte[0] == b'\n' {
            break;
        }
    }
}