flare_rpc_core/client/
call.rs1use crate::interceptor::ctxinterceprot::AppContextConfig;
2use flare_core::context::AppContext;
3use flare_core::context::AppContextBuilder;
4use std::future::Future;
5use std::pin::Pin;
6use std::str::FromStr;
7use tonic::{Request, Response, Status};
8use tower::Layer;
9use tower::Service;
10use std::sync::{Arc, Mutex};
11use std::collections::HashMap;
12
13pub async fn call_rpc<C, P, R>(
14 ctx: AppContext,
15 client: C,
16 params: P,
17 rpc_call: impl FnOnce(C, Request<P>) -> Pin<Box<dyn Future<Output = Result<Response<R>, Status>> + Send + 'static>> + Send + 'static,
18) -> Result<Response<R>, Status>
19where
20 C: Clone + Send + 'static,
21 P: Send + 'static,
22 R: Send + 'static,
23{
24 let mut request = Request::new(params);
26 crate::interceptor::ctxinterceprot::build_req_metadata_form_ctx(&ctx, &mut request);
27
28 rpc_call(client, request).await
30}
31
#[cfg(test)]
mod tests {
    use super::*;
    // `self::` keeps these imports valid even if this file is moved within the crate.
    use self::echo_client::EchoClient;
    use self::echo_server::{Echo, EchoServer};
    use crate::discover::ServiceError;
    use std::net::TcpListener;
    use std::time::Duration;
    use tonic::transport::Channel;

    tonic::include_proto!("echo");

    /// Upper bound on connection attempts while waiting for the test server to start.
    const CONNECT_ATTEMPTS: u32 = 50;
    /// Pause between two connection attempts (50 x 20 ms = at most ~1 s of waiting).
    const CONNECT_RETRY_DELAY: Duration = Duration::from_millis(20);

    /// Echo service that rebuilds the context from the request metadata and reports
    /// the user id it found there alongside the echoed message.
    struct EchoService;

    #[tonic::async_trait]
    impl Echo for EchoService {
        /// Replies with `"Echo: <message> (from user: <user id>)"`.
        ///
        /// Fails with the error from `build_context_from_metadata` when the context
        /// cannot be rebuilt from the incoming metadata.
        async fn echo(
            &self,
            request: Request<EchoRequest>,
        ) -> Result<Response<EchoResponse>, Status> {
            let metadata = request.metadata();
            let ctx = crate::interceptor::build_context_from_metadata(metadata)?;

            let msg = request.into_inner().message;

            let response = format!("Echo: {} (from user: {:?})", msg, ctx.user_id());
            Ok(Response::new(EchoResponse { message: response }))
        }
    }

    /// End-to-end check: a context passed to `call_rpc` must be visible to the server.
    #[tokio::test]
    async fn test_echo_with_context() -> Result<(), Box<dyn std::error::Error>> {
        // Ask the OS for a free port instead of hard-coding one, so the test does not
        // collide with other tests or local services. The probe listener is dropped at
        // the end of this statement and the server binds the same address right after.
        let server_addr = TcpListener::bind("127.0.0.1:0")?.local_addr()?;

        let server_future = tonic::transport::Server::builder()
            .add_service(EchoServer::new(EchoService))
            .serve(server_addr);
        tokio::spawn(server_future);

        let endpoint = Channel::from_shared(format!("http://{}", server_addr))
            .map_err(|e| ServiceError::ConnectionError(e.to_string()))?
            .connect_timeout(Duration::from_secs(5))
            .tcp_keepalive(Some(Duration::from_secs(30)))
            .http2_keep_alive_interval(Duration::from_secs(30));

        // Poll until the server accepts connections rather than sleeping a fixed time:
        // faster when the server is up quickly, and a clear error when it never starts.
        let mut failed_attempts = 0;
        let channel = loop {
            match endpoint.connect().await {
                Ok(channel) => break channel,
                Err(_) if failed_attempts < CONNECT_ATTEMPTS => {
                    failed_attempts += 1;
                    tokio::time::sleep(CONNECT_RETRY_DELAY).await;
                }
                Err(e) => return Err(ServiceError::ConnectionError(e.to_string()).into()),
            }
        };

        let client = EchoClient::new(channel);

        let ctx = AppContextBuilder::new()
            .remote_addr("127.0.0.1:12345".to_string())
            .user_id("test-user-001".to_string())
            .platform(1)
            .client_id("test-client-001".to_string())
            .with_language(Some("zh-CN".to_string()))
            .with_conn_id("test-conn-001".to_string())
            .with_client_msg_id("test-msg-001".to_string())
            .values(Arc::new(Mutex::new({
                let mut values = HashMap::new();
                values.insert("request_id".to_string(), "test-123".to_string());
                values.insert("trace_id".to_string(), "trace-001".to_string());
                values.insert("session_id".to_string(), "session-001".to_string());
                values
            })))
            .build()
            .expect("Failed to build AppContext");

        let params = EchoRequest {
            message: "Hello".to_string(),
        };

        let response = call_rpc(ctx, client, params, |mut client, request| {
            Box::pin(async move { client.echo(request).await })
        })
        .await?;

        let response_msg = response.into_inner().message;
        assert!(
            response_msg.contains("Hello"),
            "payload was not echoed back: {response_msg}"
        );
        // The server formats the user id it rebuilt from the metadata; seeing it here
        // shows the context actually travelled with the request.
        assert!(
            response_msg.contains("test-user-001"),
            "user id from the context did not reach the server: {response_msg}"
        );

        Ok(())
    }
}