zus_rpc_client/
mobile_client.rs1use async_trait::async_trait;
6use bytes::Bytes;
7use parking_lot::Mutex;
8use std::sync::Arc;
9use tracing::instrument;
10
11use zus_common::{MobileRpcEndpoint, Result, ZusError};
12
/// Asynchronous RPC client interface for mobile connections.
///
/// The `Send + Sync` supertraits allow implementors to be shared between
/// threads and tasks.
#[async_trait]
pub trait MobileZusRpcClient: Send + Sync {
    /// Invokes `method` with the `request` payload and resolves to the returned
    /// bytes, or to an error.
    ///
    /// `timeout_ms` is presumably the reply deadline in milliseconds; how it is
    /// enforced is up to the implementor (TODO confirm).
    async fn sync_call(&self, method: &str, request: Bytes, timeout_ms: u64) -> Result<Bytes>;

    /// Invokes `method` with the `request` payload without returning any reply
    /// data; only success or failure is reported.
    async fn notify_call(&self, method: &str, request: Bytes) -> Result<()>;

    /// Sends a heartbeat. Presumably used to keep the connection alive; confirm
    /// against the endpoint implementation.
    async fn send_heartbeat(&self) -> Result<()>;
}
25
/// RPC client that spreads calls over one or more connected endpoints.
pub struct MobileRpcClient {
    /// Connected endpoints; each call is dispatched to one of them.
    endpoints: Vec<Arc<MobileRpcEndpoint>>,
    /// Mutex-guarded counter that decides which endpoint the next call uses.
    round_robin_index: Mutex<usize>,
}
39
40impl MobileRpcClient {
41 pub async fn new(path: &str) -> Result<Self> {
51 if !path.starts_with("tcp://") {
52 return Err(ZusError::Protocol(format!(
53 "Mobile client only supports tcp:// protocol, got: {}",
54 path
55 )));
56 }
57
58 let path = path.trim_start_matches("tcp://");
59 let addresses: Vec<&str> = path.split(',').collect();
60
61 let mut endpoints = Vec::new();
62 for addr in addresses {
63 let parts: Vec<&str> = addr.split(':').collect();
64 if parts.len() == 2 {
65 let host = parts[0].to_string();
66 let port = parts[1].parse::<u16>().unwrap_or(9528);
67 let endpoint = MobileRpcEndpoint::connect(host, port).await?;
68 endpoints.push(Arc::new(endpoint));
69 }
70 }
71
72 if endpoints.is_empty() {
73 return Err(ZusError::Protocol("No valid addresses provided".to_string()));
74 }
75
76 Ok(Self {
77 endpoints,
78 round_robin_index: Mutex::new(0),
79 })
80 }
81
82 pub async fn connect(host: &str, port: u16) -> Result<Self> {
84 let endpoint = MobileRpcEndpoint::connect(host.to_string(), port).await?;
85 Ok(Self {
86 endpoints: vec![Arc::new(endpoint)],
87 round_robin_index: Mutex::new(0),
88 })
89 }
90
91 pub fn builder() -> MobileRpcClientBuilder {
93 MobileRpcClientBuilder::new()
94 }
95
96 fn select_endpoint(&self) -> Arc<MobileRpcEndpoint> {
98 let mut index = self.round_robin_index.lock();
99 let endpoint = self.endpoints[*index % self.endpoints.len()].clone();
100 *index += 1;
101 endpoint
102 }
103}
104
105#[async_trait]
106impl MobileZusRpcClient for MobileRpcClient {
107 #[instrument(name = "mobile_client_sync_call", skip(self, request), fields(method = %method, timeout_ms = timeout_ms))]
108 async fn sync_call(&self, method: &str, request: Bytes, timeout_ms: u64) -> Result<Bytes> {
109 let endpoint = self.select_endpoint();
110 let method_bytes = Bytes::copy_from_slice(method.as_bytes());
111 endpoint.sync_call(method_bytes, request, timeout_ms).await
112 }
113
114 #[instrument(name = "mobile_client_notify_call", skip(self, request), fields(method = %method))]
115 async fn notify_call(&self, method: &str, request: Bytes) -> Result<()> {
116 let endpoint = self.select_endpoint();
117 let method_bytes = Bytes::copy_from_slice(method.as_bytes());
118 endpoint.notify_call(method_bytes, request).await
119 }
120
121 async fn send_heartbeat(&self) -> Result<()> {
122 let endpoint = self.select_endpoint();
123 endpoint.send_heartbeat().await
124 }
125}
126
/// Builder that collects server addresses and then connects to all of them to
/// produce a `MobileRpcClient`.
pub struct MobileRpcClientBuilder {
    /// `(host, port)` pairs, in the order they were added.
    addresses: Vec<(String, u16)>,
}
139
140impl MobileRpcClientBuilder {
141 pub fn new() -> Self {
142 Self { addresses: Vec::new() }
143 }
144
145 pub fn address(mut self, addr: &str) -> Self {
149 let addr = addr.trim_start_matches("tcp://");
150 let parts: Vec<&str> = addr.split(':').collect();
151 if parts.len() == 2 {
152 let host = parts[0].to_string();
153 let port = parts[1].parse::<u16>().unwrap_or(9528);
154 self.addresses.push((host, port));
155 }
156 self
157 }
158
159 pub fn addresses(mut self, addrs: &[&str]) -> Self {
161 for addr in addrs {
162 self = self.address(addr);
163 }
164 self
165 }
166
167 pub fn server(mut self, host: &str, port: u16) -> Self {
169 self.addresses.push((host.to_string(), port));
170 self
171 }
172
173 pub async fn build(self) -> Result<MobileRpcClient> {
175 if self.addresses.is_empty() {
176 return Err(ZusError::Protocol("No addresses configured".to_string()));
177 }
178
179 let mut endpoints = Vec::new();
180 for (host, port) in self.addresses {
181 let endpoint = MobileRpcEndpoint::connect(host, port).await?;
182 endpoints.push(Arc::new(endpoint));
183 }
184
185 Ok(MobileRpcClient {
186 endpoints,
187 round_robin_index: Mutex::new(0),
188 })
189 }
190}
191
192impl Default for MobileRpcClientBuilder {
193 fn default() -> Self {
194 Self::new()
195 }
196}