Skip to main content

soil_network/sync/strategy/
polkadot.rs

1// This file is part of Soil.
2
3// Copyright (C) Soil contributors.
4// Copyright (C) Parity Technologies (UK) Ltd.
5// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
6
7//! [`PolkadotSyncingStrategy`] is a proxy between [`crate::engine::SyncingEngine`]
8//! and specific syncing algorithms.
9
10use crate::{
11	sync::{
12		block_relay_protocol::BlockDownloader,
13		block_request_handler::MAX_BLOCKS_IN_RESPONSE,
14		service::network::NetworkServiceHandle,
15		strategy::{
16			chain_sync::{ChainSync, ChainSyncMode},
17			state::StateStrategy,
18			warp::{WarpSync, WarpSyncConfig},
19			StrategyKey, SyncingAction, SyncingStrategy,
20		},
21		types::SyncStatus,
22	},
23	LOG_TARGET,
24};
25use log::{debug, error, info, warn};
26use soil_prometheus::Registry;
27use soil_client::blockchain::{Error as ClientError, HeaderBackend, HeaderMetadata};
28use soil_client::client_api::{BlockBackend, ProofProvider};
29use soil_client::import::{BlockImportError, BlockImportStatus};
30use soil_network::common::sync::{message::BlockAnnounce, SyncMode};
31use soil_network::types::PeerId;
32use soil_network::ProtocolName;
33use std::{any::Any, collections::HashMap, sync::Arc};
34use subsoil::runtime::traits::{Block as BlockT, Header, NumberFor};
35
36/// Corresponding `ChainSync` mode.
37fn chain_sync_mode(sync_mode: SyncMode) -> ChainSyncMode {
38	match sync_mode {
39		SyncMode::Full => ChainSyncMode::Full,
40		SyncMode::LightState { skip_proofs, storage_chain_mode } => {
41			ChainSyncMode::LightState { skip_proofs, storage_chain_mode }
42		},
43		SyncMode::Warp => ChainSyncMode::Full,
44	}
45}
46
47/// Syncing configuration containing data for [`PolkadotSyncingStrategy`].
48#[derive(Clone, Debug)]
49pub struct PolkadotSyncingStrategyConfig<Block>
50where
51	Block: BlockT,
52{
53	/// Syncing mode.
54	pub mode: SyncMode,
55	/// The number of parallel downloads to guard against slow peers.
56	pub max_parallel_downloads: u32,
57	/// Maximum number of blocks to request.
58	pub max_blocks_per_request: u32,
59	/// Number of peers that need to be connected before warp sync is started.
60	pub min_peers_to_start_warp_sync: Option<usize>,
61	/// Prometheus metrics registry.
62	pub metrics_registry: Option<Registry>,
63	/// Protocol name used to send out state requests
64	pub state_request_protocol_name: ProtocolName,
65	/// Block downloader
66	pub block_downloader: Arc<dyn BlockDownloader<Block>>,
67	/// Whether to archive blocks. When `true`, gap sync requests bodies to maintain complete
68	/// block history.
69	pub archive_blocks: bool,
70}
71
72/// Proxy to specific syncing strategies used in Polkadot.
73pub struct PolkadotSyncingStrategy<B: BlockT, Client> {
74	/// Initial syncing configuration.
75	config: PolkadotSyncingStrategyConfig<B>,
76	/// Client used by syncing strategies.
77	client: Arc<Client>,
78	/// Warp strategy.
79	warp: Option<WarpSync<B>>,
80	/// State strategy.
81	state: Option<StateStrategy<B>>,
82	/// `ChainSync` strategy.`
83	chain_sync: Option<ChainSync<B, Client>>,
84	/// Connected peers and their best blocks used to seed a new strategy when switching to it in
85	/// `PolkadotSyncingStrategy::proceed_to_next`.
86	peer_best_blocks: HashMap<PeerId, (B::Hash, NumberFor<B>)>,
87}
88
89impl<B: BlockT, Client> SyncingStrategy<B> for PolkadotSyncingStrategy<B, Client>
90where
91	B: BlockT,
92	Client: HeaderBackend<B>
93		+ BlockBackend<B>
94		+ HeaderMetadata<B, Error = soil_client::blockchain::Error>
95		+ ProofProvider<B>
96		+ Send
97		+ Sync
98		+ 'static,
99{
100	fn add_peer(&mut self, peer_id: PeerId, best_hash: B::Hash, best_number: NumberFor<B>) {
101		self.peer_best_blocks.insert(peer_id, (best_hash, best_number));
102
103		self.warp.as_mut().map(|s| s.add_peer(peer_id, best_hash, best_number));
104		self.state.as_mut().map(|s| s.add_peer(peer_id, best_hash, best_number));
105		self.chain_sync.as_mut().map(|s| s.add_peer(peer_id, best_hash, best_number));
106	}
107
108	fn remove_peer(&mut self, peer_id: &PeerId) {
109		self.warp.as_mut().map(|s| s.remove_peer(peer_id));
110		self.state.as_mut().map(|s| s.remove_peer(peer_id));
111		self.chain_sync.as_mut().map(|s| s.remove_peer(peer_id));
112
113		self.peer_best_blocks.remove(peer_id);
114	}
115
116	fn on_validated_block_announce(
117		&mut self,
118		is_best: bool,
119		peer_id: PeerId,
120		announce: &BlockAnnounce<B::Header>,
121	) -> Option<(B::Hash, NumberFor<B>)> {
122		let new_best = if let Some(ref mut warp) = self.warp {
123			warp.on_validated_block_announce(is_best, peer_id, announce)
124		} else if let Some(ref mut state) = self.state {
125			state.on_validated_block_announce(is_best, peer_id, announce)
126		} else if let Some(ref mut chain_sync) = self.chain_sync {
127			chain_sync.on_validated_block_announce(is_best, peer_id, announce)
128		} else {
129			error!(target: LOG_TARGET, "No syncing strategy is active.");
130			debug_assert!(false);
131			Some((announce.header.hash(), *announce.header.number()))
132		};
133
134		if let Some(new_best) = new_best {
135			if let Some(best) = self.peer_best_blocks.get_mut(&peer_id) {
136				*best = new_best;
137			} else {
138				debug!(
139					target: LOG_TARGET,
140					"Cannot update `peer_best_blocks` as peer {peer_id} is not known to `Strategy` \
141					 (already disconnected?)",
142				);
143			}
144		}
145
146		new_best
147	}
148
149	fn set_sync_fork_request(&mut self, peers: Vec<PeerId>, hash: &B::Hash, number: NumberFor<B>) {
150		// Fork requests are only handled by `ChainSync`.
151		if let Some(ref mut chain_sync) = self.chain_sync {
152			chain_sync.set_sync_fork_request(peers.clone(), hash, number);
153		}
154	}
155
156	fn request_justification(&mut self, hash: &B::Hash, number: NumberFor<B>) {
157		// Justifications can only be requested via `ChainSync`.
158		if let Some(ref mut chain_sync) = self.chain_sync {
159			chain_sync.request_justification(hash, number);
160		}
161	}
162
163	fn clear_justification_requests(&mut self) {
164		// Justification requests can only be cleared by `ChainSync`.
165		if let Some(ref mut chain_sync) = self.chain_sync {
166			chain_sync.clear_justification_requests();
167		}
168	}
169
170	fn on_justification_import(&mut self, hash: B::Hash, number: NumberFor<B>, success: bool) {
171		// Only `ChainSync` is interested in justification import.
172		if let Some(ref mut chain_sync) = self.chain_sync {
173			chain_sync.on_justification_import(hash, number, success);
174		}
175	}
176
177	fn on_generic_response(
178		&mut self,
179		peer_id: &PeerId,
180		key: StrategyKey,
181		protocol_name: ProtocolName,
182		response: Box<dyn Any + Send>,
183	) {
184		match key {
185			StateStrategy::<B>::STRATEGY_KEY => {
186				if let Some(state) = &mut self.state {
187					let Ok(response) = response.downcast::<Vec<u8>>() else {
188						warn!(target: LOG_TARGET, "Failed to downcast state response");
189						debug_assert!(false);
190						return;
191					};
192
193					state.on_state_response(peer_id, *response);
194				} else if let Some(chain_sync) = &mut self.chain_sync {
195					chain_sync.on_generic_response(peer_id, key, protocol_name, response);
196				} else {
197					error!(
198						target: LOG_TARGET,
199						"`on_generic_response()` called with unexpected key {key:?} \
200						 or corresponding strategy is not active.",
201					);
202					debug_assert!(false);
203				}
204			},
205			WarpSync::<B>::STRATEGY_KEY => {
206				if let Some(warp) = &mut self.warp {
207					warp.on_generic_response(peer_id, protocol_name, response);
208				} else {
209					error!(
210						target: LOG_TARGET,
211						"`on_generic_response()` called with unexpected key {key:?} \
212						 or warp strategy is not active",
213					);
214					debug_assert!(false);
215				}
216			},
217			ChainSync::<B, Client>::STRATEGY_KEY => {
218				if let Some(chain_sync) = &mut self.chain_sync {
219					chain_sync.on_generic_response(peer_id, key, protocol_name, response);
220				} else {
221					error!(
222						target: LOG_TARGET,
223						"`on_generic_response()` called with unexpected key {key:?} \
224						 or corresponding strategy is not active.",
225					);
226					debug_assert!(false);
227				}
228			},
229			key => {
230				warn!(
231					target: LOG_TARGET,
232					"Unexpected generic response strategy key {key:?}, protocol {protocol_name}",
233				);
234				debug_assert!(false);
235			},
236		}
237	}
238
239	fn on_blocks_processed(
240		&mut self,
241		imported: usize,
242		count: usize,
243		results: Vec<(Result<BlockImportStatus<NumberFor<B>>, BlockImportError>, B::Hash)>,
244	) {
245		// Only `StateStrategy` and `ChainSync` are interested in block processing notifications.
246		if let Some(ref mut state) = self.state {
247			state.on_blocks_processed(imported, count, results);
248		} else if let Some(ref mut chain_sync) = self.chain_sync {
249			chain_sync.on_blocks_processed(imported, count, results);
250		}
251	}
252
253	fn on_block_finalized(&mut self, hash: &B::Hash, number: NumberFor<B>) {
254		// Only `ChainSync` is interested in block finalization notifications.
255		if let Some(ref mut chain_sync) = self.chain_sync {
256			chain_sync.on_block_finalized(hash, number);
257		}
258	}
259
260	fn update_chain_info(&mut self, best_hash: &B::Hash, best_number: NumberFor<B>) {
261		// This is relevant to `ChainSync` only.
262		if let Some(ref mut chain_sync) = self.chain_sync {
263			chain_sync.update_chain_info(best_hash, best_number);
264		}
265	}
266
267	fn is_major_syncing(&self) -> bool {
268		self.warp.is_some()
269			|| self.state.is_some()
270			|| match self.chain_sync {
271				Some(ref s) => s.status().state.is_major_syncing(),
272				None => unreachable!("At least one syncing strategy is active; qed"),
273			}
274	}
275
276	fn num_peers(&self) -> usize {
277		self.peer_best_blocks.len()
278	}
279
280	fn status(&self) -> SyncStatus<B> {
281		// This function presumes that strategies are executed serially and must be refactored
282		// once we have parallel strategies.
283		if let Some(ref warp) = self.warp {
284			warp.status()
285		} else if let Some(ref state) = self.state {
286			state.status()
287		} else if let Some(ref chain_sync) = self.chain_sync {
288			chain_sync.status()
289		} else {
290			unreachable!("At least one syncing strategy is always active; qed")
291		}
292	}
293
294	fn num_downloaded_blocks(&self) -> usize {
295		self.chain_sync
296			.as_ref()
297			.map_or(0, |chain_sync| chain_sync.num_downloaded_blocks())
298	}
299
300	fn num_sync_requests(&self) -> usize {
301		self.chain_sync.as_ref().map_or(0, |chain_sync| chain_sync.num_sync_requests())
302	}
303
304	fn actions(
305		&mut self,
306		network_service: &NetworkServiceHandle,
307	) -> Result<Vec<SyncingAction<B>>, ClientError> {
308		// This function presumes that strategies are executed serially and must be refactored once
309		// we have parallel strategies.
310		let actions: Vec<_> = if let Some(ref mut warp) = self.warp {
311			warp.actions(network_service).map(Into::into).collect()
312		} else if let Some(ref mut state) = self.state {
313			state.actions(network_service).map(Into::into).collect()
314		} else if let Some(ref mut chain_sync) = self.chain_sync {
315			chain_sync.actions(network_service)?
316		} else {
317			unreachable!("At least one syncing strategy is always active; qed")
318		};
319
320		if actions.iter().any(SyncingAction::is_finished) {
321			self.proceed_to_next()?;
322		}
323
324		Ok(actions)
325	}
326}
327
328impl<B: BlockT, Client> PolkadotSyncingStrategy<B, Client>
329where
330	B: BlockT,
331	Client: HeaderBackend<B>
332		+ BlockBackend<B>
333		+ HeaderMetadata<B, Error = soil_client::blockchain::Error>
334		+ ProofProvider<B>
335		+ Send
336		+ Sync
337		+ 'static,
338{
339	/// Initialize a new syncing strategy.
340	pub fn new(
341		mut config: PolkadotSyncingStrategyConfig<B>,
342		client: Arc<Client>,
343		warp_sync_config: Option<WarpSyncConfig<B>>,
344		warp_sync_protocol_name: Option<ProtocolName>,
345	) -> Result<Self, ClientError> {
346		if config.max_blocks_per_request > MAX_BLOCKS_IN_RESPONSE as u32 {
347			info!(
348				target: LOG_TARGET,
349				"clamping maximum blocks per request to {MAX_BLOCKS_IN_RESPONSE}",
350			);
351			config.max_blocks_per_request = MAX_BLOCKS_IN_RESPONSE as u32;
352		}
353
354		if let SyncMode::Warp = config.mode {
355			let warp_sync_config = warp_sync_config
356				.expect("Warp sync configuration must be supplied in warp sync mode.");
357			let warp_sync = WarpSync::new(
358				client.clone(),
359				warp_sync_config,
360				warp_sync_protocol_name,
361				config.block_downloader.clone(),
362				config.min_peers_to_start_warp_sync,
363			);
364			Ok(Self {
365				config,
366				client,
367				warp: Some(warp_sync),
368				state: None,
369				chain_sync: None,
370				peer_best_blocks: Default::default(),
371			})
372		} else {
373			let chain_sync = ChainSync::new(
374				chain_sync_mode(config.mode),
375				client.clone(),
376				config.max_parallel_downloads,
377				config.max_blocks_per_request,
378				config.state_request_protocol_name.clone(),
379				config.block_downloader.clone(),
380				config.archive_blocks,
381				config.metrics_registry.as_ref(),
382				std::iter::empty(),
383			)?;
384			Ok(Self {
385				config,
386				client,
387				warp: None,
388				state: None,
389				chain_sync: Some(chain_sync),
390				peer_best_blocks: Default::default(),
391			})
392		}
393	}
394
395	/// Proceed with the next strategy if the active one finished.
396	pub fn proceed_to_next(&mut self) -> Result<(), ClientError> {
397		// The strategies are switched as `WarpSync` -> `StateStrategy` -> `ChainSync`.
398		if let Some(ref mut warp) = self.warp {
399			match warp.take_result() {
400				Some(res) => {
401					info!(
402						target: LOG_TARGET,
403						"Warp sync is complete, continuing with state sync."
404					);
405					let state_sync = StateStrategy::new(
406						self.client.clone(),
407						res.target_header,
408						res.target_body,
409						res.target_justifications,
410						false,
411						self.peer_best_blocks
412							.iter()
413							.map(|(peer_id, (_, best_number))| (*peer_id, *best_number)),
414						self.config.state_request_protocol_name.clone(),
415					);
416
417					self.warp = None;
418					self.state = Some(state_sync);
419					Ok(())
420				},
421				None => {
422					error!(
423						target: LOG_TARGET,
424						"Warp sync failed. Continuing with full sync."
425					);
426					let chain_sync = match ChainSync::new(
427						chain_sync_mode(self.config.mode),
428						self.client.clone(),
429						self.config.max_parallel_downloads,
430						self.config.max_blocks_per_request,
431						self.config.state_request_protocol_name.clone(),
432						self.config.block_downloader.clone(),
433						self.config.archive_blocks,
434						self.config.metrics_registry.as_ref(),
435						self.peer_best_blocks.iter().map(|(peer_id, (best_hash, best_number))| {
436							(*peer_id, *best_hash, *best_number)
437						}),
438					) {
439						Ok(chain_sync) => chain_sync,
440						Err(e) => {
441							error!(target: LOG_TARGET, "Failed to start `ChainSync`.");
442							return Err(e);
443						},
444					};
445
446					self.warp = None;
447					self.chain_sync = Some(chain_sync);
448					Ok(())
449				},
450			}
451		} else if let Some(state) = &self.state {
452			if state.is_succeeded() {
453				info!(target: LOG_TARGET, "State sync is complete, continuing with block sync.");
454			} else {
455				error!(target: LOG_TARGET, "State sync failed. Falling back to full sync.");
456			}
457			let chain_sync = match ChainSync::new(
458				chain_sync_mode(self.config.mode),
459				self.client.clone(),
460				self.config.max_parallel_downloads,
461				self.config.max_blocks_per_request,
462				self.config.state_request_protocol_name.clone(),
463				self.config.block_downloader.clone(),
464				self.config.archive_blocks,
465				self.config.metrics_registry.as_ref(),
466				self.peer_best_blocks.iter().map(|(peer_id, (best_hash, best_number))| {
467					(*peer_id, *best_hash, *best_number)
468				}),
469			) {
470				Ok(chain_sync) => chain_sync,
471				Err(e) => {
472					error!(target: LOG_TARGET, "Failed to start `ChainSync`.");
473					return Err(e);
474				},
475			};
476
477			self.state = None;
478			self.chain_sync = Some(chain_sync);
479			Ok(())
480		} else {
481			unreachable!("Only warp & state strategies can finish; qed")
482		}
483	}
484}