cumulus_client_network/
lib.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Cumulus.
3
4// Cumulus is free software: you can redistribute it and/or modify
5// it under the terms of the GNU General Public License as published by
6// the Free Software Foundation, either version 3 of the License, or
7// (at your option) any later version.
8
9// Cumulus is distributed in the hope that it will be useful,
10// but WITHOUT ANY WARRANTY; without even the implied warranty of
11// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
12// GNU General Public License for more details.
13
14// You should have received a copy of the GNU General Public License
15// along with Cumulus.  If not, see <http://www.gnu.org/licenses/>.
16
17//! Parachain specific networking
18//!
19//! Provides a custom block announcement implementation for parachains
20//! that use the relay chain provided consensus. See [`RequireSecondedInBlockAnnounce`]
21//! and [`WaitToAnnounce`] for more information about this implementation.
22
23use sp_api::RuntimeApiInfo;
24use sp_consensus::block_validation::{
25	BlockAnnounceValidator as BlockAnnounceValidatorT, Validation,
26};
27use sp_core::traits::SpawnNamed;
28use sp_runtime::traits::{Block as BlockT, Header as HeaderT};
29
30use cumulus_relay_chain_interface::RelayChainInterface;
31use polkadot_node_primitives::{CollationSecondedSignal, Statement};
32use polkadot_node_subsystem::messages::RuntimeApiRequest;
33use polkadot_parachain_primitives::primitives::HeadData;
34use polkadot_primitives::{
35	vstaging::CandidateReceiptV2 as CandidateReceipt, CompactStatement, Hash as PHash,
36	Id as ParaId, OccupiedCoreAssumption, SigningContext, UncheckedSigned,
37};
38
39use codec::{Decode, DecodeAll, Encode};
40use futures::{channel::oneshot, future::FutureExt, Future};
41use std::{fmt, marker::PhantomData, pin::Pin, sync::Arc};
42
43#[cfg(test)]
44mod tests;
45
46const LOG_TARGET: &str = "sync::cumulus";
47
48type BoxedError = Box<dyn std::error::Error + Send>;
49
50#[derive(Debug)]
51struct BlockAnnounceError(String);
52impl std::error::Error for BlockAnnounceError {}
53
54impl fmt::Display for BlockAnnounceError {
55	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
56		self.0.fmt(f)
57	}
58}
59
60/// The data that we attach to a block announcement.
61///
62/// This will be used to prove that a header belongs to a block that is probably being backed by
63/// the relay chain.
64#[derive(Encode, Debug)]
65pub struct BlockAnnounceData {
66	/// The receipt identifying the candidate.
67	receipt: CandidateReceipt,
68	/// The seconded statement issued by a relay chain validator that approves the candidate.
69	statement: UncheckedSigned<CompactStatement>,
70	/// The relay parent that was used as context to sign the [`Self::statement`].
71	relay_parent: PHash,
72}
73
74impl Decode for BlockAnnounceData {
75	fn decode<I: codec::Input>(input: &mut I) -> Result<Self, codec::Error> {
76		let receipt = CandidateReceipt::decode(input)?;
77		let statement = UncheckedSigned::<CompactStatement>::decode(input)?;
78
79		let relay_parent = match PHash::decode(input) {
80			Ok(p) => p,
81			// For being backwards compatible, we support missing relay-chain parent.
82			Err(_) => receipt.descriptor.relay_parent(),
83		};
84
85		Ok(Self { receipt, statement, relay_parent })
86	}
87}
88
89impl BlockAnnounceData {
90	/// Validate that the receipt, statement and announced header match.
91	///
92	/// This will not check the signature, for this you should use
93	/// [`BlockAnnounceData::check_signature`].
94	fn validate(&self, encoded_header: Vec<u8>) -> Result<(), Validation> {
95		let candidate_hash =
96			if let CompactStatement::Seconded(h) = self.statement.unchecked_payload() {
97				h
98			} else {
99				tracing::debug!(target: LOG_TARGET, "`CompactStatement` isn't the candidate variant!",);
100				return Err(Validation::Failure { disconnect: true })
101			};
102
103		if *candidate_hash != self.receipt.hash() {
104			tracing::debug!(
105				target: LOG_TARGET,
106				"Receipt candidate hash doesn't match candidate hash in statement",
107			);
108			return Err(Validation::Failure { disconnect: true })
109		}
110
111		if HeadData(encoded_header).hash() != self.receipt.descriptor.para_head() {
112			tracing::debug!(
113				target: LOG_TARGET,
114				"Receipt para head hash doesn't match the hash of the header in the block announcement",
115			);
116			return Err(Validation::Failure { disconnect: true })
117		}
118
119		Ok(())
120	}
121
122	/// Check the signature of the statement.
123	///
124	/// Returns an `Err(_)` if it failed.
125	async fn check_signature<RCInterface>(
126		self,
127		relay_chain_client: &RCInterface,
128	) -> Result<Validation, BlockAnnounceError>
129	where
130		RCInterface: RelayChainInterface + 'static,
131	{
132		let validator_index = self.statement.unchecked_validator_index();
133
134		let session_index =
135			match relay_chain_client.session_index_for_child(self.relay_parent).await {
136				Ok(r) => r,
137				Err(e) => return Err(BlockAnnounceError(format!("{:?}", e))),
138			};
139
140		let signing_context = SigningContext { parent_hash: self.relay_parent, session_index };
141
142		// Check that the signer is a legit validator.
143		let authorities = match relay_chain_client.validators(self.relay_parent).await {
144			Ok(r) => r,
145			Err(e) => return Err(BlockAnnounceError(format!("{:?}", e))),
146		};
147		let signer = match authorities.get(validator_index.0 as usize) {
148			Some(r) => r,
149			None => {
150				tracing::debug!(
151					target: LOG_TARGET,
152					"Block announcement justification signer is a validator index out of bound",
153				);
154
155				return Ok(Validation::Failure { disconnect: true })
156			},
157		};
158
159		// Check statement is correctly signed.
160		if self.statement.try_into_checked(&signing_context, signer).is_err() {
161			tracing::debug!(
162				target: LOG_TARGET,
163				"Block announcement justification signature is invalid.",
164			);
165
166			return Ok(Validation::Failure { disconnect: true })
167		}
168
169		Ok(Validation::Success { is_new_best: true })
170	}
171}
172
173impl TryFrom<&'_ CollationSecondedSignal> for BlockAnnounceData {
174	type Error = ();
175
176	fn try_from(signal: &CollationSecondedSignal) -> Result<BlockAnnounceData, ()> {
177		let receipt = if let Statement::Seconded(receipt) = signal.statement.payload() {
178			receipt.to_plain()
179		} else {
180			return Err(())
181		};
182
183		Ok(BlockAnnounceData {
184			receipt,
185			statement: signal.statement.convert_payload().into(),
186			relay_parent: signal.relay_parent,
187		})
188	}
189}
190
191/// A type alias for the [`RequireSecondedInBlockAnnounce`] validator.
192#[deprecated = "This has been renamed to RequireSecondedInBlockAnnounce"]
193pub type BlockAnnounceValidator<Block, RCInterface> =
194	RequireSecondedInBlockAnnounce<Block, RCInterface>;
195
196/// Parachain specific block announce validator.
197///
198/// This is not required when the collation mechanism itself is sybil-resistant, as it is a spam
199/// protection mechanism used to prevent nodes from dealing with unbounded numbers of blocks. For
200/// sybil-resistant collation mechanisms, this will only slow things down.
201///
202/// This block announce validator is required if the parachain is running
203/// with the relay chain provided consensus to make sure each node only
204/// imports a reasonable number of blocks per round. The relay chain provided
205/// consensus doesn't have any authorities and so it could happen that without
206/// this special block announce validator a node would need to import *millions*
207/// of blocks per round, which is clearly not doable.
208///
209/// To solve this problem, each block announcement is delayed until a collator
210/// has received a [`Statement::Seconded`] for its `PoV`. This message tells the
211/// collator that its `PoV` was validated successfully by a parachain validator and
212/// that it is very likely that this `PoV` will be included in the relay chain. Every
213/// collator that doesn't receive the message for its `PoV` will not announce its block.
214/// For more information on the block announcement, see [`WaitToAnnounce`].
215///
216/// For each block announcement that is received, the generic block announcement validation
217/// will call this validator and provides the extra data that was attached to the announcement.
218/// We call this extra data `justification`.
219/// It is expected that the attached data is a SCALE encoded [`BlockAnnounceData`]. The
220/// statement is checked to be a [`CompactStatement::Seconded`] and that it is signed by an active
221/// parachain validator.
222///
223/// If no justification was provided we check if the block announcement is at the tip of the known
224/// chain. If it is at the tip, it is required to provide a justification or otherwise we reject
225/// it. However, if the announcement is for a block below the tip the announcement is accepted
226/// as it probably comes from a node that is currently syncing the chain.
227#[derive(Clone)]
228pub struct RequireSecondedInBlockAnnounce<Block, RCInterface> {
229	phantom: PhantomData<Block>,
230	relay_chain_interface: RCInterface,
231	para_id: ParaId,
232}
233
234impl<Block, RCInterface> RequireSecondedInBlockAnnounce<Block, RCInterface>
235where
236	RCInterface: Clone,
237{
238	/// Create a new [`RequireSecondedInBlockAnnounce`].
239	pub fn new(relay_chain_interface: RCInterface, para_id: ParaId) -> Self {
240		Self { phantom: Default::default(), relay_chain_interface, para_id }
241	}
242}
243
244impl<Block: BlockT, RCInterface> RequireSecondedInBlockAnnounce<Block, RCInterface>
245where
246	RCInterface: RelayChainInterface + Clone,
247{
248	/// Get the included block of the given parachain in the relay chain.
249	async fn included_block(
250		relay_chain_interface: &RCInterface,
251		hash: PHash,
252		para_id: ParaId,
253	) -> Result<Block::Header, BoxedError> {
254		let validation_data = relay_chain_interface
255			.persisted_validation_data(hash, para_id, OccupiedCoreAssumption::TimedOut)
256			.await
257			.map_err(|e| Box::new(BlockAnnounceError(format!("{:?}", e))) as Box<_>)?
258			.ok_or_else(|| {
259				Box::new(BlockAnnounceError("Could not find parachain head in relay chain".into()))
260					as Box<_>
261			})?;
262		let para_head =
263			Block::Header::decode(&mut &validation_data.parent_head.0[..]).map_err(|e| {
264				Box::new(BlockAnnounceError(format!("Failed to decode parachain head: {:?}", e)))
265					as Box<_>
266			})?;
267
268		Ok(para_head)
269	}
270
271	/// Get the backed block hashes of the given parachain in the relay chain.
272	async fn backed_block_hashes(
273		relay_chain_interface: &RCInterface,
274		hash: PHash,
275		para_id: ParaId,
276	) -> Result<impl Iterator<Item = PHash>, BoxedError> {
277		let runtime_api_version = relay_chain_interface
278			.version(hash)
279			.await
280			.map_err(|e| Box::new(BlockAnnounceError(format!("{:?}", e))) as Box<_>)?;
281		let parachain_host_runtime_api_version =
282			runtime_api_version
283				.api_version(
284					&<dyn polkadot_primitives::runtime_api::ParachainHost<
285						polkadot_primitives::Block,
286					>>::ID,
287				)
288				.unwrap_or_default();
289
290		// If the relay chain runtime does not support the new runtime API, fallback to the
291		// deprecated one.
292		let candidate_receipts = if parachain_host_runtime_api_version <
293			RuntimeApiRequest::CANDIDATES_PENDING_AVAILABILITY_RUNTIME_REQUIREMENT
294		{
295			#[allow(deprecated)]
296			relay_chain_interface
297				.candidate_pending_availability(hash, para_id)
298				.await
299				.map(|c| c.into_iter().collect::<Vec<_>>())
300		} else {
301			relay_chain_interface.candidates_pending_availability(hash, para_id).await
302		}
303		.map_err(|e| Box::new(BlockAnnounceError(format!("{:?}", e))) as Box<_>)?;
304
305		Ok(candidate_receipts.into_iter().map(|cr| cr.descriptor.para_head()))
306	}
307
308	/// Handle a block announcement with empty data (no statement) attached to it.
309	async fn handle_empty_block_announce_data(
310		&self,
311		header: Block::Header,
312	) -> Result<Validation, BoxedError> {
313		let relay_chain_interface = self.relay_chain_interface.clone();
314		let para_id = self.para_id;
315
316		// Check if block is equal or higher than best (this requires a justification)
317		let relay_chain_best_hash = relay_chain_interface
318			.best_block_hash()
319			.await
320			.map_err(|e| Box::new(e) as Box<_>)?;
321		let block_number = header.number();
322
323		let best_head =
324			Self::included_block(&relay_chain_interface, relay_chain_best_hash, para_id).await?;
325		let known_best_number = best_head.number();
326
327		if best_head == header {
328			tracing::debug!(target: LOG_TARGET, "Announced block matches best block.",);
329
330			return Ok(Validation::Success { is_new_best: true })
331		}
332
333		let mut backed_blocks =
334			Self::backed_block_hashes(&relay_chain_interface, relay_chain_best_hash, para_id)
335				.await?;
336
337		let head_hash = HeadData(header.encode()).hash();
338
339		if backed_blocks.any(|block_hash| block_hash == head_hash) {
340			tracing::debug!(target: LOG_TARGET, "Announced block matches latest backed block.",);
341
342			Ok(Validation::Success { is_new_best: true })
343		} else if block_number >= known_best_number {
344			tracing::debug!(
345				target: LOG_TARGET,
346				"Validation failed because a justification is needed if the block at the top of the chain."
347			);
348
349			Ok(Validation::Failure { disconnect: false })
350		} else {
351			Ok(Validation::Success { is_new_best: false })
352		}
353	}
354}
355
356impl<Block: BlockT, RCInterface> BlockAnnounceValidatorT<Block>
357	for RequireSecondedInBlockAnnounce<Block, RCInterface>
358where
359	RCInterface: RelayChainInterface + Clone + 'static,
360{
361	fn validate(
362		&mut self,
363		header: &Block::Header,
364		data: &[u8],
365	) -> Pin<Box<dyn Future<Output = Result<Validation, BoxedError>> + Send>> {
366		let relay_chain_interface = self.relay_chain_interface.clone();
367		let data = data.to_vec();
368		let header = header.clone();
369		let header_encoded = header.encode();
370		let block_announce_validator = self.clone();
371
372		async move {
373			let relay_chain_is_syncing = relay_chain_interface
374				.is_major_syncing()
375				.await
376				.map_err(
377					|e| tracing::error!(target: LOG_TARGET, "Unable to determine sync status. {}", e),
378				)
379				.unwrap_or(false);
380
381			if relay_chain_is_syncing {
382				return Ok(Validation::Success { is_new_best: false })
383			}
384
385			if data.is_empty() {
386				return block_announce_validator.handle_empty_block_announce_data(header).await
387			}
388
389			let block_announce_data = match BlockAnnounceData::decode_all(&mut data.as_slice()) {
390				Ok(r) => r,
391				Err(err) =>
392					return Err(Box::new(BlockAnnounceError(format!(
393						"Can not decode the `BlockAnnounceData`: {:?}",
394						err
395					))) as Box<_>),
396			};
397
398			if let Err(e) = block_announce_data.validate(header_encoded) {
399				return Ok(e)
400			}
401
402			let relay_parent = block_announce_data.receipt.descriptor.relay_parent();
403
404			relay_chain_interface
405				.wait_for_block(relay_parent)
406				.await
407				.map_err(|e| Box::new(BlockAnnounceError(e.to_string())) as Box<_>)?;
408
409			block_announce_data
410				.check_signature(&relay_chain_interface)
411				.await
412				.map_err(|e| Box::new(e) as Box<_>)
413		}
414		.boxed()
415	}
416}
417
418/// Wait before announcing a block that a candidate message has been received for this block, then
419/// add this message as justification for the block announcement.
420///
421/// This object will spawn a new task every time the method `wait_to_announce` is called and cancel
422/// the previous task running.
423pub struct WaitToAnnounce<Block: BlockT> {
424	spawner: Arc<dyn SpawnNamed + Send + Sync>,
425	announce_block: Arc<dyn Fn(Block::Hash, Option<Vec<u8>>) + Send + Sync>,
426}
427
428impl<Block: BlockT> WaitToAnnounce<Block> {
429	/// Create the `WaitToAnnounce` object
430	pub fn new(
431		spawner: Arc<dyn SpawnNamed + Send + Sync>,
432		announce_block: Arc<dyn Fn(Block::Hash, Option<Vec<u8>>) + Send + Sync>,
433	) -> WaitToAnnounce<Block> {
434		WaitToAnnounce { spawner, announce_block }
435	}
436
437	/// Wait for a candidate message for the block, then announce the block. The candidate
438	/// message will be added as justification to the block announcement.
439	pub fn wait_to_announce(
440		&mut self,
441		block_hash: <Block as BlockT>::Hash,
442		signed_stmt_recv: oneshot::Receiver<CollationSecondedSignal>,
443	) {
444		let announce_block = self.announce_block.clone();
445
446		self.spawner.spawn(
447			"cumulus-wait-to-announce",
448			None,
449			async move {
450				tracing::debug!(
451					target: "cumulus-network",
452					"waiting for announce block in a background task...",
453				);
454
455				wait_to_announce::<Block>(block_hash, announce_block, signed_stmt_recv).await;
456
457				tracing::debug!(
458					target: "cumulus-network",
459					"block announcement finished",
460				);
461			}
462			.boxed(),
463		);
464	}
465}
466
467async fn wait_to_announce<Block: BlockT>(
468	block_hash: <Block as BlockT>::Hash,
469	announce_block: Arc<dyn Fn(Block::Hash, Option<Vec<u8>>) + Send + Sync>,
470	signed_stmt_recv: oneshot::Receiver<CollationSecondedSignal>,
471) {
472	let signal = match signed_stmt_recv.await {
473		Ok(s) => s,
474		Err(_) => {
475			tracing::debug!(
476				target: "cumulus-network",
477				block = ?block_hash,
478				"Wait to announce stopped, because sender was dropped.",
479			);
480			return
481		},
482	};
483
484	if let Ok(data) = BlockAnnounceData::try_from(&signal) {
485		announce_block(block_hash, Some(data.encode()));
486	} else {
487		tracing::debug!(
488			target: "cumulus-network",
489			?signal,
490			block = ?block_hash,
491			"Received invalid statement while waiting to announce block.",
492		);
493	}
494}
495
496/// A [`BlockAnnounceValidator`] which accepts all block announcements, as it assumes
497/// sybil resistance is handled elsewhere.
498#[derive(Debug, Clone)]
499pub struct AssumeSybilResistance(bool);
500
501impl AssumeSybilResistance {
502	/// Instantiate this block announcement validator while permissively allowing (but ignoring)
503	/// announcements which come tagged with seconded messages.
504	///
505	/// This is useful for backwards compatibility when upgrading nodes: old nodes will continue
506	/// to broadcast announcements with seconded messages, so these announcements shouldn't be
507	/// rejected and the peers not punished.
508	pub fn allow_seconded_messages() -> Self {
509		AssumeSybilResistance(true)
510	}
511
512	/// Instantiate this block announcement validator while rejecting announcements that come with
513	/// data.
514	pub fn reject_seconded_messages() -> Self {
515		AssumeSybilResistance(false)
516	}
517}
518
519impl<Block: BlockT> BlockAnnounceValidatorT<Block> for AssumeSybilResistance {
520	fn validate(
521		&mut self,
522		_header: &Block::Header,
523		data: &[u8],
524	) -> Pin<Box<dyn Future<Output = Result<Validation, BoxedError>> + Send>> {
525		let allow_seconded_messages = self.0;
526		let data = data.to_vec();
527
528		async move {
529			Ok(if data.is_empty() {
530				Validation::Success { is_new_best: false }
531			} else if !allow_seconded_messages {
532				Validation::Failure { disconnect: false }
533			} else {
534				match BlockAnnounceData::decode_all(&mut data.as_slice()) {
535					Ok(_) => Validation::Success { is_new_best: false },
536					Err(_) => Validation::Failure { disconnect: true },
537				}
538			})
539		}
540		.boxed()
541	}
542}