1use cumulus_client_cli::CollatorOptions;
23use cumulus_client_network::{AssumeSybilResistance, RequireSecondedInBlockAnnounce};
24use cumulus_client_pov_recovery::{PoVRecovery, RecoveryDelayRange, RecoveryHandle};
25use cumulus_primitives_core::{CollectCollationInfo, ParaId};
26pub use cumulus_primitives_proof_size_hostfunction::storage_proof_size;
27use cumulus_relay_chain_inprocess_interface::build_inprocess_relay_chain;
28use cumulus_relay_chain_interface::{RelayChainInterface, RelayChainResult};
29use cumulus_relay_chain_minimal_node::build_minimal_relay_chain_node_with_rpc;
30use futures::{channel::mpsc, StreamExt};
31use polkadot_primitives::{CandidateEvent, CollatorPair, OccupiedCoreAssumption};
32use prometheus::{Histogram, HistogramOpts, Registry};
33use sc_client_api::{
34 Backend as BackendT, BlockBackend, BlockchainEvents, Finalizer, ProofProvider, UsageProvider,
35};
36use sc_consensus::{
37 import_queue::{ImportQueue, ImportQueueService},
38 BlockImport,
39};
40use sc_network::{
41 config::SyncMode, request_responses::IncomingRequest, service::traits::NetworkService,
42 NetworkBackend,
43};
44use sc_network_sync::SyncingService;
45use sc_network_transactions::TransactionsHandlerController;
46use sc_service::{Configuration, SpawnTaskHandle, TaskManager, WarpSyncConfig};
47use sc_telemetry::{log, TelemetryWorkerHandle};
48use sc_tracing::block::TracingExecuteBlock;
49use sc_utils::mpsc::TracingUnboundedSender;
50use sp_api::{ApiExt, Core, ProofRecorder, ProvideRuntimeApi};
51use sp_blockchain::{HeaderBackend, HeaderMetadata};
52use sp_core::Decode;
53use sp_runtime::{
54 traits::{Block as BlockT, BlockIdTo, Header},
55 SaturatedConversion, Saturating,
56};
57use sp_trie::proof_size_extension::ProofSizeExt;
58use std::{
59 sync::Arc,
60 time::{Duration, Instant},
61};
62
/// Tuple of host-function sets: the `storage_proof_size` host functions followed by the
/// standard Substrate host functions.
///
/// NOTE(review): presumably the host-function set a parachain node's executor is instantiated
/// with; confirm against the call sites, which are not visible in this file.
pub type ParachainHostFunctions = (
    cumulus_primitives_proof_size_hostfunction::storage_proof_size::HostFunctions,
    sp_io::SubstrateHostFunctions,
);
71
/// Buffer size passed to `mpsc::channel` when `start_relay_chain_tasks` creates the channel
/// whose sender goes to the parachain consensus tasks and whose receiver goes to PoV recovery.
const RECOVERY_CHAN_SIZE: usize = 8;
/// Log target used by `build_network` for its warp-sync related messages.
const LOG_TARGET_SYNC: &str = "sync::cumulus";
77
/// Selects the delay range that PoV recovery is configured with in `start_relay_chain_tasks`.
///
/// The concrete ranges are derived from the relay chain slot duration supplied alongside the
/// profile.
pub enum DARecoveryProfile {
    /// Delay between half a relay chain slot and one full relay chain slot.
    Collator,
    /// Delay between 25 and 50 relay chain slots.
    FullNode,
    /// Use the given delay range unchanged.
    Other(RecoveryDelayRange),
}
89
/// Parameters given to [`start_relay_chain_tasks`].
pub struct StartRelayChainTasksParams<'a, Block: BlockT, Client, RCInterface> {
    /// Parachain client; shared with the consensus tasks, PoV recovery and the informant.
    pub client: Arc<Client>,
    /// Block announcement callback, handed to the parachain consensus tasks.
    pub announce_block: Arc<dyn Fn(Block::Hash, Option<Vec<u8>>) + Send + Sync>,
    /// Id of the parachain the tasks are started for.
    pub para_id: ParaId,
    /// Relay chain access; cloned into every spawned task.
    pub relay_chain_interface: RCInterface,
    /// Task manager on which all tasks are spawned.
    pub task_manager: &'a mut TaskManager,
    /// Profile from which the PoV recovery delay range is derived.
    pub da_recovery_profile: DARecoveryProfile,
    /// Import queue service handed to PoV recovery.
    pub import_queue: Box<dyn ImportQueueService<Block>>,
    /// Relay chain slot duration; the base unit for the `Collator` and `FullNode` delay ranges.
    pub relay_chain_slot_duration: Duration,
    /// Recovery handle handed to PoV recovery.
    pub recovery_handle: Box<dyn RecoveryHandle>,
    /// Syncing service handed to PoV recovery.
    pub sync_service: Arc<SyncingService<Block>>,
    /// When `Some`, the parachain informant metrics are registered with this registry.
    pub prometheus_registry: Option<&'a Registry>,
}
104
105pub fn start_relay_chain_tasks<Block, Client, Backend, RCInterface>(
115 StartRelayChainTasksParams {
116 client,
117 announce_block,
118 para_id,
119 task_manager,
120 da_recovery_profile,
121 relay_chain_interface,
122 import_queue,
123 relay_chain_slot_duration,
124 recovery_handle,
125 sync_service,
126 prometheus_registry,
127 }: StartRelayChainTasksParams<Block, Client, RCInterface>,
128) -> sc_service::error::Result<()>
129where
130 Block: BlockT,
131 Client: Finalizer<Block, Backend>
132 + UsageProvider<Block>
133 + HeaderBackend<Block>
134 + Send
135 + Sync
136 + BlockBackend<Block>
137 + BlockchainEvents<Block>
138 + 'static,
139 for<'a> &'a Client: BlockImport<Block>,
140 Backend: BackendT<Block> + 'static,
141 RCInterface: RelayChainInterface + Clone + 'static,
142{
143 let (recovery_chan_tx, recovery_chan_rx) = mpsc::channel(RECOVERY_CHAN_SIZE);
144
145 cumulus_client_consensus_common::spawn_parachain_consensus_tasks(
146 para_id,
147 client.clone(),
148 relay_chain_interface.clone(),
149 announce_block.clone(),
150 Some(recovery_chan_tx),
151 task_manager.spawn_essential_handle(),
152 );
153
154 let da_recovery_profile = match da_recovery_profile {
155 DARecoveryProfile::Collator => {
156 RecoveryDelayRange {
160 min: relay_chain_slot_duration / 2,
161 max: relay_chain_slot_duration,
162 }
163 },
164 DARecoveryProfile::FullNode => {
165 RecoveryDelayRange {
171 min: relay_chain_slot_duration * 25,
172 max: relay_chain_slot_duration * 50,
173 }
174 },
175 DARecoveryProfile::Other(profile) => profile,
176 };
177
178 let pov_recovery = PoVRecovery::new(
179 recovery_handle,
180 da_recovery_profile,
181 client.clone(),
182 import_queue,
183 relay_chain_interface.clone(),
184 para_id,
185 recovery_chan_rx,
186 sync_service.clone(),
187 );
188
189 task_manager
190 .spawn_essential_handle()
191 .spawn("cumulus-pov-recovery", None, pov_recovery.run());
192
193 let parachain_informant = parachain_informant::<Block, _>(
194 para_id,
195 relay_chain_interface.clone(),
196 client.clone(),
197 prometheus_registry.map(ParachainInformantMetrics::new).transpose()?,
198 );
199 task_manager
200 .spawn_handle()
201 .spawn("parachain-informant", None, parachain_informant);
202
203 Ok(())
204}
205
206pub fn prepare_node_config(mut parachain_config: Configuration) -> Configuration {
213 parachain_config.announce_block = false;
214 parachain_config.network.min_peers_to_start_warp_sync = Some(1);
217
218 parachain_config
219}
220
221pub async fn build_relay_chain_interface(
225 relay_chain_config: Configuration,
226 parachain_config: &Configuration,
227 telemetry_worker_handle: Option<TelemetryWorkerHandle>,
228 task_manager: &mut TaskManager,
229 collator_options: CollatorOptions,
230 hwbench: Option<sc_sysinfo::HwBench>,
231) -> RelayChainResult<(
232 Arc<dyn RelayChainInterface + 'static>,
233 Option<CollatorPair>,
234 Arc<dyn NetworkService>,
235 async_channel::Receiver<IncomingRequest>,
236)> {
237 match collator_options.relay_chain_mode {
238 cumulus_client_cli::RelayChainMode::Embedded => build_inprocess_relay_chain(
239 relay_chain_config,
240 parachain_config,
241 telemetry_worker_handle,
242 task_manager,
243 hwbench,
244 ),
245 cumulus_client_cli::RelayChainMode::ExternalRpc(rpc_target_urls) =>
246 build_minimal_relay_chain_node_with_rpc(
247 relay_chain_config,
248 parachain_config.prometheus_registry(),
249 task_manager,
250 rpc_target_urls,
251 )
252 .await,
253 }
254}
255
/// Whether the parachain's collator set is considered sybil-resistant.
///
/// `build_network` uses this to choose the block announcement validator.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CollatorSybilResistance {
    /// Block announcements are validated with `AssumeSybilResistance`, allowing seconded
    /// messages.
    Resistant,
    /// Block announcements are validated with `RequireSecondedInBlockAnnounce`, which is
    /// built from the relay chain interface and the para id.
    Unresistant,
}
269
/// Parameters given to [`build_network`].
pub struct BuildNetworkParams<
    'a,
    Block: BlockT,
    Client: ProvideRuntimeApi<Block>
        + BlockBackend<Block>
        + HeaderMetadata<Block, Error = sp_blockchain::Error>
        + HeaderBackend<Block>
        + BlockIdTo<Block>
        + 'static,
    Network: NetworkBackend<Block, <Block as BlockT>::Hash>,
    RCInterface,
    IQ,
> where
    Client::Api: sp_transaction_pool::runtime_api::TaggedTransactionQueue<Block>,
{
    /// Parachain node configuration; its sync mode decides whether a warp sync target is
    /// fetched, and it is forwarded to `sc_service::build_network`.
    pub parachain_config: &'a Configuration,
    /// Network configuration, forwarded to `sc_service::build_network`.
    pub net_config:
        sc_network::config::FullNetworkConfiguration<Block, <Block as BlockT>::Hash, Network>,
    /// Parachain client, forwarded to `sc_service::build_network`.
    pub client: Arc<Client>,
    /// Transaction pool, forwarded to `sc_service::build_network`.
    pub transaction_pool: Arc<sc_transaction_pool::TransactionPoolHandle<Block, Client>>,
    /// Id of the parachain; used for the warp sync target lookup and the block announcement
    /// validator.
    pub para_id: ParaId,
    /// Relay chain access; used for the warp sync target lookup and the block announcement
    /// validator.
    pub relay_chain_interface: RCInterface,
    /// Spawn handle, forwarded to `sc_service::build_network`.
    pub spawn_handle: SpawnTaskHandle,
    /// Import queue, forwarded to `sc_service::build_network`.
    pub import_queue: IQ,
    /// Selects which block announcement validator is installed.
    pub sybil_resistance_level: CollatorSybilResistance,
    /// Notification metrics, forwarded to `sc_service::build_network`.
    pub metrics: sc_network::NotificationMetrics,
}
298
299pub async fn build_network<'a, Block, Client, RCInterface, IQ, Network>(
301 BuildNetworkParams {
302 parachain_config,
303 net_config,
304 client,
305 transaction_pool,
306 para_id,
307 spawn_handle,
308 relay_chain_interface,
309 import_queue,
310 sybil_resistance_level,
311 metrics,
312 }: BuildNetworkParams<'a, Block, Client, Network, RCInterface, IQ>,
313) -> sc_service::error::Result<(
314 Arc<dyn NetworkService>,
315 TracingUnboundedSender<sc_rpc::system::Request<Block>>,
316 TransactionsHandlerController<Block::Hash>,
317 Arc<SyncingService<Block>>,
318)>
319where
320 Block: BlockT,
321 Client: UsageProvider<Block>
322 + HeaderBackend<Block>
323 + sp_consensus::block_validation::Chain<Block>
324 + Send
325 + Sync
326 + BlockBackend<Block>
327 + BlockchainEvents<Block>
328 + ProvideRuntimeApi<Block>
329 + HeaderMetadata<Block, Error = sp_blockchain::Error>
330 + BlockIdTo<Block, Error = sp_blockchain::Error>
331 + ProofProvider<Block>
332 + 'static,
333 Client::Api: CollectCollationInfo<Block>
334 + sp_transaction_pool::runtime_api::TaggedTransactionQueue<Block>,
335 for<'b> &'b Client: BlockImport<Block>,
336 RCInterface: RelayChainInterface + Clone + 'static,
337 IQ: ImportQueue<Block> + 'static,
338 Network: NetworkBackend<Block, <Block as BlockT>::Hash>,
339{
340 let warp_sync_config = match parachain_config.network.sync_mode {
341 SyncMode::Warp => {
342 log::debug!(target: LOG_TARGET_SYNC, "waiting for announce block...");
343
344 let target_block =
345 wait_for_finalized_para_head::<Block, _>(para_id, relay_chain_interface.clone())
346 .await
347 .inspect_err(|e| {
348 log::error!(
349 target: LOG_TARGET_SYNC,
350 "Unable to determine parachain target block {:?}",
351 e
352 );
353 })?;
354 Some(WarpSyncConfig::WithTarget(target_block))
355 },
356 _ => None,
357 };
358
359 let block_announce_validator = match sybil_resistance_level {
360 CollatorSybilResistance::Resistant => {
361 let block_announce_validator = AssumeSybilResistance::allow_seconded_messages();
362 Box::new(block_announce_validator) as Box<_>
363 },
364 CollatorSybilResistance::Unresistant => {
365 let block_announce_validator =
366 RequireSecondedInBlockAnnounce::new(relay_chain_interface, para_id);
367 Box::new(block_announce_validator) as Box<_>
368 },
369 };
370
371 sc_service::build_network(sc_service::BuildNetworkParams {
372 config: parachain_config,
373 net_config,
374 client,
375 transaction_pool,
376 spawn_handle,
377 import_queue,
378 block_announce_validator_builder: Some(Box::new(move |_| block_announce_validator)),
379 warp_sync_config,
380 block_relay: None,
381 metrics,
382 })
383}
384
385async fn wait_for_finalized_para_head<B, RCInterface>(
388 para_id: ParaId,
389 relay_chain_interface: RCInterface,
390) -> sc_service::error::Result<<B as BlockT>::Header>
391where
392 B: BlockT + 'static,
393 RCInterface: RelayChainInterface + Send + 'static,
394{
395 let mut imported_blocks = relay_chain_interface
396 .import_notification_stream()
397 .await
398 .map_err(|error| {
399 sc_service::Error::Other(format!(
400 "Relay chain import notification stream error when waiting for parachain head: \
401 {error}"
402 ))
403 })?
404 .fuse();
405 while imported_blocks.next().await.is_some() {
406 let is_syncing = relay_chain_interface
407 .is_major_syncing()
408 .await
409 .map_err(|e| format!("Unable to determine sync status: {e}"))?;
410
411 if !is_syncing {
412 let relay_chain_best_hash = relay_chain_interface
413 .finalized_block_hash()
414 .await
415 .map_err(|e| Box::new(e) as Box<_>)?;
416
417 let validation_data = relay_chain_interface
418 .persisted_validation_data(
419 relay_chain_best_hash,
420 para_id,
421 OccupiedCoreAssumption::TimedOut,
422 )
423 .await
424 .map_err(|e| format!("{e:?}"))?
425 .ok_or("Could not find parachain head in relay chain")?;
426
427 let finalized_header = B::Header::decode(&mut &validation_data.parent_head.0[..])
428 .map_err(|e| format!("Failed to decode parachain head: {e}"))?;
429
430 log::info!(
431 "🎉 Received target parachain header #{} ({}) from the relay chain.",
432 finalized_header.number(),
433 finalized_header.hash()
434 );
435 return Ok(finalized_header)
436 }
437 }
438
439 Err("Stopping following imported blocks. Could not determine parachain target block".into())
440}
441
442async fn parachain_informant<Block: BlockT, Client>(
444 para_id: ParaId,
445 relay_chain_interface: impl RelayChainInterface + Clone,
446 client: Arc<Client>,
447 metrics: Option<ParachainInformantMetrics>,
448) where
449 Client: HeaderBackend<Block> + Send + Sync + 'static,
450{
451 let mut import_notifications = match relay_chain_interface.import_notification_stream().await {
452 Ok(import_notifications) => import_notifications,
453 Err(e) => {
454 log::error!("Failed to get import notification stream: {e:?}. Parachain informant will not run!");
455 return
456 },
457 };
458 let mut last_backed_block_time: Option<Instant> = None;
459 while let Some(n) = import_notifications.next().await {
460 let candidate_events = match relay_chain_interface.candidate_events(n.hash()).await {
461 Ok(candidate_events) => candidate_events,
462 Err(e) => {
463 log::warn!("Failed to get candidate events for block {}: {e:?}", n.hash());
464 continue
465 },
466 };
467 let mut backed_candidates = Vec::new();
468 let mut included_candidates = Vec::new();
469 let mut timed_out_candidates = Vec::new();
470 for event in candidate_events {
471 match event {
472 CandidateEvent::CandidateBacked(receipt, head, _, _) => {
473 if receipt.descriptor.para_id() != para_id {
474 continue;
475 }
476 let backed_block = match Block::Header::decode(&mut &head.0[..]) {
477 Ok(header) => header,
478 Err(e) => {
479 log::warn!(
480 "Failed to decode parachain header from backed block: {e:?}"
481 );
482 continue
483 },
484 };
485 let backed_block_time = Instant::now();
486 if let Some(last_backed_block_time) = &last_backed_block_time {
487 let duration = backed_block_time.duration_since(*last_backed_block_time);
488 if let Some(metrics) = &metrics {
489 metrics.parachain_block_backed_duration.observe(duration.as_secs_f64());
490 }
491 }
492 last_backed_block_time = Some(backed_block_time);
493 backed_candidates.push(backed_block);
494 },
495 CandidateEvent::CandidateIncluded(receipt, head, _, _) => {
496 if receipt.descriptor.para_id() != para_id {
497 continue;
498 }
499 let included_block = match Block::Header::decode(&mut &head.0[..]) {
500 Ok(header) => header,
501 Err(e) => {
502 log::warn!(
503 "Failed to decode parachain header from included block: {e:?}"
504 );
505 continue
506 },
507 };
508 let unincluded_segment_size =
509 client.info().best_number.saturating_sub(*included_block.number());
510 let unincluded_segment_size: u32 = unincluded_segment_size.saturated_into();
511 if let Some(metrics) = &metrics {
512 metrics.unincluded_segment_size.observe(unincluded_segment_size.into());
513 }
514 included_candidates.push(included_block);
515 },
516 CandidateEvent::CandidateTimedOut(receipt, head, _) => {
517 if receipt.descriptor.para_id() != para_id {
518 continue;
519 }
520 let timed_out_block = match Block::Header::decode(&mut &head.0[..]) {
521 Ok(header) => header,
522 Err(e) => {
523 log::warn!(
524 "Failed to decode parachain header from timed out block: {e:?}"
525 );
526 continue
527 },
528 };
529 timed_out_candidates.push(timed_out_block);
530 },
531 }
532 }
533 let mut log_parts = Vec::new();
534 if !backed_candidates.is_empty() {
535 let backed_candidates = backed_candidates
536 .into_iter()
537 .map(|c| format!("#{} ({})", c.number(), c.hash()))
538 .collect::<Vec<_>>()
539 .join(", ");
540 log_parts.push(format!("backed: {}", backed_candidates));
541 };
542 if !included_candidates.is_empty() {
543 let included_candidates = included_candidates
544 .into_iter()
545 .map(|c| format!("#{} ({})", c.number(), c.hash()))
546 .collect::<Vec<_>>()
547 .join(", ");
548 log_parts.push(format!("included: {}", included_candidates));
549 };
550 if !timed_out_candidates.is_empty() {
551 let timed_out_candidates = timed_out_candidates
552 .into_iter()
553 .map(|c| format!("#{} ({})", c.number(), c.hash()))
554 .collect::<Vec<_>>()
555 .join(", ");
556 log_parts.push(format!("timed out: {}", timed_out_candidates));
557 };
558 if !log_parts.is_empty() {
559 log::info!(
560 "Update at relay chain block #{} ({}) - {}",
561 n.number(),
562 n.hash(),
563 log_parts.join(", ")
564 );
565 }
566 }
567}
568
/// Prometheus metrics recorded by the parachain informant task.
struct ParachainInformantMetrics {
    /// Seconds elapsed between two consecutive backed candidates of the parachain.
    parachain_block_backed_duration: Histogram,
    /// Number of blocks between the client's best block and the last included block.
    unincluded_segment_size: Histogram,
}
575
576impl ParachainInformantMetrics {
577 fn new(prometheus_registry: &Registry) -> prometheus::Result<Self> {
578 let parachain_block_authorship_duration = Histogram::with_opts(HistogramOpts::new(
579 "parachain_block_backed_duration",
580 "Time between parachain blocks getting backed by the relaychain",
581 ))?;
582 prometheus_registry.register(Box::new(parachain_block_authorship_duration.clone()))?;
583
584 let unincluded_segment_size = Histogram::with_opts(
585 HistogramOpts::new(
586 "parachain_unincluded_segment_size",
587 "Number of blocks between best block and last included block",
588 )
589 .buckets((0..=24).into_iter().map(|i| i as f64).collect()),
590 )?;
591 prometheus_registry.register(Box::new(unincluded_segment_size.clone()))?;
592
593 Ok(Self {
594 parachain_block_backed_duration: parachain_block_authorship_duration,
595 unincluded_segment_size,
596 })
597 }
598}
599
/// Block executor for tracing that runs blocks through the runtime API of the wrapped client.
///
/// Its `TracingExecuteBlock` implementation registers the proof size extension and enables
/// proof recording before executing a block.
pub struct ParachainTracingExecuteBlock<Client> {
    /// Client whose runtime API is used to execute blocks.
    client: Arc<Client>,
}

impl<Client> ParachainTracingExecuteBlock<Client> {
    /// Creates a new instance that executes blocks through `client`.
    pub fn new(client: Arc<Client>) -> Self {
        ParachainTracingExecuteBlock { client }
    }
}
614
615impl<Block, Client> TracingExecuteBlock<Block> for ParachainTracingExecuteBlock<Client>
616where
617 Block: BlockT,
618 Client: ProvideRuntimeApi<Block> + Send + Sync,
619 Client::Api: Core<Block>,
620{
621 fn execute_block(&self, _: Block::Hash, block: Block) -> sp_blockchain::Result<()> {
622 let mut runtime_api = self.client.runtime_api();
623 let storage_proof_recorder = ProofRecorder::<Block>::default();
624 runtime_api.register_extension(ProofSizeExt::new(storage_proof_recorder.clone()));
625 runtime_api.record_proof_with_recorder(storage_proof_recorder);
626
627 runtime_api
628 .execute_block(*block.header().parent_hash(), block.into())
629 .map_err(Into::into)
630 }
631}