1use crate::{
15 api::{Api, Error, Result},
16 rpc::{HandleSubscription, Request, Subscribe},
17 GetChainInfo, GetStorage,
18};
19use ac_compose_macros::rpc_params;
20use ac_node_api::{metadata::Metadata, EventDetails, EventRecord, Events, Phase};
21use ac_primitives::config::Config;
22#[cfg(all(not(feature = "sync-api"), not(feature = "std")))]
23use alloc::boxed::Box;
24use alloc::{vec, vec::Vec};
25use codec::{Decode, Encode};
26use core::marker::PhantomData;
27use log::*;
28use serde::de::DeserializeOwned;
29use sp_runtime::traits::{Block as BlockHash, Hash as HashTrait};
30use sp_storage::StorageChangeSet;
31
32pub type EventSubscriptionFor<Client, Hash> =
33 EventSubscription<<Client as Subscribe>::Subscription<StorageChangeSet<Hash>>, Hash>;
34
35#[maybe_async::maybe_async(?Send)]
36pub trait FetchEvents {
37 type Hash: Encode + Decode;
38
39 async fn fetch_events_from_block(&self, block_hash: Self::Hash) -> Result<Events<Self::Hash>>;
41
42 async fn fetch_events_for_extrinsic(
44 &self,
45 block_hash: Self::Hash,
46 extrinsic_hash: Self::Hash,
47 ) -> Result<Vec<EventDetails<Self::Hash>>>;
48}
49
50#[maybe_async::maybe_async(?Send)]
51impl<T, Client> FetchEvents for Api<T, Client>
52where
53 T: Config,
54 Client: Request,
55{
56 type Hash = T::Hash;
57
58 async fn fetch_events_from_block(&self, block_hash: Self::Hash) -> Result<Events<Self::Hash>> {
59 let key = crate::storage_key("System", "Events");
60 let event_bytes = self
61 .get_opaque_storage_by_key(key, Some(block_hash))
62 .await?
63 .ok_or(Error::BlockNotFound)?;
64 let events =
65 Events::<Self::Hash>::new(self.metadata().clone(), Default::default(), event_bytes);
66 Ok(events)
67 }
68
69 async fn fetch_events_for_extrinsic(
70 &self,
71 block_hash: Self::Hash,
72 extrinsic_hash: Self::Hash,
73 ) -> Result<Vec<EventDetails<Self::Hash>>> {
74 let extrinsic_index =
75 self.retrieve_extrinsic_index_from_block(block_hash, extrinsic_hash).await?;
76 let block_events = self.fetch_events_from_block(block_hash).await?;
77 self.filter_extrinsic_events(block_events, extrinsic_index)
78 }
79}
80
81pub struct EventSubscription<Subscription, Hash> {
84 pub subscription: Subscription,
85 pub metadata: Metadata,
86 _phantom: PhantomData<Hash>,
87}
88
89impl<Subscription, Hash> EventSubscription<Subscription, Hash> {
90 pub fn new(subscription: Subscription, metadata: Metadata) -> Self {
92 Self { subscription, metadata, _phantom: Default::default() }
93 }
94
95 pub fn update_metadata(&mut self, metadata: Metadata) {
97 self.metadata = metadata
98 }
99}
100
101impl<Subscription, Hash> EventSubscription<Subscription, Hash>
102where
103 Hash: DeserializeOwned + Copy + Encode + Decode,
104 Subscription: HandleSubscription<StorageChangeSet<Hash>>,
105{
106 #[maybe_async::maybe_async(?Send)]
109 pub async fn next_events<RuntimeEvent: Decode, Topic: Decode>(
110 &mut self,
111 ) -> Option<Result<Vec<EventRecord<RuntimeEvent, Topic>>>> {
112 let change_set = match self.subscription.next().await? {
113 Ok(set) => set,
114 Err(e) => return Some(Err(Error::RpcClient(e))),
115 };
116 let storage_data = change_set.changes[0].1.as_ref()?;
120 let event_records = Decode::decode(&mut storage_data.0.as_slice()).map_err(Error::Codec);
121 Some(event_records)
122 }
123
124 #[maybe_async::maybe_async(?Send)]
130 pub async fn next_events_from_metadata(&mut self) -> Option<Result<Events<Hash>>> {
131 let change_set = match self.subscription.next().await? {
132 Ok(set) => set,
133 Err(e) => return Some(Err(Error::RpcClient(e))),
134 };
135 let block_hash = change_set.block;
136 let storage_data = change_set.changes[0].1.as_ref()?;
140 let event_bytes = storage_data.0.clone();
141
142 let events = Events::<Hash>::new(self.metadata.clone(), block_hash, event_bytes);
143 Some(Ok(events))
144 }
145
146 #[maybe_async::maybe_async(?Send)]
148 pub async fn unsubscribe(self) -> Result<()> {
149 self.subscription.unsubscribe().await.map_err(|e| e.into())
150 }
151}
152
153#[maybe_async::maybe_async(?Send)]
154pub trait SubscribeEvents {
155 type Client: Subscribe;
156 type Hash: DeserializeOwned;
157
158 async fn subscribe_events(&self) -> Result<EventSubscriptionFor<Self::Client, Self::Hash>>;
160}
161
162#[maybe_async::maybe_async(?Send)]
163impl<T, Client> SubscribeEvents for Api<T, Client>
164where
165 T: Config,
166 Client: Subscribe,
167{
168 type Client = Client;
169 type Hash = T::Hash;
170
171 async fn subscribe_events(&self) -> Result<EventSubscriptionFor<Self::Client, Self::Hash>> {
172 let key = crate::storage_key("System", "Events");
173 let subscription = self
174 .client()
175 .subscribe("state_subscribeStorage", rpc_params![vec![key]], "state_unsubscribeStorage")
176 .await
177 .map(|sub| EventSubscription::new(sub, self.metadata().clone()))?;
178 Ok(subscription)
179 }
180}
181
182impl<T, Client> Api<T, Client>
183where
184 T: Config,
185 Client: Request,
186{
187 #[maybe_async::maybe_async(?Send)]
189 async fn retrieve_extrinsic_index_from_block(
190 &self,
191 block_hash: T::Hash,
192 extrinsic_hash: T::Hash,
193 ) -> Result<u32> {
194 let block = self.get_block(Some(block_hash)).await?.ok_or(Error::BlockNotFound)?;
195 let xt_index = block
196 .extrinsics()
197 .iter()
198 .position(|xt| {
199 let xt_hash = T::Hasher::hash_of(&xt);
200 trace!("Looking for: {extrinsic_hash:?}, got xt_hash {xt_hash:?}");
201 extrinsic_hash == xt_hash
202 })
203 .ok_or(Error::ExtrinsicNotFound)?;
204 Ok(xt_index as u32)
205 }
206
207 fn filter_extrinsic_events(
209 &self,
210 events: Events<T::Hash>,
211 extrinsic_index: u32,
212 ) -> Result<Vec<EventDetails<T::Hash>>> {
213 let extrinsic_event_results = events.iter().filter(|ev| {
214 ev.as_ref()
215 .map_or(true, |ev| ev.phase() == Phase::ApplyExtrinsic(extrinsic_index))
216 });
217 let mut extrinsic_events = Vec::new();
218 for event_details in extrinsic_event_results {
219 let event_details = event_details?;
220 debug!(
221 "associated event_details {:?} {:?}",
222 event_details.pallet_name(),
223 event_details.variant_name()
224 );
225 extrinsic_events.push(event_details);
226 }
227 Ok(extrinsic_events)
228 }
229}
230
231#[cfg(test)]
232mod tests {
233 use super::*;
234 use crate::rpc::mocks::RpcClientMock;
235 use ac_node_api::{metadata::Metadata, test_utils::*};
236 use ac_primitives::RococoRuntimeConfig;
237 use codec::{Decode, Encode};
238 use frame_metadata::RuntimeMetadataPrefixed;
239 use rococo_runtime::{BalancesCall, RuntimeCall, UncheckedExtrinsic};
240 use scale_info::TypeInfo;
241 use sp_core::{crypto::Ss58Codec, sr25519, Bytes, H256};
242 use sp_runtime::{
243 generic::{Block, SignedBlock},
244 AccountId32, MultiAddress,
245 };
246 use sp_storage::StorageData;
247 use sp_version::RuntimeVersion;
248 use std::{collections::HashMap, fs};
249 use test_case::test_case;
250
251 #[derive(Clone, Copy, Debug, PartialEq, Decode, Encode, TypeInfo)]
252 enum Event {
253 A(u8),
254 B(bool),
255 }
256
257 fn create_mock_api(
258 metadata: Metadata,
259 data: HashMap<String, String>,
260 ) -> Api<RococoRuntimeConfig, RpcClientMock> {
261 let genesis_hash = H256::random();
263 let runtime_version = RuntimeVersion::default();
264 let client = RpcClientMock::new(data);
265 Api::new_offline(genesis_hash, metadata, runtime_version, client)
266 }
267
268 fn default_header() -> rococo_runtime::Header {
269 rococo_runtime::Header {
270 number: Default::default(),
271 parent_hash: Default::default(),
272 state_root: Default::default(),
273 extrinsics_root: Default::default(),
274 digest: Default::default(),
275 }
276 }
277
278 #[test_case(SupportedMetadataVersions::V14)]
279 #[test_case(SupportedMetadataVersions::V15)]
280 fn filter_extrinsic_events_works(metadata_version: SupportedMetadataVersions) {
281 let metadata = metadata_with_version::<Event>(metadata_version);
282
283 let extrinsic_index = 1;
284
285 let event1 = Event::A(1);
287 let event2 = Event::B(true);
288 let event3 = Event::A(234);
289 let event4 = Event::A(2);
290
291 let block_events = events::<Event>(
292 metadata.clone(),
293 vec![
294 event_record(Phase::Initialization, event1),
295 event_record(Phase::ApplyExtrinsic(extrinsic_index), event2),
296 event_record(Phase::ApplyExtrinsic(extrinsic_index), event3),
297 event_record(Phase::ApplyExtrinsic(extrinsic_index + 1), event4),
298 ],
299 );
300 let mut event_details = block_events.iter();
301 let _not_associated_event_details1 = event_details.next().unwrap().unwrap();
302 let associated_event_details2 = event_details.next().unwrap().unwrap();
303 let associated_event_details3 = event_details.next().unwrap().unwrap();
304 let _not_associated_event_details4 = event_details.next().unwrap().unwrap();
305 assert!(event_details.next().is_none());
306
307 let api = create_mock_api(metadata, Default::default());
308
309 let associated_events = api.filter_extrinsic_events(block_events, extrinsic_index).unwrap();
310 assert_eq!(associated_events.len(), 2);
311 assert_eq!(associated_events[0].index(), associated_event_details2.index());
312 assert_eq!(associated_events[1].index(), associated_event_details3.index());
313 }
314
315 #[test_case(SupportedMetadataVersions::V14)]
316 #[test_case(SupportedMetadataVersions::V15)]
317 fn fetch_events_from_block_works(metadata_version: SupportedMetadataVersions) {
318 let metadata = metadata_with_version::<Event>(metadata_version);
319
320 let extrinsic_index = 1;
321
322 let event1 = Event::A(1);
324 let event2 = Event::B(true);
325 let event3 = Event::A(234);
326 let event4 = Event::A(2);
327
328 let block_events = events::<Event>(
329 metadata.clone(),
330 vec![
331 event_record(Phase::Initialization, event1),
332 event_record(Phase::ApplyExtrinsic(extrinsic_index), event2),
333 event_record(Phase::ApplyExtrinsic(extrinsic_index), event3),
334 event_record(Phase::ApplyExtrinsic(extrinsic_index + 1), event4),
335 ],
336 );
337 let event_bytes = block_events.event_bytes().to_vec();
338
339 let data = HashMap::<String, String>::from([(
343 "state_getStorage".to_owned(),
344 serde_json::to_string(&Some(StorageData(event_bytes))).unwrap(),
345 )]);
346
347 let api = create_mock_api(metadata, data);
348
349 let fetched_events = api.fetch_events_from_block(H256::random()).unwrap();
350
351 assert_eq!(fetched_events.event_bytes(), block_events.event_bytes());
352 }
353
/// Every extrinsic of the mocked block must be found at its position in the block.
#[test]
fn retrieve_extrinsic_index_from_block_works() {
    let metadata_bytes = fs::read("./../ksm_metadata_v14.bin").unwrap();
    let prefixed_metadata: RuntimeMetadataPrefixed =
        Decode::decode(&mut metadata_bytes.as_slice()).unwrap();
    let metadata = Metadata::try_from(prefixed_metadata).unwrap();

    let bob_account: AccountId32 =
        sr25519::Public::from_ss58check("5FHneW46xGXgs5mUiveU4sbTyGBzmstUspZC92UhjJM694ty")
            .unwrap()
            .into();
    let bob = MultiAddress::Id(bob_account);

    // Three distinct calls, so that every extrinsic has its own hash.
    let calls = vec![
        RuntimeCall::Balances(BalancesCall::force_transfer {
            source: bob.clone(),
            dest: bob.clone(),
            value: 10,
        }),
        RuntimeCall::Balances(BalancesCall::transfer_allow_death {
            dest: bob.clone(),
            value: 2000,
        }),
        RuntimeCall::Balances(BalancesCall::transfer_allow_death { dest: bob, value: 1000 }),
    ];

    let extrinsics: Vec<Bytes> = calls
        .into_iter()
        .map(|call| -> Bytes { UncheckedExtrinsic::new_bare(call).encode().into() })
        .collect();
    let extrinsic_hashes: Vec<_> = extrinsics
        .iter()
        .map(|xt| <RococoRuntimeConfig as Config>::Hasher::hash(xt))
        .collect();

    let signed_block = SignedBlock {
        block: Block { header: default_header(), extrinsics },
        justifications: None,
    };
    let mut data = HashMap::<String, String>::new();
    data.insert("chain_getBlock".to_owned(), serde_json::to_string(&signed_block).unwrap());

    let api = create_mock_api(metadata, data);
    let block_hash = H256::default();

    for (expected_index, xt_hash) in extrinsic_hashes.into_iter().enumerate() {
        let index = api.retrieve_extrinsic_index_from_block(block_hash, xt_hash).unwrap();
        assert_eq!(index, expected_index as u32);
    }
}
409}