rocketmq_remoting/protocol/header/
get_min_offset_request_header.rs1use cheetah_string::CheetahString;
16use rocketmq_macros::RequestHeaderCodecV2;
17use serde::Deserialize;
18use serde::Serialize;
19
20use crate::protocol::header::message_operation_header::TopicRequestHeaderTrait;
21use crate::rpc::topic_request_header::TopicRequestHeader;
22
23#[derive(Debug, Clone, Serialize, Deserialize, Default, RequestHeaderCodecV2)]
24#[serde(rename_all = "camelCase")]
25pub struct GetMinOffsetRequestHeader {
26 pub topic: CheetahString,
27
28 pub queue_id: i32,
29
30 #[serde(flatten)]
31 pub topic_request_header: Option<TopicRequestHeader>,
32}
33
34impl TopicRequestHeaderTrait for GetMinOffsetRequestHeader {
35 fn set_lo(&mut self, lo: Option<bool>) {
36 if let Some(header) = self.topic_request_header.as_mut() {
37 header.lo = lo;
38 }
39 }
40
41 fn lo(&self) -> Option<bool> {
42 self.topic_request_header.as_ref().and_then(|h| h.lo)
43 }
44
45 fn set_topic(&mut self, topic: CheetahString) {
46 self.topic = topic;
47 }
48
49 fn topic(&self) -> &CheetahString {
50 &self.topic
51 }
52
53 fn broker_name(&self) -> Option<&CheetahString> {
54 self.topic_request_header
55 .as_ref()
56 .and_then(|h| h.rpc_request_header.as_ref())
57 .and_then(|h| h.broker_name.as_ref())
58 }
59
60 fn set_broker_name(&mut self, broker_name: CheetahString) {
61 if let Some(header) = self.topic_request_header.as_mut() {
62 if let Some(rpc_header) = header.rpc_request_header.as_mut() {
63 rpc_header.broker_name = Some(broker_name);
64 }
65 }
66 }
67
68 fn namespace(&self) -> Option<&str> {
69 self.topic_request_header
70 .as_ref()
71 .and_then(|h| h.rpc_request_header.as_ref())
72 .and_then(|r| r.namespace.as_deref())
73 }
74
75 fn set_namespace(&mut self, namespace: CheetahString) {
76 if let Some(header) = self.topic_request_header.as_mut() {
77 if let Some(rpc_header) = header.rpc_request_header.as_mut() {
78 rpc_header.namespace = Some(namespace);
79 }
80 }
81 }
82
83 fn namespaced(&self) -> Option<bool> {
84 self.topic_request_header
85 .as_ref()
86 .and_then(|h| h.rpc_request_header.as_ref())
87 .and_then(|r| r.namespaced)
88 }
89
90 fn set_namespaced(&mut self, namespaced: bool) {
91 if let Some(header) = self.topic_request_header.as_mut() {
92 if let Some(rpc_header) = header.rpc_request_header.as_mut() {
93 rpc_header.namespaced = Some(namespaced);
94 }
95 }
96 }
97
98 fn oneway(&self) -> Option<bool> {
99 self.topic_request_header
100 .as_ref()
101 .and_then(|h| h.rpc_request_header.as_ref())
102 .and_then(|r| r.oneway)
103 }
104
105 fn set_oneway(&mut self, oneway: bool) {
106 if let Some(header) = self.topic_request_header.as_mut() {
107 if let Some(rpc_header) = header.rpc_request_header.as_mut() {
108 rpc_header.oneway = Some(oneway);
109 }
110 }
111 }
112
113 fn queue_id(&self) -> i32 {
114 self.queue_id
115 }
116
117 fn set_queue_id(&mut self, queue_id: i32) {
118 self.queue_id = queue_id;
119 }
120}
121
122#[cfg(test)]
123mod tests {
124 use std::collections::HashMap;
125
126 use super::*;
127 use crate::protocol::command_custom_header::FromMap;
128 use crate::rpc::rpc_request_header::RpcRequestHeader;
129
130 #[test]
131 fn get_min_offset_request_header_default() {
132 let header = GetMinOffsetRequestHeader::default();
133 assert_eq!(header.topic, "");
134 assert_eq!(header.queue_id, 0);
135 assert!(header.topic_request_header.is_none());
136 }
137
138 #[test]
139 fn get_min_offset_request_header_trait_impl() {
140 let mut header = GetMinOffsetRequestHeader::default();
141
142 assert!(header.lo().is_none());
143 header.topic_request_header = Some(TopicRequestHeader::default());
144 header.set_lo(Some(true));
145 assert_eq!(header.lo(), Some(true));
146
147 header.set_topic(CheetahString::from("test_topic"));
148 assert_eq!(header.topic(), &CheetahString::from("test_topic"));
149
150 assert!(header.broker_name().is_none());
151 header.topic_request_header.as_mut().unwrap().rpc_request_header = Some(RpcRequestHeader::default());
152 header.set_broker_name(CheetahString::from("broker"));
153 assert_eq!(header.broker_name(), Some(&CheetahString::from("broker")));
154
155 assert!(header.namespace().is_none());
156 header.set_namespace(CheetahString::from("ns"));
157 assert_eq!(header.namespace(), Some("ns"));
158
159 assert!(header.namespaced().is_none());
160 header.set_namespaced(true);
161 assert_eq!(header.namespaced(), Some(true));
162
163 assert!(header.oneway().is_none());
164 header.set_oneway(true);
165 assert_eq!(header.oneway(), Some(true));
166
167 header.set_queue_id(1);
168 assert_eq!(header.queue_id(), 1);
169 }
170
171 #[test]
172 fn get_min_offset_request_header_serialization() {
173 let header = GetMinOffsetRequestHeader {
174 topic: CheetahString::from("test"),
175 queue_id: 1,
176 topic_request_header: Some(TopicRequestHeader {
177 lo: Some(true),
178 ..Default::default()
179 }),
180 };
181 let json = serde_json::to_string(&header).unwrap();
182 assert!(json.contains("\"topic\":\"test\""));
183 assert!(json.contains("\"queueId\":1"));
184 assert!(json.contains("\"lo\":true"));
185 }
186
187 #[test]
188 fn get_min_offset_request_header_deserialization() {
189 let json = r#"{"topic":"test","queueId":1,"lo":true}"#;
190 let header: GetMinOffsetRequestHeader = serde_json::from_str(json).unwrap();
191 assert_eq!(header.topic, "test");
192 assert_eq!(header.queue_id, 1);
193 assert_eq!(header.topic_request_header.unwrap().lo, Some(true));
194 }
195
196 #[test]
197 fn get_min_offset_request_header_from_map() {
198 let mut map = HashMap::new();
199 map.insert(CheetahString::from("topic"), CheetahString::from("test_topic"));
200 map.insert(CheetahString::from("queueId"), CheetahString::from("2"));
201 map.insert(CheetahString::from("lo"), CheetahString::from("true"));
202
203 let header = <GetMinOffsetRequestHeader as FromMap>::from(&map).unwrap();
204 assert_eq!(header.topic, "test_topic");
205 assert_eq!(header.queue_id, 2);
206 assert_eq!(header.topic_request_header.unwrap().lo, Some(true));
207 }
208}