1use crate::{
23 block_relay_protocol::BlockDownloader,
24 block_request_handler::MAX_BLOCKS_IN_RESPONSE,
25 service::network::NetworkServiceHandle,
26 strategy::{
27 chain_sync::{ChainSync, ChainSyncMode},
28 state::StateStrategy,
29 warp::{WarpSync, WarpSyncConfig},
30 StrategyKey, SyncingAction, SyncingStrategy,
31 },
32 types::SyncStatus,
33 LOG_TARGET,
34};
35use log::{debug, error, info, warn};
36use prometheus_endpoint::Registry;
37use sc_client_api::{BlockBackend, ProofProvider};
38use sc_consensus::{BlockImportError, BlockImportStatus};
39use sc_network::ProtocolName;
40use sc_network_common::sync::{message::BlockAnnounce, SyncMode};
41use sc_network_types::PeerId;
42use sp_blockchain::{Error as ClientError, HeaderBackend, HeaderMetadata};
43use sp_runtime::traits::{Block as BlockT, Header, NumberFor};
44use std::{any::Any, collections::HashMap, sync::Arc};
45
46fn chain_sync_mode(sync_mode: SyncMode) -> ChainSyncMode {
48 match sync_mode {
49 SyncMode::Full => ChainSyncMode::Full,
50 SyncMode::LightState { skip_proofs, storage_chain_mode } =>
51 ChainSyncMode::LightState { skip_proofs, storage_chain_mode },
52 SyncMode::Warp => ChainSyncMode::Full,
53 }
54}
55
56#[derive(Clone, Debug)]
58pub struct PolkadotSyncingStrategyConfig<Block>
59where
60 Block: BlockT,
61{
62 pub mode: SyncMode,
64 pub max_parallel_downloads: u32,
66 pub max_blocks_per_request: u32,
68 pub metrics_registry: Option<Registry>,
70 pub state_request_protocol_name: ProtocolName,
72 pub block_downloader: Arc<dyn BlockDownloader<Block>>,
74}
75
76pub struct PolkadotSyncingStrategy<B: BlockT, Client> {
78 config: PolkadotSyncingStrategyConfig<B>,
80 client: Arc<Client>,
82 warp: Option<WarpSync<B, Client>>,
84 state: Option<StateStrategy<B>>,
86 chain_sync: Option<ChainSync<B, Client>>,
88 peer_best_blocks: HashMap<PeerId, (B::Hash, NumberFor<B>)>,
91}
92
93impl<B: BlockT, Client> SyncingStrategy<B> for PolkadotSyncingStrategy<B, Client>
94where
95 B: BlockT,
96 Client: HeaderBackend<B>
97 + BlockBackend<B>
98 + HeaderMetadata<B, Error = sp_blockchain::Error>
99 + ProofProvider<B>
100 + Send
101 + Sync
102 + 'static,
103{
104 fn add_peer(&mut self, peer_id: PeerId, best_hash: B::Hash, best_number: NumberFor<B>) {
105 self.peer_best_blocks.insert(peer_id, (best_hash, best_number));
106
107 self.warp.as_mut().map(|s| s.add_peer(peer_id, best_hash, best_number));
108 self.state.as_mut().map(|s| s.add_peer(peer_id, best_hash, best_number));
109 self.chain_sync.as_mut().map(|s| s.add_peer(peer_id, best_hash, best_number));
110 }
111
112 fn remove_peer(&mut self, peer_id: &PeerId) {
113 self.warp.as_mut().map(|s| s.remove_peer(peer_id));
114 self.state.as_mut().map(|s| s.remove_peer(peer_id));
115 self.chain_sync.as_mut().map(|s| s.remove_peer(peer_id));
116
117 self.peer_best_blocks.remove(peer_id);
118 }
119
120 fn on_validated_block_announce(
121 &mut self,
122 is_best: bool,
123 peer_id: PeerId,
124 announce: &BlockAnnounce<B::Header>,
125 ) -> Option<(B::Hash, NumberFor<B>)> {
126 let new_best = if let Some(ref mut warp) = self.warp {
127 warp.on_validated_block_announce(is_best, peer_id, announce)
128 } else if let Some(ref mut state) = self.state {
129 state.on_validated_block_announce(is_best, peer_id, announce)
130 } else if let Some(ref mut chain_sync) = self.chain_sync {
131 chain_sync.on_validated_block_announce(is_best, peer_id, announce)
132 } else {
133 error!(target: LOG_TARGET, "No syncing strategy is active.");
134 debug_assert!(false);
135 Some((announce.header.hash(), *announce.header.number()))
136 };
137
138 if let Some(new_best) = new_best {
139 if let Some(best) = self.peer_best_blocks.get_mut(&peer_id) {
140 *best = new_best;
141 } else {
142 debug!(
143 target: LOG_TARGET,
144 "Cannot update `peer_best_blocks` as peer {peer_id} is not known to `Strategy` \
145 (already disconnected?)",
146 );
147 }
148 }
149
150 new_best
151 }
152
153 fn set_sync_fork_request(&mut self, peers: Vec<PeerId>, hash: &B::Hash, number: NumberFor<B>) {
154 if let Some(ref mut chain_sync) = self.chain_sync {
156 chain_sync.set_sync_fork_request(peers.clone(), hash, number);
157 }
158 }
159
160 fn request_justification(&mut self, hash: &B::Hash, number: NumberFor<B>) {
161 if let Some(ref mut chain_sync) = self.chain_sync {
163 chain_sync.request_justification(hash, number);
164 }
165 }
166
167 fn clear_justification_requests(&mut self) {
168 if let Some(ref mut chain_sync) = self.chain_sync {
170 chain_sync.clear_justification_requests();
171 }
172 }
173
174 fn on_justification_import(&mut self, hash: B::Hash, number: NumberFor<B>, success: bool) {
175 if let Some(ref mut chain_sync) = self.chain_sync {
177 chain_sync.on_justification_import(hash, number, success);
178 }
179 }
180
181 fn on_generic_response(
182 &mut self,
183 peer_id: &PeerId,
184 key: StrategyKey,
185 protocol_name: ProtocolName,
186 response: Box<dyn Any + Send>,
187 ) {
188 match key {
189 StateStrategy::<B>::STRATEGY_KEY =>
190 if let Some(state) = &mut self.state {
191 let Ok(response) = response.downcast::<Vec<u8>>() else {
192 warn!(target: LOG_TARGET, "Failed to downcast state response");
193 debug_assert!(false);
194 return;
195 };
196
197 state.on_state_response(peer_id, *response);
198 } else if let Some(chain_sync) = &mut self.chain_sync {
199 chain_sync.on_generic_response(peer_id, key, protocol_name, response);
200 } else {
201 error!(
202 target: LOG_TARGET,
203 "`on_generic_response()` called with unexpected key {key:?} \
204 or corresponding strategy is not active.",
205 );
206 debug_assert!(false);
207 },
208 WarpSync::<B, Client>::STRATEGY_KEY =>
209 if let Some(warp) = &mut self.warp {
210 warp.on_generic_response(peer_id, protocol_name, response);
211 } else {
212 error!(
213 target: LOG_TARGET,
214 "`on_generic_response()` called with unexpected key {key:?} \
215 or warp strategy is not active",
216 );
217 debug_assert!(false);
218 },
219 ChainSync::<B, Client>::STRATEGY_KEY =>
220 if let Some(chain_sync) = &mut self.chain_sync {
221 chain_sync.on_generic_response(peer_id, key, protocol_name, response);
222 } else {
223 error!(
224 target: LOG_TARGET,
225 "`on_generic_response()` called with unexpected key {key:?} \
226 or corresponding strategy is not active.",
227 );
228 debug_assert!(false);
229 },
230 key => {
231 warn!(
232 target: LOG_TARGET,
233 "Unexpected generic response strategy key {key:?}, protocol {protocol_name}",
234 );
235 debug_assert!(false);
236 },
237 }
238 }
239
240 fn on_blocks_processed(
241 &mut self,
242 imported: usize,
243 count: usize,
244 results: Vec<(Result<BlockImportStatus<NumberFor<B>>, BlockImportError>, B::Hash)>,
245 ) {
246 if let Some(ref mut state) = self.state {
248 state.on_blocks_processed(imported, count, results);
249 } else if let Some(ref mut chain_sync) = self.chain_sync {
250 chain_sync.on_blocks_processed(imported, count, results);
251 }
252 }
253
254 fn on_block_finalized(&mut self, hash: &B::Hash, number: NumberFor<B>) {
255 if let Some(ref mut chain_sync) = self.chain_sync {
257 chain_sync.on_block_finalized(hash, number);
258 }
259 }
260
261 fn update_chain_info(&mut self, best_hash: &B::Hash, best_number: NumberFor<B>) {
262 if let Some(ref mut chain_sync) = self.chain_sync {
264 chain_sync.update_chain_info(best_hash, best_number);
265 }
266 }
267
268 fn is_major_syncing(&self) -> bool {
269 self.warp.is_some() ||
270 self.state.is_some() ||
271 match self.chain_sync {
272 Some(ref s) => s.status().state.is_major_syncing(),
273 None => unreachable!("At least one syncing strategy is active; qed"),
274 }
275 }
276
277 fn num_peers(&self) -> usize {
278 self.peer_best_blocks.len()
279 }
280
281 fn status(&self) -> SyncStatus<B> {
282 if let Some(ref warp) = self.warp {
285 warp.status()
286 } else if let Some(ref state) = self.state {
287 state.status()
288 } else if let Some(ref chain_sync) = self.chain_sync {
289 chain_sync.status()
290 } else {
291 unreachable!("At least one syncing strategy is always active; qed")
292 }
293 }
294
295 fn num_downloaded_blocks(&self) -> usize {
296 self.chain_sync
297 .as_ref()
298 .map_or(0, |chain_sync| chain_sync.num_downloaded_blocks())
299 }
300
301 fn num_sync_requests(&self) -> usize {
302 self.chain_sync.as_ref().map_or(0, |chain_sync| chain_sync.num_sync_requests())
303 }
304
305 fn actions(
306 &mut self,
307 network_service: &NetworkServiceHandle,
308 ) -> Result<Vec<SyncingAction<B>>, ClientError> {
309 let actions: Vec<_> = if let Some(ref mut warp) = self.warp {
312 warp.actions(network_service).map(Into::into).collect()
313 } else if let Some(ref mut state) = self.state {
314 state.actions(network_service).map(Into::into).collect()
315 } else if let Some(ref mut chain_sync) = self.chain_sync {
316 chain_sync.actions(network_service)?
317 } else {
318 unreachable!("At least one syncing strategy is always active; qed")
319 };
320
321 if actions.iter().any(SyncingAction::is_finished) {
322 self.proceed_to_next()?;
323 }
324
325 Ok(actions)
326 }
327}
328
329impl<B: BlockT, Client> PolkadotSyncingStrategy<B, Client>
330where
331 B: BlockT,
332 Client: HeaderBackend<B>
333 + BlockBackend<B>
334 + HeaderMetadata<B, Error = sp_blockchain::Error>
335 + ProofProvider<B>
336 + Send
337 + Sync
338 + 'static,
339{
340 pub fn new(
342 mut config: PolkadotSyncingStrategyConfig<B>,
343 client: Arc<Client>,
344 warp_sync_config: Option<WarpSyncConfig<B>>,
345 warp_sync_protocol_name: Option<ProtocolName>,
346 ) -> Result<Self, ClientError> {
347 if config.max_blocks_per_request > MAX_BLOCKS_IN_RESPONSE as u32 {
348 info!(
349 target: LOG_TARGET,
350 "clamping maximum blocks per request to {MAX_BLOCKS_IN_RESPONSE}",
351 );
352 config.max_blocks_per_request = MAX_BLOCKS_IN_RESPONSE as u32;
353 }
354
355 if let SyncMode::Warp = config.mode {
356 let warp_sync_config = warp_sync_config
357 .expect("Warp sync configuration must be supplied in warp sync mode.");
358 let warp_sync = WarpSync::new(
359 client.clone(),
360 warp_sync_config,
361 warp_sync_protocol_name,
362 config.block_downloader.clone(),
363 );
364 Ok(Self {
365 config,
366 client,
367 warp: Some(warp_sync),
368 state: None,
369 chain_sync: None,
370 peer_best_blocks: Default::default(),
371 })
372 } else {
373 let chain_sync = ChainSync::new(
374 chain_sync_mode(config.mode),
375 client.clone(),
376 config.max_parallel_downloads,
377 config.max_blocks_per_request,
378 config.state_request_protocol_name.clone(),
379 config.block_downloader.clone(),
380 config.metrics_registry.as_ref(),
381 std::iter::empty(),
382 )?;
383 Ok(Self {
384 config,
385 client,
386 warp: None,
387 state: None,
388 chain_sync: Some(chain_sync),
389 peer_best_blocks: Default::default(),
390 })
391 }
392 }
393
394 pub fn proceed_to_next(&mut self) -> Result<(), ClientError> {
396 if let Some(ref mut warp) = self.warp {
398 match warp.take_result() {
399 Some(res) => {
400 info!(
401 target: LOG_TARGET,
402 "Warp sync is complete, continuing with state sync."
403 );
404 let state_sync = StateStrategy::new(
405 self.client.clone(),
406 res.target_header,
407 res.target_body,
408 res.target_justifications,
409 false,
410 self.peer_best_blocks
411 .iter()
412 .map(|(peer_id, (_, best_number))| (*peer_id, *best_number)),
413 self.config.state_request_protocol_name.clone(),
414 );
415
416 self.warp = None;
417 self.state = Some(state_sync);
418 Ok(())
419 },
420 None => {
421 error!(
422 target: LOG_TARGET,
423 "Warp sync failed. Continuing with full sync."
424 );
425 let chain_sync = match ChainSync::new(
426 chain_sync_mode(self.config.mode),
427 self.client.clone(),
428 self.config.max_parallel_downloads,
429 self.config.max_blocks_per_request,
430 self.config.state_request_protocol_name.clone(),
431 self.config.block_downloader.clone(),
432 self.config.metrics_registry.as_ref(),
433 self.peer_best_blocks.iter().map(|(peer_id, (best_hash, best_number))| {
434 (*peer_id, *best_hash, *best_number)
435 }),
436 ) {
437 Ok(chain_sync) => chain_sync,
438 Err(e) => {
439 error!(target: LOG_TARGET, "Failed to start `ChainSync`.");
440 return Err(e)
441 },
442 };
443
444 self.warp = None;
445 self.chain_sync = Some(chain_sync);
446 Ok(())
447 },
448 }
449 } else if let Some(state) = &self.state {
450 if state.is_succeeded() {
451 info!(target: LOG_TARGET, "State sync is complete, continuing with block sync.");
452 } else {
453 error!(target: LOG_TARGET, "State sync failed. Falling back to full sync.");
454 }
455 let chain_sync = match ChainSync::new(
456 chain_sync_mode(self.config.mode),
457 self.client.clone(),
458 self.config.max_parallel_downloads,
459 self.config.max_blocks_per_request,
460 self.config.state_request_protocol_name.clone(),
461 self.config.block_downloader.clone(),
462 self.config.metrics_registry.as_ref(),
463 self.peer_best_blocks.iter().map(|(peer_id, (best_hash, best_number))| {
464 (*peer_id, *best_hash, *best_number)
465 }),
466 ) {
467 Ok(chain_sync) => chain_sync,
468 Err(e) => {
469 error!(target: LOG_TARGET, "Failed to start `ChainSync`.");
470 return Err(e);
471 },
472 };
473
474 self.state = None;
475 self.chain_sync = Some(chain_sync);
476 Ok(())
477 } else {
478 unreachable!("Only warp & state strategies can finish; qed")
479 }
480 }
481}