Skip to main content

sc_network_sync/
block_request_handler.rs

1// Copyright (C) Parity Technologies (UK) Ltd.
2// This file is part of Substrate.
3// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
4
5// Substrate is free software: you can redistribute it and/or modify
6// it under the terms of the GNU General Public License as published by
7// the Free Software Foundation, either version 3 of the License, or
8// (at your option) any later version.
9
10// Substrate is distributed in the hope that it will be useful,
11// but WITHOUT ANY WARRANTY; without even the implied warranty of
12// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13// GNU General Public License for more details.
14
15// You should have received a copy of the GNU General Public License
16// along with Substrate. If not, see <https://www.gnu.org/licenses/>.
17
18//! Helper for handling (i.e. answering) block requests from a remote peer via the
19//! `crate::request_responses::RequestResponsesBehaviour`.
20
21use crate::{
22	block_relay_protocol::{BlockDownloader, BlockRelayParams, BlockResponseError, BlockServer},
23	schema::v1::{
24		block_request::FromBlock as FromBlockSchema, BlockRequest as BlockRequestSchema,
25		BlockResponse as BlockResponseSchema, BlockResponse, Direction,
26	},
27	service::network::NetworkServiceHandle,
28	LOG_TARGET,
29};
30
31use codec::{Decode, DecodeAll, Encode};
32use futures::{channel::oneshot, stream::StreamExt};
33use log::debug;
34use prost::Message;
35use schnellru::{ByLength, LruMap};
36
37use sc_client_api::BlockBackend;
38use sc_network::{
39	config::ProtocolId,
40	request_responses::{IfDisconnected, IncomingRequest, OutgoingResponse, RequestFailure},
41	service::traits::RequestResponseConfig,
42	types::ProtocolName,
43	NetworkBackend, MAX_RESPONSE_SIZE,
44};
45use sc_network_common::sync::message::{BlockAttributes, BlockData, BlockRequest, FromBlock};
46use sc_network_types::PeerId;
47use sp_blockchain::HeaderBackend;
48use sp_runtime::{
49	generic::BlockId,
50	traits::{Block as BlockT, Header, One, Zero},
51};
52
53use std::{
54	cmp::min,
55	hash::{Hash, Hasher},
56	sync::Arc,
57	time::Duration,
58};
59
/// Upper bound on the number of blocks returned in a single block response.
///
/// A request asking for `0` blocks or for more than this many is clamped to this value.
pub(crate) const MAX_BLOCKS_IN_RESPONSE: usize = 128;

/// How often the same request from the same peer may be fulfilled before any further
/// repetition is answered with a reputation change.
const MAX_NUMBER_OF_SAME_REQUESTS_PER_PEER: usize = 2;
64
65mod rep {
66	use sc_network::ReputationChange as Rep;
67
68	/// Reputation change when a peer sent us the same request multiple times.
69	pub const SAME_REQUEST: Rep = Rep::new_fatal("Same block request multiple times");
70
71	/// Reputation change when a peer sent us the same "small" request multiple times.
72	pub const SAME_SMALL_REQUEST: Rep =
73		Rep::new(-(1 << 10), "same small block request multiple times");
74}
75
76/// Generates a `RequestResponseProtocolConfig` for the block request protocol,
77/// refusing incoming requests.
78pub fn generate_protocol_config<
79	Hash: AsRef<[u8]>,
80	B: BlockT,
81	N: NetworkBackend<B, <B as BlockT>::Hash>,
82>(
83	protocol_id: &ProtocolId,
84	genesis_hash: Hash,
85	fork_id: Option<&str>,
86	inbound_queue: async_channel::Sender<IncomingRequest>,
87) -> N::RequestResponseProtocolConfig {
88	N::request_response_config(
89		generate_protocol_name(genesis_hash, fork_id).into(),
90		std::iter::once(generate_legacy_protocol_name(protocol_id).into()).collect(),
91		1024 * 1024,
92		MAX_RESPONSE_SIZE,
93		Duration::from_secs(20),
94		Some(inbound_queue),
95	)
96}
97
98/// Generate the block protocol name from the genesis hash and fork id.
99fn generate_protocol_name<Hash: AsRef<[u8]>>(genesis_hash: Hash, fork_id: Option<&str>) -> String {
100	let genesis_hash = genesis_hash.as_ref();
101	if let Some(fork_id) = fork_id {
102		format!("/{}/{}/sync/2", array_bytes::bytes2hex("", genesis_hash), fork_id)
103	} else {
104		format!("/{}/sync/2", array_bytes::bytes2hex("", genesis_hash))
105	}
106}
107
108/// Generate the legacy block protocol name from chain specific protocol identifier.
109fn generate_legacy_protocol_name(protocol_id: &ProtocolId) -> String {
110	format!("/{}/sync/2", protocol_id.as_ref())
111}
112
113/// The key of [`BlockRequestHandler::seen_requests`].
114#[derive(Eq, PartialEq, Clone)]
115struct SeenRequestsKey<B: BlockT> {
116	peer: PeerId,
117	from: BlockId<B>,
118	max_blocks: usize,
119	direction: Direction,
120	attributes: BlockAttributes,
121	support_multiple_justifications: bool,
122}
123
124#[allow(clippy::derived_hash_with_manual_eq)]
125impl<B: BlockT> Hash for SeenRequestsKey<B> {
126	fn hash<H: Hasher>(&self, state: &mut H) {
127		self.peer.hash(state);
128		self.max_blocks.hash(state);
129		self.direction.hash(state);
130		self.attributes.hash(state);
131		self.support_multiple_justifications.hash(state);
132		match self.from {
133			BlockId::Hash(h) => h.hash(state),
134			BlockId::Number(n) => n.hash(state),
135		}
136	}
137}
138
/// The value of [`BlockRequestHandler::seen_requests`].
///
/// Tracks how far a particular request (see [`SeenRequestsKey`]) has progressed.
enum SeenRequestsValue {
	/// The request has been seen, but no response carrying data has been recorded for it
	/// yet.
	First,
	/// The request has been counted `n` times since a response carrying data was first
	/// produced for it.
	Fulfilled(usize),
}
146
147/// The full block server implementation of [`BlockServer`]. It handles
148/// the incoming block requests from a remote peer.
149pub struct BlockRequestHandler<B: BlockT, Client> {
150	client: Arc<Client>,
151	request_receiver: async_channel::Receiver<IncomingRequest>,
152	/// Maps from request to number of times we have seen this request.
153	///
154	/// This is used to check if a peer is spamming us with the same request.
155	seen_requests: LruMap<SeenRequestsKey<B>, SeenRequestsValue>,
156}
157
158impl<B, Client> BlockRequestHandler<B, Client>
159where
160	B: BlockT,
161	Client: HeaderBackend<B> + BlockBackend<B> + Send + Sync + 'static,
162{
163	/// Create a new [`BlockRequestHandler`].
164	pub fn new<N: NetworkBackend<B, <B as BlockT>::Hash>>(
165		network: NetworkServiceHandle,
166		protocol_id: &ProtocolId,
167		fork_id: Option<&str>,
168		client: Arc<Client>,
169		num_peer_hint: usize,
170	) -> BlockRelayParams<B, N> {
171		// Reserve enough request slots for one request per peer when we are at the maximum
172		// number of peers.
173		let capacity = std::cmp::max(num_peer_hint, 1);
174		let (tx, request_receiver) = async_channel::bounded(capacity);
175
176		let protocol_config = generate_protocol_config::<_, B, N>(
177			protocol_id,
178			client
179				.block_hash(0u32.into())
180				.ok()
181				.flatten()
182				.expect("Genesis block exists; qed"),
183			fork_id,
184			tx,
185		);
186
187		let capacity = ByLength::new(num_peer_hint.max(1) as u32 * 2);
188		let seen_requests = LruMap::new(capacity);
189
190		BlockRelayParams {
191			server: Box::new(Self { client, request_receiver, seen_requests }),
192			downloader: Arc::new(FullBlockDownloader::new(
193				protocol_config.protocol_name().clone(),
194				network,
195			)),
196			request_response_config: protocol_config,
197		}
198	}
199
200	/// Run [`BlockRequestHandler`].
201	async fn process_requests(&mut self) {
202		while let Some(request) = self.request_receiver.next().await {
203			let IncomingRequest { peer, payload, pending_response } = request;
204
205			match self.handle_request(payload, pending_response, &peer) {
206				Ok(()) => debug!(target: LOG_TARGET, "Handled block request from {}.", peer),
207				Err(e) => debug!(
208					target: LOG_TARGET,
209					"Failed to handle block request from {}: {}", peer, e,
210				),
211			}
212		}
213	}
214
215	fn handle_request(
216		&mut self,
217		payload: Vec<u8>,
218		pending_response: oneshot::Sender<OutgoingResponse>,
219		peer: &PeerId,
220	) -> Result<(), HandleRequestError> {
221		let request = crate::schema::v1::BlockRequest::decode(&payload[..])?;
222
223		let from_block_id = match request.from_block.ok_or(HandleRequestError::MissingFromField)? {
224			FromBlockSchema::Hash(ref h) => {
225				let h = Decode::decode(&mut h.as_ref())?;
226				BlockId::<B>::Hash(h)
227			},
228			FromBlockSchema::Number(ref n) => {
229				let n = Decode::decode(&mut n.as_ref())?;
230				BlockId::<B>::Number(n)
231			},
232		};
233
234		let max_blocks = if request.max_blocks == 0 {
235			MAX_BLOCKS_IN_RESPONSE
236		} else {
237			min(request.max_blocks as usize, MAX_BLOCKS_IN_RESPONSE)
238		};
239
240		let direction =
241			i32::try_into(request.direction).map_err(|_| HandleRequestError::ParseDirection)?;
242
243		let attributes = BlockAttributes::from_be_u32(request.fields)?;
244
245		let support_multiple_justifications = request.support_multiple_justifications;
246
247		let key = SeenRequestsKey {
248			peer: *peer,
249			max_blocks,
250			direction,
251			from: from_block_id,
252			attributes,
253			support_multiple_justifications,
254		};
255
256		let mut reputation_change = None;
257
258		let small_request = attributes
259			.difference(BlockAttributes::HEADER | BlockAttributes::JUSTIFICATION)
260			.is_empty();
261
262		match self.seen_requests.get(&key) {
263			Some(SeenRequestsValue::First) => {},
264			Some(SeenRequestsValue::Fulfilled(ref mut requests)) => {
265				*requests = requests.saturating_add(1);
266
267				if *requests > MAX_NUMBER_OF_SAME_REQUESTS_PER_PEER {
268					reputation_change = Some(if small_request {
269						rep::SAME_SMALL_REQUEST
270					} else {
271						rep::SAME_REQUEST
272					});
273				}
274			},
275			None => {
276				self.seen_requests.insert(key.clone(), SeenRequestsValue::First);
277			},
278		}
279
280		debug!(
281			target: LOG_TARGET,
282			"Handling block request from {peer}: Starting at `{from_block_id:?}` with \
283			maximum blocks of `{max_blocks}`, reputation_change: `{reputation_change:?}`, \
284			small_request `{small_request:?}`, direction `{direction:?}` and \
285			attributes `{attributes:?}`.",
286		);
287
288		let maybe_block_response = if reputation_change.is_none() || small_request {
289			let block_response = self.get_block_response(
290				attributes,
291				from_block_id,
292				direction,
293				max_blocks,
294				support_multiple_justifications,
295			)?;
296
297			// If any of the blocks contains any data, we can consider it as successful request.
298			if block_response
299				.blocks
300				.iter()
301				.any(|b| !b.header.is_empty() || !b.body.is_empty() || b.is_empty_justification)
302			{
303				if let Some(value) = self.seen_requests.get(&key) {
304					// If this is the first time we have processed this request, we need to change
305					// it to `Fulfilled`.
306					if let SeenRequestsValue::First = value {
307						*value = SeenRequestsValue::Fulfilled(1);
308					}
309				}
310			}
311
312			Some(block_response)
313		} else {
314			None
315		};
316
317		debug!(
318			target: LOG_TARGET,
319			"Sending result of block request from {peer} starting at `{from_block_id:?}`: \
320			blocks: {:?}, data: {:?}",
321			maybe_block_response.as_ref().map(|res| res.blocks.len()),
322			maybe_block_response.as_ref().map(|res| res.encoded_len()),
323		);
324
325		let result = if let Some(block_response) = maybe_block_response {
326			let mut data = Vec::with_capacity(block_response.encoded_len());
327			block_response.encode(&mut data)?;
328			Ok(data)
329		} else {
330			Err(())
331		};
332
333		pending_response
334			.send(OutgoingResponse {
335				result,
336				reputation_changes: reputation_change.into_iter().collect(),
337				sent_feedback: None,
338			})
339			.map_err(|_| HandleRequestError::SendResponse)
340	}
341
342	fn get_block_response(
343		&self,
344		attributes: BlockAttributes,
345		mut block_id: BlockId<B>,
346		direction: Direction,
347		max_blocks: usize,
348		support_multiple_justifications: bool,
349	) -> Result<BlockResponse, HandleRequestError> {
350		let get_header = attributes.contains(BlockAttributes::HEADER);
351		let get_body = attributes.contains(BlockAttributes::BODY);
352		let get_indexed_body = attributes.contains(BlockAttributes::INDEXED_BODY);
353		let get_justification = attributes.contains(BlockAttributes::JUSTIFICATION);
354
355		let mut blocks = Vec::new();
356
357		let mut total_size: usize = 0;
358
359		let client_header_from_block_id =
360			|block_id: BlockId<B>| -> Result<Option<B::Header>, HandleRequestError> {
361				if let Some(hash) = self.client.block_hash_from_id(&block_id)? {
362					return self.client.header(hash).map_err(Into::into);
363				}
364				Ok(None)
365			};
366
367		while let Some(header) = client_header_from_block_id(block_id).unwrap_or_default() {
368			let number = *header.number();
369			let hash = header.hash();
370			let parent_hash = *header.parent_hash();
371			let justifications =
372				if get_justification { self.client.justifications(hash)? } else { None };
373
374			let (justifications, justification, is_empty_justification) =
375				if support_multiple_justifications {
376					let justifications = match justifications {
377						Some(v) => v.encode(),
378						None => Vec::new(),
379					};
380					(justifications, Vec::new(), false)
381				} else {
382					// For now we keep compatibility by selecting precisely the GRANDPA one, and not
383					// just the first one. When sending we could have just taken the first one,
384					// since we don't expect there to be any other kind currently, but when
385					// receiving we need to add the engine ID tag.
386					// The ID tag is hardcoded here to avoid depending on the GRANDPA crate, and
387					// will be removed once we remove the backwards compatibility.
388					// See: https://github.com/paritytech/substrate/issues/8172
389					let justification =
390						justifications.and_then(|just| just.into_justification(*b"FRNK"));
391
392					let is_empty_justification =
393						justification.as_ref().map(|j| j.is_empty()).unwrap_or(false);
394
395					let justification = justification.unwrap_or_default();
396
397					(Vec::new(), justification, is_empty_justification)
398				};
399
400			let body = if get_body {
401				match self.client.block_body(hash)? {
402					Some(mut extrinsics) => {
403						extrinsics.iter_mut().map(|extrinsic| extrinsic.encode()).collect()
404					},
405					None => {
406						log::trace!(target: LOG_TARGET, "Missing data for block request.");
407						break;
408					},
409				}
410			} else {
411				Vec::new()
412			};
413
414			let indexed_body = if get_indexed_body {
415				match self.client.block_indexed_body(hash)? {
416					Some(transactions) => transactions,
417					None => {
418						log::trace!(
419							target: LOG_TARGET,
420							"Missing indexed block data for block request."
421						);
422						// If the indexed body is missing we still continue returning headers.
423						// Ideally `None` should distinguish a missing body from the empty body,
424						// but the current protobuf based protocol does not allow it.
425						Vec::new()
426					},
427				}
428			} else {
429				Vec::new()
430			};
431
432			let block_data = crate::schema::v1::BlockData {
433				hash: hash.encode(),
434				header: if get_header { header.encode() } else { Vec::new() },
435				body,
436				receipt: Vec::new(),
437				message_queue: Vec::new(),
438				justification,
439				is_empty_justification,
440				justifications,
441				indexed_body,
442			};
443
444			let new_total_size = total_size + block_data.encoded_len();
445
446			// Reserve 20 KiB for protocol overhead (length prefixes of `BlockData` + the final
447			// encoding in `BlockResponse`)
448			if new_total_size > (MAX_RESPONSE_SIZE as usize - 20 * 1024) {
449				if blocks.is_empty() {
450					log::error!(
451						target: LOG_TARGET,
452						"Single block response is bigger than the max allowed response size! This is a bug!"
453					);
454				}
455
456				break;
457			}
458
459			total_size = new_total_size;
460
461			blocks.push(block_data);
462
463			if blocks.len() >= max_blocks as usize {
464				break;
465			}
466
467			match direction {
468				Direction::Ascending => block_id = BlockId::Number(number + One::one()),
469				Direction::Descending => {
470					if number.is_zero() {
471						break;
472					}
473					block_id = BlockId::Hash(parent_hash)
474				},
475			}
476		}
477
478		Ok(BlockResponse { blocks })
479	}
480}
481
482#[async_trait::async_trait]
483impl<B, Client> BlockServer<B> for BlockRequestHandler<B, Client>
484where
485	B: BlockT,
486	Client: HeaderBackend<B> + BlockBackend<B> + Send + Sync + 'static,
487{
488	async fn run(&mut self) {
489		self.process_requests().await;
490	}
491}
492
493#[derive(Debug, thiserror::Error)]
494enum HandleRequestError {
495	#[error("Failed to decode request: {0}.")]
496	DecodeProto(#[from] prost::DecodeError),
497	#[error("Failed to encode response: {0}.")]
498	EncodeProto(#[from] prost::EncodeError),
499	#[error("Failed to decode block hash: {0}.")]
500	DecodeScale(#[from] codec::Error),
501	#[error("Missing `BlockRequest::from_block` field.")]
502	MissingFromField,
503	#[error("Failed to parse BlockRequest::direction.")]
504	ParseDirection,
505	#[error(transparent)]
506	Client(#[from] sp_blockchain::Error),
507	#[error("Failed to send response.")]
508	SendResponse,
509}
510
511/// The full block downloader implementation of [`BlockDownloader].
512#[derive(Debug)]
513pub struct FullBlockDownloader {
514	protocol_name: ProtocolName,
515	network: NetworkServiceHandle,
516}
517
518impl FullBlockDownloader {
519	fn new(protocol_name: ProtocolName, network: NetworkServiceHandle) -> Self {
520		Self { protocol_name, network }
521	}
522
523	/// Extracts the blocks from the response schema.
524	fn blocks_from_schema<B: BlockT>(
525		&self,
526		request: &BlockRequest<B>,
527		response: BlockResponseSchema,
528	) -> Result<Vec<BlockData<B>>, String> {
529		response
530			.blocks
531			.into_iter()
532			.map(|block_data| {
533				Ok(BlockData::<B> {
534					hash: Decode::decode(&mut block_data.hash.as_ref())?,
535					header: if !block_data.header.is_empty() {
536						Some(Decode::decode(&mut block_data.header.as_ref())?)
537					} else {
538						None
539					},
540					body: if request.fields.contains(BlockAttributes::BODY) {
541						Some(
542							block_data
543								.body
544								.iter()
545								.map(|body| Decode::decode(&mut body.as_ref()))
546								.collect::<Result<Vec<_>, _>>()?,
547						)
548					} else {
549						None
550					},
551					indexed_body: if request.fields.contains(BlockAttributes::INDEXED_BODY) {
552						Some(block_data.indexed_body)
553					} else {
554						None
555					},
556					receipt: if !block_data.receipt.is_empty() {
557						Some(block_data.receipt)
558					} else {
559						None
560					},
561					message_queue: if !block_data.message_queue.is_empty() {
562						Some(block_data.message_queue)
563					} else {
564						None
565					},
566					justification: if !block_data.justification.is_empty() {
567						Some(block_data.justification)
568					} else if block_data.is_empty_justification {
569						Some(Vec::new())
570					} else {
571						None
572					},
573					justifications: if !block_data.justifications.is_empty() {
574						Some(DecodeAll::decode_all(&mut block_data.justifications.as_ref())?)
575					} else {
576						None
577					},
578				})
579			})
580			.collect::<Result<_, _>>()
581			.map_err(|error: codec::Error| error.to_string())
582	}
583}
584
585#[async_trait::async_trait]
586impl<B: BlockT> BlockDownloader<B> for FullBlockDownloader {
587	fn protocol_name(&self) -> &ProtocolName {
588		&self.protocol_name
589	}
590
591	async fn download_blocks(
592		&self,
593		who: PeerId,
594		request: BlockRequest<B>,
595	) -> Result<Result<(Vec<u8>, ProtocolName), RequestFailure>, oneshot::Canceled> {
596		// Build the request protobuf.
597		let bytes = BlockRequestSchema {
598			fields: request.fields.to_be_u32(),
599			from_block: match request.from {
600				FromBlock::Hash(h) => Some(FromBlockSchema::Hash(h.encode())),
601				FromBlock::Number(n) => Some(FromBlockSchema::Number(n.encode())),
602			},
603			direction: request.direction as i32,
604			max_blocks: request.max.unwrap_or(0),
605			support_multiple_justifications: true,
606		}
607		.encode_to_vec();
608
609		let (tx, rx) = oneshot::channel();
610		self.network.start_request(
611			who,
612			self.protocol_name.clone(),
613			bytes,
614			tx,
615			IfDisconnected::ImmediateError,
616		);
617		rx.await
618	}
619
620	fn block_response_into_blocks(
621		&self,
622		request: &BlockRequest<B>,
623		response: Vec<u8>,
624	) -> Result<Vec<BlockData<B>>, BlockResponseError> {
625		// Decode the response protobuf
626		let response_schema = BlockResponseSchema::decode(response.as_slice())
627			.map_err(|error| BlockResponseError::DecodeFailed(error.to_string()))?;
628
629		// Extract the block data from the protobuf
630		self.blocks_from_schema::<B>(request, response_schema)
631			.map_err(|error| BlockResponseError::ExtractionFailed(error.to_string()))
632	}
633}