1use crate::{
23 block_relay_protocol::BlockDownloader,
24 block_request_handler::MAX_BLOCKS_IN_RESPONSE,
25 service::network::NetworkServiceHandle,
26 strategy::{
27 chain_sync::{ChainSync, ChainSyncMode},
28 state::StateStrategy,
29 warp::{WarpSync, WarpSyncConfig},
30 StrategyKey, SyncingAction, SyncingStrategy,
31 },
32 types::SyncStatus,
33 LOG_TARGET,
34};
35use log::{debug, error, info, warn};
36use pezsc_client_api::{BlockBackend, ProofProvider};
37use pezsc_consensus::{BlockImportError, BlockImportStatus};
38use pezsc_network::ProtocolName;
39use pezsc_network_common::sync::{message::BlockAnnounce, SyncMode};
40use pezsc_network_types::PeerId;
41use pezsp_blockchain::{Error as ClientError, HeaderBackend, HeaderMetadata};
42use pezsp_runtime::traits::{Block as BlockT, Header, NumberFor};
43use prometheus_endpoint::Registry;
44use std::{any::Any, collections::HashMap, sync::Arc};
45
46fn chain_sync_mode(sync_mode: SyncMode) -> ChainSyncMode {
48 match sync_mode {
49 SyncMode::Full => ChainSyncMode::Full,
50 SyncMode::LightState { skip_proofs, storage_chain_mode } => {
51 ChainSyncMode::LightState { skip_proofs, storage_chain_mode }
52 },
53 SyncMode::Warp => ChainSyncMode::Full,
54 }
55}
56
57#[derive(Clone, Debug)]
59pub struct PezkuwiSyncingStrategyConfig<Block>
60where
61 Block: BlockT,
62{
63 pub mode: SyncMode,
65 pub max_parallel_downloads: u32,
67 pub max_blocks_per_request: u32,
69 pub min_peers_to_start_warp_sync: Option<usize>,
71 pub metrics_registry: Option<Registry>,
73 pub state_request_protocol_name: ProtocolName,
75 pub block_downloader: Arc<dyn BlockDownloader<Block>>,
77}
78
79pub struct PezkuwiSyncingStrategy<B: BlockT, Client> {
81 config: PezkuwiSyncingStrategyConfig<B>,
83 client: Arc<Client>,
85 warp: Option<WarpSync<B, Client>>,
87 state: Option<StateStrategy<B>>,
89 chain_sync: Option<ChainSync<B, Client>>,
91 peer_best_blocks: HashMap<PeerId, (B::Hash, NumberFor<B>)>,
94}
95
96impl<B: BlockT, Client> SyncingStrategy<B> for PezkuwiSyncingStrategy<B, Client>
97where
98 B: BlockT,
99 Client: HeaderBackend<B>
100 + BlockBackend<B>
101 + HeaderMetadata<B, Error = pezsp_blockchain::Error>
102 + ProofProvider<B>
103 + Send
104 + Sync
105 + 'static,
106{
107 fn add_peer(&mut self, peer_id: PeerId, best_hash: B::Hash, best_number: NumberFor<B>) {
108 self.peer_best_blocks.insert(peer_id, (best_hash, best_number));
109
110 self.warp.as_mut().map(|s| s.add_peer(peer_id, best_hash, best_number));
111 self.state.as_mut().map(|s| s.add_peer(peer_id, best_hash, best_number));
112 self.chain_sync.as_mut().map(|s| s.add_peer(peer_id, best_hash, best_number));
113 }
114
115 fn remove_peer(&mut self, peer_id: &PeerId) {
116 self.warp.as_mut().map(|s| s.remove_peer(peer_id));
117 self.state.as_mut().map(|s| s.remove_peer(peer_id));
118 self.chain_sync.as_mut().map(|s| s.remove_peer(peer_id));
119
120 self.peer_best_blocks.remove(peer_id);
121 }
122
123 fn on_validated_block_announce(
124 &mut self,
125 is_best: bool,
126 peer_id: PeerId,
127 announce: &BlockAnnounce<B::Header>,
128 ) -> Option<(B::Hash, NumberFor<B>)> {
129 let new_best = if let Some(ref mut warp) = self.warp {
130 warp.on_validated_block_announce(is_best, peer_id, announce)
131 } else if let Some(ref mut state) = self.state {
132 state.on_validated_block_announce(is_best, peer_id, announce)
133 } else if let Some(ref mut chain_sync) = self.chain_sync {
134 chain_sync.on_validated_block_announce(is_best, peer_id, announce)
135 } else {
136 error!(target: LOG_TARGET, "No syncing strategy is active.");
137 debug_assert!(false);
138 Some((announce.header.hash(), *announce.header.number()))
139 };
140
141 if let Some(new_best) = new_best {
142 if let Some(best) = self.peer_best_blocks.get_mut(&peer_id) {
143 *best = new_best;
144 } else {
145 debug!(
146 target: LOG_TARGET,
147 "Cannot update `peer_best_blocks` as peer {peer_id} is not known to `Strategy` \
148 (already disconnected?)",
149 );
150 }
151 }
152
153 new_best
154 }
155
156 fn set_sync_fork_request(&mut self, peers: Vec<PeerId>, hash: &B::Hash, number: NumberFor<B>) {
157 if let Some(ref mut chain_sync) = self.chain_sync {
159 chain_sync.set_sync_fork_request(peers.clone(), hash, number);
160 }
161 }
162
163 fn request_justification(&mut self, hash: &B::Hash, number: NumberFor<B>) {
164 if let Some(ref mut chain_sync) = self.chain_sync {
166 chain_sync.request_justification(hash, number);
167 }
168 }
169
170 fn clear_justification_requests(&mut self) {
171 if let Some(ref mut chain_sync) = self.chain_sync {
173 chain_sync.clear_justification_requests();
174 }
175 }
176
177 fn on_justification_import(&mut self, hash: B::Hash, number: NumberFor<B>, success: bool) {
178 if let Some(ref mut chain_sync) = self.chain_sync {
180 chain_sync.on_justification_import(hash, number, success);
181 }
182 }
183
184 fn on_generic_response(
185 &mut self,
186 peer_id: &PeerId,
187 key: StrategyKey,
188 protocol_name: ProtocolName,
189 response: Box<dyn Any + Send>,
190 ) {
191 match key {
192 StateStrategy::<B>::STRATEGY_KEY => {
193 if let Some(state) = &mut self.state {
194 let Ok(response) = response.downcast::<Vec<u8>>() else {
195 warn!(target: LOG_TARGET, "Failed to downcast state response");
196 debug_assert!(false);
197 return;
198 };
199
200 state.on_state_response(peer_id, *response);
201 } else if let Some(chain_sync) = &mut self.chain_sync {
202 chain_sync.on_generic_response(peer_id, key, protocol_name, response);
203 } else {
204 error!(
205 target: LOG_TARGET,
206 "`on_generic_response()` called with unexpected key {key:?} \
207 or corresponding strategy is not active.",
208 );
209 debug_assert!(false);
210 }
211 },
212 WarpSync::<B, Client>::STRATEGY_KEY => {
213 if let Some(warp) = &mut self.warp {
214 warp.on_generic_response(peer_id, protocol_name, response);
215 } else {
216 error!(
217 target: LOG_TARGET,
218 "`on_generic_response()` called with unexpected key {key:?} \
219 or warp strategy is not active",
220 );
221 debug_assert!(false);
222 }
223 },
224 ChainSync::<B, Client>::STRATEGY_KEY => {
225 if let Some(chain_sync) = &mut self.chain_sync {
226 chain_sync.on_generic_response(peer_id, key, protocol_name, response);
227 } else {
228 error!(
229 target: LOG_TARGET,
230 "`on_generic_response()` called with unexpected key {key:?} \
231 or corresponding strategy is not active.",
232 );
233 debug_assert!(false);
234 }
235 },
236 key => {
237 warn!(
238 target: LOG_TARGET,
239 "Unexpected generic response strategy key {key:?}, protocol {protocol_name}",
240 );
241 debug_assert!(false);
242 },
243 }
244 }
245
246 fn on_blocks_processed(
247 &mut self,
248 imported: usize,
249 count: usize,
250 results: Vec<(Result<BlockImportStatus<NumberFor<B>>, BlockImportError>, B::Hash)>,
251 ) {
252 if let Some(ref mut state) = self.state {
254 state.on_blocks_processed(imported, count, results);
255 } else if let Some(ref mut chain_sync) = self.chain_sync {
256 chain_sync.on_blocks_processed(imported, count, results);
257 }
258 }
259
260 fn on_block_finalized(&mut self, hash: &B::Hash, number: NumberFor<B>) {
261 if let Some(ref mut chain_sync) = self.chain_sync {
263 chain_sync.on_block_finalized(hash, number);
264 }
265 }
266
267 fn update_chain_info(&mut self, best_hash: &B::Hash, best_number: NumberFor<B>) {
268 if let Some(ref mut chain_sync) = self.chain_sync {
270 chain_sync.update_chain_info(best_hash, best_number);
271 }
272 }
273
274 fn is_major_syncing(&self) -> bool {
275 self.warp.is_some()
276 || self.state.is_some()
277 || match self.chain_sync {
278 Some(ref s) => s.status().state.is_major_syncing(),
279 None => unreachable!("At least one syncing strategy is active; qed"),
280 }
281 }
282
283 fn num_peers(&self) -> usize {
284 self.peer_best_blocks.len()
285 }
286
287 fn status(&self) -> SyncStatus<B> {
288 if let Some(ref warp) = self.warp {
291 warp.status()
292 } else if let Some(ref state) = self.state {
293 state.status()
294 } else if let Some(ref chain_sync) = self.chain_sync {
295 chain_sync.status()
296 } else {
297 unreachable!("At least one syncing strategy is always active; qed")
298 }
299 }
300
301 fn num_downloaded_blocks(&self) -> usize {
302 self.chain_sync
303 .as_ref()
304 .map_or(0, |chain_sync| chain_sync.num_downloaded_blocks())
305 }
306
307 fn num_sync_requests(&self) -> usize {
308 self.chain_sync.as_ref().map_or(0, |chain_sync| chain_sync.num_sync_requests())
309 }
310
311 fn actions(
312 &mut self,
313 network_service: &NetworkServiceHandle,
314 ) -> Result<Vec<SyncingAction<B>>, ClientError> {
315 let actions: Vec<_> = if let Some(ref mut warp) = self.warp {
318 warp.actions(network_service).map(Into::into).collect()
319 } else if let Some(ref mut state) = self.state {
320 state.actions(network_service).map(Into::into).collect()
321 } else if let Some(ref mut chain_sync) = self.chain_sync {
322 chain_sync.actions(network_service)?
323 } else {
324 unreachable!("At least one syncing strategy is always active; qed")
325 };
326
327 if actions.iter().any(SyncingAction::is_finished) {
328 self.proceed_to_next()?;
329 }
330
331 Ok(actions)
332 }
333}
334
335impl<B: BlockT, Client> PezkuwiSyncingStrategy<B, Client>
336where
337 B: BlockT,
338 Client: HeaderBackend<B>
339 + BlockBackend<B>
340 + HeaderMetadata<B, Error = pezsp_blockchain::Error>
341 + ProofProvider<B>
342 + Send
343 + Sync
344 + 'static,
345{
346 pub fn new(
348 mut config: PezkuwiSyncingStrategyConfig<B>,
349 client: Arc<Client>,
350 warp_sync_config: Option<WarpSyncConfig<B>>,
351 warp_sync_protocol_name: Option<ProtocolName>,
352 ) -> Result<Self, ClientError> {
353 if config.max_blocks_per_request > MAX_BLOCKS_IN_RESPONSE as u32 {
354 info!(
355 target: LOG_TARGET,
356 "clamping maximum blocks per request to {MAX_BLOCKS_IN_RESPONSE}",
357 );
358 config.max_blocks_per_request = MAX_BLOCKS_IN_RESPONSE as u32;
359 }
360
361 if let SyncMode::Warp = config.mode {
362 let warp_sync_config = warp_sync_config
363 .expect("Warp sync configuration must be supplied in warp sync mode.");
364 let warp_sync = WarpSync::new(
365 client.clone(),
366 warp_sync_config,
367 warp_sync_protocol_name,
368 config.block_downloader.clone(),
369 config.min_peers_to_start_warp_sync,
370 );
371 Ok(Self {
372 config,
373 client,
374 warp: Some(warp_sync),
375 state: None,
376 chain_sync: None,
377 peer_best_blocks: Default::default(),
378 })
379 } else {
380 let chain_sync = ChainSync::new(
381 chain_sync_mode(config.mode),
382 client.clone(),
383 config.max_parallel_downloads,
384 config.max_blocks_per_request,
385 config.state_request_protocol_name.clone(),
386 config.block_downloader.clone(),
387 config.metrics_registry.as_ref(),
388 std::iter::empty(),
389 )?;
390 Ok(Self {
391 config,
392 client,
393 warp: None,
394 state: None,
395 chain_sync: Some(chain_sync),
396 peer_best_blocks: Default::default(),
397 })
398 }
399 }
400
401 pub fn proceed_to_next(&mut self) -> Result<(), ClientError> {
403 if let Some(ref mut warp) = self.warp {
405 match warp.take_result() {
406 Some(res) => {
407 info!(
408 target: LOG_TARGET,
409 "Warp sync is complete, continuing with state sync."
410 );
411 let state_sync = StateStrategy::new(
412 self.client.clone(),
413 res.target_header,
414 res.target_body,
415 res.target_justifications,
416 false,
417 self.peer_best_blocks
418 .iter()
419 .map(|(peer_id, (_, best_number))| (*peer_id, *best_number)),
420 self.config.state_request_protocol_name.clone(),
421 );
422
423 self.warp = None;
424 self.state = Some(state_sync);
425 Ok(())
426 },
427 None => {
428 error!(
429 target: LOG_TARGET,
430 "Warp sync failed. Continuing with full sync."
431 );
432 let chain_sync = match ChainSync::new(
433 chain_sync_mode(self.config.mode),
434 self.client.clone(),
435 self.config.max_parallel_downloads,
436 self.config.max_blocks_per_request,
437 self.config.state_request_protocol_name.clone(),
438 self.config.block_downloader.clone(),
439 self.config.metrics_registry.as_ref(),
440 self.peer_best_blocks.iter().map(|(peer_id, (best_hash, best_number))| {
441 (*peer_id, *best_hash, *best_number)
442 }),
443 ) {
444 Ok(chain_sync) => chain_sync,
445 Err(e) => {
446 error!(target: LOG_TARGET, "Failed to start `ChainSync`.");
447 return Err(e);
448 },
449 };
450
451 self.warp = None;
452 self.chain_sync = Some(chain_sync);
453 Ok(())
454 },
455 }
456 } else if let Some(state) = &self.state {
457 if state.is_succeeded() {
458 info!(target: LOG_TARGET, "State sync is complete, continuing with block sync.");
459 } else {
460 error!(target: LOG_TARGET, "State sync failed. Falling back to full sync.");
461 }
462 let chain_sync = match ChainSync::new(
463 chain_sync_mode(self.config.mode),
464 self.client.clone(),
465 self.config.max_parallel_downloads,
466 self.config.max_blocks_per_request,
467 self.config.state_request_protocol_name.clone(),
468 self.config.block_downloader.clone(),
469 self.config.metrics_registry.as_ref(),
470 self.peer_best_blocks.iter().map(|(peer_id, (best_hash, best_number))| {
471 (*peer_id, *best_hash, *best_number)
472 }),
473 ) {
474 Ok(chain_sync) => chain_sync,
475 Err(e) => {
476 error!(target: LOG_TARGET, "Failed to start `ChainSync`.");
477 return Err(e);
478 },
479 };
480
481 self.state = None;
482 self.chain_sync = Some(chain_sync);
483 Ok(())
484 } else {
485 unreachable!("Only warp & state strategies can finish; qed")
486 }
487 }
488}