libp2p_kad/
protocol.rs

1// Copyright 2018 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21//! The Kademlia connection protocol upgrade and associated message types.
22//!
23//! The connection protocol upgrade is provided by [`KademliaProtocolConfig`], with the
24//! request and response types [`KadRequestMsg`] and [`KadResponseMsg`], respectively.
25//! The upgrade's output is a `Sink + Stream` of messages. The `Stream` component is used
26//! to poll the underlying transport for incoming messages, and the `Sink` component
27//! is used to send messages to remote peers.
28
29use bytes::BytesMut;
30use codec::UviBytes;
31use crate::dht_proto as proto;
32use crate::record::{self, Record};
33use futures::prelude::*;
34use asynchronous_codec::Framed;
35use libp2p_core::{Multiaddr, PeerId};
36use libp2p_core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo};
37use prost::Message;
38use std::{borrow::Cow, convert::TryFrom, time::Duration};
39use std::{io, iter};
40use unsigned_varint::codec;
41use wasm_timer::Instant;
42
43use derivative::Derivative;
44use libp2p_core::identity::PublicKey;
45use trust_graph::{Certificate, Trust};
46
47/// The protocol name used for negotiating with multistream-select.
48pub const DEFAULT_PROTO_NAME: &[u8] = b"/ipfs/kad/1.0.0";
49
50/// The default maximum size for a varint length-delimited packet.
51pub const DEFAULT_MAX_PACKET_SIZE: usize = 16 * 1024;
52
53/// Status of our connection to a node reported by the Kademlia protocol.
54#[derive(Copy, Clone, PartialEq, Eq, Debug, Hash)]
55pub enum KadConnectionType {
56    /// Sender hasn't tried to connect to peer.
57    NotConnected = 0,
58    /// Sender is currently connected to peer.
59    Connected = 1,
60    /// Sender was recently connected to peer.
61    CanConnect = 2,
62    /// Sender tried to connect to peer but failed.
63    CannotConnect = 3,
64}
65
66impl From<proto::message::ConnectionType> for KadConnectionType {
67    fn from(raw: proto::message::ConnectionType) -> KadConnectionType {
68        use proto::message::ConnectionType::*;
69        match raw {
70            NotConnected => KadConnectionType::NotConnected,
71            Connected => KadConnectionType::Connected,
72            CanConnect => KadConnectionType::CanConnect,
73            CannotConnect => KadConnectionType::CannotConnect,
74        }
75    }
76}
77
78impl Into<proto::message::ConnectionType> for KadConnectionType {
79    fn into(self) -> proto::message::ConnectionType {
80        use proto::message::ConnectionType::*;
81        match self {
82            KadConnectionType::NotConnected => NotConnected,
83            KadConnectionType::Connected => Connected,
84            KadConnectionType::CanConnect => CanConnect,
85            KadConnectionType::CannotConnect => CannotConnect,
86        }
87    }
88}
89
90/// Information about a peer, as known by the sender.
91#[derive(Derivative)]
92#[derivative(Debug, Clone, PartialEq, Eq)]
93pub struct KadPeer {
94    #[derivative(Debug="ignore")]
95    pub public_key: PublicKey,
96    /// Identifier of the peer.
97    pub node_id: PeerId,
98    /// The multiaddresses that the sender think can be used in order to reach the peer.
99    pub multiaddrs: Vec<Multiaddr>,
100    /// How the sender is connected to that remote.
101    pub connection_ty: KadConnectionType,
102    pub certificates: Vec<Certificate>,
103}
104
105// Builds a `KadPeer` from a corresponding protobuf message.
106impl TryFrom<proto::message::Peer> for KadPeer {
107    type Error = io::Error;
108
109    fn try_from(peer: proto::message::Peer) -> Result<KadPeer, Self::Error> {
110        // TODO: this is in fact a CID; not sure if this should be handled in `from_bytes` or
111        //       as a special case here
112        let node_id = PeerId::from_bytes(&peer.id)
113            .map_err(|_| invalid_data("invalid peer id"))?;
114
115        let mut addrs = Vec::with_capacity(peer.addrs.len());
116        for addr in peer.addrs.into_iter() {
117            let as_ma = Multiaddr::try_from(addr).map_err(invalid_data)?;
118            addrs.push(as_ma);
119        }
120        debug_assert_eq!(addrs.len(), addrs.capacity());
121
122        let connection_ty = proto::message::ConnectionType::from_i32(peer.connection)
123            .ok_or_else(|| invalid_data("unknown connection type"))?
124            .into();
125
126        let public_key = PublicKey::from_protobuf_encoding(peer.public_key.as_slice())
127            .map_err(|e|
128                invalid_data(format!("invalid public key: {}", e).as_str())
129            )?;
130
131
132        let mut certificates = Vec::with_capacity(peer.certificates.len());
133        for cert in peer.certificates.into_iter() {
134            let mut chain = Vec::with_capacity(cert.chain.len());
135            for trust in cert.chain.into_iter() {
136                let issued_for = fluence_identity::PublicKey::decode(&trust.issued_for)
137                    .map_err(|e|
138                        invalid_data(format!("invalid issued_for: {}", e).as_str())
139                    )?;
140                let expires_at: Duration = Duration::from_secs(trust.expires_at_secs);
141                let issued_at: Duration = Duration::from_secs(trust.issued_at_secs);
142                let signature = fluence_identity::Signature::decode(trust.signature)
143                    .map_err(|e|
144                        invalid_data(format!("invalid signature: {}", e).as_str())
145                    )?;
146
147                let trust = Trust::new(issued_for, expires_at, issued_at, signature);
148                chain.push(trust);
149            }
150            certificates.push(Certificate::new_unverified(chain));
151        }
152
153        Ok(KadPeer {
154            public_key,
155            node_id,
156            multiaddrs: addrs,
157            connection_ty,
158            certificates
159        })
160    }
161}
162
163impl Into<proto::message::Peer> for KadPeer {
164    fn into(self) -> proto::message::Peer {
165        let certificates = self.certificates.into_iter().map(|cert|
166            proto::Certificate {
167                chain: cert.chain.into_iter().map(|trust| {
168                    proto::Trust {
169                        issued_for: trust.issued_for.encode(),
170                        expires_at_secs: trust.expires_at.as_secs(),
171                        signature: trust.signature.encode(),
172                        issued_at_secs: trust.issued_at.as_secs(),
173                    }
174                }).collect(),
175            }
176        ).collect();
177
178        proto::message::Peer {
179            id: self.node_id.to_bytes(),
180            addrs: self.multiaddrs.into_iter().map(|a| a.to_vec()).collect(),
181            connection: {
182                let ct: proto::message::ConnectionType = self.connection_ty.into();
183                ct as i32
184            },
185            public_key: self.public_key.into_protobuf_encoding(),
186            certificates
187        }
188    }
189}
190
191/// Configuration for a Kademlia connection upgrade. When applied to a connection, turns this
192/// connection into a `Stream + Sink` whose items are of type `KadRequestMsg` and `KadResponseMsg`.
193// TODO: if, as suspected, we can confirm with Protocol Labs that each open Kademlia substream does
194//       only one request, then we can change the output of the `InboundUpgrade` and
195//       `OutboundUpgrade` to be just a single message
196#[derive(Debug, Clone)]
197pub struct KademliaProtocolConfig {
198    protocol_name: Cow<'static, [u8]>,
199    /// Maximum allowed size of a packet.
200    max_packet_size: usize,
201}
202
203impl KademliaProtocolConfig {
204    /// Returns the configured protocol name.
205    pub fn protocol_name(&self) -> &[u8] {
206        &self.protocol_name
207    }
208
209    /// Modifies the protocol name used on the wire. Can be used to create incompatibilities
210    /// between networks on purpose.
211    pub fn set_protocol_name(&mut self, name: impl Into<Cow<'static, [u8]>>) {
212        self.protocol_name = name.into();
213    }
214
215    /// Modifies the maximum allowed size of a single Kademlia packet.
216    pub fn set_max_packet_size(&mut self, size: usize) {
217        self.max_packet_size = size;
218    }
219}
220
221impl Default for KademliaProtocolConfig {
222    fn default() -> Self {
223        KademliaProtocolConfig {
224            protocol_name: Cow::Borrowed(DEFAULT_PROTO_NAME),
225            max_packet_size: DEFAULT_MAX_PACKET_SIZE,
226        }
227    }
228}
229
230impl UpgradeInfo for KademliaProtocolConfig {
231    type Info = Cow<'static, [u8]>;
232    type InfoIter = iter::Once<Self::Info>;
233
234    fn protocol_info(&self) -> Self::InfoIter {
235        iter::once(self.protocol_name.clone())
236    }
237}
238
239impl<C> InboundUpgrade<C> for KademliaProtocolConfig
240where
241    C: AsyncRead + AsyncWrite + Unpin,
242{
243    type Output = KadInStreamSink<C>;
244    type Future = future::Ready<Result<Self::Output, io::Error>>;
245    type Error = io::Error;
246
247    fn upgrade_inbound(self, incoming: C, _: Self::Info) -> Self::Future {
248        let mut codec = UviBytes::default();
249        codec.set_max_len(self.max_packet_size);
250
251        future::ok(
252            Framed::new(incoming, codec)
253                .err_into()
254                .with::<_, _, fn(_) -> _, _>(|response| {
255                    let proto_struct = resp_msg_to_proto(response);
256                    let mut buf = Vec::with_capacity(proto_struct.encoded_len());
257                    proto_struct.encode(&mut buf).expect("Vec<u8> provides capacity as needed");
258                    future::ready(Ok(io::Cursor::new(buf)))
259                })
260                .and_then::<_, fn(_) -> _>(|bytes| {
261                    let request = match proto::Message::decode(bytes) {
262                        Ok(r) => r,
263                        Err(err) => return future::ready(Err(err.into()))
264                    };
265                    future::ready(proto_to_req_msg(request))
266                }),
267        )
268    }
269}
270
271impl<C> OutboundUpgrade<C> for KademliaProtocolConfig
272where
273    C: AsyncRead + AsyncWrite + Unpin,
274{
275    type Output = KadOutStreamSink<C>;
276    type Future = future::Ready<Result<Self::Output, io::Error>>;
277    type Error = io::Error;
278
279    fn upgrade_outbound(self, incoming: C, _: Self::Info) -> Self::Future {
280        let mut codec = UviBytes::default();
281        codec.set_max_len(self.max_packet_size);
282
283        future::ok(
284            Framed::new(incoming, codec)
285                .err_into()
286                .with::<_, _, fn(_) -> _, _>(|request| {
287                    let proto_struct = req_msg_to_proto(request);
288                    let mut buf = Vec::with_capacity(proto_struct.encoded_len());
289                    proto_struct.encode(&mut buf).expect("Vec<u8> provides capacity as needed");
290                    future::ready(Ok(io::Cursor::new(buf)))
291                })
292                .and_then::<_, fn(_) -> _>(|bytes| {
293                    let response = match proto::Message::decode(bytes) {
294                        Ok(r) => r,
295                        Err(err) => return future::ready(Err(err.into()))
296                    };
297                    future::ready(proto_to_resp_msg(response))
298                }),
299        )
300    }
301}
302
303/// Sink of responses and stream of requests.
304pub type KadInStreamSink<S> = KadStreamSink<S, KadResponseMsg, KadRequestMsg>;
305
306/// Sink of requests and stream of responses.
307pub type KadOutStreamSink<S> = KadStreamSink<S, KadRequestMsg, KadResponseMsg>;
308
309pub type KadStreamSink<S, A, B> = stream::AndThen<
310    sink::With<
311        stream::ErrInto<Framed<S, UviBytes<io::Cursor<Vec<u8>>>>, io::Error>,
312        io::Cursor<Vec<u8>>,
313        A,
314        future::Ready<Result<io::Cursor<Vec<u8>>, io::Error>>,
315        fn(A) -> future::Ready<Result<io::Cursor<Vec<u8>>, io::Error>>,
316    >,
317    future::Ready<Result<B, io::Error>>,
318    fn(BytesMut) -> future::Ready<Result<B, io::Error>>,
319>;
320
321/// Request that we can send to a peer or that we received from a peer.
322#[derive(Debug, Clone, PartialEq, Eq)]
323pub enum KadRequestMsg {
324    /// Ping request.
325    Ping,
326
327    /// Request for the list of nodes whose IDs are the closest to `key`. The number of nodes
328    /// returned is not specified, but should be around 20.
329    FindNode {
330        /// The key for which to locate the closest nodes.
331        key: Vec<u8>,
332    },
333
334    /// Same as `FindNode`, but should also return the entries of the local providers list for
335    /// this key.
336    GetProviders {
337        /// Identifier being searched.
338        key: record::Key,
339    },
340
341    /// Indicates that this list of providers is known for this key.
342    AddProvider {
343        /// Key for which we should add providers.
344        key: record::Key,
345        /// Known provider for this key.
346        provider: KadPeer,
347    },
348
349    /// Request to get a value from the dht records.
350    GetValue {
351        /// The key we are searching for.
352        key: record::Key,
353    },
354
355    /// Request to put a value into the dht records.
356    PutValue {
357        record: Record,
358    }
359}
360
361/// Response that we can send to a peer or that we received from a peer.
362#[derive(Debug, Clone, PartialEq, Eq)]
363pub enum KadResponseMsg {
364    /// Ping response.
365    Pong,
366
367    /// Response to a `FindNode`.
368    FindNode {
369        /// Results of the request.
370        closer_peers: Vec<KadPeer>,
371    },
372
373    /// Response to a `GetProviders`.
374    GetProviders {
375        /// Nodes closest to the key.
376        closer_peers: Vec<KadPeer>,
377        /// Known providers for this key.
378        provider_peers: Vec<KadPeer>,
379    },
380
381    /// Response to a `GetValue`.
382    GetValue {
383        /// Result that might have been found
384        record: Option<Record>,
385        /// Nodes closest to the key
386        closer_peers: Vec<KadPeer>,
387    },
388
389    /// Response to a `PutValue`.
390    PutValue {
391        /// The key of the record.
392        key: record::Key,
393        /// Value of the record.
394        value: Vec<u8>,
395    },
396}
397
398/// Converts a `KadRequestMsg` into the corresponding protobuf message for sending.
399fn req_msg_to_proto(kad_msg: KadRequestMsg) -> proto::Message {
400    match kad_msg {
401        KadRequestMsg::Ping => proto::Message {
402            r#type: proto::message::MessageType::Ping as i32,
403            .. proto::Message::default()
404        },
405        KadRequestMsg::FindNode { key } => proto::Message {
406            r#type: proto::message::MessageType::FindNode as i32,
407            key,
408            cluster_level_raw: 10,
409            .. proto::Message::default()
410        },
411        KadRequestMsg::GetProviders { key } => proto::Message {
412            r#type: proto::message::MessageType::GetProviders as i32,
413            key: key.to_vec(),
414            cluster_level_raw: 10,
415            .. proto::Message::default()
416        },
417        KadRequestMsg::AddProvider { key, provider } => proto::Message {
418            r#type: proto::message::MessageType::AddProvider as i32,
419            cluster_level_raw: 10,
420            key: key.to_vec(),
421            provider_peers: vec![provider.into()],
422            .. proto::Message::default()
423        },
424        KadRequestMsg::GetValue { key } => proto::Message {
425            r#type: proto::message::MessageType::GetValue as i32,
426            cluster_level_raw: 10,
427            key: key.to_vec(),
428            .. proto::Message::default()
429        },
430        KadRequestMsg::PutValue { record } => proto::Message {
431            r#type: proto::message::MessageType::PutValue as i32,
432            record: Some(record_to_proto(record)),
433            .. proto::Message::default()
434        }
435    }
436}
437
438/// Converts a `KadResponseMsg` into the corresponding protobuf message for sending.
439fn resp_msg_to_proto(kad_msg: KadResponseMsg) -> proto::Message {
440    match kad_msg {
441        KadResponseMsg::Pong => proto::Message {
442            r#type: proto::message::MessageType::Ping as i32,
443            .. proto::Message::default()
444        },
445        KadResponseMsg::FindNode { closer_peers } => proto::Message {
446            r#type: proto::message::MessageType::FindNode as i32,
447            cluster_level_raw: 9,
448            closer_peers: closer_peers.into_iter().map(KadPeer::into).collect(),
449            .. proto::Message::default()
450        },
451        KadResponseMsg::GetProviders { closer_peers, provider_peers } => proto::Message {
452            r#type: proto::message::MessageType::GetProviders as i32,
453            cluster_level_raw: 9,
454            closer_peers: closer_peers.into_iter().map(KadPeer::into).collect(),
455            provider_peers: provider_peers.into_iter().map(KadPeer::into).collect(),
456            .. proto::Message::default()
457        },
458        KadResponseMsg::GetValue { record, closer_peers } => proto::Message {
459            r#type: proto::message::MessageType::GetValue as i32,
460            cluster_level_raw: 9,
461            closer_peers: closer_peers.into_iter().map(KadPeer::into).collect(),
462            record: record.map(record_to_proto),
463            .. proto::Message::default()
464        },
465        KadResponseMsg::PutValue { key, value } => proto::Message {
466            r#type: proto::message::MessageType::PutValue as i32,
467            key: key.to_vec(),
468            record: Some(proto::Record {
469                key: key.to_vec(),
470                value,
471                .. proto::Record::default()
472            }),
473            .. proto::Message::default()
474        }
475    }
476}
477
478/// Converts a received protobuf message into a corresponding `KadRequestMsg`.
479///
480/// Fails if the protobuf message is not a valid and supported Kademlia request message.
481fn proto_to_req_msg(message: proto::Message) -> Result<KadRequestMsg, io::Error> {
482    let msg_type = proto::message::MessageType::from_i32(message.r#type)
483        .ok_or_else(|| invalid_data(format!("unknown message type: {}", message.r#type)))?;
484
485    match msg_type {
486        proto::message::MessageType::Ping => Ok(KadRequestMsg::Ping),
487        proto::message::MessageType::PutValue => {
488            let record = record_from_proto(message.record.unwrap_or_default())?;
489            Ok(KadRequestMsg::PutValue { record })
490        }
491        proto::message::MessageType::GetValue => {
492            Ok(KadRequestMsg::GetValue { key: record::Key::from(message.key) })
493        }
494        proto::message::MessageType::FindNode => {
495            Ok(KadRequestMsg::FindNode { key: message.key })
496        }
497        proto::message::MessageType::GetProviders => {
498            Ok(KadRequestMsg::GetProviders { key: record::Key::from(message.key)})
499        }
500        proto::message::MessageType::AddProvider => {
501            // TODO: for now we don't parse the peer properly, so it is possible that we get
502            //       parsing errors for peers even when they are valid; we ignore these
503            //       errors for now, but ultimately we should just error altogether
504            let provider = message.provider_peers
505                .into_iter()
506                .find_map(|peer| KadPeer::try_from(peer).ok());
507
508            if let Some(provider) = provider {
509                let key = record::Key::from(message.key);
510                Ok(KadRequestMsg::AddProvider { key, provider })
511            } else {
512                Err(invalid_data("AddProvider message with no valid peer."))
513            }
514        }
515    }
516}
517
518/// Converts a received protobuf message into a corresponding `KadResponseMessage`.
519///
520/// Fails if the protobuf message is not a valid and supported Kademlia response message.
521fn proto_to_resp_msg(message: proto::Message) -> Result<KadResponseMsg, io::Error> {
522    let msg_type = proto::message::MessageType::from_i32(message.r#type)
523        .ok_or_else(|| invalid_data(format!("unknown message type: {}", message.r#type)))?;
524
525    match msg_type {
526        proto::message::MessageType::Ping => Ok(KadResponseMsg::Pong),
527        proto::message::MessageType::GetValue => {
528            let record =
529                if let Some(r) = message.record {
530                    Some(record_from_proto(r)?)
531                } else {
532                    None
533                };
534
535            let closer_peers = message.closer_peers.into_iter()
536                .filter_map(|peer| KadPeer::try_from(peer).ok())
537                .collect();
538
539            Ok(KadResponseMsg::GetValue { record, closer_peers })
540        }
541
542        proto::message::MessageType::FindNode => {
543            let closer_peers = message.closer_peers.into_iter()
544                .filter_map(|peer| KadPeer::try_from(peer).ok())
545                .collect();
546
547            Ok(KadResponseMsg::FindNode { closer_peers })
548        }
549
550        proto::message::MessageType::GetProviders => {
551            let closer_peers = message.closer_peers.into_iter()
552                .filter_map(|peer| KadPeer::try_from(peer).ok())
553                .collect();
554
555            let provider_peers = message.provider_peers.into_iter()
556                .filter_map(|peer| KadPeer::try_from(peer).ok())
557                .collect();
558
559            Ok(KadResponseMsg::GetProviders {
560                closer_peers,
561                provider_peers,
562            })
563        }
564
565        proto::message::MessageType::PutValue => {
566            let key = record::Key::from(message.key);
567            let rec = message.record.ok_or_else(|| {
568                invalid_data("received PutValue message with no record")
569            })?;
570
571            Ok(KadResponseMsg::PutValue {
572                key,
573                value: rec.value
574            })
575        }
576
577        proto::message::MessageType::AddProvider =>
578            Err(invalid_data("received an unexpected AddProvider message"))
579    }
580}
581
582pub fn record_from_proto(record: proto::Record) -> Result<Record, io::Error> {
583    let key = record::Key::from(record.key);
584    let value = record.value;
585
586    let publisher =
587        if !record.publisher.is_empty() {
588            PeerId::from_bytes(&record.publisher)
589                .map(Some)
590                .map_err(|_| invalid_data("Invalid publisher peer ID."))?
591        } else {
592            None
593        };
594
595    let expires =
596        if record.ttl > 0 {
597            Some(Instant::now() + Duration::from_secs(record.ttl as u64))
598        } else {
599            None
600        };
601
602    Ok(Record { key, value, publisher, expires })
603}
604
605pub fn record_to_proto(record: Record) -> proto::Record {
606    proto::Record {
607        key: record.key.to_vec(),
608        value: record.value,
609        publisher: record.publisher.map(|id| id.to_bytes()).unwrap_or_default(),
610        ttl: record.expires
611            .map(|t| {
612                let now = Instant::now();
613                if t > now {
614                    (t - now).as_secs() as u32
615                } else {
616                    1 // because 0 means "does not expire"
617                }
618            })
619            .unwrap_or(0),
620        time_received: String::new()
621    }
622}
623
624/// Creates an `io::Error` with `io::ErrorKind::InvalidData`.
625fn invalid_data<E>(e: E) -> io::Error
626where
627    E: Into<Box<dyn std::error::Error + Send + Sync>>
628{
629    io::Error::new(io::ErrorKind::InvalidData, e)
630}
631
632#[cfg(test)]
633mod tests {
634
635    /*// TODO: restore
636    use self::libp2p_tcp::TcpConfig;
637    use self::tokio::runtime::current_thread::Runtime;
638    use futures::{Future, Sink, Stream};
639    use libp2p_core::{PeerId, PublicKey, Transport};
640    use multihash::{encode, Hash};
641    use protocol::{KadConnectionType, KadPeer, KademliaProtocolConfig};
642    use std::sync::mpsc;
643    use std::thread;
644
645    #[test]
646    fn correct_transfer() {
647        // We open a server and a client, send a message between the two, and check that they were
648        // successfully received.
649
650        test_one(KadMsg::Ping);
651        test_one(KadMsg::FindNodeReq {
652            key: PeerId::random(),
653        });
654        test_one(KadMsg::FindNodeRes {
655            closer_peers: vec![KadPeer {
656                node_id: PeerId::random(),
657                multiaddrs: vec!["/ip4/100.101.102.103/tcp/20105".parse().unwrap()],
658                connection_ty: KadConnectionType::Connected,
659            }],
660        });
661        test_one(KadMsg::GetProvidersReq {
662            key: encode(Hash::SHA2256, &[9, 12, 0, 245, 245, 201, 28, 95]).unwrap(),
663        });
664        test_one(KadMsg::GetProvidersRes {
665            closer_peers: vec![KadPeer {
666                node_id: PeerId::random(),
667                multiaddrs: vec!["/ip4/100.101.102.103/tcp/20105".parse().unwrap()],
668                connection_ty: KadConnectionType::Connected,
669            }],
670            provider_peers: vec![KadPeer {
671                node_id: PeerId::random(),
672                multiaddrs: vec!["/ip4/200.201.202.203/tcp/1999".parse().unwrap()],
673                connection_ty: KadConnectionType::NotConnected,
674            }],
675        });
676        test_one(KadMsg::AddProvider {
677            key: encode(Hash::SHA2256, &[9, 12, 0, 245, 245, 201, 28, 95]).unwrap(),
678            provider_peer: KadPeer {
679                node_id: PeerId::random(),
680                multiaddrs: vec!["/ip4/9.1.2.3/udp/23".parse().unwrap()],
681                connection_ty: KadConnectionType::Connected,
682            },
683        });
684        // TODO: all messages
685
686        fn test_one(msg_server: KadMsg) {
687            let msg_client = msg_server.clone();
688            let (tx, rx) = mpsc::channel();
689
690            let bg_thread = thread::spawn(move || {
691                let transport = TcpConfig::new().with_upgrade(KademliaProtocolConfig);
692
693                let (listener, addr) = transport
694                    .listen_on("/ip4/127.0.0.1/tcp/0".parse().unwrap())
695                    .unwrap();
696                tx.send(addr).unwrap();
697
698                let future = listener
699                    .into_future()
700                    .map_err(|(err, _)| err)
701                    .and_then(|(client, _)| client.unwrap().0)
702                    .and_then(|proto| proto.into_future().map_err(|(err, _)| err).map(|(v, _)| v))
703                    .map(|recv_msg| {
704                        assert_eq!(recv_msg.unwrap(), msg_server);
705                        ()
706                    });
707                let mut rt = Runtime::new().unwrap();
708                let _ = rt.block_on(future).unwrap();
709            });
710
711            let transport = TcpConfig::new().with_upgrade(KademliaProtocolConfig);
712
713            let future = transport
714                .dial(rx.recv().unwrap())
715                .unwrap()
716                .and_then(|proto| proto.send(msg_client))
717                .map(|_| ());
718            let mut rt = Runtime::new().unwrap();
719            let _ = rt.block_on(future).unwrap();
720            bg_thread.join().unwrap();
721        }
722    }*/
723}