rocketmq_client_rust/consumer/rebalance_strategy/
allocate_message_queue_by_config.rs1use cheetah_string::CheetahString;
16use rocketmq_common::common::message::message_queue::MessageQueue;
17
18use crate::consumer::allocate_message_queue_strategy::AllocateMessageQueueStrategy;
19
20pub struct AllocateMessageQueueByConfig {
21 message_queue_list: Vec<MessageQueue>,
22}
23
24impl AllocateMessageQueueByConfig {
25 #[inline]
26 pub fn new(message_queue_list: Vec<MessageQueue>) -> Self {
27 Self { message_queue_list }
28 }
29}
30
31impl AllocateMessageQueueStrategy for AllocateMessageQueueByConfig {
32 fn allocate(
33 &self,
34 consumer_group: &CheetahString,
35 current_cid: &CheetahString,
36 mq_all: &[MessageQueue],
37 cid_all: &[CheetahString],
38 ) -> rocketmq_error::RocketMQResult<Vec<MessageQueue>> {
39 Ok(self.message_queue_list.clone())
40 }
41
42 #[inline]
43 fn get_name(&self) -> &'static str {
44 "CONFIG"
45 }
46}
47
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use cheetah_string::CheetahString;
    use rocketmq_common::common::message::message_queue::MessageQueue;

    use super::*;

    /// Every consumer receives the complete configured queue list, whatever its id.
    #[test]
    fn test_allocate_message_queue_by_config() {
        let consumer_group = CheetahString::from("test_group");
        let mq_all = create_message_queue_list(4);
        let cid_all = create_consumer_id_list(2);
        let strategy = AllocateMessageQueueByConfig::new(mq_all.clone());

        let mut consumer_allocate_queue = HashMap::new();
        for consumer_id in &cid_all {
            let queues = strategy
                .allocate(&consumer_group, consumer_id, &mq_all, &cid_all)
                .unwrap();
            let queue_ids: Vec<i32> = queues.into_iter().map(|mq| mq.queue_id()).collect();
            consumer_allocate_queue.insert(consumer_id.clone(), queue_ids);
        }

        assert_eq!(
            consumer_allocate_queue.get("CID_PREFIX0").unwrap().as_slice(),
            &[0, 1, 2, 3]
        );
        assert_eq!(
            consumer_allocate_queue.get("CID_PREFIX1").unwrap().as_slice(),
            &[0, 1, 2, 3]
        );
    }

    /// The configured list is returned even when the runtime queue and consumer lists are empty.
    #[test]
    fn test_allocate_ignores_runtime_arguments() {
        let consumer_group = CheetahString::from("test_group");
        let current_cid = CheetahString::from("CID_PREFIX0");
        let strategy = AllocateMessageQueueByConfig::new(create_message_queue_list(3));

        let queues = strategy
            .allocate(&consumer_group, &current_cid, &[], &[])
            .unwrap();
        let queue_ids: Vec<i32> = queues.into_iter().map(|mq| mq.queue_id()).collect();

        assert_eq!(queue_ids.as_slice(), &[0, 1, 2]);
    }

    /// The strategy reports the fixed name `"CONFIG"`.
    #[test]
    fn test_get_name() {
        let strategy = AllocateMessageQueueByConfig::new(Vec::new());
        assert_eq!(strategy.get_name(), "CONFIG");
    }

    /// Builds `size` consumer ids named `CID_PREFIX0`, `CID_PREFIX1`, ...
    fn create_consumer_id_list(size: usize) -> Vec<CheetahString> {
        (0..size).map(|i| format!("CID_PREFIX{}", i).into()).collect()
    }

    /// Builds `size` queues on topic `"topic"` / broker `"broker"` with queue ids `0..size`.
    fn create_message_queue_list(size: usize) -> Vec<MessageQueue> {
        (0..size)
            .map(|i| MessageQueue::from_parts("topic", "broker", i as i32))
            .collect()
    }
}