soil_network/sync/strategy/
state_sync.rs1use crate::{
10 sync::schema::v1::{KeyValueStateEntry, StateEntry, StateRequest, StateResponse},
11 LOG_TARGET,
12};
13use codec::{Decode, Encode};
14use log::debug;
15use smallvec::SmallVec;
16use soil_client::client_api::{CompactProof, KeyValueStates, ProofProvider};
17use soil_client::import::ImportedState;
18use std::{collections::HashMap, fmt, sync::Arc};
19use subsoil::core::storage::well_known_keys;
20use subsoil::runtime::{
21 traits::{Block as BlockT, Header, NumberFor},
22 Justifications,
23};
24
25pub trait StateSyncProvider<B: BlockT>: Send + Sync {
27 fn import(&mut self, response: StateResponse) -> ImportResult<B>;
29 fn next_request(&self) -> StateRequest;
31 fn is_complete(&self) -> bool;
33 fn target_number(&self) -> NumberFor<B>;
35 fn target_hash(&self) -> B::Hash;
37 fn progress(&self) -> StateSyncProgress;
39}
40
41#[derive(Clone, Eq, PartialEq, Debug)]
43pub enum StateSyncPhase {
44 DownloadingState,
46 ImportingState,
48}
49
50impl fmt::Display for StateSyncPhase {
51 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
52 match self {
53 Self::DownloadingState => write!(f, "Downloading state"),
54 Self::ImportingState => write!(f, "Importing state"),
55 }
56 }
57}
58
59#[derive(Clone, Eq, PartialEq, Debug)]
61pub struct StateSyncProgress {
62 pub percentage: u32,
64 pub size: u64,
66 pub phase: StateSyncPhase,
68}
69
70pub enum ImportResult<B: BlockT> {
72 Import(B::Hash, B::Header, ImportedState<B>, Option<Vec<B::Extrinsic>>, Option<Justifications>),
74 Continue,
76 BadResponse,
78}
79
80struct StateSyncMetadata<B: BlockT> {
81 last_key: SmallVec<[Vec<u8>; 2]>,
82 target_header: B::Header,
83 target_body: Option<Vec<B::Extrinsic>>,
84 target_justifications: Option<Justifications>,
85 complete: bool,
86 imported_bytes: u64,
87 skip_proof: bool,
88}
89
90impl<B: BlockT> StateSyncMetadata<B> {
91 fn target_hash(&self) -> B::Hash {
92 self.target_header.hash()
93 }
94
95 fn target_number(&self) -> NumberFor<B> {
97 *self.target_header.number()
98 }
99
100 fn target_root(&self) -> B::Hash {
101 *self.target_header.state_root()
102 }
103
104 fn next_request(&self) -> StateRequest {
105 StateRequest {
106 block: self.target_hash().encode(),
107 start: self.last_key.clone().into_vec(),
108 no_proof: self.skip_proof,
109 }
110 }
111
112 fn progress(&self) -> StateSyncProgress {
113 let cursor = *self.last_key.get(0).and_then(|last| last.get(0)).unwrap_or(&0u8);
114 let percent_done = cursor as u32 * 100 / 256;
115 StateSyncProgress {
116 percentage: percent_done,
117 size: self.imported_bytes,
118 phase: if self.complete {
119 StateSyncPhase::ImportingState
120 } else {
121 StateSyncPhase::DownloadingState
122 },
123 }
124 }
125}
126
127pub struct StateSync<B: BlockT, Client> {
131 metadata: StateSyncMetadata<B>,
132 state: HashMap<Vec<u8>, (Vec<(Vec<u8>, Vec<u8>)>, Vec<Vec<u8>>)>,
133 client: Arc<Client>,
134}
135
136impl<B, Client> StateSync<B, Client>
137where
138 B: BlockT,
139 Client: ProofProvider<B> + Send + Sync + 'static,
140{
141 pub fn new(
143 client: Arc<Client>,
144 target_header: B::Header,
145 target_body: Option<Vec<B::Extrinsic>>,
146 target_justifications: Option<Justifications>,
147 skip_proof: bool,
148 ) -> Self {
149 Self {
150 client,
151 metadata: StateSyncMetadata {
152 last_key: SmallVec::default(),
153 target_header,
154 target_body,
155 target_justifications,
156 complete: false,
157 imported_bytes: 0,
158 skip_proof,
159 },
160 state: HashMap::default(),
161 }
162 }
163
164 fn process_state_key_values(
165 &mut self,
166 state_root: Vec<u8>,
167 key_values: impl IntoIterator<Item = (Vec<u8>, Vec<u8>)>,
168 ) {
169 let is_top = state_root.is_empty();
170
171 let entry = self.state.entry(state_root).or_default();
172
173 if entry.0.len() > 0 && entry.1.len() > 1 {
174 return;
177 }
178
179 let mut child_storage_roots = Vec::new();
180
181 for (key, value) in key_values {
182 if is_top && well_known_keys::is_child_storage_key(key.as_slice()) {
184 child_storage_roots.push((value, key));
185 } else {
186 self.metadata.imported_bytes += key.len() as u64;
187 entry.0.push((key, value));
188 }
189 }
190
191 for (root, storage_key) in child_storage_roots {
192 self.state.entry(root).or_default().1.push(storage_key);
193 }
194 }
195
196 fn process_state_verified(&mut self, values: KeyValueStates) {
197 for values in values.0 {
198 self.process_state_key_values(values.state_root, values.key_values);
199 }
200 }
201
202 fn process_state_unverified(&mut self, response: StateResponse) -> bool {
203 let mut complete = true;
204 if self.metadata.last_key.len() == 2 && response.entries[0].entries.is_empty() {
209 self.metadata.last_key.pop();
211 } else {
212 self.metadata.last_key.clear();
213 }
214 for state in response.entries {
215 debug!(
216 target: LOG_TARGET,
217 "Importing state from {:?} to {:?}",
218 state.entries.last().map(|e| subsoil::core::hexdisplay::HexDisplay::from(&e.key)),
219 state.entries.first().map(|e| subsoil::core::hexdisplay::HexDisplay::from(&e.key)),
220 );
221
222 if !state.complete {
223 if let Some(e) = state.entries.last() {
224 self.metadata.last_key.push(e.key.clone());
225 }
226 complete = false;
227 }
228
229 let KeyValueStateEntry { state_root, entries, complete: _ } = state;
230 self.process_state_key_values(
231 state_root,
232 entries.into_iter().map(|StateEntry { key, value }| (key, value)),
233 );
234 }
235 complete
236 }
237}
238
239impl<B, Client> StateSyncProvider<B> for StateSync<B, Client>
240where
241 B: BlockT,
242 Client: ProofProvider<B> + Send + Sync + 'static,
243{
244 fn import(&mut self, response: StateResponse) -> ImportResult<B> {
246 if response.entries.is_empty() && response.proof.is_empty() {
247 debug!(target: LOG_TARGET, "Bad state response");
248 return ImportResult::BadResponse;
249 }
250 if !self.metadata.skip_proof && response.proof.is_empty() {
251 debug!(target: LOG_TARGET, "Missing proof");
252 return ImportResult::BadResponse;
253 }
254 let complete = if !self.metadata.skip_proof {
255 debug!(target: LOG_TARGET, "Importing state from {} trie nodes", response.proof.len());
256 let proof_size = response.proof.len() as u64;
257 let proof = match CompactProof::decode(&mut response.proof.as_ref()) {
258 Ok(proof) => proof,
259 Err(e) => {
260 debug!(target: LOG_TARGET, "Error decoding proof: {:?}", e);
261 return ImportResult::BadResponse;
262 },
263 };
264 let (values, completed) = match self.client.verify_range_proof(
265 self.metadata.target_root(),
266 proof,
267 self.metadata.last_key.as_slice(),
268 ) {
269 Err(e) => {
270 debug!(
271 target: LOG_TARGET,
272 "StateResponse failed proof verification: {}",
273 e,
274 );
275 return ImportResult::BadResponse;
276 },
277 Ok(values) => values,
278 };
279 debug!(target: LOG_TARGET, "Imported with {} keys", values.len());
280
281 let complete = completed == 0;
282 if !complete && !values.update_last_key(completed, &mut self.metadata.last_key) {
283 debug!(target: LOG_TARGET, "Error updating key cursor, depth: {}", completed);
284 };
285
286 self.process_state_verified(values);
287 self.metadata.imported_bytes += proof_size;
288 complete
289 } else {
290 self.process_state_unverified(response)
291 };
292 if complete {
293 self.metadata.complete = true;
294 let target_hash = self.metadata.target_hash();
295 ImportResult::Import(
296 target_hash,
297 self.metadata.target_header.clone(),
298 ImportedState { block: target_hash, state: std::mem::take(&mut self.state).into() },
299 self.metadata.target_body.clone(),
300 self.metadata.target_justifications.clone(),
301 )
302 } else {
303 ImportResult::Continue
304 }
305 }
306
307 fn next_request(&self) -> StateRequest {
309 self.metadata.next_request()
310 }
311
312 fn is_complete(&self) -> bool {
314 self.metadata.complete
315 }
316
317 fn target_number(&self) -> NumberFor<B> {
319 self.metadata.target_number()
320 }
321
322 fn target_hash(&self) -> B::Hash {
324 self.metadata.target_hash()
325 }
326
327 fn progress(&self) -> StateSyncProgress {
329 self.metadata.progress()
330 }
331}