1use crate::{
49 protocol::notifications::upgrade::{
50 NotificationsIn, NotificationsInSubstream, NotificationsOut, NotificationsOutError,
51 NotificationsOutSubstream, UpgradeCollec,
52 },
53 service::metrics::NotificationMetrics,
54 types::ProtocolName,
55};
56
57use bytes::BytesMut;
58use futures::{
59 channel::mpsc,
60 lock::{Mutex as FuturesMutex, MutexGuard as FuturesMutexGuard},
61 prelude::*,
62};
63use libp2p::{
64 swarm::{
65 handler::ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, Stream,
66 SubstreamProtocol,
67 },
68 PeerId,
69};
70
71use parking_lot::{Mutex, RwLock};
72use std::{
73 collections::VecDeque,
74 mem,
75 pin::Pin,
76 sync::Arc,
77 task::{Context, Poll},
78 time::Duration,
79};
80
/// Target string passed to every `log` macro invocation in this file.
const LOG_TARGET: &str = "sub-libp2p::notification::handler";

/// Capacity of the channel backing the asynchronous side of a [`NotificationsSink`]
/// (the one awaited through [`NotificationsSink::reserve_notification`]).
pub(crate) const ASYNC_NOTIFICATIONS_BUFFER_SIZE: usize = 8;

/// Capacity of the channel backing the synchronous side of a [`NotificationsSink`]
/// (the one fed by [`NotificationsSink::send_sync_notification`]).
const SYNC_NOTIFICATIONS_BUFFER_SIZE: usize = 2048;

/// Timeout attached to every outbound substream request emitted by the handler.
const OPEN_TIMEOUT: Duration = Duration::from_secs(10);

/// Duration of the timer started when a handler is created. Until it fires, the handler
/// asks for the connection to be kept alive even if all of its protocols are closed.
const INITIAL_KEEPALIVE_TIME: Duration = Duration::from_secs(5);
99
100pub struct NotifsHandler {
104 protocols: Vec<Protocol>,
106
107 keep_alive: bool,
109
110 keep_alive_timeout_future: Option<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
113
114 peer_id: PeerId,
116
117 events_queue: VecDeque<ConnectionHandlerEvent<NotificationsOut, usize, NotifsHandlerOut>>,
119
120 metrics: Option<Arc<NotificationMetrics>>,
122}
123
124impl NotifsHandler {
125 pub fn new(
127 peer_id: PeerId,
128 protocols: Vec<ProtocolConfig>,
129 metrics: Option<NotificationMetrics>,
130 ) -> Self {
131 Self {
132 protocols: protocols
133 .into_iter()
134 .map(|config| {
135 let in_upgrade = NotificationsIn::new(
136 config.name.clone(),
137 config.fallback_names.clone(),
138 config.max_notification_size,
139 );
140
141 Protocol { config, in_upgrade, state: State::Closed { pending_opening: false } }
142 })
143 .collect(),
144 peer_id,
145 keep_alive: true,
147 keep_alive_timeout_future: Some(Box::pin(tokio::time::sleep(INITIAL_KEEPALIVE_TIME))),
151 events_queue: VecDeque::with_capacity(16),
152 metrics: metrics.map_or(None, |metrics| Some(Arc::new(metrics))),
153 }
154 }
155}
156
157#[derive(Debug, Clone)]
159pub struct ProtocolConfig {
160 pub name: ProtocolName,
162 pub fallback_names: Vec<ProtocolName>,
164 pub handshake: Arc<RwLock<Vec<u8>>>,
166 pub max_notification_size: u64,
168}
169
170struct Protocol {
172 config: ProtocolConfig,
174
175 in_upgrade: NotificationsIn,
177
178 state: State,
180}
181
182enum State {
184 Closed {
186 pending_opening: bool,
188 },
189
190 OpenDesiredByRemote {
193 in_substream: NotificationsInSubstream<Stream>,
195
196 pending_opening: bool,
198 },
199
200 Opening {
206 in_substream: Option<NotificationsInSubstream<Stream>>,
208 inbound: bool,
210 },
211
212 Open {
214 notifications_sink_rx: stream::Peekable<
220 stream::Select<
221 stream::Fuse<mpsc::Receiver<NotificationsSinkMessage>>,
222 stream::Fuse<mpsc::Receiver<NotificationsSinkMessage>>,
223 >,
224 >,
225
226 out_substream: Option<NotificationsOutSubstream<Stream>>,
232
233 in_substream: Option<NotificationsInSubstream<Stream>>,
239 },
240}
241
/// Why the handler reports, through `NotifsHandlerOut::CloseDesired`, that a protocol
/// should be closed.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CloseReason {
    /// Used when a substream reported an I/O error or its closure.
    RemoteRequest,

    /// Used when the outbound substream reported `NotificationsOutError::UnexpectedData`.
    ProtocolMisbehavior,
}
255
256#[derive(Debug, Clone)]
258pub enum NotifsHandlerIn {
259 Open {
267 protocol_index: usize,
269
270 peer_id: PeerId,
272 },
273
274 Close {
279 protocol_index: usize,
281 },
282}
283
284#[derive(Debug)]
286pub enum NotifsHandlerOut {
287 OpenResultOk {
289 protocol_index: usize,
291 negotiated_fallback: Option<ProtocolName>,
293 received_handshake: Vec<u8>,
296 notifications_sink: NotificationsSink,
298 inbound: bool,
300 },
301
302 OpenResultErr {
305 protocol_index: usize,
307 },
308
309 CloseResult {
311 protocol_index: usize,
313 },
314
315 OpenDesiredByRemote {
321 protocol_index: usize,
323 handshake: Vec<u8>,
325 },
326
327 CloseDesired {
333 protocol_index: usize,
335
336 reason: CloseReason,
338 },
339
340 Notification {
344 protocol_index: usize,
346 message: BytesMut,
348 },
349
350 Close {
352 protocol_index: usize,
354 },
355}
356
357#[derive(Debug, Clone)]
361pub struct NotificationsSink {
362 inner: Arc<NotificationsSinkInner>,
363 metrics: Option<Arc<NotificationMetrics>>,
364}
365
366impl NotificationsSink {
367 pub fn new(
370 peer_id: PeerId,
371 ) -> (Self, mpsc::Receiver<NotificationsSinkMessage>, mpsc::Receiver<NotificationsSinkMessage>)
372 {
373 let (async_tx, async_rx) = mpsc::channel(ASYNC_NOTIFICATIONS_BUFFER_SIZE);
374 let (sync_tx, sync_rx) = mpsc::channel(SYNC_NOTIFICATIONS_BUFFER_SIZE);
375 (
376 NotificationsSink {
377 inner: Arc::new(NotificationsSinkInner {
378 peer_id,
379 async_channel: FuturesMutex::new(async_tx),
380 sync_channel: Mutex::new(Some(sync_tx)),
381 }),
382 metrics: None,
383 },
384 async_rx,
385 sync_rx,
386 )
387 }
388
389 pub fn metrics(&self) -> &Option<Arc<NotificationMetrics>> {
391 &self.metrics
392 }
393}
394
395#[derive(Debug)]
396struct NotificationsSinkInner {
397 peer_id: PeerId,
399 async_channel: FuturesMutex<mpsc::Sender<NotificationsSinkMessage>>,
401 sync_channel: Mutex<Option<mpsc::Sender<NotificationsSinkMessage>>>,
408}
409
/// Message travelling from a [`NotificationsSink`] to the handler.
#[derive(Debug, PartialEq, Eq)]
pub enum NotificationsSinkMessage {
    /// Payload to write on the outbound substream.
    Notification { message: Vec<u8> },

    /// Queued by `send_sync_notification` when the synchronous channel could not accept a
    /// notification; makes the handler report `NotifsHandlerOut::Close`.
    ForceClose,
}
421
422impl NotificationsSink {
423 pub fn peer_id(&self) -> &PeerId {
425 &self.inner.peer_id
426 }
427
428 pub fn send_sync_notification(&self, message: impl Into<Vec<u8>>) {
438 let mut lock = self.inner.sync_channel.lock();
439
440 if let Some(tx) = lock.as_mut() {
441 let message = message.into();
442 let result = tx.try_send(NotificationsSinkMessage::Notification { message });
443
444 if result.is_err() {
445 let _result2 = tx.clone().try_send(NotificationsSinkMessage::ForceClose);
448 debug_assert!(_result2.map(|()| true).unwrap_or_else(|err| err.is_disconnected()));
449
450 *lock = None;
452 }
453 }
454 }
455
456 pub async fn reserve_notification(&self) -> Result<Ready<'_>, ()> {
463 let mut lock = self.inner.async_channel.lock().await;
464
465 let poll_ready = future::poll_fn(|cx| lock.poll_ready(cx)).await;
466 if poll_ready.is_ok() {
467 Ok(Ready { lock })
468 } else {
469 Err(())
470 }
471 }
472}
473
474#[must_use]
476#[derive(Debug)]
477pub struct Ready<'a> {
478 lock: FuturesMutexGuard<'a, mpsc::Sender<NotificationsSinkMessage>>,
480}
481
482impl<'a> Ready<'a> {
483 pub fn send(mut self, notification: impl Into<Vec<u8>>) -> Result<(), ()> {
487 self.lock
488 .start_send(NotificationsSinkMessage::Notification { message: notification.into() })
489 .map_err(|_| ())
490 }
491}
492
493impl ConnectionHandler for NotifsHandler {
494 type FromBehaviour = NotifsHandlerIn;
495 type ToBehaviour = NotifsHandlerOut;
496 type InboundProtocol = UpgradeCollec<NotificationsIn>;
497 type OutboundProtocol = NotificationsOut;
498 type OutboundOpenInfo = usize;
500 type InboundOpenInfo = ();
501
502 fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, ()> {
503 let protocols = self
504 .protocols
505 .iter()
506 .map(|p| p.in_upgrade.clone())
507 .collect::<UpgradeCollec<_>>();
508
509 SubstreamProtocol::new(protocols, ())
510 }
511
512 fn on_connection_event(
513 &mut self,
514 event: ConnectionEvent<
515 '_,
516 Self::InboundProtocol,
517 Self::OutboundProtocol,
518 Self::InboundOpenInfo,
519 Self::OutboundOpenInfo,
520 >,
521 ) {
522 match event {
523 ConnectionEvent::FullyNegotiatedInbound(inbound) => {
524 let (mut in_substream_open, protocol_index) = inbound.protocol;
525 let protocol_info = &mut self.protocols[protocol_index];
526
527 match protocol_info.state {
528 State::Closed { pending_opening } => {
529 self.events_queue.push_back(ConnectionHandlerEvent::NotifyBehaviour(
530 NotifsHandlerOut::OpenDesiredByRemote {
531 protocol_index,
532 handshake: in_substream_open.handshake,
533 },
534 ));
535
536 protocol_info.state = State::OpenDesiredByRemote {
537 in_substream: in_substream_open.substream,
538 pending_opening,
539 };
540 },
541 State::OpenDesiredByRemote { .. } => {
542 return;
550 },
551 State::Opening { ref mut in_substream, .. }
552 | State::Open { ref mut in_substream, .. } => {
553 if in_substream.is_some() {
554 return;
556 }
557
558 let handshake_message = protocol_info.config.handshake.read().clone();
561 in_substream_open.substream.send_handshake(handshake_message);
562 *in_substream = Some(in_substream_open.substream);
563 },
564 }
565 },
566 ConnectionEvent::FullyNegotiatedOutbound(outbound) => {
567 let (new_open, protocol_index) = (outbound.protocol, outbound.info);
568
569 match self.protocols[protocol_index].state {
570 State::Closed { ref mut pending_opening }
571 | State::OpenDesiredByRemote { ref mut pending_opening, .. } => {
572 debug_assert!(*pending_opening);
573 *pending_opening = false;
574 },
575 State::Open { .. } => {
576 log::error!(target: LOG_TARGET, "☎️ State mismatch in notifications handler");
577 debug_assert!(false);
578 },
579 State::Opening { ref mut in_substream, inbound } => {
580 let (async_tx, async_rx) = mpsc::channel(ASYNC_NOTIFICATIONS_BUFFER_SIZE);
581 let (sync_tx, sync_rx) = mpsc::channel(SYNC_NOTIFICATIONS_BUFFER_SIZE);
582 let notifications_sink = NotificationsSink {
583 inner: Arc::new(NotificationsSinkInner {
584 peer_id: self.peer_id,
585 async_channel: FuturesMutex::new(async_tx),
586 sync_channel: Mutex::new(Some(sync_tx)),
587 }),
588 metrics: self.metrics.clone(),
589 };
590
591 self.protocols[protocol_index].state = State::Open {
592 notifications_sink_rx: stream::select(async_rx.fuse(), sync_rx.fuse())
593 .peekable(),
594 out_substream: Some(new_open.substream),
595 in_substream: in_substream.take(),
596 };
597
598 self.events_queue.push_back(ConnectionHandlerEvent::NotifyBehaviour(
599 NotifsHandlerOut::OpenResultOk {
600 protocol_index,
601 negotiated_fallback: new_open.negotiated_fallback,
602 received_handshake: new_open.handshake,
603 notifications_sink,
604 inbound,
605 },
606 ));
607 },
608 }
609 },
610 ConnectionEvent::AddressChange(_address_change) => {},
611 ConnectionEvent::LocalProtocolsChange(_) => {},
612 ConnectionEvent::RemoteProtocolsChange(_) => {},
613 ConnectionEvent::DialUpgradeError(dial_upgrade_error) => match self.protocols
614 [dial_upgrade_error.info]
615 .state
616 {
617 State::Closed { ref mut pending_opening }
618 | State::OpenDesiredByRemote { ref mut pending_opening, .. } => {
619 debug_assert!(*pending_opening);
620 *pending_opening = false;
621 },
622
623 State::Opening { .. } => {
624 self.protocols[dial_upgrade_error.info].state =
625 State::Closed { pending_opening: false };
626
627 self.events_queue.push_back(ConnectionHandlerEvent::NotifyBehaviour(
628 NotifsHandlerOut::OpenResultErr { protocol_index: dial_upgrade_error.info },
629 ));
630 },
631
632 State::Open { .. } => debug_assert!(false),
634 },
635 ConnectionEvent::ListenUpgradeError(_listen_upgrade_error) => {},
636 event => {
637 log::warn!(target: LOG_TARGET, "New unknown `ConnectionEvent` libp2p event: {event:?}");
638 },
639 }
640 }
641
642 fn on_behaviour_event(&mut self, message: NotifsHandlerIn) {
643 match message {
644 NotifsHandlerIn::Open { protocol_index, peer_id } => {
645 let protocol_info = &mut self.protocols[protocol_index];
646 match &mut protocol_info.state {
647 State::Closed { pending_opening } => {
648 if !*pending_opening {
649 let proto = NotificationsOut::new(
650 protocol_info.config.name.clone(),
651 protocol_info.config.fallback_names.clone(),
652 protocol_info.config.handshake.read().clone(),
653 protocol_info.config.max_notification_size,
654 peer_id,
655 );
656
657 self.events_queue.push_back(
658 ConnectionHandlerEvent::OutboundSubstreamRequest {
659 protocol: SubstreamProtocol::new(proto, protocol_index)
660 .with_timeout(OPEN_TIMEOUT),
661 },
662 );
663 }
664
665 protocol_info.state = State::Opening { in_substream: None, inbound: false };
666 },
667 State::OpenDesiredByRemote { pending_opening, in_substream } => {
668 let handshake_message = protocol_info.config.handshake.read().clone();
669
670 if !*pending_opening {
671 let proto = NotificationsOut::new(
672 protocol_info.config.name.clone(),
673 protocol_info.config.fallback_names.clone(),
674 handshake_message.clone(),
675 protocol_info.config.max_notification_size,
676 peer_id,
677 );
678
679 self.events_queue.push_back(
680 ConnectionHandlerEvent::OutboundSubstreamRequest {
681 protocol: SubstreamProtocol::new(proto, protocol_index)
682 .with_timeout(OPEN_TIMEOUT),
683 },
684 );
685 }
686
687 in_substream.send_handshake(handshake_message);
688
689 let in_substream = match mem::replace(
691 &mut protocol_info.state,
692 State::Opening { in_substream: None, inbound: false },
693 ) {
694 State::OpenDesiredByRemote { in_substream, .. } => in_substream,
695 _ => unreachable!(),
696 };
697 protocol_info.state =
698 State::Opening { in_substream: Some(in_substream), inbound: true };
699 },
700 State::Opening { .. } | State::Open { .. } => {
701 log::error!(target: LOG_TARGET, "opening already-opened handler");
704 debug_assert!(false);
705 },
706 }
707 },
708
709 NotifsHandlerIn::Close { protocol_index } => {
710 match self.protocols[protocol_index].state {
711 State::Open { .. } => {
712 self.protocols[protocol_index].state =
713 State::Closed { pending_opening: false };
714 },
715 State::Opening { .. } => {
716 self.protocols[protocol_index].state =
717 State::Closed { pending_opening: true };
718
719 self.events_queue.push_back(ConnectionHandlerEvent::NotifyBehaviour(
720 NotifsHandlerOut::OpenResultErr { protocol_index },
721 ));
722 },
723 State::OpenDesiredByRemote { pending_opening, .. } => {
724 self.protocols[protocol_index].state = State::Closed { pending_opening };
725 },
726 State::Closed { .. } => {},
727 }
728
729 self.events_queue.push_back(ConnectionHandlerEvent::NotifyBehaviour(
730 NotifsHandlerOut::CloseResult { protocol_index },
731 ));
732 },
733 }
734 }
735
736 fn connection_keep_alive(&self) -> bool {
737 if self.protocols.iter().any(|p| !matches!(p.state, State::Closed { .. })) {
739 return true;
740 }
741
742 self.keep_alive
743 }
744
745 fn poll(
746 &mut self,
747 cx: &mut Context,
748 ) -> Poll<
749 ConnectionHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::ToBehaviour>,
750 > {
751 {
752 let maybe_keep_alive_timeout_future = &mut self.keep_alive_timeout_future;
753 if let Some(keep_alive_timeout_future) = maybe_keep_alive_timeout_future {
754 if keep_alive_timeout_future.poll_unpin(cx).is_ready() {
755 maybe_keep_alive_timeout_future.take();
756 self.keep_alive = false;
757 }
758 }
759 }
760
761 if let Some(ev) = self.events_queue.pop_front() {
762 return Poll::Ready(ev);
763 }
764
765 for protocol_index in 0..self.protocols.len() {
768 if let State::Open {
769 notifications_sink_rx, out_substream: Some(out_substream), ..
770 } = &mut self.protocols[protocol_index].state
771 {
772 loop {
773 match Pin::new(&mut *notifications_sink_rx).as_mut().poll_peek(cx) {
777 Poll::Ready(Some(&NotificationsSinkMessage::ForceClose)) => {
778 return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
779 NotifsHandlerOut::Close { protocol_index },
780 ))
781 },
782 Poll::Ready(Some(&NotificationsSinkMessage::Notification { .. })) => {},
783 Poll::Ready(None) | Poll::Pending => break,
784 }
785
786 match out_substream.poll_ready_unpin(cx) {
789 Poll::Ready(_) => {},
790 Poll::Pending => break,
791 }
792
793 let message = match notifications_sink_rx.poll_next_unpin(cx) {
795 Poll::Ready(Some(NotificationsSinkMessage::Notification { message })) => {
796 message
797 },
798 Poll::Ready(Some(NotificationsSinkMessage::ForceClose))
799 | Poll::Ready(None)
800 | Poll::Pending => {
801 debug_assert!(false);
803 break;
804 },
805 };
806
807 let _ = out_substream.start_send_unpin(message);
808 }
810 }
811 }
812
813 for protocol_index in 0..self.protocols.len() {
824 match &mut self.protocols[protocol_index].state {
825 State::Open { out_substream: out_substream @ Some(_), .. } => {
826 match Sink::poll_flush(Pin::new(out_substream.as_mut().unwrap()), cx) {
827 Poll::Pending | Poll::Ready(Ok(())) => {},
828 Poll::Ready(Err(error)) => {
829 *out_substream = None;
830
831 let reason = match error {
832 NotificationsOutError::Io(_) | NotificationsOutError::Closed => {
833 CloseReason::RemoteRequest
834 },
835 NotificationsOutError::UnexpectedData => {
836 CloseReason::ProtocolMisbehavior
837 },
838 };
839
840 let event = NotifsHandlerOut::CloseDesired { protocol_index, reason };
841 return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(event));
842 },
843 };
844 },
845
846 State::Closed { .. }
847 | State::Opening { .. }
848 | State::Open { out_substream: None, .. }
849 | State::OpenDesiredByRemote { .. } => {},
850 }
851 }
852
853 for protocol_index in 0..self.protocols.len() {
855 match &mut self.protocols[protocol_index].state {
858 State::Closed { .. }
859 | State::Open { in_substream: None, .. }
860 | State::Opening { in_substream: None, .. } => {},
861
862 State::Open { in_substream: in_substream @ Some(_), .. } => {
863 match futures::prelude::stream::Stream::poll_next(
864 Pin::new(in_substream.as_mut().unwrap()),
865 cx,
866 ) {
867 Poll::Pending => {},
868 Poll::Ready(Some(Ok(message))) => {
869 let event = NotifsHandlerOut::Notification { protocol_index, message };
870 return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(event));
871 },
872 Poll::Ready(None) | Poll::Ready(Some(Err(_))) => *in_substream = None,
873 }
874 },
875
876 State::OpenDesiredByRemote { in_substream, pending_opening } => {
877 match NotificationsInSubstream::poll_process(Pin::new(in_substream), cx) {
878 Poll::Pending => {},
879 Poll::Ready(Ok(())) => {},
880 Poll::Ready(Err(_)) => {
881 self.protocols[protocol_index].state =
882 State::Closed { pending_opening: *pending_opening };
883 return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
884 NotifsHandlerOut::CloseDesired {
885 protocol_index,
886 reason: CloseReason::RemoteRequest,
887 },
888 ));
889 },
890 }
891 },
892
893 State::Opening { in_substream: in_substream @ Some(_), .. } => {
894 match NotificationsInSubstream::poll_process(
895 Pin::new(in_substream.as_mut().unwrap()),
896 cx,
897 ) {
898 Poll::Pending => {},
899 Poll::Ready(Ok(())) => {},
900 Poll::Ready(Err(_)) => *in_substream = None,
901 }
902 },
903 }
904 }
905
906 Poll::Pending
910 }
911}
912
913#[cfg(test)]
914pub mod tests {
915 use super::*;
916 use crate::protocol::notifications::upgrade::{
917 NotificationsInOpen, NotificationsInSubstreamHandshake, NotificationsOutOpen,
918 };
919 use asynchronous_codec::Framed;
920 use libp2p::{
921 core::muxing::SubstreamBox,
922 swarm::handler::{self, StreamUpgradeError},
923 };
924 use multistream_select::{dialer_select_proto, listener_select_proto, Negotiated, Version};
925 use std::{
926 collections::HashMap,
927 io::{Error, IoSlice, IoSliceMut},
928 };
929 use tokio::sync::mpsc;
930 use unsigned_varint::codec::UviBytes;
931
932 struct OpenSubstream {
933 notifications: stream::Peekable<
934 stream::Select<
935 stream::Fuse<futures::channel::mpsc::Receiver<NotificationsSinkMessage>>,
936 stream::Fuse<futures::channel::mpsc::Receiver<NotificationsSinkMessage>>,
937 >,
938 >,
939 _in_substream: MockSubstream,
940 _out_substream: MockSubstream,
941 }
942
943 pub struct ConnectionYielder {
944 connections: HashMap<(PeerId, usize), OpenSubstream>,
945 }
946
947 impl ConnectionYielder {
948 pub fn new() -> Self {
950 Self { connections: HashMap::new() }
951 }
952
953 pub fn open_substream(
955 &mut self,
956 peer: PeerId,
957 protocol_index: usize,
958 received_handshake: Vec<u8>,
959 ) -> NotifsHandlerOut {
960 let (async_tx, async_rx) =
961 futures::channel::mpsc::channel(ASYNC_NOTIFICATIONS_BUFFER_SIZE);
962 let (sync_tx, sync_rx) =
963 futures::channel::mpsc::channel(SYNC_NOTIFICATIONS_BUFFER_SIZE);
964 let notifications_sink = NotificationsSink {
965 inner: Arc::new(NotificationsSinkInner {
966 peer_id: peer,
967 async_channel: FuturesMutex::new(async_tx),
968 sync_channel: Mutex::new(Some(sync_tx)),
969 }),
970 metrics: None,
971 };
972 let (in_substream, out_substream) = MockSubstream::new();
973
974 self.connections.insert(
975 (peer, protocol_index),
976 OpenSubstream {
977 notifications: stream::select(async_rx.fuse(), sync_rx.fuse()).peekable(),
978 _in_substream: in_substream,
979 _out_substream: out_substream,
980 },
981 );
982
983 NotifsHandlerOut::OpenResultOk {
984 protocol_index,
985 negotiated_fallback: None,
986 received_handshake,
987 notifications_sink,
988 inbound: false,
989 }
990 }
991
992 pub async fn get_next_event(&mut self, peer: PeerId, set: usize) -> Option<Vec<u8>> {
994 let substream = if let Some(info) = self.connections.get_mut(&(peer, set)) {
995 info
996 } else {
997 return None;
998 };
999
1000 futures::future::poll_fn(|cx| match substream.notifications.poll_next_unpin(cx) {
1001 Poll::Ready(Some(NotificationsSinkMessage::Notification { message })) => {
1002 Poll::Ready(Some(message))
1003 },
1004 Poll::Pending => Poll::Ready(None),
1005 Poll::Ready(Some(NotificationsSinkMessage::ForceClose)) | Poll::Ready(None) => {
1006 panic!("sink closed")
1007 },
1008 })
1009 .await
1010 }
1011 }
1012
1013 struct MockSubstream {
1014 pub rx: mpsc::Receiver<Vec<u8>>,
1015 pub tx: mpsc::Sender<Vec<u8>>,
1016 rx_buffer: BytesMut,
1017 }
1018
// Never read: exists only as the type of `MockStream::counter`.
#[allow(dead_code)]
struct MockActiveStreamCounter(Arc<()>);
1022
1023 #[allow(dead_code)]
1025 struct MockStream {
1026 stream: Negotiated<SubstreamBox>,
1027 counter: Option<MockActiveStreamCounter>,
1028 }
1029
1030 impl MockSubstream {
1031 pub fn new() -> (Self, Self) {
1033 let (tx1, rx1) = mpsc::channel(32);
1034 let (tx2, rx2) = mpsc::channel(32);
1035
1036 (
1037 Self { rx: rx1, tx: tx2, rx_buffer: BytesMut::with_capacity(512) },
1038 Self { rx: rx2, tx: tx1, rx_buffer: BytesMut::with_capacity(512) },
1039 )
1040 }
1041
1042 pub async fn negotiated() -> (Stream, Stream) {
1044 let (socket1, socket2) = Self::new();
1045 let socket1 = SubstreamBox::new(socket1);
1046 let socket2 = SubstreamBox::new(socket2);
1047
1048 let protos = vec!["/echo/1.0.0", "/echo/2.5.0"];
1049 let (res1, res2) = tokio::join!(
1050 dialer_select_proto(socket1, protos.clone(), Version::V1),
1051 listener_select_proto(socket2, protos),
1052 );
1053
1054 (Self::stream_new(res1.unwrap().1), Self::stream_new(res2.unwrap().1))
1055 }
1056
1057 fn stream_new(stream: Negotiated<SubstreamBox>) -> Stream {
1059 let stream = MockStream { stream, counter: None };
1060 const _: () = {
1062 assert!(core::mem::size_of::<Stream>() == core::mem::size_of::<MockStream>());
1063 assert!(core::mem::align_of::<Stream>() == core::mem::align_of::<MockStream>());
1064 };
1065
1066 unsafe { core::mem::transmute(stream) }
1067 }
1068 }
1069
1070 impl AsyncWrite for MockSubstream {
1071 fn poll_write<'a>(
1072 self: Pin<&mut Self>,
1073 _cx: &mut Context<'a>,
1074 buf: &[u8],
1075 ) -> Poll<Result<usize, Error>> {
1076 match self.tx.try_send(buf.to_vec()) {
1077 Ok(_) => Poll::Ready(Ok(buf.len())),
1078 Err(_) => Poll::Ready(Err(std::io::ErrorKind::UnexpectedEof.into())),
1079 }
1080 }
1081
1082 fn poll_flush<'a>(self: Pin<&mut Self>, _cx: &mut Context<'a>) -> Poll<Result<(), Error>> {
1083 Poll::Ready(Ok(()))
1084 }
1085
1086 fn poll_close<'a>(self: Pin<&mut Self>, _cx: &mut Context<'a>) -> Poll<Result<(), Error>> {
1087 Poll::Ready(Ok(()))
1088 }
1089
1090 fn poll_write_vectored<'a, 'b>(
1091 self: Pin<&mut Self>,
1092 _cx: &mut Context<'a>,
1093 _bufs: &[IoSlice<'b>],
1094 ) -> Poll<Result<usize, Error>> {
1095 unimplemented!();
1096 }
1097 }
1098
1099 impl AsyncRead for MockSubstream {
1100 fn poll_read<'a>(
1101 mut self: Pin<&mut Self>,
1102 cx: &mut Context<'a>,
1103 buf: &mut [u8],
1104 ) -> Poll<Result<usize, Error>> {
1105 match self.rx.poll_recv(cx) {
1106 Poll::Ready(Some(data)) => self.rx_buffer.extend_from_slice(&data),
1107 Poll::Ready(None) => {
1108 return Poll::Ready(Err(std::io::ErrorKind::UnexpectedEof.into()))
1109 },
1110 _ => {},
1111 }
1112
1113 let nsize = std::cmp::min(self.rx_buffer.len(), buf.len());
1114 let data = self.rx_buffer.split_to(nsize);
1115 buf[..nsize].copy_from_slice(&data[..]);
1116
1117 if nsize > 0 {
1118 return Poll::Ready(Ok(nsize));
1119 }
1120
1121 Poll::Pending
1122 }
1123
1124 fn poll_read_vectored<'a, 'b>(
1125 self: Pin<&mut Self>,
1126 _cx: &mut Context<'a>,
1127 _bufs: &mut [IoSliceMut<'b>],
1128 ) -> Poll<Result<usize, Error>> {
1129 unimplemented!();
1130 }
1131 }
1132
1133 fn notifs_handler() -> NotifsHandler {
1135 NotifsHandler::new(
1136 PeerId::random(),
1137 vec![ProtocolConfig {
1138 name: "/foo".into(),
1139 fallback_names: vec![],
1140 handshake: Arc::new(RwLock::new(b"hello, world".to_vec())),
1141 max_notification_size: u64::MAX,
1142 }],
1143 None,
1144 )
1145 }
1146
1147 #[tokio::test]
1150 async fn second_open_desired_by_remote_rejected() {
1151 let mut handler = notifs_handler();
1152 let (io, mut io2) = MockSubstream::negotiated().await;
1153 let mut codec = UviBytes::default();
1154 codec.set_max_len(usize::MAX);
1155
1156 let notif_in = NotificationsInOpen {
1157 handshake: b"hello, world".to_vec(),
1158 substream: NotificationsInSubstream::new(
1159 Framed::new(io, codec),
1160 NotificationsInSubstreamHandshake::NotSent,
1161 ),
1162 };
1163
1164 handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound(
1165 handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () },
1166 ));
1167
1168 assert!(std::matches!(handler.protocols[0].state, State::OpenDesiredByRemote { .. }));
1170 futures::future::poll_fn(|cx| {
1171 let mut buf = Vec::with_capacity(512);
1172 assert!(std::matches!(Pin::new(&mut io2).poll_read(cx, &mut buf), Poll::Pending));
1173 Poll::Ready(())
1174 })
1175 .await;
1176
1177 let (io, mut io2) = MockSubstream::negotiated().await;
1179 let mut codec = UviBytes::default();
1180 codec.set_max_len(usize::MAX);
1181
1182 let notif_in = NotificationsInOpen {
1183 handshake: b"hello, world".to_vec(),
1184 substream: NotificationsInSubstream::new(
1185 Framed::new(io, codec),
1186 NotificationsInSubstreamHandshake::NotSent,
1187 ),
1188 };
1189
1190 handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound(
1191 handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () },
1192 ));
1193
1194 futures::future::poll_fn(|cx| {
1196 let mut buf = Vec::with_capacity(512);
1197
1198 if let Poll::Ready(Err(err)) = Pin::new(&mut io2).poll_read(cx, &mut buf) {
1199 assert_eq!(err.kind(), std::io::ErrorKind::UnexpectedEof,);
1200 }
1201
1202 Poll::Ready(())
1203 })
1204 .await;
1205 }
1206
1207 #[tokio::test]
1208 async fn open_rejected_if_substream_is_opening() {
1209 let mut handler = notifs_handler();
1210 let (io, mut io2) = MockSubstream::negotiated().await;
1211 let mut codec = UviBytes::default();
1212 codec.set_max_len(usize::MAX);
1213
1214 let notif_in = NotificationsInOpen {
1215 handshake: b"hello, world".to_vec(),
1216 substream: NotificationsInSubstream::new(
1217 Framed::new(io, codec),
1218 NotificationsInSubstreamHandshake::NotSent,
1219 ),
1220 };
1221
1222 handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound(
1223 handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () },
1224 ));
1225
1226 assert!(std::matches!(handler.protocols[0].state, State::OpenDesiredByRemote { .. }));
1228 futures::future::poll_fn(|cx| {
1229 let mut buf = Vec::with_capacity(512);
1230 assert!(std::matches!(Pin::new(&mut io2).poll_read(cx, &mut buf), Poll::Pending));
1231 Poll::Ready(())
1232 })
1233 .await;
1234
1235 handler.on_behaviour_event(NotifsHandlerIn::Open {
1237 protocol_index: 0,
1238 peer_id: PeerId::random(),
1239 });
1240 assert!(std::matches!(
1241 handler.protocols[0].state,
1242 State::Opening { in_substream: Some(_), .. }
1243 ));
1244
1245 let (io, mut io2) = MockSubstream::negotiated().await;
1247 let mut codec = UviBytes::default();
1248 codec.set_max_len(usize::MAX);
1249
1250 let notif_in = NotificationsInOpen {
1251 handshake: b"hello, world".to_vec(),
1252 substream: NotificationsInSubstream::new(
1253 Framed::new(io, codec),
1254 NotificationsInSubstreamHandshake::NotSent,
1255 ),
1256 };
1257
1258 handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound(
1259 handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () },
1260 ));
1261
1262 futures::future::poll_fn(|cx| {
1265 let mut buf = Vec::with_capacity(512);
1266
1267 if let Poll::Ready(Err(err)) = Pin::new(&mut io2).poll_read(cx, &mut buf) {
1268 assert_eq!(err.kind(), std::io::ErrorKind::UnexpectedEof,);
1269 } else {
1270 panic!("unexpected result");
1271 }
1272
1273 Poll::Ready(())
1274 })
1275 .await;
1276 assert!(std::matches!(
1277 handler.protocols[0].state,
1278 State::Opening { in_substream: Some(_), .. }
1279 ));
1280 }
1281
1282 #[tokio::test]
1283 async fn open_rejected_if_substream_already_open() {
1284 let mut handler = notifs_handler();
1285 let (io, mut io2) = MockSubstream::negotiated().await;
1286 let mut codec = UviBytes::default();
1287 codec.set_max_len(usize::MAX);
1288
1289 let notif_in = NotificationsInOpen {
1290 handshake: b"hello, world".to_vec(),
1291 substream: NotificationsInSubstream::new(
1292 Framed::new(io, codec),
1293 NotificationsInSubstreamHandshake::NotSent,
1294 ),
1295 };
1296 handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound(
1297 handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () },
1298 ));
1299
1300 assert!(std::matches!(handler.protocols[0].state, State::OpenDesiredByRemote { .. }));
1302 futures::future::poll_fn(|cx| {
1303 let mut buf = Vec::with_capacity(512);
1304 assert!(std::matches!(Pin::new(&mut io2).poll_read(cx, &mut buf), Poll::Pending));
1305 Poll::Ready(())
1306 })
1307 .await;
1308
1309 handler.on_behaviour_event(NotifsHandlerIn::Open {
1311 protocol_index: 0,
1312 peer_id: PeerId::random(),
1313 });
1314 assert!(std::matches!(
1315 handler.protocols[0].state,
1316 State::Opening { in_substream: Some(_), .. }
1317 ));
1318
1319 let (io, _io2) = MockSubstream::negotiated().await;
1321 let mut codec = UviBytes::default();
1322 codec.set_max_len(usize::MAX);
1323
1324 let notif_out = NotificationsOutOpen {
1325 handshake: b"hello, world".to_vec(),
1326 negotiated_fallback: None,
1327 substream: NotificationsOutSubstream::new(Framed::new(io, codec)),
1328 };
1329 handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedOutbound(
1330 handler::FullyNegotiatedOutbound { protocol: notif_out, info: 0 },
1331 ));
1332
1333 assert!(std::matches!(
1334 handler.protocols[0].state,
1335 State::Open { in_substream: Some(_), .. }
1336 ));
1337
1338 let (io, mut io2) = MockSubstream::negotiated().await;
1340 let mut codec = UviBytes::default();
1341 codec.set_max_len(usize::MAX);
1342 let notif_in = NotificationsInOpen {
1343 handshake: b"hello, world".to_vec(),
1344 substream: NotificationsInSubstream::new(
1345 Framed::new(io, codec),
1346 NotificationsInSubstreamHandshake::NotSent,
1347 ),
1348 };
1349 handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound(
1350 handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () },
1351 ));
1352
1353 futures::future::poll_fn(|cx| {
1356 let mut buf = Vec::with_capacity(512);
1357
1358 if let Poll::Ready(Err(err)) = Pin::new(&mut io2).poll_read(cx, &mut buf) {
1359 assert_eq!(err.kind(), std::io::ErrorKind::UnexpectedEof);
1360 } else {
1361 panic!("unexpected result");
1362 }
1363
1364 Poll::Ready(())
1365 })
1366 .await;
1367 assert!(std::matches!(
1368 handler.protocols[0].state,
1369 State::Open { in_substream: Some(_), .. }
1370 ));
1371 }
1372
1373 #[tokio::test]
1374 async fn fully_negotiated_resets_state_for_closed_substream() {
1375 let mut handler = notifs_handler();
1376 let (io, mut io2) = MockSubstream::negotiated().await;
1377 let mut codec = UviBytes::default();
1378 codec.set_max_len(usize::MAX);
1379
1380 let notif_in = NotificationsInOpen {
1381 handshake: b"hello, world".to_vec(),
1382 substream: NotificationsInSubstream::new(
1383 Framed::new(io, codec),
1384 NotificationsInSubstreamHandshake::NotSent,
1385 ),
1386 };
1387 handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound(
1388 handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () },
1389 ));
1390
1391 assert!(std::matches!(handler.protocols[0].state, State::OpenDesiredByRemote { .. }));
1393 futures::future::poll_fn(|cx| {
1394 let mut buf = Vec::with_capacity(512);
1395 assert!(std::matches!(Pin::new(&mut io2).poll_read(cx, &mut buf), Poll::Pending));
1396 Poll::Ready(())
1397 })
1398 .await;
1399
1400 handler.on_behaviour_event(NotifsHandlerIn::Open {
1403 protocol_index: 0,
1404 peer_id: PeerId::random(),
1405 });
1406 assert!(std::matches!(
1407 handler.protocols[0].state,
1408 State::Opening { in_substream: Some(_), .. }
1409 ));
1410
1411 handler.on_behaviour_event(NotifsHandlerIn::Close { protocol_index: 0 });
1412 assert!(std::matches!(handler.protocols[0].state, State::Closed { pending_opening: true }));
1413
1414 let (io, _io2) = MockSubstream::negotiated().await;
1417 let mut codec = UviBytes::default();
1418 codec.set_max_len(usize::MAX);
1419
1420 let notif_out = NotificationsOutOpen {
1421 handshake: b"hello, world".to_vec(),
1422 negotiated_fallback: None,
1423 substream: NotificationsOutSubstream::new(Framed::new(io, codec)),
1424 };
1425 handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedOutbound(
1426 handler::FullyNegotiatedOutbound { protocol: notif_out, info: 0 },
1427 ));
1428
1429 assert!(std::matches!(
1430 handler.protocols[0].state,
1431 State::Closed { pending_opening: false }
1432 ));
1433 }
1434
1435 #[tokio::test]
1436 async fn fully_negotiated_resets_state_for_open_desired_substream() {
1437 let mut handler = notifs_handler();
1438 let (io, mut io2) = MockSubstream::negotiated().await;
1439 let mut codec = UviBytes::default();
1440 codec.set_max_len(usize::MAX);
1441
1442 let notif_in = NotificationsInOpen {
1443 handshake: b"hello, world".to_vec(),
1444 substream: NotificationsInSubstream::new(
1445 Framed::new(io, codec),
1446 NotificationsInSubstreamHandshake::NotSent,
1447 ),
1448 };
1449 handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound(
1450 handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () },
1451 ));
1452
1453 assert!(std::matches!(handler.protocols[0].state, State::OpenDesiredByRemote { .. }));
1455 futures::future::poll_fn(|cx| {
1456 let mut buf = Vec::with_capacity(512);
1457 assert!(std::matches!(Pin::new(&mut io2).poll_read(cx, &mut buf), Poll::Pending));
1458 Poll::Ready(())
1459 })
1460 .await;
1461
1462 handler.on_behaviour_event(NotifsHandlerIn::Open {
1465 protocol_index: 0,
1466 peer_id: PeerId::random(),
1467 });
1468 assert!(std::matches!(
1469 handler.protocols[0].state,
1470 State::Opening { in_substream: Some(_), .. }
1471 ));
1472
1473 handler.on_behaviour_event(NotifsHandlerIn::Close { protocol_index: 0 });
1474 assert!(std::matches!(handler.protocols[0].state, State::Closed { pending_opening: true }));
1475
1476 let (io, _io2) = MockSubstream::negotiated().await;
1478 let mut codec = UviBytes::default();
1479 codec.set_max_len(usize::MAX);
1480
1481 let notif_in = NotificationsInOpen {
1482 handshake: b"hello, world".to_vec(),
1483 substream: NotificationsInSubstream::new(
1484 Framed::new(io, codec),
1485 NotificationsInSubstreamHandshake::NotSent,
1486 ),
1487 };
1488 handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound(
1489 handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () },
1490 ));
1491
1492 assert!(std::matches!(
1493 handler.protocols[0].state,
1494 State::OpenDesiredByRemote { pending_opening: true, .. }
1495 ));
1496
1497 let (io, _io2) = MockSubstream::negotiated().await;
1500 let mut codec = UviBytes::default();
1501 codec.set_max_len(usize::MAX);
1502
1503 let notif_out = NotificationsOutOpen {
1504 handshake: b"hello, world".to_vec(),
1505 negotiated_fallback: None,
1506 substream: NotificationsOutSubstream::new(Framed::new(io, codec)),
1507 };
1508
1509 handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedOutbound(
1510 handler::FullyNegotiatedOutbound { protocol: notif_out, info: 0 },
1511 ));
1512
1513 assert!(std::matches!(
1514 handler.protocols[0].state,
1515 State::OpenDesiredByRemote { pending_opening: false, .. }
1516 ));
1517 }
1518
1519 #[tokio::test]
1520 async fn dial_upgrade_error_resets_closed_outbound_state() {
1521 let mut handler = notifs_handler();
1522 let (io, mut io2) = MockSubstream::negotiated().await;
1523 let mut codec = UviBytes::default();
1524 codec.set_max_len(usize::MAX);
1525
1526 let notif_in = NotificationsInOpen {
1527 handshake: b"hello, world".to_vec(),
1528 substream: NotificationsInSubstream::new(
1529 Framed::new(io, codec),
1530 NotificationsInSubstreamHandshake::NotSent,
1531 ),
1532 };
1533 handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound(
1534 handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () },
1535 ));
1536
1537 assert!(std::matches!(handler.protocols[0].state, State::OpenDesiredByRemote { .. }));
1539 futures::future::poll_fn(|cx| {
1540 let mut buf = Vec::with_capacity(512);
1541 assert!(std::matches!(Pin::new(&mut io2).poll_read(cx, &mut buf), Poll::Pending));
1542 Poll::Ready(())
1543 })
1544 .await;
1545
1546 handler.on_behaviour_event(NotifsHandlerIn::Open {
1549 protocol_index: 0,
1550 peer_id: PeerId::random(),
1551 });
1552 assert!(std::matches!(
1553 handler.protocols[0].state,
1554 State::Opening { in_substream: Some(_), .. }
1555 ));
1556
1557 handler.on_behaviour_event(NotifsHandlerIn::Close { protocol_index: 0 });
1558 assert!(std::matches!(handler.protocols[0].state, State::Closed { pending_opening: true }));
1559
1560 handler.on_connection_event(handler::ConnectionEvent::DialUpgradeError(
1562 handler::DialUpgradeError { info: 0, error: StreamUpgradeError::Timeout },
1563 ));
1564 assert!(std::matches!(
1565 handler.protocols[0].state,
1566 State::Closed { pending_opening: false }
1567 ));
1568 }
1569
1570 #[tokio::test]
1571 async fn dial_upgrade_error_resets_open_desired_state() {
1572 let mut handler = notifs_handler();
1573 let (io, mut io2) = MockSubstream::negotiated().await;
1574 let mut codec = UviBytes::default();
1575 codec.set_max_len(usize::MAX);
1576
1577 let notif_in = NotificationsInOpen {
1578 handshake: b"hello, world".to_vec(),
1579 substream: NotificationsInSubstream::new(
1580 Framed::new(io, codec),
1581 NotificationsInSubstreamHandshake::NotSent,
1582 ),
1583 };
1584 handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound(
1585 handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () },
1586 ));
1587
1588 assert!(std::matches!(handler.protocols[0].state, State::OpenDesiredByRemote { .. }));
1590 futures::future::poll_fn(|cx| {
1591 let mut buf = Vec::with_capacity(512);
1592 assert!(std::matches!(Pin::new(&mut io2).poll_read(cx, &mut buf), Poll::Pending));
1593 Poll::Ready(())
1594 })
1595 .await;
1596
1597 handler.on_behaviour_event(NotifsHandlerIn::Open {
1600 protocol_index: 0,
1601 peer_id: PeerId::random(),
1602 });
1603 assert!(std::matches!(
1604 handler.protocols[0].state,
1605 State::Opening { in_substream: Some(_), .. }
1606 ));
1607
1608 handler.on_behaviour_event(NotifsHandlerIn::Close { protocol_index: 0 });
1609 assert!(std::matches!(handler.protocols[0].state, State::Closed { pending_opening: true }));
1610
1611 let (io, _io2) = MockSubstream::negotiated().await;
1612 let mut codec = UviBytes::default();
1613 codec.set_max_len(usize::MAX);
1614
1615 let notif_in = NotificationsInOpen {
1616 handshake: b"hello, world".to_vec(),
1617 substream: NotificationsInSubstream::new(
1618 Framed::new(io, codec),
1619 NotificationsInSubstreamHandshake::NotSent,
1620 ),
1621 };
1622 handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound(
1623 handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () },
1624 ));
1625
1626 assert!(std::matches!(
1627 handler.protocols[0].state,
1628 State::OpenDesiredByRemote { pending_opening: true, .. }
1629 ));
1630
1631 handler.on_connection_event(handler::ConnectionEvent::DialUpgradeError(
1633 handler::DialUpgradeError { info: 0, error: StreamUpgradeError::Timeout },
1634 ));
1635 assert!(std::matches!(
1636 handler.protocols[0].state,
1637 State::OpenDesiredByRemote { pending_opening: false, .. }
1638 ));
1639 }
1640
1641 #[tokio::test]
1642 async fn sync_notifications_clogged() {
1643 let mut handler = notifs_handler();
1644 let (io, _) = MockSubstream::negotiated().await;
1645 let codec = UviBytes::default();
1646
1647 let (async_tx, async_rx) = futures::channel::mpsc::channel(ASYNC_NOTIFICATIONS_BUFFER_SIZE);
1648 let (sync_tx, sync_rx) = futures::channel::mpsc::channel(1);
1649 let notifications_sink = NotificationsSink {
1650 inner: Arc::new(NotificationsSinkInner {
1651 peer_id: PeerId::random(),
1652 async_channel: FuturesMutex::new(async_tx),
1653 sync_channel: Mutex::new(Some(sync_tx)),
1654 }),
1655 metrics: None,
1656 };
1657
1658 handler.protocols[0].state = State::Open {
1659 notifications_sink_rx: stream::select(async_rx.fuse(), sync_rx.fuse()).peekable(),
1660 out_substream: Some(NotificationsOutSubstream::new(Framed::new(io, codec))),
1661 in_substream: None,
1662 };
1663
1664 notifications_sink.send_sync_notification(vec![1, 3, 3, 7]);
1665 notifications_sink.send_sync_notification(vec![1, 3, 3, 8]);
1666 notifications_sink.send_sync_notification(vec![1, 3, 3, 9]);
1667 notifications_sink.send_sync_notification(vec![1, 3, 4, 0]);
1668
1669 futures::future::poll_fn(|cx| {
1670 assert!(std::matches!(
1671 handler.poll(cx),
1672 Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
1673 NotifsHandlerOut::Close { .. }
1674 ))
1675 ));
1676 Poll::Ready(())
1677 })
1678 .await;
1679 }
1680
1681 #[tokio::test]
1682 async fn close_desired_by_remote() {
1683 let mut handler = notifs_handler();
1684 let (io, io2) = MockSubstream::negotiated().await;
1685 let mut codec = UviBytes::default();
1686 codec.set_max_len(usize::MAX);
1687
1688 let notif_in = NotificationsInOpen {
1689 handshake: b"hello, world".to_vec(),
1690 substream: NotificationsInSubstream::new(
1691 Framed::new(io, codec),
1692 NotificationsInSubstreamHandshake::PendingSend(vec![1, 2, 3, 4]),
1693 ),
1694 };
1695
1696 handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound(
1699 handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () },
1700 ));
1701 drop(io2);
1702
1703 futures::future::poll_fn(|cx| {
1704 assert!(std::matches!(
1705 handler.poll(cx),
1706 Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
1707 NotifsHandlerOut::OpenDesiredByRemote { protocol_index: 0, .. },
1708 ))
1709 ));
1710 assert!(std::matches!(
1711 handler.poll(cx),
1712 Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
1713 NotifsHandlerOut::CloseDesired {
1714 protocol_index: 0,
1715 reason: CloseReason::RemoteRequest,
1716 },
1717 ))
1718 ));
1719 Poll::Ready(())
1720 })
1721 .await;
1722 }
1723}