1use std::collections::HashSet;
16
17use rocketmq_common::common::message::message_queue_assignment::MessageQueueAssignment;
18use serde::Deserialize;
19use serde::Serialize;
20
21#[derive(Debug, Clone, Serialize, Deserialize, Default)]
22#[serde(rename_all = "camelCase")]
23pub struct QueryAssignmentResponseBody {
24 pub message_queue_assignments: HashSet<MessageQueueAssignment>,
25}
26
#[cfg(test)]
mod tests {
    use super::*;
    use crate::protocol::RemotingDeserializable;
    use crate::protocol::RemotingSerializable;
    use cheetah_string::CheetahString;
    use rocketmq_common::common::message::message_enum::MessageRequestMode;
    use rocketmq_common::common::message::message_queue::MessageQueue;
    use std::collections::HashMap;

    // ---------------------------------------------------------------------
    // Fixtures and helpers
    // ---------------------------------------------------------------------

    /// Builds a `MessageQueue` with the given topic, broker name and queue id.
    fn create_message_queue(topic: &str, broker: &str, queue_id: i32) -> MessageQueue {
        let mut queue = MessageQueue::new();
        queue.set_topic(CheetahString::from(topic));
        queue.set_broker_name(CheetahString::from(broker));
        queue.set_queue_id(queue_id);
        queue
    }

    /// Builds an assignment whose queue is always present (`Some`).
    fn create_assignment(
        topic: &str,
        broker: &str,
        queue_id: i32,
        mode: MessageRequestMode,
        attachments: Option<HashMap<CheetahString, CheetahString>>,
    ) -> MessageQueueAssignment {
        let message_queue = Some(create_message_queue(topic, broker, queue_id));
        MessageQueueAssignment {
            message_queue,
            mode,
            attachments,
        }
    }

    /// Collects the assignments into the body's set (equal entries collapse).
    fn create_body_with_assignments(assignments: Vec<MessageQueueAssignment>) -> QueryAssignmentResponseBody {
        QueryAssignmentResponseBody {
            message_queue_assignments: assignments.into_iter().collect(),
        }
    }

    /// Serializes `body` with `serde_json`, panicking if serialization fails.
    fn to_json(body: &QueryAssignmentResponseBody) -> String {
        serde_json::to_string(body).unwrap()
    }

    /// Parses a body from `json` with `serde_json`, panicking if parsing fails.
    fn from_json(json: &str) -> QueryAssignmentResponseBody {
        serde_json::from_str(json).expect("Failed to deserialize")
    }

    /// Returns an arbitrary assignment from the body, panicking if it is empty.
    fn first_assignment(body: &QueryAssignmentResponseBody) -> &MessageQueueAssignment {
        body.message_queue_assignments
            .iter()
            .next()
            .expect("Should have at least one assignment")
    }

    /// Returns the assignment's queue, panicking if it is `None`.
    fn queue_of(assignment: &MessageQueueAssignment) -> &MessageQueue {
        assignment
            .message_queue
            .as_ref()
            .expect("MessageQueue should be present")
    }

    // ---------------------------------------------------------------------
    // Construction, Clone and Debug
    // ---------------------------------------------------------------------

    #[test]
    fn query_assignment_response_body_default_is_empty() {
        let body = QueryAssignmentResponseBody::default();
        assert!(body.message_queue_assignments.is_empty());
    }

    #[test]
    fn query_assignment_response_body_with_empty_hashset() {
        let body = QueryAssignmentResponseBody {
            message_queue_assignments: HashSet::new(),
        };
        assert!(body.message_queue_assignments.is_empty());
    }

    #[test]
    fn query_assignment_response_body_clone_preserves_data() {
        let body = create_body_with_assignments(vec![create_assignment(
            "test-topic",
            "broker-a",
            0,
            MessageRequestMode::Pull,
            None,
        )]);
        let cloned = body.clone();
        // Compare the contents, not just the sizes: a clone that kept the
        // length but altered an element would otherwise go unnoticed.
        assert_eq!(body.message_queue_assignments, cloned.message_queue_assignments);
    }

    #[test]
    fn query_assignment_response_body_clone_is_independent() {
        let body = create_body_with_assignments(vec![create_assignment(
            "topic",
            "broker",
            0,
            MessageRequestMode::Pull,
            None,
        )]);
        let mut cloned = body.clone();
        cloned
            .message_queue_assignments
            .insert(create_assignment("other", "broker", 99, MessageRequestMode::Pop, None));

        // Mutating the clone must leave the source untouched.
        assert_eq!(body.message_queue_assignments.len(), 1);
        assert_eq!(cloned.message_queue_assignments.len(), 2);
    }

    #[test]
    fn query_assignment_response_body_debug_format() {
        let body = create_body_with_assignments(vec![create_assignment(
            "test-topic",
            "broker-a",
            0,
            MessageRequestMode::Pull,
            None,
        )]);
        let debug_str = format!("{:?}", body);
        assert!(debug_str.contains("QueryAssignmentResponseBody"));
        assert!(debug_str.contains("message_queue_assignments"));
    }

    // ---------------------------------------------------------------------
    // HashSet semantics
    // ---------------------------------------------------------------------

    #[test]
    fn query_assignment_response_body_hashset_deduplication() {
        // Two assignments built from identical parts must be treated as one.
        let make_assignment = || MessageQueueAssignment {
            message_queue: Some(MessageQueue::new()),
            mode: MessageRequestMode::Pop,
            attachments: None,
        };

        let mut assignments = HashSet::new();
        // `insert` reports whether the value was newly added.
        assert!(assignments.insert(make_assignment()));
        assert!(!assignments.insert(make_assignment()));

        assert_eq!(assignments.len(), 1);
    }

    #[test]
    fn query_assignment_response_body_hashset_ordering_not_guaranteed() {
        let body = create_body_with_assignments(
            (0..3)
                .map(|queue_id: i32| create_assignment("topic", "broker", queue_id, MessageRequestMode::Pull, None))
                .collect(),
        );

        let mut queue_ids: Vec<i32> = body
            .message_queue_assignments
            .iter()
            .filter_map(|a| a.message_queue.as_ref().map(|mq| mq.queue_id()))
            .collect();

        // Iteration order of the set is unspecified, so normalize before comparing.
        queue_ids.sort_unstable();
        assert_eq!(queue_ids, vec![0, 1, 2]);
    }

    #[test]
    fn query_assignment_response_body_hashset_equality() {
        let build = || {
            create_body_with_assignments(vec![create_assignment(
                "topic",
                "broker",
                0,
                MessageRequestMode::Pull,
                None,
            )])
        };
        let body1 = build();
        let body2 = build();
        assert_eq!(body1.message_queue_assignments, body2.message_queue_assignments);
    }

    // ---------------------------------------------------------------------
    // Serialization
    // ---------------------------------------------------------------------

    #[test]
    fn query_assignment_response_body_serialization_camelcase() {
        let body = create_body_with_assignments(vec![create_assignment(
            "topic",
            "broker",
            0,
            MessageRequestMode::Pull,
            None,
        )]);
        let json = to_json(&body);
        assert!(json.contains("\"messageQueueAssignments\""));
        assert!(!json.contains("\"message_queue_assignments\""));
    }

    #[test]
    fn query_assignment_response_body_serialization_with_modes() {
        let body = create_body_with_assignments(vec![
            create_assignment("topic", "broker", 0, MessageRequestMode::Pull, None),
            create_assignment("topic", "broker", 1, MessageRequestMode::Pop, None),
        ]);
        let json = to_json(&body);
        assert!(json.contains("\"PULL\""));
        assert!(json.contains("\"POP\""));
    }

    #[test]
    fn query_assignment_response_body_serialization_empty() {
        let json = to_json(&QueryAssignmentResponseBody::default());
        assert_eq!(json, "{\"messageQueueAssignments\":[]}");
    }

    #[test]
    fn query_assignment_response_body_roundtrip_consistency() {
        let body = create_body_with_assignments(vec![create_assignment(
            "test-topic",
            "broker-a",
            5,
            MessageRequestMode::Pull,
            None,
        )]);
        let recovered = from_json(&to_json(&body));
        assert_eq!(
            body.message_queue_assignments.len(),
            recovered.message_queue_assignments.len()
        );
    }

    #[test]
    fn query_assignment_response_body_encode_decode() {
        let body = create_body_with_assignments(vec![create_assignment(
            "test-topic",
            "broker-a",
            3,
            MessageRequestMode::Pull,
            None,
        )]);

        let encoded = body.encode().expect("encode should succeed");
        assert!(!encoded.is_empty());

        let decoded = QueryAssignmentResponseBody::decode(&encoded).expect("decode should succeed");
        assert_eq!(
            body.message_queue_assignments.len(),
            decoded.message_queue_assignments.len()
        );
    }

    #[test]
    fn query_assignment_response_body_encode_produces_valid_json() {
        let body = create_body_with_assignments(vec![create_assignment(
            "topic",
            "broker",
            0,
            MessageRequestMode::Pull,
            None,
        )]);

        let encoded = body.encode().expect("encode should succeed");
        let json_str = String::from_utf8(encoded).expect("encoded bytes should be valid UTF-8");

        assert!(json_str.contains("messageQueueAssignments"));
        let _: serde_json::Value = serde_json::from_str(&json_str).expect("encoded JSON should be valid");
    }

    // ---------------------------------------------------------------------
    // Deserialization
    // ---------------------------------------------------------------------

    #[test]
    fn query_assignment_response_body_deserialization() {
        let json = r#"{
            "messageQueueAssignments": [
                {
                    "messageQueue": {
                        "topic": "test-topic",
                        "brokerName": "broker-b",
                        "queueId": 2
                    },
                    "mode": "PULL",
                    "attachments": {"version": "1.0"}
                }
            ]
        }"#;

        let deserialized: QueryAssignmentResponseBody =
            serde_json::from_str(json).expect("Failed to deserialize QueryAssignmentResponseBody");

        assert_eq!(deserialized.message_queue_assignments.len(), 1);

        let assignment = first_assignment(&deserialized);
        let mq = queue_of(assignment);

        assert_eq!(mq.topic(), "test-topic");
        assert_eq!(mq.broker_name(), "broker-b");
        assert_eq!(mq.queue_id(), 2);

        let attachments = assignment.attachments.as_ref().expect("Attachments should be present");
        assert_eq!(
            attachments
                .get(&CheetahString::from("version"))
                .expect("Version key should exist"),
            "1.0"
        );
    }

    #[test]
    fn query_assignment_response_body_deserialization_missing_optional_fields() {
        // No "attachments" key at all: it must come back as `None`.
        let json = r#"{
            "messageQueueAssignments": [
                {
                    "messageQueue": {
                        "topic": "topic",
                        "brokerName": "broker",
                        "queueId": 1
                    },
                    "mode": "PULL"
                }
            ]
        }"#;
        let body = from_json(json);
        assert_eq!(body.message_queue_assignments.len(), 1);
        assert!(first_assignment(&body).attachments.is_none());
    }

    #[test]
    fn query_assignment_response_body_deserialization_extra_fields_ignored() {
        let json = r#"{
            "messageQueueAssignments": [
                {
                    "messageQueue": {
                        "topic": "topic",
                        "brokerName": "broker",
                        "queueId": 1
                    },
                    "mode": "PULL",
                    "attachments": null,
                    "extraField": "should be ignored",
                    "anotherField": 123
                }
            ]
        }"#;
        let body = from_json(json);
        assert_eq!(body.message_queue_assignments.len(), 1);

        let mq = queue_of(first_assignment(&body));
        assert_eq!(mq.topic(), "topic");
        assert_eq!(mq.queue_id(), 1);
    }

    #[test]
    fn query_assignment_response_body_deserialization_malformed_json() {
        let json = r#"{ invalid json }"#;
        assert!(serde_json::from_str::<QueryAssignmentResponseBody>(json).is_err());
    }

    #[test]
    fn query_assignment_response_body_deserialization_wrong_data_types() {
        // The assignments field must be an array; a string is rejected.
        let json = r#"{
            "messageQueueAssignments": "not an array"
        }"#;
        assert!(serde_json::from_str::<QueryAssignmentResponseBody>(json).is_err());
    }

    #[test]
    fn query_assignment_response_body_nested_structure() {
        let json = r#"{
            "messageQueueAssignments": [
                {
                    "messageQueue": {
                        "topic": "my-topic",
                        "brokerName": "my-broker",
                        "queueId": 3
                    },
                    "mode": "PULL",
                    "attachments": null
                }
            ]
        }"#;
        let body = from_json(json);
        assert_eq!(body.message_queue_assignments.len(), 1);

        let mq = queue_of(first_assignment(&body));
        assert_eq!(mq.topic(), "my-topic");
        assert_eq!(mq.broker_name(), "my-broker");
        assert_eq!(mq.queue_id(), 3);
    }

    // ---------------------------------------------------------------------
    // Edge cases
    // ---------------------------------------------------------------------

    #[test]
    fn query_assignment_response_body_with_none_message_queue() {
        let body = create_body_with_assignments(vec![MessageQueueAssignment {
            message_queue: None,
            mode: MessageRequestMode::Pull,
            attachments: None,
        }]);
        assert_eq!(body.message_queue_assignments.len(), 1);

        let recovered = from_json(&to_json(&body));
        assert_eq!(recovered.message_queue_assignments.len(), 1);
        assert!(first_assignment(&recovered).message_queue.is_none());
    }

    #[test]
    fn query_assignment_response_body_empty_attachments_vs_none() {
        let body_with_empty = create_body_with_assignments(vec![create_assignment(
            "topic",
            "broker",
            0,
            MessageRequestMode::Pull,
            Some(HashMap::new()),
        )]);
        let body_with_none = create_body_with_assignments(vec![create_assignment(
            "topic",
            "broker",
            0,
            MessageRequestMode::Pull,
            None,
        )]);

        // An empty map and an absent map must not serialize identically.
        assert_ne!(to_json(&body_with_empty), to_json(&body_with_none));
    }

    #[test]
    fn query_assignment_response_body_with_attachments() {
        let mut attachments = HashMap::new();
        attachments.insert(CheetahString::from("key1"), CheetahString::from("value1"));
        attachments.insert(CheetahString::from("key2"), CheetahString::from("value2"));

        let body = create_body_with_assignments(vec![create_assignment(
            "topic",
            "broker",
            0,
            MessageRequestMode::Pull,
            Some(attachments),
        )]);

        let json = to_json(&body);
        assert!(json.contains("\"key1\""));
        assert!(json.contains("\"value1\""));

        let recovered = from_json(&json);
        assert_eq!(recovered.message_queue_assignments.len(), 1);

        let recovered_attachments = first_assignment(&recovered)
            .attachments
            .as_ref()
            .expect("Attachments should be present");
        assert_eq!(
            recovered_attachments
                .get(&CheetahString::from("key1"))
                .expect("key1 should exist"),
            "value1"
        );
    }

    #[test]
    fn query_assignment_response_body_large_dataset() {
        // Queue ids are unique, so all 100 assignments are distinct set entries.
        let assignments: Vec<MessageQueueAssignment> = (0..100)
            .map(|i: i32| {
                let mode = if i % 2 == 0 {
                    MessageRequestMode::Pull
                } else {
                    MessageRequestMode::Pop
                };
                create_assignment(
                    &format!("topic{}", i % 5),
                    &format!("broker{}", i % 3),
                    i,
                    mode,
                    None,
                )
            })
            .collect();

        let body = create_body_with_assignments(assignments);
        assert_eq!(body.message_queue_assignments.len(), 100);

        let recovered = from_json(&to_json(&body));
        assert_eq!(recovered.message_queue_assignments.len(), 100);
    }

    // ---------------------------------------------------------------------
    // Representative scenarios
    // ---------------------------------------------------------------------

    #[test]
    fn query_assignment_response_body_typical_real_world_response() {
        // 8 queues on each of two brokers -> 16 distinct assignments.
        let brokers = ["broker-a", "broker-b"];
        let assignments: Vec<MessageQueueAssignment> = (0..8)
            .flat_map(|queue_id: i32| {
                brokers.iter().map(move |broker| {
                    create_assignment("TopicOrder", broker, queue_id, MessageRequestMode::Pull, None)
                })
            })
            .collect();

        let body = create_body_with_assignments(assignments);
        assert_eq!(body.message_queue_assignments.len(), 16);

        let recovered = from_json(&to_json(&body));
        assert_eq!(recovered.message_queue_assignments.len(), 16);

        // Assignments without a queue are simply not counted.
        let broker_a_count = recovered
            .message_queue_assignments
            .iter()
            .filter_map(|a| a.message_queue.as_ref())
            .filter(|mq| mq.broker_name() == "broker-a")
            .count();
        assert_eq!(broker_a_count, 8);
    }

    #[test]
    fn query_assignment_response_body_single_queue_assignment() {
        let body = create_body_with_assignments(vec![create_assignment(
            "my-topic",
            "my-broker",
            7,
            MessageRequestMode::Pop,
            None,
        )]);
        assert_eq!(body.message_queue_assignments.len(), 1);
        assert_eq!(queue_of(first_assignment(&body)).topic(), "my-topic");
    }

    #[test]
    fn query_assignment_response_body_empty_response() {
        let body = QueryAssignmentResponseBody::default();
        assert!(body.message_queue_assignments.is_empty());

        let json = to_json(&body);
        assert!(json.contains("\"messageQueueAssignments\":[]"));

        let recovered = from_json(&json);
        assert!(recovered.message_queue_assignments.is_empty());
    }

    #[test]
    fn query_assignment_response_body_multiple_queues_single_broker() {
        let body = create_body_with_assignments(vec![
            create_assignment("topic", "broker-a", 0, MessageRequestMode::Pull, None),
            create_assignment("topic", "broker-a", 1, MessageRequestMode::Pull, None),
            create_assignment("topic", "broker-a", 2, MessageRequestMode::Pull, None),
        ]);
        assert_eq!(body.message_queue_assignments.len(), 3);
        assert!(body
            .message_queue_assignments
            .iter()
            .all(|a| queue_of(a).broker_name() == "broker-a"));
    }

    #[test]
    fn query_assignment_response_body_with_multiple_assignments() {
        let body = create_body_with_assignments(vec![
            create_assignment("topic", "broker", 0, MessageRequestMode::Pull, None),
            create_assignment("topic", "broker", 1, MessageRequestMode::Pull, None),
            create_assignment("topic", "broker", 2, MessageRequestMode::Pop, None),
        ]);
        assert_eq!(body.message_queue_assignments.len(), 3);
    }
}