1use crate::{
26 peer_store::{PeerStoreProvider, BANNED_THRESHOLD},
27 service::traits::RequestResponseConfig as RequestResponseConfigT,
28 types::ProtocolName,
29 ReputationChange,
30};
31
32use futures::{channel::oneshot, prelude::*};
33use libp2p::{
34 core::{transport::PortUse, Endpoint, Multiaddr},
35 request_response::{self, Behaviour, Codec, Message, ProtocolSupport, ResponseChannel},
36 swarm::{
37 behaviour::FromSwarm, handler::multi::MultiHandler, ConnectionDenied, ConnectionId,
38 NetworkBehaviour, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm,
39 },
40 PeerId,
41};
42
43use std::{
44 collections::{hash_map::Entry, HashMap},
45 io, iter,
46 ops::Deref,
47 pin::Pin,
48 sync::Arc,
49 task::{Context, Poll},
50 time::{Duration, Instant},
51};
52
53pub use libp2p::request_response::{Config, InboundRequestId, OutboundRequestId};
54
/// Logging target used by every log statement in this file.
const LOG_TARGET: &str = "sub-libp2p::request-response";

/// Period of the sweep that compares pending outbound requests against the request timeout of
/// their protocol.
const PERIODIC_REQUEST_CHECK: Duration = Duration::from_secs(2);
60
/// Failures that can occur while sending an outbound request and waiting for its response.
///
/// Mirrors [`request_response::OutboundFailure`]; the IO error is wrapped in an [`Arc`], which
/// lets this type derive `Clone`.
#[derive(Debug, Clone, thiserror::Error)]
pub enum OutboundFailure {
    /// The peer could not be dialed.
    #[error("Failed to dial the requested peer")]
    DialFailure,
    /// No response arrived in time.
    #[error("Timeout while waiting for a response")]
    Timeout,
    /// The connection went away before a response was received.
    #[error("Connection was closed before a response was received")]
    ConnectionClosed,
    /// The remote does not support any of the protocols the request was sent on.
    #[error("The remote supports none of the requested protocols")]
    UnsupportedProtocols,
    /// An IO error occurred on the outbound stream.
    #[error("An IO failure happened on an outbound stream")]
    Io(Arc<io::Error>),
}
81
82impl From<request_response::OutboundFailure> for OutboundFailure {
83 fn from(out: request_response::OutboundFailure) -> Self {
84 match out {
85 request_response::OutboundFailure::DialFailure => OutboundFailure::DialFailure,
86 request_response::OutboundFailure::Timeout => OutboundFailure::Timeout,
87 request_response::OutboundFailure::ConnectionClosed => {
88 OutboundFailure::ConnectionClosed
89 },
90 request_response::OutboundFailure::UnsupportedProtocols => {
91 OutboundFailure::UnsupportedProtocols
92 },
93 request_response::OutboundFailure::Io(error) => OutboundFailure::Io(Arc::new(error)),
94 }
95 }
96}
97
/// Failures that can occur while receiving an inbound request and sending its response.
///
/// Mirrors [`request_response::InboundFailure`]; the IO error is wrapped in an [`Arc`].
#[derive(Debug, thiserror::Error)]
pub enum InboundFailure {
    /// Receiving the request or sending the response took too long.
    #[error("Timeout while receiving request or sending response")]
    Timeout,
    /// The connection went away before the response could be sent.
    #[error("Connection was closed before a response could be sent")]
    ConnectionClosed,
    /// The local node does not support any of the protocols requested by the remote.
    #[error("The local peer supports none of the protocols requested by the remote")]
    UnsupportedProtocols,
    /// The response channel was dropped without a response having been sent.
    #[error("The response channel was dropped without sending a response to the remote")]
    ResponseOmission,
    /// An IO error occurred on the inbound stream.
    #[error("An IO failure happened on an inbound stream")]
    Io(Arc<io::Error>),
}
119
120impl From<request_response::InboundFailure> for InboundFailure {
121 fn from(out: request_response::InboundFailure) -> Self {
122 match out {
123 request_response::InboundFailure::ResponseOmission => InboundFailure::ResponseOmission,
124 request_response::InboundFailure::Timeout => InboundFailure::Timeout,
125 request_response::InboundFailure::ConnectionClosed => InboundFailure::ConnectionClosed,
126 request_response::InboundFailure::UnsupportedProtocols => {
127 InboundFailure::UnsupportedProtocols
128 },
129 request_response::InboundFailure::Io(error) => InboundFailure::Io(Arc::new(error)),
130 }
131 }
132}
133
/// Error delivered to the sender of an outbound request through its response channel.
#[derive(Debug, thiserror::Error)]
#[allow(missing_docs)]
pub enum RequestFailure {
    /// The target is not connected and the caller asked not to dial it.
    #[error("We are not currently connected to the requested peer.")]
    NotConnected,
    /// No protocol with the requested name is registered in the behaviour.
    #[error("Given protocol hasn't been registered.")]
    UnknownProtocol,
    /// The remote closed the substream without writing a response.
    #[error("Remote has closed the substream before answering, thereby signaling that it considers the request as valid, but refused to answer it.")]
    Refused,
    /// A response arrived but the receiving end of the response channel was already dropped.
    #[error("The remote replied, but the local node is no longer interested in the response.")]
    Obsolete,
    /// The underlying request-response behaviour reported a failure.
    #[error("Problem on the network: {0}")]
    Network(OutboundFailure),
}
149
/// Configuration for a single request-response protocol.
#[derive(Debug, Clone)]
pub struct ProtocolConfig {
    /// Name of the protocol; also the key under which it is registered in the behaviour.
    pub name: ProtocolName,

    /// Additional names registered with the same underlying behaviour, after `name`.
    pub fallback_names: Vec<ProtocolName>,

    /// Maximum size, in bytes, of a request accepted when reading it from the wire. Larger
    /// requests make the read fail.
    pub max_request_size: u64,

    /// Maximum size, in bytes, of a response accepted when reading it from the wire. Larger
    /// responses make the read fail.
    pub max_response_size: u64,

    /// Timeout handed to the underlying behaviour, and also used by the periodic check that
    /// force-fails outbound requests which have been pending for longer.
    pub request_timeout: Duration,

    /// Channel on which inbound requests are delivered to the local handler.
    ///
    /// With `None` the protocol is registered as outbound-only. Requests are pushed with
    /// `try_send`, so a full queue causes the request to be dropped rather than waited on.
    pub inbound_queue: Option<async_channel::Sender<IncomingRequest>>,
}
198
impl RequestResponseConfigT for ProtocolConfig {
    /// Returns the primary name of the protocol.
    fn protocol_name(&self) -> &ProtocolName {
        &self.name
    }
}
204
/// A request received from a remote peer, as handed to the local request handler.
#[derive(Debug)]
pub struct IncomingRequest {
    /// Peer which sent the request.
    pub peer: crate::types::PeerId,

    /// Raw request bytes as read from the wire.
    pub payload: Vec<u8>,

    /// Channel on which the handler sends its [`OutgoingResponse`].
    ///
    /// Dropping the sender without sending means that no response is written for the request.
    pub pending_response: oneshot::Sender<OutgoingResponse>,
}
225
/// Response produced by the local handler for an [`IncomingRequest`].
#[derive(Debug)]
pub struct OutgoingResponse {
    /// `Ok` with the payload to send back. With `Err(())` no payload is sent and the response
    /// channel of the request is dropped.
    pub result: Result<Vec<u8>, ()>,

    /// Reputation changes for the requesting peer. When non-empty they are reported through
    /// [`Event::ReputationChanges`], independently of `result`.
    pub reputation_changes: Vec<ReputationChange>,

    /// If set, `()` is sent on this channel once the underlying behaviour reports the response
    /// as sent. The sender is dropped without a message if the response is not sent.
    pub sent_feedback: Option<oneshot::Sender<()>>,
}
248
/// Bookkeeping for an outbound request that has not completed yet.
struct PendingRequest {
    /// Moment at which the request was handed to the underlying behaviour.
    started_at: Instant,
    /// Channel on which the outcome is reported to the caller; taken once the outcome is sent.
    response_tx: Option<oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>>,
    /// Request payload and protocol to retry with if the remote reports that it does not
    /// support the original protocol.
    fallback_request: Option<(Vec<u8>, ProtocolName)>,
}
261
/// What to do when a request is addressed to a peer we are not connected to.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub enum IfDisconnected {
    /// Hand the request to the behaviour anyway, so that a connection attempt can be made.
    TryConnect,
    /// Fail the request right away.
    ImmediateError,
}

impl IfDisconnected {
    /// Returns `true` if a request to a disconnected peer should still be attempted.
    pub fn should_connect(self) -> bool {
        matches!(self, Self::TryConnect)
    }
}
281
/// Event generated by the [`RequestResponsesBehaviour`].
#[derive(Debug)]
pub enum Event {
    /// Handling of an inbound request has finished, successfully or not.
    InboundRequest {
        /// Peer which sent the request.
        peer: PeerId,
        /// Protocol on which the request arrived.
        protocol: ProtocolName,
        /// On success, the time elapsed between the arrival of the request and the response
        /// being reported as sent; otherwise the failure reported by the underlying behaviour.
        result: Result<Duration, ResponseFailure>,
    },

    /// An outbound request has finished: either a response arrived or the underlying behaviour
    /// reported a failure.
    RequestFinished {
        /// Peer the request was sent to.
        peer: PeerId,
        /// Protocol on which the request was sent.
        protocol: ProtocolName,
        /// Time elapsed since the request was started.
        duration: Duration,
        /// `Ok(())` if the response was delivered to the caller's channel, otherwise the reason
        /// why it was not.
        result: Result<(), RequestFailure>,
    },

    /// The local handler of an inbound request asked for reputation changes.
    ReputationChanges {
        /// Peer whose reputation should change.
        peer: PeerId,
        /// Changes to apply.
        changes: Vec<ReputationChange>,
    },
}
323
/// Map key combining a protocol name with a request id.
///
/// Every protocol is served by its own underlying behaviour, each of which hands out request
/// ids on its own; presumably ids from different behaviours can collide, which is why the
/// protocol name is part of the key (TODO confirm against the libp2p id allocation).
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct ProtocolRequestId<RequestId> {
    /// Protocol the request belongs to.
    protocol: ProtocolName,
    /// Id assigned to the request by the behaviour of that protocol.
    request_id: RequestId,
}
335
/// Builds the key from a `(protocol, request_id)` pair, so that call sites can write
/// `(protocol, request_id).into()`.
impl<RequestId> From<(ProtocolName, RequestId)> for ProtocolRequestId<RequestId> {
    fn from((protocol, request_id): (ProtocolName, RequestId)) -> Self {
        Self { protocol, request_id }
    }
}
341
/// Everything the behaviour keeps for one registered protocol.
struct ProtocolDetails {
    /// The libp2p request-response behaviour serving this protocol.
    behaviour: Behaviour<GenericCodec>,
    /// Queue towards the local handler of inbound requests; `None` for outbound-only protocols.
    inbound_queue: Option<async_channel::Sender<IncomingRequest>>,
    /// Timeout after which the periodic check force-fails outbound requests on this protocol.
    request_timeout: Duration,
}
348
/// Network behaviour multiplexing several request-response protocols, each served by its own
/// libp2p [`Behaviour`].
pub struct RequestResponsesBehaviour {
    /// Registered protocols, keyed by their primary name.
    protocols: HashMap<ProtocolName, ProtocolDetails>,

    /// Outbound requests which have been sent and are awaiting a response or a failure.
    pending_requests: HashMap<ProtocolRequestId<OutboundRequestId>, PendingRequest>,

    /// One future per inbound request being processed by a local handler. A future resolves to
    /// `None` when the handler dropped its response sender without answering.
    pending_responses: stream::FuturesUnordered<
        Pin<Box<dyn Future<Output = Option<RequestProcessingOutcome>> + Send>>,
    >,

    /// Arrival time of each inbound request, used to report how long answering took.
    pending_responses_arrival_time: HashMap<ProtocolRequestId<InboundRequestId>, Instant>,

    /// Feedback channels to notify once the corresponding response is reported as sent.
    send_feedback: HashMap<ProtocolRequestId<InboundRequestId>, oneshot::Sender<()>>,

    /// Source of peer reputations, consulted before an inbound request is accepted.
    peer_store: Arc<dyn PeerStoreProvider>,

    /// Timer driving the periodic timeout check of `pending_requests`.
    periodic_request_check: tokio::time::Interval,
}
384
/// Result of a local handler having processed an inbound request, bundled with what is needed to
/// send the response back.
struct RequestProcessingOutcome {
    /// Peer which sent the request.
    peer: PeerId,
    /// Id of the inbound request.
    request_id: InboundRequestId,
    /// Protocol on which the request arrived.
    protocol: ProtocolName,
    /// libp2p channel on which the response has to be sent.
    inner_channel: ResponseChannel<Result<Vec<u8>, ()>>,
    /// What the handler produced.
    response: OutgoingResponse,
}
393
394impl RequestResponsesBehaviour {
395 pub fn new(
398 list: impl Iterator<Item = ProtocolConfig>,
399 peer_store: Arc<dyn PeerStoreProvider>,
400 ) -> Result<Self, RegisterError> {
401 let mut protocols = HashMap::new();
402 for protocol in list {
403 let cfg = Config::default().with_request_timeout(protocol.request_timeout);
404
405 let protocol_support = if protocol.inbound_queue.is_some() {
406 ProtocolSupport::Full
407 } else {
408 ProtocolSupport::Outbound
409 };
410
411 let behaviour = Behaviour::with_codec(
412 GenericCodec {
413 max_request_size: protocol.max_request_size,
414 max_response_size: protocol.max_response_size,
415 },
416 iter::once(protocol.name.clone())
417 .chain(protocol.fallback_names)
418 .zip(iter::repeat(protocol_support)),
419 cfg,
420 );
421
422 match protocols.entry(protocol.name) {
423 Entry::Vacant(e) => e.insert(ProtocolDetails {
424 behaviour,
425 inbound_queue: protocol.inbound_queue,
426 request_timeout: protocol.request_timeout,
427 }),
428 Entry::Occupied(e) => {
429 return Err(RegisterError::DuplicateProtocol(e.key().clone()))
430 },
431 };
432 }
433
434 Ok(Self {
435 protocols,
436 pending_requests: Default::default(),
437 pending_responses: Default::default(),
438 pending_responses_arrival_time: Default::default(),
439 send_feedback: Default::default(),
440 peer_store,
441 periodic_request_check: tokio::time::interval(PERIODIC_REQUEST_CHECK),
442 })
443 }
444
445 pub fn send_request(
452 &mut self,
453 target: &PeerId,
454 protocol_name: ProtocolName,
455 request: Vec<u8>,
456 fallback_request: Option<(Vec<u8>, ProtocolName)>,
457 pending_response: oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>,
458 connect: IfDisconnected,
459 ) {
460 log::trace!(target: LOG_TARGET, "send request to {target} ({protocol_name:?}), {} bytes", request.len());
461
462 if let Some(ProtocolDetails { behaviour, .. }) =
463 self.protocols.get_mut(protocol_name.deref())
464 {
465 Self::send_request_inner(
466 behaviour,
467 &mut self.pending_requests,
468 target,
469 protocol_name,
470 request,
471 fallback_request,
472 pending_response,
473 connect,
474 )
475 } else if pending_response.send(Err(RequestFailure::UnknownProtocol)).is_err() {
476 log::debug!(
477 target: LOG_TARGET,
478 "Unknown protocol {:?}. At the same time local \
479 node is no longer interested in the result.",
480 protocol_name,
481 );
482 }
483 }
484
485 fn send_request_inner(
486 behaviour: &mut Behaviour<GenericCodec>,
487 pending_requests: &mut HashMap<ProtocolRequestId<OutboundRequestId>, PendingRequest>,
488 target: &PeerId,
489 protocol_name: ProtocolName,
490 request: Vec<u8>,
491 fallback_request: Option<(Vec<u8>, ProtocolName)>,
492 pending_response: oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>,
493 connect: IfDisconnected,
494 ) {
495 if behaviour.is_connected(target) || connect.should_connect() {
496 let request_id = behaviour.send_request(target, request);
497 let prev_req_id = pending_requests.insert(
498 (protocol_name.to_string().into(), request_id).into(),
499 PendingRequest {
500 started_at: Instant::now(),
501 response_tx: Some(pending_response),
502 fallback_request,
503 },
504 );
505 debug_assert!(prev_req_id.is_none(), "Expect request id to be unique.");
506 } else if pending_response.send(Err(RequestFailure::NotConnected)).is_err() {
507 log::debug!(
508 target: LOG_TARGET,
509 "Not connected to peer {:?}. At the same time local \
510 node is no longer interested in the result.",
511 target,
512 );
513 }
514 }
515}
516
517impl NetworkBehaviour for RequestResponsesBehaviour {
518 type ConnectionHandler =
519 MultiHandler<String, <Behaviour<GenericCodec> as NetworkBehaviour>::ConnectionHandler>;
520 type ToSwarm = Event;
521
/// Accepts every pending inbound connection; this behaviour never denies one.
fn handle_pending_inbound_connection(
    &mut self,
    _connection_id: ConnectionId,
    _local_addr: &Multiaddr,
    _remote_addr: &Multiaddr,
) -> Result<(), ConnectionDenied> {
    Ok(())
}
530
/// Accepts every pending outbound connection and contributes no additional addresses.
fn handle_pending_outbound_connection(
    &mut self,
    _connection_id: ConnectionId,
    _maybe_peer: Option<PeerId>,
    _addresses: &[Multiaddr],
    _effective_role: Endpoint,
) -> Result<Vec<Multiaddr>, ConnectionDenied> {
    Ok(Vec::new())
}
540
541 fn handle_established_inbound_connection(
542 &mut self,
543 connection_id: ConnectionId,
544 peer: PeerId,
545 local_addr: &Multiaddr,
546 remote_addr: &Multiaddr,
547 ) -> Result<THandler<Self>, ConnectionDenied> {
548 let iter =
549 self.protocols.iter_mut().filter_map(|(p, ProtocolDetails { behaviour, .. })| {
550 if let Ok(handler) = behaviour.handle_established_inbound_connection(
551 connection_id,
552 peer,
553 local_addr,
554 remote_addr,
555 ) {
556 Some((p.to_string(), handler))
557 } else {
558 None
559 }
560 });
561
562 Ok(MultiHandler::try_from_iter(iter).expect(
563 "Protocols are in a HashMap and there can be at most one handler per protocol name, \
564 which is the only possible error; qed",
565 ))
566 }
567
568 fn handle_established_outbound_connection(
569 &mut self,
570 connection_id: ConnectionId,
571 peer: PeerId,
572 addr: &Multiaddr,
573 role_override: Endpoint,
574 port_use: PortUse,
575 ) -> Result<THandler<Self>, ConnectionDenied> {
576 let iter =
577 self.protocols.iter_mut().filter_map(|(p, ProtocolDetails { behaviour, .. })| {
578 if let Ok(handler) = behaviour.handle_established_outbound_connection(
579 connection_id,
580 peer,
581 addr,
582 role_override,
583 port_use,
584 ) {
585 Some((p.to_string(), handler))
586 } else {
587 None
588 }
589 });
590
591 Ok(MultiHandler::try_from_iter(iter).expect(
592 "Protocols are in a HashMap and there can be at most one handler per protocol name, \
593 which is the only possible error; qed",
594 ))
595 }
596
597 fn on_swarm_event(&mut self, event: FromSwarm) {
598 for ProtocolDetails { behaviour, .. } in self.protocols.values_mut() {
599 behaviour.on_swarm_event(event);
600 }
601 }
602
603 fn on_connection_handler_event(
604 &mut self,
605 peer_id: PeerId,
606 connection_id: ConnectionId,
607 event: THandlerOutEvent<Self>,
608 ) {
609 let p_name = event.0;
610 if let Some(ProtocolDetails { behaviour, .. }) = self.protocols.get_mut(p_name.as_str()) {
611 return behaviour.on_connection_handler_event(peer_id, connection_id, event.1);
612 } else {
613 log::warn!(
614 target: LOG_TARGET,
615 "on_connection_handler_event: no request-response instance registered for protocol {:?}",
616 p_name
617 );
618 }
619 }
620
621 fn poll(&mut self, cx: &mut Context) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
622 'poll_all: loop {
623 if self.periodic_request_check.poll_tick(cx).is_ready() {
625 self.pending_requests.retain(|id, req| {
626 let Some(ProtocolDetails { request_timeout, .. }) =
627 self.protocols.get(&id.protocol)
628 else {
629 log::warn!(
630 target: LOG_TARGET,
631 "Request {id:?} has no protocol registered.",
632 );
633
634 if let Some(response_tx) = req.response_tx.take() {
635 if response_tx.send(Err(RequestFailure::UnknownProtocol)).is_err() {
636 log::debug!(
637 target: LOG_TARGET,
638 "Request {id:?} has no protocol registered. At the same time local node is no longer interested in the result.",
639 );
640 }
641 }
642 return false
643 };
644
645 let elapsed = req.started_at.elapsed();
646 if elapsed > *request_timeout {
647 log::debug!(
648 target: LOG_TARGET,
649 "Request {id:?} force detected as timeout.",
650 );
651
652 if let Some(response_tx) = req.response_tx.take() {
653 if response_tx.send(Err(RequestFailure::Network(OutboundFailure::Timeout))).is_err() {
654 log::debug!(
655 target: LOG_TARGET,
656 "Request {id:?} force detected as timeout. At the same time local node is no longer interested in the result.",
657 );
658 }
659 }
660
661 false
662 } else {
663 true
664 }
665 });
666 }
667
668 while let Poll::Ready(Some(outcome)) = self.pending_responses.poll_next_unpin(cx) {
670 let RequestProcessingOutcome {
671 peer,
672 request_id,
673 protocol: protocol_name,
674 inner_channel,
675 response: OutgoingResponse { result, reputation_changes, sent_feedback },
676 } = match outcome {
677 Some(outcome) => outcome,
678 None => continue,
681 };
682
683 if let Ok(payload) = result {
684 if let Some(ProtocolDetails { behaviour, .. }) =
685 self.protocols.get_mut(&*protocol_name)
686 {
687 log::trace!(target: LOG_TARGET, "send response to {peer} ({protocol_name:?}), {} bytes", payload.len());
688
689 if behaviour.send_response(inner_channel, Ok(payload)).is_err() {
690 log::debug!(
693 target: LOG_TARGET,
694 "Failed to send response for {:?} on protocol {:?} due to a \
695 timeout or due to the connection to the peer being closed. \
696 Dropping response",
697 request_id, protocol_name,
698 );
699 } else if let Some(sent_feedback) = sent_feedback {
700 self.send_feedback
701 .insert((protocol_name, request_id).into(), sent_feedback);
702 }
703 }
704 }
705
706 if !reputation_changes.is_empty() {
707 return Poll::Ready(ToSwarm::GenerateEvent(Event::ReputationChanges {
708 peer,
709 changes: reputation_changes,
710 }));
711 }
712 }
713
714 let mut fallback_requests = vec![];
715
716 for (protocol, ProtocolDetails { behaviour, inbound_queue, .. }) in &mut self.protocols
718 {
719 'poll_protocol: while let Poll::Ready(ev) = behaviour.poll(cx) {
720 let ev = match ev {
721 ToSwarm::GenerateEvent(ev) => ev,
723
724 ToSwarm::Dial { opts } => {
727 if opts.get_peer_id().is_none() {
728 log::error!(
729 target: LOG_TARGET,
730 "The request-response isn't supposed to start dialing addresses"
731 );
732 }
733 return Poll::Ready(ToSwarm::Dial { opts });
734 },
735 event => {
736 return Poll::Ready(
737 event.map_in(|event| ((*protocol).to_string(), event)).map_out(
738 |_| {
739 unreachable!(
740 "`GenerateEvent` is handled in a branch above; qed"
741 )
742 },
743 ),
744 );
745 },
746 };
747
748 match ev {
749 request_response::Event::Message {
751 peer,
752 message: Message::Request { request_id, request, channel, .. },
753 } => {
754 self.pending_responses_arrival_time
755 .insert((protocol.clone(), request_id).into(), Instant::now());
756
757 let reputation = self.peer_store.peer_reputation(&peer.into());
758
759 if reputation < BANNED_THRESHOLD {
760 log::debug!(
761 target: LOG_TARGET,
762 "Cannot handle requests from a node with a low reputation {}: {}",
763 peer,
764 reputation,
765 );
766 continue 'poll_protocol;
767 }
768
769 let (tx, rx) = oneshot::channel();
770
771 if let Some(resp_builder) = inbound_queue {
774 let _ = resp_builder.try_send(IncomingRequest {
781 peer: peer.into(),
782 payload: request,
783 pending_response: tx,
784 });
785 } else {
786 debug_assert!(false, "Received message on outbound-only protocol.");
787 }
788
789 let protocol = protocol.clone();
790
791 self.pending_responses.push(Box::pin(async move {
792 rx.await.map_or(None, |response| {
796 Some(RequestProcessingOutcome {
797 peer,
798 request_id,
799 protocol,
800 inner_channel: channel,
801 response,
802 })
803 })
804 }));
805
806 continue 'poll_all;
809 },
810
811 request_response::Event::Message {
813 peer,
814 message: Message::Response { request_id, response },
815 ..
816 } => {
817 let (started, delivered) = match self
818 .pending_requests
819 .remove(&(protocol.clone(), request_id).into())
820 {
821 Some(PendingRequest {
822 started_at,
823 response_tx: Some(response_tx),
824 ..
825 }) => {
826 log::trace!(
827 target: LOG_TARGET,
828 "received response from {peer} ({protocol:?}), {} bytes",
829 response.as_ref().map_or(0usize, |response| response.len()),
830 );
831
832 let delivered = response_tx
833 .send(
834 response
835 .map_err(|()| RequestFailure::Refused)
836 .map(|resp| (resp, protocol.clone())),
837 )
838 .map_err(|_| RequestFailure::Obsolete);
839 (started_at, delivered)
840 },
841 _ => {
842 log::debug!(
843 target: LOG_TARGET,
844 "Received `RequestResponseEvent::Message` with unexpected request id {:?} from {:?}",
845 request_id,
846 peer,
847 );
848 continue;
849 },
850 };
851
852 let out = Event::RequestFinished {
853 peer,
854 protocol: protocol.clone(),
855 duration: started.elapsed(),
856 result: delivered,
857 };
858
859 return Poll::Ready(ToSwarm::GenerateEvent(out));
860 },
861
862 request_response::Event::OutboundFailure {
864 peer,
865 request_id,
866 error,
867 ..
868 } => {
869 let error = OutboundFailure::from(error);
870 let started = match self
871 .pending_requests
872 .remove(&(protocol.clone(), request_id).into())
873 {
874 Some(PendingRequest {
875 started_at,
876 response_tx: Some(response_tx),
877 fallback_request,
878 }) => {
879 if matches!(error, OutboundFailure::UnsupportedProtocols) {
882 if let Some((fallback_request, fallback_protocol)) =
883 fallback_request
884 {
885 log::trace!(
886 target: LOG_TARGET,
887 "Request with id {:?} failed. Trying the fallback protocol. {}",
888 request_id,
889 fallback_protocol.deref()
890 );
891 fallback_requests.push((
892 peer,
893 fallback_protocol,
894 fallback_request,
895 response_tx,
896 ));
897 continue;
898 }
899 }
900
901 if response_tx
902 .send(Err(RequestFailure::Network(error.clone())))
903 .is_err()
904 {
905 log::debug!(
906 target: LOG_TARGET,
907 "Request with id {:?} failed. At the same time local \
908 node is no longer interested in the result.",
909 request_id,
910 );
911 }
912 started_at
913 },
914 _ => {
915 log::debug!(
916 target: LOG_TARGET,
917 "Received `RequestResponseEvent::OutboundFailure` with unexpected request id {:?} error {:?} from {:?}",
918 request_id,
919 error,
920 peer
921 );
922 continue;
923 },
924 };
925
926 let out = Event::RequestFinished {
927 peer,
928 protocol: protocol.clone(),
929 duration: started.elapsed(),
930 result: Err(RequestFailure::Network(error)),
931 };
932
933 return Poll::Ready(ToSwarm::GenerateEvent(out));
934 },
935
936 request_response::Event::InboundFailure {
939 request_id, peer, error, ..
940 } => {
941 self.pending_responses_arrival_time
942 .remove(&(protocol.clone(), request_id).into());
943 self.send_feedback.remove(&(protocol.clone(), request_id).into());
944 let out = Event::InboundRequest {
945 peer,
946 protocol: protocol.clone(),
947 result: Err(ResponseFailure::Network(error.into())),
948 };
949 return Poll::Ready(ToSwarm::GenerateEvent(out));
950 },
951
952 request_response::Event::ResponseSent { request_id, peer } => {
954 let arrival_time = self
955 .pending_responses_arrival_time
956 .remove(&(protocol.clone(), request_id).into())
957 .map(|t| t.elapsed())
958 .expect(
959 "Time is added for each inbound request on arrival and only \
960 removed on success (`ResponseSent`) or failure \
961 (`InboundFailure`). One can not receive a success event for a \
962 request that either never arrived, or that has previously \
963 failed; qed.",
964 );
965
966 if let Some(send_feedback) =
967 self.send_feedback.remove(&(protocol.clone(), request_id).into())
968 {
969 let _ = send_feedback.send(());
970 }
971
972 let out = Event::InboundRequest {
973 peer,
974 protocol: protocol.clone(),
975 result: Ok(arrival_time),
976 };
977
978 return Poll::Ready(ToSwarm::GenerateEvent(out));
979 },
980 };
981 }
982 }
983
984 for (peer, protocol, request, pending_response) in fallback_requests.drain(..) {
986 if let Some(ProtocolDetails { behaviour, .. }) = self.protocols.get_mut(&protocol) {
987 Self::send_request_inner(
988 behaviour,
989 &mut self.pending_requests,
990 &peer,
991 protocol,
992 request,
993 None,
994 pending_response,
995 IfDisconnected::ImmediateError,
999 );
1000 }
1001 }
1002
1003 break Poll::Pending;
1004 }
1005 }
1006}
1007
/// Error returned when the protocols passed to [`RequestResponsesBehaviour::new`] cannot be
/// registered.
#[derive(Debug, thiserror::Error)]
pub enum RegisterError {
    /// Two protocol configurations use the same name; carries that name.
    #[error("{0}")]
    DuplicateProtocol(ProtocolName),
}
1015
/// Error reported in [`Event::InboundRequest`] when answering an inbound request failed.
#[derive(Debug, thiserror::Error)]
pub enum ResponseFailure {
    /// The underlying request-response behaviour reported a failure.
    #[error("Problem on the network: {0}")]
    Network(InboundFailure),
}
1023
/// Codec used by every protocol of this behaviour: requests and responses are opaque byte
/// payloads prefixed with their length as an unsigned varint.
///
/// Hidden from the documentation although public.
/// NOTE(review): presumably public only because it appears in the associated types of the
/// `NetworkBehaviour` implementation — confirm.
#[derive(Debug, Clone)]
#[doc(hidden)] pub struct GenericCodec {
    /// Largest request, in bytes, accepted by `read_request`.
    max_request_size: u64,
    /// Largest response, in bytes, accepted by `read_response`.
    max_response_size: u64,
}
1032
1033#[async_trait::async_trait]
1034impl Codec for GenericCodec {
1035 type Protocol = ProtocolName;
1036 type Request = Vec<u8>;
1037 type Response = Result<Vec<u8>, ()>;
1038
1039 async fn read_request<T>(
1040 &mut self,
1041 _: &Self::Protocol,
1042 mut io: &mut T,
1043 ) -> io::Result<Self::Request>
1044 where
1045 T: AsyncRead + Unpin + Send,
1046 {
1047 let length = unsigned_varint::aio::read_usize(&mut io)
1049 .await
1050 .map_err(|err| io::Error::new(io::ErrorKind::InvalidInput, err))?;
1051 if length > usize::try_from(self.max_request_size).unwrap_or(usize::MAX) {
1052 return Err(io::Error::new(
1053 io::ErrorKind::InvalidInput,
1054 format!("Request size exceeds limit: {} > {}", length, self.max_request_size),
1055 ));
1056 }
1057
1058 let mut buffer = vec![0; length];
1060 io.read_exact(&mut buffer).await?;
1061 Ok(buffer)
1062 }
1063
1064 async fn read_response<T>(
1065 &mut self,
1066 _: &Self::Protocol,
1067 mut io: &mut T,
1068 ) -> io::Result<Self::Response>
1069 where
1070 T: AsyncRead + Unpin + Send,
1071 {
1072 let length = match unsigned_varint::aio::read_usize(&mut io).await {
1079 Ok(l) => l,
1080 Err(unsigned_varint::io::ReadError::Io(err))
1081 if matches!(err.kind(), io::ErrorKind::UnexpectedEof) =>
1082 {
1083 return Ok(Err(()))
1084 },
1085 Err(err) => return Err(io::Error::new(io::ErrorKind::InvalidInput, err)),
1086 };
1087
1088 if length > usize::try_from(self.max_response_size).unwrap_or(usize::MAX) {
1089 return Err(io::Error::new(
1090 io::ErrorKind::InvalidInput,
1091 format!("Response size exceeds limit: {} > {}", length, self.max_response_size),
1092 ));
1093 }
1094
1095 let mut buffer = vec![0; length];
1097 io.read_exact(&mut buffer).await?;
1098 Ok(Ok(buffer))
1099 }
1100
/// Writes `req` to `io` as an unsigned-varint length followed by the payload, then closes
/// the stream.
///
/// NOTE(review): the length of `req` is not checked against `max_request_size` here; only
/// the reading side enforces the limit.
async fn write_request<T>(
    &mut self,
    _: &Self::Protocol,
    io: &mut T,
    req: Self::Request,
) -> io::Result<()>
where
    T: AsyncWrite + Unpin + Send,
{
    {
        // Length prefix; the encoding buffer is scoped to this block.
        let mut buffer = unsigned_varint::encode::usize_buffer();
        io.write_all(unsigned_varint::encode::usize(req.len(), &mut buffer)).await?;
    }

    // Payload.
    io.write_all(&req).await?;

    io.close().await?;
    Ok(())
}
1123
/// Writes a response to `io`, then closes the stream.
///
/// For `Ok(payload)` an unsigned-varint length followed by the payload is written. For
/// `Err(())` nothing is written before closing, which `read_response` on the other side
/// turns into `Ok(Err(()))`.
async fn write_response<T>(
    &mut self,
    _: &Self::Protocol,
    io: &mut T,
    res: Self::Response,
) -> io::Result<()>
where
    T: AsyncWrite + Unpin + Send,
{
    if let Ok(res) = res {
        {
            // Length prefix; the encoding buffer is scoped to this block.
            let mut buffer = unsigned_varint::encode::usize_buffer();
            io.write_all(unsigned_varint::encode::usize(res.len(), &mut buffer)).await?;
        }

        // Payload.
        io.write_all(&res).await?;
    }

    io.close().await?;
    Ok(())
}
1149}
1150
1151#[cfg(test)]
1152mod tests {
1153 use super::*;
1154
1155 use crate::mock::MockPeerStore;
1156 use assert_matches::assert_matches;
1157 use futures::channel::oneshot;
1158 use libp2p::{
1159 core::{
1160 transport::{MemoryTransport, Transport},
1161 upgrade,
1162 },
1163 identity::Keypair,
1164 noise,
1165 swarm::{Config as SwarmConfig, Executor, Swarm, SwarmEvent},
1166 Multiaddr,
1167 };
1168 use std::{iter, time::Duration};
1169
/// Swarm executor that spawns the swarm's background tasks onto the Tokio runtime of the test.
struct TokioExecutor;
impl Executor for TokioExecutor {
    fn exec(&self, f: Pin<Box<dyn Future<Output = ()> + Send>>) {
        tokio::spawn(f);
    }
}
1176
/// Builds a swarm running a [`RequestResponsesBehaviour`] with the given protocols on top of an
/// in-memory transport, and makes it listen on a random memory address.
///
/// Returns the swarm together with the address it listens on.
fn build_swarm(
    list: impl Iterator<Item = ProtocolConfig>,
) -> (Swarm<RequestResponsesBehaviour>, Multiaddr) {
    let keypair = Keypair::generate_ed25519();

    // In-memory transport, secured with noise and multiplexed with yamux.
    let transport = MemoryTransport::new()
        .upgrade(upgrade::Version::V1)
        .authenticate(noise::Config::new(&keypair).unwrap())
        .multiplex(libp2p::yamux::Config::default())
        .boxed();

    let behaviour = RequestResponsesBehaviour::new(list, Arc::new(MockPeerStore {})).unwrap();

    let mut swarm = Swarm::new(
        transport,
        behaviour,
        keypair.public().to_peer_id(),
        SwarmConfig::with_executor(TokioExecutor {})
            .with_idle_connection_timeout(Duration::from_secs(10)),
    );

    // A random port keeps the swarms of one test from listening on the same address.
    let listen_addr: Multiaddr = format!("/memory/{}", rand::random::<u64>()).parse().unwrap();

    swarm.listen_on(listen_addr.clone()).unwrap();

    (swarm, listen_addr)
}
1206
/// Two swarms connect, one sends a request, the other answers, and the response arrives intact.
#[tokio::test]
async fn basic_request_response_works() {
    let protocol_name = ProtocolName::from("/test/req-resp/1");

    // Build two swarms, each with a background task answering the requests it receives.
    let mut swarms = (0..2)
        .map(|_| {
            let (tx, mut rx) = async_channel::bounded::<IncomingRequest>(64);

            tokio::spawn(async move {
                while let Some(rq) = rx.next().await {
                    let (fb_tx, fb_rx) = oneshot::channel();
                    assert_eq!(rq.payload, b"this is a request");
                    let _ = rq.pending_response.send(super::OutgoingResponse {
                        result: Ok(b"this is a response".to_vec()),
                        reputation_changes: Vec::new(),
                        sent_feedback: Some(fb_tx),
                    });
                    // Wait for the behaviour to confirm that the response has been sent.
                    fb_rx.await.unwrap();
                }
            });

            let protocol_config = ProtocolConfig {
                name: protocol_name.clone(),
                fallback_names: Vec::new(),
                max_request_size: 1024,
                max_response_size: 1024 * 1024,
                request_timeout: Duration::from_secs(30),
                inbound_queue: Some(tx),
            };

            build_swarm(iter::once(protocol_config))
        })
        .collect::<Vec<_>>();

    {
        // The first swarm dials the address the second one listens on.
        let dial_addr = swarms[1].1.clone();
        Swarm::dial(&mut swarms[0].0, dial_addr).unwrap();
    }

    // Drive the dialing swarm in the background; every inbound request must be answered
    // successfully.
    let (mut swarm, _) = swarms.remove(0);
    tokio::spawn(async move {
        loop {
            match swarm.select_next_some().await {
                SwarmEvent::Behaviour(Event::InboundRequest { result, .. }) => {
                    result.unwrap();
                },
                _ => {},
            }
        }
    });

    // Drive the remaining swarm here: send a request once connected, then wait for it to finish.
    let (mut swarm, _) = swarms.remove(0);
    let mut response_receiver = None;

    loop {
        match swarm.select_next_some().await {
            SwarmEvent::ConnectionEstablished { peer_id, .. } => {
                let (sender, receiver) = oneshot::channel();
                swarm.behaviour_mut().send_request(
                    &peer_id,
                    protocol_name.clone(),
                    b"this is a request".to_vec(),
                    None,
                    sender,
                    IfDisconnected::ImmediateError,
                );
                // Only one connection, hence only one request, is expected.
                assert!(response_receiver.is_none());
                response_receiver = Some(receiver);
            },
            SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
                result.unwrap();
                break;
            },
            _ => {},
        }
    }

    assert_eq!(
        response_receiver.unwrap().await.unwrap().unwrap(),
        (b"this is a response".to_vec(), protocol_name)
    );
}
1294
/// A response larger than `max_response_size` makes the request fail on the requesting side
/// with an IO error, while the answering side sees its response as sent.
#[tokio::test]
async fn max_response_size_exceeded() {
    let protocol_name = ProtocolName::from("/test/req-resp/1");

    // Build two swarms, each with a background task answering the requests it receives.
    let mut swarms = (0..2)
        .map(|_| {
            let (tx, mut rx) = async_channel::bounded::<IncomingRequest>(64);

            tokio::spawn(async move {
                while let Some(rq) = rx.next().await {
                    assert_eq!(rq.payload, b"this is a request");
                    let _ = rq.pending_response.send(super::OutgoingResponse {
                        result: Ok(b"this response exceeds the limit".to_vec()),
                        reputation_changes: Vec::new(),
                        sent_feedback: None,
                    });
                }
            });

            let protocol_config = ProtocolConfig {
                name: protocol_name.clone(),
                fallback_names: Vec::new(),
                max_request_size: 1024,
                // Smaller than the response sent by the task above.
                max_response_size: 8,
                request_timeout: Duration::from_secs(30),
                inbound_queue: Some(tx),
            };

            build_swarm(iter::once(protocol_config))
        })
        .collect::<Vec<_>>();

    {
        // The first swarm dials the address the second one listens on.
        let dial_addr = swarms[1].1.clone();
        Swarm::dial(&mut swarms[0].0, dial_addr).unwrap();
    }

    // Drive the dialing swarm in the background until the connection closes. From its point
    // of view the response is sent successfully.
    let (mut swarm, _) = swarms.remove(0);
    tokio::spawn(async move {
        loop {
            match swarm.select_next_some().await {
                SwarmEvent::Behaviour(Event::InboundRequest { result, .. }) => {
                    assert!(result.is_ok());
                },
                SwarmEvent::ConnectionClosed { .. } => {
                    break;
                },
                _ => {},
            }
        }
    });

    // Drive the remaining swarm here: send a request once connected and expect it to fail.
    let (mut swarm, _) = swarms.remove(0);

    let mut response_receiver = None;

    loop {
        match swarm.select_next_some().await {
            SwarmEvent::ConnectionEstablished { peer_id, .. } => {
                let (sender, receiver) = oneshot::channel();
                swarm.behaviour_mut().send_request(
                    &peer_id,
                    protocol_name.clone(),
                    b"this is a request".to_vec(),
                    None,
                    sender,
                    IfDisconnected::ImmediateError,
                );
                // Only one connection, hence only one request, is expected.
                assert!(response_receiver.is_none());
                response_receiver = Some(receiver);
            },
            SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
                assert!(result.is_err());
                break;
            },
            _ => {},
        }
    }

    // The oversized response is rejected while being read, which surfaces as an IO failure.
    match response_receiver.unwrap().await.unwrap().unwrap_err() {
        RequestFailure::Network(OutboundFailure::Io(_)) => {},
        request_failure => panic!("Unexpected failure: {request_failure:?}"),
    }
}
1385
1386 #[tokio::test]
1397 async fn request_id_collision() {
1398 let protocol_name_1 = ProtocolName::from("/test/req-resp-1/1");
1399 let protocol_name_2 = ProtocolName::from("/test/req-resp-2/1");
1400
1401 let mut swarm_1 = {
1402 let protocol_configs = vec![
1403 ProtocolConfig {
1404 name: protocol_name_1.clone(),
1405 fallback_names: Vec::new(),
1406 max_request_size: 1024,
1407 max_response_size: 1024 * 1024,
1408 request_timeout: Duration::from_secs(30),
1409 inbound_queue: None,
1410 },
1411 ProtocolConfig {
1412 name: protocol_name_2.clone(),
1413 fallback_names: Vec::new(),
1414 max_request_size: 1024,
1415 max_response_size: 1024 * 1024,
1416 request_timeout: Duration::from_secs(30),
1417 inbound_queue: None,
1418 },
1419 ];
1420
1421 build_swarm(protocol_configs.into_iter()).0
1422 };
1423
1424 let (mut swarm_2, mut swarm_2_handler_1, mut swarm_2_handler_2, listen_add_2) = {
1425 let (tx_1, rx_1) = async_channel::bounded(64);
1426 let (tx_2, rx_2) = async_channel::bounded(64);
1427
1428 let protocol_configs = vec![
1429 ProtocolConfig {
1430 name: protocol_name_1.clone(),
1431 fallback_names: Vec::new(),
1432 max_request_size: 1024,
1433 max_response_size: 1024 * 1024,
1434 request_timeout: Duration::from_secs(30),
1435 inbound_queue: Some(tx_1),
1436 },
1437 ProtocolConfig {
1438 name: protocol_name_2.clone(),
1439 fallback_names: Vec::new(),
1440 max_request_size: 1024,
1441 max_response_size: 1024 * 1024,
1442 request_timeout: Duration::from_secs(30),
1443 inbound_queue: Some(tx_2),
1444 },
1445 ];
1446
1447 let (swarm, listen_addr) = build_swarm(protocol_configs.into_iter());
1448
1449 (swarm, rx_1, rx_2, listen_addr)
1450 };
1451
1452 swarm_1.dial(listen_add_2).unwrap();
1455
1456 tokio::spawn(async move {
1458 loop {
1459 match swarm_2.select_next_some().await {
1460 SwarmEvent::Behaviour(Event::InboundRequest { result, .. }) => {
1461 result.unwrap();
1462 },
1463 _ => {},
1464 }
1465 }
1466 });
1467
1468 tokio::spawn(async move {
1473 let protocol_1_request = swarm_2_handler_1.next().await;
1474 let protocol_2_request = swarm_2_handler_2.next().await;
1475
1476 protocol_1_request
1477 .unwrap()
1478 .pending_response
1479 .send(OutgoingResponse {
1480 result: Ok(b"this is a response".to_vec()),
1481 reputation_changes: Vec::new(),
1482 sent_feedback: None,
1483 })
1484 .unwrap();
1485 protocol_2_request
1486 .unwrap()
1487 .pending_response
1488 .send(OutgoingResponse {
1489 result: Ok(b"this is a response".to_vec()),
1490 reputation_changes: Vec::new(),
1491 sent_feedback: None,
1492 })
1493 .unwrap();
1494 });
1495
1496 let mut response_receivers = None;
1499 let mut num_responses = 0;
1500
1501 loop {
1502 match swarm_1.select_next_some().await {
1503 SwarmEvent::ConnectionEstablished { peer_id, .. } => {
1504 let (sender_1, receiver_1) = oneshot::channel();
1505 let (sender_2, receiver_2) = oneshot::channel();
1506 swarm_1.behaviour_mut().send_request(
1507 &peer_id,
1508 protocol_name_1.clone(),
1509 b"this is a request".to_vec(),
1510 None,
1511 sender_1,
1512 IfDisconnected::ImmediateError,
1513 );
1514 swarm_1.behaviour_mut().send_request(
1515 &peer_id,
1516 protocol_name_2.clone(),
1517 b"this is a request".to_vec(),
1518 None,
1519 sender_2,
1520 IfDisconnected::ImmediateError,
1521 );
1522 assert!(response_receivers.is_none());
1523 response_receivers = Some((receiver_1, receiver_2));
1524 },
1525 SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
1526 num_responses += 1;
1527 result.unwrap();
1528 if num_responses == 2 {
1529 break;
1530 }
1531 },
1532 _ => {},
1533 }
1534 }
1535 let (response_receiver_1, response_receiver_2) = response_receivers.unwrap();
1536 assert_eq!(
1537 response_receiver_1.await.unwrap().unwrap(),
1538 (b"this is a response".to_vec(), protocol_name_1)
1539 );
1540 assert_eq!(
1541 response_receiver_2.await.unwrap().unwrap(),
1542 (b"this is a response".to_vec(), protocol_name_2)
1543 );
1544 }
1545
1546 #[tokio::test]
1547 async fn request_fallback() {
1548 let protocol_name_1 = ProtocolName::from("/test/req-resp/2");
1549 let protocol_name_1_fallback = ProtocolName::from("/test/req-resp/1");
1550 let protocol_name_2 = ProtocolName::from("/test/another");
1551
1552 let protocol_config_1 = ProtocolConfig {
1553 name: protocol_name_1.clone(),
1554 fallback_names: Vec::new(),
1555 max_request_size: 1024,
1556 max_response_size: 1024 * 1024,
1557 request_timeout: Duration::from_secs(30),
1558 inbound_queue: None,
1559 };
1560 let protocol_config_1_fallback = ProtocolConfig {
1561 name: protocol_name_1_fallback.clone(),
1562 fallback_names: Vec::new(),
1563 max_request_size: 1024,
1564 max_response_size: 1024 * 1024,
1565 request_timeout: Duration::from_secs(30),
1566 inbound_queue: None,
1567 };
1568 let protocol_config_2 = ProtocolConfig {
1569 name: protocol_name_2.clone(),
1570 fallback_names: Vec::new(),
1571 max_request_size: 1024,
1572 max_response_size: 1024 * 1024,
1573 request_timeout: Duration::from_secs(30),
1574 inbound_queue: None,
1575 };
1576
1577 let mut older_swarm = {
1580 let (tx_1, mut rx_1) = async_channel::bounded::<IncomingRequest>(64);
1581 let (tx_2, mut rx_2) = async_channel::bounded::<IncomingRequest>(64);
1582 let mut protocol_config_1_fallback = protocol_config_1_fallback.clone();
1583 protocol_config_1_fallback.inbound_queue = Some(tx_1);
1584
1585 let mut protocol_config_2 = protocol_config_2.clone();
1586 protocol_config_2.inbound_queue = Some(tx_2);
1587
1588 tokio::spawn(async move {
1589 for _ in 0..2 {
1590 if let Some(rq) = rx_1.next().await {
1591 let (fb_tx, fb_rx) = oneshot::channel();
1592 assert_eq!(rq.payload, b"request on protocol /test/req-resp/1");
1593 let _ = rq.pending_response.send(super::OutgoingResponse {
1594 result: Ok(b"this is a response on protocol /test/req-resp/1".to_vec()),
1595 reputation_changes: Vec::new(),
1596 sent_feedback: Some(fb_tx),
1597 });
1598 fb_rx.await.unwrap();
1599 }
1600 }
1601
1602 if let Some(rq) = rx_2.next().await {
1603 let (fb_tx, fb_rx) = oneshot::channel();
1604 assert_eq!(rq.payload, b"request on protocol /test/other");
1605 let _ = rq.pending_response.send(super::OutgoingResponse {
1606 result: Ok(b"this is a response on protocol /test/other".to_vec()),
1607 reputation_changes: Vec::new(),
1608 sent_feedback: Some(fb_tx),
1609 });
1610 fb_rx.await.unwrap();
1611 }
1612 });
1613
1614 build_swarm(vec![protocol_config_1_fallback, protocol_config_2].into_iter())
1615 };
1616
1617 let mut new_swarm = build_swarm(
1619 vec![
1620 protocol_config_1.clone(),
1621 protocol_config_1_fallback.clone(),
1622 protocol_config_2.clone(),
1623 ]
1624 .into_iter(),
1625 );
1626
1627 {
1628 let dial_addr = older_swarm.1.clone();
1629 Swarm::dial(&mut new_swarm.0, dial_addr).unwrap();
1630 }
1631
1632 tokio::spawn(async move {
1634 loop {
1635 _ = older_swarm.0.select_next_some().await;
1636 }
1637 });
1638
1639 let (mut swarm, _) = new_swarm;
1641 let mut older_peer_id = None;
1642
1643 let mut response_receiver = None;
1644 loop {
1646 match swarm.select_next_some().await {
1647 SwarmEvent::ConnectionEstablished { peer_id, .. } => {
1648 older_peer_id = Some(peer_id);
1649 let (sender, receiver) = oneshot::channel();
1650 swarm.behaviour_mut().send_request(
1651 &peer_id,
1652 protocol_name_1.clone(),
1653 b"request on protocol /test/req-resp/2".to_vec(),
1654 Some((
1655 b"request on protocol /test/req-resp/1".to_vec(),
1656 protocol_config_1_fallback.name.clone(),
1657 )),
1658 sender,
1659 IfDisconnected::ImmediateError,
1660 );
1661 response_receiver = Some(receiver);
1662 },
1663 SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
1664 result.unwrap();
1665 break;
1666 },
1667 _ => {},
1668 }
1669 }
1670 assert_eq!(
1671 response_receiver.unwrap().await.unwrap().unwrap(),
1672 (
1673 b"this is a response on protocol /test/req-resp/1".to_vec(),
1674 protocol_name_1_fallback.clone()
1675 )
1676 );
1677 let (sender, response_receiver) = oneshot::channel();
1679 swarm.behaviour_mut().send_request(
1680 older_peer_id.as_ref().unwrap(),
1681 protocol_name_1_fallback.clone(),
1682 b"request on protocol /test/req-resp/1".to_vec(),
1683 Some((
1684 b"dummy request, will fail if processed".to_vec(),
1685 protocol_config_1_fallback.name.clone(),
1686 )),
1687 sender,
1688 IfDisconnected::ImmediateError,
1689 );
1690 loop {
1691 match swarm.select_next_some().await {
1692 SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
1693 result.unwrap();
1694 break;
1695 },
1696 _ => {},
1697 }
1698 }
1699 assert_eq!(
1700 response_receiver.await.unwrap().unwrap(),
1701 (
1702 b"this is a response on protocol /test/req-resp/1".to_vec(),
1703 protocol_name_1_fallback.clone()
1704 )
1705 );
1706 let (sender, response_receiver) = oneshot::channel();
1708 swarm.behaviour_mut().send_request(
1709 older_peer_id.as_ref().unwrap(),
1710 protocol_name_1.clone(),
1711 b"request on protocol /test/req-resp-2".to_vec(),
1712 None,
1713 sender,
1714 IfDisconnected::ImmediateError,
1715 );
1716 loop {
1717 match swarm.select_next_some().await {
1718 SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
1719 assert_matches!(
1720 result.unwrap_err(),
1721 RequestFailure::Network(OutboundFailure::UnsupportedProtocols)
1722 );
1723 break;
1724 },
1725 _ => {},
1726 }
1727 }
1728 assert!(response_receiver.await.unwrap().is_err());
1729 let (sender, response_receiver) = oneshot::channel();
1731 swarm.behaviour_mut().send_request(
1732 older_peer_id.as_ref().unwrap(),
1733 protocol_name_2.clone(),
1734 b"request on protocol /test/other".to_vec(),
1735 None,
1736 sender,
1737 IfDisconnected::ImmediateError,
1738 );
1739 loop {
1740 match swarm.select_next_some().await {
1741 SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
1742 result.unwrap();
1743 break;
1744 },
1745 _ => {},
1746 }
1747 }
1748 assert_eq!(
1749 response_receiver.await.unwrap().unwrap(),
1750 (b"this is a response on protocol /test/other".to_vec(), protocol_name_2.clone())
1751 );
1752 }
1753
1754 #[tokio::test]
1768 async fn enforce_outbound_timeouts() {
1769 const REQUEST_TIMEOUT: Duration = Duration::from_secs(10);
1770 const REQUEST_TIMEOUT_SHORT: Duration = Duration::from_secs(1);
1771
1772 let protocol_name = ProtocolName::from("/test/req-resp/1");
1774
1775 let protocol_config = ProtocolConfig {
1776 name: protocol_name.clone(),
1777 fallback_names: Vec::new(),
1778 max_request_size: 1024,
1779 max_response_size: 1024 * 1024,
1780 request_timeout: REQUEST_TIMEOUT, inbound_queue: None,
1782 };
1783
1784 let (mut first_swarm, _) = {
1786 let (tx, mut rx) = async_channel::bounded::<IncomingRequest>(64);
1787
1788 tokio::spawn(async move {
1789 if let Some(rq) = rx.next().await {
1790 assert_eq!(rq.payload, b"this is a request");
1791
1792 tokio::time::sleep(REQUEST_TIMEOUT_SHORT * 2).await;
1795
1796 let _ = rq.pending_response.send(super::OutgoingResponse {
1799 result: Ok(b"Second swarm already timedout".to_vec()),
1800 reputation_changes: Vec::new(),
1801 sent_feedback: None,
1802 });
1803 }
1804 });
1805
1806 let mut protocol_config = protocol_config.clone();
1807 protocol_config.inbound_queue = Some(tx);
1808
1809 build_swarm(iter::once(protocol_config))
1810 };
1811
1812 let (mut second_swarm, second_address) = {
1813 let (tx, mut rx) = async_channel::bounded::<IncomingRequest>(64);
1814
1815 tokio::spawn(async move {
1816 while let Some(rq) = rx.next().await {
1817 let _ = rq.pending_response.send(super::OutgoingResponse {
1818 result: Ok(b"This is the response".to_vec()),
1819 reputation_changes: Vec::new(),
1820 sent_feedback: None,
1821 });
1822 }
1823 });
1824 let mut protocol_config = protocol_config.clone();
1825 protocol_config.inbound_queue = Some(tx);
1826
1827 build_swarm(iter::once(protocol_config.clone()))
1828 };
1829 second_swarm
1831 .behaviour_mut()
1832 .protocols
1833 .get_mut(&protocol_name)
1834 .unwrap()
1835 .request_timeout = REQUEST_TIMEOUT_SHORT;
1836
1837 {
1839 Swarm::dial(&mut first_swarm, second_address).unwrap();
1840 }
1841
1842 tokio::spawn(async move {
1845 loop {
1846 let event = first_swarm.select_next_some().await;
1847 match event {
1848 SwarmEvent::Behaviour(Event::InboundRequest { result, .. }) => {
1849 assert!(result.is_ok());
1850 break;
1851 },
1852 SwarmEvent::ConnectionClosed { .. } => {
1853 break;
1854 },
1855 _ => {},
1856 }
1857 }
1858 });
1859
1860 let mut response_receiver = None;
1864 loop {
1865 let event = second_swarm.select_next_some().await;
1866
1867 match event {
1868 SwarmEvent::ConnectionEstablished { peer_id, .. } => {
1869 let (sender, receiver) = oneshot::channel();
1870 second_swarm.behaviour_mut().send_request(
1871 &peer_id,
1872 protocol_name.clone(),
1873 b"this is a request".to_vec(),
1874 None,
1875 sender,
1876 IfDisconnected::ImmediateError,
1877 );
1878 assert!(response_receiver.is_none());
1879 response_receiver = Some(receiver);
1880 },
1881 SwarmEvent::ConnectionClosed { .. } => {
1882 break;
1883 },
1884 SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
1885 assert!(result.is_err());
1886 break;
1887 },
1888 _ => {},
1889 }
1890 }
1891
1892 match response_receiver.unwrap().await.unwrap().unwrap_err() {
1894 RequestFailure::Network(OutboundFailure::Timeout) => {},
1895 request_failure => panic!("Unexpected failure: {request_failure:?}"),
1896 }
1897 }
1898}