libp2p_pubsub_core/framing/
message.rs

1use bytes::Bytes;
2use libp2p::identity::PeerId;
3
4use libp2p_pubsub_proto::pubsub::MessageProto;
5
6use crate::topic::TopicHash;
7
8#[derive(Clone, PartialEq, Debug)]
9pub struct Message {
10    proto: MessageProto,
11}
12
13impl Message {
14    #[must_use]
15    pub fn new(topic: impl Into<TopicHash>, data: impl Into<Vec<u8>>) -> Self {
16        let topic = topic.into();
17        let data = data.into();
18
19        let proto = MessageProto {
20            from: None,
21            data: Some(data.into()),
22            seqno: None,
23            topic: topic.into_string(),
24            signature: None,
25            key: None,
26        };
27
28        Self { proto }
29    }
30
31    #[must_use]
32    pub fn new_with_sequence_number(
33        topic: impl Into<TopicHash>,
34        data: impl Into<Vec<u8>>,
35        seq_no: impl Into<Vec<u8>>,
36    ) -> Self {
37        let mut rpc = Self::new(topic, data);
38        rpc.set_sequence_number(Some(Bytes::from(seq_no.into())));
39        rpc
40    }
41
42    #[must_use]
43    pub fn into_proto(self) -> MessageProto {
44        self.proto
45    }
46
47    #[must_use]
48    pub fn as_proto(&self) -> &MessageProto {
49        &self.proto
50    }
51
52    #[must_use]
53    pub fn source(&self) -> Option<PeerId> {
54        self.proto
55            .from
56            .as_ref()
57            .map(|bytes| PeerId::from_bytes(bytes).expect("valid peer id"))
58    }
59
60    pub fn set_source(&mut self, source: Option<PeerId>) {
61        self.proto.from = source.map(|peer_id| peer_id.to_bytes().into());
62    }
63
64    #[must_use]
65    pub fn data(&self) -> &[u8] {
66        self.proto.data.as_ref().unwrap()
67    }
68
69    #[must_use]
70    pub fn sequence_number(&self) -> Option<Bytes> {
71        self.proto.seqno.clone()
72    }
73
74    pub fn set_sequence_number(&mut self, seq_no: Option<impl Into<Vec<u8>>>) {
75        self.proto.seqno = seq_no.map(|n| n.into().into());
76    }
77
78    #[must_use]
79    pub fn topic_str(&self) -> &str {
80        self.proto.topic.as_str()
81    }
82
83    #[must_use]
84    pub fn topic(&self) -> TopicHash {
85        TopicHash::from_raw(self.topic_str())
86    }
87
88    #[must_use]
89    pub fn signature(&self) -> Option<&[u8]> {
90        self.proto.signature.as_ref().map(|bytes| bytes.as_ref())
91    }
92
93    pub fn set_signature(&mut self, signature: Option<impl Into<Vec<u8>>>) {
94        self.proto.signature = signature.map(|bytes| bytes.into().into());
95    }
96
97    #[must_use]
98    pub fn key(&self) -> Option<&[u8]> {
99        self.proto.key.as_ref().map(|bytes| bytes.as_ref())
100    }
101
102    pub fn set_key(&mut self, key: Option<impl Into<Vec<u8>>>) {
103        self.proto.key = key.map(|bytes| bytes.into().into());
104    }
105}
106
107impl From<MessageProto> for Message {
108    /// Convert from a [`MessageProto`] into a [`Message`]. Additionally. sanitize the protobuf
109    /// message by removing optional fields when empty.
110    #[must_use]
111    fn from(mut proto: MessageProto) -> Self {
112        // A non-present data field should be interpreted as an empty payload.
113        if proto.data.is_none() {
114            proto.data = Some(Bytes::new());
115        }
116
117        // An empty from field should be interpreted as not present.
118        if let Some(from) = proto.from.as_ref() {
119            if from.is_empty() {
120                proto.from = None;
121            }
122        }
123
124        // An empty seqno field should be interpreted as not present.
125        if let Some(seq_no) = proto.seqno.as_ref() {
126            if seq_no.is_empty() {
127                proto.seqno = None;
128            }
129        }
130
131        // An empty signature field should be interpreted as not present.
132        if let Some(signature) = proto.signature.as_ref() {
133            if signature.is_empty() {
134                proto.signature = None;
135            }
136        }
137
138        // An empty key field should be interpreted as not present.
139        if let Some(key) = proto.key.as_ref() {
140            if key.is_empty() {
141                proto.key = None;
142            }
143        }
144
145        Self { proto }
146    }
147}
148
149impl From<Message> for MessageProto {
150    fn from(message: Message) -> Self {
151        message.into_proto()
152    }
153}