1use futures::channel::{
19 mpsc::{Receiver, Sender},
20 oneshot::Sender as OneshotSender,
21};
22use jsonrpsee::{
23 core::{params::ArrayParams, ClientError as JsonRpseeError},
24 rpc_params,
25};
26use prometheus::Registry;
27use serde::{de::DeserializeOwned, Serialize};
28use serde_json::Value as JsonValue;
29use std::collections::{btree_map::BTreeMap, VecDeque};
30use tokio::sync::mpsc::Sender as TokioSender;
31
32use codec::{Decode, Encode};
33
34use cumulus_primitives_core::{
35 relay_chain::{
36 async_backing::{AsyncBackingParams, BackingState, Constraints},
37 slashing, ApprovalVotingParams, BlockNumber, CandidateCommitments, CandidateEvent,
38 CandidateHash, CommittedCandidateReceiptV2 as CommittedCandidateReceipt, CoreIndex,
39 CoreState, DisputeState, ExecutorParams, GroupRotationInfo, Hash as RelayHash,
40 Header as RelayHeader, InboundHrmpMessage, NodeFeatures, OccupiedCoreAssumption,
41 PvfCheckStatement, ScrapedOnChainVotes, SessionIndex, SessionInfo, ValidationCode,
42 ValidationCodeHash, ValidatorId, ValidatorIndex, ValidatorSignature,
43 },
44 InboundDownwardMessage, ParaId, PersistedValidationData,
45};
46use cumulus_relay_chain_interface::{RelayChainError, RelayChainResult};
47
48use sc_client_api::StorageData;
49use sc_rpc_api::{state::ReadProof, system::Health};
50use sc_service::TaskManager;
51use sp_consensus_babe::Epoch;
52use sp_storage::StorageKey;
53use sp_version::RuntimeVersion;
54
55use crate::{metrics::RelaychainRpcMetrics, reconnecting_ws_client::ReconnectingWebsocketWorker};
56pub use url::Url;
57
58const LOG_TARGET: &str = "relay-chain-rpc-client";
59const NOTIFICATION_CHANNEL_SIZE_LIMIT: usize = 20;
60
61#[derive(Debug)]
63pub enum RpcDispatcherMessage {
64 RegisterBestHeadListener(Sender<RelayHeader>),
67
68 RegisterImportListener(Sender<RelayHeader>),
71
72 RegisterFinalizationListener(Sender<RelayHeader>),
75
76 Request(String, ArrayParams, OneshotSender<Result<JsonValue, JsonRpseeError>>),
82}
83
84pub async fn create_client_and_start_worker(
87 urls: Vec<Url>,
88 task_manager: &mut TaskManager,
89 prometheus_registry: Option<&Registry>,
90) -> RelayChainResult<RelayChainRpcClient> {
91 let (worker, sender) = ReconnectingWebsocketWorker::new(urls).await;
92
93 task_manager
94 .spawn_essential_handle()
95 .spawn("relay-chain-rpc-worker", None, worker.run());
96
97 let client = RelayChainRpcClient::new(sender, prometheus_registry);
98
99 Ok(client)
100}
101
102#[derive(Serialize)]
103struct PayloadToHex<'a>(#[serde(with = "sp_core::bytes")] &'a [u8]);
104
105#[derive(Clone)]
107pub struct RelayChainRpcClient {
108 worker_channel: TokioSender<RpcDispatcherMessage>,
110 metrics: Option<RelaychainRpcMetrics>,
111}
112
113impl RelayChainRpcClient {
114 pub(crate) fn new(
119 worker_channel: TokioSender<RpcDispatcherMessage>,
120 prometheus_registry: Option<&Registry>,
121 ) -> Self {
122 RelayChainRpcClient {
123 worker_channel,
124 metrics: prometheus_registry
125 .and_then(|inner| RelaychainRpcMetrics::register(inner).map_err(|err| {
126 tracing::warn!(target: LOG_TARGET, error = %err, "Unable to instantiate the RPC client metrics, continuing w/o metrics setup.");
127 }).ok()),
128 }
129 }
130
131 pub async fn call_remote_runtime_function_encoded(
133 &self,
134 method_name: &str,
135 hash: RelayHash,
136 payload: &[u8],
137 ) -> RelayChainResult<sp_core::Bytes> {
138 let payload = PayloadToHex(payload);
139
140 let params = rpc_params! {
141 method_name,
142 payload,
143 hash
144 };
145
146 self.request_tracing::<sp_core::Bytes, _>("state_call", params, |err| {
147 tracing::trace!(
148 target: LOG_TARGET,
149 %method_name,
150 %hash,
151 error = %err,
152 "Error during call to 'state_call'.",
153 );
154 })
155 .await
156 }
157
158 pub async fn call_remote_runtime_function<R: Decode>(
160 &self,
161 method_name: &str,
162 hash: RelayHash,
163 payload: Option<impl Encode>,
164 ) -> RelayChainResult<R> {
165 let payload_bytes =
166 payload.map_or(sp_core::Bytes(Vec::new()), |v| sp_core::Bytes(v.encode()));
167 let res = self
168 .call_remote_runtime_function_encoded(method_name, hash, &payload_bytes)
169 .await?;
170 Decode::decode(&mut &*res.0).map_err(Into::into)
171 }
172
173 async fn request<'a, R>(
175 &self,
176 method: &'a str,
177 params: ArrayParams,
178 ) -> Result<R, RelayChainError>
179 where
180 R: DeserializeOwned + std::fmt::Debug,
181 {
182 self.request_tracing(
183 method,
184 params,
185 |e| tracing::trace!(target:LOG_TARGET, error = %e, %method, "Unable to complete RPC request"),
186 )
187 .await
188 }
189
190 async fn request_tracing<'a, R, OR>(
192 &self,
193 method: &'a str,
194 params: ArrayParams,
195 trace_error: OR,
196 ) -> Result<R, RelayChainError>
197 where
198 R: DeserializeOwned + std::fmt::Debug,
199 OR: Fn(&RelayChainError),
200 {
201 let _timer = self.metrics.as_ref().map(|inner| inner.start_request_timer(method));
202
203 let (tx, rx) = futures::channel::oneshot::channel();
204
205 let message = RpcDispatcherMessage::Request(method.into(), params, tx);
206 self.worker_channel.send(message).await.map_err(|err| {
207 RelayChainError::WorkerCommunicationError(format!(
208 "Unable to send message to RPC worker: {}",
209 err
210 ))
211 })?;
212
213 let value = rx.await.map_err(|err| {
214 RelayChainError::WorkerCommunicationError(format!(
215 "RPC worker channel closed. This can hint and connectivity issues with the supplied RPC endpoints. Message: {}",
216 err
217 ))
218 })??;
219
220 serde_json::from_value(value).map_err(|_| {
221 trace_error(&RelayChainError::GenericError("Unable to deserialize value".to_string()));
222 RelayChainError::RpcCallError(method.to_string())
223 })
224 }
225
226 pub async fn babe_api_current_epoch(&self, at: RelayHash) -> Result<Epoch, RelayChainError> {
228 self.call_remote_runtime_function("BabeApi_current_epoch", at, None::<()>).await
229 }
230
231 pub async fn parachain_host_on_chain_votes(
233 &self,
234 at: RelayHash,
235 ) -> Result<Option<ScrapedOnChainVotes<RelayHash>>, RelayChainError> {
236 self.call_remote_runtime_function("ParachainHost_on_chain_votes", at, None::<()>)
237 .await
238 }
239
240 pub async fn parachain_host_pvfs_require_precheck(
242 &self,
243 at: RelayHash,
244 ) -> Result<Vec<ValidationCodeHash>, RelayChainError> {
245 self.call_remote_runtime_function("ParachainHost_pvfs_require_precheck", at, None::<()>)
246 .await
247 }
248
249 pub async fn parachain_host_submit_pvf_check_statement(
251 &self,
252 at: RelayHash,
253 stmt: PvfCheckStatement,
254 signature: ValidatorSignature,
255 ) -> Result<(), RelayChainError> {
256 self.call_remote_runtime_function(
257 "ParachainHost_submit_pvf_check_statement",
258 at,
259 Some((stmt, signature)),
260 )
261 .await
262 }
263
264 pub async fn system_health(&self) -> Result<Health, RelayChainError> {
266 self.request("system_health", rpc_params![]).await
267 }
268
269 pub async fn state_get_read_proof(
271 &self,
272 storage_keys: Vec<StorageKey>,
273 at: Option<RelayHash>,
274 ) -> Result<ReadProof<RelayHash>, RelayChainError> {
275 let params = rpc_params![storage_keys, at];
276 self.request("state_getReadProof", params).await
277 }
278
279 pub async fn state_get_storage(
281 &self,
282 storage_key: StorageKey,
283 at: Option<RelayHash>,
284 ) -> Result<Option<StorageData>, RelayChainError> {
285 let params = rpc_params![storage_key, at];
286 self.request("state_getStorage", params).await
287 }
288
289 pub async fn chain_get_head(&self, at: Option<u64>) -> Result<RelayHash, RelayChainError> {
293 let params = rpc_params![at];
294 self.request("chain_getHead", params).await
295 }
296
297 pub async fn parachain_host_validator_groups(
301 &self,
302 at: RelayHash,
303 ) -> Result<(Vec<Vec<ValidatorIndex>>, GroupRotationInfo), RelayChainError> {
304 self.call_remote_runtime_function("ParachainHost_validator_groups", at, None::<()>)
305 .await
306 }
307
308 pub async fn parachain_host_candidate_events(
310 &self,
311 at: RelayHash,
312 ) -> Result<Vec<CandidateEvent>, RelayChainError> {
313 self.call_remote_runtime_function("ParachainHost_candidate_events", at, None::<()>)
314 .await
315 }
316
317 pub async fn parachain_host_check_validation_outputs(
319 &self,
320 at: RelayHash,
321 para_id: ParaId,
322 outputs: CandidateCommitments,
323 ) -> Result<bool, RelayChainError> {
324 self.call_remote_runtime_function(
325 "ParachainHost_check_validation_outputs",
326 at,
327 Some((para_id, outputs)),
328 )
329 .await
330 }
331
332 pub async fn parachain_host_assumed_validation_data(
336 &self,
337 at: RelayHash,
338 para_id: ParaId,
339 expected_hash: RelayHash,
340 ) -> Result<Option<(PersistedValidationData, ValidationCodeHash)>, RelayChainError> {
341 self.call_remote_runtime_function(
342 "ParachainHost_persisted_assumed_validation_data",
343 at,
344 Some((para_id, expected_hash)),
345 )
346 .await
347 }
348
349 pub async fn chain_get_finalized_head(&self) -> Result<RelayHash, RelayChainError> {
351 self.request("chain_getFinalizedHead", rpc_params![]).await
352 }
353
354 pub async fn chain_get_block_hash(
356 &self,
357 block_number: Option<BlockNumber>,
358 ) -> Result<Option<RelayHash>, RelayChainError> {
359 let params = rpc_params![block_number];
360 self.request("chain_getBlockHash", params).await
361 }
362
363 pub async fn parachain_host_persisted_validation_data(
369 &self,
370 at: RelayHash,
371 para_id: ParaId,
372 occupied_core_assumption: OccupiedCoreAssumption,
373 ) -> Result<Option<PersistedValidationData>, RelayChainError> {
374 self.call_remote_runtime_function(
375 "ParachainHost_persisted_validation_data",
376 at,
377 Some((para_id, occupied_core_assumption)),
378 )
379 .await
380 }
381
382 pub async fn parachain_host_validation_code_by_hash(
384 &self,
385 at: RelayHash,
386 validation_code_hash: ValidationCodeHash,
387 ) -> Result<Option<ValidationCode>, RelayChainError> {
388 self.call_remote_runtime_function(
389 "ParachainHost_validation_code_by_hash",
390 at,
391 Some(validation_code_hash),
392 )
393 .await
394 }
395
396 pub async fn parachain_host_availability_cores(
399 &self,
400 at: RelayHash,
401 ) -> Result<Vec<CoreState<RelayHash, BlockNumber>>, RelayChainError> {
402 self.call_remote_runtime_function("ParachainHost_availability_cores", at, None::<()>)
403 .await
404 }
405
406 pub async fn runtime_version(&self, at: RelayHash) -> Result<RuntimeVersion, RelayChainError> {
408 let params = rpc_params![at];
409 self.request("state_getRuntimeVersion", params).await
410 }
411
412 pub async fn parachain_host_disputes(
414 &self,
415 at: RelayHash,
416 ) -> Result<Vec<(SessionIndex, CandidateHash, DisputeState<BlockNumber>)>, RelayChainError> {
417 self.call_remote_runtime_function("ParachainHost_disputes", at, None::<()>)
418 .await
419 }
420
421 pub async fn parachain_host_unapplied_slashes(
425 &self,
426 at: RelayHash,
427 ) -> Result<Vec<(SessionIndex, CandidateHash, slashing::LegacyPendingSlashes)>, RelayChainError>
428 {
429 self.call_remote_runtime_function("ParachainHost_unapplied_slashes", at, None::<()>)
430 .await
431 }
432
433 pub async fn parachain_host_unapplied_slashes_v2(
435 &self,
436 at: RelayHash,
437 ) -> Result<Vec<(SessionIndex, CandidateHash, slashing::PendingSlashes)>, RelayChainError> {
438 self.call_remote_runtime_function("ParachainHost_unapplied_slashes_v2", at, None::<()>)
439 .await
440 }
441
442 pub async fn parachain_host_key_ownership_proof(
446 &self,
447 at: RelayHash,
448 validator_id: ValidatorId,
449 ) -> Result<Option<slashing::OpaqueKeyOwnershipProof>, RelayChainError> {
450 self.call_remote_runtime_function(
451 "ParachainHost_key_ownership_proof",
452 at,
453 Some(validator_id),
454 )
455 .await
456 }
457
458 pub async fn parachain_host_submit_report_dispute_lost(
463 &self,
464 at: RelayHash,
465 dispute_proof: slashing::DisputeProof,
466 key_ownership_proof: slashing::OpaqueKeyOwnershipProof,
467 ) -> Result<Option<()>, RelayChainError> {
468 self.call_remote_runtime_function(
469 "ParachainHost_submit_report_dispute_lost",
470 at,
471 Some((dispute_proof, key_ownership_proof)),
472 )
473 .await
474 }
475
476 pub async fn authority_discovery_authorities(
477 &self,
478 at: RelayHash,
479 ) -> Result<Vec<sp_authority_discovery::AuthorityId>, RelayChainError> {
480 self.call_remote_runtime_function("AuthorityDiscoveryApi_authorities", at, None::<()>)
481 .await
482 }
483
484 pub async fn parachain_host_validation_code(
489 &self,
490 at: RelayHash,
491 para_id: ParaId,
492 occupied_core_assumption: OccupiedCoreAssumption,
493 ) -> Result<Option<ValidationCode>, RelayChainError> {
494 self.call_remote_runtime_function(
495 "ParachainHost_validation_code",
496 at,
497 Some((para_id, occupied_core_assumption)),
498 )
499 .await
500 }
501
502 pub async fn parachain_host_validation_code_hash(
505 &self,
506 at: RelayHash,
507 para_id: ParaId,
508 occupied_core_assumption: OccupiedCoreAssumption,
509 ) -> Result<Option<ValidationCodeHash>, RelayChainError> {
510 self.call_remote_runtime_function(
511 "ParachainHost_validation_code_hash",
512 at,
513 Some((para_id, occupied_core_assumption)),
514 )
515 .await
516 }
517
518 pub async fn parachain_host_session_info(
520 &self,
521 at: RelayHash,
522 index: SessionIndex,
523 ) -> Result<Option<SessionInfo>, RelayChainError> {
524 self.call_remote_runtime_function("ParachainHost_session_info", at, Some(index))
525 .await
526 }
527
528 pub async fn parachain_host_session_executor_params(
530 &self,
531 at: RelayHash,
532 session_index: SessionIndex,
533 ) -> Result<Option<ExecutorParams>, RelayChainError> {
534 self.call_remote_runtime_function(
535 "ParachainHost_session_executor_params",
536 at,
537 Some(session_index),
538 )
539 .await
540 }
541
542 pub async fn chain_get_header(
544 &self,
545 hash: Option<RelayHash>,
546 ) -> Result<Option<RelayHeader>, RelayChainError> {
547 let params = rpc_params![hash];
548 self.request("chain_getHeader", params).await
549 }
550
551 pub async fn parachain_host_candidate_pending_availability(
554 &self,
555 at: RelayHash,
556 para_id: ParaId,
557 ) -> Result<Option<CommittedCandidateReceipt>, RelayChainError> {
558 self.call_remote_runtime_function(
559 "ParachainHost_candidate_pending_availability",
560 at,
561 Some(para_id),
562 )
563 .await
564 }
565
566 pub async fn parachain_host_session_index_for_child(
570 &self,
571 at: RelayHash,
572 ) -> Result<SessionIndex, RelayChainError> {
573 self.call_remote_runtime_function("ParachainHost_session_index_for_child", at, None::<()>)
574 .await
575 }
576
577 pub async fn parachain_host_validators(
579 &self,
580 at: RelayHash,
581 ) -> Result<Vec<ValidatorId>, RelayChainError> {
582 self.call_remote_runtime_function("ParachainHost_validators", at, None::<()>)
583 .await
584 }
585
586 pub async fn parachain_host_inbound_hrmp_channels_contents(
589 &self,
590 para_id: ParaId,
591 at: RelayHash,
592 ) -> Result<BTreeMap<ParaId, Vec<InboundHrmpMessage>>, RelayChainError> {
593 self.call_remote_runtime_function(
594 "ParachainHost_inbound_hrmp_channels_contents",
595 at,
596 Some(para_id),
597 )
598 .await
599 }
600
601 pub async fn parachain_host_dmq_contents(
603 &self,
604 para_id: ParaId,
605 at: RelayHash,
606 ) -> Result<Vec<InboundDownwardMessage>, RelayChainError> {
607 self.call_remote_runtime_function("ParachainHost_dmq_contents", at, Some(para_id))
608 .await
609 }
610
611 pub async fn parachain_host_minimum_backing_votes(
613 &self,
614 at: RelayHash,
615 _session_index: SessionIndex,
616 ) -> Result<u32, RelayChainError> {
617 self.call_remote_runtime_function("ParachainHost_minimum_backing_votes", at, None::<()>)
618 .await
619 }
620
621 pub async fn parachain_host_node_features(
622 &self,
623 at: RelayHash,
624 ) -> Result<NodeFeatures, RelayChainError> {
625 self.call_remote_runtime_function("ParachainHost_node_features", at, None::<()>)
626 .await
627 }
628
629 pub async fn parachain_host_disabled_validators(
630 &self,
631 at: RelayHash,
632 ) -> Result<Vec<ValidatorIndex>, RelayChainError> {
633 self.call_remote_runtime_function("ParachainHost_disabled_validators", at, None::<()>)
634 .await
635 }
636
637 #[allow(missing_docs)]
638 pub async fn parachain_host_async_backing_params(
639 &self,
640 at: RelayHash,
641 ) -> Result<AsyncBackingParams, RelayChainError> {
642 self.call_remote_runtime_function("ParachainHost_async_backing_params", at, None::<()>)
643 .await
644 }
645
646 #[allow(missing_docs)]
647 pub async fn parachain_host_staging_approval_voting_params(
648 &self,
649 at: RelayHash,
650 _session_index: SessionIndex,
651 ) -> Result<ApprovalVotingParams, RelayChainError> {
652 self.call_remote_runtime_function(
653 "ParachainHost_staging_approval_voting_params",
654 at,
655 None::<()>,
656 )
657 .await
658 }
659
660 pub async fn parachain_host_para_backing_state(
661 &self,
662 at: RelayHash,
663 para_id: ParaId,
664 ) -> Result<Option<BackingState>, RelayChainError> {
665 self.call_remote_runtime_function("ParachainHost_para_backing_state", at, Some(para_id))
666 .await
667 }
668
669 pub async fn parachain_host_claim_queue(
670 &self,
671 at: RelayHash,
672 ) -> Result<BTreeMap<CoreIndex, VecDeque<ParaId>>, RelayChainError> {
673 self.call_remote_runtime_function("ParachainHost_claim_queue", at, None::<()>)
674 .await
675 }
676
677 pub async fn parachain_host_candidates_pending_availability(
679 &self,
680 at: RelayHash,
681 para_id: ParaId,
682 ) -> Result<Vec<CommittedCandidateReceipt>, RelayChainError> {
683 self.call_remote_runtime_function(
684 "ParachainHost_candidates_pending_availability",
685 at,
686 Some(para_id),
687 )
688 .await
689 }
690
691 pub async fn parachain_host_scheduling_lookahead(
692 &self,
693 at: RelayHash,
694 ) -> Result<u32, RelayChainError> {
695 self.call_remote_runtime_function("ParachainHost_scheduling_lookahead", at, None::<()>)
696 .await
697 }
698
699 pub async fn parachain_host_validation_code_bomb_limit(
700 &self,
701 at: RelayHash,
702 ) -> Result<u32, RelayChainError> {
703 self.call_remote_runtime_function(
704 "ParachainHost_validation_code_bomb_limit",
705 at,
706 None::<()>,
707 )
708 .await
709 }
710
711 pub async fn validation_code_hash(
712 &self,
713 at: RelayHash,
714 para_id: ParaId,
715 occupied_core_assumption: OccupiedCoreAssumption,
716 ) -> Result<Option<ValidationCodeHash>, RelayChainError> {
717 self.call_remote_runtime_function(
718 "ParachainHost_validation_code_hash",
719 at,
720 Some((para_id, occupied_core_assumption)),
721 )
722 .await
723 }
724
725 pub async fn parachain_host_backing_constraints(
726 &self,
727 at: RelayHash,
728 para_id: ParaId,
729 ) -> Result<Option<Constraints>, RelayChainError> {
730 self.call_remote_runtime_function("ParachainHost_backing_constraints", at, Some(para_id))
731 .await
732 }
733
734 fn send_register_message_to_worker(
735 &self,
736 message: RpcDispatcherMessage,
737 ) -> Result<(), RelayChainError> {
738 self.worker_channel
739 .try_send(message)
740 .map_err(|e| RelayChainError::WorkerCommunicationError(e.to_string()))
741 }
742
743 pub fn get_imported_heads_stream(&self) -> Result<Receiver<RelayHeader>, RelayChainError> {
745 let (tx, rx) =
746 futures::channel::mpsc::channel::<RelayHeader>(NOTIFICATION_CHANNEL_SIZE_LIMIT);
747 self.send_register_message_to_worker(RpcDispatcherMessage::RegisterImportListener(tx))?;
748 Ok(rx)
749 }
750
751 pub fn get_best_heads_stream(&self) -> Result<Receiver<RelayHeader>, RelayChainError> {
753 let (tx, rx) =
754 futures::channel::mpsc::channel::<RelayHeader>(NOTIFICATION_CHANNEL_SIZE_LIMIT);
755 self.send_register_message_to_worker(RpcDispatcherMessage::RegisterBestHeadListener(tx))?;
756 Ok(rx)
757 }
758
759 pub fn get_finalized_heads_stream(&self) -> Result<Receiver<RelayHeader>, RelayChainError> {
761 let (tx, rx) =
762 futures::channel::mpsc::channel::<RelayHeader>(NOTIFICATION_CHANNEL_SIZE_LIMIT);
763 self.send_register_message_to_worker(RpcDispatcherMessage::RegisterFinalizationListener(
764 tx,
765 ))?;
766 Ok(rx)
767 }
768
769 pub async fn parachain_host_para_ids(
770 &self,
771 at: RelayHash,
772 ) -> Result<Vec<ParaId>, RelayChainError> {
773 self.call_remote_runtime_function("ParachainHost_para_ids", at, None::<()>)
774 .await
775 }
776}
777
778pub fn distribute_header(header: RelayHeader, senders: &mut Vec<Sender<RelayHeader>>) {
781 senders.retain_mut(|e| {
782 match e.try_send(header.clone()) {
783 Err(error) if error.is_disconnected() => false,
785 Err(error) => {
789 tracing::error!(target: LOG_TARGET, ?error, "Event distribution channel has reached its limit. This can lead to missed notifications.");
790 true
791 },
792 _ => true,
793 }
794 });
795}