rocketmq_remoting/clients/
rocketmq_default_impl.rs

1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17use std::collections::HashMap;
18use std::collections::HashSet;
19use std::sync::atomic::AtomicI32;
20use std::sync::Arc;
21use std::time::Duration;
22
23use cheetah_string::CheetahString;
24use rand::Rng;
25use rocketmq_runtime::RocketMQRuntime;
26use rocketmq_rust::ArcMut;
27use rocketmq_rust::WeakArcMut;
28use tokio::runtime::Handle;
29use tokio::sync::Mutex;
30use tokio::time;
31use tracing::debug;
32use tracing::error;
33use tracing::info;
34use tracing::warn;
35
36use crate::base::connection_net_event::ConnectionNetEvent;
37use crate::clients::Client;
38use crate::clients::RemotingClient;
39use crate::protocol::remoting_command::RemotingCommand;
40use crate::remoting::RemotingService;
41use crate::request_processor::default_request_processor::DefaultRemotingRequestProcessor;
42use crate::runtime::config::client_config::TokioClientConfig;
43use crate::runtime::processor::RequestProcessor;
44use crate::runtime::RPCHook;
45
46const LOCK_TIMEOUT_MILLIS: u64 = 3000;
47
48pub type ArcSyncClient = Arc<Mutex<Client>>;
49
50pub struct RocketmqDefaultClient<PR = DefaultRemotingRequestProcessor> {
51    tokio_client_config: Arc<TokioClientConfig>,
52    //cache connection
53    connection_tables: Arc<Mutex<HashMap<CheetahString /* ip:port */, Client>>>,
54    namesrv_addr_list: ArcMut<Vec<CheetahString>>,
55    namesrv_addr_choosed: ArcMut<Option<CheetahString>>,
56    available_namesrv_addr_set: ArcMut<HashSet<CheetahString>>,
57    namesrv_index: Arc<AtomicI32>,
58    client_runtime: Option<RocketMQRuntime>,
59    processor: PR,
60    tx: Option<tokio::sync::broadcast::Sender<ConnectionNetEvent>>,
61}
62impl<PR: RequestProcessor + Sync + Clone + 'static> RocketmqDefaultClient<PR> {
63    pub fn new(tokio_client_config: Arc<TokioClientConfig>, processor: PR) -> Self {
64        Self::new_with_cl(tokio_client_config, processor, None)
65    }
66
67    pub fn new_with_cl(
68        tokio_client_config: Arc<TokioClientConfig>,
69        processor: PR,
70        tx: Option<tokio::sync::broadcast::Sender<ConnectionNetEvent>>,
71    ) -> Self {
72        Self {
73            tokio_client_config,
74            connection_tables: Arc::new(Mutex::new(Default::default())),
75            namesrv_addr_list: ArcMut::new(Default::default()),
76            namesrv_addr_choosed: ArcMut::new(Default::default()),
77            available_namesrv_addr_set: ArcMut::new(Default::default()),
78            namesrv_index: Arc::new(AtomicI32::new(init_value_index())),
79            client_runtime: Some(RocketMQRuntime::new_multi(10, "client-thread")),
80            processor,
81            tx,
82        }
83    }
84}
85
86impl<PR: RequestProcessor + Sync + Clone + 'static> RocketmqDefaultClient<PR> {
87    async fn get_and_create_nameserver_client(&self) -> Option<Client> {
88        let mut addr = self.namesrv_addr_choosed.as_ref().clone();
89        if let Some(ref addr) = addr {
90            let guard = self.connection_tables.lock().await;
91            let ct = guard.get(addr);
92            if let Some(ct) = ct {
93                let conn_status = ct.connection().ok;
94                //let conn_status = ct.lock().await.connection().ok;
95                if conn_status {
96                    return Some(ct.clone());
97                }
98            }
99        }
100        let connection_tables = self.connection_tables.lock().await;
101
102        addr.clone_from(self.namesrv_addr_choosed.as_ref());
103        if let Some(addr) = addr.as_ref() {
104            let ct = connection_tables.get(addr);
105            if let Some(ct) = ct {
106                let conn_status = ct.connection().ok;
107                //let conn_status = ct.lock().await.connection().ok;
108                if conn_status {
109                    return Some(ct.clone());
110                }
111            }
112        }
113        let addr_list = self.namesrv_addr_list.as_ref();
114        if !addr_list.is_empty() {
115            let index = self
116                .namesrv_index
117                .fetch_and(1, std::sync::atomic::Ordering::Release)
118                .abs();
119            let index = index as usize % addr_list.len();
120            let new_addr = &addr_list[index];
121            info!(
122                "new name remoting_server is chosen. OLD: {} , NEW: {}. namesrvIndex = {}",
123                new_addr, new_addr, index
124            );
125            self.namesrv_addr_choosed
126                .mut_from_ref()
127                .replace(new_addr.clone());
128            drop(connection_tables);
129            return self
130                .create_client(
131                    new_addr,
132                    //&mut connection_tables,
133                    Duration::from_millis(self.tokio_client_config.connect_timeout_millis as u64),
134                )
135                .await;
136        }
137        None
138    }
139
140    async fn get_and_create_client(&self, addr: Option<&CheetahString>) -> Option<Client> {
141        match addr {
142            None => self.get_and_create_nameserver_client().await,
143            Some(addr) => {
144                if addr.is_empty() {
145                    return self.get_and_create_nameserver_client().await;
146                }
147                let client = self.connection_tables.lock().await.get(addr).cloned();
148                // if client.is_some() && client.as_ref()?.lock().await.connection().ok {
149                if client.is_some() && client.as_ref()?.connection().ok {
150                    return client;
151                }
152                self.create_client(
153                    addr,
154                    Duration::from_millis(self.tokio_client_config.connect_timeout_millis as u64),
155                )
156                .await
157            }
158        }
159    }
160
161    async fn create_client(&self, addr: &CheetahString, duration: Duration) -> Option<Client> {
162        let mut connection_tables = self.connection_tables.lock().await;
163        let cw = connection_tables.get(addr);
164        if let Some(cw) = cw {
165            // if cw.lock().await.connection().ok {
166            if cw.connection().ok {
167                return Some(cw.clone());
168            }
169        }
170
171        let cw = connection_tables.get(addr.as_str());
172        if let Some(cw) = cw {
173            if cw.connection().ok {
174                // if cw.lock().await.connection().ok {
175                return Some(cw.clone());
176            }
177        } else {
178            let _ = connection_tables.remove(addr.as_str());
179        }
180
181        let addr_inner = addr.to_string();
182
183        match time::timeout(duration, async {
184            Client::connect(addr_inner, self.processor.clone(), self.tx.as_ref()).await
185        })
186        .await
187        {
188            Ok(client_inner) => match client_inner {
189                Ok(client_r) => {
190                    //let client = Arc::new(Mutex::new(client_r));
191                    let client = client_r;
192                    connection_tables.insert(addr.clone(), client.clone());
193                    Some(client)
194                }
195                Err(_) => {
196                    error!("getAndCreateClient connect to {} failed", addr);
197                    None
198                }
199            },
200            Err(_) => {
201                error!("getAndCreateClient connect to {} failed", addr);
202                None
203            }
204        }
205    }
206
207    async fn scan_available_name_srv(&self) {
208        if self.namesrv_addr_list.as_ref().is_empty() {
209            debug!("scanAvailableNameSrv addresses of name remoting_server is null!");
210            return;
211        }
212        for address in self.available_namesrv_addr_set.as_ref().iter() {
213            if !self.namesrv_addr_list.as_ref().contains(address) {
214                warn!("scanAvailableNameSrv remove invalid address {}", address);
215                self.available_namesrv_addr_set
216                    .mut_from_ref()
217                    .remove(address);
218            }
219        }
220        for namesrv_addr in self.namesrv_addr_list.as_ref().iter() {
221            let client = self.get_and_create_client(Some(namesrv_addr)).await;
222            match client {
223                None => {
224                    self.available_namesrv_addr_set
225                        .mut_from_ref()
226                        .remove(namesrv_addr);
227                }
228                Some(_) => {
229                    self.available_namesrv_addr_set
230                        .mut_from_ref()
231                        .insert(namesrv_addr.clone());
232                }
233            }
234        }
235    }
236}
237
238#[allow(unused_variables)]
239impl<PR: RequestProcessor + Sync + Clone + 'static> RemotingService for RocketmqDefaultClient<PR> {
240    async fn start(&self, this: WeakArcMut<Self>) {
241        if let Some(client) = this.upgrade() {
242            let connect_timeout_millis = self.tokio_client_config.connect_timeout_millis as u64;
243            self.client_runtime
244                .as_ref()
245                .unwrap()
246                .get_handle()
247                .spawn(async move {
248                    loop {
249                        client.scan_available_name_srv().await;
250                        time::sleep(Duration::from_millis(connect_timeout_millis)).await;
251                    }
252                });
253        }
254    }
255
256    fn shutdown(&mut self) {
257        if let Some(rt) = self.client_runtime.take() {
258            rt.shutdown();
259        }
260        let connection_tables = self.connection_tables.clone();
261        tokio::task::block_in_place(move || {
262            Handle::current().block_on(async move {
263                connection_tables.lock().await.clear();
264            });
265        });
266        self.namesrv_addr_list.clear();
267        self.available_namesrv_addr_set.clear();
268
269        info!(">>>>>>>>>>>>>>>RemotingClient shutdown success<<<<<<<<<<<<<<<<<");
270    }
271
272    fn register_rpc_hook(&mut self, hook: Arc<Box<dyn RPCHook>>) {
273        todo!()
274    }
275
276    fn clear_rpc_hook(&mut self) {
277        todo!()
278    }
279}
280
281#[allow(unused_variables)]
282impl<PR: RequestProcessor + Sync + Clone + 'static> RemotingClient for RocketmqDefaultClient<PR> {
283    async fn update_name_server_address_list(&self, addrs: Vec<CheetahString>) {
284        let old = self.namesrv_addr_list.mut_from_ref();
285        let mut update = false;
286
287        if !addrs.is_empty() {
288            if old.is_empty() || addrs.len() != old.len() {
289                update = true;
290            } else {
291                for addr in &addrs {
292                    if !old.contains(addr) {
293                        update = true;
294                        break;
295                    }
296                }
297            }
298
299            if update {
300                // Shuffle the addresses
301                // Shuffle logic is not implemented here as it is not available in standard library
302                // You can implement it using various algorithms like Fisher-Yates shuffle
303
304                info!(
305                    "name remoting_server address updated. NEW : {:?} , OLD: {:?}",
306                    addrs, old
307                );
308                /* let mut rng = thread_rng();
309                addrs.shuffle(&mut rng);*/
310                self.namesrv_addr_list.mut_from_ref().extend(addrs.clone());
311
312                // should close the channel if choosed addr is not exist.
313                if let Some(namesrv_addr) = self.namesrv_addr_choosed.as_ref() {
314                    if !addrs.contains(namesrv_addr) {
315                        let mut remove_vec = Vec::new();
316                        let mut result = self.connection_tables.lock().await;
317                        for (addr, client) in result.iter() {
318                            if addr == namesrv_addr {
319                                remove_vec.push(addr.clone());
320                            }
321                        }
322                        for addr in &remove_vec {
323                            result.remove(addr);
324                        }
325                    }
326                }
327            }
328        }
329    }
330
331    fn get_name_server_address_list(&self) -> &[CheetahString] {
332        self.namesrv_addr_list.as_ref()
333    }
334
335    fn get_available_name_srv_list(&self) -> Vec<CheetahString> {
336        self.available_namesrv_addr_set
337            .as_ref()
338            .clone()
339            .into_iter()
340            .collect()
341    }
342
343    async fn invoke_async(
344        &self,
345        addr: Option<&CheetahString>,
346        request: RemotingCommand,
347        timeout_millis: u64,
348    ) -> rocketmq_error::RocketMQResult<RemotingCommand> {
349        let client = self.get_and_create_client(addr).await;
350        match client {
351            None => Err(rocketmq_error::RocketmqError::RemoteError(
352                "get client failed".to_string(),
353            )),
354            Some(mut client) => {
355                match self
356                    .client_runtime
357                    .as_ref()
358                    .unwrap()
359                    .get_handle()
360                    .spawn(async move {
361                        time::timeout(Duration::from_millis(timeout_millis), async move {
362                            client.send_read(request, timeout_millis).await
363                        })
364                        .await
365                    })
366                    .await
367                {
368                    Ok(result) => match result {
369                        Ok(response) => match response {
370                            Ok(value) => Ok(value),
371                            Err(e) => {
372                                Err(rocketmq_error::RocketmqError::RemoteError(e.to_string()))
373                            }
374                        },
375                        Err(err) => {
376                            Err(rocketmq_error::RocketmqError::RemoteError(err.to_string()))
377                        }
378                    },
379                    Err(err) => Err(rocketmq_error::RocketmqError::RemoteError(err.to_string())),
380                }
381            }
382        }
383    }
384
385    async fn invoke_oneway(
386        &self,
387        addr: &CheetahString,
388        request: RemotingCommand,
389        timeout_millis: u64,
390    ) {
391        let client = self.get_and_create_client(Some(addr)).await;
392        match client {
393            None => {
394                error!("get client failed");
395            }
396            Some(mut client) => {
397                self.client_runtime
398                    .as_ref()
399                    .unwrap()
400                    .get_handle()
401                    .spawn(async move {
402                        match time::timeout(Duration::from_millis(timeout_millis), async move {
403                            let mut request = request;
404                            request.mark_oneway_rpc_ref();
405                            client.send(request).await
406                        })
407                        .await
408                        {
409                            Ok(_) => Ok(()),
410                            Err(err) => {
411                                Err(rocketmq_error::RocketmqError::RemoteError(err.to_string()))
412                            }
413                        }
414                    });
415            }
416        }
417    }
418
419    fn is_address_reachable(&mut self, addr: &CheetahString) {
420        todo!()
421    }
422
423    fn close_clients(&mut self, addrs: Vec<String>) {
424        todo!()
425    }
426
427    fn register_processor(&mut self, processor: impl RequestProcessor + Sync) {
428        todo!()
429    }
430}
431
432fn init_value_index() -> i32 {
433    let mut rng = rand::rng();
434    rng.random_range(0..999)
435}