rocketmq_remoting/protocol/header/
pop_message_request_header.rs

1/*
2 * Licensed to the Apache Software Foundation (ASF) under one or more
3 * contributor license agreements.  See the NOTICE file distributed with
4 * this work for additional information regarding copyright ownership.
5 * The ASF licenses this file to You under the Apache License, Version 2.0
6 * (the "License"); you may not use this file except in compliance with
7 * the License.  You may obtain a copy of the License at
8 *
9 *     http://www.apache.org/licenses/LICENSE-2.0
10 *
11 * Unless required by applicable law or agreed to in writing, software
12 * distributed under the License is distributed on an "AS IS" BASIS,
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 * See the License for the specific language governing permissions and
15 * limitations under the License.
16 */
17use std::fmt::Display;
18
19use cheetah_string::CheetahString;
20use rocketmq_common::TimeUtils::get_current_millis;
21use rocketmq_macros::RequestHeaderCodecV2;
22use serde::Deserialize;
23use serde::Serialize;
24
25use crate::protocol::header::namesrv::topic_operation_header::TopicRequestHeader;
26
/// Request header carried by a pop-message request.
///
/// Serde (de)serializes every field under its camelCase name (`consumerGroup`,
/// `maxMsgNums`, ...) because of `rename_all = "camelCase"`. The `#[required]` markers are
/// consumed by the `RequestHeaderCodecV2` derive; presumably they make the field mandatory
/// when a header is decoded -- confirm against the macro.
#[derive(Clone, Debug, Serialize, Deserialize, RequestHeaderCodecV2)]
#[serde(rename_all = "camelCase")]
pub struct PopMessageRequestHeader {
    /// Consumer group the request is issued for.
    #[required]
    pub consumer_group: CheetahString,
    /// Topic the request targets.
    #[required]
    pub topic: CheetahString,
    /// Queue id within the topic.
    // NOTE(review): whether a negative id has a special meaning is not visible here.
    #[required]
    pub queue_id: i32,
    /// Maximum number of messages requested.
    #[required]
    pub max_msg_nums: u32,
    /// Invisible time for popped messages.
    // NOTE(review): unit is presumably milliseconds -- confirm against the broker.
    #[required]
    pub invisible_time: u64,
    /// Poll time; `is_timeout_too_much` adds it to `born_time` and compares the sum with
    /// the current time in milliseconds.
    #[required]
    pub poll_time: u64,
    /// Creation time of the request; compared with the current time in milliseconds by
    /// `is_timeout_too_much`.
    #[required]
    pub born_time: u64,
    /// Init mode selector.
    // NOTE(review): the set of valid values is defined outside this file.
    #[required]
    pub init_mode: i32,
    /// Optional expression type; rendered as an empty string by `Display` when absent.
    pub exp_type: Option<CheetahString>,
    /// Optional expression; rendered as an empty string by `Display` when absent.
    pub exp: Option<CheetahString>,
    /// Optional order flag; the `Default` impl sets it to `Some(false)` and `Display`
    /// renders `None` as `false`.
    pub order: Option<bool>,
    /// Optional attempt id; rendered as an empty string by `Display` when absent.
    pub attempt_id: Option<CheetahString>,

    /// Optional nested topic request header; `#[serde(flatten)]` makes serde read and write
    /// its fields at the same level as the fields above instead of under a nested key.
    #[serde(flatten)]
    pub topic_request_header: Option<TopicRequestHeader>,
}
54
55impl PopMessageRequestHeader {
56    pub fn is_timeout_too_much(&self) -> bool {
57        get_current_millis() as i64 - self.born_time as i64 - self.poll_time as i64 > 500
58    }
59}
60
61impl Default for PopMessageRequestHeader {
62    fn default() -> Self {
63        PopMessageRequestHeader {
64            consumer_group: CheetahString::new(),
65            topic: CheetahString::new(),
66            queue_id: 0,
67            max_msg_nums: 0,
68            invisible_time: 0,
69            poll_time: 0,
70            born_time: 0,
71            init_mode: 0,
72            exp_type: None,
73            exp: None,
74            order: Some(false),
75            attempt_id: None,
76            topic_request_header: None,
77        }
78    }
79}
80
81impl Display for PopMessageRequestHeader {
82    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
83        write!(
84            f,
85            "PopMessageRequestHeader [consumer_group={}, topic={}, queue_id={}, max_msg_nums={}, \
86             invisible_time={}, poll_time={}, born_time={}, init_mode={}, exp_type={}, exp={}, \
87             order={}, attempt_id={}]",
88            self.consumer_group,
89            self.topic,
90            self.queue_id,
91            self.max_msg_nums,
92            self.invisible_time,
93            self.poll_time,
94            self.born_time,
95            self.init_mode,
96            self.exp_type.as_ref().unwrap_or(&CheetahString::new()),
97            self.exp.as_ref().unwrap_or(&CheetahString::new()),
98            self.order.as_ref().unwrap_or(&false),
99            self.attempt_id.as_ref().unwrap_or(&CheetahString::new())
100        )
101    }
102}
103
#[cfg(test)]
mod tests {
    use cheetah_string::CheetahString;

    use super::*;

    /// `Default` yields empty strings, zeroed numbers, `order == Some(false)` and no other
    /// optional data.
    #[test]
    fn default_pop_message_request_header() {
        let PopMessageRequestHeader {
            consumer_group,
            topic,
            queue_id,
            max_msg_nums,
            invisible_time,
            poll_time,
            born_time,
            init_mode,
            exp_type,
            exp,
            order,
            attempt_id,
            topic_request_header,
        } = PopMessageRequestHeader::default();

        assert_eq!(consumer_group, CheetahString::new());
        assert_eq!(topic, CheetahString::new());
        assert_eq!(queue_id, 0);
        assert_eq!(max_msg_nums, 0);
        assert_eq!(invisible_time, 0);
        assert_eq!(poll_time, 0);
        assert_eq!(born_time, 0);
        assert_eq!(init_mode, 0);
        assert!(exp_type.is_none());
        assert!(exp.is_none());
        assert_eq!(order, Some(false));
        assert!(attempt_id.is_none());
        assert!(topic_request_header.is_none());
    }

    /// A fully populated header prints every displayed field with its value.
    #[test]
    fn display_pop_message_request_header() {
        let populated = PopMessageRequestHeader {
            consumer_group: CheetahString::from("group1"),
            topic: CheetahString::from("topic1"),
            queue_id: 1,
            max_msg_nums: 10,
            invisible_time: 1000,
            poll_time: 2000,
            born_time: 3000,
            init_mode: 1,
            exp_type: Some(CheetahString::from("type1")),
            exp: Some(CheetahString::from("exp1")),
            order: Some(true),
            attempt_id: Some(CheetahString::from("attempt1")),
            topic_request_header: None,
        };
        let rendered = populated.to_string();
        assert_eq!(
            rendered,
            "PopMessageRequestHeader [consumer_group=group1, topic=topic1, queue_id=1, \
             max_msg_nums=10, invisible_time=1000, poll_time=2000, born_time=3000, init_mode=1, \
             exp_type=type1, exp=exp1, order=true, attempt_id=attempt1]"
        );
    }

    /// A default header prints empty strings for absent text fields and `false` for `order`.
    #[test]
    fn display_pop_message_request_header_with_defaults() {
        let rendered = PopMessageRequestHeader::default().to_string();
        assert_eq!(
            rendered,
            "PopMessageRequestHeader [consumer_group=, topic=, queue_id=0, max_msg_nums=0, \
             invisible_time=0, poll_time=0, born_time=0, init_mode=0, exp_type=, exp=, \
             order=false, attempt_id=]"
        );
    }
}
163}