Skip to main content

soil_network/sync/strategy/
state.rs

1// This file is part of Soil.
2
3// Copyright (C) Soil contributors.
4// Copyright (C) Parity Technologies (UK) Ltd.
5// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
6
7//! State sync strategy.
8
9use crate::{
10	sync::{
11		schema::v1::{StateRequest, StateResponse},
12		service::network::NetworkServiceHandle,
13		strategy::{
14			disconnected_peers::DisconnectedPeers,
15			state_sync::{ImportResult, StateSync, StateSyncProvider},
16			StrategyKey, SyncingAction,
17		},
18		types::{BadPeer, SyncState, SyncStatus},
19	},
20	LOG_TARGET,
21};
22use futures::{channel::oneshot, FutureExt};
23use log::{debug, error, trace};
24use prost::Message;
25use soil_client::client_api::ProofProvider;
26use soil_client::consensus::BlockOrigin;
27use soil_client::import::{BlockImportError, BlockImportStatus, IncomingBlock};
28use soil_network::common::sync::message::BlockAnnounce;
29use soil_network::types::PeerId;
30use soil_network::{IfDisconnected, ProtocolName};
31use std::{any::Any, collections::HashMap, sync::Arc};
32use subsoil::runtime::{
33	traits::{Block as BlockT, Header, NumberFor},
34	Justifications, SaturatedConversion,
35};
36
37mod rep {
38	use soil_network::ReputationChange as Rep;
39
40	/// Peer response data does not have requested bits.
41	pub const BAD_RESPONSE: Rep = Rep::new(-(1 << 12), "Incomplete response");
42
43	/// Reputation change for peers which send us a known bad state.
44	pub const BAD_STATE: Rep = Rep::new(-(1 << 29), "Bad state");
45}
46
47enum PeerState {
48	Available,
49	DownloadingState,
50}
51
52impl PeerState {
53	fn is_available(&self) -> bool {
54		matches!(self, PeerState::Available)
55	}
56}
57
58struct Peer<B: BlockT> {
59	best_number: NumberFor<B>,
60	state: PeerState,
61}
62
63/// Syncing strategy that downloads and imports a recent state directly.
64pub struct StateStrategy<B: BlockT> {
65	state_sync: Box<dyn StateSyncProvider<B>>,
66	peers: HashMap<PeerId, Peer<B>>,
67	disconnected_peers: DisconnectedPeers,
68	actions: Vec<SyncingAction<B>>,
69	protocol_name: ProtocolName,
70	succeeded: bool,
71}
72
73impl<B: BlockT> StateStrategy<B> {
74	/// Strategy key used by state sync.
75	pub const STRATEGY_KEY: StrategyKey = StrategyKey::new("State");
76
77	/// Create a new instance.
78	pub fn new<Client>(
79		client: Arc<Client>,
80		target_header: B::Header,
81		target_body: Option<Vec<B::Extrinsic>>,
82		target_justifications: Option<Justifications>,
83		skip_proof: bool,
84		initial_peers: impl Iterator<Item = (PeerId, NumberFor<B>)>,
85		protocol_name: ProtocolName,
86	) -> Self
87	where
88		Client: ProofProvider<B> + Send + Sync + 'static,
89	{
90		let peers = initial_peers
91			.map(|(peer_id, best_number)| {
92				(peer_id, Peer { best_number, state: PeerState::Available })
93			})
94			.collect();
95		Self {
96			state_sync: Box::new(StateSync::new(
97				client,
98				target_header,
99				target_body,
100				target_justifications,
101				skip_proof,
102			)),
103			peers,
104			disconnected_peers: DisconnectedPeers::new(),
105			actions: Vec::new(),
106			protocol_name,
107			succeeded: false,
108		}
109	}
110
111	/// Create a new instance with a custom state sync provider.
112	///
113	/// Note: In most cases, users should use [`StateStrategy::new`].
114	/// This method is intended for custom sync strategies and advanced use cases.
115	pub fn new_with_provider(
116		state_sync_provider: Box<dyn StateSyncProvider<B>>,
117		initial_peers: impl Iterator<Item = (PeerId, NumberFor<B>)>,
118		protocol_name: ProtocolName,
119	) -> Self {
120		Self {
121			state_sync: state_sync_provider,
122			peers: initial_peers
123				.map(|(peer_id, best_number)| {
124					(peer_id, Peer { best_number, state: PeerState::Available })
125				})
126				.collect(),
127			disconnected_peers: DisconnectedPeers::new(),
128			actions: Vec::new(),
129			protocol_name,
130			succeeded: false,
131		}
132	}
133
134	/// Notify that a new peer has connected.
135	pub fn add_peer(&mut self, peer_id: PeerId, _best_hash: B::Hash, best_number: NumberFor<B>) {
136		self.peers.insert(peer_id, Peer { best_number, state: PeerState::Available });
137	}
138
139	/// Notify that a peer has disconnected.
140	pub fn remove_peer(&mut self, peer_id: &PeerId) {
141		if let Some(state) = self.peers.remove(peer_id) {
142			if !state.state.is_available() {
143				if let Some(bad_peer) =
144					self.disconnected_peers.on_disconnect_during_request(*peer_id)
145				{
146					self.actions.push(SyncingAction::DropPeer(bad_peer));
147				}
148			}
149		}
150	}
151
152	/// Submit a validated block announcement.
153	///
154	/// Returns new best hash & best number of the peer if they are updated.
155	#[must_use]
156	pub fn on_validated_block_announce(
157		&mut self,
158		is_best: bool,
159		peer_id: PeerId,
160		announce: &BlockAnnounce<B::Header>,
161	) -> Option<(B::Hash, NumberFor<B>)> {
162		is_best.then(|| {
163			let best_number = *announce.header.number();
164			let best_hash = announce.header.hash();
165			if let Some(ref mut peer) = self.peers.get_mut(&peer_id) {
166				peer.best_number = best_number;
167			}
168			// Let `SyncingEngine` know that we should update the peer info.
169			(best_hash, best_number)
170		})
171	}
172
173	/// Process state response.
174	pub fn on_state_response(&mut self, peer_id: &PeerId, response: Vec<u8>) {
175		if let Err(bad_peer) = self.on_state_response_inner(peer_id, &response) {
176			self.actions.push(SyncingAction::DropPeer(bad_peer));
177		}
178	}
179
180	fn on_state_response_inner(
181		&mut self,
182		peer_id: &PeerId,
183		response: &[u8],
184	) -> Result<(), BadPeer> {
185		if let Some(peer) = self.peers.get_mut(&peer_id) {
186			peer.state = PeerState::Available;
187		}
188
189		let response = match StateResponse::decode(response) {
190			Ok(response) => response,
191			Err(error) => {
192				debug!(
193					target: LOG_TARGET,
194					"Failed to decode state response from peer {peer_id:?}: {error:?}.",
195				);
196
197				return Err(BadPeer(*peer_id, rep::BAD_RESPONSE));
198			},
199		};
200
201		debug!(
202			target: LOG_TARGET,
203			"Importing state data from {} with {} keys, {} proof nodes.",
204			peer_id,
205			response.entries.len(),
206			response.proof.len(),
207		);
208
209		match self.state_sync.import(response) {
210			ImportResult::Import(hash, header, state, body, justifications) => {
211				let origin = BlockOrigin::NetworkInitialSync;
212				let block = IncomingBlock {
213					hash,
214					header: Some(header),
215					body,
216					indexed_body: None,
217					justifications,
218					origin: None,
219					allow_missing_state: true,
220					import_existing: true,
221					skip_execution: true,
222					state: Some(state),
223				};
224				debug!(target: LOG_TARGET, "State download is complete. Import is queued");
225				self.actions.push(SyncingAction::ImportBlocks { origin, blocks: vec![block] });
226				Ok(())
227			},
228			ImportResult::Continue => Ok(()),
229			ImportResult::BadResponse => {
230				debug!(target: LOG_TARGET, "Bad state data received from {peer_id}");
231				Err(BadPeer(*peer_id, rep::BAD_STATE))
232			},
233		}
234	}
235
236	/// A batch of blocks have been processed, with or without errors.
237	///
238	/// Normally this should be called when target block with state is imported.
239	pub fn on_blocks_processed(
240		&mut self,
241		imported: usize,
242		count: usize,
243		results: Vec<(Result<BlockImportStatus<NumberFor<B>>, BlockImportError>, B::Hash)>,
244	) {
245		trace!(target: LOG_TARGET, "State sync: imported {imported} of {count}.");
246
247		let results = results
248			.into_iter()
249			.filter_map(|(result, hash)| {
250				if hash == self.state_sync.target_hash() {
251					Some(result)
252				} else {
253					debug!(
254						target: LOG_TARGET,
255						"Unexpected block processed: {hash} with result {result:?}.",
256					);
257					None
258				}
259			})
260			.collect::<Vec<_>>();
261
262		if !results.is_empty() {
263			// We processed the target block
264			results.iter().filter_map(|result| result.as_ref().err()).for_each(|e| {
265				error!(
266					target: LOG_TARGET,
267					"Failed to import target block with state: {e:?}."
268				);
269			});
270			self.succeeded |= results.into_iter().any(|result| result.is_ok());
271			self.actions.push(SyncingAction::Finished);
272		}
273	}
274
275	/// Produce state request.
276	fn state_request(&mut self) -> Option<(PeerId, StateRequest)> {
277		if self.state_sync.is_complete() {
278			return None;
279		}
280
281		if self
282			.peers
283			.values()
284			.any(|peer| matches!(peer.state, PeerState::DownloadingState))
285		{
286			// Only one state request at a time is possible.
287			return None;
288		}
289
290		let peer_id =
291			self.schedule_next_peer(PeerState::DownloadingState, self.state_sync.target_number())?;
292		let request = self.state_sync.next_request();
293		trace!(
294			target: LOG_TARGET,
295			"New state request to {peer_id}: {request:?}.",
296		);
297		Some((peer_id, request))
298	}
299
300	fn schedule_next_peer(
301		&mut self,
302		new_state: PeerState,
303		min_best_number: NumberFor<B>,
304	) -> Option<PeerId> {
305		let mut targets: Vec<_> = self.peers.values().map(|p| p.best_number).collect();
306		if targets.is_empty() {
307			return None;
308		}
309		targets.sort();
310		let median = targets[targets.len() / 2];
311		let threshold = std::cmp::max(median, min_best_number);
312		// Find a random peer that is synced as much as peer majority and is above
313		// `min_best_number`.
314		for (peer_id, peer) in self.peers.iter_mut() {
315			if peer.state.is_available()
316				&& peer.best_number >= threshold
317				&& self.disconnected_peers.is_peer_available(peer_id)
318			{
319				peer.state = new_state;
320				return Some(*peer_id);
321			}
322		}
323		None
324	}
325
326	/// Returns the current sync status.
327	pub fn status(&self) -> SyncStatus<B> {
328		SyncStatus {
329			state: if self.state_sync.is_complete() {
330				SyncState::Idle
331			} else {
332				SyncState::Downloading { target: self.state_sync.target_number() }
333			},
334			best_seen_block: Some(self.state_sync.target_number()),
335			num_peers: self.peers.len().saturated_into(),
336			queued_blocks: 0,
337			state_sync: Some(self.state_sync.progress()),
338			warp_sync: None,
339		}
340	}
341
342	/// Get actions that should be performed.
343	#[must_use]
344	pub fn actions(
345		&mut self,
346		network_service: &NetworkServiceHandle,
347	) -> impl Iterator<Item = SyncingAction<B>> {
348		let state_request = self.state_request().into_iter().map(|(peer_id, request)| {
349			let (tx, rx) = oneshot::channel();
350
351			network_service.start_request(
352				peer_id,
353				self.protocol_name.clone(),
354				request.encode_to_vec(),
355				tx,
356				IfDisconnected::ImmediateError,
357			);
358
359			SyncingAction::StartRequest {
360				peer_id,
361				key: Self::STRATEGY_KEY,
362				request: async move {
363					Ok(rx.await?.and_then(|(response, protocol_name)| {
364						Ok((Box::new(response) as Box<dyn Any + Send>, protocol_name))
365					}))
366				}
367				.boxed(),
368				remove_obsolete: false,
369			}
370		});
371		self.actions.extend(state_request);
372
373		std::mem::take(&mut self.actions).into_iter()
374	}
375
376	/// Check if state sync has succeeded.
377	#[must_use]
378	pub fn is_succeeded(&self) -> bool {
379		self.succeeded
380	}
381}
382
383#[cfg(test)]
384mod test {
385	use super::*;
386	use crate::sync::{
387		schema::v1::{StateRequest, StateResponse},
388		service::network::NetworkServiceProvider,
389		strategy::state_sync::{ImportResult, StateSyncProgress, StateSyncProvider},
390	};
391	use codec::Decode;
392	use soil_client::block_builder::BlockBuilderBuilder;
393	use soil_client::client_api::KeyValueStates;
394	use soil_client::import::{ImportedAux, ImportedState};
395	use subsoil::core::H256;
396	use subsoil::runtime::traits::Zero;
397	use soil_test_node_runtime_client::{
398		runtime::{Block, Hash},
399		BlockBuilderExt, DefaultTestClientBuilderExt, TestClientBuilder, TestClientBuilderExt,
400	};
401
402	mockall::mock! {
403		pub StateSync<B: BlockT> {}
404
405		impl<B: BlockT> StateSyncProvider<B> for StateSync<B> {
406			fn import(&mut self, response: StateResponse) -> ImportResult<B>;
407			fn next_request(&self) -> StateRequest;
408			fn is_complete(&self) -> bool;
409			fn target_number(&self) -> NumberFor<B>;
410			fn target_hash(&self) -> B::Hash;
411			fn progress(&self) -> StateSyncProgress;
412		}
413	}
414
415	#[test]
416	fn no_peer_is_scheduled_if_no_peers_connected() {
417		let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
418		let target_block = BlockBuilderBuilder::new(&*client)
419			.on_parent_block(client.chain_info().best_hash)
420			.with_parent_block_number(client.chain_info().best_number)
421			.build()
422			.unwrap()
423			.build()
424			.unwrap()
425			.block;
426		let target_header = target_block.header().clone();
427
428		let mut state_strategy = StateStrategy::new(
429			client,
430			target_header,
431			None,
432			None,
433			false,
434			std::iter::empty(),
435			ProtocolName::Static(""),
436		);
437
438		assert!(state_strategy
439			.schedule_next_peer(PeerState::DownloadingState, Zero::zero())
440			.is_none());
441	}
442
443	#[test]
444	fn at_least_median_synced_peer_is_scheduled() {
445		let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
446		let target_block = BlockBuilderBuilder::new(&*client)
447			.on_parent_block(client.chain_info().best_hash)
448			.with_parent_block_number(client.chain_info().best_number)
449			.build()
450			.unwrap()
451			.build()
452			.unwrap()
453			.block;
454
455		for _ in 0..100 {
456			let peers = (1..=10)
457				.map(|best_number| (PeerId::random(), best_number))
458				.collect::<HashMap<_, _>>();
459			let initial_peers = peers.iter().map(|(p, n)| (*p, *n));
460
461			let mut state_strategy = StateStrategy::new(
462				client.clone(),
463				target_block.header().clone(),
464				None,
465				None,
466				false,
467				initial_peers,
468				ProtocolName::Static(""),
469			);
470
471			let peer_id =
472				state_strategy.schedule_next_peer(PeerState::DownloadingState, Zero::zero());
473			assert!(*peers.get(&peer_id.unwrap()).unwrap() >= 6);
474		}
475	}
476
477	#[test]
478	fn min_best_number_peer_is_scheduled() {
479		let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
480		let target_block = BlockBuilderBuilder::new(&*client)
481			.on_parent_block(client.chain_info().best_hash)
482			.with_parent_block_number(client.chain_info().best_number)
483			.build()
484			.unwrap()
485			.build()
486			.unwrap()
487			.block;
488
489		for _ in 0..10 {
490			let peers = (1..=10)
491				.map(|best_number| (PeerId::random(), best_number))
492				.collect::<HashMap<_, _>>();
493			let initial_peers = peers.iter().map(|(p, n)| (*p, *n));
494
495			let mut state_strategy = StateStrategy::new(
496				client.clone(),
497				target_block.header().clone(),
498				None,
499				None,
500				false,
501				initial_peers,
502				ProtocolName::Static(""),
503			);
504
505			let peer_id = state_strategy.schedule_next_peer(PeerState::DownloadingState, 10);
506			assert!(*peers.get(&peer_id.unwrap()).unwrap() == 10);
507		}
508	}
509
510	#[test]
511	fn backedoff_number_peer_is_not_scheduled() {
512		let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
513		let target_block = BlockBuilderBuilder::new(&*client)
514			.on_parent_block(client.chain_info().best_hash)
515			.with_parent_block_number(client.chain_info().best_number)
516			.build()
517			.unwrap()
518			.build()
519			.unwrap()
520			.block;
521
522		let peers = (1..=10)
523			.map(|best_number| (PeerId::random(), best_number))
524			.collect::<Vec<(_, _)>>();
525		let ninth_peer = peers[8].0;
526		let tenth_peer = peers[9].0;
527		let initial_peers = peers.iter().map(|(p, n)| (*p, *n));
528
529		let mut state_strategy = StateStrategy::new(
530			client.clone(),
531			target_block.header().clone(),
532			None,
533			None,
534			false,
535			initial_peers,
536			ProtocolName::Static(""),
537		);
538
539		// Disconnecting a peer without an inflight request has no effect on persistent states.
540		state_strategy.remove_peer(&tenth_peer);
541		assert!(state_strategy.disconnected_peers.is_peer_available(&tenth_peer));
542
543		// Disconnect the peer with an inflight request.
544		state_strategy.add_peer(tenth_peer, H256::random(), 10);
545		let peer_id: Option<PeerId> =
546			state_strategy.schedule_next_peer(PeerState::DownloadingState, 10);
547		assert_eq!(tenth_peer, peer_id.unwrap());
548		state_strategy.remove_peer(&tenth_peer);
549
550		// Peer is backed off.
551		assert!(!state_strategy.disconnected_peers.is_peer_available(&tenth_peer));
552
553		// No peer available for 10'th best block because of the backoff.
554		state_strategy.add_peer(tenth_peer, H256::random(), 10);
555		let peer_id: Option<PeerId> =
556			state_strategy.schedule_next_peer(PeerState::DownloadingState, 10);
557		assert!(peer_id.is_none());
558
559		// Other requests can still happen.
560		let peer_id: Option<PeerId> =
561			state_strategy.schedule_next_peer(PeerState::DownloadingState, 9);
562		assert_eq!(ninth_peer, peer_id.unwrap());
563	}
564
565	#[test]
566	fn state_request_contains_correct_hash() {
567		let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
568		let target_block = BlockBuilderBuilder::new(&*client)
569			.on_parent_block(client.chain_info().best_hash)
570			.with_parent_block_number(client.chain_info().best_number)
571			.build()
572			.unwrap()
573			.build()
574			.unwrap()
575			.block;
576
577		let initial_peers = (1..=10).map(|best_number| (PeerId::random(), best_number));
578
579		let mut state_strategy = StateStrategy::new(
580			client.clone(),
581			target_block.header().clone(),
582			None,
583			None,
584			false,
585			initial_peers,
586			ProtocolName::Static(""),
587		);
588
589		let (_peer_id, request) = state_strategy.state_request().unwrap();
590		let hash = Hash::decode(&mut &*request.block).unwrap();
591
592		assert_eq!(hash, target_block.header().hash());
593	}
594
595	#[test]
596	fn no_parallel_state_requests() {
597		let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
598		let target_block = BlockBuilderBuilder::new(&*client)
599			.on_parent_block(client.chain_info().best_hash)
600			.with_parent_block_number(client.chain_info().best_number)
601			.build()
602			.unwrap()
603			.build()
604			.unwrap()
605			.block;
606
607		let initial_peers = (1..=10).map(|best_number| (PeerId::random(), best_number));
608
609		let mut state_strategy = StateStrategy::new(
610			client.clone(),
611			target_block.header().clone(),
612			None,
613			None,
614			false,
615			initial_peers,
616			ProtocolName::Static(""),
617		);
618
619		// First request is sent.
620		assert!(state_strategy.state_request().is_some());
621
622		// No parallel request is sent.
623		assert!(state_strategy.state_request().is_none());
624	}
625
626	#[test]
627	fn received_state_response_makes_peer_available_again() {
628		let mut state_sync_provider = MockStateSync::<Block>::new();
629		state_sync_provider.expect_import().return_once(|_| ImportResult::Continue);
630		let peer_id = PeerId::random();
631		let initial_peers = std::iter::once((peer_id, 10));
632		let mut state_strategy = StateStrategy::new_with_provider(
633			Box::new(state_sync_provider),
634			initial_peers,
635			ProtocolName::Static(""),
636		);
637		// Manually set the peer's state.
638		state_strategy.peers.get_mut(&peer_id).unwrap().state = PeerState::DownloadingState;
639
640		let dummy_response = StateResponse::default().encode_to_vec();
641		state_strategy.on_state_response(&peer_id, dummy_response);
642
643		assert!(state_strategy.peers.get(&peer_id).unwrap().state.is_available());
644	}
645
646	#[test]
647	fn bad_state_response_drops_peer() {
648		let mut state_sync_provider = MockStateSync::<Block>::new();
649		// Provider says that state response is bad.
650		state_sync_provider.expect_import().return_once(|_| ImportResult::BadResponse);
651		let peer_id = PeerId::random();
652		let initial_peers = std::iter::once((peer_id, 10));
653		let mut state_strategy = StateStrategy::new_with_provider(
654			Box::new(state_sync_provider),
655			initial_peers,
656			ProtocolName::Static(""),
657		);
658		// Manually set the peer's state.
659		state_strategy.peers.get_mut(&peer_id).unwrap().state = PeerState::DownloadingState;
660		let dummy_response = StateResponse::default().encode_to_vec();
661		// Receiving response drops the peer.
662		assert!(matches!(
663			state_strategy.on_state_response_inner(&peer_id, &dummy_response),
664			Err(BadPeer(id, _rep)) if id == peer_id,
665		));
666	}
667
668	#[test]
669	fn partial_state_response_doesnt_generate_actions() {
670		let mut state_sync_provider = MockStateSync::<Block>::new();
671		// Sync provider says that the response is partial.
672		state_sync_provider.expect_import().return_once(|_| ImportResult::Continue);
673		let peer_id = PeerId::random();
674		let initial_peers = std::iter::once((peer_id, 10));
675		let mut state_strategy = StateStrategy::new_with_provider(
676			Box::new(state_sync_provider),
677			initial_peers,
678			ProtocolName::Static(""),
679		);
680		// Manually set the peer's state .
681		state_strategy.peers.get_mut(&peer_id).unwrap().state = PeerState::DownloadingState;
682
683		let dummy_response = StateResponse::default().encode_to_vec();
684		state_strategy.on_state_response(&peer_id, dummy_response);
685
686		// No actions generated.
687		assert_eq!(state_strategy.actions.len(), 0)
688	}
689
690	#[test]
691	fn complete_state_response_leads_to_block_import() {
692		// Build block to use for checks.
693		let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
694		let mut block_builder = BlockBuilderBuilder::new(&*client)
695			.on_parent_block(client.chain_info().best_hash)
696			.with_parent_block_number(client.chain_info().best_number)
697			.build()
698			.unwrap();
699		block_builder.push_storage_change(vec![1, 2, 3], Some(vec![4, 5, 6])).unwrap();
700		let block = block_builder.build().unwrap().block;
701		let header = block.header().clone();
702		let hash = header.hash();
703		let body = Some(block.extrinsics().iter().cloned().collect::<Vec<_>>());
704		let state = ImportedState { block: hash, state: KeyValueStates(Vec::new()) };
705		let justifications = Some(Justifications::from((*b"FRNK", Vec::new())));
706
707		// Prepare `StateSync`
708		let mut state_sync_provider = MockStateSync::<Block>::new();
709		let import = ImportResult::Import(
710			hash,
711			header.clone(),
712			state.clone(),
713			body.clone(),
714			justifications.clone(),
715		);
716		state_sync_provider.expect_import().return_once(move |_| import);
717
718		// Reference values to check against.
719		let expected_origin = BlockOrigin::NetworkInitialSync;
720		let expected_block = IncomingBlock {
721			hash,
722			header: Some(header),
723			body,
724			indexed_body: None,
725			justifications,
726			origin: None,
727			allow_missing_state: true,
728			import_existing: true,
729			skip_execution: true,
730			state: Some(state),
731		};
732		let expected_blocks = vec![expected_block];
733
734		// Prepare `StateStrategy`.
735		let peer_id = PeerId::random();
736		let initial_peers = std::iter::once((peer_id, 10));
737		let mut state_strategy = StateStrategy::new_with_provider(
738			Box::new(state_sync_provider),
739			initial_peers,
740			ProtocolName::Static(""),
741		);
742		// Manually set the peer's state .
743		state_strategy.peers.get_mut(&peer_id).unwrap().state = PeerState::DownloadingState;
744
745		// Receive response.
746		let dummy_response = StateResponse::default().encode_to_vec();
747		state_strategy.on_state_response(&peer_id, dummy_response);
748
749		assert_eq!(state_strategy.actions.len(), 1);
750		assert!(matches!(
751			&state_strategy.actions[0],
752			SyncingAction::ImportBlocks { origin, blocks }
753				if *origin == expected_origin && *blocks == expected_blocks,
754		));
755	}
756
757	#[test]
758	fn importing_unknown_block_doesnt_finish_strategy() {
759		let target_hash = Hash::random();
760		let unknown_hash = Hash::random();
761		let mut state_sync_provider = MockStateSync::<Block>::new();
762		state_sync_provider.expect_target_hash().return_const(target_hash);
763
764		let mut state_strategy = StateStrategy::new_with_provider(
765			Box::new(state_sync_provider),
766			std::iter::empty(),
767			ProtocolName::Static(""),
768		);
769
770		// Unknown block imported.
771		state_strategy.on_blocks_processed(
772			1,
773			1,
774			vec![(
775				Ok(BlockImportStatus::ImportedUnknown(1, ImportedAux::default(), None)),
776				unknown_hash,
777			)],
778		);
779
780		// No actions generated.
781		assert_eq!(state_strategy.actions.len(), 0);
782	}
783
784	#[test]
785	fn successfully_importing_target_block_finishes_strategy() {
786		let target_hash = Hash::random();
787		let mut state_sync_provider = MockStateSync::<Block>::new();
788		state_sync_provider.expect_target_hash().return_const(target_hash);
789
790		let mut state_strategy = StateStrategy::new_with_provider(
791			Box::new(state_sync_provider),
792			std::iter::empty(),
793			ProtocolName::Static(""),
794		);
795
796		// Target block imported.
797		state_strategy.on_blocks_processed(
798			1,
799			1,
800			vec![(
801				Ok(BlockImportStatus::ImportedUnknown(1, ImportedAux::default(), None)),
802				target_hash,
803			)],
804		);
805
806		// Strategy finishes.
807		assert_eq!(state_strategy.actions.len(), 1);
808		assert!(matches!(&state_strategy.actions[0], SyncingAction::Finished));
809	}
810
811	#[test]
812	fn failure_to_import_target_block_finishes_strategy() {
813		let target_hash = Hash::random();
814		let mut state_sync_provider = MockStateSync::<Block>::new();
815		state_sync_provider.expect_target_hash().return_const(target_hash);
816
817		let mut state_strategy = StateStrategy::new_with_provider(
818			Box::new(state_sync_provider),
819			std::iter::empty(),
820			ProtocolName::Static(""),
821		);
822
823		// Target block import failed.
824		state_strategy.on_blocks_processed(
825			1,
826			1,
827			vec![(
828				Err(BlockImportError::VerificationFailed(None, String::from("test-error"))),
829				target_hash,
830			)],
831		);
832
833		// Strategy finishes.
834		assert_eq!(state_strategy.actions.len(), 1);
835		assert!(matches!(&state_strategy.actions[0], SyncingAction::Finished));
836	}
837
838	#[test]
839	fn finished_strategy_doesnt_generate_more_actions() {
840		let target_hash = Hash::random();
841		let mut state_sync_provider = MockStateSync::<Block>::new();
842		state_sync_provider.expect_target_hash().return_const(target_hash);
843		state_sync_provider.expect_is_complete().return_const(true);
844
845		// Get enough peers for possible spurious requests.
846		let initial_peers = (1..=10).map(|best_number| (PeerId::random(), best_number));
847
848		let mut state_strategy = StateStrategy::new_with_provider(
849			Box::new(state_sync_provider),
850			initial_peers,
851			ProtocolName::Static(""),
852		);
853
854		state_strategy.on_blocks_processed(
855			1,
856			1,
857			vec![(
858				Ok(BlockImportStatus::ImportedUnknown(1, ImportedAux::default(), None)),
859				target_hash,
860			)],
861		);
862
863		let network_provider = NetworkServiceProvider::new();
864		let network_handle = network_provider.handle();
865
866		// Strategy finishes.
867		let actions = state_strategy.actions(&network_handle).collect::<Vec<_>>();
868		assert_eq!(actions.len(), 1);
869		assert!(matches!(&actions[0], SyncingAction::Finished));
870
871		// No more actions generated.
872		assert_eq!(state_strategy.actions(&network_handle).count(), 0);
873	}
874}