rocketmq_remoting/protocol/subscription/
subscription_group_config.rs1use std::collections::HashMap;
18use std::collections::HashSet;
19
20use cheetah_string::CheetahString;
21use rocketmq_common::common::mix_all::MASTER_ID;
22use serde::Deserialize;
23use serde::Serialize;
24
25use crate::protocol::subscription::group_retry_policy::GroupRetryPolicy;
26use crate::protocol::subscription::simple_subscription_data::SimpleSubscriptionData;
27
28#[derive(Debug, Clone, Serialize, Deserialize)]
29#[serde(rename_all = "camelCase")]
30pub struct SubscriptionGroupConfig {
31 group_name: CheetahString,
32
33 consume_enable: bool,
34 consume_from_min_enable: bool,
35 consume_broadcast_enable: bool,
36 consume_message_orderly: bool,
37
38 retry_queue_nums: i32,
39 retry_max_times: i32,
40 group_retry_policy: GroupRetryPolicy,
41
42 broker_id: u64,
43 which_broker_when_consume_slowly: u64,
44
45 notify_consumer_ids_changed_enable: bool,
46
47 group_sys_flag: i32,
48
49 consume_timeout_minute: i32,
50
51 subscription_data_set: Option<HashSet<SimpleSubscriptionData>>,
52 attributes: HashMap<CheetahString, CheetahString>,
53}
54
55impl SubscriptionGroupConfig {
56 pub fn new(group_name: CheetahString) -> Self {
57 Self {
58 group_name,
59 ..Default::default()
60 }
61 }
62}
63
64impl Default for SubscriptionGroupConfig {
65 fn default() -> Self {
66 SubscriptionGroupConfig {
67 group_name: CheetahString::default(),
68
69 consume_enable: true,
70 consume_from_min_enable: true,
71 consume_broadcast_enable: true,
72 consume_message_orderly: false,
73
74 retry_queue_nums: 1,
75 retry_max_times: 16,
76 group_retry_policy: GroupRetryPolicy::default(),
77
78 broker_id: MASTER_ID,
79 which_broker_when_consume_slowly: 1,
80
81 notify_consumer_ids_changed_enable: true,
82
83 group_sys_flag: 0,
84
85 consume_timeout_minute: 15,
86
87 subscription_data_set: None,
88 attributes: HashMap::new(),
89 }
90 }
91}
92
93impl SubscriptionGroupConfig {
94 #[inline]
95 pub fn group_name(&self) -> &CheetahString {
96 &self.group_name
97 }
98
99 #[inline]
100 pub fn consume_enable(&self) -> bool {
101 self.consume_enable
102 }
103
104 #[inline]
105 pub fn consume_from_min_enable(&self) -> bool {
106 self.consume_from_min_enable
107 }
108
109 #[inline]
110 pub fn consume_broadcast_enable(&self) -> bool {
111 self.consume_broadcast_enable
112 }
113
114 #[inline]
115 pub fn consume_message_orderly(&self) -> bool {
116 self.consume_message_orderly
117 }
118
119 #[inline]
120 pub fn retry_queue_nums(&self) -> i32 {
121 self.retry_queue_nums
122 }
123
124 #[inline]
125 pub fn retry_max_times(&self) -> i32 {
126 self.retry_max_times
127 }
128
129 #[inline]
130 pub fn group_retry_policy(&self) -> &GroupRetryPolicy {
131 &self.group_retry_policy
132 }
133
134 #[inline]
135 pub fn broker_id(&self) -> u64 {
136 self.broker_id
137 }
138
139 #[inline]
140 pub fn which_broker_when_consume_slowly(&self) -> u64 {
141 self.which_broker_when_consume_slowly
142 }
143
144 #[inline]
145 pub fn notify_consumer_ids_changed_enable(&self) -> bool {
146 self.notify_consumer_ids_changed_enable
147 }
148
149 #[inline]
150 pub fn group_sys_flag(&self) -> i32 {
151 self.group_sys_flag
152 }
153
154 #[inline]
155 pub fn consume_timeout_minute(&self) -> i32 {
156 self.consume_timeout_minute
157 }
158
159 #[inline]
160 pub fn subscription_data_set(&self) -> Option<&HashSet<SimpleSubscriptionData>> {
161 self.subscription_data_set.as_ref()
162 }
163
164 #[inline]
165 pub fn attributes(&self) -> &HashMap<CheetahString, CheetahString> {
166 &self.attributes
167 }
168
169 #[inline]
170 pub fn set_group_name(&mut self, group_name: CheetahString) {
171 self.group_name = group_name;
172 }
173
174 #[inline]
175 pub fn set_consume_enable(&mut self, consume_enable: bool) {
176 self.consume_enable = consume_enable;
177 }
178
179 #[inline]
180 pub fn set_consume_from_min_enable(&mut self, consume_from_min_enable: bool) {
181 self.consume_from_min_enable = consume_from_min_enable;
182 }
183
184 #[inline]
185 pub fn set_consume_broadcast_enable(&mut self, consume_broadcast_enable: bool) {
186 self.consume_broadcast_enable = consume_broadcast_enable;
187 }
188
189 #[inline]
190 pub fn set_consume_message_orderly(&mut self, consume_message_orderly: bool) {
191 self.consume_message_orderly = consume_message_orderly;
192 }
193
194 #[inline]
195 pub fn set_retry_queue_nums(&mut self, retry_queue_nums: i32) {
196 self.retry_queue_nums = retry_queue_nums;
197 }
198
199 #[inline]
200 pub fn set_retry_max_times(&mut self, retry_max_times: i32) {
201 self.retry_max_times = retry_max_times;
202 }
203
204 #[inline]
205 pub fn set_group_retry_policy(&mut self, group_retry_policy: GroupRetryPolicy) {
206 self.group_retry_policy = group_retry_policy;
207 }
208
209 #[inline]
210 pub fn set_broker_id(&mut self, broker_id: u64) {
211 self.broker_id = broker_id;
212 }
213
214 #[inline]
215 pub fn set_which_broker_when_consume_slowly(&mut self, which_broker_when_consume_slowly: u64) {
216 self.which_broker_when_consume_slowly = which_broker_when_consume_slowly;
217 }
218
219 #[inline]
220 pub fn set_notify_consumer_ids_changed_enable(
221 &mut self,
222 notify_consumer_ids_changed_enable: bool,
223 ) {
224 self.notify_consumer_ids_changed_enable = notify_consumer_ids_changed_enable;
225 }
226
227 #[inline]
228 pub fn set_group_sys_flag(&mut self, group_sys_flag: i32) {
229 self.group_sys_flag = group_sys_flag;
230 }
231
232 #[inline]
233 pub fn set_consume_timeout_minute(&mut self, consume_timeout_minute: i32) {
234 self.consume_timeout_minute = consume_timeout_minute;
235 }
236
237 #[inline]
238 pub fn set_subscription_data_set(
239 &mut self,
240 subscription_data_set: Option<HashSet<SimpleSubscriptionData>>,
241 ) {
242 self.subscription_data_set = subscription_data_set;
243 }
244
245 #[inline]
246 pub fn set_attributes(&mut self, attributes: HashMap<CheetahString, CheetahString>) {
247 self.attributes = attributes;
248 }
249}
250
251#[cfg(test)]
252mod subscription_group_config_tests {
253 use super::*;
254 #[test]
257 fn creating_default_subscription_group_config() {
258 let config = SubscriptionGroupConfig::default();
259 assert_eq!(config.group_name, "");
260 assert!(config.consume_enable);
261 assert!(config.consume_from_min_enable);
262 assert!(config.consume_broadcast_enable);
263 assert!(!config.consume_message_orderly);
264 assert_eq!(config.retry_queue_nums, 1);
265 assert_eq!(config.retry_max_times, 16);
266 assert_eq!(config.broker_id, MASTER_ID);
268 assert_eq!(config.which_broker_when_consume_slowly, 1);
269 assert!(config.notify_consumer_ids_changed_enable);
270 assert_eq!(config.group_sys_flag, 0);
271 assert_eq!(config.consume_timeout_minute, 15);
272 assert!(config.subscription_data_set.is_none());
273 assert!(config.attributes.is_empty());
274 }
275
276 #[test]
277 fn setting_and_getting_fields() {
278 let mut config = SubscriptionGroupConfig::default();
279 config.set_group_name("test_group".into());
280 config.set_consume_enable(false);
281 config.set_consume_from_min_enable(true);
282 config.set_consume_broadcast_enable(false);
283 config.set_consume_message_orderly(true);
284 config.set_retry_queue_nums(2);
285 config.set_retry_max_times(10);
286 config.set_broker_id(2);
288 config.set_which_broker_when_consume_slowly(2);
289 config.set_notify_consumer_ids_changed_enable(false);
290 config.set_group_sys_flag(1);
291 config.set_consume_timeout_minute(30);
292 config.set_subscription_data_set(Some(HashSet::new()));
293 config.set_attributes(HashMap::from([("key".into(), "value".into())]));
294
295 assert_eq!(config.group_name(), "test_group");
296 assert!(!config.consume_enable());
297 assert!(config.consume_from_min_enable());
298 assert!(!config.consume_broadcast_enable());
299 assert!(config.consume_message_orderly());
300 assert_eq!(config.retry_queue_nums(), 2);
301 assert_eq!(config.retry_max_times(), 10);
302 assert_eq!(config.broker_id(), 2);
307 assert_eq!(config.which_broker_when_consume_slowly(), 2);
308 assert!(!config.notify_consumer_ids_changed_enable());
309 assert_eq!(config.group_sys_flag(), 1);
310 assert_eq!(config.consume_timeout_minute(), 30);
311 assert!(config.subscription_data_set().is_some());
312 assert_eq!(
313 config.attributes(),
314 &HashMap::from([("key".into(), "value".into())])
315 );
316 }
317}