Skip to main content

cumulus_relay_chain_rpc_interface/
lib.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Cumulus.
3// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
4
5// Cumulus is free software: you can redistribute it and/or modify
6// it under the terms of the GNU General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9
10// Cumulus is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU General Public License for more details.
14
15// You should have received a copy of the GNU General Public License
16// along with Cumulus. If not, see <https://www.gnu.org/licenses/>.
17
18use async_trait::async_trait;
19use core::time::Duration;
20use cumulus_primitives_core::{
21	relay_chain::{
22		CandidateEvent, CommittedCandidateReceiptV2 as CommittedCandidateReceipt,
23		Hash as RelayHash, Header as RelayHeader, InboundHrmpMessage, OccupiedCoreAssumption,
24		SessionIndex, ValidationCodeHash, ValidatorId,
25	},
26	InboundDownwardMessage, ParaId, PersistedValidationData,
27};
28use cumulus_relay_chain_interface::{
29	BlockNumber, ChildInfo, CoreIndex, CoreState, PHeader, RelayChainError, RelayChainInterface,
30	RelayChainResult,
31};
32use futures::{FutureExt, Stream, StreamExt};
33use polkadot_overseer::Handle;
34
35use sc_client_api::StorageProof;
36use sp_state_machine::StorageValue;
37use sp_storage::StorageKey;
38use sp_version::RuntimeVersion;
39use std::{collections::btree_map::BTreeMap, pin::Pin};
40
41use cumulus_primitives_core::relay_chain::BlockId;
42pub use url::Url;
43
44mod metrics;
45mod reconnecting_ws_client;
46mod rpc_client;
47
48pub use rpc_client::{create_client_and_start_worker, RelayChainRpcClient};
49
50const TIMEOUT_IN_SECONDS: u64 = 6;
51
52/// RelayChainRpcInterface is used to interact with a full node that is running locally
53/// in the same process.
54#[derive(Clone)]
55pub struct RelayChainRpcInterface {
56	rpc_client: RelayChainRpcClient,
57	overseer_handle: Handle,
58}
59
60impl RelayChainRpcInterface {
61	pub fn new(rpc_client: RelayChainRpcClient, overseer_handle: Handle) -> Self {
62		Self { rpc_client, overseer_handle }
63	}
64}
65
66#[async_trait]
67impl RelayChainInterface for RelayChainRpcInterface {
68	async fn retrieve_dmq_contents(
69		&self,
70		para_id: ParaId,
71		relay_parent: RelayHash,
72	) -> RelayChainResult<Vec<InboundDownwardMessage>> {
73		self.rpc_client.parachain_host_dmq_contents(para_id, relay_parent).await
74	}
75
76	async fn retrieve_all_inbound_hrmp_channel_contents(
77		&self,
78		para_id: ParaId,
79		relay_parent: RelayHash,
80	) -> RelayChainResult<BTreeMap<ParaId, Vec<InboundHrmpMessage>>> {
81		self.rpc_client
82			.parachain_host_inbound_hrmp_channels_contents(para_id, relay_parent)
83			.await
84	}
85
86	async fn header(&self, block_id: BlockId) -> RelayChainResult<Option<PHeader>> {
87		let hash = match block_id {
88			BlockId::Hash(hash) => hash,
89			BlockId::Number(num) => {
90				if let Some(hash) = self.rpc_client.chain_get_block_hash(Some(num)).await? {
91					hash
92				} else {
93					return Ok(None);
94				}
95			},
96		};
97		let header = self.rpc_client.chain_get_header(Some(hash)).await?;
98
99		Ok(header)
100	}
101
102	async fn persisted_validation_data(
103		&self,
104		hash: RelayHash,
105		para_id: ParaId,
106		occupied_core_assumption: OccupiedCoreAssumption,
107	) -> RelayChainResult<Option<PersistedValidationData>> {
108		self.rpc_client
109			.parachain_host_persisted_validation_data(hash, para_id, occupied_core_assumption)
110			.await
111	}
112
113	async fn validation_code_hash(
114		&self,
115		hash: RelayHash,
116		para_id: ParaId,
117		occupied_core_assumption: OccupiedCoreAssumption,
118	) -> RelayChainResult<Option<ValidationCodeHash>> {
119		self.rpc_client
120			.validation_code_hash(hash, para_id, occupied_core_assumption)
121			.await
122	}
123
124	async fn candidate_pending_availability(
125		&self,
126		hash: RelayHash,
127		para_id: ParaId,
128	) -> RelayChainResult<Option<CommittedCandidateReceipt>> {
129		self.rpc_client
130			.parachain_host_candidate_pending_availability(hash, para_id)
131			.await
132	}
133
134	async fn session_index_for_child(&self, hash: RelayHash) -> RelayChainResult<SessionIndex> {
135		self.rpc_client.parachain_host_session_index_for_child(hash).await
136	}
137
138	async fn validators(&self, block_id: RelayHash) -> RelayChainResult<Vec<ValidatorId>> {
139		self.rpc_client.parachain_host_validators(block_id).await
140	}
141
142	async fn import_notification_stream(
143		&self,
144	) -> RelayChainResult<Pin<Box<dyn Stream<Item = RelayHeader> + Send>>> {
145		let imported_headers_stream = self.rpc_client.get_imported_heads_stream()?;
146
147		Ok(imported_headers_stream.boxed())
148	}
149
150	async fn finality_notification_stream(
151		&self,
152	) -> RelayChainResult<Pin<Box<dyn Stream<Item = RelayHeader> + Send>>> {
153		let imported_headers_stream = self.rpc_client.get_finalized_heads_stream()?;
154
155		Ok(imported_headers_stream.boxed())
156	}
157
158	async fn best_block_hash(&self) -> RelayChainResult<RelayHash> {
159		self.rpc_client.chain_get_head(None).await
160	}
161
162	async fn finalized_block_hash(&self) -> RelayChainResult<RelayHash> {
163		self.rpc_client.chain_get_finalized_head().await
164	}
165
166	async fn call_runtime_api(
167		&self,
168		method_name: &'static str,
169		hash: RelayHash,
170		payload: &[u8],
171	) -> RelayChainResult<Vec<u8>> {
172		self.rpc_client
173			.call_remote_runtime_function_encoded(method_name, hash, payload)
174			.await
175			.map(|bytes| bytes.to_vec())
176	}
177
178	async fn is_major_syncing(&self) -> RelayChainResult<bool> {
179		self.rpc_client.system_health().await.map(|h| h.is_syncing)
180	}
181
182	fn overseer_handle(&self) -> RelayChainResult<Handle> {
183		Ok(self.overseer_handle.clone())
184	}
185
186	async fn get_storage_by_key(
187		&self,
188		relay_parent: RelayHash,
189		key: &[u8],
190	) -> RelayChainResult<Option<StorageValue>> {
191		let storage_key = StorageKey(key.to_vec());
192		self.rpc_client
193			.state_get_storage(storage_key, Some(relay_parent))
194			.await
195			.map(|storage_data| storage_data.map(|sv| sv.0))
196	}
197
198	async fn prove_read(
199		&self,
200		relay_parent: RelayHash,
201		relevant_keys: &Vec<Vec<u8>>,
202	) -> RelayChainResult<StorageProof> {
203		let cloned = relevant_keys.clone();
204		let storage_keys: Vec<StorageKey> = cloned.into_iter().map(StorageKey).collect();
205
206		self.rpc_client
207			.state_get_read_proof(storage_keys, Some(relay_parent))
208			.await
209			.map(|read_proof| {
210				StorageProof::new(read_proof.proof.into_iter().map(|bytes| bytes.to_vec()))
211			})
212	}
213
214	async fn prove_child_read(
215		&self,
216		relay_parent: RelayHash,
217		child_info: &ChildInfo,
218		child_keys: &[Vec<u8>],
219	) -> RelayChainResult<StorageProof> {
220		let child_storage_key = child_info.prefixed_storage_key();
221		let storage_keys: Vec<StorageKey> =
222			child_keys.iter().map(|key| StorageKey(key.clone())).collect();
223
224		self.rpc_client
225			.state_get_child_read_proof(child_storage_key, storage_keys, Some(relay_parent))
226			.await
227			.map(|read_proof| {
228				StorageProof::new(read_proof.proof.into_iter().map(|bytes| bytes.to_vec()))
229			})
230	}
231
232	/// Wait for a given relay chain block
233	///
234	/// The hash of the block to wait for is passed. We wait for the block to arrive or return after
235	/// a timeout.
236	///
237	/// Implementation:
238	/// 1. Register a listener to all new blocks.
239	/// 2. Check if the block is already in chain. If yes, succeed early.
240	/// 3. Wait for the block to be imported via subscription.
241	/// 4. If timeout is reached, we return an error.
242	async fn wait_for_block(&self, wait_for_hash: RelayHash) -> RelayChainResult<()> {
243		let mut head_stream = self.rpc_client.get_imported_heads_stream()?;
244
245		if self.rpc_client.chain_get_header(Some(wait_for_hash)).await?.is_some() {
246			return Ok(());
247		}
248
249		let mut timeout = futures_timer::Delay::new(Duration::from_secs(TIMEOUT_IN_SECONDS)).fuse();
250
251		loop {
252			futures::select! {
253				_ = timeout => return Err(RelayChainError::WaitTimeout(wait_for_hash)),
254				evt = head_stream.next().fuse() => match evt {
255					Some(evt) if evt.hash() == wait_for_hash => return Ok(()),
256					// Not the event we waited on.
257					Some(_) => continue,
258					None => return Err(RelayChainError::ImportListenerClosed(wait_for_hash)),
259				}
260			}
261		}
262	}
263
264	async fn new_best_notification_stream(
265		&self,
266	) -> RelayChainResult<Pin<Box<dyn Stream<Item = RelayHeader> + Send>>> {
267		let imported_headers_stream = self.rpc_client.get_best_heads_stream()?;
268		Ok(imported_headers_stream.boxed())
269	}
270
271	async fn candidates_pending_availability(
272		&self,
273		hash: RelayHash,
274		para_id: ParaId,
275	) -> RelayChainResult<Vec<CommittedCandidateReceipt>> {
276		self.rpc_client
277			.parachain_host_candidates_pending_availability(hash, para_id)
278			.await
279	}
280
281	async fn version(&self, relay_parent: RelayHash) -> RelayChainResult<RuntimeVersion> {
282		self.rpc_client.runtime_version(relay_parent).await
283	}
284
285	async fn availability_cores(
286		&self,
287		relay_parent: RelayHash,
288	) -> RelayChainResult<Vec<CoreState<RelayHash, BlockNumber>>> {
289		self.rpc_client.parachain_host_availability_cores(relay_parent).await
290	}
291
292	async fn claim_queue(
293		&self,
294		relay_parent: RelayHash,
295	) -> RelayChainResult<BTreeMap<CoreIndex, std::collections::VecDeque<ParaId>>> {
296		self.rpc_client.parachain_host_claim_queue(relay_parent).await
297	}
298
299	async fn scheduling_lookahead(&self, relay_parent: RelayHash) -> RelayChainResult<u32> {
300		self.rpc_client.parachain_host_scheduling_lookahead(relay_parent).await
301	}
302
303	async fn candidate_events(
304		&self,
305		relay_parent: RelayHash,
306	) -> RelayChainResult<Vec<CandidateEvent>> {
307		self.rpc_client.parachain_host_candidate_events(relay_parent).await
308	}
309}