Skip to main content

rocketmq_remoting/protocol/header/
pop_message_request_header.rs

1// Copyright 2023 The RocketMQ Rust Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::fmt::Display;
16
17use cheetah_string::CheetahString;
18use rocketmq_common::TimeUtils::current_millis;
19use rocketmq_macros::RequestHeaderCodecV2;
20use serde::Deserialize;
21use serde::Serialize;
22
23use crate::protocol::header::namesrv::topic_operation_header::TopicRequestHeader;
24
/// Request header for a pop-message request.
///
/// Serialized and deserialized through serde with camelCase key names
/// (`consumer_group` <-> `consumerGroup`, and so on). Fields tagged
/// `#[required]` are presumably validated by the `RequestHeaderCodecV2`
/// derive when decoding; confirm against the macro implementation.
#[derive(Clone, Debug, Serialize, Deserialize, RequestHeaderCodecV2)]
#[serde(rename_all = "camelCase")]
pub struct PopMessageRequestHeader {
    /// Consumer group the request is made for (required).
    #[required]
    pub consumer_group: CheetahString,
    /// Topic the request targets (required).
    #[required]
    pub topic: CheetahString,
    /// Queue identifier (required). NOTE(review): whether a negative value
    /// has a special meaning is not visible in this file.
    #[required]
    pub queue_id: i32,
    /// Maximum number of messages requested (required).
    #[required]
    pub max_msg_nums: u32,
    /// Invisible time for popped messages (required).
    /// NOTE(review): unit is presumably milliseconds; confirm with callers.
    #[required]
    pub invisible_time: u64,
    /// Poll duration (required). Subtracted from a `current_millis()` reading
    /// in `is_timeout_too_much`, so it is treated as milliseconds there.
    #[required]
    pub poll_time: u64,
    /// Creation timestamp of the request (required). Subtracted from a
    /// `current_millis()` reading in `is_timeout_too_much`, so it is treated
    /// as a millisecond timestamp there.
    #[required]
    pub born_time: u64,
    /// Initialization mode (required). NOTE(review): the set of valid values
    /// is defined outside this file.
    #[required]
    pub init_mode: i32,
    /// Optional expression type. NOTE(review): presumably the filter
    /// expression kind; confirm.
    pub exp_type: Option<CheetahString>,
    /// Optional expression. NOTE(review): presumably the filter expression
    /// itself; confirm.
    pub exp: Option<CheetahString>,
    /// Optional order flag; `Default` sets it to `Some(false)`.
    pub order: Option<bool>,
    /// Optional attempt identifier.
    pub attempt_id: Option<CheetahString>,

    /// Optional nested topic request header. `#[serde(flatten)]` inlines its
    /// fields at the same level as the fields above instead of nesting them
    /// under their own key.
    #[serde(flatten)]
    pub topic_request_header: Option<TopicRequestHeader>,
}
52
53impl PopMessageRequestHeader {
54    pub fn is_timeout_too_much(&self) -> bool {
55        current_millis() as i64 - self.born_time as i64 - self.poll_time as i64 > 500
56    }
57}
58
59impl Default for PopMessageRequestHeader {
60    fn default() -> Self {
61        PopMessageRequestHeader {
62            consumer_group: CheetahString::new(),
63            topic: CheetahString::new(),
64            queue_id: 0,
65            max_msg_nums: 0,
66            invisible_time: 0,
67            poll_time: 0,
68            born_time: 0,
69            init_mode: 0,
70            exp_type: None,
71            exp: None,
72            order: Some(false),
73            attempt_id: None,
74            topic_request_header: None,
75        }
76    }
77}
78
79impl Display for PopMessageRequestHeader {
80    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
81        write!(
82            f,
83            "PopMessageRequestHeader [consumer_group={}, topic={}, queue_id={}, max_msg_nums={}, invisible_time={}, \
84             poll_time={}, born_time={}, init_mode={}, exp_type={}, exp={}, order={}, attempt_id={}]",
85            self.consumer_group,
86            self.topic,
87            self.queue_id,
88            self.max_msg_nums,
89            self.invisible_time,
90            self.poll_time,
91            self.born_time,
92            self.init_mode,
93            self.exp_type.as_ref().unwrap_or(&CheetahString::new()),
94            self.exp.as_ref().unwrap_or(&CheetahString::new()),
95            self.order.as_ref().unwrap_or(&false),
96            self.attempt_id.as_ref().unwrap_or(&CheetahString::new())
97        )
98    }
99}
100
#[cfg(test)]
mod tests {
    use cheetah_string::CheetahString;

    use super::*;

    /// `Default` yields empty strings, zeroed numbers, `order == Some(false)`
    /// and `None` for the remaining optional fields.
    #[test]
    fn default_pop_message_request_header() {
        let PopMessageRequestHeader {
            consumer_group,
            topic,
            queue_id,
            max_msg_nums,
            invisible_time,
            poll_time,
            born_time,
            init_mode,
            exp_type,
            exp,
            order,
            attempt_id,
            topic_request_header,
        } = PopMessageRequestHeader::default();

        // Required string fields.
        assert_eq!(consumer_group, CheetahString::new());
        assert_eq!(topic, CheetahString::new());

        // Required numeric fields.
        assert_eq!(queue_id, 0);
        assert_eq!(max_msg_nums, 0);
        assert_eq!(invisible_time, 0);
        assert_eq!(poll_time, 0);
        assert_eq!(born_time, 0);
        assert_eq!(init_mode, 0);

        // Optional fields.
        assert!(exp_type.is_none());
        assert!(exp.is_none());
        assert_eq!(order, Some(false));
        assert!(attempt_id.is_none());
        assert!(topic_request_header.is_none());
    }

    /// A fully populated header renders every displayed field with its value.
    #[test]
    fn display_pop_message_request_header() {
        let expected = "PopMessageRequestHeader [consumer_group=group1, topic=topic1, queue_id=1, max_msg_nums=10, \
                        invisible_time=1000, poll_time=2000, born_time=3000, init_mode=1, exp_type=type1, exp=exp1, order=true, \
                        attempt_id=attempt1]";

        let rendered = PopMessageRequestHeader {
            consumer_group: CheetahString::from("group1"),
            topic: CheetahString::from("topic1"),
            queue_id: 1,
            max_msg_nums: 10,
            invisible_time: 1000,
            poll_time: 2000,
            born_time: 3000,
            init_mode: 1,
            exp_type: Some(CheetahString::from("type1")),
            exp: Some(CheetahString::from("exp1")),
            order: Some(true),
            attempt_id: Some(CheetahString::from("attempt1")),
            topic_request_header: None,
        }
        .to_string();

        assert_eq!(rendered, expected);
    }

    /// A default header renders unset optional strings as empty and `order` as `false`.
    #[test]
    fn display_pop_message_request_header_with_defaults() {
        let expected = "PopMessageRequestHeader [consumer_group=, topic=, queue_id=0, max_msg_nums=0, invisible_time=0, \
                        poll_time=0, born_time=0, init_mode=0, exp_type=, exp=, order=false, attempt_id=]";

        let rendered = PopMessageRequestHeader::default().to_string();

        assert_eq!(rendered, expected);
    }
}