1use {async_trait::async_trait, bytes::Bytes, tracing::instrument};
7
8use zus_common::{ConnectionPoolConfig, Result};
9
10use crate::endpoint::ZooLBRpcEndpoint;
11
12#[async_trait]
14pub trait ZusRpcClient: Send + Sync {
15 async fn sync_call(&self, method: &str, request: Bytes, timeout_ms: u64) -> Result<Bytes>;
17
18 async fn notify_call(&self, method: &str, request: Bytes) -> Result<()>;
20}
21
22pub struct BaseRpcClient {
24 endpoint: ZooLBRpcEndpoint,
25 #[allow(dead_code)]
26 load_balance_enabled: bool,
27}
28
29impl BaseRpcClient {
30 pub async fn new(server_path: &str) -> Result<Self> {
37 let endpoint = ZooLBRpcEndpoint::new(server_path).await?;
38
39 Ok(Self {
40 endpoint,
41 load_balance_enabled: true,
42 })
43 }
44
45 pub async fn new_with_pooling(server_path: &str, pool_config: ConnectionPoolConfig) -> Result<Self> {
55 let endpoint = ZooLBRpcEndpoint::new_with_pooling(server_path, pool_config).await?;
56
57 Ok(Self {
58 endpoint,
59 load_balance_enabled: true,
60 })
61 }
62
63 pub fn builder() -> BaseRpcClientBuilder {
65 BaseRpcClientBuilder::new()
66 }
67
68 pub async fn for_gateway(server_path: &str) -> Result<Self> {
79 Self::new_with_pooling(server_path, ConnectionPoolConfig::for_gateway()).await
80 }
81
82 pub fn enable_load_balance(&mut self) {
83 self.load_balance_enabled = true;
84 }
85
86 pub fn disable_load_balance(&mut self) {
87 self.load_balance_enabled = false;
88 }
89
90 pub fn is_pooling_enabled(&self) -> bool {
92 self.endpoint.is_pooling_enabled()
93 }
94}
95
96#[async_trait]
97impl ZusRpcClient for BaseRpcClient {
98 #[instrument(name = "client_sync_call", skip(self, request), fields(method = %method, timeout_ms = timeout_ms))]
99 async fn sync_call(&self, method: &str, request: Bytes, timeout_ms: u64) -> Result<Bytes> {
100 let server = self.endpoint.select_one_server().await?;
101 let method_bytes = Bytes::copy_from_slice(method.as_bytes());
102 server.sync_call(method_bytes, request, timeout_ms).await
103 }
104
105 #[instrument(name = "client_notify_call", skip(self, request), fields(method = %method))]
106 async fn notify_call(&self, method: &str, request: Bytes) -> Result<()> {
107 let server = self.endpoint.select_one_server().await?;
108 let method_bytes = Bytes::copy_from_slice(method.as_bytes());
109 server.notify_call(method_bytes, request).await
110 }
111}
112
113pub struct BaseRpcClientBuilder {
128 address: Option<String>,
129 pool_config: Option<ConnectionPoolConfig>,
130 load_balance_enabled: bool,
131}
132
133impl BaseRpcClientBuilder {
134 pub fn new() -> Self {
135 Self {
136 address: None,
137 pool_config: None,
138 load_balance_enabled: true,
139 }
140 }
141
142 pub fn address(mut self, address: &str) -> Self {
148 self.address = Some(address.to_string());
149 self
150 }
151
152 pub fn pool_config(mut self, config: ConnectionPoolConfig) -> Self {
154 self.pool_config = Some(config);
155 self
156 }
157
158 pub fn for_gateway(mut self) -> Self {
160 self.pool_config = Some(ConnectionPoolConfig::for_gateway());
161 self
162 }
163
164 pub fn with_pool_size(mut self, min: usize, max: usize) -> Self {
166 self.pool_config = Some(ConnectionPoolConfig::with_pool_size(min, max));
167 self
168 }
169
170 pub fn load_balance(mut self, enabled: bool) -> Self {
172 self.load_balance_enabled = enabled;
173 self
174 }
175
176 pub async fn build(self) -> Result<BaseRpcClient> {
178 let address = self
179 .address
180 .ok_or_else(|| zus_common::ZusError::Protocol("Address is required".to_string()))?;
181
182 let endpoint = ZooLBRpcEndpoint::with_pool_config(&address, self.pool_config).await?;
183
184 Ok(BaseRpcClient {
185 endpoint,
186 load_balance_enabled: self.load_balance_enabled,
187 })
188 }
189}
190
191impl Default for BaseRpcClientBuilder {
192 fn default() -> Self {
193 Self::new()
194 }
195}