1use crate::{
61 protocol::notifications::upgrade::{
62 NotificationsIn, NotificationsInSubstream, NotificationsOut, NotificationsOutError,
63 NotificationsOutSubstream, UpgradeCollec,
64 },
65 service::metrics::NotificationMetrics,
66 types::ProtocolName,
67};
68
69use bytes::BytesMut;
70use futures::{
71 channel::mpsc,
72 lock::{Mutex as FuturesMutex, MutexGuard as FuturesMutexGuard},
73 prelude::*,
74};
75use libp2p::{
76 swarm::{
77 handler::ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, Stream,
78 SubstreamProtocol,
79 },
80 PeerId,
81};
82
83use parking_lot::{Mutex, RwLock};
84use std::{
85 collections::VecDeque,
86 mem,
87 pin::Pin,
88 sync::Arc,
89 task::{Context, Poll},
90 time::Duration,
91};
92
/// Logging target passed as `target:` to the `log` macros in this file.
const LOG_TARGET: &str = "sub-libp2p::notification::handler";

/// Capacity of the channel backing the asynchronous side of a `NotificationsSink`
/// (the one reserved through `reserve_notification`).
pub(crate) const ASYNC_NOTIFICATIONS_BUFFER_SIZE: usize = 8;

/// Capacity of the channel backing the synchronous side of a `NotificationsSink`
/// (the one fed by `send_sync_notification`).
const SYNC_NOTIFICATIONS_BUFFER_SIZE: usize = 2048;

/// Timeout attached to every outbound substream request emitted by the handler.
const OPEN_TIMEOUT: Duration = Duration::from_secs(10);

/// Duration of the timer created in `NotifsHandler::new`; once it completes, the handler's
/// `keep_alive` flag is cleared.
const INITIAL_KEEPALIVE_TIME: Duration = Duration::from_secs(5);
111
112pub struct NotifsHandler {
116 protocols: Vec<Protocol>,
118
119 keep_alive: bool,
121
122 keep_alive_timeout_future: Option<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
125
126 peer_id: PeerId,
128
129 events_queue: VecDeque<ConnectionHandlerEvent<NotificationsOut, usize, NotifsHandlerOut>>,
131
132 metrics: Option<Arc<NotificationMetrics>>,
134}
135
136impl NotifsHandler {
137 pub fn new(
139 peer_id: PeerId,
140 protocols: Vec<ProtocolConfig>,
141 metrics: Option<NotificationMetrics>,
142 ) -> Self {
143 Self {
144 protocols: protocols
145 .into_iter()
146 .map(|config| {
147 let in_upgrade = NotificationsIn::new(
148 config.name.clone(),
149 config.fallback_names.clone(),
150 config.max_notification_size,
151 );
152
153 Protocol { config, in_upgrade, state: State::Closed { pending_opening: false } }
154 })
155 .collect(),
156 peer_id,
157 keep_alive: true,
159 keep_alive_timeout_future: Some(Box::pin(tokio::time::sleep(INITIAL_KEEPALIVE_TIME))),
163 events_queue: VecDeque::with_capacity(16),
164 metrics: metrics.map_or(None, |metrics| Some(Arc::new(metrics))),
165 }
166 }
167}
168
169#[derive(Debug, Clone)]
171pub struct ProtocolConfig {
172 pub name: ProtocolName,
174 pub fallback_names: Vec<ProtocolName>,
176 pub handshake: Arc<RwLock<Vec<u8>>>,
178 pub max_notification_size: u64,
180}
181
182struct Protocol {
184 config: ProtocolConfig,
186
187 in_upgrade: NotificationsIn,
189
190 state: State,
192}
193
194enum State {
196 Closed {
198 pending_opening: bool,
200 },
201
202 OpenDesiredByRemote {
205 in_substream: NotificationsInSubstream<Stream>,
207
208 pending_opening: bool,
210 },
211
212 Opening {
218 in_substream: Option<NotificationsInSubstream<Stream>>,
220 inbound: bool,
222 },
223
224 Open {
226 notifications_sink_rx: stream::Peekable<
232 stream::Select<
233 stream::Fuse<mpsc::Receiver<NotificationsSinkMessage>>,
234 stream::Fuse<mpsc::Receiver<NotificationsSinkMessage>>,
235 >,
236 >,
237
238 out_substream: Option<NotificationsOutSubstream<Stream>>,
244
245 in_substream: Option<NotificationsInSubstream<Stream>>,
251 },
252}
253
/// Why the handler asks the behaviour to close a protocol (see
/// `NotifsHandlerOut::CloseDesired`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CloseReason {
    /// Reported when the outbound substream failed with an I/O error or was closed, and when
    /// processing a not-yet-accepted inbound substream failed.
    RemoteRequest,

    /// Reported when the outbound substream received unexpected data.
    ProtocolMisbehavior,
}
267
268#[derive(Debug, Clone)]
270pub enum NotifsHandlerIn {
271 Open {
279 protocol_index: usize,
281
282 peer_id: PeerId,
284 },
285
286 Close {
291 protocol_index: usize,
293 },
294}
295
296#[derive(Debug)]
298pub enum NotifsHandlerOut {
299 OpenResultOk {
301 protocol_index: usize,
303 negotiated_fallback: Option<ProtocolName>,
305 received_handshake: Vec<u8>,
308 notifications_sink: NotificationsSink,
310 inbound: bool,
312 },
313
314 OpenResultErr {
317 protocol_index: usize,
319 },
320
321 CloseResult {
323 protocol_index: usize,
325 },
326
327 OpenDesiredByRemote {
333 protocol_index: usize,
335 handshake: Vec<u8>,
337 },
338
339 CloseDesired {
345 protocol_index: usize,
347
348 reason: CloseReason,
350 },
351
352 Notification {
356 protocol_index: usize,
358 message: BytesMut,
360 },
361
362 Close {
364 protocol_index: usize,
366 },
367}
368
369#[derive(Debug, Clone)]
373pub struct NotificationsSink {
374 inner: Arc<NotificationsSinkInner>,
375 metrics: Option<Arc<NotificationMetrics>>,
376}
377
378impl NotificationsSink {
379 pub fn new(
382 peer_id: PeerId,
383 ) -> (Self, mpsc::Receiver<NotificationsSinkMessage>, mpsc::Receiver<NotificationsSinkMessage>)
384 {
385 let (async_tx, async_rx) = mpsc::channel(ASYNC_NOTIFICATIONS_BUFFER_SIZE);
386 let (sync_tx, sync_rx) = mpsc::channel(SYNC_NOTIFICATIONS_BUFFER_SIZE);
387 (
388 NotificationsSink {
389 inner: Arc::new(NotificationsSinkInner {
390 peer_id,
391 async_channel: FuturesMutex::new(async_tx),
392 sync_channel: Mutex::new(Some(sync_tx)),
393 }),
394 metrics: None,
395 },
396 async_rx,
397 sync_rx,
398 )
399 }
400
401 pub fn metrics(&self) -> &Option<Arc<NotificationMetrics>> {
403 &self.metrics
404 }
405}
406
407#[derive(Debug)]
408struct NotificationsSinkInner {
409 peer_id: PeerId,
411 async_channel: FuturesMutex<mpsc::Sender<NotificationsSinkMessage>>,
413 sync_channel: Mutex<Option<mpsc::Sender<NotificationsSinkMessage>>>,
420}
421
/// Message travelling from a `NotificationsSink` to the handler that owns the receivers.
#[derive(Debug, PartialEq, Eq)]
pub enum NotificationsSinkMessage {
    /// A notification to write to the outbound substream.
    Notification { message: Vec<u8> },

    /// Asks the handler to report `NotifsHandlerOut::Close` for the protocol; queued by the
    /// sink when a synchronous send fails.
    ForceClose,
}
433
434impl NotificationsSink {
435 pub fn peer_id(&self) -> &PeerId {
437 &self.inner.peer_id
438 }
439
440 pub fn send_sync_notification(&self, message: impl Into<Vec<u8>>) {
450 let mut lock = self.inner.sync_channel.lock();
451
452 if let Some(tx) = lock.as_mut() {
453 let message = message.into();
454 let result = tx.try_send(NotificationsSinkMessage::Notification { message });
455
456 if result.is_err() {
457 let _result2 = tx.clone().try_send(NotificationsSinkMessage::ForceClose);
460 debug_assert!(_result2.map(|()| true).unwrap_or_else(|err| err.is_disconnected()));
461
462 *lock = None;
464 }
465 }
466 }
467
468 pub async fn reserve_notification(&self) -> Result<Ready<'_>, ()> {
475 let mut lock = self.inner.async_channel.lock().await;
476
477 let poll_ready = future::poll_fn(|cx| lock.poll_ready(cx)).await;
478 if poll_ready.is_ok() {
479 Ok(Ready { lock })
480 } else {
481 Err(())
482 }
483 }
484}
485
486#[must_use]
488#[derive(Debug)]
489pub struct Ready<'a> {
490 lock: FuturesMutexGuard<'a, mpsc::Sender<NotificationsSinkMessage>>,
492}
493
494impl<'a> Ready<'a> {
495 pub fn send(mut self, notification: impl Into<Vec<u8>>) -> Result<(), ()> {
499 self.lock
500 .start_send(NotificationsSinkMessage::Notification { message: notification.into() })
501 .map_err(|_| ())
502 }
503}
504
505impl ConnectionHandler for NotifsHandler {
506 type FromBehaviour = NotifsHandlerIn;
507 type ToBehaviour = NotifsHandlerOut;
508 type InboundProtocol = UpgradeCollec<NotificationsIn>;
509 type OutboundProtocol = NotificationsOut;
510 type OutboundOpenInfo = usize;
512 type InboundOpenInfo = ();
513
514 fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, ()> {
515 let protocols = self
516 .protocols
517 .iter()
518 .map(|p| p.in_upgrade.clone())
519 .collect::<UpgradeCollec<_>>();
520
521 SubstreamProtocol::new(protocols, ())
522 }
523
524 fn on_connection_event(
525 &mut self,
526 event: ConnectionEvent<
527 '_,
528 Self::InboundProtocol,
529 Self::OutboundProtocol,
530 Self::InboundOpenInfo,
531 Self::OutboundOpenInfo,
532 >,
533 ) {
534 match event {
535 ConnectionEvent::FullyNegotiatedInbound(inbound) => {
536 let (mut in_substream_open, protocol_index) = inbound.protocol;
537 let protocol_info = &mut self.protocols[protocol_index];
538
539 match protocol_info.state {
540 State::Closed { pending_opening } => {
541 self.events_queue.push_back(ConnectionHandlerEvent::NotifyBehaviour(
542 NotifsHandlerOut::OpenDesiredByRemote {
543 protocol_index,
544 handshake: in_substream_open.handshake,
545 },
546 ));
547
548 protocol_info.state = State::OpenDesiredByRemote {
549 in_substream: in_substream_open.substream,
550 pending_opening,
551 };
552 },
553 State::OpenDesiredByRemote { .. } => {
554 return;
562 },
563 State::Opening { ref mut in_substream, .. } |
564 State::Open { ref mut in_substream, .. } => {
565 if in_substream.is_some() {
566 return;
568 }
569
570 let handshake_message = protocol_info.config.handshake.read().clone();
573 in_substream_open.substream.send_handshake(handshake_message);
574 *in_substream = Some(in_substream_open.substream);
575 },
576 }
577 },
578 ConnectionEvent::FullyNegotiatedOutbound(outbound) => {
579 let (new_open, protocol_index) = (outbound.protocol, outbound.info);
580
581 match self.protocols[protocol_index].state {
582 State::Closed { ref mut pending_opening } |
583 State::OpenDesiredByRemote { ref mut pending_opening, .. } => {
584 debug_assert!(*pending_opening);
585 *pending_opening = false;
586 },
587 State::Open { .. } => {
588 log::error!(target: LOG_TARGET, "☎️ State mismatch in notifications handler");
589 debug_assert!(false);
590 },
591 State::Opening { ref mut in_substream, inbound } => {
592 let (async_tx, async_rx) = mpsc::channel(ASYNC_NOTIFICATIONS_BUFFER_SIZE);
593 let (sync_tx, sync_rx) = mpsc::channel(SYNC_NOTIFICATIONS_BUFFER_SIZE);
594 let notifications_sink = NotificationsSink {
595 inner: Arc::new(NotificationsSinkInner {
596 peer_id: self.peer_id,
597 async_channel: FuturesMutex::new(async_tx),
598 sync_channel: Mutex::new(Some(sync_tx)),
599 }),
600 metrics: self.metrics.clone(),
601 };
602
603 self.protocols[protocol_index].state = State::Open {
604 notifications_sink_rx: stream::select(async_rx.fuse(), sync_rx.fuse())
605 .peekable(),
606 out_substream: Some(new_open.substream),
607 in_substream: in_substream.take(),
608 };
609
610 self.events_queue.push_back(ConnectionHandlerEvent::NotifyBehaviour(
611 NotifsHandlerOut::OpenResultOk {
612 protocol_index,
613 negotiated_fallback: new_open.negotiated_fallback,
614 received_handshake: new_open.handshake,
615 notifications_sink,
616 inbound,
617 },
618 ));
619 },
620 }
621 },
622 ConnectionEvent::AddressChange(_address_change) => {},
623 ConnectionEvent::LocalProtocolsChange(_) => {},
624 ConnectionEvent::RemoteProtocolsChange(_) => {},
625 ConnectionEvent::DialUpgradeError(dial_upgrade_error) => match self.protocols
626 [dial_upgrade_error.info]
627 .state
628 {
629 State::Closed { ref mut pending_opening } |
630 State::OpenDesiredByRemote { ref mut pending_opening, .. } => {
631 debug_assert!(*pending_opening);
632 *pending_opening = false;
633 },
634
635 State::Opening { .. } => {
636 self.protocols[dial_upgrade_error.info].state =
637 State::Closed { pending_opening: false };
638
639 self.events_queue.push_back(ConnectionHandlerEvent::NotifyBehaviour(
640 NotifsHandlerOut::OpenResultErr { protocol_index: dial_upgrade_error.info },
641 ));
642 },
643
644 State::Open { .. } => debug_assert!(false),
646 },
647 ConnectionEvent::ListenUpgradeError(_listen_upgrade_error) => {},
648 event => {
649 log::warn!(target: LOG_TARGET, "New unknown `ConnectionEvent` libp2p event: {event:?}");
650 },
651 }
652 }
653
654 fn on_behaviour_event(&mut self, message: NotifsHandlerIn) {
655 match message {
656 NotifsHandlerIn::Open { protocol_index, peer_id } => {
657 let protocol_info = &mut self.protocols[protocol_index];
658 match &mut protocol_info.state {
659 State::Closed { pending_opening } => {
660 if !*pending_opening {
661 let proto = NotificationsOut::new(
662 protocol_info.config.name.clone(),
663 protocol_info.config.fallback_names.clone(),
664 protocol_info.config.handshake.read().clone(),
665 protocol_info.config.max_notification_size,
666 peer_id,
667 );
668
669 self.events_queue.push_back(
670 ConnectionHandlerEvent::OutboundSubstreamRequest {
671 protocol: SubstreamProtocol::new(proto, protocol_index)
672 .with_timeout(OPEN_TIMEOUT),
673 },
674 );
675 }
676
677 protocol_info.state = State::Opening { in_substream: None, inbound: false };
678 },
679 State::OpenDesiredByRemote { pending_opening, in_substream } => {
680 let handshake_message = protocol_info.config.handshake.read().clone();
681
682 if !*pending_opening {
683 let proto = NotificationsOut::new(
684 protocol_info.config.name.clone(),
685 protocol_info.config.fallback_names.clone(),
686 handshake_message.clone(),
687 protocol_info.config.max_notification_size,
688 peer_id,
689 );
690
691 self.events_queue.push_back(
692 ConnectionHandlerEvent::OutboundSubstreamRequest {
693 protocol: SubstreamProtocol::new(proto, protocol_index)
694 .with_timeout(OPEN_TIMEOUT),
695 },
696 );
697 }
698
699 in_substream.send_handshake(handshake_message);
700
701 let in_substream = match mem::replace(
703 &mut protocol_info.state,
704 State::Opening { in_substream: None, inbound: false },
705 ) {
706 State::OpenDesiredByRemote { in_substream, .. } => in_substream,
707 _ => unreachable!(),
708 };
709 protocol_info.state =
710 State::Opening { in_substream: Some(in_substream), inbound: true };
711 },
712 State::Opening { .. } | State::Open { .. } => {
713 log::error!(target: LOG_TARGET, "opening already-opened handler");
716 debug_assert!(false);
717 },
718 }
719 },
720
721 NotifsHandlerIn::Close { protocol_index } => {
722 match self.protocols[protocol_index].state {
723 State::Open { .. } => {
724 self.protocols[protocol_index].state =
725 State::Closed { pending_opening: false };
726 },
727 State::Opening { .. } => {
728 self.protocols[protocol_index].state =
729 State::Closed { pending_opening: true };
730
731 self.events_queue.push_back(ConnectionHandlerEvent::NotifyBehaviour(
732 NotifsHandlerOut::OpenResultErr { protocol_index },
733 ));
734 },
735 State::OpenDesiredByRemote { pending_opening, .. } => {
736 self.protocols[protocol_index].state = State::Closed { pending_opening };
737 },
738 State::Closed { .. } => {},
739 }
740
741 self.events_queue.push_back(ConnectionHandlerEvent::NotifyBehaviour(
742 NotifsHandlerOut::CloseResult { protocol_index },
743 ));
744 },
745 }
746 }
747
748 fn connection_keep_alive(&self) -> bool {
749 if self.protocols.iter().any(|p| !matches!(p.state, State::Closed { .. })) {
751 return true;
752 }
753
754 self.keep_alive
755 }
756
757 fn poll(
758 &mut self,
759 cx: &mut Context,
760 ) -> Poll<
761 ConnectionHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::ToBehaviour>,
762 > {
763 {
764 let maybe_keep_alive_timeout_future = &mut self.keep_alive_timeout_future;
765 if let Some(keep_alive_timeout_future) = maybe_keep_alive_timeout_future {
766 if keep_alive_timeout_future.poll_unpin(cx).is_ready() {
767 maybe_keep_alive_timeout_future.take();
768 self.keep_alive = false;
769 }
770 }
771 }
772
773 if let Some(ev) = self.events_queue.pop_front() {
774 return Poll::Ready(ev);
775 }
776
777 for protocol_index in 0..self.protocols.len() {
780 if let State::Open {
781 notifications_sink_rx, out_substream: Some(out_substream), ..
782 } = &mut self.protocols[protocol_index].state
783 {
784 loop {
785 match Pin::new(&mut *notifications_sink_rx).as_mut().poll_peek(cx) {
789 Poll::Ready(Some(&NotificationsSinkMessage::ForceClose)) => {
790 return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
791 NotifsHandlerOut::Close { protocol_index },
792 ))
793 },
794 Poll::Ready(Some(&NotificationsSinkMessage::Notification { .. })) => {},
795 Poll::Ready(None) | Poll::Pending => break,
796 }
797
798 match out_substream.poll_ready_unpin(cx) {
801 Poll::Ready(_) => {},
802 Poll::Pending => break,
803 }
804
805 let message = match notifications_sink_rx.poll_next_unpin(cx) {
807 Poll::Ready(Some(NotificationsSinkMessage::Notification { message })) => {
808 message
809 },
810 Poll::Ready(Some(NotificationsSinkMessage::ForceClose)) |
811 Poll::Ready(None) |
812 Poll::Pending => {
813 debug_assert!(false);
815 break;
816 },
817 };
818
819 let _ = out_substream.start_send_unpin(message);
820 }
822 }
823 }
824
825 for protocol_index in 0..self.protocols.len() {
836 match &mut self.protocols[protocol_index].state {
837 State::Open { out_substream: out_substream @ Some(_), .. } => {
838 match Sink::poll_flush(Pin::new(out_substream.as_mut().unwrap()), cx) {
839 Poll::Pending | Poll::Ready(Ok(())) => {},
840 Poll::Ready(Err(error)) => {
841 *out_substream = None;
842
843 let reason = match error {
844 NotificationsOutError::Io(_) | NotificationsOutError::Closed => {
845 CloseReason::RemoteRequest
846 },
847 NotificationsOutError::UnexpectedData => {
848 CloseReason::ProtocolMisbehavior
849 },
850 };
851
852 let event = NotifsHandlerOut::CloseDesired { protocol_index, reason };
853 return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(event));
854 },
855 };
856 },
857
858 State::Closed { .. } |
859 State::Opening { .. } |
860 State::Open { out_substream: None, .. } |
861 State::OpenDesiredByRemote { .. } => {},
862 }
863 }
864
865 for protocol_index in 0..self.protocols.len() {
867 match &mut self.protocols[protocol_index].state {
870 State::Closed { .. } |
871 State::Open { in_substream: None, .. } |
872 State::Opening { in_substream: None, .. } => {},
873
874 State::Open { in_substream: in_substream @ Some(_), .. } => {
875 match futures::prelude::stream::Stream::poll_next(
876 Pin::new(in_substream.as_mut().unwrap()),
877 cx,
878 ) {
879 Poll::Pending => {},
880 Poll::Ready(Some(Ok(message))) => {
881 let event = NotifsHandlerOut::Notification { protocol_index, message };
882 return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(event));
883 },
884 Poll::Ready(None) | Poll::Ready(Some(Err(_))) => *in_substream = None,
885 }
886 },
887
888 State::OpenDesiredByRemote { in_substream, pending_opening } => {
889 match NotificationsInSubstream::poll_process(Pin::new(in_substream), cx) {
890 Poll::Pending => {},
891 Poll::Ready(Ok(())) => {},
892 Poll::Ready(Err(_)) => {
893 self.protocols[protocol_index].state =
894 State::Closed { pending_opening: *pending_opening };
895 return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
896 NotifsHandlerOut::CloseDesired {
897 protocol_index,
898 reason: CloseReason::RemoteRequest,
899 },
900 ));
901 },
902 }
903 },
904
905 State::Opening { in_substream: in_substream @ Some(_), .. } => {
906 match NotificationsInSubstream::poll_process(
907 Pin::new(in_substream.as_mut().unwrap()),
908 cx,
909 ) {
910 Poll::Pending => {},
911 Poll::Ready(Ok(())) => {},
912 Poll::Ready(Err(_)) => *in_substream = None,
913 }
914 },
915 }
916 }
917
918 Poll::Pending
922 }
923}
924
925#[cfg(test)]
926pub mod tests {
927 use super::*;
928 use crate::protocol::notifications::upgrade::{
929 NotificationsInOpen, NotificationsInSubstreamHandshake, NotificationsOutOpen,
930 };
931 use asynchronous_codec::Framed;
932 use libp2p::{
933 core::muxing::SubstreamBox,
934 swarm::handler::{self, StreamUpgradeError},
935 };
936 use multistream_select::{dialer_select_proto, listener_select_proto, Negotiated, Version};
937 use std::{
938 collections::HashMap,
939 io::{Error, IoSlice, IoSliceMut},
940 };
941 use tokio::sync::mpsc;
942 use unsigned_varint::codec::UviBytes;
943
944 struct OpenSubstream {
945 notifications: stream::Peekable<
946 stream::Select<
947 stream::Fuse<futures::channel::mpsc::Receiver<NotificationsSinkMessage>>,
948 stream::Fuse<futures::channel::mpsc::Receiver<NotificationsSinkMessage>>,
949 >,
950 >,
951 _in_substream: MockSubstream,
952 _out_substream: MockSubstream,
953 }
954
955 pub struct ConnectionYielder {
956 connections: HashMap<(PeerId, usize), OpenSubstream>,
957 }
958
959 impl ConnectionYielder {
960 pub fn new() -> Self {
962 Self { connections: HashMap::new() }
963 }
964
965 pub fn open_substream(
967 &mut self,
968 peer: PeerId,
969 protocol_index: usize,
970 received_handshake: Vec<u8>,
971 ) -> NotifsHandlerOut {
972 let (async_tx, async_rx) =
973 futures::channel::mpsc::channel(ASYNC_NOTIFICATIONS_BUFFER_SIZE);
974 let (sync_tx, sync_rx) =
975 futures::channel::mpsc::channel(SYNC_NOTIFICATIONS_BUFFER_SIZE);
976 let notifications_sink = NotificationsSink {
977 inner: Arc::new(NotificationsSinkInner {
978 peer_id: peer,
979 async_channel: FuturesMutex::new(async_tx),
980 sync_channel: Mutex::new(Some(sync_tx)),
981 }),
982 metrics: None,
983 };
984 let (in_substream, out_substream) = MockSubstream::new();
985
986 self.connections.insert(
987 (peer, protocol_index),
988 OpenSubstream {
989 notifications: stream::select(async_rx.fuse(), sync_rx.fuse()).peekable(),
990 _in_substream: in_substream,
991 _out_substream: out_substream,
992 },
993 );
994
995 NotifsHandlerOut::OpenResultOk {
996 protocol_index,
997 negotiated_fallback: None,
998 received_handshake,
999 notifications_sink,
1000 inbound: false,
1001 }
1002 }
1003
1004 pub async fn get_next_event(&mut self, peer: PeerId, set: usize) -> Option<Vec<u8>> {
1006 let substream = if let Some(info) = self.connections.get_mut(&(peer, set)) {
1007 info
1008 } else {
1009 return None;
1010 };
1011
1012 futures::future::poll_fn(|cx| match substream.notifications.poll_next_unpin(cx) {
1013 Poll::Ready(Some(NotificationsSinkMessage::Notification { message })) => {
1014 Poll::Ready(Some(message))
1015 },
1016 Poll::Pending => Poll::Ready(None),
1017 Poll::Ready(Some(NotificationsSinkMessage::ForceClose)) | Poll::Ready(None) => {
1018 panic!("sink closed")
1019 },
1020 })
1021 .await
1022 }
1023 }
1024
1025 struct MockSubstream {
1026 pub rx: mpsc::Receiver<Vec<u8>>,
1027 pub tx: mpsc::Sender<Vec<u8>>,
1028 rx_buffer: BytesMut,
1029 }
1030
/// Stand-in for the type of `MockStream::counter`; never constructed in the visible code.
// NOTE(review): presumably mirrors a private counter type inside libp2p's `Stream` — confirm.
#[allow(dead_code)]
struct MockActiveStreamCounter(Arc<()>);
1034
1035 #[allow(dead_code)]
1037 struct MockStream {
1038 stream: Negotiated<SubstreamBox>,
1039 counter: Option<MockActiveStreamCounter>,
1040 }
1041
1042 impl MockSubstream {
1043 pub fn new() -> (Self, Self) {
1045 let (tx1, rx1) = mpsc::channel(32);
1046 let (tx2, rx2) = mpsc::channel(32);
1047
1048 (
1049 Self { rx: rx1, tx: tx2, rx_buffer: BytesMut::with_capacity(512) },
1050 Self { rx: rx2, tx: tx1, rx_buffer: BytesMut::with_capacity(512) },
1051 )
1052 }
1053
1054 pub async fn negotiated() -> (Stream, Stream) {
1056 let (socket1, socket2) = Self::new();
1057 let socket1 = SubstreamBox::new(socket1);
1058 let socket2 = SubstreamBox::new(socket2);
1059
1060 let protos = vec!["/echo/1.0.0", "/echo/2.5.0"];
1061 let (res1, res2) = tokio::join!(
1062 dialer_select_proto(socket1, protos.clone(), Version::V1),
1063 listener_select_proto(socket2, protos),
1064 );
1065
1066 (Self::stream_new(res1.unwrap().1), Self::stream_new(res2.unwrap().1))
1067 }
1068
1069 fn stream_new(stream: Negotiated<SubstreamBox>) -> Stream {
1071 let stream = MockStream { stream, counter: None };
1072 const _: () = {
1074 assert!(core::mem::size_of::<Stream>() == core::mem::size_of::<MockStream>());
1075 assert!(core::mem::align_of::<Stream>() == core::mem::align_of::<MockStream>());
1076 };
1077
1078 unsafe { core::mem::transmute(stream) }
1079 }
1080 }
1081
1082 impl AsyncWrite for MockSubstream {
1083 fn poll_write<'a>(
1084 self: Pin<&mut Self>,
1085 _cx: &mut Context<'a>,
1086 buf: &[u8],
1087 ) -> Poll<Result<usize, Error>> {
1088 match self.tx.try_send(buf.to_vec()) {
1089 Ok(_) => Poll::Ready(Ok(buf.len())),
1090 Err(_) => Poll::Ready(Err(std::io::ErrorKind::UnexpectedEof.into())),
1091 }
1092 }
1093
1094 fn poll_flush<'a>(self: Pin<&mut Self>, _cx: &mut Context<'a>) -> Poll<Result<(), Error>> {
1095 Poll::Ready(Ok(()))
1096 }
1097
1098 fn poll_close<'a>(self: Pin<&mut Self>, _cx: &mut Context<'a>) -> Poll<Result<(), Error>> {
1099 Poll::Ready(Ok(()))
1100 }
1101
1102 fn poll_write_vectored<'a, 'b>(
1103 self: Pin<&mut Self>,
1104 _cx: &mut Context<'a>,
1105 _bufs: &[IoSlice<'b>],
1106 ) -> Poll<Result<usize, Error>> {
1107 unimplemented!();
1108 }
1109 }
1110
1111 impl AsyncRead for MockSubstream {
1112 fn poll_read<'a>(
1113 mut self: Pin<&mut Self>,
1114 cx: &mut Context<'a>,
1115 buf: &mut [u8],
1116 ) -> Poll<Result<usize, Error>> {
1117 match self.rx.poll_recv(cx) {
1118 Poll::Ready(Some(data)) => self.rx_buffer.extend_from_slice(&data),
1119 Poll::Ready(None) => {
1120 return Poll::Ready(Err(std::io::ErrorKind::UnexpectedEof.into()))
1121 },
1122 _ => {},
1123 }
1124
1125 let nsize = std::cmp::min(self.rx_buffer.len(), buf.len());
1126 let data = self.rx_buffer.split_to(nsize);
1127 buf[..nsize].copy_from_slice(&data[..]);
1128
1129 if nsize > 0 {
1130 return Poll::Ready(Ok(nsize));
1131 }
1132
1133 Poll::Pending
1134 }
1135
1136 fn poll_read_vectored<'a, 'b>(
1137 self: Pin<&mut Self>,
1138 _cx: &mut Context<'a>,
1139 _bufs: &mut [IoSliceMut<'b>],
1140 ) -> Poll<Result<usize, Error>> {
1141 unimplemented!();
1142 }
1143 }
1144
1145 fn notifs_handler() -> NotifsHandler {
1147 NotifsHandler::new(
1148 PeerId::random(),
1149 vec![ProtocolConfig {
1150 name: "/foo".into(),
1151 fallback_names: vec![],
1152 handshake: Arc::new(RwLock::new(b"hello, world".to_vec())),
1153 max_notification_size: u64::MAX,
1154 }],
1155 None,
1156 )
1157 }
1158
1159 #[tokio::test]
1162 async fn second_open_desired_by_remote_rejected() {
1163 let mut handler = notifs_handler();
1164 let (io, mut io2) = MockSubstream::negotiated().await;
1165 let mut codec = UviBytes::default();
1166 codec.set_max_len(usize::MAX);
1167
1168 let notif_in = NotificationsInOpen {
1169 handshake: b"hello, world".to_vec(),
1170 substream: NotificationsInSubstream::new(
1171 Framed::new(io, codec),
1172 NotificationsInSubstreamHandshake::NotSent,
1173 ),
1174 };
1175
1176 handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound(
1177 handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () },
1178 ));
1179
1180 assert!(std::matches!(handler.protocols[0].state, State::OpenDesiredByRemote { .. }));
1182 futures::future::poll_fn(|cx| {
1183 let mut buf = Vec::with_capacity(512);
1184 assert!(std::matches!(Pin::new(&mut io2).poll_read(cx, &mut buf), Poll::Pending));
1185 Poll::Ready(())
1186 })
1187 .await;
1188
1189 let (io, mut io2) = MockSubstream::negotiated().await;
1191 let mut codec = UviBytes::default();
1192 codec.set_max_len(usize::MAX);
1193
1194 let notif_in = NotificationsInOpen {
1195 handshake: b"hello, world".to_vec(),
1196 substream: NotificationsInSubstream::new(
1197 Framed::new(io, codec),
1198 NotificationsInSubstreamHandshake::NotSent,
1199 ),
1200 };
1201
1202 handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound(
1203 handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () },
1204 ));
1205
1206 futures::future::poll_fn(|cx| {
1208 let mut buf = Vec::with_capacity(512);
1209
1210 if let Poll::Ready(Err(err)) = Pin::new(&mut io2).poll_read(cx, &mut buf) {
1211 assert_eq!(err.kind(), std::io::ErrorKind::UnexpectedEof,);
1212 }
1213
1214 Poll::Ready(())
1215 })
1216 .await;
1217 }
1218
1219 #[tokio::test]
1220 async fn open_rejected_if_substream_is_opening() {
1221 let mut handler = notifs_handler();
1222 let (io, mut io2) = MockSubstream::negotiated().await;
1223 let mut codec = UviBytes::default();
1224 codec.set_max_len(usize::MAX);
1225
1226 let notif_in = NotificationsInOpen {
1227 handshake: b"hello, world".to_vec(),
1228 substream: NotificationsInSubstream::new(
1229 Framed::new(io, codec),
1230 NotificationsInSubstreamHandshake::NotSent,
1231 ),
1232 };
1233
1234 handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound(
1235 handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () },
1236 ));
1237
1238 assert!(std::matches!(handler.protocols[0].state, State::OpenDesiredByRemote { .. }));
1240 futures::future::poll_fn(|cx| {
1241 let mut buf = Vec::with_capacity(512);
1242 assert!(std::matches!(Pin::new(&mut io2).poll_read(cx, &mut buf), Poll::Pending));
1243 Poll::Ready(())
1244 })
1245 .await;
1246
1247 handler.on_behaviour_event(NotifsHandlerIn::Open {
1249 protocol_index: 0,
1250 peer_id: PeerId::random(),
1251 });
1252 assert!(std::matches!(
1253 handler.protocols[0].state,
1254 State::Opening { in_substream: Some(_), .. }
1255 ));
1256
1257 let (io, mut io2) = MockSubstream::negotiated().await;
1259 let mut codec = UviBytes::default();
1260 codec.set_max_len(usize::MAX);
1261
1262 let notif_in = NotificationsInOpen {
1263 handshake: b"hello, world".to_vec(),
1264 substream: NotificationsInSubstream::new(
1265 Framed::new(io, codec),
1266 NotificationsInSubstreamHandshake::NotSent,
1267 ),
1268 };
1269
1270 handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound(
1271 handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () },
1272 ));
1273
1274 futures::future::poll_fn(|cx| {
1277 let mut buf = Vec::with_capacity(512);
1278
1279 if let Poll::Ready(Err(err)) = Pin::new(&mut io2).poll_read(cx, &mut buf) {
1280 assert_eq!(err.kind(), std::io::ErrorKind::UnexpectedEof,);
1281 } else {
1282 panic!("unexpected result");
1283 }
1284
1285 Poll::Ready(())
1286 })
1287 .await;
1288 assert!(std::matches!(
1289 handler.protocols[0].state,
1290 State::Opening { in_substream: Some(_), .. }
1291 ));
1292 }
1293
1294 #[tokio::test]
1295 async fn open_rejected_if_substream_already_open() {
1296 let mut handler = notifs_handler();
1297 let (io, mut io2) = MockSubstream::negotiated().await;
1298 let mut codec = UviBytes::default();
1299 codec.set_max_len(usize::MAX);
1300
1301 let notif_in = NotificationsInOpen {
1302 handshake: b"hello, world".to_vec(),
1303 substream: NotificationsInSubstream::new(
1304 Framed::new(io, codec),
1305 NotificationsInSubstreamHandshake::NotSent,
1306 ),
1307 };
1308 handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound(
1309 handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () },
1310 ));
1311
1312 assert!(std::matches!(handler.protocols[0].state, State::OpenDesiredByRemote { .. }));
1314 futures::future::poll_fn(|cx| {
1315 let mut buf = Vec::with_capacity(512);
1316 assert!(std::matches!(Pin::new(&mut io2).poll_read(cx, &mut buf), Poll::Pending));
1317 Poll::Ready(())
1318 })
1319 .await;
1320
1321 handler.on_behaviour_event(NotifsHandlerIn::Open {
1323 protocol_index: 0,
1324 peer_id: PeerId::random(),
1325 });
1326 assert!(std::matches!(
1327 handler.protocols[0].state,
1328 State::Opening { in_substream: Some(_), .. }
1329 ));
1330
1331 let (io, _io2) = MockSubstream::negotiated().await;
1333 let mut codec = UviBytes::default();
1334 codec.set_max_len(usize::MAX);
1335
1336 let notif_out = NotificationsOutOpen {
1337 handshake: b"hello, world".to_vec(),
1338 negotiated_fallback: None,
1339 substream: NotificationsOutSubstream::new(Framed::new(io, codec)),
1340 };
1341 handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedOutbound(
1342 handler::FullyNegotiatedOutbound { protocol: notif_out, info: 0 },
1343 ));
1344
1345 assert!(std::matches!(
1346 handler.protocols[0].state,
1347 State::Open { in_substream: Some(_), .. }
1348 ));
1349
1350 let (io, mut io2) = MockSubstream::negotiated().await;
1352 let mut codec = UviBytes::default();
1353 codec.set_max_len(usize::MAX);
1354 let notif_in = NotificationsInOpen {
1355 handshake: b"hello, world".to_vec(),
1356 substream: NotificationsInSubstream::new(
1357 Framed::new(io, codec),
1358 NotificationsInSubstreamHandshake::NotSent,
1359 ),
1360 };
1361 handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound(
1362 handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () },
1363 ));
1364
1365 futures::future::poll_fn(|cx| {
1368 let mut buf = Vec::with_capacity(512);
1369
1370 if let Poll::Ready(Err(err)) = Pin::new(&mut io2).poll_read(cx, &mut buf) {
1371 assert_eq!(err.kind(), std::io::ErrorKind::UnexpectedEof);
1372 } else {
1373 panic!("unexpected result");
1374 }
1375
1376 Poll::Ready(())
1377 })
1378 .await;
1379 assert!(std::matches!(
1380 handler.protocols[0].state,
1381 State::Open { in_substream: Some(_), .. }
1382 ));
1383 }
1384
1385 #[tokio::test]
1386 async fn fully_negotiated_resets_state_for_closed_substream() {
1387 let mut handler = notifs_handler();
1388 let (io, mut io2) = MockSubstream::negotiated().await;
1389 let mut codec = UviBytes::default();
1390 codec.set_max_len(usize::MAX);
1391
1392 let notif_in = NotificationsInOpen {
1393 handshake: b"hello, world".to_vec(),
1394 substream: NotificationsInSubstream::new(
1395 Framed::new(io, codec),
1396 NotificationsInSubstreamHandshake::NotSent,
1397 ),
1398 };
1399 handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound(
1400 handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () },
1401 ));
1402
1403 assert!(std::matches!(handler.protocols[0].state, State::OpenDesiredByRemote { .. }));
1405 futures::future::poll_fn(|cx| {
1406 let mut buf = Vec::with_capacity(512);
1407 assert!(std::matches!(Pin::new(&mut io2).poll_read(cx, &mut buf), Poll::Pending));
1408 Poll::Ready(())
1409 })
1410 .await;
1411
1412 handler.on_behaviour_event(NotifsHandlerIn::Open {
1415 protocol_index: 0,
1416 peer_id: PeerId::random(),
1417 });
1418 assert!(std::matches!(
1419 handler.protocols[0].state,
1420 State::Opening { in_substream: Some(_), .. }
1421 ));
1422
1423 handler.on_behaviour_event(NotifsHandlerIn::Close { protocol_index: 0 });
1424 assert!(std::matches!(handler.protocols[0].state, State::Closed { pending_opening: true }));
1425
1426 let (io, _io2) = MockSubstream::negotiated().await;
1429 let mut codec = UviBytes::default();
1430 codec.set_max_len(usize::MAX);
1431
1432 let notif_out = NotificationsOutOpen {
1433 handshake: b"hello, world".to_vec(),
1434 negotiated_fallback: None,
1435 substream: NotificationsOutSubstream::new(Framed::new(io, codec)),
1436 };
1437 handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedOutbound(
1438 handler::FullyNegotiatedOutbound { protocol: notif_out, info: 0 },
1439 ));
1440
1441 assert!(std::matches!(
1442 handler.protocols[0].state,
1443 State::Closed { pending_opening: false }
1444 ));
1445 }
1446
1447 #[tokio::test]
1448 async fn fully_negotiated_resets_state_for_open_desired_substream() {
1449 let mut handler = notifs_handler();
1450 let (io, mut io2) = MockSubstream::negotiated().await;
1451 let mut codec = UviBytes::default();
1452 codec.set_max_len(usize::MAX);
1453
1454 let notif_in = NotificationsInOpen {
1455 handshake: b"hello, world".to_vec(),
1456 substream: NotificationsInSubstream::new(
1457 Framed::new(io, codec),
1458 NotificationsInSubstreamHandshake::NotSent,
1459 ),
1460 };
1461 handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound(
1462 handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () },
1463 ));
1464
1465 assert!(std::matches!(handler.protocols[0].state, State::OpenDesiredByRemote { .. }));
1467 futures::future::poll_fn(|cx| {
1468 let mut buf = Vec::with_capacity(512);
1469 assert!(std::matches!(Pin::new(&mut io2).poll_read(cx, &mut buf), Poll::Pending));
1470 Poll::Ready(())
1471 })
1472 .await;
1473
1474 handler.on_behaviour_event(NotifsHandlerIn::Open {
1477 protocol_index: 0,
1478 peer_id: PeerId::random(),
1479 });
1480 assert!(std::matches!(
1481 handler.protocols[0].state,
1482 State::Opening { in_substream: Some(_), .. }
1483 ));
1484
1485 handler.on_behaviour_event(NotifsHandlerIn::Close { protocol_index: 0 });
1486 assert!(std::matches!(handler.protocols[0].state, State::Closed { pending_opening: true }));
1487
1488 let (io, _io2) = MockSubstream::negotiated().await;
1490 let mut codec = UviBytes::default();
1491 codec.set_max_len(usize::MAX);
1492
1493 let notif_in = NotificationsInOpen {
1494 handshake: b"hello, world".to_vec(),
1495 substream: NotificationsInSubstream::new(
1496 Framed::new(io, codec),
1497 NotificationsInSubstreamHandshake::NotSent,
1498 ),
1499 };
1500 handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound(
1501 handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () },
1502 ));
1503
1504 assert!(std::matches!(
1505 handler.protocols[0].state,
1506 State::OpenDesiredByRemote { pending_opening: true, .. }
1507 ));
1508
1509 let (io, _io2) = MockSubstream::negotiated().await;
1512 let mut codec = UviBytes::default();
1513 codec.set_max_len(usize::MAX);
1514
1515 let notif_out = NotificationsOutOpen {
1516 handshake: b"hello, world".to_vec(),
1517 negotiated_fallback: None,
1518 substream: NotificationsOutSubstream::new(Framed::new(io, codec)),
1519 };
1520
1521 handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedOutbound(
1522 handler::FullyNegotiatedOutbound { protocol: notif_out, info: 0 },
1523 ));
1524
1525 assert!(std::matches!(
1526 handler.protocols[0].state,
1527 State::OpenDesiredByRemote { pending_opening: false, .. }
1528 ));
1529 }
1530
1531 #[tokio::test]
1532 async fn dial_upgrade_error_resets_closed_outbound_state() {
1533 let mut handler = notifs_handler();
1534 let (io, mut io2) = MockSubstream::negotiated().await;
1535 let mut codec = UviBytes::default();
1536 codec.set_max_len(usize::MAX);
1537
1538 let notif_in = NotificationsInOpen {
1539 handshake: b"hello, world".to_vec(),
1540 substream: NotificationsInSubstream::new(
1541 Framed::new(io, codec),
1542 NotificationsInSubstreamHandshake::NotSent,
1543 ),
1544 };
1545 handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound(
1546 handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () },
1547 ));
1548
1549 assert!(std::matches!(handler.protocols[0].state, State::OpenDesiredByRemote { .. }));
1551 futures::future::poll_fn(|cx| {
1552 let mut buf = Vec::with_capacity(512);
1553 assert!(std::matches!(Pin::new(&mut io2).poll_read(cx, &mut buf), Poll::Pending));
1554 Poll::Ready(())
1555 })
1556 .await;
1557
1558 handler.on_behaviour_event(NotifsHandlerIn::Open {
1561 protocol_index: 0,
1562 peer_id: PeerId::random(),
1563 });
1564 assert!(std::matches!(
1565 handler.protocols[0].state,
1566 State::Opening { in_substream: Some(_), .. }
1567 ));
1568
1569 handler.on_behaviour_event(NotifsHandlerIn::Close { protocol_index: 0 });
1570 assert!(std::matches!(handler.protocols[0].state, State::Closed { pending_opening: true }));
1571
1572 handler.on_connection_event(handler::ConnectionEvent::DialUpgradeError(
1574 handler::DialUpgradeError { info: 0, error: StreamUpgradeError::Timeout },
1575 ));
1576 assert!(std::matches!(
1577 handler.protocols[0].state,
1578 State::Closed { pending_opening: false }
1579 ));
1580 }
1581
1582 #[tokio::test]
1583 async fn dial_upgrade_error_resets_open_desired_state() {
1584 let mut handler = notifs_handler();
1585 let (io, mut io2) = MockSubstream::negotiated().await;
1586 let mut codec = UviBytes::default();
1587 codec.set_max_len(usize::MAX);
1588
1589 let notif_in = NotificationsInOpen {
1590 handshake: b"hello, world".to_vec(),
1591 substream: NotificationsInSubstream::new(
1592 Framed::new(io, codec),
1593 NotificationsInSubstreamHandshake::NotSent,
1594 ),
1595 };
1596 handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound(
1597 handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () },
1598 ));
1599
1600 assert!(std::matches!(handler.protocols[0].state, State::OpenDesiredByRemote { .. }));
1602 futures::future::poll_fn(|cx| {
1603 let mut buf = Vec::with_capacity(512);
1604 assert!(std::matches!(Pin::new(&mut io2).poll_read(cx, &mut buf), Poll::Pending));
1605 Poll::Ready(())
1606 })
1607 .await;
1608
1609 handler.on_behaviour_event(NotifsHandlerIn::Open {
1612 protocol_index: 0,
1613 peer_id: PeerId::random(),
1614 });
1615 assert!(std::matches!(
1616 handler.protocols[0].state,
1617 State::Opening { in_substream: Some(_), .. }
1618 ));
1619
1620 handler.on_behaviour_event(NotifsHandlerIn::Close { protocol_index: 0 });
1621 assert!(std::matches!(handler.protocols[0].state, State::Closed { pending_opening: true }));
1622
1623 let (io, _io2) = MockSubstream::negotiated().await;
1624 let mut codec = UviBytes::default();
1625 codec.set_max_len(usize::MAX);
1626
1627 let notif_in = NotificationsInOpen {
1628 handshake: b"hello, world".to_vec(),
1629 substream: NotificationsInSubstream::new(
1630 Framed::new(io, codec),
1631 NotificationsInSubstreamHandshake::NotSent,
1632 ),
1633 };
1634 handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound(
1635 handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () },
1636 ));
1637
1638 assert!(std::matches!(
1639 handler.protocols[0].state,
1640 State::OpenDesiredByRemote { pending_opening: true, .. }
1641 ));
1642
1643 handler.on_connection_event(handler::ConnectionEvent::DialUpgradeError(
1645 handler::DialUpgradeError { info: 0, error: StreamUpgradeError::Timeout },
1646 ));
1647 assert!(std::matches!(
1648 handler.protocols[0].state,
1649 State::OpenDesiredByRemote { pending_opening: false, .. }
1650 ));
1651 }
1652
1653 #[tokio::test]
1654 async fn sync_notifications_clogged() {
1655 let mut handler = notifs_handler();
1656 let (io, _) = MockSubstream::negotiated().await;
1657 let codec = UviBytes::default();
1658
1659 let (async_tx, async_rx) = futures::channel::mpsc::channel(ASYNC_NOTIFICATIONS_BUFFER_SIZE);
1660 let (sync_tx, sync_rx) = futures::channel::mpsc::channel(1);
1661 let notifications_sink = NotificationsSink {
1662 inner: Arc::new(NotificationsSinkInner {
1663 peer_id: PeerId::random(),
1664 async_channel: FuturesMutex::new(async_tx),
1665 sync_channel: Mutex::new(Some(sync_tx)),
1666 }),
1667 metrics: None,
1668 };
1669
1670 handler.protocols[0].state = State::Open {
1671 notifications_sink_rx: stream::select(async_rx.fuse(), sync_rx.fuse()).peekable(),
1672 out_substream: Some(NotificationsOutSubstream::new(Framed::new(io, codec))),
1673 in_substream: None,
1674 };
1675
1676 notifications_sink.send_sync_notification(vec![1, 3, 3, 7]);
1677 notifications_sink.send_sync_notification(vec![1, 3, 3, 8]);
1678 notifications_sink.send_sync_notification(vec![1, 3, 3, 9]);
1679 notifications_sink.send_sync_notification(vec![1, 3, 4, 0]);
1680
1681 futures::future::poll_fn(|cx| {
1682 assert!(std::matches!(
1683 handler.poll(cx),
1684 Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
1685 NotifsHandlerOut::Close { .. }
1686 ))
1687 ));
1688 Poll::Ready(())
1689 })
1690 .await;
1691 }
1692
1693 #[tokio::test]
1694 async fn close_desired_by_remote() {
1695 let mut handler = notifs_handler();
1696 let (io, io2) = MockSubstream::negotiated().await;
1697 let mut codec = UviBytes::default();
1698 codec.set_max_len(usize::MAX);
1699
1700 let notif_in = NotificationsInOpen {
1701 handshake: b"hello, world".to_vec(),
1702 substream: NotificationsInSubstream::new(
1703 Framed::new(io, codec),
1704 NotificationsInSubstreamHandshake::PendingSend(vec![1, 2, 3, 4]),
1705 ),
1706 };
1707
1708 handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound(
1711 handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () },
1712 ));
1713 drop(io2);
1714
1715 futures::future::poll_fn(|cx| {
1716 assert!(std::matches!(
1717 handler.poll(cx),
1718 Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
1719 NotifsHandlerOut::OpenDesiredByRemote { protocol_index: 0, .. },
1720 ))
1721 ));
1722 assert!(std::matches!(
1723 handler.poll(cx),
1724 Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
1725 NotifsHandlerOut::CloseDesired {
1726 protocol_index: 0,
1727 reason: CloseReason::RemoteRequest,
1728 },
1729 ))
1730 ));
1731 Poll::Ready(())
1732 })
1733 .await;
1734 }
1735}