1use sp_api::RuntimeApiInfo;
25use sp_consensus::block_validation::{
26 BlockAnnounceValidator as BlockAnnounceValidatorT, Validation,
27};
28use sp_core::traits::SpawnNamed;
29use sp_runtime::traits::{Block as BlockT, Header as HeaderT};
30
31use cumulus_relay_chain_interface::RelayChainInterface;
32use polkadot_node_primitives::{CollationSecondedSignal, Statement};
33use polkadot_node_subsystem::messages::RuntimeApiRequest;
34use polkadot_parachain_primitives::primitives::HeadData;
35use polkadot_primitives::{
36 CandidateReceiptV2 as CandidateReceipt, CompactStatement, Hash as PHash, Id as ParaId,
37 OccupiedCoreAssumption, SigningContext, UncheckedSigned,
38};
39
40use codec::{Decode, DecodeAll, Encode};
41use futures::{channel::oneshot, future::FutureExt, Future};
42use std::{fmt, marker::PhantomData, pin::Pin, sync::Arc};
43
44#[cfg(test)]
45mod tests;
46
47const LOG_TARGET: &str = "sync::cumulus";
48
49type BoxedError = Box<dyn std::error::Error + Send>;
50
51#[derive(Debug)]
52struct BlockAnnounceError(String);
53impl std::error::Error for BlockAnnounceError {}
54
55impl fmt::Display for BlockAnnounceError {
56 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
57 self.0.fmt(f)
58 }
59}
60
61#[derive(Encode, Debug)]
66pub struct BlockAnnounceData {
67 receipt: CandidateReceipt,
69 statement: UncheckedSigned<CompactStatement>,
71 relay_parent: PHash,
73}
74
75impl Decode for BlockAnnounceData {
76 fn decode<I: codec::Input>(input: &mut I) -> Result<Self, codec::Error> {
77 let receipt = CandidateReceipt::decode(input)?;
78 let statement = UncheckedSigned::<CompactStatement>::decode(input)?;
79
80 let relay_parent = match PHash::decode(input) {
81 Ok(p) => p,
82 Err(_) => receipt.descriptor.relay_parent(),
84 };
85
86 Ok(Self { receipt, statement, relay_parent })
87 }
88}
89
90impl BlockAnnounceData {
91 fn validate(&self, encoded_header: Vec<u8>) -> Result<(), Validation> {
96 let candidate_hash =
97 if let CompactStatement::Seconded(h) = self.statement.unchecked_payload() {
98 h
99 } else {
100 tracing::debug!(target: LOG_TARGET, "`CompactStatement` isn't the candidate variant!",);
101 return Err(Validation::Failure { disconnect: true });
102 };
103
104 if *candidate_hash != self.receipt.hash() {
105 tracing::debug!(
106 target: LOG_TARGET,
107 "Receipt candidate hash doesn't match candidate hash in statement",
108 );
109 return Err(Validation::Failure { disconnect: true });
110 }
111
112 if HeadData(encoded_header).hash() != self.receipt.descriptor.para_head() {
113 tracing::debug!(
114 target: LOG_TARGET,
115 "Receipt para head hash doesn't match the hash of the header in the block announcement",
116 );
117 return Err(Validation::Failure { disconnect: true });
118 }
119
120 Ok(())
121 }
122
123 async fn check_signature<RCInterface>(
127 self,
128 relay_chain_client: &RCInterface,
129 ) -> Result<Validation, BlockAnnounceError>
130 where
131 RCInterface: RelayChainInterface + 'static,
132 {
133 let validator_index = self.statement.unchecked_validator_index();
134
135 let session_index =
136 match relay_chain_client.session_index_for_child(self.relay_parent).await {
137 Ok(r) => r,
138 Err(e) => return Err(BlockAnnounceError(format!("{:?}", e))),
139 };
140
141 let signing_context = SigningContext { parent_hash: self.relay_parent, session_index };
142
143 let authorities = match relay_chain_client.validators(self.relay_parent).await {
145 Ok(r) => r,
146 Err(e) => return Err(BlockAnnounceError(format!("{:?}", e))),
147 };
148 let signer = match authorities.get(validator_index.0 as usize) {
149 Some(r) => r,
150 None => {
151 tracing::debug!(
152 target: LOG_TARGET,
153 "Block announcement justification signer is a validator index out of bound",
154 );
155
156 return Ok(Validation::Failure { disconnect: true });
157 },
158 };
159
160 if self.statement.try_into_checked(&signing_context, signer).is_err() {
162 tracing::debug!(
163 target: LOG_TARGET,
164 "Block announcement justification signature is invalid.",
165 );
166
167 return Ok(Validation::Failure { disconnect: true });
168 }
169
170 Ok(Validation::Success { is_new_best: true })
171 }
172}
173
174impl TryFrom<&'_ CollationSecondedSignal> for BlockAnnounceData {
175 type Error = ();
176
177 fn try_from(signal: &CollationSecondedSignal) -> Result<BlockAnnounceData, ()> {
178 let receipt = if let Statement::Seconded(receipt) = signal.statement.payload() {
179 receipt.to_plain()
180 } else {
181 return Err(());
182 };
183
184 Ok(BlockAnnounceData {
185 receipt,
186 statement: signal.statement.convert_payload().into(),
187 relay_parent: signal.scheduling_parent,
188 })
189 }
190}
191
192#[deprecated = "This has been renamed to RequireSecondedInBlockAnnounce"]
194pub type BlockAnnounceValidator<Block, RCInterface> =
195 RequireSecondedInBlockAnnounce<Block, RCInterface>;
196
197#[derive(Clone)]
229pub struct RequireSecondedInBlockAnnounce<Block, RCInterface> {
230 phantom: PhantomData<Block>,
231 relay_chain_interface: RCInterface,
232 para_id: ParaId,
233}
234
235impl<Block, RCInterface> RequireSecondedInBlockAnnounce<Block, RCInterface>
236where
237 RCInterface: Clone,
238{
239 pub fn new(relay_chain_interface: RCInterface, para_id: ParaId) -> Self {
241 Self { phantom: Default::default(), relay_chain_interface, para_id }
242 }
243}
244
245impl<Block: BlockT, RCInterface> RequireSecondedInBlockAnnounce<Block, RCInterface>
246where
247 RCInterface: RelayChainInterface + Clone,
248{
249 async fn included_block(
251 relay_chain_interface: &RCInterface,
252 hash: PHash,
253 para_id: ParaId,
254 ) -> Result<Block::Header, BoxedError> {
255 let validation_data = relay_chain_interface
256 .persisted_validation_data(hash, para_id, OccupiedCoreAssumption::TimedOut)
257 .await
258 .map_err(|e| Box::new(BlockAnnounceError(format!("{:?}", e))) as Box<_>)?
259 .ok_or_else(|| {
260 Box::new(BlockAnnounceError("Could not find parachain head in relay chain".into()))
261 as Box<_>
262 })?;
263 let para_head =
264 Block::Header::decode(&mut &validation_data.parent_head.0[..]).map_err(|e| {
265 Box::new(BlockAnnounceError(format!("Failed to decode parachain head: {:?}", e)))
266 as Box<_>
267 })?;
268
269 Ok(para_head)
270 }
271
272 async fn backed_block_hashes(
274 relay_chain_interface: &RCInterface,
275 hash: PHash,
276 para_id: ParaId,
277 ) -> Result<impl Iterator<Item = PHash>, BoxedError> {
278 let runtime_api_version = relay_chain_interface
279 .version(hash)
280 .await
281 .map_err(|e| Box::new(BlockAnnounceError(format!("{:?}", e))) as Box<_>)?;
282 let parachain_host_runtime_api_version =
283 runtime_api_version
284 .api_version(
285 &<dyn polkadot_primitives::runtime_api::ParachainHost<
286 polkadot_primitives::Block,
287 >>::ID,
288 )
289 .unwrap_or_default();
290
291 let candidate_receipts = if parachain_host_runtime_api_version <
294 RuntimeApiRequest::CANDIDATES_PENDING_AVAILABILITY_RUNTIME_REQUIREMENT
295 {
296 #[allow(deprecated)]
297 relay_chain_interface
298 .candidate_pending_availability(hash, para_id)
299 .await
300 .map(|c| c.into_iter().collect::<Vec<_>>())
301 } else {
302 relay_chain_interface.candidates_pending_availability(hash, para_id).await
303 }
304 .map_err(|e| Box::new(BlockAnnounceError(format!("{:?}", e))) as Box<_>)?;
305
306 Ok(candidate_receipts.into_iter().map(|cr| cr.descriptor.para_head()))
307 }
308
309 async fn handle_empty_block_announce_data(
311 &self,
312 header: Block::Header,
313 ) -> Result<Validation, BoxedError> {
314 let relay_chain_interface = self.relay_chain_interface.clone();
315 let para_id = self.para_id;
316
317 let relay_chain_best_hash = relay_chain_interface
319 .best_block_hash()
320 .await
321 .map_err(|e| Box::new(e) as Box<_>)?;
322 let block_number = header.number();
323
324 let best_head =
325 Self::included_block(&relay_chain_interface, relay_chain_best_hash, para_id).await?;
326 let known_best_number = best_head.number();
327
328 if best_head == header {
329 tracing::debug!(target: LOG_TARGET, "Announced block matches best block.",);
330
331 return Ok(Validation::Success { is_new_best: true });
332 }
333
334 let mut backed_blocks =
335 Self::backed_block_hashes(&relay_chain_interface, relay_chain_best_hash, para_id)
336 .await?;
337
338 let head_hash = HeadData(header.encode()).hash();
339
340 if backed_blocks.any(|block_hash| block_hash == head_hash) {
341 tracing::debug!(target: LOG_TARGET, "Announced block matches latest backed block.",);
342
343 Ok(Validation::Success { is_new_best: true })
344 } else if block_number >= known_best_number {
345 tracing::debug!(
346 target: LOG_TARGET,
347 "Validation failed because a justification is needed if the block at the top of the chain."
348 );
349
350 Ok(Validation::Failure { disconnect: false })
351 } else {
352 Ok(Validation::Success { is_new_best: false })
353 }
354 }
355}
356
357impl<Block: BlockT, RCInterface> BlockAnnounceValidatorT<Block>
358 for RequireSecondedInBlockAnnounce<Block, RCInterface>
359where
360 RCInterface: RelayChainInterface + Clone + 'static,
361{
362 fn validate(
363 &mut self,
364 header: &Block::Header,
365 data: &[u8],
366 ) -> Pin<Box<dyn Future<Output = Result<Validation, BoxedError>> + Send>> {
367 let relay_chain_interface = self.relay_chain_interface.clone();
368 let data = data.to_vec();
369 let header = header.clone();
370 let header_encoded = header.encode();
371 let block_announce_validator = self.clone();
372
373 async move {
374 let relay_chain_is_syncing = relay_chain_interface
375 .is_major_syncing()
376 .await
377 .map_err(
378 |e| tracing::error!(target: LOG_TARGET, "Unable to determine sync status. {}", e),
379 )
380 .unwrap_or(false);
381
382 if relay_chain_is_syncing {
383 return Ok(Validation::Success { is_new_best: false });
384 }
385
386 if data.is_empty() {
387 return block_announce_validator.handle_empty_block_announce_data(header).await;
388 }
389
390 let block_announce_data = match BlockAnnounceData::decode_all(&mut data.as_slice()) {
391 Ok(r) => r,
392 Err(err) => {
393 return Err(Box::new(BlockAnnounceError(format!(
394 "Can not decode the `BlockAnnounceData`: {:?}",
395 err
396 ))) as Box<_>)
397 },
398 };
399
400 if let Err(e) = block_announce_data.validate(header_encoded) {
401 return Ok(e);
402 }
403
404 let relay_parent = block_announce_data.receipt.descriptor.relay_parent();
405
406 relay_chain_interface
407 .wait_for_block(relay_parent)
408 .await
409 .map_err(|e| Box::new(BlockAnnounceError(e.to_string())) as Box<_>)?;
410
411 block_announce_data
412 .check_signature(&relay_chain_interface)
413 .await
414 .map_err(|e| Box::new(e) as Box<_>)
415 }
416 .boxed()
417 }
418}
419
420pub struct WaitToAnnounce<Block: BlockT> {
426 spawner: Arc<dyn SpawnNamed + Send + Sync>,
427 announce_block: Arc<dyn Fn(Block::Hash, Option<Vec<u8>>) + Send + Sync>,
428}
429
430impl<Block: BlockT> WaitToAnnounce<Block> {
431 pub fn new(
433 spawner: Arc<dyn SpawnNamed + Send + Sync>,
434 announce_block: Arc<dyn Fn(Block::Hash, Option<Vec<u8>>) + Send + Sync>,
435 ) -> WaitToAnnounce<Block> {
436 WaitToAnnounce { spawner, announce_block }
437 }
438
439 pub fn wait_to_announce(
442 &mut self,
443 block_hash: <Block as BlockT>::Hash,
444 signed_stmt_recv: oneshot::Receiver<CollationSecondedSignal>,
445 ) {
446 let announce_block = self.announce_block.clone();
447
448 self.spawner.spawn(
449 "cumulus-wait-to-announce",
450 None,
451 async move {
452 tracing::debug!(
453 target: "cumulus-network",
454 "waiting for announce block in a background task...",
455 );
456
457 wait_to_announce::<Block>(block_hash, announce_block, signed_stmt_recv).await;
458
459 tracing::debug!(
460 target: "cumulus-network",
461 "block announcement finished",
462 );
463 }
464 .boxed(),
465 );
466 }
467}
468
469async fn wait_to_announce<Block: BlockT>(
470 block_hash: <Block as BlockT>::Hash,
471 announce_block: Arc<dyn Fn(Block::Hash, Option<Vec<u8>>) + Send + Sync>,
472 signed_stmt_recv: oneshot::Receiver<CollationSecondedSignal>,
473) {
474 let signal = match signed_stmt_recv.await {
475 Ok(s) => s,
476 Err(_) => {
477 tracing::debug!(
478 target: "cumulus-network",
479 block = ?block_hash,
480 "Wait to announce stopped, because sender was dropped.",
481 );
482 return;
483 },
484 };
485
486 if let Ok(data) = BlockAnnounceData::try_from(&signal) {
487 announce_block(block_hash, Some(data.encode()));
488 } else {
489 tracing::debug!(
490 target: "cumulus-network",
491 ?signal,
492 block = ?block_hash,
493 "Received invalid statement while waiting to announce block.",
494 );
495 }
496}
497
498#[derive(Debug, Clone)]
501pub struct AssumeSybilResistance(bool);
502
503impl AssumeSybilResistance {
504 pub fn allow_seconded_messages() -> Self {
511 AssumeSybilResistance(true)
512 }
513
514 pub fn reject_seconded_messages() -> Self {
517 AssumeSybilResistance(false)
518 }
519}
520
521impl<Block: BlockT> BlockAnnounceValidatorT<Block> for AssumeSybilResistance {
522 fn validate(
523 &mut self,
524 _header: &Block::Header,
525 data: &[u8],
526 ) -> Pin<Box<dyn Future<Output = Result<Validation, BoxedError>> + Send>> {
527 let allow_seconded_messages = self.0;
528 let data = data.to_vec();
529
530 async move {
531 Ok(if data.is_empty() {
532 Validation::Success { is_new_best: false }
533 } else if !allow_seconded_messages {
534 Validation::Failure { disconnect: false }
535 } else {
536 match BlockAnnounceData::decode_all(&mut data.as_slice()) {
537 Ok(_) => Validation::Success { is_new_best: false },
538 Err(_) => Validation::Failure { disconnect: true },
539 }
540 })
541 }
542 .boxed()
543 }
544}