Skip to main content

soil_network/protocol/notifications/service/
mod.rs

1// This file is part of Soil.
2
3// Copyright (C) Soil contributors.
4// Copyright (C) Parity Technologies (UK) Ltd.
5// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
6
7//! Notification service implementation.
8
9use crate::{
10	error,
11	protocol::notifications::handler::NotificationsSink,
12	service::{
13		metrics::NotificationMetrics,
14		traits::{
15			Direction, MessageSink, NotificationEvent, NotificationService, ValidationResult,
16		},
17	},
18	types::ProtocolName,
19};
20
21use futures::{
22	stream::{FuturesUnordered, Stream},
23	StreamExt,
24};
25use libp2p::PeerId;
26use parking_lot::Mutex;
27use tokio::sync::{mpsc, oneshot};
28use tokio_stream::wrappers::ReceiverStream;
29
30use soil_client::utils::mpsc::{
31	tracing_unbounded, TracingUnboundedReceiver, TracingUnboundedSender,
32};
33
34use std::{collections::HashMap, fmt::Debug, sync::Arc};
35
36pub(crate) mod metrics;
37
38#[cfg(test)]
39mod tests;
40
41/// Logging target for this module.
42const LOG_TARGET: &str = "sub-libp2p::notification::service";
43
44/// Capacity of the bounded command channel created in `notification_service()`.
45const COMMAND_QUEUE_SIZE: usize = 64;
46
47/// Shared, lockable list holding one event sender per subscriber of a notification protocol.
48type Subscribers = Arc<Mutex<Vec<TracingUnboundedSender<InnerNotificationEvent>>>>;
49
50/// Type representing a distributable message sink.
51/// The sink is stored together with the protocol name because a detached message sink
52/// still has to label the metrics it registers when a notification is sent.
53/// See documentation for [`PeerContext`] for more details.
54type NotificationSink = Arc<Mutex<(NotificationsSink, ProtocolName)>>;
55
56#[async_trait::async_trait]
57impl MessageSink for NotificationSink {
58	/// Send synchronous `notification` to the peer associated with this [`MessageSink`].
59	fn send_sync_notification(&self, notification: Vec<u8>) {
60		let sink = self.lock();
61
62		metrics::register_notification_sent(sink.0.metrics(), &sink.1, notification.len());
63		sink.0.send_sync_notification(notification);
64	}
65
66	/// Send an asynchronous `notification` to the peer associated with this [`MessageSink`],
67	/// allowing sender to exercise backpressure.
68	///
69	/// Returns an error if the peer does not exist.
70	async fn send_async_notification(&self, notification: Vec<u8>) -> Result<(), error::Error> {
71		// notification sink must be cloned because the lock cannot be held across `.await`
72		// this makes the implementation less efficient but not prohibitively so as the same
73		// method is also used by `NetworkService` when sending notifications.
74		let notification_len = notification.len();
75		let sink = self.lock().clone();
76		let permit = sink
77			.0
78			.reserve_notification()
79			.await
80			.map_err(|_| error::Error::ConnectionClosed)?;
81
82		permit.send(notification).map_err(|_| error::Error::ChannelClosed).inspect(|_| {
83			metrics::register_notification_sent(sink.0.metrics(), &sink.1, notification_len);
84		})
85	}
86}
87
88/// Inner notification event to deal with `NotificationsSinks` without exposing that
89/// implementation detail to [`NotificationService`] consumers.
///
/// `NotificationHandle::next_event()` converts these into [`NotificationEvent`]s and keeps the
/// sinks carried by `NotificationStreamOpened` and `NotificationSinkReplaced` to itself.
90#[derive(Debug)]
91enum InnerNotificationEvent {
92	/// Validate inbound substream.
93	ValidateInboundSubstream {
94		/// Peer ID.
95		peer: PeerId,
96
97		/// Received handshake.
98		handshake: Vec<u8>,
99
100		/// `oneshot::Sender` for sending the validation result back to `Notifications`.
101		result_tx: oneshot::Sender<ValidationResult>,
102	},
103
104	/// Notification substream open to `peer`.
105	NotificationStreamOpened {
106		/// Peer ID.
107		peer: PeerId,
108
109		/// Direction of the substream.
110		direction: Direction,
111
112		/// Received handshake.
113		handshake: Vec<u8>,
114
115		/// Negotiated fallback.
116		negotiated_fallback: Option<ProtocolName>,
117
118		/// Notification sink; stored in the handle's per-peer context, not forwarded.
119		sink: NotificationsSink,
120	},
121
122	/// Substream was closed.
123	NotificationStreamClosed {
124		/// Peer ID.
125		peer: PeerId,
126	},
127
128	/// Notification was received from the substream.
129	NotificationReceived {
130		/// Peer ID.
131		peer: PeerId,
132
133		/// Received notification.
134		notification: Vec<u8>,
135	},
136
137	/// Notification sink has been replaced. Handled inside `next_event()` and never
	/// surfaced as a [`NotificationEvent`].
138	NotificationSinkReplaced {
139		/// Peer ID.
140		peer: PeerId,
141
142		/// New notification sink that replaces the stored one.
143		sink: NotificationsSink,
144	},
145}
146
147/// Notification commands.
148///
149/// Sent by the installed protocols to `Notifications` to open/close/modify substreams.
150#[derive(Debug)]
151pub enum NotificationCommand {
152	/// Instruct `Notifications` to open a substream to peer.
	// Not constructed in this file: `NotificationHandle::open_substream()` is still `todo!()`.
153	#[allow(unused)]
154	OpenSubstream(PeerId),
155
156	/// Instruct `Notifications` to close the substream to peer.
	// Not constructed in this file: `NotificationHandle::close_substream()` is still `todo!()`.
157	#[allow(unused)]
158	CloseSubstream(PeerId),
159
160	/// Set handshake for the notifications protocol. Carries the new handshake bytes.
161	SetHandshake(Vec<u8>),
162}
163
164/// Context assigned to each peer.
165///
166/// Contains `NotificationsSink` used by [`NotificationService`] to send notifications
167/// and an additional, distributable `NotificationsSink` which the protocol may acquire
168/// if it wishes to send notifications through `NotificationsSink` directly.
169///
170/// The distributable `NotificationsSink` is wrapped in an `Arc<Mutex<>>` to allow
171/// `NotificationHandle` to swap the underlying sink in case it's replaced.
///
/// Cloning the context clones the `Arc`, so clones share the same distributable sink.
172#[derive(Debug, Clone)]
173struct PeerContext {
174	/// Sink used by the handle itself for sending notifications.
175	sink: NotificationsSink,
176
177	/// Distributable notification sink, handed out by `message_sink()`.
178	shared_sink: NotificationSink,
179}
180
181/// Handle that is passed on to the notifications protocol.
///
/// Implements [`NotificationService`] on top of a command channel towards `Notifications`
/// and an event channel coming from it.
182#[derive(Debug)]
183pub struct NotificationHandle {
184	/// Protocol name.
185	protocol: ProtocolName,
186
187	/// TX channel for sending commands to `Notifications`.
188	tx: mpsc::Sender<NotificationCommand>,
189
190	/// RX channel for receiving events from `Notifications`.
191	rx: TracingUnboundedReceiver<InnerNotificationEvent>,
192
193	/// All subscribers of `NotificationEvent`s; `clone()` registers a new sender here.
194	subscribers: Subscribers,
195
196	/// Connected peers, updated by `next_event()` as open/close/replace events are processed.
197	peers: HashMap<PeerId, PeerContext>,
198}
199
200impl NotificationHandle {
201	/// Create new [`NotificationHandle`].
202	fn new(
203		protocol: ProtocolName,
204		tx: mpsc::Sender<NotificationCommand>,
205		rx: TracingUnboundedReceiver<InnerNotificationEvent>,
206		subscribers: Arc<Mutex<Vec<TracingUnboundedSender<InnerNotificationEvent>>>>,
207	) -> Self {
208		Self { protocol, tx, rx, subscribers, peers: HashMap::new() }
209	}
210}
211
212#[async_trait::async_trait]
213impl NotificationService for NotificationHandle {
214	/// Instruct `Notifications` to open a new substream for `peer`.
215	async fn open_substream(&mut self, _peer: crate::types::PeerId) -> Result<(), ()> {
216		todo!("support for opening substreams not implemented yet");
217	}
218
219	/// Instruct `Notifications` to close substream for `peer`.
220	async fn close_substream(&mut self, _peer: crate::types::PeerId) -> Result<(), ()> {
221		todo!("support for closing substreams not implemented yet, call `NetworkService::disconnect_peer()` instead");
222	}
223
224	/// Send synchronous `notification` to `peer`.
225	fn send_sync_notification(&mut self, peer: &crate::types::PeerId, notification: Vec<u8>) {
226		if let Some(info) = self.peers.get(&((*peer).into())) {
227			metrics::register_notification_sent(
228				info.sink.metrics(),
229				&self.protocol,
230				notification.len(),
231			);
232
233			let _ = info.sink.send_sync_notification(notification);
234		}
235	}
236
237	/// Send asynchronous `notification` to `peer`, allowing sender to exercise backpressure.
238	async fn send_async_notification(
239		&mut self,
240		peer: &crate::types::PeerId,
241		notification: Vec<u8>,
242	) -> Result<(), error::Error> {
243		let notification_len = notification.len();
244		let sink = &self
245			.peers
246			.get(&peer.into())
247			.ok_or_else(|| error::Error::PeerDoesntExist((*peer).into()))?
248			.sink;
249
250		sink.reserve_notification()
251			.await
252			.map_err(|_| error::Error::ConnectionClosed)?
253			.send(notification)
254			.map_err(|_| error::Error::ChannelClosed)
255			.inspect(|_| {
256				metrics::register_notification_sent(
257					sink.metrics(),
258					&self.protocol,
259					notification_len,
260				);
261			})
262	}
263
264	/// Set handshake for the notification protocol replacing the old handshake.
265	async fn set_handshake(&mut self, handshake: Vec<u8>) -> Result<(), ()> {
266		log::trace!(target: LOG_TARGET, "{}: set handshake to {handshake:?}", self.protocol);
267
268		self.tx.send(NotificationCommand::SetHandshake(handshake)).await.map_err(|_| ())
269	}
270
271	/// Non-blocking variant of `set_handshake()` that attempts to update the handshake
272	/// and returns an error if the channel is blocked.
273	///
274	/// Technically the function can return an error if the channel to `Notifications` is closed
275	/// but that doesn't happen under normal operation.
276	fn try_set_handshake(&mut self, handshake: Vec<u8>) -> Result<(), ()> {
277		self.tx.try_send(NotificationCommand::SetHandshake(handshake)).map_err(|_| ())
278	}
279
280	/// Get next event from the `Notifications` event stream.
281	async fn next_event(&mut self) -> Option<NotificationEvent> {
282		loop {
283			match self.rx.next().await? {
284				InnerNotificationEvent::ValidateInboundSubstream { peer, handshake, result_tx } => {
285					return Some(NotificationEvent::ValidateInboundSubstream {
286						peer: peer.into(),
287						handshake,
288						result_tx,
289					})
290				},
291				InnerNotificationEvent::NotificationStreamOpened {
292					peer,
293					handshake,
294					negotiated_fallback,
295					direction,
296					sink,
297				} => {
298					self.peers.insert(
299						peer,
300						PeerContext {
301							sink: sink.clone(),
302							shared_sink: Arc::new(Mutex::new((sink, self.protocol.clone()))),
303						},
304					);
305					return Some(NotificationEvent::NotificationStreamOpened {
306						peer: peer.into(),
307						handshake,
308						direction,
309						negotiated_fallback,
310					});
311				},
312				InnerNotificationEvent::NotificationStreamClosed { peer } => {
313					self.peers.remove(&peer);
314					return Some(NotificationEvent::NotificationStreamClosed { peer: peer.into() });
315				},
316				InnerNotificationEvent::NotificationReceived { peer, notification } => {
317					return Some(NotificationEvent::NotificationReceived {
318						peer: peer.into(),
319						notification,
320					})
321				},
322				InnerNotificationEvent::NotificationSinkReplaced { peer, sink } => {
323					match self.peers.get_mut(&peer) {
324						None => log::error!(
325							"{}: notification sink replaced for {peer} but peer does not exist",
326							self.protocol
327						),
328						Some(context) => {
329							context.sink = sink.clone();
330							*context.shared_sink.lock() = (sink.clone(), self.protocol.clone());
331						},
332					}
333				},
334			}
335		}
336	}
337
338	// Clone [`NotificationService`]
339	fn clone(&mut self) -> Result<Box<dyn NotificationService>, ()> {
340		let mut subscribers = self.subscribers.lock();
341
342		let (event_tx, event_rx) = tracing_unbounded(self.rx.name(), 100_000);
343		subscribers.push(event_tx);
344
345		Ok(Box::new(NotificationHandle {
346			protocol: self.protocol.clone(),
347			tx: self.tx.clone(),
348			rx: event_rx,
349			peers: self.peers.clone(),
350			subscribers: self.subscribers.clone(),
351		}))
352	}
353
354	/// Get protocol name.
355	fn protocol(&self) -> &ProtocolName {
356		&self.protocol
357	}
358
359	/// Get message sink of the peer.
360	fn message_sink(&self, peer: &crate::types::PeerId) -> Option<Box<dyn MessageSink>> {
361		match self.peers.get(&peer.into()) {
362			Some(context) => Some(Box::new(context.shared_sink.clone())),
363			None => None,
364		}
365	}
366}
367
368/// Channel pair which allows `Notifications` to interact with a protocol.
///
/// Consumed by `split()`, which turns it into a `ProtocolHandle` and a command stream.
369#[derive(Debug)]
370pub struct ProtocolHandlePair {
371	/// Protocol name.
372	protocol: ProtocolName,
373
374	/// Subscribers of the notification protocol events.
375	subscribers: Subscribers,
376
377	/// Receiver for notification commands received from the protocol implementation.
378	rx: mpsc::Receiver<NotificationCommand>,
379}
380
381impl ProtocolHandlePair {
382	/// Create new [`ProtocolHandlePair`].
383	fn new(
384		protocol: ProtocolName,
385		subscribers: Subscribers,
386		rx: mpsc::Receiver<NotificationCommand>,
387	) -> Self {
388		Self { protocol, subscribers, rx }
389	}
390
391	/// Consume `self` and split [`ProtocolHandlePair`] into a handle which allows it to send events
392	/// to the protocol and a stream of commands received from the protocol.
393	pub(crate) fn split(
394		self,
395	) -> (ProtocolHandle, Box<dyn Stream<Item = NotificationCommand> + Send + Unpin>) {
396		(
397			ProtocolHandle::new(self.protocol, self.subscribers),
398			Box::new(ReceiverStream::new(self.rx)),
399		)
400	}
401}
402
403/// Handle that is passed on to `Notifications` and allows it to directly communicate
404/// with the protocol.
405#[derive(Debug, Clone)]
406pub(crate) struct ProtocolHandle {
407	/// Protocol name.
408	protocol: ProtocolName,
409
410	/// Subscribers of the notification protocol.
411	subscribers: Subscribers,
412
413	/// Number of connected peers: incremented by `report_substream_opened()` and decremented
	/// by `report_substream_closed()`. Being a plain field, each clone keeps its own count.
414	num_peers: usize,
415
416	/// Delegate validation to `Peerset`; when set, `report_incoming_substream()` returns
	/// `ValidationCallResult::Delegated` without contacting the subscribers.
417	delegate_to_peerset: bool,
418
419	/// Prometheus metrics; `None` until `set_metrics()` is called.
420	metrics: Option<NotificationMetrics>,
421}
422
/// Outcome of `ProtocolHandle::report_incoming_substream()`.
423pub(crate) enum ValidationCallResult {
	/// Validation was requested from the subscribers; the receiver yields the result.
424	WaitForValidation(oneshot::Receiver<ValidationResult>),
	/// Protocol-side validation was skipped because it is delegated to `Peerset`.
425	Delegated,
426}
427
428impl ProtocolHandle {
429	/// Create new [`ProtocolHandle`].
430	fn new(protocol: ProtocolName, subscribers: Subscribers) -> Self {
431		Self { protocol, subscribers, num_peers: 0usize, metrics: None, delegate_to_peerset: false }
432	}
433
434	/// Set metrics.
435	pub fn set_metrics(&mut self, metrics: NotificationMetrics) {
436		self.metrics = Some(metrics);
437	}
438
439	/// Delegate validation to `Peerset`.
440	///
441	/// Protocols that do not do any validation themselves and only rely on `Peerset` handling
442	/// validation can disable protocol-side validation entirely by delegating all validation to
443	/// `Peerset`.
444	pub fn delegate_to_peerset(&mut self, delegate: bool) {
445		self.delegate_to_peerset = delegate;
446	}
447
448	/// Report to the protocol that a substream has been opened and it must be validated by the
449	/// protocol.
450	///
451	/// Return `oneshot::Receiver` which allows `Notifications` to poll for the validation result
452	/// from protocol.
453	pub fn report_incoming_substream(
454		&self,
455		peer: PeerId,
456		handshake: Vec<u8>,
457	) -> Result<ValidationCallResult, ()> {
458		let subscribers = self.subscribers.lock();
459
460		log::trace!(
461			target: LOG_TARGET,
462			"{}: report incoming substream for {peer}, handshake {handshake:?}",
463			self.protocol
464		);
465
466		if self.delegate_to_peerset {
467			return Ok(ValidationCallResult::Delegated);
468		}
469
470		// if there is only one subscriber, `Notifications` can wait directly on the
471		// `oneshot::channel()`'s RX half without indirection
472		if subscribers.len() == 1 {
473			let (result_tx, rx) = oneshot::channel();
474			return subscribers[0]
475				.unbounded_send(InnerNotificationEvent::ValidateInboundSubstream {
476					peer,
477					handshake,
478					result_tx,
479				})
480				.map(|_| ValidationCallResult::WaitForValidation(rx))
481				.map_err(|_| ());
482		}
483
484		// if there are multiple subscribers, create a task which waits for all of the
485		// validations to finish and returns the combined result to `Notifications`
486		let mut results: FuturesUnordered<_> = subscribers
487			.iter()
488			.filter_map(|subscriber| {
489				let (result_tx, rx) = oneshot::channel();
490
491				subscriber
492					.unbounded_send(InnerNotificationEvent::ValidateInboundSubstream {
493						peer,
494						handshake: handshake.clone(),
495						result_tx,
496					})
497					.is_ok()
498					.then_some(rx)
499			})
500			.collect();
501
502		let (tx, rx) = oneshot::channel();
503		tokio::spawn(async move {
504			while let Some(event) = results.next().await {
505				match event {
506					Err(_) | Ok(ValidationResult::Reject) => {
507						return tx.send(ValidationResult::Reject)
508					},
509					Ok(ValidationResult::Accept) => {},
510				}
511			}
512
513			return tx.send(ValidationResult::Accept);
514		});
515
516		Ok(ValidationCallResult::WaitForValidation(rx))
517	}
518
519	/// Report to the protocol that a substream has been opened and that it can now use the handle
520	/// to send notifications to the remote peer.
521	pub fn report_substream_opened(
522		&mut self,
523		peer: PeerId,
524		direction: Direction,
525		handshake: Vec<u8>,
526		negotiated_fallback: Option<ProtocolName>,
527		sink: NotificationsSink,
528	) -> Result<(), ()> {
529		metrics::register_substream_opened(&self.metrics, &self.protocol);
530
531		let mut subscribers = self.subscribers.lock();
532		log::trace!(target: LOG_TARGET, "{}: substream opened for {peer:?}", self.protocol);
533
534		subscribers.retain(|subscriber| {
535			subscriber
536				.unbounded_send(InnerNotificationEvent::NotificationStreamOpened {
537					peer,
538					direction,
539					handshake: handshake.clone(),
540					negotiated_fallback: negotiated_fallback.clone(),
541					sink: sink.clone(),
542				})
543				.is_ok()
544		});
545		self.num_peers += 1;
546
547		Ok(())
548	}
549
550	/// Substream was closed.
551	pub fn report_substream_closed(&mut self, peer: PeerId) -> Result<(), ()> {
552		metrics::register_substream_closed(&self.metrics, &self.protocol);
553
554		let mut subscribers = self.subscribers.lock();
555		log::trace!(target: LOG_TARGET, "{}: substream closed for {peer:?}", self.protocol);
556
557		subscribers.retain(|subscriber| {
558			subscriber
559				.unbounded_send(InnerNotificationEvent::NotificationStreamClosed { peer })
560				.is_ok()
561		});
562		self.num_peers -= 1;
563
564		Ok(())
565	}
566
567	/// Notification was received from the substream.
568	pub fn report_notification_received(
569		&mut self,
570		peer: PeerId,
571		notification: Vec<u8>,
572	) -> Result<(), ()> {
573		metrics::register_notification_received(&self.metrics, &self.protocol, notification.len());
574
575		let mut subscribers = self.subscribers.lock();
576		log::trace!(target: LOG_TARGET, "{}: notification received from {peer:?}", self.protocol);
577
578		subscribers.retain(|subscriber| {
579			subscriber
580				.unbounded_send(InnerNotificationEvent::NotificationReceived {
581					peer,
582					notification: notification.clone(),
583				})
584				.is_ok()
585		});
586
587		Ok(())
588	}
589
590	/// Notification sink was replaced.
591	pub fn report_notification_sink_replaced(
592		&mut self,
593		peer: PeerId,
594		sink: NotificationsSink,
595	) -> Result<(), ()> {
596		let mut subscribers = self.subscribers.lock();
597
598		log::trace!(
599			target: LOG_TARGET,
600			"{}: notification sink replaced for {peer:?}",
601			self.protocol
602		);
603
604		subscribers.retain(|subscriber| {
605			subscriber
606				.unbounded_send(InnerNotificationEvent::NotificationSinkReplaced {
607					peer,
608					sink: sink.clone(),
609				})
610				.is_ok()
611		});
612
613		Ok(())
614	}
615
616	/// Get the number of connected peers.
617	pub fn num_peers(&self) -> usize {
618		self.num_peers
619	}
620}
621
622/// Create new (protocol, notification) handle pair.
623///
624/// Handle pair allows `Notifications` and the protocol to communicate with each other directly.
625pub fn notification_service(
626	protocol: ProtocolName,
627) -> (ProtocolHandlePair, Box<dyn NotificationService>) {
628	let (cmd_tx, cmd_rx) = mpsc::channel(COMMAND_QUEUE_SIZE);
629
630	let (event_tx, event_rx) =
631		tracing_unbounded(metric_label_for_protocol(&protocol).leak(), 100_000);
632	let subscribers = Arc::new(Mutex::new(vec![event_tx]));
633
634	(
635		ProtocolHandlePair::new(protocol.clone(), subscribers.clone(), cmd_rx),
636		Box::new(NotificationHandle::new(protocol.clone(), cmd_tx, event_rx, subscribers)),
637	)
638}
639
640// Decorates the mpsc-notification-to-protocol metric with the name of the protocol,
641// to be able to distiguish between different protocols in dashboards.
642fn metric_label_for_protocol(protocol: &ProtocolName) -> String {
643	let protocol_name = protocol.to_string();
644	let keys = protocol_name.split("/").collect::<Vec<_>>();
645	keys.iter()
646		.rev()
647		.take(2) // Last two tokens give the protocol name and version
648		.fold("mpsc-notification-to-protocol".into(), |acc, val| format!("{}-{}", acc, val))
649}