nacos_sdk/common/remote/grpc/
nacos_grpc_client.rs1use std::{collections::HashMap, sync::Arc, time::Duration};
2use tower::layer::util::Stack;
3use tracing::{Instrument, instrument};
4
5use crate::api::error::Error;
6use crate::api::plugin::{AuthPlugin, NoopAuthPlugin, init_auth_plugin};
7use crate::common::remote::grpc::message::{
8 GrpcMessage, GrpcMessageBuilder, GrpcRequestMessage, GrpcResponseMessage,
9};
10use crate::common::remote::grpc::message::{GrpcMessageData, request::NacosClientAbilities};
11use crate::common::remote::grpc::nacos_grpc_service::DynamicUnaryCallLayerWrapper;
12
13use super::handlers::client_detection_request_handler::ClientDetectionRequestHandler;
14use super::message::request::ClientDetectionRequest;
15use super::nacos_grpc_connection::{NacosGrpcConnection, SendRequest};
16use super::nacos_grpc_service::{
17 DynamicBiStreamingCallLayer, DynamicBiStreamingCallLayerWrapper, DynamicUnaryCallLayer,
18};
19use super::server_list_service::PollingServerListService;
20use super::tonic::TonicBuilder;
21use super::{config::GrpcConfiguration, nacos_grpc_service::ServerRequestHandler};
22
23const APP_FILED: &str = "app";
24
25pub(crate) struct NacosGrpcClient {
26 app_name: String,
27 send_request: Arc<dyn SendRequest + Send + Sync + 'static>,
28 auth_plugin: Arc<dyn AuthPlugin>,
29}
30
31impl NacosGrpcClient {
32 #[instrument(skip_all)]
33 pub(crate) async fn send_request<Request, Response>(
34 &self,
35 mut request: Request,
36 ) -> Result<Response, Error>
37 where
38 Request: GrpcRequestMessage + 'static,
39 Response: GrpcResponseMessage + 'static,
40 {
41 let mut request_headers = request.take_headers();
42 if let Some(resource) = request.request_resource() {
43 let auth_context = self.auth_plugin.get_login_identity(resource);
44 request_headers.extend(auth_context.contexts);
45 }
46
47 let grpc_request = GrpcMessageBuilder::new(request)
48 .header(APP_FILED.to_owned(), self.app_name.clone())
49 .headers(request_headers)
50 .build();
51 let grpc_request = grpc_request.into_payload()?;
52
53 let grpc_response = self
54 .send_request
55 .send_request(grpc_request)
56 .in_current_span()
57 .await?;
58
59 let grpc_response = GrpcMessage::<Response>::from_payload(grpc_response)?;
60 Ok(grpc_response.into_body())
61 }
62}
63
64type HandlerMap = HashMap<String, Arc<dyn ServerRequestHandler>>;
65type ConnectedListener = Arc<dyn Fn(String) + Send + Sync + 'static>;
66type DisconnectedListener = Arc<dyn Fn(String) + Send + Sync + 'static>;
67
68pub(crate) struct NacosGrpcClientBuilder {
69 app_name: String,
70 client_version: String,
71 namespace: String,
72 labels: HashMap<String, String>,
73 client_abilities: NacosClientAbilities,
74 grpc_config: GrpcConfiguration,
75 server_request_handler_map: HandlerMap,
76 server_list: Vec<String>,
77 connected_listener: Option<ConnectedListener>,
78 disconnected_listener: Option<DisconnectedListener>,
79 unary_call_layer: Option<DynamicUnaryCallLayer>,
80 bi_call_layer: Option<DynamicBiStreamingCallLayer>,
81 auth_plugin: Arc<dyn AuthPlugin>,
82 auth_context: HashMap<String, String>,
83 max_retries: Option<u32>,
84}
85
86impl NacosGrpcClientBuilder {
87 pub(crate) fn new(server_list: Vec<String>) -> Self {
88 Self {
89 app_name: "unknown".to_owned(),
90 client_version: Default::default(),
91 namespace: Default::default(),
92 labels: Default::default(),
93 client_abilities: Default::default(),
94 grpc_config: Default::default(),
95 server_request_handler_map: Default::default(),
96 server_list,
97 connected_listener: None,
98 disconnected_listener: None,
99 unary_call_layer: None,
100 bi_call_layer: None,
101 auth_context: Default::default(),
102 auth_plugin: Arc::new(NoopAuthPlugin::default()),
103 max_retries: None,
104 }
105 }
106
107 pub(crate) fn app_name(self, app_name: String) -> Self {
108 Self { app_name, ..self }
109 }
110
111 pub(crate) fn client_version(self, client_version: String) -> Self {
112 Self {
113 client_version,
114 ..self
115 }
116 }
117
118 pub(crate) fn namespace(self, namespace: String) -> Self {
119 Self { namespace, ..self }
120 }
121
122 pub(crate) fn add_label(mut self, key: String, value: String) -> Self {
123 self.labels.insert(key, value);
124 Self { ..self }
125 }
126
127 pub(crate) fn add_labels(mut self, labels: HashMap<String, String>) -> Self {
128 self.labels.extend(labels);
129 Self { ..self }
130 }
131
132 pub(crate) fn max_retries(mut self, max_retries: Option<u32>) -> Self {
133 self.max_retries = max_retries;
134 Self { ..self }
135 }
136
137 pub(crate) fn support_remote_connection(mut self, enable: bool) -> Self {
138 self.client_abilities.support_remote_connection(enable);
139 Self { ..self }
140 }
141
142 pub(crate) fn support_config_remote_metrics(mut self, enable: bool) -> Self {
143 self.client_abilities.support_config_remote_metrics(enable);
144 Self { ..self }
145 }
146
147 pub(crate) fn support_naming_delta_push(mut self, enable: bool) -> Self {
148 self.client_abilities.support_naming_delta_push(enable);
149 Self { ..self }
150 }
151
152 pub(crate) fn support_naming_remote_metric(mut self, enable: bool) -> Self {
153 self.client_abilities.support_naming_remote_metric(enable);
154 Self { ..self }
155 }
156
157 pub(crate) fn host(self, host: String) -> Self {
158 let grpc_config = self.grpc_config.with_host(host);
159 Self {
160 grpc_config,
161 ..self
162 }
163 }
164
165 pub(crate) fn port(self, port: Option<u32>) -> Self {
166 let grpc_config = self.grpc_config.with_port(port);
167 Self {
168 grpc_config,
169 ..self
170 }
171 }
172
173 pub(crate) fn origin(self, uri: &str) -> Self {
174 let grpc_config = self.grpc_config.with_origin(uri);
175 Self {
176 grpc_config,
177 ..self
178 }
179 }
180
181 pub(crate) fn user_agent(self, ua: String) -> Self {
182 let grpc_config = self.grpc_config.with_user_agent(ua);
183 Self {
184 grpc_config,
185 ..self
186 }
187 }
188
189 pub(crate) fn timeout(self, timeout: Duration) -> Self {
190 let grpc_config = self.grpc_config.with_timeout(timeout);
191 Self {
192 grpc_config,
193 ..self
194 }
195 }
196
197 pub(crate) fn concurrency_limit(self, concurrency_limit: usize) -> Self {
198 let grpc_config = self.grpc_config.with_concurrency_limit(concurrency_limit);
199 Self {
200 grpc_config,
201 ..self
202 }
203 }
204
205 pub(crate) fn rate_limit(self, rate_limit: (u64, Duration)) -> Self {
206 let grpc_config = self.grpc_config.with_rate_limit(rate_limit);
207 Self {
208 grpc_config,
209 ..self
210 }
211 }
212
213 pub(crate) fn init_stream_window_size(self, init_stream_window_size: u32) -> Self {
214 let grpc_config = self
215 .grpc_config
216 .with_init_stream_window_size(init_stream_window_size);
217 Self {
218 grpc_config,
219 ..self
220 }
221 }
222
223 pub(crate) fn init_connection_window_size(self, init_connection_window_size: u32) -> Self {
224 let grpc_config = self
225 .grpc_config
226 .with_init_connection_window_size(init_connection_window_size);
227 Self {
228 grpc_config,
229 ..self
230 }
231 }
232
233 pub(crate) fn tcp_keepalive(self, tcp_keepalive: Duration) -> Self {
234 let grpc_config = self.grpc_config.with_tcp_keepalive(tcp_keepalive);
235 Self {
236 grpc_config,
237 ..self
238 }
239 }
240
241 pub(crate) fn tcp_nodelay(self, tcp_nodelay: bool) -> Self {
242 let grpc_config = self.grpc_config.with_tcp_nodelay(tcp_nodelay);
243 Self {
244 grpc_config,
245 ..self
246 }
247 }
248
249 pub(crate) fn http2_keep_alive_interval(self, http2_keep_alive_interval: Duration) -> Self {
250 let grpc_config = self
251 .grpc_config
252 .with_http2_keep_alive_interval(http2_keep_alive_interval);
253 Self {
254 grpc_config,
255 ..self
256 }
257 }
258
259 pub(crate) fn http2_keep_alive_timeout(self, http2_keep_alive_timeout: Duration) -> Self {
260 let grpc_config = self
261 .grpc_config
262 .with_http2_keep_alive_timeout(http2_keep_alive_timeout);
263 Self {
264 grpc_config,
265 ..self
266 }
267 }
268
269 pub(crate) fn http2_keep_alive_while_idle(self, http2_keep_alive_while_idle: bool) -> Self {
270 let grpc_config = self
271 .grpc_config
272 .with_http2_keep_alive_while_idle(http2_keep_alive_while_idle);
273 Self {
274 grpc_config,
275 ..self
276 }
277 }
278
279 pub(crate) fn connect_timeout(self, connect_timeout: Duration) -> Self {
280 let grpc_config = self.grpc_config.with_connect_timeout(connect_timeout);
281 Self {
282 grpc_config,
283 ..self
284 }
285 }
286
287 pub(crate) fn http2_adaptive_window(self, http2_adaptive_window: bool) -> Self {
288 let grpc_config = self
289 .grpc_config
290 .with_http2_adaptive_window(http2_adaptive_window);
291 Self {
292 grpc_config,
293 ..self
294 }
295 }
296
297 pub(crate) fn auth_plugin(self, auth_plugin: Arc<dyn AuthPlugin>) -> Self {
298 Self {
299 auth_plugin,
300 ..self
301 }
302 }
303
304 pub(crate) fn auth_context(self, auth_context: HashMap<String, String>) -> Self {
305 Self {
306 auth_context,
307 ..self
308 }
309 }
310
311 pub(crate) fn register_server_request_handler<T: GrpcMessageData>(
312 mut self,
313 handler: Arc<dyn ServerRequestHandler>,
314 ) -> Self {
315 self.server_request_handler_map
316 .insert(T::identity().to_string(), handler);
317 Self { ..self }
318 }
319
320 pub(crate) fn connected_listener(
321 mut self,
322 listener: impl Fn(String) + Send + Sync + 'static,
323 ) -> Self {
324 self.connected_listener = Some(Arc::new(listener));
325 Self { ..self }
326 }
327
328 pub(crate) fn disconnected_listener(
329 mut self,
330 listener: impl Fn(String) + Send + Sync + 'static,
331 ) -> Self {
332 self.disconnected_listener = Some(Arc::new(listener));
333 Self { ..self }
334 }
335
336 pub(crate) fn unary_call_layer(self, layer: DynamicUnaryCallLayer) -> Self {
337 let stack = if let Some(unary_call_layer) = self.unary_call_layer {
338 Arc::new(Stack::new(
339 DynamicUnaryCallLayerWrapper(layer),
340 DynamicUnaryCallLayerWrapper(unary_call_layer),
341 ))
342 } else {
343 layer
344 };
345
346 Self {
347 unary_call_layer: Some(stack),
348 ..self
349 }
350 }
351
352 pub(crate) fn bi_call_layer(self, layer: DynamicBiStreamingCallLayer) -> Self {
353 let stack = if let Some(bi_call_layer) = self.bi_call_layer {
354 Arc::new(Stack::new(
355 DynamicBiStreamingCallLayerWrapper(layer),
356 DynamicBiStreamingCallLayerWrapper(bi_call_layer),
357 ))
358 } else {
359 layer
360 };
361
362 Self {
363 bi_call_layer: Some(stack),
364 ..self
365 }
366 }
367
368 pub(crate) fn build(mut self, id: String) -> NacosGrpcClient {
369 self.server_request_handler_map.insert(
370 ClientDetectionRequest::identity().to_string(),
371 Arc::new(ClientDetectionRequestHandler),
372 );
373
374 let send_request = {
375 let server_list = PollingServerListService::new(self.server_list.clone());
376 let mut tonic_builder = TonicBuilder::new(self.grpc_config, server_list);
377 if let Some(layer) = self.unary_call_layer {
378 tonic_builder = tonic_builder.unary_call_layer(layer);
379 }
380
381 if let Some(layer) = self.bi_call_layer {
382 tonic_builder = tonic_builder.bi_call_layer(layer);
383 }
384
385 let mut connection = NacosGrpcConnection::new(
386 id.clone(),
387 tonic_builder,
388 self.server_request_handler_map,
389 self.client_version,
390 self.namespace,
391 self.labels,
392 self.client_abilities,
393 self.max_retries,
394 );
395
396 if let Some(connected_listener) = self.connected_listener {
397 connection = connection.connected_listener(connected_listener);
398 }
399
400 if let Some(disconnected_listener) = self.disconnected_listener {
401 connection = connection.disconnected_listener(disconnected_listener);
402 }
403
404 let failover_connection = connection.into_failover_connection(id.clone());
405 Arc::new(failover_connection) as Arc<dyn SendRequest + Send + Sync + 'static>
406 };
407
408 init_auth_plugin(
409 self.auth_plugin.clone(),
410 self.server_list.clone(),
411 self.auth_context.clone(),
412 id,
413 );
414
415 let app_name = self.app_name;
416 let auth_plugin = self.auth_plugin;
417
418 NacosGrpcClient {
419 app_name,
420 send_request,
421 auth_plugin,
422 }
423 }
424}
425
426#[cfg(test)]
427pub mod tests {
428
429 use crate::common::remote::grpc::{
430 message::{request::HealthCheckRequest, response::HealthCheckResponse},
431 nacos_grpc_connection::MockSendRequest,
432 };
433
434 use mockall::predicate::*;
435
436 use super::*;
437
438 #[tokio::test]
439 pub async fn test_send_request() {
440 let mut health_check_request = HealthCheckRequest::default();
441 health_check_request.request_id = Some("test_health_check_id".to_string());
442
443 let mut mock_send_request = MockSendRequest::new();
444 mock_send_request
445 .expect_send_request()
446 .with(function(|req: &crate::nacos_proto::v2::Payload| {
447 let app_name = &req
448 .metadata
449 .as_ref()
450 .map(|data| data.headers.get(APP_FILED).unwrap().clone())
451 .unwrap();
452
453 app_name.eq("test_app")
454 }))
455 .returning(|req| {
456 let request = GrpcMessage::<HealthCheckRequest>::from_payload(req).unwrap();
457 let request = request.into_body();
458 let req_id = request.request_id.unwrap();
459
460 let mut response = HealthCheckResponse::default();
461 response.request_id = Some(req_id);
462
463 let payload = GrpcMessageBuilder::new(response)
464 .build()
465 .into_payload()
466 .unwrap();
467 Ok(payload)
468 });
469
470 let nacos_grpc_client = NacosGrpcClient {
471 app_name: "test_app".to_string(),
472 send_request: Arc::new(mock_send_request),
473 auth_plugin: Arc::new(NoopAuthPlugin::default()),
474 };
475
476 let response = nacos_grpc_client
477 .send_request::<HealthCheckRequest, HealthCheckResponse>(health_check_request)
478 .await;
479 let response = response.unwrap();
480
481 assert_eq!(
482 "test_health_check_id".to_string(),
483 response.request_id.unwrap()
484 );
485 }
486}