soil_client/client_api/
notifications.rs1use std::{
10 collections::{HashMap, HashSet},
11 pin::Pin,
12 sync::Arc,
13 task::Poll,
14};
15
16use futures::Stream;
17
18use soil_prometheus::Registry as PrometheusRegistry;
19
20use crate::utils::pubsub::{Hub, Receiver};
21use subsoil::core::storage::{StorageData, StorageKey};
22use subsoil::runtime::traits::Block as BlockT;
23
24mod registry;
25
26use registry::Registry;
27
28#[cfg(test)]
29mod tests;
30
31#[derive(Debug)]
33pub struct StorageNotification<Hash> {
34 pub block: Hash,
36
37 pub changes: StorageChangeSet,
39}
40
41#[derive(Debug)]
43pub struct StorageChangeSet {
44 changes: Arc<[(StorageKey, Option<StorageData>)]>,
45 child_changes: Arc<[(StorageKey, Vec<(StorageKey, Option<StorageData>)>)]>,
46 filter: Keys,
47 child_filters: ChildKeys,
48}
49
50#[derive(Debug)]
52pub struct StorageNotifications<Block: BlockT>(Hub<StorageNotification<Block::Hash>, Registry>);
53
54pub struct StorageEventStream<H>(Receiver<StorageNotification<H>, Registry>);
56
57type Keys = Option<HashSet<StorageKey>>;
58type ChildKeys = Option<HashMap<StorageKey, Option<HashSet<StorageKey>>>>;
59
60impl StorageChangeSet {
61 pub fn iter(
63 &self,
64 ) -> impl Iterator<Item = (Option<&StorageKey>, &StorageKey, Option<&StorageData>)> + '_ {
65 let top = self
66 .changes
67 .iter()
68 .filter(move |&(key, _)| match self.filter {
69 Some(ref filter) => filter.contains(key),
70 None => true,
71 })
72 .map(move |(k, v)| (None, k, v.as_ref()));
73 let children = self
74 .child_changes
75 .iter()
76 .filter_map(move |(sk, changes)| {
77 self.child_filters.as_ref().and_then(|cf| {
78 cf.get(sk).map(|filter| {
79 changes
80 .iter()
81 .filter(move |&(key, _)| match filter {
82 Some(ref filter) => filter.contains(key),
83 None => true,
84 })
85 .map(move |(k, v)| (Some(sk), k, v.as_ref()))
86 })
87 })
88 })
89 .flatten();
90 top.chain(children)
91 }
92}
93
94impl<H> Stream for StorageEventStream<H> {
95 type Item = StorageNotification<H>;
96 fn poll_next(
97 self: Pin<&mut Self>,
98 cx: &mut std::task::Context<'_>,
99 ) -> Poll<Option<Self::Item>> {
100 Stream::poll_next(Pin::new(&mut self.get_mut().0), cx)
101 }
102}
103
104impl<Block: BlockT> StorageNotifications<Block> {
105 pub fn new(prometheus_registry: Option<PrometheusRegistry>) -> Self {
108 let registry = Registry::new(prometheus_registry);
109 let hub = Hub::new_with_registry("mpsc_storage_notification_items", registry);
110
111 StorageNotifications(hub)
112 }
113
114 pub fn trigger(
119 &self,
120 hash: &Block::Hash,
121 changeset: impl Iterator<Item = (Vec<u8>, Option<Vec<u8>>)>,
122 child_changeset: impl Iterator<
123 Item = (Vec<u8>, impl Iterator<Item = (Vec<u8>, Option<Vec<u8>>)>),
124 >,
125 ) {
126 self.0.send((hash, changeset, child_changeset))
127 }
128
129 pub fn listen(
131 &self,
132 filter_keys: Option<&[StorageKey]>,
133 filter_child_keys: Option<&[(StorageKey, Option<Vec<StorageKey>>)]>,
134 ) -> StorageEventStream<Block::Hash> {
135 let receiver = self
136 .0
137 .subscribe(registry::SubscribeOp { filter_keys, filter_child_keys }, 100_000);
138
139 StorageEventStream(receiver)
140 }
141}