tet_libp2p_identify/
protocol.rs

1// Copyright 2018 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21use crate::structs_proto;
22use futures::prelude::*;
23use tet_libp2p_core::{
24    Multiaddr,
25    PublicKey,
26    upgrade::{self, InboundUpgrade, OutboundUpgrade, UpgradeInfo}
27};
28use log::{debug, trace};
29use prost::Message;
30use std::convert::TryFrom;
31use std::{fmt, io, iter, pin::Pin};
32
/// Upgrade configuration for the `Identify` protocol.
///
/// This is a field-less marker type: it carries no settings of its own and
/// exists so that the upgrade traits can be implemented on it.
#[derive(Clone, Debug)]
pub struct IdentifyProtocolConfig;
36
37#[derive(Debug, Clone)]
38pub struct RemoteInfo {
39    /// Information about the remote.
40    pub info: IdentifyInfo,
41    /// Address the remote sees for us.
42    pub observed_addr: Multiaddr,
43
44    _priv: ()
45}
46
/// Substream over which an identify reply is expected to be written.
pub struct ReplySubstream<T> {
    /// The wrapped I/O resource the reply is written to.
    inner: T,
}

impl<T> fmt::Debug for ReplySubstream<T> {
    /// Prints only the type name; the wrapped substream is not required to
    /// implement `Debug` and is therefore never shown.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("ReplySubstream")
    }
}
57
58impl<T> ReplySubstream<T>
59where
60    T: AsyncWrite + Unpin
61{
62    /// Sends back the requested information on the substream.
63    ///
64    /// Consumes the substream, returning a `ReplyFuture` that resolves
65    /// when the reply has been sent on the underlying connection.
66    pub fn send(mut self, info: IdentifyInfo, observed_addr: &Multiaddr)
67        -> impl Future<Output = Result<(), io::Error>>
68    {
69        debug!("Sending identify info to client");
70        trace!("Sending: {:?}", info);
71
72        let listen_addrs = info.listen_addrs
73            .into_iter()
74            .map(|addr| addr.to_vec())
75            .collect();
76
77        let pubkey_bytes = info.public_key.into_protobuf_encoding();
78
79        let message = structs_proto::Identify {
80            agent_version: Some(info.agent_version),
81            protocol_version: Some(info.protocol_version),
82            public_key: Some(pubkey_bytes),
83            listen_addrs: listen_addrs,
84            observed_addr: Some(observed_addr.to_vec()),
85            protocols: info.protocols
86        };
87
88        async move {
89            let mut bytes = Vec::with_capacity(message.encoded_len());
90            message.encode(&mut bytes).expect("Vec<u8> provides capacity as needed");
91            upgrade::write_one(&mut self.inner, &bytes).await
92        }
93    }
94}
95
96/// Information of a peer sent in `Identify` protocol responses.
97#[derive(Debug, Clone)]
98pub struct IdentifyInfo {
99    /// The public key underlying the peer's `PeerId`.
100    pub public_key: PublicKey,
101    /// Version of the protocol family used by the peer, e.g. `ipfs/1.0.0`
102    /// or `polkadot/1.0.0`.
103    pub protocol_version: String,
104    /// Name and version of the peer, similar to the `User-Agent` header in
105    /// the HTTP protocol.
106    pub agent_version: String,
107    /// The addresses that the peer is listening on.
108    pub listen_addrs: Vec<Multiaddr>,
109    /// The list of protocols supported by the peer, e.g. `/ipfs/ping/1.0.0`.
110    pub protocols: Vec<String>,
111}
112
113impl UpgradeInfo for IdentifyProtocolConfig {
114    type Info = &'static [u8];
115    type InfoIter = iter::Once<Self::Info>;
116
117    fn protocol_info(&self) -> Self::InfoIter {
118        iter::once(b"/ipfs/id/1.0.0")
119    }
120}
121
122impl<C> InboundUpgrade<C> for IdentifyProtocolConfig
123where
124    C: AsyncRead + AsyncWrite + Unpin,
125{
126    type Output = ReplySubstream<C>;
127    type Error = io::Error;
128    type Future = future::Ready<Result<Self::Output, io::Error>>;
129
130    fn upgrade_inbound(self, socket: C, _: Self::Info) -> Self::Future {
131        trace!("Upgrading inbound connection");
132        future::ok(ReplySubstream { inner: socket })
133    }
134}
135
136impl<C> OutboundUpgrade<C> for IdentifyProtocolConfig
137where
138    C: AsyncRead + AsyncWrite + Unpin + Send + 'static,
139{
140    type Output = RemoteInfo;
141    type Error = upgrade::ReadOneError;
142    type Future = Pin<Box<dyn Future<Output = Result<Self::Output, Self::Error>> + Send>>;
143
144    fn upgrade_outbound(self, mut socket: C, _: Self::Info) -> Self::Future {
145        Box::pin(async move {
146            socket.close().await?;
147            let msg = upgrade::read_one(&mut socket, 4096).await?;
148            let (info, observed_addr) = match parse_proto_msg(msg) {
149                Ok(v) => v,
150                Err(err) => {
151                    debug!("Failed to parse protobuf message; error = {:?}", err);
152                    return Err(err.into())
153                }
154            };
155
156            trace!("Remote observes us as {:?}", observed_addr);
157            trace!("Information received: {:?}", info);
158
159            Ok(RemoteInfo {
160                info,
161                observed_addr: observed_addr.clone(),
162                _priv: ()
163            })
164        })
165    }
166}
167
168// Turns a protobuf message into an `IdentifyInfo` and an observed address. If something bad
169// happens, turn it into an `io::Error`.
170fn parse_proto_msg(msg: impl AsRef<[u8]>) -> Result<(IdentifyInfo, Multiaddr), io::Error> {
171    match structs_proto::Identify::decode(msg.as_ref()) {
172        Ok(msg) => {
173            // Turn a `Vec<u8>` into a `Multiaddr`. If something bad happens, turn it into
174            // an `io::Error`.
175            fn bytes_to_multiaddr(bytes: Vec<u8>) -> Result<Multiaddr, io::Error> {
176                Multiaddr::try_from(bytes)
177                    .map_err(|err| io::Error::new(io::ErrorKind::InvalidData, err))
178            }
179
180            let listen_addrs = {
181                let mut addrs = Vec::new();
182                for addr in msg.listen_addrs.into_iter() {
183                    addrs.push(bytes_to_multiaddr(addr)?);
184                }
185                addrs
186            };
187
188            let public_key = PublicKey::from_protobuf_encoding(&msg.public_key.unwrap_or_default())
189                .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))?;
190
191            let observed_addr = bytes_to_multiaddr(msg.observed_addr.unwrap_or_default())?;
192            let info = IdentifyInfo {
193                public_key,
194                protocol_version: msg.protocol_version.unwrap_or_default(),
195                agent_version: msg.agent_version.unwrap_or_default(),
196                listen_addrs,
197                protocols: msg.protocols
198            };
199
200            Ok((info, observed_addr))
201        }
202
203        Err(err) => Err(io::Error::new(io::ErrorKind::InvalidData, err)),
204    }
205}
206
#[cfg(test)]
mod tests {
    use crate::protocol::{IdentifyInfo, RemoteInfo, IdentifyProtocolConfig};
    use tet_libp2p_tcp::TcpConfig;
    use futures::{prelude::*, channel::oneshot};
    use tet_libp2p_core::{
        identity,
        Transport,
        upgrade::{self, apply_outbound, apply_inbound}
    };

    /// Round trip over a local TCP connection: a listening task answers with
    /// its identify info, and the dialing side checks every received field.
    #[test]
    fn correct_transfer() {
        let server_key = identity::Keypair::generate_ed25519().public();
        let expected_key = server_key.clone();

        // Carries the listener's actual address (the port is chosen by the OS)
        // from the server task to the client.
        let (addr_tx, addr_rx) = oneshot::channel();

        let server = async_std::task::spawn(async move {
            let mut listener = TcpConfig::new()
                .listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap())
                .unwrap();

            let listen_addr = listener.next().await
                .expect("some event")
                .expect("no error")
                .into_new_address()
                .expect("listen address");
            addr_tx.send(listen_addr).unwrap();

            // The next listener event is the incoming connection.
            let (incoming, _) = listener.next().await
                .unwrap()
                .unwrap()
                .into_upgrade()
                .unwrap();
            let socket = incoming.await.unwrap();
            let reply = apply_inbound(socket, IdentifyProtocolConfig).await.unwrap();

            let local_info = IdentifyInfo {
                public_key: server_key,
                protocol_version: "proto_version".to_owned(),
                agent_version: "agent_version".to_owned(),
                listen_addrs: vec![
                    "/ip4/80.81.82.83/tcp/500".parse().unwrap(),
                    "/ip6/::1/udp/1000".parse().unwrap(),
                ],
                protocols: vec!["proto1".to_string(), "proto2".to_string()],
            };
            reply
                .send(local_info, &"/ip4/100.101.102.103/tcp/5000".parse().unwrap())
                .await
                .unwrap();
        });

        async_std::task::block_on(async move {
            let server_addr = addr_rx.await.unwrap();
            let socket = TcpConfig::new().dial(server_addr).unwrap().await.unwrap();

            let RemoteInfo { info, observed_addr, .. } =
                apply_outbound(socket, IdentifyProtocolConfig, upgrade::Version::V1).await.unwrap();

            assert_eq!(observed_addr, "/ip4/100.101.102.103/tcp/5000".parse().unwrap());
            assert_eq!(info.public_key, expected_key);
            assert_eq!(info.protocol_version, "proto_version");
            assert_eq!(info.agent_version, "agent_version");
            assert_eq!(info.listen_addrs,
                &["/ip4/80.81.82.83/tcp/500".parse().unwrap(),
                "/ip6/::1/udp/1000".parse().unwrap()]);
            assert_eq!(info.protocols, &["proto1".to_string(), "proto2".to_string()]);

            // Let the server task finish so a panic there fails the test.
            server.await;
        });
    }
}