rocketmq_remoting/protocol/subscription/
subscription_group_config.rs

1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17use std::collections::HashMap;
18use std::collections::HashSet;
19
20use cheetah_string::CheetahString;
21use rocketmq_common::common::mix_all::MASTER_ID;
22use serde::Deserialize;
23use serde::Serialize;
24
25use crate::protocol::subscription::group_retry_policy::GroupRetryPolicy;
26use crate::protocol::subscription::simple_subscription_data::SimpleSubscriptionData;
27
28#[derive(Debug, Clone, Serialize, Deserialize)]
29#[serde(rename_all = "camelCase")]
30pub struct SubscriptionGroupConfig {
31    group_name: CheetahString,
32
33    consume_enable: bool,
34    consume_from_min_enable: bool,
35    consume_broadcast_enable: bool,
36    consume_message_orderly: bool,
37
38    retry_queue_nums: i32,
39    retry_max_times: i32,
40    group_retry_policy: GroupRetryPolicy,
41
42    broker_id: u64,
43    which_broker_when_consume_slowly: u64,
44
45    notify_consumer_ids_changed_enable: bool,
46
47    group_sys_flag: i32,
48
49    consume_timeout_minute: i32,
50
51    subscription_data_set: Option<HashSet<SimpleSubscriptionData>>,
52    attributes: HashMap<CheetahString, CheetahString>,
53}
54
55impl SubscriptionGroupConfig {
56    pub fn new(group_name: CheetahString) -> Self {
57        Self {
58            group_name,
59            ..Default::default()
60        }
61    }
62}
63
64impl Default for SubscriptionGroupConfig {
65    fn default() -> Self {
66        SubscriptionGroupConfig {
67            group_name: CheetahString::default(),
68
69            consume_enable: true,
70            consume_from_min_enable: true,
71            consume_broadcast_enable: true,
72            consume_message_orderly: false,
73
74            retry_queue_nums: 1,
75            retry_max_times: 16,
76            group_retry_policy: GroupRetryPolicy::default(),
77
78            broker_id: MASTER_ID,
79            which_broker_when_consume_slowly: 1,
80
81            notify_consumer_ids_changed_enable: true,
82
83            group_sys_flag: 0,
84
85            consume_timeout_minute: 15,
86
87            subscription_data_set: None,
88            attributes: HashMap::new(),
89        }
90    }
91}
92
93impl SubscriptionGroupConfig {
94    #[inline]
95    pub fn group_name(&self) -> &CheetahString {
96        &self.group_name
97    }
98
99    #[inline]
100    pub fn consume_enable(&self) -> bool {
101        self.consume_enable
102    }
103
104    #[inline]
105    pub fn consume_from_min_enable(&self) -> bool {
106        self.consume_from_min_enable
107    }
108
109    #[inline]
110    pub fn consume_broadcast_enable(&self) -> bool {
111        self.consume_broadcast_enable
112    }
113
114    #[inline]
115    pub fn consume_message_orderly(&self) -> bool {
116        self.consume_message_orderly
117    }
118
119    #[inline]
120    pub fn retry_queue_nums(&self) -> i32 {
121        self.retry_queue_nums
122    }
123
124    #[inline]
125    pub fn retry_max_times(&self) -> i32 {
126        self.retry_max_times
127    }
128
129    #[inline]
130    pub fn group_retry_policy(&self) -> &GroupRetryPolicy {
131        &self.group_retry_policy
132    }
133
134    #[inline]
135    pub fn broker_id(&self) -> u64 {
136        self.broker_id
137    }
138
139    #[inline]
140    pub fn which_broker_when_consume_slowly(&self) -> u64 {
141        self.which_broker_when_consume_slowly
142    }
143
144    #[inline]
145    pub fn notify_consumer_ids_changed_enable(&self) -> bool {
146        self.notify_consumer_ids_changed_enable
147    }
148
149    #[inline]
150    pub fn group_sys_flag(&self) -> i32 {
151        self.group_sys_flag
152    }
153
154    #[inline]
155    pub fn consume_timeout_minute(&self) -> i32 {
156        self.consume_timeout_minute
157    }
158
159    #[inline]
160    pub fn subscription_data_set(&self) -> Option<&HashSet<SimpleSubscriptionData>> {
161        self.subscription_data_set.as_ref()
162    }
163
164    #[inline]
165    pub fn attributes(&self) -> &HashMap<CheetahString, CheetahString> {
166        &self.attributes
167    }
168
169    #[inline]
170    pub fn set_group_name(&mut self, group_name: CheetahString) {
171        self.group_name = group_name;
172    }
173
174    #[inline]
175    pub fn set_consume_enable(&mut self, consume_enable: bool) {
176        self.consume_enable = consume_enable;
177    }
178
179    #[inline]
180    pub fn set_consume_from_min_enable(&mut self, consume_from_min_enable: bool) {
181        self.consume_from_min_enable = consume_from_min_enable;
182    }
183
184    #[inline]
185    pub fn set_consume_broadcast_enable(&mut self, consume_broadcast_enable: bool) {
186        self.consume_broadcast_enable = consume_broadcast_enable;
187    }
188
189    #[inline]
190    pub fn set_consume_message_orderly(&mut self, consume_message_orderly: bool) {
191        self.consume_message_orderly = consume_message_orderly;
192    }
193
194    #[inline]
195    pub fn set_retry_queue_nums(&mut self, retry_queue_nums: i32) {
196        self.retry_queue_nums = retry_queue_nums;
197    }
198
199    #[inline]
200    pub fn set_retry_max_times(&mut self, retry_max_times: i32) {
201        self.retry_max_times = retry_max_times;
202    }
203
204    #[inline]
205    pub fn set_group_retry_policy(&mut self, group_retry_policy: GroupRetryPolicy) {
206        self.group_retry_policy = group_retry_policy;
207    }
208
209    #[inline]
210    pub fn set_broker_id(&mut self, broker_id: u64) {
211        self.broker_id = broker_id;
212    }
213
214    #[inline]
215    pub fn set_which_broker_when_consume_slowly(&mut self, which_broker_when_consume_slowly: u64) {
216        self.which_broker_when_consume_slowly = which_broker_when_consume_slowly;
217    }
218
219    #[inline]
220    pub fn set_notify_consumer_ids_changed_enable(
221        &mut self,
222        notify_consumer_ids_changed_enable: bool,
223    ) {
224        self.notify_consumer_ids_changed_enable = notify_consumer_ids_changed_enable;
225    }
226
227    #[inline]
228    pub fn set_group_sys_flag(&mut self, group_sys_flag: i32) {
229        self.group_sys_flag = group_sys_flag;
230    }
231
232    #[inline]
233    pub fn set_consume_timeout_minute(&mut self, consume_timeout_minute: i32) {
234        self.consume_timeout_minute = consume_timeout_minute;
235    }
236
237    #[inline]
238    pub fn set_subscription_data_set(
239        &mut self,
240        subscription_data_set: Option<HashSet<SimpleSubscriptionData>>,
241    ) {
242        self.subscription_data_set = subscription_data_set;
243    }
244
245    #[inline]
246    pub fn set_attributes(&mut self, attributes: HashMap<CheetahString, CheetahString>) {
247        self.attributes = attributes;
248    }
249}
250
251#[cfg(test)]
252mod subscription_group_config_tests {
253    use super::*;
254    //use crate::protocol::subscription::group_retry_policy::RetryPolicy;
255
256    #[test]
257    fn creating_default_subscription_group_config() {
258        let config = SubscriptionGroupConfig::default();
259        assert_eq!(config.group_name, "");
260        assert!(config.consume_enable);
261        assert!(config.consume_from_min_enable);
262        assert!(config.consume_broadcast_enable);
263        assert!(!config.consume_message_orderly);
264        assert_eq!(config.retry_queue_nums, 1);
265        assert_eq!(config.retry_max_times, 16);
266        // assert_eq!(config.group_retry_policy, GroupRetryPolicy::default());
267        assert_eq!(config.broker_id, MASTER_ID);
268        assert_eq!(config.which_broker_when_consume_slowly, 1);
269        assert!(config.notify_consumer_ids_changed_enable);
270        assert_eq!(config.group_sys_flag, 0);
271        assert_eq!(config.consume_timeout_minute, 15);
272        assert!(config.subscription_data_set.is_none());
273        assert!(config.attributes.is_empty());
274    }
275
276    #[test]
277    fn setting_and_getting_fields() {
278        let mut config = SubscriptionGroupConfig::default();
279        config.set_group_name("test_group".into());
280        config.set_consume_enable(false);
281        config.set_consume_from_min_enable(true);
282        config.set_consume_broadcast_enable(false);
283        config.set_consume_message_orderly(true);
284        config.set_retry_queue_nums(2);
285        config.set_retry_max_times(10);
286        //config.set_group_retry_policy(GroupRetryPolicy::Custom(RetryPolicy::FixedDelay(100)));
287        config.set_broker_id(2);
288        config.set_which_broker_when_consume_slowly(2);
289        config.set_notify_consumer_ids_changed_enable(false);
290        config.set_group_sys_flag(1);
291        config.set_consume_timeout_minute(30);
292        config.set_subscription_data_set(Some(HashSet::new()));
293        config.set_attributes(HashMap::from([("key".into(), "value".into())]));
294
295        assert_eq!(config.group_name(), "test_group");
296        assert!(!config.consume_enable());
297        assert!(config.consume_from_min_enable());
298        assert!(!config.consume_broadcast_enable());
299        assert!(config.consume_message_orderly());
300        assert_eq!(config.retry_queue_nums(), 2);
301        assert_eq!(config.retry_max_times(), 10);
302        /*        assert_eq!(
303            config.group_retry_policy(),
304            &GroupRetryPolicy::Custom(RetryPolicy::FixedDelay(100))
305        );*/
306        assert_eq!(config.broker_id(), 2);
307        assert_eq!(config.which_broker_when_consume_slowly(), 2);
308        assert!(!config.notify_consumer_ids_changed_enable());
309        assert_eq!(config.group_sys_flag(), 1);
310        assert_eq!(config.consume_timeout_minute(), 30);
311        assert!(config.subscription_data_set().is_some());
312        assert_eq!(
313            config.attributes(),
314            &HashMap::from([("key".into(), "value".into())])
315        );
316    }
317}