rocketmq_client_rust/implementation/
mq_client_manager.rs1use std::collections::HashMap;
18use std::sync::atomic::AtomicI32;
19use std::sync::atomic::Ordering;
20use std::sync::Arc;
21
22use cheetah_string::CheetahString;
23use lazy_static::lazy_static;
24use parking_lot::RwLock;
25use rocketmq_remoting::runtime::RPCHook;
26use rocketmq_rust::ArcMut;
27use tracing::info;
28
29use crate::base::client_config::ClientConfig;
30use crate::factory::mq_client_instance::MQClientInstance;
31use crate::producer::produce_accumulator::ProduceAccumulator;
32
33type ClientInstanceHashMap = HashMap<CheetahString , ArcMut<MQClientInstance>>;
34type AccumulatorHashMap = HashMap<CheetahString , ArcMut<ProduceAccumulator>>;
35
36#[derive(Default)]
37pub struct MQClientManager {
38 factory_table: Arc<RwLock<ClientInstanceHashMap>>,
39 accumulator_table: Arc<RwLock<AccumulatorHashMap>>,
40 factory_index_generator: AtomicI32,
41}
42
43impl MQClientManager {
44 fn new() -> Self {
45 MQClientManager {
46 factory_index_generator: AtomicI32::new(0),
47 factory_table: Arc::new(RwLock::new(HashMap::new())),
48 accumulator_table: Arc::new(RwLock::new(HashMap::new())),
49 }
50 }
51
52 pub fn get_instance() -> &'static MQClientManager {
53 lazy_static! {
54 static ref INSTANCE: MQClientManager = MQClientManager::new();
55 }
56 &INSTANCE
57 }
58
59 pub fn get_or_create_mq_client_instance(
60 &self,
61 client_config: ClientConfig,
62 rpc_hook: Option<Arc<Box<dyn RPCHook>>>,
63 ) -> ArcMut<MQClientInstance> {
64 let client_id = CheetahString::from_string(client_config.build_mq_client_id());
65 let mut factory_table = self.factory_table.write();
66
67 if let Some(instance) = factory_table.get(&client_id) {
68 return instance.clone();
69 }
70
71 let instance = MQClientInstance::new_arc(
72 client_config.clone(),
73 self.factory_index_generator.fetch_add(1, Ordering::SeqCst),
74 client_id.clone(),
75 rpc_hook,
76 );
77 info!("Created new MQClientInstance for clientId: [{}]", client_id);
78 factory_table.insert(client_id, instance.clone());
79 instance
80 }
81
82 pub fn get_or_create_produce_accumulator(
83 &self,
84 client_config: ClientConfig,
85 ) -> ArcMut<ProduceAccumulator> {
86 let client_id = CheetahString::from_string(client_config.build_mq_client_id());
87 let mut accumulator_table = self.accumulator_table.write();
88
89 if let Some(accumulator) = accumulator_table.get(&client_id) {
90 return accumulator.clone();
91 }
92
93 let accumulator = ArcMut::new(ProduceAccumulator::new(client_id.as_str()));
94 info!(
95 "Created new ProduceAccumulator for clientId:[{}]",
96 client_id
97 );
98 accumulator_table.insert(client_id, accumulator.clone());
99
100 accumulator
101 }
102
103 pub async fn remove_client_factory(&self, client_id: &CheetahString) {
104 self.factory_table.write().remove(client_id);
105 }
106}