// rocketmq_client_v4/protocols/header/send_message_request_header.rs
use crate::protocols::{get_current_time_millis, SerializeDeserialize};
use bytes::{BufMut, BytesMut};
use serde::Serialize;
use std::collections::HashMap;
5
6#[derive(Debug, Serialize)]
7#[allow(non_snake_case)]
8pub struct SendMessageRequestHeader {
9 pub producerGroup: String,
10 pub topic: String,
11 pub defaultTopic: String,
12 pub defaultTopicQueueNums: i32,
13 pub queueId: i32,
14 pub sysFlag: i32,
15 pub bornTimestamp: i64,
16 pub flag: i32,
17 pub properties: String,
18 pub reconsumeTimes: i32,
19 pub unitMode: bool,
20 pub batch: bool,
21 pub maxReconsumeTimes: i32,
22}
23
24impl SendMessageRequestHeader {
25 pub fn new(
26 producer_group: String,
27 topic: String,
28 queue_id: i32,
29 properties: &HashMap<String, String>,
30 ) -> Self {
31 SendMessageRequestHeader {
32 producerGroup: producer_group,
33 topic,
34 defaultTopic: "TBW102".to_string(),
35 defaultTopicQueueNums: 4,
36 queueId: queue_id,
37 sysFlag: 0,
38 bornTimestamp: get_current_time_millis(),
39 flag: 0,
40 properties: Self::convert_map_to_string(properties),
41 reconsumeTimes: 0,
42 unitMode: false,
43 batch: true,
44 maxReconsumeTimes: 0,
45 }
46 }
47
48 pub fn convert_map_to_string(map: &HashMap<String, String>) -> String {
49 let mut bytebuf = BytesMut::with_capacity(128);
50 for (k, v) in map {
51 bytebuf.put_slice(k.as_bytes());
52 bytebuf.put_u8(1);
53 bytebuf.put_slice(v.as_bytes());
54 bytebuf.put_u8(2)
55 }
56 let mut list = bytebuf.to_vec();
57
58 if list.len() > 0 {
59 list.pop();
60 }
61 String::from_utf8(list).unwrap()
62 }
63}
64
65impl SerializeDeserialize for SendMessageRequestHeader {}
66
67#[derive(Debug, Serialize)]
68#[allow(non_snake_case)]
69pub struct SendMessageRequestHeaderV2 {
70 pub a: String, pub b: String, pub c: String, pub d: i32, pub e: i32, pub f: i32, pub g: i64, pub h: i32, pub i: String, pub j: i32, pub k: bool, pub l: i32, pub m: bool, }
84
85impl SendMessageRequestHeaderV2 {
86 pub fn new(header: SendMessageRequestHeader) -> Self {
87 Self {
88 a: header.producerGroup,
89 b: header.topic,
90 c: header.defaultTopic,
91 d: header.defaultTopicQueueNums,
92 e: header.queueId,
93 f: header.sysFlag,
94 g: header.bornTimestamp,
95 h: header.flag,
96 i: header.properties,
97 j: header.reconsumeTimes,
98 k: header.unitMode,
99 l: header.maxReconsumeTimes,
100 m: header.batch,
101 }
102 }
103}
104
105impl SerializeDeserialize for SendMessageRequestHeaderV2 {
106 fn to_bytes_1(&self) -> Vec<u8>
107 where
108 Self: Serialize,
109 {
110 let mut buf = BytesMut::with_capacity(128);
111 buf.put_i16(1);
112 buf.put_slice("a".as_bytes());
113 buf.put_i32(self.a.len() as i32);
114 buf.put_slice(self.a.as_bytes());
115
116 buf.put_i16(1);
117 buf.put_slice("b".as_bytes());
118 buf.put_i32(self.b.len() as i32);
119 buf.put_slice(self.b.as_bytes());
120
121 buf.put_i16(1);
122 buf.put_slice("c".as_bytes());
123 buf.put_i32(self.c.len() as i32);
124 buf.put_slice(self.c.as_bytes());
125
126 buf.put_i16(1);
127 buf.put_slice("d".as_bytes());
128 let td = self.d.to_string();
129 buf.put_i32(td.len() as i32);
130 buf.put_slice(td.as_bytes());
131
132 buf.put_i16(1);
133 buf.put_slice("e".as_bytes());
134 let te = self.e.to_string();
135 buf.put_i32(te.len() as i32);
136 buf.put_slice(te.as_bytes());
137
138 buf.put_i16(1);
139 buf.put_slice("f".as_bytes());
140 let tf = self.f.to_string();
141 buf.put_i32(tf.len() as i32);
142 buf.put_slice(tf.as_bytes());
143
144 buf.put_i16(1);
145 buf.put_slice("g".as_bytes());
146 let tg = self.g.to_string();
147 buf.put_i32(tg.len() as i32);
148 buf.put_slice(tg.as_bytes());
149
150 buf.put_i16(1);
151 buf.put_slice("h".as_bytes());
152 let th = self.h.to_string();
153 buf.put_i32(th.len() as i32);
154 buf.put_slice(th.as_bytes());
155
156 buf.put_i16(1);
157 buf.put_slice("i".as_bytes());
158 buf.put_i32(self.i.len() as i32);
159 buf.put_slice(self.i.as_bytes());
160
161 buf.put_i16(1);
162 buf.put_slice("j".as_bytes());
163 let tj = self.j.to_string();
164 buf.put_i32(tj.len() as i32);
165 buf.put_slice(tj.as_bytes());
166
167 buf.put_i16(1);
168 buf.put_slice("k".as_bytes());
169 match self.k {
170 true => {
171 buf.put_i32("true".len() as i32);
172 buf.put_slice("true".as_bytes());
173 }
174 false => {
175 buf.put_i32("false".len() as i32);
176 buf.put_slice("false".as_bytes());
177 }
178 }
179
180 buf.put_i16(1);
181 buf.put_slice("l".as_bytes());
182 let tl = self.l.to_string();
183 buf.put_i32(tl.len() as i32);
184 buf.put_slice(tl.as_bytes());
185
186 buf.put_i16(1);
187 buf.put_slice("m".as_bytes());
188 match self.m {
189 true => {
190 buf.put_i32("true".len() as i32);
191 buf.put_slice("true".as_bytes());
192 }
193 false => {
194 buf.put_i32("false".len() as i32);
195 buf.put_slice("false".as_bytes());
196 }
197 }
198
199 buf.to_vec()
201 }
202}