rocketmq_client_v4/protocols/header/
send_message_request_header.rs

1use crate::protocols::{get_current_time_millis, SerializeDeserialize};
2use bytes::{BufMut, BytesMut};
3use serde::Serialize;
4use std::collections::HashMap;
5
6#[derive(Debug, Serialize)]
7#[allow(non_snake_case)]
8pub struct SendMessageRequestHeader {
9    pub producerGroup: String,
10    pub topic: String,
11    pub defaultTopic: String,
12    pub defaultTopicQueueNums: i32,
13    pub queueId: i32,
14    pub sysFlag: i32,
15    pub bornTimestamp: i64,
16    pub flag: i32,
17    pub properties: String,
18    pub reconsumeTimes: i32,
19    pub unitMode: bool,
20    pub batch: bool,
21    pub maxReconsumeTimes: i32,
22}
23
24impl SendMessageRequestHeader {
25    pub fn new(
26        producer_group: String,
27        topic: String,
28        queue_id: i32,
29        properties: &HashMap<String, String>,
30    ) -> Self {
31        SendMessageRequestHeader {
32            producerGroup: producer_group,
33            topic,
34            defaultTopic: "TBW102".to_string(),
35            defaultTopicQueueNums: 4,
36            queueId: queue_id,
37            sysFlag: 0,
38            bornTimestamp: get_current_time_millis(),
39            flag: 0,
40            properties: Self::convert_map_to_string(properties),
41            reconsumeTimes: 0,
42            unitMode: false,
43            batch: true,
44            maxReconsumeTimes: 0,
45        }
46    }
47
48    pub fn convert_map_to_string(map: &HashMap<String, String>) -> String {
49        let mut bytebuf = BytesMut::with_capacity(128);
50        for (k, v) in map {
51            bytebuf.put_slice(k.as_bytes());
52            bytebuf.put_u8(1);
53            bytebuf.put_slice(v.as_bytes());
54            bytebuf.put_u8(2)
55        }
56        let mut list = bytebuf.to_vec();
57
58        if list.len() > 0 {
59            list.pop();
60        }
61        String::from_utf8(list).unwrap()
62    }
63}
64
65impl SerializeDeserialize for SendMessageRequestHeader {}
66
67#[derive(Debug, Serialize)]
68#[allow(non_snake_case)]
69pub struct SendMessageRequestHeaderV2 {
70    pub a: String, // producerGroup
71    pub b: String, //topic
72    pub c: String, // defaultTopic
73    pub d: i32,    // defaultTopicQueueNums
74    pub e: i32,    // queueId
75    pub f: i32,    // sysFlag
76    pub g: i64,    // bornTimestamp
77    pub h: i32,    // flag
78    pub i: String, // properties
79    pub j: i32,    //reconsumeTimes
80    pub k: bool,   //unitMode
81    pub l: i32,    //consumeRetryTimes
82    pub m: bool,   // batch
83}
84
85impl SendMessageRequestHeaderV2 {
86    pub fn new(header: SendMessageRequestHeader) -> Self {
87        Self {
88            a: header.producerGroup,
89            b: header.topic,
90            c: header.defaultTopic,
91            d: header.defaultTopicQueueNums,
92            e: header.queueId,
93            f: header.sysFlag,
94            g: header.bornTimestamp,
95            h: header.flag,
96            i: header.properties,
97            j: header.reconsumeTimes,
98            k: header.unitMode,
99            l: header.maxReconsumeTimes,
100            m: header.batch,
101        }
102    }
103}
104
105impl SerializeDeserialize for SendMessageRequestHeaderV2 {
106    fn to_bytes_1(&self) -> Vec<u8>
107    where
108        Self: Serialize,
109    {
110        let mut buf = BytesMut::with_capacity(128);
111        buf.put_i16(1);
112        buf.put_slice("a".as_bytes());
113        buf.put_i32(self.a.len() as i32);
114        buf.put_slice(self.a.as_bytes());
115
116        buf.put_i16(1);
117        buf.put_slice("b".as_bytes());
118        buf.put_i32(self.b.len() as i32);
119        buf.put_slice(self.b.as_bytes());
120
121        buf.put_i16(1);
122        buf.put_slice("c".as_bytes());
123        buf.put_i32(self.c.len() as i32);
124        buf.put_slice(self.c.as_bytes());
125
126        buf.put_i16(1);
127        buf.put_slice("d".as_bytes());
128        let td = self.d.to_string();
129        buf.put_i32(td.len() as i32);
130        buf.put_slice(td.as_bytes());
131
132        buf.put_i16(1);
133        buf.put_slice("e".as_bytes());
134        let te = self.e.to_string();
135        buf.put_i32(te.len() as i32);
136        buf.put_slice(te.as_bytes());
137
138        buf.put_i16(1);
139        buf.put_slice("f".as_bytes());
140        let tf = self.f.to_string();
141        buf.put_i32(tf.len() as i32);
142        buf.put_slice(tf.as_bytes());
143
144        buf.put_i16(1);
145        buf.put_slice("g".as_bytes());
146        let tg = self.g.to_string();
147        buf.put_i32(tg.len() as i32);
148        buf.put_slice(tg.as_bytes());
149
150        buf.put_i16(1);
151        buf.put_slice("h".as_bytes());
152        let th = self.h.to_string();
153        buf.put_i32(th.len() as i32);
154        buf.put_slice(th.as_bytes());
155
156        buf.put_i16(1);
157        buf.put_slice("i".as_bytes());
158        buf.put_i32(self.i.len() as i32);
159        buf.put_slice(self.i.as_bytes());
160
161        buf.put_i16(1);
162        buf.put_slice("j".as_bytes());
163        let tj = self.j.to_string();
164        buf.put_i32(tj.len() as i32);
165        buf.put_slice(tj.as_bytes());
166
167        buf.put_i16(1);
168        buf.put_slice("k".as_bytes());
169        match self.k {
170            true => {
171                buf.put_i32("true".len() as i32);
172                buf.put_slice("true".as_bytes());
173            }
174            false => {
175                buf.put_i32("false".len() as i32);
176                buf.put_slice("false".as_bytes());
177            }
178        }
179
180        buf.put_i16(1);
181        buf.put_slice("l".as_bytes());
182        let tl = self.l.to_string();
183        buf.put_i32(tl.len() as i32);
184        buf.put_slice(tl.as_bytes());
185
186        buf.put_i16(1);
187        buf.put_slice("m".as_bytes());
188        match self.m {
189            true => {
190                buf.put_i32("true".len() as i32);
191                buf.put_slice("true".as_bytes());
192            }
193            false => {
194                buf.put_i32("false".len() as i32);
195                buf.put_slice("false".as_bytes());
196            }
197        }
198
199        // debug!("bytes:{:?}", buf.to_vec());
200        buf.to_vec()
201    }
202}