cumulus_client_service/
lib.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Cumulus.
3// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
4
5// Cumulus is free software: you can redistribute it and/or modify
6// it under the terms of the GNU General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9
10// Cumulus is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU General Public License for more details.
14
15// You should have received a copy of the GNU General Public License
16// along with Cumulus. If not, see <https://www.gnu.org/licenses/>.
17
18//! Cumulus service
19//!
20//! Provides functions for starting a collator node or a normal full node.
21
22use cumulus_client_cli::CollatorOptions;
23use cumulus_client_network::{AssumeSybilResistance, RequireSecondedInBlockAnnounce};
24use cumulus_client_pov_recovery::{PoVRecovery, RecoveryDelayRange, RecoveryHandle};
25use cumulus_primitives_core::{CollectCollationInfo, ParaId};
26pub use cumulus_primitives_proof_size_hostfunction::storage_proof_size;
27use cumulus_relay_chain_inprocess_interface::build_inprocess_relay_chain;
28use cumulus_relay_chain_interface::{RelayChainInterface, RelayChainResult};
29use cumulus_relay_chain_minimal_node::build_minimal_relay_chain_node_with_rpc;
30use futures::{channel::mpsc, StreamExt};
31use polkadot_primitives::{CandidateEvent, CollatorPair, OccupiedCoreAssumption};
32use prometheus::{Histogram, HistogramOpts, Registry};
33use sc_client_api::{
34	Backend as BackendT, BlockBackend, BlockchainEvents, Finalizer, ProofProvider, UsageProvider,
35};
36use sc_consensus::{
37	import_queue::{ImportQueue, ImportQueueService},
38	BlockImport,
39};
40use sc_network::{
41	config::SyncMode, request_responses::IncomingRequest, service::traits::NetworkService,
42	NetworkBackend,
43};
44use sc_network_sync::SyncingService;
45use sc_network_transactions::TransactionsHandlerController;
46use sc_service::{Configuration, SpawnTaskHandle, TaskManager, WarpSyncConfig};
47use sc_telemetry::{log, TelemetryWorkerHandle};
48use sc_tracing::block::TracingExecuteBlock;
49use sc_utils::mpsc::TracingUnboundedSender;
50use sp_api::{ApiExt, Core, ProofRecorder, ProvideRuntimeApi};
51use sp_blockchain::{HeaderBackend, HeaderMetadata};
52use sp_core::Decode;
53use sp_runtime::{
54	traits::{Block as BlockT, BlockIdTo, Header},
55	SaturatedConversion, Saturating,
56};
57use sp_trie::proof_size_extension::ProofSizeExt;
58use std::{
59	sync::Arc,
60	time::{Duration, Instant},
61};
62
/// Host functions that should be used in parachain nodes.
///
/// This is a tuple made of the `storage_proof_size` host functions (needed to enable
/// PoV-reclaim on parachain nodes) followed by the standard substrate host functions.
pub type ParachainHostFunctions = (
	cumulus_primitives_proof_size_hostfunction::storage_proof_size::HostFunctions,
	sp_io::SubstrateHostFunctions,
);
71
// Capacity of the channel that connects the parachain consensus tasks to the PoV recovery task.
//
// Given the sporadic nature of the explicit recovery operation and the
// possibility to retry infinitely, this value is more than enough.
// In practice we expect no more than one queued message here.
const RECOVERY_CHAN_SIZE: usize = 8;
// Log target used by the sync related log messages emitted while building the network.
const LOG_TARGET_SYNC: &str = "sync::cumulus";
77
/// A hint about how long the node should wait before attempting to recover missing block data
/// from the data availability layer.
///
/// [`start_relay_chain_tasks`] translates the selected profile into a [`RecoveryDelayRange`].
pub enum DARecoveryProfile {
	/// Collators use an aggressive recovery profile by default.
	///
	/// The resulting delay range goes from half a relay chain slot duration up to one full
	/// relay chain slot duration.
	Collator,
	/// Full nodes use a passive recovery profile by default, as they are not direct
	/// victims of withholding attacks.
	///
	/// The resulting delay range goes from 25 up to 50 relay chain slot durations.
	FullNode,
	/// Provide an explicit recovery profile.
	///
	/// The given range is used as is.
	Other(RecoveryDelayRange),
}
89
/// Parameters given to [`start_relay_chain_tasks`].
pub struct StartRelayChainTasksParams<'a, Block: BlockT, Client, RCInterface> {
	/// The parachain client, shared with the consensus tasks, the PoV recovery task and the
	/// parachain informant.
	pub client: Arc<Client>,
	/// Callback taking a block hash and optional associated data; it is handed over to the
	/// parachain consensus tasks.
	pub announce_block: Arc<dyn Fn(Block::Hash, Option<Vec<u8>>) + Send + Sync>,
	/// The id of the parachain this node is running for.
	pub para_id: ParaId,
	/// Interface used by the spawned tasks to interact with the relay chain.
	pub relay_chain_interface: RCInterface,
	/// Task manager used to spawn the tasks.
	pub task_manager: &'a mut TaskManager,
	/// Selects how long to wait before starting to recover a block.
	pub da_recovery_profile: DARecoveryProfile,
	/// Import queue service handed over to the PoV recovery task.
	pub import_queue: Box<dyn ImportQueueService<Block>>,
	/// Slot duration of the relay chain. The recovery delays of the
	/// [`DARecoveryProfile::Collator`] and [`DARecoveryProfile::FullNode`] profiles are derived
	/// from it.
	pub relay_chain_slot_duration: Duration,
	/// Recovery handle handed over to the PoV recovery task.
	pub recovery_handle: Box<dyn RecoveryHandle>,
	/// Syncing service handed over to the PoV recovery task.
	pub sync_service: Arc<SyncingService<Block>>,
	/// Optional prometheus registry. When set, the parachain informant metrics are registered
	/// in it.
	pub prometheus_registry: Option<&'a Registry>,
}
104
105/// Start necessary consensus tasks related to the relay chain.
106///
107/// Parachain nodes need to track the state of the relay chain and use the
108/// relay chain's data availability service to fetch blocks if they don't
109/// arrive via the normal p2p layer (i.e. when authors withhold their blocks deliberately).
110///
111/// This function spawns work for those side tasks.
112///
113/// It also spawns a parachain informant task that will log the relay chain state and some metrics.
114pub fn start_relay_chain_tasks<Block, Client, Backend, RCInterface>(
115	StartRelayChainTasksParams {
116		client,
117		announce_block,
118		para_id,
119		task_manager,
120		da_recovery_profile,
121		relay_chain_interface,
122		import_queue,
123		relay_chain_slot_duration,
124		recovery_handle,
125		sync_service,
126		prometheus_registry,
127	}: StartRelayChainTasksParams<Block, Client, RCInterface>,
128) -> sc_service::error::Result<()>
129where
130	Block: BlockT,
131	Client: Finalizer<Block, Backend>
132		+ UsageProvider<Block>
133		+ HeaderBackend<Block>
134		+ Send
135		+ Sync
136		+ BlockBackend<Block>
137		+ BlockchainEvents<Block>
138		+ 'static,
139	for<'a> &'a Client: BlockImport<Block>,
140	Backend: BackendT<Block> + 'static,
141	RCInterface: RelayChainInterface + Clone + 'static,
142{
143	let (recovery_chan_tx, recovery_chan_rx) = mpsc::channel(RECOVERY_CHAN_SIZE);
144
145	cumulus_client_consensus_common::spawn_parachain_consensus_tasks(
146		para_id,
147		client.clone(),
148		relay_chain_interface.clone(),
149		announce_block.clone(),
150		Some(recovery_chan_tx),
151		task_manager.spawn_essential_handle(),
152	);
153
154	let da_recovery_profile = match da_recovery_profile {
155		DARecoveryProfile::Collator => {
156			// We want that collators wait at maximum the relay chain slot duration before starting
157			// to recover blocks. Additionally, we wait at least half the slot time to give the
158			// relay chain the chance to increase availability.
159			RecoveryDelayRange {
160				min: relay_chain_slot_duration / 2,
161				max: relay_chain_slot_duration,
162			}
163		},
164		DARecoveryProfile::FullNode => {
165			// Full nodes should at least wait 2.5 minutes (assuming 6 seconds slot duration) and
166			// in maximum 5 minutes before starting to recover blocks. Collators should already
167			// start the recovery way before full nodes try to recover a certain block and then
168			// share the block with the network using "the normal way". Full nodes are just the
169			// "last resort" for block recovery.
170			RecoveryDelayRange {
171				min: relay_chain_slot_duration * 25,
172				max: relay_chain_slot_duration * 50,
173			}
174		},
175		DARecoveryProfile::Other(profile) => profile,
176	};
177
178	let pov_recovery = PoVRecovery::new(
179		recovery_handle,
180		da_recovery_profile,
181		client.clone(),
182		import_queue,
183		relay_chain_interface.clone(),
184		para_id,
185		recovery_chan_rx,
186		sync_service.clone(),
187	);
188
189	task_manager
190		.spawn_essential_handle()
191		.spawn("cumulus-pov-recovery", None, pov_recovery.run());
192
193	let parachain_informant = parachain_informant::<Block, _>(
194		para_id,
195		relay_chain_interface.clone(),
196		client.clone(),
197		prometheus_registry.map(ParachainInformantMetrics::new).transpose()?,
198	);
199	task_manager
200		.spawn_handle()
201		.spawn("parachain-informant", None, parachain_informant);
202
203	Ok(())
204}
205
206/// Prepare the parachain's node configuration
207///
208/// This function will:
209/// * Disable the default announcement of Substrate for the parachain in favor of the one of
210///   Cumulus.
211/// * Set peers needed to start warp sync to 1.
212pub fn prepare_node_config(mut parachain_config: Configuration) -> Configuration {
213	parachain_config.announce_block = false;
214	// Parachains only need 1 peer to start warp sync, because the target block is fetched from the
215	// relay chain.
216	parachain_config.network.min_peers_to_start_warp_sync = Some(1);
217
218	parachain_config
219}
220
221/// Build a relay chain interface.
222/// Will return a minimal relay chain node with RPC
223/// client or an inprocess node, based on the [`CollatorOptions`] passed in.
224pub async fn build_relay_chain_interface(
225	relay_chain_config: Configuration,
226	parachain_config: &Configuration,
227	telemetry_worker_handle: Option<TelemetryWorkerHandle>,
228	task_manager: &mut TaskManager,
229	collator_options: CollatorOptions,
230	hwbench: Option<sc_sysinfo::HwBench>,
231) -> RelayChainResult<(
232	Arc<dyn RelayChainInterface + 'static>,
233	Option<CollatorPair>,
234	Arc<dyn NetworkService>,
235	async_channel::Receiver<IncomingRequest>,
236)> {
237	match collator_options.relay_chain_mode {
238		cumulus_client_cli::RelayChainMode::Embedded => build_inprocess_relay_chain(
239			relay_chain_config,
240			parachain_config,
241			telemetry_worker_handle,
242			task_manager,
243			hwbench,
244		),
245		cumulus_client_cli::RelayChainMode::ExternalRpc(rpc_target_urls) =>
246			build_minimal_relay_chain_node_with_rpc(
247				relay_chain_config,
248				parachain_config.prometheus_registry(),
249				task_manager,
250				rpc_target_urls,
251			)
252			.await,
253	}
254}
255
/// The expected level of collator sybil-resistance on the network. This is used to
/// configure the type of metadata passed alongside block announcements on the network.
///
/// [`build_network`] picks the block announce validator based on this level.
pub enum CollatorSybilResistance {
	/// There is a collator-selection protocol which provides sybil-resistance,
	/// such as Aura. Sybil-resistant collator-selection protocols are able to
	/// operate more efficiently.
	///
	/// Selects the `AssumeSybilResistance` block announce validator.
	Resistant,
	/// There is no collator-selection protocol providing sybil-resistance.
	/// In situations such as "free-for-all" collators, the network is unresistant
	/// and needs to attach more metadata to block announcements, relying on relay-chain
	/// validators to avoid handling unbounded numbers of blocks.
	///
	/// Selects the `RequireSecondedInBlockAnnounce` block announce validator.
	Unresistant,
}
269
/// Parameters given to [`build_network`].
pub struct BuildNetworkParams<
	'a,
	Block: BlockT,
	Client: ProvideRuntimeApi<Block>
		+ BlockBackend<Block>
		+ HeaderMetadata<Block, Error = sp_blockchain::Error>
		+ HeaderBackend<Block>
		+ BlockIdTo<Block>
		+ 'static,
	Network: NetworkBackend<Block, <Block as BlockT>::Hash>,
	RCInterface,
	IQ,
> where
	Client::Api: sp_transaction_pool::runtime_api::TaggedTransactionQueue<Block>,
{
	/// Configuration of the parachain node. Its network sync mode decides whether a warp sync
	/// target is fetched from the relay chain.
	pub parachain_config: &'a Configuration,
	/// Full network configuration, forwarded to `sc_service::build_network`.
	pub net_config:
		sc_network::config::FullNetworkConfiguration<Block, <Block as BlockT>::Hash, Network>,
	/// The parachain client.
	pub client: Arc<Client>,
	/// Handle to the transaction pool of the parachain node.
	pub transaction_pool: Arc<sc_transaction_pool::TransactionPoolHandle<Block, Client>>,
	/// The id of the parachain this node is running for.
	pub para_id: ParaId,
	/// Interface to the relay chain. Used to fetch the warp sync target and by the block
	/// announce validator for [`CollatorSybilResistance::Unresistant`].
	pub relay_chain_interface: RCInterface,
	/// Handle used to spawn tasks, forwarded to `sc_service::build_network`.
	pub spawn_handle: SpawnTaskHandle,
	/// The import queue of the parachain node.
	pub import_queue: IQ,
	/// Selects which block announce validator gets installed.
	pub sybil_resistance_level: CollatorSybilResistance,
	/// Notification metrics, forwarded to `sc_service::build_network`.
	pub metrics: sc_network::NotificationMetrics,
}
298
299/// Build the network service, the network status sinks and an RPC sender.
300pub async fn build_network<'a, Block, Client, RCInterface, IQ, Network>(
301	BuildNetworkParams {
302		parachain_config,
303		net_config,
304		client,
305		transaction_pool,
306		para_id,
307		spawn_handle,
308		relay_chain_interface,
309		import_queue,
310		sybil_resistance_level,
311		metrics,
312	}: BuildNetworkParams<'a, Block, Client, Network, RCInterface, IQ>,
313) -> sc_service::error::Result<(
314	Arc<dyn NetworkService>,
315	TracingUnboundedSender<sc_rpc::system::Request<Block>>,
316	TransactionsHandlerController<Block::Hash>,
317	Arc<SyncingService<Block>>,
318)>
319where
320	Block: BlockT,
321	Client: UsageProvider<Block>
322		+ HeaderBackend<Block>
323		+ sp_consensus::block_validation::Chain<Block>
324		+ Send
325		+ Sync
326		+ BlockBackend<Block>
327		+ BlockchainEvents<Block>
328		+ ProvideRuntimeApi<Block>
329		+ HeaderMetadata<Block, Error = sp_blockchain::Error>
330		+ BlockIdTo<Block, Error = sp_blockchain::Error>
331		+ ProofProvider<Block>
332		+ 'static,
333	Client::Api: CollectCollationInfo<Block>
334		+ sp_transaction_pool::runtime_api::TaggedTransactionQueue<Block>,
335	for<'b> &'b Client: BlockImport<Block>,
336	RCInterface: RelayChainInterface + Clone + 'static,
337	IQ: ImportQueue<Block> + 'static,
338	Network: NetworkBackend<Block, <Block as BlockT>::Hash>,
339{
340	let warp_sync_config = match parachain_config.network.sync_mode {
341		SyncMode::Warp => {
342			log::debug!(target: LOG_TARGET_SYNC, "waiting for announce block...");
343
344			let target_block =
345				wait_for_finalized_para_head::<Block, _>(para_id, relay_chain_interface.clone())
346					.await
347					.inspect_err(|e| {
348						log::error!(
349							target: LOG_TARGET_SYNC,
350							"Unable to determine parachain target block {:?}",
351							e
352						);
353					})?;
354			Some(WarpSyncConfig::WithTarget(target_block))
355		},
356		_ => None,
357	};
358
359	let block_announce_validator = match sybil_resistance_level {
360		CollatorSybilResistance::Resistant => {
361			let block_announce_validator = AssumeSybilResistance::allow_seconded_messages();
362			Box::new(block_announce_validator) as Box<_>
363		},
364		CollatorSybilResistance::Unresistant => {
365			let block_announce_validator =
366				RequireSecondedInBlockAnnounce::new(relay_chain_interface, para_id);
367			Box::new(block_announce_validator) as Box<_>
368		},
369	};
370
371	sc_service::build_network(sc_service::BuildNetworkParams {
372		config: parachain_config,
373		net_config,
374		client,
375		transaction_pool,
376		spawn_handle,
377		import_queue,
378		block_announce_validator_builder: Some(Box::new(move |_| block_announce_validator)),
379		warp_sync_config,
380		block_relay: None,
381		metrics,
382	})
383}
384
385/// Waits for the relay chain to have finished syncing and then gets the parachain header that
386/// corresponds to the last finalized relay chain block.
387async fn wait_for_finalized_para_head<B, RCInterface>(
388	para_id: ParaId,
389	relay_chain_interface: RCInterface,
390) -> sc_service::error::Result<<B as BlockT>::Header>
391where
392	B: BlockT + 'static,
393	RCInterface: RelayChainInterface + Send + 'static,
394{
395	let mut imported_blocks = relay_chain_interface
396		.import_notification_stream()
397		.await
398		.map_err(|error| {
399			sc_service::Error::Other(format!(
400				"Relay chain import notification stream error when waiting for parachain head: \
401				{error}"
402			))
403		})?
404		.fuse();
405	while imported_blocks.next().await.is_some() {
406		let is_syncing = relay_chain_interface
407			.is_major_syncing()
408			.await
409			.map_err(|e| format!("Unable to determine sync status: {e}"))?;
410
411		if !is_syncing {
412			let relay_chain_best_hash = relay_chain_interface
413				.finalized_block_hash()
414				.await
415				.map_err(|e| Box::new(e) as Box<_>)?;
416
417			let validation_data = relay_chain_interface
418				.persisted_validation_data(
419					relay_chain_best_hash,
420					para_id,
421					OccupiedCoreAssumption::TimedOut,
422				)
423				.await
424				.map_err(|e| format!("{e:?}"))?
425				.ok_or("Could not find parachain head in relay chain")?;
426
427			let finalized_header = B::Header::decode(&mut &validation_data.parent_head.0[..])
428				.map_err(|e| format!("Failed to decode parachain head: {e}"))?;
429
430			log::info!(
431				"🎉 Received target parachain header #{} ({}) from the relay chain.",
432				finalized_header.number(),
433				finalized_header.hash()
434			);
435			return Ok(finalized_header)
436		}
437	}
438
439	Err("Stopping following imported blocks. Could not determine parachain target block".into())
440}
441
442/// Task for logging candidate events and some related metrics.
443async fn parachain_informant<Block: BlockT, Client>(
444	para_id: ParaId,
445	relay_chain_interface: impl RelayChainInterface + Clone,
446	client: Arc<Client>,
447	metrics: Option<ParachainInformantMetrics>,
448) where
449	Client: HeaderBackend<Block> + Send + Sync + 'static,
450{
451	let mut import_notifications = match relay_chain_interface.import_notification_stream().await {
452		Ok(import_notifications) => import_notifications,
453		Err(e) => {
454			log::error!("Failed to get import notification stream: {e:?}. Parachain informant will not run!");
455			return
456		},
457	};
458	let mut last_backed_block_time: Option<Instant> = None;
459	while let Some(n) = import_notifications.next().await {
460		let candidate_events = match relay_chain_interface.candidate_events(n.hash()).await {
461			Ok(candidate_events) => candidate_events,
462			Err(e) => {
463				log::warn!("Failed to get candidate events for block {}: {e:?}", n.hash());
464				continue
465			},
466		};
467		let mut backed_candidates = Vec::new();
468		let mut included_candidates = Vec::new();
469		let mut timed_out_candidates = Vec::new();
470		for event in candidate_events {
471			match event {
472				CandidateEvent::CandidateBacked(receipt, head, _, _) => {
473					if receipt.descriptor.para_id() != para_id {
474						continue;
475					}
476					let backed_block = match Block::Header::decode(&mut &head.0[..]) {
477						Ok(header) => header,
478						Err(e) => {
479							log::warn!(
480								"Failed to decode parachain header from backed block: {e:?}"
481							);
482							continue
483						},
484					};
485					let backed_block_time = Instant::now();
486					if let Some(last_backed_block_time) = &last_backed_block_time {
487						let duration = backed_block_time.duration_since(*last_backed_block_time);
488						if let Some(metrics) = &metrics {
489							metrics.parachain_block_backed_duration.observe(duration.as_secs_f64());
490						}
491					}
492					last_backed_block_time = Some(backed_block_time);
493					backed_candidates.push(backed_block);
494				},
495				CandidateEvent::CandidateIncluded(receipt, head, _, _) => {
496					if receipt.descriptor.para_id() != para_id {
497						continue;
498					}
499					let included_block = match Block::Header::decode(&mut &head.0[..]) {
500						Ok(header) => header,
501						Err(e) => {
502							log::warn!(
503								"Failed to decode parachain header from included block: {e:?}"
504							);
505							continue
506						},
507					};
508					let unincluded_segment_size =
509						client.info().best_number.saturating_sub(*included_block.number());
510					let unincluded_segment_size: u32 = unincluded_segment_size.saturated_into();
511					if let Some(metrics) = &metrics {
512						metrics.unincluded_segment_size.observe(unincluded_segment_size.into());
513					}
514					included_candidates.push(included_block);
515				},
516				CandidateEvent::CandidateTimedOut(receipt, head, _) => {
517					if receipt.descriptor.para_id() != para_id {
518						continue;
519					}
520					let timed_out_block = match Block::Header::decode(&mut &head.0[..]) {
521						Ok(header) => header,
522						Err(e) => {
523							log::warn!(
524								"Failed to decode parachain header from timed out block: {e:?}"
525							);
526							continue
527						},
528					};
529					timed_out_candidates.push(timed_out_block);
530				},
531			}
532		}
533		let mut log_parts = Vec::new();
534		if !backed_candidates.is_empty() {
535			let backed_candidates = backed_candidates
536				.into_iter()
537				.map(|c| format!("#{} ({})", c.number(), c.hash()))
538				.collect::<Vec<_>>()
539				.join(", ");
540			log_parts.push(format!("backed: {}", backed_candidates));
541		};
542		if !included_candidates.is_empty() {
543			let included_candidates = included_candidates
544				.into_iter()
545				.map(|c| format!("#{} ({})", c.number(), c.hash()))
546				.collect::<Vec<_>>()
547				.join(", ");
548			log_parts.push(format!("included: {}", included_candidates));
549		};
550		if !timed_out_candidates.is_empty() {
551			let timed_out_candidates = timed_out_candidates
552				.into_iter()
553				.map(|c| format!("#{} ({})", c.number(), c.hash()))
554				.collect::<Vec<_>>()
555				.join(", ");
556			log_parts.push(format!("timed out: {}", timed_out_candidates));
557		};
558		if !log_parts.is_empty() {
559			log::info!(
560				"Update at relay chain block #{} ({}) - {}",
561				n.number(),
562				n.hash(),
563				log_parts.join(", ")
564			);
565		}
566	}
567}
568
/// Prometheus histograms fed by the parachain informant task.
struct ParachainInformantMetrics {
	/// Time between parachain blocks getting backed by the relaychain.
	///
	/// Observed in seconds.
	parachain_block_backed_duration: Histogram,
	/// Number of blocks between best block and last included block.
	unincluded_segment_size: Histogram,
}
575
576impl ParachainInformantMetrics {
577	fn new(prometheus_registry: &Registry) -> prometheus::Result<Self> {
578		let parachain_block_authorship_duration = Histogram::with_opts(HistogramOpts::new(
579			"parachain_block_backed_duration",
580			"Time between parachain blocks getting backed by the relaychain",
581		))?;
582		prometheus_registry.register(Box::new(parachain_block_authorship_duration.clone()))?;
583
584		let unincluded_segment_size = Histogram::with_opts(
585			HistogramOpts::new(
586				"parachain_unincluded_segment_size",
587				"Number of blocks between best block and last included block",
588			)
589			.buckets((0..=24).into_iter().map(|i| i as f64).collect()),
590		)?;
591		prometheus_registry.register(Box::new(unincluded_segment_size.clone()))?;
592
593		Ok(Self {
594			parachain_block_backed_duration: parachain_block_authorship_duration,
595			unincluded_segment_size,
596		})
597	}
598}
599
/// Implementation of [`TracingExecuteBlock`] for parachains.
///
/// Ensures that all the extensions required by parachain runtimes are registered and
/// available.
pub struct ParachainTracingExecuteBlock<Client> {
	/// Client whose runtime API is used to execute the block.
	client: Arc<Client>,
}

impl<Client> ParachainTracingExecuteBlock<Client> {
	/// Creates a new instance of `Self` that executes blocks through the given `client`.
	pub fn new(client: Arc<Client>) -> Self {
		Self { client }
	}
}
614
615impl<Block, Client> TracingExecuteBlock<Block> for ParachainTracingExecuteBlock<Client>
616where
617	Block: BlockT,
618	Client: ProvideRuntimeApi<Block> + Send + Sync,
619	Client::Api: Core<Block>,
620{
621	fn execute_block(&self, _: Block::Hash, block: Block) -> sp_blockchain::Result<()> {
622		let mut runtime_api = self.client.runtime_api();
623		let storage_proof_recorder = ProofRecorder::<Block>::default();
624		runtime_api.register_extension(ProofSizeExt::new(storage_proof_recorder.clone()));
625		runtime_api.record_proof_with_recorder(storage_proof_recorder);
626
627		runtime_api
628			.execute_block(*block.header().parent_hash(), block.into())
629			.map_err(Into::into)
630	}
631}