1use crate::{
38 peer_store::{PeerStoreProvider, BANNED_THRESHOLD},
39 service::traits::RequestResponseConfig as RequestResponseConfigT,
40 types::ProtocolName,
41 ReputationChange,
42};
43
44use futures::{channel::oneshot, prelude::*};
45use libp2p::{
46 core::{transport::PortUse, Endpoint, Multiaddr},
47 request_response::{self, Behaviour, Codec, Message, ProtocolSupport, ResponseChannel},
48 swarm::{
49 behaviour::FromSwarm, handler::multi::MultiHandler, ConnectionDenied, ConnectionId,
50 NetworkBehaviour, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm,
51 },
52 PeerId,
53};
54
55use std::{
56 collections::{hash_map::Entry, HashMap},
57 io, iter,
58 ops::Deref,
59 pin::Pin,
60 sync::Arc,
61 task::{Context, Poll},
62 time::{Duration, Instant},
63};
64
65pub use libp2p::request_response::{Config, InboundRequestId, OutboundRequestId};
66
/// Logging target used by every `log` statement in this file.
const LOG_TARGET: &str = "sub-libp2p::request-response";

/// Period of the timer stored in `RequestResponsesBehaviour::periodic_request_check`. On each
/// tick, `poll` drops the pending outbound requests that outlived their protocol's timeout.
const PERIODIC_REQUEST_CHECK: Duration = Duration::from_secs(2);
72
/// Failures that can occur while sending an outbound request and waiting for its response.
///
/// Crate-local mirror of `libp2p::request_response::OutboundFailure` (see the `From` impl).
/// The I/O error is stored behind an `Arc`, which keeps this enum `Clone`.
#[derive(Debug, Clone, thiserror::Error)]
pub enum OutboundFailure {
    /// Dialing the requested peer failed.
    #[error("Failed to dial the requested peer")]
    DialFailure,
    /// No response arrived in time.
    #[error("Timeout while waiting for a response")]
    Timeout,
    /// The connection was closed before a response was received.
    #[error("Connection was closed before a response was received")]
    ConnectionClosed,
    /// The remote supports none of the requested protocols.
    #[error("The remote supports none of the requested protocols")]
    UnsupportedProtocols,
    /// An I/O failure happened on the outbound stream.
    #[error("An IO failure happened on an outbound stream")]
    Io(Arc<io::Error>),
}
93
94impl From<request_response::OutboundFailure> for OutboundFailure {
95 fn from(out: request_response::OutboundFailure) -> Self {
96 match out {
97 request_response::OutboundFailure::DialFailure => OutboundFailure::DialFailure,
98 request_response::OutboundFailure::Timeout => OutboundFailure::Timeout,
99 request_response::OutboundFailure::ConnectionClosed => {
100 OutboundFailure::ConnectionClosed
101 },
102 request_response::OutboundFailure::UnsupportedProtocols => {
103 OutboundFailure::UnsupportedProtocols
104 },
105 request_response::OutboundFailure::Io(error) => OutboundFailure::Io(Arc::new(error)),
106 }
107 }
108}
109
/// Failures that can occur while receiving an inbound request and sending its response.
///
/// Crate-local mirror of `libp2p::request_response::InboundFailure` (see the `From` impl).
#[derive(Debug, thiserror::Error)]
pub enum InboundFailure {
    /// Receiving the request or sending the response timed out.
    #[error("Timeout while receiving request or sending response")]
    Timeout,
    /// The connection was closed before a response could be sent.
    #[error("Connection was closed before a response could be sent")]
    ConnectionClosed,
    /// The local peer supports none of the protocols requested by the remote.
    #[error("The local peer supports none of the protocols requested by the remote")]
    UnsupportedProtocols,
    /// The response channel was dropped without sending a response to the remote.
    #[error("The response channel was dropped without sending a response to the remote")]
    ResponseOmission,
    /// An I/O failure happened on the inbound stream.
    #[error("An IO failure happened on an inbound stream")]
    Io(Arc<io::Error>),
}
131
132impl From<request_response::InboundFailure> for InboundFailure {
133 fn from(out: request_response::InboundFailure) -> Self {
134 match out {
135 request_response::InboundFailure::ResponseOmission => InboundFailure::ResponseOmission,
136 request_response::InboundFailure::Timeout => InboundFailure::Timeout,
137 request_response::InboundFailure::ConnectionClosed => InboundFailure::ConnectionClosed,
138 request_response::InboundFailure::UnsupportedProtocols => {
139 InboundFailure::UnsupportedProtocols
140 },
141 request_response::InboundFailure::Io(error) => InboundFailure::Io(Arc::new(error)),
142 }
143 }
144}
145
/// Error delivered to the sender of an outbound request, through the `pending_response`
/// channel passed to `RequestResponsesBehaviour::send_request`, and reported in
/// `Event::RequestFinished`.
#[derive(Debug, thiserror::Error)]
#[allow(missing_docs)]
pub enum RequestFailure {
    // Sent when there is no connection to the peer and `IfDisconnected::ImmediateError` was
    // requested.
    #[error("We are not currently connected to the requested peer.")]
    NotConnected,
    // Sent when no behaviour is registered under the requested protocol name.
    #[error("Given protocol hasn't been registered.")]
    UnknownProtocol,
    // Sent when the codec decoded the response as `Err(())`.
    #[error("Remote has closed the substream before answering, thereby signaling that it considers the request as valid, but refused to answer it.")]
    Refused,
    // Reported in `Event::RequestFinished` when the response arrived but the receiving end of
    // the `pending_response` channel was already gone.
    #[error("The remote replied, but the local node is no longer interested in the response.")]
    Obsolete,
    // Wraps the failure reported by the underlying request-response behaviour, or a timeout
    // detected by the periodic check in `poll`.
    #[error("Problem on the network: {0}")]
    Network(OutboundFailure),
}
161
/// Configuration of a single request-response protocol.
#[derive(Debug, Clone)]
pub struct ProtocolConfig {
    /// Main name of the protocol; also the key under which the protocol is registered in
    /// `RequestResponsesBehaviour`.
    pub name: ProtocolName,

    /// Additional names registered on the same libp2p behaviour, with the same protocol support
    /// as `name`.
    pub fallback_names: Vec<ProtocolName>,

    /// Largest request payload, in bytes, that `GenericCodec::read_request` accepts. A larger
    /// length prefix is rejected with an I/O error.
    pub max_request_size: u64,

    /// Largest response payload, in bytes, that `GenericCodec::read_response` accepts. A larger
    /// length prefix is rejected with an I/O error.
    pub max_response_size: u64,

    /// Timeout handed to the libp2p behaviour of this protocol. It is also the age after which
    /// the periodic check in `poll` force-fails a pending outbound request.
    pub request_timeout: Duration,

    /// Channel on which inbound requests are delivered to the code that answers them.
    ///
    /// With `Some(_)` the protocol is registered with `ProtocolSupport::Full`; with `None` it
    /// is registered with `ProtocolSupport::Outbound`. Requests are pushed with `try_send`, so
    /// a request that finds the channel full is not delivered.
    pub inbound_queue: Option<async_channel::Sender<IncomingRequest>>,
}

impl RequestResponseConfigT for ProtocolConfig {
    /// Returns the main name of the protocol.
    fn protocol_name(&self) -> &ProtocolName {
        &self.name
    }
}
216
/// A request received from a remote peer, as delivered on `ProtocolConfig::inbound_queue`.
#[derive(Debug)]
pub struct IncomingRequest {
    /// Peer that sent the request.
    pub peer: sc_network_types::PeerId,

    /// Raw request bytes as decoded by the codec.
    pub payload: Vec<u8>,

    /// Channel on which the answer must be sent. If the sender is dropped without sending,
    /// this behaviour does not send any response for the request.
    pub pending_response: oneshot::Sender<OutgoingResponse>,
}
237
/// Answer to an [`IncomingRequest`], sent through `IncomingRequest::pending_response`.
#[derive(Debug)]
pub struct OutgoingResponse {
    /// Payload to send back. With `Err(())` no payload is handed to the underlying behaviour;
    /// the reputation changes below are still reported.
    pub result: Result<Vec<u8>, ()>,

    /// Reputation changes for the requesting peer. When non-empty they are emitted as
    /// `Event::ReputationChanges`.
    pub reputation_changes: Vec<ReputationChange>,

    /// If set, this sender is notified with `()` once the underlying behaviour reports the
    /// response as sent. It is dropped without notification if the inbound request fails.
    pub sent_feedback: Option<oneshot::Sender<()>>,
}
260
/// Bookkeeping for an outbound request that has not completed yet.
struct PendingRequest {
    /// When the request was handed to the underlying behaviour.
    started_at: Instant,
    /// Channel on which the result is delivered to the caller. It is taken out (`None`) once
    /// a result has been sent.
    response_tx: Option<oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>>,
    /// Request payload and protocol to retry with if the remote reports the primary protocol
    /// as unsupported.
    fallback_request: Option<(Vec<u8>, ProtocolName)>,
}
273
/// What `send_request` should do when there is no open connection to the target peer.
#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
pub enum IfDisconnected {
    /// Send the request anyway and let the underlying behaviour try to connect.
    TryConnect,
    /// Fail the request right away.
    ImmediateError,
}

impl IfDisconnected {
    /// Returns `true` for [`IfDisconnected::TryConnect`] and `false` for
    /// [`IfDisconnected::ImmediateError`].
    pub fn should_connect(self) -> bool {
        matches!(self, Self::TryConnect)
    }
}
293
/// Events emitted by [`RequestResponsesBehaviour`] towards the swarm.
#[derive(Debug)]
pub enum Event {
    /// An inbound request reached a final state: either the response was sent or the request
    /// failed.
    InboundRequest {
        /// Peer that sent the request.
        peer: PeerId,
        /// Protocol on which the request was received.
        protocol: ProtocolName,
        /// `Ok` carries the time elapsed between the arrival of the request and the
        /// confirmation that the response was sent; `Err` carries the failure.
        result: Result<Duration, ResponseFailure>,
    },

    /// An outbound request finished: a response was received, or the underlying behaviour
    /// reported a failure.
    RequestFinished {
        /// Peer the request was sent to.
        peer: PeerId,
        /// Protocol the request was sent on.
        protocol: ProtocolName,
        /// Time elapsed since the request was started.
        duration: Duration,
        /// `Ok(())` if the response was handed to the caller; otherwise the failure.
        result: Result<(), RequestFailure>,
    },

    /// Reputation changes attached to an [`OutgoingResponse`].
    ReputationChanges {
        /// Peer whose reputation is affected.
        peer: PeerId,
        /// Changes to apply.
        changes: Vec<ReputationChange>,
    },
}
335
336#[derive(Debug, Clone, PartialEq, Eq, Hash)]
343struct ProtocolRequestId<RequestId> {
344 protocol: ProtocolName,
345 request_id: RequestId,
346}
347
348impl<RequestId> From<(ProtocolName, RequestId)> for ProtocolRequestId<RequestId> {
349 fn from((protocol, request_id): (ProtocolName, RequestId)) -> Self {
350 Self { protocol, request_id }
351 }
352}
353
/// Everything stored per registered protocol.
struct ProtocolDetails {
    /// The libp2p request-response behaviour dedicated to this protocol.
    behaviour: Behaviour<GenericCodec>,
    /// Where inbound requests are forwarded; `None` for outbound-only protocols.
    inbound_queue: Option<async_channel::Sender<IncomingRequest>>,
    /// Timeout after which the periodic check in `poll` force-fails a pending outbound request.
    request_timeout: Duration,
}
360
/// Network behaviour that multiplexes several request-response protocols, each backed by its
/// own libp2p `Behaviour`.
pub struct RequestResponsesBehaviour {
    /// Registered protocols, keyed by their main name.
    protocols: HashMap<ProtocolName, ProtocolDetails>,

    /// Outbound requests that have been sent and whose outcome is still unknown.
    pending_requests: HashMap<ProtocolRequestId<OutboundRequestId>, PendingRequest>,

    /// One future per inbound request. It resolves once the answering side replied on the
    /// request's `pending_response` channel, or to `None` if that sender was dropped.
    pending_responses: stream::FuturesUnordered<
        Pin<Box<dyn Future<Output = Option<RequestProcessingOutcome>> + Send>>,
    >,

    /// Arrival time of every inbound request that has not reached a final state yet.
    pending_responses_arrival_time: HashMap<ProtocolRequestId<InboundRequestId>, Instant>,

    /// Feedback senders of responses that were handed to the underlying behaviour but are not
    /// yet confirmed as sent.
    send_feedback: HashMap<ProtocolRequestId<InboundRequestId>, oneshot::Sender<()>>,

    /// Used to look up the reputation of peers that send requests.
    peer_store: Arc<dyn PeerStoreProvider>,

    /// Timer that triggers the sweep of timed-out `pending_requests`
    /// (period: `PERIODIC_REQUEST_CHECK`).
    periodic_request_check: tokio::time::Interval,
}
396
/// An answered inbound request, produced by the futures in
/// `RequestResponsesBehaviour::pending_responses`.
struct RequestProcessingOutcome {
    /// Peer that sent the request.
    peer: PeerId,
    /// Id the underlying behaviour assigned to the inbound request.
    request_id: InboundRequestId,
    /// Protocol on which the request was received.
    protocol: ProtocolName,
    /// libp2p channel on which the response has to be sent.
    inner_channel: ResponseChannel<Result<Vec<u8>, ()>>,
    /// The answer produced for the request.
    response: OutgoingResponse,
}
405
406impl RequestResponsesBehaviour {
407 pub fn new(
410 list: impl Iterator<Item = ProtocolConfig>,
411 peer_store: Arc<dyn PeerStoreProvider>,
412 ) -> Result<Self, RegisterError> {
413 let mut protocols = HashMap::new();
414 for protocol in list {
415 let cfg = Config::default().with_request_timeout(protocol.request_timeout);
416
417 let protocol_support = if protocol.inbound_queue.is_some() {
418 ProtocolSupport::Full
419 } else {
420 ProtocolSupport::Outbound
421 };
422
423 let behaviour = Behaviour::with_codec(
424 GenericCodec {
425 max_request_size: protocol.max_request_size,
426 max_response_size: protocol.max_response_size,
427 },
428 iter::once(protocol.name.clone())
429 .chain(protocol.fallback_names)
430 .zip(iter::repeat(protocol_support)),
431 cfg,
432 );
433
434 match protocols.entry(protocol.name) {
435 Entry::Vacant(e) => e.insert(ProtocolDetails {
436 behaviour,
437 inbound_queue: protocol.inbound_queue,
438 request_timeout: protocol.request_timeout,
439 }),
440 Entry::Occupied(e) => {
441 return Err(RegisterError::DuplicateProtocol(e.key().clone()))
442 },
443 };
444 }
445
446 Ok(Self {
447 protocols,
448 pending_requests: Default::default(),
449 pending_responses: Default::default(),
450 pending_responses_arrival_time: Default::default(),
451 send_feedback: Default::default(),
452 peer_store,
453 periodic_request_check: tokio::time::interval(PERIODIC_REQUEST_CHECK),
454 })
455 }
456
457 pub fn send_request(
464 &mut self,
465 target: &PeerId,
466 protocol_name: ProtocolName,
467 request: Vec<u8>,
468 fallback_request: Option<(Vec<u8>, ProtocolName)>,
469 pending_response: oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>,
470 connect: IfDisconnected,
471 ) {
472 log::trace!(target: LOG_TARGET, "send request to {target} ({protocol_name:?}), {} bytes", request.len());
473
474 if let Some(ProtocolDetails { behaviour, .. }) =
475 self.protocols.get_mut(protocol_name.deref())
476 {
477 Self::send_request_inner(
478 behaviour,
479 &mut self.pending_requests,
480 target,
481 protocol_name,
482 request,
483 fallback_request,
484 pending_response,
485 connect,
486 )
487 } else if pending_response.send(Err(RequestFailure::UnknownProtocol)).is_err() {
488 log::debug!(
489 target: LOG_TARGET,
490 "Unknown protocol {:?}. At the same time local \
491 node is no longer interested in the result.",
492 protocol_name,
493 );
494 }
495 }
496
497 fn send_request_inner(
498 behaviour: &mut Behaviour<GenericCodec>,
499 pending_requests: &mut HashMap<ProtocolRequestId<OutboundRequestId>, PendingRequest>,
500 target: &PeerId,
501 protocol_name: ProtocolName,
502 request: Vec<u8>,
503 fallback_request: Option<(Vec<u8>, ProtocolName)>,
504 pending_response: oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>,
505 connect: IfDisconnected,
506 ) {
507 if behaviour.is_connected(target) || connect.should_connect() {
508 let request_id = behaviour.send_request(target, request);
509 let prev_req_id = pending_requests.insert(
510 (protocol_name.to_string().into(), request_id).into(),
511 PendingRequest {
512 started_at: Instant::now(),
513 response_tx: Some(pending_response),
514 fallback_request,
515 },
516 );
517 debug_assert!(prev_req_id.is_none(), "Expect request id to be unique.");
518 } else if pending_response.send(Err(RequestFailure::NotConnected)).is_err() {
519 log::debug!(
520 target: LOG_TARGET,
521 "Not connected to peer {:?}. At the same time local \
522 node is no longer interested in the result.",
523 target,
524 );
525 }
526 }
527}
528
529impl NetworkBehaviour for RequestResponsesBehaviour {
530 type ConnectionHandler =
531 MultiHandler<String, <Behaviour<GenericCodec> as NetworkBehaviour>::ConnectionHandler>;
532 type ToSwarm = Event;
533
534 fn handle_pending_inbound_connection(
535 &mut self,
536 _connection_id: ConnectionId,
537 _local_addr: &Multiaddr,
538 _remote_addr: &Multiaddr,
539 ) -> Result<(), ConnectionDenied> {
540 Ok(())
541 }
542
543 fn handle_pending_outbound_connection(
544 &mut self,
545 _connection_id: ConnectionId,
546 _maybe_peer: Option<PeerId>,
547 _addresses: &[Multiaddr],
548 _effective_role: Endpoint,
549 ) -> Result<Vec<Multiaddr>, ConnectionDenied> {
550 Ok(Vec::new())
551 }
552
553 fn handle_established_inbound_connection(
554 &mut self,
555 connection_id: ConnectionId,
556 peer: PeerId,
557 local_addr: &Multiaddr,
558 remote_addr: &Multiaddr,
559 ) -> Result<THandler<Self>, ConnectionDenied> {
560 let iter =
561 self.protocols.iter_mut().filter_map(|(p, ProtocolDetails { behaviour, .. })| {
562 if let Ok(handler) = behaviour.handle_established_inbound_connection(
563 connection_id,
564 peer,
565 local_addr,
566 remote_addr,
567 ) {
568 Some((p.to_string(), handler))
569 } else {
570 None
571 }
572 });
573
574 Ok(MultiHandler::try_from_iter(iter).expect(
575 "Protocols are in a HashMap and there can be at most one handler per protocol name, \
576 which is the only possible error; qed",
577 ))
578 }
579
580 fn handle_established_outbound_connection(
581 &mut self,
582 connection_id: ConnectionId,
583 peer: PeerId,
584 addr: &Multiaddr,
585 role_override: Endpoint,
586 port_use: PortUse,
587 ) -> Result<THandler<Self>, ConnectionDenied> {
588 let iter =
589 self.protocols.iter_mut().filter_map(|(p, ProtocolDetails { behaviour, .. })| {
590 if let Ok(handler) = behaviour.handle_established_outbound_connection(
591 connection_id,
592 peer,
593 addr,
594 role_override,
595 port_use,
596 ) {
597 Some((p.to_string(), handler))
598 } else {
599 None
600 }
601 });
602
603 Ok(MultiHandler::try_from_iter(iter).expect(
604 "Protocols are in a HashMap and there can be at most one handler per protocol name, \
605 which is the only possible error; qed",
606 ))
607 }
608
609 fn on_swarm_event(&mut self, event: FromSwarm) {
610 for ProtocolDetails { behaviour, .. } in self.protocols.values_mut() {
611 behaviour.on_swarm_event(event);
612 }
613 }
614
615 fn on_connection_handler_event(
616 &mut self,
617 peer_id: PeerId,
618 connection_id: ConnectionId,
619 event: THandlerOutEvent<Self>,
620 ) {
621 let p_name = event.0;
622 if let Some(ProtocolDetails { behaviour, .. }) = self.protocols.get_mut(p_name.as_str()) {
623 return behaviour.on_connection_handler_event(peer_id, connection_id, event.1);
624 } else {
625 log::warn!(
626 target: LOG_TARGET,
627 "on_connection_handler_event: no request-response instance registered for protocol {:?}",
628 p_name
629 );
630 }
631 }
632
633 fn poll(&mut self, cx: &mut Context) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
634 'poll_all: loop {
635 if self.periodic_request_check.poll_tick(cx).is_ready() {
637 self.pending_requests.retain(|id, req| {
638 let Some(ProtocolDetails { request_timeout, .. }) =
639 self.protocols.get(&id.protocol)
640 else {
641 log::warn!(
642 target: LOG_TARGET,
643 "Request {id:?} has no protocol registered.",
644 );
645
646 if let Some(response_tx) = req.response_tx.take() {
647 if response_tx.send(Err(RequestFailure::UnknownProtocol)).is_err() {
648 log::debug!(
649 target: LOG_TARGET,
650 "Request {id:?} has no protocol registered. At the same time local node is no longer interested in the result.",
651 );
652 }
653 }
654 return false
655 };
656
657 let elapsed = req.started_at.elapsed();
658 if elapsed > *request_timeout {
659 log::debug!(
660 target: LOG_TARGET,
661 "Request {id:?} force detected as timeout.",
662 );
663
664 if let Some(response_tx) = req.response_tx.take() {
665 if response_tx.send(Err(RequestFailure::Network(OutboundFailure::Timeout))).is_err() {
666 log::debug!(
667 target: LOG_TARGET,
668 "Request {id:?} force detected as timeout. At the same time local node is no longer interested in the result.",
669 );
670 }
671 }
672
673 false
674 } else {
675 true
676 }
677 });
678 }
679
680 while let Poll::Ready(Some(outcome)) = self.pending_responses.poll_next_unpin(cx) {
682 let RequestProcessingOutcome {
683 peer,
684 request_id,
685 protocol: protocol_name,
686 inner_channel,
687 response: OutgoingResponse { result, reputation_changes, sent_feedback },
688 } = match outcome {
689 Some(outcome) => outcome,
690 None => continue,
693 };
694
695 if let Ok(payload) = result {
696 if let Some(ProtocolDetails { behaviour, .. }) =
697 self.protocols.get_mut(&*protocol_name)
698 {
699 log::trace!(target: LOG_TARGET, "send response to {peer} ({protocol_name:?}), {} bytes", payload.len());
700
701 if behaviour.send_response(inner_channel, Ok(payload)).is_err() {
702 log::debug!(
705 target: LOG_TARGET,
706 "Failed to send response for {:?} on protocol {:?} due to a \
707 timeout or due to the connection to the peer being closed. \
708 Dropping response",
709 request_id, protocol_name,
710 );
711 } else if let Some(sent_feedback) = sent_feedback {
712 self.send_feedback
713 .insert((protocol_name, request_id).into(), sent_feedback);
714 }
715 }
716 }
717
718 if !reputation_changes.is_empty() {
719 return Poll::Ready(ToSwarm::GenerateEvent(Event::ReputationChanges {
720 peer,
721 changes: reputation_changes,
722 }));
723 }
724 }
725
726 let mut fallback_requests = vec![];
727
728 for (protocol, ProtocolDetails { behaviour, inbound_queue, .. }) in &mut self.protocols
730 {
731 'poll_protocol: while let Poll::Ready(ev) = behaviour.poll(cx) {
732 let ev = match ev {
733 ToSwarm::GenerateEvent(ev) => ev,
735
736 ToSwarm::Dial { opts } => {
739 if opts.get_peer_id().is_none() {
740 log::error!(
741 target: LOG_TARGET,
742 "The request-response isn't supposed to start dialing addresses"
743 );
744 }
745 return Poll::Ready(ToSwarm::Dial { opts });
746 },
747 event => {
748 return Poll::Ready(
749 event.map_in(|event| ((*protocol).to_string(), event)).map_out(
750 |_| {
751 unreachable!(
752 "`GenerateEvent` is handled in a branch above; qed"
753 )
754 },
755 ),
756 );
757 },
758 };
759
760 match ev {
761 request_response::Event::Message {
763 peer,
764 message: Message::Request { request_id, request, channel, .. },
765 } => {
766 self.pending_responses_arrival_time
767 .insert((protocol.clone(), request_id).into(), Instant::now());
768
769 let reputation = self.peer_store.peer_reputation(&peer.into());
770
771 if reputation < BANNED_THRESHOLD {
772 log::debug!(
773 target: LOG_TARGET,
774 "Cannot handle requests from a node with a low reputation {}: {}",
775 peer,
776 reputation,
777 );
778 continue 'poll_protocol;
779 }
780
781 let (tx, rx) = oneshot::channel();
782
783 if let Some(resp_builder) = inbound_queue {
786 let _ = resp_builder.try_send(IncomingRequest {
793 peer: peer.into(),
794 payload: request,
795 pending_response: tx,
796 });
797 } else {
798 debug_assert!(false, "Received message on outbound-only protocol.");
799 }
800
801 let protocol = protocol.clone();
802
803 self.pending_responses.push(Box::pin(async move {
804 rx.await.map_or(None, |response| {
808 Some(RequestProcessingOutcome {
809 peer,
810 request_id,
811 protocol,
812 inner_channel: channel,
813 response,
814 })
815 })
816 }));
817
818 continue 'poll_all;
821 },
822
823 request_response::Event::Message {
825 peer,
826 message: Message::Response { request_id, response },
827 ..
828 } => {
829 let (started, delivered) = match self
830 .pending_requests
831 .remove(&(protocol.clone(), request_id).into())
832 {
833 Some(PendingRequest {
834 started_at,
835 response_tx: Some(response_tx),
836 ..
837 }) => {
838 log::trace!(
839 target: LOG_TARGET,
840 "received response from {peer} ({protocol:?}), {} bytes",
841 response.as_ref().map_or(0usize, |response| response.len()),
842 );
843
844 let delivered = response_tx
845 .send(
846 response
847 .map_err(|()| RequestFailure::Refused)
848 .map(|resp| (resp, protocol.clone())),
849 )
850 .map_err(|_| RequestFailure::Obsolete);
851 (started_at, delivered)
852 },
853 _ => {
854 log::debug!(
855 target: LOG_TARGET,
856 "Received `RequestResponseEvent::Message` with unexpected request id {:?} from {:?}",
857 request_id,
858 peer,
859 );
860 continue;
861 },
862 };
863
864 let out = Event::RequestFinished {
865 peer,
866 protocol: protocol.clone(),
867 duration: started.elapsed(),
868 result: delivered,
869 };
870
871 return Poll::Ready(ToSwarm::GenerateEvent(out));
872 },
873
874 request_response::Event::OutboundFailure {
876 peer,
877 request_id,
878 error,
879 ..
880 } => {
881 let error = OutboundFailure::from(error);
882 let started = match self
883 .pending_requests
884 .remove(&(protocol.clone(), request_id).into())
885 {
886 Some(PendingRequest {
887 started_at,
888 response_tx: Some(response_tx),
889 fallback_request,
890 }) => {
891 if matches!(error, OutboundFailure::UnsupportedProtocols) {
894 if let Some((fallback_request, fallback_protocol)) =
895 fallback_request
896 {
897 log::trace!(
898 target: LOG_TARGET,
899 "Request with id {:?} failed. Trying the fallback protocol. {}",
900 request_id,
901 fallback_protocol.deref()
902 );
903 fallback_requests.push((
904 peer,
905 fallback_protocol,
906 fallback_request,
907 response_tx,
908 ));
909 continue;
910 }
911 }
912
913 if response_tx
914 .send(Err(RequestFailure::Network(error.clone())))
915 .is_err()
916 {
917 log::debug!(
918 target: LOG_TARGET,
919 "Request with id {:?} failed. At the same time local \
920 node is no longer interested in the result.",
921 request_id,
922 );
923 }
924 started_at
925 },
926 _ => {
927 log::debug!(
928 target: LOG_TARGET,
929 "Received `RequestResponseEvent::OutboundFailure` with unexpected request id {:?} error {:?} from {:?}",
930 request_id,
931 error,
932 peer
933 );
934 continue;
935 },
936 };
937
938 let out = Event::RequestFinished {
939 peer,
940 protocol: protocol.clone(),
941 duration: started.elapsed(),
942 result: Err(RequestFailure::Network(error)),
943 };
944
945 return Poll::Ready(ToSwarm::GenerateEvent(out));
946 },
947
948 request_response::Event::InboundFailure {
951 request_id, peer, error, ..
952 } => {
953 self.pending_responses_arrival_time
954 .remove(&(protocol.clone(), request_id).into());
955 self.send_feedback.remove(&(protocol.clone(), request_id).into());
956 let out = Event::InboundRequest {
957 peer,
958 protocol: protocol.clone(),
959 result: Err(ResponseFailure::Network(error.into())),
960 };
961 return Poll::Ready(ToSwarm::GenerateEvent(out));
962 },
963
964 request_response::Event::ResponseSent { request_id, peer } => {
966 let arrival_time = self
967 .pending_responses_arrival_time
968 .remove(&(protocol.clone(), request_id).into())
969 .map(|t| t.elapsed())
970 .expect(
971 "Time is added for each inbound request on arrival and only \
972 removed on success (`ResponseSent`) or failure \
973 (`InboundFailure`). One can not receive a success event for a \
974 request that either never arrived, or that has previously \
975 failed; qed.",
976 );
977
978 if let Some(send_feedback) =
979 self.send_feedback.remove(&(protocol.clone(), request_id).into())
980 {
981 let _ = send_feedback.send(());
982 }
983
984 let out = Event::InboundRequest {
985 peer,
986 protocol: protocol.clone(),
987 result: Ok(arrival_time),
988 };
989
990 return Poll::Ready(ToSwarm::GenerateEvent(out));
991 },
992 };
993 }
994 }
995
996 for (peer, protocol, request, pending_response) in fallback_requests.drain(..) {
998 if let Some(ProtocolDetails { behaviour, .. }) = self.protocols.get_mut(&protocol) {
999 Self::send_request_inner(
1000 behaviour,
1001 &mut self.pending_requests,
1002 &peer,
1003 protocol,
1004 request,
1005 None,
1006 pending_response,
1007 IfDisconnected::ImmediateError,
1011 );
1012 }
1013 }
1014
1015 break Poll::Pending;
1016 }
1017 }
1018}
1019
/// Error returned by `RequestResponsesBehaviour::new`.
#[derive(Debug, thiserror::Error)]
pub enum RegisterError {
    /// Two protocol configurations share the same name; the payload is that name.
    #[error("{0}")]
    DuplicateProtocol(ProtocolName),
}
1027
/// Error reported in `Event::InboundRequest` when answering an inbound request failed.
#[derive(Debug, thiserror::Error)]
pub enum ResponseFailure {
    /// Failure reported by the underlying request-response behaviour.
    #[error("Problem on the network: {0}")]
    Network(InboundFailure),
}
1035
/// Codec shared by all protocols: requests and responses are opaque byte vectors, each written
/// as an unsigned-varint length prefix followed by the payload (see the `Codec` impl).
#[derive(Debug, Clone)]
#[doc(hidden)] pub struct GenericCodec {
    /// Largest request length, in bytes, accepted by `read_request`.
    max_request_size: u64,
    /// Largest response length, in bytes, accepted by `read_response`.
    max_response_size: u64,
}
1044
1045#[async_trait::async_trait]
1046impl Codec for GenericCodec {
1047 type Protocol = ProtocolName;
1048 type Request = Vec<u8>;
1049 type Response = Result<Vec<u8>, ()>;
1050
1051 async fn read_request<T>(
1052 &mut self,
1053 _: &Self::Protocol,
1054 mut io: &mut T,
1055 ) -> io::Result<Self::Request>
1056 where
1057 T: AsyncRead + Unpin + Send,
1058 {
1059 let length = unsigned_varint::aio::read_usize(&mut io)
1061 .await
1062 .map_err(|err| io::Error::new(io::ErrorKind::InvalidInput, err))?;
1063 if length > usize::try_from(self.max_request_size).unwrap_or(usize::MAX) {
1064 return Err(io::Error::new(
1065 io::ErrorKind::InvalidInput,
1066 format!("Request size exceeds limit: {} > {}", length, self.max_request_size),
1067 ));
1068 }
1069
1070 let mut buffer = vec![0; length];
1072 io.read_exact(&mut buffer).await?;
1073 Ok(buffer)
1074 }
1075
1076 async fn read_response<T>(
1077 &mut self,
1078 _: &Self::Protocol,
1079 mut io: &mut T,
1080 ) -> io::Result<Self::Response>
1081 where
1082 T: AsyncRead + Unpin + Send,
1083 {
1084 let length = match unsigned_varint::aio::read_usize(&mut io).await {
1091 Ok(l) => l,
1092 Err(unsigned_varint::io::ReadError::Io(err))
1093 if matches!(err.kind(), io::ErrorKind::UnexpectedEof) =>
1094 {
1095 return Ok(Err(()))
1096 },
1097 Err(err) => return Err(io::Error::new(io::ErrorKind::InvalidInput, err)),
1098 };
1099
1100 if length > usize::try_from(self.max_response_size).unwrap_or(usize::MAX) {
1101 return Err(io::Error::new(
1102 io::ErrorKind::InvalidInput,
1103 format!("Response size exceeds limit: {} > {}", length, self.max_response_size),
1104 ));
1105 }
1106
1107 let mut buffer = vec![0; length];
1109 io.read_exact(&mut buffer).await?;
1110 Ok(Ok(buffer))
1111 }
1112
1113 async fn write_request<T>(
1114 &mut self,
1115 _: &Self::Protocol,
1116 io: &mut T,
1117 req: Self::Request,
1118 ) -> io::Result<()>
1119 where
1120 T: AsyncWrite + Unpin + Send,
1121 {
1122 {
1125 let mut buffer = unsigned_varint::encode::usize_buffer();
1126 io.write_all(unsigned_varint::encode::usize(req.len(), &mut buffer)).await?;
1127 }
1128
1129 io.write_all(&req).await?;
1131
1132 io.close().await?;
1133 Ok(())
1134 }
1135
1136 async fn write_response<T>(
1137 &mut self,
1138 _: &Self::Protocol,
1139 io: &mut T,
1140 res: Self::Response,
1141 ) -> io::Result<()>
1142 where
1143 T: AsyncWrite + Unpin + Send,
1144 {
1145 if let Ok(res) = res {
1147 {
1150 let mut buffer = unsigned_varint::encode::usize_buffer();
1151 io.write_all(unsigned_varint::encode::usize(res.len(), &mut buffer)).await?;
1152 }
1153
1154 io.write_all(&res).await?;
1156 }
1157
1158 io.close().await?;
1159 Ok(())
1160 }
1161}
1162
1163#[cfg(test)]
1164mod tests {
1165 use super::*;
1166
1167 use crate::mock::MockPeerStore;
1168 use assert_matches::assert_matches;
1169 use futures::channel::oneshot;
1170 use libp2p::{
1171 core::{
1172 transport::{MemoryTransport, Transport},
1173 upgrade,
1174 },
1175 identity::Keypair,
1176 noise,
1177 swarm::{Config as SwarmConfig, Executor, Swarm, SwarmEvent},
1178 Multiaddr,
1179 };
1180 use std::{iter, time::Duration};
1181
/// Swarm executor that runs the futures handed to it as tasks spawned with `tokio::spawn`.
struct TokioExecutor;
impl Executor for TokioExecutor {
    fn exec(&self, f: Pin<Box<dyn Future<Output = ()> + Send>>) {
        tokio::spawn(f);
    }
}
1188
1189 fn build_swarm(
1190 list: impl Iterator<Item = ProtocolConfig>,
1191 ) -> (Swarm<RequestResponsesBehaviour>, Multiaddr) {
1192 let keypair = Keypair::generate_ed25519();
1193
1194 let transport = MemoryTransport::new()
1195 .upgrade(upgrade::Version::V1)
1196 .authenticate(noise::Config::new(&keypair).unwrap())
1197 .multiplex(libp2p::yamux::Config::default())
1198 .boxed();
1199
1200 let behaviour = RequestResponsesBehaviour::new(list, Arc::new(MockPeerStore {})).unwrap();
1201
1202 let mut swarm = Swarm::new(
1203 transport,
1204 behaviour,
1205 keypair.public().to_peer_id(),
1206 SwarmConfig::with_executor(TokioExecutor {})
1207 .with_idle_connection_timeout(Duration::from_secs(10)),
1210 );
1211
1212 let listen_addr: Multiaddr = format!("/memory/{}", rand::random::<u64>()).parse().unwrap();
1213
1214 swarm.listen_on(listen_addr.clone()).unwrap();
1215
1216 (swarm, listen_addr)
1217 }
1218
/// Sends one request between two connected swarms and checks that the expected response is
/// delivered to the requester.
#[tokio::test]
async fn basic_request_response_works() {
    let protocol_name = ProtocolName::from("/test/req-resp/1");

    // Build two swarms that both serve the protocol.
    let mut swarms = (0..2)
        .map(|_| {
            let (tx, mut rx) = async_channel::bounded::<IncomingRequest>(64);

            // Background task answering every inbound request of this swarm and waiting for
            // the "response sent" feedback.
            tokio::spawn(async move {
                while let Some(rq) = rx.next().await {
                    let (fb_tx, fb_rx) = oneshot::channel();
                    assert_eq!(rq.payload, b"this is a request");
                    let _ = rq.pending_response.send(super::OutgoingResponse {
                        result: Ok(b"this is a response".to_vec()),
                        reputation_changes: Vec::new(),
                        sent_feedback: Some(fb_tx),
                    });
                    fb_rx.await.unwrap();
                }
            });

            let protocol_config = ProtocolConfig {
                name: protocol_name.clone(),
                fallback_names: Vec::new(),
                max_request_size: 1024,
                max_response_size: 1024 * 1024,
                request_timeout: Duration::from_secs(30),
                inbound_queue: Some(tx),
            };

            build_swarm(iter::once(protocol_config))
        })
        .collect::<Vec<_>>();

    // Make swarm 0 dial the listen address of swarm 1.
    {
        let dial_addr = swarms[1].1.clone();
        Swarm::dial(&mut swarms[0].0, dial_addr).unwrap();
    }

    // Swarm 0 (the dialer) runs in the background and answers the request.
    let (mut swarm, _) = swarms.remove(0);
    tokio::spawn(async move {
        loop {
            match swarm.select_next_some().await {
                SwarmEvent::Behaviour(Event::InboundRequest { result, .. }) => {
                    result.unwrap();
                },
                _ => {},
            }
        }
    });

    // The remaining swarm (originally swarm 1) sends the request once the connection is up.
    let (mut swarm, _) = swarms.remove(0);
    let mut response_receiver = None;

    loop {
        match swarm.select_next_some().await {
            SwarmEvent::ConnectionEstablished { peer_id, .. } => {
                let (sender, receiver) = oneshot::channel();
                swarm.behaviour_mut().send_request(
                    &peer_id,
                    protocol_name.clone(),
                    b"this is a request".to_vec(),
                    None,
                    sender,
                    IfDisconnected::ImmediateError,
                );
                // Only one request is expected to be sent.
                assert!(response_receiver.is_none());
                response_receiver = Some(receiver);
            },
            SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
                result.unwrap();
                break;
            },
            _ => {},
        }
    }

    assert_eq!(
        response_receiver.unwrap().await.unwrap().unwrap(),
        (b"this is a response".to_vec(), protocol_name)
    );
}
1306
/// Checks that a response longer than `max_response_size` makes the request fail on the
/// requester's side with an I/O error.
#[tokio::test]
async fn max_response_size_exceeded() {
    let protocol_name = ProtocolName::from("/test/req-resp/1");

    // Build two swarms that both serve the protocol.
    let mut swarms = (0..2)
        .map(|_| {
            let (tx, mut rx) = async_channel::bounded::<IncomingRequest>(64);

            // Background task answering every inbound request with a payload that is longer
            // than the configured response limit.
            tokio::spawn(async move {
                while let Some(rq) = rx.next().await {
                    assert_eq!(rq.payload, b"this is a request");
                    let _ = rq.pending_response.send(super::OutgoingResponse {
                        result: Ok(b"this response exceeds the limit".to_vec()),
                        reputation_changes: Vec::new(),
                        sent_feedback: None,
                    });
                }
            });

            let protocol_config = ProtocolConfig {
                name: protocol_name.clone(),
                fallback_names: Vec::new(),
                max_request_size: 1024,
                // Deliberately smaller than the response sent above.
                max_response_size: 8,
                request_timeout: Duration::from_secs(30),
                inbound_queue: Some(tx),
            };

            build_swarm(iter::once(protocol_config))
        })
        .collect::<Vec<_>>();

    // Make swarm 0 dial the listen address of swarm 1.
    {
        let dial_addr = swarms[1].1.clone();
        Swarm::dial(&mut swarms[0].0, dial_addr).unwrap();
    }

    // Swarm 0 (the dialer) runs in the background and answers the request. From its point of
    // view the response is sent successfully. It stops once the connection is closed.
    let (mut swarm, _) = swarms.remove(0);
    tokio::spawn(async move {
        loop {
            match swarm.select_next_some().await {
                SwarmEvent::Behaviour(Event::InboundRequest { result, .. }) => {
                    assert!(result.is_ok());
                },
                SwarmEvent::ConnectionClosed { .. } => {
                    break;
                },
                _ => {},
            }
        }
    });

    // The remaining swarm (originally swarm 1) sends the request once the connection is up.
    let (mut swarm, _) = swarms.remove(0);

    let mut response_receiver = None;

    loop {
        match swarm.select_next_some().await {
            SwarmEvent::ConnectionEstablished { peer_id, .. } => {
                let (sender, receiver) = oneshot::channel();
                swarm.behaviour_mut().send_request(
                    &peer_id,
                    protocol_name.clone(),
                    b"this is a request".to_vec(),
                    None,
                    sender,
                    IfDisconnected::ImmediateError,
                );
                // Only one request is expected to be sent.
                assert!(response_receiver.is_none());
                response_receiver = Some(receiver);
            },
            SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
                assert!(result.is_err());
                break;
            },
            _ => {},
        }
    }

    // The oversized response is rejected by the codec, which surfaces as an I/O failure.
    match response_receiver.unwrap().await.unwrap().unwrap_err() {
        RequestFailure::Network(OutboundFailure::Io(_)) => {},
        request_failure => panic!("Unexpected failure: {request_failure:?}"),
    }
}
1397
1398 #[tokio::test]
1409 async fn request_id_collision() {
1410 let protocol_name_1 = ProtocolName::from("/test/req-resp-1/1");
1411 let protocol_name_2 = ProtocolName::from("/test/req-resp-2/1");
1412
1413 let mut swarm_1 = {
1414 let protocol_configs = vec![
1415 ProtocolConfig {
1416 name: protocol_name_1.clone(),
1417 fallback_names: Vec::new(),
1418 max_request_size: 1024,
1419 max_response_size: 1024 * 1024,
1420 request_timeout: Duration::from_secs(30),
1421 inbound_queue: None,
1422 },
1423 ProtocolConfig {
1424 name: protocol_name_2.clone(),
1425 fallback_names: Vec::new(),
1426 max_request_size: 1024,
1427 max_response_size: 1024 * 1024,
1428 request_timeout: Duration::from_secs(30),
1429 inbound_queue: None,
1430 },
1431 ];
1432
1433 build_swarm(protocol_configs.into_iter()).0
1434 };
1435
1436 let (mut swarm_2, mut swarm_2_handler_1, mut swarm_2_handler_2, listen_add_2) = {
1437 let (tx_1, rx_1) = async_channel::bounded(64);
1438 let (tx_2, rx_2) = async_channel::bounded(64);
1439
1440 let protocol_configs = vec![
1441 ProtocolConfig {
1442 name: protocol_name_1.clone(),
1443 fallback_names: Vec::new(),
1444 max_request_size: 1024,
1445 max_response_size: 1024 * 1024,
1446 request_timeout: Duration::from_secs(30),
1447 inbound_queue: Some(tx_1),
1448 },
1449 ProtocolConfig {
1450 name: protocol_name_2.clone(),
1451 fallback_names: Vec::new(),
1452 max_request_size: 1024,
1453 max_response_size: 1024 * 1024,
1454 request_timeout: Duration::from_secs(30),
1455 inbound_queue: Some(tx_2),
1456 },
1457 ];
1458
1459 let (swarm, listen_addr) = build_swarm(protocol_configs.into_iter());
1460
1461 (swarm, rx_1, rx_2, listen_addr)
1462 };
1463
1464 swarm_1.dial(listen_add_2).unwrap();
1467
1468 tokio::spawn(async move {
1470 loop {
1471 match swarm_2.select_next_some().await {
1472 SwarmEvent::Behaviour(Event::InboundRequest { result, .. }) => {
1473 result.unwrap();
1474 },
1475 _ => {},
1476 }
1477 }
1478 });
1479
1480 tokio::spawn(async move {
1485 let protocol_1_request = swarm_2_handler_1.next().await;
1486 let protocol_2_request = swarm_2_handler_2.next().await;
1487
1488 protocol_1_request
1489 .unwrap()
1490 .pending_response
1491 .send(OutgoingResponse {
1492 result: Ok(b"this is a response".to_vec()),
1493 reputation_changes: Vec::new(),
1494 sent_feedback: None,
1495 })
1496 .unwrap();
1497 protocol_2_request
1498 .unwrap()
1499 .pending_response
1500 .send(OutgoingResponse {
1501 result: Ok(b"this is a response".to_vec()),
1502 reputation_changes: Vec::new(),
1503 sent_feedback: None,
1504 })
1505 .unwrap();
1506 });
1507
1508 let mut response_receivers = None;
1511 let mut num_responses = 0;
1512
1513 loop {
1514 match swarm_1.select_next_some().await {
1515 SwarmEvent::ConnectionEstablished { peer_id, .. } => {
1516 let (sender_1, receiver_1) = oneshot::channel();
1517 let (sender_2, receiver_2) = oneshot::channel();
1518 swarm_1.behaviour_mut().send_request(
1519 &peer_id,
1520 protocol_name_1.clone(),
1521 b"this is a request".to_vec(),
1522 None,
1523 sender_1,
1524 IfDisconnected::ImmediateError,
1525 );
1526 swarm_1.behaviour_mut().send_request(
1527 &peer_id,
1528 protocol_name_2.clone(),
1529 b"this is a request".to_vec(),
1530 None,
1531 sender_2,
1532 IfDisconnected::ImmediateError,
1533 );
1534 assert!(response_receivers.is_none());
1535 response_receivers = Some((receiver_1, receiver_2));
1536 },
1537 SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
1538 num_responses += 1;
1539 result.unwrap();
1540 if num_responses == 2 {
1541 break;
1542 }
1543 },
1544 _ => {},
1545 }
1546 }
1547 let (response_receiver_1, response_receiver_2) = response_receivers.unwrap();
1548 assert_eq!(
1549 response_receiver_1.await.unwrap().unwrap(),
1550 (b"this is a response".to_vec(), protocol_name_1)
1551 );
1552 assert_eq!(
1553 response_receiver_2.await.unwrap().unwrap(),
1554 (b"this is a response".to_vec(), protocol_name_2)
1555 );
1556 }
1557
1558 #[tokio::test]
1559 async fn request_fallback() {
1560 let protocol_name_1 = ProtocolName::from("/test/req-resp/2");
1561 let protocol_name_1_fallback = ProtocolName::from("/test/req-resp/1");
1562 let protocol_name_2 = ProtocolName::from("/test/another");
1563
1564 let protocol_config_1 = ProtocolConfig {
1565 name: protocol_name_1.clone(),
1566 fallback_names: Vec::new(),
1567 max_request_size: 1024,
1568 max_response_size: 1024 * 1024,
1569 request_timeout: Duration::from_secs(30),
1570 inbound_queue: None,
1571 };
1572 let protocol_config_1_fallback = ProtocolConfig {
1573 name: protocol_name_1_fallback.clone(),
1574 fallback_names: Vec::new(),
1575 max_request_size: 1024,
1576 max_response_size: 1024 * 1024,
1577 request_timeout: Duration::from_secs(30),
1578 inbound_queue: None,
1579 };
1580 let protocol_config_2 = ProtocolConfig {
1581 name: protocol_name_2.clone(),
1582 fallback_names: Vec::new(),
1583 max_request_size: 1024,
1584 max_response_size: 1024 * 1024,
1585 request_timeout: Duration::from_secs(30),
1586 inbound_queue: None,
1587 };
1588
1589 let mut older_swarm = {
1592 let (tx_1, mut rx_1) = async_channel::bounded::<IncomingRequest>(64);
1593 let (tx_2, mut rx_2) = async_channel::bounded::<IncomingRequest>(64);
1594 let mut protocol_config_1_fallback = protocol_config_1_fallback.clone();
1595 protocol_config_1_fallback.inbound_queue = Some(tx_1);
1596
1597 let mut protocol_config_2 = protocol_config_2.clone();
1598 protocol_config_2.inbound_queue = Some(tx_2);
1599
1600 tokio::spawn(async move {
1601 for _ in 0..2 {
1602 if let Some(rq) = rx_1.next().await {
1603 let (fb_tx, fb_rx) = oneshot::channel();
1604 assert_eq!(rq.payload, b"request on protocol /test/req-resp/1");
1605 let _ = rq.pending_response.send(super::OutgoingResponse {
1606 result: Ok(b"this is a response on protocol /test/req-resp/1".to_vec()),
1607 reputation_changes: Vec::new(),
1608 sent_feedback: Some(fb_tx),
1609 });
1610 fb_rx.await.unwrap();
1611 }
1612 }
1613
1614 if let Some(rq) = rx_2.next().await {
1615 let (fb_tx, fb_rx) = oneshot::channel();
1616 assert_eq!(rq.payload, b"request on protocol /test/other");
1617 let _ = rq.pending_response.send(super::OutgoingResponse {
1618 result: Ok(b"this is a response on protocol /test/other".to_vec()),
1619 reputation_changes: Vec::new(),
1620 sent_feedback: Some(fb_tx),
1621 });
1622 fb_rx.await.unwrap();
1623 }
1624 });
1625
1626 build_swarm(vec![protocol_config_1_fallback, protocol_config_2].into_iter())
1627 };
1628
1629 let mut new_swarm = build_swarm(
1631 vec![
1632 protocol_config_1.clone(),
1633 protocol_config_1_fallback.clone(),
1634 protocol_config_2.clone(),
1635 ]
1636 .into_iter(),
1637 );
1638
1639 {
1640 let dial_addr = older_swarm.1.clone();
1641 Swarm::dial(&mut new_swarm.0, dial_addr).unwrap();
1642 }
1643
1644 tokio::spawn(async move {
1646 loop {
1647 _ = older_swarm.0.select_next_some().await;
1648 }
1649 });
1650
1651 let (mut swarm, _) = new_swarm;
1653 let mut older_peer_id = None;
1654
1655 let mut response_receiver = None;
1656 loop {
1658 match swarm.select_next_some().await {
1659 SwarmEvent::ConnectionEstablished { peer_id, .. } => {
1660 older_peer_id = Some(peer_id);
1661 let (sender, receiver) = oneshot::channel();
1662 swarm.behaviour_mut().send_request(
1663 &peer_id,
1664 protocol_name_1.clone(),
1665 b"request on protocol /test/req-resp/2".to_vec(),
1666 Some((
1667 b"request on protocol /test/req-resp/1".to_vec(),
1668 protocol_config_1_fallback.name.clone(),
1669 )),
1670 sender,
1671 IfDisconnected::ImmediateError,
1672 );
1673 response_receiver = Some(receiver);
1674 },
1675 SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
1676 result.unwrap();
1677 break;
1678 },
1679 _ => {},
1680 }
1681 }
1682 assert_eq!(
1683 response_receiver.unwrap().await.unwrap().unwrap(),
1684 (
1685 b"this is a response on protocol /test/req-resp/1".to_vec(),
1686 protocol_name_1_fallback.clone()
1687 )
1688 );
1689 let (sender, response_receiver) = oneshot::channel();
1691 swarm.behaviour_mut().send_request(
1692 older_peer_id.as_ref().unwrap(),
1693 protocol_name_1_fallback.clone(),
1694 b"request on protocol /test/req-resp/1".to_vec(),
1695 Some((
1696 b"dummy request, will fail if processed".to_vec(),
1697 protocol_config_1_fallback.name.clone(),
1698 )),
1699 sender,
1700 IfDisconnected::ImmediateError,
1701 );
1702 loop {
1703 match swarm.select_next_some().await {
1704 SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
1705 result.unwrap();
1706 break;
1707 },
1708 _ => {},
1709 }
1710 }
1711 assert_eq!(
1712 response_receiver.await.unwrap().unwrap(),
1713 (
1714 b"this is a response on protocol /test/req-resp/1".to_vec(),
1715 protocol_name_1_fallback.clone()
1716 )
1717 );
1718 let (sender, response_receiver) = oneshot::channel();
1720 swarm.behaviour_mut().send_request(
1721 older_peer_id.as_ref().unwrap(),
1722 protocol_name_1.clone(),
1723 b"request on protocol /test/req-resp-2".to_vec(),
1724 None,
1725 sender,
1726 IfDisconnected::ImmediateError,
1727 );
1728 loop {
1729 match swarm.select_next_some().await {
1730 SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
1731 assert_matches!(
1732 result.unwrap_err(),
1733 RequestFailure::Network(OutboundFailure::UnsupportedProtocols)
1734 );
1735 break;
1736 },
1737 _ => {},
1738 }
1739 }
1740 assert!(response_receiver.await.unwrap().is_err());
1741 let (sender, response_receiver) = oneshot::channel();
1743 swarm.behaviour_mut().send_request(
1744 older_peer_id.as_ref().unwrap(),
1745 protocol_name_2.clone(),
1746 b"request on protocol /test/other".to_vec(),
1747 None,
1748 sender,
1749 IfDisconnected::ImmediateError,
1750 );
1751 loop {
1752 match swarm.select_next_some().await {
1753 SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
1754 result.unwrap();
1755 break;
1756 },
1757 _ => {},
1758 }
1759 }
1760 assert_eq!(
1761 response_receiver.await.unwrap().unwrap(),
1762 (b"this is a response on protocol /test/other".to_vec(), protocol_name_2.clone())
1763 );
1764 }
1765
1766 #[tokio::test]
1780 async fn enforce_outbound_timeouts() {
1781 const REQUEST_TIMEOUT: Duration = Duration::from_secs(10);
1782 const REQUEST_TIMEOUT_SHORT: Duration = Duration::from_secs(1);
1783
1784 let protocol_name = ProtocolName::from("/test/req-resp/1");
1786
1787 let protocol_config = ProtocolConfig {
1788 name: protocol_name.clone(),
1789 fallback_names: Vec::new(),
1790 max_request_size: 1024,
1791 max_response_size: 1024 * 1024,
1792 request_timeout: REQUEST_TIMEOUT, inbound_queue: None,
1794 };
1795
1796 let (mut first_swarm, _) = {
1798 let (tx, mut rx) = async_channel::bounded::<IncomingRequest>(64);
1799
1800 tokio::spawn(async move {
1801 if let Some(rq) = rx.next().await {
1802 assert_eq!(rq.payload, b"this is a request");
1803
1804 tokio::time::sleep(REQUEST_TIMEOUT_SHORT * 2).await;
1807
1808 let _ = rq.pending_response.send(super::OutgoingResponse {
1811 result: Ok(b"Second swarm already timedout".to_vec()),
1812 reputation_changes: Vec::new(),
1813 sent_feedback: None,
1814 });
1815 }
1816 });
1817
1818 let mut protocol_config = protocol_config.clone();
1819 protocol_config.inbound_queue = Some(tx);
1820
1821 build_swarm(iter::once(protocol_config))
1822 };
1823
1824 let (mut second_swarm, second_address) = {
1825 let (tx, mut rx) = async_channel::bounded::<IncomingRequest>(64);
1826
1827 tokio::spawn(async move {
1828 while let Some(rq) = rx.next().await {
1829 let _ = rq.pending_response.send(super::OutgoingResponse {
1830 result: Ok(b"This is the response".to_vec()),
1831 reputation_changes: Vec::new(),
1832 sent_feedback: None,
1833 });
1834 }
1835 });
1836 let mut protocol_config = protocol_config.clone();
1837 protocol_config.inbound_queue = Some(tx);
1838
1839 build_swarm(iter::once(protocol_config.clone()))
1840 };
1841 second_swarm
1843 .behaviour_mut()
1844 .protocols
1845 .get_mut(&protocol_name)
1846 .unwrap()
1847 .request_timeout = REQUEST_TIMEOUT_SHORT;
1848
1849 {
1851 Swarm::dial(&mut first_swarm, second_address).unwrap();
1852 }
1853
1854 tokio::spawn(async move {
1857 loop {
1858 let event = first_swarm.select_next_some().await;
1859 match event {
1860 SwarmEvent::Behaviour(Event::InboundRequest { result, .. }) => {
1861 assert!(result.is_ok());
1862 break;
1863 },
1864 SwarmEvent::ConnectionClosed { .. } => {
1865 break;
1866 },
1867 _ => {},
1868 }
1869 }
1870 });
1871
1872 let mut response_receiver = None;
1876 loop {
1877 let event = second_swarm.select_next_some().await;
1878
1879 match event {
1880 SwarmEvent::ConnectionEstablished { peer_id, .. } => {
1881 let (sender, receiver) = oneshot::channel();
1882 second_swarm.behaviour_mut().send_request(
1883 &peer_id,
1884 protocol_name.clone(),
1885 b"this is a request".to_vec(),
1886 None,
1887 sender,
1888 IfDisconnected::ImmediateError,
1889 );
1890 assert!(response_receiver.is_none());
1891 response_receiver = Some(receiver);
1892 },
1893 SwarmEvent::ConnectionClosed { .. } => {
1894 break;
1895 },
1896 SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
1897 assert!(result.is_err());
1898 break;
1899 },
1900 _ => {},
1901 }
1902 }
1903
1904 match response_receiver.unwrap().await.unwrap().unwrap_err() {
1906 RequestFailure::Network(OutboundFailure::Timeout) => {},
1907 request_failure => panic!("Unexpected failure: {request_failure:?}"),
1908 }
1909 }
1910}