Skip to main content

cumulus_relay_chain_interface/
lib.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Cumulus.
3// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
4
5// Cumulus is free software: you can redistribute it and/or modify
6// it under the terms of the GNU General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9
10// Cumulus is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU General Public License for more details.
14
15// You should have received a copy of the GNU General Public License
16// along with Cumulus. If not, see <https://www.gnu.org/licenses/>.
17
18use std::{
19	collections::{BTreeMap, VecDeque},
20	pin::Pin,
21	sync::Arc,
22};
23
24use futures::Stream;
25use polkadot_overseer::prometheus::PrometheusError;
26use sc_client_api::StorageProof;
27use sp_version::RuntimeVersion;
28
29use async_trait::async_trait;
30use codec::{Decode, Encode, Error as CodecError};
31use jsonrpsee_core::ClientError as JsonRpcError;
32use sp_api::ApiError;
33
34use cumulus_primitives_core::relay_chain::{BlockId, CandidateEvent, Hash as RelayHash};
35pub use cumulus_primitives_core::{
36	relay_chain::{
37		BlockNumber, CommittedCandidateReceiptV2 as CommittedCandidateReceipt, CoreIndex,
38		CoreState, Hash as PHash, Header as PHeader, InboundHrmpMessage, OccupiedCoreAssumption,
39		SessionIndex, ValidationCodeHash, ValidatorId,
40	},
41	InboundDownwardMessage, ParaId, PersistedValidationData,
42};
43pub use polkadot_overseer::Handle as OverseerHandle;
44pub use sp_state_machine::StorageValue;
45pub use sp_storage::ChildInfo;
46
47pub type RelayChainResult<T> = Result<T, RelayChainError>;
48
49#[derive(thiserror::Error, Debug)]
50pub enum RelayChainError {
51	#[error("Error occurred while calling relay chain runtime: {0}")]
52	ApiError(#[from] ApiError),
53	#[error("Timeout while waiting for relay-chain block `{0}` to be imported.")]
54	WaitTimeout(PHash),
55	#[error("Import listener closed while waiting for relay-chain block `{0}` to be imported.")]
56	ImportListenerClosed(PHash),
57	#[error(
58		"Blockchain returned an error while waiting for relay-chain block `{0}` to be imported: {1}"
59	)]
60	WaitBlockchainError(PHash, sp_blockchain::Error),
61	#[error("Blockchain returned an error: {0}")]
62	BlockchainError(#[from] sp_blockchain::Error),
63	#[error("State machine error occurred: {0}")]
64	StateMachineError(Box<dyn sp_state_machine::Error>),
65	#[error("Unable to call RPC method '{0}'")]
66	RpcCallError(String),
67	#[error("RPC Error: '{0}'")]
68	JsonRpcError(#[from] JsonRpcError),
69	#[error("Unable to communicate with RPC worker: {0}")]
70	WorkerCommunicationError(String),
71	#[error("Scale codec deserialization error: {0}")]
72	DeserializationError(CodecError),
73	#[error(transparent)]
74	Application(#[from] Box<dyn std::error::Error + Send + Sync + 'static>),
75	#[error("Prometheus error: {0}")]
76	PrometheusError(#[from] PrometheusError),
77	#[error("Unspecified error occurred: {0}")]
78	GenericError(String),
79}
80
81impl From<RelayChainError> for ApiError {
82	fn from(r: RelayChainError) -> Self {
83		sp_api::ApiError::Application(Box::new(r))
84	}
85}
86
87impl From<CodecError> for RelayChainError {
88	fn from(e: CodecError) -> Self {
89		RelayChainError::DeserializationError(e)
90	}
91}
92
93impl From<RelayChainError> for sp_blockchain::Error {
94	fn from(r: RelayChainError) -> Self {
95		sp_blockchain::Error::Application(Box::new(r))
96	}
97}
98
99impl<T: std::error::Error + Send + Sync + 'static> From<Box<T>> for RelayChainError {
100	fn from(r: Box<T>) -> Self {
101		RelayChainError::Application(r)
102	}
103}
104
105/// Trait that provides all necessary methods for interaction between collator and relay chain.
106#[async_trait]
107pub trait RelayChainInterface: Send + Sync {
108	/// Fetch a storage item by key.
109	async fn get_storage_by_key(
110		&self,
111		relay_parent: PHash,
112		key: &[u8],
113	) -> RelayChainResult<Option<StorageValue>>;
114
115	/// Fetch a vector of current validators.
116	async fn validators(&self, block_id: PHash) -> RelayChainResult<Vec<ValidatorId>>;
117
118	/// Get the hash of the current best block.
119	async fn best_block_hash(&self) -> RelayChainResult<PHash>;
120
121	/// Fetch the block header of a given hash or height, if it exists.
122	async fn header(&self, block_id: BlockId) -> RelayChainResult<Option<PHeader>>;
123
124	/// Get the hash of the finalized block.
125	async fn finalized_block_hash(&self) -> RelayChainResult<PHash>;
126
127	/// Call an arbitrary runtime api. The input and output are SCALE-encoded.
128	async fn call_runtime_api(
129		&self,
130		method_name: &'static str,
131		hash: RelayHash,
132		payload: &[u8],
133	) -> RelayChainResult<Vec<u8>>;
134
135	/// Returns the whole contents of the downward message queue for the parachain we are collating
136	/// for.
137	///
138	/// Returns `None` in case of an error.
139	async fn retrieve_dmq_contents(
140		&self,
141		para_id: ParaId,
142		relay_parent: PHash,
143	) -> RelayChainResult<Vec<InboundDownwardMessage>>;
144
145	/// Returns channels contents for each inbound HRMP channel addressed to the parachain we are
146	/// collating for.
147	///
148	/// Empty channels are also included.
149	async fn retrieve_all_inbound_hrmp_channel_contents(
150		&self,
151		para_id: ParaId,
152		relay_parent: PHash,
153	) -> RelayChainResult<BTreeMap<ParaId, Vec<InboundHrmpMessage>>>;
154
155	/// Yields the persisted validation data for the given `ParaId` along with an assumption that
156	/// should be used if the para currently occupies a core.
157	///
158	/// Returns `None` if either the para is not registered or the assumption is `Freed`
159	/// and the para already occupies a core.
160	async fn persisted_validation_data(
161		&self,
162		block_id: PHash,
163		para_id: ParaId,
164		_: OccupiedCoreAssumption,
165	) -> RelayChainResult<Option<PersistedValidationData>>;
166
167	/// Get the receipt of the first candidate pending availability of this para_id. This returns
168	/// `Some` for any paras assigned to occupied cores in `availability_cores` and `None`
169	/// otherwise.
170	#[deprecated(
171		note = "`candidate_pending_availability` only returns one candidate and is deprecated. Use `candidates_pending_availability` instead."
172	)]
173	async fn candidate_pending_availability(
174		&self,
175		block_id: PHash,
176		para_id: ParaId,
177	) -> RelayChainResult<Option<CommittedCandidateReceipt>>;
178
179	/// Returns the session index expected at a child of the block.
180	async fn session_index_for_child(&self, block_id: PHash) -> RelayChainResult<SessionIndex>;
181
182	/// Get a stream of import block notifications.
183	async fn import_notification_stream(
184		&self,
185	) -> RelayChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>>;
186
187	/// Get a stream of new best block notifications.
188	async fn new_best_notification_stream(
189		&self,
190	) -> RelayChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>>;
191
192	/// Wait for a block with a given hash in the relay chain.
193	///
194	/// This method returns immediately on error or if the block is already
195	/// reported to be in chain. Otherwise, it waits for the block to arrive.
196	async fn wait_for_block(&self, hash: PHash) -> RelayChainResult<()>;
197
198	/// Get a stream of finality notifications.
199	async fn finality_notification_stream(
200		&self,
201	) -> RelayChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>>;
202
203	/// Whether the synchronization service is undergoing major sync.
204	/// Returns true if so.
205	async fn is_major_syncing(&self) -> RelayChainResult<bool>;
206
207	/// Get a handle to the overseer.
208	fn overseer_handle(&self) -> RelayChainResult<OverseerHandle>;
209
210	/// Generate a storage read proof.
211	async fn prove_read(
212		&self,
213		relay_parent: PHash,
214		relevant_keys: &Vec<Vec<u8>>,
215	) -> RelayChainResult<StorageProof>;
216
217	/// Generate a child trie storage read proof.
218	async fn prove_child_read(
219		&self,
220		relay_parent: PHash,
221		child_info: &ChildInfo,
222		child_keys: &[Vec<u8>],
223	) -> RelayChainResult<StorageProof>;
224
225	/// Returns the validation code hash for the given `para_id` using the given
226	/// `occupied_core_assumption`.
227	async fn validation_code_hash(
228		&self,
229		relay_parent: PHash,
230		para_id: ParaId,
231		occupied_core_assumption: OccupiedCoreAssumption,
232	) -> RelayChainResult<Option<ValidationCodeHash>>;
233
234	/// Get the receipts of all candidates pending availability for this para_id.
235	async fn candidates_pending_availability(
236		&self,
237		block_id: PHash,
238		para_id: ParaId,
239	) -> RelayChainResult<Vec<CommittedCandidateReceipt>>;
240
241	/// Get the runtime version of the relay chain.
242	async fn version(&self, relay_parent: PHash) -> RelayChainResult<RuntimeVersion>;
243
244	/// Yields information on all availability cores as relevant to the child block.
245	///
246	/// Cores are either free, scheduled or occupied. Free cores can have paras assigned to them.
247	async fn availability_cores(
248		&self,
249		relay_parent: PHash,
250	) -> RelayChainResult<Vec<CoreState<PHash, BlockNumber>>>;
251
252	/// Fetch the claim queue.
253	async fn claim_queue(
254		&self,
255		relay_parent: PHash,
256	) -> RelayChainResult<BTreeMap<CoreIndex, VecDeque<ParaId>>>;
257
258	/// Fetch the scheduling lookahead value.
259	async fn scheduling_lookahead(&self, relay_parent: PHash) -> RelayChainResult<u32>;
260
261	async fn candidate_events(&self, at: RelayHash) -> RelayChainResult<Vec<CandidateEvent>>;
262}
263
264#[async_trait]
265impl<T> RelayChainInterface for Arc<T>
266where
267	T: RelayChainInterface + ?Sized,
268{
269	async fn retrieve_dmq_contents(
270		&self,
271		para_id: ParaId,
272		relay_parent: PHash,
273	) -> RelayChainResult<Vec<InboundDownwardMessage>> {
274		(**self).retrieve_dmq_contents(para_id, relay_parent).await
275	}
276
277	async fn retrieve_all_inbound_hrmp_channel_contents(
278		&self,
279		para_id: ParaId,
280		relay_parent: PHash,
281	) -> RelayChainResult<BTreeMap<ParaId, Vec<InboundHrmpMessage>>> {
282		(**self).retrieve_all_inbound_hrmp_channel_contents(para_id, relay_parent).await
283	}
284
285	async fn persisted_validation_data(
286		&self,
287		block_id: PHash,
288		para_id: ParaId,
289		occupied_core_assumption: OccupiedCoreAssumption,
290	) -> RelayChainResult<Option<PersistedValidationData>> {
291		(**self)
292			.persisted_validation_data(block_id, para_id, occupied_core_assumption)
293			.await
294	}
295
296	#[allow(deprecated)]
297	async fn candidate_pending_availability(
298		&self,
299		block_id: PHash,
300		para_id: ParaId,
301	) -> RelayChainResult<Option<CommittedCandidateReceipt>> {
302		(**self).candidate_pending_availability(block_id, para_id).await
303	}
304
305	async fn session_index_for_child(&self, block_id: PHash) -> RelayChainResult<SessionIndex> {
306		(**self).session_index_for_child(block_id).await
307	}
308
309	async fn validators(&self, block_id: PHash) -> RelayChainResult<Vec<ValidatorId>> {
310		(**self).validators(block_id).await
311	}
312
313	async fn import_notification_stream(
314		&self,
315	) -> RelayChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>> {
316		(**self).import_notification_stream().await
317	}
318
319	async fn finality_notification_stream(
320		&self,
321	) -> RelayChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>> {
322		(**self).finality_notification_stream().await
323	}
324
325	async fn best_block_hash(&self) -> RelayChainResult<PHash> {
326		(**self).best_block_hash().await
327	}
328
329	async fn finalized_block_hash(&self) -> RelayChainResult<PHash> {
330		(**self).finalized_block_hash().await
331	}
332
333	async fn call_runtime_api(
334		&self,
335		method_name: &'static str,
336		hash: RelayHash,
337		payload: &[u8],
338	) -> RelayChainResult<Vec<u8>> {
339		(**self).call_runtime_api(method_name, hash, payload).await
340	}
341
342	async fn is_major_syncing(&self) -> RelayChainResult<bool> {
343		(**self).is_major_syncing().await
344	}
345
346	fn overseer_handle(&self) -> RelayChainResult<OverseerHandle> {
347		(**self).overseer_handle()
348	}
349
350	async fn get_storage_by_key(
351		&self,
352		relay_parent: PHash,
353		key: &[u8],
354	) -> RelayChainResult<Option<StorageValue>> {
355		(**self).get_storage_by_key(relay_parent, key).await
356	}
357
358	async fn prove_read(
359		&self,
360		relay_parent: PHash,
361		relevant_keys: &Vec<Vec<u8>>,
362	) -> RelayChainResult<StorageProof> {
363		(**self).prove_read(relay_parent, relevant_keys).await
364	}
365
366	async fn prove_child_read(
367		&self,
368		relay_parent: PHash,
369		child_info: &ChildInfo,
370		child_keys: &[Vec<u8>],
371	) -> RelayChainResult<StorageProof> {
372		(**self).prove_child_read(relay_parent, child_info, child_keys).await
373	}
374
375	async fn wait_for_block(&self, hash: PHash) -> RelayChainResult<()> {
376		(**self).wait_for_block(hash).await
377	}
378
379	async fn new_best_notification_stream(
380		&self,
381	) -> RelayChainResult<Pin<Box<dyn Stream<Item = PHeader> + Send>>> {
382		(**self).new_best_notification_stream().await
383	}
384
385	async fn header(&self, block_id: BlockId) -> RelayChainResult<Option<PHeader>> {
386		(**self).header(block_id).await
387	}
388
389	async fn validation_code_hash(
390		&self,
391		relay_parent: PHash,
392		para_id: ParaId,
393		occupied_core_assumption: OccupiedCoreAssumption,
394	) -> RelayChainResult<Option<ValidationCodeHash>> {
395		(**self)
396			.validation_code_hash(relay_parent, para_id, occupied_core_assumption)
397			.await
398	}
399
400	async fn availability_cores(
401		&self,
402		relay_parent: PHash,
403	) -> RelayChainResult<Vec<CoreState<PHash, BlockNumber>>> {
404		(**self).availability_cores(relay_parent).await
405	}
406
407	async fn candidates_pending_availability(
408		&self,
409		block_id: PHash,
410		para_id: ParaId,
411	) -> RelayChainResult<Vec<CommittedCandidateReceipt>> {
412		(**self).candidates_pending_availability(block_id, para_id).await
413	}
414
415	async fn version(&self, relay_parent: PHash) -> RelayChainResult<RuntimeVersion> {
416		(**self).version(relay_parent).await
417	}
418
419	async fn claim_queue(
420		&self,
421		relay_parent: PHash,
422	) -> RelayChainResult<BTreeMap<CoreIndex, VecDeque<ParaId>>> {
423		(**self).claim_queue(relay_parent).await
424	}
425
426	async fn scheduling_lookahead(&self, relay_parent: PHash) -> RelayChainResult<u32> {
427		(**self).scheduling_lookahead(relay_parent).await
428	}
429
430	async fn candidate_events(&self, at: RelayHash) -> RelayChainResult<Vec<CandidateEvent>> {
431		(**self).candidate_events(at).await
432	}
433}
434
435/// Helper function to call an arbitrary runtime API using a `RelayChainInterface` client.
436/// Unlike the trait method, this function can be generic, so it handles the encoding of input and
437/// output params.
438pub async fn call_runtime_api<R>(
439	client: &(impl RelayChainInterface + ?Sized),
440	method_name: &'static str,
441	hash: RelayHash,
442	payload: impl Encode,
443) -> RelayChainResult<R>
444where
445	R: Decode,
446{
447	let res = client.call_runtime_api(method_name, hash, &payload.encode()).await?;
448	Decode::decode(&mut &*res).map_err(Into::into)
449}