Skip to main content

sc_network_sync/strategy/
polkadot.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! [`PolkadotSyncingStrategy`] is a proxy between [`crate::engine::SyncingEngine`]
20//! and specific syncing algorithms.
21
22use crate::{
23	block_relay_protocol::BlockDownloader,
24	block_request_handler::MAX_BLOCKS_IN_RESPONSE,
25	service::network::NetworkServiceHandle,
26	strategy::{
27		chain_sync::{ChainSync, ChainSyncMode},
28		state::StateStrategy,
29		warp::{WarpSync, WarpSyncConfig},
30		StrategyKey, SyncingAction, SyncingStrategy,
31	},
32	types::SyncStatus,
33	LOG_TARGET,
34};
35use log::{debug, error, info, warn};
36use prometheus_endpoint::Registry;
37use sc_client_api::{BlockBackend, ProofProvider};
38use sc_consensus::{BlockImportError, BlockImportStatus};
39use sc_network::ProtocolName;
40use sc_network_common::sync::{message::BlockAnnounce, SyncMode};
41use sc_network_types::PeerId;
42use sp_blockchain::{Error as ClientError, HeaderBackend, HeaderMetadata};
43use sp_runtime::traits::{Block as BlockT, Header, NumberFor};
44use std::{any::Any, collections::HashMap, sync::Arc};
45
46/// Corresponding `ChainSync` mode.
47fn chain_sync_mode(sync_mode: SyncMode) -> ChainSyncMode {
48	match sync_mode {
49		SyncMode::Full => ChainSyncMode::Full,
50		SyncMode::LightState { skip_proofs, storage_chain_mode } => {
51			ChainSyncMode::LightState { skip_proofs, storage_chain_mode }
52		},
53		SyncMode::Warp => ChainSyncMode::Full,
54	}
55}
56
57/// Syncing configuration containing data for [`PolkadotSyncingStrategy`].
58#[derive(Clone, Debug)]
59pub struct PolkadotSyncingStrategyConfig<Block>
60where
61	Block: BlockT,
62{
63	/// Syncing mode.
64	pub mode: SyncMode,
65	/// The number of parallel downloads to guard against slow peers.
66	pub max_parallel_downloads: u32,
67	/// Maximum number of blocks to request.
68	pub max_blocks_per_request: u32,
69	/// Number of peers that need to be connected before warp sync is started.
70	pub min_peers_to_start_warp_sync: Option<usize>,
71	/// Prometheus metrics registry.
72	pub metrics_registry: Option<Registry>,
73	/// Protocol name used to send out state requests
74	pub state_request_protocol_name: ProtocolName,
75	/// Block downloader
76	pub block_downloader: Arc<dyn BlockDownloader<Block>>,
77	/// Whether to archive blocks. When `true`, gap sync requests bodies to maintain complete
78	/// block history.
79	pub archive_blocks: bool,
80}
81
82/// Proxy to specific syncing strategies used in Polkadot.
83pub struct PolkadotSyncingStrategy<B: BlockT, Client> {
84	/// Initial syncing configuration.
85	config: PolkadotSyncingStrategyConfig<B>,
86	/// Client used by syncing strategies.
87	client: Arc<Client>,
88	/// Warp strategy.
89	warp: Option<WarpSync<B>>,
90	/// State strategy.
91	state: Option<StateStrategy<B>>,
92	/// `ChainSync` strategy.`
93	chain_sync: Option<ChainSync<B, Client>>,
94	/// Connected peers and their best blocks used to seed a new strategy when switching to it in
95	/// `PolkadotSyncingStrategy::proceed_to_next`.
96	peer_best_blocks: HashMap<PeerId, (B::Hash, NumberFor<B>)>,
97}
98
99impl<B: BlockT, Client> SyncingStrategy<B> for PolkadotSyncingStrategy<B, Client>
100where
101	B: BlockT,
102	Client: HeaderBackend<B>
103		+ BlockBackend<B>
104		+ HeaderMetadata<B, Error = sp_blockchain::Error>
105		+ ProofProvider<B>
106		+ Send
107		+ Sync
108		+ 'static,
109{
110	fn add_peer(&mut self, peer_id: PeerId, best_hash: B::Hash, best_number: NumberFor<B>) {
111		self.peer_best_blocks.insert(peer_id, (best_hash, best_number));
112
113		self.warp.as_mut().map(|s| s.add_peer(peer_id, best_hash, best_number));
114		self.state.as_mut().map(|s| s.add_peer(peer_id, best_hash, best_number));
115		self.chain_sync.as_mut().map(|s| s.add_peer(peer_id, best_hash, best_number));
116	}
117
118	fn remove_peer(&mut self, peer_id: &PeerId) {
119		self.warp.as_mut().map(|s| s.remove_peer(peer_id));
120		self.state.as_mut().map(|s| s.remove_peer(peer_id));
121		self.chain_sync.as_mut().map(|s| s.remove_peer(peer_id));
122
123		self.peer_best_blocks.remove(peer_id);
124	}
125
126	fn on_validated_block_announce(
127		&mut self,
128		is_best: bool,
129		peer_id: PeerId,
130		announce: &BlockAnnounce<B::Header>,
131	) -> Option<(B::Hash, NumberFor<B>)> {
132		let new_best = if let Some(ref mut warp) = self.warp {
133			warp.on_validated_block_announce(is_best, peer_id, announce)
134		} else if let Some(ref mut state) = self.state {
135			state.on_validated_block_announce(is_best, peer_id, announce)
136		} else if let Some(ref mut chain_sync) = self.chain_sync {
137			chain_sync.on_validated_block_announce(is_best, peer_id, announce)
138		} else {
139			error!(target: LOG_TARGET, "No syncing strategy is active.");
140			debug_assert!(false);
141			Some((announce.header.hash(), *announce.header.number()))
142		};
143
144		if let Some(new_best) = new_best {
145			if let Some(best) = self.peer_best_blocks.get_mut(&peer_id) {
146				*best = new_best;
147			} else {
148				debug!(
149					target: LOG_TARGET,
150					"Cannot update `peer_best_blocks` as peer {peer_id} is not known to `Strategy` \
151					 (already disconnected?)",
152				);
153			}
154		}
155
156		new_best
157	}
158
159	fn set_sync_fork_request(&mut self, peers: Vec<PeerId>, hash: &B::Hash, number: NumberFor<B>) {
160		// Fork requests are only handled by `ChainSync`.
161		if let Some(ref mut chain_sync) = self.chain_sync {
162			chain_sync.set_sync_fork_request(peers.clone(), hash, number);
163		}
164	}
165
166	fn request_justification(&mut self, hash: &B::Hash, number: NumberFor<B>) {
167		// Justifications can only be requested via `ChainSync`.
168		if let Some(ref mut chain_sync) = self.chain_sync {
169			chain_sync.request_justification(hash, number);
170		}
171	}
172
173	fn clear_justification_requests(&mut self) {
174		// Justification requests can only be cleared by `ChainSync`.
175		if let Some(ref mut chain_sync) = self.chain_sync {
176			chain_sync.clear_justification_requests();
177		}
178	}
179
180	fn on_justification_import(&mut self, hash: B::Hash, number: NumberFor<B>, success: bool) {
181		// Only `ChainSync` is interested in justification import.
182		if let Some(ref mut chain_sync) = self.chain_sync {
183			chain_sync.on_justification_import(hash, number, success);
184		}
185	}
186
187	fn on_generic_response(
188		&mut self,
189		peer_id: &PeerId,
190		key: StrategyKey,
191		protocol_name: ProtocolName,
192		response: Box<dyn Any + Send>,
193	) {
194		match key {
195			StateStrategy::<B>::STRATEGY_KEY => {
196				if let Some(state) = &mut self.state {
197					let Ok(response) = response.downcast::<Vec<u8>>() else {
198						warn!(target: LOG_TARGET, "Failed to downcast state response");
199						debug_assert!(false);
200						return;
201					};
202
203					state.on_state_response(peer_id, *response);
204				} else if let Some(chain_sync) = &mut self.chain_sync {
205					chain_sync.on_generic_response(peer_id, key, protocol_name, response);
206				} else {
207					error!(
208						target: LOG_TARGET,
209						"`on_generic_response()` called with unexpected key {key:?} \
210						 or corresponding strategy is not active.",
211					);
212					debug_assert!(false);
213				}
214			},
215			WarpSync::<B>::STRATEGY_KEY => {
216				if let Some(warp) = &mut self.warp {
217					warp.on_generic_response(peer_id, protocol_name, response);
218				} else {
219					error!(
220						target: LOG_TARGET,
221						"`on_generic_response()` called with unexpected key {key:?} \
222						 or warp strategy is not active",
223					);
224					debug_assert!(false);
225				}
226			},
227			ChainSync::<B, Client>::STRATEGY_KEY => {
228				if let Some(chain_sync) = &mut self.chain_sync {
229					chain_sync.on_generic_response(peer_id, key, protocol_name, response);
230				} else {
231					error!(
232						target: LOG_TARGET,
233						"`on_generic_response()` called with unexpected key {key:?} \
234						 or corresponding strategy is not active.",
235					);
236					debug_assert!(false);
237				}
238			},
239			key => {
240				warn!(
241					target: LOG_TARGET,
242					"Unexpected generic response strategy key {key:?}, protocol {protocol_name}",
243				);
244				debug_assert!(false);
245			},
246		}
247	}
248
249	fn on_blocks_processed(
250		&mut self,
251		imported: usize,
252		count: usize,
253		results: Vec<(Result<BlockImportStatus<NumberFor<B>>, BlockImportError>, B::Hash)>,
254	) {
255		// Only `StateStrategy` and `ChainSync` are interested in block processing notifications.
256		if let Some(ref mut state) = self.state {
257			state.on_blocks_processed(imported, count, results);
258		} else if let Some(ref mut chain_sync) = self.chain_sync {
259			chain_sync.on_blocks_processed(imported, count, results);
260		}
261	}
262
263	fn on_block_finalized(&mut self, hash: &B::Hash, number: NumberFor<B>) {
264		// Only `ChainSync` is interested in block finalization notifications.
265		if let Some(ref mut chain_sync) = self.chain_sync {
266			chain_sync.on_block_finalized(hash, number);
267		}
268	}
269
270	fn update_chain_info(&mut self, best_hash: &B::Hash, best_number: NumberFor<B>) {
271		// This is relevant to `ChainSync` only.
272		if let Some(ref mut chain_sync) = self.chain_sync {
273			chain_sync.update_chain_info(best_hash, best_number);
274		}
275	}
276
277	fn is_major_syncing(&self) -> bool {
278		self.warp.is_some() ||
279			self.state.is_some() ||
280			match self.chain_sync {
281				Some(ref s) => s.status().state.is_major_syncing(),
282				None => unreachable!("At least one syncing strategy is active; qed"),
283			}
284	}
285
286	fn num_peers(&self) -> usize {
287		self.peer_best_blocks.len()
288	}
289
290	fn status(&self) -> SyncStatus<B> {
291		// This function presumes that strategies are executed serially and must be refactored
292		// once we have parallel strategies.
293		if let Some(ref warp) = self.warp {
294			warp.status()
295		} else if let Some(ref state) = self.state {
296			state.status()
297		} else if let Some(ref chain_sync) = self.chain_sync {
298			chain_sync.status()
299		} else {
300			unreachable!("At least one syncing strategy is always active; qed")
301		}
302	}
303
304	fn num_downloaded_blocks(&self) -> usize {
305		self.chain_sync
306			.as_ref()
307			.map_or(0, |chain_sync| chain_sync.num_downloaded_blocks())
308	}
309
310	fn num_sync_requests(&self) -> usize {
311		self.chain_sync.as_ref().map_or(0, |chain_sync| chain_sync.num_sync_requests())
312	}
313
314	fn actions(
315		&mut self,
316		network_service: &NetworkServiceHandle,
317	) -> Result<Vec<SyncingAction<B>>, ClientError> {
318		// This function presumes that strategies are executed serially and must be refactored once
319		// we have parallel strategies.
320		let actions: Vec<_> = if let Some(ref mut warp) = self.warp {
321			warp.actions(network_service).map(Into::into).collect()
322		} else if let Some(ref mut state) = self.state {
323			state.actions(network_service).map(Into::into).collect()
324		} else if let Some(ref mut chain_sync) = self.chain_sync {
325			chain_sync.actions(network_service)?
326		} else {
327			unreachable!("At least one syncing strategy is always active; qed")
328		};
329
330		if actions.iter().any(SyncingAction::is_finished) {
331			self.proceed_to_next()?;
332		}
333
334		Ok(actions)
335	}
336}
337
338impl<B: BlockT, Client> PolkadotSyncingStrategy<B, Client>
339where
340	B: BlockT,
341	Client: HeaderBackend<B>
342		+ BlockBackend<B>
343		+ HeaderMetadata<B, Error = sp_blockchain::Error>
344		+ ProofProvider<B>
345		+ Send
346		+ Sync
347		+ 'static,
348{
349	/// Initialize a new syncing strategy.
350	pub fn new(
351		mut config: PolkadotSyncingStrategyConfig<B>,
352		client: Arc<Client>,
353		warp_sync_config: Option<WarpSyncConfig<B>>,
354		warp_sync_protocol_name: Option<ProtocolName>,
355	) -> Result<Self, ClientError> {
356		if config.max_blocks_per_request > MAX_BLOCKS_IN_RESPONSE as u32 {
357			info!(
358				target: LOG_TARGET,
359				"clamping maximum blocks per request to {MAX_BLOCKS_IN_RESPONSE}",
360			);
361			config.max_blocks_per_request = MAX_BLOCKS_IN_RESPONSE as u32;
362		}
363
364		if let SyncMode::Warp = config.mode {
365			let warp_sync_config = warp_sync_config
366				.expect("Warp sync configuration must be supplied in warp sync mode.");
367			let warp_sync = WarpSync::new(
368				client.clone(),
369				warp_sync_config,
370				warp_sync_protocol_name,
371				config.block_downloader.clone(),
372				config.min_peers_to_start_warp_sync,
373			);
374			Ok(Self {
375				config,
376				client,
377				warp: Some(warp_sync),
378				state: None,
379				chain_sync: None,
380				peer_best_blocks: Default::default(),
381			})
382		} else {
383			let chain_sync = ChainSync::new(
384				chain_sync_mode(config.mode),
385				client.clone(),
386				config.max_parallel_downloads,
387				config.max_blocks_per_request,
388				config.state_request_protocol_name.clone(),
389				config.block_downloader.clone(),
390				config.archive_blocks,
391				config.metrics_registry.as_ref(),
392				std::iter::empty(),
393			)?;
394			Ok(Self {
395				config,
396				client,
397				warp: None,
398				state: None,
399				chain_sync: Some(chain_sync),
400				peer_best_blocks: Default::default(),
401			})
402		}
403	}
404
405	/// Proceed with the next strategy if the active one finished.
406	pub fn proceed_to_next(&mut self) -> Result<(), ClientError> {
407		// The strategies are switched as `WarpSync` -> `StateStrategy` -> `ChainSync`.
408		if let Some(ref mut warp) = self.warp {
409			match warp.take_result() {
410				Some(res) => {
411					info!(
412						target: LOG_TARGET,
413						"Warp sync is complete, continuing with state sync."
414					);
415					let state_sync = StateStrategy::new(
416						self.client.clone(),
417						res.target_header,
418						res.target_body,
419						res.target_justifications,
420						false,
421						self.peer_best_blocks
422							.iter()
423							.map(|(peer_id, (_, best_number))| (*peer_id, *best_number)),
424						self.config.state_request_protocol_name.clone(),
425					);
426
427					self.warp = None;
428					self.state = Some(state_sync);
429					Ok(())
430				},
431				None => {
432					error!(
433						target: LOG_TARGET,
434						"Warp sync failed. Continuing with full sync."
435					);
436					let chain_sync = match ChainSync::new(
437						chain_sync_mode(self.config.mode),
438						self.client.clone(),
439						self.config.max_parallel_downloads,
440						self.config.max_blocks_per_request,
441						self.config.state_request_protocol_name.clone(),
442						self.config.block_downloader.clone(),
443						self.config.archive_blocks,
444						self.config.metrics_registry.as_ref(),
445						self.peer_best_blocks.iter().map(|(peer_id, (best_hash, best_number))| {
446							(*peer_id, *best_hash, *best_number)
447						}),
448					) {
449						Ok(chain_sync) => chain_sync,
450						Err(e) => {
451							error!(target: LOG_TARGET, "Failed to start `ChainSync`.");
452							return Err(e);
453						},
454					};
455
456					self.warp = None;
457					self.chain_sync = Some(chain_sync);
458					Ok(())
459				},
460			}
461		} else if let Some(state) = &self.state {
462			if state.is_succeeded() {
463				info!(target: LOG_TARGET, "State sync is complete, continuing with block sync.");
464			} else {
465				error!(target: LOG_TARGET, "State sync failed. Falling back to full sync.");
466			}
467			let chain_sync = match ChainSync::new(
468				chain_sync_mode(self.config.mode),
469				self.client.clone(),
470				self.config.max_parallel_downloads,
471				self.config.max_blocks_per_request,
472				self.config.state_request_protocol_name.clone(),
473				self.config.block_downloader.clone(),
474				self.config.archive_blocks,
475				self.config.metrics_registry.as_ref(),
476				self.peer_best_blocks.iter().map(|(peer_id, (best_hash, best_number))| {
477					(*peer_id, *best_hash, *best_number)
478				}),
479			) {
480				Ok(chain_sync) => chain_sync,
481				Err(e) => {
482					error!(target: LOG_TARGET, "Failed to start `ChainSync`.");
483					return Err(e);
484				},
485			};
486
487			self.state = None;
488			self.chain_sync = Some(chain_sync);
489			Ok(())
490		} else {
491			unreachable!("Only warp & state strategies can finish; qed")
492		}
493	}
494}