rocketmq_remoting/protocol/header/message_operation_header/
send_message_response_header.rs1use std::collections::HashMap;
16
17use cheetah_string::CheetahString;
18use rocketmq_macros::RequestHeaderCodecV2;
19use serde::Deserialize;
20use serde::Serialize;
21
22use crate::protocol::FastCodesHeader;
23
24#[derive(Debug, Serialize, Deserialize, Default, RequestHeaderCodecV2)]
25#[serde(rename_all = "camelCase")]
26pub struct SendMessageResponseHeader {
27 msg_id: CheetahString,
28 queue_id: i32,
29 queue_offset: i64,
30 transaction_id: Option<CheetahString>,
31 batch_uniq_id: Option<CheetahString>,
32 #[serde(skip_serializing_if = "Option::is_none")]
33 recall_handle: Option<CheetahString>,
34}
35
36impl SendMessageResponseHeader {
37 pub fn new(
38 msg_id: CheetahString,
39 queue_id: i32,
40 queue_offset: i64,
41 transaction_id: Option<CheetahString>,
42 batch_uniq_id: Option<CheetahString>,
43 recall_handle: Option<CheetahString>,
44 ) -> Self {
45 SendMessageResponseHeader {
46 msg_id,
47 queue_id,
48 queue_offset,
49 transaction_id,
50 batch_uniq_id,
51 recall_handle,
52 }
53 }
54
55 pub fn msg_id(&self) -> &CheetahString {
56 &self.msg_id
57 }
58
59 pub fn queue_id(&self) -> i32 {
60 self.queue_id
61 }
62
63 pub fn queue_offset(&self) -> i64 {
64 self.queue_offset
65 }
66
67 pub fn transaction_id(&self) -> Option<&str> {
68 self.transaction_id.as_deref()
69 }
70
71 pub fn batch_uniq_id(&self) -> Option<&str> {
72 self.batch_uniq_id.as_deref()
73 }
74
75 pub fn recall_handle(&self) -> Option<&str> {
76 self.recall_handle.as_deref()
77 }
78
79 pub fn set_msg_id(&mut self, msg_id: impl Into<CheetahString>) {
80 self.msg_id = msg_id.into();
81 }
82
83 pub fn set_queue_id(&mut self, queue_id: i32) {
84 self.queue_id = queue_id;
85 }
86
87 pub fn set_queue_offset(&mut self, queue_offset: i64) {
88 self.queue_offset = queue_offset;
89 }
90
91 pub fn set_transaction_id(&mut self, transaction_id: Option<CheetahString>) {
92 self.transaction_id = transaction_id;
93 }
94
95 pub fn set_batch_uniq_id(&mut self, batch_uniq_id: Option<CheetahString>) {
96 self.batch_uniq_id = batch_uniq_id;
97 }
98
99 pub fn set_recall_handle(&mut self, recall_handle: Option<CheetahString>) {
100 self.recall_handle = recall_handle;
101 }
102}
103
104impl FastCodesHeader for SendMessageResponseHeader {
105 fn encode_fast(&mut self, out: &mut bytes::BytesMut) {
106 Self::write_if_not_null(out, "msgId", self.msg_id.as_str());
107 Self::write_if_not_null(out, "queueId", self.queue_id.to_string().as_str());
108 Self::write_if_not_null(out, "queueOffset", self.queue_offset.to_string().as_str());
109 if let Some(ref transaction_id) = self.transaction_id {
110 Self::write_if_not_null(out, "transactionId", transaction_id.as_str());
111 }
112 if let Some(ref batch_uniq_id) = self.batch_uniq_id {
113 Self::write_if_not_null(out, "batchUniqId", batch_uniq_id.as_str());
114 }
115 if let Some(ref recall_handle) = self.recall_handle {
116 Self::write_if_not_null(out, "recallHandle", recall_handle.as_str());
117 }
118 }
119
120 fn decode_fast(&mut self, fields: &HashMap<CheetahString, CheetahString>) {
121 if let Some(str) = fields.get(&CheetahString::from_slice("msgId")) {
122 self.msg_id = str.clone();
123 }
124
125 if let Some(str) = fields.get(&CheetahString::from_slice("queueId")) {
126 self.queue_id = str.parse::<i32>().unwrap_or_default();
127 }
128
129 if let Some(str) = fields.get(&CheetahString::from_slice("queueOffset")) {
130 self.queue_offset = str.parse::<i64>().unwrap_or_default();
131 }
132
133 if let Some(str) = fields.get(&CheetahString::from_slice("transactionId")) {
134 self.transaction_id = Some(str.clone());
135 }
136
137 if let Some(str) = fields.get(&CheetahString::from_slice("batchUniqId")) {
138 self.batch_uniq_id = Some(str.clone());
139 }
140
141 if let Some(str) = fields.get(&CheetahString::from_slice("recallHandle")) {
142 self.recall_handle = Some(str.clone());
143 }
144 }
145}
146
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds the fully populated header shared by several tests.
    fn populated_header() -> SendMessageResponseHeader {
        SendMessageResponseHeader::new(
            CheetahString::from("msg123"),
            1,
            100,
            Some(CheetahString::from("tx456")),
            Some(CheetahString::from("batch789")),
            Some(CheetahString::from("recall-handle")),
        )
    }

    /// Asserts that `header` carries exactly the values set by `populated_header`.
    fn assert_populated(header: &SendMessageResponseHeader) {
        assert_eq!(header.msg_id(), "msg123");
        assert_eq!(header.queue_id(), 1);
        assert_eq!(header.queue_offset(), 100);
        assert_eq!(header.transaction_id(), Some("tx456"));
        assert_eq!(header.batch_uniq_id(), Some("batch789"));
        assert_eq!(header.recall_handle(), Some("recall-handle"));
    }

    #[test]
    fn send_message_response_header_new_and_default() {
        // Explicit construction keeps every supplied value.
        let built = populated_header();
        assert_populated(&built);

        // Default construction yields empty / zero / `None` fields.
        let empty = SendMessageResponseHeader::default();
        assert_eq!(empty.msg_id(), "");
        assert_eq!(empty.queue_id(), 0);
        assert_eq!(empty.queue_offset(), 0);
        assert_eq!(empty.transaction_id(), None);
        assert_eq!(empty.batch_uniq_id(), None);
        assert_eq!(empty.recall_handle(), None);
    }

    #[test]
    fn send_message_response_header_setters_and_getters() {
        let mut header = SendMessageResponseHeader::default();

        header.set_msg_id("newMsgId");
        header.set_queue_id(2);
        header.set_queue_offset(200);
        header.set_transaction_id(Some(CheetahString::from("newTxId")));
        header.set_batch_uniq_id(Some(CheetahString::from("newBatchId")));
        header.set_recall_handle(Some(CheetahString::from("recall-123")));

        assert_eq!(header.msg_id(), "newMsgId");
        assert_eq!(header.queue_id(), 2);
        assert_eq!(header.queue_offset(), 200);
        assert_eq!(header.transaction_id(), Some("newTxId"));
        assert_eq!(header.batch_uniq_id(), Some("newBatchId"));
        assert_eq!(header.recall_handle(), Some("recall-123"));
    }

    #[test]
    fn send_message_response_header_serialization_and_deserialization() {
        let original = populated_header();

        // Keys are camelCase and appear in field declaration order.
        let json = serde_json::to_string(&original).unwrap();
        assert_eq!(
            json,
            r#"{"msgId":"msg123","queueId":1,"queueOffset":100,"transactionId":"tx456","batchUniqId":"batch789","recallHandle":"recall-handle"}"#
        );

        let restored: SendMessageResponseHeader = serde_json::from_str(&json).unwrap();
        assert_populated(&restored);
    }

    #[test]
    fn send_message_response_header_encode_decode_fast() {
        // Encoding a populated header must produce some output.
        let mut source = populated_header();
        let mut out = bytes::BytesMut::new();
        source.encode_fast(&mut out);
        assert!(!out.is_empty());

        // Decoding is driven from a hand-built field map, not from `out`.
        let pairs = [
            ("msgId", "msg123"),
            ("queueId", "1"),
            ("queueOffset", "100"),
            ("transactionId", "tx456"),
            ("batchUniqId", "batch789"),
            ("recallHandle", "recall-handle"),
        ];
        let mut fields = std::collections::HashMap::new();
        for &(key, value) in pairs.iter() {
            fields.insert(CheetahString::from(key), CheetahString::from(value));
        }

        let mut decoded = SendMessageResponseHeader::default();
        decoded.decode_fast(&fields);
        assert_populated(&decoded);
    }
}