libp2p_identify/
protocol.rs

1// Copyright 2018 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21use crate::structs_proto;
22use futures::prelude::*;
23use libp2p_core::{
24    Multiaddr,
25    PublicKey,
26    upgrade::{self, InboundUpgrade, OutboundUpgrade, UpgradeInfo}
27};
28use log::{debug, trace};
29use prost::Message;
30use std::convert::TryFrom;
31use std::{fmt, io, iter, pin::Pin};
32
33/// Configuration for an upgrade to the `Identify` protocol.
34#[derive(Debug, Clone)]
35pub struct IdentifyProtocolConfig;
36
37#[derive(Debug, Clone)]
38#[non_exhaustive]
39pub struct RemoteInfo {
40    /// Information about the remote.
41    pub info: IdentifyInfo,
42    /// Address the remote sees for us.
43    pub observed_addr: Multiaddr,
44}
45
46/// The substream on which a reply is expected to be sent.
47pub struct ReplySubstream<T> {
48    inner: T,
49}
50
51impl<T> fmt::Debug for ReplySubstream<T> {
52    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
53        f.debug_tuple("ReplySubstream").finish()
54    }
55}
56
57impl<T> ReplySubstream<T>
58where
59    T: AsyncWrite + Unpin
60{
61    /// Sends back the requested information on the substream.
62    ///
63    /// Consumes the substream, returning a `ReplyFuture` that resolves
64    /// when the reply has been sent on the underlying connection.
65    pub fn send(mut self, info: IdentifyInfo, observed_addr: &Multiaddr)
66        -> impl Future<Output = Result<(), io::Error>>
67    {
68        debug!("Sending identify info to client");
69        trace!("Sending: {:?}", info);
70
71        let listen_addrs = info.listen_addrs
72            .into_iter()
73            .map(|addr| addr.to_vec())
74            .collect();
75
76        let pubkey_bytes = info.public_key.into_protobuf_encoding();
77
78        let message = structs_proto::Identify {
79            agent_version: Some(info.agent_version),
80            protocol_version: Some(info.protocol_version),
81            public_key: Some(pubkey_bytes),
82            listen_addrs,
83            observed_addr: Some(observed_addr.to_vec()),
84            protocols: info.protocols
85        };
86
87        async move {
88            let mut bytes = Vec::with_capacity(message.encoded_len());
89            message.encode(&mut bytes).expect("Vec<u8> provides capacity as needed");
90            upgrade::write_one(&mut self.inner, &bytes).await
91        }
92    }
93}
94
95/// Information of a peer sent in `Identify` protocol responses.
96#[derive(Debug, Clone)]
97pub struct IdentifyInfo {
98    /// The public key underlying the peer's `PeerId`.
99    pub public_key: PublicKey,
100    /// Version of the protocol family used by the peer, e.g. `ipfs/1.0.0`
101    /// or `polkadot/1.0.0`.
102    pub protocol_version: String,
103    /// Name and version of the peer, similar to the `User-Agent` header in
104    /// the HTTP protocol.
105    pub agent_version: String,
106    /// The addresses that the peer is listening on.
107    pub listen_addrs: Vec<Multiaddr>,
108    /// The list of protocols supported by the peer, e.g. `/ipfs/ping/1.0.0`.
109    pub protocols: Vec<String>,
110}
111
112impl UpgradeInfo for IdentifyProtocolConfig {
113    type Info = &'static [u8];
114    type InfoIter = iter::Once<Self::Info>;
115
116    fn protocol_info(&self) -> Self::InfoIter {
117        iter::once(b"/ipfs/id/1.0.0")
118    }
119}
120
121impl<C> InboundUpgrade<C> for IdentifyProtocolConfig
122where
123    C: AsyncRead + AsyncWrite + Unpin,
124{
125    type Output = ReplySubstream<C>;
126    type Error = io::Error;
127    type Future = future::Ready<Result<Self::Output, io::Error>>;
128
129    fn upgrade_inbound(self, socket: C, _: Self::Info) -> Self::Future {
130        trace!("Upgrading inbound connection");
131        future::ok(ReplySubstream { inner: socket })
132    }
133}
134
135impl<C> OutboundUpgrade<C> for IdentifyProtocolConfig
136where
137    C: AsyncRead + AsyncWrite + Unpin + Send + 'static,
138{
139    type Output = RemoteInfo;
140    type Error = upgrade::ReadOneError;
141    type Future = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Send>>;
142
143    fn upgrade_outbound(self, mut socket: C, _: Self::Info) -> Self::Future {
144        Box::pin(async move {
145            socket.close().await?;
146            let msg = upgrade::read_one(&mut socket, 4096).await?;
147            let (info, observed_addr) = match parse_proto_msg(msg) {
148                Ok(v) => v,
149                Err(err) => {
150                    debug!("Failed to parse protobuf message; error = {:?}", err);
151                    return Err(err.into())
152                }
153            };
154
155            trace!("Remote observes us as {:?}", observed_addr);
156            trace!("Information received: {:?}", info);
157
158            Ok(RemoteInfo {
159                info,
160                observed_addr,
161            })
162        })
163    }
164}
165
166// Turns a protobuf message into an `IdentifyInfo` and an observed address. If something bad
167// happens, turn it into an `io::Error`.
168fn parse_proto_msg(msg: impl AsRef<[u8]>) -> Result<(IdentifyInfo, Multiaddr), io::Error> {
169    match structs_proto::Identify::decode(msg.as_ref()) {
170        Ok(msg) => {
171            // Turn a `Vec<u8>` into a `Multiaddr`. If something bad happens, turn it into
172            // an `io::Error`.
173            fn bytes_to_multiaddr(bytes: Vec<u8>) -> Result<Multiaddr, io::Error> {
174                Multiaddr::try_from(bytes)
175                    .map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err))
176            }
177
178            let listen_addrs = {
179                let mut addrs = Vec::new();
180                for addr in msg.listen_addrs.into_iter() {
181                    addrs.push(bytes_to_multiaddr(addr)?);
182                }
183                addrs
184            };
185
186            let public_key = PublicKey::from_protobuf_encoding(&msg.public_key.unwrap_or_default())
187                .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
188
189            let observed_addr = bytes_to_multiaddr(msg.observed_addr.unwrap_or_default())?;
190            let info = IdentifyInfo {
191                public_key,
192                protocol_version: msg.protocol_version.unwrap_or_default(),
193                agent_version: msg.agent_version.unwrap_or_default(),
194                listen_addrs,
195                protocols: msg.protocols
196            };
197
198            Ok((info, observed_addr))
199        }
200
201        Err(err) => Err(io::Error::new(io::ErrorKind::InvalidData, err)),
202    }
203}
204
205#[cfg(test)]
206mod tests {
207    use crate::protocol::{IdentifyInfo, RemoteInfo, IdentifyProtocolConfig};
208    use libp2p_tcp::TcpConfig;
209    use futures::{prelude::*, channel::oneshot};
210    use libp2p_core::{
211        identity,
212        Transport,
213        upgrade::{self, apply_outbound, apply_inbound}
214    };
215
216    #[test]
217    fn correct_transfer() {
218        // We open a server and a client, send info from the server to the client, and check that
219        // they were successfully received.
220        let send_pubkey = identity::Keypair::generate_ed25519().public();
221        let recv_pubkey = send_pubkey.clone();
222
223        let (tx, rx) = oneshot::channel();
224
225        let bg_task = async_std::task::spawn(async move {
226            let transport = TcpConfig::new();
227
228            let mut listener = transport
229                .listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap())
230                .unwrap();
231
232            let addr = listener.next().await
233                .expect("some event")
234                .expect("no error")
235                .into_new_address()
236                .expect("listen address");
237            tx.send(addr).unwrap();
238
239            let socket = listener.next().await.unwrap().unwrap().into_upgrade().unwrap().0.await.unwrap();
240            let sender = apply_inbound(socket, IdentifyProtocolConfig).await.unwrap();
241            sender.send(
242                IdentifyInfo {
243                    public_key: send_pubkey,
244                    protocol_version: "proto_version".to_owned(),
245                    agent_version: "agent_version".to_owned(),
246                    listen_addrs: vec![
247                        "/ip4/80.81.82.83/tcp/500".parse().unwrap(),
248                        "/ip6/::1/udp/1000".parse().unwrap(),
249                    ],
250                    protocols: vec!["proto1".to_string(), "proto2".to_string()],
251                },
252                &"/ip4/100.101.102.103/tcp/5000".parse().unwrap(),
253            ).await.unwrap();
254        });
255
256        async_std::task::block_on(async move {
257            let transport = TcpConfig::new();
258
259            let socket = transport.dial(rx.await.unwrap()).unwrap().await.unwrap();
260            let RemoteInfo { info, observed_addr, .. } =
261                apply_outbound(socket, IdentifyProtocolConfig, upgrade::Version::V1).await.unwrap();
262            assert_eq!(observed_addr, "/ip4/100.101.102.103/tcp/5000".parse().unwrap());
263            assert_eq!(info.public_key, recv_pubkey);
264            assert_eq!(info.protocol_version, "proto_version");
265            assert_eq!(info.agent_version, "agent_version");
266            assert_eq!(info.listen_addrs,
267                &["/ip4/80.81.82.83/tcp/500".parse().unwrap(),
268                "/ip6/::1/udp/1000".parse().unwrap()]);
269            assert_eq!(info.protocols, &["proto1".to_string(), "proto2".to_string()]);
270
271            bg_task.await;
272        });
273    }
274}