1use crate::{
23 block_relay_protocol::BlockDownloader,
24 block_request_handler::MAX_BLOCKS_IN_RESPONSE,
25 service::network::NetworkServiceHandle,
26 strategy::{
27 chain_sync::{ChainSync, ChainSyncMode},
28 state::StateStrategy,
29 warp::{WarpSync, WarpSyncConfig},
30 StrategyKey, SyncingAction, SyncingStrategy,
31 },
32 types::SyncStatus,
33 LOG_TARGET,
34};
35use log::{debug, error, info, warn};
36use prometheus_endpoint::Registry;
37use sc_client_api::{BlockBackend, ProofProvider};
38use sc_consensus::{BlockImportError, BlockImportStatus};
39use sc_network::ProtocolName;
40use sc_network_common::sync::{message::BlockAnnounce, SyncMode};
41use sc_network_types::PeerId;
42use sp_blockchain::{Error as ClientError, HeaderBackend, HeaderMetadata};
43use sp_runtime::traits::{Block as BlockT, Header, NumberFor};
44use std::{any::Any, collections::HashMap, sync::Arc};
45
46fn chain_sync_mode(sync_mode: SyncMode) -> ChainSyncMode {
48 match sync_mode {
49 SyncMode::Full => ChainSyncMode::Full,
50 SyncMode::LightState { skip_proofs, storage_chain_mode } => {
51 ChainSyncMode::LightState { skip_proofs, storage_chain_mode }
52 },
53 SyncMode::Warp => ChainSyncMode::Full,
54 }
55}
56
57#[derive(Clone, Debug)]
59pub struct PolkadotSyncingStrategyConfig<Block>
60where
61 Block: BlockT,
62{
63 pub mode: SyncMode,
65 pub max_parallel_downloads: u32,
67 pub max_blocks_per_request: u32,
69 pub min_peers_to_start_warp_sync: Option<usize>,
71 pub metrics_registry: Option<Registry>,
73 pub state_request_protocol_name: ProtocolName,
75 pub block_downloader: Arc<dyn BlockDownloader<Block>>,
77 pub archive_blocks: bool,
80}
81
82pub struct PolkadotSyncingStrategy<B: BlockT, Client> {
84 config: PolkadotSyncingStrategyConfig<B>,
86 client: Arc<Client>,
88 warp: Option<WarpSync<B>>,
90 state: Option<StateStrategy<B>>,
92 chain_sync: Option<ChainSync<B, Client>>,
94 peer_best_blocks: HashMap<PeerId, (B::Hash, NumberFor<B>)>,
97}
98
99impl<B: BlockT, Client> SyncingStrategy<B> for PolkadotSyncingStrategy<B, Client>
100where
101 B: BlockT,
102 Client: HeaderBackend<B>
103 + BlockBackend<B>
104 + HeaderMetadata<B, Error = sp_blockchain::Error>
105 + ProofProvider<B>
106 + Send
107 + Sync
108 + 'static,
109{
110 fn add_peer(&mut self, peer_id: PeerId, best_hash: B::Hash, best_number: NumberFor<B>) {
111 self.peer_best_blocks.insert(peer_id, (best_hash, best_number));
112
113 self.warp.as_mut().map(|s| s.add_peer(peer_id, best_hash, best_number));
114 self.state.as_mut().map(|s| s.add_peer(peer_id, best_hash, best_number));
115 self.chain_sync.as_mut().map(|s| s.add_peer(peer_id, best_hash, best_number));
116 }
117
118 fn remove_peer(&mut self, peer_id: &PeerId) {
119 self.warp.as_mut().map(|s| s.remove_peer(peer_id));
120 self.state.as_mut().map(|s| s.remove_peer(peer_id));
121 self.chain_sync.as_mut().map(|s| s.remove_peer(peer_id));
122
123 self.peer_best_blocks.remove(peer_id);
124 }
125
126 fn on_validated_block_announce(
127 &mut self,
128 is_best: bool,
129 peer_id: PeerId,
130 announce: &BlockAnnounce<B::Header>,
131 ) -> Option<(B::Hash, NumberFor<B>)> {
132 let new_best = if let Some(ref mut warp) = self.warp {
133 warp.on_validated_block_announce(is_best, peer_id, announce)
134 } else if let Some(ref mut state) = self.state {
135 state.on_validated_block_announce(is_best, peer_id, announce)
136 } else if let Some(ref mut chain_sync) = self.chain_sync {
137 chain_sync.on_validated_block_announce(is_best, peer_id, announce)
138 } else {
139 error!(target: LOG_TARGET, "No syncing strategy is active.");
140 debug_assert!(false);
141 Some((announce.header.hash(), *announce.header.number()))
142 };
143
144 if let Some(new_best) = new_best {
145 if let Some(best) = self.peer_best_blocks.get_mut(&peer_id) {
146 *best = new_best;
147 } else {
148 debug!(
149 target: LOG_TARGET,
150 "Cannot update `peer_best_blocks` as peer {peer_id} is not known to `Strategy` \
151 (already disconnected?)",
152 );
153 }
154 }
155
156 new_best
157 }
158
159 fn set_sync_fork_request(&mut self, peers: Vec<PeerId>, hash: &B::Hash, number: NumberFor<B>) {
160 if let Some(ref mut chain_sync) = self.chain_sync {
162 chain_sync.set_sync_fork_request(peers.clone(), hash, number);
163 }
164 }
165
166 fn request_justification(&mut self, hash: &B::Hash, number: NumberFor<B>) {
167 if let Some(ref mut chain_sync) = self.chain_sync {
169 chain_sync.request_justification(hash, number);
170 }
171 }
172
173 fn clear_justification_requests(&mut self) {
174 if let Some(ref mut chain_sync) = self.chain_sync {
176 chain_sync.clear_justification_requests();
177 }
178 }
179
180 fn on_justification_import(&mut self, hash: B::Hash, number: NumberFor<B>, success: bool) {
181 if let Some(ref mut chain_sync) = self.chain_sync {
183 chain_sync.on_justification_import(hash, number, success);
184 }
185 }
186
187 fn on_generic_response(
188 &mut self,
189 peer_id: &PeerId,
190 key: StrategyKey,
191 protocol_name: ProtocolName,
192 response: Box<dyn Any + Send>,
193 ) {
194 match key {
195 StateStrategy::<B>::STRATEGY_KEY => {
196 if let Some(state) = &mut self.state {
197 let Ok(response) = response.downcast::<Vec<u8>>() else {
198 warn!(target: LOG_TARGET, "Failed to downcast state response");
199 debug_assert!(false);
200 return;
201 };
202
203 state.on_state_response(peer_id, *response);
204 } else if let Some(chain_sync) = &mut self.chain_sync {
205 chain_sync.on_generic_response(peer_id, key, protocol_name, response);
206 } else {
207 error!(
208 target: LOG_TARGET,
209 "`on_generic_response()` called with unexpected key {key:?} \
210 or corresponding strategy is not active.",
211 );
212 debug_assert!(false);
213 }
214 },
215 WarpSync::<B>::STRATEGY_KEY => {
216 if let Some(warp) = &mut self.warp {
217 warp.on_generic_response(peer_id, protocol_name, response);
218 } else {
219 error!(
220 target: LOG_TARGET,
221 "`on_generic_response()` called with unexpected key {key:?} \
222 or warp strategy is not active",
223 );
224 debug_assert!(false);
225 }
226 },
227 ChainSync::<B, Client>::STRATEGY_KEY => {
228 if let Some(chain_sync) = &mut self.chain_sync {
229 chain_sync.on_generic_response(peer_id, key, protocol_name, response);
230 } else {
231 error!(
232 target: LOG_TARGET,
233 "`on_generic_response()` called with unexpected key {key:?} \
234 or corresponding strategy is not active.",
235 );
236 debug_assert!(false);
237 }
238 },
239 key => {
240 warn!(
241 target: LOG_TARGET,
242 "Unexpected generic response strategy key {key:?}, protocol {protocol_name}",
243 );
244 debug_assert!(false);
245 },
246 }
247 }
248
249 fn on_blocks_processed(
250 &mut self,
251 imported: usize,
252 count: usize,
253 results: Vec<(Result<BlockImportStatus<NumberFor<B>>, BlockImportError>, B::Hash)>,
254 ) {
255 if let Some(ref mut state) = self.state {
257 state.on_blocks_processed(imported, count, results);
258 } else if let Some(ref mut chain_sync) = self.chain_sync {
259 chain_sync.on_blocks_processed(imported, count, results);
260 }
261 }
262
263 fn on_block_finalized(&mut self, hash: &B::Hash, number: NumberFor<B>) {
264 if let Some(ref mut chain_sync) = self.chain_sync {
266 chain_sync.on_block_finalized(hash, number);
267 }
268 }
269
270 fn update_chain_info(&mut self, best_hash: &B::Hash, best_number: NumberFor<B>) {
271 if let Some(ref mut chain_sync) = self.chain_sync {
273 chain_sync.update_chain_info(best_hash, best_number);
274 }
275 }
276
277 fn is_major_syncing(&self) -> bool {
278 self.warp.is_some() ||
279 self.state.is_some() ||
280 match self.chain_sync {
281 Some(ref s) => s.status().state.is_major_syncing(),
282 None => unreachable!("At least one syncing strategy is active; qed"),
283 }
284 }
285
286 fn num_peers(&self) -> usize {
287 self.peer_best_blocks.len()
288 }
289
290 fn status(&self) -> SyncStatus<B> {
291 if let Some(ref warp) = self.warp {
294 warp.status()
295 } else if let Some(ref state) = self.state {
296 state.status()
297 } else if let Some(ref chain_sync) = self.chain_sync {
298 chain_sync.status()
299 } else {
300 unreachable!("At least one syncing strategy is always active; qed")
301 }
302 }
303
304 fn num_downloaded_blocks(&self) -> usize {
305 self.chain_sync
306 .as_ref()
307 .map_or(0, |chain_sync| chain_sync.num_downloaded_blocks())
308 }
309
310 fn num_sync_requests(&self) -> usize {
311 self.chain_sync.as_ref().map_or(0, |chain_sync| chain_sync.num_sync_requests())
312 }
313
314 fn actions(
315 &mut self,
316 network_service: &NetworkServiceHandle,
317 ) -> Result<Vec<SyncingAction<B>>, ClientError> {
318 let actions: Vec<_> = if let Some(ref mut warp) = self.warp {
321 warp.actions(network_service).map(Into::into).collect()
322 } else if let Some(ref mut state) = self.state {
323 state.actions(network_service).map(Into::into).collect()
324 } else if let Some(ref mut chain_sync) = self.chain_sync {
325 chain_sync.actions(network_service)?
326 } else {
327 unreachable!("At least one syncing strategy is always active; qed")
328 };
329
330 if actions.iter().any(SyncingAction::is_finished) {
331 self.proceed_to_next()?;
332 }
333
334 Ok(actions)
335 }
336}
337
338impl<B: BlockT, Client> PolkadotSyncingStrategy<B, Client>
339where
340 B: BlockT,
341 Client: HeaderBackend<B>
342 + BlockBackend<B>
343 + HeaderMetadata<B, Error = sp_blockchain::Error>
344 + ProofProvider<B>
345 + Send
346 + Sync
347 + 'static,
348{
349 pub fn new(
351 mut config: PolkadotSyncingStrategyConfig<B>,
352 client: Arc<Client>,
353 warp_sync_config: Option<WarpSyncConfig<B>>,
354 warp_sync_protocol_name: Option<ProtocolName>,
355 ) -> Result<Self, ClientError> {
356 if config.max_blocks_per_request > MAX_BLOCKS_IN_RESPONSE as u32 {
357 info!(
358 target: LOG_TARGET,
359 "clamping maximum blocks per request to {MAX_BLOCKS_IN_RESPONSE}",
360 );
361 config.max_blocks_per_request = MAX_BLOCKS_IN_RESPONSE as u32;
362 }
363
364 if let SyncMode::Warp = config.mode {
365 let warp_sync_config = warp_sync_config
366 .expect("Warp sync configuration must be supplied in warp sync mode.");
367 let warp_sync = WarpSync::new(
368 client.clone(),
369 warp_sync_config,
370 warp_sync_protocol_name,
371 config.block_downloader.clone(),
372 config.min_peers_to_start_warp_sync,
373 );
374 Ok(Self {
375 config,
376 client,
377 warp: Some(warp_sync),
378 state: None,
379 chain_sync: None,
380 peer_best_blocks: Default::default(),
381 })
382 } else {
383 let chain_sync = ChainSync::new(
384 chain_sync_mode(config.mode),
385 client.clone(),
386 config.max_parallel_downloads,
387 config.max_blocks_per_request,
388 config.state_request_protocol_name.clone(),
389 config.block_downloader.clone(),
390 config.archive_blocks,
391 config.metrics_registry.as_ref(),
392 std::iter::empty(),
393 )?;
394 Ok(Self {
395 config,
396 client,
397 warp: None,
398 state: None,
399 chain_sync: Some(chain_sync),
400 peer_best_blocks: Default::default(),
401 })
402 }
403 }
404
405 pub fn proceed_to_next(&mut self) -> Result<(), ClientError> {
407 if let Some(ref mut warp) = self.warp {
409 match warp.take_result() {
410 Some(res) => {
411 info!(
412 target: LOG_TARGET,
413 "Warp sync is complete, continuing with state sync."
414 );
415 let state_sync = StateStrategy::new(
416 self.client.clone(),
417 res.target_header,
418 res.target_body,
419 res.target_justifications,
420 false,
421 self.peer_best_blocks
422 .iter()
423 .map(|(peer_id, (_, best_number))| (*peer_id, *best_number)),
424 self.config.state_request_protocol_name.clone(),
425 );
426
427 self.warp = None;
428 self.state = Some(state_sync);
429 Ok(())
430 },
431 None => {
432 error!(
433 target: LOG_TARGET,
434 "Warp sync failed. Continuing with full sync."
435 );
436 let chain_sync = match ChainSync::new(
437 chain_sync_mode(self.config.mode),
438 self.client.clone(),
439 self.config.max_parallel_downloads,
440 self.config.max_blocks_per_request,
441 self.config.state_request_protocol_name.clone(),
442 self.config.block_downloader.clone(),
443 self.config.archive_blocks,
444 self.config.metrics_registry.as_ref(),
445 self.peer_best_blocks.iter().map(|(peer_id, (best_hash, best_number))| {
446 (*peer_id, *best_hash, *best_number)
447 }),
448 ) {
449 Ok(chain_sync) => chain_sync,
450 Err(e) => {
451 error!(target: LOG_TARGET, "Failed to start `ChainSync`.");
452 return Err(e);
453 },
454 };
455
456 self.warp = None;
457 self.chain_sync = Some(chain_sync);
458 Ok(())
459 },
460 }
461 } else if let Some(state) = &self.state {
462 if state.is_succeeded() {
463 info!(target: LOG_TARGET, "State sync is complete, continuing with block sync.");
464 } else {
465 error!(target: LOG_TARGET, "State sync failed. Falling back to full sync.");
466 }
467 let chain_sync = match ChainSync::new(
468 chain_sync_mode(self.config.mode),
469 self.client.clone(),
470 self.config.max_parallel_downloads,
471 self.config.max_blocks_per_request,
472 self.config.state_request_protocol_name.clone(),
473 self.config.block_downloader.clone(),
474 self.config.archive_blocks,
475 self.config.metrics_registry.as_ref(),
476 self.peer_best_blocks.iter().map(|(peer_id, (best_hash, best_number))| {
477 (*peer_id, *best_hash, *best_number)
478 }),
479 ) {
480 Ok(chain_sync) => chain_sync,
481 Err(e) => {
482 error!(target: LOG_TARGET, "Failed to start `ChainSync`.");
483 return Err(e);
484 },
485 };
486
487 self.state = None;
488 self.chain_sync = Some(chain_sync);
489 Ok(())
490 } else {
491 unreachable!("Only warp & state strategies can finish; qed")
492 }
493 }
494}