zus_rpc_client/
mobile_client.rs

1//! Mobile RPC Client
2//!
3//! High-level client for mobile protocol (16-byte header).
4
5use async_trait::async_trait;
6use bytes::Bytes;
7use parking_lot::Mutex;
8use std::sync::Arc;
9use tracing::instrument;
10
11use zus_common::{MobileRpcEndpoint, Result, ZusError};
12
13/// Mobile RPC Client trait
14#[async_trait]
15pub trait MobileZusRpcClient: Send + Sync {
16  /// Synchronous RPC call
17  async fn sync_call(&self, method: &str, request: Bytes, timeout_ms: u64) -> Result<Bytes>;
18
19  /// Asynchronous RPC call (fire and forget)
20  async fn notify_call(&self, method: &str, request: Bytes) -> Result<()>;
21
22  /// Send heartbeat
23  async fn send_heartbeat(&self) -> Result<()>;
24}
25
26/// Mobile RPC Client implementation
27///
28/// Similar to BaseRpcClient but uses mobile 16-byte protocol.
29///
30/// # Example
31/// ```ignore
32/// let client = MobileRpcClient::new("tcp://localhost:9528").await?;
33/// let response = client.sync_call("MyService.echo", params, 5000).await?;
34/// ```
35pub struct MobileRpcClient {
36  endpoints: Vec<Arc<MobileRpcEndpoint>>,
37  round_robin_index: Mutex<usize>,
38}
39
40impl MobileRpcClient {
41  /// Create client connecting to one or more servers
42  ///
43  /// # Arguments
44  /// * `path` - Server path in format "tcp://host1:port1,host2:port2"
45  ///
46  /// # Example
47  /// ```ignore
48  /// let client = MobileRpcClient::new("tcp://localhost:9528").await?;
49  /// ```
50  pub async fn new(path: &str) -> Result<Self> {
51    if !path.starts_with("tcp://") {
52      return Err(ZusError::Protocol(format!(
53        "Mobile client only supports tcp:// protocol, got: {}",
54        path
55      )));
56    }
57
58    let path = path.trim_start_matches("tcp://");
59    let addresses: Vec<&str> = path.split(',').collect();
60
61    let mut endpoints = Vec::new();
62    for addr in addresses {
63      let parts: Vec<&str> = addr.split(':').collect();
64      if parts.len() == 2 {
65        let host = parts[0].to_string();
66        let port = parts[1].parse::<u16>().unwrap_or(9528);
67        let endpoint = MobileRpcEndpoint::connect(host, port).await?;
68        endpoints.push(Arc::new(endpoint));
69      }
70    }
71
72    if endpoints.is_empty() {
73      return Err(ZusError::Protocol("No valid addresses provided".to_string()));
74    }
75
76    Ok(Self {
77      endpoints,
78      round_robin_index: Mutex::new(0),
79    })
80  }
81
82  /// Create client connecting to a single server
83  pub async fn connect(host: &str, port: u16) -> Result<Self> {
84    let endpoint = MobileRpcEndpoint::connect(host.to_string(), port).await?;
85    Ok(Self {
86      endpoints: vec![Arc::new(endpoint)],
87      round_robin_index: Mutex::new(0),
88    })
89  }
90
91  /// Create a builder for more advanced configuration
92  pub fn builder() -> MobileRpcClientBuilder {
93    MobileRpcClientBuilder::new()
94  }
95
96  /// Select endpoint (round-robin)
97  fn select_endpoint(&self) -> Arc<MobileRpcEndpoint> {
98    let mut index = self.round_robin_index.lock();
99    let endpoint = self.endpoints[*index % self.endpoints.len()].clone();
100    *index += 1;
101    endpoint
102  }
103}
104
105#[async_trait]
106impl MobileZusRpcClient for MobileRpcClient {
107  #[instrument(name = "mobile_client_sync_call", skip(self, request), fields(method = %method, timeout_ms = timeout_ms))]
108  async fn sync_call(&self, method: &str, request: Bytes, timeout_ms: u64) -> Result<Bytes> {
109    let endpoint = self.select_endpoint();
110    let method_bytes = Bytes::copy_from_slice(method.as_bytes());
111    endpoint.sync_call(method_bytes, request, timeout_ms).await
112  }
113
114  #[instrument(name = "mobile_client_notify_call", skip(self, request), fields(method = %method))]
115  async fn notify_call(&self, method: &str, request: Bytes) -> Result<()> {
116    let endpoint = self.select_endpoint();
117    let method_bytes = Bytes::copy_from_slice(method.as_bytes());
118    endpoint.notify_call(method_bytes, request).await
119  }
120
121  async fn send_heartbeat(&self) -> Result<()> {
122    let endpoint = self.select_endpoint();
123    endpoint.send_heartbeat().await
124  }
125}
126
/// Builder for [`MobileRpcClient`].
///
/// Collects server addresses first; connections are only opened when
/// `build` is awaited.
///
/// # Example
/// ```ignore
/// let client = MobileRpcClient::builder()
///     .address("tcp://localhost:9528")
///     .build()
///     .await?;
/// ```
pub struct MobileRpcClientBuilder {
  // `(host, port)` pairs in the order they were added.
  addresses: Vec<(String, u16)>,
}
139
140impl MobileRpcClientBuilder {
141  pub fn new() -> Self {
142    Self { addresses: Vec::new() }
143  }
144
145  /// Add server address
146  ///
147  /// Supports format: "tcp://host:port" or just "host:port"
148  pub fn address(mut self, addr: &str) -> Self {
149    let addr = addr.trim_start_matches("tcp://");
150    let parts: Vec<&str> = addr.split(':').collect();
151    if parts.len() == 2 {
152      let host = parts[0].to_string();
153      let port = parts[1].parse::<u16>().unwrap_or(9528);
154      self.addresses.push((host, port));
155    }
156    self
157  }
158
159  /// Add multiple servers
160  pub fn addresses(mut self, addrs: &[&str]) -> Self {
161    for addr in addrs {
162      self = self.address(addr);
163    }
164    self
165  }
166
167  /// Add server by host and port
168  pub fn server(mut self, host: &str, port: u16) -> Self {
169    self.addresses.push((host.to_string(), port));
170    self
171  }
172
173  /// Build the client
174  pub async fn build(self) -> Result<MobileRpcClient> {
175    if self.addresses.is_empty() {
176      return Err(ZusError::Protocol("No addresses configured".to_string()));
177    }
178
179    let mut endpoints = Vec::new();
180    for (host, port) in self.addresses {
181      let endpoint = MobileRpcEndpoint::connect(host, port).await?;
182      endpoints.push(Arc::new(endpoint));
183    }
184
185    Ok(MobileRpcClient {
186      endpoints,
187      round_robin_index: Mutex::new(0),
188    })
189  }
190}
191
192impl Default for MobileRpcClientBuilder {
193  fn default() -> Self {
194    Self::new()
195  }
196}