1use zenoh::{
2 key_expr::KeyExpr, qos::CongestionControl, query::ConsolidationMode,
3};
4
5use crate::{msg::MsgSerde, ZrpcError};
6use std::marker::PhantomData;
7
8use super::ZrpcTypeConfig;
9
10#[derive(Clone, Debug)]
11pub struct ZrpcClientConfig {
12 pub service_id: String,
13 pub target: zenoh::query::QueryTarget,
14 pub channel_size: usize,
15}
16
17#[derive(Clone)]
18pub struct ZrpcClient<C>
19where
20 C: ZrpcTypeConfig,
21{
22 key_expr: KeyExpr<'static>,
23 z_session: zenoh::Session,
24 config: ZrpcClientConfig,
25 _conf: PhantomData<C>,
26}
27
28impl<C> ZrpcClient<C>
29where
30 C: ZrpcTypeConfig,
31{
32 pub async fn new(service_id: String, z_session: zenoh::Session) -> Self {
33 let key_expr = z_session
34 .declare_keyexpr(service_id.clone())
35 .await
36 .expect("Declare key_expr for zenoh");
37 Self {
38 key_expr,
39 z_session,
40 config: ZrpcClientConfig {
41 service_id,
42 target: zenoh::query::QueryTarget::BestMatching,
43 channel_size: 1,
44 },
45 _conf: PhantomData,
46 }
47 }
48
49 pub async fn with_config(
50 config: ZrpcClientConfig,
51 z_session: zenoh::Session,
52 ) -> Self {
53 let key_expr = z_session
54 .declare_keyexpr(config.service_id.clone())
55 .await
56 .expect("Declare key_expr for zenoh");
57 Self {
58 key_expr,
59 z_session,
60 config,
61 _conf: PhantomData,
62 }
63 }
64
65 pub async fn call(
66 &self,
67 payload: &C::In,
68 ) -> Result<C::Out, ZrpcError<C::Err>> {
69 let byte = C::InSerde::to_zbyte(&payload)
70 .map_err(|e| ZrpcError::EncodeError(e))?;
71
72 let (tx, rx) = flume::bounded(self.config.channel_size);
73
74 let _ = self
75 .z_session
76 .get(self.key_expr.clone())
77 .payload(byte)
78 .target(self.config.target)
79 .consolidation(ConsolidationMode::None)
80 .congestion_control(CongestionControl::Block)
81 .callback(move |s| {
83 let _ = tx.send(s);
84 })
85 .await?;
86 let reply = rx.recv_async().await?;
87 match reply.result() {
88 Ok(sample) => {
89 let res = C::OutSerde::from_zbyte(sample.payload())
90 .map_err(|e| ZrpcError::DecodeError(e))?;
91 Ok(res)
92 }
93 Err(err) => {
94 let wrapper = C::ErrSerde::from_zbyte(err.payload())
95 .map_err(|e| ZrpcError::DecodeError(e))?;
96 let zrpc_server_error = C::unwrap(wrapper);
97 let err = match zrpc_server_error {
98 super::ZrpcServerError::AppError(app_err) => {
99 ZrpcError::AppError(app_err)
100 }
101 super::ZrpcServerError::SystemError(zrpc_system_error) => {
102 ZrpcError::ServerSystemError(zrpc_system_error)
103 }
104 };
105 Err(err)
106 }
107 }
108 }
109
110 pub async fn call_with_key(
111 &self,
112 key: String,
113 payload: &C::In,
114 ) -> Result<C::Out, ZrpcError<C::Err>> {
115 let byte = C::InSerde::to_zbyte(&payload)
116 .map_err(|e| ZrpcError::EncodeError(e))?;
117
118 let (tx, rx) = flume::bounded(self.config.channel_size);
119
120 self.z_session
121 .get(self.key_expr.join(&key).unwrap())
122 .payload(byte)
123 .target(self.config.target)
124 .consolidation(ConsolidationMode::None)
125 .congestion_control(CongestionControl::Block)
126 .callback(move |s| {
127 let _ = tx.send(s);
128 })
129 .await?;
130
131 let reply = rx.recv_async().await?;
132 match reply.result() {
133 Ok(sample) => {
134 let res = C::OutSerde::from_zbyte(sample.payload())
135 .map_err(|e| ZrpcError::DecodeError(e))?;
136 Ok(res)
137 }
138 Err(err) => {
139 let wrapper = C::ErrSerde::from_zbyte(err.payload())
140 .map_err(|e| ZrpcError::DecodeError(e))?;
141 let zrpc_server_error = C::unwrap(wrapper);
142 let err = match zrpc_server_error {
143 super::ZrpcServerError::AppError(app_err) => {
144 ZrpcError::AppError(app_err)
145 }
146 super::ZrpcServerError::SystemError(zrpc_system_error) => {
147 ZrpcError::ServerSystemError(zrpc_system_error)
148 }
149 };
150 Err(err)
151 }
152 }
153 }
154}