1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
use jsonrpsee::client::Subscription;
use sp_core::storage::StorageChangeSet;
use std::collections::VecDeque;
use crate::{
error::Error,
events::{
EventsDecoder,
RawEvent,
},
frame::{
system::Phase,
Event,
},
runtimes::Runtime,
};
/// A filtered stream of runtime events built on top of a storage change-set
/// subscription.
///
/// Events are pulled with `next`; the optional filters are configured through
/// the `filter_*` methods before polling.
pub struct EventSubscription<T>
where
    T: Runtime,
{
    /// Source of storage change sets that `next` awaits on.
    subscription: Subscription<StorageChangeSet<T::Hash>>,
    /// Turns the raw bytes of a storage change into `(phase, event)` pairs.
    decoder: EventsDecoder<T>,
    /// When set, only the change set reported for this block hash is decoded;
    /// change sets for other blocks are skipped.
    block: Option<T::Hash>,
    /// When set, only events emitted while applying the extrinsic with this
    /// index are kept.
    extrinsic: Option<usize>,
    /// When set, the `(module, variant)` names an event must carry to be kept.
    event: Option<(&'static str, &'static str)>,
    /// Events that passed every filter and have not been handed out yet.
    events: VecDeque<RawEvent>,
    /// Set once the change set of the filtered block has been seen; after the
    /// queue drains, `next` yields `None`.
    finished: bool,
}
impl<T: Runtime> EventSubscription<T> {
pub fn new(
subscription: Subscription<StorageChangeSet<T::Hash>>,
decoder: EventsDecoder<T>,
) -> Self {
Self {
subscription,
decoder,
block: None,
extrinsic: None,
event: None,
events: Default::default(),
finished: false,
}
}
pub fn filter_block(&mut self, block: T::Hash) {
self.block = Some(block);
}
pub fn filter_extrinsic(&mut self, block: T::Hash, ext_index: usize) {
self.block = Some(block);
self.extrinsic = Some(ext_index);
}
pub fn filter_event<E: Event<T>>(&mut self) {
self.event = Some((E::MODULE, E::EVENT));
}
pub async fn next(&mut self) -> Option<Result<RawEvent, Error>> {
loop {
if let Some(event) = self.events.pop_front() {
return Some(Ok(event))
}
if self.finished {
return None
}
let change_set = self.subscription.next().await;
if let Some(hash) = self.block.as_ref() {
if &change_set.block == hash {
self.finished = true;
} else {
continue
}
}
for (_key, data) in change_set.changes {
if let Some(data) = data {
let raw_events = match self.decoder.decode_events(&mut &data.0[..]) {
Ok(events) => events,
Err(error) => return Some(Err(error)),
};
for (phase, event) in raw_events {
if let Phase::ApplyExtrinsic(i) = phase {
if let Some(ext_index) = self.extrinsic {
if i as usize != ext_index {
continue
}
}
if let Some((module, variant)) = self.event {
if event.module != module || event.variant != variant {
continue
}
}
self.events.push_back(event);
}
}
}
}
}
}
}