Skip to main content

rocketmq_remoting/protocol/header/
notification_request_header.rs

// Copyright 2023 The RocketMQ Rust Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use cheetah_string::CheetahString;
16use rocketmq_macros::RequestHeaderCodecV2;
17use serde::Deserialize;
18use serde::Serialize;
19
20use crate::protocol::header::namesrv::topic_operation_header::TopicRequestHeader;
21
22#[derive(Debug, Serialize, Deserialize, RequestHeaderCodecV2)]
23pub struct NotificationRequestHeader {
24    #[serde(rename = "consumerGroup")]
25    #[required]
26    pub consumer_group: CheetahString,
27
28    #[serde(rename = "topic")]
29    #[required]
30    pub topic: CheetahString,
31
32    #[serde(rename = "queueId")]
33    #[required]
34    pub queue_id: i32,
35
36    #[serde(rename = "pollTime")]
37    #[required]
38    pub poll_time: i64,
39
40    #[serde(rename = "bornTime")]
41    #[required]
42    pub born_time: i64,
43
44    /// Indicates if the message is ordered; defaults to false.
45    #[serde(default)]
46    pub order: bool,
47
48    /// Attempt ID
49    #[serde(rename = "attemptId", skip_serializing_if = "Option::is_none")]
50    pub attempt_id: Option<CheetahString>,
51
52    #[serde(flatten)]
53    pub topic_request_header: Option<TopicRequestHeader>,
54}
55
#[cfg(test)]
mod tests {
    use super::*;

    /// A fully populated header must serialize under its camelCase wire keys and survive a
    /// JSON round trip with every scalar field intact.
    #[test]
    fn test_notification_request_header_serialization() {
        let header = NotificationRequestHeader {
            consumer_group: CheetahString::from("consumer_group_1"),
            topic: CheetahString::from("test_topic"),
            queue_id: 10,
            poll_time: 1234567890,
            born_time: 1234567891,
            order: true,
            attempt_id: Some(CheetahString::from("attempt_1")),
            topic_request_header: None,
        };

        let serialized = serde_json::to_string(&header).expect("Failed to serialize header");

        // Inspect the wire representation so a wrong `rename` is caught, not just a
        // self-consistent round trip.
        let value: serde_json::Value =
            serde_json::from_str(&serialized).expect("Serialized header is not valid JSON");
        assert_eq!(value["consumerGroup"].as_str(), Some("consumer_group_1"));
        assert_eq!(value["topic"].as_str(), Some("test_topic"));
        assert_eq!(value["queueId"].as_i64(), Some(10));
        assert_eq!(value["pollTime"].as_i64(), Some(1234567890));
        assert_eq!(value["bornTime"].as_i64(), Some(1234567891));
        assert_eq!(value["order"].as_bool(), Some(true));
        assert_eq!(value["attemptId"].as_str(), Some("attempt_1"));

        let deserialized: NotificationRequestHeader =
            serde_json::from_str(&serialized).expect("Failed to deserialize header");
        assert_eq!(header.consumer_group.as_str(), deserialized.consumer_group.as_str());
        assert_eq!(header.topic.as_str(), deserialized.topic.as_str());
        assert_eq!(header.queue_id, deserialized.queue_id);
        assert_eq!(header.poll_time, deserialized.poll_time);
        assert_eq!(header.born_time, deserialized.born_time);
        assert_eq!(header.order, deserialized.order);
        assert_eq!(
            header.attempt_id.as_ref().map(CheetahString::as_str),
            deserialized.attempt_id.as_ref().map(CheetahString::as_str)
        );
    }

    /// `order` must fall back to `false` and `attempt_id` to `None` when their keys are
    /// missing from the input, and a `None` attempt id must not be written out.
    #[test]
    fn test_notification_request_header_default_order() {
        // Input deliberately omits both `order` and `attemptId`.
        let json = r#"{"consumerGroup":"consumer_group_1","topic":"test_topic","queueId":10,"pollTime":1234567890,"bornTime":1234567891}"#;

        let deserialized: NotificationRequestHeader =
            serde_json::from_str(json).expect("Failed to deserialize header");
        assert_eq!(deserialized.consumer_group.as_str(), "consumer_group_1");
        assert_eq!(deserialized.topic.as_str(), "test_topic");
        assert_eq!(deserialized.queue_id, 10);
        assert_eq!(deserialized.poll_time, 1234567890);
        assert_eq!(deserialized.born_time, 1234567891);
        assert!(!deserialized.order);
        assert!(deserialized.attempt_id.is_none());

        let header = NotificationRequestHeader {
            consumer_group: CheetahString::from("consumer_group_1"),
            topic: CheetahString::from("test_topic"),
            queue_id: 10,
            poll_time: 1234567890,
            born_time: 1234567891,
            order: false,
            attempt_id: None,
            topic_request_header: None,
        };

        let serialized = serde_json::to_string(&header).expect("Failed to serialize header");
        let value: serde_json::Value =
            serde_json::from_str(&serialized).expect("Serialized header is not valid JSON");
        // `skip_serializing_if = "Option::is_none"` must drop the key entirely.
        assert!(value.get("attemptId").is_none());
        assert_eq!(value["order"].as_bool(), Some(false));
    }
}
101}