Skip to main content

rocketmq_remoting/protocol/body/
query_assignment_response_body.rs

1// Copyright 2023 The RocketMQ Rust Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashSet;
16
17use rocketmq_common::common::message::message_queue_assignment::MessageQueueAssignment;
18use serde::Deserialize;
19use serde::Serialize;
20
21#[derive(Debug, Clone, Serialize, Deserialize, Default)]
22#[serde(rename_all = "camelCase")]
23pub struct QueryAssignmentResponseBody {
24    pub message_queue_assignments: HashSet<MessageQueueAssignment>,
25}
26
27#[cfg(test)]
28mod tests {
29    use super::*;
30    use crate::protocol::RemotingDeserializable;
31    use crate::protocol::RemotingSerializable;
32    use cheetah_string::CheetahString;
33    use rocketmq_common::common::message::message_enum::MessageRequestMode;
34    use rocketmq_common::common::message::message_queue::MessageQueue;
35    use std::collections::HashMap;
36
37    fn create_message_queue(topic: &str, broker: &str, queue_id: i32) -> MessageQueue {
38        let mut mq = MessageQueue::new();
39        mq.set_topic(CheetahString::from(topic));
40        mq.set_broker_name(CheetahString::from(broker));
41        mq.set_queue_id(queue_id);
42        mq
43    }
44
45    fn create_assignment(
46        topic: &str,
47        broker: &str,
48        queue_id: i32,
49        mode: MessageRequestMode,
50        attachments: Option<HashMap<CheetahString, CheetahString>>,
51    ) -> MessageQueueAssignment {
52        MessageQueueAssignment {
53            message_queue: Some(create_message_queue(topic, broker, queue_id)),
54            mode,
55            attachments,
56        }
57    }
58
59    fn create_body_with_assignments(assignments: Vec<MessageQueueAssignment>) -> QueryAssignmentResponseBody {
60        QueryAssignmentResponseBody {
61            message_queue_assignments: assignments.into_iter().collect(),
62        }
63    }
64
65    // ==================== Basic Structure Tests ====================
66
67    #[test]
68    fn query_assignment_response_body_default_is_empty() {
69        let body = QueryAssignmentResponseBody::default();
70        assert!(body.message_queue_assignments.is_empty());
71    }
72
73    #[test]
74    fn query_assignment_response_body_with_empty_hashset() {
75        let body = QueryAssignmentResponseBody {
76            message_queue_assignments: HashSet::new(),
77        };
78        assert!(body.message_queue_assignments.is_empty());
79    }
80
81    #[test]
82    fn query_assignment_response_body_clone_preserves_data() {
83        let body = create_body_with_assignments(vec![create_assignment(
84            "test-topic",
85            "broker-a",
86            0,
87            MessageRequestMode::Pull,
88            None,
89        )]);
90        let cloned = body.clone();
91        assert_eq!(
92            body.message_queue_assignments.len(),
93            cloned.message_queue_assignments.len()
94        );
95    }
96
97    #[test]
98    fn query_assignment_response_body_clone_is_independent() {
99        let body = create_body_with_assignments(vec![create_assignment(
100            "topic",
101            "broker",
102            0,
103            MessageRequestMode::Pull,
104            None,
105        )]);
106        let mut cloned = body.clone();
107        cloned.message_queue_assignments.insert(create_assignment(
108            "other",
109            "broker",
110            99,
111            MessageRequestMode::Pop,
112            None,
113        ));
114        assert_eq!(body.message_queue_assignments.len(), 1);
115        assert_eq!(cloned.message_queue_assignments.len(), 2);
116    }
117
118    #[test]
119    fn query_assignment_response_body_debug_format() {
120        let body = create_body_with_assignments(vec![create_assignment(
121            "test-topic",
122            "broker-a",
123            0,
124            MessageRequestMode::Pull,
125            None,
126        )]);
127        let debug_str = format!("{:?}", body);
128        assert!(debug_str.contains("QueryAssignmentResponseBody"));
129        assert!(debug_str.contains("message_queue_assignments"));
130    }
131
132    // ==================== HashSet Behavior Tests ====================
133
134    #[test]
135    fn query_assignment_response_body_hashset_deduplication() {
136        let mut assignments = HashSet::new();
137
138        let mq_gen = || MessageQueue::new();
139
140        let a1 = MessageQueueAssignment {
141            message_queue: Some(mq_gen()),
142            mode: MessageRequestMode::Pop,
143            attachments: None,
144        };
145
146        let a2 = MessageQueueAssignment {
147            message_queue: Some(mq_gen()),
148            mode: MessageRequestMode::Pop,
149            attachments: None,
150        };
151
152        assignments.insert(a1);
153        assignments.insert(a2);
154
155        assert_eq!(assignments.len(), 1);
156    }
157
158    #[test]
159    fn query_assignment_response_body_hashset_ordering_not_guaranteed() {
160        let assignments = vec![
161            create_assignment("topic", "broker", 0, MessageRequestMode::Pull, None),
162            create_assignment("topic", "broker", 1, MessageRequestMode::Pull, None),
163            create_assignment("topic", "broker", 2, MessageRequestMode::Pull, None),
164        ];
165        let body = create_body_with_assignments(assignments);
166
167        let queue_ids: Vec<i32> = body
168            .message_queue_assignments
169            .iter()
170            .filter_map(|a| a.message_queue.as_ref().map(|mq| mq.queue_id()))
171            .collect();
172
173        assert_eq!(queue_ids.len(), 3);
174        assert!(queue_ids.contains(&0));
175        assert!(queue_ids.contains(&1));
176        assert!(queue_ids.contains(&2));
177    }
178
179    #[test]
180    fn query_assignment_response_body_hashset_equality() {
181        let body1 = create_body_with_assignments(vec![create_assignment(
182            "topic",
183            "broker",
184            0,
185            MessageRequestMode::Pull,
186            None,
187        )]);
188        let body2 = create_body_with_assignments(vec![create_assignment(
189            "topic",
190            "broker",
191            0,
192            MessageRequestMode::Pull,
193            None,
194        )]);
195        assert_eq!(body1.message_queue_assignments, body2.message_queue_assignments);
196    }
197
198    // ==================== Serialization Tests ====================
199
200    #[test]
201    fn query_assignment_response_body_serialization_camelcase() {
202        let body = create_body_with_assignments(vec![create_assignment(
203            "topic",
204            "broker",
205            0,
206            MessageRequestMode::Pull,
207            None,
208        )]);
209        let json = serde_json::to_string(&body).unwrap();
210        assert!(json.contains("\"messageQueueAssignments\""));
211        assert!(!json.contains("\"message_queue_assignments\""));
212    }
213
214    #[test]
215    fn query_assignment_response_body_serialization_with_modes() {
216        let assignments = vec![
217            create_assignment("topic", "broker", 0, MessageRequestMode::Pull, None),
218            create_assignment("topic", "broker", 1, MessageRequestMode::Pop, None),
219        ];
220        let body = create_body_with_assignments(assignments);
221        let json = serde_json::to_string(&body).unwrap();
222        assert!(json.contains("\"PULL\""));
223        assert!(json.contains("\"POP\""));
224    }
225
226    #[test]
227    fn query_assignment_response_body_serialization_empty() {
228        let body = QueryAssignmentResponseBody::default();
229        let json = serde_json::to_string(&body).unwrap();
230        assert_eq!(json, "{\"messageQueueAssignments\":[]}");
231    }
232
233    #[test]
234    fn query_assignment_response_body_roundtrip_consistency() {
235        let assignments = vec![create_assignment(
236            "test-topic",
237            "broker-a",
238            5,
239            MessageRequestMode::Pull,
240            None,
241        )];
242        let body = create_body_with_assignments(assignments);
243        let json = serde_json::to_string(&body).unwrap();
244        let recovered: QueryAssignmentResponseBody = serde_json::from_str(&json).expect("Failed to deserialize");
245        assert_eq!(
246            body.message_queue_assignments.len(),
247            recovered.message_queue_assignments.len()
248        );
249    }
250
251    #[test]
252    fn query_assignment_response_body_encode_decode() {
253        let body = create_body_with_assignments(vec![create_assignment(
254            "test-topic",
255            "broker-a",
256            3,
257            MessageRequestMode::Pull,
258            None,
259        )]);
260
261        let encoded = body.encode().expect("encode should succeed");
262        assert!(!encoded.is_empty());
263
264        let decoded = QueryAssignmentResponseBody::decode(&encoded).expect("decode should succeed");
265        assert_eq!(
266            body.message_queue_assignments.len(),
267            decoded.message_queue_assignments.len()
268        );
269    }
270
271    #[test]
272    fn query_assignment_response_body_encode_produces_valid_json() {
273        let body = create_body_with_assignments(vec![create_assignment(
274            "topic",
275            "broker",
276            0,
277            MessageRequestMode::Pull,
278            None,
279        )]);
280
281        let encoded = body.encode().expect("encode should succeed");
282        let json_str = String::from_utf8(encoded).expect("encoded bytes should be valid UTF-8");
283
284        assert!(json_str.contains("messageQueueAssignments"));
285        let _: serde_json::Value = serde_json::from_str(&json_str).expect("encoded JSON should be valid");
286    }
287
288    // ==================== Deserialization Tests ====================
289
290    #[test]
291    fn query_assignment_response_body_deserialization() {
292        let json = r#"{
293            "messageQueueAssignments": [
294                {
295                    "messageQueue": {
296                        "topic": "test-topic",
297                        "brokerName": "broker-b",
298                        "queueId": 2
299                    },
300                    "mode": "PULL",
301                    "attachments": {"version": "1.0"}
302                }
303            ]
304        }"#;
305
306        let deserialized: QueryAssignmentResponseBody =
307            serde_json::from_str(json).expect("Failed to deserialize QueryAssignmentResponseBody");
308
309        assert_eq!(deserialized.message_queue_assignments.len(), 1);
310
311        let assignment = deserialized
312            .message_queue_assignments
313            .iter()
314            .next()
315            .expect("Should have at least one assignment");
316        let mq = assignment
317            .message_queue
318            .as_ref()
319            .expect("MessageQueue should be present");
320
321        assert_eq!(mq.topic(), "test-topic");
322        assert_eq!(mq.queue_id(), 2);
323
324        let attachments = assignment.attachments.as_ref().expect("Attachments should be present");
325        assert_eq!(
326            attachments
327                .get(&CheetahString::from("version"))
328                .expect("Version key should exist"),
329            "1.0"
330        );
331    }
332
333    #[test]
334    fn query_assignment_response_body_deserialization_missing_optional_fields() {
335        let json = r#"{
336            "messageQueueAssignments": [
337                {
338                    "messageQueue": {
339                        "topic": "topic",
340                        "brokerName": "broker",
341                        "queueId": 1
342                    },
343                    "mode": "PULL"
344                }
345            ]
346        }"#;
347        let body: QueryAssignmentResponseBody = serde_json::from_str(json).expect("Failed to deserialize");
348        assert_eq!(body.message_queue_assignments.len(), 1);
349        let assignment = body
350            .message_queue_assignments
351            .iter()
352            .next()
353            .expect("Should have at least one assignment");
354        assert!(assignment.attachments.is_none());
355    }
356
357    #[test]
358    fn query_assignment_response_body_deserialization_extra_fields_ignored() {
359        let json = r#"{
360            "messageQueueAssignments": [
361                {
362                    "messageQueue": {
363                        "topic": "topic",
364                        "brokerName": "broker",
365                        "queueId": 1
366                    },
367                    "mode": "PULL",
368                    "attachments": null,
369                    "extraField": "should be ignored",
370                    "anotherField": 123
371                }
372            ]
373        }"#;
374        let body: QueryAssignmentResponseBody = serde_json::from_str(json).expect("Failed to deserialize");
375        assert_eq!(body.message_queue_assignments.len(), 1);
376        let assignment = body
377            .message_queue_assignments
378            .iter()
379            .next()
380            .expect("Should have at least one assignment");
381        let mq = assignment
382            .message_queue
383            .as_ref()
384            .expect("MessageQueue should be present");
385        assert_eq!(mq.topic(), "topic");
386        assert_eq!(mq.queue_id(), 1);
387    }
388
389    #[test]
390    fn query_assignment_response_body_deserialization_malformed_json() {
391        let json = r#"{ invalid json }"#;
392        let result: Result<QueryAssignmentResponseBody, _> = serde_json::from_str(json);
393        assert!(result.is_err());
394    }
395
396    #[test]
397    fn query_assignment_response_body_deserialization_wrong_data_types() {
398        let json = r#"{
399            "messageQueueAssignments": "not an array"
400        }"#;
401        let result: Result<QueryAssignmentResponseBody, _> = serde_json::from_str(json);
402        assert!(result.is_err());
403    }
404
405    #[test]
406    fn query_assignment_response_body_nested_structure() {
407        let json = r#"{
408            "messageQueueAssignments": [
409                {
410                    "messageQueue": {
411                        "topic": "my-topic",
412                        "brokerName": "my-broker",
413                        "queueId": 3
414                    },
415                    "mode": "PULL",
416                    "attachments": null
417                }
418            ]
419        }"#;
420        let body: QueryAssignmentResponseBody = serde_json::from_str(json).expect("Failed to deserialize");
421        assert_eq!(body.message_queue_assignments.len(), 1);
422        let assignment = body
423            .message_queue_assignments
424            .iter()
425            .next()
426            .expect("Should have at least one assignment");
427        let mq = assignment
428            .message_queue
429            .as_ref()
430            .expect("MessageQueue should be present");
431        assert_eq!(mq.topic(), "my-topic");
432        assert_eq!(mq.broker_name(), "my-broker");
433        assert_eq!(mq.queue_id(), 3);
434    }
435
436    // ==================== Edge Cases Tests ====================
437
438    #[test]
439    fn query_assignment_response_body_with_none_message_queue() {
440        let assignment = MessageQueueAssignment {
441            message_queue: None,
442            mode: MessageRequestMode::Pull,
443            attachments: None,
444        };
445        let body = create_body_with_assignments(vec![assignment]);
446        assert_eq!(body.message_queue_assignments.len(), 1);
447        let json = serde_json::to_string(&body).unwrap();
448        let recovered: QueryAssignmentResponseBody = serde_json::from_str(&json).expect("Failed to deserialize");
449        assert_eq!(recovered.message_queue_assignments.len(), 1);
450        let assignment = recovered
451            .message_queue_assignments
452            .iter()
453            .next()
454            .expect("Should have at least one assignment");
455        assert!(assignment.message_queue.is_none());
456    }
457
458    #[test]
459    fn query_assignment_response_body_empty_attachments_vs_none() {
460        let empty_attachments = HashMap::new();
461        let body_with_empty = create_body_with_assignments(vec![create_assignment(
462            "topic",
463            "broker",
464            0,
465            MessageRequestMode::Pull,
466            Some(empty_attachments),
467        )]);
468
469        let body_with_none = create_body_with_assignments(vec![create_assignment(
470            "topic",
471            "broker",
472            0,
473            MessageRequestMode::Pull,
474            None,
475        )]);
476
477        let json_empty = serde_json::to_string(&body_with_empty).unwrap();
478        let json_none = serde_json::to_string(&body_with_none).unwrap();
479
480        assert_ne!(json_empty, json_none);
481    }
482
483    #[test]
484    fn query_assignment_response_body_with_attachments() {
485        let mut attachments = HashMap::new();
486        attachments.insert(CheetahString::from("key1"), CheetahString::from("value1"));
487        attachments.insert(CheetahString::from("key2"), CheetahString::from("value2"));
488        let assignments = vec![create_assignment(
489            "topic",
490            "broker",
491            0,
492            MessageRequestMode::Pull,
493            Some(attachments),
494        )];
495        let body = create_body_with_assignments(assignments);
496        let json = serde_json::to_string(&body).unwrap();
497        assert!(json.contains("\"key1\""));
498        assert!(json.contains("\"value1\""));
499        let recovered: QueryAssignmentResponseBody = serde_json::from_str(&json).expect("Failed to deserialize");
500        assert_eq!(recovered.message_queue_assignments.len(), 1);
501        let assignment = recovered
502            .message_queue_assignments
503            .iter()
504            .next()
505            .expect("Should have at least one assignment");
506        let recovered_attachments = assignment.attachments.as_ref().expect("Attachments should be present");
507        assert_eq!(
508            recovered_attachments
509                .get(&CheetahString::from("key1"))
510                .expect("key1 should exist"),
511            "value1"
512        );
513    }
514
515    #[test]
516    fn query_assignment_response_body_large_dataset() {
517        let mut assignments = Vec::new();
518        for i in 0..100 {
519            assignments.push(create_assignment(
520                &format!("topic{}", i % 5),
521                &format!("broker{}", i % 3),
522                i,
523                if i % 2 == 0 {
524                    MessageRequestMode::Pull
525                } else {
526                    MessageRequestMode::Pop
527                },
528                None,
529            ));
530        }
531        let body = create_body_with_assignments(assignments);
532        assert_eq!(body.message_queue_assignments.len(), 100);
533        let json = serde_json::to_string(&body).unwrap();
534        let recovered: QueryAssignmentResponseBody = serde_json::from_str(&json).expect("Failed to deserialize");
535        assert_eq!(recovered.message_queue_assignments.len(), 100);
536    }
537
538    // ==================== Real-World Scenario Tests ====================
539
540    #[test]
541    fn query_assignment_response_body_typical_real_world_response() {
542        let mut assignments = Vec::new();
543
544        for queue_id in 0..8 {
545            assignments.push(create_assignment(
546                "TopicOrder",
547                "broker-a",
548                queue_id,
549                MessageRequestMode::Pull,
550                None,
551            ));
552            assignments.push(create_assignment(
553                "TopicOrder",
554                "broker-b",
555                queue_id,
556                MessageRequestMode::Pull,
557                None,
558            ));
559        }
560
561        let body = create_body_with_assignments(assignments);
562        assert_eq!(body.message_queue_assignments.len(), 16);
563
564        let json = serde_json::to_string(&body).unwrap();
565        let recovered: QueryAssignmentResponseBody = serde_json::from_str(&json).expect("Failed to deserialize");
566        assert_eq!(recovered.message_queue_assignments.len(), 16);
567
568        let broker_a_count = recovered
569            .message_queue_assignments
570            .iter()
571            .filter(|a| {
572                a.message_queue
573                    .as_ref()
574                    .map(|mq| mq.broker_name() == "broker-a")
575                    .unwrap_or(false)
576            })
577            .count();
578        assert_eq!(broker_a_count, 8);
579    }
580
581    #[test]
582    fn query_assignment_response_body_single_queue_assignment() {
583        let assignments = vec![create_assignment(
584            "my-topic",
585            "my-broker",
586            7,
587            MessageRequestMode::Pop,
588            None,
589        )];
590        let body = create_body_with_assignments(assignments);
591        assert_eq!(body.message_queue_assignments.len(), 1);
592        let assignment = body
593            .message_queue_assignments
594            .iter()
595            .next()
596            .expect("Should have at least one assignment");
597        assert_eq!(
598            assignment
599                .message_queue
600                .as_ref()
601                .expect("MessageQueue should be present")
602                .topic(),
603            "my-topic"
604        );
605    }
606
607    #[test]
608    fn query_assignment_response_body_empty_response() {
609        let body = QueryAssignmentResponseBody::default();
610        assert!(body.message_queue_assignments.is_empty());
611        let json = serde_json::to_string(&body).unwrap();
612        assert!(json.contains("\"messageQueueAssignments\":[]"));
613
614        let recovered: QueryAssignmentResponseBody = serde_json::from_str(&json).expect("Failed to deserialize");
615        assert!(recovered.message_queue_assignments.is_empty());
616    }
617
618    #[test]
619    fn query_assignment_response_body_multiple_queues_single_broker() {
620        let assignments = vec![
621            create_assignment("topic", "broker-a", 0, MessageRequestMode::Pull, None),
622            create_assignment("topic", "broker-a", 1, MessageRequestMode::Pull, None),
623            create_assignment("topic", "broker-a", 2, MessageRequestMode::Pull, None),
624        ];
625        let body = create_body_with_assignments(assignments);
626        assert_eq!(body.message_queue_assignments.len(), 3);
627        assert!(body.message_queue_assignments.iter().all(|a| a
628            .message_queue
629            .as_ref()
630            .expect("MessageQueue should be present")
631            .broker_name()
632            == "broker-a"));
633    }
634
635    #[test]
636    fn query_assignment_response_body_with_multiple_assignments() {
637        let assignments = vec![
638            create_assignment("topic", "broker", 0, MessageRequestMode::Pull, None),
639            create_assignment("topic", "broker", 1, MessageRequestMode::Pull, None),
640            create_assignment("topic", "broker", 2, MessageRequestMode::Pop, None),
641        ];
642        let body = create_body_with_assignments(assignments);
643        assert_eq!(body.message_queue_assignments.len(), 3);
644    }
645}