1use cheetah_string::CheetahString;
16use rocketmq_macros::RequestHeaderCodecV2;
17use serde::Deserialize;
18use serde::Serialize;
19
20use crate::protocol::header::message_operation_header::TopicRequestHeaderTrait;
21use crate::rpc::topic_request_header::TopicRequestHeader;
22
23#[derive(Debug, Clone, Serialize, Deserialize, RequestHeaderCodecV2)]
24#[serde(rename_all = "camelCase")]
25pub struct GetMaxOffsetRequestHeader {
26 pub topic: CheetahString,
27
28 pub queue_id: i32,
29
30 pub committed: bool,
31
32 #[serde(flatten)]
33 pub topic_request_header: Option<TopicRequestHeader>,
34}
35
36impl TopicRequestHeaderTrait for GetMaxOffsetRequestHeader {
37 fn set_lo(&mut self, lo: Option<bool>) {
38 if let Some(header) = self.topic_request_header.as_mut() {
39 header.lo = lo;
40 }
41 }
42
43 fn lo(&self) -> Option<bool> {
44 self.topic_request_header.as_ref().and_then(|h| h.lo)
45 }
46
47 fn set_topic(&mut self, topic: CheetahString) {
48 self.topic = topic;
49 }
50
51 fn topic(&self) -> &CheetahString {
52 &self.topic
53 }
54
55 fn broker_name(&self) -> Option<&CheetahString> {
56 self.topic_request_header
57 .as_ref()
58 .and_then(|h| h.rpc_request_header.as_ref())
59 .and_then(|h| h.broker_name.as_ref())
60 }
61
62 fn set_broker_name(&mut self, broker_name: CheetahString) {
63 if let Some(header) = self.topic_request_header.as_mut() {
64 if let Some(rpc_header) = header.rpc_request_header.as_mut() {
65 rpc_header.broker_name = Some(broker_name);
66 }
67 }
68 }
69
70 fn namespace(&self) -> Option<&str> {
71 self.topic_request_header
72 .as_ref()
73 .and_then(|h| h.rpc_request_header.as_ref())
74 .and_then(|r| r.namespace.as_deref())
75 }
76
77 fn set_namespace(&mut self, namespace: CheetahString) {
78 if let Some(header) = self.topic_request_header.as_mut() {
79 if let Some(rpc_header) = header.rpc_request_header.as_mut() {
80 rpc_header.namespace = Some(namespace);
81 }
82 }
83 }
84
85 fn namespaced(&self) -> Option<bool> {
86 self.topic_request_header
87 .as_ref()
88 .and_then(|h| h.rpc_request_header.as_ref())
89 .and_then(|r| r.namespaced)
90 }
91
92 fn set_namespaced(&mut self, namespaced: bool) {
93 if let Some(header) = self.topic_request_header.as_mut() {
94 if let Some(rpc_header) = header.rpc_request_header.as_mut() {
95 rpc_header.namespaced = Some(namespaced);
96 }
97 }
98 }
99
100 fn oneway(&self) -> Option<bool> {
101 self.topic_request_header
102 .as_ref()
103 .and_then(|h| h.rpc_request_header.as_ref())
104 .and_then(|r| r.oneway)
105 }
106
107 fn set_oneway(&mut self, oneway: bool) {
108 if let Some(header) = self.topic_request_header.as_mut() {
109 if let Some(rpc_header) = header.rpc_request_header.as_mut() {
110 rpc_header.oneway = Some(oneway);
111 }
112 }
113 }
114
115 fn queue_id(&self) -> i32 {
116 self.queue_id
117 }
118
119 fn set_queue_id(&mut self, queue_id: i32) {
120 self.queue_id = queue_id;
121 }
122}
123
#[cfg(test)]
mod tests {
    use cheetah_string::CheetahString;

    use super::GetMaxOffsetRequestHeader;
    use crate::protocol::header::message_operation_header::TopicRequestHeaderTrait;
    use crate::rpc::rpc_request_header::RpcRequestHeader;
    use crate::rpc::topic_request_header::TopicRequestHeader;

    /// Builds a header with only its own fields set and no nested topic header.
    fn bare_header(
        topic: CheetahString,
        queue_id: i32,
        committed: bool,
    ) -> GetMaxOffsetRequestHeader {
        GetMaxOffsetRequestHeader {
            topic,
            queue_id,
            committed,
            topic_request_header: None,
        }
    }

    /// Builds the fully populated fixture shared by the accessor tests:
    /// topic "testTopic", queue 1, committed, lo = true, namespace "ns1",
    /// namespaced = true, broker "broker-0", oneway = false.
    fn populated_header() -> GetMaxOffsetRequestHeader {
        let rpc_header = RpcRequestHeader {
            namespace: Some(CheetahString::from("ns1")),
            namespaced: Some(true),
            broker_name: Some(CheetahString::from("broker-0")),
            oneway: Some(false),
        };
        let topic_req = TopicRequestHeader {
            rpc_request_header: Some(rpc_header),
            lo: Some(true),
        };
        GetMaxOffsetRequestHeader {
            topic: CheetahString::from("testTopic"),
            queue_id: 1,
            committed: true,
            topic_request_header: Some(topic_req),
        }
    }

    #[test]
    fn get_max_offset_request_header_with_required_fields_only() {
        let header = bare_header(CheetahString::from("testTopic"), 1, true);

        assert_eq!(header.topic, CheetahString::from("testTopic"));
        assert_eq!(header.queue_id, 1);
        assert!(header.committed);
        assert!(header.topic_request_header.is_none());
    }

    #[test]
    fn get_max_offset_request_header_with_all_fields() {
        let header = populated_header();

        assert_eq!(header.topic, CheetahString::from("testTopic"));
        assert_eq!(header.queue_id, 1);
        assert!(header.committed);
        assert!(header.topic_request_header.is_some());

        // The nested values must be reachable through the trait accessors.
        assert_eq!(header.lo(), Some(true));
        assert_eq!(header.namespace(), Some("ns1"));
        assert_eq!(header.namespaced(), Some(true));
        assert_eq!(header.broker_name(), Some(&CheetahString::from("broker-0")));
        assert_eq!(header.oneway(), Some(false));
    }

    #[test]
    fn get_max_offset_request_header_with_empty_topic() {
        let header = bare_header(CheetahString::from(""), 0, false);

        assert_eq!(header.topic, CheetahString::from(""));
        assert_eq!(header.queue_id, 0);
        assert!(!header.committed);
        assert!(header.topic_request_header.is_none());
    }

    #[test]
    fn get_max_offset_request_header_with_long_values() {
        let long_string = "a".repeat(1000);
        let header = bare_header(CheetahString::from(&long_string), 1, true);

        assert_eq!(header.topic, CheetahString::from(&long_string));
        assert_eq!(header.queue_id, 1);
        assert!(header.committed);
        assert!(header.topic_request_header.is_none());
    }

    #[test]
    fn fn_lo() {
        let header = populated_header();
        assert_eq!(header.lo(), Some(true));
    }

    #[test]
    fn fn_set_lo() {
        let mut header = populated_header();
        header.set_lo(Some(false));
        assert_eq!(header.lo(), Some(false));
    }

    #[test]
    fn fn_topic() {
        let header = populated_header();
        assert_eq!(header.topic(), &CheetahString::from("testTopic"));
    }

    #[test]
    fn fn_set_topic() {
        let mut header = populated_header();
        header.set_topic(CheetahString::from("test_topic"));
        assert_eq!(header.topic(), &CheetahString::from("test_topic"));
    }

    #[test]
    fn fn_broker_name() {
        let header = populated_header();
        assert_eq!(header.broker_name(), Some(&CheetahString::from("broker-0")));
    }

    #[test]
    fn fn_set_broker_name() {
        let mut header = populated_header();
        header.set_broker_name(CheetahString::from("broker-1"));
        assert_eq!(header.broker_name(), Some(&CheetahString::from("broker-1")));
    }

    #[test]
    fn fn_namespace() {
        let header = populated_header();
        assert_eq!(header.namespace(), Some("ns1"));
    }

    #[test]
    fn fn_set_namespace() {
        let mut header = populated_header();
        header.set_namespace(CheetahString::from("ns2"));
        assert_eq!(header.namespace(), Some("ns2"));
    }

    #[test]
    fn fn_namespaced() {
        let header = populated_header();
        assert_eq!(header.namespaced(), Some(true));
    }

    #[test]
    fn fn_set_namespaced() {
        let mut header = populated_header();
        header.set_namespaced(false);
        assert_eq!(header.namespaced(), Some(false));
    }

    #[test]
    fn fn_oneway() {
        let header = populated_header();
        assert_eq!(header.oneway(), Some(false));
    }

    #[test]
    fn fn_set_oneway() {
        let mut header = populated_header();
        header.set_oneway(true);
        assert_eq!(header.oneway(), Some(true));
    }

    #[test]
    fn fn_queue_id() {
        let header = populated_header();
        assert_eq!(header.queue_id(), 1);
    }

    #[test]
    fn fn_set_queue_id() {
        let mut header = populated_header();
        header.set_queue_id(2);
        assert_eq!(header.queue_id(), 2);
    }
}