Skip to main content

soil_network/sync/
state_request_handler.rs

1// This file is part of Soil.
2
3// Copyright (C) Soil contributors.
4// Copyright (C) Parity Technologies (UK) Ltd.
5// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
6
7//! Helper for handling (i.e. answering) state requests from a remote peer via the
8//! `crate::request_responses::RequestResponsesBehaviour`.
9
10use crate::{
11	sync::schema::v1::{KeyValueStateEntry, StateEntry, StateRequest, StateResponse},
12	LOG_TARGET,
13};
14
15use codec::{Decode, Encode};
16use futures::{channel::oneshot, stream::StreamExt};
17use log::{debug, trace};
18use prost::Message;
19use schnellru::{ByLength, LruMap};
20use soil_network::types::PeerId;
21
22use soil_client::client_api::{BlockBackend, ProofProvider};
23use soil_network::{
24	config::ProtocolId,
25	request_responses::{IncomingRequest, OutgoingResponse},
26	NetworkBackend, MAX_RESPONSE_SIZE,
27};
28use subsoil::runtime::traits::Block as BlockT;
29
30use std::{
31	hash::{Hash, Hasher},
32	sync::Arc,
33	time::Duration,
34};
35
36const MAX_RESPONSE_BYTES: usize = 2 * 1024 * 1024; // Actual reponse may be bigger.
37const MAX_NUMBER_OF_SAME_REQUESTS_PER_PEER: usize = 2;
38
39mod rep {
40	use soil_network::ReputationChange as Rep;
41
42	/// Reputation change when a peer sent us the same request multiple times.
43	pub const SAME_REQUEST: Rep = Rep::new(i32::MIN, "Same state request multiple times");
44}
45
46/// Generates a `RequestResponseProtocolConfig` for the state request protocol, refusing incoming
47/// requests.
48pub fn generate_protocol_config<
49	Hash: AsRef<[u8]>,
50	B: BlockT,
51	N: NetworkBackend<B, <B as BlockT>::Hash>,
52>(
53	protocol_id: &ProtocolId,
54	genesis_hash: Hash,
55	fork_id: Option<&str>,
56	inbound_queue: async_channel::Sender<IncomingRequest>,
57) -> N::RequestResponseProtocolConfig {
58	N::request_response_config(
59		generate_protocol_name(genesis_hash, fork_id).into(),
60		std::iter::once(generate_legacy_protocol_name(protocol_id).into()).collect(),
61		1024 * 1024,
62		MAX_RESPONSE_SIZE,
63		Duration::from_secs(40),
64		Some(inbound_queue),
65	)
66}
67
68/// Generate the state protocol name from the genesis hash and fork id.
69fn generate_protocol_name<Hash: AsRef<[u8]>>(genesis_hash: Hash, fork_id: Option<&str>) -> String {
70	let genesis_hash = genesis_hash.as_ref();
71	if let Some(fork_id) = fork_id {
72		format!("/{}/{}/state/2", array_bytes::bytes2hex("", genesis_hash), fork_id)
73	} else {
74		format!("/{}/state/2", array_bytes::bytes2hex("", genesis_hash))
75	}
76}
77
78/// Generate the legacy state protocol name from chain specific protocol identifier.
79fn generate_legacy_protocol_name(protocol_id: &ProtocolId) -> String {
80	format!("/{}/state/2", protocol_id.as_ref())
81}
82
83/// The key of [`BlockRequestHandler::seen_requests`].
84#[derive(Eq, PartialEq, Clone)]
85struct SeenRequestsKey<B: BlockT> {
86	peer: PeerId,
87	block: B::Hash,
88	start: Vec<Vec<u8>>,
89}
90
91#[allow(clippy::derived_hash_with_manual_eq)]
92impl<B: BlockT> Hash for SeenRequestsKey<B> {
93	fn hash<H: Hasher>(&self, state: &mut H) {
94		self.peer.hash(state);
95		self.block.hash(state);
96		self.start.hash(state);
97	}
98}
99
100/// The value of [`StateRequestHandler::seen_requests`].
101enum SeenRequestsValue {
102	/// First time we have seen the request.
103	First,
104	/// We have fulfilled the request `n` times.
105	Fulfilled(usize),
106}
107
108/// Handler for incoming block requests from a remote peer.
109pub struct StateRequestHandler<B: BlockT, Client> {
110	client: Arc<Client>,
111	request_receiver: async_channel::Receiver<IncomingRequest>,
112	/// Maps from request to number of times we have seen this request.
113	///
114	/// This is used to check if a peer is spamming us with the same request.
115	seen_requests: LruMap<SeenRequestsKey<B>, SeenRequestsValue>,
116}
117
118impl<B, Client> StateRequestHandler<B, Client>
119where
120	B: BlockT,
121	Client: BlockBackend<B> + ProofProvider<B> + Send + Sync + 'static,
122{
123	/// Create a new [`StateRequestHandler`].
124	pub fn new<N: NetworkBackend<B, <B as BlockT>::Hash>>(
125		protocol_id: &ProtocolId,
126		fork_id: Option<&str>,
127		client: Arc<Client>,
128		num_peer_hint: usize,
129	) -> (Self, N::RequestResponseProtocolConfig) {
130		// Reserve enough request slots for one request per peer when we are at the maximum
131		// number of peers.
132		let capacity = std::cmp::max(num_peer_hint, 1);
133		let (tx, request_receiver) = async_channel::bounded(capacity);
134
135		let protocol_config = generate_protocol_config::<_, B, N>(
136			protocol_id,
137			client
138				.block_hash(0u32.into())
139				.ok()
140				.flatten()
141				.expect("Genesis block exists; qed"),
142			fork_id,
143			tx,
144		);
145
146		let capacity = ByLength::new(num_peer_hint.max(1) as u32 * 2);
147		let seen_requests = LruMap::new(capacity);
148
149		(Self { client, request_receiver, seen_requests }, protocol_config)
150	}
151
152	/// Run [`StateRequestHandler`].
153	pub async fn run(mut self) {
154		while let Some(request) = self.request_receiver.next().await {
155			let IncomingRequest { peer, payload, pending_response } = request;
156
157			match self.handle_request(payload, pending_response, &peer) {
158				Ok(()) => debug!(target: LOG_TARGET, "Handled block request from {}.", peer),
159				Err(e) => debug!(
160					target: LOG_TARGET,
161					"Failed to handle state request from {}: {}", peer, e,
162				),
163			}
164		}
165	}
166
167	fn handle_request(
168		&mut self,
169		payload: Vec<u8>,
170		pending_response: oneshot::Sender<OutgoingResponse>,
171		peer: &PeerId,
172	) -> Result<(), HandleRequestError> {
173		let request = StateRequest::decode(&payload[..])?;
174		let block: B::Hash = Decode::decode(&mut request.block.as_ref())?;
175
176		let key = SeenRequestsKey { peer: *peer, block, start: request.start.clone() };
177
178		let mut reputation_changes = Vec::new();
179
180		match self.seen_requests.get(&key) {
181			Some(SeenRequestsValue::First) => {},
182			Some(SeenRequestsValue::Fulfilled(ref mut requests)) => {
183				*requests = requests.saturating_add(1);
184
185				if *requests > MAX_NUMBER_OF_SAME_REQUESTS_PER_PEER {
186					reputation_changes.push(rep::SAME_REQUEST);
187				}
188			},
189			None => {
190				self.seen_requests.insert(key.clone(), SeenRequestsValue::First);
191			},
192		}
193
194		trace!(
195			target: LOG_TARGET,
196			"Handling state request from {}: Block {:?}, Starting at {:x?}, no_proof={}",
197			peer,
198			request.block,
199			&request.start,
200			request.no_proof,
201		);
202
203		let result = if reputation_changes.is_empty() {
204			let mut response = StateResponse::default();
205
206			if !request.no_proof {
207				let (proof, _count) = self.client.read_proof_collection(
208					block,
209					request.start.as_slice(),
210					MAX_RESPONSE_BYTES,
211				)?;
212				response.proof = proof.encode();
213			} else {
214				let entries = self.client.storage_collection(
215					block,
216					request.start.as_slice(),
217					MAX_RESPONSE_BYTES,
218				)?;
219				response.entries = entries
220					.into_iter()
221					.map(|(state, complete)| KeyValueStateEntry {
222						state_root: state.state_root,
223						entries: state
224							.key_values
225							.into_iter()
226							.map(|(key, value)| StateEntry { key, value })
227							.collect(),
228						complete,
229					})
230					.collect();
231			}
232
233			trace!(
234				target: LOG_TARGET,
235				"StateResponse contains {} keys, {}, proof nodes, from {:?} to {:?}",
236				response.entries.len(),
237				response.proof.len(),
238				response.entries.get(0).and_then(|top| top
239					.entries
240					.first()
241					.map(|e| subsoil::core::hexdisplay::HexDisplay::from(&e.key))),
242				response.entries.get(0).and_then(|top| top
243					.entries
244					.last()
245					.map(|e| subsoil::core::hexdisplay::HexDisplay::from(&e.key))),
246			);
247			if let Some(value) = self.seen_requests.get(&key) {
248				// If this is the first time we have processed this request, we need to change
249				// it to `Fulfilled`.
250				if let SeenRequestsValue::First = value {
251					*value = SeenRequestsValue::Fulfilled(1);
252				}
253			}
254
255			let mut data = Vec::with_capacity(response.encoded_len());
256			response.encode(&mut data)?;
257			Ok(data)
258		} else {
259			Err(())
260		};
261
262		pending_response
263			.send(OutgoingResponse { result, reputation_changes, sent_feedback: None })
264			.map_err(|_| HandleRequestError::SendResponse)
265	}
266}
267
268#[derive(Debug, thiserror::Error)]
269enum HandleRequestError {
270	#[error("Failed to decode request: {0}.")]
271	DecodeProto(#[from] prost::DecodeError),
272
273	#[error("Failed to encode response: {0}.")]
274	EncodeProto(#[from] prost::EncodeError),
275
276	#[error("Failed to decode block hash: {0}.")]
277	InvalidHash(#[from] codec::Error),
278
279	#[error(transparent)]
280	Client(#[from] soil_client::blockchain::Error),
281
282	#[error("Failed to send response.")]
283	SendResponse,
284}