tet_libp2p_kad/
protocol.rs

1// Copyright 2018 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21//! The Kademlia connection protocol upgrade and associated message types.
22//!
23//! The connection protocol upgrade is provided by [`KademliaProtocolConfig`], with the
24//! request and response types [`KadRequestMsg`] and [`KadResponseMsg`], respectively.
25//! The upgrade's output is a `Sink + Stream` of messages. The `Stream` component is used
26//! to poll the underlying transport for incoming messages, and the `Sink` component
27//! is used to send messages to remote peers.
28
29use bytes::BytesMut;
30use codec::UviBytes;
31use crate::dht_proto as proto;
32use crate::record::{self, Record};
33use futures::prelude::*;
34use asynchronous_codec::Framed;
35use tet_libp2p_core::{Multiaddr, PeerId};
36use tet_libp2p_core::upgrade::{InboundUpgrade, OutboundUpgrade, UpgradeInfo};
37use prost::Message;
38use std::{borrow::Cow, convert::TryFrom, time::Duration};
39use std::{io, iter};
40use unsigned_varint::codec;
41use wasm_timer::Instant;
42
/// Default protocol name offered when negotiating a Kademlia substream with
/// tet-multistream-select.
pub const DEFAULT_PROTO_NAME: &[u8] = b"/ipfs/kad/1.0.0";
45
/// Default upper bound on the size of a single varint length-delimited packet.
pub const DEFAULT_MAX_PACKET_SIZE: usize = 16 * 1024;
48
/// Connection status of a node, as reported by the sender of a Kademlia message.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub enum KadConnectionType {
    /// The sender has not attempted to connect to the peer.
    NotConnected = 0,
    /// The sender currently has a connection to the peer.
    Connected = 1,
    /// The sender was connected to the peer recently.
    CanConnect = 2,
    /// The sender attempted to connect to the peer and failed.
    CannotConnect = 3,
}
61
62impl From<proto::message::ConnectionType> for KadConnectionType {
63    fn from(raw: proto::message::ConnectionType) -> KadConnectionType {
64        use proto::message::ConnectionType::*;
65        match raw {
66            NotConnected => KadConnectionType::NotConnected,
67            Connected => KadConnectionType::Connected,
68            CanConnect => KadConnectionType::CanConnect,
69            CannotConnect => KadConnectionType::CannotConnect,
70        }
71    }
72}
73
74impl Into<proto::message::ConnectionType> for KadConnectionType {
75    fn into(self) -> proto::message::ConnectionType {
76        use proto::message::ConnectionType::*;
77        match self {
78            KadConnectionType::NotConnected => NotConnected,
79            KadConnectionType::Connected => Connected,
80            KadConnectionType::CanConnect => CanConnect,
81            KadConnectionType::CannotConnect => CannotConnect,
82        }
83    }
84}
85
86/// Information about a peer, as known by the sender.
87#[derive(Debug, Clone, PartialEq, Eq)]
88pub struct KadPeer {
89    /// Identifier of the peer.
90    pub node_id: PeerId,
91    /// The multiaddresses that the sender think can be used in order to reach the peer.
92    pub multiaddrs: Vec<Multiaddr>,
93    /// How the sender is connected to that remote.
94    pub connection_ty: KadConnectionType,
95}
96
97// Builds a `KadPeer` from a corresponding protobuf message.
98impl TryFrom<proto::message::Peer> for KadPeer {
99    type Error = io::Error;
100
101    fn try_from(peer: proto::message::Peer) -> Result<KadPeer, Self::Error> {
102        // TODO: this is in fact a CID; not sure if this should be handled in `from_bytes` or
103        //       as a special case here
104        let node_id = PeerId::from_bytes(&peer.id)
105            .map_err(|_| invalid_data("invalid peer id"))?;
106
107        let mut addrs = Vec::with_capacity(peer.addrs.len());
108        for addr in peer.addrs.into_iter() {
109            let as_ma = Multiaddr::try_from(addr).map_err(invalid_data)?;
110            addrs.push(as_ma);
111        }
112        debug_assert_eq!(addrs.len(), addrs.capacity());
113
114        let connection_ty = proto::message::ConnectionType::from_i32(peer.connection)
115            .ok_or_else(|| invalid_data("unknown connection type"))?
116            .into();
117
118        Ok(KadPeer {
119            node_id,
120            multiaddrs: addrs,
121            connection_ty
122        })
123    }
124}
125
126impl Into<proto::message::Peer> for KadPeer {
127    fn into(self) -> proto::message::Peer {
128        proto::message::Peer {
129            id: self.node_id.to_bytes(),
130            addrs: self.multiaddrs.into_iter().map(|a| a.to_vec()).collect(),
131            connection: {
132                let ct: proto::message::ConnectionType = self.connection_ty.into();
133                ct as i32
134            }
135        }
136    }
137}
138
/// Configuration of the Kademlia connection upgrade. Applying it to a connection yields a
/// `Stream + Sink` whose items are of type `KadRequestMsg` and `KadResponseMsg`.
// TODO: if, as suspected, we can confirm with Protocol Labs that each open Kademlia substream does
//       only one request, then we can change the output of the `InboundUpgrade` and
//       `OutboundUpgrade` to be just a single message
#[derive(Clone, Debug)]
pub struct KademliaProtocolConfig {
    /// Protocol name announced during protocol negotiation.
    protocol_name: Cow<'static, [u8]>,
    /// Maximum allowed size of a packet.
    max_packet_size: usize,
}
150
151impl KademliaProtocolConfig {
152    /// Returns the configured protocol name.
153    pub fn protocol_name(&self) -> &[u8] {
154        &self.protocol_name
155    }
156
157    /// Modifies the protocol name used on the wire. Can be used to create incompatibilities
158    /// between networks on purpose.
159    pub fn set_protocol_name(&mut self, name: impl Into<Cow<'static, [u8]>>) {
160        self.protocol_name = name.into();
161    }
162
163    /// Modifies the maximum allowed size of a single Kademlia packet.
164    pub fn set_max_packet_size(&mut self, size: usize) {
165        self.max_packet_size = size;
166    }
167}
168
169impl Default for KademliaProtocolConfig {
170    fn default() -> Self {
171        KademliaProtocolConfig {
172            protocol_name: Cow::Borrowed(DEFAULT_PROTO_NAME),
173            max_packet_size: DEFAULT_MAX_PACKET_SIZE,
174        }
175    }
176}
177
178impl UpgradeInfo for KademliaProtocolConfig {
179    type Info = Cow<'static, [u8]>;
180    type InfoIter = iter::Once<Self::Info>;
181
182    fn protocol_info(&self) -> Self::InfoIter {
183        iter::once(self.protocol_name.clone())
184    }
185}
186
187impl<C> InboundUpgrade<C> for KademliaProtocolConfig
188where
189    C: AsyncRead + AsyncWrite + Unpin,
190{
191    type Output = KadInStreamSink<C>;
192    type Future = future::Ready<Result<Self::Output, io::Error>>;
193    type Error = io::Error;
194
195    fn upgrade_inbound(self, incoming: C, _: Self::Info) -> Self::Future {
196        let mut codec = UviBytes::default();
197        codec.set_max_len(self.max_packet_size);
198
199        future::ok(
200            Framed::new(incoming, codec)
201                .err_into()
202                .with::<_, _, fn(_) -> _, _>(|response| {
203                    let proto_struct = resp_msg_to_proto(response);
204                    let mut buf = Vec::with_capacity(proto_struct.encoded_len());
205                    proto_struct.encode(&mut buf).expect("Vec<u8> provides capacity as needed");
206                    future::ready(Ok(io::Cursor::new(buf)))
207                })
208                .and_then::<_, fn(_) -> _>(|bytes| {
209                    let request = match proto::Message::decode(bytes) {
210                        Ok(r) => r,
211                        Err(err) => return future::ready(Err(err.into()))
212                    };
213                    future::ready(proto_to_req_msg(request))
214                }),
215        )
216    }
217}
218
219impl<C> OutboundUpgrade<C> for KademliaProtocolConfig
220where
221    C: AsyncRead + AsyncWrite + Unpin,
222{
223    type Output = KadOutStreamSink<C>;
224    type Future = future::Ready<Result<Self::Output, io::Error>>;
225    type Error = io::Error;
226
227    fn upgrade_outbound(self, incoming: C, _: Self::Info) -> Self::Future {
228        let mut codec = UviBytes::default();
229        codec.set_max_len(self.max_packet_size);
230
231        future::ok(
232            Framed::new(incoming, codec)
233                .err_into()
234                .with::<_, _, fn(_) -> _, _>(|request| {
235                    let proto_struct = req_msg_to_proto(request);
236                    let mut buf = Vec::with_capacity(proto_struct.encoded_len());
237                    proto_struct.encode(&mut buf).expect("Vec<u8> provides capacity as needed");
238                    future::ready(Ok(io::Cursor::new(buf)))
239                })
240                .and_then::<_, fn(_) -> _>(|bytes| {
241                    let response = match proto::Message::decode(bytes) {
242                        Ok(r) => r,
243                        Err(err) => return future::ready(Err(err.into()))
244                    };
245                    future::ready(proto_to_resp_msg(response))
246                }),
247        )
248    }
249}
250
251/// Sink of responses and stream of requests.
252pub type KadInStreamSink<S> = KadStreamSink<S, KadResponseMsg, KadRequestMsg>;
253
254/// Sink of requests and stream of responses.
255pub type KadOutStreamSink<S> = KadStreamSink<S, KadRequestMsg, KadResponseMsg>;
256
257pub type KadStreamSink<S, A, B> = stream::AndThen<
258    sink::With<
259        stream::ErrInto<Framed<S, UviBytes<io::Cursor<Vec<u8>>>>, io::Error>,
260        io::Cursor<Vec<u8>>,
261        A,
262        future::Ready<Result<io::Cursor<Vec<u8>>, io::Error>>,
263        fn(A) -> future::Ready<Result<io::Cursor<Vec<u8>>, io::Error>>,
264    >,
265    future::Ready<Result<B, io::Error>>,
266    fn(BytesMut) -> future::Ready<Result<B, io::Error>>,
267>;
268
269/// Request that we can send to a peer or that we received from a peer.
270#[derive(Debug, Clone, PartialEq, Eq)]
271pub enum KadRequestMsg {
272    /// Ping request.
273    Ping,
274
275    /// Request for the list of nodes whose IDs are the closest to `key`. The number of nodes
276    /// returned is not specified, but should be around 20.
277    FindNode {
278        /// The key for which to locate the closest nodes.
279        key: Vec<u8>,
280    },
281
282    /// Same as `FindNode`, but should also return the entries of the local providers list for
283    /// this key.
284    GetProviders {
285        /// Identifier being searched.
286        key: record::Key,
287    },
288
289    /// Indicates that this list of providers is known for this key.
290    AddProvider {
291        /// Key for which we should add providers.
292        key: record::Key,
293        /// Known provider for this key.
294        provider: KadPeer,
295    },
296
297    /// Request to get a value from the dht records.
298    GetValue {
299        /// The key we are searching for.
300        key: record::Key,
301    },
302
303    /// Request to put a value into the dht records.
304    PutValue {
305        record: Record,
306    }
307}
308
309/// Response that we can send to a peer or that we received from a peer.
310#[derive(Debug, Clone, PartialEq, Eq)]
311pub enum KadResponseMsg {
312    /// Ping response.
313    Pong,
314
315    /// Response to a `FindNode`.
316    FindNode {
317        /// Results of the request.
318        closer_peers: Vec<KadPeer>,
319    },
320
321    /// Response to a `GetProviders`.
322    GetProviders {
323        /// Nodes closest to the key.
324        closer_peers: Vec<KadPeer>,
325        /// Known providers for this key.
326        provider_peers: Vec<KadPeer>,
327    },
328
329    /// Response to a `GetValue`.
330    GetValue {
331        /// Result that might have been found
332        record: Option<Record>,
333        /// Nodes closest to the key
334        closer_peers: Vec<KadPeer>,
335    },
336
337    /// Response to a `PutValue`.
338    PutValue {
339        /// The key of the record.
340        key: record::Key,
341        /// Value of the record.
342        value: Vec<u8>,
343    },
344}
345
346/// Converts a `KadRequestMsg` into the corresponding protobuf message for sending.
347fn req_msg_to_proto(kad_msg: KadRequestMsg) -> proto::Message {
348    match kad_msg {
349        KadRequestMsg::Ping => proto::Message {
350            r#type: proto::message::MessageType::Ping as i32,
351            .. proto::Message::default()
352        },
353        KadRequestMsg::FindNode { key } => proto::Message {
354            r#type: proto::message::MessageType::FindNode as i32,
355            key,
356            cluster_level_raw: 10,
357            .. proto::Message::default()
358        },
359        KadRequestMsg::GetProviders { key } => proto::Message {
360            r#type: proto::message::MessageType::GetProviders as i32,
361            key: key.to_vec(),
362            cluster_level_raw: 10,
363            .. proto::Message::default()
364        },
365        KadRequestMsg::AddProvider { key, provider } => proto::Message {
366            r#type: proto::message::MessageType::AddProvider as i32,
367            cluster_level_raw: 10,
368            key: key.to_vec(),
369            provider_peers: vec![provider.into()],
370            .. proto::Message::default()
371        },
372        KadRequestMsg::GetValue { key } => proto::Message {
373            r#type: proto::message::MessageType::GetValue as i32,
374            cluster_level_raw: 10,
375            key: key.to_vec(),
376            .. proto::Message::default()
377        },
378        KadRequestMsg::PutValue { record } => proto::Message {
379            r#type: proto::message::MessageType::PutValue as i32,
380            record: Some(record_to_proto(record)),
381            .. proto::Message::default()
382        }
383    }
384}
385
386/// Converts a `KadResponseMsg` into the corresponding protobuf message for sending.
387fn resp_msg_to_proto(kad_msg: KadResponseMsg) -> proto::Message {
388    match kad_msg {
389        KadResponseMsg::Pong => proto::Message {
390            r#type: proto::message::MessageType::Ping as i32,
391            .. proto::Message::default()
392        },
393        KadResponseMsg::FindNode { closer_peers } => proto::Message {
394            r#type: proto::message::MessageType::FindNode as i32,
395            cluster_level_raw: 9,
396            closer_peers: closer_peers.into_iter().map(KadPeer::into).collect(),
397            .. proto::Message::default()
398        },
399        KadResponseMsg::GetProviders { closer_peers, provider_peers } => proto::Message {
400            r#type: proto::message::MessageType::GetProviders as i32,
401            cluster_level_raw: 9,
402            closer_peers: closer_peers.into_iter().map(KadPeer::into).collect(),
403            provider_peers: provider_peers.into_iter().map(KadPeer::into).collect(),
404            .. proto::Message::default()
405        },
406        KadResponseMsg::GetValue { record, closer_peers } => proto::Message {
407            r#type: proto::message::MessageType::GetValue as i32,
408            cluster_level_raw: 9,
409            closer_peers: closer_peers.into_iter().map(KadPeer::into).collect(),
410            record: record.map(record_to_proto),
411            .. proto::Message::default()
412        },
413        KadResponseMsg::PutValue { key, value } => proto::Message {
414            r#type: proto::message::MessageType::PutValue as i32,
415            key: key.to_vec(),
416            record: Some(proto::Record {
417                key: key.to_vec(),
418                value,
419                .. proto::Record::default()
420            }),
421            .. proto::Message::default()
422        }
423    }
424}
425
426/// Converts a received protobuf message into a corresponding `KadRequestMsg`.
427///
428/// Fails if the protobuf message is not a valid and supported Kademlia request message.
429fn proto_to_req_msg(message: proto::Message) -> Result<KadRequestMsg, io::Error> {
430    let msg_type = proto::message::MessageType::from_i32(message.r#type)
431        .ok_or_else(|| invalid_data(format!("unknown message type: {}", message.r#type)))?;
432
433    match msg_type {
434        proto::message::MessageType::Ping => Ok(KadRequestMsg::Ping),
435        proto::message::MessageType::PutValue => {
436            let record = record_from_proto(message.record.unwrap_or_default())?;
437            Ok(KadRequestMsg::PutValue { record })
438        }
439        proto::message::MessageType::GetValue => {
440            Ok(KadRequestMsg::GetValue { key: record::Key::from(message.key) })
441        }
442        proto::message::MessageType::FindNode => {
443            Ok(KadRequestMsg::FindNode { key: message.key })
444        }
445        proto::message::MessageType::GetProviders => {
446            Ok(KadRequestMsg::GetProviders { key: record::Key::from(message.key)})
447        }
448        proto::message::MessageType::AddProvider => {
449            // TODO: for now we don't parse the peer properly, so it is possible that we get
450            //       parsing errors for peers even when they are valid; we ignore these
451            //       errors for now, but ultimately we should just error altogether
452            let provider = message.provider_peers
453                .into_iter()
454                .find_map(|peer| KadPeer::try_from(peer).ok());
455
456            if let Some(provider) = provider {
457                let key = record::Key::from(message.key);
458                Ok(KadRequestMsg::AddProvider { key, provider })
459            } else {
460                Err(invalid_data("AddProvider message with no valid peer."))
461            }
462        }
463    }
464}
465
466/// Converts a received protobuf message into a corresponding `KadResponseMessage`.
467///
468/// Fails if the protobuf message is not a valid and supported Kademlia response message.
469fn proto_to_resp_msg(message: proto::Message) -> Result<KadResponseMsg, io::Error> {
470    let msg_type = proto::message::MessageType::from_i32(message.r#type)
471        .ok_or_else(|| invalid_data(format!("unknown message type: {}", message.r#type)))?;
472
473    match msg_type {
474        proto::message::MessageType::Ping => Ok(KadResponseMsg::Pong),
475        proto::message::MessageType::GetValue => {
476            let record =
477                if let Some(r) = message.record {
478                    Some(record_from_proto(r)?)
479                } else {
480                    None
481                };
482
483            let closer_peers = message.closer_peers.into_iter()
484                .filter_map(|peer| KadPeer::try_from(peer).ok())
485                .collect();
486
487            Ok(KadResponseMsg::GetValue { record, closer_peers })
488        }
489
490        proto::message::MessageType::FindNode => {
491            let closer_peers = message.closer_peers.into_iter()
492                .filter_map(|peer| KadPeer::try_from(peer).ok())
493                .collect();
494
495            Ok(KadResponseMsg::FindNode { closer_peers })
496        }
497
498        proto::message::MessageType::GetProviders => {
499            let closer_peers = message.closer_peers.into_iter()
500                .filter_map(|peer| KadPeer::try_from(peer).ok())
501                .collect();
502
503            let provider_peers = message.provider_peers.into_iter()
504                .filter_map(|peer| KadPeer::try_from(peer).ok())
505                .collect();
506
507            Ok(KadResponseMsg::GetProviders {
508                closer_peers,
509                provider_peers,
510            })
511        }
512
513        proto::message::MessageType::PutValue => {
514            let key = record::Key::from(message.key);
515            let rec = message.record.ok_or_else(|| {
516                invalid_data("received PutValue message with no record")
517            })?;
518
519            Ok(KadResponseMsg::PutValue {
520                key,
521                value: rec.value
522            })
523        }
524
525        proto::message::MessageType::AddProvider =>
526            Err(invalid_data("received an unexpected AddProvider message"))
527    }
528}
529
530fn record_from_proto(record: proto::Record) -> Result<Record, io::Error> {
531    let key = record::Key::from(record.key);
532    let value = record.value;
533
534    let publisher =
535        if !record.publisher.is_empty() {
536            PeerId::from_bytes(&record.publisher)
537                .map(Some)
538                .map_err(|_| invalid_data("Invalid publisher peer ID."))?
539        } else {
540            None
541        };
542
543    let expires =
544        if record.ttl > 0 {
545            Some(Instant::now() + Duration::from_secs(record.ttl as u64))
546        } else {
547            None
548        };
549
550    Ok(Record { key, value, publisher, expires })
551}
552
553fn record_to_proto(record: Record) -> proto::Record {
554    proto::Record {
555        key: record.key.to_vec(),
556        value: record.value,
557        publisher: record.publisher.map(|id| id.to_bytes()).unwrap_or_default(),
558        ttl: record.expires
559            .map(|t| {
560                let now = Instant::now();
561                if t > now {
562                    (t - now).as_secs() as u32
563                } else {
564                    1 // because 0 means "does not expire"
565                }
566            })
567            .unwrap_or(0),
568        time_received: String::new()
569    }
570}
571
/// Wraps `e` in an `io::Error` of kind `io::ErrorKind::InvalidData`.
fn invalid_data<E>(e: E) -> io::Error
where
    E: Into<Box<dyn std::error::Error + Send + Sync>>,
{
    let source: Box<dyn std::error::Error + Send + Sync> = e.into();
    io::Error::new(io::ErrorKind::InvalidData, source)
}
579
#[cfg(test)]
mod tests {
    use super::*;

    // TODO: restore an end-to-end test that opens a TCP listener and a dialer, both
    //       upgraded with `KademliaProtocolConfig`, sends each kind of message from one
    //       side and checks that it is received unchanged on the other. The previous
    //       version of that test was written against a removed `KadMsg` type and had
    //       been commented out; the tests below cover the protobuf conversions only.

    /// Encodes a request to protobuf and decodes it again, expecting the same request.
    fn assert_request_roundtrip(msg: KadRequestMsg) {
        let decoded = proto_to_req_msg(req_msg_to_proto(msg.clone()))
            .expect("a request we encoded ourselves must decode");
        assert_eq!(decoded, msg);
    }

    /// Encodes a response to protobuf and decodes it again, expecting the same response.
    fn assert_response_roundtrip(msg: KadResponseMsg) {
        let decoded = proto_to_resp_msg(resp_msg_to_proto(msg.clone()))
            .expect("a response we encoded ourselves must decode");
        assert_eq!(decoded, msg);
    }

    #[test]
    fn requests_roundtrip_through_protobuf() {
        assert_request_roundtrip(KadRequestMsg::Ping);
        assert_request_roundtrip(KadRequestMsg::FindNode { key: vec![1, 2, 3] });
        assert_request_roundtrip(KadRequestMsg::GetProviders {
            key: record::Key::from(vec![9, 12, 0, 245, 245, 201, 28, 95]),
        });
        assert_request_roundtrip(KadRequestMsg::GetValue {
            key: record::Key::from(vec![4, 5, 6]),
        });
    }

    #[test]
    fn responses_roundtrip_through_protobuf() {
        assert_response_roundtrip(KadResponseMsg::Pong);
        assert_response_roundtrip(KadResponseMsg::FindNode { closer_peers: Vec::new() });
        assert_response_roundtrip(KadResponseMsg::GetProviders {
            closer_peers: Vec::new(),
            provider_peers: Vec::new(),
        });
        assert_response_roundtrip(KadResponseMsg::GetValue {
            record: None,
            closer_peers: Vec::new(),
        });
        assert_response_roundtrip(KadResponseMsg::PutValue {
            key: record::Key::from(vec![7, 8, 9]),
            value: vec![10, 11, 12],
        });
    }

    #[test]
    fn add_provider_is_rejected_as_response() {
        let message = proto::Message {
            r#type: proto::message::MessageType::AddProvider as i32,
            ..proto::Message::default()
        };
        let err = proto_to_resp_msg(message).unwrap_err();
        assert_eq!(err.kind(), io::ErrorKind::InvalidData);
    }

    #[test]
    fn add_provider_request_without_peers_is_rejected() {
        let message = proto::Message {
            r#type: proto::message::MessageType::AddProvider as i32,
            key: vec![1, 2, 3],
            ..proto::Message::default()
        };
        let err = proto_to_req_msg(message).unwrap_err();
        assert_eq!(err.kind(), io::ErrorKind::InvalidData);
    }
}