pezsc_network_sync/strategy/
pezkuwi.rs

1// This file is part of Bizinikiwi.
2
3// Copyright (C) Parity Technologies (UK) Ltd. and Dijital Kurdistan Tech Institute
4// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
5
6// This program is free software: you can redistribute it and/or modify
7// it under the terms of the GNU General Public License as published by
8// the Free Software Foundation, either version 3 of the License, or
9// (at your option) any later version.
10
11// This program is distributed in the hope that it will be useful,
12// but WITHOUT ANY WARRANTY; without even the implied warranty of
13// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14// GNU General Public License for more details.
15
16// You should have received a copy of the GNU General Public License
17// along with this program. If not, see <https://www.gnu.org/licenses/>.
18
19//! [`PezkuwiSyncingStrategy`] is a proxy between [`crate::engine::SyncingEngine`]
20//! and specific syncing algorithms.
21
22use crate::{
23	block_relay_protocol::BlockDownloader,
24	block_request_handler::MAX_BLOCKS_IN_RESPONSE,
25	service::network::NetworkServiceHandle,
26	strategy::{
27		chain_sync::{ChainSync, ChainSyncMode},
28		state::StateStrategy,
29		warp::{WarpSync, WarpSyncConfig},
30		StrategyKey, SyncingAction, SyncingStrategy,
31	},
32	types::SyncStatus,
33	LOG_TARGET,
34};
35use log::{debug, error, info, warn};
36use pezsc_client_api::{BlockBackend, ProofProvider};
37use pezsc_consensus::{BlockImportError, BlockImportStatus};
38use pezsc_network::ProtocolName;
39use pezsc_network_common::sync::{message::BlockAnnounce, SyncMode};
40use pezsc_network_types::PeerId;
41use pezsp_blockchain::{Error as ClientError, HeaderBackend, HeaderMetadata};
42use pezsp_runtime::traits::{Block as BlockT, Header, NumberFor};
43use prometheus_endpoint::Registry;
44use std::{any::Any, collections::HashMap, sync::Arc};
45
46/// Corresponding `ChainSync` mode.
47fn chain_sync_mode(sync_mode: SyncMode) -> ChainSyncMode {
48	match sync_mode {
49		SyncMode::Full => ChainSyncMode::Full,
50		SyncMode::LightState { skip_proofs, storage_chain_mode } => {
51			ChainSyncMode::LightState { skip_proofs, storage_chain_mode }
52		},
53		SyncMode::Warp => ChainSyncMode::Full,
54	}
55}
56
57/// Syncing configuration containing data for [`PezkuwiSyncingStrategy`].
58#[derive(Clone, Debug)]
59pub struct PezkuwiSyncingStrategyConfig<Block>
60where
61	Block: BlockT,
62{
63	/// Syncing mode.
64	pub mode: SyncMode,
65	/// The number of parallel downloads to guard against slow peers.
66	pub max_parallel_downloads: u32,
67	/// Maximum number of blocks to request.
68	pub max_blocks_per_request: u32,
69	/// Number of peers that need to be connected before warp sync is started.
70	pub min_peers_to_start_warp_sync: Option<usize>,
71	/// Prometheus metrics registry.
72	pub metrics_registry: Option<Registry>,
73	/// Protocol name used to send out state requests
74	pub state_request_protocol_name: ProtocolName,
75	/// Block downloader
76	pub block_downloader: Arc<dyn BlockDownloader<Block>>,
77}
78
79/// Proxy to specific syncing strategies used in Pezkuwi.
80pub struct PezkuwiSyncingStrategy<B: BlockT, Client> {
81	/// Initial syncing configuration.
82	config: PezkuwiSyncingStrategyConfig<B>,
83	/// Client used by syncing strategies.
84	client: Arc<Client>,
85	/// Warp strategy.
86	warp: Option<WarpSync<B, Client>>,
87	/// State strategy.
88	state: Option<StateStrategy<B>>,
89	/// `ChainSync` strategy.`
90	chain_sync: Option<ChainSync<B, Client>>,
91	/// Connected peers and their best blocks used to seed a new strategy when switching to it in
92	/// `PezkuwiSyncingStrategy::proceed_to_next`.
93	peer_best_blocks: HashMap<PeerId, (B::Hash, NumberFor<B>)>,
94}
95
96impl<B: BlockT, Client> SyncingStrategy<B> for PezkuwiSyncingStrategy<B, Client>
97where
98	B: BlockT,
99	Client: HeaderBackend<B>
100		+ BlockBackend<B>
101		+ HeaderMetadata<B, Error = pezsp_blockchain::Error>
102		+ ProofProvider<B>
103		+ Send
104		+ Sync
105		+ 'static,
106{
107	fn add_peer(&mut self, peer_id: PeerId, best_hash: B::Hash, best_number: NumberFor<B>) {
108		self.peer_best_blocks.insert(peer_id, (best_hash, best_number));
109
110		self.warp.as_mut().map(|s| s.add_peer(peer_id, best_hash, best_number));
111		self.state.as_mut().map(|s| s.add_peer(peer_id, best_hash, best_number));
112		self.chain_sync.as_mut().map(|s| s.add_peer(peer_id, best_hash, best_number));
113	}
114
115	fn remove_peer(&mut self, peer_id: &PeerId) {
116		self.warp.as_mut().map(|s| s.remove_peer(peer_id));
117		self.state.as_mut().map(|s| s.remove_peer(peer_id));
118		self.chain_sync.as_mut().map(|s| s.remove_peer(peer_id));
119
120		self.peer_best_blocks.remove(peer_id);
121	}
122
123	fn on_validated_block_announce(
124		&mut self,
125		is_best: bool,
126		peer_id: PeerId,
127		announce: &BlockAnnounce<B::Header>,
128	) -> Option<(B::Hash, NumberFor<B>)> {
129		let new_best = if let Some(ref mut warp) = self.warp {
130			warp.on_validated_block_announce(is_best, peer_id, announce)
131		} else if let Some(ref mut state) = self.state {
132			state.on_validated_block_announce(is_best, peer_id, announce)
133		} else if let Some(ref mut chain_sync) = self.chain_sync {
134			chain_sync.on_validated_block_announce(is_best, peer_id, announce)
135		} else {
136			error!(target: LOG_TARGET, "No syncing strategy is active.");
137			debug_assert!(false);
138			Some((announce.header.hash(), *announce.header.number()))
139		};
140
141		if let Some(new_best) = new_best {
142			if let Some(best) = self.peer_best_blocks.get_mut(&peer_id) {
143				*best = new_best;
144			} else {
145				debug!(
146					target: LOG_TARGET,
147					"Cannot update `peer_best_blocks` as peer {peer_id} is not known to `Strategy` \
148					 (already disconnected?)",
149				);
150			}
151		}
152
153		new_best
154	}
155
156	fn set_sync_fork_request(&mut self, peers: Vec<PeerId>, hash: &B::Hash, number: NumberFor<B>) {
157		// Fork requests are only handled by `ChainSync`.
158		if let Some(ref mut chain_sync) = self.chain_sync {
159			chain_sync.set_sync_fork_request(peers.clone(), hash, number);
160		}
161	}
162
163	fn request_justification(&mut self, hash: &B::Hash, number: NumberFor<B>) {
164		// Justifications can only be requested via `ChainSync`.
165		if let Some(ref mut chain_sync) = self.chain_sync {
166			chain_sync.request_justification(hash, number);
167		}
168	}
169
170	fn clear_justification_requests(&mut self) {
171		// Justification requests can only be cleared by `ChainSync`.
172		if let Some(ref mut chain_sync) = self.chain_sync {
173			chain_sync.clear_justification_requests();
174		}
175	}
176
177	fn on_justification_import(&mut self, hash: B::Hash, number: NumberFor<B>, success: bool) {
178		// Only `ChainSync` is interested in justification import.
179		if let Some(ref mut chain_sync) = self.chain_sync {
180			chain_sync.on_justification_import(hash, number, success);
181		}
182	}
183
184	fn on_generic_response(
185		&mut self,
186		peer_id: &PeerId,
187		key: StrategyKey,
188		protocol_name: ProtocolName,
189		response: Box<dyn Any + Send>,
190	) {
191		match key {
192			StateStrategy::<B>::STRATEGY_KEY => {
193				if let Some(state) = &mut self.state {
194					let Ok(response) = response.downcast::<Vec<u8>>() else {
195						warn!(target: LOG_TARGET, "Failed to downcast state response");
196						debug_assert!(false);
197						return;
198					};
199
200					state.on_state_response(peer_id, *response);
201				} else if let Some(chain_sync) = &mut self.chain_sync {
202					chain_sync.on_generic_response(peer_id, key, protocol_name, response);
203				} else {
204					error!(
205						target: LOG_TARGET,
206						"`on_generic_response()` called with unexpected key {key:?} \
207						 or corresponding strategy is not active.",
208					);
209					debug_assert!(false);
210				}
211			},
212			WarpSync::<B, Client>::STRATEGY_KEY => {
213				if let Some(warp) = &mut self.warp {
214					warp.on_generic_response(peer_id, protocol_name, response);
215				} else {
216					error!(
217						target: LOG_TARGET,
218						"`on_generic_response()` called with unexpected key {key:?} \
219						 or warp strategy is not active",
220					);
221					debug_assert!(false);
222				}
223			},
224			ChainSync::<B, Client>::STRATEGY_KEY => {
225				if let Some(chain_sync) = &mut self.chain_sync {
226					chain_sync.on_generic_response(peer_id, key, protocol_name, response);
227				} else {
228					error!(
229						target: LOG_TARGET,
230						"`on_generic_response()` called with unexpected key {key:?} \
231						 or corresponding strategy is not active.",
232					);
233					debug_assert!(false);
234				}
235			},
236			key => {
237				warn!(
238					target: LOG_TARGET,
239					"Unexpected generic response strategy key {key:?}, protocol {protocol_name}",
240				);
241				debug_assert!(false);
242			},
243		}
244	}
245
246	fn on_blocks_processed(
247		&mut self,
248		imported: usize,
249		count: usize,
250		results: Vec<(Result<BlockImportStatus<NumberFor<B>>, BlockImportError>, B::Hash)>,
251	) {
252		// Only `StateStrategy` and `ChainSync` are interested in block processing notifications.
253		if let Some(ref mut state) = self.state {
254			state.on_blocks_processed(imported, count, results);
255		} else if let Some(ref mut chain_sync) = self.chain_sync {
256			chain_sync.on_blocks_processed(imported, count, results);
257		}
258	}
259
260	fn on_block_finalized(&mut self, hash: &B::Hash, number: NumberFor<B>) {
261		// Only `ChainSync` is interested in block finalization notifications.
262		if let Some(ref mut chain_sync) = self.chain_sync {
263			chain_sync.on_block_finalized(hash, number);
264		}
265	}
266
267	fn update_chain_info(&mut self, best_hash: &B::Hash, best_number: NumberFor<B>) {
268		// This is relevant to `ChainSync` only.
269		if let Some(ref mut chain_sync) = self.chain_sync {
270			chain_sync.update_chain_info(best_hash, best_number);
271		}
272	}
273
274	fn is_major_syncing(&self) -> bool {
275		self.warp.is_some()
276			|| self.state.is_some()
277			|| match self.chain_sync {
278				Some(ref s) => s.status().state.is_major_syncing(),
279				None => unreachable!("At least one syncing strategy is active; qed"),
280			}
281	}
282
283	fn num_peers(&self) -> usize {
284		self.peer_best_blocks.len()
285	}
286
287	fn status(&self) -> SyncStatus<B> {
288		// This function presumes that strategies are executed serially and must be refactored
289		// once we have parallel strategies.
290		if let Some(ref warp) = self.warp {
291			warp.status()
292		} else if let Some(ref state) = self.state {
293			state.status()
294		} else if let Some(ref chain_sync) = self.chain_sync {
295			chain_sync.status()
296		} else {
297			unreachable!("At least one syncing strategy is always active; qed")
298		}
299	}
300
301	fn num_downloaded_blocks(&self) -> usize {
302		self.chain_sync
303			.as_ref()
304			.map_or(0, |chain_sync| chain_sync.num_downloaded_blocks())
305	}
306
307	fn num_sync_requests(&self) -> usize {
308		self.chain_sync.as_ref().map_or(0, |chain_sync| chain_sync.num_sync_requests())
309	}
310
311	fn actions(
312		&mut self,
313		network_service: &NetworkServiceHandle,
314	) -> Result<Vec<SyncingAction<B>>, ClientError> {
315		// This function presumes that strategies are executed serially and must be refactored once
316		// we have parallel strategies.
317		let actions: Vec<_> = if let Some(ref mut warp) = self.warp {
318			warp.actions(network_service).map(Into::into).collect()
319		} else if let Some(ref mut state) = self.state {
320			state.actions(network_service).map(Into::into).collect()
321		} else if let Some(ref mut chain_sync) = self.chain_sync {
322			chain_sync.actions(network_service)?
323		} else {
324			unreachable!("At least one syncing strategy is always active; qed")
325		};
326
327		if actions.iter().any(SyncingAction::is_finished) {
328			self.proceed_to_next()?;
329		}
330
331		Ok(actions)
332	}
333}
334
335impl<B: BlockT, Client> PezkuwiSyncingStrategy<B, Client>
336where
337	B: BlockT,
338	Client: HeaderBackend<B>
339		+ BlockBackend<B>
340		+ HeaderMetadata<B, Error = pezsp_blockchain::Error>
341		+ ProofProvider<B>
342		+ Send
343		+ Sync
344		+ 'static,
345{
346	/// Initialize a new syncing strategy.
347	pub fn new(
348		mut config: PezkuwiSyncingStrategyConfig<B>,
349		client: Arc<Client>,
350		warp_sync_config: Option<WarpSyncConfig<B>>,
351		warp_sync_protocol_name: Option<ProtocolName>,
352	) -> Result<Self, ClientError> {
353		if config.max_blocks_per_request > MAX_BLOCKS_IN_RESPONSE as u32 {
354			info!(
355				target: LOG_TARGET,
356				"clamping maximum blocks per request to {MAX_BLOCKS_IN_RESPONSE}",
357			);
358			config.max_blocks_per_request = MAX_BLOCKS_IN_RESPONSE as u32;
359		}
360
361		if let SyncMode::Warp = config.mode {
362			let warp_sync_config = warp_sync_config
363				.expect("Warp sync configuration must be supplied in warp sync mode.");
364			let warp_sync = WarpSync::new(
365				client.clone(),
366				warp_sync_config,
367				warp_sync_protocol_name,
368				config.block_downloader.clone(),
369				config.min_peers_to_start_warp_sync,
370			);
371			Ok(Self {
372				config,
373				client,
374				warp: Some(warp_sync),
375				state: None,
376				chain_sync: None,
377				peer_best_blocks: Default::default(),
378			})
379		} else {
380			let chain_sync = ChainSync::new(
381				chain_sync_mode(config.mode),
382				client.clone(),
383				config.max_parallel_downloads,
384				config.max_blocks_per_request,
385				config.state_request_protocol_name.clone(),
386				config.block_downloader.clone(),
387				config.metrics_registry.as_ref(),
388				std::iter::empty(),
389			)?;
390			Ok(Self {
391				config,
392				client,
393				warp: None,
394				state: None,
395				chain_sync: Some(chain_sync),
396				peer_best_blocks: Default::default(),
397			})
398		}
399	}
400
401	/// Proceed with the next strategy if the active one finished.
402	pub fn proceed_to_next(&mut self) -> Result<(), ClientError> {
403		// The strategies are switched as `WarpSync` -> `StateStrategy` -> `ChainSync`.
404		if let Some(ref mut warp) = self.warp {
405			match warp.take_result() {
406				Some(res) => {
407					info!(
408						target: LOG_TARGET,
409						"Warp sync is complete, continuing with state sync."
410					);
411					let state_sync = StateStrategy::new(
412						self.client.clone(),
413						res.target_header,
414						res.target_body,
415						res.target_justifications,
416						false,
417						self.peer_best_blocks
418							.iter()
419							.map(|(peer_id, (_, best_number))| (*peer_id, *best_number)),
420						self.config.state_request_protocol_name.clone(),
421					);
422
423					self.warp = None;
424					self.state = Some(state_sync);
425					Ok(())
426				},
427				None => {
428					error!(
429						target: LOG_TARGET,
430						"Warp sync failed. Continuing with full sync."
431					);
432					let chain_sync = match ChainSync::new(
433						chain_sync_mode(self.config.mode),
434						self.client.clone(),
435						self.config.max_parallel_downloads,
436						self.config.max_blocks_per_request,
437						self.config.state_request_protocol_name.clone(),
438						self.config.block_downloader.clone(),
439						self.config.metrics_registry.as_ref(),
440						self.peer_best_blocks.iter().map(|(peer_id, (best_hash, best_number))| {
441							(*peer_id, *best_hash, *best_number)
442						}),
443					) {
444						Ok(chain_sync) => chain_sync,
445						Err(e) => {
446							error!(target: LOG_TARGET, "Failed to start `ChainSync`.");
447							return Err(e);
448						},
449					};
450
451					self.warp = None;
452					self.chain_sync = Some(chain_sync);
453					Ok(())
454				},
455			}
456		} else if let Some(state) = &self.state {
457			if state.is_succeeded() {
458				info!(target: LOG_TARGET, "State sync is complete, continuing with block sync.");
459			} else {
460				error!(target: LOG_TARGET, "State sync failed. Falling back to full sync.");
461			}
462			let chain_sync = match ChainSync::new(
463				chain_sync_mode(self.config.mode),
464				self.client.clone(),
465				self.config.max_parallel_downloads,
466				self.config.max_blocks_per_request,
467				self.config.state_request_protocol_name.clone(),
468				self.config.block_downloader.clone(),
469				self.config.metrics_registry.as_ref(),
470				self.peer_best_blocks.iter().map(|(peer_id, (best_hash, best_number))| {
471					(*peer_id, *best_hash, *best_number)
472				}),
473			) {
474				Ok(chain_sync) => chain_sync,
475				Err(e) => {
476					error!(target: LOG_TARGET, "Failed to start `ChainSync`.");
477					return Err(e);
478				},
479			};
480
481			self.state = None;
482			self.chain_sync = Some(chain_sync);
483			Ok(())
484		} else {
485			unreachable!("Only warp & state strategies can finish; qed")
486		}
487	}
488}