rocketmq_remoting/clients/
rocketmq_default_impl.rs1use std::collections::HashMap;
18use std::collections::HashSet;
19use std::sync::atomic::AtomicI32;
20use std::sync::Arc;
21use std::time::Duration;
22
23use cheetah_string::CheetahString;
24use rand::Rng;
25use rocketmq_runtime::RocketMQRuntime;
26use rocketmq_rust::ArcMut;
27use rocketmq_rust::WeakArcMut;
28use tokio::runtime::Handle;
29use tokio::sync::Mutex;
30use tokio::time;
31use tracing::debug;
32use tracing::error;
33use tracing::info;
34use tracing::warn;
35
36use crate::base::connection_net_event::ConnectionNetEvent;
37use crate::clients::Client;
38use crate::clients::RemotingClient;
39use crate::protocol::remoting_command::RemotingCommand;
40use crate::remoting::RemotingService;
41use crate::request_processor::default_request_processor::DefaultRemotingRequestProcessor;
42use crate::runtime::config::client_config::TokioClientConfig;
43use crate::runtime::processor::RequestProcessor;
44use crate::runtime::RPCHook;
45
46const LOCK_TIMEOUT_MILLIS: u64 = 3000;
47
48pub type ArcSyncClient = Arc<Mutex<Client>>;
49
50pub struct RocketmqDefaultClient<PR = DefaultRemotingRequestProcessor> {
51 tokio_client_config: Arc<TokioClientConfig>,
52 connection_tables: Arc<Mutex<HashMap<CheetahString , Client>>>,
54 namesrv_addr_list: ArcMut<Vec<CheetahString>>,
55 namesrv_addr_choosed: ArcMut<Option<CheetahString>>,
56 available_namesrv_addr_set: ArcMut<HashSet<CheetahString>>,
57 namesrv_index: Arc<AtomicI32>,
58 client_runtime: Option<RocketMQRuntime>,
59 processor: PR,
60 tx: Option<tokio::sync::broadcast::Sender<ConnectionNetEvent>>,
61}
62impl<PR: RequestProcessor + Sync + Clone + 'static> RocketmqDefaultClient<PR> {
63 pub fn new(tokio_client_config: Arc<TokioClientConfig>, processor: PR) -> Self {
64 Self::new_with_cl(tokio_client_config, processor, None)
65 }
66
67 pub fn new_with_cl(
68 tokio_client_config: Arc<TokioClientConfig>,
69 processor: PR,
70 tx: Option<tokio::sync::broadcast::Sender<ConnectionNetEvent>>,
71 ) -> Self {
72 Self {
73 tokio_client_config,
74 connection_tables: Arc::new(Mutex::new(Default::default())),
75 namesrv_addr_list: ArcMut::new(Default::default()),
76 namesrv_addr_choosed: ArcMut::new(Default::default()),
77 available_namesrv_addr_set: ArcMut::new(Default::default()),
78 namesrv_index: Arc::new(AtomicI32::new(init_value_index())),
79 client_runtime: Some(RocketMQRuntime::new_multi(10, "client-thread")),
80 processor,
81 tx,
82 }
83 }
84}
85
86impl<PR: RequestProcessor + Sync + Clone + 'static> RocketmqDefaultClient<PR> {
87 async fn get_and_create_nameserver_client(&self) -> Option<Client> {
88 let mut addr = self.namesrv_addr_choosed.as_ref().clone();
89 if let Some(ref addr) = addr {
90 let guard = self.connection_tables.lock().await;
91 let ct = guard.get(addr);
92 if let Some(ct) = ct {
93 let conn_status = ct.connection().ok;
94 if conn_status {
96 return Some(ct.clone());
97 }
98 }
99 }
100 let connection_tables = self.connection_tables.lock().await;
101
102 addr.clone_from(self.namesrv_addr_choosed.as_ref());
103 if let Some(addr) = addr.as_ref() {
104 let ct = connection_tables.get(addr);
105 if let Some(ct) = ct {
106 let conn_status = ct.connection().ok;
107 if conn_status {
109 return Some(ct.clone());
110 }
111 }
112 }
113 let addr_list = self.namesrv_addr_list.as_ref();
114 if !addr_list.is_empty() {
115 let index = self
116 .namesrv_index
117 .fetch_and(1, std::sync::atomic::Ordering::Release)
118 .abs();
119 let index = index as usize % addr_list.len();
120 let new_addr = &addr_list[index];
121 info!(
122 "new name remoting_server is chosen. OLD: {} , NEW: {}. namesrvIndex = {}",
123 new_addr, new_addr, index
124 );
125 self.namesrv_addr_choosed
126 .mut_from_ref()
127 .replace(new_addr.clone());
128 drop(connection_tables);
129 return self
130 .create_client(
131 new_addr,
132 Duration::from_millis(self.tokio_client_config.connect_timeout_millis as u64),
134 )
135 .await;
136 }
137 None
138 }
139
140 async fn get_and_create_client(&self, addr: Option<&CheetahString>) -> Option<Client> {
141 match addr {
142 None => self.get_and_create_nameserver_client().await,
143 Some(addr) => {
144 if addr.is_empty() {
145 return self.get_and_create_nameserver_client().await;
146 }
147 let client = self.connection_tables.lock().await.get(addr).cloned();
148 if client.is_some() && client.as_ref()?.connection().ok {
150 return client;
151 }
152 self.create_client(
153 addr,
154 Duration::from_millis(self.tokio_client_config.connect_timeout_millis as u64),
155 )
156 .await
157 }
158 }
159 }
160
161 async fn create_client(&self, addr: &CheetahString, duration: Duration) -> Option<Client> {
162 let mut connection_tables = self.connection_tables.lock().await;
163 let cw = connection_tables.get(addr);
164 if let Some(cw) = cw {
165 if cw.connection().ok {
167 return Some(cw.clone());
168 }
169 }
170
171 let cw = connection_tables.get(addr.as_str());
172 if let Some(cw) = cw {
173 if cw.connection().ok {
174 return Some(cw.clone());
176 }
177 } else {
178 let _ = connection_tables.remove(addr.as_str());
179 }
180
181 let addr_inner = addr.to_string();
182
183 match time::timeout(duration, async {
184 Client::connect(addr_inner, self.processor.clone(), self.tx.as_ref()).await
185 })
186 .await
187 {
188 Ok(client_inner) => match client_inner {
189 Ok(client_r) => {
190 let client = client_r;
192 connection_tables.insert(addr.clone(), client.clone());
193 Some(client)
194 }
195 Err(_) => {
196 error!("getAndCreateClient connect to {} failed", addr);
197 None
198 }
199 },
200 Err(_) => {
201 error!("getAndCreateClient connect to {} failed", addr);
202 None
203 }
204 }
205 }
206
207 async fn scan_available_name_srv(&self) {
208 if self.namesrv_addr_list.as_ref().is_empty() {
209 debug!("scanAvailableNameSrv addresses of name remoting_server is null!");
210 return;
211 }
212 for address in self.available_namesrv_addr_set.as_ref().iter() {
213 if !self.namesrv_addr_list.as_ref().contains(address) {
214 warn!("scanAvailableNameSrv remove invalid address {}", address);
215 self.available_namesrv_addr_set
216 .mut_from_ref()
217 .remove(address);
218 }
219 }
220 for namesrv_addr in self.namesrv_addr_list.as_ref().iter() {
221 let client = self.get_and_create_client(Some(namesrv_addr)).await;
222 match client {
223 None => {
224 self.available_namesrv_addr_set
225 .mut_from_ref()
226 .remove(namesrv_addr);
227 }
228 Some(_) => {
229 self.available_namesrv_addr_set
230 .mut_from_ref()
231 .insert(namesrv_addr.clone());
232 }
233 }
234 }
235 }
236}
237
238#[allow(unused_variables)]
239impl<PR: RequestProcessor + Sync + Clone + 'static> RemotingService for RocketmqDefaultClient<PR> {
240 async fn start(&self, this: WeakArcMut<Self>) {
241 if let Some(client) = this.upgrade() {
242 let connect_timeout_millis = self.tokio_client_config.connect_timeout_millis as u64;
243 self.client_runtime
244 .as_ref()
245 .unwrap()
246 .get_handle()
247 .spawn(async move {
248 loop {
249 client.scan_available_name_srv().await;
250 time::sleep(Duration::from_millis(connect_timeout_millis)).await;
251 }
252 });
253 }
254 }
255
256 fn shutdown(&mut self) {
257 if let Some(rt) = self.client_runtime.take() {
258 rt.shutdown();
259 }
260 let connection_tables = self.connection_tables.clone();
261 tokio::task::block_in_place(move || {
262 Handle::current().block_on(async move {
263 connection_tables.lock().await.clear();
264 });
265 });
266 self.namesrv_addr_list.clear();
267 self.available_namesrv_addr_set.clear();
268
269 info!(">>>>>>>>>>>>>>>RemotingClient shutdown success<<<<<<<<<<<<<<<<<");
270 }
271
272 fn register_rpc_hook(&mut self, hook: Arc<Box<dyn RPCHook>>) {
273 todo!()
274 }
275
276 fn clear_rpc_hook(&mut self) {
277 todo!()
278 }
279}
280
281#[allow(unused_variables)]
282impl<PR: RequestProcessor + Sync + Clone + 'static> RemotingClient for RocketmqDefaultClient<PR> {
283 async fn update_name_server_address_list(&self, addrs: Vec<CheetahString>) {
284 let old = self.namesrv_addr_list.mut_from_ref();
285 let mut update = false;
286
287 if !addrs.is_empty() {
288 if old.is_empty() || addrs.len() != old.len() {
289 update = true;
290 } else {
291 for addr in &addrs {
292 if !old.contains(addr) {
293 update = true;
294 break;
295 }
296 }
297 }
298
299 if update {
300 info!(
305 "name remoting_server address updated. NEW : {:?} , OLD: {:?}",
306 addrs, old
307 );
308 self.namesrv_addr_list.mut_from_ref().extend(addrs.clone());
311
312 if let Some(namesrv_addr) = self.namesrv_addr_choosed.as_ref() {
314 if !addrs.contains(namesrv_addr) {
315 let mut remove_vec = Vec::new();
316 let mut result = self.connection_tables.lock().await;
317 for (addr, client) in result.iter() {
318 if addr == namesrv_addr {
319 remove_vec.push(addr.clone());
320 }
321 }
322 for addr in &remove_vec {
323 result.remove(addr);
324 }
325 }
326 }
327 }
328 }
329 }
330
331 fn get_name_server_address_list(&self) -> &[CheetahString] {
332 self.namesrv_addr_list.as_ref()
333 }
334
335 fn get_available_name_srv_list(&self) -> Vec<CheetahString> {
336 self.available_namesrv_addr_set
337 .as_ref()
338 .clone()
339 .into_iter()
340 .collect()
341 }
342
343 async fn invoke_async(
344 &self,
345 addr: Option<&CheetahString>,
346 request: RemotingCommand,
347 timeout_millis: u64,
348 ) -> rocketmq_error::RocketMQResult<RemotingCommand> {
349 let client = self.get_and_create_client(addr).await;
350 match client {
351 None => Err(rocketmq_error::RocketmqError::RemoteError(
352 "get client failed".to_string(),
353 )),
354 Some(mut client) => {
355 match self
356 .client_runtime
357 .as_ref()
358 .unwrap()
359 .get_handle()
360 .spawn(async move {
361 time::timeout(Duration::from_millis(timeout_millis), async move {
362 client.send_read(request, timeout_millis).await
363 })
364 .await
365 })
366 .await
367 {
368 Ok(result) => match result {
369 Ok(response) => match response {
370 Ok(value) => Ok(value),
371 Err(e) => {
372 Err(rocketmq_error::RocketmqError::RemoteError(e.to_string()))
373 }
374 },
375 Err(err) => {
376 Err(rocketmq_error::RocketmqError::RemoteError(err.to_string()))
377 }
378 },
379 Err(err) => Err(rocketmq_error::RocketmqError::RemoteError(err.to_string())),
380 }
381 }
382 }
383 }
384
385 async fn invoke_oneway(
386 &self,
387 addr: &CheetahString,
388 request: RemotingCommand,
389 timeout_millis: u64,
390 ) {
391 let client = self.get_and_create_client(Some(addr)).await;
392 match client {
393 None => {
394 error!("get client failed");
395 }
396 Some(mut client) => {
397 self.client_runtime
398 .as_ref()
399 .unwrap()
400 .get_handle()
401 .spawn(async move {
402 match time::timeout(Duration::from_millis(timeout_millis), async move {
403 let mut request = request;
404 request.mark_oneway_rpc_ref();
405 client.send(request).await
406 })
407 .await
408 {
409 Ok(_) => Ok(()),
410 Err(err) => {
411 Err(rocketmq_error::RocketmqError::RemoteError(err.to_string()))
412 }
413 }
414 });
415 }
416 }
417 }
418
419 fn is_address_reachable(&mut self, addr: &CheetahString) {
420 todo!()
421 }
422
423 fn close_clients(&mut self, addrs: Vec<String>) {
424 todo!()
425 }
426
427 fn register_processor(&mut self, processor: impl RequestProcessor + Sync) {
428 todo!()
429 }
430}
431
432fn init_value_index() -> i32 {
433 let mut rng = rand::rng();
434 rng.random_range(0..999)
435}