1use std::error::Error;
2
3use bincode::{Decode, Encode};
4use serde::{Deserialize, Serialize};
5
6#[derive(Debug, Clone, Deserialize, Serialize, Encode, Decode, PartialEq, Eq)]
7pub enum MessageStatus {
8 New,
9 Reconsume,
10 Pending(u8, u64, u64),
11 Dead,
12 Acked,
13}
14
15impl MessageStatus {
16 pub fn is_pending(&self) -> bool {
17 match self {
18 MessageStatus::Pending(_, _, _) => true,
19 _ => false,
20 }
21 }
22}
23
24#[derive(Debug, Clone, Deserialize, Serialize, Encode, Decode)]
25pub struct MessageHistory {
26 pub status: MessageStatus,
27 pub timestamp: u64,
28}
29
30#[derive(Debug, Clone, Deserialize, Serialize, Encode, Decode)]
31pub struct MessageBox {
32 pub id: u64,
33 pub status: MessageStatus,
34 pub timestamp: u64,
35 pub message: Message,
36 pub history: Vec<MessageHistory>,
37}
38
39#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize, Encode, Decode)]
40pub enum MsgStatus {
41 Success,
42 Warning,
43 Failure,
44 Error,
45}
46
47#[derive(Debug, Clone, Deserialize, Serialize, Encode, Decode)]
87#[serde(tag = "type", content = "data")]
88pub enum Message {
89 ReqPing(ReqMsgPing),
90 RespPing(RespMsgPing),
91 ReqAuthorizer(ReqMsgAuthorizer),
92 RespAuthorizer(RespMsgAuthorizer),
93 ReqSubscribeTopic(ReqMsgSubscriber),
94 RespSubscribeTopic(RespMsgSubscriber),
95 ReqUnsubscribeTopic(ReqMsgUnsubscriber),
96 RespUnsubscribeTopic(RespMsgUnsubscriber),
97 ReqPublish(ReqMsgPublish),
98 RespPublish(RespMsgPublish),
99 ReqSubscribe(ReqMsgSubscribe),
100 RespSubscribe(RespMsgSubscribe),
101 ReqConsumerTopic(ReqMsgConsumerTopic),
102 RespConsumerTopic(RespMsgConsumerTopic),
103 ReqUnconsumerTopic(ReqMsgUnconsumerTopic),
104 RespUnconsumerTopic(RespMsgUnconsumerTopic),
105 ReqPullMessage(ReqPullMessage),
106 RespPullMessage(RespPullMessage),
107 ReqProduceNormal(ReqMsgProduceNormal),
108 ReqProduceOrdered(ReqMsgProduceOrdered),
109 ReqProduceDelay(ReqMsgProduceDelay),
110 RespProduceNormal(RespMsgProduceNormal),
111 RespProduceOrdered(RespMsgProduceOrdered),
112 RespProduceDelay(RespMsgProduceDelay),
113 ReqConsume(ReqMsgConsume),
114 RespConsume(RespMsgConsume),
115 ReqConsumeAck(ReqMsgConsumeAck),
116 RespConsumeAck(RespMsgConsumeAck),
117 ReqConsumeAckMulti(ReqMsgConsumeAckMulti),
118 RespConsumeAckMulti(RespMsgConsumeAckMulti),
119 ReqReconsumeLater(ReqReconsumeLater),
120 RespReconsumeLater(RespReconsumeLater),
121 Error(String),
122 ReqMessageList(ReqMsgList),
123 RespMessageList(RespMsgList),
124}
125
126#[derive(Debug, Clone, Deserialize, Serialize, Encode, Decode)]
127pub struct ReqMsgPing {}
128
129#[derive(Debug, Clone, Deserialize, Serialize, Encode, Decode)]
130pub struct RespMsgPing {}
131
132#[derive(Debug, Clone, Deserialize, Serialize, Encode, Decode)]
133pub struct ReqMsgAuthorizer {
134 pub access_key: String,
135 pub access_secret: String,
136}
137
138#[derive(Debug, Clone, Deserialize, Serialize, Encode, Decode)]
139pub struct RespMsgAuthorizer {
140 pub id: u64,
141 pub status: MsgStatus,
142 pub msg: String,
143}
144
145#[derive(Debug, Clone, Deserialize, Serialize, Encode, Decode)]
146pub struct ReqMsgSubscriber {
147 pub topic: String,
148}
149
150#[derive(Debug, Clone, Deserialize, Serialize, Encode, Decode)]
151pub struct RespMsgSubscriber {
152 pub id: u64,
153 pub status: MsgStatus,
154 pub topic: String,
155 pub msg: String,
156}
157
158#[derive(Debug, Clone, Deserialize, Serialize, Encode, Decode)]
159pub struct ReqMsgUnsubscriber {
160 pub topic: String,
161}
162
163#[derive(Debug, Clone, Deserialize, Serialize, Encode, Decode)]
164pub struct RespMsgUnsubscriber {
165 pub id: u64,
166 pub status: MsgStatus,
167 pub topic: String,
168 pub msg: String,
169}
170
171#[derive(Debug, Clone, Deserialize, Serialize, Encode, Decode)]
172pub struct ReqMsgPublish {
173 pub topic: String,
174 pub message: Vec<u8>,
175}
176
177#[derive(Debug, Clone, Deserialize, Serialize, Encode, Decode)]
178pub struct RespMsgPublish {
179 pub id: u64,
180 pub status: MsgStatus,
181 pub topic: String,
182 pub msg: String,
183}
184
185#[derive(Debug, Clone, Deserialize, Serialize, Encode, Decode)]
186pub struct ReqMsgSubscribe {
187 pub topic: String,
188}
189
190#[derive(Debug, Clone, Deserialize, Serialize, Encode, Decode)]
191pub struct RespMsgSubscribe {
192 pub id: u64,
193 pub status: MsgStatus,
194 pub topic: String,
195 pub message: Vec<u8>,
196}
197
198#[derive(Debug, Clone, Deserialize, Serialize, Encode, Decode)]
199pub struct ReqMsgConsumerTopic {
200 pub topic: String,
201}
202
203#[derive(Debug, Clone, Deserialize, Serialize, Encode, Decode)]
204pub struct RespMsgConsumerTopic {
205 pub id: u64,
206 pub status: MsgStatus,
207 pub topic: String,
208 pub msg: String,
209}
210
211#[derive(Debug, Clone, Deserialize, Serialize, Encode, Decode)]
212pub struct ReqMsgUnconsumerTopic {
213 pub topic: String,
214}
215
216#[derive(Debug, Clone, Deserialize, Serialize, Encode, Decode)]
217pub struct RespMsgUnconsumerTopic {
218 pub id: u64,
219 pub status: MsgStatus,
220 pub topic: String,
221 pub msg: String,
222}
223
224#[derive(Debug, Clone, Deserialize, Serialize, Encode, Decode)]
225pub struct ReqPullMessage {
226 pub topic: String,
227 pub total: u32,
228}
229
230#[derive(Debug, Clone, Deserialize, Serialize, Encode, Decode)]
231pub struct RespPullMsg {
232 pub id: u64,
233 pub message: Vec<u8>,
234}
235
236#[derive(Debug, Clone, Deserialize, Serialize, Encode, Decode)]
237pub struct RespPullMessage {
238 pub topic: String,
239 pub messages: Vec<RespPullMsg>,
240}
241
242#[derive(Debug, Clone, Deserialize, Serialize, Encode, Decode)]
243pub struct ReqMsgProduceNormal {
244 pub topic: String,
245 pub message: Vec<u8>,
246}
247
248#[derive(Debug, Clone, Deserialize, Serialize, Encode, Decode)]
249pub struct RespMsgProduceNormal {
250 pub id: u64,
251 pub status: MsgStatus,
252 pub topic: String,
253 pub msg: String,
254}
255
256#[derive(Debug, Clone, Deserialize, Serialize, Encode, Decode)]
257pub struct ReqMsgProduceOrdered {
258 pub topic: String,
259 pub message: Vec<u8>,
260}
261
262#[derive(Debug, Clone, Deserialize, Serialize, Encode, Decode)]
263pub struct RespMsgProduceOrdered {
264 pub id: u64,
265 pub status: MsgStatus,
266 pub topic: String,
267 pub msg: String,
268}
269
270#[derive(Debug, Clone, Deserialize, Serialize, Encode, Decode)]
271pub struct ReqMsgProduceDelay {
272 pub topic: String,
273 pub message: Vec<u8>,
274 pub delay: u64,
275}
276
277#[derive(Debug, Clone, Deserialize, Serialize, Encode, Decode)]
278pub struct RespMsgProduceDelay {
279 pub id: u64,
280 pub status: MsgStatus,
281 pub topic: String,
282 pub msg: String,
283 pub delay: u64,
284}
285
286#[derive(Debug, Clone, Deserialize, Serialize, Encode, Decode)]
287pub struct ReqMsgConsume {
288 pub topic: String,
289}
290
291#[derive(Debug, Clone, Deserialize, Serialize, Encode, Decode)]
292pub struct RespMsgConsume {
293 pub id: u64,
294 pub topic: String,
295 pub message: Vec<u8>,
296}
297
298#[derive(Debug, Clone, Deserialize, Serialize, Encode, Decode)]
299pub struct ReqMsgConsumeAck {
300 pub id: u64,
301}
302
303#[derive(Debug, Clone, Deserialize, Serialize, Encode, Decode)]
304pub struct RespMsgConsumeAck {
305 pub id: u64,
306 pub status: MsgStatus,
307 pub msg: String,
308}
309
310#[derive(Debug, Clone, Deserialize, Serialize, Encode, Decode)]
311pub struct ReqMsgConsumeAckMulti {
312 pub ids: Vec<u64>,
313}
314
315#[derive(Debug, Clone, Deserialize, Serialize, Encode, Decode)]
316pub struct RespMsgConsumeAckMulti {
317 pub id: u64,
318 pub status: MsgStatus,
319 pub msg: String,
320}
321
322#[derive(Debug, Clone, Deserialize, Serialize, Encode, Decode)]
323pub struct ReqReconsumeLater {
324 pub id: u64,
325}
326
327#[derive(Debug, Clone, Deserialize, Serialize, Encode, Decode)]
328pub struct RespReconsumeLater {
329 pub id: u64,
330 pub status: MsgStatus,
331 pub msg: String,
332}
333
334#[derive(Debug, Clone, Deserialize, Serialize, Encode, Decode)]
335pub struct ReqMsgList {
336 pub topic: String,
337 pub page_size: u32,
338 pub page_num: u32,
339}
340
341#[derive(Debug, Clone, Deserialize, Serialize, Encode, Decode)]
342pub struct RespMsgList {
343 pub id: u64,
344 pub status: MsgStatus,
345 pub topic: String,
346 pub message_list: Vec<MessageBox>,
347}
348
349impl PartialEq for Message {
350 fn eq(&self, _other: &Self) -> bool {
351 match (self, _other) {
352 (Message::ReqPing(_), Message::ReqPing(_)) => true,
353 (Message::RespPing(_), Message::RespPing(_)) => true,
354 (Message::ReqAuthorizer(req1), Message::ReqAuthorizer(req2)) => {
355 req1.access_key == req2.access_key && req1.access_secret == req2.access_secret
356 }
357 (Message::RespAuthorizer(resp1), Message::RespAuthorizer(resp2)) => {
358 resp1.id == resp2.id && resp1.status == resp2.status && resp1.msg == resp2.msg
359 }
360 (Message::ReqSubscribeTopic(req1), Message::ReqSubscribeTopic(req2)) => {
361 req1.topic == req2.topic
362 }
363 (Message::RespSubscribeTopic(resp1), Message::RespSubscribeTopic(resp2)) => {
364 resp1.id == resp2.id
365 && resp1.status == resp2.status
366 && resp1.topic == resp2.topic
367 && resp1.msg == resp2.msg
368 }
369 (Message::ReqUnsubscribeTopic(req1), Message::ReqUnsubscribeTopic(req2)) => {
370 req1.topic == req2.topic
371 }
372 (Message::RespUnsubscribeTopic(resp1), Message::RespUnsubscribeTopic(resp2)) => {
373 resp1.id == resp2.id
374 && resp1.status == resp2.status
375 && resp1.topic == resp2.topic
376 && resp1.msg == resp2.msg
377 }
378 (Message::ReqPublish(req1), Message::ReqPublish(req2)) => {
379 req1.topic == req2.topic && req1.message == req2.message
380 }
381 (Message::RespPublish(resp1), Message::RespPublish(resp2)) => {
382 resp1.id == resp2.id
383 && resp1.status == resp2.status
384 && resp1.topic == resp2.topic
385 && resp1.msg == resp2.msg
386 }
387 (Message::ReqSubscribe(req1), Message::ReqSubscribe(req2)) => req1.topic == req2.topic,
388 (Message::RespSubscribe(resp1), Message::RespSubscribe(resp2)) => {
389 resp1.id == resp2.id
390 && resp1.status == resp2.status
391 && resp1.topic == resp2.topic
392 && resp1.message == resp2.message
393 }
394 (Message::ReqProduceNormal(req1), Message::ReqProduceNormal(req2)) => {
395 req1.topic == req2.topic && req1.message == req2.message
396 }
397 (Message::RespProduceNormal(resp1), Message::RespProduceNormal(resp2)) => {
398 resp1.id == resp2.id
399 && resp1.status == resp2.status
400 && resp1.topic == resp2.topic
401 && resp1.msg == resp2.msg
402 }
403 (Message::ReqProduceOrdered(req1), Message::ReqProduceOrdered(req2)) => {
404 req1.topic == req2.topic && req1.message == req2.message
405 }
406 (Message::RespProduceOrdered(resp1), Message::RespProduceOrdered(resp2)) => {
407 resp1.id == resp2.id
408 && resp1.status == resp2.status
409 && resp1.topic == resp2.topic
410 && resp1.msg == resp2.msg
411 }
412 (Message::ReqProduceDelay(req1), Message::ReqProduceDelay(req2)) => {
413 req1.topic == req2.topic && req1.message == req2.message && req1.delay == req2.delay
414 }
415 (Message::RespProduceDelay(resp1), Message::RespProduceDelay(resp2)) => {
416 resp1.id == resp2.id
417 && resp1.status == resp2.status
418 && resp1.topic == resp2.topic
419 && resp1.msg == resp2.msg
420 && resp1.delay == resp2.delay
421 }
422 (Message::ReqConsume(req1), Message::ReqConsume(req2)) => req1.topic == req2.topic,
423 (Message::RespConsume(resp1), Message::RespConsume(resp2)) => {
424 resp1.id == resp2.id && resp1.topic == resp2.topic && resp1.message == resp2.message
425 }
426 (Message::ReqConsumeAck(req1), Message::ReqConsumeAck(req2)) => req1.id == req2.id,
427 (Message::RespConsumeAck(resp1), Message::RespConsumeAck(resp2)) => {
428 resp1.id == resp2.id && resp1.status == resp2.status && resp1.msg == resp2.msg
429 }
430 (Message::ReqReconsumeLater(req1), Message::ReqReconsumeLater(req2)) => {
431 req1.id == req2.id
432 }
433 (Message::RespReconsumeLater(resp1), Message::RespReconsumeLater(resp2)) => {
434 resp1.id == resp2.id && resp1.status == resp2.status && resp1.msg == resp2.msg
435 }
436 (Message::Error(err1), Message::Error(err2)) => err1 == err2,
437 _ => false,
438 }
439 }
440}
441
442impl Eq for Message {}
443
444impl Message {
445 pub fn serialize(&self) -> Result<Vec<u8>, Box<dyn Error>> {
446 let bytes = serde_json::to_vec(self)?;
447 Ok(bytes)
448 }
449
450 pub fn deserialize(bytes: &[u8]) -> Result<Self, Box<dyn Error>> {
451 let message: Message = serde_json::from_slice(bytes)?;
452 Ok(message)
453 }
454}