Skip to main content

soil_network/
request_responses.rs

1// This file is part of Soil.
2
3// Copyright (C) Soil contributors.
4// Copyright (C) Parity Technologies (UK) Ltd.
5// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
6
7//! Collection of request-response protocols.
8//!
9//! The [`RequestResponsesBehaviour`] struct defined in this module provides support for zero or
10//! more so-called "request-response" protocols.
11//!
12//! A request-response protocol works in the following way:
13//!
//! - For every emitted request, a new substream is opened and the protocol is negotiated. If the
//! remote supports the protocol, the size of the request is sent as a LEB128 number, followed
//! by the request itself. The remote then sends the size of the response as a LEB128 number,
//! followed by the response.
18//!
19//! - Requests have a certain time limit before they time out. This time includes the time it
20//! takes to send/receive the request and response.
21//!
22//! - If provided, a ["requests processing"](ProtocolConfig::inbound_queue) channel
23//! is used to handle incoming requests.
24
25use crate::{
26	peer_store::{PeerStoreProvider, BANNED_THRESHOLD},
27	service::traits::RequestResponseConfig as RequestResponseConfigT,
28	types::ProtocolName,
29	ReputationChange,
30};
31
32use futures::{channel::oneshot, prelude::*};
33use libp2p::{
34	core::{transport::PortUse, Endpoint, Multiaddr},
35	request_response::{self, Behaviour, Codec, Message, ProtocolSupport, ResponseChannel},
36	swarm::{
37		behaviour::FromSwarm, handler::multi::MultiHandler, ConnectionDenied, ConnectionId,
38		NetworkBehaviour, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm,
39	},
40	PeerId,
41};
42
43use std::{
44	collections::{hash_map::Entry, HashMap},
45	io, iter,
46	ops::Deref,
47	pin::Pin,
48	sync::Arc,
49	task::{Context, Poll},
50	time::{Duration, Instant},
51};
52
53pub use libp2p::request_response::{Config, InboundRequestId, OutboundRequestId};
54
/// Target attached to every log line emitted by this module.
const LOG_TARGET: &str = "sub-libp2p::request-response";

/// Interval at which pending outbound requests are swept for entries that outlived their
/// protocol's request timeout.
const PERIODIC_REQUEST_CHECK: Duration = Duration::from_millis(2_000);
60
61/// Possible failures occurring in the context of sending an outbound request and receiving the
62/// response.
63#[derive(Debug, Clone, thiserror::Error)]
64pub enum OutboundFailure {
65	/// The request could not be sent because a dialing attempt failed.
66	#[error("Failed to dial the requested peer")]
67	DialFailure,
68	/// The request timed out before a response was received.
69	#[error("Timeout while waiting for a response")]
70	Timeout,
71	/// The connection closed before a response was received.
72	#[error("Connection was closed before a response was received")]
73	ConnectionClosed,
74	/// The remote supports none of the requested protocols.
75	#[error("The remote supports none of the requested protocols")]
76	UnsupportedProtocols,
77	/// An IO failure happened on an outbound stream.
78	#[error("An IO failure happened on an outbound stream")]
79	Io(Arc<io::Error>),
80}
81
82impl From<request_response::OutboundFailure> for OutboundFailure {
83	fn from(out: request_response::OutboundFailure) -> Self {
84		match out {
85			request_response::OutboundFailure::DialFailure => OutboundFailure::DialFailure,
86			request_response::OutboundFailure::Timeout => OutboundFailure::Timeout,
87			request_response::OutboundFailure::ConnectionClosed => {
88				OutboundFailure::ConnectionClosed
89			},
90			request_response::OutboundFailure::UnsupportedProtocols => {
91				OutboundFailure::UnsupportedProtocols
92			},
93			request_response::OutboundFailure::Io(error) => OutboundFailure::Io(Arc::new(error)),
94		}
95	}
96}
97
98/// Possible failures occurring in the context of receiving an inbound request and sending a
99/// response.
100#[derive(Debug, thiserror::Error)]
101pub enum InboundFailure {
102	/// The inbound request timed out, either while reading the incoming request or before a
103	/// response is sent
104	#[error("Timeout while receiving request or sending response")]
105	Timeout,
106	/// The connection closed before a response could be send.
107	#[error("Connection was closed before a response could be sent")]
108	ConnectionClosed,
109	/// The local peer supports none of the protocols requested by the remote.
110	#[error("The local peer supports none of the protocols requested by the remote")]
111	UnsupportedProtocols,
112	/// The local peer failed to respond to an inbound request
113	#[error("The response channel was dropped without sending a response to the remote")]
114	ResponseOmission,
115	/// An IO failure happened on an inbound stream.
116	#[error("An IO failure happened on an inbound stream")]
117	Io(Arc<io::Error>),
118}
119
120impl From<request_response::InboundFailure> for InboundFailure {
121	fn from(out: request_response::InboundFailure) -> Self {
122		match out {
123			request_response::InboundFailure::ResponseOmission => InboundFailure::ResponseOmission,
124			request_response::InboundFailure::Timeout => InboundFailure::Timeout,
125			request_response::InboundFailure::ConnectionClosed => InboundFailure::ConnectionClosed,
126			request_response::InboundFailure::UnsupportedProtocols => {
127				InboundFailure::UnsupportedProtocols
128			},
129			request_response::InboundFailure::Io(error) => InboundFailure::Io(Arc::new(error)),
130		}
131	}
132}
133
134/// Error in a request.
135#[derive(Debug, thiserror::Error)]
136#[allow(missing_docs)]
137pub enum RequestFailure {
138	#[error("We are not currently connected to the requested peer.")]
139	NotConnected,
140	#[error("Given protocol hasn't been registered.")]
141	UnknownProtocol,
142	#[error("Remote has closed the substream before answering, thereby signaling that it considers the request as valid, but refused to answer it.")]
143	Refused,
144	#[error("The remote replied, but the local node is no longer interested in the response.")]
145	Obsolete,
146	#[error("Problem on the network: {0}")]
147	Network(OutboundFailure),
148}
149
150/// Configuration for a single request-response protocol.
151#[derive(Debug, Clone)]
152pub struct ProtocolConfig {
153	/// Name of the protocol on the wire. Should be something like `/foo/bar`.
154	pub name: ProtocolName,
155
156	/// Fallback on the wire protocol names to support.
157	pub fallback_names: Vec<ProtocolName>,
158
159	/// Maximum allowed size, in bytes, of a request.
160	///
161	/// Any request larger than this value will be declined as a way to avoid allocating too
162	/// much memory for it.
163	pub max_request_size: u64,
164
165	/// Maximum allowed size, in bytes, of a response.
166	///
167	/// Any response larger than this value will be declined as a way to avoid allocating too
168	/// much memory for it.
169	pub max_response_size: u64,
170
171	/// Duration after which emitted requests are considered timed out.
172	///
173	/// If you expect the response to come back quickly, you should set this to a smaller duration.
174	pub request_timeout: Duration,
175
176	/// Channel on which the networking service will send incoming requests.
177	///
178	/// Every time a peer sends a request to the local node using this protocol, the networking
179	/// service will push an element on this channel. The receiving side of this channel then has
180	/// to pull this element, process the request, and send back the response to send back to the
181	/// peer.
182	///
183	/// The size of the channel has to be carefully chosen. If the channel is full, the networking
184	/// service will discard the incoming request send back an error to the peer. Consequently,
185	/// the channel being full is an indicator that the node is overloaded.
186	///
187	/// You can typically set the size of the channel to `T / d`, where `T` is the
188	/// `request_timeout` and `d` is the expected average duration of CPU and I/O it takes to
189	/// build a response.
190	///
191	/// Can be `None` if the local node does not support answering incoming requests.
192	/// If this is `None`, then the local node will not advertise support for this protocol towards
193	/// other peers. If this is `Some` but the channel is closed, then the local node will
194	/// advertise support for this protocol, but any incoming request will lead to an error being
195	/// sent back.
196	pub inbound_queue: Option<async_channel::Sender<IncomingRequest>>,
197}
198
199impl RequestResponseConfigT for ProtocolConfig {
200	fn protocol_name(&self) -> &ProtocolName {
201		&self.name
202	}
203}
204
205/// A single request received by a peer on a request-response protocol.
206#[derive(Debug)]
207pub struct IncomingRequest {
208	/// Who sent the request.
209	pub peer: crate::types::PeerId,
210
211	/// Request sent by the remote. Will always be smaller than
212	/// [`ProtocolConfig::max_request_size`].
213	pub payload: Vec<u8>,
214
215	/// Channel to send back the response.
216	///
217	/// There are two ways to indicate that handling the request failed:
218	///
219	/// 1. Drop `pending_response` and thus not changing the reputation of the peer.
220	///
221	/// 2. Sending an `Err(())` via `pending_response`, optionally including reputation changes for
222	/// the given peer.
223	pub pending_response: oneshot::Sender<OutgoingResponse>,
224}
225
226/// Response for an incoming request to be send by a request protocol handler.
227#[derive(Debug)]
228pub struct OutgoingResponse {
229	/// The payload of the response.
230	///
231	/// `Err(())` if none is available e.g. due an error while handling the request.
232	pub result: Result<Vec<u8>, ()>,
233
234	/// Reputation changes accrued while handling the request. To be applied to the reputation of
235	/// the peer sending the request.
236	pub reputation_changes: Vec<ReputationChange>,
237
238	/// If provided, the `oneshot::Sender` will be notified when the request has been sent to the
239	/// peer.
240	///
241	/// > **Note**: Operating systems typically maintain a buffer of a few dozen kilobytes of
242	/// >			outgoing data for each TCP socket, and it is not possible for a user
243	/// >			application to inspect this buffer. This channel here is not actually notified
244	/// >			when the response has been fully sent out, but rather when it has fully been
245	/// >			written to the buffer managed by the operating system.
246	pub sent_feedback: Option<oneshot::Sender<()>>,
247}
248
249/// Information stored about a pending request.
250struct PendingRequest {
251	/// The time when the request was sent to the libp2p request-response protocol.
252	started_at: Instant,
253	/// The channel to send the response back to the caller.
254	///
255	/// This is wrapped in an `Option` to allow for the channel to be taken out
256	/// on force-detected timeouts.
257	response_tx: Option<oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>>,
258	/// Fallback request to send if the primary request fails.
259	fallback_request: Option<(Vec<u8>, ProtocolName)>,
260}
261
/// Policy applied when a request targets a peer with no established connection.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub enum IfDisconnected {
	/// Attempt to establish a connection to the peer.
	TryConnect,
	/// Fail right away if the destination is not connected yet.
	ImmediateError,
}

/// Convenience helpers for [`IfDisconnected`].
impl IfDisconnected {
	/// Returns `true` when a connection to a disconnected peer should be attempted, which is
	/// only the case for [`IfDisconnected::TryConnect`].
	pub fn should_connect(self) -> bool {
		matches!(self, Self::TryConnect)
	}
}
281
282/// Event generated by the [`RequestResponsesBehaviour`].
283#[derive(Debug)]
284pub enum Event {
285	/// A remote sent a request and either we have successfully answered it or an error happened.
286	///
287	/// This event is generated for statistics purposes.
288	InboundRequest {
289		/// Peer which has emitted the request.
290		peer: PeerId,
291		/// Name of the protocol in question.
292		protocol: ProtocolName,
293		/// Whether handling the request was successful or unsuccessful.
294		///
295		/// When successful contains the time elapsed between when we received the request and when
296		/// we sent back the response. When unsuccessful contains the failure reason.
297		result: Result<Duration, ResponseFailure>,
298	},
299
300	/// A request initiated using [`RequestResponsesBehaviour::send_request`] has succeeded or
301	/// failed.
302	///
303	/// This event is generated for statistics purposes.
304	RequestFinished {
305		/// Peer that we send a request to.
306		peer: PeerId,
307		/// Name of the protocol in question.
308		protocol: ProtocolName,
309		/// Duration the request took.
310		duration: Duration,
311		/// Result of the request.
312		result: Result<(), RequestFailure>,
313	},
314
315	/// A request protocol handler issued reputation changes for the given peer.
316	ReputationChanges {
317		/// Peer whose reputation needs to be adjust.
318		peer: PeerId,
319		/// Reputation changes.
320		changes: Vec<ReputationChange>,
321	},
322}
323
324/// Combination of a protocol name and a request id.
325///
326/// Uniquely identifies an inbound or outbound request among all handled protocols. Note however
327/// that uniqueness is only guaranteed between two inbound and likewise between two outbound
328/// requests. There is no uniqueness guarantee in a set of both inbound and outbound
329/// [`ProtocolRequestId`]s.
330#[derive(Debug, Clone, PartialEq, Eq, Hash)]
331struct ProtocolRequestId<RequestId> {
332	protocol: ProtocolName,
333	request_id: RequestId,
334}
335
336impl<RequestId> From<(ProtocolName, RequestId)> for ProtocolRequestId<RequestId> {
337	fn from((protocol, request_id): (ProtocolName, RequestId)) -> Self {
338		Self { protocol, request_id }
339	}
340}
341
342/// Details of a request-response protocol.
343struct ProtocolDetails {
344	behaviour: Behaviour<GenericCodec>,
345	inbound_queue: Option<async_channel::Sender<IncomingRequest>>,
346	request_timeout: Duration,
347}
348
349/// Implementation of `NetworkBehaviour` that provides support for request-response protocols.
350pub struct RequestResponsesBehaviour {
351	/// The multiple sub-protocols, by name.
352	///
353	/// Contains the underlying libp2p request-response [`Behaviour`], plus an optional
354	/// "response builder" used to build responses for incoming requests.
355	protocols: HashMap<ProtocolName, ProtocolDetails>,
356
357	/// Pending requests, passed down to a request-response [`Behaviour`], awaiting a reply.
358	pending_requests: HashMap<ProtocolRequestId<OutboundRequestId>, PendingRequest>,
359
360	/// Whenever an incoming request arrives, a `Future` is added to this list and will yield the
361	/// start time and the response to send back to the remote.
362	pending_responses: stream::FuturesUnordered<
363		Pin<Box<dyn Future<Output = Option<RequestProcessingOutcome>> + Send>>,
364	>,
365
366	/// Whenever an incoming request arrives, the arrival [`Instant`] is recorded here.
367	pending_responses_arrival_time: HashMap<ProtocolRequestId<InboundRequestId>, Instant>,
368
369	/// Whenever a response is received on `pending_responses`, insert a channel to be notified
370	/// when the request has been sent out.
371	send_feedback: HashMap<ProtocolRequestId<InboundRequestId>, oneshot::Sender<()>>,
372
373	/// Primarily used to get a reputation of a node.
374	peer_store: Arc<dyn PeerStoreProvider>,
375
376	/// Interval to check that the requests are not taking too long.
377	///
378	/// We had issues in the past where libp2p did not produce a timeout event in due time.
379	///
380	/// For more details, see:
381	/// - <https://github.com/paritytech/polkadot-sdk/issues/7076#issuecomment-2596085096>
382	periodic_request_check: tokio::time::Interval,
383}
384
385/// Generated by the response builder and waiting to be processed.
386struct RequestProcessingOutcome {
387	peer: PeerId,
388	request_id: InboundRequestId,
389	protocol: ProtocolName,
390	inner_channel: ResponseChannel<Result<Vec<u8>, ()>>,
391	response: OutgoingResponse,
392}
393
394impl RequestResponsesBehaviour {
395	/// Creates a new behaviour. Must be passed a list of supported protocols. Returns an error if
396	/// the same protocol is passed twice.
397	pub fn new(
398		list: impl Iterator<Item = ProtocolConfig>,
399		peer_store: Arc<dyn PeerStoreProvider>,
400	) -> Result<Self, RegisterError> {
401		let mut protocols = HashMap::new();
402		for protocol in list {
403			let cfg = Config::default().with_request_timeout(protocol.request_timeout);
404
405			let protocol_support = if protocol.inbound_queue.is_some() {
406				ProtocolSupport::Full
407			} else {
408				ProtocolSupport::Outbound
409			};
410
411			let behaviour = Behaviour::with_codec(
412				GenericCodec {
413					max_request_size: protocol.max_request_size,
414					max_response_size: protocol.max_response_size,
415				},
416				iter::once(protocol.name.clone())
417					.chain(protocol.fallback_names)
418					.zip(iter::repeat(protocol_support)),
419				cfg,
420			);
421
422			match protocols.entry(protocol.name) {
423				Entry::Vacant(e) => e.insert(ProtocolDetails {
424					behaviour,
425					inbound_queue: protocol.inbound_queue,
426					request_timeout: protocol.request_timeout,
427				}),
428				Entry::Occupied(e) => {
429					return Err(RegisterError::DuplicateProtocol(e.key().clone()))
430				},
431			};
432		}
433
434		Ok(Self {
435			protocols,
436			pending_requests: Default::default(),
437			pending_responses: Default::default(),
438			pending_responses_arrival_time: Default::default(),
439			send_feedback: Default::default(),
440			peer_store,
441			periodic_request_check: tokio::time::interval(PERIODIC_REQUEST_CHECK),
442		})
443	}
444
445	/// Initiates sending a request.
446	///
447	/// If there is no established connection to the target peer, the behavior is determined by the
448	/// choice of `connect`.
449	///
450	/// An error is returned if the protocol doesn't match one that has been registered.
451	pub fn send_request(
452		&mut self,
453		target: &PeerId,
454		protocol_name: ProtocolName,
455		request: Vec<u8>,
456		fallback_request: Option<(Vec<u8>, ProtocolName)>,
457		pending_response: oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>,
458		connect: IfDisconnected,
459	) {
460		log::trace!(target: LOG_TARGET, "send request to {target} ({protocol_name:?}), {} bytes", request.len());
461
462		if let Some(ProtocolDetails { behaviour, .. }) =
463			self.protocols.get_mut(protocol_name.deref())
464		{
465			Self::send_request_inner(
466				behaviour,
467				&mut self.pending_requests,
468				target,
469				protocol_name,
470				request,
471				fallback_request,
472				pending_response,
473				connect,
474			)
475		} else if pending_response.send(Err(RequestFailure::UnknownProtocol)).is_err() {
476			log::debug!(
477				target: LOG_TARGET,
478				"Unknown protocol {:?}. At the same time local \
479				 node is no longer interested in the result.",
480				protocol_name,
481			);
482		}
483	}
484
485	fn send_request_inner(
486		behaviour: &mut Behaviour<GenericCodec>,
487		pending_requests: &mut HashMap<ProtocolRequestId<OutboundRequestId>, PendingRequest>,
488		target: &PeerId,
489		protocol_name: ProtocolName,
490		request: Vec<u8>,
491		fallback_request: Option<(Vec<u8>, ProtocolName)>,
492		pending_response: oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>,
493		connect: IfDisconnected,
494	) {
495		if behaviour.is_connected(target) || connect.should_connect() {
496			let request_id = behaviour.send_request(target, request);
497			let prev_req_id = pending_requests.insert(
498				(protocol_name.to_string().into(), request_id).into(),
499				PendingRequest {
500					started_at: Instant::now(),
501					response_tx: Some(pending_response),
502					fallback_request,
503				},
504			);
505			debug_assert!(prev_req_id.is_none(), "Expect request id to be unique.");
506		} else if pending_response.send(Err(RequestFailure::NotConnected)).is_err() {
507			log::debug!(
508				target: LOG_TARGET,
509				"Not connected to peer {:?}. At the same time local \
510				 node is no longer interested in the result.",
511				target,
512			);
513		}
514	}
515}
516
517impl NetworkBehaviour for RequestResponsesBehaviour {
518	type ConnectionHandler =
519		MultiHandler<String, <Behaviour<GenericCodec> as NetworkBehaviour>::ConnectionHandler>;
520	type ToSwarm = Event;
521
522	fn handle_pending_inbound_connection(
523		&mut self,
524		_connection_id: ConnectionId,
525		_local_addr: &Multiaddr,
526		_remote_addr: &Multiaddr,
527	) -> Result<(), ConnectionDenied> {
528		Ok(())
529	}
530
531	fn handle_pending_outbound_connection(
532		&mut self,
533		_connection_id: ConnectionId,
534		_maybe_peer: Option<PeerId>,
535		_addresses: &[Multiaddr],
536		_effective_role: Endpoint,
537	) -> Result<Vec<Multiaddr>, ConnectionDenied> {
538		Ok(Vec::new())
539	}
540
541	fn handle_established_inbound_connection(
542		&mut self,
543		connection_id: ConnectionId,
544		peer: PeerId,
545		local_addr: &Multiaddr,
546		remote_addr: &Multiaddr,
547	) -> Result<THandler<Self>, ConnectionDenied> {
548		let iter =
549			self.protocols.iter_mut().filter_map(|(p, ProtocolDetails { behaviour, .. })| {
550				if let Ok(handler) = behaviour.handle_established_inbound_connection(
551					connection_id,
552					peer,
553					local_addr,
554					remote_addr,
555				) {
556					Some((p.to_string(), handler))
557				} else {
558					None
559				}
560			});
561
562		Ok(MultiHandler::try_from_iter(iter).expect(
563			"Protocols are in a HashMap and there can be at most one handler per protocol name, \
564			 which is the only possible error; qed",
565		))
566	}
567
568	fn handle_established_outbound_connection(
569		&mut self,
570		connection_id: ConnectionId,
571		peer: PeerId,
572		addr: &Multiaddr,
573		role_override: Endpoint,
574		port_use: PortUse,
575	) -> Result<THandler<Self>, ConnectionDenied> {
576		let iter =
577			self.protocols.iter_mut().filter_map(|(p, ProtocolDetails { behaviour, .. })| {
578				if let Ok(handler) = behaviour.handle_established_outbound_connection(
579					connection_id,
580					peer,
581					addr,
582					role_override,
583					port_use,
584				) {
585					Some((p.to_string(), handler))
586				} else {
587					None
588				}
589			});
590
591		Ok(MultiHandler::try_from_iter(iter).expect(
592			"Protocols are in a HashMap and there can be at most one handler per protocol name, \
593			 which is the only possible error; qed",
594		))
595	}
596
597	fn on_swarm_event(&mut self, event: FromSwarm) {
598		for ProtocolDetails { behaviour, .. } in self.protocols.values_mut() {
599			behaviour.on_swarm_event(event);
600		}
601	}
602
603	fn on_connection_handler_event(
604		&mut self,
605		peer_id: PeerId,
606		connection_id: ConnectionId,
607		event: THandlerOutEvent<Self>,
608	) {
609		let p_name = event.0;
610		if let Some(ProtocolDetails { behaviour, .. }) = self.protocols.get_mut(p_name.as_str()) {
611			return behaviour.on_connection_handler_event(peer_id, connection_id, event.1);
612		} else {
613			log::warn!(
614				target: LOG_TARGET,
615				"on_connection_handler_event: no request-response instance registered for protocol {:?}",
616				p_name
617			);
618		}
619	}
620
621	fn poll(&mut self, cx: &mut Context) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
622		'poll_all: loop {
623			// Poll the periodic request check.
624			if self.periodic_request_check.poll_tick(cx).is_ready() {
625				self.pending_requests.retain(|id, req| {
626					let Some(ProtocolDetails { request_timeout, .. }) =
627						self.protocols.get(&id.protocol)
628					else {
629						log::warn!(
630							target: LOG_TARGET,
631							"Request {id:?} has no protocol registered.",
632						);
633
634						if let Some(response_tx) = req.response_tx.take() {
635							if response_tx.send(Err(RequestFailure::UnknownProtocol)).is_err() {
636								log::debug!(
637									target: LOG_TARGET,
638									"Request {id:?} has no protocol registered. At the same time local node is no longer interested in the result.",
639								);
640							}
641						}
642						return false
643					};
644
645					let elapsed = req.started_at.elapsed();
646					if elapsed > *request_timeout {
647						log::debug!(
648							target: LOG_TARGET,
649							"Request {id:?} force detected as timeout.",
650						);
651
652						if let Some(response_tx) = req.response_tx.take() {
653							if response_tx.send(Err(RequestFailure::Network(OutboundFailure::Timeout))).is_err() {
654								log::debug!(
655									target: LOG_TARGET,
656									"Request {id:?} force detected as timeout. At the same time local node is no longer interested in the result.",
657								);
658							}
659						}
660
661						false
662					} else {
663						true
664					}
665				});
666			}
667
668			// Poll to see if any response is ready to be sent back.
669			while let Poll::Ready(Some(outcome)) = self.pending_responses.poll_next_unpin(cx) {
670				let RequestProcessingOutcome {
671					peer,
672					request_id,
673					protocol: protocol_name,
674					inner_channel,
675					response: OutgoingResponse { result, reputation_changes, sent_feedback },
676				} = match outcome {
677					Some(outcome) => outcome,
678					// The response builder was too busy or handling the request failed. This is
679					// later on reported as a `InboundFailure::Omission`.
680					None => continue,
681				};
682
683				if let Ok(payload) = result {
684					if let Some(ProtocolDetails { behaviour, .. }) =
685						self.protocols.get_mut(&*protocol_name)
686					{
687						log::trace!(target: LOG_TARGET, "send response to {peer} ({protocol_name:?}), {} bytes", payload.len());
688
689						if behaviour.send_response(inner_channel, Ok(payload)).is_err() {
690							// Note: Failure is handled further below when receiving
691							// `InboundFailure` event from request-response [`Behaviour`].
692							log::debug!(
693								target: LOG_TARGET,
694								"Failed to send response for {:?} on protocol {:?} due to a \
695								 timeout or due to the connection to the peer being closed. \
696								 Dropping response",
697								request_id, protocol_name,
698							);
699						} else if let Some(sent_feedback) = sent_feedback {
700							self.send_feedback
701								.insert((protocol_name, request_id).into(), sent_feedback);
702						}
703					}
704				}
705
706				if !reputation_changes.is_empty() {
707					return Poll::Ready(ToSwarm::GenerateEvent(Event::ReputationChanges {
708						peer,
709						changes: reputation_changes,
710					}));
711				}
712			}
713
714			let mut fallback_requests = vec![];
715
716			// Poll request-responses protocols.
717			for (protocol, ProtocolDetails { behaviour, inbound_queue, .. }) in &mut self.protocols
718			{
719				'poll_protocol: while let Poll::Ready(ev) = behaviour.poll(cx) {
720					let ev = match ev {
721						// Main events we are interested in.
722						ToSwarm::GenerateEvent(ev) => ev,
723
724						// Other events generated by the underlying behaviour are transparently
725						// passed through.
726						ToSwarm::Dial { opts } => {
727							if opts.get_peer_id().is_none() {
728								log::error!(
729									target: LOG_TARGET,
730									"The request-response isn't supposed to start dialing addresses"
731								);
732							}
733							return Poll::Ready(ToSwarm::Dial { opts });
734						},
735						event => {
736							return Poll::Ready(
737								event.map_in(|event| ((*protocol).to_string(), event)).map_out(
738									|_| {
739										unreachable!(
740											"`GenerateEvent` is handled in a branch above; qed"
741										)
742									},
743								),
744							);
745						},
746					};
747
748					match ev {
749						// Received a request from a remote.
750						request_response::Event::Message {
751							peer,
752							message: Message::Request { request_id, request, channel, .. },
753						} => {
754							self.pending_responses_arrival_time
755								.insert((protocol.clone(), request_id).into(), Instant::now());
756
757							let reputation = self.peer_store.peer_reputation(&peer.into());
758
759							if reputation < BANNED_THRESHOLD {
760								log::debug!(
761									target: LOG_TARGET,
762									"Cannot handle requests from a node with a low reputation {}: {}",
763									peer,
764									reputation,
765								);
766								continue 'poll_protocol;
767							}
768
769							let (tx, rx) = oneshot::channel();
770
771							// Submit the request to the "response builder" passed by the user at
772							// initialization.
773							if let Some(resp_builder) = inbound_queue {
774								// If the response builder is too busy, silently drop `tx`. This
775								// will be reported by the corresponding request-response
776								// [`Behaviour`] through an `InboundFailure::Omission` event.
777								// Note that we use `async_channel::bounded` and not `mpsc::channel`
778								// because the latter allocates an extra slot for every cloned
779								// sender.
780								let _ = resp_builder.try_send(IncomingRequest {
781									peer: peer.into(),
782									payload: request,
783									pending_response: tx,
784								});
785							} else {
786								debug_assert!(false, "Received message on outbound-only protocol.");
787							}
788
789							let protocol = protocol.clone();
790
791							self.pending_responses.push(Box::pin(async move {
792								// The `tx` created above can be dropped if we are not capable of
793								// processing this request, which is reflected as a
794								// `InboundFailure::Omission` event.
795								rx.await.map_or(None, |response| {
796									Some(RequestProcessingOutcome {
797										peer,
798										request_id,
799										protocol,
800										inner_channel: channel,
801										response,
802									})
803								})
804							}));
805
806							// This `continue` makes sure that `pending_responses` gets polled
807							// after we have added the new element.
808							continue 'poll_all;
809						},
810
811						// Received a response from a remote to one of our requests.
812						request_response::Event::Message {
813							peer,
814							message: Message::Response { request_id, response },
815							..
816						} => {
817							let (started, delivered) = match self
818								.pending_requests
819								.remove(&(protocol.clone(), request_id).into())
820							{
821								Some(PendingRequest {
822									started_at,
823									response_tx: Some(response_tx),
824									..
825								}) => {
826									log::trace!(
827										target: LOG_TARGET,
828										"received response from {peer} ({protocol:?}), {} bytes",
829										response.as_ref().map_or(0usize, |response| response.len()),
830									);
831
832									let delivered = response_tx
833										.send(
834											response
835												.map_err(|()| RequestFailure::Refused)
836												.map(|resp| (resp, protocol.clone())),
837										)
838										.map_err(|_| RequestFailure::Obsolete);
839									(started_at, delivered)
840								},
841								_ => {
842									log::debug!(
843										target: LOG_TARGET,
844										"Received `RequestResponseEvent::Message` with unexpected request id {:?} from {:?}",
845										request_id,
846										peer,
847									);
848									continue;
849								},
850							};
851
852							let out = Event::RequestFinished {
853								peer,
854								protocol: protocol.clone(),
855								duration: started.elapsed(),
856								result: delivered,
857							};
858
859							return Poll::Ready(ToSwarm::GenerateEvent(out));
860						},
861
862						// One of our requests has failed.
863						request_response::Event::OutboundFailure {
864							peer,
865							request_id,
866							error,
867							..
868						} => {
869							let error = OutboundFailure::from(error);
870							let started = match self
871								.pending_requests
872								.remove(&(protocol.clone(), request_id).into())
873							{
874								Some(PendingRequest {
875									started_at,
876									response_tx: Some(response_tx),
877									fallback_request,
878								}) => {
879									// Try using the fallback request if the protocol was not
880									// supported.
881									if matches!(error, OutboundFailure::UnsupportedProtocols) {
882										if let Some((fallback_request, fallback_protocol)) =
883											fallback_request
884										{
885											log::trace!(
886												target: LOG_TARGET,
887												"Request with id {:?} failed. Trying the fallback protocol. {}",
888												request_id,
889												fallback_protocol.deref()
890											);
891											fallback_requests.push((
892												peer,
893												fallback_protocol,
894												fallback_request,
895												response_tx,
896											));
897											continue;
898										}
899									}
900
901									if response_tx
902										.send(Err(RequestFailure::Network(error.clone())))
903										.is_err()
904									{
905										log::debug!(
906											target: LOG_TARGET,
907											"Request with id {:?} failed. At the same time local \
908											 node is no longer interested in the result.",
909											request_id,
910										);
911									}
912									started_at
913								},
914								_ => {
915									log::debug!(
916										target: LOG_TARGET,
917										"Received `RequestResponseEvent::OutboundFailure` with unexpected request id {:?} error {:?} from {:?}",
918										request_id,
919										error,
920										peer
921									);
922									continue;
923								},
924							};
925
926							let out = Event::RequestFinished {
927								peer,
928								protocol: protocol.clone(),
929								duration: started.elapsed(),
930								result: Err(RequestFailure::Network(error)),
931							};
932
933							return Poll::Ready(ToSwarm::GenerateEvent(out));
934						},
935
936						// An inbound request failed, either while reading the request or due to
937						// failing to send a response.
938						request_response::Event::InboundFailure {
939							request_id, peer, error, ..
940						} => {
941							self.pending_responses_arrival_time
942								.remove(&(protocol.clone(), request_id).into());
943							self.send_feedback.remove(&(protocol.clone(), request_id).into());
944							let out = Event::InboundRequest {
945								peer,
946								protocol: protocol.clone(),
947								result: Err(ResponseFailure::Network(error.into())),
948							};
949							return Poll::Ready(ToSwarm::GenerateEvent(out));
950						},
951
952						// A response to an inbound request has been sent.
953						request_response::Event::ResponseSent { request_id, peer } => {
954							let arrival_time = self
955								.pending_responses_arrival_time
956								.remove(&(protocol.clone(), request_id).into())
957								.map(|t| t.elapsed())
958								.expect(
959									"Time is added for each inbound request on arrival and only \
960									 removed on success (`ResponseSent`) or failure \
961									 (`InboundFailure`). One can not receive a success event for a \
962									 request that either never arrived, or that has previously \
963									 failed; qed.",
964								);
965
966							if let Some(send_feedback) =
967								self.send_feedback.remove(&(protocol.clone(), request_id).into())
968							{
969								let _ = send_feedback.send(());
970							}
971
972							let out = Event::InboundRequest {
973								peer,
974								protocol: protocol.clone(),
975								result: Ok(arrival_time),
976							};
977
978							return Poll::Ready(ToSwarm::GenerateEvent(out));
979						},
980					};
981				}
982			}
983
984			// Send out fallback requests.
985			for (peer, protocol, request, pending_response) in fallback_requests.drain(..) {
986				if let Some(ProtocolDetails { behaviour, .. }) = self.protocols.get_mut(&protocol) {
987					Self::send_request_inner(
988						behaviour,
989						&mut self.pending_requests,
990						&peer,
991						protocol,
992						request,
993						None,
994						pending_response,
995						// We can error if not connected because the
996						// previous attempt would have tried to establish a
997						// connection already or errored and we wouldn't have gotten here.
998						IfDisconnected::ImmediateError,
999					);
1000				}
1001			}
1002
1003			break Poll::Pending;
1004		}
1005	}
1006}
1007
1008/// Error when registering a protocol.
1009#[derive(Debug, thiserror::Error)]
1010pub enum RegisterError {
1011	/// A protocol has been specified multiple times.
1012	#[error("{0}")]
1013	DuplicateProtocol(ProtocolName),
1014}
1015
1016/// Error when processing a request sent by a remote.
1017#[derive(Debug, thiserror::Error)]
1018pub enum ResponseFailure {
1019	/// Problem on the network.
1020	#[error("Problem on the network: {0}")]
1021	Network(InboundFailure),
1022}
1023
/// Length-prefixed byte codec shared by the request-response protocols of this module.
///
/// Implements the libp2p [`Codec`] trait: every message on the wire is a LEB128 (unsigned
/// varint) length followed by that many raw bytes. The type has to be public to satisfy the
/// Rust compiler, which is why it is hidden from the generated documentation.
#[doc(hidden)]
#[derive(Debug, Clone)]
pub struct GenericCodec {
	/// Largest inbound request payload, in bytes, accepted by `read_request`.
	max_request_size: u64,
	/// Largest inbound response payload, in bytes, accepted by `read_response`.
	max_response_size: u64,
}
1032
1033#[async_trait::async_trait]
1034impl Codec for GenericCodec {
1035	type Protocol = ProtocolName;
1036	type Request = Vec<u8>;
1037	type Response = Result<Vec<u8>, ()>;
1038
1039	async fn read_request<T>(
1040		&mut self,
1041		_: &Self::Protocol,
1042		mut io: &mut T,
1043	) -> io::Result<Self::Request>
1044	where
1045		T: AsyncRead + Unpin + Send,
1046	{
1047		// Read the length.
1048		let length = unsigned_varint::aio::read_usize(&mut io)
1049			.await
1050			.map_err(|err| io::Error::new(io::ErrorKind::InvalidInput, err))?;
1051		if length > usize::try_from(self.max_request_size).unwrap_or(usize::MAX) {
1052			return Err(io::Error::new(
1053				io::ErrorKind::InvalidInput,
1054				format!("Request size exceeds limit: {} > {}", length, self.max_request_size),
1055			));
1056		}
1057
1058		// Read the payload.
1059		let mut buffer = vec![0; length];
1060		io.read_exact(&mut buffer).await?;
1061		Ok(buffer)
1062	}
1063
1064	async fn read_response<T>(
1065		&mut self,
1066		_: &Self::Protocol,
1067		mut io: &mut T,
1068	) -> io::Result<Self::Response>
1069	where
1070		T: AsyncRead + Unpin + Send,
1071	{
1072		// Note that this function returns a `Result<Result<...>>`. Returning an `Err` is
1073		// considered as a protocol error and will result in the entire connection being closed.
1074		// Returning `Ok(Err(_))` signifies that a response has successfully been fetched, and
1075		// that this response is an error.
1076
1077		// Read the length.
1078		let length = match unsigned_varint::aio::read_usize(&mut io).await {
1079			Ok(l) => l,
1080			Err(unsigned_varint::io::ReadError::Io(err))
1081				if matches!(err.kind(), io::ErrorKind::UnexpectedEof) =>
1082			{
1083				return Ok(Err(()))
1084			},
1085			Err(err) => return Err(io::Error::new(io::ErrorKind::InvalidInput, err)),
1086		};
1087
1088		if length > usize::try_from(self.max_response_size).unwrap_or(usize::MAX) {
1089			return Err(io::Error::new(
1090				io::ErrorKind::InvalidInput,
1091				format!("Response size exceeds limit: {} > {}", length, self.max_response_size),
1092			));
1093		}
1094
1095		// Read the payload.
1096		let mut buffer = vec![0; length];
1097		io.read_exact(&mut buffer).await?;
1098		Ok(Ok(buffer))
1099	}
1100
1101	async fn write_request<T>(
1102		&mut self,
1103		_: &Self::Protocol,
1104		io: &mut T,
1105		req: Self::Request,
1106	) -> io::Result<()>
1107	where
1108		T: AsyncWrite + Unpin + Send,
1109	{
1110		// TODO: check the length?
1111		// Write the length.
1112		{
1113			let mut buffer = unsigned_varint::encode::usize_buffer();
1114			io.write_all(unsigned_varint::encode::usize(req.len(), &mut buffer)).await?;
1115		}
1116
1117		// Write the payload.
1118		io.write_all(&req).await?;
1119
1120		io.close().await?;
1121		Ok(())
1122	}
1123
1124	async fn write_response<T>(
1125		&mut self,
1126		_: &Self::Protocol,
1127		io: &mut T,
1128		res: Self::Response,
1129	) -> io::Result<()>
1130	where
1131		T: AsyncWrite + Unpin + Send,
1132	{
1133		// If `res` is an `Err`, we jump to closing the substream without writing anything on it.
1134		if let Ok(res) = res {
1135			// TODO: check the length?
1136			// Write the length.
1137			{
1138				let mut buffer = unsigned_varint::encode::usize_buffer();
1139				io.write_all(unsigned_varint::encode::usize(res.len(), &mut buffer)).await?;
1140			}
1141
1142			// Write the payload.
1143			io.write_all(&res).await?;
1144		}
1145
1146		io.close().await?;
1147		Ok(())
1148	}
1149}
1150
1151#[cfg(test)]
1152mod tests {
1153	use super::*;
1154
1155	use crate::mock::MockPeerStore;
1156	use assert_matches::assert_matches;
1157	use futures::channel::oneshot;
1158	use libp2p::{
1159		core::{
1160			transport::{MemoryTransport, Transport},
1161			upgrade,
1162		},
1163		identity::Keypair,
1164		noise,
1165		swarm::{Config as SwarmConfig, Executor, Swarm, SwarmEvent},
1166		Multiaddr,
1167	};
1168	use std::{iter, time::Duration};
1169
1170	struct TokioExecutor;
1171	impl Executor for TokioExecutor {
1172		fn exec(&self, f: Pin<Box<dyn Future<Output = ()> + Send>>) {
1173			tokio::spawn(f);
1174		}
1175	}
1176
1177	fn build_swarm(
1178		list: impl Iterator<Item = ProtocolConfig>,
1179	) -> (Swarm<RequestResponsesBehaviour>, Multiaddr) {
1180		let keypair = Keypair::generate_ed25519();
1181
1182		let transport = MemoryTransport::new()
1183			.upgrade(upgrade::Version::V1)
1184			.authenticate(noise::Config::new(&keypair).unwrap())
1185			.multiplex(libp2p::yamux::Config::default())
1186			.boxed();
1187
1188		let behaviour = RequestResponsesBehaviour::new(list, Arc::new(MockPeerStore {})).unwrap();
1189
1190		let mut swarm = Swarm::new(
1191			transport,
1192			behaviour,
1193			keypair.public().to_peer_id(),
1194			SwarmConfig::with_executor(TokioExecutor {})
1195				// This is taken care of by notification protocols in non-test environment
1196				// It is very slow in test environment for some reason, hence larger timeout
1197				.with_idle_connection_timeout(Duration::from_secs(10)),
1198		);
1199
1200		let listen_addr: Multiaddr = format!("/memory/{}", rand::random::<u64>()).parse().unwrap();
1201
1202		swarm.listen_on(listen_addr.clone()).unwrap();
1203
1204		(swarm, listen_addr)
1205	}
1206
	/// Sends one request on `/test/req-resp/1` from one swarm to the other. Checks that the
	/// requester gets a successful `RequestFinished` event and receives the expected response
	/// bytes together with the protocol name.
	#[tokio::test]
	async fn basic_request_response_works() {
		let protocol_name = ProtocolName::from("/test/req-resp/1");

		// Build swarms whose behaviour is [`RequestResponsesBehaviour`].
		let mut swarms = (0..2)
			.map(|_| {
				let (tx, mut rx) = async_channel::bounded::<IncomingRequest>(64);

				// Answer every inbound request with a fixed response, then wait for the
				// `sent_feedback` notification before handling the next one.
				tokio::spawn(async move {
					while let Some(rq) = rx.next().await {
						let (fb_tx, fb_rx) = oneshot::channel();
						assert_eq!(rq.payload, b"this is a request");
						let _ = rq.pending_response.send(super::OutgoingResponse {
							result: Ok(b"this is a response".to_vec()),
							reputation_changes: Vec::new(),
							sent_feedback: Some(fb_tx),
						});
						fb_rx.await.unwrap();
					}
				});

				let protocol_config = ProtocolConfig {
					name: protocol_name.clone(),
					fallback_names: Vec::new(),
					max_request_size: 1024,
					max_response_size: 1024 * 1024,
					request_timeout: Duration::from_secs(30),
					inbound_queue: Some(tx),
				};

				build_swarm(iter::once(protocol_config))
			})
			.collect::<Vec<_>>();

		// Ask `swarm[0]` to dial `swarm[1]`. There isn't any discovery mechanism in place in
		// this test, so they wouldn't connect to each other.
		{
			let dial_addr = swarms[1].1.clone();
			Swarm::dial(&mut swarms[0].0, dial_addr).unwrap();
		}

		let (mut swarm, _) = swarms.remove(0);
		// Running `swarm[0]` in the background. It answers the request; every inbound request
		// it reports must have succeeded.
		tokio::spawn(async move {
			loop {
				match swarm.select_next_some().await {
					SwarmEvent::Behaviour(Event::InboundRequest { result, .. }) => {
						result.unwrap();
					},
					_ => {},
				}
			}
		});

		// Remove and run the remaining swarm. It sends the request as soon as the connection is
		// established, then waits until the behaviour reports that the request finished.
		let (mut swarm, _) = swarms.remove(0);
		let mut response_receiver = None;

		loop {
			match swarm.select_next_some().await {
				SwarmEvent::ConnectionEstablished { peer_id, .. } => {
					let (sender, receiver) = oneshot::channel();
					swarm.behaviour_mut().send_request(
						&peer_id,
						protocol_name.clone(),
						b"this is a request".to_vec(),
						None,
						sender,
						IfDisconnected::ImmediateError,
					);
					assert!(response_receiver.is_none());
					response_receiver = Some(receiver);
				},
				SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
					result.unwrap();
					break;
				},
				_ => {},
			}
		}

		// The response is delivered through the oneshot channel, tagged with the protocol it
		// was received on.
		assert_eq!(
			response_receiver.unwrap().await.unwrap().unwrap(),
			(b"this is a response".to_vec(), protocol_name)
		);
	}
1294
	/// Checks that a response larger than `max_response_size` is rejected by the requesting side.
	/// The failure must surface as `RequestFailure::Network(OutboundFailure::Io(_))`.
	#[tokio::test]
	async fn max_response_size_exceeded() {
		let protocol_name = ProtocolName::from("/test/req-resp/1");

		// Build swarms whose behaviour is [`RequestResponsesBehaviour`].
		let mut swarms = (0..2)
			.map(|_| {
				let (tx, mut rx) = async_channel::bounded::<IncomingRequest>(64);

				// Answer every inbound request with a 31-byte response, well above the 8-byte
				// `max_response_size` configured below.
				tokio::spawn(async move {
					while let Some(rq) = rx.next().await {
						assert_eq!(rq.payload, b"this is a request");
						let _ = rq.pending_response.send(super::OutgoingResponse {
							result: Ok(b"this response exceeds the limit".to_vec()),
							reputation_changes: Vec::new(),
							sent_feedback: None,
						});
					}
				});

				let protocol_config = ProtocolConfig {
					name: protocol_name.clone(),
					fallback_names: Vec::new(),
					max_request_size: 1024,
					max_response_size: 8, // <-- important for the test
					request_timeout: Duration::from_secs(30),
					inbound_queue: Some(tx),
				};

				build_swarm(iter::once(protocol_config))
			})
			.collect::<Vec<_>>();

		// Ask `swarm[0]` to dial `swarm[1]`. There isn't any discovery mechanism in place in
		// this test, so they wouldn't connect to each other.
		{
			let dial_addr = swarms[1].1.clone();
			Swarm::dial(&mut swarms[0].0, dial_addr).unwrap();
		}

		// Run `swarm[0]` in the background until its connection is closed. It answers the
		// request, and every `InboundRequest` event it reports must be successful.
		let (mut swarm, _) = swarms.remove(0);
		tokio::spawn(async move {
			loop {
				match swarm.select_next_some().await {
					SwarmEvent::Behaviour(Event::InboundRequest { result, .. }) => {
						assert!(result.is_ok());
					},
					SwarmEvent::ConnectionClosed { .. } => {
						break;
					},
					_ => {},
				}
			}
		});

		// Remove and run the remaining swarm.
		let (mut swarm, _) = swarms.remove(0);

		let mut response_receiver = None;

		// Send the request once connected and expect the behaviour to report a failure.
		loop {
			match swarm.select_next_some().await {
				SwarmEvent::ConnectionEstablished { peer_id, .. } => {
					let (sender, receiver) = oneshot::channel();
					swarm.behaviour_mut().send_request(
						&peer_id,
						protocol_name.clone(),
						b"this is a request".to_vec(),
						None,
						sender,
						IfDisconnected::ImmediateError,
					);
					assert!(response_receiver.is_none());
					response_receiver = Some(receiver);
				},
				SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
					assert!(result.is_err());
					break;
				},
				_ => {},
			}
		}

		// The oversized length prefix is rejected while reading the response, which shows up as
		// an I/O failure of the outbound request.
		match response_receiver.unwrap().await.unwrap().unwrap_err() {
			RequestFailure::Network(OutboundFailure::Io(_)) => {},
			request_failure => panic!("Unexpected failure: {request_failure:?}"),
		}
	}
1385
	/// A `RequestId` is a unique identifier among either all inbound or all outbound requests for
	/// a single [`RequestResponsesBehaviour`] behaviour. It is not guaranteed to be unique across
	/// multiple [`RequestResponsesBehaviour`] behaviours. Thus, when handling `RequestId` in the
	/// context of multiple [`RequestResponsesBehaviour`] behaviours, one needs to couple the
	/// protocol name with the `RequestId` to get a unique request identifier.
	///
	/// This test ensures that two requests on different protocols can be handled concurrently
	/// without a `RequestId` collision.
	///
	/// See [`ProtocolRequestId`] for additional information.
	#[tokio::test]
	async fn request_id_collision() {
		let protocol_name_1 = ProtocolName::from("/test/req-resp-1/1");
		let protocol_name_2 = ProtocolName::from("/test/req-resp-2/1");

		// Swarm 1 only sends requests, so neither protocol gets an inbound queue.
		let mut swarm_1 = {
			let protocol_configs = vec![
				ProtocolConfig {
					name: protocol_name_1.clone(),
					fallback_names: Vec::new(),
					max_request_size: 1024,
					max_response_size: 1024 * 1024,
					request_timeout: Duration::from_secs(30),
					inbound_queue: None,
				},
				ProtocolConfig {
					name: protocol_name_2.clone(),
					fallback_names: Vec::new(),
					max_request_size: 1024,
					max_response_size: 1024 * 1024,
					request_timeout: Duration::from_secs(30),
					inbound_queue: None,
				},
			];

			build_swarm(protocol_configs.into_iter()).0
		};

		// Swarm 2 answers requests on both protocols. Each protocol has its own inbound queue,
		// and the receiving ends are handed to the handler task below.
		let (mut swarm_2, mut swarm_2_handler_1, mut swarm_2_handler_2, listen_add_2) = {
			let (tx_1, rx_1) = async_channel::bounded(64);
			let (tx_2, rx_2) = async_channel::bounded(64);

			let protocol_configs = vec![
				ProtocolConfig {
					name: protocol_name_1.clone(),
					fallback_names: Vec::new(),
					max_request_size: 1024,
					max_response_size: 1024 * 1024,
					request_timeout: Duration::from_secs(30),
					inbound_queue: Some(tx_1),
				},
				ProtocolConfig {
					name: protocol_name_2.clone(),
					fallback_names: Vec::new(),
					max_request_size: 1024,
					max_response_size: 1024 * 1024,
					request_timeout: Duration::from_secs(30),
					inbound_queue: Some(tx_2),
				},
			];

			let (swarm, listen_addr) = build_swarm(protocol_configs.into_iter());

			(swarm, rx_1, rx_2, listen_addr)
		};

		// Ask swarm 1 to dial swarm 2. There isn't any discovery mechanism in place in this test,
		// so they wouldn't connect to each other.
		swarm_1.dial(listen_add_2).unwrap();

		// Run swarm 2 in the background, receiving two requests.
		tokio::spawn(async move {
			loop {
				match swarm_2.select_next_some().await {
					SwarmEvent::Behaviour(Event::InboundRequest { result, .. }) => {
						result.unwrap();
					},
					_ => {},
				}
			}
		});

		// Handle both requests sent by swarm 1 to swarm 2 in the background.
		//
		// Make sure both requests overlap, by answering the first only after receiving the
		// second.
		tokio::spawn(async move {
			let protocol_1_request = swarm_2_handler_1.next().await;
			let protocol_2_request = swarm_2_handler_2.next().await;

			protocol_1_request
				.unwrap()
				.pending_response
				.send(OutgoingResponse {
					result: Ok(b"this is a response".to_vec()),
					reputation_changes: Vec::new(),
					sent_feedback: None,
				})
				.unwrap();
			protocol_2_request
				.unwrap()
				.pending_response
				.send(OutgoingResponse {
					result: Ok(b"this is a response".to_vec()),
					reputation_changes: Vec::new(),
					sent_feedback: None,
				})
				.unwrap();
		});

		// Have swarm 1 send two requests to swarm 2 and await responses.

		let mut response_receivers = None;
		let mut num_responses = 0;

		// Both requests are sent back to back on the same connection, one per protocol. The loop
		// ends once both have been reported as successfully finished.
		loop {
			match swarm_1.select_next_some().await {
				SwarmEvent::ConnectionEstablished { peer_id, .. } => {
					let (sender_1, receiver_1) = oneshot::channel();
					let (sender_2, receiver_2) = oneshot::channel();
					swarm_1.behaviour_mut().send_request(
						&peer_id,
						protocol_name_1.clone(),
						b"this is a request".to_vec(),
						None,
						sender_1,
						IfDisconnected::ImmediateError,
					);
					swarm_1.behaviour_mut().send_request(
						&peer_id,
						protocol_name_2.clone(),
						b"this is a request".to_vec(),
						None,
						sender_2,
						IfDisconnected::ImmediateError,
					);
					assert!(response_receivers.is_none());
					response_receivers = Some((receiver_1, receiver_2));
				},
				SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
					num_responses += 1;
					result.unwrap();
					if num_responses == 2 {
						break;
					}
				},
				_ => {},
			}
		}
		// Each response must be routed to the sender of its own request, tagged with the
		// matching protocol.
		let (response_receiver_1, response_receiver_2) = response_receivers.unwrap();
		assert_eq!(
			response_receiver_1.await.unwrap().unwrap(),
			(b"this is a response".to_vec(), protocol_name_1)
		);
		assert_eq!(
			response_receiver_2.await.unwrap().unwrap(),
			(b"this is a response".to_vec(), protocol_name_2)
		);
	}
1545
1546	#[tokio::test]
1547	async fn request_fallback() {
1548		let protocol_name_1 = ProtocolName::from("/test/req-resp/2");
1549		let protocol_name_1_fallback = ProtocolName::from("/test/req-resp/1");
1550		let protocol_name_2 = ProtocolName::from("/test/another");
1551
1552		let protocol_config_1 = ProtocolConfig {
1553			name: protocol_name_1.clone(),
1554			fallback_names: Vec::new(),
1555			max_request_size: 1024,
1556			max_response_size: 1024 * 1024,
1557			request_timeout: Duration::from_secs(30),
1558			inbound_queue: None,
1559		};
1560		let protocol_config_1_fallback = ProtocolConfig {
1561			name: protocol_name_1_fallback.clone(),
1562			fallback_names: Vec::new(),
1563			max_request_size: 1024,
1564			max_response_size: 1024 * 1024,
1565			request_timeout: Duration::from_secs(30),
1566			inbound_queue: None,
1567		};
1568		let protocol_config_2 = ProtocolConfig {
1569			name: protocol_name_2.clone(),
1570			fallback_names: Vec::new(),
1571			max_request_size: 1024,
1572			max_response_size: 1024 * 1024,
1573			request_timeout: Duration::from_secs(30),
1574			inbound_queue: None,
1575		};
1576
1577		// This swarm only speaks protocol_name_1_fallback and protocol_name_2.
1578		// It only responds to requests.
1579		let mut older_swarm = {
1580			let (tx_1, mut rx_1) = async_channel::bounded::<IncomingRequest>(64);
1581			let (tx_2, mut rx_2) = async_channel::bounded::<IncomingRequest>(64);
1582			let mut protocol_config_1_fallback = protocol_config_1_fallback.clone();
1583			protocol_config_1_fallback.inbound_queue = Some(tx_1);
1584
1585			let mut protocol_config_2 = protocol_config_2.clone();
1586			protocol_config_2.inbound_queue = Some(tx_2);
1587
1588			tokio::spawn(async move {
1589				for _ in 0..2 {
1590					if let Some(rq) = rx_1.next().await {
1591						let (fb_tx, fb_rx) = oneshot::channel();
1592						assert_eq!(rq.payload, b"request on protocol /test/req-resp/1");
1593						let _ = rq.pending_response.send(super::OutgoingResponse {
1594							result: Ok(b"this is a response on protocol /test/req-resp/1".to_vec()),
1595							reputation_changes: Vec::new(),
1596							sent_feedback: Some(fb_tx),
1597						});
1598						fb_rx.await.unwrap();
1599					}
1600				}
1601
1602				if let Some(rq) = rx_2.next().await {
1603					let (fb_tx, fb_rx) = oneshot::channel();
1604					assert_eq!(rq.payload, b"request on protocol /test/other");
1605					let _ = rq.pending_response.send(super::OutgoingResponse {
1606						result: Ok(b"this is a response on protocol /test/other".to_vec()),
1607						reputation_changes: Vec::new(),
1608						sent_feedback: Some(fb_tx),
1609					});
1610					fb_rx.await.unwrap();
1611				}
1612			});
1613
1614			build_swarm(vec![protocol_config_1_fallback, protocol_config_2].into_iter())
1615		};
1616
1617		// This swarm speaks all protocols.
1618		let mut new_swarm = build_swarm(
1619			vec![
1620				protocol_config_1.clone(),
1621				protocol_config_1_fallback.clone(),
1622				protocol_config_2.clone(),
1623			]
1624			.into_iter(),
1625		);
1626
1627		{
1628			let dial_addr = older_swarm.1.clone();
1629			Swarm::dial(&mut new_swarm.0, dial_addr).unwrap();
1630		}
1631
1632		// Running `older_swarm`` in the background.
1633		tokio::spawn(async move {
1634			loop {
1635				_ = older_swarm.0.select_next_some().await;
1636			}
1637		});
1638
1639		// Run the newer swarm. Attempt to make requests on all protocols.
1640		let (mut swarm, _) = new_swarm;
1641		let mut older_peer_id = None;
1642
1643		let mut response_receiver = None;
1644		// Try the new protocol with a fallback.
1645		loop {
1646			match swarm.select_next_some().await {
1647				SwarmEvent::ConnectionEstablished { peer_id, .. } => {
1648					older_peer_id = Some(peer_id);
1649					let (sender, receiver) = oneshot::channel();
1650					swarm.behaviour_mut().send_request(
1651						&peer_id,
1652						protocol_name_1.clone(),
1653						b"request on protocol /test/req-resp/2".to_vec(),
1654						Some((
1655							b"request on protocol /test/req-resp/1".to_vec(),
1656							protocol_config_1_fallback.name.clone(),
1657						)),
1658						sender,
1659						IfDisconnected::ImmediateError,
1660					);
1661					response_receiver = Some(receiver);
1662				},
1663				SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
1664					result.unwrap();
1665					break;
1666				},
1667				_ => {},
1668			}
1669		}
1670		assert_eq!(
1671			response_receiver.unwrap().await.unwrap().unwrap(),
1672			(
1673				b"this is a response on protocol /test/req-resp/1".to_vec(),
1674				protocol_name_1_fallback.clone()
1675			)
1676		);
1677		// Try the old protocol with a useless fallback.
1678		let (sender, response_receiver) = oneshot::channel();
1679		swarm.behaviour_mut().send_request(
1680			older_peer_id.as_ref().unwrap(),
1681			protocol_name_1_fallback.clone(),
1682			b"request on protocol /test/req-resp/1".to_vec(),
1683			Some((
1684				b"dummy request, will fail if processed".to_vec(),
1685				protocol_config_1_fallback.name.clone(),
1686			)),
1687			sender,
1688			IfDisconnected::ImmediateError,
1689		);
1690		loop {
1691			match swarm.select_next_some().await {
1692				SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
1693					result.unwrap();
1694					break;
1695				},
1696				_ => {},
1697			}
1698		}
1699		assert_eq!(
1700			response_receiver.await.unwrap().unwrap(),
1701			(
1702				b"this is a response on protocol /test/req-resp/1".to_vec(),
1703				protocol_name_1_fallback.clone()
1704			)
1705		);
1706		// Try the new protocol with no fallback. Should fail.
1707		let (sender, response_receiver) = oneshot::channel();
1708		swarm.behaviour_mut().send_request(
1709			older_peer_id.as_ref().unwrap(),
1710			protocol_name_1.clone(),
1711			b"request on protocol /test/req-resp-2".to_vec(),
1712			None,
1713			sender,
1714			IfDisconnected::ImmediateError,
1715		);
1716		loop {
1717			match swarm.select_next_some().await {
1718				SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
1719					assert_matches!(
1720						result.unwrap_err(),
1721						RequestFailure::Network(OutboundFailure::UnsupportedProtocols)
1722					);
1723					break;
1724				},
1725				_ => {},
1726			}
1727		}
1728		assert!(response_receiver.await.unwrap().is_err());
1729		// Try the other protocol with no fallback.
1730		let (sender, response_receiver) = oneshot::channel();
1731		swarm.behaviour_mut().send_request(
1732			older_peer_id.as_ref().unwrap(),
1733			protocol_name_2.clone(),
1734			b"request on protocol /test/other".to_vec(),
1735			None,
1736			sender,
1737			IfDisconnected::ImmediateError,
1738		);
1739		loop {
1740			match swarm.select_next_some().await {
1741				SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
1742					result.unwrap();
1743					break;
1744				},
1745				_ => {},
1746			}
1747		}
1748		assert_eq!(
1749			response_receiver.await.unwrap().unwrap(),
1750			(b"this is a response on protocol /test/other".to_vec(), protocol_name_2.clone())
1751		);
1752	}
1753
	/// This test ensures the `RequestResponsesBehaviour` propagates back the
	/// `OutboundFailure::Timeout` error even if the libp2p component hangs.
	///
	/// For testing purposes, the communication happens on the `/test/req-resp/1` protocol.
	///
	/// This is achieved by:
	/// - Two swarms are connected. The first one is slow to respond and has the timeout set to 10
	///   seconds. The second swarm is configured with a timeout of 10 seconds in libp2p, but the
	///   `request_timeout` of its `RequestResponsesBehaviour` is lowered to 1 second.
	///
	/// - The first swarm introduces a delay of 2 seconds before responding to the request.
	///
	/// - The second swarm must enforce the 1 second timeout.
	#[tokio::test]
	async fn enforce_outbound_timeouts() {
		const REQUEST_TIMEOUT: Duration = Duration::from_secs(10);
		const REQUEST_TIMEOUT_SHORT: Duration = Duration::from_secs(1);

		// These swarms only speak `protocol_name`.
		let protocol_name = ProtocolName::from("/test/req-resp/1");

		let protocol_config = ProtocolConfig {
			name: protocol_name.clone(),
			fallback_names: Vec::new(),
			max_request_size: 1024,
			max_response_size: 1024 * 1024,
			request_timeout: REQUEST_TIMEOUT, // <-- important for the test
			inbound_queue: None,
		};

		// Build swarms whose behaviour is [`RequestResponsesBehaviour`].
		let (mut first_swarm, _) = {
			let (tx, mut rx) = async_channel::bounded::<IncomingRequest>(64);

			tokio::spawn(async move {
				if let Some(rq) = rx.next().await {
					assert_eq!(rq.payload, b"this is a request");

					// Sleep for more than `REQUEST_TIMEOUT_SHORT` and less than
					// `REQUEST_TIMEOUT`.
					tokio::time::sleep(REQUEST_TIMEOUT_SHORT * 2).await;

					// By the time the response is sent back, the second swarm has already
					// received the timeout.
					let _ = rq.pending_response.send(super::OutgoingResponse {
						result: Ok(b"Second swarm already timedout".to_vec()),
						reputation_changes: Vec::new(),
						sent_feedback: None,
					});
				}
			});

			let mut protocol_config = protocol_config.clone();
			protocol_config.inbound_queue = Some(tx);

			build_swarm(iter::once(protocol_config))
		};

		let (mut second_swarm, second_address) = {
			let (tx, mut rx) = async_channel::bounded::<IncomingRequest>(64);

			tokio::spawn(async move {
				while let Some(rq) = rx.next().await {
					let _ = rq.pending_response.send(super::OutgoingResponse {
						result: Ok(b"This is the response".to_vec()),
						reputation_changes: Vec::new(),
						sent_feedback: None,
					});
				}
			});
			let mut protocol_config = protocol_config.clone();
			protocol_config.inbound_queue = Some(tx);

			build_swarm(iter::once(protocol_config.clone()))
		};
		// Modify the second swarm to have a shorter timeout. Only the behaviour-level
		// `request_timeout` is changed after the swarm has been built.
		second_swarm
			.behaviour_mut()
			.protocols
			.get_mut(&protocol_name)
			.unwrap()
			.request_timeout = REQUEST_TIMEOUT_SHORT;

		// Ask first swarm to dial the second swarm.
		{
			Swarm::dial(&mut first_swarm, second_address).unwrap();
		}

		// Run the first swarm in the background until an `InboundRequest` event happens (a hint
		// that the test has ended) or its connection is closed.
		tokio::spawn(async move {
			loop {
				let event = first_swarm.select_next_some().await;
				match event {
					SwarmEvent::Behaviour(Event::InboundRequest { result, .. }) => {
						assert!(result.is_ok());
						break;
					},
					SwarmEvent::ConnectionClosed { .. } => {
						break;
					},
					_ => {},
				}
			}
		});

		// Run the second swarm.
		// - on connection established send the request to the first swarm
		// - expect to receive a timeout
		let mut response_receiver = None;
		loop {
			let event = second_swarm.select_next_some().await;

			match event {
				SwarmEvent::ConnectionEstablished { peer_id, .. } => {
					let (sender, receiver) = oneshot::channel();
					second_swarm.behaviour_mut().send_request(
						&peer_id,
						protocol_name.clone(),
						b"this is a request".to_vec(),
						None,
						sender,
						IfDisconnected::ImmediateError,
					);
					assert!(response_receiver.is_none());
					response_receiver = Some(receiver);
				},
				SwarmEvent::ConnectionClosed { .. } => {
					break;
				},
				SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
					assert!(result.is_err());
					break;
				},
				_ => {},
			}
		}

		// Expect the timeout.
		match response_receiver.unwrap().await.unwrap().unwrap_err() {
			RequestFailure::Network(OutboundFailure::Timeout) => {},
			request_failure => panic!("Unexpected failure: {request_failure:?}"),
		}
	}
1898}