rocketmq_client_rust/consumer/rebalance_strategy/
allocate_message_queue_by_config.rs

1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17use cheetah_string::CheetahString;
18use rocketmq_common::common::message::message_queue::MessageQueue;
19
20use crate::consumer::allocate_message_queue_strategy::AllocateMessageQueueStrategy;
21
22pub struct AllocateMessageQueueByConfig {
23    message_queue_list: Vec<MessageQueue>,
24}
25
26impl AllocateMessageQueueByConfig {
27    #[inline]
28    pub fn new(message_queue_list: Vec<MessageQueue>) -> Self {
29        Self { message_queue_list }
30    }
31}
32
33impl AllocateMessageQueueStrategy for AllocateMessageQueueByConfig {
34    fn allocate(
35        &self,
36        consumer_group: &CheetahString,
37        current_cid: &CheetahString,
38        mq_all: &[MessageQueue],
39        cid_all: &[CheetahString],
40    ) -> rocketmq_error::RocketMQResult<Vec<MessageQueue>> {
41        Ok(self.message_queue_list.clone())
42    }
43
44    #[inline]
45    fn get_name(&self) -> &'static str {
46        "CONFIG"
47    }
48}
49
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use cheetah_string::CheetahString;
    use rocketmq_common::common::message::message_queue::MessageQueue;

    use super::*;

    /// Every consumer id must be handed the full configured queue list.
    #[test]
    fn test_allocate_message_queue_by_config() {
        let consumer_group = CheetahString::from("test_group");
        let mq_all = create_message_queue_list(4);
        let cid_all = create_consumer_id_list(2);
        let strategy = AllocateMessageQueueByConfig::new(mq_all.clone());

        // Map each consumer id to the queue ids it was allocated.
        let mut consumer_allocate_queue = HashMap::new();
        for consumer_id in &cid_all {
            let queues = strategy
                .allocate(&consumer_group, consumer_id, &mq_all, &cid_all)
                .unwrap();
            let queue_ids: Vec<i32> = queues.into_iter().map(|mq| mq.get_queue_id()).collect();
            consumer_allocate_queue.insert(consumer_id.clone(), queue_ids);
        }

        assert_eq!(
            consumer_allocate_queue
                .get("CID_PREFIX0")
                .unwrap()
                .as_slice(),
            &[0, 1, 2, 3]
        );
        assert_eq!(
            consumer_allocate_queue
                .get("CID_PREFIX1")
                .unwrap()
                .as_slice(),
            &[0, 1, 2, 3]
        );
    }

    /// The result depends only on the configured list, not on the arguments
    /// passed to `allocate`.
    #[test]
    fn test_allocate_ignores_runtime_arguments() {
        let consumer_group = CheetahString::from("test_group");
        let consumer_id = CheetahString::from("CID_PREFIX0");
        let strategy = AllocateMessageQueueByConfig::new(create_message_queue_list(3));

        let queues = strategy
            .allocate(&consumer_group, &consumer_id, &[], &[])
            .unwrap();
        let queue_ids: Vec<i32> = queues.into_iter().map(|mq| mq.get_queue_id()).collect();

        assert_eq!(queue_ids, vec![0, 1, 2]);
    }

    /// The strategy reports the fixed name `"CONFIG"`.
    #[test]
    fn test_get_name() {
        let strategy = AllocateMessageQueueByConfig::new(Vec::new());
        assert_eq!(strategy.get_name(), "CONFIG");
    }

    /// Builds `size` consumer ids named `CID_PREFIX0`, `CID_PREFIX1`, ...
    fn create_consumer_id_list(size: usize) -> Vec<CheetahString> {
        (0..size)
            .map(|i| format!("CID_PREFIX{}", i).into())
            .collect()
    }

    /// Builds `size` queues on topic `"topic"` / broker `"broker"` with queue
    /// ids `0..size`.
    fn create_message_queue_list(size: usize) -> Vec<MessageQueue> {
        (0..size)
            .map(|i| MessageQueue::from_parts("topic", "broker", i as i32))
            .collect()
    }
}