1use crate::config::ValidationMode;
22use crate::error::{GossipsubHandlerError, ValidationError};
23use crate::handler::HandlerEvent;
24use crate::rpc_proto;
25use crate::topic::TopicHash;
26use crate::types::{
27 GossipsubControlAction, GossipsubRpc, GossipsubSubscription, GossipsubSubscriptionAction,
28 MessageId, PeerInfo, PeerKind, RawGossipsubMessage,
29};
30use byteorder::{BigEndian, ByteOrder};
31use bytes::Bytes;
32use bytes::BytesMut;
33use futures::future;
34use futures::prelude::*;
35use asynchronous_codec::{Decoder, Encoder, Framed};
36use mwc_libp2p_core::{
37 identity::PublicKey, InboundUpgrade, OutboundUpgrade, PeerId, ProtocolName, UpgradeInfo,
38};
39use log::{debug, warn};
40use prost::Message as ProtobufMessage;
41use std::{borrow::Cow, pin::Pin};
42use unsigned_varint::codec;
43
/// Byte prefix placed in front of the protobuf-encoded message to form the bytes whose
/// signature is checked by `GossipsubCodec::verify_signature`.
pub(crate) const SIGNING_PREFIX: &[u8] = b"libp2p-pubsub:";
45
46#[derive(Clone)]
48pub struct ProtocolConfig {
49 protocol_ids: Vec<ProtocolId>,
51 max_transmit_size: usize,
53 validation_mode: ValidationMode,
55}
56
57impl ProtocolConfig {
58 pub fn new(
62 id_prefix: Cow<'static, str>,
63 max_transmit_size: usize,
64 validation_mode: ValidationMode,
65 support_floodsub: bool,
66 ) -> ProtocolConfig {
67 let mut protocol_ids = vec![
69 ProtocolId::new(id_prefix.clone(), PeerKind::Gossipsubv1_1),
70 ProtocolId::new(id_prefix, PeerKind::Gossipsub),
71 ];
72
73 if support_floodsub {
75 protocol_ids.push(ProtocolId::new(Cow::from(""), PeerKind::Floodsub));
76 }
77
78 ProtocolConfig {
79 protocol_ids,
80 max_transmit_size,
81 validation_mode,
82 }
83 }
84}
85
86#[derive(Clone, Debug)]
88pub struct ProtocolId {
89 pub protocol_id: Vec<u8>,
91 pub kind: PeerKind,
93}
94
95impl ProtocolId {
97 pub fn new(prefix: Cow<'static, str>, kind: PeerKind) -> Self {
98 let protocol_id = match kind {
99 PeerKind::Gossipsubv1_1 => format!("/{}/{}", prefix, "1.1.0"),
100 PeerKind::Gossipsub => format!("/{}/{}", prefix, "1.0.0"),
101 PeerKind::Floodsub => format!("/{}/{}", "floodsub", "1.0.0"),
102 PeerKind::NotSupported => unreachable!("Should never advertise NotSupported"),
105 }
106 .into_bytes();
107 ProtocolId { protocol_id, kind }
108 }
109}
110
111impl ProtocolName for ProtocolId {
112 fn protocol_name(&self) -> &[u8] {
113 &self.protocol_id
114 }
115}
116
117impl UpgradeInfo for ProtocolConfig {
118 type Info = ProtocolId;
119 type InfoIter = Vec<Self::Info>;
120
121 fn protocol_info(&self) -> Self::InfoIter {
122 self.protocol_ids.clone()
123 }
124}
125
126impl<TSocket> InboundUpgrade<TSocket> for ProtocolConfig
127where
128 TSocket: AsyncRead + AsyncWrite + Unpin + Send + 'static,
129{
130 type Output = (Framed<TSocket, GossipsubCodec>, PeerKind);
131 type Error = GossipsubHandlerError;
132 type Future = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Send>>;
133
134 fn upgrade_inbound(self, socket: TSocket, protocol_id: Self::Info) -> Self::Future {
135 let mut length_codec = codec::UviBytes::default();
136 length_codec.set_max_len(self.max_transmit_size);
137 Box::pin(future::ok((
138 Framed::new(
139 socket,
140 GossipsubCodec::new(length_codec, self.validation_mode),
141 ),
142 protocol_id.kind,
143 )))
144 }
145}
146
147impl<TSocket> OutboundUpgrade<TSocket> for ProtocolConfig
148where
149 TSocket: AsyncWrite + AsyncRead + Unpin + Send + 'static,
150{
151 type Output = (Framed<TSocket, GossipsubCodec>, PeerKind);
152 type Error = GossipsubHandlerError;
153 type Future = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Send>>;
154
155 fn upgrade_outbound(self, socket: TSocket, protocol_id: Self::Info) -> Self::Future {
156 let mut length_codec = codec::UviBytes::default();
157 length_codec.set_max_len(self.max_transmit_size);
158 Box::pin(future::ok((
159 Framed::new(
160 socket,
161 GossipsubCodec::new(length_codec, self.validation_mode),
162 ),
163 protocol_id.kind,
164 )))
165 }
166}
167
168pub struct GossipsubCodec {
171 length_codec: codec::UviBytes,
173 validation_mode: ValidationMode,
175}
176
177impl GossipsubCodec {
178 pub fn new(length_codec: codec::UviBytes, validation_mode: ValidationMode) -> Self {
179 GossipsubCodec {
180 length_codec,
181 validation_mode,
182 }
183 }
184
185 fn verify_signature(message: &rpc_proto::Message) -> bool {
189 let from = match message.from.as_ref() {
190 Some(v) => v,
191 None => {
192 debug!("Signature verification failed: No source id given");
193 return false;
194 }
195 };
196
197 let source = match PeerId::from_bytes(&from) {
198 Ok(v) => v,
199 Err(_) => {
200 debug!("Signature verification failed: Invalid Peer Id");
201 return false;
202 }
203 };
204
205 let signature = match message.signature.as_ref() {
206 Some(v) => v,
207 None => {
208 debug!("Signature verification failed: No signature provided");
209 return false;
210 }
211 };
212
213 let public_key = match message
216 .key
217 .as_ref()
218 .map(|key| PublicKey::from_protobuf_encoding(&key))
219 {
220 Some(Ok(key)) => key,
221 _ => match PublicKey::from_protobuf_encoding(&source.to_hash_bytes()[2..]) {
222 Ok(v) => v,
223 Err(_) => {
224 warn!("Signature verification failed: No valid public key supplied");
225 return false;
226 }
227 },
228 };
229
230 if source != public_key.clone().into_peer_id() {
232 warn!("Signature verification failed: Public key doesn't match source peer id");
233 return false;
234 }
235
236 let mut message_sig = message.clone();
238 message_sig.signature = None;
239 message_sig.key = None;
240 let mut buf = Vec::with_capacity(message_sig.encoded_len());
241 message_sig
242 .encode(&mut buf)
243 .expect("Buffer has sufficient capacity");
244 let mut signature_bytes = SIGNING_PREFIX.to_vec();
245 signature_bytes.extend_from_slice(&buf);
246 public_key.verify(&signature_bytes, signature)
247 }
248}
249
250impl Encoder for GossipsubCodec {
251 type Item = rpc_proto::Rpc;
252 type Error = GossipsubHandlerError;
253
254 fn encode(&mut self, item: Self::Item, dst: &mut BytesMut) -> Result<(), Self::Error> {
255 let mut buf = Vec::with_capacity(item.encoded_len());
256
257 item.encode(&mut buf)
258 .expect("Buffer has sufficient capacity");
259
260 self.length_codec
262 .encode(Bytes::from(buf), dst)
263 .map_err(|_| GossipsubHandlerError::MaxTransmissionSize)
264 }
265}
266
267impl Decoder for GossipsubCodec {
268 type Item = HandlerEvent;
269 type Error = GossipsubHandlerError;
270
271 fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
272 let packet = match self.length_codec.decode(src).map_err(|e| {
273 if let std::io::ErrorKind::PermissionDenied = e.kind() {
274 GossipsubHandlerError::MaxTransmissionSize
275 } else {
276 GossipsubHandlerError::Io(e)
277 }
278 })? {
279 Some(p) => p,
280 None => return Ok(None),
281 };
282
283 let rpc = rpc_proto::Rpc::decode(&packet[..]).map_err(std::io::Error::from)?;
284
285 let mut messages = Vec::with_capacity(rpc.publish.len());
287 let mut invalid_messages = Vec::new();
289
290 for message in rpc.publish.into_iter() {
291 let mut invalid_kind = None;
293 let mut verify_signature = false;
294 let mut verify_sequence_no = false;
295 let mut verify_source = false;
296
297 match self.validation_mode {
298 ValidationMode::Strict => {
299 verify_signature = true;
301 verify_sequence_no = true;
302 verify_source = true;
303 }
304 ValidationMode::Permissive => {
305 if message.signature.is_some() {
307 verify_signature = true;
308 }
309 if message.seqno.is_some() {
310 verify_sequence_no = true;
311 }
312 if message.from.is_some() {
313 verify_source = true;
314 }
315 }
316 ValidationMode::Anonymous => {
317 if message.signature.is_some() {
318 warn!("Signature field was non-empty and anonymous validation mode is set");
319 invalid_kind = Some(ValidationError::SignaturePresent);
320 } else if message.seqno.is_some() {
321 warn!("Sequence number was non-empty and anonymous validation mode is set");
322 invalid_kind = Some(ValidationError::SequenceNumberPresent);
323 } else if message.from.is_some() {
324 warn!("Message dropped. Message source was non-empty and anonymous validation mode is set");
325 invalid_kind = Some(ValidationError::MessageSourcePresent);
326 }
327 }
328 ValidationMode::None => {}
329 }
330
331 if let Some(validation_error) = invalid_kind.take() {
334 let message = RawGossipsubMessage {
335 source: None, data: message.data.unwrap_or_default(),
337 sequence_number: None, topic: TopicHash::from_raw(message.topic),
339 signature: None, key: message.key,
341 validated: false,
342 };
343 invalid_messages.push((message, validation_error));
344 continue;
346 }
347
348 if verify_signature && !GossipsubCodec::verify_signature(&message) {
350 warn!("Invalid signature for received message");
351
352 let message = RawGossipsubMessage {
355 source: None, data: message.data.unwrap_or_default(),
357 sequence_number: None, topic: TopicHash::from_raw(message.topic),
359 signature: None, key: message.key,
361 validated: false,
362 };
363 invalid_messages.push((message, ValidationError::InvalidSignature));
364 continue;
366 }
367
368 let sequence_number = if verify_sequence_no {
370 if let Some(seq_no) = message.seqno {
371 if seq_no.is_empty() {
372 None
373 } else if seq_no.len() != 8 {
374 debug!(
375 "Invalid sequence number length for received message. SeqNo: {:?} Size: {}",
376 seq_no,
377 seq_no.len()
378 );
379 let message = RawGossipsubMessage {
380 source: None, data: message.data.unwrap_or_default(),
382 sequence_number: None, topic: TopicHash::from_raw(message.topic),
384 signature: message.signature, key: message.key,
386 validated: false,
387 };
388 invalid_messages.push((message, ValidationError::InvalidSequenceNumber));
389 continue;
391 } else {
392 Some(BigEndian::read_u64(&seq_no))
394 }
395 } else {
396 debug!("Sequence number not present but expected");
398 let message = RawGossipsubMessage {
399 source: None, data: message.data.unwrap_or_default(),
401 sequence_number: None, topic: TopicHash::from_raw(message.topic),
403 signature: message.signature, key: message.key,
405 validated: false,
406 };
407 invalid_messages.push((message, ValidationError::EmptySequenceNumber));
408 continue;
409 }
410 } else {
411 None
413 };
414
415 let source = if verify_source {
417 if let Some(bytes) = message.from {
418 if !bytes.is_empty() {
419 match PeerId::from_bytes(&bytes) {
420 Ok(peer_id) => Some(peer_id), Err(_) => {
422 debug!("Message source has an invalid PeerId");
424 let message = RawGossipsubMessage {
425 source: None, data: message.data.unwrap_or_default(),
427 sequence_number,
428 topic: TopicHash::from_raw(message.topic),
429 signature: message.signature, key: message.key,
431 validated: false,
432 };
433 invalid_messages.push((message, ValidationError::InvalidPeerId));
434 continue;
435 }
436 }
437 } else {
438 None
439 }
440 } else {
441 None
442 }
443 } else {
444 None
445 };
446
447 messages.push(RawGossipsubMessage {
449 source,
450 data: message.data.unwrap_or_default(),
451 sequence_number,
452 topic: TopicHash::from_raw(message.topic),
453 signature: message.signature,
454 key: message.key,
455 validated: false,
456 });
457 }
458
459 let mut control_msgs = Vec::new();
460
461 if let Some(rpc_control) = rpc.control {
462 let ihave_msgs: Vec<GossipsubControlAction> = rpc_control
464 .ihave
465 .into_iter()
466 .map(|ihave| GossipsubControlAction::IHave {
467 topic_hash: TopicHash::from_raw(ihave.topic_id.unwrap_or_default()),
468 message_ids: ihave
469 .message_ids
470 .into_iter()
471 .map(MessageId::from)
472 .collect::<Vec<_>>(),
473 })
474 .collect();
475
476 let iwant_msgs: Vec<GossipsubControlAction> = rpc_control
477 .iwant
478 .into_iter()
479 .map(|iwant| GossipsubControlAction::IWant {
480 message_ids: iwant
481 .message_ids
482 .into_iter()
483 .map(MessageId::from)
484 .collect::<Vec<_>>(),
485 })
486 .collect();
487
488 let graft_msgs: Vec<GossipsubControlAction> = rpc_control
489 .graft
490 .into_iter()
491 .map(|graft| GossipsubControlAction::Graft {
492 topic_hash: TopicHash::from_raw(graft.topic_id.unwrap_or_default()),
493 })
494 .collect();
495
496 let mut prune_msgs = Vec::new();
497
498 for prune in rpc_control.prune {
499 let peers = prune
501 .peers
502 .into_iter()
503 .filter_map(|info| {
504 info.peer_id
505 .as_ref()
506 .and_then(|id| PeerId::from_bytes(id).ok())
507 .map(|peer_id|
508 PeerInfo {
510 peer_id: Some(peer_id),
511 })
512 })
513 .collect::<Vec<PeerInfo>>();
514
515 let topic_hash = TopicHash::from_raw(prune.topic_id.unwrap_or_default());
516 prune_msgs.push(GossipsubControlAction::Prune {
517 topic_hash,
518 peers,
519 backoff: prune.backoff,
520 });
521 }
522
523 control_msgs.extend(ihave_msgs);
524 control_msgs.extend(iwant_msgs);
525 control_msgs.extend(graft_msgs);
526 control_msgs.extend(prune_msgs);
527 }
528
529 Ok(Some(HandlerEvent::Message {
530 rpc: GossipsubRpc {
531 messages,
532 subscriptions: rpc
533 .subscriptions
534 .into_iter()
535 .map(|sub| GossipsubSubscription {
536 action: if Some(true) == sub.subscribe {
537 GossipsubSubscriptionAction::Subscribe
538 } else {
539 GossipsubSubscriptionAction::Unsubscribe
540 },
541 topic_hash: TopicHash::from_raw(sub.topic_id.unwrap_or_default()),
542 })
543 .collect(),
544 control_msgs,
545 },
546 invalid_messages,
547 }))
548 }
549}
550
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::GossipsubConfig;
    use crate::Gossipsub;
    use crate::IdentTopic as Topic;
    use mwc_libp2p_core::identity::Keypair;
    use quickcheck::*;
    use rand::Rng;

    /// Wrapper providing an `Arbitrary` implementation for signed raw messages.
    #[derive(Clone, Debug)]
    struct Message(RawGossipsubMessage);

    impl Arbitrary for Message {
        fn arbitrary<G: Gen>(g: &mut G) -> Self {
            let TestKeypair(keypair) = TestKeypair::arbitrary(g);

            // A behaviour instance is only needed for its message-building (signing) logic.
            let config = GossipsubConfig::default();
            let gs: Gossipsub =
                Gossipsub::new(crate::MessageAuthenticity::Signed(keypair), config).unwrap();
            let data = (0..g.gen_range(10, 10024))
                .map(|_| g.gen())
                .collect::<Vec<_>>();
            let topic_id = TopicId::arbitrary(g).0;
            Message(gs.build_raw_message(topic_id, data).unwrap())
        }
    }

    /// Wrapper providing an `Arbitrary` implementation for topic hashes.
    #[derive(Clone, Debug)]
    struct TopicId(TopicHash);

    impl Arbitrary for TopicId {
        fn arbitrary<G: Gen>(g: &mut G) -> Self {
            let topic_string = (0..g.gen_range(20, 1024))
                .map(|_| g.gen::<char>())
                .collect::<String>();
            TopicId(Topic::new(topic_string).into())
        }
    }

    /// Wrapper providing `Arbitrary` and `Debug` implementations for keypairs.
    #[derive(Clone)]
    struct TestKeypair(Keypair);

    impl Arbitrary for TestKeypair {
        fn arbitrary<G: Gen>(g: &mut G) -> Self {
            let keypair = if g.gen() {
                Keypair::generate_secp256k1()
            } else {
                // Fixed PKCS#8 RSA test key. The literal is continued across lines with `\`,
                // which removes the line break and the indentation from the string value.
                let mut rsa_key = hex::decode(
                    "308204bd020100300d06092a864886f70d0101010500048204a7308204a30201000282010100ef930f41a71288b643c1cbecbf5f72ab53992249e2b00835bf07390b6745419f3848cbcc5b030faa127bc88cdcda1c1d6f3ff699f0524c15ab9d2c9d8015f5d4bd09881069aad4e9f91b8b0d2964d215cdbbae83ddd31a7622a8228acee07079f6e501aea95508fa26c6122816ef7b00ac526d422bd12aed347c37fff6c1c307f3ba57bb28a7f28609e0bdcc839da4eedca39f5d2fa855ba4b0f9c763e9764937db929a1839054642175312a3de2d3405c9d27bdf6505ef471ce85c5e015eee85bf7874b3d512f715de58d0794fd8afe021c197fbd385bb88a930342fac8da31c27166e2edab00fa55dc1c3814448ba38363077f4e8fe2bdea1c081f85f1aa6f02030100010282010028ff427a1aac1a470e7b4879601a6656193d3857ea79f33db74df61e14730e92bf9ffd78200efb0c40937c3356cbe049cd32e5f15be5c96d5febcaa9bd3484d7fded76a25062d282a3856a1b3b7d2c525cdd8434beae147628e21adf241dd64198d5819f310d033743915ba40ea0b6acdbd0533022ad6daa1ff42de51885f9e8bab2306c6ef1181902d1cd7709006eba1ab0587842b724e0519f295c24f6d848907f772ae9a0953fc931f4af16a07df450fb8bfa94572562437056613647818c238a6ff3f606cffa0533e4b8755da33418dfbc64a85110b1a036623c947400a536bb8df65e5ebe46f2dfd0cfc86e7aeeddd7574c253e8fbf755562b3669525d902818100f9fff30c6677b78dd31ec7a634361438457e80be7a7faf390903067ea8355faa78a1204a82b6e99cb7d9058d23c1ecf6cfe4a900137a00cecc0113fd68c5931602980267ea9a95d182d48ba0a6b4d5dd32fdac685cb2e5d8b42509b2eb59c9579ea6a67ccc7547427e2bd1fb1f23b0ccb4dd6ba7d206c8dd93253d70a451701302818100f5530dfef678d73ce6a401ae47043af10a2e3f224c71ae933035ecd68ccbc4df52d72bc6ca2b17e8faf3e548b483a2506c0369ab80df3b137b54d53fac98f95547c2bc245b416e650ce617e0d29db36066f1335a9ba02ad3e0edf9dc3d58fd835835042663edebce81803972696c789012847cb1f854ab2ac0a1bd3867ac7fb502818029c53010d456105f2bf52a9a8482bca2224a5eac74bf3cc1a4d5d291fafcdffd15a6a6448cce8efdd661f6617ca5fc37c8c885cc3374e109ac6049bcbf72b37eabf44602a2da2d4a1237fd145c863e6d75059976de762d9d258c42b0984e2a2befa01c95217c3ee9c736ff209c355466ff99375194eff943bc402ea1d172a1ed02818027175bf493bbbfb8719c12b47d967bf9eac061c90a5b5711172\
                     e9095c38bb8cc493c063abffe4bea110b0a2f22ac9311b3947ba31b7ef6bfecf8209eebd6d86c316a2366bbafda7279b2b47d5bb24b6202254f249205dcad347b574433f6593733b806f84316276c1990a016ce1bbdbe5f650325acc7791aefe515ecc60063bd02818100b6a2077f4adcf15a17092d9c4a346d6022ac48f3861b73cf714f84c440a07419a7ce75a73b9cbff4597c53c128bf81e87b272d70428a272d99f90cd9b9ea1033298e108f919c6477400145a102df3fb5601ffc4588203cf710002517bfa24e6ad32f4d09c6b1a995fa28a3104131bedd9072f3b4fb4a5c2056232643d310453f",
                )
                .unwrap();
                Keypair::rsa_from_pkcs8(&mut rsa_key).unwrap()
            };
            TestKeypair(keypair)
        }
    }

    impl std::fmt::Debug for TestKeypair {
        fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
            f.debug_struct("TestKeypair")
                .field("public", &self.0.public())
                .finish()
        }
    }

    /// Round-trips a signed message through the codec under strict validation and checks
    /// that the decoded RPC equals the one that was encoded.
    #[test]
    fn encode_decode() {
        fn prop(message: Message) {
            let message = message.0;

            let rpc = GossipsubRpc {
                messages: vec![message],
                subscriptions: vec![],
                control_msgs: vec![],
            };

            let mut codec = GossipsubCodec::new(codec::UviBytes::default(), ValidationMode::Strict);
            let mut buf = BytesMut::new();
            codec.encode(rpc.clone().into_protobuf(), &mut buf).unwrap();
            let decoded_rpc = codec.decode(&mut buf).unwrap().unwrap();
            match decoded_rpc {
                // Bind the decoded RPC under its own name so it does not shadow the original;
                // otherwise the comparison below would compare a value with itself.
                HandlerEvent::Message {
                    rpc: mut decoded, ..
                } => {
                    // The decoder always emits `validated: false`; set the flag so that it
                    // matches the locally built message before comparing.
                    // NOTE(review): assumes `build_raw_message` yields `validated: true` — confirm.
                    decoded.messages[0].validated = true;

                    assert_eq!(decoded, rpc);
                }
                _ => panic!("Must decode a message"),
            }
        }

        QuickCheck::new().quickcheck(prop as fn(_) -> _)
    }
}