Skip to main content

sc_network/
request_responses.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! Collection of request-response protocols.
20//!
21//! The [`RequestResponsesBehaviour`] struct defined in this module provides support for zero or
22//! more so-called "request-response" protocols.
23//!
24//! A request-response protocol works in the following way:
25//!
//! - For every emitted request, a new substream is opened and the protocol is negotiated. If the
27//! remote supports the protocol, the size of the request is sent as a LEB128 number, followed
28//! with the request itself. The remote then sends the size of the response as a LEB128 number,
29//! followed with the response.
30//!
31//! - Requests have a certain time limit before they time out. This time includes the time it
32//! takes to send/receive the request and response.
33//!
34//! - If provided, a ["requests processing"](ProtocolConfig::inbound_queue) channel
35//! is used to handle incoming requests.
36
37use crate::{
38	peer_store::{PeerStoreProvider, BANNED_THRESHOLD},
39	service::traits::RequestResponseConfig as RequestResponseConfigT,
40	types::ProtocolName,
41	ReputationChange,
42};
43
44use futures::{channel::oneshot, prelude::*};
45use libp2p::{
46	core::{transport::PortUse, Endpoint, Multiaddr},
47	request_response::{self, Behaviour, Codec, Message, ProtocolSupport, ResponseChannel},
48	swarm::{
49		behaviour::FromSwarm, handler::multi::MultiHandler, ConnectionDenied, ConnectionId,
50		NetworkBehaviour, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm,
51	},
52	PeerId,
53};
54
55use std::{
56	collections::{hash_map::Entry, HashMap},
57	io, iter,
58	ops::Deref,
59	pin::Pin,
60	sync::Arc,
61	task::{Context, Poll},
62	time::{Duration, Instant},
63};
64
65pub use libp2p::request_response::{Config, InboundRequestId, OutboundRequestId};
66
/// Logging target for the file.
const LOG_TARGET: &str = "sub-libp2p::request-response";

/// Period at which pending outbound requests are checked for taking too long.
///
/// On every tick, each pending request older than the `request_timeout` of its protocol is
/// failed with a timeout and forgotten, independently of the underlying libp2p behaviour.
const PERIODIC_REQUEST_CHECK: Duration = Duration::from_secs(2);
72
/// Possible failures occurring in the context of sending an outbound request and receiving the
/// response.
///
/// Mirrors the variants of libp2p's `request_response::OutboundFailure`, from which it can be
/// converted, while additionally being `Clone`.
#[derive(Debug, Clone, thiserror::Error)]
pub enum OutboundFailure {
	/// The request could not be sent because a dialing attempt failed.
	#[error("Failed to dial the requested peer")]
	DialFailure,
	/// The request timed out before a response was received.
	#[error("Timeout while waiting for a response")]
	Timeout,
	/// The connection closed before a response was received.
	#[error("Connection was closed before a response was received")]
	ConnectionClosed,
	/// The remote supports none of the requested protocols.
	#[error("The remote supports none of the requested protocols")]
	UnsupportedProtocols,
	/// An IO failure happened on an outbound stream.
	///
	/// The error is kept behind an [`Arc`] because [`io::Error`] is not `Clone` while this enum
	/// is.
	#[error("An IO failure happened on an outbound stream")]
	Io(Arc<io::Error>),
}
93
94impl From<request_response::OutboundFailure> for OutboundFailure {
95	fn from(out: request_response::OutboundFailure) -> Self {
96		match out {
97			request_response::OutboundFailure::DialFailure => OutboundFailure::DialFailure,
98			request_response::OutboundFailure::Timeout => OutboundFailure::Timeout,
99			request_response::OutboundFailure::ConnectionClosed => {
100				OutboundFailure::ConnectionClosed
101			},
102			request_response::OutboundFailure::UnsupportedProtocols => {
103				OutboundFailure::UnsupportedProtocols
104			},
105			request_response::OutboundFailure::Io(error) => OutboundFailure::Io(Arc::new(error)),
106		}
107	}
108}
109
/// Possible failures occurring in the context of receiving an inbound request and sending a
/// response.
///
/// Mirrors the variants of libp2p's `request_response::InboundFailure`, from which it can be
/// converted.
#[derive(Debug, thiserror::Error)]
pub enum InboundFailure {
	/// The inbound request timed out, either while reading the incoming request or before a
	/// response is sent.
	#[error("Timeout while receiving request or sending response")]
	Timeout,
	/// The connection closed before a response could be sent.
	#[error("Connection was closed before a response could be sent")]
	ConnectionClosed,
	/// The local peer supports none of the protocols requested by the remote.
	#[error("The local peer supports none of the protocols requested by the remote")]
	UnsupportedProtocols,
	/// The local peer failed to respond to an inbound request: the response channel was dropped
	/// without a response being sent.
	#[error("The response channel was dropped without sending a response to the remote")]
	ResponseOmission,
	/// An IO failure happened on an inbound stream.
	#[error("An IO failure happened on an inbound stream")]
	Io(Arc<io::Error>),
}
131
132impl From<request_response::InboundFailure> for InboundFailure {
133	fn from(out: request_response::InboundFailure) -> Self {
134		match out {
135			request_response::InboundFailure::ResponseOmission => InboundFailure::ResponseOmission,
136			request_response::InboundFailure::Timeout => InboundFailure::Timeout,
137			request_response::InboundFailure::ConnectionClosed => InboundFailure::ConnectionClosed,
138			request_response::InboundFailure::UnsupportedProtocols => {
139				InboundFailure::UnsupportedProtocols
140			},
141			request_response::InboundFailure::Io(error) => InboundFailure::Io(Arc::new(error)),
142		}
143	}
144}
145
/// Error in a request.
///
/// This is the error delivered through the response channel handed to
/// [`RequestResponsesBehaviour::send_request`].
#[derive(Debug, thiserror::Error)]
#[allow(missing_docs)]
pub enum RequestFailure {
	/// There is no connection to the target peer and the caller asked not to establish one.
	#[error("We are not currently connected to the requested peer.")]
	NotConnected,
	/// The protocol the request was issued on is not among the registered protocols.
	#[error("Given protocol hasn't been registered.")]
	UnknownProtocol,
	/// The remote closed the substream without providing an answer.
	#[error("Remote has closed the substream before answering, thereby signaling that it considers the request as valid, but refused to answer it.")]
	Refused,
	/// A response arrived, but the receiving side of the response channel was already dropped.
	#[error("The remote replied, but the local node is no longer interested in the response.")]
	Obsolete,
	/// The request failed on the networking level; see [`OutboundFailure`] for the reasons.
	#[error("Problem on the network: {0}")]
	Network(OutboundFailure),
}
161
/// Configuration for a single request-response protocol.
#[derive(Debug, Clone)]
pub struct ProtocolConfig {
	/// Name of the protocol on the wire. Should be something like `/foo/bar`.
	pub name: ProtocolName,

	/// Fallback on the wire protocol names to support.
	///
	/// These names are registered on the same underlying behaviour as [`ProtocolConfig::name`].
	pub fallback_names: Vec<ProtocolName>,

	/// Maximum allowed size, in bytes, of a request.
	///
	/// Any request larger than this value will be declined as a way to avoid allocating too
	/// much memory for it.
	pub max_request_size: u64,

	/// Maximum allowed size, in bytes, of a response.
	///
	/// Any response larger than this value will be declined as a way to avoid allocating too
	/// much memory for it.
	pub max_response_size: u64,

	/// Duration after which emitted requests are considered timed out.
	///
	/// If you expect the response to come back quickly, you should set this to a smaller duration.
	pub request_timeout: Duration,

	/// Channel on which the networking service will send incoming requests.
	///
	/// Every time a peer sends a request to the local node using this protocol, the networking
	/// service will push an element on this channel. The receiving side of this channel then has
	/// to pull this element, process the request, and send back the response to send back to the
	/// peer.
	///
	/// The size of the channel has to be carefully chosen. If the channel is full, the networking
	/// service will discard the incoming request and send back an error to the peer.
	/// Consequently, the channel being full is an indicator that the node is overloaded.
	///
	/// You can typically set the size of the channel to `T / d`, where `T` is the
	/// `request_timeout` and `d` is the expected average duration of CPU and I/O it takes to
	/// build a response.
	///
	/// Can be `None` if the local node does not support answering incoming requests.
	/// If this is `None`, then the local node will not advertise support for this protocol towards
	/// other peers. If this is `Some` but the channel is closed, then the local node will
	/// advertise support for this protocol, but any incoming request will lead to an error being
	/// sent back.
	pub inbound_queue: Option<async_channel::Sender<IncomingRequest>>,
}
210
impl RequestResponseConfigT for ProtocolConfig {
	/// Returns the main on-the-wire name of the protocol (fallback names are not included).
	fn protocol_name(&self) -> &ProtocolName {
		&self.name
	}
}
216
/// A single request received by a peer on a request-response protocol.
///
/// Instances are pushed onto the [`ProtocolConfig::inbound_queue`] of the protocol the request
/// arrived on.
#[derive(Debug)]
pub struct IncomingRequest {
	/// Who sent the request.
	pub peer: sc_network_types::PeerId,

	/// Request sent by the remote. Will always be smaller than
	/// [`ProtocolConfig::max_request_size`].
	pub payload: Vec<u8>,

	/// Channel to send back the response.
	///
	/// There are two ways to indicate that handling the request failed:
	///
	/// 1. Dropping `pending_response`, thus not changing the reputation of the peer.
	///
	/// 2. Sending an `Err(())` via `pending_response`, optionally including reputation changes for
	/// the given peer.
	pub pending_response: oneshot::Sender<OutgoingResponse>,
}
237
/// Response for an incoming request to be sent by a request protocol handler.
#[derive(Debug)]
pub struct OutgoingResponse {
	/// The payload of the response.
	///
	/// `Err(())` if none is available, e.g. due to an error while handling the request.
	pub result: Result<Vec<u8>, ()>,

	/// Reputation changes accrued while handling the request. To be applied to the reputation of
	/// the peer sending the request.
	pub reputation_changes: Vec<ReputationChange>,

	/// If provided, the `oneshot::Sender` will be notified when the response has been sent to the
	/// peer.
	///
	/// > **Note**: Operating systems typically maintain a buffer of a few dozen kilobytes of
	/// >			outgoing data for each TCP socket, and it is not possible for a user
	/// >			application to inspect this buffer. This channel here is not actually notified
	/// >			when the response has been fully sent out, but rather when it has fully been
	/// >			written to the buffer managed by the operating system.
	pub sent_feedback: Option<oneshot::Sender<()>>,
}
260
/// Information stored about a pending (outbound) request.
struct PendingRequest {
	/// The time when the request was sent to the libp2p request-response protocol.
	started_at: Instant,
	/// The channel to send the response back to the caller.
	///
	/// This is wrapped in an `Option` to allow for the channel to be taken out
	/// on force-detected timeouts.
	response_tx: Option<oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>>,
	/// Fallback request (payload and protocol name) to send if the primary request fails because
	/// the remote does not support the protocol.
	fallback_request: Option<(Vec<u8>, ProtocolName)>,
}
273
/// Policy applied when a request is addressed to a peer we are not connected to.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub enum IfDisconnected {
	/// Try to connect to the peer.
	TryConnect,
	/// Just fail if the destination is not yet connected.
	ImmediateError,
}

/// Convenience functions for `IfDisconnected`.
impl IfDisconnected {
	/// Returns `true` if a connection attempt should be made towards a disconnected peer, and
	/// `false` if the request should fail right away instead.
	pub fn should_connect(self) -> bool {
		matches!(self, Self::TryConnect)
	}
}
293
/// Event generated by the [`RequestResponsesBehaviour`].
#[derive(Debug)]
pub enum Event {
	/// A remote sent a request and either we have successfully answered it or an error happened.
	///
	/// This event is generated for statistics purposes.
	InboundRequest {
		/// Peer which has emitted the request.
		peer: PeerId,
		/// Name of the protocol in question.
		protocol: ProtocolName,
		/// Whether handling the request was successful or unsuccessful.
		///
		/// When successful contains the time elapsed between when we received the request and when
		/// we sent back the response. When unsuccessful contains the failure reason.
		result: Result<Duration, ResponseFailure>,
	},

	/// A request initiated using [`RequestResponsesBehaviour::send_request`] has succeeded or
	/// failed.
	///
	/// This event is generated for statistics purposes.
	RequestFinished {
		/// Peer that we sent the request to.
		peer: PeerId,
		/// Name of the protocol in question.
		protocol: ProtocolName,
		/// Duration the request took.
		duration: Duration,
		/// Result of the request.
		result: Result<(), RequestFailure>,
	},

	/// A request protocol handler issued reputation changes for the given peer.
	ReputationChanges {
		/// Peer whose reputation needs to be adjusted.
		peer: PeerId,
		/// Reputation changes.
		changes: Vec<ReputationChange>,
	},
}
335
/// Combination of a protocol name and a request id.
///
/// Uniquely identifies an inbound or outbound request among all handled protocols. Note however
/// that uniqueness is only guaranteed between two inbound and likewise between two outbound
/// requests. There is no uniqueness guarantee in a set of both inbound and outbound
/// [`ProtocolRequestId`]s.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct ProtocolRequestId<RequestId> {
	/// Name of the protocol the request belongs to.
	protocol: ProtocolName,
	/// Identifier of the request, as handed out by the underlying libp2p behaviour.
	request_id: RequestId,
}
347
348impl<RequestId> From<(ProtocolName, RequestId)> for ProtocolRequestId<RequestId> {
349	fn from((protocol, request_id): (ProtocolName, RequestId)) -> Self {
350		Self { protocol, request_id }
351	}
352}
353
/// Details of a request-response protocol.
struct ProtocolDetails {
	/// The libp2p request-response behaviour dedicated to this protocol.
	behaviour: Behaviour<GenericCodec>,
	/// Channel towards the handler of incoming requests, taken from
	/// [`ProtocolConfig::inbound_queue`]. `None` for protocols that only send requests.
	inbound_queue: Option<async_channel::Sender<IncomingRequest>>,
	/// Timeout of outbound requests, taken from [`ProtocolConfig::request_timeout`]. Used by the
	/// periodic check that force-detects timed out requests.
	request_timeout: Duration,
}
360
/// Implementation of `NetworkBehaviour` that provides support for request-response protocols.
pub struct RequestResponsesBehaviour {
	/// The multiple sub-protocols, by name.
	///
	/// Contains the underlying libp2p request-response [`Behaviour`], plus an optional
	/// "response builder" used to build responses for incoming requests.
	protocols: HashMap<ProtocolName, ProtocolDetails>,

	/// Pending requests, passed down to a request-response [`Behaviour`], awaiting a reply.
	pending_requests: HashMap<ProtocolRequestId<OutboundRequestId>, PendingRequest>,

	/// Whenever an incoming request arrives, a `Future` is added to this list and will yield the
	/// response to send back to the remote, or `None` if the response channel was dropped
	/// without a response being provided.
	pending_responses: stream::FuturesUnordered<
		Pin<Box<dyn Future<Output = Option<RequestProcessingOutcome>> + Send>>,
	>,

	/// Whenever an incoming request arrives, the arrival [`Instant`] is recorded here.
	pending_responses_arrival_time: HashMap<ProtocolRequestId<InboundRequestId>, Instant>,

	/// Whenever a response is received on `pending_responses`, insert a channel to be notified
	/// when the response has been sent out.
	send_feedback: HashMap<ProtocolRequestId<InboundRequestId>, oneshot::Sender<()>>,

	/// Primarily used to get a reputation of a node.
	peer_store: Arc<dyn PeerStoreProvider>,

	/// Interval to check that the requests are not taking too long.
	///
	/// We had issues in the past where libp2p did not produce a timeout event in due time.
	///
	/// For more details, see:
	/// - <https://github.com/paritytech/polkadot-sdk/issues/7076#issuecomment-2596085096>
	periodic_request_check: tokio::time::Interval,
}
396
/// Generated by the response builder and waiting to be processed.
struct RequestProcessingOutcome {
	/// Peer that sent the request.
	peer: PeerId,
	/// Identifier of the inbound request this outcome answers.
	request_id: InboundRequestId,
	/// Name of the protocol the request arrived on.
	protocol: ProtocolName,
	/// libp2p channel through which the response payload is sent back to the remote.
	inner_channel: ResponseChannel<Result<Vec<u8>, ()>>,
	/// Response produced by the request handler, including reputation changes and the optional
	/// feedback channel.
	response: OutgoingResponse,
}
405
406impl RequestResponsesBehaviour {
407	/// Creates a new behaviour. Must be passed a list of supported protocols. Returns an error if
408	/// the same protocol is passed twice.
409	pub fn new(
410		list: impl Iterator<Item = ProtocolConfig>,
411		peer_store: Arc<dyn PeerStoreProvider>,
412	) -> Result<Self, RegisterError> {
413		let mut protocols = HashMap::new();
414		for protocol in list {
415			let cfg = Config::default().with_request_timeout(protocol.request_timeout);
416
417			let protocol_support = if protocol.inbound_queue.is_some() {
418				ProtocolSupport::Full
419			} else {
420				ProtocolSupport::Outbound
421			};
422
423			let behaviour = Behaviour::with_codec(
424				GenericCodec {
425					max_request_size: protocol.max_request_size,
426					max_response_size: protocol.max_response_size,
427				},
428				iter::once(protocol.name.clone())
429					.chain(protocol.fallback_names)
430					.zip(iter::repeat(protocol_support)),
431				cfg,
432			);
433
434			match protocols.entry(protocol.name) {
435				Entry::Vacant(e) => e.insert(ProtocolDetails {
436					behaviour,
437					inbound_queue: protocol.inbound_queue,
438					request_timeout: protocol.request_timeout,
439				}),
440				Entry::Occupied(e) => {
441					return Err(RegisterError::DuplicateProtocol(e.key().clone()))
442				},
443			};
444		}
445
446		Ok(Self {
447			protocols,
448			pending_requests: Default::default(),
449			pending_responses: Default::default(),
450			pending_responses_arrival_time: Default::default(),
451			send_feedback: Default::default(),
452			peer_store,
453			periodic_request_check: tokio::time::interval(PERIODIC_REQUEST_CHECK),
454		})
455	}
456
457	/// Initiates sending a request.
458	///
459	/// If there is no established connection to the target peer, the behavior is determined by the
460	/// choice of `connect`.
461	///
462	/// An error is returned if the protocol doesn't match one that has been registered.
463	pub fn send_request(
464		&mut self,
465		target: &PeerId,
466		protocol_name: ProtocolName,
467		request: Vec<u8>,
468		fallback_request: Option<(Vec<u8>, ProtocolName)>,
469		pending_response: oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>,
470		connect: IfDisconnected,
471	) {
472		log::trace!(target: LOG_TARGET, "send request to {target} ({protocol_name:?}), {} bytes", request.len());
473
474		if let Some(ProtocolDetails { behaviour, .. }) =
475			self.protocols.get_mut(protocol_name.deref())
476		{
477			Self::send_request_inner(
478				behaviour,
479				&mut self.pending_requests,
480				target,
481				protocol_name,
482				request,
483				fallback_request,
484				pending_response,
485				connect,
486			)
487		} else if pending_response.send(Err(RequestFailure::UnknownProtocol)).is_err() {
488			log::debug!(
489				target: LOG_TARGET,
490				"Unknown protocol {:?}. At the same time local \
491				 node is no longer interested in the result.",
492				protocol_name,
493			);
494		}
495	}
496
497	fn send_request_inner(
498		behaviour: &mut Behaviour<GenericCodec>,
499		pending_requests: &mut HashMap<ProtocolRequestId<OutboundRequestId>, PendingRequest>,
500		target: &PeerId,
501		protocol_name: ProtocolName,
502		request: Vec<u8>,
503		fallback_request: Option<(Vec<u8>, ProtocolName)>,
504		pending_response: oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>,
505		connect: IfDisconnected,
506	) {
507		if behaviour.is_connected(target) || connect.should_connect() {
508			let request_id = behaviour.send_request(target, request);
509			let prev_req_id = pending_requests.insert(
510				(protocol_name.to_string().into(), request_id).into(),
511				PendingRequest {
512					started_at: Instant::now(),
513					response_tx: Some(pending_response),
514					fallback_request,
515				},
516			);
517			debug_assert!(prev_req_id.is_none(), "Expect request id to be unique.");
518		} else if pending_response.send(Err(RequestFailure::NotConnected)).is_err() {
519			log::debug!(
520				target: LOG_TARGET,
521				"Not connected to peer {:?}. At the same time local \
522				 node is no longer interested in the result.",
523				target,
524			);
525		}
526	}
527}
528
529impl NetworkBehaviour for RequestResponsesBehaviour {
530	type ConnectionHandler =
531		MultiHandler<String, <Behaviour<GenericCodec> as NetworkBehaviour>::ConnectionHandler>;
532	type ToSwarm = Event;
533
	/// Accepts every pending inbound connection: always returns `Ok(())` and ignores all
	/// arguments.
	fn handle_pending_inbound_connection(
		&mut self,
		_connection_id: ConnectionId,
		_local_addr: &Multiaddr,
		_remote_addr: &Multiaddr,
	) -> Result<(), ConnectionDenied> {
		Ok(())
	}
542
	/// Never denies a pending outbound connection and always returns an empty address list,
	/// i.e. this behaviour does not contribute any addresses of its own.
	fn handle_pending_outbound_connection(
		&mut self,
		_connection_id: ConnectionId,
		_maybe_peer: Option<PeerId>,
		_addresses: &[Multiaddr],
		_effective_role: Endpoint,
	) -> Result<Vec<Multiaddr>, ConnectionDenied> {
		Ok(Vec::new())
	}
552
553	fn handle_established_inbound_connection(
554		&mut self,
555		connection_id: ConnectionId,
556		peer: PeerId,
557		local_addr: &Multiaddr,
558		remote_addr: &Multiaddr,
559	) -> Result<THandler<Self>, ConnectionDenied> {
560		let iter =
561			self.protocols.iter_mut().filter_map(|(p, ProtocolDetails { behaviour, .. })| {
562				if let Ok(handler) = behaviour.handle_established_inbound_connection(
563					connection_id,
564					peer,
565					local_addr,
566					remote_addr,
567				) {
568					Some((p.to_string(), handler))
569				} else {
570					None
571				}
572			});
573
574		Ok(MultiHandler::try_from_iter(iter).expect(
575			"Protocols are in a HashMap and there can be at most one handler per protocol name, \
576			 which is the only possible error; qed",
577		))
578	}
579
580	fn handle_established_outbound_connection(
581		&mut self,
582		connection_id: ConnectionId,
583		peer: PeerId,
584		addr: &Multiaddr,
585		role_override: Endpoint,
586		port_use: PortUse,
587	) -> Result<THandler<Self>, ConnectionDenied> {
588		let iter =
589			self.protocols.iter_mut().filter_map(|(p, ProtocolDetails { behaviour, .. })| {
590				if let Ok(handler) = behaviour.handle_established_outbound_connection(
591					connection_id,
592					peer,
593					addr,
594					role_override,
595					port_use,
596				) {
597					Some((p.to_string(), handler))
598				} else {
599					None
600				}
601			});
602
603		Ok(MultiHandler::try_from_iter(iter).expect(
604			"Protocols are in a HashMap and there can be at most one handler per protocol name, \
605			 which is the only possible error; qed",
606		))
607	}
608
609	fn on_swarm_event(&mut self, event: FromSwarm) {
610		for ProtocolDetails { behaviour, .. } in self.protocols.values_mut() {
611			behaviour.on_swarm_event(event);
612		}
613	}
614
615	fn on_connection_handler_event(
616		&mut self,
617		peer_id: PeerId,
618		connection_id: ConnectionId,
619		event: THandlerOutEvent<Self>,
620	) {
621		let p_name = event.0;
622		if let Some(ProtocolDetails { behaviour, .. }) = self.protocols.get_mut(p_name.as_str()) {
623			return behaviour.on_connection_handler_event(peer_id, connection_id, event.1);
624		} else {
625			log::warn!(
626				target: LOG_TARGET,
627				"on_connection_handler_event: no request-response instance registered for protocol {:?}",
628				p_name
629			);
630		}
631	}
632
633	fn poll(&mut self, cx: &mut Context) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
634		'poll_all: loop {
635			// Poll the periodic request check.
636			if self.periodic_request_check.poll_tick(cx).is_ready() {
637				self.pending_requests.retain(|id, req| {
638					let Some(ProtocolDetails { request_timeout, .. }) =
639						self.protocols.get(&id.protocol)
640					else {
641						log::warn!(
642							target: LOG_TARGET,
643							"Request {id:?} has no protocol registered.",
644						);
645
646						if let Some(response_tx) = req.response_tx.take() {
647							if response_tx.send(Err(RequestFailure::UnknownProtocol)).is_err() {
648								log::debug!(
649									target: LOG_TARGET,
650									"Request {id:?} has no protocol registered. At the same time local node is no longer interested in the result.",
651								);
652							}
653						}
654						return false
655					};
656
657					let elapsed = req.started_at.elapsed();
658					if elapsed > *request_timeout {
659						log::debug!(
660							target: LOG_TARGET,
661							"Request {id:?} force detected as timeout.",
662						);
663
664						if let Some(response_tx) = req.response_tx.take() {
665							if response_tx.send(Err(RequestFailure::Network(OutboundFailure::Timeout))).is_err() {
666								log::debug!(
667									target: LOG_TARGET,
668									"Request {id:?} force detected as timeout. At the same time local node is no longer interested in the result.",
669								);
670							}
671						}
672
673						false
674					} else {
675						true
676					}
677				});
678			}
679
680			// Poll to see if any response is ready to be sent back.
681			while let Poll::Ready(Some(outcome)) = self.pending_responses.poll_next_unpin(cx) {
682				let RequestProcessingOutcome {
683					peer,
684					request_id,
685					protocol: protocol_name,
686					inner_channel,
687					response: OutgoingResponse { result, reputation_changes, sent_feedback },
688				} = match outcome {
689					Some(outcome) => outcome,
690					// The response builder was too busy or handling the request failed. This is
691					// later on reported as a `InboundFailure::Omission`.
692					None => continue,
693				};
694
695				if let Ok(payload) = result {
696					if let Some(ProtocolDetails { behaviour, .. }) =
697						self.protocols.get_mut(&*protocol_name)
698					{
699						log::trace!(target: LOG_TARGET, "send response to {peer} ({protocol_name:?}), {} bytes", payload.len());
700
701						if behaviour.send_response(inner_channel, Ok(payload)).is_err() {
702							// Note: Failure is handled further below when receiving
703							// `InboundFailure` event from request-response [`Behaviour`].
704							log::debug!(
705								target: LOG_TARGET,
706								"Failed to send response for {:?} on protocol {:?} due to a \
707								 timeout or due to the connection to the peer being closed. \
708								 Dropping response",
709								request_id, protocol_name,
710							);
711						} else if let Some(sent_feedback) = sent_feedback {
712							self.send_feedback
713								.insert((protocol_name, request_id).into(), sent_feedback);
714						}
715					}
716				}
717
718				if !reputation_changes.is_empty() {
719					return Poll::Ready(ToSwarm::GenerateEvent(Event::ReputationChanges {
720						peer,
721						changes: reputation_changes,
722					}));
723				}
724			}
725
726			let mut fallback_requests = vec![];
727
728			// Poll request-responses protocols.
729			for (protocol, ProtocolDetails { behaviour, inbound_queue, .. }) in &mut self.protocols
730			{
731				'poll_protocol: while let Poll::Ready(ev) = behaviour.poll(cx) {
732					let ev = match ev {
733						// Main events we are interested in.
734						ToSwarm::GenerateEvent(ev) => ev,
735
736						// Other events generated by the underlying behaviour are transparently
737						// passed through.
738						ToSwarm::Dial { opts } => {
739							if opts.get_peer_id().is_none() {
740								log::error!(
741									target: LOG_TARGET,
742									"The request-response isn't supposed to start dialing addresses"
743								);
744							}
745							return Poll::Ready(ToSwarm::Dial { opts });
746						},
747						event => {
748							return Poll::Ready(
749								event.map_in(|event| ((*protocol).to_string(), event)).map_out(
750									|_| {
751										unreachable!(
752											"`GenerateEvent` is handled in a branch above; qed"
753										)
754									},
755								),
756							);
757						},
758					};
759
760					match ev {
761						// Received a request from a remote.
762						request_response::Event::Message {
763							peer,
764							message: Message::Request { request_id, request, channel, .. },
765						} => {
766							self.pending_responses_arrival_time
767								.insert((protocol.clone(), request_id).into(), Instant::now());
768
769							let reputation = self.peer_store.peer_reputation(&peer.into());
770
771							if reputation < BANNED_THRESHOLD {
772								log::debug!(
773									target: LOG_TARGET,
774									"Cannot handle requests from a node with a low reputation {}: {}",
775									peer,
776									reputation,
777								);
778								continue 'poll_protocol;
779							}
780
781							let (tx, rx) = oneshot::channel();
782
783							// Submit the request to the "response builder" passed by the user at
784							// initialization.
785							if let Some(resp_builder) = inbound_queue {
786								// If the response builder is too busy, silently drop `tx`. This
787								// will be reported by the corresponding request-response
788								// [`Behaviour`] through an `InboundFailure::Omission` event.
789								// Note that we use `async_channel::bounded` and not `mpsc::channel`
790								// because the latter allocates an extra slot for every cloned
791								// sender.
792								let _ = resp_builder.try_send(IncomingRequest {
793									peer: peer.into(),
794									payload: request,
795									pending_response: tx,
796								});
797							} else {
798								debug_assert!(false, "Received message on outbound-only protocol.");
799							}
800
801							let protocol = protocol.clone();
802
803							self.pending_responses.push(Box::pin(async move {
804								// The `tx` created above can be dropped if we are not capable of
805								// processing this request, which is reflected as a
806								// `InboundFailure::Omission` event.
807								rx.await.map_or(None, |response| {
808									Some(RequestProcessingOutcome {
809										peer,
810										request_id,
811										protocol,
812										inner_channel: channel,
813										response,
814									})
815								})
816							}));
817
818							// This `continue` makes sure that `pending_responses` gets polled
819							// after we have added the new element.
820							continue 'poll_all;
821						},
822
823						// Received a response from a remote to one of our requests.
824						request_response::Event::Message {
825							peer,
826							message: Message::Response { request_id, response },
827							..
828						} => {
829							let (started, delivered) = match self
830								.pending_requests
831								.remove(&(protocol.clone(), request_id).into())
832							{
833								Some(PendingRequest {
834									started_at,
835									response_tx: Some(response_tx),
836									..
837								}) => {
838									log::trace!(
839										target: LOG_TARGET,
840										"received response from {peer} ({protocol:?}), {} bytes",
841										response.as_ref().map_or(0usize, |response| response.len()),
842									);
843
844									let delivered = response_tx
845										.send(
846											response
847												.map_err(|()| RequestFailure::Refused)
848												.map(|resp| (resp, protocol.clone())),
849										)
850										.map_err(|_| RequestFailure::Obsolete);
851									(started_at, delivered)
852								},
853								_ => {
854									log::debug!(
855										target: LOG_TARGET,
856										"Received `RequestResponseEvent::Message` with unexpected request id {:?} from {:?}",
857										request_id,
858										peer,
859									);
860									continue;
861								},
862							};
863
864							let out = Event::RequestFinished {
865								peer,
866								protocol: protocol.clone(),
867								duration: started.elapsed(),
868								result: delivered,
869							};
870
871							return Poll::Ready(ToSwarm::GenerateEvent(out));
872						},
873
874						// One of our requests has failed.
875						request_response::Event::OutboundFailure {
876							peer,
877							request_id,
878							error,
879							..
880						} => {
881							let error = OutboundFailure::from(error);
882							let started = match self
883								.pending_requests
884								.remove(&(protocol.clone(), request_id).into())
885							{
886								Some(PendingRequest {
887									started_at,
888									response_tx: Some(response_tx),
889									fallback_request,
890								}) => {
891									// Try using the fallback request if the protocol was not
892									// supported.
893									if matches!(error, OutboundFailure::UnsupportedProtocols) {
894										if let Some((fallback_request, fallback_protocol)) =
895											fallback_request
896										{
897											log::trace!(
898												target: LOG_TARGET,
899												"Request with id {:?} failed. Trying the fallback protocol. {}",
900												request_id,
901												fallback_protocol.deref()
902											);
903											fallback_requests.push((
904												peer,
905												fallback_protocol,
906												fallback_request,
907												response_tx,
908											));
909											continue;
910										}
911									}
912
913									if response_tx
914										.send(Err(RequestFailure::Network(error.clone())))
915										.is_err()
916									{
917										log::debug!(
918											target: LOG_TARGET,
919											"Request with id {:?} failed. At the same time local \
920											 node is no longer interested in the result.",
921											request_id,
922										);
923									}
924									started_at
925								},
926								_ => {
927									log::debug!(
928										target: LOG_TARGET,
929										"Received `RequestResponseEvent::OutboundFailure` with unexpected request id {:?} error {:?} from {:?}",
930										request_id,
931										error,
932										peer
933									);
934									continue;
935								},
936							};
937
938							let out = Event::RequestFinished {
939								peer,
940								protocol: protocol.clone(),
941								duration: started.elapsed(),
942								result: Err(RequestFailure::Network(error)),
943							};
944
945							return Poll::Ready(ToSwarm::GenerateEvent(out));
946						},
947
948						// An inbound request failed, either while reading the request or due to
949						// failing to send a response.
950						request_response::Event::InboundFailure {
951							request_id, peer, error, ..
952						} => {
953							self.pending_responses_arrival_time
954								.remove(&(protocol.clone(), request_id).into());
955							self.send_feedback.remove(&(protocol.clone(), request_id).into());
956							let out = Event::InboundRequest {
957								peer,
958								protocol: protocol.clone(),
959								result: Err(ResponseFailure::Network(error.into())),
960							};
961							return Poll::Ready(ToSwarm::GenerateEvent(out));
962						},
963
964						// A response to an inbound request has been sent.
965						request_response::Event::ResponseSent { request_id, peer } => {
966							let arrival_time = self
967								.pending_responses_arrival_time
968								.remove(&(protocol.clone(), request_id).into())
969								.map(|t| t.elapsed())
970								.expect(
971									"Time is added for each inbound request on arrival and only \
972									 removed on success (`ResponseSent`) or failure \
973									 (`InboundFailure`). One can not receive a success event for a \
974									 request that either never arrived, or that has previously \
975									 failed; qed.",
976								);
977
978							if let Some(send_feedback) =
979								self.send_feedback.remove(&(protocol.clone(), request_id).into())
980							{
981								let _ = send_feedback.send(());
982							}
983
984							let out = Event::InboundRequest {
985								peer,
986								protocol: protocol.clone(),
987								result: Ok(arrival_time),
988							};
989
990							return Poll::Ready(ToSwarm::GenerateEvent(out));
991						},
992					};
993				}
994			}
995
996			// Send out fallback requests.
997			for (peer, protocol, request, pending_response) in fallback_requests.drain(..) {
998				if let Some(ProtocolDetails { behaviour, .. }) = self.protocols.get_mut(&protocol) {
999					Self::send_request_inner(
1000						behaviour,
1001						&mut self.pending_requests,
1002						&peer,
1003						protocol,
1004						request,
1005						None,
1006						pending_response,
1007						// We can error if not connected because the
1008						// previous attempt would have tried to establish a
1009						// connection already or errored and we wouldn't have gotten here.
1010						IfDisconnected::ImmediateError,
1011					);
1012				}
1013			}
1014
1015			break Poll::Pending;
1016		}
1017	}
1018}
1019
1020/// Error when registering a protocol.
1021#[derive(Debug, thiserror::Error)]
1022pub enum RegisterError {
1023	/// A protocol has been specified multiple times.
1024	#[error("{0}")]
1025	DuplicateProtocol(ProtocolName),
1026}
1027
1028/// Error when processing a request sent by a remote.
1029#[derive(Debug, thiserror::Error)]
1030pub enum ResponseFailure {
1031	/// Problem on the network.
1032	#[error("Problem on the network: {0}")]
1033	Network(InboundFailure),
1034}
1035
/// Implementation of the libp2p [`Codec`] trait: describes how requests and responses are
/// converted to and from streams of bytes.
#[doc(hidden)] // Has to be public, otherwise the Rust compiler is not satisfied.
#[derive(Clone, Debug)]
pub struct GenericCodec {
	/// Largest request size, in bytes, that `read_request` accepts.
	max_request_size: u64,
	/// Largest response size, in bytes, that `read_response` accepts.
	max_response_size: u64,
}
1044
1045#[async_trait::async_trait]
1046impl Codec for GenericCodec {
1047	type Protocol = ProtocolName;
1048	type Request = Vec<u8>;
1049	type Response = Result<Vec<u8>, ()>;
1050
1051	async fn read_request<T>(
1052		&mut self,
1053		_: &Self::Protocol,
1054		mut io: &mut T,
1055	) -> io::Result<Self::Request>
1056	where
1057		T: AsyncRead + Unpin + Send,
1058	{
1059		// Read the length.
1060		let length = unsigned_varint::aio::read_usize(&mut io)
1061			.await
1062			.map_err(|err| io::Error::new(io::ErrorKind::InvalidInput, err))?;
1063		if length > usize::try_from(self.max_request_size).unwrap_or(usize::MAX) {
1064			return Err(io::Error::new(
1065				io::ErrorKind::InvalidInput,
1066				format!("Request size exceeds limit: {} > {}", length, self.max_request_size),
1067			));
1068		}
1069
1070		// Read the payload.
1071		let mut buffer = vec![0; length];
1072		io.read_exact(&mut buffer).await?;
1073		Ok(buffer)
1074	}
1075
1076	async fn read_response<T>(
1077		&mut self,
1078		_: &Self::Protocol,
1079		mut io: &mut T,
1080	) -> io::Result<Self::Response>
1081	where
1082		T: AsyncRead + Unpin + Send,
1083	{
1084		// Note that this function returns a `Result<Result<...>>`. Returning an `Err` is
1085		// considered as a protocol error and will result in the entire connection being closed.
1086		// Returning `Ok(Err(_))` signifies that a response has successfully been fetched, and
1087		// that this response is an error.
1088
1089		// Read the length.
1090		let length = match unsigned_varint::aio::read_usize(&mut io).await {
1091			Ok(l) => l,
1092			Err(unsigned_varint::io::ReadError::Io(err))
1093				if matches!(err.kind(), io::ErrorKind::UnexpectedEof) =>
1094			{
1095				return Ok(Err(()))
1096			},
1097			Err(err) => return Err(io::Error::new(io::ErrorKind::InvalidInput, err)),
1098		};
1099
1100		if length > usize::try_from(self.max_response_size).unwrap_or(usize::MAX) {
1101			return Err(io::Error::new(
1102				io::ErrorKind::InvalidInput,
1103				format!("Response size exceeds limit: {} > {}", length, self.max_response_size),
1104			));
1105		}
1106
1107		// Read the payload.
1108		let mut buffer = vec![0; length];
1109		io.read_exact(&mut buffer).await?;
1110		Ok(Ok(buffer))
1111	}
1112
1113	async fn write_request<T>(
1114		&mut self,
1115		_: &Self::Protocol,
1116		io: &mut T,
1117		req: Self::Request,
1118	) -> io::Result<()>
1119	where
1120		T: AsyncWrite + Unpin + Send,
1121	{
1122		// TODO: check the length?
1123		// Write the length.
1124		{
1125			let mut buffer = unsigned_varint::encode::usize_buffer();
1126			io.write_all(unsigned_varint::encode::usize(req.len(), &mut buffer)).await?;
1127		}
1128
1129		// Write the payload.
1130		io.write_all(&req).await?;
1131
1132		io.close().await?;
1133		Ok(())
1134	}
1135
1136	async fn write_response<T>(
1137		&mut self,
1138		_: &Self::Protocol,
1139		io: &mut T,
1140		res: Self::Response,
1141	) -> io::Result<()>
1142	where
1143		T: AsyncWrite + Unpin + Send,
1144	{
1145		// If `res` is an `Err`, we jump to closing the substream without writing anything on it.
1146		if let Ok(res) = res {
1147			// TODO: check the length?
1148			// Write the length.
1149			{
1150				let mut buffer = unsigned_varint::encode::usize_buffer();
1151				io.write_all(unsigned_varint::encode::usize(res.len(), &mut buffer)).await?;
1152			}
1153
1154			// Write the payload.
1155			io.write_all(&res).await?;
1156		}
1157
1158		io.close().await?;
1159		Ok(())
1160	}
1161}
1162
1163#[cfg(test)]
1164mod tests {
1165	use super::*;
1166
1167	use crate::mock::MockPeerStore;
1168	use assert_matches::assert_matches;
1169	use futures::channel::oneshot;
1170	use libp2p::{
1171		core::{
1172			transport::{MemoryTransport, Transport},
1173			upgrade,
1174		},
1175		identity::Keypair,
1176		noise,
1177		swarm::{Config as SwarmConfig, Executor, Swarm, SwarmEvent},
1178		Multiaddr,
1179	};
1180	use std::{iter, time::Duration};
1181
1182	struct TokioExecutor;
1183	impl Executor for TokioExecutor {
1184		fn exec(&self, f: Pin<Box<dyn Future<Output = ()> + Send>>) {
1185			tokio::spawn(f);
1186		}
1187	}
1188
1189	fn build_swarm(
1190		list: impl Iterator<Item = ProtocolConfig>,
1191	) -> (Swarm<RequestResponsesBehaviour>, Multiaddr) {
1192		let keypair = Keypair::generate_ed25519();
1193
1194		let transport = MemoryTransport::new()
1195			.upgrade(upgrade::Version::V1)
1196			.authenticate(noise::Config::new(&keypair).unwrap())
1197			.multiplex(libp2p::yamux::Config::default())
1198			.boxed();
1199
1200		let behaviour = RequestResponsesBehaviour::new(list, Arc::new(MockPeerStore {})).unwrap();
1201
1202		let mut swarm = Swarm::new(
1203			transport,
1204			behaviour,
1205			keypair.public().to_peer_id(),
1206			SwarmConfig::with_executor(TokioExecutor {})
1207				// This is taken care of by notification protocols in non-test environment
1208				// It is very slow in test environment for some reason, hence larger timeout
1209				.with_idle_connection_timeout(Duration::from_secs(10)),
1210		);
1211
1212		let listen_addr: Multiaddr = format!("/memory/{}", rand::random::<u64>()).parse().unwrap();
1213
1214		swarm.listen_on(listen_addr.clone()).unwrap();
1215
1216		(swarm, listen_addr)
1217	}
1218
1219	#[tokio::test]
1220	async fn basic_request_response_works() {
1221		let protocol_name = ProtocolName::from("/test/req-resp/1");
1222
1223		// Build swarms whose behaviour is [`RequestResponsesBehaviour`].
1224		let mut swarms = (0..2)
1225			.map(|_| {
1226				let (tx, mut rx) = async_channel::bounded::<IncomingRequest>(64);
1227
1228				tokio::spawn(async move {
1229					while let Some(rq) = rx.next().await {
1230						let (fb_tx, fb_rx) = oneshot::channel();
1231						assert_eq!(rq.payload, b"this is a request");
1232						let _ = rq.pending_response.send(super::OutgoingResponse {
1233							result: Ok(b"this is a response".to_vec()),
1234							reputation_changes: Vec::new(),
1235							sent_feedback: Some(fb_tx),
1236						});
1237						fb_rx.await.unwrap();
1238					}
1239				});
1240
1241				let protocol_config = ProtocolConfig {
1242					name: protocol_name.clone(),
1243					fallback_names: Vec::new(),
1244					max_request_size: 1024,
1245					max_response_size: 1024 * 1024,
1246					request_timeout: Duration::from_secs(30),
1247					inbound_queue: Some(tx),
1248				};
1249
1250				build_swarm(iter::once(protocol_config))
1251			})
1252			.collect::<Vec<_>>();
1253
1254		// Ask `swarm[0]` to dial `swarm[1]`. There isn't any discovery mechanism in place in
1255		// this test, so they wouldn't connect to each other.
1256		{
1257			let dial_addr = swarms[1].1.clone();
1258			Swarm::dial(&mut swarms[0].0, dial_addr).unwrap();
1259		}
1260
1261		let (mut swarm, _) = swarms.remove(0);
1262		// Running `swarm[0]` in the background.
1263		tokio::spawn(async move {
1264			loop {
1265				match swarm.select_next_some().await {
1266					SwarmEvent::Behaviour(Event::InboundRequest { result, .. }) => {
1267						result.unwrap();
1268					},
1269					_ => {},
1270				}
1271			}
1272		});
1273
1274		// Remove and run the remaining swarm.
1275		let (mut swarm, _) = swarms.remove(0);
1276		let mut response_receiver = None;
1277
1278		loop {
1279			match swarm.select_next_some().await {
1280				SwarmEvent::ConnectionEstablished { peer_id, .. } => {
1281					let (sender, receiver) = oneshot::channel();
1282					swarm.behaviour_mut().send_request(
1283						&peer_id,
1284						protocol_name.clone(),
1285						b"this is a request".to_vec(),
1286						None,
1287						sender,
1288						IfDisconnected::ImmediateError,
1289					);
1290					assert!(response_receiver.is_none());
1291					response_receiver = Some(receiver);
1292				},
1293				SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
1294					result.unwrap();
1295					break;
1296				},
1297				_ => {},
1298			}
1299		}
1300
1301		assert_eq!(
1302			response_receiver.unwrap().await.unwrap().unwrap(),
1303			(b"this is a response".to_vec(), protocol_name)
1304		);
1305	}
1306
1307	#[tokio::test]
1308	async fn max_response_size_exceeded() {
1309		let protocol_name = ProtocolName::from("/test/req-resp/1");
1310
1311		// Build swarms whose behaviour is [`RequestResponsesBehaviour`].
1312		let mut swarms = (0..2)
1313			.map(|_| {
1314				let (tx, mut rx) = async_channel::bounded::<IncomingRequest>(64);
1315
1316				tokio::spawn(async move {
1317					while let Some(rq) = rx.next().await {
1318						assert_eq!(rq.payload, b"this is a request");
1319						let _ = rq.pending_response.send(super::OutgoingResponse {
1320							result: Ok(b"this response exceeds the limit".to_vec()),
1321							reputation_changes: Vec::new(),
1322							sent_feedback: None,
1323						});
1324					}
1325				});
1326
1327				let protocol_config = ProtocolConfig {
1328					name: protocol_name.clone(),
1329					fallback_names: Vec::new(),
1330					max_request_size: 1024,
1331					max_response_size: 8, // <-- important for the test
1332					request_timeout: Duration::from_secs(30),
1333					inbound_queue: Some(tx),
1334				};
1335
1336				build_swarm(iter::once(protocol_config))
1337			})
1338			.collect::<Vec<_>>();
1339
1340		// Ask `swarm[0]` to dial `swarm[1]`. There isn't any discovery mechanism in place in
1341		// this test, so they wouldn't connect to each other.
1342		{
1343			let dial_addr = swarms[1].1.clone();
1344			Swarm::dial(&mut swarms[0].0, dial_addr).unwrap();
1345		}
1346
1347		// Running `swarm[0]` in the background until a `InboundRequest` event happens,
1348		// which is a hint about the test having ended.
1349		let (mut swarm, _) = swarms.remove(0);
1350		tokio::spawn(async move {
1351			loop {
1352				match swarm.select_next_some().await {
1353					SwarmEvent::Behaviour(Event::InboundRequest { result, .. }) => {
1354						assert!(result.is_ok());
1355					},
1356					SwarmEvent::ConnectionClosed { .. } => {
1357						break;
1358					},
1359					_ => {},
1360				}
1361			}
1362		});
1363
1364		// Remove and run the remaining swarm.
1365		let (mut swarm, _) = swarms.remove(0);
1366
1367		let mut response_receiver = None;
1368
1369		loop {
1370			match swarm.select_next_some().await {
1371				SwarmEvent::ConnectionEstablished { peer_id, .. } => {
1372					let (sender, receiver) = oneshot::channel();
1373					swarm.behaviour_mut().send_request(
1374						&peer_id,
1375						protocol_name.clone(),
1376						b"this is a request".to_vec(),
1377						None,
1378						sender,
1379						IfDisconnected::ImmediateError,
1380					);
1381					assert!(response_receiver.is_none());
1382					response_receiver = Some(receiver);
1383				},
1384				SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
1385					assert!(result.is_err());
1386					break;
1387				},
1388				_ => {},
1389			}
1390		}
1391
1392		match response_receiver.unwrap().await.unwrap().unwrap_err() {
1393			RequestFailure::Network(OutboundFailure::Io(_)) => {},
1394			request_failure => panic!("Unexpected failure: {request_failure:?}"),
1395		}
1396	}
1397
1398	/// A `RequestId` is a unique identifier among either all inbound or all outbound requests for
1399	/// a single [`RequestResponsesBehaviour`] behaviour. It is not guaranteed to be unique across
1400	/// multiple [`RequestResponsesBehaviour`] behaviours. Thus, when handling `RequestId` in the
1401	/// context of multiple [`RequestResponsesBehaviour`] behaviours, one needs to couple the
1402	/// protocol name with the `RequestId` to get a unique request identifier.
1403	///
1404	/// This test ensures that two requests on different protocols can be handled concurrently
1405	/// without a `RequestId` collision.
1406	///
1407	/// See [`ProtocolRequestId`] for additional information.
1408	#[tokio::test]
1409	async fn request_id_collision() {
1410		let protocol_name_1 = ProtocolName::from("/test/req-resp-1/1");
1411		let protocol_name_2 = ProtocolName::from("/test/req-resp-2/1");
1412
1413		let mut swarm_1 = {
1414			let protocol_configs = vec![
1415				ProtocolConfig {
1416					name: protocol_name_1.clone(),
1417					fallback_names: Vec::new(),
1418					max_request_size: 1024,
1419					max_response_size: 1024 * 1024,
1420					request_timeout: Duration::from_secs(30),
1421					inbound_queue: None,
1422				},
1423				ProtocolConfig {
1424					name: protocol_name_2.clone(),
1425					fallback_names: Vec::new(),
1426					max_request_size: 1024,
1427					max_response_size: 1024 * 1024,
1428					request_timeout: Duration::from_secs(30),
1429					inbound_queue: None,
1430				},
1431			];
1432
1433			build_swarm(protocol_configs.into_iter()).0
1434		};
1435
1436		let (mut swarm_2, mut swarm_2_handler_1, mut swarm_2_handler_2, listen_add_2) = {
1437			let (tx_1, rx_1) = async_channel::bounded(64);
1438			let (tx_2, rx_2) = async_channel::bounded(64);
1439
1440			let protocol_configs = vec![
1441				ProtocolConfig {
1442					name: protocol_name_1.clone(),
1443					fallback_names: Vec::new(),
1444					max_request_size: 1024,
1445					max_response_size: 1024 * 1024,
1446					request_timeout: Duration::from_secs(30),
1447					inbound_queue: Some(tx_1),
1448				},
1449				ProtocolConfig {
1450					name: protocol_name_2.clone(),
1451					fallback_names: Vec::new(),
1452					max_request_size: 1024,
1453					max_response_size: 1024 * 1024,
1454					request_timeout: Duration::from_secs(30),
1455					inbound_queue: Some(tx_2),
1456				},
1457			];
1458
1459			let (swarm, listen_addr) = build_swarm(protocol_configs.into_iter());
1460
1461			(swarm, rx_1, rx_2, listen_addr)
1462		};
1463
1464		// Ask swarm 1 to dial swarm 2. There isn't any discovery mechanism in place in this test,
1465		// so they wouldn't connect to each other.
1466		swarm_1.dial(listen_add_2).unwrap();
1467
1468		// Run swarm 2 in the background, receiving two requests.
1469		tokio::spawn(async move {
1470			loop {
1471				match swarm_2.select_next_some().await {
1472					SwarmEvent::Behaviour(Event::InboundRequest { result, .. }) => {
1473						result.unwrap();
1474					},
1475					_ => {},
1476				}
1477			}
1478		});
1479
1480		// Handle both requests sent by swarm 1 to swarm 2 in the background.
1481		//
1482		// Make sure both requests overlap, by answering the first only after receiving the
1483		// second.
1484		tokio::spawn(async move {
1485			let protocol_1_request = swarm_2_handler_1.next().await;
1486			let protocol_2_request = swarm_2_handler_2.next().await;
1487
1488			protocol_1_request
1489				.unwrap()
1490				.pending_response
1491				.send(OutgoingResponse {
1492					result: Ok(b"this is a response".to_vec()),
1493					reputation_changes: Vec::new(),
1494					sent_feedback: None,
1495				})
1496				.unwrap();
1497			protocol_2_request
1498				.unwrap()
1499				.pending_response
1500				.send(OutgoingResponse {
1501					result: Ok(b"this is a response".to_vec()),
1502					reputation_changes: Vec::new(),
1503					sent_feedback: None,
1504				})
1505				.unwrap();
1506		});
1507
1508		// Have swarm 1 send two requests to swarm 2 and await responses.
1509
1510		let mut response_receivers = None;
1511		let mut num_responses = 0;
1512
1513		loop {
1514			match swarm_1.select_next_some().await {
1515				SwarmEvent::ConnectionEstablished { peer_id, .. } => {
1516					let (sender_1, receiver_1) = oneshot::channel();
1517					let (sender_2, receiver_2) = oneshot::channel();
1518					swarm_1.behaviour_mut().send_request(
1519						&peer_id,
1520						protocol_name_1.clone(),
1521						b"this is a request".to_vec(),
1522						None,
1523						sender_1,
1524						IfDisconnected::ImmediateError,
1525					);
1526					swarm_1.behaviour_mut().send_request(
1527						&peer_id,
1528						protocol_name_2.clone(),
1529						b"this is a request".to_vec(),
1530						None,
1531						sender_2,
1532						IfDisconnected::ImmediateError,
1533					);
1534					assert!(response_receivers.is_none());
1535					response_receivers = Some((receiver_1, receiver_2));
1536				},
1537				SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
1538					num_responses += 1;
1539					result.unwrap();
1540					if num_responses == 2 {
1541						break;
1542					}
1543				},
1544				_ => {},
1545			}
1546		}
1547		let (response_receiver_1, response_receiver_2) = response_receivers.unwrap();
1548		assert_eq!(
1549			response_receiver_1.await.unwrap().unwrap(),
1550			(b"this is a response".to_vec(), protocol_name_1)
1551		);
1552		assert_eq!(
1553			response_receiver_2.await.unwrap().unwrap(),
1554			(b"this is a response".to_vec(), protocol_name_2)
1555		);
1556	}
1557
1558	#[tokio::test]
1559	async fn request_fallback() {
1560		let protocol_name_1 = ProtocolName::from("/test/req-resp/2");
1561		let protocol_name_1_fallback = ProtocolName::from("/test/req-resp/1");
1562		let protocol_name_2 = ProtocolName::from("/test/another");
1563
1564		let protocol_config_1 = ProtocolConfig {
1565			name: protocol_name_1.clone(),
1566			fallback_names: Vec::new(),
1567			max_request_size: 1024,
1568			max_response_size: 1024 * 1024,
1569			request_timeout: Duration::from_secs(30),
1570			inbound_queue: None,
1571		};
1572		let protocol_config_1_fallback = ProtocolConfig {
1573			name: protocol_name_1_fallback.clone(),
1574			fallback_names: Vec::new(),
1575			max_request_size: 1024,
1576			max_response_size: 1024 * 1024,
1577			request_timeout: Duration::from_secs(30),
1578			inbound_queue: None,
1579		};
1580		let protocol_config_2 = ProtocolConfig {
1581			name: protocol_name_2.clone(),
1582			fallback_names: Vec::new(),
1583			max_request_size: 1024,
1584			max_response_size: 1024 * 1024,
1585			request_timeout: Duration::from_secs(30),
1586			inbound_queue: None,
1587		};
1588
1589		// This swarm only speaks protocol_name_1_fallback and protocol_name_2.
1590		// It only responds to requests.
1591		let mut older_swarm = {
1592			let (tx_1, mut rx_1) = async_channel::bounded::<IncomingRequest>(64);
1593			let (tx_2, mut rx_2) = async_channel::bounded::<IncomingRequest>(64);
1594			let mut protocol_config_1_fallback = protocol_config_1_fallback.clone();
1595			protocol_config_1_fallback.inbound_queue = Some(tx_1);
1596
1597			let mut protocol_config_2 = protocol_config_2.clone();
1598			protocol_config_2.inbound_queue = Some(tx_2);
1599
1600			tokio::spawn(async move {
1601				for _ in 0..2 {
1602					if let Some(rq) = rx_1.next().await {
1603						let (fb_tx, fb_rx) = oneshot::channel();
1604						assert_eq!(rq.payload, b"request on protocol /test/req-resp/1");
1605						let _ = rq.pending_response.send(super::OutgoingResponse {
1606							result: Ok(b"this is a response on protocol /test/req-resp/1".to_vec()),
1607							reputation_changes: Vec::new(),
1608							sent_feedback: Some(fb_tx),
1609						});
1610						fb_rx.await.unwrap();
1611					}
1612				}
1613
1614				if let Some(rq) = rx_2.next().await {
1615					let (fb_tx, fb_rx) = oneshot::channel();
1616					assert_eq!(rq.payload, b"request on protocol /test/other");
1617					let _ = rq.pending_response.send(super::OutgoingResponse {
1618						result: Ok(b"this is a response on protocol /test/other".to_vec()),
1619						reputation_changes: Vec::new(),
1620						sent_feedback: Some(fb_tx),
1621					});
1622					fb_rx.await.unwrap();
1623				}
1624			});
1625
1626			build_swarm(vec![protocol_config_1_fallback, protocol_config_2].into_iter())
1627		};
1628
1629		// This swarm speaks all protocols.
1630		let mut new_swarm = build_swarm(
1631			vec![
1632				protocol_config_1.clone(),
1633				protocol_config_1_fallback.clone(),
1634				protocol_config_2.clone(),
1635			]
1636			.into_iter(),
1637		);
1638
1639		{
1640			let dial_addr = older_swarm.1.clone();
1641			Swarm::dial(&mut new_swarm.0, dial_addr).unwrap();
1642		}
1643
1644		// Running `older_swarm`` in the background.
1645		tokio::spawn(async move {
1646			loop {
1647				_ = older_swarm.0.select_next_some().await;
1648			}
1649		});
1650
1651		// Run the newer swarm. Attempt to make requests on all protocols.
1652		let (mut swarm, _) = new_swarm;
1653		let mut older_peer_id = None;
1654
1655		let mut response_receiver = None;
1656		// Try the new protocol with a fallback.
1657		loop {
1658			match swarm.select_next_some().await {
1659				SwarmEvent::ConnectionEstablished { peer_id, .. } => {
1660					older_peer_id = Some(peer_id);
1661					let (sender, receiver) = oneshot::channel();
1662					swarm.behaviour_mut().send_request(
1663						&peer_id,
1664						protocol_name_1.clone(),
1665						b"request on protocol /test/req-resp/2".to_vec(),
1666						Some((
1667							b"request on protocol /test/req-resp/1".to_vec(),
1668							protocol_config_1_fallback.name.clone(),
1669						)),
1670						sender,
1671						IfDisconnected::ImmediateError,
1672					);
1673					response_receiver = Some(receiver);
1674				},
1675				SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
1676					result.unwrap();
1677					break;
1678				},
1679				_ => {},
1680			}
1681		}
1682		assert_eq!(
1683			response_receiver.unwrap().await.unwrap().unwrap(),
1684			(
1685				b"this is a response on protocol /test/req-resp/1".to_vec(),
1686				protocol_name_1_fallback.clone()
1687			)
1688		);
1689		// Try the old protocol with a useless fallback.
1690		let (sender, response_receiver) = oneshot::channel();
1691		swarm.behaviour_mut().send_request(
1692			older_peer_id.as_ref().unwrap(),
1693			protocol_name_1_fallback.clone(),
1694			b"request on protocol /test/req-resp/1".to_vec(),
1695			Some((
1696				b"dummy request, will fail if processed".to_vec(),
1697				protocol_config_1_fallback.name.clone(),
1698			)),
1699			sender,
1700			IfDisconnected::ImmediateError,
1701		);
1702		loop {
1703			match swarm.select_next_some().await {
1704				SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
1705					result.unwrap();
1706					break;
1707				},
1708				_ => {},
1709			}
1710		}
1711		assert_eq!(
1712			response_receiver.await.unwrap().unwrap(),
1713			(
1714				b"this is a response on protocol /test/req-resp/1".to_vec(),
1715				protocol_name_1_fallback.clone()
1716			)
1717		);
1718		// Try the new protocol with no fallback. Should fail.
1719		let (sender, response_receiver) = oneshot::channel();
1720		swarm.behaviour_mut().send_request(
1721			older_peer_id.as_ref().unwrap(),
1722			protocol_name_1.clone(),
1723			b"request on protocol /test/req-resp-2".to_vec(),
1724			None,
1725			sender,
1726			IfDisconnected::ImmediateError,
1727		);
1728		loop {
1729			match swarm.select_next_some().await {
1730				SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
1731					assert_matches!(
1732						result.unwrap_err(),
1733						RequestFailure::Network(OutboundFailure::UnsupportedProtocols)
1734					);
1735					break;
1736				},
1737				_ => {},
1738			}
1739		}
1740		assert!(response_receiver.await.unwrap().is_err());
1741		// Try the other protocol with no fallback.
1742		let (sender, response_receiver) = oneshot::channel();
1743		swarm.behaviour_mut().send_request(
1744			older_peer_id.as_ref().unwrap(),
1745			protocol_name_2.clone(),
1746			b"request on protocol /test/other".to_vec(),
1747			None,
1748			sender,
1749			IfDisconnected::ImmediateError,
1750		);
1751		loop {
1752			match swarm.select_next_some().await {
1753				SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
1754					result.unwrap();
1755					break;
1756				},
1757				_ => {},
1758			}
1759		}
1760		assert_eq!(
1761			response_receiver.await.unwrap().unwrap(),
1762			(b"this is a response on protocol /test/other".to_vec(), protocol_name_2.clone())
1763		);
1764	}
1765
1766	/// This test ensures the `RequestResponsesBehaviour` propagates back the Request::Timeout error
1767	/// even if the libp2p component hangs.
1768	///
1769	/// For testing purposes, the communication happens on the `/test/req-resp/1` protocol.
1770	///
1771	/// This is achieved by:
1772	/// - Two swarms are connected, the first one is slow to respond and has the timeout set to 10
1773	///   seconds. The second swarm is configured with a timeout of 10 seconds in libp2p, however in
1774	///   substrate this is set to 1 second.
1775	///
1776	/// - The first swarm introduces a delay of 2 seconds before responding to the request.
1777	///
1778	/// - The second swarm must enforce the 1 second timeout.
1779	#[tokio::test]
1780	async fn enforce_outbound_timeouts() {
1781		const REQUEST_TIMEOUT: Duration = Duration::from_secs(10);
1782		const REQUEST_TIMEOUT_SHORT: Duration = Duration::from_secs(1);
1783
1784		// These swarms only speaks protocol_name.
1785		let protocol_name = ProtocolName::from("/test/req-resp/1");
1786
1787		let protocol_config = ProtocolConfig {
1788			name: protocol_name.clone(),
1789			fallback_names: Vec::new(),
1790			max_request_size: 1024,
1791			max_response_size: 1024 * 1024,
1792			request_timeout: REQUEST_TIMEOUT, // <-- important for the test
1793			inbound_queue: None,
1794		};
1795
1796		// Build swarms whose behaviour is [`RequestResponsesBehaviour`].
1797		let (mut first_swarm, _) = {
1798			let (tx, mut rx) = async_channel::bounded::<IncomingRequest>(64);
1799
1800			tokio::spawn(async move {
1801				if let Some(rq) = rx.next().await {
1802					assert_eq!(rq.payload, b"this is a request");
1803
1804					// Sleep for more than `REQUEST_TIMEOUT_SHORT` and less than
1805					// `REQUEST_TIMEOUT`.
1806					tokio::time::sleep(REQUEST_TIMEOUT_SHORT * 2).await;
1807
1808					// By the time the response is sent back, the second swarm
1809					// received Timeout.
1810					let _ = rq.pending_response.send(super::OutgoingResponse {
1811						result: Ok(b"Second swarm already timedout".to_vec()),
1812						reputation_changes: Vec::new(),
1813						sent_feedback: None,
1814					});
1815				}
1816			});
1817
1818			let mut protocol_config = protocol_config.clone();
1819			protocol_config.inbound_queue = Some(tx);
1820
1821			build_swarm(iter::once(protocol_config))
1822		};
1823
1824		let (mut second_swarm, second_address) = {
1825			let (tx, mut rx) = async_channel::bounded::<IncomingRequest>(64);
1826
1827			tokio::spawn(async move {
1828				while let Some(rq) = rx.next().await {
1829					let _ = rq.pending_response.send(super::OutgoingResponse {
1830						result: Ok(b"This is the response".to_vec()),
1831						reputation_changes: Vec::new(),
1832						sent_feedback: None,
1833					});
1834				}
1835			});
1836			let mut protocol_config = protocol_config.clone();
1837			protocol_config.inbound_queue = Some(tx);
1838
1839			build_swarm(iter::once(protocol_config.clone()))
1840		};
1841		// Modify the second swarm to have a shorter timeout.
1842		second_swarm
1843			.behaviour_mut()
1844			.protocols
1845			.get_mut(&protocol_name)
1846			.unwrap()
1847			.request_timeout = REQUEST_TIMEOUT_SHORT;
1848
1849		// Ask first swarm to dial the second swarm.
1850		{
1851			Swarm::dial(&mut first_swarm, second_address).unwrap();
1852		}
1853
1854		// Running the first swarm in the background until a `InboundRequest` event happens,
1855		// which is a hint about the test having ended.
1856		tokio::spawn(async move {
1857			loop {
1858				let event = first_swarm.select_next_some().await;
1859				match event {
1860					SwarmEvent::Behaviour(Event::InboundRequest { result, .. }) => {
1861						assert!(result.is_ok());
1862						break;
1863					},
1864					SwarmEvent::ConnectionClosed { .. } => {
1865						break;
1866					},
1867					_ => {},
1868				}
1869			}
1870		});
1871
1872		// Run the second swarm.
1873		// - on connection established send the request to the first swarm
1874		// - expect to receive a timeout
1875		let mut response_receiver = None;
1876		loop {
1877			let event = second_swarm.select_next_some().await;
1878
1879			match event {
1880				SwarmEvent::ConnectionEstablished { peer_id, .. } => {
1881					let (sender, receiver) = oneshot::channel();
1882					second_swarm.behaviour_mut().send_request(
1883						&peer_id,
1884						protocol_name.clone(),
1885						b"this is a request".to_vec(),
1886						None,
1887						sender,
1888						IfDisconnected::ImmediateError,
1889					);
1890					assert!(response_receiver.is_none());
1891					response_receiver = Some(receiver);
1892				},
1893				SwarmEvent::ConnectionClosed { .. } => {
1894					break;
1895				},
1896				SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
1897					assert!(result.is_err());
1898					break;
1899				},
1900				_ => {},
1901			}
1902		}
1903
1904		// Expect the timeout.
1905		match response_receiver.unwrap().await.unwrap().unwrap_err() {
1906			RequestFailure::Network(OutboundFailure::Timeout) => {},
1907			request_failure => panic!("Unexpected failure: {request_failure:?}"),
1908		}
1909	}
1910}