zus_rpc_client/
client.rs

1//! ZUS RPC Client with Connection Pooling Support
2//!
3//! Provides high-level RPC client with optional connection pooling
4//! for high-throughput scenarios like gateways.
5
6use {async_trait::async_trait, bytes::Bytes, tracing::instrument};
7
8use zus_common::{ConnectionPoolConfig, Result};
9
10use crate::endpoint::ZooLBRpcEndpoint;
11
12/// Abstract ZUS RPC Client (replacing Java's AbstractZusRpcClient)
13#[async_trait]
14pub trait ZusRpcClient: Send + Sync {
15  /// Synchronous RPC call
16  async fn sync_call(&self, method: &str, request: Bytes, timeout_ms: u64) -> Result<Bytes>;
17
18  /// Asynchronous RPC call (fire and forget)
19  async fn notify_call(&self, method: &str, request: Bytes) -> Result<()>;
20}
21
22/// Base RPC Client implementation
23pub struct BaseRpcClient {
24  endpoint: ZooLBRpcEndpoint,
25  #[allow(dead_code)]
26  load_balance_enabled: bool,
27}
28
29impl BaseRpcClient {
30  /// Create client with default configuration (no pooling)
31  ///
32  /// # Example
33  /// ```ignore
34  /// let client = BaseRpcClient::new("zns://zoo1:2181/services/myservice").await?;
35  /// ```
36  pub async fn new(server_path: &str) -> Result<Self> {
37    let endpoint = ZooLBRpcEndpoint::new(server_path).await?;
38
39    Ok(Self {
40      endpoint,
41      load_balance_enabled: true,
42    })
43  }
44
45  /// Create client with connection pooling enabled
46  ///
47  /// # Example
48  /// ```ignore
49  /// let client = BaseRpcClient::new_with_pooling(
50  ///     "zns://zoo1:2181/services/myservice",
51  ///     ConnectionPoolConfig::for_gateway()
52  /// ).await?;
53  /// ```
54  pub async fn new_with_pooling(server_path: &str, pool_config: ConnectionPoolConfig) -> Result<Self> {
55    let endpoint = ZooLBRpcEndpoint::new_with_pooling(server_path, pool_config).await?;
56
57    Ok(Self {
58      endpoint,
59      load_balance_enabled: true,
60    })
61  }
62
63  /// Create a builder for more advanced configuration
64  pub fn builder() -> BaseRpcClientBuilder {
65    BaseRpcClientBuilder::new()
66  }
67
68  /// Create client optimized for gateway scenarios
69  ///
70  /// Uses connection pooling with sensible defaults for high-throughput:
71  /// - min_connections: 2
72  /// - max_connections: 10
73  ///
74  /// # Example
75  /// ```ignore
76  /// let client = BaseRpcClient::for_gateway("zns://zoo1:2181/services/backend").await?;
77  /// ```
78  pub async fn for_gateway(server_path: &str) -> Result<Self> {
79    Self::new_with_pooling(server_path, ConnectionPoolConfig::for_gateway()).await
80  }
81
82  pub fn enable_load_balance(&mut self) {
83    self.load_balance_enabled = true;
84  }
85
86  pub fn disable_load_balance(&mut self) {
87    self.load_balance_enabled = false;
88  }
89
90  /// Check if connection pooling is enabled
91  pub fn is_pooling_enabled(&self) -> bool {
92    self.endpoint.is_pooling_enabled()
93  }
94}
95
96#[async_trait]
97impl ZusRpcClient for BaseRpcClient {
98  #[instrument(name = "client_sync_call", skip(self, request), fields(method = %method, timeout_ms = timeout_ms))]
99  async fn sync_call(&self, method: &str, request: Bytes, timeout_ms: u64) -> Result<Bytes> {
100    let server = self.endpoint.select_one_server().await?;
101    let method_bytes = Bytes::copy_from_slice(method.as_bytes());
102    server.sync_call(method_bytes, request, timeout_ms).await
103  }
104
105  #[instrument(name = "client_notify_call", skip(self, request), fields(method = %method))]
106  async fn notify_call(&self, method: &str, request: Bytes) -> Result<()> {
107    let server = self.endpoint.select_one_server().await?;
108    let method_bytes = Bytes::copy_from_slice(method.as_bytes());
109    server.notify_call(method_bytes, request).await
110  }
111}
112
113/// Builder for BaseRpcClient with advanced configuration
114///
115/// # Example
116/// ```ignore
117/// let client = BaseRpcClient::builder()
118///     .address("zns://zoo1:2181/services/myservice")
119///     .pool_config(ConnectionPoolConfig {
120///         min_connections: 4,
121///         max_connections: 20,
122///         ..Default::default()
123///     })
124///     .build()
125///     .await?;
126/// ```
127pub struct BaseRpcClientBuilder {
128  address: Option<String>,
129  pool_config: Option<ConnectionPoolConfig>,
130  load_balance_enabled: bool,
131}
132
133impl BaseRpcClientBuilder {
134  pub fn new() -> Self {
135    Self {
136      address: None,
137      pool_config: None,
138      load_balance_enabled: true,
139    }
140  }
141
142  /// Set the server address (required)
143  ///
144  /// Supports:
145  /// - `zns://host:port/service/path` for service discovery
146  /// - `tcp://host:port` for direct connection
147  pub fn address(mut self, address: &str) -> Self {
148    self.address = Some(address.to_string());
149    self
150  }
151
152  /// Enable connection pooling with the specified configuration
153  pub fn pool_config(mut self, config: ConnectionPoolConfig) -> Self {
154    self.pool_config = Some(config);
155    self
156  }
157
158  /// Use gateway-optimized pooling configuration
159  pub fn for_gateway(mut self) -> Self {
160    self.pool_config = Some(ConnectionPoolConfig::for_gateway());
161    self
162  }
163
164  /// Use custom pool size
165  pub fn with_pool_size(mut self, min: usize, max: usize) -> Self {
166    self.pool_config = Some(ConnectionPoolConfig::with_pool_size(min, max));
167    self
168  }
169
170  /// Enable/disable load balancing (default: enabled)
171  pub fn load_balance(mut self, enabled: bool) -> Self {
172    self.load_balance_enabled = enabled;
173    self
174  }
175
176  /// Build the client
177  pub async fn build(self) -> Result<BaseRpcClient> {
178    let address = self
179      .address
180      .ok_or_else(|| zus_common::ZusError::Protocol("Address is required".to_string()))?;
181
182    let endpoint = ZooLBRpcEndpoint::with_pool_config(&address, self.pool_config).await?;
183
184    Ok(BaseRpcClient {
185      endpoint,
186      load_balance_enabled: self.load_balance_enabled,
187    })
188  }
189}
190
191impl Default for BaseRpcClientBuilder {
192  fn default() -> Self {
193    Self::new()
194  }
195}