amq/
message.rs

1use std::error::Error;
2
3use bincode::{Decode, Encode};
4use serde::{Deserialize, Serialize};
5
6#[derive(Debug, Clone, Deserialize, Serialize, Encode, Decode, PartialEq, Eq)]
7pub enum MessageStatus {
8    New,
9    Reconsume,
10    Pending(u8, u64, u64),
11    Dead,
12    Acked,
13}
14
15impl MessageStatus {
16    pub fn is_pending(&self) -> bool {
17        match self {
18            MessageStatus::Pending(_, _, _) => true,
19            _ => false,
20        }
21    }
22}
23
24#[derive(Debug, Clone, Deserialize, Serialize, Encode, Decode)]
25pub struct MessageHistory {
26    pub status: MessageStatus,
27    pub timestamp: u64,
28}
29
30#[derive(Debug, Clone, Deserialize, Serialize, Encode, Decode)]
31pub struct MessageBox {
32    pub id: u64,
33    pub status: MessageStatus,
34    pub timestamp: u64,
35    pub message: Message,
36    pub history: Vec<MessageHistory>,
37}
38
39#[derive(Debug, Clone, PartialEq, Eq, Deserialize, Serialize, Encode, Decode)]
40pub enum MsgStatus {
41    Success,
42    Warning,
43    Failure,
44    Error,
45}
46
47/// Message for Amq.
48///
49/// # Variants
50///
51/// - `ReqPing`: Request ping.
52/// - `RespPing`: Response ping.
53/// - `ReqAuthorizer`: Request authorizer.
54/// - `RespAuthorizer`: Response authorizer.
55/// - `ReqSubscribeTopic`: Request subscribe topic.
56/// - `RespSubscribeTopic`: Response subscribe topic.
57/// - `ReqUnsubscribeTopic`: Request unsubscribe topic.
58/// - `RespUnsubscribeTopic`: Response unsubscribe topic.
59/// - `ReqPublish`: Request publish message.
60/// - `RespPublish`: Response publish message.
61/// - `ReqSubscribe`: Request subscribe message.
62/// - `RespSubscribe`: Response subscribe message.
63/// - `ReqConsumerTopic`: Request consumer topic.
64/// - `RespConsumerTopic`: Response consumer topic.
65/// - `ReqUnconsumerTopic`: Request unconsumer topic.
66/// - `RespUnconsumerTopic`: Response unconsumer topic.
67/// - `ReqPullMessage`: Request pull message.
68/// - `RespPullMessage`: Response pull message.
69/// - `ReqProduceNormal`: Request produce normal message.
70/// - `ReqProduceOrdered`: Request produce ordered message.
71/// - `ReqProduceDelay`: Request produce delay message.
72/// - `RespProduceNormal`: Response produce normal message.
73/// - `RespProduceOrdered`: Response produce ordered message.
74/// - `RespProduceDelay`: Response produce delay message.
75/// - `ReqConsume`: Request consume message.
76/// - `RespConsume`: Response consume message.
77/// - `ReqConsumeAck`: Request consume ack message.
78/// - `RespConsumeAck`: Response consume ack message.
79/// - `ReqConsumeAckMulti`: Request consume ack multi message.
80/// - `RespConsumeAckMulti`: Response consume ack multi message.
81/// - `ReqReconsumeLater`: Request reconsume later message.
82/// - `RespReconsumeLater`: Response reconsume later message.
83/// - `Error`: Error message.
84/// - `ReqMessageList`: Request message list.
85/// - `RespMessageList`: Response message list.
86#[derive(Debug, Clone, Deserialize, Serialize, Encode, Decode)]
87#[serde(tag = "type", content = "data")]
88pub enum Message {
89    ReqPing(ReqMsgPing),
90    RespPing(RespMsgPing),
91    ReqAuthorizer(ReqMsgAuthorizer),
92    RespAuthorizer(RespMsgAuthorizer),
93    ReqSubscribeTopic(ReqMsgSubscriber),
94    RespSubscribeTopic(RespMsgSubscriber),
95    ReqUnsubscribeTopic(ReqMsgUnsubscriber),
96    RespUnsubscribeTopic(RespMsgUnsubscriber),
97    ReqPublish(ReqMsgPublish),
98    RespPublish(RespMsgPublish),
99    ReqSubscribe(ReqMsgSubscribe),
100    RespSubscribe(RespMsgSubscribe),
101    ReqConsumerTopic(ReqMsgConsumerTopic),
102    RespConsumerTopic(RespMsgConsumerTopic),
103    ReqUnconsumerTopic(ReqMsgUnconsumerTopic),
104    RespUnconsumerTopic(RespMsgUnconsumerTopic),
105    ReqPullMessage(ReqPullMessage),
106    RespPullMessage(RespPullMessage),
107    ReqProduceNormal(ReqMsgProduceNormal),
108    ReqProduceOrdered(ReqMsgProduceOrdered),
109    ReqProduceDelay(ReqMsgProduceDelay),
110    RespProduceNormal(RespMsgProduceNormal),
111    RespProduceOrdered(RespMsgProduceOrdered),
112    RespProduceDelay(RespMsgProduceDelay),
113    ReqConsume(ReqMsgConsume),
114    RespConsume(RespMsgConsume),
115    ReqConsumeAck(ReqMsgConsumeAck),
116    RespConsumeAck(RespMsgConsumeAck),
117    ReqConsumeAckMulti(ReqMsgConsumeAckMulti),
118    RespConsumeAckMulti(RespMsgConsumeAckMulti),
119    ReqReconsumeLater(ReqReconsumeLater),
120    RespReconsumeLater(RespReconsumeLater),
121    Error(String),
122    ReqMessageList(ReqMsgList),
123    RespMessageList(RespMsgList),
124}
125
126#[derive(Debug, Clone, Deserialize, Serialize, Encode, Decode)]
127pub struct ReqMsgPing {}
128
129#[derive(Debug, Clone, Deserialize, Serialize, Encode, Decode)]
130pub struct RespMsgPing {}
131
132#[derive(Debug, Clone, Deserialize, Serialize, Encode, Decode)]
133pub struct ReqMsgAuthorizer {
134    pub access_key: String,
135    pub access_secret: String,
136}
137
138#[derive(Debug, Clone, Deserialize, Serialize, Encode, Decode)]
139pub struct RespMsgAuthorizer {
140    pub id: u64,
141    pub status: MsgStatus,
142    pub msg: String,
143}
144
145#[derive(Debug, Clone, Deserialize, Serialize, Encode, Decode)]
146pub struct ReqMsgSubscriber {
147    pub topic: String,
148}
149
150#[derive(Debug, Clone, Deserialize, Serialize, Encode, Decode)]
151pub struct RespMsgSubscriber {
152    pub id: u64,
153    pub status: MsgStatus,
154    pub topic: String,
155    pub msg: String,
156}
157
158#[derive(Debug, Clone, Deserialize, Serialize, Encode, Decode)]
159pub struct ReqMsgUnsubscriber {
160    pub topic: String,
161}
162
163#[derive(Debug, Clone, Deserialize, Serialize, Encode, Decode)]
164pub struct RespMsgUnsubscriber {
165    pub id: u64,
166    pub status: MsgStatus,
167    pub topic: String,
168    pub msg: String,
169}
170
171#[derive(Debug, Clone, Deserialize, Serialize, Encode, Decode)]
172pub struct ReqMsgPublish {
173    pub topic: String,
174    pub message: Vec<u8>,
175}
176
177#[derive(Debug, Clone, Deserialize, Serialize, Encode, Decode)]
178pub struct RespMsgPublish {
179    pub id: u64,
180    pub status: MsgStatus,
181    pub topic: String,
182    pub msg: String,
183}
184
185#[derive(Debug, Clone, Deserialize, Serialize, Encode, Decode)]
186pub struct ReqMsgSubscribe {
187    pub topic: String,
188}
189
190#[derive(Debug, Clone, Deserialize, Serialize, Encode, Decode)]
191pub struct RespMsgSubscribe {
192    pub id: u64,
193    pub status: MsgStatus,
194    pub topic: String,
195    pub message: Vec<u8>,
196}
197
198#[derive(Debug, Clone, Deserialize, Serialize, Encode, Decode)]
199pub struct ReqMsgConsumerTopic {
200    pub topic: String,
201}
202
203#[derive(Debug, Clone, Deserialize, Serialize, Encode, Decode)]
204pub struct RespMsgConsumerTopic {
205    pub id: u64,
206    pub status: MsgStatus,
207    pub topic: String,
208    pub msg: String,
209}
210
211#[derive(Debug, Clone, Deserialize, Serialize, Encode, Decode)]
212pub struct ReqMsgUnconsumerTopic {
213    pub topic: String,
214}
215
216#[derive(Debug, Clone, Deserialize, Serialize, Encode, Decode)]
217pub struct RespMsgUnconsumerTopic {
218    pub id: u64,
219    pub status: MsgStatus,
220    pub topic: String,
221    pub msg: String,
222}
223
224#[derive(Debug, Clone, Deserialize, Serialize, Encode, Decode)]
225pub struct ReqPullMessage {
226    pub topic: String,
227    pub total: u32,
228}
229
230#[derive(Debug, Clone, Deserialize, Serialize, Encode, Decode)]
231pub struct RespPullMsg {
232    pub id: u64,
233    pub message: Vec<u8>,
234}
235
236#[derive(Debug, Clone, Deserialize, Serialize, Encode, Decode)]
237pub struct RespPullMessage {
238    pub topic: String,
239    pub messages: Vec<RespPullMsg>,
240}
241
242#[derive(Debug, Clone, Deserialize, Serialize, Encode, Decode)]
243pub struct ReqMsgProduceNormal {
244    pub topic: String,
245    pub message: Vec<u8>,
246}
247
248#[derive(Debug, Clone, Deserialize, Serialize, Encode, Decode)]
249pub struct RespMsgProduceNormal {
250    pub id: u64,
251    pub status: MsgStatus,
252    pub topic: String,
253    pub msg: String,
254}
255
256#[derive(Debug, Clone, Deserialize, Serialize, Encode, Decode)]
257pub struct ReqMsgProduceOrdered {
258    pub topic: String,
259    pub message: Vec<u8>,
260}
261
262#[derive(Debug, Clone, Deserialize, Serialize, Encode, Decode)]
263pub struct RespMsgProduceOrdered {
264    pub id: u64,
265    pub status: MsgStatus,
266    pub topic: String,
267    pub msg: String,
268}
269
270#[derive(Debug, Clone, Deserialize, Serialize, Encode, Decode)]
271pub struct ReqMsgProduceDelay {
272    pub topic: String,
273    pub message: Vec<u8>,
274    pub delay: u64,
275}
276
277#[derive(Debug, Clone, Deserialize, Serialize, Encode, Decode)]
278pub struct RespMsgProduceDelay {
279    pub id: u64,
280    pub status: MsgStatus,
281    pub topic: String,
282    pub msg: String,
283    pub delay: u64,
284}
285
286#[derive(Debug, Clone, Deserialize, Serialize, Encode, Decode)]
287pub struct ReqMsgConsume {
288    pub topic: String,
289}
290
291#[derive(Debug, Clone, Deserialize, Serialize, Encode, Decode)]
292pub struct RespMsgConsume {
293    pub id: u64,
294    pub topic: String,
295    pub message: Vec<u8>,
296}
297
298#[derive(Debug, Clone, Deserialize, Serialize, Encode, Decode)]
299pub struct ReqMsgConsumeAck {
300    pub id: u64,
301}
302
303#[derive(Debug, Clone, Deserialize, Serialize, Encode, Decode)]
304pub struct RespMsgConsumeAck {
305    pub id: u64,
306    pub status: MsgStatus,
307    pub msg: String,
308}
309
310#[derive(Debug, Clone, Deserialize, Serialize, Encode, Decode)]
311pub struct ReqMsgConsumeAckMulti {
312    pub ids: Vec<u64>,
313}
314
315#[derive(Debug, Clone, Deserialize, Serialize, Encode, Decode)]
316pub struct RespMsgConsumeAckMulti {
317    pub id: u64,
318    pub status: MsgStatus,
319    pub msg: String,
320}
321
322#[derive(Debug, Clone, Deserialize, Serialize, Encode, Decode)]
323pub struct ReqReconsumeLater {
324    pub id: u64,
325}
326
327#[derive(Debug, Clone, Deserialize, Serialize, Encode, Decode)]
328pub struct RespReconsumeLater {
329    pub id: u64,
330    pub status: MsgStatus,
331    pub msg: String,
332}
333
334#[derive(Debug, Clone, Deserialize, Serialize, Encode, Decode)]
335pub struct ReqMsgList {
336    pub topic: String,
337    pub page_size: u32,
338    pub page_num: u32,
339}
340
341#[derive(Debug, Clone, Deserialize, Serialize, Encode, Decode)]
342pub struct RespMsgList {
343    pub id: u64,
344    pub status: MsgStatus,
345    pub topic: String,
346    pub message_list: Vec<MessageBox>,
347}
348
349impl PartialEq for Message {
350    fn eq(&self, _other: &Self) -> bool {
351        match (self, _other) {
352            (Message::ReqPing(_), Message::ReqPing(_)) => true,
353            (Message::RespPing(_), Message::RespPing(_)) => true,
354            (Message::ReqAuthorizer(req1), Message::ReqAuthorizer(req2)) => {
355                req1.access_key == req2.access_key && req1.access_secret == req2.access_secret
356            }
357            (Message::RespAuthorizer(resp1), Message::RespAuthorizer(resp2)) => {
358                resp1.id == resp2.id && resp1.status == resp2.status && resp1.msg == resp2.msg
359            }
360            (Message::ReqSubscribeTopic(req1), Message::ReqSubscribeTopic(req2)) => {
361                req1.topic == req2.topic
362            }
363            (Message::RespSubscribeTopic(resp1), Message::RespSubscribeTopic(resp2)) => {
364                resp1.id == resp2.id
365                    && resp1.status == resp2.status
366                    && resp1.topic == resp2.topic
367                    && resp1.msg == resp2.msg
368            }
369            (Message::ReqUnsubscribeTopic(req1), Message::ReqUnsubscribeTopic(req2)) => {
370                req1.topic == req2.topic
371            }
372            (Message::RespUnsubscribeTopic(resp1), Message::RespUnsubscribeTopic(resp2)) => {
373                resp1.id == resp2.id
374                    && resp1.status == resp2.status
375                    && resp1.topic == resp2.topic
376                    && resp1.msg == resp2.msg
377            }
378            (Message::ReqPublish(req1), Message::ReqPublish(req2)) => {
379                req1.topic == req2.topic && req1.message == req2.message
380            }
381            (Message::RespPublish(resp1), Message::RespPublish(resp2)) => {
382                resp1.id == resp2.id
383                    && resp1.status == resp2.status
384                    && resp1.topic == resp2.topic
385                    && resp1.msg == resp2.msg
386            }
387            (Message::ReqSubscribe(req1), Message::ReqSubscribe(req2)) => req1.topic == req2.topic,
388            (Message::RespSubscribe(resp1), Message::RespSubscribe(resp2)) => {
389                resp1.id == resp2.id
390                    && resp1.status == resp2.status
391                    && resp1.topic == resp2.topic
392                    && resp1.message == resp2.message
393            }
394            (Message::ReqProduceNormal(req1), Message::ReqProduceNormal(req2)) => {
395                req1.topic == req2.topic && req1.message == req2.message
396            }
397            (Message::RespProduceNormal(resp1), Message::RespProduceNormal(resp2)) => {
398                resp1.id == resp2.id
399                    && resp1.status == resp2.status
400                    && resp1.topic == resp2.topic
401                    && resp1.msg == resp2.msg
402            }
403            (Message::ReqProduceOrdered(req1), Message::ReqProduceOrdered(req2)) => {
404                req1.topic == req2.topic && req1.message == req2.message
405            }
406            (Message::RespProduceOrdered(resp1), Message::RespProduceOrdered(resp2)) => {
407                resp1.id == resp2.id
408                    && resp1.status == resp2.status
409                    && resp1.topic == resp2.topic
410                    && resp1.msg == resp2.msg
411            }
412            (Message::ReqProduceDelay(req1), Message::ReqProduceDelay(req2)) => {
413                req1.topic == req2.topic && req1.message == req2.message && req1.delay == req2.delay
414            }
415            (Message::RespProduceDelay(resp1), Message::RespProduceDelay(resp2)) => {
416                resp1.id == resp2.id
417                    && resp1.status == resp2.status
418                    && resp1.topic == resp2.topic
419                    && resp1.msg == resp2.msg
420                    && resp1.delay == resp2.delay
421            }
422            (Message::ReqConsume(req1), Message::ReqConsume(req2)) => req1.topic == req2.topic,
423            (Message::RespConsume(resp1), Message::RespConsume(resp2)) => {
424                resp1.id == resp2.id && resp1.topic == resp2.topic && resp1.message == resp2.message
425            }
426            (Message::ReqConsumeAck(req1), Message::ReqConsumeAck(req2)) => req1.id == req2.id,
427            (Message::RespConsumeAck(resp1), Message::RespConsumeAck(resp2)) => {
428                resp1.id == resp2.id && resp1.status == resp2.status && resp1.msg == resp2.msg
429            }
430            (Message::ReqReconsumeLater(req1), Message::ReqReconsumeLater(req2)) => {
431                req1.id == req2.id
432            }
433            (Message::RespReconsumeLater(resp1), Message::RespReconsumeLater(resp2)) => {
434                resp1.id == resp2.id && resp1.status == resp2.status && resp1.msg == resp2.msg
435            }
436            (Message::Error(err1), Message::Error(err2)) => err1 == err2,
437            _ => false,
438        }
439    }
440}
441
442impl Eq for Message {}
443
444impl Message {
445    pub fn serialize(&self) -> Result<Vec<u8>, Box<dyn Error>> {
446        let bytes = serde_json::to_vec(self)?;
447        Ok(bytes)
448    }
449
450    pub fn deserialize(bytes: &[u8]) -> Result<Self, Box<dyn Error>> {
451        let message: Message = serde_json::from_slice(bytes)?;
452        Ok(message)
453    }
454}