1use crate::{
11 sync::schema::v1::{KeyValueStateEntry, StateEntry, StateRequest, StateResponse},
12 LOG_TARGET,
13};
14
15use codec::{Decode, Encode};
16use futures::{channel::oneshot, stream::StreamExt};
17use log::{debug, trace};
18use prost::Message;
19use schnellru::{ByLength, LruMap};
20use soil_network::types::PeerId;
21
22use soil_client::client_api::{BlockBackend, ProofProvider};
23use soil_network::{
24 config::ProtocolId,
25 request_responses::{IncomingRequest, OutgoingResponse},
26 NetworkBackend, MAX_RESPONSE_SIZE,
27};
28use subsoil::runtime::traits::Block as BlockT;
29
30use std::{
31 hash::{Hash, Hasher},
32 sync::Arc,
33 time::Duration,
34};
35
36const MAX_RESPONSE_BYTES: usize = 2 * 1024 * 1024; const MAX_NUMBER_OF_SAME_REQUESTS_PER_PEER: usize = 2;
38
39mod rep {
40 use soil_network::ReputationChange as Rep;
41
42 pub const SAME_REQUEST: Rep = Rep::new(i32::MIN, "Same state request multiple times");
44}
45
46pub fn generate_protocol_config<
49 Hash: AsRef<[u8]>,
50 B: BlockT,
51 N: NetworkBackend<B, <B as BlockT>::Hash>,
52>(
53 protocol_id: &ProtocolId,
54 genesis_hash: Hash,
55 fork_id: Option<&str>,
56 inbound_queue: async_channel::Sender<IncomingRequest>,
57) -> N::RequestResponseProtocolConfig {
58 N::request_response_config(
59 generate_protocol_name(genesis_hash, fork_id).into(),
60 std::iter::once(generate_legacy_protocol_name(protocol_id).into()).collect(),
61 1024 * 1024,
62 MAX_RESPONSE_SIZE,
63 Duration::from_secs(40),
64 Some(inbound_queue),
65 )
66}
67
68fn generate_protocol_name<Hash: AsRef<[u8]>>(genesis_hash: Hash, fork_id: Option<&str>) -> String {
70 let genesis_hash = genesis_hash.as_ref();
71 if let Some(fork_id) = fork_id {
72 format!("/{}/{}/state/2", array_bytes::bytes2hex("", genesis_hash), fork_id)
73 } else {
74 format!("/{}/state/2", array_bytes::bytes2hex("", genesis_hash))
75 }
76}
77
78fn generate_legacy_protocol_name(protocol_id: &ProtocolId) -> String {
80 format!("/{}/state/2", protocol_id.as_ref())
81}
82
83#[derive(Eq, PartialEq, Clone)]
85struct SeenRequestsKey<B: BlockT> {
86 peer: PeerId,
87 block: B::Hash,
88 start: Vec<Vec<u8>>,
89}
90
91#[allow(clippy::derived_hash_with_manual_eq)]
92impl<B: BlockT> Hash for SeenRequestsKey<B> {
93 fn hash<H: Hasher>(&self, state: &mut H) {
94 self.peer.hash(state);
95 self.block.hash(state);
96 self.start.hash(state);
97 }
98}
99
100enum SeenRequestsValue {
102 First,
104 Fulfilled(usize),
106}
107
108pub struct StateRequestHandler<B: BlockT, Client> {
110 client: Arc<Client>,
111 request_receiver: async_channel::Receiver<IncomingRequest>,
112 seen_requests: LruMap<SeenRequestsKey<B>, SeenRequestsValue>,
116}
117
118impl<B, Client> StateRequestHandler<B, Client>
119where
120 B: BlockT,
121 Client: BlockBackend<B> + ProofProvider<B> + Send + Sync + 'static,
122{
123 pub fn new<N: NetworkBackend<B, <B as BlockT>::Hash>>(
125 protocol_id: &ProtocolId,
126 fork_id: Option<&str>,
127 client: Arc<Client>,
128 num_peer_hint: usize,
129 ) -> (Self, N::RequestResponseProtocolConfig) {
130 let capacity = std::cmp::max(num_peer_hint, 1);
133 let (tx, request_receiver) = async_channel::bounded(capacity);
134
135 let protocol_config = generate_protocol_config::<_, B, N>(
136 protocol_id,
137 client
138 .block_hash(0u32.into())
139 .ok()
140 .flatten()
141 .expect("Genesis block exists; qed"),
142 fork_id,
143 tx,
144 );
145
146 let capacity = ByLength::new(num_peer_hint.max(1) as u32 * 2);
147 let seen_requests = LruMap::new(capacity);
148
149 (Self { client, request_receiver, seen_requests }, protocol_config)
150 }
151
152 pub async fn run(mut self) {
154 while let Some(request) = self.request_receiver.next().await {
155 let IncomingRequest { peer, payload, pending_response } = request;
156
157 match self.handle_request(payload, pending_response, &peer) {
158 Ok(()) => debug!(target: LOG_TARGET, "Handled block request from {}.", peer),
159 Err(e) => debug!(
160 target: LOG_TARGET,
161 "Failed to handle state request from {}: {}", peer, e,
162 ),
163 }
164 }
165 }
166
167 fn handle_request(
168 &mut self,
169 payload: Vec<u8>,
170 pending_response: oneshot::Sender<OutgoingResponse>,
171 peer: &PeerId,
172 ) -> Result<(), HandleRequestError> {
173 let request = StateRequest::decode(&payload[..])?;
174 let block: B::Hash = Decode::decode(&mut request.block.as_ref())?;
175
176 let key = SeenRequestsKey { peer: *peer, block, start: request.start.clone() };
177
178 let mut reputation_changes = Vec::new();
179
180 match self.seen_requests.get(&key) {
181 Some(SeenRequestsValue::First) => {},
182 Some(SeenRequestsValue::Fulfilled(ref mut requests)) => {
183 *requests = requests.saturating_add(1);
184
185 if *requests > MAX_NUMBER_OF_SAME_REQUESTS_PER_PEER {
186 reputation_changes.push(rep::SAME_REQUEST);
187 }
188 },
189 None => {
190 self.seen_requests.insert(key.clone(), SeenRequestsValue::First);
191 },
192 }
193
194 trace!(
195 target: LOG_TARGET,
196 "Handling state request from {}: Block {:?}, Starting at {:x?}, no_proof={}",
197 peer,
198 request.block,
199 &request.start,
200 request.no_proof,
201 );
202
203 let result = if reputation_changes.is_empty() {
204 let mut response = StateResponse::default();
205
206 if !request.no_proof {
207 let (proof, _count) = self.client.read_proof_collection(
208 block,
209 request.start.as_slice(),
210 MAX_RESPONSE_BYTES,
211 )?;
212 response.proof = proof.encode();
213 } else {
214 let entries = self.client.storage_collection(
215 block,
216 request.start.as_slice(),
217 MAX_RESPONSE_BYTES,
218 )?;
219 response.entries = entries
220 .into_iter()
221 .map(|(state, complete)| KeyValueStateEntry {
222 state_root: state.state_root,
223 entries: state
224 .key_values
225 .into_iter()
226 .map(|(key, value)| StateEntry { key, value })
227 .collect(),
228 complete,
229 })
230 .collect();
231 }
232
233 trace!(
234 target: LOG_TARGET,
235 "StateResponse contains {} keys, {}, proof nodes, from {:?} to {:?}",
236 response.entries.len(),
237 response.proof.len(),
238 response.entries.get(0).and_then(|top| top
239 .entries
240 .first()
241 .map(|e| subsoil::core::hexdisplay::HexDisplay::from(&e.key))),
242 response.entries.get(0).and_then(|top| top
243 .entries
244 .last()
245 .map(|e| subsoil::core::hexdisplay::HexDisplay::from(&e.key))),
246 );
247 if let Some(value) = self.seen_requests.get(&key) {
248 if let SeenRequestsValue::First = value {
251 *value = SeenRequestsValue::Fulfilled(1);
252 }
253 }
254
255 let mut data = Vec::with_capacity(response.encoded_len());
256 response.encode(&mut data)?;
257 Ok(data)
258 } else {
259 Err(())
260 };
261
262 pending_response
263 .send(OutgoingResponse { result, reputation_changes, sent_feedback: None })
264 .map_err(|_| HandleRequestError::SendResponse)
265 }
266}
267
268#[derive(Debug, thiserror::Error)]
269enum HandleRequestError {
270 #[error("Failed to decode request: {0}.")]
271 DecodeProto(#[from] prost::DecodeError),
272
273 #[error("Failed to encode response: {0}.")]
274 EncodeProto(#[from] prost::EncodeError),
275
276 #[error("Failed to decode block hash: {0}.")]
277 InvalidHash(#[from] codec::Error),
278
279 #[error(transparent)]
280 Client(#[from] soil_client::blockchain::Error),
281
282 #[error("Failed to send response.")]
283 SendResponse,
284}