libp2p_request_response/
lib.rs

1// Copyright 2020 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21//! Generic request/response protocols.
22//!
23//! ## General Usage
24//!
25//! [`RequestResponse`] is a `NetworkBehaviour` that implements a generic
26//! request/response protocol or protocol family, whereby each request is
27//! sent over a new substream on a connection. `RequestResponse` is generic
28//! over the actual messages being sent, which are defined in terms of a
29//! [`RequestResponseCodec`]. Creating a request/response protocol thus amounts
30//! to providing an implementation of this trait which can then be
31//! given to [`RequestResponse::new`]. Further configuration options are
32//! available via the [`RequestResponseConfig`].
33//!
34//! Requests are sent using [`RequestResponse::send_request`] and the
35//! responses received as [`RequestResponseMessage::Response`] via
36//! [`RequestResponseEvent::Message`].
37//!
38//! Responses are sent using [`RequestResponse::send_response`] upon
39//! receiving a [`RequestResponseMessage::Request`] via
40//! [`RequestResponseEvent::Message`].
41//!
42//! ## Protocol Families
43//!
44//! A single [`RequestResponse`] instance can be used with an entire
45//! protocol family that share the same request and response types.
46//! For that purpose, [`RequestResponseCodec::Protocol`] is typically
47//! instantiated with a sum type.
48//!
49//! ## Limited Protocol Support
50//!
51//! It is possible to only support inbound or outbound requests for
52//! a particular protocol. This is achieved by instantiating `RequestResponse`
53//! with protocols using [`ProtocolSupport::Inbound`] or
54//! [`ProtocolSupport::Outbound`]. Any subset of protocols of a protocol
//! family can be configured in this way. A protocol configured for inbound
//! support only is not offered when negotiating outbound substreams, and a
//! protocol configured for outbound support only is not advertised when
//! negotiating inbound substreams.
58
59pub mod codec;
60pub mod handler;
61pub mod throttled;
62
63pub use codec::{RequestResponseCodec, ProtocolName};
64pub use handler::ProtocolSupport;
65pub use throttled::Throttled;
66
67use futures::{
68    channel::oneshot,
69};
70use handler::{
71    RequestProtocol,
72    RequestResponseHandler,
73    RequestResponseHandlerEvent,
74};
75use libp2p_core::{
76    ConnectedPoint,
77    Multiaddr,
78    PeerId,
79    connection::ConnectionId,
80};
81use libp2p_swarm::{
82    DialPeerCondition,
83    NetworkBehaviour,
84    NetworkBehaviourAction,
85    NotifyHandler,
86    PollParameters,
87};
88use smallvec::SmallVec;
89use std::{
90    collections::{HashMap, HashSet, VecDeque},
91    fmt,
92    time::Duration,
93    sync::{atomic::AtomicU64, Arc},
94    task::{Context, Poll}
95};
96
/// An inbound request or response.
///
/// `TChannelResponse` is the type accepted by the [`ResponseChannel`] that
/// accompanies a request; it defaults to `TResponse`.
#[derive(Debug)]
pub enum RequestResponseMessage<TRequest, TResponse, TChannelResponse = TResponse> {
    /// A request message.
    Request {
        /// The ID of this request.
        request_id: RequestId,
        /// The request message.
        request: TRequest,
        /// The channel waiting for the response.
        ///
        /// If this channel is dropped instead of being used to send a response
        /// via [`RequestResponse::send_response`], a [`RequestResponseEvent::InboundFailure`]
        /// with [`InboundFailure::ResponseOmission`] is emitted.
        channel: ResponseChannel<TChannelResponse>,
    },
    /// A response message.
    Response {
        /// The ID of the request that produced this response.
        ///
        /// See [`RequestResponse::send_request`].
        request_id: RequestId,
        /// The response message.
        response: TResponse
    },
}
123
/// The events emitted by a [`RequestResponse`] protocol.
///
/// `TChannelResponse` is forwarded to the contained
/// [`RequestResponseMessage`] and defaults to `TResponse`.
#[derive(Debug)]
pub enum RequestResponseEvent<TRequest, TResponse, TChannelResponse = TResponse> {
    /// An incoming message (request or response).
    Message {
        /// The peer who sent the message.
        peer: PeerId,
        /// The incoming message.
        message: RequestResponseMessage<TRequest, TResponse, TChannelResponse>
    },
    /// An outbound request failed.
    OutboundFailure {
        /// The peer to whom the request was sent.
        peer: PeerId,
        /// The (local) ID of the failed request.
        request_id: RequestId,
        /// The error that occurred.
        error: OutboundFailure,
    },
    /// An inbound request failed.
    InboundFailure {
        /// The peer from whom the request was received.
        peer: PeerId,
        /// The ID of the failed inbound request.
        request_id: RequestId,
        /// The error that occurred.
        error: InboundFailure,
    },
    /// A response to an inbound request has been sent.
    ///
    /// When this event is received, the response has been flushed on
    /// the underlying transport connection.
    ResponseSent {
        /// The peer to whom the response was sent.
        peer: PeerId,
        /// The ID of the inbound request whose response was sent.
        request_id: RequestId,
    },
}
163
/// Possible failures occurring in the context of sending
/// an outbound request and receiving the response.
///
/// Reported via [`RequestResponseEvent::OutboundFailure`].
#[derive(Debug, Clone)]
pub enum OutboundFailure {
    /// The request could not be sent because a dialing attempt failed.
    ///
    /// Reported for every request that was still queued for the peer
    /// when the dial failure was observed.
    DialFailure,
    /// The request timed out before a response was received.
    ///
    /// It is not known whether the request may have been
    /// received (and processed) by the remote peer.
    Timeout,
    /// The connection closed before a response was received.
    ///
    /// It is not known whether the request may have been
    /// received (and processed) by the remote peer.
    ConnectionClosed,
    /// The remote supports none of the requested protocols.
    UnsupportedProtocols,
}
183
/// Possible failures occurring in the context of receiving an
/// inbound request and sending a response.
///
/// Reported via [`RequestResponseEvent::InboundFailure`].
#[derive(Debug, Clone)]
pub enum InboundFailure {
    /// The inbound request timed out, either while reading the
    /// incoming request or before a response is sent, e.g. if
    /// [`RequestResponse::send_response`] is not called in a
    /// timely manner.
    Timeout,
    /// The connection closed before a response could be sent.
    ConnectionClosed,
    /// The local peer supports none of the protocols requested
    /// by the remote.
    UnsupportedProtocols,
    /// The local peer failed to respond to an inbound request
    /// due to the [`ResponseChannel`] being dropped instead of
    /// being passed to [`RequestResponse::send_response`].
    ResponseOmission,
}
203
/// A channel for sending a response to an inbound request.
///
/// See [`RequestResponse::send_response`].
#[derive(Debug)]
pub struct ResponseChannel<TResponse> {
    /// The ID of the inbound request this channel answers.
    request_id: RequestId,
    /// The peer that sent the inbound request.
    peer: PeerId,
    /// The one-shot sender through which the response is handed over.
    sender: oneshot::Sender<TResponse>,
}
213
impl<TResponse> ResponseChannel<TResponse> {
    /// Checks whether the response channel is still open, i.e.
    /// the `RequestResponse` behaviour is still waiting for
    /// a response to be sent via [`RequestResponse::send_response`]
    /// and this response channel.
    ///
    /// If the response channel is no longer open then the inbound
    /// request timed out waiting for the response.
    pub fn is_open(&self) -> bool {
        // The channel counts as open as long as the receiving half of the
        // one-shot channel has not been cancelled.
        !self.sender.is_canceled()
    }

    /// Get the ID of the inbound request waiting for a response.
    pub(crate) fn request_id(&self) -> RequestId {
        self.request_id
    }
}
231
/// The ID of an inbound or outbound request.
///
/// Note: [`RequestId`]'s uniqueness is only guaranteed between two
/// inbound and likewise between two outbound requests. There is no
/// uniqueness guarantee in a set of both inbound and outbound
/// [`RequestId`]s nor in a set of inbound or outbound requests
/// originating from different [`RequestResponse`] behaviours.
///
/// The wrapped `u64` is a plain counter value; it is printed as-is by the
/// `Display` implementation.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub struct RequestId(u64);
241
242impl fmt::Display for RequestId {
243    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
244        write!(f, "{}", self.0)
245    }
246}
247
/// The configuration for a `RequestResponse` protocol.
///
/// Both values default to 10 seconds (see the `Default` implementation).
#[derive(Debug, Clone)]
pub struct RequestResponseConfig {
    /// The timeout applied to inbound and outbound requests.
    request_timeout: Duration,
    /// The keep-alive timeout of idle connections.
    connection_keep_alive: Duration,
}
254
255impl Default for RequestResponseConfig {
256    fn default() -> Self {
257        Self {
258            connection_keep_alive: Duration::from_secs(10),
259            request_timeout: Duration::from_secs(10),
260        }
261    }
262}
263
impl RequestResponseConfig {
    /// Sets the keep-alive timeout of idle connections.
    ///
    /// Returns `&mut Self` so that setter calls can be chained.
    pub fn set_connection_keep_alive(&mut self, v: Duration) -> &mut Self {
        self.connection_keep_alive = v;
        self
    }

    /// Sets the timeout for inbound and outbound requests.
    ///
    /// Returns `&mut Self` so that setter calls can be chained.
    pub fn set_request_timeout(&mut self, v: Duration) -> &mut Self {
        self.request_timeout = v;
        self
    }
}
277
/// A request/response protocol for some message codec.
///
/// Events for the `Swarm` are buffered in `pending_events` and handed out
/// one at a time from `poll`.
pub struct RequestResponse<TCodec>
where
    TCodec: RequestResponseCodec,
{
    /// The supported inbound protocols.
    inbound_protocols: SmallVec<[TCodec::Protocol; 2]>,
    /// The supported outbound protocols.
    outbound_protocols: SmallVec<[TCodec::Protocol; 2]>,
    /// The next (local) request ID.
    next_request_id: RequestId,
    /// The next (inbound) request ID.
    ///
    /// A clone of this shared counter is handed to every connection handler
    /// created by `new_handler`.
    next_inbound_id: Arc<AtomicU64>,
    /// The protocol configuration.
    config: RequestResponseConfig,
    /// The protocol codec for reading and writing requests and responses.
    codec: TCodec,
    /// Pending events to return from `poll`.
    pending_events: VecDeque<
        NetworkBehaviourAction<
            RequestProtocol<TCodec>,
            RequestResponseEvent<TCodec::Request, TCodec::Response>>>,
    /// The currently connected peers, their pending outbound and inbound responses and their known,
    /// reachable addresses, if any.
    connected: HashMap<PeerId, SmallVec<[Connection; 2]>>,
    /// Externally managed addresses via `add_address` and `remove_address`.
    addresses: HashMap<PeerId, SmallVec<[Multiaddr; 6]>>,
    /// Requests that have not yet been sent and are waiting for a connection
    /// to be established.
    pending_outbound_requests: HashMap<PeerId, SmallVec<[RequestProtocol<TCodec>; 10]>>,
}
309
310impl<TCodec> RequestResponse<TCodec>
311where
312    TCodec: RequestResponseCodec + Clone,
313{
314    /// Creates a new `RequestResponse` behaviour for the given
315    /// protocols, codec and configuration.
316    pub fn new<I>(codec: TCodec, protocols: I, cfg: RequestResponseConfig) -> Self
317    where
318        I: IntoIterator<Item = (TCodec::Protocol, ProtocolSupport)>
319    {
320        let mut inbound_protocols = SmallVec::new();
321        let mut outbound_protocols = SmallVec::new();
322        for (p, s) in protocols {
323            if s.inbound() {
324                inbound_protocols.push(p.clone());
325            }
326            if s.outbound() {
327                outbound_protocols.push(p.clone());
328            }
329        }
330        RequestResponse {
331            inbound_protocols,
332            outbound_protocols,
333            next_request_id: RequestId(1),
334            next_inbound_id: Arc::new(AtomicU64::new(1)),
335            config: cfg,
336            codec,
337            pending_events: VecDeque::new(),
338            connected: HashMap::new(),
339            pending_outbound_requests: HashMap::new(),
340            addresses: HashMap::new(),
341        }
342    }
343
    /// Creates a `RequestResponse` which limits requests per peer.
    ///
    /// The behaviour is wrapped in [`Throttled`] and detects the limits
    /// per peer at runtime which are then enforced.
    ///
    /// This is a convenience wrapper that forwards all arguments
    /// unchanged to [`Throttled::new`].
    pub fn throttled<I>(c: TCodec, protos: I, cfg: RequestResponseConfig) -> Throttled<TCodec>
    where
        I: IntoIterator<Item = (TCodec::Protocol, ProtocolSupport)>,
        TCodec: Send,
        TCodec::Protocol: Sync
    {
        Throttled::new(c, protos, cfg)
    }
356
357    /// Initiates sending a request.
358    ///
359    /// If the targeted peer is currently not connected, a dialing
360    /// attempt is initiated and the request is sent as soon as a
361    /// connection is established.
362    ///
363    /// > **Note**: In order for such a dialing attempt to succeed,
364    /// > the `RequestResonse` protocol must either be embedded
365    /// > in another `NetworkBehaviour` that provides peer and
366    /// > address discovery, or known addresses of peers must be
367    /// > managed via [`RequestResponse::add_address`] and
368    /// > [`RequestResponse::remove_address`].
369    pub fn send_request(&mut self, peer: &PeerId, request: TCodec::Request) -> RequestId {
370        let request_id = self.next_request_id();
371        let request = RequestProtocol {
372            request_id,
373            codec: self.codec.clone(),
374            protocols: self.outbound_protocols.clone(),
375            request,
376        };
377
378        if let Some(request) = self.try_send_request(peer, request) {
379            self.pending_events.push_back(NetworkBehaviourAction::DialPeer {
380                peer_id: *peer,
381                condition: DialPeerCondition::Disconnected,
382            });
383            self.pending_outbound_requests.entry(*peer).or_default().push(request);
384        }
385
386        request_id
387    }
388
    /// Initiates sending a response to an inbound request.
    ///
    /// If the [`ResponseChannel`] is already closed due to a timeout or the
    /// connection being closed, the response is returned as an `Err` for
    /// further handling. Once the response has been successfully sent on the
    /// corresponding connection, [`RequestResponseEvent::ResponseSent`] is
    /// emitted. In all other cases [`RequestResponseEvent::InboundFailure`]
    /// will be or has been emitted.
    ///
    /// The provided `ResponseChannel` is obtained from an inbound
    /// [`RequestResponseMessage::Request`].
    pub fn send_response(&mut self, ch: ResponseChannel<TCodec::Response>, rs: TCodec::Response)
        -> Result<(), TCodec::Response>
    {
        // The response is handed over through the channel's one-shot sender;
        // no state of the behaviour itself is touched here.
        ch.sender.send(rs)
    }
405
    /// Adds a known address for a peer that can be used for
    /// dialing attempts by the `Swarm`, i.e. is returned
    /// by [`NetworkBehaviour::addresses_of_peer`].
    ///
    /// Addresses added in this way are only removed by `remove_address`.
    ///
    /// The address is appended without checking for duplicates.
    pub fn add_address(&mut self, peer: &PeerId, address: Multiaddr) {
        self.addresses.entry(*peer).or_default().push(address);
    }
414
415    /// Removes an address of a peer previously added via `add_address`.
416    pub fn remove_address(&mut self, peer: &PeerId, address: &Multiaddr) {
417        let mut last = false;
418        if let Some(addresses) = self.addresses.get_mut(peer) {
419            addresses.retain(|a| a != address);
420            last = addresses.is_empty();
421        }
422        if last {
423            self.addresses.remove(peer);
424        }
425    }
426
427    /// Checks whether a peer is currently connected.
428    pub fn is_connected(&self, peer: &PeerId) -> bool {
429        if let Some(connections) = self.connected.get(peer) {
430            !connections.is_empty()
431        } else {
432            false
433        }
434    }
435
436    /// Checks whether an outbound request to the peer with the provided
437    /// [`PeerId`] initiated by [`RequestResponse::send_request`] is still
438    /// pending, i.e. waiting for a response.
439    pub fn is_pending_outbound(&self, peer: &PeerId, request_id: &RequestId) -> bool {
440        // Check if request is already sent on established connection.
441        let est_conn = self.connected.get(peer)
442            .map(|cs| cs.iter().any(|c| c.pending_inbound_responses.contains(request_id)))
443            .unwrap_or(false);
444        // Check if request is still pending to be sent.
445        let pen_conn = self.pending_outbound_requests.get(peer)
446            .map(|rps| rps.iter().any(|rp| {rp.request_id == *request_id}))
447            .unwrap_or(false);
448
449        est_conn || pen_conn
450    }
451
452    /// Checks whether an inbound request from the peer with the provided
453    /// [`PeerId`] is still pending, i.e. waiting for a response by the local
454    /// node through [`RequestResponse::send_response`].
455    pub fn is_pending_inbound(&self, peer: &PeerId, request_id: &RequestId) -> bool {
456        self.connected.get(peer)
457            .map(|cs| cs.iter().any(|c| c.pending_outbound_responses.contains(request_id)))
458            .unwrap_or(false)
459    }
460
461    /// Returns the next request ID.
462    fn next_request_id(&mut self) -> RequestId {
463        let request_id = self.next_request_id;
464        self.next_request_id.0 += 1;
465        request_id
466    }
467
468    /// Tries to send a request by queueing an appropriate event to be
469    /// emitted to the `Swarm`. If the peer is not currently connected,
470    /// the given request is return unchanged.
471    fn try_send_request(&mut self, peer: &PeerId, request: RequestProtocol<TCodec>)
472        -> Option<RequestProtocol<TCodec>>
473    {
474        if let Some(connections) = self.connected.get_mut(peer) {
475            if connections.is_empty() {
476                return Some(request)
477            }
478            let ix = (request.request_id.0 as usize) % connections.len();
479            let conn = &mut connections[ix];
480            conn.pending_inbound_responses.insert(request.request_id);
481            self.pending_events.push_back(NetworkBehaviourAction::NotifyHandler {
482                peer_id: *peer,
483                handler: NotifyHandler::One(conn.id),
484                event: request
485            });
486            None
487        } else {
488            Some(request)
489        }
490    }
491
492    /// Remove pending outbound response for the given peer and connection.
493    ///
494    /// Returns `true` if the provided connection to the given peer is still
495    /// alive and the [`RequestId`] was previously present and is now removed.
496    /// Returns `false` otherwise.
497    fn remove_pending_outbound_response(
498        &mut self,
499        peer: &PeerId,
500        connection: ConnectionId,
501        request: RequestId,
502    ) -> bool {
503        self.get_connection_mut(peer, connection)
504            .map(|c| c.pending_outbound_responses.remove(&request))
505            .unwrap_or(false)
506    }
507
508    /// Remove pending inbound response for the given peer and connection.
509    ///
510    /// Returns `true` if the provided connection to the given peer is still
511    /// alive and the [`RequestId`] was previously present and is now removed.
512    /// Returns `false` otherwise.
513    fn remove_pending_inbound_response(
514        &mut self,
515        peer: &PeerId,
516        connection: ConnectionId,
517        request: &RequestId,
518    ) -> bool {
519        self.get_connection_mut(peer, connection)
520            .map(|c| c.pending_inbound_responses.remove(request))
521            .unwrap_or(false)
522    }
523
524    /// Returns a mutable reference to the connection in `self.connected`
525    /// corresponding to the given [`PeerId`] and [`ConnectionId`].
526    fn get_connection_mut(
527        &mut self,
528        peer: &PeerId,
529        connection: ConnectionId,
530    ) -> Option<&mut Connection> {
531        self.connected.get_mut(peer).and_then(|connections| {
532            connections.iter_mut().find(|c| c.id == connection)
533        })
534    }
535}
536
537impl<TCodec> NetworkBehaviour for RequestResponse<TCodec>
538where
539    TCodec: RequestResponseCodec + Send + Clone + 'static,
540{
    /// The per-connection handler created by `new_handler`.
    type ProtocolsHandler = RequestResponseHandler<TCodec>;
    /// The events handed to the `Swarm` from `poll`.
    type OutEvent = RequestResponseEvent<TCodec::Request, TCodec::Response>;
543
544    fn new_handler(&mut self) -> Self::ProtocolsHandler {
545        RequestResponseHandler::new(
546            self.inbound_protocols.clone(),
547            self.codec.clone(),
548            self.config.connection_keep_alive,
549            self.config.request_timeout,
550            self.next_inbound_id.clone()
551        )
552    }
553
554    fn addresses_of_peer(&mut self, peer: &PeerId) -> Vec<Multiaddr> {
555        let mut addresses = Vec::new();
556        if let Some(connections) = self.connected.get(peer) {
557            addresses.extend(connections.iter().filter_map(|c| c.address.clone()))
558        }
559        if let Some(more) = self.addresses.get(peer) {
560            addresses.extend(more.into_iter().cloned());
561        }
562        addresses
563    }
564
565    fn inject_connected(&mut self, peer: &PeerId) {
566        if let Some(pending) = self.pending_outbound_requests.remove(peer) {
567            for request in pending {
568                let request = self.try_send_request(peer, request);
569                assert!(request.is_none());
570            }
571        }
572    }
573
574    fn inject_connection_established(&mut self, peer: &PeerId, conn: &ConnectionId, endpoint: &ConnectedPoint) {
575        let address = match endpoint {
576            ConnectedPoint::Dialer { address } => Some(address.clone()),
577            ConnectedPoint::Listener { .. } => None
578        };
579        self.connected.entry(*peer)
580            .or_default()
581            .push(Connection::new(*conn, address));
582    }
583
584    fn inject_connection_closed(&mut self, peer_id: &PeerId, conn: &ConnectionId, _: &ConnectedPoint) {
585        let connections = self.connected.get_mut(peer_id)
586            .expect("Expected some established connection to peer before closing.");
587
588        let connection = connections.iter()
589            .position(|c| &c.id == conn)
590            .map(|p: usize| connections.remove(p))
591            .expect("Expected connection to be established before closing.");
592
593        if connections.is_empty() {
594            self.connected.remove(peer_id);
595        }
596
597        for request_id in connection.pending_outbound_responses {
598            self.pending_events.push_back(NetworkBehaviourAction::GenerateEvent(
599                RequestResponseEvent::InboundFailure {
600                    peer: *peer_id,
601                    request_id,
602                    error: InboundFailure::ConnectionClosed
603                }
604            ));
605
606        }
607
608        for request_id in connection.pending_inbound_responses {
609            self.pending_events.push_back(NetworkBehaviourAction::GenerateEvent(
610                RequestResponseEvent::OutboundFailure {
611                    peer: *peer_id,
612                    request_id,
613                    error: OutboundFailure::ConnectionClosed
614                }
615            ));
616        }
617    }
618
    /// Drops any connection bookkeeping that is still held for `peer`.
    fn inject_disconnected(&mut self, peer: &PeerId) {
        self.connected.remove(peer);
    }
622
623    fn inject_dial_failure(&mut self, peer: &PeerId) {
624        // If there are pending outgoing requests when a dial failure occurs,
625        // it is implied that we are not connected to the peer, since pending
626        // outgoing requests are drained when a connection is established and
627        // only created when a peer is not connected when a request is made.
628        // Thus these requests must be considered failed, even if there is
629        // another, concurrent dialing attempt ongoing.
630        if let Some(pending) = self.pending_outbound_requests.remove(peer) {
631            for request in pending {
632                self.pending_events.push_back(NetworkBehaviourAction::GenerateEvent(
633                    RequestResponseEvent::OutboundFailure {
634                        peer: *peer,
635                        request_id: request.request_id,
636                        error: OutboundFailure::DialFailure
637                    }
638                ));
639            }
640        }
641    }
642
    /// Processes an event reported by the handler of `connection` to `peer`.
    ///
    /// Each handler event updates the per-connection bookkeeping of pending
    /// requests/responses where applicable and queues the corresponding
    /// [`RequestResponseEvent`] to be returned from `poll`.
    fn inject_event(
        &mut self,
        peer: PeerId,
        connection: ConnectionId,
        event: RequestResponseHandlerEvent<TCodec>,
    ) {
        match event {
            // A response to one of our outbound requests arrived.
            RequestResponseHandlerEvent::Response { request_id, response } => {
                let removed = self.remove_pending_inbound_response(&peer, connection, &request_id);
                debug_assert!(
                    removed,
                    "Expect request_id to be pending before receiving response.",
                );

                let message = RequestResponseMessage::Response { request_id, response };
                self.pending_events.push_back(
                    NetworkBehaviourAction::GenerateEvent(
                        RequestResponseEvent::Message { peer, message }));
            }
            // A new inbound request arrived; hand it out together with the
            // channel used to answer it.
            RequestResponseHandlerEvent::Request { request_id, request, sender } => {
                let channel = ResponseChannel { request_id, peer, sender };
                let message = RequestResponseMessage::Request { request_id, request, channel };
                self.pending_events.push_back(NetworkBehaviourAction::GenerateEvent(
                    RequestResponseEvent::Message { peer, message }
                ));

                match self.get_connection_mut(&peer, connection) {
                    Some(connection) => {
                        let inserted = connection.pending_outbound_responses.insert(request_id);
                        debug_assert!(inserted, "Expect id of new request to be unknown.");
                    },
                    // Connection closed after `RequestResponseEvent::Request` has been emitted.
                    None => {
                        self.pending_events.push_back(NetworkBehaviourAction::GenerateEvent(
                            RequestResponseEvent::InboundFailure {
                                peer,
                                request_id,
                                error: InboundFailure::ConnectionClosed
                            }
                        ));
                    }
                }
            }
            // Our response to an inbound request has been sent.
            RequestResponseHandlerEvent::ResponseSent(request_id) => {
                let removed = self.remove_pending_outbound_response(&peer, connection, request_id);
                debug_assert!(removed, "Expect request_id to be pending before response is sent.");

                self.pending_events.push_back(
                    NetworkBehaviourAction::GenerateEvent(
                        RequestResponseEvent::ResponseSent { peer, request_id }));
            }
            // No response was provided for an inbound request.
            RequestResponseHandlerEvent::ResponseOmission(request_id) => {
                let removed = self.remove_pending_outbound_response(&peer, connection, request_id);
                debug_assert!(
                    removed,
                    "Expect request_id to be pending before response is omitted.",
                );

                self.pending_events.push_back(
                    NetworkBehaviourAction::GenerateEvent(
                        RequestResponseEvent::InboundFailure {
                            peer,
                            request_id,
                            error: InboundFailure::ResponseOmission
                        }));
            }
            // An outbound request timed out.
            RequestResponseHandlerEvent::OutboundTimeout(request_id) => {
                let removed = self.remove_pending_inbound_response(&peer, connection, &request_id);
                debug_assert!(removed, "Expect request_id to be pending before request times out.");

                self.pending_events.push_back(
                    NetworkBehaviourAction::GenerateEvent(
                        RequestResponseEvent::OutboundFailure {
                            peer,
                            request_id,
                            error: OutboundFailure::Timeout,
                        }));
            }
            // An inbound request timed out.
            RequestResponseHandlerEvent::InboundTimeout(request_id) => {
                // Note: `RequestResponseHandlerEvent::InboundTimeout` is emitted both for timing
                // out to receive the request and for timing out sending the response. In the former
                // case the request is never added to `pending_outbound_responses` and thus one can
                // not assert the request_id to be present before removing it.
                self.remove_pending_outbound_response(&peer, connection, request_id);

                self.pending_events.push_back(
                    NetworkBehaviourAction::GenerateEvent(
                        RequestResponseEvent::InboundFailure {
                            peer,
                            request_id,
                            error: InboundFailure::Timeout,
                        }));
            }
            // The remote supports none of the protocols of an outbound request.
            RequestResponseHandlerEvent::OutboundUnsupportedProtocols(request_id) => {
                let removed = self.remove_pending_inbound_response(&peer, connection, &request_id);
                debug_assert!(
                    removed,
                    "Expect request_id to be pending before failing to connect.",
                );

                self.pending_events.push_back(
                    NetworkBehaviourAction::GenerateEvent(
                        RequestResponseEvent::OutboundFailure {
                            peer,
                            request_id,
                            error: OutboundFailure::UnsupportedProtocols,
                        }));
            }
            // We support none of the protocols of an inbound request.
            RequestResponseHandlerEvent::InboundUnsupportedProtocols(request_id) => {
                // Note: No need to call `self.remove_pending_outbound_response`,
                // `RequestResponseHandlerEvent::Request` was never emitted for this request and
                // thus request was never added to `pending_outbound_responses`.
                self.pending_events.push_back(
                    NetworkBehaviourAction::GenerateEvent(
                        RequestResponseEvent::InboundFailure {
                            peer,
                            request_id,
                            error: InboundFailure::UnsupportedProtocols,
                        }));
            }
        }
    }
765
766    fn poll(&mut self, _: &mut Context<'_>, _: &mut impl PollParameters)
767        -> Poll<NetworkBehaviourAction<
768            RequestProtocol<TCodec>,
769            RequestResponseEvent<TCodec::Request, TCodec::Response>
770        >>
771    {
772        if let Some(ev) = self.pending_events.pop_front() {
773            return Poll::Ready(ev);
774        } else if self.pending_events.capacity() > EMPTY_QUEUE_SHRINK_THRESHOLD {
775            self.pending_events.shrink_to_fit();
776        }
777
778        Poll::Pending
779    }
780}
781
/// Internal threshold for when to shrink the capacity
/// of empty queues. If the capacity of an empty queue
/// exceeds this threshold, the associated memory is
/// released.
///
/// The value is compared against the queue's capacity (a number of
/// elements), not against a size in bytes.
const EMPTY_QUEUE_SHRINK_THRESHOLD: usize = 100;
787
/// Internal information tracked for an established connection.
struct Connection {
    /// The ID of the connection.
    id: ConnectionId,
    /// The remote address, recorded only if the local node dialed the
    /// connection; `None` otherwise.
    address: Option<Multiaddr>,
    /// Pending outbound responses where corresponding inbound requests have
    /// been received on this connection and emitted via `poll` but have not yet
    /// been answered.
    pending_outbound_responses: HashSet<RequestId>,
    /// Pending inbound responses for previously sent requests on this
    /// connection.
    pending_inbound_responses: HashSet<RequestId>
}
800
801impl Connection {
802    fn new(id: ConnectionId, address: Option<Multiaddr>) -> Self {
803        Self {
804            id,
805            address,
806            pending_outbound_responses: Default::default(),
807            pending_inbound_responses: Default::default(),
808        }
809    }
810}