libp2p_pubsub_core/framing/
message.rs1use bytes::Bytes;
2use libp2p::identity::PeerId;
3
4use libp2p_pubsub_proto::pubsub::MessageProto;
5
6use crate::topic::TopicHash;
7
8#[derive(Clone, PartialEq, Debug)]
9pub struct Message {
10 proto: MessageProto,
11}
12
13impl Message {
14 #[must_use]
15 pub fn new(topic: impl Into<TopicHash>, data: impl Into<Vec<u8>>) -> Self {
16 let topic = topic.into();
17 let data = data.into();
18
19 let proto = MessageProto {
20 from: None,
21 data: Some(data.into()),
22 seqno: None,
23 topic: topic.into_string(),
24 signature: None,
25 key: None,
26 };
27
28 Self { proto }
29 }
30
31 #[must_use]
32 pub fn new_with_sequence_number(
33 topic: impl Into<TopicHash>,
34 data: impl Into<Vec<u8>>,
35 seq_no: impl Into<Vec<u8>>,
36 ) -> Self {
37 let mut rpc = Self::new(topic, data);
38 rpc.set_sequence_number(Some(Bytes::from(seq_no.into())));
39 rpc
40 }
41
42 #[must_use]
43 pub fn into_proto(self) -> MessageProto {
44 self.proto
45 }
46
47 #[must_use]
48 pub fn as_proto(&self) -> &MessageProto {
49 &self.proto
50 }
51
52 #[must_use]
53 pub fn source(&self) -> Option<PeerId> {
54 self.proto
55 .from
56 .as_ref()
57 .map(|bytes| PeerId::from_bytes(bytes).expect("valid peer id"))
58 }
59
60 pub fn set_source(&mut self, source: Option<PeerId>) {
61 self.proto.from = source.map(|peer_id| peer_id.to_bytes().into());
62 }
63
64 #[must_use]
65 pub fn data(&self) -> &[u8] {
66 self.proto.data.as_ref().unwrap()
67 }
68
69 #[must_use]
70 pub fn sequence_number(&self) -> Option<Bytes> {
71 self.proto.seqno.clone()
72 }
73
74 pub fn set_sequence_number(&mut self, seq_no: Option<impl Into<Vec<u8>>>) {
75 self.proto.seqno = seq_no.map(|n| n.into().into());
76 }
77
78 #[must_use]
79 pub fn topic_str(&self) -> &str {
80 self.proto.topic.as_str()
81 }
82
83 #[must_use]
84 pub fn topic(&self) -> TopicHash {
85 TopicHash::from_raw(self.topic_str())
86 }
87
88 #[must_use]
89 pub fn signature(&self) -> Option<&[u8]> {
90 self.proto.signature.as_ref().map(|bytes| bytes.as_ref())
91 }
92
93 pub fn set_signature(&mut self, signature: Option<impl Into<Vec<u8>>>) {
94 self.proto.signature = signature.map(|bytes| bytes.into().into());
95 }
96
97 #[must_use]
98 pub fn key(&self) -> Option<&[u8]> {
99 self.proto.key.as_ref().map(|bytes| bytes.as_ref())
100 }
101
102 pub fn set_key(&mut self, key: Option<impl Into<Vec<u8>>>) {
103 self.proto.key = key.map(|bytes| bytes.into().into());
104 }
105}
106
107impl From<MessageProto> for Message {
108 #[must_use]
111 fn from(mut proto: MessageProto) -> Self {
112 if proto.data.is_none() {
114 proto.data = Some(Bytes::new());
115 }
116
117 if let Some(from) = proto.from.as_ref() {
119 if from.is_empty() {
120 proto.from = None;
121 }
122 }
123
124 if let Some(seq_no) = proto.seqno.as_ref() {
126 if seq_no.is_empty() {
127 proto.seqno = None;
128 }
129 }
130
131 if let Some(signature) = proto.signature.as_ref() {
133 if signature.is_empty() {
134 proto.signature = None;
135 }
136 }
137
138 if let Some(key) = proto.key.as_ref() {
140 if key.is_empty() {
141 proto.key = None;
142 }
143 }
144
145 Self { proto }
146 }
147}
148
149impl From<Message> for MessageProto {
150 fn from(message: Message) -> Self {
151 message.into_proto()
152 }
153}