ant_libp2p_kad/
protocol.rs

1// Copyright 2018 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21//! The Kademlia connection protocol upgrade and associated message types.
22//!
23//! The connection protocol upgrade is provided by [`ProtocolConfig`], with the
24//! request and response types [`KadRequestMsg`] and [`KadResponseMsg`], respectively.
25//! The upgrade's output is a `Sink + Stream` of messages. The `Stream` component is used
26//! to poll the underlying transport for incoming messages, and the `Sink` component
27//! is used to send messages to remote peers.
28
29use ant_libp2p_core as libp2p_core;
30use ant_libp2p_swarm as libp2p_swarm;
31
32use std::{io, iter, marker::PhantomData, time::Duration};
33
34use asynchronous_codec::{Decoder, Encoder, Framed};
35use bytes::BytesMut;
36use futures::prelude::*;
37use libp2p_core::{
38    upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo},
39    Multiaddr,
40};
41use libp2p_identity::PeerId;
42use libp2p_swarm::StreamProtocol;
43use tracing::debug;
44use web_time::Instant;
45
46use crate::{
47    proto,
48    record::{self, Record},
49};
50
51/// The protocol name used for negotiating with multistream-select.
52pub(crate) const DEFAULT_PROTO_NAME: StreamProtocol = StreamProtocol::new("/ipfs/kad/1.0.0");
53/// The default maximum size for a varint length-delimited packet.
54pub(crate) const DEFAULT_MAX_PACKET_SIZE: usize = 16 * 1024;
55/// Status of our connection to a node reported by the Kademlia protocol.
56#[derive(Copy, Clone, PartialEq, Eq, Debug, Hash)]
57pub enum ConnectionType {
58    /// Sender hasn't tried to connect to peer.
59    NotConnected = 0,
60    /// Sender is currently connected to peer.
61    Connected = 1,
62    /// Sender was recently connected to peer.
63    CanConnect = 2,
64    /// Sender tried to connect to peer but failed.
65    CannotConnect = 3,
66}
67
68impl From<proto::ConnectionType> for ConnectionType {
69    fn from(raw: proto::ConnectionType) -> ConnectionType {
70        use proto::ConnectionType::*;
71        match raw {
72            NOT_CONNECTED => ConnectionType::NotConnected,
73            CONNECTED => ConnectionType::Connected,
74            CAN_CONNECT => ConnectionType::CanConnect,
75            CANNOT_CONNECT => ConnectionType::CannotConnect,
76        }
77    }
78}
79
80impl From<ConnectionType> for proto::ConnectionType {
81    fn from(val: ConnectionType) -> Self {
82        use proto::ConnectionType::*;
83        match val {
84            ConnectionType::NotConnected => NOT_CONNECTED,
85            ConnectionType::Connected => CONNECTED,
86            ConnectionType::CanConnect => CAN_CONNECT,
87            ConnectionType::CannotConnect => CANNOT_CONNECT,
88        }
89    }
90}
91
92/// Information about a peer, as known by the sender.
93#[derive(Debug, Clone, PartialEq, Eq)]
94pub struct KadPeer {
95    /// Identifier of the peer.
96    pub node_id: PeerId,
97    /// The multiaddresses that the sender think can be used in order to reach the peer.
98    pub multiaddrs: Vec<Multiaddr>,
99    /// How the sender is connected to that remote.
100    pub connection_ty: ConnectionType,
101}
102
103// Builds a `KadPeer` from a corresponding protobuf message.
104impl TryFrom<proto::Peer> for KadPeer {
105    type Error = io::Error;
106
107    fn try_from(peer: proto::Peer) -> Result<KadPeer, Self::Error> {
108        // TODO: this is in fact a CID; not sure if this should be handled in `from_bytes` or
109        //       as a special case here
110        let node_id = PeerId::from_bytes(&peer.id).map_err(|_| invalid_data("invalid peer id"))?;
111
112        let mut addrs = Vec::with_capacity(peer.addrs.len());
113        for addr in peer.addrs.into_iter() {
114            match Multiaddr::try_from(addr).map(|addr| addr.with_p2p(node_id)) {
115                Ok(Ok(a)) => addrs.push(a),
116                Ok(Err(a)) => {
117                    debug!("Unable to parse multiaddr: {a} is not compatible with {node_id}")
118                }
119                Err(e) => debug!("Unable to parse multiaddr: {e}"),
120            };
121        }
122
123        Ok(KadPeer {
124            node_id,
125            multiaddrs: addrs,
126            connection_ty: peer.connection.into(),
127        })
128    }
129}
130
131impl From<KadPeer> for proto::Peer {
132    fn from(peer: KadPeer) -> Self {
133        proto::Peer {
134            id: peer.node_id.to_bytes(),
135            addrs: peer.multiaddrs.into_iter().map(|a| a.to_vec()).collect(),
136            connection: peer.connection_ty.into(),
137        }
138    }
139}
140
141/// Configuration for a Kademlia connection upgrade. When applied to a connection, turns this
142/// connection into a `Stream + Sink` whose items are of type `KadRequestMsg` and `KadResponseMsg`.
143// TODO: if, as suspected, we can confirm with Protocol Labs that each open Kademlia substream does
144//       only one request, then we can change the output of the `InboundUpgrade` and
145//       `OutboundUpgrade` to be just a single message
146#[derive(Debug, Clone)]
147pub struct ProtocolConfig {
148    protocol_names: Vec<StreamProtocol>,
149    /// Maximum allowed size of a packet.
150    max_packet_size: usize,
151}
152
153impl ProtocolConfig {
154    /// Builds a new `ProtocolConfig` with the given protocol name.
155    pub fn new(protocol_name: StreamProtocol) -> Self {
156        ProtocolConfig {
157            protocol_names: vec![protocol_name],
158            max_packet_size: DEFAULT_MAX_PACKET_SIZE,
159        }
160    }
161
162    /// Returns the default configuration.
163    #[deprecated(note = "Use `ProtocolConfig::new` instead")]
164    #[allow(clippy::should_implement_trait)]
165    pub fn default() -> Self {
166        Default::default()
167    }
168
169    /// Returns the configured protocol name.
170    pub fn protocol_names(&self) -> &[StreamProtocol] {
171        &self.protocol_names
172    }
173
174    /// Modifies the protocol names used on the wire. Can be used to create incompatibilities
175    /// between networks on purpose.
176    #[deprecated(note = "Use `ProtocolConfig::new` instead")]
177    pub fn set_protocol_names(&mut self, names: Vec<StreamProtocol>) {
178        self.protocol_names = names;
179    }
180
181    /// Modifies the maximum allowed size of a single Kademlia packet.
182    pub fn set_max_packet_size(&mut self, size: usize) {
183        self.max_packet_size = size;
184    }
185}
186
187impl Default for ProtocolConfig {
188    /// Returns the default configuration.
189    ///
190    /// Deprecated: use `ProtocolConfig::new` instead.
191    fn default() -> Self {
192        ProtocolConfig {
193            protocol_names: iter::once(DEFAULT_PROTO_NAME).collect(),
194            max_packet_size: DEFAULT_MAX_PACKET_SIZE,
195        }
196    }
197}
198
199impl UpgradeInfo for ProtocolConfig {
200    type Info = StreamProtocol;
201    type InfoIter = std::vec::IntoIter<Self::Info>;
202
203    fn protocol_info(&self) -> Self::InfoIter {
204        self.protocol_names.clone().into_iter()
205    }
206}
207
208/// Codec for Kademlia inbound and outbound message framing.
209pub struct Codec<A, B> {
210    codec: quick_protobuf_codec::Codec<proto::Message>,
211    __phantom: PhantomData<(A, B)>,
212}
213impl<A, B> Codec<A, B> {
214    fn new(max_packet_size: usize) -> Self {
215        Codec {
216            codec: quick_protobuf_codec::Codec::new(max_packet_size),
217            __phantom: PhantomData,
218        }
219    }
220}
221
222impl<A: Into<proto::Message>, B> Encoder for Codec<A, B> {
223    type Error = io::Error;
224    type Item<'a> = A;
225
226    fn encode(&mut self, item: Self::Item<'_>, dst: &mut BytesMut) -> Result<(), Self::Error> {
227        Ok(self.codec.encode(item.into(), dst)?)
228    }
229}
230impl<A, B: TryFrom<proto::Message, Error = io::Error>> Decoder for Codec<A, B> {
231    type Error = io::Error;
232    type Item = B;
233
234    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
235        self.codec.decode(src)?.map(B::try_from).transpose()
236    }
237}
238
239/// Sink of responses and stream of requests.
240pub(crate) type KadInStreamSink<S> = Framed<S, Codec<KadResponseMsg, KadRequestMsg>>;
241/// Sink of requests and stream of responses.
242pub(crate) type KadOutStreamSink<S> = Framed<S, Codec<KadRequestMsg, KadResponseMsg>>;
243
244impl<C> InboundUpgrade<C> for ProtocolConfig
245where
246    C: AsyncRead + AsyncWrite + Unpin,
247{
248    type Output = KadInStreamSink<C>;
249    type Future = future::Ready<Result<Self::Output, io::Error>>;
250    type Error = io::Error;
251
252    fn upgrade_inbound(self, incoming: C, _: Self::Info) -> Self::Future {
253        let codec = Codec::new(self.max_packet_size);
254
255        future::ok(Framed::new(incoming, codec))
256    }
257}
258
259impl<C> OutboundUpgrade<C> for ProtocolConfig
260where
261    C: AsyncRead + AsyncWrite + Unpin,
262{
263    type Output = KadOutStreamSink<C>;
264    type Future = future::Ready<Result<Self::Output, io::Error>>;
265    type Error = io::Error;
266
267    fn upgrade_outbound(self, incoming: C, _: Self::Info) -> Self::Future {
268        let codec = Codec::new(self.max_packet_size);
269
270        future::ok(Framed::new(incoming, codec))
271    }
272}
273
274/// Request that we can send to a peer or that we received from a peer.
275#[derive(Debug, Clone, PartialEq, Eq)]
276pub enum KadRequestMsg {
277    /// Ping request.
278    Ping,
279
280    /// Request for the list of nodes whose IDs are the closest to `key`. The number of nodes
281    /// returned is not specified, but should be around 20.
282    FindNode {
283        /// The key for which to locate the closest nodes.
284        key: Vec<u8>,
285    },
286
287    /// Same as `FindNode`, but should also return the entries of the local providers list for
288    /// this key.
289    GetProviders {
290        /// Identifier being searched.
291        key: record::Key,
292    },
293
294    /// Indicates that this list of providers is known for this key.
295    AddProvider {
296        /// Key for which we should add providers.
297        key: record::Key,
298        /// Known provider for this key.
299        provider: KadPeer,
300    },
301
302    /// Request to get a value from the dht records.
303    GetValue {
304        /// The key we are searching for.
305        key: record::Key,
306    },
307
308    /// Request to put a value into the dht records.
309    PutValue { record: Record },
310}
311
312/// Response that we can send to a peer or that we received from a peer.
313#[derive(Debug, Clone, PartialEq, Eq)]
314pub enum KadResponseMsg {
315    /// Ping response.
316    Pong,
317
318    /// Response to a `FindNode`.
319    FindNode {
320        /// Results of the request.
321        closer_peers: Vec<KadPeer>,
322    },
323
324    /// Response to a `GetProviders`.
325    GetProviders {
326        /// Nodes closest to the key.
327        closer_peers: Vec<KadPeer>,
328        /// Known providers for this key.
329        provider_peers: Vec<KadPeer>,
330    },
331
332    /// Response to a `GetValue`.
333    GetValue {
334        /// Result that might have been found
335        record: Option<Record>,
336        /// Nodes closest to the key
337        closer_peers: Vec<KadPeer>,
338    },
339
340    /// Response to a `PutValue`.
341    PutValue {
342        /// The key of the record.
343        key: record::Key,
344        /// Value of the record.
345        value: Vec<u8>,
346    },
347}
348
349impl From<KadRequestMsg> for proto::Message {
350    fn from(kad_msg: KadRequestMsg) -> Self {
351        req_msg_to_proto(kad_msg)
352    }
353}
354impl From<KadResponseMsg> for proto::Message {
355    fn from(kad_msg: KadResponseMsg) -> Self {
356        resp_msg_to_proto(kad_msg)
357    }
358}
359impl TryFrom<proto::Message> for KadRequestMsg {
360    type Error = io::Error;
361
362    fn try_from(message: proto::Message) -> Result<Self, Self::Error> {
363        proto_to_req_msg(message)
364    }
365}
366impl TryFrom<proto::Message> for KadResponseMsg {
367    type Error = io::Error;
368
369    fn try_from(message: proto::Message) -> Result<Self, Self::Error> {
370        proto_to_resp_msg(message)
371    }
372}
373
374/// Converts a `KadRequestMsg` into the corresponding protobuf message for sending.
375fn req_msg_to_proto(kad_msg: KadRequestMsg) -> proto::Message {
376    match kad_msg {
377        KadRequestMsg::Ping => proto::Message {
378            type_pb: proto::MessageType::PING,
379            ..proto::Message::default()
380        },
381        KadRequestMsg::FindNode { key } => proto::Message {
382            type_pb: proto::MessageType::FIND_NODE,
383            key,
384            clusterLevelRaw: 10,
385            ..proto::Message::default()
386        },
387        KadRequestMsg::GetProviders { key } => proto::Message {
388            type_pb: proto::MessageType::GET_PROVIDERS,
389            key: key.to_vec(),
390            clusterLevelRaw: 10,
391            ..proto::Message::default()
392        },
393        KadRequestMsg::AddProvider { key, provider } => proto::Message {
394            type_pb: proto::MessageType::ADD_PROVIDER,
395            clusterLevelRaw: 10,
396            key: key.to_vec(),
397            providerPeers: vec![provider.into()],
398            ..proto::Message::default()
399        },
400        KadRequestMsg::GetValue { key } => proto::Message {
401            type_pb: proto::MessageType::GET_VALUE,
402            clusterLevelRaw: 10,
403            key: key.to_vec(),
404            ..proto::Message::default()
405        },
406        KadRequestMsg::PutValue { record } => proto::Message {
407            type_pb: proto::MessageType::PUT_VALUE,
408            key: record.key.to_vec(),
409            record: Some(record_to_proto(record)),
410            ..proto::Message::default()
411        },
412    }
413}
414
415/// Converts a `KadResponseMsg` into the corresponding protobuf message for sending.
416fn resp_msg_to_proto(kad_msg: KadResponseMsg) -> proto::Message {
417    match kad_msg {
418        KadResponseMsg::Pong => proto::Message {
419            type_pb: proto::MessageType::PING,
420            ..proto::Message::default()
421        },
422        KadResponseMsg::FindNode { closer_peers } => proto::Message {
423            type_pb: proto::MessageType::FIND_NODE,
424            clusterLevelRaw: 9,
425            closerPeers: closer_peers.into_iter().map(KadPeer::into).collect(),
426            ..proto::Message::default()
427        },
428        KadResponseMsg::GetProviders {
429            closer_peers,
430            provider_peers,
431        } => proto::Message {
432            type_pb: proto::MessageType::GET_PROVIDERS,
433            clusterLevelRaw: 9,
434            closerPeers: closer_peers.into_iter().map(KadPeer::into).collect(),
435            providerPeers: provider_peers.into_iter().map(KadPeer::into).collect(),
436            ..proto::Message::default()
437        },
438        KadResponseMsg::GetValue {
439            record,
440            closer_peers,
441        } => proto::Message {
442            type_pb: proto::MessageType::GET_VALUE,
443            clusterLevelRaw: 9,
444            closerPeers: closer_peers.into_iter().map(KadPeer::into).collect(),
445            record: record.map(record_to_proto),
446            ..proto::Message::default()
447        },
448        KadResponseMsg::PutValue { key, value } => proto::Message {
449            type_pb: proto::MessageType::PUT_VALUE,
450            key: key.to_vec(),
451            record: Some(proto::Record {
452                key: key.to_vec(),
453                value,
454                ..proto::Record::default()
455            }),
456            ..proto::Message::default()
457        },
458    }
459}
460
461/// Converts a received protobuf message into a corresponding `KadRequestMsg`.
462///
463/// Fails if the protobuf message is not a valid and supported Kademlia request message.
464fn proto_to_req_msg(message: proto::Message) -> Result<KadRequestMsg, io::Error> {
465    match message.type_pb {
466        proto::MessageType::PING => Ok(KadRequestMsg::Ping),
467        proto::MessageType::PUT_VALUE => {
468            let record = record_from_proto(message.record.unwrap_or_default())?;
469            Ok(KadRequestMsg::PutValue { record })
470        }
471        proto::MessageType::GET_VALUE => Ok(KadRequestMsg::GetValue {
472            key: record::Key::from(message.key),
473        }),
474        proto::MessageType::FIND_NODE => Ok(KadRequestMsg::FindNode { key: message.key }),
475        proto::MessageType::GET_PROVIDERS => Ok(KadRequestMsg::GetProviders {
476            key: record::Key::from(message.key),
477        }),
478        proto::MessageType::ADD_PROVIDER => {
479            // TODO: for now we don't parse the peer properly, so it is possible that we get
480            //       parsing errors for peers even when they are valid; we ignore these
481            //       errors for now, but ultimately we should just error altogether
482            let provider = message
483                .providerPeers
484                .into_iter()
485                .find_map(|peer| KadPeer::try_from(peer).ok());
486
487            if let Some(provider) = provider {
488                let key = record::Key::from(message.key);
489                Ok(KadRequestMsg::AddProvider { key, provider })
490            } else {
491                Err(invalid_data("AddProvider message with no valid peer."))
492            }
493        }
494    }
495}
496
497/// Converts a received protobuf message into a corresponding `KadResponseMessage`.
498///
499/// Fails if the protobuf message is not a valid and supported Kademlia response message.
500fn proto_to_resp_msg(message: proto::Message) -> Result<KadResponseMsg, io::Error> {
501    match message.type_pb {
502        proto::MessageType::PING => Ok(KadResponseMsg::Pong),
503        proto::MessageType::GET_VALUE => {
504            let record = if let Some(r) = message.record {
505                Some(record_from_proto(r)?)
506            } else {
507                None
508            };
509
510            let closer_peers = message
511                .closerPeers
512                .into_iter()
513                .filter_map(|peer| KadPeer::try_from(peer).ok())
514                .collect();
515
516            Ok(KadResponseMsg::GetValue {
517                record,
518                closer_peers,
519            })
520        }
521
522        proto::MessageType::FIND_NODE => {
523            let closer_peers = message
524                .closerPeers
525                .into_iter()
526                .filter_map(|peer| KadPeer::try_from(peer).ok())
527                .collect();
528
529            Ok(KadResponseMsg::FindNode { closer_peers })
530        }
531
532        proto::MessageType::GET_PROVIDERS => {
533            let closer_peers = message
534                .closerPeers
535                .into_iter()
536                .filter_map(|peer| KadPeer::try_from(peer).ok())
537                .collect();
538
539            let provider_peers = message
540                .providerPeers
541                .into_iter()
542                .filter_map(|peer| KadPeer::try_from(peer).ok())
543                .collect();
544
545            Ok(KadResponseMsg::GetProviders {
546                closer_peers,
547                provider_peers,
548            })
549        }
550
551        proto::MessageType::PUT_VALUE => {
552            let key = record::Key::from(message.key);
553            let rec = message
554                .record
555                .ok_or_else(|| invalid_data("received PutValue message with no record"))?;
556
557            Ok(KadResponseMsg::PutValue {
558                key,
559                value: rec.value,
560            })
561        }
562
563        proto::MessageType::ADD_PROVIDER => {
564            Err(invalid_data("received an unexpected AddProvider message"))
565        }
566    }
567}
568
569fn record_from_proto(record: proto::Record) -> Result<Record, io::Error> {
570    let key = record::Key::from(record.key);
571    let value = record.value;
572
573    let publisher = if !record.publisher.is_empty() {
574        PeerId::from_bytes(&record.publisher)
575            .map(Some)
576            .map_err(|_| invalid_data("Invalid publisher peer ID."))?
577    } else {
578        None
579    };
580
581    let expires = if record.ttl > 0 {
582        Some(Instant::now() + Duration::from_secs(record.ttl as u64))
583    } else {
584        None
585    };
586
587    Ok(Record {
588        key,
589        value,
590        publisher,
591        expires,
592    })
593}
594
595fn record_to_proto(record: Record) -> proto::Record {
596    proto::Record {
597        key: record.key.to_vec(),
598        value: record.value,
599        publisher: record.publisher.map(|id| id.to_bytes()).unwrap_or_default(),
600        ttl: record
601            .expires
602            .map(|t| {
603                let now = Instant::now();
604                if t > now {
605                    (t - now).as_secs() as u32
606                } else {
607                    1 // because 0 means "does not expire"
608                }
609            })
610            .unwrap_or(0),
611        timeReceived: String::new(),
612    }
613}
614
615/// Creates an `io::Error` with `io::ErrorKind::InvalidData`.
616fn invalid_data<E>(e: E) -> io::Error
617where
618    E: Into<Box<dyn std::error::Error + Send + Sync>>,
619{
620    io::Error::new(io::ErrorKind::InvalidData, e)
621}
622
623#[cfg(test)]
624mod tests {
625    use super::*;
626
627    #[test]
628    fn append_p2p() {
629        let peer_id = PeerId::random();
630        let multiaddr = "/ip6/2001:db8::/tcp/1234".parse::<Multiaddr>().unwrap();
631
632        let payload = proto::Peer {
633            id: peer_id.to_bytes(),
634            addrs: vec![multiaddr.to_vec()],
635            connection: proto::ConnectionType::CAN_CONNECT,
636        };
637
638        let peer = KadPeer::try_from(payload).unwrap();
639
640        assert_eq!(peer.multiaddrs, vec![multiaddr.with_p2p(peer_id).unwrap()])
641    }
642
643    #[test]
644    fn skip_invalid_multiaddr() {
645        let peer_id = PeerId::random();
646        let multiaddr = "/ip6/2001:db8::/tcp/1234".parse::<Multiaddr>().unwrap();
647
648        let valid_multiaddr = multiaddr.clone().with_p2p(peer_id).unwrap();
649
650        let multiaddr_with_incorrect_peer_id = {
651            let other_peer_id = PeerId::random();
652            assert_ne!(peer_id, other_peer_id);
653            multiaddr.with_p2p(other_peer_id).unwrap()
654        };
655
656        let invalid_multiaddr = {
657            let a = vec![255; 8];
658            assert!(Multiaddr::try_from(a.clone()).is_err());
659            a
660        };
661
662        let payload = proto::Peer {
663            id: peer_id.to_bytes(),
664            addrs: vec![
665                valid_multiaddr.to_vec(),
666                multiaddr_with_incorrect_peer_id.to_vec(),
667                invalid_multiaddr,
668            ],
669            connection: proto::ConnectionType::CAN_CONNECT,
670        };
671
672        let peer = KadPeer::try_from(payload).unwrap();
673
674        assert_eq!(peer.multiaddrs, vec![valid_multiaddr])
675    }
676
677    // // TODO: restore
678    // use self::libp2p_tcp::TcpTransport;
679    // use self::tokio::runtime::current_thread::Runtime;
680    // use futures::{Future, Sink, Stream};
681    // use libp2p_core::{PeerId, PublicKey, Transport};
682    // use multihash::{encode, Hash};
683    // use protocol::{ConnectionType, KadPeer, ProtocolConfig};
684    // use std::sync::mpsc;
685    // use std::thread;
686    //
687    // #[test]
688    // fn correct_transfer() {
689    // We open a server and a client, send a message between the two, and check that they were
690    // successfully received.
691    //
692    // test_one(KadMsg::Ping);
693    // test_one(KadMsg::FindNodeReq {
694    // key: PeerId::random(),
695    // });
696    // test_one(KadMsg::FindNodeRes {
697    // closer_peers: vec![KadPeer {
698    // node_id: PeerId::random(),
699    // multiaddrs: vec!["/ip4/100.101.102.103/tcp/20105".parse().unwrap()],
700    // connection_ty: ConnectionType::Connected,
701    // }],
702    // });
703    // test_one(KadMsg::GetProvidersReq {
704    // key: encode(Hash::SHA2256, &[9, 12, 0, 245, 245, 201, 28, 95]).unwrap(),
705    // });
706    // test_one(KadMsg::GetProvidersRes {
707    // closer_peers: vec![KadPeer {
708    // node_id: PeerId::random(),
709    // multiaddrs: vec!["/ip4/100.101.102.103/tcp/20105".parse().unwrap()],
710    // connection_ty: ConnectionType::Connected,
711    // }],
712    // provider_peers: vec![KadPeer {
713    // node_id: PeerId::random(),
714    // multiaddrs: vec!["/ip4/200.201.202.203/tcp/1999".parse().unwrap()],
715    // connection_ty: ConnectionType::NotConnected,
716    // }],
717    // });
718    // test_one(KadMsg::AddProvider {
719    // key: encode(Hash::SHA2256, &[9, 12, 0, 245, 245, 201, 28, 95]).unwrap(),
720    // provider_peer: KadPeer {
721    // node_id: PeerId::random(),
722    // multiaddrs: vec!["/ip4/9.1.2.3/udp/23".parse().unwrap()],
723    // connection_ty: ConnectionType::Connected,
724    // },
725    // });
726    // TODO: all messages
727    //
728    // fn test_one(msg_server: KadMsg) {
729    // let msg_client = msg_server.clone();
730    // let (tx, rx) = mpsc::channel();
731    //
732    // let bg_thread = thread::spawn(move || {
733    // let transport = TcpTransport::default().with_upgrade(ProtocolConfig);
734    //
735    // let (listener, addr) = transport
736    // .listen_on( "/ip4/127.0.0.1/tcp/0".parse().unwrap())
737    // .unwrap();
738    // tx.send(addr).unwrap();
739    //
740    // let future = listener
741    // .into_future()
742    // .map_err(|(err, _)| err)
743    // .and_then(|(client, _)| client.unwrap().0)
744    // .and_then(|proto| proto.into_future().map_err(|(err, _)| err).map(|(v, _)| v))
745    // .map(|recv_msg| {
746    // assert_eq!(recv_msg.unwrap(), msg_server);
747    // ()
748    // });
749    // let mut rt = Runtime::new().unwrap();
750    // let _ = rt.block_on(future).unwrap();
751    // });
752    //
753    // let transport = TcpTransport::default().with_upgrade(ProtocolConfig);
754    //
755    // let future = transport
756    // .dial(rx.recv().unwrap())
757    // .unwrap()
758    // .and_then(|proto| proto.send(msg_client))
759    // .map(|_| ());
760    // let mut rt = Runtime::new().unwrap();
761    // let _ = rt.block_on(future).unwrap();
762    // bg_thread.join().unwrap();
763    // }
764    // }
765}