rocketmq_client_rust/implementation/
mq_client_manager.rs1use std::sync::atomic::AtomicI32;
16use std::sync::atomic::Ordering;
17use std::sync::Arc;
18use std::sync::LazyLock;
19
20use cheetah_string::CheetahString;
21use dashmap::DashMap;
22use rocketmq_remoting::runtime::RPCHook;
23use rocketmq_rust::ArcMut;
24use tracing::info;
25
26use crate::base::client_config::ClientConfig;
27use crate::factory::mq_client_instance::MQClientInstance;
28use crate::producer::produce_accumulator::ProduceAccumulator;
29
30type ClientInstanceHashMap = DashMap<CheetahString , ArcMut<MQClientInstance>>;
31type AccumulatorHashMap = DashMap<CheetahString , ArcMut<ProduceAccumulator>>;
32
33#[derive(Default)]
34pub struct MQClientManager {
35 factory_table: Arc<ClientInstanceHashMap>,
36 accumulator_table: Arc<AccumulatorHashMap>,
37 factory_index_generator: AtomicI32,
38}
39
40impl MQClientManager {
41 fn new() -> Self {
42 MQClientManager {
43 factory_index_generator: AtomicI32::new(0),
44 factory_table: Arc::new(DashMap::with_capacity(128)),
45 accumulator_table: Arc::new(DashMap::with_capacity(128)),
46 }
47 }
48
49 #[inline]
50 pub fn get_instance() -> &'static MQClientManager {
51 static INSTANCE: LazyLock<MQClientManager> = LazyLock::new(MQClientManager::new);
52 &INSTANCE
53 }
54
55 pub fn get_or_create_mq_client_instance(
56 &self,
57 client_config: ClientConfig,
58 rpc_hook: Option<Arc<dyn RPCHook>>,
59 ) -> ArcMut<MQClientInstance> {
60 let client_id = CheetahString::from_string(client_config.build_mq_client_id());
61
62 self.factory_table
63 .entry(client_id.clone())
64 .or_insert_with(|| {
65 let index = self.factory_index_generator.fetch_add(1, Ordering::Relaxed);
66
67 info!("Created new MQClientInstance for clientId: [{}]", client_id);
68
69 MQClientInstance::new_arc(client_config, index, client_id, rpc_hook)
70 })
71 .clone()
72 }
73
74 pub fn get_or_create_produce_accumulator(&self, client_config: ClientConfig) -> ArcMut<ProduceAccumulator> {
75 let client_id = CheetahString::from_string(client_config.build_mq_client_id());
76
77 self.accumulator_table
78 .entry(client_id.clone())
79 .or_insert_with(|| {
80 info!("Created new ProduceAccumulator for clientId:[{}]", client_id);
81 ArcMut::new(ProduceAccumulator::new(client_id.as_str()))
82 })
83 .clone()
84 }
85
86 pub fn remove_client_factory(&self, client_id: &CheetahString) {
87 self.factory_table.remove(client_id);
88 }
89}