Skip to main content

rocketmq_remoting/protocol/header/message_operation_header/
send_message_response_header.rs

1// Copyright 2023 The RocketMQ Rust Authors
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16
17use cheetah_string::CheetahString;
18use rocketmq_macros::RequestHeaderCodecV2;
19use serde::Deserialize;
20use serde::Serialize;
21
22use crate::protocol::FastCodesHeader;
23
24#[derive(Debug, Serialize, Deserialize, Default, RequestHeaderCodecV2)]
25#[serde(rename_all = "camelCase")]
26pub struct SendMessageResponseHeader {
27    msg_id: CheetahString,
28    queue_id: i32,
29    queue_offset: i64,
30    transaction_id: Option<CheetahString>,
31    batch_uniq_id: Option<CheetahString>,
32    #[serde(skip_serializing_if = "Option::is_none")]
33    recall_handle: Option<CheetahString>,
34}
35
36impl SendMessageResponseHeader {
37    pub fn new(
38        msg_id: CheetahString,
39        queue_id: i32,
40        queue_offset: i64,
41        transaction_id: Option<CheetahString>,
42        batch_uniq_id: Option<CheetahString>,
43        recall_handle: Option<CheetahString>,
44    ) -> Self {
45        SendMessageResponseHeader {
46            msg_id,
47            queue_id,
48            queue_offset,
49            transaction_id,
50            batch_uniq_id,
51            recall_handle,
52        }
53    }
54
55    pub fn msg_id(&self) -> &CheetahString {
56        &self.msg_id
57    }
58
59    pub fn queue_id(&self) -> i32 {
60        self.queue_id
61    }
62
63    pub fn queue_offset(&self) -> i64 {
64        self.queue_offset
65    }
66
67    pub fn transaction_id(&self) -> Option<&str> {
68        self.transaction_id.as_deref()
69    }
70
71    pub fn batch_uniq_id(&self) -> Option<&str> {
72        self.batch_uniq_id.as_deref()
73    }
74
75    pub fn recall_handle(&self) -> Option<&str> {
76        self.recall_handle.as_deref()
77    }
78
79    pub fn set_msg_id(&mut self, msg_id: impl Into<CheetahString>) {
80        self.msg_id = msg_id.into();
81    }
82
83    pub fn set_queue_id(&mut self, queue_id: i32) {
84        self.queue_id = queue_id;
85    }
86
87    pub fn set_queue_offset(&mut self, queue_offset: i64) {
88        self.queue_offset = queue_offset;
89    }
90
91    pub fn set_transaction_id(&mut self, transaction_id: Option<CheetahString>) {
92        self.transaction_id = transaction_id;
93    }
94
95    pub fn set_batch_uniq_id(&mut self, batch_uniq_id: Option<CheetahString>) {
96        self.batch_uniq_id = batch_uniq_id;
97    }
98
99    pub fn set_recall_handle(&mut self, recall_handle: Option<CheetahString>) {
100        self.recall_handle = recall_handle;
101    }
102}
103
104impl FastCodesHeader for SendMessageResponseHeader {
105    fn encode_fast(&mut self, out: &mut bytes::BytesMut) {
106        Self::write_if_not_null(out, "msgId", self.msg_id.as_str());
107        Self::write_if_not_null(out, "queueId", self.queue_id.to_string().as_str());
108        Self::write_if_not_null(out, "queueOffset", self.queue_offset.to_string().as_str());
109        if let Some(ref transaction_id) = self.transaction_id {
110            Self::write_if_not_null(out, "transactionId", transaction_id.as_str());
111        }
112        if let Some(ref batch_uniq_id) = self.batch_uniq_id {
113            Self::write_if_not_null(out, "batchUniqId", batch_uniq_id.as_str());
114        }
115        if let Some(ref recall_handle) = self.recall_handle {
116            Self::write_if_not_null(out, "recallHandle", recall_handle.as_str());
117        }
118    }
119
120    fn decode_fast(&mut self, fields: &HashMap<CheetahString, CheetahString>) {
121        if let Some(str) = fields.get(&CheetahString::from_slice("msgId")) {
122            self.msg_id = str.clone();
123        }
124
125        if let Some(str) = fields.get(&CheetahString::from_slice("queueId")) {
126            self.queue_id = str.parse::<i32>().unwrap_or_default();
127        }
128
129        if let Some(str) = fields.get(&CheetahString::from_slice("queueOffset")) {
130            self.queue_offset = str.parse::<i64>().unwrap_or_default();
131        }
132
133        if let Some(str) = fields.get(&CheetahString::from_slice("transactionId")) {
134            self.transaction_id = Some(str.clone());
135        }
136
137        if let Some(str) = fields.get(&CheetahString::from_slice("batchUniqId")) {
138            self.batch_uniq_id = Some(str.clone());
139        }
140
141        if let Some(str) = fields.get(&CheetahString::from_slice("recallHandle")) {
142            self.recall_handle = Some(str.clone());
143        }
144    }
145}
146
147#[cfg(test)]
148mod tests {
149    use super::*;
150
151    #[test]
152    fn send_message_response_header_new_and_default() {
153        let header = SendMessageResponseHeader::new(
154            CheetahString::from("msg123"),
155            1,
156            100,
157            Some(CheetahString::from("tx456")),
158            Some(CheetahString::from("batch789")),
159            Some(CheetahString::from("recall-handle")),
160        );
161
162        assert_eq!(header.msg_id(), "msg123");
163        assert_eq!(header.queue_id(), 1);
164        assert_eq!(header.queue_offset(), 100);
165        assert_eq!(header.transaction_id(), Some("tx456"));
166        assert_eq!(header.batch_uniq_id(), Some("batch789"));
167        assert_eq!(header.recall_handle(), Some("recall-handle"));
168
169        let header = SendMessageResponseHeader::default();
170
171        assert_eq!(header.msg_id(), "");
172        assert_eq!(header.queue_id(), 0);
173        assert_eq!(header.queue_offset(), 0);
174        assert_eq!(header.transaction_id(), None);
175        assert_eq!(header.batch_uniq_id(), None);
176        assert_eq!(header.recall_handle(), None);
177    }
178
179    #[test]
180    fn send_message_response_header_setters_and_getters() {
181        let mut header = SendMessageResponseHeader::default();
182        header.set_msg_id("newMsgId");
183        header.set_queue_id(2);
184        header.set_queue_offset(200);
185        header.set_transaction_id(Some(CheetahString::from("newTxId")));
186        header.set_batch_uniq_id(Some(CheetahString::from("newBatchId")));
187        header.set_recall_handle(Some(CheetahString::from("recall-123")));
188
189        assert_eq!(header.msg_id(), "newMsgId");
190        assert_eq!(header.queue_id(), 2);
191        assert_eq!(header.queue_offset(), 200);
192        assert_eq!(header.transaction_id(), Some("newTxId"));
193        assert_eq!(header.batch_uniq_id(), Some("newBatchId"));
194        assert_eq!(header.recall_handle(), Some("recall-123"));
195    }
196
197    #[test]
198    fn send_message_response_header_serialization_and_deserialization() {
199        let header = SendMessageResponseHeader::new(
200            CheetahString::from("msg123"),
201            1,
202            100,
203            Some(CheetahString::from("tx456")),
204            Some(CheetahString::from("batch789")),
205            Some(CheetahString::from("recall-handle")),
206        );
207
208        let json = serde_json::to_string(&header).unwrap();
209        assert_eq!(
210            json,
211            r#"{"msgId":"msg123","queueId":1,"queueOffset":100,"transactionId":"tx456","batchUniqId":"batch789","recallHandle":"recall-handle"}"#
212        );
213
214        let header: SendMessageResponseHeader = serde_json::from_str(&json).unwrap();
215        assert_eq!(header.msg_id(), "msg123");
216        assert_eq!(header.queue_id(), 1);
217        assert_eq!(header.queue_offset(), 100);
218        assert_eq!(header.transaction_id(), Some("tx456"));
219        assert_eq!(header.batch_uniq_id(), Some("batch789"));
220        assert_eq!(header.recall_handle(), Some("recall-handle"));
221    }
222
223    #[test]
224    fn send_message_response_header_encode_decode_fast() {
225        let mut header = SendMessageResponseHeader::new(
226            CheetahString::from("msg123"),
227            1,
228            100,
229            Some(CheetahString::from("tx456")),
230            Some(CheetahString::from("batch789")),
231            Some(CheetahString::from("recall-handle")),
232        );
233
234        let mut out = bytes::BytesMut::new();
235        header.encode_fast(&mut out);
236        assert!(!out.is_empty());
237
238        let mut fields = std::collections::HashMap::new();
239        fields.insert(CheetahString::from("msgId"), CheetahString::from("msg123"));
240        fields.insert(CheetahString::from("queueId"), CheetahString::from("1"));
241        fields.insert(CheetahString::from("queueOffset"), CheetahString::from("100"));
242        fields.insert(CheetahString::from("transactionId"), CheetahString::from("tx456"));
243        fields.insert(CheetahString::from("batchUniqId"), CheetahString::from("batch789"));
244        fields.insert(
245            CheetahString::from("recallHandle"),
246            CheetahString::from("recall-handle"),
247        );
248
249        let mut header = SendMessageResponseHeader::default();
250        header.decode_fast(&fields);
251
252        assert_eq!(header.msg_id(), "msg123");
253        assert_eq!(header.queue_id(), 1);
254        assert_eq!(header.queue_offset(), 100);
255        assert_eq!(header.transaction_id(), Some("tx456"));
256        assert_eq!(header.batch_uniq_id(), Some("batch789"));
257        assert_eq!(header.recall_handle(), Some("recall-handle"));
258    }
259}