1use crate::{
38 peer_store::{PeerStoreProvider, BANNED_THRESHOLD},
39 service::traits::RequestResponseConfig as RequestResponseConfigT,
40 types::ProtocolName,
41 ReputationChange,
42};
43
44use futures::{channel::oneshot, prelude::*};
45use libp2p::{
46 core::{transport::PortUse, Endpoint, Multiaddr},
47 request_response::{self, Behaviour, Codec, Message, ProtocolSupport, ResponseChannel},
48 swarm::{
49 behaviour::FromSwarm, handler::multi::MultiHandler, ConnectionDenied, ConnectionId,
50 NetworkBehaviour, THandler, THandlerInEvent, THandlerOutEvent, ToSwarm,
51 },
52 PeerId,
53};
54
55use std::{
56 collections::{hash_map::Entry, HashMap},
57 io, iter,
58 ops::Deref,
59 pin::Pin,
60 sync::Arc,
61 task::{Context, Poll},
62 time::{Duration, Instant},
63};
64
65pub use libp2p::request_response::{Config, InboundRequestId, OutboundRequestId};
66
67const LOG_TARGET: &str = "sub-libp2p::request-response";
69
70const PERIODIC_REQUEST_CHECK: Duration = Duration::from_secs(2);
72
73#[derive(Debug, Clone, thiserror::Error)]
76pub enum OutboundFailure {
77 #[error("Failed to dial the requested peer")]
79 DialFailure,
80 #[error("Timeout while waiting for a response")]
82 Timeout,
83 #[error("Connection was closed before a response was received")]
85 ConnectionClosed,
86 #[error("The remote supports none of the requested protocols")]
88 UnsupportedProtocols,
89 #[error("An IO failure happened on an outbound stream")]
91 Io(Arc<io::Error>),
92}
93
94impl From<request_response::OutboundFailure> for OutboundFailure {
95 fn from(out: request_response::OutboundFailure) -> Self {
96 match out {
97 request_response::OutboundFailure::DialFailure => OutboundFailure::DialFailure,
98 request_response::OutboundFailure::Timeout => OutboundFailure::Timeout,
99 request_response::OutboundFailure::ConnectionClosed => {
100 OutboundFailure::ConnectionClosed
101 },
102 request_response::OutboundFailure::UnsupportedProtocols => {
103 OutboundFailure::UnsupportedProtocols
104 },
105 request_response::OutboundFailure::Io(error) => OutboundFailure::Io(Arc::new(error)),
106 }
107 }
108}
109
110#[derive(Debug, thiserror::Error)]
113pub enum InboundFailure {
114 #[error("Timeout while receiving request or sending response")]
117 Timeout,
118 #[error("Connection was closed before a response could be sent")]
120 ConnectionClosed,
121 #[error("The local peer supports none of the protocols requested by the remote")]
123 UnsupportedProtocols,
124 #[error("The response channel was dropped without sending a response to the remote")]
126 ResponseOmission,
127 #[error("An IO failure happened on an inbound stream")]
129 Io(Arc<io::Error>),
130}
131
132impl From<request_response::InboundFailure> for InboundFailure {
133 fn from(out: request_response::InboundFailure) -> Self {
134 match out {
135 request_response::InboundFailure::ResponseOmission => InboundFailure::ResponseOmission,
136 request_response::InboundFailure::Timeout => InboundFailure::Timeout,
137 request_response::InboundFailure::ConnectionClosed => InboundFailure::ConnectionClosed,
138 request_response::InboundFailure::UnsupportedProtocols => {
139 InboundFailure::UnsupportedProtocols
140 },
141 request_response::InboundFailure::Io(error) => InboundFailure::Io(Arc::new(error)),
142 }
143 }
144}
145
146#[derive(Debug, thiserror::Error)]
148#[allow(missing_docs)]
149pub enum RequestFailure {
150 #[error("We are not currently connected to the requested peer.")]
151 NotConnected,
152 #[error("Given protocol hasn't been registered.")]
153 UnknownProtocol,
154 #[error("Remote has closed the substream before answering, thereby signaling that it considers the request as valid, but refused to answer it.")]
155 Refused,
156 #[error("The remote replied, but the local node is no longer interested in the response.")]
157 Obsolete,
158 #[error("Problem on the network: {0}")]
159 Network(OutboundFailure),
160}
161
162#[derive(Debug, Clone)]
164pub struct ProtocolConfig {
165 pub name: ProtocolName,
167
168 pub fallback_names: Vec<ProtocolName>,
170
171 pub max_request_size: u64,
176
177 pub max_response_size: u64,
182
183 pub request_timeout: Duration,
187
188 pub inbound_queue: Option<async_channel::Sender<IncomingRequest>>,
209}
210
211impl RequestResponseConfigT for ProtocolConfig {
212 fn protocol_name(&self) -> &ProtocolName {
213 &self.name
214 }
215}
216
217#[derive(Debug)]
219pub struct IncomingRequest {
220 pub peer: pezsc_network_types::PeerId,
222
223 pub payload: Vec<u8>,
226
227 pub pending_response: oneshot::Sender<OutgoingResponse>,
236}
237
238#[derive(Debug)]
240pub struct OutgoingResponse {
241 pub result: Result<Vec<u8>, ()>,
245
246 pub reputation_changes: Vec<ReputationChange>,
249
250 pub sent_feedback: Option<oneshot::Sender<()>>,
259}
260
261struct PendingRequest {
263 started_at: Instant,
265 response_tx: Option<oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>>,
270 fallback_request: Option<(Vec<u8>, ProtocolName)>,
272}
273
274#[derive(Debug, Copy, Clone, PartialEq, Eq, Hash)]
276pub enum IfDisconnected {
277 TryConnect,
279 ImmediateError,
281}
282
283impl IfDisconnected {
285 pub fn should_connect(self) -> bool {
287 match self {
288 Self::TryConnect => true,
289 Self::ImmediateError => false,
290 }
291 }
292}
293
294#[derive(Debug)]
296pub enum Event {
297 InboundRequest {
301 peer: PeerId,
303 protocol: ProtocolName,
305 result: Result<Duration, ResponseFailure>,
310 },
311
312 RequestFinished {
317 peer: PeerId,
319 protocol: ProtocolName,
321 duration: Duration,
323 result: Result<(), RequestFailure>,
325 },
326
327 ReputationChanges {
329 peer: PeerId,
331 changes: Vec<ReputationChange>,
333 },
334}
335
336#[derive(Debug, Clone, PartialEq, Eq, Hash)]
343struct ProtocolRequestId<RequestId> {
344 protocol: ProtocolName,
345 request_id: RequestId,
346}
347
348impl<RequestId> From<(ProtocolName, RequestId)> for ProtocolRequestId<RequestId> {
349 fn from((protocol, request_id): (ProtocolName, RequestId)) -> Self {
350 Self { protocol, request_id }
351 }
352}
353
354struct ProtocolDetails {
356 behaviour: Behaviour<GenericCodec>,
357 inbound_queue: Option<async_channel::Sender<IncomingRequest>>,
358 request_timeout: Duration,
359}
360
361pub struct RequestResponsesBehaviour {
363 protocols: HashMap<ProtocolName, ProtocolDetails>,
368
369 pending_requests: HashMap<ProtocolRequestId<OutboundRequestId>, PendingRequest>,
371
372 pending_responses: stream::FuturesUnordered<
375 Pin<Box<dyn Future<Output = Option<RequestProcessingOutcome>> + Send>>,
376 >,
377
378 pending_responses_arrival_time: HashMap<ProtocolRequestId<InboundRequestId>, Instant>,
380
381 send_feedback: HashMap<ProtocolRequestId<InboundRequestId>, oneshot::Sender<()>>,
384
385 peer_store: Arc<dyn PeerStoreProvider>,
387
388 periodic_request_check: tokio::time::Interval,
395}
396
397struct RequestProcessingOutcome {
399 peer: PeerId,
400 request_id: InboundRequestId,
401 protocol: ProtocolName,
402 inner_channel: ResponseChannel<Result<Vec<u8>, ()>>,
403 response: OutgoingResponse,
404}
405
406impl RequestResponsesBehaviour {
407 pub fn new(
410 list: impl Iterator<Item = ProtocolConfig>,
411 peer_store: Arc<dyn PeerStoreProvider>,
412 ) -> Result<Self, RegisterError> {
413 let mut protocols = HashMap::new();
414 for protocol in list {
415 let cfg = Config::default().with_request_timeout(protocol.request_timeout);
416
417 let protocol_support = if protocol.inbound_queue.is_some() {
418 ProtocolSupport::Full
419 } else {
420 ProtocolSupport::Outbound
421 };
422
423 let behaviour = Behaviour::with_codec(
424 GenericCodec {
425 max_request_size: protocol.max_request_size,
426 max_response_size: protocol.max_response_size,
427 },
428 iter::once(protocol.name.clone())
429 .chain(protocol.fallback_names)
430 .zip(iter::repeat(protocol_support)),
431 cfg,
432 );
433
434 match protocols.entry(protocol.name) {
435 Entry::Vacant(e) => e.insert(ProtocolDetails {
436 behaviour,
437 inbound_queue: protocol.inbound_queue,
438 request_timeout: protocol.request_timeout,
439 }),
440 Entry::Occupied(e) => {
441 return Err(RegisterError::DuplicateProtocol(e.key().clone()))
442 },
443 };
444 }
445
446 Ok(Self {
447 protocols,
448 pending_requests: Default::default(),
449 pending_responses: Default::default(),
450 pending_responses_arrival_time: Default::default(),
451 send_feedback: Default::default(),
452 peer_store,
453 periodic_request_check: tokio::time::interval(PERIODIC_REQUEST_CHECK),
454 })
455 }
456
457 pub fn send_request(
464 &mut self,
465 target: &PeerId,
466 protocol_name: ProtocolName,
467 request: Vec<u8>,
468 fallback_request: Option<(Vec<u8>, ProtocolName)>,
469 pending_response: oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>,
470 connect: IfDisconnected,
471 ) {
472 log::trace!(target: LOG_TARGET, "send request to {target} ({protocol_name:?}), {} bytes", request.len());
473
474 if let Some(ProtocolDetails { behaviour, .. }) =
475 self.protocols.get_mut(protocol_name.deref())
476 {
477 Self::send_request_inner(
478 behaviour,
479 &mut self.pending_requests,
480 target,
481 protocol_name,
482 request,
483 fallback_request,
484 pending_response,
485 connect,
486 )
487 } else if pending_response.send(Err(RequestFailure::UnknownProtocol)).is_err() {
488 log::debug!(
489 target: LOG_TARGET,
490 "Unknown protocol {:?}. At the same time local \
491 node is no longer interested in the result.",
492 protocol_name,
493 );
494 }
495 }
496
497 fn send_request_inner(
498 behaviour: &mut Behaviour<GenericCodec>,
499 pending_requests: &mut HashMap<ProtocolRequestId<OutboundRequestId>, PendingRequest>,
500 target: &PeerId,
501 protocol_name: ProtocolName,
502 request: Vec<u8>,
503 fallback_request: Option<(Vec<u8>, ProtocolName)>,
504 pending_response: oneshot::Sender<Result<(Vec<u8>, ProtocolName), RequestFailure>>,
505 connect: IfDisconnected,
506 ) {
507 if behaviour.is_connected(target) || connect.should_connect() {
508 let request_id = behaviour.send_request(target, request);
509 let prev_req_id = pending_requests.insert(
510 (protocol_name.to_string().into(), request_id).into(),
511 PendingRequest {
512 started_at: Instant::now(),
513 response_tx: Some(pending_response),
514 fallback_request,
515 },
516 );
517 debug_assert!(prev_req_id.is_none(), "Expect request id to be unique.");
518 } else if pending_response.send(Err(RequestFailure::NotConnected)).is_err() {
519 log::debug!(
520 target: LOG_TARGET,
521 "Not connected to peer {:?}. At the same time local \
522 node is no longer interested in the result.",
523 target,
524 );
525 }
526 }
527}
528
529impl NetworkBehaviour for RequestResponsesBehaviour {
530 type ConnectionHandler =
531 MultiHandler<String, <Behaviour<GenericCodec> as NetworkBehaviour>::ConnectionHandler>;
532 type ToSwarm = Event;
533
534 fn handle_pending_inbound_connection(
535 &mut self,
536 _connection_id: ConnectionId,
537 _local_addr: &Multiaddr,
538 _remote_addr: &Multiaddr,
539 ) -> Result<(), ConnectionDenied> {
540 Ok(())
541 }
542
543 fn handle_pending_outbound_connection(
544 &mut self,
545 _connection_id: ConnectionId,
546 _maybe_peer: Option<PeerId>,
547 _addresses: &[Multiaddr],
548 _effective_role: Endpoint,
549 ) -> Result<Vec<Multiaddr>, ConnectionDenied> {
550 Ok(Vec::new())
551 }
552
553 fn handle_established_inbound_connection(
554 &mut self,
555 connection_id: ConnectionId,
556 peer: PeerId,
557 local_addr: &Multiaddr,
558 remote_addr: &Multiaddr,
559 ) -> Result<THandler<Self>, ConnectionDenied> {
560 let iter =
561 self.protocols.iter_mut().filter_map(|(p, ProtocolDetails { behaviour, .. })| {
562 if let Ok(handler) = behaviour.handle_established_inbound_connection(
563 connection_id,
564 peer,
565 local_addr,
566 remote_addr,
567 ) {
568 Some((p.to_string(), handler))
569 } else {
570 None
571 }
572 });
573
574 Ok(MultiHandler::try_from_iter(iter).expect(
575 "Protocols are in a HashMap and there can be at most one handler per protocol name, \
576 which is the only possible error; qed",
577 ))
578 }
579
580 fn handle_established_outbound_connection(
581 &mut self,
582 connection_id: ConnectionId,
583 peer: PeerId,
584 addr: &Multiaddr,
585 role_override: Endpoint,
586 port_use: PortUse,
587 ) -> Result<THandler<Self>, ConnectionDenied> {
588 let iter =
589 self.protocols.iter_mut().filter_map(|(p, ProtocolDetails { behaviour, .. })| {
590 if let Ok(handler) = behaviour.handle_established_outbound_connection(
591 connection_id,
592 peer,
593 addr,
594 role_override,
595 port_use,
596 ) {
597 Some((p.to_string(), handler))
598 } else {
599 None
600 }
601 });
602
603 Ok(MultiHandler::try_from_iter(iter).expect(
604 "Protocols are in a HashMap and there can be at most one handler per protocol name, \
605 which is the only possible error; qed",
606 ))
607 }
608
609 fn on_swarm_event(&mut self, event: FromSwarm) {
610 for ProtocolDetails { behaviour, .. } in self.protocols.values_mut() {
611 behaviour.on_swarm_event(event);
612 }
613 }
614
615 fn on_connection_handler_event(
616 &mut self,
617 peer_id: PeerId,
618 connection_id: ConnectionId,
619 event: THandlerOutEvent<Self>,
620 ) {
621 let p_name = event.0;
622 if let Some(ProtocolDetails { behaviour, .. }) = self.protocols.get_mut(p_name.as_str()) {
623 return behaviour.on_connection_handler_event(peer_id, connection_id, event.1);
624 } else {
625 log::warn!(
626 target: LOG_TARGET,
627 "on_connection_handler_event: no request-response instance registered for protocol {:?}",
628 p_name
629 );
630 }
631 }
632
633 fn poll(&mut self, cx: &mut Context) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
634 'poll_all: loop {
635 if self.periodic_request_check.poll_tick(cx).is_ready() {
637 self.pending_requests.retain(|id, req| {
638 let Some(ProtocolDetails { request_timeout, .. }) =
639 self.protocols.get(&id.protocol)
640 else {
641 log::warn!(
642 target: LOG_TARGET,
643 "Request {id:?} has no protocol registered.",
644 );
645
646 if let Some(response_tx) = req.response_tx.take() {
647 if response_tx.send(Err(RequestFailure::UnknownProtocol)).is_err() {
648 log::debug!(
649 target: LOG_TARGET,
650 "Request {id:?} has no protocol registered. At the same time local node is no longer interested in the result.",
651 );
652 }
653 }
654 return false
655 };
656
657 let elapsed = req.started_at.elapsed();
658 if elapsed > *request_timeout {
659 log::debug!(
660 target: LOG_TARGET,
661 "Request {id:?} force detected as timeout.",
662 );
663
664 if let Some(response_tx) = req.response_tx.take() {
665 if response_tx.send(Err(RequestFailure::Network(OutboundFailure::Timeout))).is_err() {
666 log::debug!(
667 target: LOG_TARGET,
668 "Request {id:?} force detected as timeout. At the same time local node is no longer interested in the result.",
669 );
670 }
671 }
672
673 false
674 } else {
675 true
676 }
677 });
678 }
679
680 while let Poll::Ready(Some(outcome)) = self.pending_responses.poll_next_unpin(cx) {
682 let RequestProcessingOutcome {
683 peer,
684 request_id,
685 protocol: protocol_name,
686 inner_channel,
687 response: OutgoingResponse { result, reputation_changes, sent_feedback },
688 } = match outcome {
689 Some(outcome) => outcome,
690 None => continue,
693 };
694
695 if let Ok(payload) = result {
696 if let Some(ProtocolDetails { behaviour, .. }) =
697 self.protocols.get_mut(&*protocol_name)
698 {
699 log::trace!(target: LOG_TARGET, "send response to {peer} ({protocol_name:?}), {} bytes", payload.len());
700
701 if behaviour.send_response(inner_channel, Ok(payload)).is_err() {
702 log::debug!(
705 target: LOG_TARGET,
706 "Failed to send response for {:?} on protocol {:?} due to a \
707 timeout or due to the connection to the peer being closed. \
708 Dropping response",
709 request_id, protocol_name,
710 );
711 } else if let Some(sent_feedback) = sent_feedback {
712 self.send_feedback
713 .insert((protocol_name, request_id).into(), sent_feedback);
714 }
715 }
716 }
717
718 if !reputation_changes.is_empty() {
719 return Poll::Ready(ToSwarm::GenerateEvent(Event::ReputationChanges {
720 peer,
721 changes: reputation_changes,
722 }));
723 }
724 }
725
726 let mut fallback_requests = vec![];
727
728 for (protocol, ProtocolDetails { behaviour, inbound_queue, .. }) in &mut self.protocols
730 {
731 'poll_protocol: while let Poll::Ready(ev) = behaviour.poll(cx) {
732 let ev = match ev {
733 ToSwarm::GenerateEvent(ev) => ev,
735
736 ToSwarm::Dial { opts } => {
739 if opts.get_peer_id().is_none() {
740 log::error!(
741 target: LOG_TARGET,
742 "The request-response isn't supposed to start dialing addresses"
743 );
744 }
745 return Poll::Ready(ToSwarm::Dial { opts });
746 },
747 event => {
748 return Poll::Ready(
749 event.map_in(|event| ((*protocol).to_string(), event)).map_out(
750 |_| {
751 unreachable!(
752 "`GenerateEvent` is handled in a branch above; qed"
753 )
754 },
755 ),
756 );
757 },
758 };
759
760 match ev {
761 request_response::Event::Message {
763 peer,
764 message: Message::Request { request_id, request, channel, .. },
765 ..
766 } => {
767 self.pending_responses_arrival_time
768 .insert((protocol.clone(), request_id).into(), Instant::now());
769
770 let reputation = self.peer_store.peer_reputation(&peer.into());
771
772 if reputation < BANNED_THRESHOLD {
773 log::debug!(
774 target: LOG_TARGET,
775 "Cannot handle requests from a node with a low reputation {}: {}",
776 peer,
777 reputation,
778 );
779 continue 'poll_protocol;
780 }
781
782 let (tx, rx) = oneshot::channel();
783
784 if let Some(resp_builder) = inbound_queue {
787 let _ = resp_builder.try_send(IncomingRequest {
794 peer: peer.into(),
795 payload: request,
796 pending_response: tx,
797 });
798 } else {
799 debug_assert!(false, "Received message on outbound-only protocol.");
800 }
801
802 let protocol = protocol.clone();
803
804 self.pending_responses.push(Box::pin(async move {
805 rx.await.map_or(None, |response| {
809 Some(RequestProcessingOutcome {
810 peer,
811 request_id,
812 protocol,
813 inner_channel: channel,
814 response,
815 })
816 })
817 }));
818
819 continue 'poll_all;
822 },
823
824 request_response::Event::Message {
826 peer,
827 message: Message::Response { request_id, response },
828 ..
829 } => {
830 let (started, delivered) = match self
831 .pending_requests
832 .remove(&(protocol.clone(), request_id).into())
833 {
834 Some(PendingRequest {
835 started_at,
836 response_tx: Some(response_tx),
837 ..
838 }) => {
839 log::trace!(
840 target: LOG_TARGET,
841 "received response from {peer} ({protocol:?}), {} bytes",
842 response.as_ref().map_or(0usize, |response| response.len()),
843 );
844
845 let delivered = response_tx
846 .send(
847 response
848 .map_err(|()| RequestFailure::Refused)
849 .map(|resp| (resp, protocol.clone())),
850 )
851 .map_err(|_| RequestFailure::Obsolete);
852 (started_at, delivered)
853 },
854 _ => {
855 log::debug!(
856 target: LOG_TARGET,
857 "Received `RequestResponseEvent::Message` with unexpected request id {:?} from {:?}",
858 request_id,
859 peer,
860 );
861 continue;
862 },
863 };
864
865 let out = Event::RequestFinished {
866 peer,
867 protocol: protocol.clone(),
868 duration: started.elapsed(),
869 result: delivered,
870 };
871
872 return Poll::Ready(ToSwarm::GenerateEvent(out));
873 },
874
875 request_response::Event::OutboundFailure {
877 peer,
878 request_id,
879 error,
880 ..
881 } => {
882 let error = OutboundFailure::from(error);
883 let started = match self
884 .pending_requests
885 .remove(&(protocol.clone(), request_id).into())
886 {
887 Some(PendingRequest {
888 started_at,
889 response_tx: Some(response_tx),
890 fallback_request,
891 }) => {
892 if matches!(error, OutboundFailure::UnsupportedProtocols) {
895 if let Some((fallback_request, fallback_protocol)) =
896 fallback_request
897 {
898 log::trace!(
899 target: LOG_TARGET,
900 "Request with id {:?} failed. Trying the fallback protocol. {}",
901 request_id,
902 fallback_protocol.deref()
903 );
904 fallback_requests.push((
905 peer,
906 fallback_protocol,
907 fallback_request,
908 response_tx,
909 ));
910 continue;
911 }
912 }
913
914 if response_tx
915 .send(Err(RequestFailure::Network(error.clone())))
916 .is_err()
917 {
918 log::debug!(
919 target: LOG_TARGET,
920 "Request with id {:?} failed. At the same time local \
921 node is no longer interested in the result.",
922 request_id,
923 );
924 }
925 started_at
926 },
927 _ => {
928 log::debug!(
929 target: LOG_TARGET,
930 "Received `RequestResponseEvent::OutboundFailure` with unexpected request id {:?} error {:?} from {:?}",
931 request_id,
932 error,
933 peer
934 );
935 continue;
936 },
937 };
938
939 let out = Event::RequestFinished {
940 peer,
941 protocol: protocol.clone(),
942 duration: started.elapsed(),
943 result: Err(RequestFailure::Network(error)),
944 };
945
946 return Poll::Ready(ToSwarm::GenerateEvent(out));
947 },
948
949 request_response::Event::InboundFailure {
952 request_id, peer, error, ..
953 } => {
954 self.pending_responses_arrival_time
955 .remove(&(protocol.clone(), request_id).into());
956 self.send_feedback.remove(&(protocol.clone(), request_id).into());
957 let out = Event::InboundRequest {
958 peer,
959 protocol: protocol.clone(),
960 result: Err(ResponseFailure::Network(error.into())),
961 };
962 return Poll::Ready(ToSwarm::GenerateEvent(out));
963 },
964
965 request_response::Event::ResponseSent { request_id, peer, .. } => {
967 let arrival_time = self
968 .pending_responses_arrival_time
969 .remove(&(protocol.clone(), request_id).into())
970 .map(|t| t.elapsed())
971 .expect(
972 "Time is added for each inbound request on arrival and only \
973 removed on success (`ResponseSent`) or failure \
974 (`InboundFailure`). One can not receive a success event for a \
975 request that either never arrived, or that has previously \
976 failed; qed.",
977 );
978
979 if let Some(send_feedback) =
980 self.send_feedback.remove(&(protocol.clone(), request_id).into())
981 {
982 let _ = send_feedback.send(());
983 }
984
985 let out = Event::InboundRequest {
986 peer,
987 protocol: protocol.clone(),
988 result: Ok(arrival_time),
989 };
990
991 return Poll::Ready(ToSwarm::GenerateEvent(out));
992 },
993 };
994 }
995 }
996
997 for (peer, protocol, request, pending_response) in fallback_requests.drain(..) {
999 if let Some(ProtocolDetails { behaviour, .. }) = self.protocols.get_mut(&protocol) {
1000 Self::send_request_inner(
1001 behaviour,
1002 &mut self.pending_requests,
1003 &peer,
1004 protocol,
1005 request,
1006 None,
1007 pending_response,
1008 IfDisconnected::ImmediateError,
1012 );
1013 }
1014 }
1015
1016 break Poll::Pending;
1017 }
1018 }
1019}
1020
1021#[derive(Debug, thiserror::Error)]
1023pub enum RegisterError {
1024 #[error("{0}")]
1026 DuplicateProtocol(ProtocolName),
1027}
1028
1029#[derive(Debug, thiserror::Error)]
1031pub enum ResponseFailure {
1032 #[error("Problem on the network: {0}")]
1034 Network(InboundFailure),
1035}
1036
1037#[derive(Debug, Clone)]
1040#[doc(hidden)] pub struct GenericCodec {
1042 max_request_size: u64,
1043 max_response_size: u64,
1044}
1045
1046#[async_trait::async_trait]
1047impl Codec for GenericCodec {
1048 type Protocol = ProtocolName;
1049 type Request = Vec<u8>;
1050 type Response = Result<Vec<u8>, ()>;
1051
1052 async fn read_request<T>(
1053 &mut self,
1054 _: &Self::Protocol,
1055 mut io: &mut T,
1056 ) -> io::Result<Self::Request>
1057 where
1058 T: AsyncRead + Unpin + Send,
1059 {
1060 let length = unsigned_varint::aio::read_usize(&mut io)
1062 .await
1063 .map_err(|err| io::Error::new(io::ErrorKind::InvalidInput, err))?;
1064 if length > usize::try_from(self.max_request_size).unwrap_or(usize::MAX) {
1065 return Err(io::Error::new(
1066 io::ErrorKind::InvalidInput,
1067 format!("Request size exceeds limit: {} > {}", length, self.max_request_size),
1068 ));
1069 }
1070
1071 let mut buffer = vec![0; length];
1073 io.read_exact(&mut buffer).await?;
1074 Ok(buffer)
1075 }
1076
1077 async fn read_response<T>(
1078 &mut self,
1079 _: &Self::Protocol,
1080 mut io: &mut T,
1081 ) -> io::Result<Self::Response>
1082 where
1083 T: AsyncRead + Unpin + Send,
1084 {
1085 let length = match unsigned_varint::aio::read_usize(&mut io).await {
1092 Ok(l) => l,
1093 Err(unsigned_varint::io::ReadError::Io(err))
1094 if matches!(err.kind(), io::ErrorKind::UnexpectedEof) =>
1095 {
1096 return Ok(Err(()))
1097 },
1098 Err(err) => return Err(io::Error::new(io::ErrorKind::InvalidInput, err)),
1099 };
1100
1101 if length > usize::try_from(self.max_response_size).unwrap_or(usize::MAX) {
1102 return Err(io::Error::new(
1103 io::ErrorKind::InvalidInput,
1104 format!("Response size exceeds limit: {} > {}", length, self.max_response_size),
1105 ));
1106 }
1107
1108 let mut buffer = vec![0; length];
1110 io.read_exact(&mut buffer).await?;
1111 Ok(Ok(buffer))
1112 }
1113
1114 async fn write_request<T>(
1115 &mut self,
1116 _: &Self::Protocol,
1117 io: &mut T,
1118 req: Self::Request,
1119 ) -> io::Result<()>
1120 where
1121 T: AsyncWrite + Unpin + Send,
1122 {
1123 {
1126 let mut buffer = unsigned_varint::encode::usize_buffer();
1127 io.write_all(unsigned_varint::encode::usize(req.len(), &mut buffer)).await?;
1128 }
1129
1130 io.write_all(&req).await?;
1132
1133 io.close().await?;
1134 Ok(())
1135 }
1136
1137 async fn write_response<T>(
1138 &mut self,
1139 _: &Self::Protocol,
1140 io: &mut T,
1141 res: Self::Response,
1142 ) -> io::Result<()>
1143 where
1144 T: AsyncWrite + Unpin + Send,
1145 {
1146 if let Ok(res) = res {
1148 {
1151 let mut buffer = unsigned_varint::encode::usize_buffer();
1152 io.write_all(unsigned_varint::encode::usize(res.len(), &mut buffer)).await?;
1153 }
1154
1155 io.write_all(&res).await?;
1157 }
1158
1159 io.close().await?;
1160 Ok(())
1161 }
1162}
1163
1164#[cfg(test)]
1165mod tests {
1166 use super::*;
1167
1168 use crate::mock::MockPeerStore;
1169 use assert_matches::assert_matches;
1170 use futures::channel::oneshot;
1171 use libp2p::{
1172 core::{
1173 transport::{MemoryTransport, Transport},
1174 upgrade,
1175 },
1176 identity::Keypair,
1177 noise,
1178 swarm::{Config as SwarmConfig, Executor, Swarm, SwarmEvent},
1179 Multiaddr,
1180 };
1181 use std::{iter, time::Duration};
1182
1183 struct TokioExecutor;
1184 impl Executor for TokioExecutor {
1185 fn exec(&self, f: Pin<Box<dyn Future<Output = ()> + Send>>) {
1186 tokio::spawn(f);
1187 }
1188 }
1189
1190 fn build_swarm(
1191 list: impl Iterator<Item = ProtocolConfig>,
1192 ) -> (Swarm<RequestResponsesBehaviour>, Multiaddr) {
1193 let keypair = Keypair::generate_ed25519();
1194
1195 let transport = MemoryTransport::new()
1196 .upgrade(upgrade::Version::V1)
1197 .authenticate(noise::Config::new(&keypair).unwrap())
1198 .multiplex(libp2p::yamux::Config::default())
1199 .boxed();
1200
1201 let behaviour = RequestResponsesBehaviour::new(list, Arc::new(MockPeerStore {})).unwrap();
1202
1203 let mut swarm = Swarm::new(
1204 transport,
1205 behaviour,
1206 keypair.public().to_peer_id(),
1207 SwarmConfig::with_executor(TokioExecutor {})
1208 .with_idle_connection_timeout(Duration::from_secs(10)),
1211 );
1212
1213 let listen_addr: Multiaddr = format!("/memory/{}", rand::random::<u64>()).parse().unwrap();
1214
1215 swarm.listen_on(listen_addr.clone()).unwrap();
1216
1217 (swarm, listen_addr)
1218 }
1219
1220 #[tokio::test]
1221 async fn basic_request_response_works() {
1222 let protocol_name = ProtocolName::from("/test/req-resp/1");
1223
1224 let mut swarms = (0..2)
1226 .map(|_| {
1227 let (tx, mut rx) = async_channel::bounded::<IncomingRequest>(64);
1228
1229 tokio::spawn(async move {
1230 while let Some(rq) = rx.next().await {
1231 let (fb_tx, fb_rx) = oneshot::channel();
1232 assert_eq!(rq.payload, b"this is a request");
1233 let _ = rq.pending_response.send(super::OutgoingResponse {
1234 result: Ok(b"this is a response".to_vec()),
1235 reputation_changes: Vec::new(),
1236 sent_feedback: Some(fb_tx),
1237 });
1238 fb_rx.await.unwrap();
1239 }
1240 });
1241
1242 let protocol_config = ProtocolConfig {
1243 name: protocol_name.clone(),
1244 fallback_names: Vec::new(),
1245 max_request_size: 1024,
1246 max_response_size: 1024 * 1024,
1247 request_timeout: Duration::from_secs(30),
1248 inbound_queue: Some(tx),
1249 };
1250
1251 build_swarm(iter::once(protocol_config))
1252 })
1253 .collect::<Vec<_>>();
1254
1255 {
1258 let dial_addr = swarms[1].1.clone();
1259 Swarm::dial(&mut swarms[0].0, dial_addr).unwrap();
1260 }
1261
1262 let (mut swarm, _) = swarms.remove(0);
1263 tokio::spawn(async move {
1265 loop {
1266 match swarm.select_next_some().await {
1267 SwarmEvent::Behaviour(Event::InboundRequest { result, .. }) => {
1268 result.unwrap();
1269 },
1270 _ => {},
1271 }
1272 }
1273 });
1274
1275 let (mut swarm, _) = swarms.remove(0);
1277 let mut response_receiver = None;
1278
1279 loop {
1280 match swarm.select_next_some().await {
1281 SwarmEvent::ConnectionEstablished { peer_id, .. } => {
1282 let (sender, receiver) = oneshot::channel();
1283 swarm.behaviour_mut().send_request(
1284 &peer_id,
1285 protocol_name.clone(),
1286 b"this is a request".to_vec(),
1287 None,
1288 sender,
1289 IfDisconnected::ImmediateError,
1290 );
1291 assert!(response_receiver.is_none());
1292 response_receiver = Some(receiver);
1293 },
1294 SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
1295 result.unwrap();
1296 break;
1297 },
1298 _ => {},
1299 }
1300 }
1301
1302 assert_eq!(
1303 response_receiver.unwrap().await.unwrap().unwrap(),
1304 (b"this is a response".to_vec(), protocol_name)
1305 );
1306 }
1307
1308 #[tokio::test]
1309 async fn max_response_size_exceeded() {
1310 let protocol_name = ProtocolName::from("/test/req-resp/1");
1311
1312 let mut swarms = (0..2)
1314 .map(|_| {
1315 let (tx, mut rx) = async_channel::bounded::<IncomingRequest>(64);
1316
1317 tokio::spawn(async move {
1318 while let Some(rq) = rx.next().await {
1319 assert_eq!(rq.payload, b"this is a request");
1320 let _ = rq.pending_response.send(super::OutgoingResponse {
1321 result: Ok(b"this response exceeds the limit".to_vec()),
1322 reputation_changes: Vec::new(),
1323 sent_feedback: None,
1324 });
1325 }
1326 });
1327
1328 let protocol_config = ProtocolConfig {
1329 name: protocol_name.clone(),
1330 fallback_names: Vec::new(),
1331 max_request_size: 1024,
1332 max_response_size: 8, request_timeout: Duration::from_secs(30),
1334 inbound_queue: Some(tx),
1335 };
1336
1337 build_swarm(iter::once(protocol_config))
1338 })
1339 .collect::<Vec<_>>();
1340
1341 {
1344 let dial_addr = swarms[1].1.clone();
1345 Swarm::dial(&mut swarms[0].0, dial_addr).unwrap();
1346 }
1347
1348 let (mut swarm, _) = swarms.remove(0);
1351 tokio::spawn(async move {
1352 loop {
1353 match swarm.select_next_some().await {
1354 SwarmEvent::Behaviour(Event::InboundRequest { result, .. }) => {
1355 assert!(result.is_ok());
1356 },
1357 SwarmEvent::ConnectionClosed { .. } => {
1358 break;
1359 },
1360 _ => {},
1361 }
1362 }
1363 });
1364
1365 let (mut swarm, _) = swarms.remove(0);
1367
1368 let mut response_receiver = None;
1369
1370 loop {
1371 match swarm.select_next_some().await {
1372 SwarmEvent::ConnectionEstablished { peer_id, .. } => {
1373 let (sender, receiver) = oneshot::channel();
1374 swarm.behaviour_mut().send_request(
1375 &peer_id,
1376 protocol_name.clone(),
1377 b"this is a request".to_vec(),
1378 None,
1379 sender,
1380 IfDisconnected::ImmediateError,
1381 );
1382 assert!(response_receiver.is_none());
1383 response_receiver = Some(receiver);
1384 },
1385 SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
1386 assert!(result.is_err());
1387 break;
1388 },
1389 _ => {},
1390 }
1391 }
1392
1393 match response_receiver.unwrap().await.unwrap().unwrap_err() {
1394 RequestFailure::Network(OutboundFailure::Io(_)) => {},
1395 request_failure => panic!("Unexpected failure: {request_failure:?}"),
1396 }
1397 }
1398
1399 #[tokio::test]
1410 async fn request_id_collision() {
1411 let protocol_name_1 = ProtocolName::from("/test/req-resp-1/1");
1412 let protocol_name_2 = ProtocolName::from("/test/req-resp-2/1");
1413
1414 let mut swarm_1 = {
1415 let protocol_configs = vec![
1416 ProtocolConfig {
1417 name: protocol_name_1.clone(),
1418 fallback_names: Vec::new(),
1419 max_request_size: 1024,
1420 max_response_size: 1024 * 1024,
1421 request_timeout: Duration::from_secs(30),
1422 inbound_queue: None,
1423 },
1424 ProtocolConfig {
1425 name: protocol_name_2.clone(),
1426 fallback_names: Vec::new(),
1427 max_request_size: 1024,
1428 max_response_size: 1024 * 1024,
1429 request_timeout: Duration::from_secs(30),
1430 inbound_queue: None,
1431 },
1432 ];
1433
1434 build_swarm(protocol_configs.into_iter()).0
1435 };
1436
1437 let (mut swarm_2, mut swarm_2_handler_1, mut swarm_2_handler_2, listen_add_2) = {
1438 let (tx_1, rx_1) = async_channel::bounded(64);
1439 let (tx_2, rx_2) = async_channel::bounded(64);
1440
1441 let protocol_configs = vec![
1442 ProtocolConfig {
1443 name: protocol_name_1.clone(),
1444 fallback_names: Vec::new(),
1445 max_request_size: 1024,
1446 max_response_size: 1024 * 1024,
1447 request_timeout: Duration::from_secs(30),
1448 inbound_queue: Some(tx_1),
1449 },
1450 ProtocolConfig {
1451 name: protocol_name_2.clone(),
1452 fallback_names: Vec::new(),
1453 max_request_size: 1024,
1454 max_response_size: 1024 * 1024,
1455 request_timeout: Duration::from_secs(30),
1456 inbound_queue: Some(tx_2),
1457 },
1458 ];
1459
1460 let (swarm, listen_addr) = build_swarm(protocol_configs.into_iter());
1461
1462 (swarm, rx_1, rx_2, listen_addr)
1463 };
1464
1465 swarm_1.dial(listen_add_2).unwrap();
1468
1469 tokio::spawn(async move {
1471 loop {
1472 match swarm_2.select_next_some().await {
1473 SwarmEvent::Behaviour(Event::InboundRequest { result, .. }) => {
1474 result.unwrap();
1475 },
1476 _ => {},
1477 }
1478 }
1479 });
1480
1481 tokio::spawn(async move {
1486 let protocol_1_request = swarm_2_handler_1.next().await;
1487 let protocol_2_request = swarm_2_handler_2.next().await;
1488
1489 protocol_1_request
1490 .unwrap()
1491 .pending_response
1492 .send(OutgoingResponse {
1493 result: Ok(b"this is a response".to_vec()),
1494 reputation_changes: Vec::new(),
1495 sent_feedback: None,
1496 })
1497 .unwrap();
1498 protocol_2_request
1499 .unwrap()
1500 .pending_response
1501 .send(OutgoingResponse {
1502 result: Ok(b"this is a response".to_vec()),
1503 reputation_changes: Vec::new(),
1504 sent_feedback: None,
1505 })
1506 .unwrap();
1507 });
1508
1509 let mut response_receivers = None;
1512 let mut num_responses = 0;
1513
1514 loop {
1515 match swarm_1.select_next_some().await {
1516 SwarmEvent::ConnectionEstablished { peer_id, .. } => {
1517 let (sender_1, receiver_1) = oneshot::channel();
1518 let (sender_2, receiver_2) = oneshot::channel();
1519 swarm_1.behaviour_mut().send_request(
1520 &peer_id,
1521 protocol_name_1.clone(),
1522 b"this is a request".to_vec(),
1523 None,
1524 sender_1,
1525 IfDisconnected::ImmediateError,
1526 );
1527 swarm_1.behaviour_mut().send_request(
1528 &peer_id,
1529 protocol_name_2.clone(),
1530 b"this is a request".to_vec(),
1531 None,
1532 sender_2,
1533 IfDisconnected::ImmediateError,
1534 );
1535 assert!(response_receivers.is_none());
1536 response_receivers = Some((receiver_1, receiver_2));
1537 },
1538 SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
1539 num_responses += 1;
1540 result.unwrap();
1541 if num_responses == 2 {
1542 break;
1543 }
1544 },
1545 _ => {},
1546 }
1547 }
1548 let (response_receiver_1, response_receiver_2) = response_receivers.unwrap();
1549 assert_eq!(
1550 response_receiver_1.await.unwrap().unwrap(),
1551 (b"this is a response".to_vec(), protocol_name_1)
1552 );
1553 assert_eq!(
1554 response_receiver_2.await.unwrap().unwrap(),
1555 (b"this is a response".to_vec(), protocol_name_2)
1556 );
1557 }
1558
1559 #[tokio::test]
1560 async fn request_fallback() {
1561 let protocol_name_1 = ProtocolName::from("/test/req-resp/2");
1562 let protocol_name_1_fallback = ProtocolName::from("/test/req-resp/1");
1563 let protocol_name_2 = ProtocolName::from("/test/another");
1564
1565 let protocol_config_1 = ProtocolConfig {
1566 name: protocol_name_1.clone(),
1567 fallback_names: Vec::new(),
1568 max_request_size: 1024,
1569 max_response_size: 1024 * 1024,
1570 request_timeout: Duration::from_secs(30),
1571 inbound_queue: None,
1572 };
1573 let protocol_config_1_fallback = ProtocolConfig {
1574 name: protocol_name_1_fallback.clone(),
1575 fallback_names: Vec::new(),
1576 max_request_size: 1024,
1577 max_response_size: 1024 * 1024,
1578 request_timeout: Duration::from_secs(30),
1579 inbound_queue: None,
1580 };
1581 let protocol_config_2 = ProtocolConfig {
1582 name: protocol_name_2.clone(),
1583 fallback_names: Vec::new(),
1584 max_request_size: 1024,
1585 max_response_size: 1024 * 1024,
1586 request_timeout: Duration::from_secs(30),
1587 inbound_queue: None,
1588 };
1589
1590 let mut older_swarm = {
1593 let (tx_1, mut rx_1) = async_channel::bounded::<IncomingRequest>(64);
1594 let (tx_2, mut rx_2) = async_channel::bounded::<IncomingRequest>(64);
1595 let mut protocol_config_1_fallback = protocol_config_1_fallback.clone();
1596 protocol_config_1_fallback.inbound_queue = Some(tx_1);
1597
1598 let mut protocol_config_2 = protocol_config_2.clone();
1599 protocol_config_2.inbound_queue = Some(tx_2);
1600
1601 tokio::spawn(async move {
1602 for _ in 0..2 {
1603 if let Some(rq) = rx_1.next().await {
1604 let (fb_tx, fb_rx) = oneshot::channel();
1605 assert_eq!(rq.payload, b"request on protocol /test/req-resp/1");
1606 let _ = rq.pending_response.send(super::OutgoingResponse {
1607 result: Ok(b"this is a response on protocol /test/req-resp/1".to_vec()),
1608 reputation_changes: Vec::new(),
1609 sent_feedback: Some(fb_tx),
1610 });
1611 fb_rx.await.unwrap();
1612 }
1613 }
1614
1615 if let Some(rq) = rx_2.next().await {
1616 let (fb_tx, fb_rx) = oneshot::channel();
1617 assert_eq!(rq.payload, b"request on protocol /test/other");
1618 let _ = rq.pending_response.send(super::OutgoingResponse {
1619 result: Ok(b"this is a response on protocol /test/other".to_vec()),
1620 reputation_changes: Vec::new(),
1621 sent_feedback: Some(fb_tx),
1622 });
1623 fb_rx.await.unwrap();
1624 }
1625 });
1626
1627 build_swarm(vec![protocol_config_1_fallback, protocol_config_2].into_iter())
1628 };
1629
1630 let mut new_swarm = build_swarm(
1632 vec![
1633 protocol_config_1.clone(),
1634 protocol_config_1_fallback.clone(),
1635 protocol_config_2.clone(),
1636 ]
1637 .into_iter(),
1638 );
1639
1640 {
1641 let dial_addr = older_swarm.1.clone();
1642 Swarm::dial(&mut new_swarm.0, dial_addr).unwrap();
1643 }
1644
1645 tokio::spawn(async move {
1647 loop {
1648 _ = older_swarm.0.select_next_some().await;
1649 }
1650 });
1651
1652 let (mut swarm, _) = new_swarm;
1654 let mut older_peer_id = None;
1655
1656 let mut response_receiver = None;
1657 loop {
1659 match swarm.select_next_some().await {
1660 SwarmEvent::ConnectionEstablished { peer_id, .. } => {
1661 older_peer_id = Some(peer_id);
1662 let (sender, receiver) = oneshot::channel();
1663 swarm.behaviour_mut().send_request(
1664 &peer_id,
1665 protocol_name_1.clone(),
1666 b"request on protocol /test/req-resp/2".to_vec(),
1667 Some((
1668 b"request on protocol /test/req-resp/1".to_vec(),
1669 protocol_config_1_fallback.name.clone(),
1670 )),
1671 sender,
1672 IfDisconnected::ImmediateError,
1673 );
1674 response_receiver = Some(receiver);
1675 },
1676 SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
1677 result.unwrap();
1678 break;
1679 },
1680 _ => {},
1681 }
1682 }
1683 assert_eq!(
1684 response_receiver.unwrap().await.unwrap().unwrap(),
1685 (
1686 b"this is a response on protocol /test/req-resp/1".to_vec(),
1687 protocol_name_1_fallback.clone()
1688 )
1689 );
1690 let (sender, response_receiver) = oneshot::channel();
1692 swarm.behaviour_mut().send_request(
1693 older_peer_id.as_ref().unwrap(),
1694 protocol_name_1_fallback.clone(),
1695 b"request on protocol /test/req-resp/1".to_vec(),
1696 Some((
1697 b"dummy request, will fail if processed".to_vec(),
1698 protocol_config_1_fallback.name.clone(),
1699 )),
1700 sender,
1701 IfDisconnected::ImmediateError,
1702 );
1703 loop {
1704 match swarm.select_next_some().await {
1705 SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
1706 result.unwrap();
1707 break;
1708 },
1709 _ => {},
1710 }
1711 }
1712 assert_eq!(
1713 response_receiver.await.unwrap().unwrap(),
1714 (
1715 b"this is a response on protocol /test/req-resp/1".to_vec(),
1716 protocol_name_1_fallback.clone()
1717 )
1718 );
1719 let (sender, response_receiver) = oneshot::channel();
1721 swarm.behaviour_mut().send_request(
1722 older_peer_id.as_ref().unwrap(),
1723 protocol_name_1.clone(),
1724 b"request on protocol /test/req-resp-2".to_vec(),
1725 None,
1726 sender,
1727 IfDisconnected::ImmediateError,
1728 );
1729 loop {
1730 match swarm.select_next_some().await {
1731 SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
1732 assert_matches!(
1733 result.unwrap_err(),
1734 RequestFailure::Network(OutboundFailure::UnsupportedProtocols)
1735 );
1736 break;
1737 },
1738 _ => {},
1739 }
1740 }
1741 assert!(response_receiver.await.unwrap().is_err());
1742 let (sender, response_receiver) = oneshot::channel();
1744 swarm.behaviour_mut().send_request(
1745 older_peer_id.as_ref().unwrap(),
1746 protocol_name_2.clone(),
1747 b"request on protocol /test/other".to_vec(),
1748 None,
1749 sender,
1750 IfDisconnected::ImmediateError,
1751 );
1752 loop {
1753 match swarm.select_next_some().await {
1754 SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
1755 result.unwrap();
1756 break;
1757 },
1758 _ => {},
1759 }
1760 }
1761 assert_eq!(
1762 response_receiver.await.unwrap().unwrap(),
1763 (b"this is a response on protocol /test/other".to_vec(), protocol_name_2.clone())
1764 );
1765 }
1766
1767 #[tokio::test]
1781 async fn enforce_outbound_timeouts() {
1782 const REQUEST_TIMEOUT: Duration = Duration::from_secs(10);
1783 const REQUEST_TIMEOUT_SHORT: Duration = Duration::from_secs(1);
1784
1785 let protocol_name = ProtocolName::from("/test/req-resp/1");
1787
1788 let protocol_config = ProtocolConfig {
1789 name: protocol_name.clone(),
1790 fallback_names: Vec::new(),
1791 max_request_size: 1024,
1792 max_response_size: 1024 * 1024,
1793 request_timeout: REQUEST_TIMEOUT, inbound_queue: None,
1795 };
1796
1797 let (mut first_swarm, _) = {
1799 let (tx, mut rx) = async_channel::bounded::<IncomingRequest>(64);
1800
1801 tokio::spawn(async move {
1802 if let Some(rq) = rx.next().await {
1803 assert_eq!(rq.payload, b"this is a request");
1804
1805 tokio::time::sleep(REQUEST_TIMEOUT_SHORT * 2).await;
1808
1809 let _ = rq.pending_response.send(super::OutgoingResponse {
1812 result: Ok(b"Second swarm already timedout".to_vec()),
1813 reputation_changes: Vec::new(),
1814 sent_feedback: None,
1815 });
1816 }
1817 });
1818
1819 let mut protocol_config = protocol_config.clone();
1820 protocol_config.inbound_queue = Some(tx);
1821
1822 build_swarm(iter::once(protocol_config))
1823 };
1824
1825 let (mut second_swarm, second_address) = {
1826 let (tx, mut rx) = async_channel::bounded::<IncomingRequest>(64);
1827
1828 tokio::spawn(async move {
1829 while let Some(rq) = rx.next().await {
1830 let _ = rq.pending_response.send(super::OutgoingResponse {
1831 result: Ok(b"This is the response".to_vec()),
1832 reputation_changes: Vec::new(),
1833 sent_feedback: None,
1834 });
1835 }
1836 });
1837 let mut protocol_config = protocol_config.clone();
1838 protocol_config.inbound_queue = Some(tx);
1839
1840 build_swarm(iter::once(protocol_config.clone()))
1841 };
1842 second_swarm
1844 .behaviour_mut()
1845 .protocols
1846 .get_mut(&protocol_name)
1847 .unwrap()
1848 .request_timeout = REQUEST_TIMEOUT_SHORT;
1849
1850 {
1852 Swarm::dial(&mut first_swarm, second_address).unwrap();
1853 }
1854
1855 tokio::spawn(async move {
1858 loop {
1859 let event = first_swarm.select_next_some().await;
1860 match event {
1861 SwarmEvent::Behaviour(Event::InboundRequest { result, .. }) => {
1862 assert!(result.is_ok());
1863 break;
1864 },
1865 SwarmEvent::ConnectionClosed { .. } => {
1866 break;
1867 },
1868 _ => {},
1869 }
1870 }
1871 });
1872
1873 let mut response_receiver = None;
1877 loop {
1878 let event = second_swarm.select_next_some().await;
1879
1880 match event {
1881 SwarmEvent::ConnectionEstablished { peer_id, .. } => {
1882 let (sender, receiver) = oneshot::channel();
1883 second_swarm.behaviour_mut().send_request(
1884 &peer_id,
1885 protocol_name.clone(),
1886 b"this is a request".to_vec(),
1887 None,
1888 sender,
1889 IfDisconnected::ImmediateError,
1890 );
1891 assert!(response_receiver.is_none());
1892 response_receiver = Some(receiver);
1893 },
1894 SwarmEvent::ConnectionClosed { .. } => {
1895 break;
1896 },
1897 SwarmEvent::Behaviour(Event::RequestFinished { result, .. }) => {
1898 assert!(result.is_err());
1899 break;
1900 },
1901 _ => {},
1902 }
1903 }
1904
1905 match response_receiver.unwrap().await.unwrap().unwrap_err() {
1907 RequestFailure::Network(OutboundFailure::Timeout) => {},
1908 request_failure => panic!("Unexpected failure: {request_failure:?}"),
1909 }
1910 }
1911}