1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

use std::{
    collections::{HashMap, HashSet},
    string::ToString,
};

use bytes::{Buf, Bytes};
use lazy_static::lazy_static;

pub mod message_accessor;
pub mod message_batch;
pub mod message_client_id_setter;
pub mod message_decoder;
pub mod message_enum;
pub mod message_id;
pub mod message_queue;
pub mod message_single;

pub trait MessageTrait {
    fn topic(&self) -> &str;

    fn with_topic(&mut self, topic: impl Into<String>);

    fn tags(&self) -> Option<&str>;

    fn with_tags(&mut self, tags: impl Into<String>);

    fn put_property(&mut self, key: impl Into<String>, value: impl Into<String>);

    fn properties(&self) -> &HashMap<String, String>;

    fn put_user_property(&mut self, name: impl Into<String>, value: impl Into<String>);

    fn delay_time_level(&self) -> i32;

    fn with_delay_time_level(&self, level: i32) -> i32;
}

pub const MESSAGE_MAGIC_CODE_V1: i32 = -626843481;
pub const MESSAGE_MAGIC_CODE_V2: i32 = -626843477;

#[derive(Debug, PartialEq, Eq, Hash, Clone, Copy)]
pub enum MessageVersion {
    V1,
    V2,
}

impl Default for MessageVersion {
    fn default() -> Self {
        Self::V1
    }
}

impl MessageVersion {
    pub fn value_of_magic_code(magic_code: i32) -> Result<MessageVersion, &'static str> {
        match magic_code {
            MESSAGE_MAGIC_CODE_V1 => Ok(MessageVersion::V1),
            MESSAGE_MAGIC_CODE_V2 => Ok(MessageVersion::V2),
            _ => Err("Invalid magicCode"),
        }
    }

    pub fn get_magic_code(&self) -> i32 {
        match self {
            MessageVersion::V1 => MESSAGE_MAGIC_CODE_V1,
            MessageVersion::V2 => MESSAGE_MAGIC_CODE_V2,
        }
    }

    pub fn get_topic_length_size(&self) -> usize {
        match self {
            MessageVersion::V1 => 1,
            MessageVersion::V2 => 2,
        }
    }

    pub fn get_topic_length(&self, buffer: &mut Bytes) -> usize {
        match self {
            MessageVersion::V1 => i32::from(buffer.get_u8()) as usize,
            MessageVersion::V2 => buffer.get_i16() as usize,
        }
    }

    pub fn get_topic_length_at_index(&self, buffer: &[u8], index: usize) -> usize {
        match self {
            MessageVersion::V1 => buffer[index] as usize,
            MessageVersion::V2 => ((buffer[index] as usize) << 8) | (buffer[index + 1] as usize),
        }
    }

    pub fn put_topic_length(&self, buffer: &mut Vec<u8>, topic_length: usize) {
        match self {
            MessageVersion::V1 => buffer.push(topic_length as u8),
            MessageVersion::V2 => {
                buffer.push((topic_length >> 8) as u8);
                buffer.push((topic_length & 0xFF) as u8);
            }
        }
    }

    pub fn is_v1(&self) -> bool {
        match self {
            MessageVersion::V1 => true,
            MessageVersion::V2 => false,
        }
    }

    pub fn is_v2(&self) -> bool {
        match self {
            MessageVersion::V1 => false,
            MessageVersion::V2 => true,
        }
    }
}

pub struct MessageConst;

impl MessageConst {
    pub const PROPERTY_KEYS: &'static str = "KEYS";
    pub const PROPERTY_TAGS: &'static str = "TAGS";
    pub const PROPERTY_WAIT_STORE_MSG_OK: &'static str = "WAIT";
    pub const PROPERTY_DELAY_TIME_LEVEL: &'static str = "DELAY";
    pub const PROPERTY_RETRY_TOPIC: &'static str = "RETRY_TOPIC";
    pub const PROPERTY_REAL_TOPIC: &'static str = "REAL_TOPIC";
    pub const PROPERTY_REAL_QUEUE_ID: &'static str = "REAL_QID";
    pub const PROPERTY_TRANSACTION_PREPARED: &'static str = "TRAN_MSG";
    pub const PROPERTY_PRODUCER_GROUP: &'static str = "PGROUP";
    pub const PROPERTY_MIN_OFFSET: &'static str = "MIN_OFFSET";
    pub const PROPERTY_MAX_OFFSET: &'static str = "MAX_OFFSET";
    pub const PROPERTY_BUYER_ID: &'static str = "BUYER_ID";
    pub const PROPERTY_ORIGIN_MESSAGE_ID: &'static str = "ORIGIN_MESSAGE_ID";
    pub const PROPERTY_TRANSFER_FLAG: &'static str = "TRANSFER_FLAG";
    pub const PROPERTY_CORRECTION_FLAG: &'static str = "CORRECTION_FLAG";
    pub const PROPERTY_MQ2_FLAG: &'static str = "MQ2_FLAG";
    pub const PROPERTY_RECONSUME_TIME: &'static str = "RECONSUME_TIME";
    pub const PROPERTY_MSG_REGION: &'static str = "MSG_REGION";
    pub const PROPERTY_TRACE_SWITCH: &'static str = "TRACE_ON";
    pub const PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX: &'static str = "UNIQ_KEY";
    pub const PROPERTY_EXTEND_UNIQ_INFO: &'static str = "EXTEND_UNIQ_INFO";
    pub const PROPERTY_MAX_RECONSUME_TIMES: &'static str = "MAX_RECONSUME_TIMES";
    pub const PROPERTY_CONSUME_START_TIMESTAMP: &'static str = "CONSUME_START_TIME";
    pub const PROPERTY_INNER_NUM: &'static str = "INNER_NUM";
    pub const PROPERTY_INNER_BASE: &'static str = "INNER_BASE";
    pub const DUP_INFO: &'static str = "DUP_INFO";
    pub const PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS: &'static str =
        "CHECK_IMMUNITY_TIME_IN_SECONDS";
    pub const PROPERTY_TRANSACTION_PREPARED_QUEUE_OFFSET: &'static str =
        "TRAN_PREPARED_QUEUE_OFFSET";
    pub const PROPERTY_TRANSACTION_ID: &'static str = "__transactionId__";
    pub const PROPERTY_TRANSACTION_CHECK_TIMES: &'static str = "TRANSACTION_CHECK_TIMES";
    pub const PROPERTY_INSTANCE_ID: &'static str = "INSTANCE_ID";
    pub const PROPERTY_CORRELATION_ID: &'static str = "CORRELATION_ID";
    pub const PROPERTY_MESSAGE_REPLY_TO_CLIENT: &'static str = "REPLY_TO_CLIENT";
    pub const PROPERTY_MESSAGE_TTL: &'static str = "TTL";
    pub const PROPERTY_REPLY_MESSAGE_ARRIVE_TIME: &'static str = "ARRIVE_TIME";
    pub const PROPERTY_PUSH_REPLY_TIME: &'static str = "PUSH_REPLY_TIME";
    pub const PROPERTY_CLUSTER: &'static str = "CLUSTER";
    pub const PROPERTY_MESSAGE_TYPE: &'static str = "MSG_TYPE";
    pub const PROPERTY_POP_CK: &'static str = "POP_CK";
    pub const PROPERTY_POP_CK_OFFSET: &'static str = "POP_CK_OFFSET";
    pub const PROPERTY_FIRST_POP_TIME: &'static str = "1ST_POP_TIME";
    pub const PROPERTY_SHARDING_KEY: &'static str = "__SHARDINGKEY";
    pub const PROPERTY_FORWARD_QUEUE_ID: &'static str = "PROPERTY_FORWARD_QUEUE_ID";
    pub const PROPERTY_REDIRECT: &'static str = "REDIRECT";
    pub const PROPERTY_INNER_MULTI_DISPATCH: &'static str = "INNER_MULTI_DISPATCH";
    pub const PROPERTY_INNER_MULTI_QUEUE_OFFSET: &'static str = "INNER_MULTI_QUEUE_OFFSET";
    pub const PROPERTY_TRACE_CONTEXT: &'static str = "TRACE_CONTEXT";
    pub const PROPERTY_TIMER_DELAY_SEC: &'static str = "TIMER_DELAY_SEC";
    pub const PROPERTY_TIMER_DELIVER_MS: &'static str = "TIMER_DELIVER_MS";
    pub const PROPERTY_BORN_HOST: &'static str = "__BORNHOST";
    pub const PROPERTY_BORN_TIMESTAMP: &'static str = "BORN_TIMESTAMP";

    /**
     * property which name starts with "__RMQ.TRANSIENT." is called transient one that will not
     * be stored in broker disks.
     */
    pub const PROPERTY_TRANSIENT_PREFIX: &'static str = "__RMQ.TRANSIENT.";

    /**
     * the transient property key of topicSysFlag (set by the client when pulling messages)
     */
    pub const PROPERTY_TRANSIENT_TOPIC_CONFIG: &'static str = "__RMQ.TRANSIENT.TOPIC_SYS_FLAG";

    /**
     * the transient property key of groupSysFlag (set by the client when pulling messages)
     */
    pub const PROPERTY_TRANSIENT_GROUP_CONFIG: &'static str = "__RMQ.TRANSIENT.GROUP_SYS_FLAG";

    pub const KEY_SEPARATOR: &'static str = " ";

    pub const PROPERTY_TIMER_ENQUEUE_MS: &'static str = "TIMER_ENQUEUE_MS";
    pub const PROPERTY_TIMER_DEQUEUE_MS: &'static str = "TIMER_DEQUEUE_MS";
    pub const PROPERTY_TIMER_ROLL_TIMES: &'static str = "TIMER_ROLL_TIMES";
    pub const PROPERTY_TIMER_OUT_MS: &'static str = "TIMER_OUT_MS";
    pub const PROPERTY_TIMER_DEL_UNIQKEY: &'static str = "TIMER_DEL_UNIQKEY";
    pub const PROPERTY_TIMER_DELAY_LEVEL: &'static str = "TIMER_DELAY_LEVEL";
    pub const PROPERTY_TIMER_DELAY_MS: &'static str = "TIMER_DELAY_MS";
    pub const PROPERTY_CRC32: &'static str = "__CRC32#";

    /**
     * properties for DLQ
     */
    pub const PROPERTY_DLQ_ORIGIN_TOPIC: &'static str = "DLQ_ORIGIN_TOPIC";
    pub const PROPERTY_DLQ_ORIGIN_MESSAGE_ID: &'static str = "DLQ_ORIGIN_MESSAGE_ID";
}

lazy_static! {
    pub static ref STRING_HASH_SET: HashSet<&'static str> = {
        let mut set = HashSet::with_capacity(64);
        set.insert(MessageConst::PROPERTY_TRACE_SWITCH);
        set.insert(MessageConst::PROPERTY_MSG_REGION);
        set.insert(MessageConst::PROPERTY_KEYS);
        set.insert(MessageConst::PROPERTY_TAGS);
        set.insert(MessageConst::PROPERTY_WAIT_STORE_MSG_OK);
        set.insert(MessageConst::PROPERTY_DELAY_TIME_LEVEL);
        set.insert(MessageConst::PROPERTY_RETRY_TOPIC);
        set.insert(MessageConst::PROPERTY_REAL_TOPIC);
        set.insert(MessageConst::PROPERTY_REAL_QUEUE_ID);
        set.insert(MessageConst::PROPERTY_TRANSACTION_PREPARED);
        set.insert(MessageConst::PROPERTY_PRODUCER_GROUP);
        set.insert(MessageConst::PROPERTY_MIN_OFFSET);
        set.insert(MessageConst::PROPERTY_MAX_OFFSET);
        set.insert(MessageConst::PROPERTY_BUYER_ID);
        set.insert(MessageConst::PROPERTY_ORIGIN_MESSAGE_ID);
        set.insert(MessageConst::PROPERTY_TRANSFER_FLAG);
        set.insert(MessageConst::PROPERTY_CORRECTION_FLAG);
        set.insert(MessageConst::PROPERTY_MQ2_FLAG);
        set.insert(MessageConst::PROPERTY_RECONSUME_TIME);
        set.insert(MessageConst::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
        set.insert(MessageConst::PROPERTY_MAX_RECONSUME_TIMES);
        set.insert(MessageConst::PROPERTY_CONSUME_START_TIMESTAMP);
        set.insert(MessageConst::PROPERTY_POP_CK);
        set.insert(MessageConst::PROPERTY_POP_CK_OFFSET);
        set.insert(MessageConst::PROPERTY_FIRST_POP_TIME);
        set.insert(MessageConst::PROPERTY_TRANSACTION_PREPARED_QUEUE_OFFSET);
        set.insert(MessageConst::DUP_INFO);
        set.insert(MessageConst::PROPERTY_EXTEND_UNIQ_INFO);
        set.insert(MessageConst::PROPERTY_INSTANCE_ID);
        set.insert(MessageConst::PROPERTY_CORRELATION_ID);
        set.insert(MessageConst::PROPERTY_MESSAGE_REPLY_TO_CLIENT);
        set.insert(MessageConst::PROPERTY_MESSAGE_TTL);
        set.insert(MessageConst::PROPERTY_REPLY_MESSAGE_ARRIVE_TIME);
        set.insert(MessageConst::PROPERTY_PUSH_REPLY_TIME);
        set.insert(MessageConst::PROPERTY_CLUSTER);
        set.insert(MessageConst::PROPERTY_MESSAGE_TYPE);
        set.insert(MessageConst::PROPERTY_INNER_MULTI_QUEUE_OFFSET);
        set.insert(MessageConst::PROPERTY_TIMER_DELAY_MS);
        set.insert(MessageConst::PROPERTY_TIMER_DELAY_SEC);
        set.insert(MessageConst::PROPERTY_TIMER_DELIVER_MS);
        set.insert(MessageConst::PROPERTY_TIMER_ENQUEUE_MS);
        set.insert(MessageConst::PROPERTY_TIMER_DEQUEUE_MS);
        set.insert(MessageConst::PROPERTY_TIMER_ROLL_TIMES);
        set.insert(MessageConst::PROPERTY_TIMER_OUT_MS);
        set.insert(MessageConst::PROPERTY_TIMER_DEL_UNIQKEY);
        set.insert(MessageConst::PROPERTY_TIMER_DELAY_LEVEL);
        set.insert(MessageConst::PROPERTY_BORN_HOST);
        set.insert(MessageConst::PROPERTY_BORN_TIMESTAMP);
        set.insert(MessageConst::PROPERTY_DLQ_ORIGIN_TOPIC);
        set.insert(MessageConst::PROPERTY_DLQ_ORIGIN_MESSAGE_ID);
        set.insert(MessageConst::PROPERTY_CRC32);
        set
    };
}