pezsc_network/
request_responses.rs

1// This file is part of Bizinikiwi.
2
3// Copyright (C) Parity Technologies (UK) Ltd. and Dijital Kurdistan Tech Institute
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! Collection of request-response protocols.
20//!
21//! The [`RequestResponsesBehaviour`] struct defined in this module provides support for zero or
22//! more so-called "request-response" protocols.
23//!
24//! A request-response protocol works in the following way:
25//!
26//! - For every emitted request, a new substream is open and the protocol is negotiated. If the
27//! remote supports the protocol, the size of the request is sent as a LEB128 number, followed
28//! with the request itself. The remote then sends the size of the response as a LEB128 number,
29//! followed with the response.
30//!
31//! - Requests have a certain time limit before they time out. This time includes the time it
32//! takes to send/receive the request and response.
33//!
34//! - If provided, a ["requests processing"](ProtocolConfig::inbound_queue) channel
35//! is used to handle incoming requests.
36
37use crate::{
38	peer_store::{PeerStoreProvider, BANNED_THRESHOLD},
39	service::traits::RequestResponseConfig as RequestResponseConfigT,
40	types::ProtocolName,
41	ReputationChange,
42};
43
44use futures::{channel::oneshot, prelude::*};
45use libp2p::{
46	core::{transport::PortUse, Endpoint, Multiaddr},
47	request_response::{self, Behaviour, Codec, Message, ProtocolSupport, ResponseChannel},
48	swarm::{
49		behaviour::FromSwarm, handler::multi::MultiHandler, ConnectionDenied, ConnectionId,
50		NetworkBehaviour, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm,
51	},
52	PeerId,
53};
54
55use std::{
56	collections::{hash_map::Entry, HashMap},
57	io, iter,
58	ops::Deref,
59	pin::Pin,
60	sync::Arc,
61	task::{Context, Poll},
62	time::{Duration, Instant},
63};
64
65pub use libp2p::request_response::{Config, InboundRequestId, OutboundRequestId};
66
67/// Logging target for the file.
68const LOG_TARGET: &str = "sub-libp2p::request-response";
69
70/// Periodically check if requests are taking too long.
71const PERIODIC_REQUEST_CHECK: Duration = Duration::from_secs(2);
72
73/// Possible failures occurring in the context of sending an outbound request and receiving the
74/// response.
75#[derive(Debug, Clone, thiserror::Error)]
76pub enum OutboundFailure {
77	/// The request could not be sent because a dialing attempt failed.
78	#[error("Failed to dial the requested peer")]
79	DialFailure,
80	/// The request timed out before a response was received.
81	#[error("Timeout while waiting for a response")]
82	Timeout,
83	/// The connection closed before a response was received.
84	#[error("Connection was closed before a response was received")]
85	ConnectionClosed,
86	/// The remote supports none of the requested protocols.
87	#[error("The remote supports none of the requested protocols")]
88	UnsupportedProtocols,
89	/// An IO failure happened on an outbound stream.
90	#[error("An IO failure happened on an outbound stream")]
91	Io(Arc<io::Error>),
92}
93
94impl From<request_response::OutboundFailure> for OutboundFailure {
95	fn from(out: request_response::OutboundFailure) -> Self {
96		match out {
97			request_response::OutboundFailure::DialFailure => OutboundFailure::DialFailure,
98			request_response::OutboundFailure::Timeout => OutboundFailure::Timeout,
99			request_response::OutboundFailure::ConnectionClosed => {
100				OutboundFailure::ConnectionClosed
101			},
102			request_response::OutboundFailure::UnsupportedProtocols => {
103				OutboundFailure::UnsupportedProtocols
104			},
105			request_response::OutboundFailure::Io(error) => OutboundFailure::Io(Arc::new(error)),
106		}
107	}
108}
109
110/// Possible failures occurring in the context of receiving an inbound request and sending a
111/// response.
112#[derive(Debug, thiserror::Error)]
113pub enum InboundFailure {
114	/// The inbound request timed out, either while reading the incoming request or before a
115	/// response is sent
116	#[error("Timeout while receiving request or sending response")]
117	Timeout,
118	/// The connection closed before a response could be send.
119	#[error("Connection was closed before a response could be sent")]
120	ConnectionClosed,
121	/// The local peer supports none of the protocols requested by the remote.
122	#[error("The local peer supports none of the protocols requested by the remote")]
123	UnsupportedProtocols,
124	/// The local peer failed to respond to an inbound request
125	#[error("The response channel was dropped without sending a response to the remote")]
126	ResponseOmission,
127	/// An IO failure happened on an inbound stream.
128	#[error("An IO failure happened on an inbound stream")]
129	Io(Arc<io::Error>),
130}
131
132impl From<request_response::InboundFailure> for InboundFailure {
133	fn from(out: request_response::InboundFailure) -> Self {
134		match out {
135			request_response::InboundFailure::ResponseOmission => InboundFailure::ResponseOmission,
136			request_response::InboundFailure::Timeout => InboundFailure::Timeout,
137			request_response::InboundFailure::ConnectionClosed => InboundFailure::ConnectionClosed,
138			request_response::InboundFailure::UnsupportedProtocols => {
139				InboundFailure::UnsupportedProtocols
140			},
141			request_response::InboundFailure::Io(error) => InboundFailure::Io(Arc::new(error)),
142		}
143	}
144}
145
146/// Error in a request.
147#[derive(Debug, thiserror::Error)]
148#[allow(missing_docs)]
149pub enum RequestFailure {
150	#[error("We are not currently connected to the requested peer.")]
151	NotConnected,
152	#[error("Given protocol hasn't been registered.")]
153	UnknownProtocol,
154	#[error("Remote has closed the substream before answering, thereby signaling that it considers the request as valid, but refused to answer it.")]
155	Refused,
156	#[error("The remote replied, but the local node is no longer interested in the response.")]
157	Obsolete,
158	#[error("Problem on the network: {0}")]
159	Network(OutboundFailure),
160}
161
162/// Configuration for a single request-response protocol.
163#[derive(Debug, Clone)]
164pub struct ProtocolConfig {
165	/// Name of the protocol on the wire. Should be something like `/foo/bar`.
166	pub name: ProtocolName,
167
168	/// Fallback on the wire protocol names to support.
169	pub fallback_names: Vec<ProtocolName>,
170
171	/// Maximum allowed size, in bytes, of a request.
172	///
173	/// Any request larger than this value will be declined as a way to avoid allocating too
174	/// much memory for it.
175	pub max_request_size: u64,
176
177	/// Maximum allowed size, in bytes, of a response.
178	///
179	/// Any response larger than this value will be declined as a way to avoid allocating too
180	/// much memory for it.
181	pub max_response_size: u64,
182
183	/// Duration after which emitted requests are considered timed out.
184	///
185	/// If you expect the response to come back quickly, you should set this to a smaller duration.
186	pub request_timeout: Duration,
187
188	/// Channel on which the networking service will send incoming requests.
189	///
190	/// Every time a peer sends a request to the local node using this protocol, the networking
191	/// service will push an element on this channel. The receiving side of this channel then has
192	/// to pull this element, process the request, and send back the response to send back to the
193	/// peer.
194	///
195	/// The size of the channel has to be carefully chosen. If the channel is full, the networking
196	/// service will discard the incoming request send back an error to the peer. Consequently,
197	/// the channel being full is an indicator that the node is overloaded.
198	///
199	/// You can typically set the size of the channel to `T / d`, where `T` is the
200	/// `request_timeout` and `d` is the expected average duration of CPU and I/O it takes to
201	/// build a response.
202	///
203	/// Can be `None` if the local node does not support answering incoming requests.
204	/// If this is `None`, then the local node will not advertise support for this protocol towards
205	/// other peers. If this is `Some` but the channel is closed, then the local node will
206	/// advertise support for this protocol, but any incoming request will lead to an error being
207	/// sent back.
208	pub inbound_queue: Option<async_channel::Sender<IncomingRequest>>,
209}
210
211impl RequestResponseConfigT for ProtocolConfig {
212	fn protocol_name(&self) -> &ProtocolName {
213		&self.name
214	}
215}
216
217/// A single request received by a peer on a request-response protocol.
218#[derive(Debug)]
219pub struct IncomingRequest {
220	/// Who sent the request.
221	pub peer: pezsc_network_types::PeerId,
222
223	/// Request sent by the remote. Will always be smaller than
224	/// [`ProtocolConfig::max_request_size`].
225	pub payload: Vec<u8>,
226
227	/// Channel to send back the response.
228	///
229	/// There are two ways to indicate that handling the request failed:
230	///
231	/// 1. Drop `pending_response` and thus not changing the reputation of the peer.
232	///
233	/// 2. Sending an `Err(())` via `pending_response`, optionally including reputation changes for
234	/// the given peer.
235	pub pending_response: oneshot::Sender<OutgoingResponse>,
236}
237
238/// Response for an incoming request to be send by a request protocol handler.
239#[derive(Debug)]
240pub struct OutgoingResponse {
241	/// The payload of the response.
242	///
243	/// `Err(())` if none is available e.g. due an error while handling the request.
244	pub result: Result<Vec<u8>, ()>,
245
246	/// Reputation changes accrued while handling the request. To be applied to the reputation of
247	/// the peer sending the request.
248	pub reputation_changes: Vec<ReputationChange>,
249
250	/// If provided, the `oneshot::Sender` will be notified when the request has been sent to the
251	/// peer.
252	///
253	/// > **Note**: Operating systems typically maintain a buffer of a few dozen kilobytes of
254	/// >			outgoing data for each TCP socket, and it is not possible for a user
255	/// >			application to inspect this buffer. This channel here is not actually notified
256	/// >			when the response has been fully sent out, but rather when it has fully been
257	/// >			written to the buffer managed by the operating system.
258	pub sent_feedback: Option<oneshot::Sender<()>>,
259}
260
261/// Information stored about a pending request.
262struct PendingRequest {
263	/// The time when the request was sent to the libp2p request-response protocol.
264	started_at: Instant,
265	/// The channel to send the response back to the caller.
266	///
267	/// This is wrapped in an `Option` to allow for the channel to be taken out
268	/// on force-detected timeouts.
269	response_tx: Option<oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>>,
270	/// Fallback request to send if the primary request fails.
271	fallback_request: Option<(Vec<u8>, ProtocolName)>,
272}
273
274/// When sending a request, what to do on a disconnected recipient.
275#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
276pub enum IfDisconnected {
277	/// Try to connect to the peer.
278	TryConnect,
279	/// Just fail if the destination is not yet connected.
280	ImmediateError,
281}
282
283/// Convenience functions for `IfDisconnected`.
284impl IfDisconnected {
285	/// Shall we connect to a disconnected peer?
286	pub fn should_connect(self) -> bool {
287		match self {
288			Self::TryConnect => true,
289			Self::ImmediateError => false,
290		}
291	}
292}
293
294/// Event generated by the [`RequestResponsesBehaviour`].
295#[derive(Debug)]
296pub enum Event {
297	/// A remote sent a request and either we have successfully answered it or an error happened.
298	///
299	/// This event is generated for statistics purposes.
300	InboundRequest {
301		/// Peer which has emitted the request.
302		peer: PeerId,
303		/// Name of the protocol in question.
304		protocol: ProtocolName,
305		/// Whether handling the request was successful or unsuccessful.
306		///
307		/// When successful contains the time elapsed between when we received the request and when
308		/// we sent back the response. When unsuccessful contains the failure reason.
309		result: Result<Duration, ResponseFailure>,
310	},
311
312	/// A request initiated using [`RequestResponsesBehaviour::send_request`] has succeeded or
313	/// failed.
314	///
315	/// This event is generated for statistics purposes.
316	RequestFinished {
317		/// Peer that we send a request to.
318		peer: PeerId,
319		/// Name of the protocol in question.
320		protocol: ProtocolName,
321		/// Duration the request took.
322		duration: Duration,
323		/// Result of the request.
324		result: Result<(), RequestFailure>,
325	},
326
327	/// A request protocol handler issued reputation changes for the given peer.
328	ReputationChanges {
329		/// Peer whose reputation needs to be adjust.
330		peer: PeerId,
331		/// Reputation changes.
332		changes: Vec<ReputationChange>,
333	},
334}
335
336/// Combination of a protocol name and a request id.
337///
338/// Uniquely identifies an inbound or outbound request among all handled protocols. Note however
339/// that uniqueness is only guaranteed between two inbound and likewise between two outbound
340/// requests. There is no uniqueness guarantee in a set of both inbound and outbound
341/// [`ProtocolRequestId`]s.
342#[derive(Debug, Clone, PartialEq, Eq, Hash)]
343struct ProtocolRequestId<RequestId> {
344	protocol: ProtocolName,
345	request_id: RequestId,
346}
347
348impl<RequestId> From<(ProtocolName, RequestId)> for ProtocolRequestId<RequestId> {
349	fn from((protocol, request_id): (ProtocolName, RequestId)) -> Self {
350		Self { protocol, request_id }
351	}
352}
353
354/// Details of a request-response protocol.
355struct ProtocolDetails {
356	behaviour: Behaviour<GenericCodec>,
357	inbound_queue: Option<async_channel::Sender<IncomingRequest>>,
358	request_timeout: Duration,
359}
360
361/// Implementation of `NetworkBehaviour` that provides support for request-response protocols.
362pub struct RequestResponsesBehaviour {
363	/// The multiple sub-protocols, by name.
364	///
365	/// Contains the underlying libp2p request-response [`Behaviour`], plus an optional
366	/// "response builder" used to build responses for incoming requests.
367	protocols: HashMap<ProtocolName, ProtocolDetails>,
368
369	/// Pending requests, passed down to a request-response [`Behaviour`], awaiting a reply.
370	pending_requests: HashMap<ProtocolRequestId<OutboundRequestId>, PendingRequest>,
371
372	/// Whenever an incoming request arrives, a `Future` is added to this list and will yield the
373	/// start time and the response to send back to the remote.
374	pending_responses: stream::FuturesUnordered<
375		Pin<Box<dyn Future<Output = Option<RequestProcessingOutcome>> + Send>>,
376	>,
377
378	/// Whenever an incoming request arrives, the arrival [`Instant`] is recorded here.
379	pending_responses_arrival_time: HashMap<ProtocolRequestId<InboundRequestId>, Instant>,
380
381	/// Whenever a response is received on `pending_responses`, insert a channel to be notified
382	/// when the request has been sent out.
383	send_feedback: HashMap<ProtocolRequestId<InboundRequestId>, oneshot::Sender<()>>,
384
385	/// Primarily used to get a reputation of a node.
386	peer_store: Arc<dyn PeerStoreProvider>,
387
388	/// Interval to check that the requests are not taking too long.
389	///
390	/// We had issues in the past where libp2p did not produce a timeout event in due time.
391	///
392	/// For more details, see:
393	/// - <https://github.com/pezkuwichain/pezkuwi-sdk/issues/294#issuecomment-2596085096>
394	periodic_request_check: tokio::time::Interval,
395}
396
397/// Generated by the response builder and waiting to be processed.
398struct RequestProcessingOutcome {
399	peer: PeerId,
400	request_id: InboundRequestId,
401	protocol: ProtocolName,
402	inner_channel: ResponseChannel<Result<Vec<u8>, ()>>,
403	response: OutgoingResponse,
404}
405
406impl RequestResponsesBehaviour {
407	/// Creates a new behaviour. Must be passed a list of supported protocols. Returns an error if
408	/// the same protocol is passed twice.
409	pub fn new(
410		list: impl Iterator<Item = ProtocolConfig>,
411		peer_store: Arc<dyn PeerStoreProvider>,
412	) -> Result<Self, RegisterError> {
413		let mut protocols = HashMap::new();
414		for protocol in list {
415			let cfg = Config::default().with_request_timeout(protocol.request_timeout);
416
417			let protocol_support = if protocol.inbound_queue.is_some() {
418				ProtocolSupport::Full
419			} else {
420				ProtocolSupport::Outbound
421			};
422
423			let behaviour = Behaviour::with_codec(
424				GenericCodec {
425					max_request_size: protocol.max_request_size,
426					max_response_size: protocol.max_response_size,
427				},
428				iter::once(protocol.name.clone())
429					.chain(protocol.fallback_names)
430					.zip(iter::repeat(protocol_support)),
431				cfg,
432			);
433
434			match protocols.entry(protocol.name) {
435				Entry::Vacant(e) => e.insert(ProtocolDetails {
436					behaviour,
437					inbound_queue: protocol.inbound_queue,
438					request_timeout: protocol.request_timeout,
439				}),
440				Entry::Occupied(e) => {
441					return Err(RegisterError::DuplicateProtocol(e.key().clone()))
442				},
443			};
444		}
445
446		Ok(Self {
447			protocols,
448			pending_requests: Default::default(),
449			pending_responses: Default::default(),
450			pending_responses_arrival_time: Default::default(),
451			send_feedback: Default::default(),
452			peer_store,
453			periodic_request_check: tokio::time::interval(PERIODIC_REQUEST_CHECK),
454		})
455	}
456
457	/// Initiates sending a request.
458	///
459	/// If there is no established connection to the target peer, the behavior is determined by the
460	/// choice of `connect`.
461	///
462	/// An error is returned if the protocol doesn't match one that has been registered.
463	pub fn send_request(
464		&mut self,
465		target: &PeerId,
466		protocol_name: ProtocolName,
467		request: Vec<u8>,
468		fallback_request: Option<(Vec<u8>, ProtocolName)>,
469		pending_response: oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>,
470		connect: IfDisconnected,
471	) {
472		log::trace!(target: LOG_TARGET, "send request to {target} ({protocol_name:?}), {} bytes", request.len());
473
474		if let Some(ProtocolDetails { behaviour, .. }) =
475			self.protocols.get_mut(protocol_name.deref())
476		{
477			Self::send_request_inner(
478				behaviour,
479				&mut self.pending_requests,
480				target,
481				protocol_name,
482				request,
483				fallback_request,
484				pending_response,
485				connect,
486			)
487		} else if pending_response.send(Err(RequestFailure::UnknownProtocol)).is_err() {
488			log::debug!(
489				target: LOG_TARGET,
490				"Unknown protocol {:?}. At the same time local \
491				 node is no longer interested in the result.",
492				protocol_name,
493			);
494		}
495	}
496
497	fn send_request_inner(
498		behaviour: &mut Behaviour<GenericCodec>,
499		pending_requests: &mut HashMap<ProtocolRequestId<OutboundRequestId>, PendingRequest>,
500		target: &PeerId,
501		protocol_name: ProtocolName,
502		request: Vec<u8>,
503		fallback_request: Option<(Vec<u8>, ProtocolName)>,
504		pending_response: oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>,
505		connect: IfDisconnected,
506	) {
507		if behaviour.is_connected(target) || connect.should_connect() {
508			let request_id = behaviour.send_request(target, request);
509			let prev_req_id = pending_requests.insert(
510				(protocol_name.to_string().into(), request_id).into(),
511				PendingRequest {
512					started_at: Instant::now(),
513					response_tx: Some(pending_response),
514					fallback_request,
515				},
516			);
517			debug_assert!(prev_req_id.is_none(), "Expect request id to be unique.");
518		} else if pending_response.send(Err(RequestFailure::NotConnected)).is_err() {
519			log::debug!(
520				target: LOG_TARGET,
521				"Not connected to peer {:?}. At the same time local \
522				 node is no longer interested in the result.",
523				target,
524			);
525		}
526	}
527}
528
529impl NetworkBehaviour for RequestResponsesBehaviour {
530	type ConnectionHandler =
531		MultiHandler<String, <Behaviour<GenericCodec> as NetworkBehaviour>::ConnectionHandler>;
532	type ToSwarm = Event;
533
534	fn handle_pending_inbound_connection(
535		&mut self,
536		_connection_id: ConnectionId,
537		_local_addr: &Multiaddr,
538		_remote_addr: &Multiaddr,
539	) -> Result<(), ConnectionDenied> {
540		Ok(())
541	}
542
543	fn handle_pending_outbound_connection(
544		&mut self,
545		_connection_id: ConnectionId,
546		_maybe_peer: Option<PeerId>,
547		_addresses: &[Multiaddr],
548		_effective_role: Endpoint,
549	) -> Result<Vec<Multiaddr>, ConnectionDenied> {
550		Ok(Vec::new())
551	}
552
553	fn handle_established_inbound_connection(
554		&mut self,
555		connection_id: ConnectionId,
556		peer: PeerId,
557		local_addr: &Multiaddr,
558		remote_addr: &Multiaddr,
559	) -> Result<THandler<Self>, ConnectionDenied> {
560		let iter =
561			self.protocols.iter_mut().filter_map(|(p, ProtocolDetails { behaviour, .. })| {
562				if let Ok(handler) = behaviour.handle_established_inbound_connection(
563					connection_id,
564					peer,
565					local_addr,
566					remote_addr,
567				) {
568					Some((p.to_string(), handler))
569				} else {
570					None
571				}
572			});
573
574		Ok(MultiHandler::try_from_iter(iter).expect(
575			"Protocols are in a HashMap and there can be at most one handler per protocol name, \
576			 which is the only possible error; qed",
577		))
578	}
579
580	fn handle_established_outbound_connection(
581		&mut self,
582		connection_id: ConnectionId,
583		peer: PeerId,
584		addr: &Multiaddr,
585		role_override: Endpoint,
586		port_use: PortUse,
587	) -> Result<THandler<Self>, ConnectionDenied> {
588		let iter =
589			self.protocols.iter_mut().filter_map(|(p, ProtocolDetails { behaviour, .. })| {
590				if let Ok(handler) = behaviour.handle_established_outbound_connection(
591					connection_id,
592					peer,
593					addr,
594					role_override,
595					port_use,
596				) {
597					Some((p.to_string(), handler))
598				} else {
599					None
600				}
601			});
602
603		Ok(MultiHandler::try_from_iter(iter).expect(
604			"Protocols are in a HashMap and there can be at most one handler per protocol name, \
605			 which is the only possible error; qed",
606		))
607	}
608
609	fn on_swarm_event(&mut self, event: FromSwarm) {
610		for ProtocolDetails { behaviour, .. } in self.protocols.values_mut() {
611			behaviour.on_swarm_event(event);
612		}
613	}
614
615	fn on_connection_handler_event(
616		&mut self,
617		peer_id: PeerId,
618		connection_id: ConnectionId,
619		event: THandlerOutEvent<Self>,
620	) {
621		let p_name = event.0;
622		if let Some(ProtocolDetails { behaviour, .. }) = self.protocols.get_mut(p_name.as_str()) {
623			return behaviour.on_connection_handler_event(peer_id, connection_id, event.1);
624		} else {
625			log::warn!(
626				target: LOG_TARGET,
627				"on_connection_handler_event: no request-response instance registered for protocol {:?}",
628				p_name
629			);
630		}
631	}
632
633	fn poll(&mut self, cx: &mut Context) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
634		'poll_all: loop {
635			// Poll the periodic request check.
636			if self.periodic_request_check.poll_tick(cx).is_ready() {
637				self.pending_requests.retain(|id, req| {
638					let Some(ProtocolDetails { request_timeout, .. }) =
639						self.protocols.get(&id.protocol)
640					else {
641						log::warn!(
642							target: LOG_TARGET,
643							"Request {id:?} has no protocol registered.",
644						);
645
646						if let Some(response_tx) = req.response_tx.take() {
647							if response_tx.send(Err(RequestFailure::UnknownProtocol)).is_err() {
648								log::debug!(
649									target: LOG_TARGET,
650									"Request {id:?} has no protocol registered. At the same time local node is no longer interested in the result.",
651								);
652							}
653						}
654						return false
655					};
656
657					let elapsed = req.started_at.elapsed();
658					if elapsed > *request_timeout {
659						log::debug!(
660							target: LOG_TARGET,
661							"Request {id:?} force detected as timeout.",
662						);
663
664						if let Some(response_tx) = req.response_tx.take() {
665							if response_tx.send(Err(RequestFailure::Network(OutboundFailure::Timeout))).is_err() {
666								log::debug!(
667									target: LOG_TARGET,
668									"Request {id:?} force detected as timeout. At the same time local node is no longer interested in the result.",
669								);
670							}
671						}
672
673						false
674					} else {
675						true
676					}
677				});
678			}
679
680			// Poll to see if any response is ready to be sent back.
681			while let Poll::Ready(Some(outcome)) = self.pending_responses.poll_next_unpin(cx) {
682				let RequestProcessingOutcome {
683					peer,
684					request_id,
685					protocol: protocol_name,
686					inner_channel,
687					response: OutgoingResponse { result, reputation_changes, sent_feedback },
688				} = match outcome {
689					Some(outcome) => outcome,
690					// The response builder was too busy or handling the request failed. This is
691					// later on reported as a `InboundFailure::Omission`.
692					None => continue,
693				};
694
695				if let Ok(payload) = result {
696					if let Some(ProtocolDetails { behaviour, .. }) =
697						self.protocols.get_mut(&*protocol_name)
698					{
699						log::trace!(target: LOG_TARGET, "send response to {peer} ({protocol_name:?}), {} bytes", payload.len());
700
701						if behaviour.send_response(inner_channel, Ok(payload)).is_err() {
702							// Note: Failure is handled further below when receiving
703							// `InboundFailure` event from request-response [`Behaviour`].
704							log::debug!(
705								target: LOG_TARGET,
706								"Failed to send response for {:?} on protocol {:?} due to a \
707								 timeout or due to the connection to the peer being closed. \
708								 Dropping response",
709								request_id, protocol_name,
710							);
711						} else if let Some(sent_feedback) = sent_feedback {
712							self.send_feedback
713								.insert((protocol_name, request_id).into(), sent_feedback);
714						}
715					}
716				}
717
718				if !reputation_changes.is_empty() {
719					return Poll::Ready(ToSwarm::GenerateEvent(Event::ReputationChanges {
720						peer,
721						changes: reputation_changes,
722					}));
723				}
724			}
725
726			let mut fallback_requests = vec![];
727
728			// Poll request-responses protocols.
729			for (protocol, ProtocolDetails { behaviour, inbound_queue, .. }) in &mut self.protocols
730			{
731				'poll_protocol: while let Poll::Ready(ev) = behaviour.poll(cx) {
732					let ev = match ev {
733						// Main events we are interested in.
734						ToSwarm::GenerateEvent(ev) => ev,
735
736						// Other events generated by the underlying behaviour are transparently
737						// passed through.
738						ToSwarm::Dial { opts } => {
739							if opts.get_peer_id().is_none() {
740								log::error!(
741									target: LOG_TARGET,
742									"The request-response isn't supposed to start dialing addresses"
743								);
744							}
745							return Poll::Ready(ToSwarm::Dial { opts });
746						},
747						event => {
748							return Poll::Ready(
749								event.map_in(|event| ((*protocol).to_string(), event)).map_out(
750									|_| {
751										unreachable!(
752											"`GenerateEvent` is handled in a branch above; qed"
753										)
754									},
755								),
756							);
757						},
758					};
759
760					match ev {
761						// Received a request from a remote.
762						request_response::Event::Message {
763							peer,
764							message: Message::Request { request_id, request, channel, .. },
765							..
766						} => {
767							self.pending_responses_arrival_time
768								.insert((protocol.clone(), request_id).into(), Instant::now());
769
770							let reputation = self.peer_store.peer_reputation(&peer.into());
771
772							if reputation < BANNED_THRESHOLD {
773								log::debug!(
774									target: LOG_TARGET,
775									"Cannot handle requests from a node with a low reputation {}: {}",
776									peer,
777									reputation,
778								);
779								continue 'poll_protocol;
780							}
781
782							let (tx, rx) = oneshot::channel();
783
784							// Submit the request to the "response builder" passed by the user at
785							// initialization.
786							if let Some(resp_builder) = inbound_queue {
787								// If the response builder is too busy, silently drop `tx`. This
788								// will be reported by the corresponding request-response
789								// [`Behaviour`] through an `InboundFailure::Omission` event.
790								// Note that we use `async_channel::bounded` and not `mpsc::channel`
791								// because the latter allocates an extra slot for every cloned
792								// sender.
793								let _ = resp_builder.try_send(IncomingRequest {
794									peer: peer.into(),
795									payload: request,
796									pending_response: tx,
797								});
798							} else {
799								debug_assert!(false, "Received message on outbound-only protocol.");
800							}
801
802							let protocol = protocol.clone();
803
804							self.pending_responses.push(Box::pin(async move {
805								// The `tx` created above can be dropped if we are not capable of
806								// processing this request, which is reflected as a
807								// `InboundFailure::Omission` event.
808								rx.await.map_or(None, |response| {
809									Some(RequestProcessingOutcome {
810										peer,
811										request_id,
812										protocol,
813										inner_channel: channel,
814										response,
815									})
816								})
817							}));
818
819							// This `continue` makes sure that `pending_responses` gets polled
820							// after we have added the new element.
821							continue 'poll_all;
822						},
823
824						// Received a response from a remote to one of our requests.
825						request_response::Event::Message {
826							peer,
827							message: Message::Response { request_id, response },
828							..
829						} => {
830							let (started, delivered) = match self
831								.pending_requests
832								.remove(&(protocol.clone(), request_id).into())
833							{
834								Some(PendingRequest {
835									started_at,
836									response_tx: Some(response_tx),
837									..
838								}) => {
839									log::trace!(
840										target: LOG_TARGET,
841										"received response from {peer} ({protocol:?}), {} bytes",
842										response.as_ref().map_or(0usize, |response| response.len()),
843									);
844
845									let delivered = response_tx
846										.send(
847											response
848												.map_err(|()| RequestFailure::Refused)
849												.map(|resp| (resp, protocol.clone())),
850										)
851										.map_err(|_| RequestFailure::Obsolete);
852									(started_at, delivered)
853								},
854								_ => {
855									log::debug!(
856										target: LOG_TARGET,
857										"Received `RequestResponseEvent::Message` with unexpected request id {:?} from {:?}",
858										request_id,
859										peer,
860									);
861									continue;
862								},
863							};
864
865							let out = Event::RequestFinished {
866								peer,
867								protocol: protocol.clone(),
868								duration: started.elapsed(),
869								result: delivered,
870							};
871
872							return Poll::Ready(ToSwarm::GenerateEvent(out));
873						},
874
875						// One of our requests has failed.
876						request_response::Event::OutboundFailure {
877							peer,
878							request_id,
879							error,
880							..
881						} => {
882							let error = OutboundFailure::from(error);
883							let started = match self
884								.pending_requests
885								.remove(&(protocol.clone(), request_id).into())
886							{
887								Some(PendingRequest {
888									started_at,
889									response_tx: Some(response_tx),
890									fallback_request,
891								}) => {
892									// Try using the fallback request if the protocol was not
893									// supported.
894									if matches!(error, OutboundFailure::UnsupportedProtocols) {
895										if let Some((fallback_request, fallback_protocol)) =
896											fallback_request
897										{
898											log::trace!(
899												target: LOG_TARGET,
900												"Request with id {:?} failed. Trying the fallback protocol. {}",
901												request_id,
902												fallback_protocol.deref()
903											);
904											fallback_requests.push((
905												peer,
906												fallback_protocol,
907												fallback_request,
908												response_tx,
909											));
910											continue;
911										}
912									}
913
914									if response_tx
915										.send(Err(RequestFailure::Network(error.clone())))
916										.is_err()
917									{
918										log::debug!(
919											target: LOG_TARGET,
920											"Request with id {:?} failed. At the same time local \
921											 node is no longer interested in the result.",
922											request_id,
923										);
924									}
925									started_at
926								},
927								_ => {
928									log::debug!(
929										target: LOG_TARGET,
930										"Received `RequestResponseEvent::OutboundFailure` with unexpected request id {:?} error {:?} from {:?}",
931										request_id,
932										error,
933										peer
934									);
935									continue;
936								},
937							};
938
939							let out = Event::RequestFinished {
940								peer,
941								protocol: protocol.clone(),
942								duration: started.elapsed(),
943								result: Err(RequestFailure::Network(error)),
944							};
945
946							return Poll::Ready(ToSwarm::GenerateEvent(out));
947						},
948
949						// An inbound request failed, either while reading the request or due to
950						// failing to send a response.
951						request_response::Event::InboundFailure {
952							request_id, peer, error, ..
953						} => {
954							self.pending_responses_arrival_time
955								.remove(&(protocol.clone(), request_id).into());
956							self.send_feedback.remove(&(protocol.clone(), request_id).into());
957							let out = Event::InboundRequest {
958								peer,
959								protocol: protocol.clone(),
960								result: Err(ResponseFailure::Network(error.into())),
961							};
962							return Poll::Ready(ToSwarm::GenerateEvent(out));
963						},
964
965						// A response to an inbound request has been sent.
966						request_response::Event::ResponseSent { request_id, peer, .. } => {
967							let arrival_time = self
968								.pending_responses_arrival_time
969								.remove(&(protocol.clone(), request_id).into())
970								.map(|t| t.elapsed())
971								.expect(
972									"Time is added for each inbound request on arrival and only \
973									 removed on success (`ResponseSent`) or failure \
974									 (`InboundFailure`). One can not receive a success event for a \
975									 request that either never arrived, or that has previously \
976									 failed; qed.",
977								);
978
979							if let Some(send_feedback) =
980								self.send_feedback.remove(&(protocol.clone(), request_id).into())
981							{
982								let _ = send_feedback.send(());
983							}
984
985							let out = Event::InboundRequest {
986								peer,
987								protocol: protocol.clone(),
988								result: Ok(arrival_time),
989							};
990
991							return Poll::Ready(ToSwarm::GenerateEvent(out));
992						},
993					};
994				}
995			}
996
997			// Send out fallback requests.
998			for (peer, protocol, request, pending_response) in fallback_requests.drain(..) {
999				if let Some(ProtocolDetails { behaviour, .. }) = self.protocols.get_mut(&protocol) {
1000					Self::send_request_inner(
1001						behaviour,
1002						&mut self.pending_requests,
1003						&peer,
1004						protocol,
1005						request,
1006						None,
1007						pending_response,
1008						// We can error if not connected because the
1009						// previous attempt would have tried to establish a
1010						// connection already or errored and we wouldn't have gotten here.
1011						IfDisconnected::ImmediateError,
1012					);
1013				}
1014			}
1015
1016			break Poll::Pending;
1017		}
1018	}
1019}
1020
1021/// Error when registering a protocol.
1022#[derive(Debug, thiserror::Error)]
1023pub enum RegisterError {
1024	/// A protocol has been specified multiple times.
1025	#[error("{0}")]
1026	DuplicateProtocol(ProtocolName),
1027}
1028
1029/// Error when processing a request sent by a remote.
1030#[derive(Debug, thiserror::Error)]
1031pub enum ResponseFailure {
1032	/// Problem on the network.
1033	#[error("Problem on the network: {0}")]
1034	Network(InboundFailure),
1035}
1036
1037/// Implements the libp2p [`Codec`] trait. Defines how streams of bytes are turned
1038/// into requests and responses and vice-versa.
1039#[derive(Debug, Clone)]
1040#[doc(hidden)] // Needs to be public in order to satisfy the Rust compiler.
1041pub struct GenericCodec {
1042	max_request_size: u64,
1043	max_response_size: u64,
1044}
1045
1046#[async_trait::async_trait]
1047impl Codec for GenericCodec {
1048	type Protocol = ProtocolName;
1049	type Request = Vec<u8>;
1050	type Response = Result<Vec<u8>, ()>;
1051
1052	async fn read_request<T>(
1053		&mut self,
1054		_: &Self::Protocol,
1055		mut io: &mut T,
1056	) -> io::Result<Self::Request>
1057	where
1058		T: AsyncRead + Unpin + Send,
1059	{
1060		// Read the length.
1061		let length = unsigned_varint::aio::read_usize(&mut io)
1062			.await
1063			.map_err(|err| io::Error::new(io::ErrorKind::InvalidInput, err))?;
1064		if length > usize::try_from(self.max_request_size).unwrap_or(usize::MAX) {
1065			return Err(io::Error::new(
1066				io::ErrorKind::InvalidInput,
1067				format!("Request size exceeds limit: {} > {}", length, self.max_request_size),
1068			));
1069		}
1070
1071		// Read the payload.
1072		let mut buffer = vec![0; length];
1073		io.read_exact(&mut buffer).await?;
1074		Ok(buffer)
1075	}
1076
1077	async fn read_response<T>(
1078		&mut self,
1079		_: &Self::Protocol,
1080		mut io: &mut T,
1081	) -> io::Result<Self::Response>
1082	where
1083		T: AsyncRead + Unpin + Send,
1084	{
1085		// Note that this function returns a `Result<Result<...>>`. Returning an `Err` is
1086		// considered as a protocol error and will result in the entire connection being closed.
1087		// Returning `Ok(Err(_))` signifies that a response has successfully been fetched, and
1088		// that this response is an error.
1089
1090		// Read the length.
1091		let length = match unsigned_varint::aio::read_usize(&mut io).await {
1092			Ok(l) => l,
1093			Err(unsigned_varint::io::ReadError::Io(err))
1094				if matches!(err.kind(), io::ErrorKind::UnexpectedEof) =>
1095			{
1096				return Ok(Err(()))
1097			},
1098			Err(err) => return Err(io::Error::new(io::ErrorKind::InvalidInput, err)),
1099		};
1100
1101		if length > usize::try_from(self.max_response_size).unwrap_or(usize::MAX) {
1102			return Err(io::Error::new(
1103				io::ErrorKind::InvalidInput,
1104				format!("Response size exceeds limit: {} > {}", length, self.max_response_size),
1105			));
1106		}
1107
1108		// Read the payload.
1109		let mut buffer = vec![0; length];
1110		io.read_exact(&mut buffer).await?;
1111		Ok(Ok(buffer))
1112	}
1113
1114	async fn write_request<T>(
1115		&mut self,
1116		_: &Self::Protocol,
1117		io: &mut T,
1118		req: Self::Request,
1119	) -> io::Result<()>
1120	where
1121		T: AsyncWrite + Unpin + Send,
1122	{
1123		// TODO: check the length?
1124		// Write the length.
1125		{
1126			let mut buffer = unsigned_varint::encode::usize_buffer();
1127			io.write_all(unsigned_varint::encode::usize(req.len(), &mut buffer)).await?;
1128		}
1129
1130		// Write the payload.
1131		io.write_all(&req).await?;
1132
1133		io.close().await?;
1134		Ok(())
1135	}
1136
1137	async fn write_response<T>(
1138		&mut self,
1139		_: &Self::Protocol,
1140		io: &mut T,
1141		res: Self::Response,
1142	) -> io::Result<()>
1143	where
1144		T: AsyncWrite + Unpin + Send,
1145	{
1146		// If `res` is an `Err`, we jump to closing the substream without writing anything on it.
1147		if let Ok(res) = res {
1148			// TODO: check the length?
1149			// Write the length.
1150			{
1151				let mut buffer = unsigned_varint::encode::usize_buffer();
1152				io.write_all(unsigned_varint::encode::usize(res.len(), &mut buffer)).await?;
1153			}
1154
1155			// Write the payload.
1156			io.write_all(&res).await?;
1157		}
1158
1159		io.close().await?;
1160		Ok(())
1161	}
1162}
1163
1164#[cfg(test)]
1165mod tests {
1166	use super::*;
1167
1168	use crate::mock::MockPeerStore;
1169	use assert_matches::assert_matches;
1170	use futures::channel::oneshot;
1171	use libp2p::{
1172		core::{
1173			transport::{MemoryTransport, Transport},
1174			upgrade,
1175		},
1176		identity::Keypair,
1177		noise,
1178		swarm::{Config as SwarmConfig, Executor, Swarm, SwarmEvent},
1179		Multiaddr,
1180	};
1181	use std::{iter, time::Duration};
1182
1183	struct TokioExecutor;
1184	impl Executor for TokioExecutor {
1185		fn exec(&self, f: Pin<Box<dyn Future<Output = ()> + Send>>) {
1186			tokio::spawn(f);
1187		}
1188	}
1189
1190	fn build_swarm(
1191		list: impl Iterator<Item = ProtocolConfig>,
1192	) -> (Swarm<RequestResponsesBehaviour>, Multiaddr) {
1193		let keypair = Keypair::generate_ed25519();
1194
1195		let transport = MemoryTransport::new()
1196			.upgrade(upgrade::Version::V1)
1197			.authenticate(noise::Config::new(&keypair).unwrap())
1198			.multiplex(libp2p::yamux::Config::default())
1199			.boxed();
1200
1201		let behaviour = RequestResponsesBehaviour::new(list, Arc::new(MockPeerStore {})).unwrap();
1202
1203		let mut swarm = Swarm::new(
1204			transport,
1205			behaviour,
1206			keypair.public().to_peer_id(),
1207			SwarmConfig::with_executor(TokioExecutor {})
1208				// This is taken care of by notification protocols in non-test environment
1209				// It is very slow in test environment for some reason, hence larger timeout
1210				.with_idle_connection_timeout(Duration::from_secs(10)),
1211		);
1212
1213		let listen_addr: Multiaddr = format!("/memory/{}", rand::random::<u64>()).parse().unwrap();
1214
1215		swarm.listen_on(listen_addr.clone()).unwrap();
1216
1217		(swarm, listen_addr)
1218	}
1219
1220	#[tokio::test]
1221	async fn basic_request_response_works() {
1222		let protocol_name = ProtocolName::from("/test/req-resp/1");
1223
1224		// Build swarms whose behaviour is [`RequestResponsesBehaviour`].
1225		let mut swarms = (0..2)
1226			.map(|_| {
1227				let (tx, mut rx) = async_channel::bounded::<IncomingRequest>(64);
1228
1229				tokio::spawn(async move {
1230					while let Some(rq) = rx.next().await {
1231						let (fb_tx, fb_rx) = oneshot::channel();
1232						assert_eq!(rq.payload, b"this is a request");
1233						let _ = rq.pending_response.send(super::OutgoingResponse {
1234							result: Ok(b"this is a response".to_vec()),
1235							reputation_changes: Vec::new(),
1236							sent_feedback: Some(fb_tx),
1237						});
1238						fb_rx.await.unwrap();
1239					}
1240				});
1241
1242				let protocol_config = ProtocolConfig {
1243					name: protocol_name.clone(),
1244					fallback_names: Vec::new(),
1245					max_request_size: 1024,
1246					max_response_size: 1024 * 1024,
1247					request_timeout: Duration::from_secs(30),
1248					inbound_queue: Some(tx),
1249				};
1250
1251				build_swarm(iter::once(protocol_config))
1252			})
1253			.collect::<Vec<_>>();
1254
1255		// Ask `swarm[0]` to dial `swarm[1]`. There isn't any discovery mechanism in place in
1256		// this test, so they wouldn't connect to each other.
1257		{
1258			let dial_addr = swarms[1].1.clone();
1259			Swarm::dial(&mut swarms[0].0, dial_addr).unwrap();
1260		}
1261
1262		let (mut swarm, _) = swarms.remove(0);
1263		// Running `swarm[0]` in the background.
1264		tokio::spawn(async move {
1265			loop {
1266				match swarm.select_next_some().await {
1267					SwarmEvent::Behaviour(Event::InboundRequest { result, .. }) => {
1268						result.unwrap();
1269					},
1270					_ => {},
1271				}
1272			}
1273		});
1274
1275		// Remove and run the remaining swarm.
1276		let (mut swarm, _) = swarms.remove(0);
1277		let mut response_receiver = None;
1278
1279		loop {
1280			match swarm.select_next_some().await {
1281				SwarmEvent::ConnectionEstablished { peer_id, .. } => {
1282					let (sender, receiver) = oneshot::channel();
1283					swarm.behaviour_mut().send_request(
1284						&peer_id,
1285						protocol_name.clone(),
1286						b"this is a request".to_vec(),
1287						None,
1288						sender,
1289						IfDisconnected::ImmediateError,
1290					);
1291					assert!(response_receiver.is_none());
1292					response_receiver = Some(receiver);
1293				},
1294				SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
1295					result.unwrap();
1296					break;
1297				},
1298				_ => {},
1299			}
1300		}
1301
1302		assert_eq!(
1303			response_receiver.unwrap().await.unwrap().unwrap(),
1304			(b"this is a response".to_vec(), protocol_name)
1305		);
1306	}
1307
1308	#[tokio::test]
1309	async fn max_response_size_exceeded() {
1310		let protocol_name = ProtocolName::from("/test/req-resp/1");
1311
1312		// Build swarms whose behaviour is [`RequestResponsesBehaviour`].
1313		let mut swarms = (0..2)
1314			.map(|_| {
1315				let (tx, mut rx) = async_channel::bounded::<IncomingRequest>(64);
1316
1317				tokio::spawn(async move {
1318					while let Some(rq) = rx.next().await {
1319						assert_eq!(rq.payload, b"this is a request");
1320						let _ = rq.pending_response.send(super::OutgoingResponse {
1321							result: Ok(b"this response exceeds the limit".to_vec()),
1322							reputation_changes: Vec::new(),
1323							sent_feedback: None,
1324						});
1325					}
1326				});
1327
1328				let protocol_config = ProtocolConfig {
1329					name: protocol_name.clone(),
1330					fallback_names: Vec::new(),
1331					max_request_size: 1024,
1332					max_response_size: 8, // <-- important for the test
1333					request_timeout: Duration::from_secs(30),
1334					inbound_queue: Some(tx),
1335				};
1336
1337				build_swarm(iter::once(protocol_config))
1338			})
1339			.collect::<Vec<_>>();
1340
1341		// Ask `swarm[0]` to dial `swarm[1]`. There isn't any discovery mechanism in place in
1342		// this test, so they wouldn't connect to each other.
1343		{
1344			let dial_addr = swarms[1].1.clone();
1345			Swarm::dial(&mut swarms[0].0, dial_addr).unwrap();
1346		}
1347
1348		// Running `swarm[0]` in the background until a `InboundRequest` event happens,
1349		// which is a hint about the test having ended.
1350		let (mut swarm, _) = swarms.remove(0);
1351		tokio::spawn(async move {
1352			loop {
1353				match swarm.select_next_some().await {
1354					SwarmEvent::Behaviour(Event::InboundRequest { result, .. }) => {
1355						assert!(result.is_ok());
1356					},
1357					SwarmEvent::ConnectionClosed { .. } => {
1358						break;
1359					},
1360					_ => {},
1361				}
1362			}
1363		});
1364
1365		// Remove and run the remaining swarm.
1366		let (mut swarm, _) = swarms.remove(0);
1367
1368		let mut response_receiver = None;
1369
1370		loop {
1371			match swarm.select_next_some().await {
1372				SwarmEvent::ConnectionEstablished { peer_id, .. } => {
1373					let (sender, receiver) = oneshot::channel();
1374					swarm.behaviour_mut().send_request(
1375						&peer_id,
1376						protocol_name.clone(),
1377						b"this is a request".to_vec(),
1378						None,
1379						sender,
1380						IfDisconnected::ImmediateError,
1381					);
1382					assert!(response_receiver.is_none());
1383					response_receiver = Some(receiver);
1384				},
1385				SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
1386					assert!(result.is_err());
1387					break;
1388				},
1389				_ => {},
1390			}
1391		}
1392
1393		match response_receiver.unwrap().await.unwrap().unwrap_err() {
1394			RequestFailure::Network(OutboundFailure::Io(_)) => {},
1395			request_failure => panic!("Unexpected failure: {request_failure:?}"),
1396		}
1397	}
1398
1399	/// A `RequestId` is a unique identifier among either all inbound or all outbound requests for
1400	/// a single [`RequestResponsesBehaviour`] behaviour. It is not guaranteed to be unique across
1401	/// multiple [`RequestResponsesBehaviour`] behaviours. Thus, when handling `RequestId` in the
1402	/// context of multiple [`RequestResponsesBehaviour`] behaviours, one needs to couple the
1403	/// protocol name with the `RequestId` to get a unique request identifier.
1404	///
1405	/// This test ensures that two requests on different protocols can be handled concurrently
1406	/// without a `RequestId` collision.
1407	///
1408	/// See [`ProtocolRequestId`] for additional information.
1409	#[tokio::test]
1410	async fn request_id_collision() {
1411		let protocol_name_1 = ProtocolName::from("/test/req-resp-1/1");
1412		let protocol_name_2 = ProtocolName::from("/test/req-resp-2/1");
1413
1414		let mut swarm_1 = {
1415			let protocol_configs = vec![
1416				ProtocolConfig {
1417					name: protocol_name_1.clone(),
1418					fallback_names: Vec::new(),
1419					max_request_size: 1024,
1420					max_response_size: 1024 * 1024,
1421					request_timeout: Duration::from_secs(30),
1422					inbound_queue: None,
1423				},
1424				ProtocolConfig {
1425					name: protocol_name_2.clone(),
1426					fallback_names: Vec::new(),
1427					max_request_size: 1024,
1428					max_response_size: 1024 * 1024,
1429					request_timeout: Duration::from_secs(30),
1430					inbound_queue: None,
1431				},
1432			];
1433
1434			build_swarm(protocol_configs.into_iter()).0
1435		};
1436
1437		let (mut swarm_2, mut swarm_2_handler_1, mut swarm_2_handler_2, listen_add_2) = {
1438			let (tx_1, rx_1) = async_channel::bounded(64);
1439			let (tx_2, rx_2) = async_channel::bounded(64);
1440
1441			let protocol_configs = vec![
1442				ProtocolConfig {
1443					name: protocol_name_1.clone(),
1444					fallback_names: Vec::new(),
1445					max_request_size: 1024,
1446					max_response_size: 1024 * 1024,
1447					request_timeout: Duration::from_secs(30),
1448					inbound_queue: Some(tx_1),
1449				},
1450				ProtocolConfig {
1451					name: protocol_name_2.clone(),
1452					fallback_names: Vec::new(),
1453					max_request_size: 1024,
1454					max_response_size: 1024 * 1024,
1455					request_timeout: Duration::from_secs(30),
1456					inbound_queue: Some(tx_2),
1457				},
1458			];
1459
1460			let (swarm, listen_addr) = build_swarm(protocol_configs.into_iter());
1461
1462			(swarm, rx_1, rx_2, listen_addr)
1463		};
1464
1465		// Ask swarm 1 to dial swarm 2. There isn't any discovery mechanism in place in this test,
1466		// so they wouldn't connect to each other.
1467		swarm_1.dial(listen_add_2).unwrap();
1468
1469		// Run swarm 2 in the background, receiving two requests.
1470		tokio::spawn(async move {
1471			loop {
1472				match swarm_2.select_next_some().await {
1473					SwarmEvent::Behaviour(Event::InboundRequest { result, .. }) => {
1474						result.unwrap();
1475					},
1476					_ => {},
1477				}
1478			}
1479		});
1480
1481		// Handle both requests sent by swarm 1 to swarm 2 in the background.
1482		//
1483		// Make sure both requests overlap, by answering the first only after receiving the
1484		// second.
1485		tokio::spawn(async move {
1486			let protocol_1_request = swarm_2_handler_1.next().await;
1487			let protocol_2_request = swarm_2_handler_2.next().await;
1488
1489			protocol_1_request
1490				.unwrap()
1491				.pending_response
1492				.send(OutgoingResponse {
1493					result: Ok(b"this is a response".to_vec()),
1494					reputation_changes: Vec::new(),
1495					sent_feedback: None,
1496				})
1497				.unwrap();
1498			protocol_2_request
1499				.unwrap()
1500				.pending_response
1501				.send(OutgoingResponse {
1502					result: Ok(b"this is a response".to_vec()),
1503					reputation_changes: Vec::new(),
1504					sent_feedback: None,
1505				})
1506				.unwrap();
1507		});
1508
1509		// Have swarm 1 send two requests to swarm 2 and await responses.
1510
1511		let mut response_receivers = None;
1512		let mut num_responses = 0;
1513
1514		loop {
1515			match swarm_1.select_next_some().await {
1516				SwarmEvent::ConnectionEstablished { peer_id, .. } => {
1517					let (sender_1, receiver_1) = oneshot::channel();
1518					let (sender_2, receiver_2) = oneshot::channel();
1519					swarm_1.behaviour_mut().send_request(
1520						&peer_id,
1521						protocol_name_1.clone(),
1522						b"this is a request".to_vec(),
1523						None,
1524						sender_1,
1525						IfDisconnected::ImmediateError,
1526					);
1527					swarm_1.behaviour_mut().send_request(
1528						&peer_id,
1529						protocol_name_2.clone(),
1530						b"this is a request".to_vec(),
1531						None,
1532						sender_2,
1533						IfDisconnected::ImmediateError,
1534					);
1535					assert!(response_receivers.is_none());
1536					response_receivers = Some((receiver_1, receiver_2));
1537				},
1538				SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
1539					num_responses += 1;
1540					result.unwrap();
1541					if num_responses == 2 {
1542						break;
1543					}
1544				},
1545				_ => {},
1546			}
1547		}
1548		let (response_receiver_1, response_receiver_2) = response_receivers.unwrap();
1549		assert_eq!(
1550			response_receiver_1.await.unwrap().unwrap(),
1551			(b"this is a response".to_vec(), protocol_name_1)
1552		);
1553		assert_eq!(
1554			response_receiver_2.await.unwrap().unwrap(),
1555			(b"this is a response".to_vec(), protocol_name_2)
1556		);
1557	}
1558
1559	#[tokio::test]
1560	async fn request_fallback() {
1561		let protocol_name_1 = ProtocolName::from("/test/req-resp/2");
1562		let protocol_name_1_fallback = ProtocolName::from("/test/req-resp/1");
1563		let protocol_name_2 = ProtocolName::from("/test/another");
1564
1565		let protocol_config_1 = ProtocolConfig {
1566			name: protocol_name_1.clone(),
1567			fallback_names: Vec::new(),
1568			max_request_size: 1024,
1569			max_response_size: 1024 * 1024,
1570			request_timeout: Duration::from_secs(30),
1571			inbound_queue: None,
1572		};
1573		let protocol_config_1_fallback = ProtocolConfig {
1574			name: protocol_name_1_fallback.clone(),
1575			fallback_names: Vec::new(),
1576			max_request_size: 1024,
1577			max_response_size: 1024 * 1024,
1578			request_timeout: Duration::from_secs(30),
1579			inbound_queue: None,
1580		};
1581		let protocol_config_2 = ProtocolConfig {
1582			name: protocol_name_2.clone(),
1583			fallback_names: Vec::new(),
1584			max_request_size: 1024,
1585			max_response_size: 1024 * 1024,
1586			request_timeout: Duration::from_secs(30),
1587			inbound_queue: None,
1588		};
1589
1590		// This swarm only speaks protocol_name_1_fallback and protocol_name_2.
1591		// It only responds to requests.
1592		let mut older_swarm = {
1593			let (tx_1, mut rx_1) = async_channel::bounded::<IncomingRequest>(64);
1594			let (tx_2, mut rx_2) = async_channel::bounded::<IncomingRequest>(64);
1595			let mut protocol_config_1_fallback = protocol_config_1_fallback.clone();
1596			protocol_config_1_fallback.inbound_queue = Some(tx_1);
1597
1598			let mut protocol_config_2 = protocol_config_2.clone();
1599			protocol_config_2.inbound_queue = Some(tx_2);
1600
1601			tokio::spawn(async move {
1602				for _ in 0..2 {
1603					if let Some(rq) = rx_1.next().await {
1604						let (fb_tx, fb_rx) = oneshot::channel();
1605						assert_eq!(rq.payload, b"request on protocol /test/req-resp/1");
1606						let _ = rq.pending_response.send(super::OutgoingResponse {
1607							result: Ok(b"this is a response on protocol /test/req-resp/1".to_vec()),
1608							reputation_changes: Vec::new(),
1609							sent_feedback: Some(fb_tx),
1610						});
1611						fb_rx.await.unwrap();
1612					}
1613				}
1614
1615				if let Some(rq) = rx_2.next().await {
1616					let (fb_tx, fb_rx) = oneshot::channel();
1617					assert_eq!(rq.payload, b"request on protocol /test/other");
1618					let _ = rq.pending_response.send(super::OutgoingResponse {
1619						result: Ok(b"this is a response on protocol /test/other".to_vec()),
1620						reputation_changes: Vec::new(),
1621						sent_feedback: Some(fb_tx),
1622					});
1623					fb_rx.await.unwrap();
1624				}
1625			});
1626
1627			build_swarm(vec![protocol_config_1_fallback, protocol_config_2].into_iter())
1628		};
1629
1630		// This swarm speaks all protocols.
1631		let mut new_swarm = build_swarm(
1632			vec![
1633				protocol_config_1.clone(),
1634				protocol_config_1_fallback.clone(),
1635				protocol_config_2.clone(),
1636			]
1637			.into_iter(),
1638		);
1639
1640		{
1641			let dial_addr = older_swarm.1.clone();
1642			Swarm::dial(&mut new_swarm.0, dial_addr).unwrap();
1643		}
1644
1645		// Running `older_swarm`` in the background.
1646		tokio::spawn(async move {
1647			loop {
1648				_ = older_swarm.0.select_next_some().await;
1649			}
1650		});
1651
1652		// Run the newer swarm. Attempt to make requests on all protocols.
1653		let (mut swarm, _) = new_swarm;
1654		let mut older_peer_id = None;
1655
1656		let mut response_receiver = None;
1657		// Try the new protocol with a fallback.
1658		loop {
1659			match swarm.select_next_some().await {
1660				SwarmEvent::ConnectionEstablished { peer_id, .. } => {
1661					older_peer_id = Some(peer_id);
1662					let (sender, receiver) = oneshot::channel();
1663					swarm.behaviour_mut().send_request(
1664						&peer_id,
1665						protocol_name_1.clone(),
1666						b"request on protocol /test/req-resp/2".to_vec(),
1667						Some((
1668							b"request on protocol /test/req-resp/1".to_vec(),
1669							protocol_config_1_fallback.name.clone(),
1670						)),
1671						sender,
1672						IfDisconnected::ImmediateError,
1673					);
1674					response_receiver = Some(receiver);
1675				},
1676				SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
1677					result.unwrap();
1678					break;
1679				},
1680				_ => {},
1681			}
1682		}
1683		assert_eq!(
1684			response_receiver.unwrap().await.unwrap().unwrap(),
1685			(
1686				b"this is a response on protocol /test/req-resp/1".to_vec(),
1687				protocol_name_1_fallback.clone()
1688			)
1689		);
1690		// Try the old protocol with a useless fallback.
1691		let (sender, response_receiver) = oneshot::channel();
1692		swarm.behaviour_mut().send_request(
1693			older_peer_id.as_ref().unwrap(),
1694			protocol_name_1_fallback.clone(),
1695			b"request on protocol /test/req-resp/1".to_vec(),
1696			Some((
1697				b"dummy request, will fail if processed".to_vec(),
1698				protocol_config_1_fallback.name.clone(),
1699			)),
1700			sender,
1701			IfDisconnected::ImmediateError,
1702		);
1703		loop {
1704			match swarm.select_next_some().await {
1705				SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
1706					result.unwrap();
1707					break;
1708				},
1709				_ => {},
1710			}
1711		}
1712		assert_eq!(
1713			response_receiver.await.unwrap().unwrap(),
1714			(
1715				b"this is a response on protocol /test/req-resp/1".to_vec(),
1716				protocol_name_1_fallback.clone()
1717			)
1718		);
1719		// Try the new protocol with no fallback. Should fail.
1720		let (sender, response_receiver) = oneshot::channel();
1721		swarm.behaviour_mut().send_request(
1722			older_peer_id.as_ref().unwrap(),
1723			protocol_name_1.clone(),
1724			b"request on protocol /test/req-resp-2".to_vec(),
1725			None,
1726			sender,
1727			IfDisconnected::ImmediateError,
1728		);
1729		loop {
1730			match swarm.select_next_some().await {
1731				SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
1732					assert_matches!(
1733						result.unwrap_err(),
1734						RequestFailure::Network(OutboundFailure::UnsupportedProtocols)
1735					);
1736					break;
1737				},
1738				_ => {},
1739			}
1740		}
1741		assert!(response_receiver.await.unwrap().is_err());
1742		// Try the other protocol with no fallback.
1743		let (sender, response_receiver) = oneshot::channel();
1744		swarm.behaviour_mut().send_request(
1745			older_peer_id.as_ref().unwrap(),
1746			protocol_name_2.clone(),
1747			b"request on protocol /test/other".to_vec(),
1748			None,
1749			sender,
1750			IfDisconnected::ImmediateError,
1751		);
1752		loop {
1753			match swarm.select_next_some().await {
1754				SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
1755					result.unwrap();
1756					break;
1757				},
1758				_ => {},
1759			}
1760		}
1761		assert_eq!(
1762			response_receiver.await.unwrap().unwrap(),
1763			(b"this is a response on protocol /test/other".to_vec(), protocol_name_2.clone())
1764		);
1765	}
1766
1767	/// This test ensures the `RequestResponsesBehaviour` propagates back the Request::Timeout error
1768	/// even if the libp2p component hangs.
1769	///
1770	/// For testing purposes, the communication happens on the `/test/req-resp/1` protocol.
1771	///
1772	/// This is achieved by:
1773	/// - Two swarms are connected, the first one is slow to respond and has the timeout set to 10
1774	///   seconds. The second swarm is configured with a timeout of 10 seconds in libp2p, however in
1775	///   bizinikiwi this is set to 1 second.
1776	///
1777	/// - The first swarm introduces a delay of 2 seconds before responding to the request.
1778	///
1779	/// - The second swarm must enforce the 1 second timeout.
1780	#[tokio::test]
1781	async fn enforce_outbound_timeouts() {
1782		const REQUEST_TIMEOUT: Duration = Duration::from_secs(10);
1783		const REQUEST_TIMEOUT_SHORT: Duration = Duration::from_secs(1);
1784
1785		// These swarms only speaks protocol_name.
1786		let protocol_name = ProtocolName::from("/test/req-resp/1");
1787
1788		let protocol_config = ProtocolConfig {
1789			name: protocol_name.clone(),
1790			fallback_names: Vec::new(),
1791			max_request_size: 1024,
1792			max_response_size: 1024 * 1024,
1793			request_timeout: REQUEST_TIMEOUT, // <-- important for the test
1794			inbound_queue: None,
1795		};
1796
1797		// Build swarms whose behaviour is [`RequestResponsesBehaviour`].
1798		let (mut first_swarm, _) = {
1799			let (tx, mut rx) = async_channel::bounded::<IncomingRequest>(64);
1800
1801			tokio::spawn(async move {
1802				if let Some(rq) = rx.next().await {
1803					assert_eq!(rq.payload, b"this is a request");
1804
1805					// Sleep for more than `REQUEST_TIMEOUT_SHORT` and less than
1806					// `REQUEST_TIMEOUT`.
1807					tokio::time::sleep(REQUEST_TIMEOUT_SHORT * 2).await;
1808
1809					// By the time the response is sent back, the second swarm
1810					// received Timeout.
1811					let _ = rq.pending_response.send(super::OutgoingResponse {
1812						result: Ok(b"Second swarm already timedout".to_vec()),
1813						reputation_changes: Vec::new(),
1814						sent_feedback: None,
1815					});
1816				}
1817			});
1818
1819			let mut protocol_config = protocol_config.clone();
1820			protocol_config.inbound_queue = Some(tx);
1821
1822			build_swarm(iter::once(protocol_config))
1823		};
1824
1825		let (mut second_swarm, second_address) = {
1826			let (tx, mut rx) = async_channel::bounded::<IncomingRequest>(64);
1827
1828			tokio::spawn(async move {
1829				while let Some(rq) = rx.next().await {
1830					let _ = rq.pending_response.send(super::OutgoingResponse {
1831						result: Ok(b"This is the response".to_vec()),
1832						reputation_changes: Vec::new(),
1833						sent_feedback: None,
1834					});
1835				}
1836			});
1837			let mut protocol_config = protocol_config.clone();
1838			protocol_config.inbound_queue = Some(tx);
1839
1840			build_swarm(iter::once(protocol_config.clone()))
1841		};
1842		// Modify the second swarm to have a shorter timeout.
1843		second_swarm
1844			.behaviour_mut()
1845			.protocols
1846			.get_mut(&protocol_name)
1847			.unwrap()
1848			.request_timeout = REQUEST_TIMEOUT_SHORT;
1849
1850		// Ask first swarm to dial the second swarm.
1851		{
1852			Swarm::dial(&mut first_swarm, second_address).unwrap();
1853		}
1854
1855		// Running the first swarm in the background until a `InboundRequest` event happens,
1856		// which is a hint about the test having ended.
1857		tokio::spawn(async move {
1858			loop {
1859				let event = first_swarm.select_next_some().await;
1860				match event {
1861					SwarmEvent::Behaviour(Event::InboundRequest { result, .. }) => {
1862						assert!(result.is_ok());
1863						break;
1864					},
1865					SwarmEvent::ConnectionClosed { .. } => {
1866						break;
1867					},
1868					_ => {},
1869				}
1870			}
1871		});
1872
1873		// Run the second swarm.
1874		// - on connection established send the request to the first swarm
1875		// - expect to receive a timeout
1876		let mut response_receiver = None;
1877		loop {
1878			let event = second_swarm.select_next_some().await;
1879
1880			match event {
1881				SwarmEvent::ConnectionEstablished { peer_id, .. } => {
1882					let (sender, receiver) = oneshot::channel();
1883					second_swarm.behaviour_mut().send_request(
1884						&peer_id,
1885						protocol_name.clone(),
1886						b"this is a request".to_vec(),
1887						None,
1888						sender,
1889						IfDisconnected::ImmediateError,
1890					);
1891					assert!(response_receiver.is_none());
1892					response_receiver = Some(receiver);
1893				},
1894				SwarmEvent::ConnectionClosed { .. } => {
1895					break;
1896				},
1897				SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
1898					assert!(result.is_err());
1899					break;
1900				},
1901				_ => {},
1902			}
1903		}
1904
1905		// Expect the timeout.
1906		match response_receiver.unwrap().await.unwrap().unwrap_err() {
1907			RequestFailure::Network(OutboundFailure::Timeout) => {},
1908			request_failure => panic!("Unexpected failure: {request_failure:?}"),
1909		}
1910	}
1911}