rocketmq_remoting/protocol/header/
search_offset_request_header.rs1use cheetah_string::CheetahString;
16use rocketmq_common::common::boundary_type::BoundaryType;
17use rocketmq_macros::RequestHeaderCodecV2;
18use serde::Deserialize;
19use serde::Serialize;
20
21use crate::protocol::header::message_operation_header::TopicRequestHeaderTrait;
22use crate::rpc::topic_request_header::TopicRequestHeader;
23
24#[derive(Default, Debug, Serialize, Deserialize, RequestHeaderCodecV2)]
25#[serde(rename_all = "camelCase")]
26pub struct SearchOffsetRequestHeader {
27 #[required]
28 pub topic: CheetahString,
29
30 #[required]
31 pub queue_id: i32,
32
33 #[required]
34 pub timestamp: i64,
35
36 pub boundary_type: BoundaryType,
37
38 #[serde(flatten)]
39 pub topic_request_header: Option<TopicRequestHeader>,
40}
41
42impl TopicRequestHeaderTrait for SearchOffsetRequestHeader {
43 fn set_lo(&mut self, lo: Option<bool>) {
44 if let Some(header) = self.topic_request_header.as_mut() {
45 header.lo = lo;
46 }
47 }
48
49 fn lo(&self) -> Option<bool> {
50 self.topic_request_header.as_ref().and_then(|h| h.lo)
51 }
52
53 fn set_topic(&mut self, topic: CheetahString) {
54 self.topic = topic;
55 }
56
57 fn topic(&self) -> &CheetahString {
58 &self.topic
59 }
60
61 fn broker_name(&self) -> Option<&CheetahString> {
62 self.topic_request_header
63 .as_ref()
64 .and_then(|h| h.rpc_request_header.as_ref())
65 .and_then(|h| h.broker_name.as_ref())
66 }
67
68 fn set_broker_name(&mut self, broker_name: CheetahString) {
69 if let Some(header) = self.topic_request_header.as_mut() {
70 if let Some(rpc_header) = header.rpc_request_header.as_mut() {
71 rpc_header.broker_name = Some(broker_name);
72 }
73 }
74 }
75
76 fn namespace(&self) -> Option<&str> {
77 self.topic_request_header
78 .as_ref()
79 .and_then(|h| h.rpc_request_header.as_ref())
80 .and_then(|r| r.namespace.as_deref())
81 }
82
83 fn set_namespace(&mut self, namespace: CheetahString) {
84 if let Some(header) = self.topic_request_header.as_mut() {
85 if let Some(rpc_header) = header.rpc_request_header.as_mut() {
86 rpc_header.namespace = Some(namespace);
87 }
88 }
89 }
90
91 fn namespaced(&self) -> Option<bool> {
92 self.topic_request_header
93 .as_ref()
94 .and_then(|h| h.rpc_request_header.as_ref())
95 .and_then(|r| r.namespaced)
96 }
97
98 fn set_namespaced(&mut self, namespaced: bool) {
99 if let Some(header) = self.topic_request_header.as_mut() {
100 if let Some(rpc_header) = header.rpc_request_header.as_mut() {
101 rpc_header.namespaced = Some(namespaced);
102 }
103 }
104 }
105
106 fn oneway(&self) -> Option<bool> {
107 self.topic_request_header
108 .as_ref()
109 .and_then(|h| h.rpc_request_header.as_ref())
110 .and_then(|r| r.oneway)
111 }
112
113 fn set_oneway(&mut self, oneway: bool) {
114 if let Some(header) = self.topic_request_header.as_mut() {
115 if let Some(rpc_header) = header.rpc_request_header.as_mut() {
116 rpc_header.oneway = Some(oneway);
117 }
118 }
119 }
120
121 fn queue_id(&self) -> i32 {
122 self.queue_id
123 }
124
125 fn set_queue_id(&mut self, queue_id: i32) {
126 self.queue_id = queue_id;
127 }
128}
129
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a header from the given field values with no nested topic
    /// request header.
    fn build_header(
        topic: CheetahString,
        queue_id: i32,
        timestamp: i64,
        boundary_type: BoundaryType,
    ) -> SearchOffsetRequestHeader {
        SearchOffsetRequestHeader {
            topic,
            queue_id,
            timestamp,
            boundary_type,
            topic_request_header: None,
        }
    }

    /// Decodes a header from JSON, panicking on malformed input.
    fn decode(json: &str) -> SearchOffsetRequestHeader {
        serde_json::from_str(json).unwrap()
    }

    /// Encodes a header to a JSON string.
    fn encode(header: &SearchOffsetRequestHeader) -> String {
        serde_json::to_string(header).unwrap()
    }

    #[test]
    fn search_offset_request_header_default() {
        let SearchOffsetRequestHeader {
            topic,
            queue_id,
            timestamp,
            boundary_type,
            ..
        } = SearchOffsetRequestHeader::default();

        assert_eq!(topic, CheetahString::default());
        assert_eq!(queue_id, 0);
        assert_eq!(timestamp, 0);
        assert_eq!(boundary_type, BoundaryType::Lower);
    }

    #[test]
    fn search_offset_request_header_creation() {
        let header = build_header(
            CheetahString::from("test_topic"),
            1,
            1702345678000,
            BoundaryType::Upper,
        );

        assert_eq!(header.topic, CheetahString::from("test_topic"));
        assert_eq!(header.queue_id, 1);
        assert_eq!(header.timestamp, 1702345678000);
        assert_eq!(header.boundary_type, BoundaryType::Upper);
    }

    #[test]
    fn search_offset_request_header_serializes_to_json() {
        let json = encode(&build_header(
            CheetahString::from("my_topic"),
            2,
            1702345678999,
            BoundaryType::Upper,
        ));

        // Each key/value pair must appear verbatim in the camelCase output.
        let expected_fragments = [
            r#""topic":"my_topic""#,
            r#""queueId":2"#,
            r#""timestamp":1702345678999"#,
            r#""boundaryType":"UPPER""#,
        ];
        for fragment in expected_fragments {
            assert!(json.contains(fragment));
        }
    }

    #[test]
    fn search_offset_request_header_deserializes_from_json() {
        let header = decode(
            r#"{
            "topic": "test_topic",
            "queueId": 3,
            "timestamp": 1702345678123,
            "boundaryType": "LOWER"
        }"#,
        );

        assert_eq!(header.topic, CheetahString::from("test_topic"));
        assert_eq!(header.queue_id, 3);
        assert_eq!(header.timestamp, 1702345678123);
        assert_eq!(header.boundary_type, BoundaryType::Lower);
    }

    #[test]
    fn search_offset_request_header_deserializes_with_uppercase_boundary_type() {
        let header = decode(
            r#"{
            "topic": "topic1",
            "queueId": 0,
            "timestamp": 1000000000,
            "boundaryType": "UPPER"
        }"#,
        );

        assert_eq!(header.boundary_type, BoundaryType::Upper);
    }

    #[test]
    fn search_offset_request_header_deserializes_with_lowercase_boundary_type() {
        let header = decode(
            r#"{
            "topic": "topic2",
            "queueId": 5,
            "timestamp": 2000000000,
            "boundaryType": "lower"
        }"#,
        );

        assert_eq!(header.boundary_type, BoundaryType::Lower);
    }

    #[test]
    fn search_offset_request_header_boundary_type_defaults_to_lower_for_invalid_value() {
        let header = decode(
            r#"{
            "topic": "topic3",
            "queueId": 1,
            "timestamp": 3000000000,
            "boundaryType": "invalid"
        }"#,
        );

        assert_eq!(header.boundary_type, BoundaryType::Lower);
    }

    #[test]
    fn search_offset_request_header_roundtrip_serialization() {
        let original = build_header(
            CheetahString::from("roundtrip_topic"),
            10,
            1702400000000,
            BoundaryType::Upper,
        );

        let deserialized = decode(&encode(&original));

        assert_eq!(deserialized.topic, original.topic);
        assert_eq!(deserialized.queue_id, original.queue_id);
        assert_eq!(deserialized.timestamp, original.timestamp);
        assert_eq!(deserialized.boundary_type, original.boundary_type);
    }

    #[test]
    fn search_offset_request_header_with_negative_queue_id() {
        let header = build_header(CheetahString::from("test"), -1, 1000, BoundaryType::Lower);

        assert_eq!(header.queue_id, -1);
    }

    #[test]
    fn search_offset_request_header_with_empty_topic() {
        let header = build_header(CheetahString::new(), 0, 0, BoundaryType::Lower);

        assert!(header.topic.is_empty());
    }

    #[test]
    fn search_offset_request_header_with_large_timestamp() {
        let header = build_header(CheetahString::from("test"), 0, i64::MAX, BoundaryType::Upper);
        assert_eq!(header.timestamp, i64::MAX);

        // The extreme value must survive a JSON round trip unchanged.
        let deserialized = decode(&encode(&header));
        assert_eq!(deserialized.timestamp, i64::MAX);
    }

    #[test]
    fn topic_request_header_trait_set_and_get_topic() {
        let topic = CheetahString::from("test_topic");
        let mut header = SearchOffsetRequestHeader::default();

        header.set_topic(topic.clone());

        assert_eq!(header.topic(), &topic);
    }

    #[test]
    fn topic_request_header_trait_set_and_get_queue_id() {
        let mut header = SearchOffsetRequestHeader::default();

        for queue_id in [5, -1] {
            header.set_queue_id(queue_id);
            assert_eq!(header.queue_id(), queue_id);
        }
    }

    #[test]
    fn topic_request_header_trait_lo_with_none_topic_request_header() {
        let mut header = SearchOffsetRequestHeader::default();

        // Without a nested header the setter has nowhere to store the value.
        header.set_lo(Some(true));

        assert_eq!(header.lo(), None);
    }

    #[test]
    fn topic_request_header_trait_lo_with_some_topic_request_header() {
        let mut header = SearchOffsetRequestHeader {
            topic: CheetahString::from("test"),
            queue_id: 0,
            timestamp: 0,
            boundary_type: BoundaryType::Lower,
            topic_request_header: Some(TopicRequestHeader::default()),
        };

        for value in [Some(true), Some(false), None] {
            header.set_lo(value);
            assert_eq!(header.lo(), value);
        }
    }

    #[test]
    fn topic_request_header_trait_broker_name_with_none() {
        assert_eq!(SearchOffsetRequestHeader::default().broker_name(), None);
    }

    #[test]
    fn topic_request_header_trait_set_broker_name_with_none_topic_request_header() {
        let mut header = SearchOffsetRequestHeader::default();

        // The setter is a no-op when the nested headers are absent.
        header.set_broker_name(CheetahString::from("broker1"));

        assert_eq!(header.broker_name(), None);
    }

    #[test]
    fn topic_request_header_trait_namespace_with_none() {
        assert_eq!(SearchOffsetRequestHeader::default().namespace(), None);
    }

    #[test]
    fn topic_request_header_trait_set_namespace_with_none_topic_request_header() {
        let mut header = SearchOffsetRequestHeader::default();

        // The setter is a no-op when the nested headers are absent.
        header.set_namespace(CheetahString::from("namespace1"));

        assert_eq!(header.namespace(), None);
    }

    #[test]
    fn topic_request_header_trait_namespaced_with_none() {
        assert_eq!(SearchOffsetRequestHeader::default().namespaced(), None);
    }

    #[test]
    fn topic_request_header_trait_set_namespaced_with_none_topic_request_header() {
        let mut header = SearchOffsetRequestHeader::default();

        // The setter is a no-op when the nested headers are absent.
        header.set_namespaced(true);

        assert_eq!(header.namespaced(), None);
    }

    #[test]
    fn topic_request_header_trait_oneway_with_none() {
        assert_eq!(SearchOffsetRequestHeader::default().oneway(), None);
    }

    #[test]
    fn topic_request_header_trait_set_oneway_with_none_topic_request_header() {
        let mut header = SearchOffsetRequestHeader::default();

        // The setter is a no-op when the nested headers are absent.
        header.set_oneway(true);

        assert_eq!(header.oneway(), None);
    }

    #[test]
    fn topic_request_header_trait_all_methods_safe_with_none() {
        let mut header = SearchOffsetRequestHeader::default();

        // None of these may panic, and none may create the missing headers.
        header.set_lo(Some(true));
        header.set_broker_name(CheetahString::from("broker"));
        header.set_namespace(CheetahString::from("ns"));
        header.set_namespaced(true);
        header.set_oneway(false);

        assert_eq!(header.lo(), None);
        assert_eq!(header.broker_name(), None);
        assert_eq!(header.namespace(), None);
        assert_eq!(header.namespaced(), None);
        assert_eq!(header.oneway(), None);
    }
}