extern crate base64;
extern crate serde;
use std::time::Duration;
use serde::{Deserialize, Serialize};
use zenoh::config::ZenohId;
use zenoh::handlers::FifoChannelHandler;
use zenoh::query::*;
use zenoh::Session;
use crate::request::Request;
use crate::response::Response;
use crate::serialize::{deserialize, serialize};
use crate::status::Code;
use crate::status::Status;
use crate::types::ServerMetadata;
use crate::types::WireMessage;
use crate::zrpcresult::ZRPCResult;
#[derive(Clone, Debug)]
pub struct RPCClientChannel {
z: Session,
service_name: String,
}
impl RPCClientChannel {
pub fn new<IntoString>(z: Session, service_name: IntoString) -> RPCClientChannel
where
IntoString: Into<String>,
{
RPCClientChannel {
z,
service_name: service_name.into(),
}
}
async fn send<T>(
&self,
server_id: ZenohId,
request: &Request<T>,
method: &str,
tout: Duration,
) -> ZRPCResult<FifoChannelHandler<Reply>>
where
T: Serialize + Clone + std::fmt::Debug,
for<'de2> T: Deserialize<'de2>,
{
let req = serialize(&request)?;
let selector = format!(
"@rpc/{}/service/{}/{}",
server_id, self.service_name, method
);
tracing::debug!("Sending {:?} to {:?}", request, selector);
Ok(self
.z
.get(&selector)
.payload(req)
.target(QueryTarget::All)
.consolidation(ConsolidationMode::None)
.timeout(tout)
.await?)
}
pub async fn call_fun<T, U>(
&self,
server_id: ZenohId,
request: Request<T>,
method: &str,
tout: Duration,
) -> Result<Response<U>, Status>
where
T: Serialize + Clone + std::fmt::Debug,
for<'de2> T: Deserialize<'de2>,
U: Serialize + Clone + std::fmt::Debug,
for<'de3> U: Deserialize<'de3>,
{
tracing::debug!("calling function: {request:?}");
let data_receiver = self
.send(server_id, &request, method, tout)
.await
.map_err(|e| Status::new(Code::InternalError, format!("communication error: {e:?}")))?;
let reply = data_receiver.recv_async().await;
tracing::debug!("Response from zenoh is {:?}", reply);
if let Ok(reply) = reply {
match reply.result() {
Ok(sample) => {
let raw_data = sample.payload().to_bytes().to_vec();
let wmsg: WireMessage = deserialize(&raw_data).map_err(|e| {
Status::new(Code::InternalError, format!("deserialization error: {e:?}"))
})?;
match wmsg.payload {
Some(raw_data) => match deserialize::<U>(&raw_data) {
Ok(r) => {
Ok(Response::new(r))
}
Err(_) => Err(wmsg.status),
},
None => Err(wmsg.status),
}
}
Err(e) => {
tracing::error!("Unable to get sample from {e:?}");
Err(Status::new(
Code::InternalError,
format!("Unable to get sample from {e:?}"),
))
}
}
} else {
tracing::error!("No data from server");
Err(Status::new(
Code::InternalError,
format!("No data from call_fun for Request {request:?}"),
))
}
}
pub async fn get_servers_metadata(
&self,
ids: &[ZenohId],
tout: Duration,
) -> Result<Vec<ServerMetadata>, Status> {
let mut tot_metadata = Vec::with_capacity(ids.len());
for id in ids {
let ke = format!("@rpc/{id}/metadata");
let data = self
.z
.get(ke)
.target(QueryTarget::All)
.consolidation(ConsolidationMode::None)
.timeout(tout)
.await
.map_err(|e| {
Status::new(Code::InternalError, format!("communication error: {e:?}"))
})?;
let metadata = data
.into_iter()
.filter_map(|r| r.into_result().ok())
.filter_map(|s| {
let raw_data = s.payload().to_bytes().to_vec();
deserialize::<WireMessage>(&raw_data).ok()
})
.filter_map(|wmgs| wmgs.payload)
.filter_map(|pl| deserialize::<ServerMetadata>(&pl).ok())
.filter(|m| ids.contains(&m.id))
.collect::<Vec<ServerMetadata>>();
tot_metadata.extend_from_slice(&metadata);
}
Ok(tot_metadata)
}
}