Skip to main content

topiq_core/
message.rs

1use bytes::Bytes;
2use serde::{Deserialize, Deserializer, Serialize, Serializer};
3
4use crate::topic::Subject;
5
6/// A message flowing through the pub/sub system.
7///
8/// Payloads are opaque bytes -- the broker never inspects them.
9#[derive(Debug, Clone, PartialEq, Eq)]
10pub struct Message {
11    pub topic: Subject,
12    pub payload: Bytes,
13    pub reply_to: Option<Subject>,
14}
15
16impl Serialize for Message {
17    fn serialize<S: Serializer>(&self, serializer: S) -> Result<S::Ok, S::Error> {
18        use serde::ser::SerializeStruct;
19        let mut s = serializer.serialize_struct("Message", 3)?;
20        s.serialize_field("topic", &self.topic)?;
21        s.serialize_field("payload", self.payload.as_ref() as &[u8])?;
22        s.serialize_field("reply_to", &self.reply_to)?;
23        s.end()
24    }
25}
26
27impl<'de> Deserialize<'de> for Message {
28    fn deserialize<D: Deserializer<'de>>(deserializer: D) -> Result<Self, D::Error> {
29        #[derive(Deserialize)]
30        struct MessageHelper {
31            topic: Subject,
32            payload: Vec<u8>,
33            reply_to: Option<Subject>,
34        }
35        let helper = MessageHelper::deserialize(deserializer)?;
36        Ok(Message {
37            topic: helper.topic,
38            payload: Bytes::from(helper.payload),
39            reply_to: helper.reply_to,
40        })
41    }
42}
43
44impl Message {
45    pub fn new(topic: Subject, payload: Bytes) -> Self {
46        Self {
47            topic,
48            payload,
49            reply_to: None,
50        }
51    }
52
53    pub fn with_reply(topic: Subject, payload: Bytes, reply_to: Subject) -> Self {
54        Self {
55            topic,
56            payload,
57            reply_to: Some(reply_to),
58        }
59    }
60
61    pub fn payload_len(&self) -> usize {
62        self.payload.len()
63    }
64}
65
#[cfg(test)]
mod tests {
    use super::*;

    /// `Message::new` stores the topic and payload and leaves `reply_to` unset.
    #[test]
    fn new_message() {
        let topic = Subject::new("test.topic").unwrap();
        let body = Bytes::from("hello");

        let msg = Message::new(topic, body);

        assert_eq!(msg.topic.as_str(), "test.topic");
        assert_eq!(msg.payload, Bytes::from("hello"));
        assert!(msg.reply_to.is_none());
    }

    /// `Message::with_reply` records the supplied reply subject.
    #[test]
    fn message_with_reply() {
        let topic = Subject::new("request").unwrap();
        let inbox = Subject::new("reply.inbox").unwrap();

        let msg = Message::with_reply(topic, Bytes::from("ping"), inbox);

        let reply = msg.reply_to.unwrap();
        assert_eq!(reply.as_str(), "reply.inbox");
    }

    /// `payload_len` reports the number of payload bytes.
    #[test]
    fn payload_len() {
        let topic = Subject::new("t").unwrap();
        let msg = Message::new(topic, Bytes::from("12345"));

        assert_eq!(msg.payload_len(), 5);
    }
}