rocketmq_client_rust/implementation/
mq_client_manager.rs

1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17use std::collections::HashMap;
18use std::sync::atomic::AtomicI32;
19use std::sync::atomic::Ordering;
20use std::sync::Arc;
21
22use cheetah_string::CheetahString;
23use lazy_static::lazy_static;
24use parking_lot::RwLock;
25use rocketmq_remoting::runtime::RPCHook;
26use rocketmq_rust::ArcMut;
27use tracing::info;
28
29use crate::base::client_config::ClientConfig;
30use crate::factory::mq_client_instance::MQClientInstance;
31use crate::producer::produce_accumulator::ProduceAccumulator;
32
33type ClientInstanceHashMap = HashMap<CheetahString /* clientId */, ArcMut<MQClientInstance>>;
34type AccumulatorHashMap = HashMap<CheetahString /* clientId */, ArcMut<ProduceAccumulator>>;
35
36#[derive(Default)]
37pub struct MQClientManager {
38    factory_table: Arc<RwLock<ClientInstanceHashMap>>,
39    accumulator_table: Arc<RwLock<AccumulatorHashMap>>,
40    factory_index_generator: AtomicI32,
41}
42
43impl MQClientManager {
44    fn new() -> Self {
45        MQClientManager {
46            factory_index_generator: AtomicI32::new(0),
47            factory_table: Arc::new(RwLock::new(HashMap::new())),
48            accumulator_table: Arc::new(RwLock::new(HashMap::new())),
49        }
50    }
51
52    pub fn get_instance() -> &'static MQClientManager {
53        lazy_static! {
54            static ref INSTANCE: MQClientManager = MQClientManager::new();
55        }
56        &INSTANCE
57    }
58
59    pub fn get_or_create_mq_client_instance(
60        &self,
61        client_config: ClientConfig,
62        rpc_hook: Option<Arc<Box<dyn RPCHook>>>,
63    ) -> ArcMut<MQClientInstance> {
64        let client_id = CheetahString::from_string(client_config.build_mq_client_id());
65        let mut factory_table = self.factory_table.write();
66
67        if let Some(instance) = factory_table.get(&client_id) {
68            return instance.clone();
69        }
70
71        let instance = MQClientInstance::new_arc(
72            client_config.clone(),
73            self.factory_index_generator.fetch_add(1, Ordering::SeqCst),
74            client_id.clone(),
75            rpc_hook,
76        );
77        info!("Created new MQClientInstance for clientId: [{}]", client_id);
78        factory_table.insert(client_id, instance.clone());
79        instance
80    }
81
82    pub fn get_or_create_produce_accumulator(
83        &self,
84        client_config: ClientConfig,
85    ) -> ArcMut<ProduceAccumulator> {
86        let client_id = CheetahString::from_string(client_config.build_mq_client_id());
87        let mut accumulator_table = self.accumulator_table.write();
88
89        if let Some(accumulator) = accumulator_table.get(&client_id) {
90            return accumulator.clone();
91        }
92
93        let accumulator = ArcMut::new(ProduceAccumulator::new(client_id.as_str()));
94        info!(
95            "Created new ProduceAccumulator for clientId:[{}]",
96            client_id
97        );
98        accumulator_table.insert(client_id, accumulator.clone());
99
100        accumulator
101    }
102
103    pub async fn remove_client_factory(&self, client_id: &CheetahString) {
104        self.factory_table.write().remove(client_id);
105    }
106}