Skip to main content

soil_client/utils/
notification.rs

1// This file is part of Soil.
2
3// Copyright (C) Soil contributors.
4// Copyright (C) Parity Technologies (UK) Ltd.
5// SPDX-License-Identifier: Apache-2.0 OR GPL-3.0-or-later WITH Classpath-exception-2.0
6
//! Provides an mpsc notification channel that can be instantiated
//! _after_ it has been shared with the consumer and producer entities.
9//!
10//! Useful when building RPC extensions where, at service definition time, we
11//! don't know whether the specific interface where the RPC extension will be
12//! exposed is safe or not and we want to lazily build the RPC extension
13//! whenever we bind the service to an interface.
14//!
15//! See `soil_service::builder::RpcExtensionBuilder` for more details.
16
17use futures::stream::{FusedStream, Stream};
18use std::{
19	pin::Pin,
20	task::{Context, Poll},
21};
22
23use crate::utils::pubsub::{Hub, Receiver};
24
25mod registry;
26use registry::Registry;
27
28#[cfg(test)]
29mod tests;
30
/// Supplies the "tracing key" string under which mpsc channels are tagged
/// and identified.
///
/// A type implementing this trait is passed as the `TK` parameter of
/// [`NotificationStream`]; its [`TRACING_KEY`](Self::TRACING_KEY) is handed to
/// the hub when the channel is created.
pub trait TracingKeyStr {
	/// The constant `str` used as the "tracing key" that tags and identifies
	/// the mpsc channels owned by the implementing type.
	const TRACING_KEY: &'static str;
}
38
39/// The receiving half of the notifications channel.
40///
41/// The [`NotificationStream`] entity stores the [`Hub`] so it can be
42/// used to add more subscriptions.
43#[derive(Clone)]
44pub struct NotificationStream<Payload, TK: TracingKeyStr> {
45	hub: Hub<Payload, Registry>,
46	_pd: std::marker::PhantomData<TK>,
47}
48
49/// The receiving half of the notifications channel(s).
50#[derive(Debug)]
51pub struct NotificationReceiver<Payload> {
52	receiver: Receiver<Payload, Registry>,
53}
54
55/// The sending half of the notifications channel(s).
56pub struct NotificationSender<Payload> {
57	hub: Hub<Payload, Registry>,
58}
59
60impl<Payload, TK: TracingKeyStr> NotificationStream<Payload, TK> {
61	/// Creates a new pair of receiver and sender of `Payload` notifications.
62	pub fn channel() -> (NotificationSender<Payload>, Self) {
63		let hub = Hub::new(TK::TRACING_KEY);
64		let sender = NotificationSender { hub: hub.clone() };
65		let receiver = NotificationStream { hub, _pd: Default::default() };
66		(sender, receiver)
67	}
68
69	/// Subscribe to a channel through which the generic payload can be received.
70	pub fn subscribe(&self, queue_size_warning: usize) -> NotificationReceiver<Payload> {
71		let receiver = self.hub.subscribe((), queue_size_warning);
72		NotificationReceiver { receiver }
73	}
74}
75
76impl<Payload> NotificationSender<Payload> {
77	/// Send out a notification to all subscribers that a new payload is available for a
78	/// block.
79	pub fn notify<Error>(
80		&self,
81		payload: impl FnOnce() -> Result<Payload, Error>,
82	) -> Result<(), Error>
83	where
84		Payload: Clone,
85	{
86		self.hub.send(payload)
87	}
88}
89
90impl<Payload> Clone for NotificationSender<Payload> {
91	fn clone(&self) -> Self {
92		Self { hub: self.hub.clone() }
93	}
94}
95
96impl<Payload> Unpin for NotificationReceiver<Payload> {}
97
98impl<Payload> Stream for NotificationReceiver<Payload> {
99	type Item = Payload;
100
101	fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Payload>> {
102		Pin::new(&mut self.get_mut().receiver).poll_next(cx)
103	}
104}
105
106impl<Payload> FusedStream for NotificationReceiver<Payload> {
107	fn is_terminated(&self) -> bool {
108		self.receiver.is_terminated()
109	}
110}