1use sp_api::RuntimeApiInfo;
24use sp_consensus::block_validation::{
25 BlockAnnounceValidator as BlockAnnounceValidatorT, Validation,
26};
27use sp_core::traits::SpawnNamed;
28use sp_runtime::traits::{Block as BlockT, Header as HeaderT};
29
30use cumulus_relay_chain_interface::RelayChainInterface;
31use polkadot_node_primitives::{CollationSecondedSignal, Statement};
32use polkadot_node_subsystem::messages::RuntimeApiRequest;
33use polkadot_parachain_primitives::primitives::HeadData;
34use polkadot_primitives::{
35 vstaging::CandidateReceiptV2 as CandidateReceipt, CompactStatement, Hash as PHash,
36 Id as ParaId, OccupiedCoreAssumption, SigningContext, UncheckedSigned,
37};
38
39use codec::{Decode, DecodeAll, Encode};
40use futures::{channel::oneshot, future::FutureExt, Future};
41use std::{fmt, marker::PhantomData, pin::Pin, sync::Arc};
42
43#[cfg(test)]
44mod tests;
45
46const LOG_TARGET: &str = "sync::cumulus";
47
48type BoxedError = Box<dyn std::error::Error + Send>;
49
50#[derive(Debug)]
51struct BlockAnnounceError(String);
52impl std::error::Error for BlockAnnounceError {}
53
54impl fmt::Display for BlockAnnounceError {
55 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
56 self.0.fmt(f)
57 }
58}
59
60#[derive(Encode, Debug)]
65pub struct BlockAnnounceData {
66 receipt: CandidateReceipt,
68 statement: UncheckedSigned<CompactStatement>,
70 relay_parent: PHash,
72}
73
74impl Decode for BlockAnnounceData {
75 fn decode<I: codec::Input>(input: &mut I) -> Result<Self, codec::Error> {
76 let receipt = CandidateReceipt::decode(input)?;
77 let statement = UncheckedSigned::<CompactStatement>::decode(input)?;
78
79 let relay_parent = match PHash::decode(input) {
80 Ok(p) => p,
81 Err(_) => receipt.descriptor.relay_parent(),
83 };
84
85 Ok(Self { receipt, statement, relay_parent })
86 }
87}
88
89impl BlockAnnounceData {
90 fn validate(&self, encoded_header: Vec<u8>) -> Result<(), Validation> {
95 let candidate_hash =
96 if let CompactStatement::Seconded(h) = self.statement.unchecked_payload() {
97 h
98 } else {
99 tracing::debug!(target: LOG_TARGET, "`CompactStatement` isn't the candidate variant!",);
100 return Err(Validation::Failure { disconnect: true })
101 };
102
103 if *candidate_hash != self.receipt.hash() {
104 tracing::debug!(
105 target: LOG_TARGET,
106 "Receipt candidate hash doesn't match candidate hash in statement",
107 );
108 return Err(Validation::Failure { disconnect: true })
109 }
110
111 if HeadData(encoded_header).hash() != self.receipt.descriptor.para_head() {
112 tracing::debug!(
113 target: LOG_TARGET,
114 "Receipt para head hash doesn't match the hash of the header in the block announcement",
115 );
116 return Err(Validation::Failure { disconnect: true })
117 }
118
119 Ok(())
120 }
121
122 async fn check_signature<RCInterface>(
126 self,
127 relay_chain_client: &RCInterface,
128 ) -> Result<Validation, BlockAnnounceError>
129 where
130 RCInterface: RelayChainInterface + 'static,
131 {
132 let validator_index = self.statement.unchecked_validator_index();
133
134 let session_index =
135 match relay_chain_client.session_index_for_child(self.relay_parent).await {
136 Ok(r) => r,
137 Err(e) => return Err(BlockAnnounceError(format!("{:?}", e))),
138 };
139
140 let signing_context = SigningContext { parent_hash: self.relay_parent, session_index };
141
142 let authorities = match relay_chain_client.validators(self.relay_parent).await {
144 Ok(r) => r,
145 Err(e) => return Err(BlockAnnounceError(format!("{:?}", e))),
146 };
147 let signer = match authorities.get(validator_index.0 as usize) {
148 Some(r) => r,
149 None => {
150 tracing::debug!(
151 target: LOG_TARGET,
152 "Block announcement justification signer is a validator index out of bound",
153 );
154
155 return Ok(Validation::Failure { disconnect: true })
156 },
157 };
158
159 if self.statement.try_into_checked(&signing_context, signer).is_err() {
161 tracing::debug!(
162 target: LOG_TARGET,
163 "Block announcement justification signature is invalid.",
164 );
165
166 return Ok(Validation::Failure { disconnect: true })
167 }
168
169 Ok(Validation::Success { is_new_best: true })
170 }
171}
172
173impl TryFrom<&'_ CollationSecondedSignal> for BlockAnnounceData {
174 type Error = ();
175
176 fn try_from(signal: &CollationSecondedSignal) -> Result<BlockAnnounceData, ()> {
177 let receipt = if let Statement::Seconded(receipt) = signal.statement.payload() {
178 receipt.to_plain()
179 } else {
180 return Err(())
181 };
182
183 Ok(BlockAnnounceData {
184 receipt,
185 statement: signal.statement.convert_payload().into(),
186 relay_parent: signal.relay_parent,
187 })
188 }
189}
190
191#[deprecated = "This has been renamed to RequireSecondedInBlockAnnounce"]
193pub type BlockAnnounceValidator<Block, RCInterface> =
194 RequireSecondedInBlockAnnounce<Block, RCInterface>;
195
196#[derive(Clone)]
228pub struct RequireSecondedInBlockAnnounce<Block, RCInterface> {
229 phantom: PhantomData<Block>,
230 relay_chain_interface: RCInterface,
231 para_id: ParaId,
232}
233
234impl<Block, RCInterface> RequireSecondedInBlockAnnounce<Block, RCInterface>
235where
236 RCInterface: Clone,
237{
238 pub fn new(relay_chain_interface: RCInterface, para_id: ParaId) -> Self {
240 Self { phantom: Default::default(), relay_chain_interface, para_id }
241 }
242}
243
244impl<Block: BlockT, RCInterface> RequireSecondedInBlockAnnounce<Block, RCInterface>
245where
246 RCInterface: RelayChainInterface + Clone,
247{
248 async fn included_block(
250 relay_chain_interface: &RCInterface,
251 hash: PHash,
252 para_id: ParaId,
253 ) -> Result<Block::Header, BoxedError> {
254 let validation_data = relay_chain_interface
255 .persisted_validation_data(hash, para_id, OccupiedCoreAssumption::TimedOut)
256 .await
257 .map_err(|e| Box::new(BlockAnnounceError(format!("{:?}", e))) as Box<_>)?
258 .ok_or_else(|| {
259 Box::new(BlockAnnounceError("Could not find parachain head in relay chain".into()))
260 as Box<_>
261 })?;
262 let para_head =
263 Block::Header::decode(&mut &validation_data.parent_head.0[..]).map_err(|e| {
264 Box::new(BlockAnnounceError(format!("Failed to decode parachain head: {:?}", e)))
265 as Box<_>
266 })?;
267
268 Ok(para_head)
269 }
270
271 async fn backed_block_hashes(
273 relay_chain_interface: &RCInterface,
274 hash: PHash,
275 para_id: ParaId,
276 ) -> Result<impl Iterator<Item = PHash>, BoxedError> {
277 let runtime_api_version = relay_chain_interface
278 .version(hash)
279 .await
280 .map_err(|e| Box::new(BlockAnnounceError(format!("{:?}", e))) as Box<_>)?;
281 let parachain_host_runtime_api_version =
282 runtime_api_version
283 .api_version(
284 &<dyn polkadot_primitives::runtime_api::ParachainHost<
285 polkadot_primitives::Block,
286 >>::ID,
287 )
288 .unwrap_or_default();
289
290 let candidate_receipts = if parachain_host_runtime_api_version <
293 RuntimeApiRequest::CANDIDATES_PENDING_AVAILABILITY_RUNTIME_REQUIREMENT
294 {
295 #[allow(deprecated)]
296 relay_chain_interface
297 .candidate_pending_availability(hash, para_id)
298 .await
299 .map(|c| c.into_iter().collect::<Vec<_>>())
300 } else {
301 relay_chain_interface.candidates_pending_availability(hash, para_id).await
302 }
303 .map_err(|e| Box::new(BlockAnnounceError(format!("{:?}", e))) as Box<_>)?;
304
305 Ok(candidate_receipts.into_iter().map(|cr| cr.descriptor.para_head()))
306 }
307
308 async fn handle_empty_block_announce_data(
310 &self,
311 header: Block::Header,
312 ) -> Result<Validation, BoxedError> {
313 let relay_chain_interface = self.relay_chain_interface.clone();
314 let para_id = self.para_id;
315
316 let relay_chain_best_hash = relay_chain_interface
318 .best_block_hash()
319 .await
320 .map_err(|e| Box::new(e) as Box<_>)?;
321 let block_number = header.number();
322
323 let best_head =
324 Self::included_block(&relay_chain_interface, relay_chain_best_hash, para_id).await?;
325 let known_best_number = best_head.number();
326
327 if best_head == header {
328 tracing::debug!(target: LOG_TARGET, "Announced block matches best block.",);
329
330 return Ok(Validation::Success { is_new_best: true })
331 }
332
333 let mut backed_blocks =
334 Self::backed_block_hashes(&relay_chain_interface, relay_chain_best_hash, para_id)
335 .await?;
336
337 let head_hash = HeadData(header.encode()).hash();
338
339 if backed_blocks.any(|block_hash| block_hash == head_hash) {
340 tracing::debug!(target: LOG_TARGET, "Announced block matches latest backed block.",);
341
342 Ok(Validation::Success { is_new_best: true })
343 } else if block_number >= known_best_number {
344 tracing::debug!(
345 target: LOG_TARGET,
346 "Validation failed because a justification is needed if the block at the top of the chain."
347 );
348
349 Ok(Validation::Failure { disconnect: false })
350 } else {
351 Ok(Validation::Success { is_new_best: false })
352 }
353 }
354}
355
356impl<Block: BlockT, RCInterface> BlockAnnounceValidatorT<Block>
357 for RequireSecondedInBlockAnnounce<Block, RCInterface>
358where
359 RCInterface: RelayChainInterface + Clone + 'static,
360{
361 fn validate(
362 &mut self,
363 header: &Block::Header,
364 data: &[u8],
365 ) -> Pin<Box<dyn Future<Output = Result<Validation, BoxedError>> + Send>> {
366 let relay_chain_interface = self.relay_chain_interface.clone();
367 let data = data.to_vec();
368 let header = header.clone();
369 let header_encoded = header.encode();
370 let block_announce_validator = self.clone();
371
372 async move {
373 let relay_chain_is_syncing = relay_chain_interface
374 .is_major_syncing()
375 .await
376 .map_err(
377 |e| tracing::error!(target: LOG_TARGET, "Unable to determine sync status. {}", e),
378 )
379 .unwrap_or(false);
380
381 if relay_chain_is_syncing {
382 return Ok(Validation::Success { is_new_best: false })
383 }
384
385 if data.is_empty() {
386 return block_announce_validator.handle_empty_block_announce_data(header).await
387 }
388
389 let block_announce_data = match BlockAnnounceData::decode_all(&mut data.as_slice()) {
390 Ok(r) => r,
391 Err(err) =>
392 return Err(Box::new(BlockAnnounceError(format!(
393 "Can not decode the `BlockAnnounceData`: {:?}",
394 err
395 ))) as Box<_>),
396 };
397
398 if let Err(e) = block_announce_data.validate(header_encoded) {
399 return Ok(e)
400 }
401
402 let relay_parent = block_announce_data.receipt.descriptor.relay_parent();
403
404 relay_chain_interface
405 .wait_for_block(relay_parent)
406 .await
407 .map_err(|e| Box::new(BlockAnnounceError(e.to_string())) as Box<_>)?;
408
409 block_announce_data
410 .check_signature(&relay_chain_interface)
411 .await
412 .map_err(|e| Box::new(e) as Box<_>)
413 }
414 .boxed()
415 }
416}
417
418pub struct WaitToAnnounce<Block: BlockT> {
424 spawner: Arc<dyn SpawnNamed + Send + Sync>,
425 announce_block: Arc<dyn Fn(Block::Hash, Option<Vec<u8>>) + Send + Sync>,
426}
427
428impl<Block: BlockT> WaitToAnnounce<Block> {
429 pub fn new(
431 spawner: Arc<dyn SpawnNamed + Send + Sync>,
432 announce_block: Arc<dyn Fn(Block::Hash, Option<Vec<u8>>) + Send + Sync>,
433 ) -> WaitToAnnounce<Block> {
434 WaitToAnnounce { spawner, announce_block }
435 }
436
437 pub fn wait_to_announce(
440 &mut self,
441 block_hash: <Block as BlockT>::Hash,
442 signed_stmt_recv: oneshot::Receiver<CollationSecondedSignal>,
443 ) {
444 let announce_block = self.announce_block.clone();
445
446 self.spawner.spawn(
447 "cumulus-wait-to-announce",
448 None,
449 async move {
450 tracing::debug!(
451 target: "cumulus-network",
452 "waiting for announce block in a background task...",
453 );
454
455 wait_to_announce::<Block>(block_hash, announce_block, signed_stmt_recv).await;
456
457 tracing::debug!(
458 target: "cumulus-network",
459 "block announcement finished",
460 );
461 }
462 .boxed(),
463 );
464 }
465}
466
467async fn wait_to_announce<Block: BlockT>(
468 block_hash: <Block as BlockT>::Hash,
469 announce_block: Arc<dyn Fn(Block::Hash, Option<Vec<u8>>) + Send + Sync>,
470 signed_stmt_recv: oneshot::Receiver<CollationSecondedSignal>,
471) {
472 let signal = match signed_stmt_recv.await {
473 Ok(s) => s,
474 Err(_) => {
475 tracing::debug!(
476 target: "cumulus-network",
477 block = ?block_hash,
478 "Wait to announce stopped, because sender was dropped.",
479 );
480 return
481 },
482 };
483
484 if let Ok(data) = BlockAnnounceData::try_from(&signal) {
485 announce_block(block_hash, Some(data.encode()));
486 } else {
487 tracing::debug!(
488 target: "cumulus-network",
489 ?signal,
490 block = ?block_hash,
491 "Received invalid statement while waiting to announce block.",
492 );
493 }
494}
495
496#[derive(Debug, Clone)]
499pub struct AssumeSybilResistance(bool);
500
501impl AssumeSybilResistance {
502 pub fn allow_seconded_messages() -> Self {
509 AssumeSybilResistance(true)
510 }
511
512 pub fn reject_seconded_messages() -> Self {
515 AssumeSybilResistance(false)
516 }
517}
518
519impl<Block: BlockT> BlockAnnounceValidatorT<Block> for AssumeSybilResistance {
520 fn validate(
521 &mut self,
522 _header: &Block::Header,
523 data: &[u8],
524 ) -> Pin<Box<dyn Future<Output = Result<Validation, BoxedError>> + Send>> {
525 let allow_seconded_messages = self.0;
526 let data = data.to_vec();
527
528 async move {
529 Ok(if data.is_empty() {
530 Validation::Success { is_new_best: false }
531 } else if !allow_seconded_messages {
532 Validation::Failure { disconnect: false }
533 } else {
534 match BlockAnnounceData::decode_all(&mut data.as_slice()) {
535 Ok(_) => Validation::Success { is_new_best: false },
536 Err(_) => Validation::Failure { disconnect: true },
537 }
538 })
539 }
540 .boxed()
541 }
542}