rocketmq_client_rust/consumer/rebalance_strategy/
allocate_message_queue_by_config.rs1use cheetah_string::CheetahString;
18use rocketmq_common::common::message::message_queue::MessageQueue;
19
20use crate::consumer::allocate_message_queue_strategy::AllocateMessageQueueStrategy;
21
22pub struct AllocateMessageQueueByConfig {
23 message_queue_list: Vec<MessageQueue>,
24}
25
26impl AllocateMessageQueueByConfig {
27 #[inline]
28 pub fn new(message_queue_list: Vec<MessageQueue>) -> Self {
29 Self { message_queue_list }
30 }
31}
32
33impl AllocateMessageQueueStrategy for AllocateMessageQueueByConfig {
34 fn allocate(
35 &self,
36 consumer_group: &CheetahString,
37 current_cid: &CheetahString,
38 mq_all: &[MessageQueue],
39 cid_all: &[CheetahString],
40 ) -> rocketmq_error::RocketMQResult<Vec<MessageQueue>> {
41 Ok(self.message_queue_list.clone())
42 }
43
44 #[inline]
45 fn get_name(&self) -> &'static str {
46 "CONFIG"
47 }
48}
49
50#[cfg(test)]
51mod tests {
52 use std::collections::HashMap;
53
54 use cheetah_string::CheetahString;
55 use rocketmq_common::common::message::message_queue::MessageQueue;
56
57 use super::*;
58
59 #[test]
60 fn test_allocate_message_queue_by_config() {
61 let consumer_group = CheetahString::from("test_group");
62 let current_cid = CheetahString::from("CID_PREFIX1");
63 let mq_all = create_message_queue_list(4);
64 let cid_all = create_consumer_id_list(2);
65 let strategy = AllocateMessageQueueByConfig::new(mq_all.clone());
66
67 let mut consumer_allocate_queue = HashMap::new();
68 for consumer_id in &cid_all {
69 let queues = strategy
70 .allocate(&consumer_group, consumer_id, &mq_all, &cid_all)
71 .unwrap();
72 let queue_ids: Vec<i32> = queues.into_iter().map(|mq| mq.get_queue_id()).collect();
73 consumer_allocate_queue.insert(consumer_id.clone(), queue_ids);
74 }
75
76 assert_eq!(
77 consumer_allocate_queue
78 .get("CID_PREFIX0")
79 .unwrap()
80 .as_slice(),
81 &[0, 1, 2, 3]
82 );
83 assert_eq!(
84 consumer_allocate_queue
85 .get("CID_PREFIX1")
86 .unwrap()
87 .as_slice(),
88 &[0, 1, 2, 3]
89 );
90 }
91
92 fn create_consumer_id_list(size: usize) -> Vec<CheetahString> {
93 (0..size)
94 .map(|i| format!("CID_PREFIX{}", i).into())
95 .collect()
96 }
97
98 fn create_message_queue_list(size: usize) -> Vec<MessageQueue> {
99 (0..size)
100 .map(|i| MessageQueue::from_parts("topic", "broker", i as i32))
101 .collect()
102 }
103}