sc_client_api/
notifications.rs1use std::{
22 collections::{HashMap, HashSet},
23 pin::Pin,
24 sync::Arc,
25 task::Poll,
26};
27
28use futures::Stream;
29
30use prometheus_endpoint::Registry as PrometheusRegistry;
31
32use sc_utils::pubsub::{Hub, Receiver};
33use sp_core::storage::{StorageData, StorageKey};
34use sp_runtime::traits::Block as BlockT;
35
36mod registry;
37
38use registry::Registry;
39
40#[cfg(test)]
41mod tests;
42
43#[derive(Debug)]
45pub struct StorageNotification<Hash> {
46 pub block: Hash,
48
49 pub changes: StorageChangeSet,
51}
52
53#[derive(Debug)]
55pub struct StorageChangeSet {
56 changes: Arc<[(StorageKey, Option<StorageData>)]>,
57 child_changes: Arc<[(StorageKey, Vec<(StorageKey, Option<StorageData>)>)]>,
58 filter: Keys,
59 child_filters: ChildKeys,
60}
61
62#[derive(Debug)]
64pub struct StorageNotifications<Block: BlockT>(Hub<StorageNotification<Block::Hash>, Registry>);
65
66pub struct StorageEventStream<H>(Receiver<StorageNotification<H>, Registry>);
68
69type Keys = Option<HashSet<StorageKey>>;
70type ChildKeys = Option<HashMap<StorageKey, Option<HashSet<StorageKey>>>>;
71
72impl StorageChangeSet {
73 pub fn iter(
75 &self,
76 ) -> impl Iterator<Item = (Option<&StorageKey>, &StorageKey, Option<&StorageData>)> + '_ {
77 let top = self
78 .changes
79 .iter()
80 .filter(move |&(key, _)| match self.filter {
81 Some(ref filter) => filter.contains(key),
82 None => true,
83 })
84 .map(move |(k, v)| (None, k, v.as_ref()));
85 let children = self
86 .child_changes
87 .iter()
88 .filter_map(move |(sk, changes)| {
89 self.child_filters.as_ref().and_then(|cf| {
90 cf.get(sk).map(|filter| {
91 changes
92 .iter()
93 .filter(move |&(key, _)| match filter {
94 Some(ref filter) => filter.contains(key),
95 None => true,
96 })
97 .map(move |(k, v)| (Some(sk), k, v.as_ref()))
98 })
99 })
100 })
101 .flatten();
102 top.chain(children)
103 }
104}
105
106impl<H> Stream for StorageEventStream<H> {
107 type Item = StorageNotification<H>;
108 fn poll_next(
109 self: Pin<&mut Self>,
110 cx: &mut std::task::Context<'_>,
111 ) -> Poll<Option<Self::Item>> {
112 Stream::poll_next(Pin::new(&mut self.get_mut().0), cx)
113 }
114}
115
116impl<Block: BlockT> StorageNotifications<Block> {
117 pub fn new(prometheus_registry: Option<PrometheusRegistry>) -> Self {
120 let registry = Registry::new(prometheus_registry);
121 let hub = Hub::new_with_registry("mpsc_storage_notification_items", registry);
122
123 StorageNotifications(hub)
124 }
125
126 pub fn trigger(
131 &self,
132 hash: &Block::Hash,
133 changeset: impl Iterator<Item = (Vec<u8>, Option<Vec<u8>>)>,
134 child_changeset: impl Iterator<
135 Item = (Vec<u8>, impl Iterator<Item = (Vec<u8>, Option<Vec<u8>>)>),
136 >,
137 ) {
138 self.0.send((hash, changeset, child_changeset))
139 }
140
141 pub fn listen(
143 &self,
144 filter_keys: Option<&[StorageKey]>,
145 filter_child_keys: Option<&[(StorageKey, Option<Vec<StorageKey>>)]>,
146 ) -> StorageEventStream<Block::Hash> {
147 let receiver = self
148 .0
149 .subscribe(registry::SubscribeOp { filter_keys, filter_child_keys }, 100_000);
150
151 StorageEventStream(receiver)
152 }
153}