sc_network_sync/strategy/
polkadot.rs

1// This file is part of Substrate.
2
3// Copyright (C) Parity Technologies (UK) Ltd.
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! [`PolkadotSyncingStrategy`] is a proxy between [`crate::engine::SyncingEngine`]
20//! and specific syncing algorithms.
21
22use crate::{
23	block_relay_protocol::BlockDownloader,
24	block_request_handler::MAX_BLOCKS_IN_RESPONSE,
25	service::network::NetworkServiceHandle,
26	strategy::{
27		chain_sync::{ChainSync, ChainSyncMode},
28		state::StateStrategy,
29		warp::{WarpSync, WarpSyncConfig},
30		StrategyKey, SyncingAction, SyncingStrategy,
31	},
32	types::SyncStatus,
33	LOG_TARGET,
34};
35use log::{debug, error, info, warn};
36use prometheus_endpoint::Registry;
37use sc_client_api::{BlockBackend, ProofProvider};
38use sc_consensus::{BlockImportError, BlockImportStatus};
39use sc_network::ProtocolName;
40use sc_network_common::sync::{message::BlockAnnounce, SyncMode};
41use sc_network_types::PeerId;
42use sp_blockchain::{Error as ClientError, HeaderBackend, HeaderMetadata};
43use sp_runtime::traits::{Block as BlockT, Header, NumberFor};
44use std::{any::Any, collections::HashMap, sync::Arc};
45
46/// Corresponding `ChainSync` mode.
47fn chain_sync_mode(sync_mode: SyncMode) -> ChainSyncMode {
48	match sync_mode {
49		SyncMode::Full => ChainSyncMode::Full,
50		SyncMode::LightState { skip_proofs, storage_chain_mode } =>
51			ChainSyncMode::LightState { skip_proofs, storage_chain_mode },
52		SyncMode::Warp => ChainSyncMode::Full,
53	}
54}
55
56/// Syncing configuration containing data for [`PolkadotSyncingStrategy`].
57#[derive(Clone, Debug)]
58pub struct PolkadotSyncingStrategyConfig<Block>
59where
60	Block: BlockT,
61{
62	/// Syncing mode.
63	pub mode: SyncMode,
64	/// The number of parallel downloads to guard against slow peers.
65	pub max_parallel_downloads: u32,
66	/// Maximum number of blocks to request.
67	pub max_blocks_per_request: u32,
68	/// Prometheus metrics registry.
69	pub metrics_registry: Option<Registry>,
70	/// Protocol name used to send out state requests
71	pub state_request_protocol_name: ProtocolName,
72	/// Block downloader
73	pub block_downloader: Arc<dyn BlockDownloader<Block>>,
74}
75
76/// Proxy to specific syncing strategies used in Polkadot.
77pub struct PolkadotSyncingStrategy<B: BlockT, Client> {
78	/// Initial syncing configuration.
79	config: PolkadotSyncingStrategyConfig<B>,
80	/// Client used by syncing strategies.
81	client: Arc<Client>,
82	/// Warp strategy.
83	warp: Option<WarpSync<B, Client>>,
84	/// State strategy.
85	state: Option<StateStrategy<B>>,
86	/// `ChainSync` strategy.`
87	chain_sync: Option<ChainSync<B, Client>>,
88	/// Connected peers and their best blocks used to seed a new strategy when switching to it in
89	/// `PolkadotSyncingStrategy::proceed_to_next`.
90	peer_best_blocks: HashMap<PeerId, (B::Hash, NumberFor<B>)>,
91}
92
93impl<B: BlockT, Client> SyncingStrategy<B> for PolkadotSyncingStrategy<B, Client>
94where
95	B: BlockT,
96	Client: HeaderBackend<B>
97		+ BlockBackend<B>
98		+ HeaderMetadata<B, Error = sp_blockchain::Error>
99		+ ProofProvider<B>
100		+ Send
101		+ Sync
102		+ 'static,
103{
104	fn add_peer(&mut self, peer_id: PeerId, best_hash: B::Hash, best_number: NumberFor<B>) {
105		self.peer_best_blocks.insert(peer_id, (best_hash, best_number));
106
107		self.warp.as_mut().map(|s| s.add_peer(peer_id, best_hash, best_number));
108		self.state.as_mut().map(|s| s.add_peer(peer_id, best_hash, best_number));
109		self.chain_sync.as_mut().map(|s| s.add_peer(peer_id, best_hash, best_number));
110	}
111
112	fn remove_peer(&mut self, peer_id: &PeerId) {
113		self.warp.as_mut().map(|s| s.remove_peer(peer_id));
114		self.state.as_mut().map(|s| s.remove_peer(peer_id));
115		self.chain_sync.as_mut().map(|s| s.remove_peer(peer_id));
116
117		self.peer_best_blocks.remove(peer_id);
118	}
119
120	fn on_validated_block_announce(
121		&mut self,
122		is_best: bool,
123		peer_id: PeerId,
124		announce: &BlockAnnounce<B::Header>,
125	) -> Option<(B::Hash, NumberFor<B>)> {
126		let new_best = if let Some(ref mut warp) = self.warp {
127			warp.on_validated_block_announce(is_best, peer_id, announce)
128		} else if let Some(ref mut state) = self.state {
129			state.on_validated_block_announce(is_best, peer_id, announce)
130		} else if let Some(ref mut chain_sync) = self.chain_sync {
131			chain_sync.on_validated_block_announce(is_best, peer_id, announce)
132		} else {
133			error!(target: LOG_TARGET, "No syncing strategy is active.");
134			debug_assert!(false);
135			Some((announce.header.hash(), *announce.header.number()))
136		};
137
138		if let Some(new_best) = new_best {
139			if let Some(best) = self.peer_best_blocks.get_mut(&peer_id) {
140				*best = new_best;
141			} else {
142				debug!(
143					target: LOG_TARGET,
144					"Cannot update `peer_best_blocks` as peer {peer_id} is not known to `Strategy` \
145					 (already disconnected?)",
146				);
147			}
148		}
149
150		new_best
151	}
152
153	fn set_sync_fork_request(&mut self, peers: Vec<PeerId>, hash: &B::Hash, number: NumberFor<B>) {
154		// Fork requests are only handled by `ChainSync`.
155		if let Some(ref mut chain_sync) = self.chain_sync {
156			chain_sync.set_sync_fork_request(peers.clone(), hash, number);
157		}
158	}
159
160	fn request_justification(&mut self, hash: &B::Hash, number: NumberFor<B>) {
161		// Justifications can only be requested via `ChainSync`.
162		if let Some(ref mut chain_sync) = self.chain_sync {
163			chain_sync.request_justification(hash, number);
164		}
165	}
166
167	fn clear_justification_requests(&mut self) {
168		// Justification requests can only be cleared by `ChainSync`.
169		if let Some(ref mut chain_sync) = self.chain_sync {
170			chain_sync.clear_justification_requests();
171		}
172	}
173
174	fn on_justification_import(&mut self, hash: B::Hash, number: NumberFor<B>, success: bool) {
175		// Only `ChainSync` is interested in justification import.
176		if let Some(ref mut chain_sync) = self.chain_sync {
177			chain_sync.on_justification_import(hash, number, success);
178		}
179	}
180
181	fn on_generic_response(
182		&mut self,
183		peer_id: &PeerId,
184		key: StrategyKey,
185		protocol_name: ProtocolName,
186		response: Box<dyn Any + Send>,
187	) {
188		match key {
189			StateStrategy::<B>::STRATEGY_KEY =>
190				if let Some(state) = &mut self.state {
191					let Ok(response) = response.downcast::<Vec<u8>>() else {
192						warn!(target: LOG_TARGET, "Failed to downcast state response");
193						debug_assert!(false);
194						return;
195					};
196
197					state.on_state_response(peer_id, *response);
198				} else if let Some(chain_sync) = &mut self.chain_sync {
199					chain_sync.on_generic_response(peer_id, key, protocol_name, response);
200				} else {
201					error!(
202						target: LOG_TARGET,
203						"`on_generic_response()` called with unexpected key {key:?} \
204						 or corresponding strategy is not active.",
205					);
206					debug_assert!(false);
207				},
208			WarpSync::<B, Client>::STRATEGY_KEY =>
209				if let Some(warp) = &mut self.warp {
210					warp.on_generic_response(peer_id, protocol_name, response);
211				} else {
212					error!(
213						target: LOG_TARGET,
214						"`on_generic_response()` called with unexpected key {key:?} \
215						 or warp strategy is not active",
216					);
217					debug_assert!(false);
218				},
219			ChainSync::<B, Client>::STRATEGY_KEY =>
220				if let Some(chain_sync) = &mut self.chain_sync {
221					chain_sync.on_generic_response(peer_id, key, protocol_name, response);
222				} else {
223					error!(
224						target: LOG_TARGET,
225						"`on_generic_response()` called with unexpected key {key:?} \
226						 or corresponding strategy is not active.",
227					);
228					debug_assert!(false);
229				},
230			key => {
231				warn!(
232					target: LOG_TARGET,
233					"Unexpected generic response strategy key {key:?}, protocol {protocol_name}",
234				);
235				debug_assert!(false);
236			},
237		}
238	}
239
240	fn on_blocks_processed(
241		&mut self,
242		imported: usize,
243		count: usize,
244		results: Vec<(Result<BlockImportStatus<NumberFor<B>>, BlockImportError>, B::Hash)>,
245	) {
246		// Only `StateStrategy` and `ChainSync` are interested in block processing notifications.
247		if let Some(ref mut state) = self.state {
248			state.on_blocks_processed(imported, count, results);
249		} else if let Some(ref mut chain_sync) = self.chain_sync {
250			chain_sync.on_blocks_processed(imported, count, results);
251		}
252	}
253
254	fn on_block_finalized(&mut self, hash: &B::Hash, number: NumberFor<B>) {
255		// Only `ChainSync` is interested in block finalization notifications.
256		if let Some(ref mut chain_sync) = self.chain_sync {
257			chain_sync.on_block_finalized(hash, number);
258		}
259	}
260
261	fn update_chain_info(&mut self, best_hash: &B::Hash, best_number: NumberFor<B>) {
262		// This is relevant to `ChainSync` only.
263		if let Some(ref mut chain_sync) = self.chain_sync {
264			chain_sync.update_chain_info(best_hash, best_number);
265		}
266	}
267
268	fn is_major_syncing(&self) -> bool {
269		self.warp.is_some() ||
270			self.state.is_some() ||
271			match self.chain_sync {
272				Some(ref s) => s.status().state.is_major_syncing(),
273				None => unreachable!("At least one syncing strategy is active; qed"),
274			}
275	}
276
277	fn num_peers(&self) -> usize {
278		self.peer_best_blocks.len()
279	}
280
281	fn status(&self) -> SyncStatus<B> {
282		// This function presumes that strategies are executed serially and must be refactored
283		// once we have parallel strategies.
284		if let Some(ref warp) = self.warp {
285			warp.status()
286		} else if let Some(ref state) = self.state {
287			state.status()
288		} else if let Some(ref chain_sync) = self.chain_sync {
289			chain_sync.status()
290		} else {
291			unreachable!("At least one syncing strategy is always active; qed")
292		}
293	}
294
295	fn num_downloaded_blocks(&self) -> usize {
296		self.chain_sync
297			.as_ref()
298			.map_or(0, |chain_sync| chain_sync.num_downloaded_blocks())
299	}
300
301	fn num_sync_requests(&self) -> usize {
302		self.chain_sync.as_ref().map_or(0, |chain_sync| chain_sync.num_sync_requests())
303	}
304
305	fn actions(
306		&mut self,
307		network_service: &NetworkServiceHandle,
308	) -> Result<Vec<SyncingAction<B>>, ClientError> {
309		// This function presumes that strategies are executed serially and must be refactored once
310		// we have parallel strategies.
311		let actions: Vec<_> = if let Some(ref mut warp) = self.warp {
312			warp.actions(network_service).map(Into::into).collect()
313		} else if let Some(ref mut state) = self.state {
314			state.actions(network_service).map(Into::into).collect()
315		} else if let Some(ref mut chain_sync) = self.chain_sync {
316			chain_sync.actions(network_service)?
317		} else {
318			unreachable!("At least one syncing strategy is always active; qed")
319		};
320
321		if actions.iter().any(SyncingAction::is_finished) {
322			self.proceed_to_next()?;
323		}
324
325		Ok(actions)
326	}
327}
328
329impl<B: BlockT, Client> PolkadotSyncingStrategy<B, Client>
330where
331	B: BlockT,
332	Client: HeaderBackend<B>
333		+ BlockBackend<B>
334		+ HeaderMetadata<B, Error = sp_blockchain::Error>
335		+ ProofProvider<B>
336		+ Send
337		+ Sync
338		+ 'static,
339{
340	/// Initialize a new syncing strategy.
341	pub fn new(
342		mut config: PolkadotSyncingStrategyConfig<B>,
343		client: Arc<Client>,
344		warp_sync_config: Option<WarpSyncConfig<B>>,
345		warp_sync_protocol_name: Option<ProtocolName>,
346	) -> Result<Self, ClientError> {
347		if config.max_blocks_per_request > MAX_BLOCKS_IN_RESPONSE as u32 {
348			info!(
349				target: LOG_TARGET,
350				"clamping maximum blocks per request to {MAX_BLOCKS_IN_RESPONSE}",
351			);
352			config.max_blocks_per_request = MAX_BLOCKS_IN_RESPONSE as u32;
353		}
354
355		if let SyncMode::Warp = config.mode {
356			let warp_sync_config = warp_sync_config
357				.expect("Warp sync configuration must be supplied in warp sync mode.");
358			let warp_sync = WarpSync::new(
359				client.clone(),
360				warp_sync_config,
361				warp_sync_protocol_name,
362				config.block_downloader.clone(),
363			);
364			Ok(Self {
365				config,
366				client,
367				warp: Some(warp_sync),
368				state: None,
369				chain_sync: None,
370				peer_best_blocks: Default::default(),
371			})
372		} else {
373			let chain_sync = ChainSync::new(
374				chain_sync_mode(config.mode),
375				client.clone(),
376				config.max_parallel_downloads,
377				config.max_blocks_per_request,
378				config.state_request_protocol_name.clone(),
379				config.block_downloader.clone(),
380				config.metrics_registry.as_ref(),
381				std::iter::empty(),
382			)?;
383			Ok(Self {
384				config,
385				client,
386				warp: None,
387				state: None,
388				chain_sync: Some(chain_sync),
389				peer_best_blocks: Default::default(),
390			})
391		}
392	}
393
394	/// Proceed with the next strategy if the active one finished.
395	pub fn proceed_to_next(&mut self) -> Result<(), ClientError> {
396		// The strategies are switched as `WarpSync` -> `StateStrategy` -> `ChainSync`.
397		if let Some(ref mut warp) = self.warp {
398			match warp.take_result() {
399				Some(res) => {
400					info!(
401						target: LOG_TARGET,
402						"Warp sync is complete, continuing with state sync."
403					);
404					let state_sync = StateStrategy::new(
405						self.client.clone(),
406						res.target_header,
407						res.target_body,
408						res.target_justifications,
409						false,
410						self.peer_best_blocks
411							.iter()
412							.map(|(peer_id, (_, best_number))| (*peer_id, *best_number)),
413						self.config.state_request_protocol_name.clone(),
414					);
415
416					self.warp = None;
417					self.state = Some(state_sync);
418					Ok(())
419				},
420				None => {
421					error!(
422						target: LOG_TARGET,
423						"Warp sync failed. Continuing with full sync."
424					);
425					let chain_sync = match ChainSync::new(
426						chain_sync_mode(self.config.mode),
427						self.client.clone(),
428						self.config.max_parallel_downloads,
429						self.config.max_blocks_per_request,
430						self.config.state_request_protocol_name.clone(),
431						self.config.block_downloader.clone(),
432						self.config.metrics_registry.as_ref(),
433						self.peer_best_blocks.iter().map(|(peer_id, (best_hash, best_number))| {
434							(*peer_id, *best_hash, *best_number)
435						}),
436					) {
437						Ok(chain_sync) => chain_sync,
438						Err(e) => {
439							error!(target: LOG_TARGET, "Failed to start `ChainSync`.");
440							return Err(e)
441						},
442					};
443
444					self.warp = None;
445					self.chain_sync = Some(chain_sync);
446					Ok(())
447				},
448			}
449		} else if let Some(state) = &self.state {
450			if state.is_succeeded() {
451				info!(target: LOG_TARGET, "State sync is complete, continuing with block sync.");
452			} else {
453				error!(target: LOG_TARGET, "State sync failed. Falling back to full sync.");
454			}
455			let chain_sync = match ChainSync::new(
456				chain_sync_mode(self.config.mode),
457				self.client.clone(),
458				self.config.max_parallel_downloads,
459				self.config.max_blocks_per_request,
460				self.config.state_request_protocol_name.clone(),
461				self.config.block_downloader.clone(),
462				self.config.metrics_registry.as_ref(),
463				self.peer_best_blocks.iter().map(|(peer_id, (best_hash, best_number))| {
464					(*peer_id, *best_hash, *best_number)
465				}),
466			) {
467				Ok(chain_sync) => chain_sync,
468				Err(e) => {
469					error!(target: LOG_TARGET, "Failed to start `ChainSync`.");
470					return Err(e);
471				},
472			};
473
474			self.state = None;
475			self.chain_sync = Some(chain_sync);
476			Ok(())
477		} else {
478			unreachable!("Only warp & state strategies can finish; qed")
479		}
480	}
481}