Skip to main content

soil_client/utils/
pubsub.rs

1// This file is part of Soil.
2
3// Copyright (C) Soil contributors.
4// Copyright (C) Parity Technologies (UK) Ltd.
5// SPDX-License-Identifier: Apache-2.0 OR GPL-3.0-or-later WITH Classpath-exception-2.0
6
7//! Provides means to implement a typical Pub/Sub mechanism.
8//!
9//! This module provides a type [`Hub`] which can be used both to subscribe,
10//! and to send the broadcast messages.
11//!
12//! The [`Hub`] type is parametrized by two other types:
13//! - `Message` — the type of a message that shall be delivered to the subscribers;
14//! - `Registry` — implementation of the subscription/dispatch logic.
15//!
16//! A Registry is implemented by defining the following traits:
17//! - [`Subscribe<K>`];
18//! - [`Dispatch<M>`];
19//! - [`Unsubscribe`].
20//!
//! As a result of a subscription, the `Hub::subscribe` method returns an instance of
//! [`Receiver<Message, Registry>`], which can be used as a [`Stream`] to receive the messages.
//! Upon drop, the [`Receiver<Message, Registry>`] unregisters itself from the `Hub`.
24
25use std::{
26	collections::HashMap,
27	pin::Pin,
28	sync::{Arc, Weak},
29	task::{Context, Poll},
30};
31
32use futures::stream::{FusedStream, Stream};
33// use parking_lot::Mutex;
34use parking_lot::ReentrantMutex;
35use std::cell::RefCell;
36
37use crate::utils::{
38	id_sequence::SeqID,
39	mpsc::{TracingUnboundedReceiver, TracingUnboundedSender},
40};
41
42#[cfg(test)]
43mod tests;
44
45/// Unsubscribe: unregisters a previously created subscription.
46pub trait Unsubscribe {
47	/// Remove all registrations of the subscriber with ID `subs_id`.
48	fn unsubscribe(&mut self, subs_id: SeqID);
49}
50
51/// Subscribe using a key of type `K`
52pub trait Subscribe<K> {
53	/// Register subscriber with the ID `subs_id` as having interest to the key `K`.
54	fn subscribe(&mut self, subs_key: K, subs_id: SeqID);
55}
56
57/// Dispatch a message of type `M`.
58pub trait Dispatch<M> {
59	/// The type of the that shall be sent through the channel as a result of such dispatch.
60	type Item;
61	/// The type returned by the `dispatch`-method.
62	type Ret;
63
64	/// Dispatch the message of type `M`.
65	///
66	/// The implementation is given an instance of `M` and is supposed to invoke `dispatch` for
67	/// each matching subscriber, with an argument of type `Self::Item` matching that subscriber.
68	///
69	/// Note that this does not have to be of the same type with the item that will be sent through
70	/// to the subscribers. The subscribers will receive a message of type `Self::Item`.
71	fn dispatch<F>(&mut self, message: M, dispatch: F) -> Self::Ret
72	where
73		F: FnMut(&SeqID, Self::Item);
74}
75
76/// A subscription hub.
77///
78/// Does the subscription and dispatch.
79/// The exact subscription and routing behaviour is to be implemented by the Registry (of type `R`).
80/// The Hub under the hood uses the channel defined in `crate::utils::mpsc` module.
81#[derive(Debug)]
82pub struct Hub<M, R> {
83	tracing_key: &'static str,
84	shared: Arc<ReentrantMutex<RefCell<Shared<M, R>>>>,
85}
86
87/// The receiving side of the subscription.
88///
89/// The messages are delivered as items of a [`Stream`].
90/// Upon drop this receiver unsubscribes itself from the [`Hub<M, R>`].
91#[derive(Debug)]
92pub struct Receiver<M, R>
93where
94	R: Unsubscribe,
95{
96	rx: TracingUnboundedReceiver<M>,
97
98	shared: Weak<ReentrantMutex<RefCell<Shared<M, R>>>>,
99	subs_id: SeqID,
100}
101
102#[derive(Debug)]
103struct Shared<M, R> {
104	id_sequence: crate::utils::id_sequence::IDSequence,
105	registry: R,
106	sinks: HashMap<SeqID, TracingUnboundedSender<M>>,
107}
108
109impl<M, R> Hub<M, R>
110where
111	R: Unsubscribe,
112{
113	/// Provide access to the registry (for test purposes).
114	pub fn map_registry_for_tests<MapF, Ret>(&self, map: MapF) -> Ret
115	where
116		MapF: FnOnce(&R) -> Ret,
117	{
118		let shared_locked = self.shared.lock();
119		let shared_borrowed = shared_locked.borrow();
120		map(&shared_borrowed.registry)
121	}
122}
123
124impl<M, R> Drop for Receiver<M, R>
125where
126	R: Unsubscribe,
127{
128	fn drop(&mut self) {
129		if let Some(shared) = self.shared.upgrade() {
130			shared.lock().borrow_mut().unsubscribe(self.subs_id);
131		}
132	}
133}
134
135impl<M, R> Hub<M, R> {
136	/// Create a new instance of Hub (with default value for the Registry).
137	pub fn new(tracing_key: &'static str) -> Self
138	where
139		R: Default,
140	{
141		Self::new_with_registry(tracing_key, Default::default())
142	}
143
144	/// Create a new instance of Hub over the initialized Registry.
145	pub fn new_with_registry(tracing_key: &'static str, registry: R) -> Self {
146		let shared =
147			Shared { registry, sinks: Default::default(), id_sequence: Default::default() };
148		let shared = Arc::new(ReentrantMutex::new(RefCell::new(shared)));
149		Self { tracing_key, shared }
150	}
151
152	/// Subscribe to this Hub using the `subs_key: K`.
153	///
154	/// A subscription with a key `K` is possible if the Registry implements `Subscribe<K>`.
155	pub fn subscribe<K>(&self, subs_key: K, queue_size_warning: usize) -> Receiver<M, R>
156	where
157		R: Subscribe<K> + Unsubscribe,
158	{
159		let shared_locked = self.shared.lock();
160		let mut shared_borrowed = shared_locked.borrow_mut();
161
162		let subs_id = shared_borrowed.id_sequence.next_id();
163
164		// The order (registry.subscribe then sinks.insert) is important here:
165		// assuming that `Subscribe<K>::subscribe` can panic, it is better to at least
166		// have the sink disposed.
167		shared_borrowed.registry.subscribe(subs_key, subs_id);
168
169		let (tx, rx) = crate::utils::mpsc::tracing_unbounded(self.tracing_key, queue_size_warning);
170		assert!(shared_borrowed.sinks.insert(subs_id, tx).is_none(), "Used IDSequence to create another ID. Should be unique until u64 is overflowed. Should be unique.");
171
172		Receiver { shared: Arc::downgrade(&self.shared), subs_id, rx }
173	}
174
175	/// Send the message produced with `Trigger`.
176	///
177	/// This is possible if the registry implements `Dispatch<Trigger, Item = M>`.
178	pub fn send<Trigger>(&self, trigger: Trigger) -> <R as Dispatch<Trigger>>::Ret
179	where
180		R: Dispatch<Trigger, Item = M>,
181	{
182		let shared_locked = self.shared.lock();
183		let mut shared_borrowed = shared_locked.borrow_mut();
184		let (registry, sinks) = shared_borrowed.get_mut();
185
186		registry.dispatch(trigger, |subs_id, item| {
187			if let Some(tx) = sinks.get_mut(subs_id) {
188				if let Err(send_err) = tx.unbounded_send(item) {
189					log::warn!("Sink with SubsID = {} failed to perform unbounded_send: {} ({} as Dispatch<{}, Item = {}>::dispatch(...))", subs_id, send_err, std::any::type_name::<R>(),
190					std::any::type_name::<Trigger>(),
191					std::any::type_name::<M>());
192				}
193			} else {
194				log::warn!(
195					"No Sink for SubsID = {} ({} as Dispatch<{}, Item = {}>::dispatch(...))",
196					subs_id,
197					std::any::type_name::<R>(),
198					std::any::type_name::<Trigger>(),
199					std::any::type_name::<M>(),
200				);
201			}
202		})
203	}
204}
205
206impl<M, R> Shared<M, R> {
207	fn get_mut(&mut self) -> (&mut R, &mut HashMap<SeqID, TracingUnboundedSender<M>>) {
208		(&mut self.registry, &mut self.sinks)
209	}
210
211	fn unsubscribe(&mut self, subs_id: SeqID)
212	where
213		R: Unsubscribe,
214	{
215		// The order (sinks.remove then registry.unsubscribe) is important here:
216		// assuming that `Unsubscribe::unsubscribe` can panic, it is better to at least
217		// have the sink disposed.
218		self.sinks.remove(&subs_id);
219		self.registry.unsubscribe(subs_id);
220	}
221}
222
223impl<M, R> Clone for Hub<M, R> {
224	fn clone(&self) -> Self {
225		Self { tracing_key: self.tracing_key, shared: self.shared.clone() }
226	}
227}
228
229impl<M, R> Unpin for Receiver<M, R> where R: Unsubscribe {}
230
231impl<M, R> Stream for Receiver<M, R>
232where
233	R: Unsubscribe,
234{
235	type Item = M;
236
237	fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
238		Pin::new(&mut self.get_mut().rx).poll_next(cx)
239	}
240}
241
242impl<Ch, R> FusedStream for Receiver<Ch, R>
243where
244	R: Unsubscribe,
245{
246	fn is_terminated(&self) -> bool {
247		self.rx.is_terminated()
248	}
249}