flare_rpc_core/client/
call.rs

1use crate::interceptor::ctxinterceprot::AppContextConfig;
2use flare_core::context::AppContext;
3use flare_core::context::AppContextBuilder;
4use std::future::Future;
5use std::pin::Pin;
6use std::str::FromStr;
7use tonic::{Request, Response, Status};
8use tower::Layer;
9use tower::Service;
10use std::sync::{Arc, Mutex};
11use std::collections::HashMap;
12
13pub async fn call_rpc<C, P, R>(
14    ctx: AppContext,
15    client: C,
16    params: P,
17    rpc_call: impl FnOnce(C, Request<P>) -> Pin<Box<dyn Future<Output = Result<Response<R>, Status>> + Send + 'static>> + Send + 'static,
18) -> Result<Response<R>, Status>
19where
20    C: Clone + Send + 'static,
21    P: Send + 'static,
22    R: Send + 'static,
23{
24    // 创建请求并添加上下文元数据
25    let mut request = Request::new(params);
26    crate::interceptor::ctxinterceprot::build_req_metadata_form_ctx(&ctx, &mut request);
27    
28    // 执行调用
29    rpc_call(client, request).await
30}
31
32#[cfg(test)]
33mod tests {
34    use super::*;
35    use crate::client::call::tests::echo_client::EchoClient;
36    use crate::client::call::tests::echo_server::{Echo, EchoServer};
37    use std::net::SocketAddr;
38    use std::str::FromStr;
39    use tonic::transport::Channel;
40    use crate::discover::ServiceError;
41
42    tonic::include_proto!("echo");
43
44    struct EchoService;
45
46    #[tonic::async_trait]
47    impl Echo for EchoService {
48        async fn echo(&self, request: Request<EchoRequest>) -> Result<Response<EchoResponse>, Status> {
49            // 从请求的 metadata 中获取上下文信息
50            let metadata = request.metadata();
51            let ctx = crate::interceptor::build_context_from_metadata(metadata)?;
52            
53            // 获取请求消息
54            let msg = request.into_inner().message;
55            
56            // 构造响应,加入上下文信息
57            let response = format!("Echo: {} (from user: {:?})", msg, ctx.user_id());
58            Ok(Response::new(EchoResponse { message: response }))
59        }
60    }
61
62    #[tokio::test]
63    async fn test_echo_with_context() -> Result<(), Box<dyn std::error::Error>> {
64        // 1. 启动测试服务器
65        let echo_service = EchoService;
66        let addr = "127.0.0.1:50051";
67        let socket_addr = SocketAddr::from_str(addr)?;
68
69        let server_future = tonic::transport::Server::builder()
70            .add_service(EchoServer::new(echo_service))
71            .serve(socket_addr);
72        tokio::spawn(server_future);
73        tokio::time::sleep(std::time::Duration::from_secs(1)).await;
74
75        // 2. 创建客户端连接
76        let endpoint = Channel::from_shared("http://127.0.0.1:50051")
77            .map_err(|e| ServiceError::ConnectionError(e.to_string()))?
78            .connect_timeout(std::time::Duration::from_secs(5))
79            .tcp_keepalive(Some(std::time::Duration::from_secs(30)))
80            .http2_keep_alive_interval(std::time::Duration::from_secs(30));
81            
82        let channel = endpoint
83            .connect()
84            .await
85            .map_err(|e| ServiceError::ConnectionError(e.to_string()))?;
86
87        let client = EchoClient::new(channel);
88
89        // 3. 准备上下文和请求参数
90        let ctx = AppContextBuilder::new()
91            .remote_addr("127.0.0.1:12345".to_string())
92            .user_id("test-user-001".to_string())
93            .platform(1)  // 假设 1 代表 Web 平台
94            .client_id("test-client-001".to_string())
95            .with_language(Some("zh-CN".to_string()))
96            .with_conn_id("test-conn-001".to_string())
97            .with_client_msg_id("test-msg-001".to_string())
98            .values(Arc::new(Mutex::new({
99                let mut values = HashMap::new();
100                values.insert("request_id".to_string(), "test-123".to_string());
101                values.insert("trace_id".to_string(), "trace-001".to_string());
102                values.insert("session_id".to_string(), "session-001".to_string());
103                values
104            })))
105            .build()
106            .expect("Failed to build AppContext");
107
108        let params = EchoRequest {
109            message: "Hello".to_string(),
110        };
111
112        // 4. 调用 RPC
113        let response = call_rpc(
114            ctx,
115            client,
116            params,
117            |mut client, request| Box::pin(async move {
118                client.echo(request).await
119            })
120        ).await?;
121
122        // 5. 验证响应
123        let response_msg = response.into_inner().message;
124        assert!(response_msg.contains("Hello"));
125        
126        Ok(())
127    }
128}