nacos_sdk/common/remote/grpc/
nacos_grpc_client.rs

1use std::{collections::HashMap, sync::Arc, time::Duration};
2use tower::layer::util::Stack;
3use tracing::{Instrument, instrument};
4
5use crate::api::error::Error;
6use crate::api::plugin::{AuthPlugin, NoopAuthPlugin, init_auth_plugin};
7use crate::common::remote::grpc::message::{
8    GrpcMessage, GrpcMessageBuilder, GrpcRequestMessage, GrpcResponseMessage,
9};
10use crate::common::remote::grpc::message::{GrpcMessageData, request::NacosClientAbilities};
11use crate::common::remote::grpc::nacos_grpc_service::DynamicUnaryCallLayerWrapper;
12
13use super::handlers::client_detection_request_handler::ClientDetectionRequestHandler;
14use super::message::request::ClientDetectionRequest;
15use super::nacos_grpc_connection::{NacosGrpcConnection, SendRequest};
16use super::nacos_grpc_service::{
17    DynamicBiStreamingCallLayer, DynamicBiStreamingCallLayerWrapper, DynamicUnaryCallLayer,
18};
19use super::server_list_service::PollingServerListService;
20use super::tonic::TonicBuilder;
21use super::{config::GrpcConfiguration, nacos_grpc_service::ServerRequestHandler};
22
23const APP_FILED: &str = "app";
24
25pub(crate) struct NacosGrpcClient {
26    app_name: String,
27    send_request: Arc<dyn SendRequest + Send + Sync + 'static>,
28    auth_plugin: Arc<dyn AuthPlugin>,
29}
30
31impl NacosGrpcClient {
32    #[instrument(skip_all)]
33    pub(crate) async fn send_request<Request, Response>(
34        &self,
35        mut request: Request,
36    ) -> Result<Response, Error>
37    where
38        Request: GrpcRequestMessage + 'static,
39        Response: GrpcResponseMessage + 'static,
40    {
41        let mut request_headers = request.take_headers();
42        if let Some(resource) = request.request_resource() {
43            let auth_context = self.auth_plugin.get_login_identity(resource);
44            request_headers.extend(auth_context.contexts);
45        }
46
47        let grpc_request = GrpcMessageBuilder::new(request)
48            .header(APP_FILED.to_owned(), self.app_name.clone())
49            .headers(request_headers)
50            .build();
51        let grpc_request = grpc_request.into_payload()?;
52
53        let grpc_response = self
54            .send_request
55            .send_request(grpc_request)
56            .in_current_span()
57            .await?;
58
59        let grpc_response = GrpcMessage::<Response>::from_payload(grpc_response)?;
60        Ok(grpc_response.into_body())
61    }
62}
63
64type HandlerMap = HashMap<String, Arc<dyn ServerRequestHandler>>;
65type ConnectedListener = Arc<dyn Fn(String) + Send + Sync + 'static>;
66type DisconnectedListener = Arc<dyn Fn(String) + Send + Sync + 'static>;
67
68pub(crate) struct NacosGrpcClientBuilder {
69    app_name: String,
70    client_version: String,
71    namespace: String,
72    labels: HashMap<String, String>,
73    client_abilities: NacosClientAbilities,
74    grpc_config: GrpcConfiguration,
75    server_request_handler_map: HandlerMap,
76    server_list: Vec<String>,
77    connected_listener: Option<ConnectedListener>,
78    disconnected_listener: Option<DisconnectedListener>,
79    unary_call_layer: Option<DynamicUnaryCallLayer>,
80    bi_call_layer: Option<DynamicBiStreamingCallLayer>,
81    auth_plugin: Arc<dyn AuthPlugin>,
82    auth_context: HashMap<String, String>,
83    max_retries: Option<u32>,
84}
85
86impl NacosGrpcClientBuilder {
87    pub(crate) fn new(server_list: Vec<String>) -> Self {
88        Self {
89            app_name: "unknown".to_owned(),
90            client_version: Default::default(),
91            namespace: Default::default(),
92            labels: Default::default(),
93            client_abilities: Default::default(),
94            grpc_config: Default::default(),
95            server_request_handler_map: Default::default(),
96            server_list,
97            connected_listener: None,
98            disconnected_listener: None,
99            unary_call_layer: None,
100            bi_call_layer: None,
101            auth_context: Default::default(),
102            auth_plugin: Arc::new(NoopAuthPlugin::default()),
103            max_retries: None,
104        }
105    }
106
107    pub(crate) fn app_name(self, app_name: String) -> Self {
108        Self { app_name, ..self }
109    }
110
111    pub(crate) fn client_version(self, client_version: String) -> Self {
112        Self {
113            client_version,
114            ..self
115        }
116    }
117
118    pub(crate) fn namespace(self, namespace: String) -> Self {
119        Self { namespace, ..self }
120    }
121
122    pub(crate) fn add_label(mut self, key: String, value: String) -> Self {
123        self.labels.insert(key, value);
124        Self { ..self }
125    }
126
127    pub(crate) fn add_labels(mut self, labels: HashMap<String, String>) -> Self {
128        self.labels.extend(labels);
129        Self { ..self }
130    }
131
132    pub(crate) fn max_retries(mut self, max_retries: Option<u32>) -> Self {
133        self.max_retries = max_retries;
134        Self { ..self }
135    }
136
137    pub(crate) fn support_remote_connection(mut self, enable: bool) -> Self {
138        self.client_abilities.support_remote_connection(enable);
139        Self { ..self }
140    }
141
142    pub(crate) fn support_config_remote_metrics(mut self, enable: bool) -> Self {
143        self.client_abilities.support_config_remote_metrics(enable);
144        Self { ..self }
145    }
146
147    pub(crate) fn support_naming_delta_push(mut self, enable: bool) -> Self {
148        self.client_abilities.support_naming_delta_push(enable);
149        Self { ..self }
150    }
151
152    pub(crate) fn support_naming_remote_metric(mut self, enable: bool) -> Self {
153        self.client_abilities.support_naming_remote_metric(enable);
154        Self { ..self }
155    }
156
157    pub(crate) fn host(self, host: String) -> Self {
158        let grpc_config = self.grpc_config.with_host(host);
159        Self {
160            grpc_config,
161            ..self
162        }
163    }
164
165    pub(crate) fn port(self, port: Option<u32>) -> Self {
166        let grpc_config = self.grpc_config.with_port(port);
167        Self {
168            grpc_config,
169            ..self
170        }
171    }
172
173    pub(crate) fn origin(self, uri: &str) -> Self {
174        let grpc_config = self.grpc_config.with_origin(uri);
175        Self {
176            grpc_config,
177            ..self
178        }
179    }
180
181    pub(crate) fn user_agent(self, ua: String) -> Self {
182        let grpc_config = self.grpc_config.with_user_agent(ua);
183        Self {
184            grpc_config,
185            ..self
186        }
187    }
188
189    pub(crate) fn timeout(self, timeout: Duration) -> Self {
190        let grpc_config = self.grpc_config.with_timeout(timeout);
191        Self {
192            grpc_config,
193            ..self
194        }
195    }
196
197    pub(crate) fn concurrency_limit(self, concurrency_limit: usize) -> Self {
198        let grpc_config = self.grpc_config.with_concurrency_limit(concurrency_limit);
199        Self {
200            grpc_config,
201            ..self
202        }
203    }
204
205    pub(crate) fn rate_limit(self, rate_limit: (u64, Duration)) -> Self {
206        let grpc_config = self.grpc_config.with_rate_limit(rate_limit);
207        Self {
208            grpc_config,
209            ..self
210        }
211    }
212
213    pub(crate) fn init_stream_window_size(self, init_stream_window_size: u32) -> Self {
214        let grpc_config = self
215            .grpc_config
216            .with_init_stream_window_size(init_stream_window_size);
217        Self {
218            grpc_config,
219            ..self
220        }
221    }
222
223    pub(crate) fn init_connection_window_size(self, init_connection_window_size: u32) -> Self {
224        let grpc_config = self
225            .grpc_config
226            .with_init_connection_window_size(init_connection_window_size);
227        Self {
228            grpc_config,
229            ..self
230        }
231    }
232
233    pub(crate) fn tcp_keepalive(self, tcp_keepalive: Duration) -> Self {
234        let grpc_config = self.grpc_config.with_tcp_keepalive(tcp_keepalive);
235        Self {
236            grpc_config,
237            ..self
238        }
239    }
240
241    pub(crate) fn tcp_nodelay(self, tcp_nodelay: bool) -> Self {
242        let grpc_config = self.grpc_config.with_tcp_nodelay(tcp_nodelay);
243        Self {
244            grpc_config,
245            ..self
246        }
247    }
248
249    pub(crate) fn http2_keep_alive_interval(self, http2_keep_alive_interval: Duration) -> Self {
250        let grpc_config = self
251            .grpc_config
252            .with_http2_keep_alive_interval(http2_keep_alive_interval);
253        Self {
254            grpc_config,
255            ..self
256        }
257    }
258
259    pub(crate) fn http2_keep_alive_timeout(self, http2_keep_alive_timeout: Duration) -> Self {
260        let grpc_config = self
261            .grpc_config
262            .with_http2_keep_alive_timeout(http2_keep_alive_timeout);
263        Self {
264            grpc_config,
265            ..self
266        }
267    }
268
269    pub(crate) fn http2_keep_alive_while_idle(self, http2_keep_alive_while_idle: bool) -> Self {
270        let grpc_config = self
271            .grpc_config
272            .with_http2_keep_alive_while_idle(http2_keep_alive_while_idle);
273        Self {
274            grpc_config,
275            ..self
276        }
277    }
278
279    pub(crate) fn connect_timeout(self, connect_timeout: Duration) -> Self {
280        let grpc_config = self.grpc_config.with_connect_timeout(connect_timeout);
281        Self {
282            grpc_config,
283            ..self
284        }
285    }
286
287    pub(crate) fn http2_adaptive_window(self, http2_adaptive_window: bool) -> Self {
288        let grpc_config = self
289            .grpc_config
290            .with_http2_adaptive_window(http2_adaptive_window);
291        Self {
292            grpc_config,
293            ..self
294        }
295    }
296
297    pub(crate) fn auth_plugin(self, auth_plugin: Arc<dyn AuthPlugin>) -> Self {
298        Self {
299            auth_plugin,
300            ..self
301        }
302    }
303
304    pub(crate) fn auth_context(self, auth_context: HashMap<String, String>) -> Self {
305        Self {
306            auth_context,
307            ..self
308        }
309    }
310
311    pub(crate) fn register_server_request_handler<T: GrpcMessageData>(
312        mut self,
313        handler: Arc<dyn ServerRequestHandler>,
314    ) -> Self {
315        self.server_request_handler_map
316            .insert(T::identity().to_string(), handler);
317        Self { ..self }
318    }
319
320    pub(crate) fn connected_listener(
321        mut self,
322        listener: impl Fn(String) + Send + Sync + 'static,
323    ) -> Self {
324        self.connected_listener = Some(Arc::new(listener));
325        Self { ..self }
326    }
327
328    pub(crate) fn disconnected_listener(
329        mut self,
330        listener: impl Fn(String) + Send + Sync + 'static,
331    ) -> Self {
332        self.disconnected_listener = Some(Arc::new(listener));
333        Self { ..self }
334    }
335
336    pub(crate) fn unary_call_layer(self, layer: DynamicUnaryCallLayer) -> Self {
337        let stack = if let Some(unary_call_layer) = self.unary_call_layer {
338            Arc::new(Stack::new(
339                DynamicUnaryCallLayerWrapper(layer),
340                DynamicUnaryCallLayerWrapper(unary_call_layer),
341            ))
342        } else {
343            layer
344        };
345
346        Self {
347            unary_call_layer: Some(stack),
348            ..self
349        }
350    }
351
352    pub(crate) fn bi_call_layer(self, layer: DynamicBiStreamingCallLayer) -> Self {
353        let stack = if let Some(bi_call_layer) = self.bi_call_layer {
354            Arc::new(Stack::new(
355                DynamicBiStreamingCallLayerWrapper(layer),
356                DynamicBiStreamingCallLayerWrapper(bi_call_layer),
357            ))
358        } else {
359            layer
360        };
361
362        Self {
363            bi_call_layer: Some(stack),
364            ..self
365        }
366    }
367
368    pub(crate) fn build(mut self, id: String) -> NacosGrpcClient {
369        self.server_request_handler_map.insert(
370            ClientDetectionRequest::identity().to_string(),
371            Arc::new(ClientDetectionRequestHandler),
372        );
373
374        let send_request = {
375            let server_list = PollingServerListService::new(self.server_list.clone());
376            let mut tonic_builder = TonicBuilder::new(self.grpc_config, server_list);
377            if let Some(layer) = self.unary_call_layer {
378                tonic_builder = tonic_builder.unary_call_layer(layer);
379            }
380
381            if let Some(layer) = self.bi_call_layer {
382                tonic_builder = tonic_builder.bi_call_layer(layer);
383            }
384
385            let mut connection = NacosGrpcConnection::new(
386                id.clone(),
387                tonic_builder,
388                self.server_request_handler_map,
389                self.client_version,
390                self.namespace,
391                self.labels,
392                self.client_abilities,
393                self.max_retries,
394            );
395
396            if let Some(connected_listener) = self.connected_listener {
397                connection = connection.connected_listener(connected_listener);
398            }
399
400            if let Some(disconnected_listener) = self.disconnected_listener {
401                connection = connection.disconnected_listener(disconnected_listener);
402            }
403
404            let failover_connection = connection.into_failover_connection(id.clone());
405            Arc::new(failover_connection) as Arc<dyn SendRequest + Send + Sync + 'static>
406        };
407
408        init_auth_plugin(
409            self.auth_plugin.clone(),
410            self.server_list.clone(),
411            self.auth_context.clone(),
412            id,
413        );
414
415        let app_name = self.app_name;
416        let auth_plugin = self.auth_plugin;
417
418        NacosGrpcClient {
419            app_name,
420            send_request,
421            auth_plugin,
422        }
423    }
424}
425
#[cfg(test)]
pub mod tests {

    use crate::common::remote::grpc::{
        message::{request::HealthCheckRequest, response::HealthCheckResponse},
        nacos_grpc_connection::MockSendRequest,
    };

    use mockall::predicate::*;

    use super::*;

    /// Sends a health-check request through a mocked sender and checks two things: the
    /// outgoing payload carries the client's app name under the `app` header, and the
    /// decoded response echoes the request id.
    #[tokio::test]
    pub async fn test_send_request() {
        const APP_NAME: &str = "test_app";
        const REQUEST_ID: &str = "test_health_check_id";

        let mut sender = MockSendRequest::new();
        sender
            .expect_send_request()
            .with(function(|payload: &crate::nacos_proto::v2::Payload| {
                // Missing metadata or a missing header is a test failure, hence the unwraps.
                let metadata = payload.metadata.as_ref().unwrap();
                metadata.headers.get(APP_FILED).unwrap().as_str() == APP_NAME
            }))
            .returning(|payload| {
                let incoming = GrpcMessage::<HealthCheckRequest>::from_payload(payload)
                    .unwrap()
                    .into_body();

                // Echo the request id back so the caller can correlate the reply.
                let mut reply = HealthCheckResponse::default();
                reply.request_id = Some(incoming.request_id.unwrap());

                Ok(GrpcMessageBuilder::new(reply)
                    .build()
                    .into_payload()
                    .unwrap())
            });

        let client = NacosGrpcClient {
            app_name: APP_NAME.to_string(),
            send_request: Arc::new(sender),
            auth_plugin: Arc::new(NoopAuthPlugin::default()),
        };

        let mut request = HealthCheckRequest::default();
        request.request_id = Some(REQUEST_ID.to_string());

        let reply = client
            .send_request::<HealthCheckRequest, HealthCheckResponse>(request)
            .await
            .unwrap();

        assert_eq!(REQUEST_ID.to_string(), reply.request_id.unwrap());
    }
}
486}