cs_mwc_libp2p_gossipsub/
protocol.rs

1// Copyright 2020 Sigma Prime Pty Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21use crate::config::ValidationMode;
22use crate::error::{GossipsubHandlerError, ValidationError};
23use crate::handler::HandlerEvent;
24use crate::rpc_proto;
25use crate::topic::TopicHash;
26use crate::types::{
27    GossipsubControlAction, GossipsubRpc, GossipsubSubscription, GossipsubSubscriptionAction,
28    MessageId, PeerInfo, PeerKind, RawGossipsubMessage,
29};
30use byteorder::{BigEndian, ByteOrder};
31use bytes::Bytes;
32use bytes::BytesMut;
33use futures::future;
34use futures::prelude::*;
35use asynchronous_codec::{Decoder, Encoder, Framed};
36use mwc_libp2p_core::{
37    identity::PublicKey, InboundUpgrade, OutboundUpgrade, PeerId, ProtocolName, UpgradeInfo,
38};
39use log::{debug, warn};
40use prost::Message as ProtobufMessage;
41use std::{borrow::Cow, pin::Pin};
42use unsigned_varint::codec;
43
/// Byte prefix placed in front of the protobuf-encoded message (with its `signature` and `key`
/// fields cleared) to form the byte string whose signature is verified.
pub(crate) const SIGNING_PREFIX: &[u8] = b"libp2p-pubsub:";
45
46/// Implementation of [`InboundUpgrade`] and [`OutboundUpgrade`] for the Gossipsub protocol.
47#[derive(Clone)]
48pub struct ProtocolConfig {
49    /// The Gossipsub protocol id to listen on.
50    protocol_ids: Vec<ProtocolId>,
51    /// The maximum transmit size for a packet.
52    max_transmit_size: usize,
53    /// Determines the level of validation to be done on incoming messages.
54    validation_mode: ValidationMode,
55}
56
57impl ProtocolConfig {
58    /// Builds a new [`ProtocolConfig`].
59    ///
60    /// Sets the maximum gossip transmission size.
61    pub fn new(
62        id_prefix: Cow<'static, str>,
63        max_transmit_size: usize,
64        validation_mode: ValidationMode,
65        support_floodsub: bool,
66    ) -> ProtocolConfig {
67        // support version 1.1.0 and 1.0.0 with user-customized prefix
68        let mut protocol_ids = vec![
69            ProtocolId::new(id_prefix.clone(), PeerKind::Gossipsubv1_1),
70            ProtocolId::new(id_prefix, PeerKind::Gossipsub),
71        ];
72
73        // add floodsub support if enabled.
74        if support_floodsub {
75            protocol_ids.push(ProtocolId::new(Cow::from(""), PeerKind::Floodsub));
76        }
77
78        ProtocolConfig {
79            protocol_ids,
80            max_transmit_size,
81            validation_mode,
82        }
83    }
84}
85
86/// The protocol ID
87#[derive(Clone, Debug)]
88pub struct ProtocolId {
89    /// The RPC message type/name.
90    pub protocol_id: Vec<u8>,
91    /// The type of protocol we support
92    pub kind: PeerKind,
93}
94
95/// An RPC protocol ID.
96impl ProtocolId {
97    pub fn new(prefix: Cow<'static, str>, kind: PeerKind) -> Self {
98        let protocol_id = match kind {
99            PeerKind::Gossipsubv1_1 => format!("/{}/{}", prefix, "1.1.0"),
100            PeerKind::Gossipsub => format!("/{}/{}", prefix, "1.0.0"),
101            PeerKind::Floodsub => format!("/{}/{}", "floodsub", "1.0.0"),
102            // NOTE: This is used for informing the behaviour of unsupported peers. We do not
103            // advertise this variant.
104            PeerKind::NotSupported => unreachable!("Should never advertise NotSupported"),
105        }
106        .into_bytes();
107        ProtocolId { protocol_id, kind }
108    }
109}
110
111impl ProtocolName for ProtocolId {
112    fn protocol_name(&self) -> &[u8] {
113        &self.protocol_id
114    }
115}
116
117impl UpgradeInfo for ProtocolConfig {
118    type Info = ProtocolId;
119    type InfoIter = Vec<Self::Info>;
120
121    fn protocol_info(&self) -> Self::InfoIter {
122        self.protocol_ids.clone()
123    }
124}
125
126impl<TSocket> InboundUpgrade<TSocket> for ProtocolConfig
127where
128    TSocket: AsyncRead + AsyncWrite + Unpin + Send + 'static,
129{
130    type Output = (Framed<TSocket, GossipsubCodec>, PeerKind);
131    type Error = GossipsubHandlerError;
132    type Future = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Send>>;
133
134    fn upgrade_inbound(self, socket: TSocket, protocol_id: Self::Info) -> Self::Future {
135        let mut length_codec = codec::UviBytes::default();
136        length_codec.set_max_len(self.max_transmit_size);
137        Box::pin(future::ok((
138            Framed::new(
139                socket,
140                GossipsubCodec::new(length_codec, self.validation_mode),
141            ),
142            protocol_id.kind,
143        )))
144    }
145}
146
147impl<TSocket> OutboundUpgrade<TSocket> for ProtocolConfig
148where
149    TSocket: AsyncWrite + AsyncRead + Unpin + Send + 'static,
150{
151    type Output = (Framed<TSocket, GossipsubCodec>, PeerKind);
152    type Error = GossipsubHandlerError;
153    type Future = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Send>>;
154
155    fn upgrade_outbound(self, socket: TSocket, protocol_id: Self::Info) -> Self::Future {
156        let mut length_codec = codec::UviBytes::default();
157        length_codec.set_max_len(self.max_transmit_size);
158        Box::pin(future::ok((
159            Framed::new(
160                socket,
161                GossipsubCodec::new(length_codec, self.validation_mode),
162            ),
163            protocol_id.kind,
164        )))
165    }
166}
167
168/* Gossip codec for the framing */
169
170pub struct GossipsubCodec {
171    /// Codec to encode/decode the Unsigned varint length prefix of the frames.
172    length_codec: codec::UviBytes,
173    /// Determines the level of validation performed on incoming messages.
174    validation_mode: ValidationMode,
175}
176
177impl GossipsubCodec {
178    pub fn new(length_codec: codec::UviBytes, validation_mode: ValidationMode) -> Self {
179        GossipsubCodec {
180            length_codec,
181            validation_mode,
182        }
183    }
184
185    /// Verifies a gossipsub message. This returns either a success or failure. All errors
186    /// are logged, which prevents error handling in the codec and handler. We simply drop invalid
187    /// messages and log warnings, rather than propagating errors through the codec.
188    fn verify_signature(message: &rpc_proto::Message) -> bool {
189        let from = match message.from.as_ref() {
190            Some(v) => v,
191            None => {
192                debug!("Signature verification failed: No source id given");
193                return false;
194            }
195        };
196
197        let source = match PeerId::from_bytes(&from) {
198            Ok(v) => v,
199            Err(_) => {
200                debug!("Signature verification failed: Invalid Peer Id");
201                return false;
202            }
203        };
204
205        let signature = match message.signature.as_ref() {
206            Some(v) => v,
207            None => {
208                debug!("Signature verification failed: No signature provided");
209                return false;
210            }
211        };
212
213        // If there is a key value in the protobuf, use that key otherwise the key must be
214        // obtained from the inlined source peer_id.
215        let public_key = match message
216            .key
217            .as_ref()
218            .map(|key| PublicKey::from_protobuf_encoding(&key))
219        {
220            Some(Ok(key)) => key,
221            _ => match PublicKey::from_protobuf_encoding(&source.to_hash_bytes()[2..]) {
222                Ok(v) => v,
223                Err(_) => {
224                    warn!("Signature verification failed: No valid public key supplied");
225                    return false;
226                }
227            },
228        };
229
230        // The key must match the peer_id
231        if source != public_key.clone().into_peer_id() {
232            warn!("Signature verification failed: Public key doesn't match source peer id");
233            return false;
234        }
235
236        // Construct the signature bytes
237        let mut message_sig = message.clone();
238        message_sig.signature = None;
239        message_sig.key = None;
240        let mut buf = Vec::with_capacity(message_sig.encoded_len());
241        message_sig
242            .encode(&mut buf)
243            .expect("Buffer has sufficient capacity");
244        let mut signature_bytes = SIGNING_PREFIX.to_vec();
245        signature_bytes.extend_from_slice(&buf);
246        public_key.verify(&signature_bytes, signature)
247    }
248}
249
250impl Encoder for GossipsubCodec {
251    type Item = rpc_proto::Rpc;
252    type Error = GossipsubHandlerError;
253
254    fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> {
255        let mut buf = Vec::with_capacity(item.encoded_len());
256
257        item.encode(&mut buf)
258            .expect("Buffer has sufficient capacity");
259
260        // length prefix the protobuf message, ensuring the max limit is not hit
261        self.length_codec
262            .encode(Bytes::from(buf), dst)
263            .map_err(|_| GossipsubHandlerError::MaxTransmissionSize)
264    }
265}
266
267impl Decoder for GossipsubCodec {
268    type Item = HandlerEvent;
269    type Error = GossipsubHandlerError;
270
271    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
272        let packet = match self.length_codec.decode(src).map_err(|e| {
273            if let std::io::ErrorKind::PermissionDenied = e.kind() {
274                GossipsubHandlerError::MaxTransmissionSize
275            } else {
276                GossipsubHandlerError::Io(e)
277            }
278        })? {
279            Some(p) => p,
280            None => return Ok(None),
281        };
282
283        let rpc = rpc_proto::Rpc::decode(&packet[..]).map_err(std::io::Error::from)?;
284
285        // Store valid messages.
286        let mut messages = Vec::with_capacity(rpc.publish.len());
287        // Store any invalid messages.
288        let mut invalid_messages = Vec::new();
289
290        for message in rpc.publish.into_iter() {
291            // Keep track of the type of invalid message.
292            let mut invalid_kind = None;
293            let mut verify_signature = false;
294            let mut verify_sequence_no = false;
295            let mut verify_source = false;
296
297            match self.validation_mode {
298                ValidationMode::Strict => {
299                    // Validate everything
300                    verify_signature = true;
301                    verify_sequence_no = true;
302                    verify_source = true;
303                }
304                ValidationMode::Permissive => {
305                    // If the fields exist, validate them
306                    if message.signature.is_some() {
307                        verify_signature = true;
308                    }
309                    if message.seqno.is_some() {
310                        verify_sequence_no = true;
311                    }
312                    if message.from.is_some() {
313                        verify_source = true;
314                    }
315                }
316                ValidationMode::Anonymous => {
317                    if message.signature.is_some() {
318                        warn!("Signature field was non-empty and anonymous validation mode is set");
319                        invalid_kind = Some(ValidationError::SignaturePresent);
320                    } else if message.seqno.is_some() {
321                        warn!("Sequence number was non-empty and anonymous validation mode is set");
322                        invalid_kind = Some(ValidationError::SequenceNumberPresent);
323                    } else if message.from.is_some() {
324                        warn!("Message dropped. Message source was non-empty and anonymous validation mode is set");
325                        invalid_kind = Some(ValidationError::MessageSourcePresent);
326                    }
327                }
328                ValidationMode::None => {}
329            }
330
331            // If the initial validation logic failed, add the message to invalid messages and
332            // continue processing the others.
333            if let Some(validation_error) = invalid_kind.take() {
334                let message = RawGossipsubMessage {
335                    source: None, // don't bother inform the application
336                    data: message.data.unwrap_or_default(),
337                    sequence_number: None, // don't inform the application
338                    topic: TopicHash::from_raw(message.topic),
339                    signature: None, // don't inform the application
340                    key: message.key,
341                    validated: false,
342                };
343                invalid_messages.push((message, validation_error));
344                // proceed to the next message
345                continue;
346            }
347
348            // verify message signatures if required
349            if verify_signature && !GossipsubCodec::verify_signature(&message) {
350                warn!("Invalid signature for received message");
351
352                // Build the invalid message (ignoring further validation of sequence number
353                // and source)
354                let message = RawGossipsubMessage {
355                    source: None, // don't bother inform the application
356                    data: message.data.unwrap_or_default(),
357                    sequence_number: None, // don't inform the application
358                    topic: TopicHash::from_raw(message.topic),
359                    signature: None, // don't inform the application
360                    key: message.key,
361                    validated: false,
362                };
363                invalid_messages.push((message, ValidationError::InvalidSignature));
364                // proceed to the next message
365                continue;
366            }
367
368            // ensure the sequence number is a u64
369            let sequence_number = if verify_sequence_no {
370                if let Some(seq_no) = message.seqno {
371                    if seq_no.is_empty() {
372                        None
373                    } else if seq_no.len() != 8 {
374                        debug!(
375                            "Invalid sequence number length for received message. SeqNo: {:?} Size: {}",
376                            seq_no,
377                            seq_no.len()
378                        );
379                        let message = RawGossipsubMessage {
380                            source: None, // don't bother inform the application
381                            data: message.data.unwrap_or_default(),
382                            sequence_number: None, // don't inform the application
383                            topic: TopicHash::from_raw(message.topic),
384                            signature: message.signature, // don't inform the application
385                            key: message.key,
386                            validated: false,
387                        };
388                        invalid_messages.push((message, ValidationError::InvalidSequenceNumber));
389                        // proceed to the next message
390                        continue;
391                    } else {
392                        // valid sequence number
393                        Some(BigEndian::read_u64(&seq_no))
394                    }
395                } else {
396                    // sequence number was not present
397                    debug!("Sequence number not present but expected");
398                    let message = RawGossipsubMessage {
399                        source: None, // don't bother inform the application
400                        data: message.data.unwrap_or_default(),
401                        sequence_number: None, // don't inform the application
402                        topic: TopicHash::from_raw(message.topic),
403                        signature: message.signature, // don't inform the application
404                        key: message.key,
405                        validated: false,
406                    };
407                    invalid_messages.push((message, ValidationError::EmptySequenceNumber));
408                    continue;
409                }
410            } else {
411                // Do not verify the sequence number, consider it empty
412                None
413            };
414
415            // Verify the message source if required
416            let source = if verify_source {
417                if let Some(bytes) = message.from {
418                    if !bytes.is_empty() {
419                        match PeerId::from_bytes(&bytes) {
420                            Ok(peer_id) => Some(peer_id), // valid peer id
421                            Err(_) => {
422                                // invalid peer id, add to invalid messages
423                                debug!("Message source has an invalid PeerId");
424                                let message = RawGossipsubMessage {
425                                    source: None, // don't bother inform the application
426                                    data: message.data.unwrap_or_default(),
427                                    sequence_number,
428                                    topic: TopicHash::from_raw(message.topic),
429                                    signature: message.signature, // don't inform the application
430                                    key: message.key,
431                                    validated: false,
432                                };
433                                invalid_messages.push((message, ValidationError::InvalidPeerId));
434                                continue;
435                            }
436                        }
437                    } else {
438                        None
439                    }
440                } else {
441                    None
442                }
443            } else {
444                None
445            };
446
447            // This message has passed all validation, add it to the validated messages.
448            messages.push(RawGossipsubMessage {
449                source,
450                data: message.data.unwrap_or_default(),
451                sequence_number,
452                topic: TopicHash::from_raw(message.topic),
453                signature: message.signature,
454                key: message.key,
455                validated: false,
456            });
457        }
458
459        let mut control_msgs = Vec::new();
460
461        if let Some(rpc_control) = rpc.control {
462            // Collect the gossipsub control messages
463            let ihave_msgs: Vec<GossipsubControlAction> = rpc_control
464                .ihave
465                .into_iter()
466                .map(|ihave| GossipsubControlAction::IHave {
467                    topic_hash: TopicHash::from_raw(ihave.topic_id.unwrap_or_default()),
468                    message_ids: ihave
469                        .message_ids
470                        .into_iter()
471                        .map(MessageId::from)
472                        .collect::<Vec<_>>(),
473                })
474                .collect();
475
476            let iwant_msgs: Vec<GossipsubControlAction> = rpc_control
477                .iwant
478                .into_iter()
479                .map(|iwant| GossipsubControlAction::IWant {
480                    message_ids: iwant
481                        .message_ids
482                        .into_iter()
483                        .map(MessageId::from)
484                        .collect::<Vec<_>>(),
485                })
486                .collect();
487
488            let graft_msgs: Vec<GossipsubControlAction> = rpc_control
489                .graft
490                .into_iter()
491                .map(|graft| GossipsubControlAction::Graft {
492                    topic_hash: TopicHash::from_raw(graft.topic_id.unwrap_or_default()),
493                })
494                .collect();
495
496            let mut prune_msgs = Vec::new();
497
498            for prune in rpc_control.prune {
499                // filter out invalid peers
500                let peers = prune
501                    .peers
502                    .into_iter()
503                    .filter_map(|info| {
504                        info.peer_id
505                            .as_ref()
506                            .and_then(|id| PeerId::from_bytes(id).ok())
507                            .map(|peer_id|
508                                    //TODO signedPeerRecord, see https://github.com/libp2p/specs/pull/217
509                                    PeerInfo {
510                                        peer_id: Some(peer_id),
511                                    })
512                    })
513                    .collect::<Vec<PeerInfo>>();
514
515                let topic_hash = TopicHash::from_raw(prune.topic_id.unwrap_or_default());
516                prune_msgs.push(GossipsubControlAction::Prune {
517                    topic_hash,
518                    peers,
519                    backoff: prune.backoff,
520                });
521            }
522
523            control_msgs.extend(ihave_msgs);
524            control_msgs.extend(iwant_msgs);
525            control_msgs.extend(graft_msgs);
526            control_msgs.extend(prune_msgs);
527        }
528
529        Ok(Some(HandlerEvent::Message {
530            rpc: GossipsubRpc {
531                messages,
532                subscriptions: rpc
533                    .subscriptions
534                    .into_iter()
535                    .map(|sub| GossipsubSubscription {
536                        action: if Some(true) == sub.subscribe {
537                            GossipsubSubscriptionAction::Subscribe
538                        } else {
539                            GossipsubSubscriptionAction::Unsubscribe
540                        },
541                        topic_hash: TopicHash::from_raw(sub.topic_id.unwrap_or_default()),
542                    })
543                    .collect(),
544                control_msgs,
545            },
546            invalid_messages,
547        }))
548    }
549}
550
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::GossipsubConfig;
    use crate::Gossipsub;
    use crate::IdentTopic as Topic;
    use mwc_libp2p_core::identity::Keypair;
    use quickcheck::*;
    use rand::Rng;

    /// Wrapper so quickcheck can generate signed raw messages.
    #[derive(Clone, Debug)]
    struct Message(RawGossipsubMessage);

    impl Arbitrary for Message {
        fn arbitrary<G: Gen>(g: &mut G) -> Self {
            let keypair = TestKeypair::arbitrary(g);

            // generate an arbitrary GossipsubMessage using the behaviour signing functionality
            let config = GossipsubConfig::default();
            let gs: Gossipsub = Gossipsub::new(
                crate::MessageAuthenticity::Signed(keypair.0.clone()),
                config,
            )
            .unwrap();
            let data = (0..g.gen_range(10, 10024))
                .map(|_| g.gen())
                .collect::<Vec<_>>();
            let topic_id = TopicId::arbitrary(g).0;
            Message(gs.build_raw_message(topic_id, data).unwrap())
        }
    }

    /// Wrapper so quickcheck can generate topic hashes from random strings.
    #[derive(Clone, Debug)]
    struct TopicId(TopicHash);

    impl Arbitrary for TopicId {
        fn arbitrary<G: Gen>(g: &mut G) -> Self {
            let topic_string: String = (0..g.gen_range(20, 1024))
                .map(|_| g.gen::<char>())
                .collect();
            TopicId(Topic::new(topic_string).into())
        }
    }

    /// Wrapper so quickcheck can pick between an inlinable and a non-inlinable key.
    #[derive(Clone)]
    struct TestKeypair(Keypair);

    impl Arbitrary for TestKeypair {
        fn arbitrary<G: Gen>(g: &mut G) -> Self {
            let keypair = if g.gen() {
                // Small enough to be inlined.
                Keypair::generate_secp256k1()
            } else {
                // Too large to be inlined.
                let mut rsa_key = hex::decode("308204bd020100300d06092a864886f70d0101010500048204a7308204a30201000282010100ef930f41a71288b643c1cbecbf5f72ab53992249e2b00835bf07390b6745419f3848cbcc5b030faa127bc88cdcda1c1d6f3ff699f0524c15ab9d2c9d8015f5d4bd09881069aad4e9f91b8b0d2964d215cdbbae83ddd31a7622a8228acee07079f6e501aea95508fa26c6122816ef7b00ac526d422bd12aed347c37fff6c1c307f3ba57bb28a7f28609e0bdcc839da4eedca39f5d2fa855ba4b0f9c763e9764937db929a1839054642175312a3de2d3405c9d27bdf6505ef471ce85c5e015eee85bf7874b3d512f715de58d0794fd8afe021c197fbd385bb88a930342fac8da31c27166e2edab00fa55dc1c3814448ba38363077f4e8fe2bdea1c081f85f1aa6f02030100010282010028ff427a1aac1a470e7b4879601a6656193d3857ea79f33db74df61e14730e92bf9ffd78200efb0c40937c3356cbe049cd32e5f15be5c96d5febcaa9bd3484d7fded76a25062d282a3856a1b3b7d2c525cdd8434beae147628e21adf241dd64198d5819f310d033743915ba40ea0b6acdbd0533022ad6daa1ff42de51885f9e8bab2306c6ef1181902d1cd7709006eba1ab0587842b724e0519f295c24f6d848907f772ae9a0953fc931f4af16a07df450fb8bfa94572562437056613647818c238a6ff3f606cffa0533e4b8755da33418dfbc64a85110b1a036623c947400a536bb8df65e5ebe46f2dfd0cfc86e7aeeddd7574c253e8fbf755562b3669525d902818100f9fff30c6677b78dd31ec7a634361438457e80be7a7faf390903067ea8355faa78a1204a82b6e99cb7d9058d23c1ecf6cfe4a900137a00cecc0113fd68c5931602980267ea9a95d182d48ba0a6b4d5dd32fdac685cb2e5d8b42509b2eb59c9579ea6a67ccc7547427e2bd1fb1f23b0ccb4dd6ba7d206c8dd93253d70a451701302818100f5530dfef678d73ce6a401ae47043af10a2e3f224c71ae933035ecd68ccbc4df52d72bc6ca2b17e8faf3e548b483a2506c0369ab80df3b137b54d53fac98f95547c2bc245b416e650ce617e0d29db36066f1335a9ba02ad3e0edf9dc3d58fd835835042663edebce81803972696c789012847cb1f854ab2ac0a1bd3867ac7fb502818029c53010d456105f2bf52a9a8482bca2224a5eac74bf3cc1a4d5d291fafcdffd15a6a6448cce8efdd661f6617ca5fc37c8c885cc3374e109ac6049bcbf72b37eabf44602a2da2d4a1237fd145c863e6d75059976de762d9d258c42b0984e2a2befa01c95217c3ee9c736ff209c355466ff99375194eff943bc402ea1d172a1ed02818027175bf493bbbfb8719c12b47d967bf9eac0\
                    61c90a5b5711172e9095c38bb8cc493c063abffe4bea110b0a2f22ac9311b3947ba31b7ef6bfecf8209eebd6d86c316a2366bbafda7279b2b47d5bb24b6202254f249205dcad347b574433f6593733b806f84316276c1990a016ce1bbdbe5f650325acc7791aefe515ecc60063bd02818100b6a2077f4adcf15a17092d9c4a346d6022ac48f3861b73cf714f84c440a07419a7ce75a73b9cbff4597c53c128bf81e87b272d70428a272d99f90cd9b9ea1033298e108f919c6477400145a102df3fb5601ffc4588203cf710002517bfa24e6ad32f4d09c6b1a995fa28a3104131bedd9072f3b4fb4a5c2056232643d310453f").unwrap();
                Keypair::rsa_from_pkcs8(&mut rsa_key).unwrap()
            };
            TestKeypair(keypair)
        }
    }

    impl std::fmt::Debug for TestKeypair {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            f.debug_struct("TestKeypair")
                .field("public", &self.0.public())
                .finish()
        }
    }

    /// Test that RPC messages can be encoded and decoded successfully.
    #[test]
    fn encode_decode() {
        fn prop(message: Message) {
            let message = message.0;

            let expected = GossipsubRpc {
                messages: vec![message],
                subscriptions: vec![],
                control_msgs: vec![],
            };

            let mut codec = GossipsubCodec::new(codec::UviBytes::default(), ValidationMode::Strict);
            let mut buf = BytesMut::new();
            codec
                .encode(expected.clone().into_protobuf(), &mut buf)
                .unwrap();
            let decoded_event = codec.decode(&mut buf).unwrap().unwrap();
            match decoded_event {
                // Bind the decoded RPC under its own name so it cannot shadow `expected`.
                HandlerEvent::Message {
                    rpc: mut decoded, ..
                } => {
                    // mark as validated as its a published message
                    decoded.messages[0].validated = true;

                    assert_eq!(expected, decoded);
                }
                _ => panic!("Must decode a message"),
            }
        }

        QuickCheck::new().quickcheck(prop as fn(_) -> _)
    }
}