libp2p_floodsub/
protocol.rs

1// Copyright 2018 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21use crate::rpc_proto;
22use crate::topic::Topic;
23use libp2p_core::{InboundUpgrade, OutboundUpgrade, UpgradeInfo, PeerId, upgrade};
24use prost::Message;
25use std::{error, fmt, io, iter, pin::Pin};
26use futures::{Future, io::{AsyncRead, AsyncWrite}};
27
/// Upgrade that negotiates the floodsub protocol on a substream.
///
/// The type carries no state; the `UpgradeInfo` and `InboundUpgrade`
/// implementations in this file are what give it behaviour.
#[derive(Clone, Debug, Default)]
pub struct FloodsubProtocol {}

impl FloodsubProtocol {
    /// Creates a new `FloodsubProtocol`.
    ///
    /// The struct has no fields, so every value of this type is identical.
    pub fn new() -> Self {
        Self {}
    }
}
38
39impl UpgradeInfo for FloodsubProtocol {
40    type Info = &'static [u8];
41    type InfoIter = iter::Once<Self::Info>;
42
43    fn protocol_info(&self) -> Self::InfoIter {
44        iter::once(b"/floodsub/1.0.0")
45    }
46}
47
48impl<TSocket> InboundUpgrade<TSocket> for FloodsubProtocol
49where
50    TSocket: AsyncRead + AsyncWrite + Send + Unpin + 'static,
51{
52    type Output = FloodsubRpc;
53    type Error = FloodsubDecodeError;
54    type Future = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Send>>;
55
56    fn upgrade_inbound(self, mut socket: TSocket, _: Self::Info) -> Self::Future {
57        Box::pin(async move {
58            let packet = upgrade::read_one(&mut socket, 2048).await?;
59            let rpc = rpc_proto::Rpc::decode(&packet[..])?;
60
61            let mut messages = Vec::with_capacity(rpc.publish.len());
62            for publish in rpc.publish.into_iter() {
63                messages.push(FloodsubMessage {
64                    source: PeerId::from_bytes(&publish.from.unwrap_or_default()).map_err(|_| {
65                        FloodsubDecodeError::InvalidPeerId
66                    })?,
67                    data: publish.data.unwrap_or_default(),
68                    sequence_number: publish.seqno.unwrap_or_default(),
69                    topics: publish.topic_ids
70                        .into_iter()
71                        .map(Topic::new)
72                        .collect(),
73                });
74            }
75
76            Ok(FloodsubRpc {
77                messages,
78                subscriptions: rpc.subscriptions
79                    .into_iter()
80                    .map(|sub| FloodsubSubscription {
81                        action: if Some(true) == sub.subscribe {
82                            FloodsubSubscriptionAction::Subscribe
83                        } else {
84                            FloodsubSubscriptionAction::Unsubscribe
85                        },
86                        topic: Topic::new(sub.topic_id.unwrap_or_default()),
87                    })
88                    .collect(),
89            })
90        })
91    }
92}
93
94/// Reach attempt interrupt errors.
95#[derive(Debug)]
96pub enum FloodsubDecodeError {
97    /// Error when reading the packet from the socket.
98    ReadError(upgrade::ReadOneError),
99    /// Error when decoding the raw buffer into a protobuf.
100    ProtobufError(prost::DecodeError),
101    /// Error when parsing the `PeerId` in the message.
102    InvalidPeerId,
103}
104
105impl From<upgrade::ReadOneError> for FloodsubDecodeError {
106    fn from(err: upgrade::ReadOneError) -> Self {
107        FloodsubDecodeError::ReadError(err)
108    }
109}
110
111impl From<prost::DecodeError> for FloodsubDecodeError {
112    fn from(err: prost::DecodeError) -> Self {
113        FloodsubDecodeError::ProtobufError(err)
114    }
115}
116
117impl fmt::Display for FloodsubDecodeError {
118    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
119        match *self {
120            FloodsubDecodeError::ReadError(ref err) =>
121                write!(f, "Error while reading from socket: {}", err),
122            FloodsubDecodeError::ProtobufError(ref err) =>
123                write!(f, "Error while decoding protobuf: {}", err),
124            FloodsubDecodeError::InvalidPeerId =>
125                write!(f, "Error while decoding PeerId from message"),
126        }
127    }
128}
129
130impl error::Error for FloodsubDecodeError {
131    fn source(&self) -> Option<&(dyn error::Error + 'static)> {
132        match *self {
133            FloodsubDecodeError::ReadError(ref err) => Some(err),
134            FloodsubDecodeError::ProtobufError(ref err) => Some(err),
135            FloodsubDecodeError::InvalidPeerId => None,
136        }
137    }
138}
139
140/// An RPC received by the floodsub system.
141#[derive(Debug, Clone, PartialEq, Eq, Hash)]
142pub struct FloodsubRpc {
143    /// List of messages that were part of this RPC query.
144    pub messages: Vec<FloodsubMessage>,
145    /// List of subscriptions.
146    pub subscriptions: Vec<FloodsubSubscription>,
147}
148
149impl UpgradeInfo for FloodsubRpc {
150    type Info = &'static [u8];
151    type InfoIter = iter::Once<Self::Info>;
152
153    fn protocol_info(&self) -> Self::InfoIter {
154        iter::once(b"/floodsub/1.0.0")
155    }
156}
157
158impl<TSocket> OutboundUpgrade<TSocket> for FloodsubRpc
159where
160    TSocket: AsyncWrite + AsyncRead + Send + Unpin + 'static,
161{
162    type Output = ();
163    type Error = io::Error;
164    type Future = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Send>>;
165
166    fn upgrade_outbound(self, mut socket: TSocket, _: Self::Info) -> Self::Future {
167        Box::pin(async move {
168            let bytes = self.into_bytes();
169            upgrade::write_one(&mut socket, bytes).await?;
170            Ok(())
171        })
172    }
173}
174
175impl FloodsubRpc {
176    /// Turns this `FloodsubRpc` into a message that can be sent to a substream.
177    fn into_bytes(self) -> Vec<u8> {
178        let rpc = rpc_proto::Rpc {
179            publish: self.messages.into_iter()
180                .map(|msg| {
181                    rpc_proto::Message {
182                        from: Some(msg.source.to_bytes()),
183                        data: Some(msg.data),
184                        seqno: Some(msg.sequence_number),
185                        topic_ids: msg.topics
186                            .into_iter()
187                            .map(|topic| topic.into())
188                            .collect()
189                    }
190                })
191                .collect(),
192
193            subscriptions: self.subscriptions.into_iter()
194                .map(|topic| {
195                    rpc_proto::rpc::SubOpts {
196                        subscribe: Some(topic.action == FloodsubSubscriptionAction::Subscribe),
197                        topic_id: Some(topic.topic.into())
198                    }
199                })
200                .collect()
201        };
202
203        let mut buf = Vec::with_capacity(rpc.encoded_len());
204        rpc.encode(&mut buf).expect("Vec<u8> provides capacity as needed");
205        buf
206    }
207}
208
209/// A message received by the floodsub system.
210#[derive(Debug, Clone, PartialEq, Eq, Hash)]
211pub struct FloodsubMessage {
212    /// Id of the peer that published this message.
213    pub source: PeerId,
214
215    /// Content of the message. Its meaning is out of scope of this library.
216    pub data: Vec<u8>,
217
218    /// An incrementing sequence number.
219    pub sequence_number: Vec<u8>,
220
221    /// List of topics this message belongs to.
222    ///
223    /// Each message can belong to multiple topics at once.
224    pub topics: Vec<Topic>,
225}
226
227/// A subscription received by the floodsub system.
228#[derive(Debug, Clone, PartialEq, Eq, Hash)]
229pub struct FloodsubSubscription {
230    /// Action to perform.
231    pub action: FloodsubSubscriptionAction,
232    /// The topic from which to subscribe or unsubscribe.
233    pub topic: Topic,
234}
235
/// The operation requested by a [`FloodsubSubscription`].
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub enum FloodsubSubscriptionAction {
    /// Start receiving messages for the given topic.
    Subscribe,
    /// Stop receiving messages for the given topic.
    Unsubscribe,
}