tet_libp2p_kad/handler.rs

// Copyright 2018 Parity Technologies (UK) Ltd.
//
// Permission is hereby granted, free of charge, to any person obtaining a
// copy of this software and associated documentation files (the "Software"),
// to deal in the Software without restriction, including without limitation
// the rights to use, copy, modify, merge, publish, distribute, sublicense,
// and/or sell copies of the Software, and to permit persons to whom the
// Software is furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
// DEALINGS IN THE SOFTWARE.

use crate::protocol::{
    KadInStreamSink, KadOutStreamSink, KadPeer, KadRequestMsg, KadResponseMsg,
    KademliaProtocolConfig,
};
use crate::record::{self, Record};
use futures::prelude::*;
use tet_libp2p_swarm::{
    IntoProtocolsHandler,
    KeepAlive,
    NegotiatedSubstream,
    SubstreamProtocol,
    ProtocolsHandler,
    ProtocolsHandlerEvent,
    ProtocolsHandlerUpgrErr
};
use tet_libp2p_core::{
    ConnectedPoint,
    PeerId,
    either::EitherOutput,
    upgrade::{self, InboundUpgrade, OutboundUpgrade}
};
use log::trace;
use std::{error, fmt, io, marker::PhantomData, pin::Pin, task::Context, task::Poll, time::Duration};
use wasm_timer::Instant;

/// A prototype from which [`KademliaHandler`]s can be constructed.
pub struct KademliaHandlerProto<T> {
    config: KademliaHandlerConfig,
    _type: PhantomData<T>,
}

impl<T> KademliaHandlerProto<T> {
    pub fn new(config: KademliaHandlerConfig) -> Self {
        KademliaHandlerProto { config, _type: PhantomData }
    }
}
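
// A minimal construction sketch (illustrative, not part of the original
// source); the type parameter `T` is the user data carried alongside
// queries, with `()` used here as a placeholder:
//
//     let proto = KademliaHandlerProto::<()>::new(KademliaHandlerConfig::default());
//
// The swarm turns this prototype into a per-connection handler through
// `IntoProtocolsHandler::into_handler` below.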

impl<T: Clone + Send + 'static> IntoProtocolsHandler for KademliaHandlerProto<T> {
    type Handler = KademliaHandler<T>;

    fn into_handler(self, _: &PeerId, endpoint: &ConnectedPoint) -> Self::Handler {
        KademliaHandler::new(self.config, endpoint.clone())
    }

    fn inbound_protocol(&self) -> <Self::Handler as ProtocolsHandler>::InboundProtocol {
        if self.config.allow_listening {
            upgrade::EitherUpgrade::A(self.config.protocol_config.clone())
        } else {
            upgrade::EitherUpgrade::B(upgrade::DeniedUpgrade)
        }
    }
}

/// Protocol handler that manages substreams for the Kademlia protocol
/// on a single connection with a peer.
///
/// The handler will automatically open a Kademlia substream with the remote for each request we
/// make.
///
/// It also handles requests made by the remote.
pub struct KademliaHandler<TUserData> {
    /// Configuration for the Kademlia protocol.
    config: KademliaHandlerConfig,

    /// Next unique ID of a connection.
    next_connec_unique_id: UniqueConnecId,

    /// List of active substreams with the state they are in.
    substreams: Vec<SubstreamState<TUserData>>,

    /// Until when to keep the connection alive.
    keep_alive: KeepAlive,

    /// The connected endpoint of the connection that the handler
    /// is associated with.
    endpoint: ConnectedPoint,

    /// The current state of protocol confirmation.
    protocol_status: ProtocolStatus,
}

/// The states of protocol confirmation that a connection
/// handler transitions through.
enum ProtocolStatus {
    /// It is as yet unknown whether the remote supports the
    /// configured protocol name.
    Unconfirmed,
    /// The configured protocol name has been confirmed by the remote
    /// but has not yet been reported to the `Kademlia` behaviour.
    Confirmed,
    /// The configured protocol has been confirmed by the remote
    /// and the confirmation reported to the `Kademlia` behaviour.
    Reported,
}
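
// Added commentary: the transitions all happen within this file.
// `Unconfirmed -> Confirmed` is performed in `inject_fully_negotiated_inbound`
// and `inject_fully_negotiated_outbound`, once a substream successfully
// negotiates the configured protocol name, and `Confirmed -> Reported`
// happens in `poll`, which emits `KademliaHandlerEvent::ProtocolConfirmed`
// only once per connection.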

/// Configuration of a [`KademliaHandler`].
#[derive(Debug, Clone)]
pub struct KademliaHandlerConfig {
    /// Configuration of the wire protocol.
    pub protocol_config: KademliaProtocolConfig,

    /// If false, we deny incoming requests.
    pub allow_listening: bool,

    /// Time after which we close an idle connection.
    pub idle_timeout: Duration,
}
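
// A configuration sketch (illustrative, not from the original source): start
// from the `Default` implementation at the bottom of this file and override
// individual fields with struct-update syntax.
//
//     let config = KademliaHandlerConfig {
//         idle_timeout: Duration::from_secs(30),
//         ..Default::default()
//     };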

/// State of an active substream, opened either by us or by the remote.
enum SubstreamState<TUserData> {
    /// We haven't started opening the outgoing substream yet.
    /// Contains the request we want to send, and the user data if we expect an answer.
    OutPendingOpen(KadRequestMsg, Option<TUserData>),
    /// Waiting to send a message to the remote.
    OutPendingSend(
        KadOutStreamSink<NegotiatedSubstream>,
        KadRequestMsg,
        Option<TUserData>,
    ),
    /// Waiting to flush the substream so that the data arrives at the remote.
    OutPendingFlush(KadOutStreamSink<NegotiatedSubstream>, Option<TUserData>),
    /// Waiting for an answer back from the remote.
    // TODO: add timeout
    OutWaitingAnswer(KadOutStreamSink<NegotiatedSubstream>, TUserData),
    /// An error happened on the substream and we should report the error to the user.
    OutReportError(KademliaHandlerQueryErr, TUserData),
    /// The substream is being closed.
    OutClosing(KadOutStreamSink<NegotiatedSubstream>),
    /// Waiting for a request from the remote.
    InWaitingMessage(UniqueConnecId, KadInStreamSink<NegotiatedSubstream>),
    /// Waiting for the user to send a `KademliaHandlerIn` event containing the response.
    InWaitingUser(UniqueConnecId, KadInStreamSink<NegotiatedSubstream>),
    /// Waiting to send an answer back to the remote.
    InPendingSend(UniqueConnecId, KadInStreamSink<NegotiatedSubstream>, KadResponseMsg),
    /// Waiting to flush an answer back to the remote.
    InPendingFlush(UniqueConnecId, KadInStreamSink<NegotiatedSubstream>),
    /// The substream is being closed.
    InClosing(KadInStreamSink<NegotiatedSubstream>),
}
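
// Added commentary: the variants above form two linear state machines, one
// per substream direction, driven by `advance_substream` below.
//
// Outbound (queries we initiate):
//
//     OutPendingOpen -> OutPendingSend -> OutPendingFlush
//         -> OutWaitingAnswer -> OutClosing   (when an answer is expected)
//         -> OutClosing                       (fire-and-forget, e.g. AddProvider)
//
//     OutReportError is entered separately, from a failed outbound upgrade.
//
// Inbound (requests from the remote):
//
//     InWaitingMessage -> InWaitingUser -> InPendingSend -> InPendingFlush
//         -> back to InWaitingMessage (ready for the next request),
//     with InClosing entered when an incoming message cannot be processed.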

impl<TUserData> SubstreamState<TUserData> {
    /// Tries to close the substream.
    ///
    /// If the substream is not ready to be closed, returns `Poll::Pending`.
    fn try_close(&mut self, cx: &mut Context<'_>) -> Poll<()> {
        match self {
            SubstreamState::OutPendingOpen(_, _)
            | SubstreamState::OutReportError(_, _) => Poll::Ready(()),
            SubstreamState::OutPendingSend(ref mut stream, _, _)
            | SubstreamState::OutPendingFlush(ref mut stream, _)
            | SubstreamState::OutWaitingAnswer(ref mut stream, _)
            | SubstreamState::OutClosing(ref mut stream) => match Sink::poll_close(Pin::new(stream), cx) {
                Poll::Ready(_) => Poll::Ready(()),
                Poll::Pending => Poll::Pending,
            },
            SubstreamState::InWaitingMessage(_, ref mut stream)
            | SubstreamState::InWaitingUser(_, ref mut stream)
            | SubstreamState::InPendingSend(_, ref mut stream, _)
            | SubstreamState::InPendingFlush(_, ref mut stream)
            | SubstreamState::InClosing(ref mut stream) => match Sink::poll_close(Pin::new(stream), cx) {
                Poll::Ready(_) => Poll::Ready(()),
                Poll::Pending => Poll::Pending,
            },
        }
    }
}

/// Event produced by the Kademlia handler.
#[derive(Debug)]
pub enum KademliaHandlerEvent<TUserData> {
    /// The configured protocol name has been confirmed by the peer through
    /// a successfully negotiated substream.
    ///
    /// This event is only emitted once by a handler upon the first
    /// successfully negotiated inbound or outbound substream and
    /// indicates that the connected peer participates in the Kademlia
    /// overlay network identified by the configured protocol name.
    ProtocolConfirmed { endpoint: ConnectedPoint },

    /// Request for the list of nodes whose IDs are the closest to `key`. The number of nodes
    /// returned is not specified, but should be around 20.
    FindNodeReq {
        /// The key for which to locate the closest nodes.
        key: Vec<u8>,
        /// Identifier of the request. Needs to be passed back when answering.
        request_id: KademliaRequestId,
    },

    /// Response to a `KademliaHandlerIn::FindNodeReq`.
    FindNodeRes {
        /// Results of the request.
        closer_peers: Vec<KadPeer>,
        /// The user data passed to the `FindNodeReq`.
        user_data: TUserData,
    },

    /// Same as `FindNodeReq`, but should also return the entries of the local providers list for
    /// this key.
    GetProvidersReq {
        /// The key for which providers are requested.
        key: record::Key,
        /// Identifier of the request. Needs to be passed back when answering.
        request_id: KademliaRequestId,
    },

    /// Response to a `KademliaHandlerIn::GetProvidersReq`.
    GetProvidersRes {
        /// Nodes closest to the key.
        closer_peers: Vec<KadPeer>,
        /// Known providers for this key.
        provider_peers: Vec<KadPeer>,
        /// The user data passed to the `GetProvidersReq`.
        user_data: TUserData,
    },

    /// An error happened when performing a query.
    QueryError {
        /// The error that happened.
        error: KademliaHandlerQueryErr,
        /// The user data passed to the query.
        user_data: TUserData,
    },

    /// The peer announced itself as a provider of a key.
    AddProvider {
        /// The key for which the peer is a provider of the associated value.
        key: record::Key,
        /// The peer that is the provider of the value for `key`.
        provider: KadPeer,
    },

    /// Request to get a value from the DHT records.
    GetRecord {
        /// Key for which we should look in the DHT.
        key: record::Key,
        /// Identifier of the request. Needs to be passed back when answering.
        request_id: KademliaRequestId,
    },

    /// Response to a `KademliaHandlerIn::GetRecord`.
    GetRecordRes {
        /// The result is present if the key has been found.
        record: Option<Record>,
        /// Nodes closest to the key.
        closer_peers: Vec<KadPeer>,
        /// The user data passed to the `GetRecord`.
        user_data: TUserData,
    },

    /// Request to put a value in the DHT records.
    PutRecord {
        record: Record,
        /// Identifier of the request. Needs to be passed back when answering.
        request_id: KademliaRequestId,
    },

    /// Response to a request to store a record.
    PutRecordRes {
        /// The key of the stored record.
        key: record::Key,
        /// The value of the stored record.
        value: Vec<u8>,
        /// The user data passed to the `PutRecord`.
        user_data: TUserData,
    }
}

/// Error that can happen when requesting an RPC query.
#[derive(Debug)]
pub enum KademliaHandlerQueryErr {
    /// Error while trying to perform the query.
    Upgrade(ProtocolsHandlerUpgrErr<io::Error>),
    /// Received an answer that doesn't correspond to the request.
    UnexpectedMessage,
    /// I/O error in the substream.
    Io(io::Error),
}

impl fmt::Display for KademliaHandlerQueryErr {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            KademliaHandlerQueryErr::Upgrade(err) => {
                write!(f, "Error while performing Kademlia query: {}", err)
            },
            KademliaHandlerQueryErr::UnexpectedMessage => {
                write!(f, "Remote answered our Kademlia RPC query with the wrong message type")
            },
            KademliaHandlerQueryErr::Io(err) => {
                write!(f, "I/O error during a Kademlia RPC query: {}", err)
            },
        }
    }
}

impl error::Error for KademliaHandlerQueryErr {
    fn source(&self) -> Option<&(dyn error::Error + 'static)> {
        match self {
            KademliaHandlerQueryErr::Upgrade(err) => Some(err),
            KademliaHandlerQueryErr::UnexpectedMessage => None,
            KademliaHandlerQueryErr::Io(err) => Some(err),
        }
    }
}

impl From<ProtocolsHandlerUpgrErr<io::Error>> for KademliaHandlerQueryErr {
    fn from(err: ProtocolsHandlerUpgrErr<io::Error>) -> Self {
        KademliaHandlerQueryErr::Upgrade(err)
    }
}

/// Event to send to the handler.
#[derive(Debug)]
pub enum KademliaHandlerIn<TUserData> {
    /// Resets the (sub)stream associated with the given request ID,
    /// thus signaling an error to the remote.
    ///
    /// Explicitly resetting the (sub)stream associated with a request
    /// can be used as an alternative to letting requests simply time
    /// out on the remote peer, thus potentially avoiding some delay
    /// for the query on the remote.
    Reset(KademliaRequestId),

    /// Request for the list of nodes whose IDs are the closest to `key`. The number of nodes
    /// returned is not specified, but should be around 20.
    FindNodeReq {
        /// Identifier of the node.
        key: Vec<u8>,
        /// Custom user data. Passed back in the out event when the results arrive.
        user_data: TUserData,
    },

    /// Response to a `FindNodeReq`.
    FindNodeRes {
        /// Results of the request.
        closer_peers: Vec<KadPeer>,
        /// Identifier of the request that was made by the remote.
        ///
        /// It is a logic error to use an id issued by the handler of a different node.
        request_id: KademliaRequestId,
    },

    /// Same as `FindNodeReq`, but should also return the entries of the local providers list for
    /// this key.
    GetProvidersReq {
        /// Identifier being searched.
        key: record::Key,
        /// Custom user data. Passed back in the out event when the results arrive.
        user_data: TUserData,
    },

    /// Response to a `GetProvidersReq`.
    GetProvidersRes {
        /// Nodes closest to the key.
        closer_peers: Vec<KadPeer>,
        /// Known providers for this key.
        provider_peers: Vec<KadPeer>,
        /// Identifier of the request that was made by the remote.
        ///
        /// It is a logic error to use an id issued by the handler of a different node.
        request_id: KademliaRequestId,
    },

    /// Indicates that this provider is known for this key.
    ///
    /// The API of the handler doesn't expose any event that allows you to know whether this
    /// succeeded.
    AddProvider {
        /// Key for which we should add providers.
        key: record::Key,
        /// Known provider for this key.
        provider: KadPeer,
    },

    /// Request to retrieve a record from the DHT.
    GetRecord {
        /// The key of the record.
        key: record::Key,
        /// Custom data. Passed back in the out event when the results arrive.
        user_data: TUserData,
    },

    /// Response to a `GetRecord` request.
    GetRecordRes {
        /// The value that might have been found in our storage.
        record: Option<Record>,
        /// Nodes that are closer to the key we were searching for.
        closer_peers: Vec<KadPeer>,
        /// Identifier of the request that was made by the remote.
        request_id: KademliaRequestId,
    },

    /// Put a value into the DHT records.
    PutRecord {
        record: Record,
        /// Custom data. Passed back in the out event when the results arrive.
        user_data: TUserData,
    },

    /// Response to a `PutRecord`.
    PutRecordRes {
        /// Key of the value that was put.
        key: record::Key,
        /// Value that was put.
        value: Vec<u8>,
        /// Identifier of the request that was made by the remote.
        request_id: KademliaRequestId,
    }
}
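
// A sketch of the request/response pairing (illustrative, not from the
// original source): code driving the handler answers an inbound request
// event with the matching `KademliaHandlerIn` response, echoing back the
// `request_id`. `closest_peers_for` is a hypothetical lookup helper.
//
//     match event {
//         KademliaHandlerEvent::FindNodeReq { key, request_id } => {
//             let closer_peers = closest_peers_for(&key);
//             handler.inject_event(KademliaHandlerIn::FindNodeRes {
//                 closer_peers,
//                 request_id,
//             });
//         }
//         _ => {}
//     }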

/// Unique identifier for a request. Must be passed back in order to answer a request from
/// the remote.
#[derive(Debug, PartialEq, Eq)]
pub struct KademliaRequestId {
    /// Unique identifier for an incoming connection.
    connec_unique_id: UniqueConnecId,
}

/// Unique identifier for a connection.
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
struct UniqueConnecId(u64);

impl<TUserData> KademliaHandler<TUserData> {
    /// Create a [`KademliaHandler`] using the given configuration.
    pub fn new(config: KademliaHandlerConfig, endpoint: ConnectedPoint) -> Self {
        let keep_alive = KeepAlive::Until(Instant::now() + config.idle_timeout);

        KademliaHandler {
            config,
            endpoint,
            next_connec_unique_id: UniqueConnecId(0),
            substreams: Vec::new(),
            keep_alive,
            protocol_status: ProtocolStatus::Unconfirmed,
        }
    }
}
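
// A minimal usage sketch (hypothetical): given the `ConnectedPoint` of an
// established connection, a handler with default configuration is built as
//
//     let handler: KademliaHandler<()> =
//         KademliaHandler::new(KademliaHandlerConfig::default(), endpoint);
//
// In practice this is done by `KademliaHandlerProto::into_handler` above.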

impl<TUserData> ProtocolsHandler for KademliaHandler<TUserData>
where
    TUserData: Clone + Send + 'static,
{
    type InEvent = KademliaHandlerIn<TUserData>;
    type OutEvent = KademliaHandlerEvent<TUserData>;
    type Error = io::Error; // TODO: better error type?
    type InboundProtocol = upgrade::EitherUpgrade<KademliaProtocolConfig, upgrade::DeniedUpgrade>;
    type OutboundProtocol = KademliaProtocolConfig;
    // Message of the request to send to the remote, and user data if we expect an answer.
    type OutboundOpenInfo = (KadRequestMsg, Option<TUserData>);
    type InboundOpenInfo = ();

    fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, Self::InboundOpenInfo> {
        if self.config.allow_listening {
            SubstreamProtocol::new(self.config.protocol_config.clone(), ()).map_upgrade(upgrade::EitherUpgrade::A)
        } else {
            SubstreamProtocol::new(upgrade::EitherUpgrade::B(upgrade::DeniedUpgrade), ())
        }
    }

    fn inject_fully_negotiated_outbound(
        &mut self,
        protocol: <Self::OutboundProtocol as OutboundUpgrade<NegotiatedSubstream>>::Output,
        (msg, user_data): Self::OutboundOpenInfo,
    ) {
        self.substreams.push(SubstreamState::OutPendingSend(protocol, msg, user_data));
        if let ProtocolStatus::Unconfirmed = self.protocol_status {
            // Upon the first successfully negotiated substream, we know that the
            // remote is configured with the same protocol name and we want
            // the behaviour to add this peer to the routing table, if possible.
            self.protocol_status = ProtocolStatus::Confirmed;
        }
    }

    fn inject_fully_negotiated_inbound(
        &mut self,
        protocol: <Self::InboundProtocol as InboundUpgrade<NegotiatedSubstream>>::Output,
        (): Self::InboundOpenInfo
    ) {
        // If `self.config.allow_listening` is false, then we produced a `DeniedUpgrade`
        // and `protocol` is a `Void`.
        let protocol = match protocol {
            EitherOutput::First(p) => p,
            EitherOutput::Second(p) => void::unreachable(p),
        };

        debug_assert!(self.config.allow_listening);
        let connec_unique_id = self.next_connec_unique_id;
        self.next_connec_unique_id.0 += 1;
        self.substreams.push(SubstreamState::InWaitingMessage(connec_unique_id, protocol));
        if let ProtocolStatus::Unconfirmed = self.protocol_status {
            // Upon the first successfully negotiated substream, we know that the
            // remote is configured with the same protocol name and we want
            // the behaviour to add this peer to the routing table, if possible.
            self.protocol_status = ProtocolStatus::Confirmed;
        }
    }

    fn inject_event(&mut self, message: KademliaHandlerIn<TUserData>) {
        match message {
            KademliaHandlerIn::Reset(request_id) => {
                let pos = self.substreams.iter().position(|state| match state {
                    SubstreamState::InWaitingUser(conn_id, _) =>
                        conn_id == &request_id.connec_unique_id,
                    _ => false,
                });
                if let Some(pos) = pos {
                    // TODO: we don't properly close down the substream
                    let waker = futures::task::noop_waker();
                    let mut cx = Context::from_waker(&waker);
                    let _ = self.substreams.remove(pos).try_close(&mut cx);
                }
            }
            KademliaHandlerIn::FindNodeReq { key, user_data } => {
                let msg = KadRequestMsg::FindNode { key };
                self.substreams.push(SubstreamState::OutPendingOpen(msg, Some(user_data)));
            }
            KademliaHandlerIn::FindNodeRes {
                closer_peers,
                request_id,
            } => {
                let pos = self.substreams.iter().position(|state| match state {
                    SubstreamState::InWaitingUser(ref conn_id, _) =>
                        conn_id == &request_id.connec_unique_id,
                    _ => false,
                });

                if let Some(pos) = pos {
                    let (conn_id, substream) = match self.substreams.remove(pos) {
                        SubstreamState::InWaitingUser(conn_id, substream) => (conn_id, substream),
                        _ => unreachable!(),
                    };

                    let msg = KadResponseMsg::FindNode { closer_peers };
                    self.substreams
                        .push(SubstreamState::InPendingSend(conn_id, substream, msg));
                }
            }
            KademliaHandlerIn::GetProvidersReq { key, user_data } => {
                let msg = KadRequestMsg::GetProviders { key };
                self.substreams
                    .push(SubstreamState::OutPendingOpen(msg, Some(user_data)));
            }
            KademliaHandlerIn::GetProvidersRes {
                closer_peers,
                provider_peers,
                request_id,
            } => {
                let pos = self.substreams.iter().position(|state| match state {
                    SubstreamState::InWaitingUser(ref conn_id, _)
                        if conn_id == &request_id.connec_unique_id =>
                    {
                        true
                    }
                    _ => false,
                });

                if let Some(pos) = pos {
                    let (conn_id, substream) = match self.substreams.remove(pos) {
                        SubstreamState::InWaitingUser(conn_id, substream) => (conn_id, substream),
                        _ => unreachable!(),
                    };

                    let msg = KadResponseMsg::GetProviders {
                        closer_peers,
                        provider_peers,
                    };
                    self.substreams
                        .push(SubstreamState::InPendingSend(conn_id, substream, msg));
                }
            }
            KademliaHandlerIn::AddProvider { key, provider } => {
                let msg = KadRequestMsg::AddProvider { key, provider };
                self.substreams.push(SubstreamState::OutPendingOpen(msg, None));
            }
            KademliaHandlerIn::GetRecord { key, user_data } => {
                let msg = KadRequestMsg::GetValue { key };
                self.substreams.push(SubstreamState::OutPendingOpen(msg, Some(user_data)));
            }
            KademliaHandlerIn::PutRecord { record, user_data } => {
                let msg = KadRequestMsg::PutValue { record };
                self.substreams
                    .push(SubstreamState::OutPendingOpen(msg, Some(user_data)));
            }
            KademliaHandlerIn::GetRecordRes {
                record,
                closer_peers,
                request_id,
            } => {
                let pos = self.substreams.iter().position(|state| match state {
                    SubstreamState::InWaitingUser(ref conn_id, _) =>
                        conn_id == &request_id.connec_unique_id,
                    _ => false,
                });

                if let Some(pos) = pos {
                    let (conn_id, substream) = match self.substreams.remove(pos) {
                        SubstreamState::InWaitingUser(conn_id, substream) => (conn_id, substream),
                        _ => unreachable!(),
                    };

                    let msg = KadResponseMsg::GetValue {
                        record,
                        closer_peers,
                    };
                    self.substreams
                        .push(SubstreamState::InPendingSend(conn_id, substream, msg));
                }
            }
            KademliaHandlerIn::PutRecordRes {
                key,
                request_id,
                value,
            } => {
                let pos = self.substreams.iter().position(|state| match state {
                    SubstreamState::InWaitingUser(ref conn_id, _)
                        if conn_id == &request_id.connec_unique_id =>
                    {
                        true
                    }
                    _ => false,
                });

                if let Some(pos) = pos {
                    let (conn_id, substream) = match self.substreams.remove(pos) {
                        SubstreamState::InWaitingUser(conn_id, substream) => (conn_id, substream),
                        _ => unreachable!(),
                    };

                    let msg = KadResponseMsg::PutValue {
                        key,
                        value,
                    };
                    self.substreams
                        .push(SubstreamState::InPendingSend(conn_id, substream, msg));
                }
            }
        }
    }

    fn inject_dial_upgrade_error(
        &mut self,
        (_, user_data): Self::OutboundOpenInfo,
        error: ProtocolsHandlerUpgrErr<io::Error>,
    ) {
        // TODO: cache the fact that the remote doesn't support kademlia at all, so that we don't
        //       continue trying
        if let Some(user_data) = user_data {
            self.substreams
                .push(SubstreamState::OutReportError(error.into(), user_data));
        }
    }

    fn connection_keep_alive(&self) -> KeepAlive {
        self.keep_alive
    }

    fn poll(
        &mut self,
        cx: &mut Context<'_>,
    ) -> Poll<
        ProtocolsHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::OutEvent, Self::Error>,
    > {
        if self.substreams.is_empty() {
            return Poll::Pending;
        }

        if let ProtocolStatus::Confirmed = self.protocol_status {
            self.protocol_status = ProtocolStatus::Reported;
            return Poll::Ready(ProtocolsHandlerEvent::Custom(
                KademliaHandlerEvent::ProtocolConfirmed {
                    endpoint: self.endpoint.clone()
                }))
        }

        // We remove each element from `substreams` one by one and add them back.
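        // `advance_substream` returns a `(new_state, event, proceed)` triple
        // (added commentary): an event is returned to the swarm immediately;
        // otherwise `proceed == true` means the same substream may make more
        // progress and is polled again right away.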
        for n in (0..self.substreams.len()).rev() {
            let mut substream = self.substreams.swap_remove(n);

            loop {
                match advance_substream(substream, self.config.protocol_config.clone(), cx) {
                    (Some(new_state), Some(event), _) => {
                        self.substreams.push(new_state);
                        return Poll::Ready(event);
                    }
                    (None, Some(event), _) => {
                        if self.substreams.is_empty() {
                            self.keep_alive = KeepAlive::Until(Instant::now() + self.config.idle_timeout);
                        }
                        return Poll::Ready(event);
                    }
                    (Some(new_state), None, false) => {
                        self.substreams.push(new_state);
                        break;
                    }
                    (Some(new_state), None, true) => {
                        substream = new_state;
                        continue;
                    }
                    (None, None, _) => {
                        break;
                    }
                }
            }
        }

        if self.substreams.is_empty() {
            // We destroyed all substreams in this function.
            self.keep_alive = KeepAlive::Until(Instant::now() + self.config.idle_timeout);
        } else {
            self.keep_alive = KeepAlive::Yes;
        }

        Poll::Pending
    }
}

impl Default for KademliaHandlerConfig {
    fn default() -> Self {
        KademliaHandlerConfig {
            protocol_config: Default::default(),
            allow_listening: true,
            idle_timeout: Duration::from_secs(10),
        }
    }
}

/// Advances one substream.
///
/// Returns the new state for that substream, an event to generate, and whether the substream
/// should be polled again.
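///
/// Illustration of the return contract (added commentary):
/// `(Some(state), None, true)` keeps driving the same substream in a loop,
/// `(Some(state), None, false)` parks it until the next wakeup, and
/// `(None, Some(event), _)` or `(Some(state), Some(event), _)` hands an
/// event to the caller, dropping or keeping the substream respectively.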
fn advance_substream<TUserData>(
    state: SubstreamState<TUserData>,
    upgrade: KademliaProtocolConfig,
    cx: &mut Context<'_>,
) -> (
    Option<SubstreamState<TUserData>>,
    Option<
        ProtocolsHandlerEvent<
            KademliaProtocolConfig,
            (KadRequestMsg, Option<TUserData>),
            KademliaHandlerEvent<TUserData>,
            io::Error,
        >,
    >,
    bool,
)
{
    match state {
        SubstreamState::OutPendingOpen(msg, user_data) => {
            let ev = ProtocolsHandlerEvent::OutboundSubstreamRequest {
                protocol: SubstreamProtocol::new(upgrade, (msg, user_data))
            };
            (None, Some(ev), false)
        }
        SubstreamState::OutPendingSend(mut substream, msg, user_data) => {
            match Sink::poll_ready(Pin::new(&mut substream), cx) {
                Poll::Ready(Ok(())) => {
                    match Sink::start_send(Pin::new(&mut substream), msg) {
                        Ok(()) => (
                            Some(SubstreamState::OutPendingFlush(substream, user_data)),
                            None,
                            true,
                        ),
                        Err(error) => {
                            let event = if let Some(user_data) = user_data {
                                Some(ProtocolsHandlerEvent::Custom(KademliaHandlerEvent::QueryError {
                                    error: KademliaHandlerQueryErr::Io(error),
                                    user_data
                                }))
                            } else {
                                None
                            };

                            (None, event, false)
                        }
                    }
                },
                Poll::Pending => (
                    Some(SubstreamState::OutPendingSend(substream, msg, user_data)),
                    None,
                    false,
                ),
                Poll::Ready(Err(error)) => {
                    let event = if let Some(user_data) = user_data {
                        Some(ProtocolsHandlerEvent::Custom(KademliaHandlerEvent::QueryError {
                            error: KademliaHandlerQueryErr::Io(error),
                            user_data
                        }))
                    } else {
                        None
                    };

                    (None, event, false)
                }
            }
        }
        SubstreamState::OutPendingFlush(mut substream, user_data) => {
            match Sink::poll_flush(Pin::new(&mut substream), cx) {
                Poll::Ready(Ok(())) => {
                    if let Some(user_data) = user_data {
                        (
                            Some(SubstreamState::OutWaitingAnswer(substream, user_data)),
                            None,
                            true,
                        )
                    } else {
                        (Some(SubstreamState::OutClosing(substream)), None, true)
                    }
                }
                Poll::Pending => (
                    Some(SubstreamState::OutPendingFlush(substream, user_data)),
                    None,
                    false,
                ),
                Poll::Ready(Err(error)) => {
                    let event = if let Some(user_data) = user_data {
                        Some(ProtocolsHandlerEvent::Custom(KademliaHandlerEvent::QueryError {
                            error: KademliaHandlerQueryErr::Io(error),
                            user_data,
                        }))
                    } else {
                        None
                    };

                    (None, event, false)
                }
            }
        }
        SubstreamState::OutWaitingAnswer(mut substream, user_data) => match Stream::poll_next(Pin::new(&mut substream), cx) {
            Poll::Ready(Some(Ok(msg))) => {
                let new_state = SubstreamState::OutClosing(substream);
                let event = process_kad_response(msg, user_data);
                (
                    Some(new_state),
                    Some(ProtocolsHandlerEvent::Custom(event)),
                    true,
                )
            }
            Poll::Pending => (
                Some(SubstreamState::OutWaitingAnswer(substream, user_data)),
                None,
                false,
            ),
            Poll::Ready(Some(Err(error))) => {
                let event = KademliaHandlerEvent::QueryError {
                    error: KademliaHandlerQueryErr::Io(error),
                    user_data,
                };
                (None, Some(ProtocolsHandlerEvent::Custom(event)), false)
            }
            Poll::Ready(None) => {
                let event = KademliaHandlerEvent::QueryError {
                    error: KademliaHandlerQueryErr::Io(io::ErrorKind::UnexpectedEof.into()),
                    user_data,
                };
                (None, Some(ProtocolsHandlerEvent::Custom(event)), false)
            }
        },
        SubstreamState::OutReportError(error, user_data) => {
            let event = KademliaHandlerEvent::QueryError { error, user_data };
            (None, Some(ProtocolsHandlerEvent::Custom(event)), false)
        }
        SubstreamState::OutClosing(mut stream) => match Sink::poll_close(Pin::new(&mut stream), cx) {
            Poll::Ready(Ok(())) => (None, None, false),
            Poll::Pending => (Some(SubstreamState::OutClosing(stream)), None, false),
            Poll::Ready(Err(_)) => (None, None, false),
        },
        SubstreamState::InWaitingMessage(id, mut substream) => match Stream::poll_next(Pin::new(&mut substream), cx) {
            Poll::Ready(Some(Ok(msg))) => {
                if let Ok(ev) = process_kad_request(msg, id) {
                    (
                        Some(SubstreamState::InWaitingUser(id, substream)),
                        Some(ProtocolsHandlerEvent::Custom(ev)),
                        false,
                    )
                } else {
                    (Some(SubstreamState::InClosing(substream)), None, true)
                }
            }
            Poll::Pending => (
                Some(SubstreamState::InWaitingMessage(id, substream)),
                None,
                false,
            ),
            Poll::Ready(None) => {
                trace!("Inbound substream: EOF");
                (None, None, false)
            }
            Poll::Ready(Some(Err(e))) => {
                trace!("Inbound substream error: {:?}", e);
                (None, None, false)
            },
        },
        SubstreamState::InWaitingUser(id, substream) => (
            Some(SubstreamState::InWaitingUser(id, substream)),
            None,
            false,
        ),
        SubstreamState::InPendingSend(id, mut substream, msg) => match Sink::poll_ready(Pin::new(&mut substream), cx) {
            Poll::Ready(Ok(())) => match Sink::start_send(Pin::new(&mut substream), msg) {
                Ok(()) => (
                    Some(SubstreamState::InPendingFlush(id, substream)),
                    None,
                    true,
                ),
                Err(_) => (None, None, false),
            },
            Poll::Pending => (
                Some(SubstreamState::InPendingSend(id, substream, msg)),
                None,
                false,
            ),
            Poll::Ready(Err(_)) => (None, None, false),
        }
        SubstreamState::InPendingFlush(id, mut substream) => match Sink::poll_flush(Pin::new(&mut substream), cx) {
            Poll::Ready(Ok(())) => (
                Some(SubstreamState::InWaitingMessage(id, substream)),
                None,
                true,
            ),
            Poll::Pending => (
                Some(SubstreamState::InPendingFlush(id, substream)),
                None,
                false,
            ),
            Poll::Ready(Err(_)) => (None, None, false),
        },
        SubstreamState::InClosing(mut stream) => match Sink::poll_close(Pin::new(&mut stream), cx) {
            Poll::Ready(Ok(())) => (None, None, false),
            Poll::Pending => (Some(SubstreamState::InClosing(stream)), None, false),
            Poll::Ready(Err(_)) => (None, None, false),
        },
    }
}

/// Processes a Kademlia message that's expected to be a request from a remote.
fn process_kad_request<TUserData>(
    event: KadRequestMsg,
    connec_unique_id: UniqueConnecId,
) -> Result<KademliaHandlerEvent<TUserData>, io::Error> {
    match event {
        KadRequestMsg::Ping => {
            // TODO: implement; although in practice the PING message is never
            //       used, so we may consider removing it altogether
            Err(io::Error::new(
                io::ErrorKind::InvalidData,
                "the PING Kademlia message is not implemented",
            ))
        }
        KadRequestMsg::FindNode { key } => Ok(KademliaHandlerEvent::FindNodeReq {
            key,
            request_id: KademliaRequestId { connec_unique_id },
        }),
        KadRequestMsg::GetProviders { key } => Ok(KademliaHandlerEvent::GetProvidersReq {
            key,
            request_id: KademliaRequestId { connec_unique_id },
        }),
        KadRequestMsg::AddProvider { key, provider } => {
            Ok(KademliaHandlerEvent::AddProvider { key, provider })
        }
        KadRequestMsg::GetValue { key } => Ok(KademliaHandlerEvent::GetRecord {
            key,
            request_id: KademliaRequestId { connec_unique_id },
        }),
        KadRequestMsg::PutValue { record } => Ok(KademliaHandlerEvent::PutRecord {
            record,
            request_id: KademliaRequestId { connec_unique_id },
        })
    }
}

/// Processes a Kademlia message that's expected to be a response to one of our requests.
fn process_kad_response<TUserData>(
    event: KadResponseMsg,
    user_data: TUserData,
) -> KademliaHandlerEvent<TUserData> {
    // TODO: must check that the response corresponds to the request
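    //       (note: as written, a response of the wrong type is surfaced as a
    //       normal event carrying the original request's `user_data`, e.g. a
    //       `FindNode` request answered with a `GetProviders` message yields
    //       `GetProvidersRes`, not an error)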
    match event {
        KadResponseMsg::Pong => {
            // We never send out pings.
            KademliaHandlerEvent::QueryError {
                error: KademliaHandlerQueryErr::UnexpectedMessage,
                user_data,
            }
        }
        KadResponseMsg::FindNode { closer_peers } => {
            KademliaHandlerEvent::FindNodeRes {
                closer_peers,
                user_data,
            }
        },
        KadResponseMsg::GetProviders {
            closer_peers,
            provider_peers,
        } => KademliaHandlerEvent::GetProvidersRes {
            closer_peers,
            provider_peers,
            user_data,
        },
        KadResponseMsg::GetValue {
            record,
            closer_peers,
        } => KademliaHandlerEvent::GetRecordRes {
            record,
            closer_peers,
            user_data,
        },
        KadResponseMsg::PutValue { key, value, .. } => {
            KademliaHandlerEvent::PutRecordRes {
                key,
                value,
                user_data,
            }
        }
    }
}