flare_zrpc/
client.rs

1use zenoh::{
2    key_expr::KeyExpr, qos::CongestionControl, query::ConsolidationMode,
3};
4
5use crate::{msg::MsgSerde, ZrpcError};
6use std::marker::PhantomData;
7
8use super::ZrpcTypeConfig;
9
10#[derive(Clone, Debug)]
11pub struct ZrpcClientConfig {
12    pub service_id: String,
13    pub target: zenoh::query::QueryTarget,
14    pub channel_size: usize,
15}
16
17#[derive(Clone)]
18pub struct ZrpcClient<C>
19where
20    C: ZrpcTypeConfig,
21{
22    key_expr: KeyExpr<'static>,
23    z_session: zenoh::Session,
24    config: ZrpcClientConfig,
25    _conf: PhantomData<C>,
26}
27
28impl<C> ZrpcClient<C>
29where
30    C: ZrpcTypeConfig,
31{
32    pub async fn new(service_id: String, z_session: zenoh::Session) -> Self {
33        let key_expr = z_session
34            .declare_keyexpr(service_id.clone())
35            .await
36            .expect("Declare key_expr for zenoh");
37        Self {
38            key_expr,
39            z_session,
40            config: ZrpcClientConfig {
41                service_id,
42                target: zenoh::query::QueryTarget::BestMatching,
43                channel_size: 1,
44            },
45            _conf: PhantomData,
46        }
47    }
48
49    pub async fn with_config(
50        config: ZrpcClientConfig,
51        z_session: zenoh::Session,
52    ) -> Self {
53        let key_expr = z_session
54            .declare_keyexpr(config.service_id.clone())
55            .await
56            .expect("Declare key_expr for zenoh");
57        Self {
58            key_expr,
59            z_session,
60            config,
61            _conf: PhantomData,
62        }
63    }
64
65    pub async fn call(
66        &self,
67        payload: &C::In,
68    ) -> Result<C::Out, ZrpcError<C::Err>> {
69        let byte = C::InSerde::to_zbyte(&payload)
70            .map_err(|e| ZrpcError::EncodeError(e))?;
71
72        let (tx, rx) = flume::bounded(self.config.channel_size);
73
74        let _ = self
75            .z_session
76            .get(self.key_expr.clone())
77            .payload(byte)
78            .target(self.config.target)
79            .consolidation(ConsolidationMode::None)
80            .congestion_control(CongestionControl::Block)
81            // .with((tx, rx))
82            .callback(move |s| {
83                let _ = tx.send(s);
84            })
85            .await?;
86        let reply = rx.recv_async().await?;
87        match reply.result() {
88            Ok(sample) => {
89                let res = C::OutSerde::from_zbyte(sample.payload())
90                    .map_err(|e| ZrpcError::DecodeError(e))?;
91                Ok(res)
92            }
93            Err(err) => {
94                let wrapper = C::ErrSerde::from_zbyte(err.payload())
95                    .map_err(|e| ZrpcError::DecodeError(e))?;
96                let zrpc_server_error = C::unwrap(wrapper);
97                let err = match zrpc_server_error {
98                    super::ZrpcServerError::AppError(app_err) => {
99                        ZrpcError::AppError(app_err)
100                    }
101                    super::ZrpcServerError::SystemError(zrpc_system_error) => {
102                        ZrpcError::ServerSystemError(zrpc_system_error)
103                    }
104                };
105                Err(err)
106            }
107        }
108    }
109
110    pub async fn call_with_key(
111        &self,
112        key: String,
113        payload: &C::In,
114    ) -> Result<C::Out, ZrpcError<C::Err>> {
115        let byte = C::InSerde::to_zbyte(&payload)
116            .map_err(|e| ZrpcError::EncodeError(e))?;
117
118        let (tx, rx) = flume::bounded(self.config.channel_size);
119
120        self.z_session
121            .get(self.key_expr.join(&key).unwrap())
122            .payload(byte)
123            .target(self.config.target)
124            .consolidation(ConsolidationMode::None)
125            .congestion_control(CongestionControl::Block)
126            .callback(move |s| {
127                let _ = tx.send(s);
128            })
129            .await?;
130
131        let reply = rx.recv_async().await?;
132        match reply.result() {
133            Ok(sample) => {
134                let res = C::OutSerde::from_zbyte(sample.payload())
135                    .map_err(|e| ZrpcError::DecodeError(e))?;
136                Ok(res)
137            }
138            Err(err) => {
139                let wrapper = C::ErrSerde::from_zbyte(err.payload())
140                    .map_err(|e| ZrpcError::DecodeError(e))?;
141                let zrpc_server_error = C::unwrap(wrapper);
142                let err = match zrpc_server_error {
143                    super::ZrpcServerError::AppError(app_err) => {
144                        ZrpcError::AppError(app_err)
145                    }
146                    super::ZrpcServerError::SystemError(zrpc_system_error) => {
147                        ZrpcError::ServerSystemError(zrpc_system_error)
148                    }
149                };
150                Err(err)
151            }
152        }
153    }
154}