Skip to main content

soil_network/protocol/notifications/
handler.rs

1// This file is part of Soil.
2
3// Copyright (C) Soil contributors.
4// Copyright (C) Parity Technologies (UK) Ltd.
5// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
6
7//! Implementations of the `IntoConnectionHandler` and `ConnectionHandler` traits for both incoming
8//! and outgoing substreams for all gossiping protocols.
9//!
10//! This is the main implementation of `ConnectionHandler` in this crate, that handles all the
11//! gossiping protocols that are Substrate-related and outside of the scope of libp2p.
12//!
13//! # Usage
14//!
15//! From an API perspective, for each of its protocols, the [`NotifsHandler`] is always in one of
16//! the following state (see [`State`]):
17//!
18//! - Closed substream. This is the initial state.
19//! - Closed substream, but remote desires them to be open.
20//! - Open substream.
21//! - Open substream, but remote desires them to be closed.
22//!
23//! Each protocol in the [`NotifsHandler`] can spontaneously switch between these states:
24//!
25//! - "Closed substream" to "Closed substream but open desired". When that happens, a
26//! [`NotifsHandlerOut::OpenDesiredByRemote`] is emitted.
27//! - "Closed substream but open desired" to "Closed substream" (i.e. the remote has cancelled
28//! their request). When that happens, a [`NotifsHandlerOut::CloseDesired`] is emitted.
29//! - "Open substream" to "Open substream but close desired". When that happens, a
30//! [`NotifsHandlerOut::CloseDesired`] is emitted.
31//!
32//! The user can instruct a protocol in the `NotifsHandler` to switch from "closed" to "open" or
33//! vice-versa by sending either a [`NotifsHandlerIn::Open`] or a [`NotifsHandlerIn::Close`]. The
34//! `NotifsHandler` must answer with [`NotifsHandlerOut::OpenResultOk`] or
35//! [`NotifsHandlerOut::OpenResultErr`], or with [`NotifsHandlerOut::CloseResult`].
36//!
37//! When a [`NotifsHandlerOut::OpenResultOk`] is emitted, the substream is now in the open state.
38//! When a [`NotifsHandlerOut::OpenResultErr`] or [`NotifsHandlerOut::CloseResult`] is emitted,
39//! the `NotifsHandler` is now (or remains) in the closed state.
40//!
41//! When a [`NotifsHandlerOut::OpenDesiredByRemote`] is emitted, the user should always send back
//! either a [`NotifsHandlerIn::Open`] or a [`NotifsHandlerIn::Close`]. If this isn't done, the
43//! remote will be left in a pending state.
44//!
45//! It is illegal to send a [`NotifsHandlerIn::Open`] before a previously-emitted
46//! [`NotifsHandlerIn::Open`] has gotten an answer.
47
48use crate::{
49	protocol::notifications::upgrade::{
50		NotificationsIn, NotificationsInSubstream, NotificationsOut, NotificationsOutError,
51		NotificationsOutSubstream, UpgradeCollec,
52	},
53	service::metrics::NotificationMetrics,
54	types::ProtocolName,
55};
56
57use bytes::BytesMut;
58use futures::{
59	channel::mpsc,
60	lock::{Mutex as FuturesMutex, MutexGuard as FuturesMutexGuard},
61	prelude::*,
62};
63use libp2p::{
64	swarm::{
65		handler::ConnectionEvent, ConnectionHandler, ConnectionHandlerEvent, Stream,
66		SubstreamProtocol,
67	},
68	PeerId,
69};
70
71use parking_lot::{Mutex, RwLock};
72use std::{
73	collections::VecDeque,
74	mem,
75	pin::Pin,
76	sync::Arc,
77	task::{Context, Poll},
78	time::Duration,
79};
80
/// Logging target for the file.
const LOG_TARGET: &str = "sub-libp2p::notification::handler";

/// Number of pending notifications in asynchronous contexts.
/// See [`NotificationsSink::reserve_notification`] for context.
pub(crate) const ASYNC_NOTIFICATIONS_BUFFER_SIZE: usize = 8;

/// Number of pending notifications in synchronous contexts.
/// Deliberately large: the synchronous channel cannot exert back-pressure, so this buffer
/// absorbs bursts before the connection is force-closed (see
/// [`NotificationsSink::send_sync_notification`]).
const SYNC_NOTIFICATIONS_BUFFER_SIZE: usize = 2048;

/// Maximum duration to open a substream and receive the handshake message. After that, we
/// consider that we failed to open the substream.
const OPEN_TIMEOUT: Duration = Duration::from_secs(10);

/// After successfully establishing a connection with the remote, we keep the connection open for
/// at least this amount of time in order to give the rest of the code the chance to notify us to
/// open substreams.
const INITIAL_KEEPALIVE_TIME: Duration = Duration::from_secs(5);
99
/// The actual handler once the connection has been established.
///
/// See the documentation at the module level for more information.
pub struct NotifsHandler {
	/// List of notification protocols, specified by the user at initialization.
	/// The position of a protocol in this vector is the `protocol_index` used in all
	/// [`NotifsHandlerIn`]/[`NotifsHandlerOut`] events.
	protocols: Vec<Protocol>,

	/// Whether to keep the connection alive. Set to `true` at creation and cleared once
	/// `keep_alive_timeout_future` fires (see `poll`).
	keep_alive: bool,

	/// Optional future that keeps connection alive for a certain amount of time.
	// TODO: this should be safe to remove, see https://github.com/paritytech/polkadot-sdk/issues/6350
	keep_alive_timeout_future: Option<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,

	/// Remote we are connected to.
	peer_id: PeerId,

	/// Events to return in priority from `poll`.
	events_queue: VecDeque<ConnectionHandlerEvent<NotificationsOut, usize, NotifsHandlerOut>>,

	/// Metrics, cloned into every [`NotificationsSink`] handed out for open substreams.
	metrics: Option<Arc<NotificationMetrics>>,
}
123
124impl NotifsHandler {
125	/// Creates new [`NotifsHandler`].
126	pub fn new(
127		peer_id: PeerId,
128		protocols: Vec<ProtocolConfig>,
129		metrics: Option<NotificationMetrics>,
130	) -> Self {
131		Self {
132			protocols: protocols
133				.into_iter()
134				.map(|config| {
135					let in_upgrade = NotificationsIn::new(
136						config.name.clone(),
137						config.fallback_names.clone(),
138						config.max_notification_size,
139					);
140
141					Protocol { config, in_upgrade, state: State::Closed { pending_opening: false } }
142				})
143				.collect(),
144			peer_id,
145			// Keep connection alive initially until below timeout expires
146			keep_alive: true,
147			// A grace period of `INITIAL_KEEPALIVE_TIME` must be given to leave time for the remote
148			// to express desire to open substreams.
149			// TODO: This is a hack and ideally should not be necessary
150			keep_alive_timeout_future: Some(Box::pin(tokio::time::sleep(INITIAL_KEEPALIVE_TIME))),
151			events_queue: VecDeque::with_capacity(16),
152			metrics: metrics.map_or(None, |metrics| Some(Arc::new(metrics))),
153		}
154	}
155}
156
/// Configuration for a notifications protocol.
#[derive(Debug, Clone)]
pub struct ProtocolConfig {
	/// Name of the protocol.
	pub name: ProtocolName,
	/// Names of the protocol to use if the main one isn't available.
	pub fallback_names: Vec<ProtocolName>,
	/// Handshake of the protocol. The `RwLock` is locked every time a new substream is opened.
	/// Shared so that the handshake can be updated externally while connections are live.
	pub handshake: Arc<RwLock<Vec<u8>>>,
	/// Maximum allowed size for a notification, in bytes.
	pub max_notification_size: u64,
}
169
/// Fields specific for each individual protocol.
struct Protocol {
	/// Configuration for this protocol, as provided at initialization.
	config: ProtocolConfig,

	/// Prototype for the inbound upgrade.
	in_upgrade: NotificationsIn,

	/// Current state of the substreams for this protocol.
	state: State,
}
181
/// Per-protocol substream state machine.
///
/// See the module-level documentation to learn about the meaning of these variants.
enum State {
	/// Protocol is in the "Closed" state.
	Closed {
		/// True if an outgoing substream is still in the process of being opened.
		/// Tracked so that a late `FullyNegotiatedOutbound`/`DialUpgradeError` for an
		/// abandoned request can be matched up and discarded.
		pending_opening: bool,
	},

	/// Protocol is in the "Closed" state. A [`NotifsHandlerOut::OpenDesiredByRemote`] has been
	/// emitted.
	OpenDesiredByRemote {
		/// Substream opened by the remote and that hasn't been accepted/rejected yet.
		in_substream: NotificationsInSubstream<Stream>,

		/// See [`State::Closed::pending_opening`].
		pending_opening: bool,
	},

	/// Protocol is in the "Closed" state, but has received a [`NotifsHandlerIn::Open`] and is
	/// consequently trying to open the various notifications substreams.
	///
	/// A [`NotifsHandlerOut::OpenResultOk`] or a [`NotifsHandlerOut::OpenResultErr`] event must
	/// be emitted when transitioning to respectively [`State::Open`] or [`State::Closed`].
	Opening {
		/// Substream opened by the remote. If `Some`, has been accepted.
		in_substream: Option<NotificationsInSubstream<Stream>>,
		/// Is the connection inbound.
		inbound: bool,
	},

	/// Protocol is in the "Open" state.
	Open {
		/// Contains the two `Receiver`s connected to the [`NotificationsSink`] that has been
		/// sent out. The notifications to send out can be pulled from this receivers.
		/// We use two different channels in order to have two different channel sizes, but from
		/// the receiving point of view, the two channels are the same.
		/// The receivers are fused in case the user drops the [`NotificationsSink`] entirely.
		notifications_sink_rx: stream::Peekable<
			stream::Select<
				stream::Fuse<mpsc::Receiver<NotificationsSinkMessage>>,
				stream::Fuse<mpsc::Receiver<NotificationsSinkMessage>>,
			>,
		>,

		/// Outbound substream that has been accepted by the remote.
		///
		/// Always `Some` on transition to [`State::Open`]. Switched to `None` only if the remote
		/// closed the substream. If `None`, a [`NotifsHandlerOut::CloseDesired`] event has been
		/// emitted.
		out_substream: Option<NotificationsOutSubstream<Stream>>,

		/// Substream opened by the remote.
		///
		/// Contrary to the `out_substream` field, operations continue as normal even if the
		/// substream has been closed by the remote. A `None` is treated the same way as if there
		/// was an idle substream.
		in_substream: Option<NotificationsInSubstream<Stream>>,
	},
}
241
/// The close reason of an [`NotifsHandlerOut::CloseDesired`] event.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CloseReason {
	/// The remote has requested the substreams to be closed.
	///
	/// This can happen when the remote drops the substream or an IO error is encountered.
	RemoteRequest,

	/// The remote has misbehaved and did not comply with the notification spec.
	///
	/// This means for now that the remote has sent data on an outbound substream.
	ProtocolMisbehavior,
}
255
/// Event that can be received by a `NotifsHandler`.
#[derive(Debug, Clone)]
pub enum NotifsHandlerIn {
	/// Instruct the handler to open the notification substreams.
	///
	/// Must always be answered by a [`NotifsHandlerOut::OpenResultOk`] or a
	/// [`NotifsHandlerOut::OpenResultErr`] event.
	///
	/// Importantly, it is forbidden to send a [`NotifsHandlerIn::Open`] while a previous one is
	/// already in flight. It is however possible if a `Close` is still in flight.
	Open {
		/// Index of the protocol in the list of protocols passed at initialization.
		protocol_index: usize,

		/// The peer id of the remote.
		peer_id: PeerId,
	},

	/// Instruct the handler to close the notification substreams, or reject any pending incoming
	/// substream request.
	///
	/// Must always be answered by a [`NotifsHandlerOut::CloseResult`] event.
	Close {
		/// Index of the protocol in the list of protocols passed at initialization.
		protocol_index: usize,
	},
}
283
/// Event that can be emitted by a `NotifsHandler`.
#[derive(Debug)]
pub enum NotifsHandlerOut {
	/// Acknowledges a [`NotifsHandlerIn::Open`].
	OpenResultOk {
		/// Index of the protocol in the list of protocols passed at initialization.
		protocol_index: usize,
		/// Name of the protocol that was actually negotiated, if the default one wasn't available.
		negotiated_fallback: Option<ProtocolName>,
		/// Handshake that was sent to us.
		/// This is normally a "Status" message, but this out of the concern of this code.
		received_handshake: Vec<u8>,
		/// How notifications can be sent to this node.
		notifications_sink: NotificationsSink,
		/// Is the connection inbound.
		inbound: bool,
	},

	/// Acknowledges a [`NotifsHandlerIn::Open`]. The remote has refused the attempt to open
	/// notification substreams.
	OpenResultErr {
		/// Index of the protocol in the list of protocols passed at initialization.
		protocol_index: usize,
	},

	/// Acknowledges a [`NotifsHandlerIn::Close`].
	CloseResult {
		/// Index of the protocol in the list of protocols passed at initialization.
		protocol_index: usize,
	},

	/// The remote would like the substreams to be open. Send a [`NotifsHandlerIn::Open`] or a
	/// [`NotifsHandlerIn::Close`] in order to either accept or deny this request. If a
	/// [`NotifsHandlerIn::Open`] or [`NotifsHandlerIn::Close`] has been sent before and has not
	/// yet been acknowledged by a matching [`NotifsHandlerOut`], then you don't need to send
	/// another [`NotifsHandlerIn`].
	OpenDesiredByRemote {
		/// Index of the protocol in the list of protocols passed at initialization.
		protocol_index: usize,
		/// Received handshake.
		handshake: Vec<u8>,
	},

	/// The remote would like the substreams to be closed, or the remote peer has misbehaved.
	///
	/// Send a [`NotifsHandlerIn::Close`] in order to close them. If a [`NotifsHandlerIn::Close`]
	/// has been sent before and has not yet been acknowledged by a
	/// [`NotifsHandlerOut::CloseResult`], then you don't need to send another one.
	CloseDesired {
		/// Index of the protocol in the list of protocols passed at initialization.
		protocol_index: usize,

		/// The close reason.
		reason: CloseReason,
	},

	/// Received a message on a custom protocol substream.
	///
	/// Can only happen when the handler is in the open state.
	Notification {
		/// Index of the protocol in the list of protocols passed at initialization.
		protocol_index: usize,
		/// Message that has been received.
		message: BytesMut,
	},

	/// Close connection
	Close {
		/// Index of the protocol in the list of protocols passed at initialization.
		protocol_index: usize,
	},
}
356
/// Sink connected directly to the node background task. Allows sending notifications to the peer.
///
/// Can be cloned in order to obtain multiple references to the substream of the same peer.
#[derive(Debug, Clone)]
pub struct NotificationsSink {
	/// Shared channels and peer identity; cloning the sink only bumps this `Arc`.
	inner: Arc<NotificationsSinkInner>,
	/// Metrics handle, if metrics were enabled on the handler that created this sink.
	metrics: Option<Arc<NotificationMetrics>>,
}
365
366impl NotificationsSink {
367	/// Create new [`NotificationsSink`].
368	/// NOTE: only used for testing but must be `pub` as other crates in `client/network` use this.
369	pub fn new(
370		peer_id: PeerId,
371	) -> (Self, mpsc::Receiver<NotificationsSinkMessage>, mpsc::Receiver<NotificationsSinkMessage>)
372	{
373		let (async_tx, async_rx) = mpsc::channel(ASYNC_NOTIFICATIONS_BUFFER_SIZE);
374		let (sync_tx, sync_rx) = mpsc::channel(SYNC_NOTIFICATIONS_BUFFER_SIZE);
375		(
376			NotificationsSink {
377				inner: Arc::new(NotificationsSinkInner {
378					peer_id,
379					async_channel: FuturesMutex::new(async_tx),
380					sync_channel: Mutex::new(Some(sync_tx)),
381				}),
382				metrics: None,
383			},
384			async_rx,
385			sync_rx,
386		)
387	}
388
389	/// Get reference to metrics.
390	pub fn metrics(&self) -> &Option<Arc<NotificationMetrics>> {
391		&self.metrics
392	}
393}
394
/// Shared state behind a [`NotificationsSink`].
#[derive(Debug)]
struct NotificationsSinkInner {
	/// Target of the sink.
	peer_id: PeerId,
	/// Sender to use in asynchronous contexts. Uses an asynchronous mutex.
	async_channel: FuturesMutex<mpsc::Sender<NotificationsSinkMessage>>,
	/// Sender to use in synchronous contexts. Uses a synchronous mutex.
	/// Contains `None` if the channel was full at some point, in which case the channel will
	/// be closed in the near future anyway.
	/// This channel has a large capacity and is meant to be used in contexts where
	/// back-pressure cannot be properly exerted.
	/// It will be removed in a future version.
	sync_channel: Mutex<Option<mpsc::Sender<NotificationsSinkMessage>>>,
}
409
/// Message emitted through the [`NotificationsSink`] and processed by the background task
/// dedicated to the peer.
#[derive(Debug, PartialEq, Eq)]
pub enum NotificationsSinkMessage {
	/// Message emitted by [`NotificationsSink::reserve_notification`] and
	/// [`NotificationsSink::write_notification_now`].
	Notification { message: Vec<u8> },

	/// Must close the connection.
	ForceClose,
}
421
422impl NotificationsSink {
423	/// Returns the [`PeerId`] the sink is connected to.
424	pub fn peer_id(&self) -> &PeerId {
425		&self.inner.peer_id
426	}
427
428	/// Sends a notification to the peer.
429	///
430	/// If too many messages are already buffered, the notification is silently discarded and the
431	/// connection to the peer will be closed shortly after.
432	///
433	/// The protocol name is expected to be checked ahead of calling this method. It is a logic
434	/// error to send a notification using an unknown protocol.
435	///
436	/// This method will be removed in a future version.
437	pub fn send_sync_notification(&self, message: impl Into<Vec<u8>>) {
438		let mut lock = self.inner.sync_channel.lock();
439
440		if let Some(tx) = lock.as_mut() {
441			let message = message.into();
442			let result = tx.try_send(NotificationsSinkMessage::Notification { message });
443
444			if result.is_err() {
445				// Cloning the `mpsc::Sender` guarantees the allocation of an extra spot in the
446				// buffer, and therefore `try_send` will succeed.
447				let _result2 = tx.clone().try_send(NotificationsSinkMessage::ForceClose);
448				debug_assert!(_result2.map(|()| true).unwrap_or_else(|err| err.is_disconnected()));
449
450				// Destroy the sender in order to not send more `ForceClose` messages.
451				*lock = None;
452			}
453		}
454	}
455
456	/// Wait until the remote is ready to accept a notification.
457	///
458	/// Returns an error in the case where the connection is closed.
459	///
460	/// The protocol name is expected to be checked ahead of calling this method. It is a logic
461	/// error to send a notification using an unknown protocol.
462	pub async fn reserve_notification(&self) -> Result<Ready<'_>, ()> {
463		let mut lock = self.inner.async_channel.lock().await;
464
465		let poll_ready = future::poll_fn(|cx| lock.poll_ready(cx)).await;
466		if poll_ready.is_ok() {
467			Ok(Ready { lock })
468		} else {
469			Err(())
470		}
471	}
472}
473
/// Notification slot is reserved and the notification can actually be sent.
#[must_use]
#[derive(Debug)]
pub struct Ready<'a> {
	/// Guarded channel. The channel inside is guaranteed to not be full.
	lock: FuturesMutexGuard<'a, mpsc::Sender<NotificationsSinkMessage>>,
}
481
482impl<'a> Ready<'a> {
483	/// Consumes this slots reservation and actually queues the notification.
484	///
485	/// Returns an error if the substream has been closed.
486	pub fn send(mut self, notification: impl Into<Vec<u8>>) -> Result<(), ()> {
487		self.lock
488			.start_send(NotificationsSinkMessage::Notification { message: notification.into() })
489			.map_err(|_| ())
490	}
491}
492
493impl ConnectionHandler for NotifsHandler {
494	type FromBehaviour = NotifsHandlerIn;
495	type ToBehaviour = NotifsHandlerOut;
496	type InboundProtocol = UpgradeCollec<NotificationsIn>;
497	type OutboundProtocol = NotificationsOut;
498	// Index within the `out_protocols`.
499	type OutboundOpenInfo = usize;
500	type InboundOpenInfo = ();
501
502	fn listen_protocol(&self) -> SubstreamProtocol<Self::InboundProtocol, ()> {
503		let protocols = self
504			.protocols
505			.iter()
506			.map(|p| p.in_upgrade.clone())
507			.collect::<UpgradeCollec<_>>();
508
509		SubstreamProtocol::new(protocols, ())
510	}
511
	/// Reacts to substream-level events from the libp2p connection, advancing the per-protocol
	/// [`State`] machine and queueing [`NotifsHandlerOut`] events for the behaviour.
	fn on_connection_event(
		&mut self,
		event: ConnectionEvent<
			'_,
			Self::InboundProtocol,
			Self::OutboundProtocol,
			Self::InboundOpenInfo,
			Self::OutboundOpenInfo,
		>,
	) {
		match event {
			// The remote has opened an inbound substream for one of our protocols.
			ConnectionEvent::FullyNegotiatedInbound(inbound) => {
				let (mut in_substream_open, protocol_index) = inbound.protocol;
				let protocol_info = &mut self.protocols[protocol_index];

				match protocol_info.state {
					// First inbound request: notify the behaviour and wait for its decision.
					State::Closed { pending_opening } => {
						self.events_queue.push_back(ConnectionHandlerEvent::NotifyBehaviour(
							NotifsHandlerOut::OpenDesiredByRemote {
								protocol_index,
								handshake: in_substream_open.handshake,
							},
						));

						protocol_info.state = State::OpenDesiredByRemote {
							in_substream: in_substream_open.substream,
							pending_opening,
						};
					},
					State::OpenDesiredByRemote { .. } => {
						// If a substream already exists, silently drop the new one.
						// Note that we drop the substream, which will send an equivalent to a
						// TCP "RST" to the remote and force-close the substream. It might
						// seem like an unclean way to get rid of a substream. However, keep
						// in mind that it is invalid for the remote to open multiple such
						// substreams, and therefore sending a "RST" is the most correct thing
						// to do.
						return;
					},
					// Already opening/open locally: accept the inbound substream directly by
					// answering with our handshake, unless one is already in place.
					State::Opening { ref mut in_substream, .. }
					| State::Open { ref mut in_substream, .. } => {
						if in_substream.is_some() {
							// Same remark as above.
							return;
						}

						// Create `handshake_message` on a separate line to be sure that the
						// lock is released as soon as possible.
						let handshake_message = protocol_info.config.handshake.read().clone();
						in_substream_open.substream.send_handshake(handshake_message);
						*in_substream = Some(in_substream_open.substream);
					},
				}
			},
			// Our outbound substream request succeeded.
			ConnectionEvent::FullyNegotiatedOutbound(outbound) => {
				let (new_open, protocol_index) = (outbound.protocol, outbound.info);

				match self.protocols[protocol_index].state {
					// The open request was abandoned in the meantime (`Close` was received);
					// simply record that no outbound substream is pending anymore.
					State::Closed { ref mut pending_opening }
					| State::OpenDesiredByRemote { ref mut pending_opening, .. } => {
						debug_assert!(*pending_opening);
						*pending_opening = false;
					},
					State::Open { .. } => {
						log::error!(target: LOG_TARGET, "☎️ State mismatch in notifications handler");
						debug_assert!(false);
					},
					// Expected path: build the sink/channel pair, switch to `Open`, and
					// acknowledge the behaviour's `Open` request.
					State::Opening { ref mut in_substream, inbound } => {
						let (async_tx, async_rx) = mpsc::channel(ASYNC_NOTIFICATIONS_BUFFER_SIZE);
						let (sync_tx, sync_rx) = mpsc::channel(SYNC_NOTIFICATIONS_BUFFER_SIZE);
						let notifications_sink = NotificationsSink {
							inner: Arc::new(NotificationsSinkInner {
								peer_id: self.peer_id,
								async_channel: FuturesMutex::new(async_tx),
								sync_channel: Mutex::new(Some(sync_tx)),
							}),
							metrics: self.metrics.clone(),
						};

						self.protocols[protocol_index].state = State::Open {
							notifications_sink_rx: stream::select(async_rx.fuse(), sync_rx.fuse())
								.peekable(),
							out_substream: Some(new_open.substream),
							in_substream: in_substream.take(),
						};

						self.events_queue.push_back(ConnectionHandlerEvent::NotifyBehaviour(
							NotifsHandlerOut::OpenResultOk {
								protocol_index,
								negotiated_fallback: new_open.negotiated_fallback,
								received_handshake: new_open.handshake,
								notifications_sink,
								inbound,
							},
						));
					},
				}
			},
			ConnectionEvent::AddressChange(_address_change) => {},
			ConnectionEvent::LocalProtocolsChange(_) => {},
			ConnectionEvent::RemoteProtocolsChange(_) => {},
			// Our outbound substream request failed (timeout, unsupported protocol, ...).
			ConnectionEvent::DialUpgradeError(dial_upgrade_error) => match self.protocols
				[dial_upgrade_error.info]
				.state
			{
				// The request was already abandoned; just clear the pending flag.
				State::Closed { ref mut pending_opening }
				| State::OpenDesiredByRemote { ref mut pending_opening, .. } => {
					debug_assert!(*pending_opening);
					*pending_opening = false;
				},

				// The behaviour is still waiting for an answer: report the failure.
				State::Opening { .. } => {
					self.protocols[dial_upgrade_error.info].state =
						State::Closed { pending_opening: false };

					self.events_queue.push_back(ConnectionHandlerEvent::NotifyBehaviour(
						NotifsHandlerOut::OpenResultErr { protocol_index: dial_upgrade_error.info },
					));
				},

				// No substream is being open when already `Open`.
				State::Open { .. } => debug_assert!(false),
			},
			ConnectionEvent::ListenUpgradeError(_listen_upgrade_error) => {},
			event => {
				log::warn!(target: LOG_TARGET, "New unknown `ConnectionEvent` libp2p event: {event:?}");
			},
		}
	}
641
	/// Handles [`NotifsHandlerIn`] commands from the behaviour: opening or closing the
	/// notification substreams of a single protocol.
	fn on_behaviour_event(&mut self, message: NotifsHandlerIn) {
		match message {
			NotifsHandlerIn::Open { protocol_index, peer_id } => {
				let protocol_info = &mut self.protocols[protocol_index];
				match &mut protocol_info.state {
					State::Closed { pending_opening } => {
						// Only request a new outbound substream if none is already pending
						// from a previous, since-abandoned attempt.
						if !*pending_opening {
							let proto = NotificationsOut::new(
								protocol_info.config.name.clone(),
								protocol_info.config.fallback_names.clone(),
								protocol_info.config.handshake.read().clone(),
								protocol_info.config.max_notification_size,
								peer_id,
							);

							self.events_queue.push_back(
								ConnectionHandlerEvent::OutboundSubstreamRequest {
									protocol: SubstreamProtocol::new(proto, protocol_index)
										.with_timeout(OPEN_TIMEOUT),
								},
							);
						}

						protocol_info.state = State::Opening { in_substream: None, inbound: false };
					},
					State::OpenDesiredByRemote { pending_opening, in_substream } => {
						let handshake_message = protocol_info.config.handshake.read().clone();

						if !*pending_opening {
							let proto = NotificationsOut::new(
								protocol_info.config.name.clone(),
								protocol_info.config.fallback_names.clone(),
								handshake_message.clone(),
								protocol_info.config.max_notification_size,
								peer_id,
							);

							self.events_queue.push_back(
								ConnectionHandlerEvent::OutboundSubstreamRequest {
									protocol: SubstreamProtocol::new(proto, protocol_index)
										.with_timeout(OPEN_TIMEOUT),
								},
							);
						}

						// Accept the remote's pending inbound substream by answering with our
						// handshake.
						in_substream.send_handshake(handshake_message);

						// The state change is done in two steps because of borrowing issues.
						let in_substream = match mem::replace(
							&mut protocol_info.state,
							State::Opening { in_substream: None, inbound: false },
						) {
							State::OpenDesiredByRemote { in_substream, .. } => in_substream,
							_ => unreachable!(),
						};
						protocol_info.state =
							State::Opening { in_substream: Some(in_substream), inbound: true };
					},
					State::Opening { .. } | State::Open { .. } => {
						// As documented, it is forbidden to send an `Open` while there is already
						// one in the fly.
						log::error!(target: LOG_TARGET, "opening already-opened handler");
						debug_assert!(false);
					},
				}
			},

			NotifsHandlerIn::Close { protocol_index } => {
				match self.protocols[protocol_index].state {
					State::Open { .. } => {
						self.protocols[protocol_index].state =
							State::Closed { pending_opening: false };
					},
					State::Opening { .. } => {
						// `pending_opening: true` records that the outbound request answer is
						// still expected and must be discarded when it arrives.
						self.protocols[protocol_index].state =
							State::Closed { pending_opening: true };

						self.events_queue.push_back(ConnectionHandlerEvent::NotifyBehaviour(
							NotifsHandlerOut::OpenResultErr { protocol_index },
						));
					},
					State::OpenDesiredByRemote { pending_opening, .. } => {
						// Dropping the stored inbound substream rejects the remote's request.
						self.protocols[protocol_index].state = State::Closed { pending_opening };
					},
					State::Closed { .. } => {},
				}

				// `Close` is always acknowledged, regardless of the previous state.
				self.events_queue.push_back(ConnectionHandlerEvent::NotifyBehaviour(
					NotifsHandlerOut::CloseResult { protocol_index },
				));
			},
		}
	}
735
736	fn connection_keep_alive(&self) -> bool {
737		// `Yes` if any protocol has some activity.
738		if self.protocols.iter().any(|p| !matches!(p.state, State::Closed { .. })) {
739			return true;
740		}
741
742		self.keep_alive
743	}
744
745	fn poll(
746		&mut self,
747		cx: &mut Context,
748	) -> Poll<
749		ConnectionHandlerEvent<Self::OutboundProtocol, Self::OutboundOpenInfo, Self::ToBehaviour>,
750	> {
751		{
752			let maybe_keep_alive_timeout_future = &mut self.keep_alive_timeout_future;
753			if let Some(keep_alive_timeout_future) = maybe_keep_alive_timeout_future {
754				if keep_alive_timeout_future.poll_unpin(cx).is_ready() {
755					maybe_keep_alive_timeout_future.take();
756					self.keep_alive = false;
757				}
758			}
759		}
760
761		if let Some(ev) = self.events_queue.pop_front() {
762			return Poll::Ready(ev);
763		}
764
765		// For each open substream, try to send messages from `notifications_sink_rx` to the
766		// substream.
767		for protocol_index in 0..self.protocols.len() {
768			if let State::Open {
769				notifications_sink_rx, out_substream: Some(out_substream), ..
770			} = &mut self.protocols[protocol_index].state
771			{
772				loop {
773					// Only proceed with `out_substream.poll_ready_unpin` if there is an element
774					// available in `notifications_sink_rx`. This avoids waking up the task when
775					// a substream is ready to send if there isn't actually something to send.
776					match Pin::new(&mut *notifications_sink_rx).as_mut().poll_peek(cx) {
777						Poll::Ready(Some(&NotificationsSinkMessage::ForceClose)) => {
778							return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
779								NotifsHandlerOut::Close { protocol_index },
780							))
781						},
782						Poll::Ready(Some(&NotificationsSinkMessage::Notification { .. })) => {},
783						Poll::Ready(None) | Poll::Pending => break,
784					}
785
786					// Before we extract the element from `notifications_sink_rx`, check that the
787					// substream is ready to accept a message.
788					match out_substream.poll_ready_unpin(cx) {
789						Poll::Ready(_) => {},
790						Poll::Pending => break,
791					}
792
793					// Now that the substream is ready for a message, grab what to send.
794					let message = match notifications_sink_rx.poll_next_unpin(cx) {
795						Poll::Ready(Some(NotificationsSinkMessage::Notification { message })) => {
796							message
797						},
798						Poll::Ready(Some(NotificationsSinkMessage::ForceClose))
799						| Poll::Ready(None)
800						| Poll::Pending => {
801							// Should never be reached, as per `poll_peek` above.
802							debug_assert!(false);
803							break;
804						},
805					};
806
807					let _ = out_substream.start_send_unpin(message);
808					// Note that flushing is performed later down this function.
809				}
810			}
811		}
812
813		// Flush all outbound substreams.
814		// When `poll` returns `Poll::Ready`, the libp2p `Swarm` may decide to no longer call
815		// `poll` again before it is ready to accept more events.
816		// In order to make sure that substreams are flushed as soon as possible, the flush is
817		// performed before the code paths that can produce `Ready` (with some rare exceptions).
818		// Importantly, however, the flush is performed *after* notifications are queued with
819		// `Sink::start_send`.
820		// Note that we must call `poll_flush` on all substreams and not only on those we
821		// have called `Sink::start_send` on, because `NotificationsOutSubstream::poll_flush`
822		// also reports the substream termination (even if no data was written into it).
823		for protocol_index in 0..self.protocols.len() {
824			match &mut self.protocols[protocol_index].state {
825				State::Open { out_substream: out_substream @ Some(_), .. } => {
826					match Sink::poll_flush(Pin::new(out_substream.as_mut().unwrap()), cx) {
827						Poll::Pending | Poll::Ready(Ok(())) => {},
828						Poll::Ready(Err(error)) => {
829							*out_substream = None;
830
831							let reason = match error {
832								NotificationsOutError::Io(_) | NotificationsOutError::Closed => {
833									CloseReason::RemoteRequest
834								},
835								NotificationsOutError::UnexpectedData => {
836									CloseReason::ProtocolMisbehavior
837								},
838							};
839
840							let event = NotifsHandlerOut::CloseDesired { protocol_index, reason };
841							return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(event));
842						},
843					};
844				},
845
846				State::Closed { .. }
847				| State::Opening { .. }
848				| State::Open { out_substream: None, .. }
849				| State::OpenDesiredByRemote { .. } => {},
850			}
851		}
852
853		// Poll inbound substreams.
854		for protocol_index in 0..self.protocols.len() {
855			// Inbound substreams being closed is always tolerated, except for the
856			// `OpenDesiredByRemote` state which might need to be switched back to `Closed`.
857			match &mut self.protocols[protocol_index].state {
858				State::Closed { .. }
859				| State::Open { in_substream: None, .. }
860				| State::Opening { in_substream: None, .. } => {},
861
862				State::Open { in_substream: in_substream @ Some(_), .. } => {
863					match futures::prelude::stream::Stream::poll_next(
864						Pin::new(in_substream.as_mut().unwrap()),
865						cx,
866					) {
867						Poll::Pending => {},
868						Poll::Ready(Some(Ok(message))) => {
869							let event = NotifsHandlerOut::Notification { protocol_index, message };
870							return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(event));
871						},
872						Poll::Ready(None) | Poll::Ready(Some(Err(_))) => *in_substream = None,
873					}
874				},
875
876				State::OpenDesiredByRemote { in_substream, pending_opening } => {
877					match NotificationsInSubstream::poll_process(Pin::new(in_substream), cx) {
878						Poll::Pending => {},
879						Poll::Ready(Ok(())) => {},
880						Poll::Ready(Err(_)) => {
881							self.protocols[protocol_index].state =
882								State::Closed { pending_opening: *pending_opening };
883							return Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
884								NotifsHandlerOut::CloseDesired {
885									protocol_index,
886									reason: CloseReason::RemoteRequest,
887								},
888							));
889						},
890					}
891				},
892
893				State::Opening { in_substream: in_substream @ Some(_), .. } => {
894					match NotificationsInSubstream::poll_process(
895						Pin::new(in_substream.as_mut().unwrap()),
896						cx,
897					) {
898						Poll::Pending => {},
899						Poll::Ready(Ok(())) => {},
900						Poll::Ready(Err(_)) => *in_substream = None,
901					}
902				},
903			}
904		}
905
906		// This is the only place in this method that can return `Pending`.
907		// By putting it at the very bottom, we are guaranteed that everything has been properly
908		// polled.
909		Poll::Pending
910	}
911}
912
913#[cfg(test)]
914pub mod tests {
915	use super::*;
916	use crate::protocol::notifications::upgrade::{
917		NotificationsInOpen, NotificationsInSubstreamHandshake, NotificationsOutOpen,
918	};
919	use asynchronous_codec::Framed;
920	use libp2p::{
921		core::muxing::SubstreamBox,
922		swarm::handler::{self, StreamUpgradeError},
923	};
924	use multistream_select::{dialer_select_proto, listener_select_proto, Negotiated, Version};
925	use std::{
926		collections::HashMap,
927		io::{Error, IoSlice, IoSliceMut},
928	};
929	use tokio::sync::mpsc;
930	use unsigned_varint::codec::UviBytes;
931
	/// State kept for a substream opened via [`ConnectionYielder::open_substream`].
	struct OpenSubstream {
		// Peekable, merged stream of the async and sync notification channels that
		// back the `NotificationsSink` handed out for this substream.
		notifications: stream::Peekable<
			stream::Select<
				stream::Fuse<futures::channel::mpsc::Receiver<NotificationsSinkMessage>>,
				stream::Fuse<futures::channel::mpsc::Receiver<NotificationsSinkMessage>>,
			>,
		>,
		// Held only to keep the mock substream pair alive (dropping them would
		// close the in-memory connection).
		_in_substream: MockSubstream,
		_out_substream: MockSubstream,
	}
942
	/// Test helper that fabricates "substream open" events and records the
	/// notification sinks created for them, keyed by `(peer, protocol index)`.
	pub struct ConnectionYielder {
		connections: HashMap<(PeerId, usize), OpenSubstream>,
	}
946
	impl ConnectionYielder {
		/// Create new [`ConnectionYielder`].
		pub fn new() -> Self {
			Self { connections: HashMap::new() }
		}

		/// Open a new substream for peer.
		///
		/// Builds a fresh [`NotificationsSink`] backed by in-memory channels, stores
		/// the receiving halves (so notifications sent through the sink can later be
		/// read back with [`Self::get_next_event`]) and returns the
		/// [`NotifsHandlerOut::OpenResultOk`] event a handler would emit for a
		/// successfully opened substream.
		pub fn open_substream(
			&mut self,
			peer: PeerId,
			protocol_index: usize,
			received_handshake: Vec<u8>,
		) -> NotifsHandlerOut {
			// Two channels mirroring the sink's QoS classes: asynchronous
			// (backpressured) and synchronous (bounded).
			let (async_tx, async_rx) =
				futures::channel::mpsc::channel(ASYNC_NOTIFICATIONS_BUFFER_SIZE);
			let (sync_tx, sync_rx) =
				futures::channel::mpsc::channel(SYNC_NOTIFICATIONS_BUFFER_SIZE);
			let notifications_sink = NotificationsSink {
				inner: Arc::new(NotificationsSinkInner {
					peer_id: peer,
					async_channel: FuturesMutex::new(async_tx),
					sync_channel: Mutex::new(Some(sync_tx)),
				}),
				metrics: None,
			};
			let (in_substream, out_substream) = MockSubstream::new();

			self.connections.insert(
				(peer, protocol_index),
				OpenSubstream {
					notifications: stream::select(async_rx.fuse(), sync_rx.fuse()).peekable(),
					_in_substream: in_substream,
					_out_substream: out_substream,
				},
			);

			NotifsHandlerOut::OpenResultOk {
				protocol_index,
				negotiated_fallback: None,
				received_handshake,
				notifications_sink,
				inbound: false,
			}
		}

		/// Attempt to get next pending event from one of the notification sinks.
		///
		/// Non-blocking: returns `None` both when the `(peer, set)` pair is unknown
		/// and when no notification is currently queued. Panics if the sink was
		/// closed or a `ForceClose` message is encountered.
		pub async fn get_next_event(&mut self, peer: PeerId, set: usize) -> Option<Vec<u8>> {
			let substream = if let Some(info) = self.connections.get_mut(&(peer, set)) {
				info
			} else {
				return None;
			};

			futures::future::poll_fn(|cx| match substream.notifications.poll_next_unpin(cx) {
				Poll::Ready(Some(NotificationsSinkMessage::Notification { message })) => {
					Poll::Ready(Some(message))
				},
				// Nothing queued right now: resolve with `None` instead of waiting.
				Poll::Pending => Poll::Ready(None),
				Poll::Ready(Some(NotificationsSinkMessage::ForceClose)) | Poll::Ready(None) => {
					panic!("sink closed")
				},
			})
			.await
		}
	}
1012
	/// In-memory duplex substream backed by a pair of `tokio` mpsc channels.
	struct MockSubstream {
		pub rx: mpsc::Receiver<Vec<u8>>,
		pub tx: mpsc::Sender<Vec<u8>>,
		// Bytes received from `rx` but not yet handed out by `poll_read`.
		rx_buffer: BytesMut,
	}
1018
	/// Mirror of `ActiveStreamCounter` in `libp2p`.
	#[allow(dead_code)]
	struct MockActiveStreamCounter(Arc<()>);
1022
	/// Mirror of `Stream` in `libp2p`.
	///
	/// NOTE(review): the field layout must stay in sync with `libp2p`'s private
	/// `Stream` type — see the `transmute` in `MockSubstream::stream_new`.
	#[allow(dead_code)]
	struct MockStream {
		stream: Negotiated<SubstreamBox>,
		counter: Option<MockActiveStreamCounter>,
	}
1029
	impl MockSubstream {
		/// Create new substream pair.
		///
		/// Returns the two ends of an in-memory duplex link: data written to one end
		/// can be read from the other.
		pub fn new() -> (Self, Self) {
			let (tx1, rx1) = mpsc::channel(32);
			let (tx2, rx2) = mpsc::channel(32);

			(
				Self { rx: rx1, tx: tx2, rx_buffer: BytesMut::with_capacity(512) },
				Self { rx: rx2, tx: tx1, rx_buffer: BytesMut::with_capacity(512) },
			)
		}

		/// Create new negotiated substream pair.
		///
		/// Runs a real `multistream-select` negotiation over the in-memory pair and
		/// wraps both ends into `libp2p` streams.
		pub async fn negotiated() -> (Stream, Stream) {
			let (socket1, socket2) = Self::new();
			let socket1 = SubstreamBox::new(socket1);
			let socket2 = SubstreamBox::new(socket2);

			let protos = vec!["/echo/1.0.0", "/echo/2.5.0"];
			let (res1, res2) = tokio::join!(
				dialer_select_proto(socket1, protos.clone(), Version::V1),
				listener_select_proto(socket2, protos),
			);

			(Self::stream_new(res1.unwrap().1), Self::stream_new(res2.unwrap().1))
		}

		/// Unsafe substitute for `Stream::new` private constructor.
		fn stream_new(stream: Negotiated<SubstreamBox>) -> Stream {
			let stream = MockStream { stream, counter: None };
			// Static asserts to make sure this doesn't break.
			const _: () = {
				assert!(core::mem::size_of::<Stream>() == core::mem::size_of::<MockStream>());
				assert!(core::mem::align_of::<Stream>() == core::mem::align_of::<MockStream>());
			};

			// SAFETY: assumes `MockStream` has the same layout as `libp2p`'s `Stream`.
			// Only size and alignment are verified by the static asserts above; the
			// field order and types must be kept in sync with `libp2p` manually.
			// Test-only code — a layout change upstream would break this.
			unsafe { core::mem::transmute(stream) }
		}
	}
1069
1070	impl AsyncWrite for MockSubstream {
1071		fn poll_write<'a>(
1072			self: Pin<&mut Self>,
1073			_cx: &mut Context<'a>,
1074			buf: &[u8],
1075		) -> Poll<Result<usize, Error>> {
1076			match self.tx.try_send(buf.to_vec()) {
1077				Ok(_) => Poll::Ready(Ok(buf.len())),
1078				Err(_) => Poll::Ready(Err(std::io::ErrorKind::UnexpectedEof.into())),
1079			}
1080		}
1081
1082		fn poll_flush<'a>(self: Pin<&mut Self>, _cx: &mut Context<'a>) -> Poll<Result<(), Error>> {
1083			Poll::Ready(Ok(()))
1084		}
1085
1086		fn poll_close<'a>(self: Pin<&mut Self>, _cx: &mut Context<'a>) -> Poll<Result<(), Error>> {
1087			Poll::Ready(Ok(()))
1088		}
1089
1090		fn poll_write_vectored<'a, 'b>(
1091			self: Pin<&mut Self>,
1092			_cx: &mut Context<'a>,
1093			_bufs: &[IoSlice<'b>],
1094		) -> Poll<Result<usize, Error>> {
1095			unimplemented!();
1096		}
1097	}
1098
	impl AsyncRead for MockSubstream {
		fn poll_read<'a>(
			mut self: Pin<&mut Self>,
			cx: &mut Context<'a>,
			buf: &mut [u8],
		) -> Poll<Result<usize, Error>> {
			// First drain anything newly arrived on the channel into the internal
			// buffer; a closed channel is reported as `UnexpectedEof`.
			match self.rx.poll_recv(cx) {
				Poll::Ready(Some(data)) => self.rx_buffer.extend_from_slice(&data),
				Poll::Ready(None) => {
					return Poll::Ready(Err(std::io::ErrorKind::UnexpectedEof.into()))
				},
				_ => {},
			}

			// Hand out as much buffered data as fits into `buf`.
			let nsize = std::cmp::min(self.rx_buffer.len(), buf.len());
			let data = self.rx_buffer.split_to(nsize);
			buf[..nsize].copy_from_slice(&data[..]);

			if nsize > 0 {
				return Poll::Ready(Ok(nsize));
			}

			// Nothing to return yet. If the channel returned `Pending` above, the
			// waker is registered. NOTE(review): if data arrived but `buf` was empty,
			// no waker is registered here — fine for these tests, which only poll
			// once per `poll_fn`, but worth confirming before reuse elsewhere.
			Poll::Pending
		}

		fn poll_read_vectored<'a, 'b>(
			self: Pin<&mut Self>,
			_cx: &mut Context<'a>,
			_bufs: &mut [IoSliceMut<'b>],
		) -> Poll<Result<usize, Error>> {
			// Vectored reads are not exercised by these tests.
			unimplemented!();
		}
	}
1132
1133	/// Create new [`NotifsHandler`].
1134	fn notifs_handler() -> NotifsHandler {
1135		NotifsHandler::new(
1136			PeerId::random(),
1137			vec![ProtocolConfig {
1138				name: "/foo".into(),
1139				fallback_names: vec![],
1140				handshake: Arc::new(RwLock::new(b"hello, world".to_vec())),
1141				max_notification_size: u64::MAX,
1142			}],
1143			None,
1144		)
1145	}
1146
1147	// verify that if another substream is attempted to be opened by remote while an inbound
1148	// substream already exists, the new inbound stream is rejected and closed by the local node.
1149	#[tokio::test]
1150	async fn second_open_desired_by_remote_rejected() {
1151		let mut handler = notifs_handler();
1152		let (io, mut io2) = MockSubstream::negotiated().await;
1153		let mut codec = UviBytes::default();
1154		codec.set_max_len(usize::MAX);
1155
1156		let notif_in = NotificationsInOpen {
1157			handshake: b"hello, world".to_vec(),
1158			substream: NotificationsInSubstream::new(
1159				Framed::new(io, codec),
1160				NotificationsInSubstreamHandshake::NotSent,
1161			),
1162		};
1163
1164		handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound(
1165			handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () },
1166		));
1167
1168		// verify that the substream is in (partly) opened state
1169		assert!(std::matches!(handler.protocols[0].state, State::OpenDesiredByRemote { .. }));
1170		futures::future::poll_fn(|cx| {
1171			let mut buf = Vec::with_capacity(512);
1172			assert!(std::matches!(Pin::new(&mut io2).poll_read(cx, &mut buf), Poll::Pending));
1173			Poll::Ready(())
1174		})
1175		.await;
1176
1177		// attempt to open another inbound substream and verify that it is rejected
1178		let (io, mut io2) = MockSubstream::negotiated().await;
1179		let mut codec = UviBytes::default();
1180		codec.set_max_len(usize::MAX);
1181
1182		let notif_in = NotificationsInOpen {
1183			handshake: b"hello, world".to_vec(),
1184			substream: NotificationsInSubstream::new(
1185				Framed::new(io, codec),
1186				NotificationsInSubstreamHandshake::NotSent,
1187			),
1188		};
1189
1190		handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound(
1191			handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () },
1192		));
1193
1194		// verify that the new substream is rejected and closed
1195		futures::future::poll_fn(|cx| {
1196			let mut buf = Vec::with_capacity(512);
1197
1198			if let Poll::Ready(Err(err)) = Pin::new(&mut io2).poll_read(cx, &mut buf) {
1199				assert_eq!(err.kind(), std::io::ErrorKind::UnexpectedEof,);
1200			}
1201
1202			Poll::Ready(())
1203		})
1204		.await;
1205	}
1206
	// verify that a second inbound substream arriving while the handler is in the
	// `Opening` state is rejected and closed, without disturbing the first substream.
	#[tokio::test]
	async fn open_rejected_if_substream_is_opening() {
		let mut handler = notifs_handler();
		let (io, mut io2) = MockSubstream::negotiated().await;
		let mut codec = UviBytes::default();
		codec.set_max_len(usize::MAX);

		let notif_in = NotificationsInOpen {
			handshake: b"hello, world".to_vec(),
			substream: NotificationsInSubstream::new(
				Framed::new(io, codec),
				NotificationsInSubstreamHandshake::NotSent,
			),
		};

		handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound(
			handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () },
		));

		// verify that the substream is in (partly) opened state
		assert!(std::matches!(handler.protocols[0].state, State::OpenDesiredByRemote { .. }));
		futures::future::poll_fn(|cx| {
			let mut buf = Vec::with_capacity(512);
			assert!(std::matches!(Pin::new(&mut io2).poll_read(cx, &mut buf), Poll::Pending));
			Poll::Ready(())
		})
		.await;

		// move the handler state to 'Opening'
		handler.on_behaviour_event(NotifsHandlerIn::Open {
			protocol_index: 0,
			peer_id: PeerId::random(),
		});
		assert!(std::matches!(
			handler.protocols[0].state,
			State::Opening { in_substream: Some(_), .. }
		));

		// remote now tries to open another substream, verify that it is rejected and closed
		let (io, mut io2) = MockSubstream::negotiated().await;
		let mut codec = UviBytes::default();
		codec.set_max_len(usize::MAX);

		let notif_in = NotificationsInOpen {
			handshake: b"hello, world".to_vec(),
			substream: NotificationsInSubstream::new(
				Framed::new(io, codec),
				NotificationsInSubstreamHandshake::NotSent,
			),
		};

		handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound(
			handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () },
		));

		// verify that the new substream is rejected and closed but that the first substream is
		// still in correct state
		futures::future::poll_fn(|cx| {
			let mut buf = Vec::with_capacity(512);

			// EOF on the remote end proves the second substream was dropped.
			if let Poll::Ready(Err(err)) = Pin::new(&mut io2).poll_read(cx, &mut buf) {
				assert_eq!(err.kind(), std::io::ErrorKind::UnexpectedEof,);
			} else {
				panic!("unexpected result");
			}

			Poll::Ready(())
		})
		.await;
		assert!(std::matches!(
			handler.protocols[0].state,
			State::Opening { in_substream: Some(_), .. }
		));
	}
1281
	// verify that a second inbound substream arriving while a substream is already
	// fully `Open` is rejected and closed, leaving the open substream untouched.
	#[tokio::test]
	async fn open_rejected_if_substream_already_open() {
		let mut handler = notifs_handler();
		let (io, mut io2) = MockSubstream::negotiated().await;
		let mut codec = UviBytes::default();
		codec.set_max_len(usize::MAX);

		let notif_in = NotificationsInOpen {
			handshake: b"hello, world".to_vec(),
			substream: NotificationsInSubstream::new(
				Framed::new(io, codec),
				NotificationsInSubstreamHandshake::NotSent,
			),
		};
		handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound(
			handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () },
		));

		// verify that the substream is in (partly) opened state
		assert!(std::matches!(handler.protocols[0].state, State::OpenDesiredByRemote { .. }));
		futures::future::poll_fn(|cx| {
			let mut buf = Vec::with_capacity(512);
			assert!(std::matches!(Pin::new(&mut io2).poll_read(cx, &mut buf), Poll::Pending));
			Poll::Ready(())
		})
		.await;

		// move the handler state to 'Opening'
		handler.on_behaviour_event(NotifsHandlerIn::Open {
			protocol_index: 0,
			peer_id: PeerId::random(),
		});
		assert!(std::matches!(
			handler.protocols[0].state,
			State::Opening { in_substream: Some(_), .. }
		));

		// accept the substream and move its state to `Open`
		let (io, _io2) = MockSubstream::negotiated().await;
		let mut codec = UviBytes::default();
		codec.set_max_len(usize::MAX);

		let notif_out = NotificationsOutOpen {
			handshake: b"hello, world".to_vec(),
			negotiated_fallback: None,
			substream: NotificationsOutSubstream::new(Framed::new(io, codec)),
		};
		handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedOutbound(
			handler::FullyNegotiatedOutbound { protocol: notif_out, info: 0 },
		));

		assert!(std::matches!(
			handler.protocols[0].state,
			State::Open { in_substream: Some(_), .. }
		));

		// remote now tries to open another substream, verify that it is rejected and closed
		let (io, mut io2) = MockSubstream::negotiated().await;
		let mut codec = UviBytes::default();
		codec.set_max_len(usize::MAX);
		let notif_in = NotificationsInOpen {
			handshake: b"hello, world".to_vec(),
			substream: NotificationsInSubstream::new(
				Framed::new(io, codec),
				NotificationsInSubstreamHandshake::NotSent,
			),
		};
		handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound(
			handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () },
		));

		// verify that the new substream is rejected and closed but that the first substream is
		// still in correct state
		futures::future::poll_fn(|cx| {
			let mut buf = Vec::with_capacity(512);

			// EOF on the remote end proves the second substream was dropped.
			if let Poll::Ready(Err(err)) = Pin::new(&mut io2).poll_read(cx, &mut buf) {
				assert_eq!(err.kind(), std::io::ErrorKind::UnexpectedEof);
			} else {
				panic!("unexpected result");
			}

			Poll::Ready(())
		})
		.await;
		assert!(std::matches!(
			handler.protocols[0].state,
			State::Open { in_substream: Some(_), .. }
		));
	}
1372
	// verify that a successfully negotiated outbound substream arriving after the
	// protocol was commanded closed keeps the `Closed` state and only resets
	// `pending_opening`.
	#[tokio::test]
	async fn fully_negotiated_resets_state_for_closed_substream() {
		let mut handler = notifs_handler();
		let (io, mut io2) = MockSubstream::negotiated().await;
		let mut codec = UviBytes::default();
		codec.set_max_len(usize::MAX);

		let notif_in = NotificationsInOpen {
			handshake: b"hello, world".to_vec(),
			substream: NotificationsInSubstream::new(
				Framed::new(io, codec),
				NotificationsInSubstreamHandshake::NotSent,
			),
		};
		handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound(
			handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () },
		));

		// verify that the substream is in (partly) opened state
		assert!(std::matches!(handler.protocols[0].state, State::OpenDesiredByRemote { .. }));
		futures::future::poll_fn(|cx| {
			let mut buf = Vec::with_capacity(512);
			assert!(std::matches!(Pin::new(&mut io2).poll_read(cx, &mut buf), Poll::Pending));
			Poll::Ready(())
		})
		.await;

		// first instruct the handler to open a connection and then close it right after
		// so the handler is in state `Closed { pending_opening: true }`
		handler.on_behaviour_event(NotifsHandlerIn::Open {
			protocol_index: 0,
			peer_id: PeerId::random(),
		});
		assert!(std::matches!(
			handler.protocols[0].state,
			State::Opening { in_substream: Some(_), .. }
		));

		handler.on_behaviour_event(NotifsHandlerIn::Close { protocol_index: 0 });
		assert!(std::matches!(handler.protocols[0].state, State::Closed { pending_opening: true }));

		// verify that if the outbound substream is successfully negotiated, the state is not
		// changed as the substream was commanded to be closed by the handler; only the
		// `pending_opening` flag is reset.
		let (io, _io2) = MockSubstream::negotiated().await;
		let mut codec = UviBytes::default();
		codec.set_max_len(usize::MAX);

		let notif_out = NotificationsOutOpen {
			handshake: b"hello, world".to_vec(),
			negotiated_fallback: None,
			substream: NotificationsOutSubstream::new(Framed::new(io, codec)),
		};
		handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedOutbound(
			handler::FullyNegotiatedOutbound { protocol: notif_out, info: 0 },
		));

		assert!(std::matches!(
			handler.protocols[0].state,
			State::Closed { pending_opening: false }
		));
	}
1434
	// verify that a successfully negotiated outbound substream arriving while the
	// protocol is in `OpenDesiredByRemote` (after a local close) keeps that state
	// and only resets `pending_opening`.
	#[tokio::test]
	async fn fully_negotiated_resets_state_for_open_desired_substream() {
		let mut handler = notifs_handler();
		let (io, mut io2) = MockSubstream::negotiated().await;
		let mut codec = UviBytes::default();
		codec.set_max_len(usize::MAX);

		let notif_in = NotificationsInOpen {
			handshake: b"hello, world".to_vec(),
			substream: NotificationsInSubstream::new(
				Framed::new(io, codec),
				NotificationsInSubstreamHandshake::NotSent,
			),
		};
		handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound(
			handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () },
		));

		// verify that the substream is in (partly) opened state
		assert!(std::matches!(handler.protocols[0].state, State::OpenDesiredByRemote { .. }));
		futures::future::poll_fn(|cx| {
			let mut buf = Vec::with_capacity(512);
			assert!(std::matches!(Pin::new(&mut io2).poll_read(cx, &mut buf), Poll::Pending));
			Poll::Ready(())
		})
		.await;

		// first instruct the handler to open a connection and then close it right after
		// so the handler is in state `Closed { pending_opening: true }`
		handler.on_behaviour_event(NotifsHandlerIn::Open {
			protocol_index: 0,
			peer_id: PeerId::random(),
		});
		assert!(std::matches!(
			handler.protocols[0].state,
			State::Opening { in_substream: Some(_), .. }
		));

		handler.on_behaviour_event(NotifsHandlerIn::Close { protocol_index: 0 });
		assert!(std::matches!(handler.protocols[0].state, State::Closed { pending_opening: true }));

		// attempt to open another inbound substream and verify that it is rejected
		let (io, _io2) = MockSubstream::negotiated().await;
		let mut codec = UviBytes::default();
		codec.set_max_len(usize::MAX);

		let notif_in = NotificationsInOpen {
			handshake: b"hello, world".to_vec(),
			substream: NotificationsInSubstream::new(
				Framed::new(io, codec),
				NotificationsInSubstreamHandshake::NotSent,
			),
		};
		handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound(
			handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () },
		));

		assert!(std::matches!(
			handler.protocols[0].state,
			State::OpenDesiredByRemote { pending_opening: true, .. }
		));

		// verify that if the outbound substream is successfully negotiated, the state is not
		// changed as the substream was commanded to be closed by the handler; only the
		// `pending_opening` flag is reset.
		let (io, _io2) = MockSubstream::negotiated().await;
		let mut codec = UviBytes::default();
		codec.set_max_len(usize::MAX);

		let notif_out = NotificationsOutOpen {
			handshake: b"hello, world".to_vec(),
			negotiated_fallback: None,
			substream: NotificationsOutSubstream::new(Framed::new(io, codec)),
		};

		handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedOutbound(
			handler::FullyNegotiatedOutbound { protocol: notif_out, info: 0 },
		));

		assert!(std::matches!(
			handler.protocols[0].state,
			State::OpenDesiredByRemote { pending_opening: false, .. }
		));
	}
1518
	// verify that a dial/upgrade failure for a protocol already in `Closed`
	// only resets the `pending_opening` flag.
	#[tokio::test]
	async fn dial_upgrade_error_resets_closed_outbound_state() {
		let mut handler = notifs_handler();
		let (io, mut io2) = MockSubstream::negotiated().await;
		let mut codec = UviBytes::default();
		codec.set_max_len(usize::MAX);

		let notif_in = NotificationsInOpen {
			handshake: b"hello, world".to_vec(),
			substream: NotificationsInSubstream::new(
				Framed::new(io, codec),
				NotificationsInSubstreamHandshake::NotSent,
			),
		};
		handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound(
			handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () },
		));

		// verify that the substream is in (partly) opened state
		assert!(std::matches!(handler.protocols[0].state, State::OpenDesiredByRemote { .. }));
		futures::future::poll_fn(|cx| {
			let mut buf = Vec::with_capacity(512);
			assert!(std::matches!(Pin::new(&mut io2).poll_read(cx, &mut buf), Poll::Pending));
			Poll::Ready(())
		})
		.await;

		// first instruct the handler to open a connection and then close it right after
		// so the handler is in state `Closed { pending_opening: true }`
		handler.on_behaviour_event(NotifsHandlerIn::Open {
			protocol_index: 0,
			peer_id: PeerId::random(),
		});
		assert!(std::matches!(
			handler.protocols[0].state,
			State::Opening { in_substream: Some(_), .. }
		));

		handler.on_behaviour_event(NotifsHandlerIn::Close { protocol_index: 0 });
		assert!(std::matches!(handler.protocols[0].state, State::Closed { pending_opening: true }));

		// inject dial failure to an already closed substream and verify outbound state is reset
		handler.on_connection_event(handler::ConnectionEvent::DialUpgradeError(
			handler::DialUpgradeError { info: 0, error: StreamUpgradeError::Timeout },
		));
		assert!(std::matches!(
			handler.protocols[0].state,
			State::Closed { pending_opening: false }
		));
	}
1569
	// verify that a dial/upgrade failure for a protocol in `OpenDesiredByRemote`
	// only resets the `pending_opening` flag and keeps the state.
	#[tokio::test]
	async fn dial_upgrade_error_resets_open_desired_state() {
		let mut handler = notifs_handler();
		let (io, mut io2) = MockSubstream::negotiated().await;
		let mut codec = UviBytes::default();
		codec.set_max_len(usize::MAX);

		let notif_in = NotificationsInOpen {
			handshake: b"hello, world".to_vec(),
			substream: NotificationsInSubstream::new(
				Framed::new(io, codec),
				NotificationsInSubstreamHandshake::NotSent,
			),
		};
		handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound(
			handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () },
		));

		// verify that the substream is in (partly) opened state
		assert!(std::matches!(handler.protocols[0].state, State::OpenDesiredByRemote { .. }));
		futures::future::poll_fn(|cx| {
			let mut buf = Vec::with_capacity(512);
			assert!(std::matches!(Pin::new(&mut io2).poll_read(cx, &mut buf), Poll::Pending));
			Poll::Ready(())
		})
		.await;

		// first instruct the handler to open a connection and then close it right after
		// so the handler is in state `Closed { pending_opening: true }`
		handler.on_behaviour_event(NotifsHandlerIn::Open {
			protocol_index: 0,
			peer_id: PeerId::random(),
		});
		assert!(std::matches!(
			handler.protocols[0].state,
			State::Opening { in_substream: Some(_), .. }
		));

		handler.on_behaviour_event(NotifsHandlerIn::Close { protocol_index: 0 });
		assert!(std::matches!(handler.protocols[0].state, State::Closed { pending_opening: true }));

		let (io, _io2) = MockSubstream::negotiated().await;
		let mut codec = UviBytes::default();
		codec.set_max_len(usize::MAX);

		let notif_in = NotificationsInOpen {
			handshake: b"hello, world".to_vec(),
			substream: NotificationsInSubstream::new(
				Framed::new(io, codec),
				NotificationsInSubstreamHandshake::NotSent,
			),
		};
		handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound(
			handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () },
		));

		assert!(std::matches!(
			handler.protocols[0].state,
			State::OpenDesiredByRemote { pending_opening: true, .. }
		));

		// inject a dial failure while the remote still desires to open; only the
		// outbound `pending_opening` flag is reset, the state itself is kept
		handler.on_connection_event(handler::ConnectionEvent::DialUpgradeError(
			handler::DialUpgradeError { info: 0, error: StreamUpgradeError::Timeout },
		));
		assert!(std::matches!(
			handler.protocols[0].state,
			State::OpenDesiredByRemote { pending_opening: false, .. }
		));
	}
1640
1641	#[tokio::test]
1642	async fn sync_notifications_clogged() {
1643		let mut handler = notifs_handler();
1644		let (io, _) = MockSubstream::negotiated().await;
1645		let codec = UviBytes::default();
1646
1647		let (async_tx, async_rx) = futures::channel::mpsc::channel(ASYNC_NOTIFICATIONS_BUFFER_SIZE);
1648		let (sync_tx, sync_rx) = futures::channel::mpsc::channel(1);
1649		let notifications_sink = NotificationsSink {
1650			inner: Arc::new(NotificationsSinkInner {
1651				peer_id: PeerId::random(),
1652				async_channel: FuturesMutex::new(async_tx),
1653				sync_channel: Mutex::new(Some(sync_tx)),
1654			}),
1655			metrics: None,
1656		};
1657
1658		handler.protocols[0].state = State::Open {
1659			notifications_sink_rx: stream::select(async_rx.fuse(), sync_rx.fuse()).peekable(),
1660			out_substream: Some(NotificationsOutSubstream::new(Framed::new(io, codec))),
1661			in_substream: None,
1662		};
1663
1664		notifications_sink.send_sync_notification(vec![1, 3, 3, 7]);
1665		notifications_sink.send_sync_notification(vec![1, 3, 3, 8]);
1666		notifications_sink.send_sync_notification(vec![1, 3, 3, 9]);
1667		notifications_sink.send_sync_notification(vec![1, 3, 4, 0]);
1668
1669		futures::future::poll_fn(|cx| {
1670			assert!(std::matches!(
1671				handler.poll(cx),
1672				Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
1673					NotifsHandlerOut::Close { .. }
1674				))
1675			));
1676			Poll::Ready(())
1677		})
1678		.await;
1679	}
1680
	// verify that dropping the remote end of an inbound substream produces first an
	// `OpenDesiredByRemote` event and then a `CloseDesired` with `RemoteRequest`.
	#[tokio::test]
	async fn close_desired_by_remote() {
		let mut handler = notifs_handler();
		let (io, io2) = MockSubstream::negotiated().await;
		let mut codec = UviBytes::default();
		codec.set_max_len(usize::MAX);

		let notif_in = NotificationsInOpen {
			handshake: b"hello, world".to_vec(),
			substream: NotificationsInSubstream::new(
				Framed::new(io, codec),
				// `PendingSend` so the handshake is flushed when the handler polls.
				NotificationsInSubstreamHandshake::PendingSend(vec![1, 2, 3, 4]),
			),
		};

		// add new inbound substream but close it immediately and verify that correct events are
		// emitted
		handler.on_connection_event(handler::ConnectionEvent::FullyNegotiatedInbound(
			handler::FullyNegotiatedInbound { protocol: (notif_in, 0), info: () },
		));
		drop(io2);

		futures::future::poll_fn(|cx| {
			// first the handler reports the remote's desire to open...
			assert!(std::matches!(
				handler.poll(cx),
				Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
					NotifsHandlerOut::OpenDesiredByRemote { protocol_index: 0, .. },
				))
			));
			// ...then, since the remote end was dropped, the close request
			assert!(std::matches!(
				handler.poll(cx),
				Poll::Ready(ConnectionHandlerEvent::NotifyBehaviour(
					NotifsHandlerOut::CloseDesired {
						protocol_index: 0,
						reason: CloseReason::RemoteRequest,
					},
				))
			));
			Poll::Ready(())
		})
		.await;
	}
1723}