substrate_api_client/api/rpc_api/
events.rs

1/*
2   Copyright 2019 Supercomputing Systems AG
3   Licensed under the Apache License, Version 2.0 (the "License");
4   you may not use this file except in compliance with the License.
5   You may obtain a copy of the License at
6	   http://www.apache.org/licenses/LICENSE-2.0
7   Unless required by applicable law or agreed to in writing, software
8   distributed under the License is distributed on an "AS IS" BASIS,
9   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10   See the License for the specific language governing permissions and
11   limitations under the License.
12*/
13
14use crate::{
15	api::{Api, Error, Result},
16	rpc::{HandleSubscription, Request, Subscribe},
17	GetChainInfo, GetStorage,
18};
19use ac_compose_macros::rpc_params;
20use ac_node_api::{metadata::Metadata, EventDetails, EventRecord, Events, Phase};
21use ac_primitives::config::Config;
22#[cfg(all(not(feature = "sync-api"), not(feature = "std")))]
23use alloc::boxed::Box;
24use alloc::{vec, vec::Vec};
25use codec::{Decode, Encode};
26use core::marker::PhantomData;
27use log::*;
28use serde::de::DeserializeOwned;
29use sp_runtime::traits::{Block as BlockHash, Hash as HashTrait};
30use sp_storage::StorageChangeSet;
31
32pub type EventSubscriptionFor<Client, Hash> =
33	EventSubscription<<Client as Subscribe>::Subscription<StorageChangeSet<Hash>>, Hash>;
34
35#[maybe_async::maybe_async(?Send)]
36pub trait FetchEvents {
37	type Hash: Encode + Decode;
38
39	/// Fetch all block events from node for the given block hash.
40	async fn fetch_events_from_block(&self, block_hash: Self::Hash) -> Result<Events<Self::Hash>>;
41
42	/// Fetch all associated events for a given extrinsic hash and block hash.
43	async fn fetch_events_for_extrinsic(
44		&self,
45		block_hash: Self::Hash,
46		extrinsic_hash: Self::Hash,
47	) -> Result<Vec<EventDetails<Self::Hash>>>;
48}
49
50#[maybe_async::maybe_async(?Send)]
51impl<T, Client> FetchEvents for Api<T, Client>
52where
53	T: Config,
54	Client: Request,
55{
56	type Hash = T::Hash;
57
58	async fn fetch_events_from_block(&self, block_hash: Self::Hash) -> Result<Events<Self::Hash>> {
59		let key = crate::storage_key("System", "Events");
60		let event_bytes = self
61			.get_opaque_storage_by_key(key, Some(block_hash))
62			.await?
63			.ok_or(Error::BlockNotFound)?;
64		let events =
65			Events::<Self::Hash>::new(self.metadata().clone(), Default::default(), event_bytes);
66		Ok(events)
67	}
68
69	async fn fetch_events_for_extrinsic(
70		&self,
71		block_hash: Self::Hash,
72		extrinsic_hash: Self::Hash,
73	) -> Result<Vec<EventDetails<Self::Hash>>> {
74		let extrinsic_index =
75			self.retrieve_extrinsic_index_from_block(block_hash, extrinsic_hash).await?;
76		let block_events = self.fetch_events_from_block(block_hash).await?;
77		self.filter_extrinsic_events(block_events, extrinsic_index)
78	}
79}
80
81/// Wrapper around a Event `StorageChangeSet` subscription.
82/// Simplifies the event retrieval from the subscription.
83pub struct EventSubscription<Subscription, Hash> {
84	pub subscription: Subscription,
85	pub metadata: Metadata,
86	_phantom: PhantomData<Hash>,
87}
88
89impl<Subscription, Hash> EventSubscription<Subscription, Hash> {
90	/// Create a new wrapper around the subscription.
91	pub fn new(subscription: Subscription, metadata: Metadata) -> Self {
92		Self { subscription, metadata, _phantom: Default::default() }
93	}
94
95	/// Update the metadata.
96	pub fn update_metadata(&mut self, metadata: Metadata) {
97		self.metadata = metadata
98	}
99}
100
101impl<Subscription, Hash> EventSubscription<Subscription, Hash>
102where
103	Hash: DeserializeOwned + Copy + Encode + Decode,
104	Subscription: HandleSubscription<StorageChangeSet<Hash>>,
105{
106	/// Wait for the next value from the internal subscription.
107	/// Upon encounter, it retrieves and decodes the expected `EventRecord`.
108	#[maybe_async::maybe_async(?Send)]
109	pub async fn next_events<RuntimeEvent: Decode, Topic: Decode>(
110		&mut self,
111	) -> Option<Result<Vec<EventRecord<RuntimeEvent, Topic>>>> {
112		let change_set = match self.subscription.next().await? {
113			Ok(set) => set,
114			Err(e) => return Some(Err(Error::RpcClient(e))),
115		};
116		// Since we subscribed to only the events key, we can simply take the first value of the
117		// changes in the set. Also, we don't care about the key but only the data, so take
118		// the second value in the tuple of two.
119		let storage_data = change_set.changes[0].1.as_ref()?;
120		let event_records = Decode::decode(&mut storage_data.0.as_slice()).map_err(Error::Codec);
121		Some(event_records)
122	}
123
124	/// Wait for the next value from the internal subscription.
125	/// Upon encounter, it retrieves and decodes the expected `EventDetails`.
126	//
127	// On the contrary to `next_events` this function only needs up-to-date metadata
128	// and is therefore updateable during runtime.
129	#[maybe_async::maybe_async(?Send)]
130	pub async fn next_events_from_metadata(&mut self) -> Option<Result<Events<Hash>>> {
131		let change_set = match self.subscription.next().await? {
132			Ok(set) => set,
133			Err(e) => return Some(Err(Error::RpcClient(e))),
134		};
135		let block_hash = change_set.block;
136		// Since we subscribed to only the events key, we can simply take the first value of the
137		// changes in the set. Also, we don't care about the key but only the data, so take
138		// the second value in the tuple of two.
139		let storage_data = change_set.changes[0].1.as_ref()?;
140		let event_bytes = storage_data.0.clone();
141
142		let events = Events::<Hash>::new(self.metadata.clone(), block_hash, event_bytes);
143		Some(Ok(events))
144	}
145
146	/// Unsubscribe from the internal subscription.
147	#[maybe_async::maybe_async(?Send)]
148	pub async fn unsubscribe(self) -> Result<()> {
149		self.subscription.unsubscribe().await.map_err(|e| e.into())
150	}
151}
152
153#[maybe_async::maybe_async(?Send)]
154pub trait SubscribeEvents {
155	type Client: Subscribe;
156	type Hash: DeserializeOwned;
157
158	/// Subscribe to events.
159	async fn subscribe_events(&self) -> Result<EventSubscriptionFor<Self::Client, Self::Hash>>;
160}
161
162#[maybe_async::maybe_async(?Send)]
163impl<T, Client> SubscribeEvents for Api<T, Client>
164where
165	T: Config,
166	Client: Subscribe,
167{
168	type Client = Client;
169	type Hash = T::Hash;
170
171	async fn subscribe_events(&self) -> Result<EventSubscriptionFor<Self::Client, Self::Hash>> {
172		let key = crate::storage_key("System", "Events");
173		let subscription = self
174			.client()
175			.subscribe("state_subscribeStorage", rpc_params![vec![key]], "state_unsubscribeStorage")
176			.await
177			.map(|sub| EventSubscription::new(sub, self.metadata().clone()))?;
178		Ok(subscription)
179	}
180}
181
182impl<T, Client> Api<T, Client>
183where
184	T: Config,
185	Client: Request,
186{
187	/// Retrieve block details from node and search for the position of the given extrinsic.
188	#[maybe_async::maybe_async(?Send)]
189	async fn retrieve_extrinsic_index_from_block(
190		&self,
191		block_hash: T::Hash,
192		extrinsic_hash: T::Hash,
193	) -> Result<u32> {
194		let block = self.get_block(Some(block_hash)).await?.ok_or(Error::BlockNotFound)?;
195		let xt_index = block
196			.extrinsics()
197			.iter()
198			.position(|xt| {
199				let xt_hash = T::Hasher::hash_of(&xt);
200				trace!("Looking for: {extrinsic_hash:?}, got xt_hash {xt_hash:?}");
201				extrinsic_hash == xt_hash
202			})
203			.ok_or(Error::ExtrinsicNotFound)?;
204		Ok(xt_index as u32)
205	}
206
207	/// Filter events and return the ones associated to the given extrinsic index.
208	fn filter_extrinsic_events(
209		&self,
210		events: Events<T::Hash>,
211		extrinsic_index: u32,
212	) -> Result<Vec<EventDetails<T::Hash>>> {
213		let extrinsic_event_results = events.iter().filter(|ev| {
214			ev.as_ref()
215				.map_or(true, |ev| ev.phase() == Phase::ApplyExtrinsic(extrinsic_index))
216		});
217		let mut extrinsic_events = Vec::new();
218		for event_details in extrinsic_event_results {
219			let event_details = event_details?;
220			debug!(
221				"associated event_details {:?} {:?}",
222				event_details.pallet_name(),
223				event_details.variant_name()
224			);
225			extrinsic_events.push(event_details);
226		}
227		Ok(extrinsic_events)
228	}
229}
230
#[cfg(test)]
mod tests {
	use super::*;
	use crate::rpc::mocks::RpcClientMock;
	use ac_node_api::{metadata::Metadata, test_utils::*};
	use ac_primitives::RococoRuntimeConfig;
	use codec::{Decode, Encode};
	use frame_metadata::RuntimeMetadataPrefixed;
	use rococo_runtime::{BalancesCall, RuntimeCall, UncheckedExtrinsic};
	use scale_info::TypeInfo;
	use sp_core::{crypto::Ss58Codec, sr25519, Bytes, H256};
	use sp_runtime::{
		generic::{Block, SignedBlock},
		AccountId32, MultiAddress,
	};
	use sp_storage::StorageData;
	use sp_version::RuntimeVersion;
	use std::{collections::HashMap, fs};
	use test_case::test_case;

	/// Small event type used to build the mock event records.
	#[derive(Clone, Copy, Debug, PartialEq, Decode, Encode, TypeInfo)]
	enum Event {
		A(u8),
		B(bool),
	}

	/// Build an offline api on top of a mock client that answers with `data`.
	fn create_mock_api(
		metadata: Metadata,
		data: HashMap<String, String>,
	) -> Api<RococoRuntimeConfig, RpcClientMock> {
		Api::new_offline(
			H256::random(),
			metadata,
			RuntimeVersion::default(),
			RpcClientMock::new(data),
		)
	}

	/// A header with every field set to its default value.
	fn default_header() -> rococo_runtime::Header {
		rococo_runtime::Header {
			parent_hash: Default::default(),
			number: Default::default(),
			state_root: Default::default(),
			extrinsics_root: Default::default(),
			digest: Default::default(),
		}
	}

	/// Four arbitrary events: one from initialization, two belonging to
	/// `extrinsic_index` and one belonging to the extrinsic right after it.
	fn sample_block_events(metadata: Metadata, extrinsic_index: u32) -> Events<H256> {
		let records = vec![
			event_record(Phase::Initialization, Event::A(1)),
			event_record(Phase::ApplyExtrinsic(extrinsic_index), Event::B(true)),
			event_record(Phase::ApplyExtrinsic(extrinsic_index), Event::A(234)),
			event_record(Phase::ApplyExtrinsic(extrinsic_index + 1), Event::A(2)),
		];
		events::<Event>(metadata, records)
	}

	#[test_case(SupportedMetadataVersions::V14)]
	#[test_case(SupportedMetadataVersions::V15)]
	fn filter_extrinsic_events_works(metadata_version: SupportedMetadataVersions) {
		let metadata = metadata_with_version::<Event>(metadata_version);
		let extrinsic_index = 1;
		let block_events = sample_block_events(metadata.clone(), extrinsic_index);

		// All four events must decode; only the second and third are associated.
		let all_details: Vec<_> =
			block_events.iter().map(|details| details.unwrap()).collect();
		assert_eq!(all_details.len(), 4);

		let api = create_mock_api(metadata, Default::default());
		let associated_events = api.filter_extrinsic_events(block_events, extrinsic_index).unwrap();

		assert_eq!(associated_events.len(), 2);
		assert_eq!(associated_events[0].index(), all_details[1].index());
		assert_eq!(associated_events[1].index(), all_details[2].index());
	}

	#[test_case(SupportedMetadataVersions::V14)]
	#[test_case(SupportedMetadataVersions::V15)]
	fn fetch_events_from_block_works(metadata_version: SupportedMetadataVersions) {
		let metadata = metadata_with_version::<Event>(metadata_version);
		let block_events = sample_block_events(metadata.clone(), 1);

		// The storage key generation is not covered here, it is part of the system
		// test. The mock therefore answers any "state_getStorage" request with the
		// event bytes, regardless of the params.
		let storage_response =
			serde_json::to_string(&Some(StorageData(block_events.event_bytes().to_vec()))).unwrap();
		let data =
			HashMap::<String, String>::from([("state_getStorage".to_owned(), storage_response)]);

		let api = create_mock_api(metadata, data);
		let fetched_events = api.fetch_events_from_block(H256::random()).unwrap();

		assert_eq!(fetched_events.event_bytes(), block_events.event_bytes());
	}

	#[test]
	fn retrieve_extrinsic_index_from_block_works() {
		// Metadata read from the Kusama v14 metadata file shipped next to the crate.
		let encoded_metadata = fs::read("./../ksm_metadata_v14.bin").unwrap();
		let prefixed_metadata: RuntimeMetadataPrefixed =
			Decode::decode(&mut encoded_metadata.as_slice()).unwrap();
		let metadata = Metadata::try_from(prefixed_metadata).unwrap();

		let bob_account: AccountId32 =
			sr25519::Public::from_ss58check("5FHneW46xGXgs5mUiveU4sbTyGBzmstUspZC92UhjJM694ty")
				.unwrap()
				.into();
		let bob = MultiAddress::Id(bob_account);

		// Three distinct calls, so that every extrinsic gets its own hash.
		let call1 = RuntimeCall::Balances(BalancesCall::force_transfer {
			source: bob.clone(),
			dest: bob.clone(),
			value: 10,
		});
		let call2 = RuntimeCall::Balances(BalancesCall::transfer_allow_death {
			dest: bob.clone(),
			value: 2000,
		});
		let call3 =
			RuntimeCall::Balances(BalancesCall::transfer_allow_death { dest: bob, value: 1000 });

		let xt1: Bytes = UncheckedExtrinsic::new_bare(call1).encode().into();
		let xt2: Bytes = UncheckedExtrinsic::new_bare(call2).encode().into();
		let xt3: Bytes = UncheckedExtrinsic::new_bare(call3).encode().into();

		// Extrinsic hash paired with the position it is expected to be found at.
		let expected_positions = [
			(<RococoRuntimeConfig as Config>::Hasher::hash(&xt1), 0),
			(<RococoRuntimeConfig as Config>::Hasher::hash(&xt2), 1),
			(<RococoRuntimeConfig as Config>::Hasher::hash(&xt3), 2),
		];

		let block = Block { header: default_header(), extrinsics: vec![xt1, xt2, xt3] };
		let signed_block = SignedBlock { block, justifications: None };
		let data = HashMap::<String, String>::from([(
			"chain_getBlock".to_owned(),
			serde_json::to_string(&signed_block).unwrap(),
		)]);

		// The mock api answers every block request with the block built above.
		let api = create_mock_api(metadata, data);
		let block_hash = H256::default();

		for (xt_hash, expected_index) in expected_positions {
			let index = api.retrieve_extrinsic_index_from_block(block_hash, xt_hash).unwrap();
			assert_eq!(index, expected_index);
		}
	}
}
409}