tetsy_libp2p_request_response/
lib.rs

1// Copyright 2020 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21//! Generic request/response protocols.
22//!
23//! ## General Usage
24//!
25//! [`RequestResponse`] is a `NetworkBehaviour` that implements a generic
26//! request/response protocol or protocol family, whereby each request is
27//! sent over a new substream on a connection. `RequestResponse` is generic
28//! over the actual messages being sent, which are defined in terms of a
29//! [`RequestResponseCodec`]. Creating a request/response protocol thus amounts
30//! to providing an implementation of this trait which can then be
31//! given to [`RequestResponse::new`]. Further configuration options are
32//! available via the [`RequestResponseConfig`].
33//!
34//! Requests are sent using [`RequestResponse::send_request`] and the
35//! responses received as [`RequestResponseMessage::Response`] via
36//! [`RequestResponseEvent::Message`].
37//!
38//! Responses are sent using [`RequestResponse::send_response`] upon
39//! receiving a [`RequestResponseMessage::Request`] via
40//! [`RequestResponseEvent::Message`].
41//!
42//! ## Protocol Families
43//!
44//! A single [`RequestResponse`] instance can be used with an entire
45//! protocol family that share the same request and response types.
46//! For that purpose, [`RequestResponseCodec::Protocol`] is typically
47//! instantiated with a sum type.
48//!
49//! ## Limited Protocol Support
50//!
51//! It is possible to only support inbound or outbound requests for
52//! a particular protocol. This is achieved by instantiating `RequestResponse`
53//! with protocols using [`ProtocolSupport::Inbound`] or
54//! [`ProtocolSupport::Outbound`]. Any subset of protocols of a protocol
55//! family can be configured in this way. Such protocols will not be
56//! advertised during inbound respectively outbound protocol negotiation
57//! on the substreams.
58
59pub mod codec;
60pub mod handler;
61pub mod throttled;
62
63pub use codec::{RequestResponseCodec, ProtocolName};
64pub use handler::ProtocolSupport;
65pub use throttled::Throttled;
66
67use futures::{
68    channel::oneshot,
69};
70use handler::{
71    RequestProtocol,
72    RequestResponseHandler,
73    RequestResponseHandlerEvent,
74};
75use tetsy_libp2p_core::{
76    ConnectedPoint,
77    Multiaddr,
78    PeerId,
79    connection::ConnectionId,
80};
81use tetsy_libp2p_swarm::{
82    DialPeerCondition,
83    NetworkBehaviour,
84    NetworkBehaviourAction,
85    NotifyHandler,
86    PollParameters,
87};
88use smallvec::SmallVec;
89use std::{
90    collections::{HashMap, HashSet, VecDeque},
91    fmt,
92    time::Duration,
93    sync::{atomic::AtomicU64, Arc},
94    task::{Context, Poll}
95};
96
97/// An inbound request or response.
98#[derive(Debug)]
99pub enum RequestResponseMessage<TRequest, TResponse, TChannelResponse = TResponse> {
100    /// A request message.
101    Request {
102        /// The ID of this request.
103        request_id: RequestId,
104        /// The request message.
105        request: TRequest,
106        /// The channel waiting for the response.
107        ///
108        /// If this channel is dropped instead of being used to send a response
109        /// via [`RequestResponse::send_response`], a [`RequestResponseEvent::InboundFailure`]
110        /// with [`InboundFailure::ResponseOmission`] is emitted.
111        channel: ResponseChannel<TChannelResponse>,
112    },
113    /// A response message.
114    Response {
115        /// The ID of the request that produced this response.
116        ///
117        /// See [`RequestResponse::send_request`].
118        request_id: RequestId,
119        /// The response message.
120        response: TResponse
121    },
122}
123
124/// The events emitted by a [`RequestResponse`] protocol.
125#[derive(Debug)]
126pub enum RequestResponseEvent<TRequest, TResponse, TChannelResponse = TResponse> {
127    /// An incoming message (request or response).
128    Message {
129        /// The peer who sent the message.
130        peer: PeerId,
131        /// The incoming message.
132        message: RequestResponseMessage<TRequest, TResponse, TChannelResponse>
133    },
134    /// An outbound request failed.
135    OutboundFailure {
136        /// The peer to whom the request was sent.
137        peer: PeerId,
138        /// The (local) ID of the failed request.
139        request_id: RequestId,
140        /// The error that occurred.
141        error: OutboundFailure,
142    },
143    /// An inbound request failed.
144    InboundFailure {
145        /// The peer from whom the request was received.
146        peer: PeerId,
147        /// The ID of the failed inbound request.
148        request_id: RequestId,
149        /// The error that occurred.
150        error: InboundFailure,
151    },
152    /// A response to an inbound request has been sent.
153    ///
154    /// When this event is received, the response has been flushed on
155    /// the underlying transport connection.
156    ResponseSent {
157        /// The peer to whom the response was sent.
158        peer: PeerId,
159        /// The ID of the inbound request whose response was sent.
160        request_id: RequestId,
161    },
162}
163
164/// Possible failures occurring in the context of sending
165/// an outbound request and receiving the response.
166#[derive(Debug, Clone)]
167pub enum OutboundFailure {
168    /// The request could not be sent because a dialing attempt failed.
169    DialFailure,
170    /// The request timed out before a response was received.
171    ///
172    /// It is not known whether the request may have been
173    /// received (and processed) by the remote peer.
174    Timeout,
175    /// The connection closed before a response was received.
176    ///
177    /// It is not known whether the request may have been
178    /// received (and processed) by the remote peer.
179    ConnectionClosed,
180    /// The remote supports none of the requested protocols.
181    UnsupportedProtocols,
182}
183
184/// Possible failures occurring in the context of receiving an
185/// inbound request and sending a response.
186#[derive(Debug, Clone)]
187pub enum InboundFailure {
188    /// The inbound request timed out, either while reading the
189    /// incoming request or before a response is sent, e.g. if
190    /// [`RequestResponse::send_response`] is not called in a
191    /// timely manner.
192    Timeout,
193    /// The connection closed before a response could be send.
194    ConnectionClosed,
195    /// The local peer supports none of the protocols requested
196    /// by the remote.
197    UnsupportedProtocols,
198    /// The local peer failed to respond to an inbound request
199    /// due to the [`ResponseChannel`] being dropped instead of
200    /// being passed to [`RequestResponse::send_response`].
201    ResponseOmission,
202}
203
204/// A channel for sending a response to an inbound request.
205///
206/// See [`RequestResponse::send_response`].
207#[derive(Debug)]
208pub struct ResponseChannel<TResponse> {
209    request_id: RequestId,
210    peer: PeerId,
211    sender: oneshot::Sender<TResponse>,
212}
213
214impl<TResponse> ResponseChannel<TResponse> {
215    /// Checks whether the response channel is still open, i.e.
216    /// the `RequestResponse` behaviour is still waiting for a
217    /// a response to be sent via [`RequestResponse::send_response`]
218    /// and this response channel.
219    ///
220    /// If the response channel is no longer open then the inbound
221    /// request timed out waiting for the response.
222    pub fn is_open(&self) -> bool {
223        !self.sender.is_canceled()
224    }
225
226    /// Get the ID of the inbound request waiting for a response.
227    pub(crate) fn request_id(&self) -> RequestId {
228        self.request_id
229    }
230}
231
232/// The ID of an inbound or outbound request.
233#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
234pub struct RequestId(u64);
235
236impl fmt::Display for RequestId {
237    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
238        write!(f, "{}", self.0)
239    }
240}
241
242/// The configuration for a `RequestResponse` protocol.
243#[derive(Debug, Clone)]
244pub struct RequestResponseConfig {
245    request_timeout: Duration,
246    connection_keep_alive: Duration,
247}
248
249impl Default for RequestResponseConfig {
250    fn default() -> Self {
251        Self {
252            connection_keep_alive: Duration::from_secs(10),
253            request_timeout: Duration::from_secs(10),
254        }
255    }
256}
257
258impl RequestResponseConfig {
259    /// Sets the keep-alive timeout of idle connections.
260    pub fn set_connection_keep_alive(&mut self, v: Duration) -> &mut Self {
261        self.connection_keep_alive = v;
262        self
263    }
264
265    /// Sets the timeout for inbound and outbound requests.
266    pub fn set_request_timeout(&mut self, v: Duration) -> &mut Self {
267        self.request_timeout = v;
268        self
269    }
270}
271
272/// A request/response protocol for some message codec.
273pub struct RequestResponse<TCodec>
274where
275    TCodec: RequestResponseCodec,
276{
277    /// The supported inbound protocols.
278    inbound_protocols: SmallVec<[TCodec::Protocol; 2]>,
279    /// The supported outbound protocols.
280    outbound_protocols: SmallVec<[TCodec::Protocol; 2]>,
281    /// The next (local) request ID.
282    next_request_id: RequestId,
283    /// The next (inbound) request ID.
284    next_inbound_id: Arc<AtomicU64>,
285    /// The protocol configuration.
286    config: RequestResponseConfig,
287    /// The protocol codec for reading and writing requests and responses.
288    codec: TCodec,
289    /// Pending events to return from `poll`.
290    pending_events: VecDeque<
291        NetworkBehaviourAction<
292            RequestProtocol<TCodec>,
293            RequestResponseEvent<TCodec::Request, TCodec::Response>>>,
294    /// The currently connected peers, their pending outbound and inbound responses and their known,
295    /// reachable addresses, if any.
296    connected: HashMap<PeerId, SmallVec<[Connection; 2]>>,
297    /// Externally managed addresses via `add_address` and `remove_address`.
298    addresses: HashMap<PeerId, SmallVec<[Multiaddr; 6]>>,
299    /// Requests that have not yet been sent and are waiting for a connection
300    /// to be established.
301    pending_outbound_requests: HashMap<PeerId, SmallVec<[RequestProtocol<TCodec>; 10]>>,
302}
303
304impl<TCodec> RequestResponse<TCodec>
305where
306    TCodec: RequestResponseCodec + Clone,
307{
308    /// Creates a new `RequestResponse` behaviour for the given
309    /// protocols, codec and configuration.
310    pub fn new<I>(codec: TCodec, protocols: I, cfg: RequestResponseConfig) -> Self
311    where
312        I: IntoIterator<Item = (TCodec::Protocol, ProtocolSupport)>
313    {
314        let mut inbound_protocols = SmallVec::new();
315        let mut outbound_protocols = SmallVec::new();
316        for (p, s) in protocols {
317            if s.inbound() {
318                inbound_protocols.push(p.clone());
319            }
320            if s.outbound() {
321                outbound_protocols.push(p.clone());
322            }
323        }
324        RequestResponse {
325            inbound_protocols,
326            outbound_protocols,
327            next_request_id: RequestId(1),
328            next_inbound_id: Arc::new(AtomicU64::new(1)),
329            config: cfg,
330            codec,
331            pending_events: VecDeque::new(),
332            connected: HashMap::new(),
333            pending_outbound_requests: HashMap::new(),
334            addresses: HashMap::new(),
335        }
336    }
337
338    /// Creates a `RequestResponse` which limits requests per peer.
339    ///
340    /// The behaviour is wrapped in [`Throttled`] and detects the limits
341    /// per peer at runtime which are then enforced.
342    pub fn throttled<I>(c: TCodec, protos: I, cfg: RequestResponseConfig) -> Throttled<TCodec>
343    where
344        I: IntoIterator<Item = (TCodec::Protocol, ProtocolSupport)>,
345        TCodec: Send,
346        TCodec::Protocol: Sync
347    {
348        Throttled::new(c, protos, cfg)
349    }
350
351    /// Initiates sending a request.
352    ///
353    /// If the targeted peer is currently not connected, a dialing
354    /// attempt is initiated and the request is sent as soon as a
355    /// connection is established.
356    ///
357    /// > **Note**: In order for such a dialing attempt to succeed,
358    /// > the `RequestResonse` protocol must either be embedded
359    /// > in another `NetworkBehaviour` that provides peer and
360    /// > address discovery, or known addresses of peers must be
361    /// > managed via [`RequestResponse::add_address`] and
362    /// > [`RequestResponse::remove_address`].
363    pub fn send_request(&mut self, peer: &PeerId, request: TCodec::Request) -> RequestId {
364        let request_id = self.next_request_id();
365        let request = RequestProtocol {
366            request_id,
367            codec: self.codec.clone(),
368            protocols: self.outbound_protocols.clone(),
369            request,
370        };
371
372        if let Some(request) = self.try_send_request(peer, request) {
373            self.pending_events.push_back(NetworkBehaviourAction::DialPeer {
374                peer_id: peer.clone(),
375                condition: DialPeerCondition::Disconnected,
376            });
377            self.pending_outbound_requests.entry(peer.clone()).or_default().push(request);
378        }
379
380        request_id
381    }
382
383    /// Initiates sending a response to an inbound request.
384    ///
385    /// If the [`ResponseChannel`] is already closed due to a timeout or the
386    /// connection being closed, the response is returned as an `Err` for
387    /// further handling. Once the response has been successfully sent on the
388    /// corresponding connection, [`RequestResponseEvent::ResponseSent`] is
389    /// emitted. In all other cases [`RequestResponseEvent::InboundFailure`]
390    /// will be or has been emitted.
391    ///
392    /// The provided `ResponseChannel` is obtained from an inbound
393    /// [`RequestResponseMessage::Request`].
394    pub fn send_response(&mut self, ch: ResponseChannel<TCodec::Response>, rs: TCodec::Response)
395        -> Result<(), TCodec::Response>
396    {
397        ch.sender.send(rs)
398    }
399
400    /// Adds a known address for a peer that can be used for
401    /// dialing attempts by the `Swarm`, i.e. is returned
402    /// by [`NetworkBehaviour::addresses_of_peer`].
403    ///
404    /// Addresses added in this way are only removed by `remove_address`.
405    pub fn add_address(&mut self, peer: &PeerId, address: Multiaddr) {
406        self.addresses.entry(peer.clone()).or_default().push(address);
407    }
408
409    /// Removes an address of a peer previously added via `add_address`.
410    pub fn remove_address(&mut self, peer: &PeerId, address: &Multiaddr) {
411        let mut last = false;
412        if let Some(addresses) = self.addresses.get_mut(peer) {
413            addresses.retain(|a| a != address);
414            last = addresses.is_empty();
415        }
416        if last {
417            self.addresses.remove(peer);
418        }
419    }
420
421    /// Checks whether a peer is currently connected.
422    pub fn is_connected(&self, peer: &PeerId) -> bool {
423        if let Some(connections) = self.connected.get(peer) {
424            !connections.is_empty()
425        } else {
426            false
427        }
428    }
429
430    /// Checks whether an outbound request to the peer with the provided
431    /// [`PeerId`] initiated by [`RequestResponse::send_request`] is still
432    /// pending, i.e. waiting for a response.
433    pub fn is_pending_outbound(&self, peer: &PeerId, request_id: &RequestId) -> bool {
434        self.connected.get(peer)
435            .map(|cs| cs.iter().any(|c| c.pending_inbound_responses.contains(request_id)))
436            .unwrap_or(false)
437    }
438
439    /// Checks whether an inbound request from the peer with the provided
440    /// [`PeerId`] is still pending, i.e. waiting for a response by the local
441    /// node through [`RequestResponse::send_response`].
442    pub fn is_pending_inbound(&self, peer: &PeerId, request_id: &RequestId) -> bool {
443        self.connected.get(peer)
444            .map(|cs| cs.iter().any(|c| c.pending_outbound_responses.contains(request_id)))
445            .unwrap_or(false)
446    }
447
448    /// Returns the next request ID.
449    fn next_request_id(&mut self) -> RequestId {
450        let request_id = self.next_request_id;
451        self.next_request_id.0 += 1;
452        request_id
453    }
454
455    /// Tries to send a request by queueing an appropriate event to be
456    /// emitted to the `Swarm`. If the peer is not currently connected,
457    /// the given request is return unchanged.
458    fn try_send_request(&mut self, peer: &PeerId, request: RequestProtocol<TCodec>)
459        -> Option<RequestProtocol<TCodec>>
460    {
461        if let Some(connections) = self.connected.get_mut(peer) {
462            if connections.is_empty() {
463                return Some(request)
464            }
465            let ix = (request.request_id.0 as usize) % connections.len();
466            let conn = &mut connections[ix];
467            conn.pending_inbound_responses.insert(request.request_id);
468            self.pending_events.push_back(NetworkBehaviourAction::NotifyHandler {
469                peer_id: peer.clone(),
470                handler: NotifyHandler::One(conn.id),
471                event: request
472            });
473            None
474        } else {
475            Some(request)
476        }
477    }
478
479    /// Remove pending outbound response for the given peer and connection.
480    ///
481    /// Returns `true` if the provided connection to the given peer is still
482    /// alive and the [`RequestId`] was previously present and is now removed.
483    /// Returns `false` otherwise.
484    fn remove_pending_outbound_response(
485        &mut self,
486        peer: &PeerId,
487        connection: ConnectionId,
488        request: RequestId,
489    ) -> bool {
490        self.get_connection_mut(peer, connection)
491            .map(|c| c.pending_outbound_responses.remove(&request))
492            .unwrap_or(false)
493    }
494
495    /// Remove pending inbound response for the given peer and connection.
496    ///
497    /// Returns `true` if the provided connection to the given peer is still
498    /// alive and the [`RequestId`] was previously present and is now removed.
499    /// Returns `false` otherwise.
500    fn remove_pending_inbound_response(
501        &mut self,
502        peer: &PeerId,
503        connection: ConnectionId,
504        request: &RequestId,
505    ) -> bool {
506        self.get_connection_mut(peer, connection)
507            .map(|c| c.pending_inbound_responses.remove(request))
508            .unwrap_or(false)
509    }
510
511    /// Returns a mutable reference to the connection in `self.connected`
512    /// corresponding to the given [`PeerId`] and [`ConnectionId`].
513    fn get_connection_mut(
514        &mut self,
515        peer: &PeerId,
516        connection: ConnectionId,
517    ) -> Option<&mut Connection> {
518        self.connected.get_mut(peer).and_then(|connections| {
519            connections.iter_mut().find(|c| c.id == connection)
520        })
521    }
522}
523
524impl<TCodec> NetworkBehaviour for RequestResponse<TCodec>
525where
526    TCodec: RequestResponseCodec + Send + Clone + 'static,
527{
528    type ProtocolsHandler = RequestResponseHandler<TCodec>;
529    type OutEvent = RequestResponseEvent<TCodec::Request, TCodec::Response>;
530
531    fn new_handler(&mut self) -> Self::ProtocolsHandler {
532        RequestResponseHandler::new(
533            self.inbound_protocols.clone(),
534            self.codec.clone(),
535            self.config.connection_keep_alive,
536            self.config.request_timeout,
537            self.next_inbound_id.clone()
538        )
539    }
540
541    fn addresses_of_peer(&mut self, peer: &PeerId) -> Vec<Multiaddr> {
542        let mut addresses = Vec::new();
543        if let Some(connections) = self.connected.get(peer) {
544            addresses.extend(connections.iter().filter_map(|c| c.address.clone()))
545        }
546        if let Some(more) = self.addresses.get(peer) {
547            addresses.extend(more.into_iter().cloned());
548        }
549        addresses
550    }
551
552    fn inject_connected(&mut self, peer: &PeerId) {
553        if let Some(pending) = self.pending_outbound_requests.remove(peer) {
554            for request in pending {
555                let request = self.try_send_request(peer, request);
556                assert!(request.is_none());
557            }
558        }
559    }
560
561    fn inject_connection_established(&mut self, peer: &PeerId, conn: &ConnectionId, endpoint: &ConnectedPoint) {
562        let address = match endpoint {
563            ConnectedPoint::Dialer { address } => Some(address.clone()),
564            ConnectedPoint::Listener { .. } => None
565        };
566        self.connected.entry(peer.clone())
567            .or_default()
568            .push(Connection::new(*conn, address));
569    }
570
571    fn inject_connection_closed(&mut self, peer_id: &PeerId, conn: &ConnectionId, _: &ConnectedPoint) {
572        let connections = self.connected.get_mut(peer_id)
573            .expect("Expected some established connection to peer before closing.");
574
575        let connection = connections.iter()
576            .position(|c| &c.id == conn)
577            .map(|p: usize| connections.remove(p))
578            .expect("Expected connection to be established before closing.");
579
580        if connections.is_empty() {
581            self.connected.remove(peer_id);
582        }
583
584        for request_id in connection.pending_outbound_responses {
585            self.pending_events.push_back(NetworkBehaviourAction::GenerateEvent(
586                RequestResponseEvent::InboundFailure {
587                    peer: peer_id.clone(),
588                    request_id,
589                    error: InboundFailure::ConnectionClosed
590                }
591            ));
592
593        }
594
595        for request_id in connection.pending_inbound_responses {
596            self.pending_events.push_back(NetworkBehaviourAction::GenerateEvent(
597                RequestResponseEvent::OutboundFailure {
598                    peer: peer_id.clone(),
599                    request_id,
600                    error: OutboundFailure::ConnectionClosed
601                }
602            ));
603        }
604    }
605
606    fn inject_disconnected(&mut self, peer: &PeerId) {
607        self.connected.remove(peer);
608    }
609
610    fn inject_dial_failure(&mut self, peer: &PeerId) {
611        // If there are pending outgoing requests when a dial failure occurs,
612        // it is implied that we are not connected to the peer, since pending
613        // outgoing requests are drained when a connection is established and
614        // only created when a peer is not connected when a request is made.
615        // Thus these requests must be considered failed, even if there is
616        // another, concurrent dialing attempt ongoing.
617        if let Some(pending) = self.pending_outbound_requests.remove(peer) {
618            for request in pending {
619                self.pending_events.push_back(NetworkBehaviourAction::GenerateEvent(
620                    RequestResponseEvent::OutboundFailure {
621                        peer: peer.clone(),
622                        request_id: request.request_id,
623                        error: OutboundFailure::DialFailure
624                    }
625                ));
626            }
627        }
628    }
629
630    fn inject_event(
631        &mut self,
632        peer: PeerId,
633        connection: ConnectionId,
634        event: RequestResponseHandlerEvent<TCodec>,
635    ) {
636        match event {
637            RequestResponseHandlerEvent::Response { request_id, response } => {
638                let removed = self.remove_pending_inbound_response(&peer, connection, &request_id);
639                debug_assert!(
640                    removed,
641                    "Expect request_id to be pending before receiving response.",
642                );
643
644                let message = RequestResponseMessage::Response { request_id, response };
645                self.pending_events.push_back(
646                    NetworkBehaviourAction::GenerateEvent(
647                        RequestResponseEvent::Message { peer, message }));
648            }
649            RequestResponseHandlerEvent::Request { request_id, request, sender } => {
650                let channel = ResponseChannel { request_id, peer: peer.clone(), sender };
651                let message = RequestResponseMessage::Request { request_id, request, channel };
652                self.pending_events.push_back(NetworkBehaviourAction::GenerateEvent(
653                    RequestResponseEvent::Message { peer: peer.clone(), message }
654                ));
655
656                match self.get_connection_mut(&peer, connection) {
657                    Some(connection) => {
658                        let inserted = connection.pending_outbound_responses.insert(request_id);
659                        debug_assert!(inserted, "Expect id of new request to be unknown.");
660                    },
661                    // Connection closed after `RequestResponseEvent::Request` has been emitted.
662                    None => {
663                        self.pending_events.push_back(NetworkBehaviourAction::GenerateEvent(
664                            RequestResponseEvent::InboundFailure {
665                                peer: peer.clone(),
666                                request_id,
667                                error: InboundFailure::ConnectionClosed
668                            }
669                        ));
670                    }
671                }
672            }
673            RequestResponseHandlerEvent::ResponseSent(request_id) => {
674                let removed = self.remove_pending_outbound_response(&peer, connection, request_id);
675                debug_assert!(removed, "Expect request_id to be pending before response is sent.");
676
677                self.pending_events.push_back(
678                    NetworkBehaviourAction::GenerateEvent(
679                        RequestResponseEvent::ResponseSent { peer, request_id }));
680            }
681            RequestResponseHandlerEvent::ResponseOmission(request_id) => {
682                let removed = self.remove_pending_outbound_response(&peer, connection, request_id);
683                debug_assert!(
684                    removed,
685                    "Expect request_id to be pending before response is omitted.",
686                );
687
688                self.pending_events.push_back(
689                    NetworkBehaviourAction::GenerateEvent(
690                        RequestResponseEvent::InboundFailure {
691                            peer,
692                            request_id,
693                            error: InboundFailure::ResponseOmission
694                        }));
695            }
696            RequestResponseHandlerEvent::OutboundTimeout(request_id) => {
697                let removed = self.remove_pending_inbound_response(&peer, connection, &request_id);
698                debug_assert!(removed, "Expect request_id to be pending before request times out.");
699
700                self.pending_events.push_back(
701                    NetworkBehaviourAction::GenerateEvent(
702                        RequestResponseEvent::OutboundFailure {
703                            peer,
704                            request_id,
705                            error: OutboundFailure::Timeout,
706                        }));
707            }
708            RequestResponseHandlerEvent::InboundTimeout(request_id) => {
709                // Note: `RequestResponseHandlerEvent::InboundTimeout` is emitted both for timing
710                // out to receive the request and for timing out sending the response. In the former
711                // case the request is never added to `pending_outbound_responses` and thus one can
712                // not assert the request_id to be present before removing it.
713                self.remove_pending_outbound_response(&peer, connection, request_id);
714
715                self.pending_events.push_back(
716                    NetworkBehaviourAction::GenerateEvent(
717                        RequestResponseEvent::InboundFailure {
718                            peer,
719                            request_id,
720                            error: InboundFailure::Timeout,
721                        }));
722            }
723            RequestResponseHandlerEvent::OutboundUnsupportedProtocols(request_id) => {
724                let removed = self.remove_pending_inbound_response(&peer, connection, &request_id);
725                debug_assert!(
726                    removed,
727                    "Expect request_id to be pending before failing to connect.",
728                );
729
730                self.pending_events.push_back(
731                    NetworkBehaviourAction::GenerateEvent(
732                        RequestResponseEvent::OutboundFailure {
733                            peer,
734                            request_id,
735                            error: OutboundFailure::UnsupportedProtocols,
736                        }));
737            }
738            RequestResponseHandlerEvent::InboundUnsupportedProtocols(request_id) => {
739                // Note: No need to call `self.remove_pending_outbound_response`,
740                // `RequestResponseHandlerEvent::Request` was never emitted for this request and
741                // thus request was never added to `pending_outbound_responses`.
742                self.pending_events.push_back(
743                    NetworkBehaviourAction::GenerateEvent(
744                        RequestResponseEvent::InboundFailure {
745                            peer,
746                            request_id,
747                            error: InboundFailure::UnsupportedProtocols,
748                        }));
749            }
750        }
751    }
752
753    fn poll(&mut self, _: &mut Context<'_>, _: &mut impl PollParameters)
754        -> Poll<NetworkBehaviourAction<
755            RequestProtocol<TCodec>,
756            RequestResponseEvent<TCodec::Request, TCodec::Response>
757        >>
758    {
759        if let Some(ev) = self.pending_events.pop_front() {
760            return Poll::Ready(ev);
761        } else if self.pending_events.capacity() > EMPTY_QUEUE_SHRINK_THRESHOLD {
762            self.pending_events.shrink_to_fit();
763        }
764
765        Poll::Pending
766    }
767}
768
769/// Internal threshold for when to shrink the capacity
770/// of empty queues. If the capacity of an empty queue
771/// exceeds this threshold, the associated memory is
772/// released.
773const EMPTY_QUEUE_SHRINK_THRESHOLD: usize = 100;
774
775/// Internal information tracked for an established connection.
776struct Connection {
777    id: ConnectionId,
778    address: Option<Multiaddr>,
779    /// Pending outbound responses where corresponding inbound requests have
780    /// been received on this connection and emitted via `poll` but have not yet
781    /// been answered.
782    pending_outbound_responses: HashSet<RequestId>,
783    /// Pending inbound responses for previously sent requests on this
784    /// connection.
785    pending_inbound_responses: HashSet<RequestId>
786}
787
788impl Connection {
789    fn new(id: ConnectionId, address: Option<Multiaddr>) -> Self {
790        Self {
791            id,
792            address,
793            pending_outbound_responses: Default::default(),
794            pending_inbound_responses: Default::default(),
795        }
796    }
797}