Skip to main content

soil_client/client_api/
notifications.rs

1// This file is part of Soil.
2
3// Copyright (C) Soil contributors.
4// Copyright (C) Parity Technologies (UK) Ltd.
5// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
6
7//! Storage notifications
8
9use std::{
10	collections::{HashMap, HashSet},
11	pin::Pin,
12	sync::Arc,
13	task::Poll,
14};
15
16use futures::Stream;
17
18use soil_prometheus::Registry as PrometheusRegistry;
19
20use crate::utils::pubsub::{Hub, Receiver};
21use subsoil::core::storage::{StorageData, StorageKey};
22use subsoil::runtime::traits::Block as BlockT;
23
24mod registry;
25
26use registry::Registry;
27
28#[cfg(test)]
29mod tests;
30
31/// A type of a message delivered to the subscribers
32#[derive(Debug)]
33pub struct StorageNotification<Hash> {
34	/// The hash of the block
35	pub block: Hash,
36
37	/// The set of changes
38	pub changes: StorageChangeSet,
39}
40
41/// Storage change set
42#[derive(Debug)]
43pub struct StorageChangeSet {
44	changes: Arc<[(StorageKey, Option<StorageData>)]>,
45	child_changes: Arc<[(StorageKey, Vec<(StorageKey, Option<StorageData>)>)]>,
46	filter: Keys,
47	child_filters: ChildKeys,
48}
49
50/// Manages storage listeners.
51#[derive(Debug)]
52pub struct StorageNotifications<Block: BlockT>(Hub<StorageNotification<Block::Hash>, Registry>);
53
54/// Type that implements `futures::Stream` of storage change events.
55pub struct StorageEventStream<H>(Receiver<StorageNotification<H>, Registry>);
56
57type Keys = Option<HashSet<StorageKey>>;
58type ChildKeys = Option<HashMap<StorageKey, Option<HashSet<StorageKey>>>>;
59
60impl StorageChangeSet {
61	/// Convert the change set into iterator over storage items.
62	pub fn iter(
63		&self,
64	) -> impl Iterator<Item = (Option<&StorageKey>, &StorageKey, Option<&StorageData>)> + '_ {
65		let top = self
66			.changes
67			.iter()
68			.filter(move |&(key, _)| match self.filter {
69				Some(ref filter) => filter.contains(key),
70				None => true,
71			})
72			.map(move |(k, v)| (None, k, v.as_ref()));
73		let children = self
74			.child_changes
75			.iter()
76			.filter_map(move |(sk, changes)| {
77				self.child_filters.as_ref().and_then(|cf| {
78					cf.get(sk).map(|filter| {
79						changes
80							.iter()
81							.filter(move |&(key, _)| match filter {
82								Some(ref filter) => filter.contains(key),
83								None => true,
84							})
85							.map(move |(k, v)| (Some(sk), k, v.as_ref()))
86					})
87				})
88			})
89			.flatten();
90		top.chain(children)
91	}
92}
93
94impl<H> Stream for StorageEventStream<H> {
95	type Item = StorageNotification<H>;
96	fn poll_next(
97		self: Pin<&mut Self>,
98		cx: &mut std::task::Context<'_>,
99	) -> Poll<Option<Self::Item>> {
100		Stream::poll_next(Pin::new(&mut self.get_mut().0), cx)
101	}
102}
103
104impl<Block: BlockT> StorageNotifications<Block> {
105	/// Initialize a new StorageNotifications
106	/// optionally pass a prometheus registry to send subscriber metrics to
107	pub fn new(prometheus_registry: Option<PrometheusRegistry>) -> Self {
108		let registry = Registry::new(prometheus_registry);
109		let hub = Hub::new_with_registry("mpsc_storage_notification_items", registry);
110
111		StorageNotifications(hub)
112	}
113
114	/// Trigger notification to all listeners.
115	///
116	/// Note the changes are going to be filtered by listener's filter key.
117	/// In fact no event might be sent if clients are not interested in the changes.
118	pub fn trigger(
119		&self,
120		hash: &Block::Hash,
121		changeset: impl Iterator<Item = (Vec<u8>, Option<Vec<u8>>)>,
122		child_changeset: impl Iterator<
123			Item = (Vec<u8>, impl Iterator<Item = (Vec<u8>, Option<Vec<u8>>)>),
124		>,
125	) {
126		self.0.send((hash, changeset, child_changeset))
127	}
128
129	/// Start listening for particular storage keys.
130	pub fn listen(
131		&self,
132		filter_keys: Option<&[StorageKey]>,
133		filter_child_keys: Option<&[(StorageKey, Option<Vec<StorageKey>>)]>,
134	) -> StorageEventStream<Block::Hash> {
135		let receiver = self
136			.0
137			.subscribe(registry::SubscribeOp { filter_keys, filter_child_keys }, 100_000);
138
139		StorageEventStream(receiver)
140	}
141}