Skip to main content

rocketmq_client_rust/implementation/
mq_client_manager.rs

// Copyright 2023 The RocketMQ Rust Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::sync::atomic::AtomicI32;
16use std::sync::atomic::Ordering;
17use std::sync::Arc;
18use std::sync::LazyLock;
19
20use cheetah_string::CheetahString;
21use dashmap::DashMap;
22use rocketmq_remoting::runtime::RPCHook;
23use rocketmq_rust::ArcMut;
24use tracing::info;
25
26use crate::base::client_config::ClientConfig;
27use crate::factory::mq_client_instance::MQClientInstance;
28use crate::producer::produce_accumulator::ProduceAccumulator;
29
30type ClientInstanceHashMap = DashMap<CheetahString /* clientId */, ArcMut<MQClientInstance>>;
31type AccumulatorHashMap = DashMap<CheetahString /* clientId */, ArcMut<ProduceAccumulator>>;
32
33#[derive(Default)]
34pub struct MQClientManager {
35    factory_table: Arc<ClientInstanceHashMap>,
36    accumulator_table: Arc<AccumulatorHashMap>,
37    factory_index_generator: AtomicI32,
38}
39
40impl MQClientManager {
41    fn new() -> Self {
42        MQClientManager {
43            factory_index_generator: AtomicI32::new(0),
44            factory_table: Arc::new(DashMap::with_capacity(128)),
45            accumulator_table: Arc::new(DashMap::with_capacity(128)),
46        }
47    }
48
49    #[inline]
50    pub fn get_instance() -> &'static MQClientManager {
51        static INSTANCE: LazyLock<MQClientManager> = LazyLock::new(MQClientManager::new);
52        &INSTANCE
53    }
54
55    pub fn get_or_create_mq_client_instance(
56        &self,
57        client_config: ClientConfig,
58        rpc_hook: Option<Arc<dyn RPCHook>>,
59    ) -> ArcMut<MQClientInstance> {
60        let client_id = CheetahString::from_string(client_config.build_mq_client_id());
61
62        self.factory_table
63            .entry(client_id.clone())
64            .or_insert_with(|| {
65                let index = self.factory_index_generator.fetch_add(1, Ordering::Relaxed);
66
67                info!("Created new MQClientInstance for clientId: [{}]", client_id);
68
69                MQClientInstance::new_arc(client_config, index, client_id, rpc_hook)
70            })
71            .clone()
72    }
73
74    pub fn get_or_create_produce_accumulator(&self, client_config: ClientConfig) -> ArcMut<ProduceAccumulator> {
75        let client_id = CheetahString::from_string(client_config.build_mq_client_id());
76
77        self.accumulator_table
78            .entry(client_id.clone())
79            .or_insert_with(|| {
80                info!("Created new ProduceAccumulator for clientId:[{}]", client_id);
81                ArcMut::new(ProduceAccumulator::new(client_id.as_str()))
82            })
83            .clone()
84    }
85
86    pub fn remove_client_factory(&self, client_id: &CheetahString) {
87        self.factory_table.remove(client_id);
88    }
89}