cumulus_relay_chain_rpc_interface/
rpc_client.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Cumulus.
3// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
4
5// Cumulus is free software: you can redistribute it and/or modify
6// it under the terms of the GNU General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9
10// Cumulus is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU General Public License for more details.
14
15// You should have received a copy of the GNU General Public License
16// along with Cumulus. If not, see <https://www.gnu.org/licenses/>.
17
18use futures::channel::{
19	mpsc::{Receiver, Sender},
20	oneshot::Sender as OneshotSender,
21};
22use jsonrpsee::{
23	core::{params::ArrayParams, ClientError as JsonRpseeError},
24	rpc_params,
25};
26use prometheus::Registry;
27use serde::{de::DeserializeOwned, Serialize};
28use serde_json::Value as JsonValue;
29use std::collections::{btree_map::BTreeMap, VecDeque};
30use tokio::sync::mpsc::Sender as TokioSender;
31
32use codec::{Decode, Encode};
33
34use cumulus_primitives_core::{
35	relay_chain::{
36		async_backing::{AsyncBackingParams, BackingState, Constraints},
37		slashing, ApprovalVotingParams, BlockNumber, CandidateCommitments, CandidateEvent,
38		CandidateHash, CommittedCandidateReceiptV2 as CommittedCandidateReceipt, CoreIndex,
39		CoreState, DisputeState, ExecutorParams, GroupRotationInfo, Hash as RelayHash,
40		Header as RelayHeader, InboundHrmpMessage, NodeFeatures, OccupiedCoreAssumption,
41		PvfCheckStatement, ScrapedOnChainVotes, SessionIndex, SessionInfo, ValidationCode,
42		ValidationCodeHash, ValidatorId, ValidatorIndex, ValidatorSignature,
43	},
44	InboundDownwardMessage, ParaId, PersistedValidationData,
45};
46use cumulus_relay_chain_interface::{RelayChainError, RelayChainResult};
47
48use sc_client_api::StorageData;
49use sc_rpc_api::{state::ReadProof, system::Health};
50use sc_service::TaskManager;
51use sp_consensus_babe::Epoch;
52use sp_storage::StorageKey;
53use sp_version::RuntimeVersion;
54
55use crate::{metrics::RelaychainRpcMetrics, reconnecting_ws_client::ReconnectingWebsocketWorker};
56pub use url::Url;
57
/// Target attached to every `tracing` event emitted from this module.
const LOG_TARGET: &str = "relay-chain-rpc-client";
/// Buffer size handed to the bounded channels created for the header notification streams.
const NOTIFICATION_CHANNEL_SIZE_LIMIT: usize = 20;
60
/// Messages for communication between [`RelayChainRpcClient`] and the RPC workers.
#[derive(Debug)]
pub enum RpcDispatcherMessage {
	/// Register new listener for the best headers stream. Contains a sender which will be used
	/// to send incoming headers.
	RegisterBestHeadListener(Sender<RelayHeader>),

	/// Register new listener for the import headers stream. Contains a sender which will be used
	/// to send incoming headers.
	RegisterImportListener(Sender<RelayHeader>),

	/// Register new listener for the finalized headers stream. Contains a sender which will be
	/// used to send incoming headers.
	RegisterFinalizationListener(Sender<RelayHeader>),

	/// Ask the worker to perform an RPC request.
	/// Contains the following:
	/// - [`String`] representing the RPC method to be called
	/// - [`ArrayParams`] for the parameters to the RPC call
	/// - [`OneshotSender`] for the return value of the request
	Request(String, ArrayParams, OneshotSender<Result<JsonValue, JsonRpseeError>>),
}
83
84/// Entry point to create [`RelayChainRpcClient`] and start a worker that communicates
85/// to JsonRPC servers over the network.
86pub async fn create_client_and_start_worker(
87	urls: Vec<Url>,
88	task_manager: &mut TaskManager,
89	prometheus_registry: Option<&Registry>,
90) -> RelayChainResult<RelayChainRpcClient> {
91	let (worker, sender) = ReconnectingWebsocketWorker::new(urls).await;
92
93	task_manager
94		.spawn_essential_handle()
95		.spawn("relay-chain-rpc-worker", None, worker.run());
96
97	let client = RelayChainRpcClient::new(sender, prometheus_registry);
98
99	Ok(client)
100}
101
/// Borrowed byte payload that is serialized through the `sp_core::bytes` serde helpers
/// (presumably as a hex string, as the type name suggests) when used as an RPC parameter.
#[derive(Serialize)]
struct PayloadToHex<'a>(#[serde(with = "sp_core::bytes")] &'a [u8]);
104
/// Client that maps RPC methods and deserializes results
///
/// The client itself performs no network I/O; every request is forwarded as an
/// [`RpcDispatcherMessage`] to a worker over `worker_channel`.
#[derive(Clone)]
pub struct RelayChainRpcClient {
	/// Sender to send messages to the worker.
	worker_channel: TokioSender<RpcDispatcherMessage>,
	/// Request metrics. `None` when no Prometheus registry was supplied or when registering
	/// the metrics failed.
	metrics: Option<RelaychainRpcMetrics>,
}
112
113impl RelayChainRpcClient {
114	/// Initialize new RPC Client.
115	///
116	/// This client expects a channel connected to a worker that processes
117	/// requests sent via this channel.
118	pub(crate) fn new(
119		worker_channel: TokioSender<RpcDispatcherMessage>,
120		prometheus_registry: Option<&Registry>,
121	) -> Self {
122		RelayChainRpcClient {
123			worker_channel,
124			metrics: prometheus_registry
125				.and_then(|inner| RelaychainRpcMetrics::register(inner).map_err(|err| {
126					tracing::warn!(target: LOG_TARGET, error = %err, "Unable to instantiate the RPC client metrics, continuing w/o metrics setup.");
127				}).ok()),
128		}
129	}
130
131	/// Same as `call_remote_runtime_function` but work on encoded data
132	pub async fn call_remote_runtime_function_encoded(
133		&self,
134		method_name: &str,
135		hash: RelayHash,
136		payload: &[u8],
137	) -> RelayChainResult<sp_core::Bytes> {
138		let payload = PayloadToHex(payload);
139
140		let params = rpc_params! {
141			method_name,
142			payload,
143			hash
144		};
145
146		self.request_tracing::<sp_core::Bytes, _>("state_call", params, |err| {
147			tracing::trace!(
148				target: LOG_TARGET,
149				%method_name,
150				%hash,
151				error = %err,
152				"Error during call to 'state_call'.",
153			);
154		})
155		.await
156	}
157
158	/// Call a call to `state_call` rpc method.
159	pub async fn call_remote_runtime_function<R: Decode>(
160		&self,
161		method_name: &str,
162		hash: RelayHash,
163		payload: Option<impl Encode>,
164	) -> RelayChainResult<R> {
165		let payload_bytes =
166			payload.map_or(sp_core::Bytes(Vec::new()), |v| sp_core::Bytes(v.encode()));
167		let res = self
168			.call_remote_runtime_function_encoded(method_name, hash, &payload_bytes)
169			.await?;
170		Decode::decode(&mut &*res.0).map_err(Into::into)
171	}
172
173	/// Perform RPC request
174	async fn request<'a, R>(
175		&self,
176		method: &'a str,
177		params: ArrayParams,
178	) -> Result<R, RelayChainError>
179	where
180		R: DeserializeOwned + std::fmt::Debug,
181	{
182		self.request_tracing(
183			method,
184			params,
185			|e| tracing::trace!(target:LOG_TARGET, error = %e, %method, "Unable to complete RPC request"),
186		)
187		.await
188	}
189
190	/// Perform RPC request
191	async fn request_tracing<'a, R, OR>(
192		&self,
193		method: &'a str,
194		params: ArrayParams,
195		trace_error: OR,
196	) -> Result<R, RelayChainError>
197	where
198		R: DeserializeOwned + std::fmt::Debug,
199		OR: Fn(&RelayChainError),
200	{
201		let _timer = self.metrics.as_ref().map(|inner| inner.start_request_timer(method));
202
203		let (tx, rx) = futures::channel::oneshot::channel();
204
205		let message = RpcDispatcherMessage::Request(method.into(), params, tx);
206		self.worker_channel.send(message).await.map_err(|err| {
207			RelayChainError::WorkerCommunicationError(format!(
208				"Unable to send message to RPC worker: {}",
209				err
210			))
211		})?;
212
213		let value = rx.await.map_err(|err| {
214			RelayChainError::WorkerCommunicationError(format!(
215				"RPC worker channel closed. This can hint and connectivity issues with the supplied RPC endpoints. Message: {}",
216				err
217			))
218		})??;
219
220		serde_json::from_value(value).map_err(|_| {
221			trace_error(&RelayChainError::GenericError("Unable to deserialize value".to_string()));
222			RelayChainError::RpcCallError(method.to_string())
223		})
224	}
225
226	/// Returns information regarding the current epoch.
227	pub async fn babe_api_current_epoch(&self, at: RelayHash) -> Result<Epoch, RelayChainError> {
228		self.call_remote_runtime_function("BabeApi_current_epoch", at, None::<()>).await
229	}
230
231	/// Scrape dispute relevant from on-chain, backing votes and resolved disputes.
232	pub async fn parachain_host_on_chain_votes(
233		&self,
234		at: RelayHash,
235	) -> Result<Option<ScrapedOnChainVotes<RelayHash>>, RelayChainError> {
236		self.call_remote_runtime_function("ParachainHost_on_chain_votes", at, None::<()>)
237			.await
238	}
239
240	/// Returns code hashes of PVFs that require pre-checking by validators in the active set.
241	pub async fn parachain_host_pvfs_require_precheck(
242		&self,
243		at: RelayHash,
244	) -> Result<Vec<ValidationCodeHash>, RelayChainError> {
245		self.call_remote_runtime_function("ParachainHost_pvfs_require_precheck", at, None::<()>)
246			.await
247	}
248
249	/// Submits a PVF pre-checking statement into the transaction pool.
250	pub async fn parachain_host_submit_pvf_check_statement(
251		&self,
252		at: RelayHash,
253		stmt: PvfCheckStatement,
254		signature: ValidatorSignature,
255	) -> Result<(), RelayChainError> {
256		self.call_remote_runtime_function(
257			"ParachainHost_submit_pvf_check_statement",
258			at,
259			Some((stmt, signature)),
260		)
261		.await
262	}
263
264	/// Get system health information
265	pub async fn system_health(&self) -> Result<Health, RelayChainError> {
266		self.request("system_health", rpc_params![]).await
267	}
268
269	/// Get read proof for `storage_keys`
270	pub async fn state_get_read_proof(
271		&self,
272		storage_keys: Vec<StorageKey>,
273		at: Option<RelayHash>,
274	) -> Result<ReadProof<RelayHash>, RelayChainError> {
275		let params = rpc_params![storage_keys, at];
276		self.request("state_getReadProof", params).await
277	}
278
279	/// Retrieve storage item at `storage_key`
280	pub async fn state_get_storage(
281		&self,
282		storage_key: StorageKey,
283		at: Option<RelayHash>,
284	) -> Result<Option<StorageData>, RelayChainError> {
285		let params = rpc_params![storage_key, at];
286		self.request("state_getStorage", params).await
287	}
288
289	/// Get hash of the n-th block in the canon chain.
290	///
291	/// By default returns latest block hash.
292	pub async fn chain_get_head(&self, at: Option<u64>) -> Result<RelayHash, RelayChainError> {
293		let params = rpc_params![at];
294		self.request("chain_getHead", params).await
295	}
296
297	/// Returns the validator groups and rotation info localized based on the hypothetical child
298	///  of a block whose state  this is invoked on. Note that `now` in the `GroupRotationInfo`
299	/// should be the successor of the number of the block.
300	pub async fn parachain_host_validator_groups(
301		&self,
302		at: RelayHash,
303	) -> Result<(Vec<Vec<ValidatorIndex>>, GroupRotationInfo), RelayChainError> {
304		self.call_remote_runtime_function("ParachainHost_validator_groups", at, None::<()>)
305			.await
306	}
307
308	/// Get a vector of events concerning candidates that occurred within a block.
309	pub async fn parachain_host_candidate_events(
310		&self,
311		at: RelayHash,
312	) -> Result<Vec<CandidateEvent>, RelayChainError> {
313		self.call_remote_runtime_function("ParachainHost_candidate_events", at, None::<()>)
314			.await
315	}
316
317	/// Checks if the given validation outputs pass the acceptance criteria.
318	pub async fn parachain_host_check_validation_outputs(
319		&self,
320		at: RelayHash,
321		para_id: ParaId,
322		outputs: CandidateCommitments,
323	) -> Result<bool, RelayChainError> {
324		self.call_remote_runtime_function(
325			"ParachainHost_check_validation_outputs",
326			at,
327			Some((para_id, outputs)),
328		)
329		.await
330	}
331
332	/// Returns the persisted validation data for the given `ParaId` along with the corresponding
333	/// validation code hash. Instead of accepting assumption about the para, matches the validation
334	/// data hash against an expected one and yields `None` if they're not equal.
335	pub async fn parachain_host_assumed_validation_data(
336		&self,
337		at: RelayHash,
338		para_id: ParaId,
339		expected_hash: RelayHash,
340	) -> Result<Option<(PersistedValidationData, ValidationCodeHash)>, RelayChainError> {
341		self.call_remote_runtime_function(
342			"ParachainHost_persisted_assumed_validation_data",
343			at,
344			Some((para_id, expected_hash)),
345		)
346		.await
347	}
348
349	/// Get hash of last finalized block.
350	pub async fn chain_get_finalized_head(&self) -> Result<RelayHash, RelayChainError> {
351		self.request("chain_getFinalizedHead", rpc_params![]).await
352	}
353
354	/// Get hash of n-th block.
355	pub async fn chain_get_block_hash(
356		&self,
357		block_number: Option<BlockNumber>,
358	) -> Result<Option<RelayHash>, RelayChainError> {
359		let params = rpc_params![block_number];
360		self.request("chain_getBlockHash", params).await
361	}
362
363	/// Yields the persisted validation data for the given `ParaId` along with an assumption that
364	/// should be used if the para currently occupies a core.
365	///
366	/// Returns `None` if either the para is not registered or the assumption is `Freed`
367	/// and the para already occupies a core.
368	pub async fn parachain_host_persisted_validation_data(
369		&self,
370		at: RelayHash,
371		para_id: ParaId,
372		occupied_core_assumption: OccupiedCoreAssumption,
373	) -> Result<Option<PersistedValidationData>, RelayChainError> {
374		self.call_remote_runtime_function(
375			"ParachainHost_persisted_validation_data",
376			at,
377			Some((para_id, occupied_core_assumption)),
378		)
379		.await
380	}
381
382	/// Get the validation code from its hash.
383	pub async fn parachain_host_validation_code_by_hash(
384		&self,
385		at: RelayHash,
386		validation_code_hash: ValidationCodeHash,
387	) -> Result<Option<ValidationCode>, RelayChainError> {
388		self.call_remote_runtime_function(
389			"ParachainHost_validation_code_by_hash",
390			at,
391			Some(validation_code_hash),
392		)
393		.await
394	}
395
396	/// Yields information on all availability cores as relevant to the child block.
397	/// Cores are either free or occupied. Free cores can have paras assigned to them.
398	pub async fn parachain_host_availability_cores(
399		&self,
400		at: RelayHash,
401	) -> Result<Vec<CoreState<RelayHash, BlockNumber>>, RelayChainError> {
402		self.call_remote_runtime_function("ParachainHost_availability_cores", at, None::<()>)
403			.await
404	}
405
406	/// Get runtime version
407	pub async fn runtime_version(&self, at: RelayHash) -> Result<RuntimeVersion, RelayChainError> {
408		let params = rpc_params![at];
409		self.request("state_getRuntimeVersion", params).await
410	}
411
412	/// Returns all onchain disputes.
413	pub async fn parachain_host_disputes(
414		&self,
415		at: RelayHash,
416	) -> Result<Vec<(SessionIndex, CandidateHash, DisputeState<BlockNumber>)>, RelayChainError> {
417		self.call_remote_runtime_function("ParachainHost_disputes", at, None::<()>)
418			.await
419	}
420
421	/// Returns a list of validators that lost a past session dispute and need to be slashed.
422	///
423	/// This is a staging method! Do not use on production runtimes!
424	pub async fn parachain_host_unapplied_slashes(
425		&self,
426		at: RelayHash,
427	) -> Result<Vec<(SessionIndex, CandidateHash, slashing::LegacyPendingSlashes)>, RelayChainError>
428	{
429		self.call_remote_runtime_function("ParachainHost_unapplied_slashes", at, None::<()>)
430			.await
431	}
432
433	/// Returns a list of validators that lost a past session dispute and need to be slashed.
434	pub async fn parachain_host_unapplied_slashes_v2(
435		&self,
436		at: RelayHash,
437	) -> Result<Vec<(SessionIndex, CandidateHash, slashing::PendingSlashes)>, RelayChainError> {
438		self.call_remote_runtime_function("ParachainHost_unapplied_slashes_v2", at, None::<()>)
439			.await
440	}
441
442	/// Returns a merkle proof of a validator session key in a past session.
443	///
444	/// This is a staging method! Do not use on production runtimes!
445	pub async fn parachain_host_key_ownership_proof(
446		&self,
447		at: RelayHash,
448		validator_id: ValidatorId,
449	) -> Result<Option<slashing::OpaqueKeyOwnershipProof>, RelayChainError> {
450		self.call_remote_runtime_function(
451			"ParachainHost_key_ownership_proof",
452			at,
453			Some(validator_id),
454		)
455		.await
456	}
457
458	/// Submits an unsigned extrinsic to slash validators who lost a dispute about
459	/// a candidate of a past session.
460	///
461	/// This is a staging method! Do not use on production runtimes!
462	pub async fn parachain_host_submit_report_dispute_lost(
463		&self,
464		at: RelayHash,
465		dispute_proof: slashing::DisputeProof,
466		key_ownership_proof: slashing::OpaqueKeyOwnershipProof,
467	) -> Result<Option<()>, RelayChainError> {
468		self.call_remote_runtime_function(
469			"ParachainHost_submit_report_dispute_lost",
470			at,
471			Some((dispute_proof, key_ownership_proof)),
472		)
473		.await
474	}
475
476	pub async fn authority_discovery_authorities(
477		&self,
478		at: RelayHash,
479	) -> Result<Vec<sp_authority_discovery::AuthorityId>, RelayChainError> {
480		self.call_remote_runtime_function("AuthorityDiscoveryApi_authorities", at, None::<()>)
481			.await
482	}
483
484	/// Fetch the validation code used by a para, making the given `OccupiedCoreAssumption`.
485	///
486	/// Returns `None` if either the para is not registered or the assumption is `Freed`
487	/// and the para already occupies a core.
488	pub async fn parachain_host_validation_code(
489		&self,
490		at: RelayHash,
491		para_id: ParaId,
492		occupied_core_assumption: OccupiedCoreAssumption,
493	) -> Result<Option<ValidationCode>, RelayChainError> {
494		self.call_remote_runtime_function(
495			"ParachainHost_validation_code",
496			at,
497			Some((para_id, occupied_core_assumption)),
498		)
499		.await
500	}
501
502	/// Fetch the hash of the validation code used by a para, making the given
503	/// `OccupiedCoreAssumption`.
504	pub async fn parachain_host_validation_code_hash(
505		&self,
506		at: RelayHash,
507		para_id: ParaId,
508		occupied_core_assumption: OccupiedCoreAssumption,
509	) -> Result<Option<ValidationCodeHash>, RelayChainError> {
510		self.call_remote_runtime_function(
511			"ParachainHost_validation_code_hash",
512			at,
513			Some((para_id, occupied_core_assumption)),
514		)
515		.await
516	}
517
518	/// Get the session info for the given session, if stored.
519	pub async fn parachain_host_session_info(
520		&self,
521		at: RelayHash,
522		index: SessionIndex,
523	) -> Result<Option<SessionInfo>, RelayChainError> {
524		self.call_remote_runtime_function("ParachainHost_session_info", at, Some(index))
525			.await
526	}
527
528	/// Get the executor parameters for the given session, if stored
529	pub async fn parachain_host_session_executor_params(
530		&self,
531		at: RelayHash,
532		session_index: SessionIndex,
533	) -> Result<Option<ExecutorParams>, RelayChainError> {
534		self.call_remote_runtime_function(
535			"ParachainHost_session_executor_params",
536			at,
537			Some(session_index),
538		)
539		.await
540	}
541
542	/// Get header at specified hash
543	pub async fn chain_get_header(
544		&self,
545		hash: Option<RelayHash>,
546	) -> Result<Option<RelayHeader>, RelayChainError> {
547		let params = rpc_params![hash];
548		self.request("chain_getHeader", params).await
549	}
550
551	/// Get the receipt of a candidate pending availability. This returns `Some` for any paras
552	/// assigned to occupied cores in `availability_cores` and `None` otherwise.
553	pub async fn parachain_host_candidate_pending_availability(
554		&self,
555		at: RelayHash,
556		para_id: ParaId,
557	) -> Result<Option<CommittedCandidateReceipt>, RelayChainError> {
558		self.call_remote_runtime_function(
559			"ParachainHost_candidate_pending_availability",
560			at,
561			Some(para_id),
562		)
563		.await
564	}
565
566	/// Returns the session index expected at a child of the block.
567	///
568	/// This can be used to instantiate a `SigningContext`.
569	pub async fn parachain_host_session_index_for_child(
570		&self,
571		at: RelayHash,
572	) -> Result<SessionIndex, RelayChainError> {
573		self.call_remote_runtime_function("ParachainHost_session_index_for_child", at, None::<()>)
574			.await
575	}
576
577	/// Get the current validators.
578	pub async fn parachain_host_validators(
579		&self,
580		at: RelayHash,
581	) -> Result<Vec<ValidatorId>, RelayChainError> {
582		self.call_remote_runtime_function("ParachainHost_validators", at, None::<()>)
583			.await
584	}
585
586	/// Get the contents of all channels addressed to the given recipient. Channels that have no
587	/// messages in them are also included.
588	pub async fn parachain_host_inbound_hrmp_channels_contents(
589		&self,
590		para_id: ParaId,
591		at: RelayHash,
592	) -> Result<BTreeMap<ParaId, Vec<InboundHrmpMessage>>, RelayChainError> {
593		self.call_remote_runtime_function(
594			"ParachainHost_inbound_hrmp_channels_contents",
595			at,
596			Some(para_id),
597		)
598		.await
599	}
600
601	/// Get all the pending inbound messages in the downward message queue for a para.
602	pub async fn parachain_host_dmq_contents(
603		&self,
604		para_id: ParaId,
605		at: RelayHash,
606	) -> Result<Vec<InboundDownwardMessage>, RelayChainError> {
607		self.call_remote_runtime_function("ParachainHost_dmq_contents", at, Some(para_id))
608			.await
609	}
610
611	/// Get the minimum number of backing votes for a candidate.
612	pub async fn parachain_host_minimum_backing_votes(
613		&self,
614		at: RelayHash,
615		_session_index: SessionIndex,
616	) -> Result<u32, RelayChainError> {
617		self.call_remote_runtime_function("ParachainHost_minimum_backing_votes", at, None::<()>)
618			.await
619	}
620
621	pub async fn parachain_host_node_features(
622		&self,
623		at: RelayHash,
624	) -> Result<NodeFeatures, RelayChainError> {
625		self.call_remote_runtime_function("ParachainHost_node_features", at, None::<()>)
626			.await
627	}
628
629	pub async fn parachain_host_disabled_validators(
630		&self,
631		at: RelayHash,
632	) -> Result<Vec<ValidatorIndex>, RelayChainError> {
633		self.call_remote_runtime_function("ParachainHost_disabled_validators", at, None::<()>)
634			.await
635	}
636
637	#[allow(missing_docs)]
638	pub async fn parachain_host_async_backing_params(
639		&self,
640		at: RelayHash,
641	) -> Result<AsyncBackingParams, RelayChainError> {
642		self.call_remote_runtime_function("ParachainHost_async_backing_params", at, None::<()>)
643			.await
644	}
645
646	#[allow(missing_docs)]
647	pub async fn parachain_host_staging_approval_voting_params(
648		&self,
649		at: RelayHash,
650		_session_index: SessionIndex,
651	) -> Result<ApprovalVotingParams, RelayChainError> {
652		self.call_remote_runtime_function(
653			"ParachainHost_staging_approval_voting_params",
654			at,
655			None::<()>,
656		)
657		.await
658	}
659
660	pub async fn parachain_host_para_backing_state(
661		&self,
662		at: RelayHash,
663		para_id: ParaId,
664	) -> Result<Option<BackingState>, RelayChainError> {
665		self.call_remote_runtime_function("ParachainHost_para_backing_state", at, Some(para_id))
666			.await
667	}
668
669	pub async fn parachain_host_claim_queue(
670		&self,
671		at: RelayHash,
672	) -> Result<BTreeMap<CoreIndex, VecDeque<ParaId>>, RelayChainError> {
673		self.call_remote_runtime_function("ParachainHost_claim_queue", at, None::<()>)
674			.await
675	}
676
677	/// Get the receipt of all candidates pending availability.
678	pub async fn parachain_host_candidates_pending_availability(
679		&self,
680		at: RelayHash,
681		para_id: ParaId,
682	) -> Result<Vec<CommittedCandidateReceipt>, RelayChainError> {
683		self.call_remote_runtime_function(
684			"ParachainHost_candidates_pending_availability",
685			at,
686			Some(para_id),
687		)
688		.await
689	}
690
691	pub async fn parachain_host_scheduling_lookahead(
692		&self,
693		at: RelayHash,
694	) -> Result<u32, RelayChainError> {
695		self.call_remote_runtime_function("ParachainHost_scheduling_lookahead", at, None::<()>)
696			.await
697	}
698
699	pub async fn parachain_host_validation_code_bomb_limit(
700		&self,
701		at: RelayHash,
702	) -> Result<u32, RelayChainError> {
703		self.call_remote_runtime_function(
704			"ParachainHost_validation_code_bomb_limit",
705			at,
706			None::<()>,
707		)
708		.await
709	}
710
711	pub async fn validation_code_hash(
712		&self,
713		at: RelayHash,
714		para_id: ParaId,
715		occupied_core_assumption: OccupiedCoreAssumption,
716	) -> Result<Option<ValidationCodeHash>, RelayChainError> {
717		self.call_remote_runtime_function(
718			"ParachainHost_validation_code_hash",
719			at,
720			Some((para_id, occupied_core_assumption)),
721		)
722		.await
723	}
724
725	pub async fn parachain_host_backing_constraints(
726		&self,
727		at: RelayHash,
728		para_id: ParaId,
729	) -> Result<Option<Constraints>, RelayChainError> {
730		self.call_remote_runtime_function("ParachainHost_backing_constraints", at, Some(para_id))
731			.await
732	}
733
734	fn send_register_message_to_worker(
735		&self,
736		message: RpcDispatcherMessage,
737	) -> Result<(), RelayChainError> {
738		self.worker_channel
739			.try_send(message)
740			.map_err(|e| RelayChainError::WorkerCommunicationError(e.to_string()))
741	}
742
743	/// Get a stream of all imported relay chain headers
744	pub fn get_imported_heads_stream(&self) -> Result<Receiver<RelayHeader>, RelayChainError> {
745		let (tx, rx) =
746			futures::channel::mpsc::channel::<RelayHeader>(NOTIFICATION_CHANNEL_SIZE_LIMIT);
747		self.send_register_message_to_worker(RpcDispatcherMessage::RegisterImportListener(tx))?;
748		Ok(rx)
749	}
750
751	/// Get a stream of new best relay chain headers
752	pub fn get_best_heads_stream(&self) -> Result<Receiver<RelayHeader>, RelayChainError> {
753		let (tx, rx) =
754			futures::channel::mpsc::channel::<RelayHeader>(NOTIFICATION_CHANNEL_SIZE_LIMIT);
755		self.send_register_message_to_worker(RpcDispatcherMessage::RegisterBestHeadListener(tx))?;
756		Ok(rx)
757	}
758
759	/// Get a stream of finalized relay chain headers
760	pub fn get_finalized_heads_stream(&self) -> Result<Receiver<RelayHeader>, RelayChainError> {
761		let (tx, rx) =
762			futures::channel::mpsc::channel::<RelayHeader>(NOTIFICATION_CHANNEL_SIZE_LIMIT);
763		self.send_register_message_to_worker(RpcDispatcherMessage::RegisterFinalizationListener(
764			tx,
765		))?;
766		Ok(rx)
767	}
768
769	pub async fn parachain_host_para_ids(
770		&self,
771		at: RelayHash,
772	) -> Result<Vec<ParaId>, RelayChainError> {
773		self.call_remote_runtime_function("ParachainHost_para_ids", at, None::<()>)
774			.await
775	}
776}
777
778/// Send `header` through all channels contained in `senders`.
779/// If no one is listening to the sender, it is removed from the vector.
780pub fn distribute_header(header: RelayHeader, senders: &mut Vec<Sender<RelayHeader>>) {
781	senders.retain_mut(|e| {
782				match e.try_send(header.clone()) {
783					// Receiver has been dropped, remove Sender from list.
784					Err(error) if error.is_disconnected() => false,
785					// Channel is full. This should not happen.
786					// TODO: Improve error handling here
787					// https://github.com/paritytech/cumulus/issues/1482
788					Err(error) => {
789						tracing::error!(target: LOG_TARGET, ?error, "Event distribution channel has reached its limit. This can lead to missed notifications.");
790						true
791					},
792					_ => true,
793				}
794			});
795}