tetsy_libp2p_gossipsub/
types.rs

1// Copyright 2020 Sigma Prime Pty Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
//! A collection of types used by the Gossipsub system.
22use crate::rpc_proto;
23use crate::TopicHash;
24use tetsy_libp2p_core::PeerId;
25use std::fmt;
26use std::fmt::Debug;
27
/// The verdict the application hands back for a received message after validating it.
#[derive(Debug)]
pub enum MessageAcceptance {
    /// The message is valid: it should be delivered and forwarded to the network.
    Accept,
    /// The message is invalid: it should be rejected and the P₄ penalty is triggered.
    Reject,
    /// The message is neither delivered nor forwarded to the network, but the router does not
    /// trigger the P₄ penalty.
    Ignore,
}
39
/// Declares a message id type: a public newtype wrapping a `Vec<u8>`.
///
/// `$name` is the identifier of the generated struct; `$name_string` is the label its `Debug`
/// implementation prints in front of the hex-formatted bytes. The generated type also gets a
/// `new` constructor, a `From` conversion for anything convertible into `Vec<u8>`, and a
/// `Display` implementation that prints the bytes as hex.
macro_rules! declare_message_id_type {
    ($name: ident, $name_string: expr) => {
        #[derive(Clone, PartialEq, Eq, Hash, PartialOrd, Ord)]
        pub struct $name(pub Vec<u8>);

        impl $name {
            /// Creates an id holding a copy of the given bytes.
            pub fn new(value: &[u8]) -> Self {
                Self(Vec::from(value))
            }
        }

        impl<T> From<T> for $name
        where
            T: Into<Vec<u8>>,
        {
            fn from(value: T) -> Self {
                let bytes: Vec<u8> = value.into();
                Self(bytes)
            }
        }

        impl std::fmt::Display for $name {
            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
                // Only the hex representation of the wrapped bytes is printed.
                f.write_fmt(format_args!("{}", hex_fmt::HexFmt(&self.0)))
            }
        }

        impl std::fmt::Debug for $name {
            fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
                // Printed as `<label>(<hex bytes>)`.
                f.write_fmt(format_args!(
                    "{}({})",
                    $name_string,
                    hex_fmt::HexFmt(&self.0)
                ))
            }
        }
    };
}
71
72// A type for gossipsub message ids.
73declare_message_id_type!(MessageId, "MessageId");
74
75// A type for gossipsub fast messsage ids, not to confuse with "real" message ids.
76//
77// A fast-message-id is an optional message_id that can be used to filter duplicates quickly. On
78// high intensive networks with lots of messages, where the message_id is based on the result of
79// decompressed traffic, it is beneficial to specify a `fast-message-id` that can identify and
80// filter duplicates quickly without performing the overhead of decompression.
81declare_message_id_type!(FastMessageId, "FastMessageId");
82
/// Describes the types of peers that can exist in the gossipsub context.
///
/// Equality is total, so the type derives `Eq` alongside `PartialEq`, and it derives `Hash` so
/// that peer kinds can be used as keys in hash-based collections.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum PeerKind {
    /// A gossipsub 1.1 peer.
    Gossipsubv1_1,
    /// A gossipsub 1.0 peer.
    Gossipsub,
    /// A floodsub peer.
    Floodsub,
    /// The peer doesn't support any of the protocols.
    NotSupported,
}
95
96/// A message received by the gossipsub system and stored locally in caches..
97#[derive(Clone, PartialEq, Eq, Hash, Debug)]
98pub struct RawGossipsubMessage {
99    /// Id of the peer that published this message.
100    pub source: Option<PeerId>,
101
102    /// Content of the message. Its meaning is out of scope of this library.
103    pub data: Vec<u8>,
104
105    /// A random sequence number.
106    pub sequence_number: Option<u64>,
107
108    /// The topic this message belongs to
109    pub topic: TopicHash,
110
111    /// The signature of the message if it's signed.
112    pub signature: Option<Vec<u8>>,
113
114    /// The public key of the message if it is signed and the source [`PeerId`] cannot be inlined.
115    pub key: Option<Vec<u8>>,
116
117    /// Flag indicating if this message has been validated by the application or not.
118    pub validated: bool,
119}
120
121/// The message sent to the user after a [`RawGossipsubMessage`] has been transformed by a
122/// [`crate::DataTransform`].
123#[derive(Clone, PartialEq, Eq, Hash)]
124pub struct GossipsubMessage {
125    /// Id of the peer that published this message.
126    pub source: Option<PeerId>,
127
128    /// Content of the message.
129    pub data: Vec<u8>,
130
131    /// A random sequence number.
132    pub sequence_number: Option<u64>,
133
134    /// The topic this message belongs to
135    pub topic: TopicHash,
136}
137
138impl fmt::Debug for GossipsubMessage {
139    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
140        f.debug_struct("GossipsubMessage")
141            .field(
142                "data",
143                &format_args!("{:<20}", &hex_fmt::HexFmt(&self.data)),
144            )
145            .field("source", &self.source)
146            .field("sequence_number", &self.sequence_number)
147            .field("topic", &self.topic)
148            .finish()
149    }
150}
151
152/// A subscription received by the gossipsub system.
153#[derive(Debug, Clone, PartialEq, Eq, Hash)]
154pub struct GossipsubSubscription {
155    /// Action to perform.
156    pub action: GossipsubSubscriptionAction,
157    /// The topic from which to subscribe or unsubscribe.
158    pub topic_hash: TopicHash,
159}
160
/// The action a subscription message asks for.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum GossipsubSubscriptionAction {
    /// The remote wants to subscribe to the given topic.
    Subscribe,
    /// The remote wants to unsubscribe from the given topic.
    Unsubscribe,
}
169
170#[derive(Debug, Clone, PartialEq, Eq, Hash)]
171pub struct PeerInfo {
172    pub peer_id: Option<PeerId>,
173    //TODO add this when RFC: Signed Address Records got added to the spec (see pull request
174    // https://github.com/libp2p/specs/pull/217)
175    //pub signed_peer_record: ?,
176}
177
178/// A Control message received by the gossipsub system.
179#[derive(Debug, Clone, PartialEq, Eq, Hash)]
180pub enum GossipsubControlAction {
181    /// Node broadcasts known messages per topic - IHave control message.
182    IHave {
183        /// The topic of the messages.
184        topic_hash: TopicHash,
185        /// A list of known message ids (peer_id + sequence _number) as a string.
186        message_ids: Vec<MessageId>,
187    },
188    /// The node requests specific message ids (peer_id + sequence _number) - IWant control message.
189    IWant {
190        /// A list of known message ids (peer_id + sequence _number) as a string.
191        message_ids: Vec<MessageId>,
192    },
193    /// The node has been added to the mesh - Graft control message.
194    Graft {
195        /// The mesh topic the peer should be added to.
196        topic_hash: TopicHash,
197    },
198    /// The node has been removed from the mesh - Prune control message.
199    Prune {
200        /// The mesh topic the peer should be removed from.
201        topic_hash: TopicHash,
202        /// A list of peers to be proposed to the removed peer as peer exchange
203        peers: Vec<PeerInfo>,
204        /// The backoff time in seconds before we allow to reconnect
205        backoff: Option<u64>,
206    },
207}
208
209/// An RPC received/sent.
210#[derive(Clone, PartialEq, Eq, Hash)]
211pub struct GossipsubRpc {
212    /// List of messages that were part of this RPC query.
213    pub messages: Vec<RawGossipsubMessage>,
214    /// List of subscriptions.
215    pub subscriptions: Vec<GossipsubSubscription>,
216    /// List of Gossipsub control messages.
217    pub control_msgs: Vec<GossipsubControlAction>,
218}
219
220impl GossipsubRpc {
221    /// Converts the GossipsubRPC into its protobuf format.
222    // A convenience function to avoid explicitly specifying types.
223    pub fn into_protobuf(self) -> rpc_proto::Rpc {
224        self.into()
225    }
226}
227
228impl Into<rpc_proto::Rpc> for GossipsubRpc {
229    /// Converts the RPC into protobuf format.
230    fn into(self) -> rpc_proto::Rpc {
231        // Messages
232        let mut publish = Vec::new();
233
234        for message in self.messages.into_iter() {
235            let message = rpc_proto::Message {
236                from: message.source.map(|m| m.to_bytes()),
237                data: Some(message.data),
238                seqno: message.sequence_number.map(|s| s.to_be_bytes().to_vec()),
239                topic: TopicHash::into_string(message.topic),
240                signature: message.signature,
241                key: message.key,
242            };
243
244            publish.push(message);
245        }
246
247        // subscriptions
248        let subscriptions = self
249            .subscriptions
250            .into_iter()
251            .map(|sub| rpc_proto::rpc::SubOpts {
252                subscribe: Some(sub.action == GossipsubSubscriptionAction::Subscribe),
253                topic_id: Some(sub.topic_hash.into_string()),
254            })
255            .collect::<Vec<_>>();
256
257        // control messages
258        let mut control = rpc_proto::ControlMessage {
259            ihave: Vec::new(),
260            iwant: Vec::new(),
261            graft: Vec::new(),
262            prune: Vec::new(),
263        };
264
265        let empty_control_msg = self.control_msgs.is_empty();
266
267        for action in self.control_msgs {
268            match action {
269                // collect all ihave messages
270                GossipsubControlAction::IHave {
271                    topic_hash,
272                    message_ids,
273                } => {
274                    let rpc_ihave = rpc_proto::ControlIHave {
275                        topic_id: Some(topic_hash.into_string()),
276                        message_ids: message_ids.into_iter().map(|msg_id| msg_id.0).collect(),
277                    };
278                    control.ihave.push(rpc_ihave);
279                }
280                GossipsubControlAction::IWant { message_ids } => {
281                    let rpc_iwant = rpc_proto::ControlIWant {
282                        message_ids: message_ids.into_iter().map(|msg_id| msg_id.0).collect(),
283                    };
284                    control.iwant.push(rpc_iwant);
285                }
286                GossipsubControlAction::Graft { topic_hash } => {
287                    let rpc_graft = rpc_proto::ControlGraft {
288                        topic_id: Some(topic_hash.into_string()),
289                    };
290                    control.graft.push(rpc_graft);
291                }
292                GossipsubControlAction::Prune {
293                    topic_hash,
294                    peers,
295                    backoff,
296                } => {
297                    let rpc_prune = rpc_proto::ControlPrune {
298                        topic_id: Some(topic_hash.into_string()),
299                        peers: peers
300                            .into_iter()
301                            .map(|info| rpc_proto::PeerInfo {
302                                peer_id: info.peer_id.map(|id| id.to_bytes()),
303                                /// TODO, see https://github.com/libp2p/specs/pull/217
304                                signed_peer_record: None,
305                            })
306                            .collect(),
307                        backoff,
308                    };
309                    control.prune.push(rpc_prune);
310                }
311            }
312        }
313
314        rpc_proto::Rpc {
315            subscriptions,
316            publish,
317            control: if empty_control_msg {
318                None
319            } else {
320                Some(control)
321            },
322        }
323    }
324}
325
326impl fmt::Debug for GossipsubRpc {
327    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
328        let mut b = f.debug_struct("GossipsubRpc");
329        if !self.messages.is_empty() {
330            b.field("messages", &self.messages);
331        }
332        if !self.subscriptions.is_empty() {
333            b.field("subscriptions", &self.subscriptions);
334        }
335        if !self.control_msgs.is_empty() {
336            b.field("control_msgs", &self.control_msgs);
337        }
338        b.finish()
339    }
340}
341
342impl PeerKind {
343    pub fn as_static_ref(&self) -> &'static str {
344        match self {
345            Self::NotSupported => "Not Supported",
346            Self::Floodsub => "Floodsub",
347            Self::Gossipsub => "Gossipsub v1.0",
348            Self::Gossipsubv1_1 => "Gossipsub v1.1",
349        }
350    }
351}
352
353impl AsRef<str> for PeerKind {
354    fn as_ref(&self) -> &str {
355        self.as_static_ref()
356    }
357}
358
359impl fmt::Display for PeerKind {
360    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
361        f.write_str(self.as_ref())
362    }
363}