Skip to main content

rocketmq_remoting/protocol/header/
get_max_offset_request_header.rs

1// Copyright 2023 The RocketMQ Rust Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use cheetah_string::CheetahString;
16use rocketmq_macros::RequestHeaderCodecV2;
17use serde::Deserialize;
18use serde::Serialize;
19
20use crate::protocol::header::message_operation_header::TopicRequestHeaderTrait;
21use crate::rpc::topic_request_header::TopicRequestHeader;
22
23#[derive(Debug, Clone, Serialize, Deserialize, RequestHeaderCodecV2)]
24#[serde(rename_all = "camelCase")]
25pub struct GetMaxOffsetRequestHeader {
26    pub topic: CheetahString,
27
28    pub queue_id: i32,
29
30    pub committed: bool,
31
32    #[serde(flatten)]
33    pub topic_request_header: Option<TopicRequestHeader>,
34}
35
36impl TopicRequestHeaderTrait for GetMaxOffsetRequestHeader {
37    fn set_lo(&mut self, lo: Option<bool>) {
38        if let Some(header) = self.topic_request_header.as_mut() {
39            header.lo = lo;
40        }
41    }
42
43    fn lo(&self) -> Option<bool> {
44        self.topic_request_header.as_ref().and_then(|h| h.lo)
45    }
46
47    fn set_topic(&mut self, topic: CheetahString) {
48        self.topic = topic;
49    }
50
51    fn topic(&self) -> &CheetahString {
52        &self.topic
53    }
54
55    fn broker_name(&self) -> Option<&CheetahString> {
56        self.topic_request_header
57            .as_ref()
58            .and_then(|h| h.rpc_request_header.as_ref())
59            .and_then(|h| h.broker_name.as_ref())
60    }
61
62    fn set_broker_name(&mut self, broker_name: CheetahString) {
63        if let Some(header) = self.topic_request_header.as_mut() {
64            if let Some(rpc_header) = header.rpc_request_header.as_mut() {
65                rpc_header.broker_name = Some(broker_name);
66            }
67        }
68    }
69
70    fn namespace(&self) -> Option<&str> {
71        self.topic_request_header
72            .as_ref()
73            .and_then(|h| h.rpc_request_header.as_ref())
74            .and_then(|r| r.namespace.as_deref())
75    }
76
77    fn set_namespace(&mut self, namespace: CheetahString) {
78        if let Some(header) = self.topic_request_header.as_mut() {
79            if let Some(rpc_header) = header.rpc_request_header.as_mut() {
80                rpc_header.namespace = Some(namespace);
81            }
82        }
83    }
84
85    fn namespaced(&self) -> Option<bool> {
86        self.topic_request_header
87            .as_ref()
88            .and_then(|h| h.rpc_request_header.as_ref())
89            .and_then(|r| r.namespaced)
90    }
91
92    fn set_namespaced(&mut self, namespaced: bool) {
93        if let Some(header) = self.topic_request_header.as_mut() {
94            if let Some(rpc_header) = header.rpc_request_header.as_mut() {
95                rpc_header.namespaced = Some(namespaced);
96            }
97        }
98    }
99
100    fn oneway(&self) -> Option<bool> {
101        self.topic_request_header
102            .as_ref()
103            .and_then(|h| h.rpc_request_header.as_ref())
104            .and_then(|r| r.oneway)
105    }
106
107    fn set_oneway(&mut self, oneway: bool) {
108        if let Some(header) = self.topic_request_header.as_mut() {
109            if let Some(rpc_header) = header.rpc_request_header.as_mut() {
110                rpc_header.oneway = Some(oneway);
111            }
112        }
113    }
114
115    fn queue_id(&self) -> i32 {
116        self.queue_id
117    }
118
119    fn set_queue_id(&mut self, queue_id: i32) {
120        self.queue_id = queue_id;
121    }
122}
123
#[cfg(test)]
mod tests {
    use cheetah_string::CheetahString;

    use crate::protocol::header::get_max_offset_request_header::GetMaxOffsetRequestHeader;
    use crate::protocol::header::message_operation_header::TopicRequestHeaderTrait;
    use crate::rpc::rpc_request_header::RpcRequestHeader;
    use crate::rpc::topic_request_header::TopicRequestHeader;

    /// Builds a header whose nested topic and RPC headers are fully populated.
    fn header_with_all_fields() -> GetMaxOffsetRequestHeader {
        let rpc_header = RpcRequestHeader {
            namespace: Some(CheetahString::from("ns1")),
            namespaced: Some(true),
            broker_name: Some(CheetahString::from("broker-0")),
            oneway: Some(false),
        };
        let topic_req = TopicRequestHeader {
            rpc_request_header: Some(rpc_header),
            lo: Some(true),
        };
        GetMaxOffsetRequestHeader {
            topic: CheetahString::from("testTopic"),
            queue_id: 1,
            committed: true,
            topic_request_header: Some(topic_req),
        }
    }

    /// Builds a header that carries only its own fields, with no nested header.
    fn header_with_required_fields_only() -> GetMaxOffsetRequestHeader {
        GetMaxOffsetRequestHeader {
            topic: CheetahString::from("testTopic"),
            queue_id: 1,
            committed: true,
            topic_request_header: None,
        }
    }

    #[test]
    fn get_max_offset_request_header_with_required_fields_only() {
        let header = header_with_required_fields_only();

        assert_eq!(header.topic, CheetahString::from("testTopic"));
        assert_eq!(header.queue_id, 1);
        assert!(header.committed);
        assert!(header.topic_request_header.is_none());
    }

    #[test]
    fn get_max_offset_request_header_with_all_fields() {
        let header = header_with_all_fields();

        assert_eq!(header.topic, CheetahString::from("testTopic"));
        assert_eq!(header.queue_id, 1);
        assert!(header.committed);
        assert!(header.topic_request_header.is_some());
    }

    #[test]
    fn get_max_offset_request_header_with_empty_topic() {
        let header = GetMaxOffsetRequestHeader {
            topic: CheetahString::from(""),
            queue_id: 0,
            committed: false,
            topic_request_header: None,
        };

        assert_eq!(header.topic, CheetahString::from(""));
        assert_eq!(header.queue_id, 0);
        assert!(!header.committed);
        assert!(header.topic_request_header.is_none());
    }

    #[test]
    fn get_max_offset_request_header_with_long_values() {
        let long_string = "a".repeat(1000);
        let header = GetMaxOffsetRequestHeader {
            topic: CheetahString::from(long_string.as_str()),
            queue_id: 1,
            committed: true,
            topic_request_header: None,
        };

        assert_eq!(header.topic, CheetahString::from(long_string.as_str()));
        assert_eq!(header.queue_id, 1);
        assert!(header.committed);
        assert!(header.topic_request_header.is_none());
    }

    #[test]
    fn getters_return_none_without_nested_header() {
        let header = header_with_required_fields_only();

        assert_eq!(header.lo(), None);
        assert_eq!(header.broker_name(), None);
        assert_eq!(header.namespace(), None);
        assert_eq!(header.namespaced(), None);
        assert_eq!(header.oneway(), None);
    }

    #[test]
    fn rpc_getters_return_none_without_rpc_header() {
        let header = GetMaxOffsetRequestHeader {
            topic_request_header: Some(TopicRequestHeader {
                rpc_request_header: None,
                lo: Some(true),
            }),
            ..header_with_required_fields_only()
        };

        assert_eq!(header.lo(), Some(true));
        assert_eq!(header.broker_name(), None);
        assert_eq!(header.namespace(), None);
        assert_eq!(header.namespaced(), None);
        assert_eq!(header.oneway(), None);
    }

    #[test]
    fn own_field_accessors_work_without_nested_header() {
        let mut header = header_with_required_fields_only();

        header.set_topic(CheetahString::from("test_topic"));
        header.set_queue_id(2);

        assert_eq!(header.topic(), &CheetahString::from("test_topic"));
        assert_eq!(header.queue_id(), 2);
    }

    #[test]
    fn fn_lo() {
        assert_eq!(header_with_all_fields().lo(), Some(true));
    }

    #[test]
    fn fn_set_lo() {
        let mut header = header_with_all_fields();
        header.set_lo(Some(false));
        assert_eq!(header.lo(), Some(false));
    }

    #[test]
    fn fn_topic() {
        let header = header_with_all_fields();
        assert_eq!(header.topic(), &CheetahString::from("testTopic"));
    }

    #[test]
    fn fn_set_topic() {
        let mut header = header_with_all_fields();
        header.set_topic(CheetahString::from("test_topic"));
        assert_eq!(header.topic(), &CheetahString::from("test_topic"));
    }

    #[test]
    fn fn_broker_name() {
        let header = header_with_all_fields();
        assert_eq!(header.broker_name(), Some(&CheetahString::from("broker-0")));
    }

    #[test]
    fn fn_set_broker_name() {
        let mut header = header_with_all_fields();
        header.set_broker_name(CheetahString::from("broker-1"));
        assert_eq!(header.broker_name(), Some(&CheetahString::from("broker-1")));
    }

    #[test]
    fn fn_namespace() {
        let header = header_with_all_fields();
        assert_eq!(header.namespace(), Some("ns1"));
    }

    #[test]
    fn fn_set_namespace() {
        let mut header = header_with_all_fields();
        header.set_namespace(CheetahString::from("ns2"));
        assert_eq!(header.namespace(), Some("ns2"));
    }

    #[test]
    fn fn_namespaced() {
        assert_eq!(header_with_all_fields().namespaced(), Some(true));
    }

    #[test]
    fn fn_set_namespaced() {
        let mut header = header_with_all_fields();
        header.set_namespaced(false);
        assert_eq!(header.namespaced(), Some(false));
    }

    #[test]
    fn fn_oneway() {
        assert_eq!(header_with_all_fields().oneway(), Some(false));
    }

    #[test]
    fn fn_set_oneway() {
        let mut header = header_with_all_fields();
        header.set_oneway(true);
        assert_eq!(header.oneway(), Some(true));
    }

    #[test]
    fn fn_queue_id() {
        assert_eq!(header_with_all_fields().queue_id(), 1);
    }

    #[test]
    fn fn_set_queue_id() {
        let mut header = header_with_all_fields();
        header.set_queue_id(2);
        assert_eq!(header.queue_id(), 2);
    }
}