tetsy_libp2p_request_response/lib.rs
1// Copyright 2020 Parity Technologies (UK) Ltd.
2//
3// Permission is hereby granted, free of charge, to any person obtaining a
4// copy of this software and associated documentation files (the "Software"),
5// to deal in the Software without restriction, including without limitation
6// the rights to use, copy, modify, merge, publish, distribute, sublicense,
7// and/or sell copies of the Software, and to permit persons to whom the
8// Software is furnished to do so, subject to the following conditions:
9//
10// The above copyright notice and this permission notice shall be included in
11// all copies or substantial portions of the Software.
12//
13// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
14// OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
15// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
16// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
17// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
18// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
19// DEALINGS IN THE SOFTWARE.
20
21//! Generic request/response protocols.
22//!
23//! ## General Usage
24//!
25//! [`RequestResponse`] is a `NetworkBehaviour` that implements a generic
26//! request/response protocol or protocol family, whereby each request is
27//! sent over a new substream on a connection. `RequestResponse` is generic
28//! over the actual messages being sent, which are defined in terms of a
29//! [`RequestResponseCodec`]. Creating a request/response protocol thus amounts
30//! to providing an implementation of this trait which can then be
31//! given to [`RequestResponse::new`]. Further configuration options are
32//! available via the [`RequestResponseConfig`].
33//!
34//! Requests are sent using [`RequestResponse::send_request`] and the
35//! responses received as [`RequestResponseMessage::Response`] via
36//! [`RequestResponseEvent::Message`].
37//!
38//! Responses are sent using [`RequestResponse::send_response`] upon
39//! receiving a [`RequestResponseMessage::Request`] via
40//! [`RequestResponseEvent::Message`].
41//!
42//! ## Protocol Families
43//!
44//! A single [`RequestResponse`] instance can be used with an entire
45//! protocol family that share the same request and response types.
46//! For that purpose, [`RequestResponseCodec::Protocol`] is typically
47//! instantiated with a sum type.
48//!
49//! ## Limited Protocol Support
50//!
51//! It is possible to only support inbound or outbound requests for
52//! a particular protocol. This is achieved by instantiating `RequestResponse`
53//! with protocols using [`ProtocolSupport::Inbound`] or
54//! [`ProtocolSupport::Outbound`]. Any subset of protocols of a protocol
55//! family can be configured in this way. Such protocols will not be
56//! advertised during inbound respectively outbound protocol negotiation
57//! on the substreams.
58
59pub mod codec;
60pub mod handler;
61pub mod throttled;
62
63pub use codec::{RequestResponseCodec, ProtocolName};
64pub use handler::ProtocolSupport;
65pub use throttled::Throttled;
66
67use futures::{
68 channel::oneshot,
69};
70use handler::{
71 RequestProtocol,
72 RequestResponseHandler,
73 RequestResponseHandlerEvent,
74};
75use tetsy_libp2p_core::{
76 ConnectedPoint,
77 Multiaddr,
78 PeerId,
79 connection::ConnectionId,
80};
81use tetsy_libp2p_swarm::{
82 DialPeerCondition,
83 NetworkBehaviour,
84 NetworkBehaviourAction,
85 NotifyHandler,
86 PollParameters,
87};
88use smallvec::SmallVec;
89use std::{
90 collections::{HashMap, HashSet, VecDeque},
91 fmt,
92 time::Duration,
93 sync::{atomic::AtomicU64, Arc},
94 task::{Context, Poll}
95};
96
97/// An inbound request or response.
98#[derive(Debug)]
99pub enum RequestResponseMessage<TRequest, TResponse, TChannelResponse = TResponse> {
100 /// A request message.
101 Request {
102 /// The ID of this request.
103 request_id: RequestId,
104 /// The request message.
105 request: TRequest,
106 /// The channel waiting for the response.
107 ///
108 /// If this channel is dropped instead of being used to send a response
109 /// via [`RequestResponse::send_response`], a [`RequestResponseEvent::InboundFailure`]
110 /// with [`InboundFailure::ResponseOmission`] is emitted.
111 channel: ResponseChannel<TChannelResponse>,
112 },
113 /// A response message.
114 Response {
115 /// The ID of the request that produced this response.
116 ///
117 /// See [`RequestResponse::send_request`].
118 request_id: RequestId,
119 /// The response message.
120 response: TResponse
121 },
122}
123
124/// The events emitted by a [`RequestResponse`] protocol.
125#[derive(Debug)]
126pub enum RequestResponseEvent<TRequest, TResponse, TChannelResponse = TResponse> {
127 /// An incoming message (request or response).
128 Message {
129 /// The peer who sent the message.
130 peer: PeerId,
131 /// The incoming message.
132 message: RequestResponseMessage<TRequest, TResponse, TChannelResponse>
133 },
134 /// An outbound request failed.
135 OutboundFailure {
136 /// The peer to whom the request was sent.
137 peer: PeerId,
138 /// The (local) ID of the failed request.
139 request_id: RequestId,
140 /// The error that occurred.
141 error: OutboundFailure,
142 },
143 /// An inbound request failed.
144 InboundFailure {
145 /// The peer from whom the request was received.
146 peer: PeerId,
147 /// The ID of the failed inbound request.
148 request_id: RequestId,
149 /// The error that occurred.
150 error: InboundFailure,
151 },
152 /// A response to an inbound request has been sent.
153 ///
154 /// When this event is received, the response has been flushed on
155 /// the underlying transport connection.
156 ResponseSent {
157 /// The peer to whom the response was sent.
158 peer: PeerId,
159 /// The ID of the inbound request whose response was sent.
160 request_id: RequestId,
161 },
162}
163
/// Possible failures occurring in the context of sending
/// an outbound request and receiving the response.
///
/// The type implements [`fmt::Display`] and [`std::error::Error`] so that
/// it can be logged, boxed and propagated like any other error value.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum OutboundFailure {
    /// The request could not be sent because a dialing attempt failed.
    DialFailure,
    /// The request timed out before a response was received.
    ///
    /// It is not known whether the request may have been
    /// received (and processed) by the remote peer.
    Timeout,
    /// The connection closed before a response was received.
    ///
    /// It is not known whether the request may have been
    /// received (and processed) by the remote peer.
    ConnectionClosed,
    /// The remote supports none of the requested protocols.
    UnsupportedProtocols,
}

impl fmt::Display for OutboundFailure {
    /// Writes a short, human-readable description of the failure.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let description = match self {
            OutboundFailure::DialFailure => "Failed to dial the requested peer",
            OutboundFailure::Timeout => "Timeout while waiting for a response",
            OutboundFailure::ConnectionClosed => {
                "Connection was closed before a response was received"
            }
            OutboundFailure::UnsupportedProtocols => {
                "The remote supports none of the requested protocols"
            }
        };
        f.write_str(description)
    }
}

impl std::error::Error for OutboundFailure {}
183
/// Possible failures occurring in the context of receiving an
/// inbound request and sending a response.
///
/// The type implements [`fmt::Display`] and [`std::error::Error`] so that
/// it can be logged, boxed and propagated like any other error value.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum InboundFailure {
    /// The inbound request timed out, either while reading the
    /// incoming request or before a response is sent, e.g. if
    /// [`RequestResponse::send_response`] is not called in a
    /// timely manner.
    Timeout,
    /// The connection closed before a response could be sent.
    ConnectionClosed,
    /// The local peer supports none of the protocols requested
    /// by the remote.
    UnsupportedProtocols,
    /// The local peer failed to respond to an inbound request
    /// due to the [`ResponseChannel`] being dropped instead of
    /// being passed to [`RequestResponse::send_response`].
    ResponseOmission,
}

impl fmt::Display for InboundFailure {
    /// Writes a short, human-readable description of the failure.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let description = match self {
            InboundFailure::Timeout => "Timeout while receiving request or sending response",
            InboundFailure::ConnectionClosed => {
                "Connection was closed before a response could be sent"
            }
            InboundFailure::UnsupportedProtocols => {
                "The local peer supports none of the protocols requested by the remote"
            }
            InboundFailure::ResponseOmission => {
                "The response channel was dropped without sending a response to the remote"
            }
        };
        f.write_str(description)
    }
}

impl std::error::Error for InboundFailure {}
203
204/// A channel for sending a response to an inbound request.
205///
206/// See [`RequestResponse::send_response`].
207#[derive(Debug)]
208pub struct ResponseChannel<TResponse> {
209 request_id: RequestId,
210 peer: PeerId,
211 sender: oneshot::Sender<TResponse>,
212}
213
214impl<TResponse> ResponseChannel<TResponse> {
215 /// Checks whether the response channel is still open, i.e.
216 /// the `RequestResponse` behaviour is still waiting for a
217 /// a response to be sent via [`RequestResponse::send_response`]
218 /// and this response channel.
219 ///
220 /// If the response channel is no longer open then the inbound
221 /// request timed out waiting for the response.
222 pub fn is_open(&self) -> bool {
223 !self.sender.is_canceled()
224 }
225
226 /// Get the ID of the inbound request waiting for a response.
227 pub(crate) fn request_id(&self) -> RequestId {
228 self.request_id
229 }
230}
231
/// Identifier of an inbound or outbound request.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub struct RequestId(u64);

impl fmt::Display for RequestId {
    /// Writes the wrapped integer in decimal notation.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let RequestId(raw) = self;
        f.write_fmt(format_args!("{}", raw))
    }
}
241
/// Tunable settings of a `RequestResponse` protocol.
#[derive(Debug, Clone)]
pub struct RequestResponseConfig {
    request_timeout: Duration,
    connection_keep_alive: Duration,
}

impl Default for RequestResponseConfig {
    /// Both the request timeout and the connection keep-alive default to
    /// ten seconds.
    fn default() -> Self {
        let ten_seconds = Duration::from_secs(10);
        Self {
            request_timeout: ten_seconds,
            connection_keep_alive: ten_seconds,
        }
    }
}

impl RequestResponseConfig {
    /// Sets the keep-alive timeout of idle connections and returns `self`
    /// so that further setters can be chained.
    pub fn set_connection_keep_alive(&mut self, v: Duration) -> &mut Self {
        self.connection_keep_alive = v;
        self
    }

    /// Sets the timeout for inbound and outbound requests and returns
    /// `self` so that further setters can be chained.
    pub fn set_request_timeout(&mut self, v: Duration) -> &mut Self {
        self.request_timeout = v;
        self
    }
}
271
272/// A request/response protocol for some message codec.
273pub struct RequestResponse<TCodec>
274where
275 TCodec: RequestResponseCodec,
276{
277 /// The supported inbound protocols.
278 inbound_protocols: SmallVec<[TCodec::Protocol; 2]>,
279 /// The supported outbound protocols.
280 outbound_protocols: SmallVec<[TCodec::Protocol; 2]>,
281 /// The next (local) request ID.
282 next_request_id: RequestId,
283 /// The next (inbound) request ID.
284 next_inbound_id: Arc<AtomicU64>,
285 /// The protocol configuration.
286 config: RequestResponseConfig,
287 /// The protocol codec for reading and writing requests and responses.
288 codec: TCodec,
289 /// Pending events to return from `poll`.
290 pending_events: VecDeque<
291 NetworkBehaviourAction<
292 RequestProtocol<TCodec>,
293 RequestResponseEvent<TCodec::Request, TCodec::Response>>>,
294 /// The currently connected peers, their pending outbound and inbound responses and their known,
295 /// reachable addresses, if any.
296 connected: HashMap<PeerId, SmallVec<[Connection; 2]>>,
297 /// Externally managed addresses via `add_address` and `remove_address`.
298 addresses: HashMap<PeerId, SmallVec<[Multiaddr; 6]>>,
299 /// Requests that have not yet been sent and are waiting for a connection
300 /// to be established.
301 pending_outbound_requests: HashMap<PeerId, SmallVec<[RequestProtocol<TCodec>; 10]>>,
302}
303
304impl<TCodec> RequestResponse<TCodec>
305where
306 TCodec: RequestResponseCodec + Clone,
307{
308 /// Creates a new `RequestResponse` behaviour for the given
309 /// protocols, codec and configuration.
310 pub fn new<I>(codec: TCodec, protocols: I, cfg: RequestResponseConfig) -> Self
311 where
312 I: IntoIterator<Item = (TCodec::Protocol, ProtocolSupport)>
313 {
314 let mut inbound_protocols = SmallVec::new();
315 let mut outbound_protocols = SmallVec::new();
316 for (p, s) in protocols {
317 if s.inbound() {
318 inbound_protocols.push(p.clone());
319 }
320 if s.outbound() {
321 outbound_protocols.push(p.clone());
322 }
323 }
324 RequestResponse {
325 inbound_protocols,
326 outbound_protocols,
327 next_request_id: RequestId(1),
328 next_inbound_id: Arc::new(AtomicU64::new(1)),
329 config: cfg,
330 codec,
331 pending_events: VecDeque::new(),
332 connected: HashMap::new(),
333 pending_outbound_requests: HashMap::new(),
334 addresses: HashMap::new(),
335 }
336 }
337
338 /// Creates a `RequestResponse` which limits requests per peer.
339 ///
340 /// The behaviour is wrapped in [`Throttled`] and detects the limits
341 /// per peer at runtime which are then enforced.
342 pub fn throttled<I>(c: TCodec, protos: I, cfg: RequestResponseConfig) -> Throttled<TCodec>
343 where
344 I: IntoIterator<Item = (TCodec::Protocol, ProtocolSupport)>,
345 TCodec: Send,
346 TCodec::Protocol: Sync
347 {
348 Throttled::new(c, protos, cfg)
349 }
350
351 /// Initiates sending a request.
352 ///
353 /// If the targeted peer is currently not connected, a dialing
354 /// attempt is initiated and the request is sent as soon as a
355 /// connection is established.
356 ///
357 /// > **Note**: In order for such a dialing attempt to succeed,
358 /// > the `RequestResonse` protocol must either be embedded
359 /// > in another `NetworkBehaviour` that provides peer and
360 /// > address discovery, or known addresses of peers must be
361 /// > managed via [`RequestResponse::add_address`] and
362 /// > [`RequestResponse::remove_address`].
363 pub fn send_request(&mut self, peer: &PeerId, request: TCodec::Request) -> RequestId {
364 let request_id = self.next_request_id();
365 let request = RequestProtocol {
366 request_id,
367 codec: self.codec.clone(),
368 protocols: self.outbound_protocols.clone(),
369 request,
370 };
371
372 if let Some(request) = self.try_send_request(peer, request) {
373 self.pending_events.push_back(NetworkBehaviourAction::DialPeer {
374 peer_id: peer.clone(),
375 condition: DialPeerCondition::Disconnected,
376 });
377 self.pending_outbound_requests.entry(peer.clone()).or_default().push(request);
378 }
379
380 request_id
381 }
382
383 /// Initiates sending a response to an inbound request.
384 ///
385 /// If the [`ResponseChannel`] is already closed due to a timeout or the
386 /// connection being closed, the response is returned as an `Err` for
387 /// further handling. Once the response has been successfully sent on the
388 /// corresponding connection, [`RequestResponseEvent::ResponseSent`] is
389 /// emitted. In all other cases [`RequestResponseEvent::InboundFailure`]
390 /// will be or has been emitted.
391 ///
392 /// The provided `ResponseChannel` is obtained from an inbound
393 /// [`RequestResponseMessage::Request`].
394 pub fn send_response(&mut self, ch: ResponseChannel<TCodec::Response>, rs: TCodec::Response)
395 -> Result<(), TCodec::Response>
396 {
397 ch.sender.send(rs)
398 }
399
400 /// Adds a known address for a peer that can be used for
401 /// dialing attempts by the `Swarm`, i.e. is returned
402 /// by [`NetworkBehaviour::addresses_of_peer`].
403 ///
404 /// Addresses added in this way are only removed by `remove_address`.
405 pub fn add_address(&mut self, peer: &PeerId, address: Multiaddr) {
406 self.addresses.entry(peer.clone()).or_default().push(address);
407 }
408
409 /// Removes an address of a peer previously added via `add_address`.
410 pub fn remove_address(&mut self, peer: &PeerId, address: &Multiaddr) {
411 let mut last = false;
412 if let Some(addresses) = self.addresses.get_mut(peer) {
413 addresses.retain(|a| a != address);
414 last = addresses.is_empty();
415 }
416 if last {
417 self.addresses.remove(peer);
418 }
419 }
420
421 /// Checks whether a peer is currently connected.
422 pub fn is_connected(&self, peer: &PeerId) -> bool {
423 if let Some(connections) = self.connected.get(peer) {
424 !connections.is_empty()
425 } else {
426 false
427 }
428 }
429
430 /// Checks whether an outbound request to the peer with the provided
431 /// [`PeerId`] initiated by [`RequestResponse::send_request`] is still
432 /// pending, i.e. waiting for a response.
433 pub fn is_pending_outbound(&self, peer: &PeerId, request_id: &RequestId) -> bool {
434 self.connected.get(peer)
435 .map(|cs| cs.iter().any(|c| c.pending_inbound_responses.contains(request_id)))
436 .unwrap_or(false)
437 }
438
439 /// Checks whether an inbound request from the peer with the provided
440 /// [`PeerId`] is still pending, i.e. waiting for a response by the local
441 /// node through [`RequestResponse::send_response`].
442 pub fn is_pending_inbound(&self, peer: &PeerId, request_id: &RequestId) -> bool {
443 self.connected.get(peer)
444 .map(|cs| cs.iter().any(|c| c.pending_outbound_responses.contains(request_id)))
445 .unwrap_or(false)
446 }
447
448 /// Returns the next request ID.
449 fn next_request_id(&mut self) -> RequestId {
450 let request_id = self.next_request_id;
451 self.next_request_id.0 += 1;
452 request_id
453 }
454
455 /// Tries to send a request by queueing an appropriate event to be
456 /// emitted to the `Swarm`. If the peer is not currently connected,
457 /// the given request is return unchanged.
458 fn try_send_request(&mut self, peer: &PeerId, request: RequestProtocol<TCodec>)
459 -> Option<RequestProtocol<TCodec>>
460 {
461 if let Some(connections) = self.connected.get_mut(peer) {
462 if connections.is_empty() {
463 return Some(request)
464 }
465 let ix = (request.request_id.0 as usize) % connections.len();
466 let conn = &mut connections[ix];
467 conn.pending_inbound_responses.insert(request.request_id);
468 self.pending_events.push_back(NetworkBehaviourAction::NotifyHandler {
469 peer_id: peer.clone(),
470 handler: NotifyHandler::One(conn.id),
471 event: request
472 });
473 None
474 } else {
475 Some(request)
476 }
477 }
478
479 /// Remove pending outbound response for the given peer and connection.
480 ///
481 /// Returns `true` if the provided connection to the given peer is still
482 /// alive and the [`RequestId`] was previously present and is now removed.
483 /// Returns `false` otherwise.
484 fn remove_pending_outbound_response(
485 &mut self,
486 peer: &PeerId,
487 connection: ConnectionId,
488 request: RequestId,
489 ) -> bool {
490 self.get_connection_mut(peer, connection)
491 .map(|c| c.pending_outbound_responses.remove(&request))
492 .unwrap_or(false)
493 }
494
495 /// Remove pending inbound response for the given peer and connection.
496 ///
497 /// Returns `true` if the provided connection to the given peer is still
498 /// alive and the [`RequestId`] was previously present and is now removed.
499 /// Returns `false` otherwise.
500 fn remove_pending_inbound_response(
501 &mut self,
502 peer: &PeerId,
503 connection: ConnectionId,
504 request: &RequestId,
505 ) -> bool {
506 self.get_connection_mut(peer, connection)
507 .map(|c| c.pending_inbound_responses.remove(request))
508 .unwrap_or(false)
509 }
510
511 /// Returns a mutable reference to the connection in `self.connected`
512 /// corresponding to the given [`PeerId`] and [`ConnectionId`].
513 fn get_connection_mut(
514 &mut self,
515 peer: &PeerId,
516 connection: ConnectionId,
517 ) -> Option<&mut Connection> {
518 self.connected.get_mut(peer).and_then(|connections| {
519 connections.iter_mut().find(|c| c.id == connection)
520 })
521 }
522}
523
524impl<TCodec> NetworkBehaviour for RequestResponse<TCodec>
525where
526 TCodec: RequestResponseCodec + Send + Clone + 'static,
527{
528 type ProtocolsHandler = RequestResponseHandler<TCodec>;
529 type OutEvent = RequestResponseEvent<TCodec::Request, TCodec::Response>;
530
531 fn new_handler(&mut self) -> Self::ProtocolsHandler {
532 RequestResponseHandler::new(
533 self.inbound_protocols.clone(),
534 self.codec.clone(),
535 self.config.connection_keep_alive,
536 self.config.request_timeout,
537 self.next_inbound_id.clone()
538 )
539 }
540
541 fn addresses_of_peer(&mut self, peer: &PeerId) -> Vec<Multiaddr> {
542 let mut addresses = Vec::new();
543 if let Some(connections) = self.connected.get(peer) {
544 addresses.extend(connections.iter().filter_map(|c| c.address.clone()))
545 }
546 if let Some(more) = self.addresses.get(peer) {
547 addresses.extend(more.into_iter().cloned());
548 }
549 addresses
550 }
551
552 fn inject_connected(&mut self, peer: &PeerId) {
553 if let Some(pending) = self.pending_outbound_requests.remove(peer) {
554 for request in pending {
555 let request = self.try_send_request(peer, request);
556 assert!(request.is_none());
557 }
558 }
559 }
560
561 fn inject_connection_established(&mut self, peer: &PeerId, conn: &ConnectionId, endpoint: &ConnectedPoint) {
562 let address = match endpoint {
563 ConnectedPoint::Dialer { address } => Some(address.clone()),
564 ConnectedPoint::Listener { .. } => None
565 };
566 self.connected.entry(peer.clone())
567 .or_default()
568 .push(Connection::new(*conn, address));
569 }
570
571 fn inject_connection_closed(&mut self, peer_id: &PeerId, conn: &ConnectionId, _: &ConnectedPoint) {
572 let connections = self.connected.get_mut(peer_id)
573 .expect("Expected some established connection to peer before closing.");
574
575 let connection = connections.iter()
576 .position(|c| &c.id == conn)
577 .map(|p: usize| connections.remove(p))
578 .expect("Expected connection to be established before closing.");
579
580 if connections.is_empty() {
581 self.connected.remove(peer_id);
582 }
583
584 for request_id in connection.pending_outbound_responses {
585 self.pending_events.push_back(NetworkBehaviourAction::GenerateEvent(
586 RequestResponseEvent::InboundFailure {
587 peer: peer_id.clone(),
588 request_id,
589 error: InboundFailure::ConnectionClosed
590 }
591 ));
592
593 }
594
595 for request_id in connection.pending_inbound_responses {
596 self.pending_events.push_back(NetworkBehaviourAction::GenerateEvent(
597 RequestResponseEvent::OutboundFailure {
598 peer: peer_id.clone(),
599 request_id,
600 error: OutboundFailure::ConnectionClosed
601 }
602 ));
603 }
604 }
605
606 fn inject_disconnected(&mut self, peer: &PeerId) {
607 self.connected.remove(peer);
608 }
609
610 fn inject_dial_failure(&mut self, peer: &PeerId) {
611 // If there are pending outgoing requests when a dial failure occurs,
612 // it is implied that we are not connected to the peer, since pending
613 // outgoing requests are drained when a connection is established and
614 // only created when a peer is not connected when a request is made.
615 // Thus these requests must be considered failed, even if there is
616 // another, concurrent dialing attempt ongoing.
617 if let Some(pending) = self.pending_outbound_requests.remove(peer) {
618 for request in pending {
619 self.pending_events.push_back(NetworkBehaviourAction::GenerateEvent(
620 RequestResponseEvent::OutboundFailure {
621 peer: peer.clone(),
622 request_id: request.request_id,
623 error: OutboundFailure::DialFailure
624 }
625 ));
626 }
627 }
628 }
629
630 fn inject_event(
631 &mut self,
632 peer: PeerId,
633 connection: ConnectionId,
634 event: RequestResponseHandlerEvent<TCodec>,
635 ) {
636 match event {
637 RequestResponseHandlerEvent::Response { request_id, response } => {
638 let removed = self.remove_pending_inbound_response(&peer, connection, &request_id);
639 debug_assert!(
640 removed,
641 "Expect request_id to be pending before receiving response.",
642 );
643
644 let message = RequestResponseMessage::Response { request_id, response };
645 self.pending_events.push_back(
646 NetworkBehaviourAction::GenerateEvent(
647 RequestResponseEvent::Message { peer, message }));
648 }
649 RequestResponseHandlerEvent::Request { request_id, request, sender } => {
650 let channel = ResponseChannel { request_id, peer: peer.clone(), sender };
651 let message = RequestResponseMessage::Request { request_id, request, channel };
652 self.pending_events.push_back(NetworkBehaviourAction::GenerateEvent(
653 RequestResponseEvent::Message { peer: peer.clone(), message }
654 ));
655
656 match self.get_connection_mut(&peer, connection) {
657 Some(connection) => {
658 let inserted = connection.pending_outbound_responses.insert(request_id);
659 debug_assert!(inserted, "Expect id of new request to be unknown.");
660 },
661 // Connection closed after `RequestResponseEvent::Request` has been emitted.
662 None => {
663 self.pending_events.push_back(NetworkBehaviourAction::GenerateEvent(
664 RequestResponseEvent::InboundFailure {
665 peer: peer.clone(),
666 request_id,
667 error: InboundFailure::ConnectionClosed
668 }
669 ));
670 }
671 }
672 }
673 RequestResponseHandlerEvent::ResponseSent(request_id) => {
674 let removed = self.remove_pending_outbound_response(&peer, connection, request_id);
675 debug_assert!(removed, "Expect request_id to be pending before response is sent.");
676
677 self.pending_events.push_back(
678 NetworkBehaviourAction::GenerateEvent(
679 RequestResponseEvent::ResponseSent { peer, request_id }));
680 }
681 RequestResponseHandlerEvent::ResponseOmission(request_id) => {
682 let removed = self.remove_pending_outbound_response(&peer, connection, request_id);
683 debug_assert!(
684 removed,
685 "Expect request_id to be pending before response is omitted.",
686 );
687
688 self.pending_events.push_back(
689 NetworkBehaviourAction::GenerateEvent(
690 RequestResponseEvent::InboundFailure {
691 peer,
692 request_id,
693 error: InboundFailure::ResponseOmission
694 }));
695 }
696 RequestResponseHandlerEvent::OutboundTimeout(request_id) => {
697 let removed = self.remove_pending_inbound_response(&peer, connection, &request_id);
698 debug_assert!(removed, "Expect request_id to be pending before request times out.");
699
700 self.pending_events.push_back(
701 NetworkBehaviourAction::GenerateEvent(
702 RequestResponseEvent::OutboundFailure {
703 peer,
704 request_id,
705 error: OutboundFailure::Timeout,
706 }));
707 }
708 RequestResponseHandlerEvent::InboundTimeout(request_id) => {
709 // Note: `RequestResponseHandlerEvent::InboundTimeout` is emitted both for timing
710 // out to receive the request and for timing out sending the response. In the former
711 // case the request is never added to `pending_outbound_responses` and thus one can
712 // not assert the request_id to be present before removing it.
713 self.remove_pending_outbound_response(&peer, connection, request_id);
714
715 self.pending_events.push_back(
716 NetworkBehaviourAction::GenerateEvent(
717 RequestResponseEvent::InboundFailure {
718 peer,
719 request_id,
720 error: InboundFailure::Timeout,
721 }));
722 }
723 RequestResponseHandlerEvent::OutboundUnsupportedProtocols(request_id) => {
724 let removed = self.remove_pending_inbound_response(&peer, connection, &request_id);
725 debug_assert!(
726 removed,
727 "Expect request_id to be pending before failing to connect.",
728 );
729
730 self.pending_events.push_back(
731 NetworkBehaviourAction::GenerateEvent(
732 RequestResponseEvent::OutboundFailure {
733 peer,
734 request_id,
735 error: OutboundFailure::UnsupportedProtocols,
736 }));
737 }
738 RequestResponseHandlerEvent::InboundUnsupportedProtocols(request_id) => {
739 // Note: No need to call `self.remove_pending_outbound_response`,
740 // `RequestResponseHandlerEvent::Request` was never emitted for this request and
741 // thus request was never added to `pending_outbound_responses`.
742 self.pending_events.push_back(
743 NetworkBehaviourAction::GenerateEvent(
744 RequestResponseEvent::InboundFailure {
745 peer,
746 request_id,
747 error: InboundFailure::UnsupportedProtocols,
748 }));
749 }
750 }
751 }
752
753 fn poll(&mut self, _: &mut Context<'_>, _: &mut impl PollParameters)
754 -> Poll<NetworkBehaviourAction<
755 RequestProtocol<TCodec>,
756 RequestResponseEvent<TCodec::Request, TCodec::Response>
757 >>
758 {
759 if let Some(ev) = self.pending_events.pop_front() {
760 return Poll::Ready(ev);
761 } else if self.pending_events.capacity() > EMPTY_QUEUE_SHRINK_THRESHOLD {
762 self.pending_events.shrink_to_fit();
763 }
764
765 Poll::Pending
766 }
767}
768
/// Capacity limit above which the memory of an empty queue is released:
/// `poll` shrinks the (empty) event queue whenever its capacity exceeds
/// this value.
const EMPTY_QUEUE_SHRINK_THRESHOLD: usize = 100;
774
775/// Internal information tracked for an established connection.
776struct Connection {
777 id: ConnectionId,
778 address: Option<Multiaddr>,
779 /// Pending outbound responses where corresponding inbound requests have
780 /// been received on this connection and emitted via `poll` but have not yet
781 /// been answered.
782 pending_outbound_responses: HashSet<RequestId>,
783 /// Pending inbound responses for previously sent requests on this
784 /// connection.
785 pending_inbound_responses: HashSet<RequestId>
786}
787
788impl Connection {
789 fn new(id: ConnectionId, address: Option<Multiaddr>) -> Self {
790 Self {
791 id,
792 address,
793 pending_outbound_responses: Default::default(),
794 pending_inbound_responses: Default::default(),
795 }
796 }
797}