wireman_core/client/
mod.rs

1#![allow(clippy::module_name_repetitions)]
2pub(crate) mod codec;
3pub mod reflection;
4pub mod tls;
5
6use crate::descriptor::response::StreamingResponse;
7use crate::descriptor::RequestMessage;
8use crate::descriptor::ResponseMessage;
9use crate::error::Error;
10use crate::Result;
11use tls::TlsConfig;
12use tonic::transport::Uri;
13use tonic::{client::Grpc, transport::Channel};
14
15/// Represents a `gRPC` client for making RPC calls.
16#[derive(Clone, Debug)]
17pub struct GrpcClient {
18    grpc: Grpc<Channel>,
19}
20
21impl GrpcClient {
22    /// Returns a new Grpc Client. if no tls is given, the standard tonic
23    /// client is used.
24    ///
25    /// # Errors
26    ///
27    /// Errors if tls config cannot be build.
28    pub fn new<T: Into<Uri>>(uri: T, tls_config: Option<TlsConfig>) -> Result<Self> {
29        let builder = Channel::builder(uri.into());
30
31        let channel = if let Some(tls_config) = tls_config {
32            builder.tls_config(tls_config.0)?.connect_lazy()
33        } else {
34            builder.connect_lazy()
35        };
36
37        Ok(GrpcClient {
38            grpc: Grpc::new(channel),
39        })
40    }
41
42    /// Make a unary `gRPC` call.
43    ///
44    /// # Errors
45    /// - `gRPC` client is not ready
46    /// - Server call failed
47    pub async fn unary(&mut self, request: &RequestMessage) -> Result<ResponseMessage> {
48        self.grpc.ready().await.map_err(Error::GrpcNotReady)?;
49
50        let path = request.path();
51        let codec = request.codec();
52
53        let request = request.clone().into();
54        let response = self.grpc.unary(request, path, codec).await?;
55
56        Ok(response.into_inner())
57    }
58
59    /// Make a streaming `gRPC` call.
60    ///
61    /// # Errors
62    /// - `gRPC` client is not ready
63    /// - Server call failed
64    pub async fn server_streaming(
65        &mut self,
66        request: &RequestMessage,
67    ) -> Result<StreamingResponse> {
68        self.grpc.ready().await.map_err(Error::GrpcNotReady)?;
69
70        let path = request.path();
71        let codec = request.codec();
72
73        let request = request.clone().into();
74        let response = self.grpc.server_streaming(request, path, codec).await?;
75
76        Ok(StreamingResponse::new(response.into_inner()))
77    }
78}
79
80/// Creates a new `gRPC` client and sends a message to a `gRPC` server.
81/// This method is async.
82///
83/// # Errors
84/// - Internal error calling the `gRPC` server
85pub async fn call_unary_async(
86    request: &RequestMessage,
87    tls: Option<TlsConfig>,
88) -> Result<ResponseMessage> {
89    let uri = request.uri()?;
90
91    let mut client = GrpcClient::new(uri, tls)?;
92
93    client.unary(request).await
94}
95
96/// Creates a new `gRPC` client and sends a message to a `gRPC` server.
97/// This method is async.
98///
99/// # Errors
100/// - Internal error calling the `gRPC` server
101pub async fn call_server_streaming(
102    req: &RequestMessage,
103    tls: Option<TlsConfig>,
104) -> Result<StreamingResponse> {
105    let uri = req.uri()?;
106
107    let mut client = GrpcClient::new(uri, tls)?;
108
109    client.server_streaming(req).await
110}