Skip to main content

rocketmq_remoting/protocol/header/
search_offset_request_header.rs

1// Copyright 2023 The RocketMQ Rust Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use cheetah_string::CheetahString;
16use rocketmq_common::common::boundary_type::BoundaryType;
17use rocketmq_macros::RequestHeaderCodecV2;
18use serde::Deserialize;
19use serde::Serialize;
20
21use crate::protocol::header::message_operation_header::TopicRequestHeaderTrait;
22use crate::rpc::topic_request_header::TopicRequestHeader;
23
24#[derive(Default, Debug, Serialize, Deserialize, RequestHeaderCodecV2)]
25#[serde(rename_all = "camelCase")]
26pub struct SearchOffsetRequestHeader {
27    #[required]
28    pub topic: CheetahString,
29
30    #[required]
31    pub queue_id: i32,
32
33    #[required]
34    pub timestamp: i64,
35
36    pub boundary_type: BoundaryType,
37
38    #[serde(flatten)]
39    pub topic_request_header: Option<TopicRequestHeader>,
40}
41
42impl TopicRequestHeaderTrait for SearchOffsetRequestHeader {
43    fn set_lo(&mut self, lo: Option<bool>) {
44        if let Some(header) = self.topic_request_header.as_mut() {
45            header.lo = lo;
46        }
47    }
48
49    fn lo(&self) -> Option<bool> {
50        self.topic_request_header.as_ref().and_then(|h| h.lo)
51    }
52
53    fn set_topic(&mut self, topic: CheetahString) {
54        self.topic = topic;
55    }
56
57    fn topic(&self) -> &CheetahString {
58        &self.topic
59    }
60
61    fn broker_name(&self) -> Option<&CheetahString> {
62        self.topic_request_header
63            .as_ref()
64            .and_then(|h| h.rpc_request_header.as_ref())
65            .and_then(|h| h.broker_name.as_ref())
66    }
67
68    fn set_broker_name(&mut self, broker_name: CheetahString) {
69        if let Some(header) = self.topic_request_header.as_mut() {
70            if let Some(rpc_header) = header.rpc_request_header.as_mut() {
71                rpc_header.broker_name = Some(broker_name);
72            }
73        }
74    }
75
76    fn namespace(&self) -> Option<&str> {
77        self.topic_request_header
78            .as_ref()
79            .and_then(|h| h.rpc_request_header.as_ref())
80            .and_then(|r| r.namespace.as_deref())
81    }
82
83    fn set_namespace(&mut self, namespace: CheetahString) {
84        if let Some(header) = self.topic_request_header.as_mut() {
85            if let Some(rpc_header) = header.rpc_request_header.as_mut() {
86                rpc_header.namespace = Some(namespace);
87            }
88        }
89    }
90
91    fn namespaced(&self) -> Option<bool> {
92        self.topic_request_header
93            .as_ref()
94            .and_then(|h| h.rpc_request_header.as_ref())
95            .and_then(|r| r.namespaced)
96    }
97
98    fn set_namespaced(&mut self, namespaced: bool) {
99        if let Some(header) = self.topic_request_header.as_mut() {
100            if let Some(rpc_header) = header.rpc_request_header.as_mut() {
101                rpc_header.namespaced = Some(namespaced);
102            }
103        }
104    }
105
106    fn oneway(&self) -> Option<bool> {
107        self.topic_request_header
108            .as_ref()
109            .and_then(|h| h.rpc_request_header.as_ref())
110            .and_then(|r| r.oneway)
111    }
112
113    fn set_oneway(&mut self, oneway: bool) {
114        if let Some(header) = self.topic_request_header.as_mut() {
115            if let Some(rpc_header) = header.rpc_request_header.as_mut() {
116                rpc_header.oneway = Some(oneway);
117            }
118        }
119    }
120
121    fn queue_id(&self) -> i32 {
122        self.queue_id
123    }
124
125    fn set_queue_id(&mut self, queue_id: i32) {
126        self.queue_id = queue_id;
127    }
128}
129
130#[cfg(test)]
131mod tests {
132    use super::*;
133
134    #[test]
135    fn search_offset_request_header_default() {
136        let header = SearchOffsetRequestHeader::default();
137        assert_eq!(header.topic, CheetahString::default());
138        assert_eq!(header.queue_id, 0);
139        assert_eq!(header.timestamp, 0);
140        assert_eq!(header.boundary_type, BoundaryType::Lower);
141    }
142
143    #[test]
144    fn search_offset_request_header_creation() {
145        let header = SearchOffsetRequestHeader {
146            topic: CheetahString::from("test_topic"),
147            queue_id: 1,
148            timestamp: 1702345678000,
149            boundary_type: BoundaryType::Upper,
150            topic_request_header: None,
151        };
152
153        assert_eq!(header.topic, CheetahString::from("test_topic"));
154        assert_eq!(header.queue_id, 1);
155        assert_eq!(header.timestamp, 1702345678000);
156        assert_eq!(header.boundary_type, BoundaryType::Upper);
157    }
158
159    #[test]
160    fn search_offset_request_header_serializes_to_json() {
161        let header = SearchOffsetRequestHeader {
162            topic: CheetahString::from("my_topic"),
163            queue_id: 2,
164            timestamp: 1702345678999,
165            boundary_type: BoundaryType::Upper,
166            topic_request_header: None,
167        };
168
169        let json = serde_json::to_string(&header).unwrap();
170
171        // Verify camelCase field names and uppercase enum
172        assert!(json.contains(r#""topic":"my_topic""#));
173        assert!(json.contains(r#""queueId":2"#));
174        assert!(json.contains(r#""timestamp":1702345678999"#));
175        assert!(json.contains(r#""boundaryType":"UPPER""#));
176    }
177
178    #[test]
179    fn search_offset_request_header_deserializes_from_json() {
180        let json = r#"{
181            "topic": "test_topic",
182            "queueId": 3,
183            "timestamp": 1702345678123,
184            "boundaryType": "LOWER"
185        }"#;
186
187        let header: SearchOffsetRequestHeader = serde_json::from_str(json).unwrap();
188
189        assert_eq!(header.topic, CheetahString::from("test_topic"));
190        assert_eq!(header.queue_id, 3);
191        assert_eq!(header.timestamp, 1702345678123);
192        assert_eq!(header.boundary_type, BoundaryType::Lower);
193    }
194
195    #[test]
196    fn search_offset_request_header_deserializes_with_uppercase_boundary_type() {
197        let json = r#"{
198            "topic": "topic1",
199            "queueId": 0,
200            "timestamp": 1000000000,
201            "boundaryType": "UPPER"
202        }"#;
203
204        let header: SearchOffsetRequestHeader = serde_json::from_str(json).unwrap();
205
206        assert_eq!(header.boundary_type, BoundaryType::Upper);
207    }
208
209    #[test]
210    fn search_offset_request_header_deserializes_with_lowercase_boundary_type() {
211        let json = r#"{
212            "topic": "topic2",
213            "queueId": 5,
214            "timestamp": 2000000000,
215            "boundaryType": "lower"
216        }"#;
217
218        let header: SearchOffsetRequestHeader = serde_json::from_str(json).unwrap();
219
220        assert_eq!(header.boundary_type, BoundaryType::Lower);
221    }
222
223    #[test]
224    fn search_offset_request_header_boundary_type_defaults_to_lower_for_invalid_value() {
225        let json = r#"{
226            "topic": "topic3",
227            "queueId": 1,
228            "timestamp": 3000000000,
229            "boundaryType": "invalid"
230        }"#;
231
232        let header: SearchOffsetRequestHeader = serde_json::from_str(json).unwrap();
233
234        // Matches Java behavior: defaults to Lower
235        assert_eq!(header.boundary_type, BoundaryType::Lower);
236    }
237
238    #[test]
239    fn search_offset_request_header_roundtrip_serialization() {
240        let original = SearchOffsetRequestHeader {
241            topic: CheetahString::from("roundtrip_topic"),
242            queue_id: 10,
243            timestamp: 1702400000000,
244            boundary_type: BoundaryType::Upper,
245            topic_request_header: None,
246        };
247
248        let json = serde_json::to_string(&original).unwrap();
249        let deserialized: SearchOffsetRequestHeader = serde_json::from_str(&json).unwrap();
250
251        assert_eq!(deserialized.topic, original.topic);
252        assert_eq!(deserialized.queue_id, original.queue_id);
253        assert_eq!(deserialized.timestamp, original.timestamp);
254        assert_eq!(deserialized.boundary_type, original.boundary_type);
255    }
256
257    #[test]
258    fn search_offset_request_header_with_negative_queue_id() {
259        let header = SearchOffsetRequestHeader {
260            topic: CheetahString::from("test"),
261            queue_id: -1,
262            timestamp: 1000,
263            boundary_type: BoundaryType::Lower,
264            topic_request_header: None,
265        };
266
267        assert_eq!(header.queue_id, -1);
268    }
269
270    #[test]
271    fn search_offset_request_header_with_empty_topic() {
272        let header = SearchOffsetRequestHeader {
273            topic: CheetahString::new(),
274            queue_id: 0,
275            timestamp: 0,
276            boundary_type: BoundaryType::Lower,
277            topic_request_header: None,
278        };
279
280        assert!(header.topic.is_empty());
281    }
282
283    #[test]
284    fn search_offset_request_header_with_large_timestamp() {
285        let header = SearchOffsetRequestHeader {
286            topic: CheetahString::from("test"),
287            queue_id: 0,
288            timestamp: i64::MAX,
289            boundary_type: BoundaryType::Upper,
290            topic_request_header: None,
291        };
292
293        assert_eq!(header.timestamp, i64::MAX);
294
295        let json = serde_json::to_string(&header).unwrap();
296        let deserialized: SearchOffsetRequestHeader = serde_json::from_str(&json).unwrap();
297        assert_eq!(deserialized.timestamp, i64::MAX);
298    }
299
300    // Tests for TopicRequestHeaderTrait implementation
301    #[test]
302    fn topic_request_header_trait_set_and_get_topic() {
303        let mut header = SearchOffsetRequestHeader::default();
304        let topic = CheetahString::from("test_topic");
305
306        header.set_topic(topic.clone());
307        assert_eq!(header.topic(), &topic);
308    }
309
310    #[test]
311    fn topic_request_header_trait_set_and_get_queue_id() {
312        let mut header = SearchOffsetRequestHeader::default();
313
314        header.set_queue_id(5);
315        assert_eq!(header.queue_id(), 5);
316
317        header.set_queue_id(-1);
318        assert_eq!(header.queue_id(), -1);
319    }
320
321    #[test]
322    fn topic_request_header_trait_lo_with_none_topic_request_header() {
323        let mut header = SearchOffsetRequestHeader::default();
324
325        // Should not panic when topic_request_header is None
326        header.set_lo(Some(true));
327        assert_eq!(header.lo(), None);
328    }
329
330    #[test]
331    fn topic_request_header_trait_lo_with_some_topic_request_header() {
332        let mut header = SearchOffsetRequestHeader {
333            topic: CheetahString::from("test"),
334            queue_id: 0,
335            timestamp: 0,
336            boundary_type: BoundaryType::Lower,
337            topic_request_header: Some(TopicRequestHeader::default()),
338        };
339
340        header.set_lo(Some(true));
341        assert_eq!(header.lo(), Some(true));
342
343        header.set_lo(Some(false));
344        assert_eq!(header.lo(), Some(false));
345
346        header.set_lo(None);
347        assert_eq!(header.lo(), None);
348    }
349
350    #[test]
351    fn topic_request_header_trait_broker_name_with_none() {
352        let header = SearchOffsetRequestHeader::default();
353        assert_eq!(header.broker_name(), None);
354    }
355
356    #[test]
357    fn topic_request_header_trait_set_broker_name_with_none_topic_request_header() {
358        let mut header = SearchOffsetRequestHeader::default();
359
360        // Should not panic when topic_request_header is None
361        header.set_broker_name(CheetahString::from("broker1"));
362        assert_eq!(header.broker_name(), None);
363    }
364
365    #[test]
366    fn topic_request_header_trait_namespace_with_none() {
367        let header = SearchOffsetRequestHeader::default();
368        assert_eq!(header.namespace(), None);
369    }
370
371    #[test]
372    fn topic_request_header_trait_set_namespace_with_none_topic_request_header() {
373        let mut header = SearchOffsetRequestHeader::default();
374
375        // Should not panic when topic_request_header is None
376        header.set_namespace(CheetahString::from("namespace1"));
377        assert_eq!(header.namespace(), None);
378    }
379
380    #[test]
381    fn topic_request_header_trait_namespaced_with_none() {
382        let header = SearchOffsetRequestHeader::default();
383        assert_eq!(header.namespaced(), None);
384    }
385
386    #[test]
387    fn topic_request_header_trait_set_namespaced_with_none_topic_request_header() {
388        let mut header = SearchOffsetRequestHeader::default();
389
390        // Should not panic when topic_request_header is None
391        header.set_namespaced(true);
392        assert_eq!(header.namespaced(), None);
393    }
394
395    #[test]
396    fn topic_request_header_trait_oneway_with_none() {
397        let header = SearchOffsetRequestHeader::default();
398        assert_eq!(header.oneway(), None);
399    }
400
401    #[test]
402    fn topic_request_header_trait_set_oneway_with_none_topic_request_header() {
403        let mut header = SearchOffsetRequestHeader::default();
404
405        // Should not panic when topic_request_header is None
406        header.set_oneway(true);
407        assert_eq!(header.oneway(), None);
408    }
409
410    #[test]
411    fn topic_request_header_trait_all_methods_safe_with_none() {
412        let mut header = SearchOffsetRequestHeader::default();
413
414        // All these operations should be safe and not panic
415        header.set_lo(Some(true));
416        header.set_broker_name(CheetahString::from("broker"));
417        header.set_namespace(CheetahString::from("ns"));
418        header.set_namespaced(true);
419        header.set_oneway(false);
420
421        assert_eq!(header.lo(), None);
422        assert_eq!(header.broker_name(), None);
423        assert_eq!(header.namespace(), None);
424        assert_eq!(header.namespaced(), None);
425        assert_eq!(header.oneway(), None);
426    }
427}