Skip to main content

soil_network/sync/strategy/
state_sync.rs

1// This file is part of Soil.
2
3// Copyright (C) Soil contributors.
4// Copyright (C) Parity Technologies (UK) Ltd.
5// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
6
7//! State sync support.
8
9use crate::{
10	sync::schema::v1::{KeyValueStateEntry, StateEntry, StateRequest, StateResponse},
11	LOG_TARGET,
12};
13use codec::{Decode, Encode};
14use log::debug;
15use smallvec::SmallVec;
16use soil_client::client_api::{CompactProof, KeyValueStates, ProofProvider};
17use soil_client::import::ImportedState;
18use std::{collections::HashMap, fmt, sync::Arc};
19use subsoil::core::storage::well_known_keys;
20use subsoil::runtime::{
21	traits::{Block as BlockT, Header, NumberFor},
22	Justifications,
23};
24
/// Generic state sync provider. Used for mocking in tests.
///
/// Implemented in this file by [`StateSync`]. `B` is the block type whose state is being
/// downloaded.
pub trait StateSyncProvider<B: BlockT>: Send + Sync {
	/// Validate and import a state response.
	///
	/// Returns [`ImportResult::Import`] once the state is complete,
	/// [`ImportResult::Continue`] when more chunks are needed, and
	/// [`ImportResult::BadResponse`] when the response is rejected.
	fn import(&mut self, response: StateResponse) -> ImportResult<B>;
	/// Produce next state request.
	fn next_request(&self) -> StateRequest;
	/// Check if the state is complete.
	fn is_complete(&self) -> bool;
	/// Returns target block number.
	fn target_number(&self) -> NumberFor<B>;
	/// Returns target block hash.
	fn target_hash(&self) -> B::Hash;
	/// Returns state sync estimated progress.
	fn progress(&self) -> StateSyncProgress;
}
40
/// Reported state sync phase.
#[derive(Clone, Eq, PartialEq, Debug)]
pub enum StateSyncPhase {
	/// State download in progress.
	DownloadingState,
	/// Download is complete, state is being imported.
	ImportingState,
}
49
50impl fmt::Display for StateSyncPhase {
51	fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
52		match self {
53			Self::DownloadingState => write!(f, "Downloading state"),
54			Self::ImportingState => write!(f, "Importing state"),
55		}
56	}
57}
58
/// Reported state download progress.
#[derive(Clone, Eq, PartialEq, Debug)]
pub struct StateSyncProgress {
	/// Estimated download percentage.
	///
	/// `StateSyncMetadata::progress` derives this from the first byte of the first key
	/// cursor (`byte * 100 / 256`), so it is a coarse estimate that stays below 100.
	pub percentage: u32,
	/// Total state size in bytes downloaded so far.
	pub size: u64,
	/// Current state sync phase.
	pub phase: StateSyncPhase,
}
69
/// Import state chunk result.
pub enum ImportResult<B: BlockT> {
	/// State is complete and ready for import.
	///
	/// Fields, in order: target block hash, target block header, the accumulated state,
	/// the optional target block body and the optional target block justifications.
	Import(B::Hash, B::Header, ImportedState<B>, Option<Vec<B::Extrinsic>>, Option<Justifications>),
	/// Continue downloading.
	Continue,
	/// Bad state chunk.
	BadResponse,
}
79
/// Bookkeeping for an ongoing state download: the target block, the key cursor and the
/// progress counters.
struct StateSyncMetadata<B: BlockT> {
	/// Key cursor, sent as `start` of the next request. Holds at most two levels in
	/// practice (presumably top trie first, then child trie — confirm against the
	/// range proof API).
	last_key: SmallVec<[Vec<u8>; 2]>,
	/// Header of the block whose state is being downloaded.
	target_header: B::Header,
	/// Body of the target block, handed back with the completed state.
	target_body: Option<Vec<B::Extrinsic>>,
	/// Justifications of the target block, handed back with the completed state.
	target_justifications: Option<Justifications>,
	/// Set once the final chunk has been received.
	complete: bool,
	/// Running byte counter reported as `StateSyncProgress::size`.
	imported_bytes: u64,
	/// When `true`, requests set `no_proof` and responses are imported without proof
	/// verification.
	skip_proof: bool,
}
89
90impl<B: BlockT> StateSyncMetadata<B> {
91	fn target_hash(&self) -> B::Hash {
92		self.target_header.hash()
93	}
94
95	/// Returns target block number.
96	fn target_number(&self) -> NumberFor<B> {
97		*self.target_header.number()
98	}
99
100	fn target_root(&self) -> B::Hash {
101		*self.target_header.state_root()
102	}
103
104	fn next_request(&self) -> StateRequest {
105		StateRequest {
106			block: self.target_hash().encode(),
107			start: self.last_key.clone().into_vec(),
108			no_proof: self.skip_proof,
109		}
110	}
111
112	fn progress(&self) -> StateSyncProgress {
113		let cursor = *self.last_key.get(0).and_then(|last| last.get(0)).unwrap_or(&0u8);
114		let percent_done = cursor as u32 * 100 / 256;
115		StateSyncProgress {
116			percentage: percent_done,
117			size: self.imported_bytes,
118			phase: if self.complete {
119				StateSyncPhase::ImportingState
120			} else {
121				StateSyncPhase::DownloadingState
122			},
123		}
124	}
125}
126
/// State sync state machine.
///
/// Accumulates partial state data until it is ready to be imported.
pub struct StateSync<B: BlockT, Client> {
	/// Target block data, key cursor and progress counters.
	metadata: StateSyncMetadata<B>,
	/// State accumulated so far, keyed by the state root each chunk was reported under
	/// (the top trie uses an empty root). Each value holds the collected key/value pairs
	/// and the top-trie storage keys whose value is that root.
	state: HashMap<Vec<u8>, (Vec<(Vec<u8>, Vec<u8>)>, Vec<Vec<u8>>)>,
	/// Client used to verify range proofs.
	client: Arc<Client>,
}
135
136impl<B, Client> StateSync<B, Client>
137where
138	B: BlockT,
139	Client: ProofProvider<B> + Send + Sync + 'static,
140{
141	///  Create a new instance.
142	pub fn new(
143		client: Arc<Client>,
144		target_header: B::Header,
145		target_body: Option<Vec<B::Extrinsic>>,
146		target_justifications: Option<Justifications>,
147		skip_proof: bool,
148	) -> Self {
149		Self {
150			client,
151			metadata: StateSyncMetadata {
152				last_key: SmallVec::default(),
153				target_header,
154				target_body,
155				target_justifications,
156				complete: false,
157				imported_bytes: 0,
158				skip_proof,
159			},
160			state: HashMap::default(),
161		}
162	}
163
164	fn process_state_key_values(
165		&mut self,
166		state_root: Vec<u8>,
167		key_values: impl IntoIterator<Item = (Vec<u8>, Vec<u8>)>,
168	) {
169		let is_top = state_root.is_empty();
170
171		let entry = self.state.entry(state_root).or_default();
172
173		if entry.0.len() > 0 && entry.1.len() > 1 {
174			// Already imported child_trie with same root.
175			// Warning this will not work with parallel download.
176			return;
177		}
178
179		let mut child_storage_roots = Vec::new();
180
181		for (key, value) in key_values {
182			// Skip all child key root (will be recalculated on import)
183			if is_top && well_known_keys::is_child_storage_key(key.as_slice()) {
184				child_storage_roots.push((value, key));
185			} else {
186				self.metadata.imported_bytes += key.len() as u64;
187				entry.0.push((key, value));
188			}
189		}
190
191		for (root, storage_key) in child_storage_roots {
192			self.state.entry(root).or_default().1.push(storage_key);
193		}
194	}
195
196	fn process_state_verified(&mut self, values: KeyValueStates) {
197		for values in values.0 {
198			self.process_state_key_values(values.state_root, values.key_values);
199		}
200	}
201
202	fn process_state_unverified(&mut self, response: StateResponse) -> bool {
203		let mut complete = true;
204		// if the trie is a child trie and one of its parent trie is empty,
205		// the parent cursor stays valid.
206		// Empty parent trie content only happens when all the response content
207		// is part of a single child trie.
208		if self.metadata.last_key.len() == 2 && response.entries[0].entries.is_empty() {
209			// Do not remove the parent trie position.
210			self.metadata.last_key.pop();
211		} else {
212			self.metadata.last_key.clear();
213		}
214		for state in response.entries {
215			debug!(
216				target: LOG_TARGET,
217				"Importing state from {:?} to {:?}",
218				state.entries.last().map(|e| subsoil::core::hexdisplay::HexDisplay::from(&e.key)),
219				state.entries.first().map(|e| subsoil::core::hexdisplay::HexDisplay::from(&e.key)),
220			);
221
222			if !state.complete {
223				if let Some(e) = state.entries.last() {
224					self.metadata.last_key.push(e.key.clone());
225				}
226				complete = false;
227			}
228
229			let KeyValueStateEntry { state_root, entries, complete: _ } = state;
230			self.process_state_key_values(
231				state_root,
232				entries.into_iter().map(|StateEntry { key, value }| (key, value)),
233			);
234		}
235		complete
236	}
237}
238
239impl<B, Client> StateSyncProvider<B> for StateSync<B, Client>
240where
241	B: BlockT,
242	Client: ProofProvider<B> + Send + Sync + 'static,
243{
244	///  Validate and import a state response.
245	fn import(&mut self, response: StateResponse) -> ImportResult<B> {
246		if response.entries.is_empty() && response.proof.is_empty() {
247			debug!(target: LOG_TARGET, "Bad state response");
248			return ImportResult::BadResponse;
249		}
250		if !self.metadata.skip_proof && response.proof.is_empty() {
251			debug!(target: LOG_TARGET, "Missing proof");
252			return ImportResult::BadResponse;
253		}
254		let complete = if !self.metadata.skip_proof {
255			debug!(target: LOG_TARGET, "Importing state from {} trie nodes", response.proof.len());
256			let proof_size = response.proof.len() as u64;
257			let proof = match CompactProof::decode(&mut response.proof.as_ref()) {
258				Ok(proof) => proof,
259				Err(e) => {
260					debug!(target: LOG_TARGET, "Error decoding proof: {:?}", e);
261					return ImportResult::BadResponse;
262				},
263			};
264			let (values, completed) = match self.client.verify_range_proof(
265				self.metadata.target_root(),
266				proof,
267				self.metadata.last_key.as_slice(),
268			) {
269				Err(e) => {
270					debug!(
271						target: LOG_TARGET,
272						"StateResponse failed proof verification: {}",
273						e,
274					);
275					return ImportResult::BadResponse;
276				},
277				Ok(values) => values,
278			};
279			debug!(target: LOG_TARGET, "Imported with {} keys", values.len());
280
281			let complete = completed == 0;
282			if !complete && !values.update_last_key(completed, &mut self.metadata.last_key) {
283				debug!(target: LOG_TARGET, "Error updating key cursor, depth: {}", completed);
284			};
285
286			self.process_state_verified(values);
287			self.metadata.imported_bytes += proof_size;
288			complete
289		} else {
290			self.process_state_unverified(response)
291		};
292		if complete {
293			self.metadata.complete = true;
294			let target_hash = self.metadata.target_hash();
295			ImportResult::Import(
296				target_hash,
297				self.metadata.target_header.clone(),
298				ImportedState { block: target_hash, state: std::mem::take(&mut self.state).into() },
299				self.metadata.target_body.clone(),
300				self.metadata.target_justifications.clone(),
301			)
302		} else {
303			ImportResult::Continue
304		}
305	}
306
307	/// Produce next state request.
308	fn next_request(&self) -> StateRequest {
309		self.metadata.next_request()
310	}
311
312	/// Check if the state is complete.
313	fn is_complete(&self) -> bool {
314		self.metadata.complete
315	}
316
317	/// Returns target block number.
318	fn target_number(&self) -> NumberFor<B> {
319		self.metadata.target_number()
320	}
321
322	/// Returns target block hash.
323	fn target_hash(&self) -> B::Hash {
324		self.metadata.target_hash()
325	}
326
327	/// Returns state sync estimated progress.
328	fn progress(&self) -> StateSyncProgress {
329		self.metadata.progress()
330	}
331}