Skip to main content

rocketmq_client_rust/consumer/rebalance_strategy/
allocate_message_queue_by_config.rs

1// Copyright 2023 The RocketMQ Rust Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use cheetah_string::CheetahString;
16use rocketmq_common::common::message::message_queue::MessageQueue;
17
18use crate::consumer::allocate_message_queue_strategy::AllocateMessageQueueStrategy;
19
20pub struct AllocateMessageQueueByConfig {
21    message_queue_list: Vec<MessageQueue>,
22}
23
24impl AllocateMessageQueueByConfig {
25    #[inline]
26    pub fn new(message_queue_list: Vec<MessageQueue>) -> Self {
27        Self { message_queue_list }
28    }
29}
30
31impl AllocateMessageQueueStrategy for AllocateMessageQueueByConfig {
32    fn allocate(
33        &self,
34        consumer_group: &CheetahString,
35        current_cid: &CheetahString,
36        mq_all: &[MessageQueue],
37        cid_all: &[CheetahString],
38    ) -> rocketmq_error::RocketMQResult<Vec<MessageQueue>> {
39        Ok(self.message_queue_list.clone())
40    }
41
42    #[inline]
43    fn get_name(&self) -> &'static str {
44        "CONFIG"
45    }
46}
47
48#[cfg(test)]
49mod tests {
50    use std::collections::HashMap;
51
52    use cheetah_string::CheetahString;
53    use rocketmq_common::common::message::message_queue::MessageQueue;
54
55    use super::*;
56
57    #[test]
58    fn test_allocate_message_queue_by_config() {
59        let consumer_group = CheetahString::from("test_group");
60        let current_cid = CheetahString::from("CID_PREFIX1");
61        let mq_all = create_message_queue_list(4);
62        let cid_all = create_consumer_id_list(2);
63        let strategy = AllocateMessageQueueByConfig::new(mq_all.clone());
64
65        let mut consumer_allocate_queue = HashMap::new();
66        for consumer_id in &cid_all {
67            let queues = strategy
68                .allocate(&consumer_group, consumer_id, &mq_all, &cid_all)
69                .unwrap();
70            let queue_ids: Vec<i32> = queues.into_iter().map(|mq| mq.queue_id()).collect();
71            consumer_allocate_queue.insert(consumer_id.clone(), queue_ids);
72        }
73
74        assert_eq!(
75            consumer_allocate_queue.get("CID_PREFIX0").unwrap().as_slice(),
76            &[0, 1, 2, 3]
77        );
78        assert_eq!(
79            consumer_allocate_queue.get("CID_PREFIX1").unwrap().as_slice(),
80            &[0, 1, 2, 3]
81        );
82    }
83
84    fn create_consumer_id_list(size: usize) -> Vec<CheetahString> {
85        (0..size).map(|i| format!("CID_PREFIX{}", i).into()).collect()
86    }
87
88    fn create_message_queue_list(size: usize) -> Vec<MessageQueue> {
89        (0..size)
90            .map(|i| MessageQueue::from_parts("topic", "broker", i as i32))
91            .collect()
92    }
93}