tetsy_libp2p_gossipsub/types.rs
1// Copyright 2020 Sigma Prime Pty Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
//! A collection of types used by the Gossipsub system.
22use crate::rpc_proto;
23use crate::TopicHash;
24use tetsy_libp2p_core::PeerId;
25use std::fmt;
26use std::fmt::Debug;
27
/// Validation kinds from the application for received messages.
///
/// Each variant tells the router what to do with a message once the application has looked at
/// it.
#[derive(Debug)]
pub enum MessageAcceptance {
    /// The message is considered valid, and it should be delivered and forwarded to the network.
    Accept,
    /// The message is considered invalid, and it should be rejected and trigger the P₄ penalty.
    Reject,
    /// The message is neither delivered nor forwarded to the network, but the router does not
    /// trigger the P₄ penalty.
    Ignore,
}
39
/// Declares a message-id newtype wrapping a `Vec<u8>`.
///
/// `$id` is the name of the generated type and `$label` is the text printed as the type tag in
/// its `Debug` output. The generated type gets:
///
/// * a `new` constructor that copies the given byte slice,
/// * a `From` impl for anything convertible into a `Vec<u8>`,
/// * a `Display` impl printing the bytes through `hex_fmt::HexFmt`,
/// * a `Debug` impl printing `<label>(<hex>)`.
macro_rules! declare_message_id_type {
    ($id:ident, $label:expr) => {
        #[derive(Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
        pub struct $id(pub Vec<u8>);

        impl $id {
            pub fn new(value: &[u8]) -> Self {
                $id(value.to_owned())
            }
        }

        impl<T: Into<Vec<u8>>> From<T> for $id {
            fn from(value: T) -> Self {
                $id(value.into())
            }
        }

        impl std::fmt::Display for $id {
            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
                f.write_fmt(format_args!("{}", hex_fmt::HexFmt(&self.0)))
            }
        }

        impl std::fmt::Debug for $id {
            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
                f.write_fmt(format_args!("{}({})", $label, hex_fmt::HexFmt(&self.0)))
            }
        }
    };
}
71
// A type for gossipsub message ids.
declare_message_id_type!(MessageId, "MessageId");

// A type for gossipsub fast message ids, not to be confused with "real" message ids.
//
// A fast-message-id is an optional message_id that can be used to filter duplicates quickly. On
// high intensive networks with lots of messages, where the message_id is based on the result of
// decompressed traffic, it is beneficial to specify a `fast-message-id` that can identify and
// filter duplicates quickly without performing the overhead of decompression.
declare_message_id_type!(FastMessageId, "FastMessageId");
82
/// Describes the types of peers that can exist in the gossipsub context.
///
/// All variants are unit variants, so equality is total and the type derives `Eq` in addition
/// to `PartialEq`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PeerKind {
    /// A gossipsub 1.1 peer.
    Gossipsubv1_1,
    /// A gossipsub 1.0 peer.
    Gossipsub,
    /// A floodsub peer.
    Floodsub,
    /// The peer doesn't support any of the protocols.
    NotSupported,
}
95
/// A message received by the gossipsub system and stored locally in caches.
#[derive(Clone, PartialEq, Eq, Hash, Debug)]
pub struct RawGossipsubMessage {
    /// Id of the peer that published this message.
    pub source: Option<PeerId>,

    /// Content of the message. Its meaning is out of scope of this library.
    pub data: Vec<u8>,

    /// A random sequence number.
    pub sequence_number: Option<u64>,

    /// The topic this message belongs to.
    pub topic: TopicHash,

    /// The signature of the message if it's signed.
    pub signature: Option<Vec<u8>>,

    /// The public key of the message if it is signed and the source [`PeerId`] cannot be inlined.
    pub key: Option<Vec<u8>>,

    /// Flag indicating if this message has been validated by the application or not.
    pub validated: bool,
}
120
/// The message sent to the user after a [`RawGossipsubMessage`] has been transformed by a
/// [`crate::DataTransform`].
///
/// `Debug` is implemented by hand for this type (rather than derived) so that `data` is printed
/// as hex.
#[derive(Clone, PartialEq, Eq, Hash)]
pub struct GossipsubMessage {
    /// Id of the peer that published this message.
    pub source: Option<PeerId>,

    /// Content of the message.
    pub data: Vec<u8>,

    /// A random sequence number.
    pub sequence_number: Option<u64>,

    /// The topic this message belongs to.
    pub topic: TopicHash,
}
137
138impl fmt::Debug for GossipsubMessage {
139 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
140 f.debug_struct("GossipsubMessage")
141 .field(
142 "data",
143 &format_args!("{:<20}", &hex_fmt::HexFmt(&self.data)),
144 )
145 .field("source", &self.source)
146 .field("sequence_number", &self.sequence_number)
147 .field("topic", &self.topic)
148 .finish()
149 }
150}
151
/// A subscription received by the gossipsub system.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct GossipsubSubscription {
    /// Action to perform.
    pub action: GossipsubSubscriptionAction,
    /// The topic to subscribe to or unsubscribe from.
    pub topic_hash: TopicHash,
}
160
/// Action that a subscription wants to perform.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum GossipsubSubscriptionAction {
    /// The remote wants to subscribe to the given topic.
    Subscribe,
    /// The remote wants to unsubscribe from the given topic.
    Unsubscribe,
}
169
/// Information about a peer, as carried in the `peers` list of a
/// [`GossipsubControlAction::Prune`] message.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct PeerInfo {
    /// The id of the peer, if one is present.
    pub peer_id: Option<PeerId>,
    //TODO add this when RFC: Signed Address Records got added to the spec (see pull request
    // https://github.com/libp2p/specs/pull/217)
    //pub signed_peer_record: ?,
}
177
/// A Control message received by the gossipsub system.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum GossipsubControlAction {
    /// Node broadcasts known messages per topic - IHave control message.
    IHave {
        /// The topic of the messages.
        topic_hash: TopicHash,
        /// A list of known message ids (peer_id + sequence_number), each an opaque byte string
        /// wrapped in a [`MessageId`].
        message_ids: Vec<MessageId>,
    },
    /// The node requests specific message ids (peer_id + sequence_number) - IWant control message.
    IWant {
        /// The list of requested message ids (peer_id + sequence_number), each an opaque byte
        /// string wrapped in a [`MessageId`].
        message_ids: Vec<MessageId>,
    },
    /// The node has been added to the mesh - Graft control message.
    Graft {
        /// The mesh topic the peer should be added to.
        topic_hash: TopicHash,
    },
    /// The node has been removed from the mesh - Prune control message.
    Prune {
        /// The mesh topic the peer should be removed from.
        topic_hash: TopicHash,
        /// A list of peers to be proposed to the removed peer as peer exchange.
        peers: Vec<PeerInfo>,
        /// The backoff time in seconds before we allow to reconnect.
        backoff: Option<u64>,
    },
}
208
/// An RPC received/sent.
///
/// `Debug` is implemented by hand for this type; that implementation only prints the lists that
/// are non-empty.
#[derive(Clone, PartialEq, Eq, Hash)]
pub struct GossipsubRpc {
    /// List of messages that were part of this RPC query.
    pub messages: Vec<RawGossipsubMessage>,
    /// List of subscriptions.
    pub subscriptions: Vec<GossipsubSubscription>,
    /// List of Gossipsub control messages.
    pub control_msgs: Vec<GossipsubControlAction>,
}
219
220impl GossipsubRpc {
221 /// Converts the GossipsubRPC into its protobuf format.
222 // A convenience function to avoid explicitly specifying types.
223 pub fn into_protobuf(self) -> rpc_proto::Rpc {
224 self.into()
225 }
226}
227
228impl Into<rpc_proto::Rpc> for GossipsubRpc {
229 /// Converts the RPC into protobuf format.
230 fn into(self) -> rpc_proto::Rpc {
231 // Messages
232 let mut publish = Vec::new();
233
234 for message in self.messages.into_iter() {
235 let message = rpc_proto::Message {
236 from: message.source.map(|m| m.to_bytes()),
237 data: Some(message.data),
238 seqno: message.sequence_number.map(|s| s.to_be_bytes().to_vec()),
239 topic: TopicHash::into_string(message.topic),
240 signature: message.signature,
241 key: message.key,
242 };
243
244 publish.push(message);
245 }
246
247 // subscriptions
248 let subscriptions = self
249 .subscriptions
250 .into_iter()
251 .map(|sub| rpc_proto::rpc::SubOpts {
252 subscribe: Some(sub.action == GossipsubSubscriptionAction::Subscribe),
253 topic_id: Some(sub.topic_hash.into_string()),
254 })
255 .collect::<Vec<_>>();
256
257 // control messages
258 let mut control = rpc_proto::ControlMessage {
259 ihave: Vec::new(),
260 iwant: Vec::new(),
261 graft: Vec::new(),
262 prune: Vec::new(),
263 };
264
265 let empty_control_msg = self.control_msgs.is_empty();
266
267 for action in self.control_msgs {
268 match action {
269 // collect all ihave messages
270 GossipsubControlAction::IHave {
271 topic_hash,
272 message_ids,
273 } => {
274 let rpc_ihave = rpc_proto::ControlIHave {
275 topic_id: Some(topic_hash.into_string()),
276 message_ids: message_ids.into_iter().map(|msg_id| msg_id.0).collect(),
277 };
278 control.ihave.push(rpc_ihave);
279 }
280 GossipsubControlAction::IWant { message_ids } => {
281 let rpc_iwant = rpc_proto::ControlIWant {
282 message_ids: message_ids.into_iter().map(|msg_id| msg_id.0).collect(),
283 };
284 control.iwant.push(rpc_iwant);
285 }
286 GossipsubControlAction::Graft { topic_hash } => {
287 let rpc_graft = rpc_proto::ControlGraft {
288 topic_id: Some(topic_hash.into_string()),
289 };
290 control.graft.push(rpc_graft);
291 }
292 GossipsubControlAction::Prune {
293 topic_hash,
294 peers,
295 backoff,
296 } => {
297 let rpc_prune = rpc_proto::ControlPrune {
298 topic_id: Some(topic_hash.into_string()),
299 peers: peers
300 .into_iter()
301 .map(|info| rpc_proto::PeerInfo {
302 peer_id: info.peer_id.map(|id| id.to_bytes()),
303 /// TODO, see https://github.com/libp2p/specs/pull/217
304 signed_peer_record: None,
305 })
306 .collect(),
307 backoff,
308 };
309 control.prune.push(rpc_prune);
310 }
311 }
312 }
313
314 rpc_proto::Rpc {
315 subscriptions,
316 publish,
317 control: if empty_control_msg {
318 None
319 } else {
320 Some(control)
321 },
322 }
323 }
324}
325
326impl fmt::Debug for GossipsubRpc {
327 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
328 let mut b = f.debug_struct("GossipsubRpc");
329 if !self.messages.is_empty() {
330 b.field("messages", &self.messages);
331 }
332 if !self.subscriptions.is_empty() {
333 b.field("subscriptions", &self.subscriptions);
334 }
335 if !self.control_msgs.is_empty() {
336 b.field("control_msgs", &self.control_msgs);
337 }
338 b.finish()
339 }
340}
341
342impl PeerKind {
343 pub fn as_static_ref(&self) -> &'static str {
344 match self {
345 Self::NotSupported => "Not Supported",
346 Self::Floodsub => "Floodsub",
347 Self::Gossipsub => "Gossipsub v1.0",
348 Self::Gossipsubv1_1 => "Gossipsub v1.1",
349 }
350 }
351}
352
353impl AsRef<str> for PeerKind {
354 fn as_ref(&self) -> &str {
355 self.as_static_ref()
356 }
357}
358
359impl fmt::Display for PeerKind {
360 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
361 f.write_str(self.as_ref())
362 }
363}