1use crate::{
10 sync::{
11 schema::v1::{StateRequest, StateResponse},
12 service::network::NetworkServiceHandle,
13 strategy::{
14 disconnected_peers::DisconnectedPeers,
15 state_sync::{ImportResult, StateSync, StateSyncProvider},
16 StrategyKey, SyncingAction,
17 },
18 types::{BadPeer, SyncState, SyncStatus},
19 },
20 LOG_TARGET,
21};
22use futures::{channel::oneshot, FutureExt};
23use log::{debug, error, trace};
24use prost::Message;
25use soil_client::client_api::ProofProvider;
26use soil_client::consensus::BlockOrigin;
27use soil_client::import::{BlockImportError, BlockImportStatus, IncomingBlock};
28use soil_network::common::sync::message::BlockAnnounce;
29use soil_network::types::PeerId;
30use soil_network::{IfDisconnected, ProtocolName};
31use std::{any::Any, collections::HashMap, sync::Arc};
32use subsoil::runtime::{
33 traits::{Block as BlockT, Header, NumberFor},
34 Justifications, SaturatedConversion,
35};
36
37mod rep {
38 use soil_network::ReputationChange as Rep;
39
40 pub const BAD_RESPONSE: Rep = Rep::new(-(1 << 12), "Incomplete response");
42
43 pub const BAD_STATE: Rep = Rep::new(-(1 << 29), "Bad state");
45}
46
/// Request status of a peer, as tracked by the state strategy.
enum PeerState {
    /// No state request is in flight to this peer.
    Available,
    /// A state request has been scheduled on this peer and not yet answered.
    DownloadingState,
}

impl PeerState {
    /// Returns `true` only for [`PeerState::Available`].
    fn is_available(&self) -> bool {
        match self {
            PeerState::Available => true,
            PeerState::DownloadingState => false,
        }
    }
}
57
58struct Peer<B: BlockT> {
59 best_number: NumberFor<B>,
60 state: PeerState,
61}
62
63pub struct StateStrategy<B: BlockT> {
65 state_sync: Box<dyn StateSyncProvider<B>>,
66 peers: HashMap<PeerId, Peer<B>>,
67 disconnected_peers: DisconnectedPeers,
68 actions: Vec<SyncingAction<B>>,
69 protocol_name: ProtocolName,
70 succeeded: bool,
71}
72
73impl<B: BlockT> StateStrategy<B> {
    /// Key attached to every [`SyncingAction::StartRequest`] emitted by this strategy.
    pub const STRATEGY_KEY: StrategyKey = StrategyKey::new("State");
76
77 pub fn new<Client>(
79 client: Arc<Client>,
80 target_header: B::Header,
81 target_body: Option<Vec<B::Extrinsic>>,
82 target_justifications: Option<Justifications>,
83 skip_proof: bool,
84 initial_peers: impl Iterator<Item = (PeerId, NumberFor<B>)>,
85 protocol_name: ProtocolName,
86 ) -> Self
87 where
88 Client: ProofProvider<B> + Send + Sync + 'static,
89 {
90 let peers = initial_peers
91 .map(|(peer_id, best_number)| {
92 (peer_id, Peer { best_number, state: PeerState::Available })
93 })
94 .collect();
95 Self {
96 state_sync: Box::new(StateSync::new(
97 client,
98 target_header,
99 target_body,
100 target_justifications,
101 skip_proof,
102 )),
103 peers,
104 disconnected_peers: DisconnectedPeers::new(),
105 actions: Vec::new(),
106 protocol_name,
107 succeeded: false,
108 }
109 }
110
111 pub fn new_with_provider(
116 state_sync_provider: Box<dyn StateSyncProvider<B>>,
117 initial_peers: impl Iterator<Item = (PeerId, NumberFor<B>)>,
118 protocol_name: ProtocolName,
119 ) -> Self {
120 Self {
121 state_sync: state_sync_provider,
122 peers: initial_peers
123 .map(|(peer_id, best_number)| {
124 (peer_id, Peer { best_number, state: PeerState::Available })
125 })
126 .collect(),
127 disconnected_peers: DisconnectedPeers::new(),
128 actions: Vec::new(),
129 protocol_name,
130 succeeded: false,
131 }
132 }
133
134 pub fn add_peer(&mut self, peer_id: PeerId, _best_hash: B::Hash, best_number: NumberFor<B>) {
136 self.peers.insert(peer_id, Peer { best_number, state: PeerState::Available });
137 }
138
139 pub fn remove_peer(&mut self, peer_id: &PeerId) {
141 if let Some(state) = self.peers.remove(peer_id) {
142 if !state.state.is_available() {
143 if let Some(bad_peer) =
144 self.disconnected_peers.on_disconnect_during_request(*peer_id)
145 {
146 self.actions.push(SyncingAction::DropPeer(bad_peer));
147 }
148 }
149 }
150 }
151
152 #[must_use]
156 pub fn on_validated_block_announce(
157 &mut self,
158 is_best: bool,
159 peer_id: PeerId,
160 announce: &BlockAnnounce<B::Header>,
161 ) -> Option<(B::Hash, NumberFor<B>)> {
162 is_best.then(|| {
163 let best_number = *announce.header.number();
164 let best_hash = announce.header.hash();
165 if let Some(ref mut peer) = self.peers.get_mut(&peer_id) {
166 peer.best_number = best_number;
167 }
168 (best_hash, best_number)
170 })
171 }
172
173 pub fn on_state_response(&mut self, peer_id: &PeerId, response: Vec<u8>) {
175 if let Err(bad_peer) = self.on_state_response_inner(peer_id, &response) {
176 self.actions.push(SyncingAction::DropPeer(bad_peer));
177 }
178 }
179
180 fn on_state_response_inner(
181 &mut self,
182 peer_id: &PeerId,
183 response: &[u8],
184 ) -> Result<(), BadPeer> {
185 if let Some(peer) = self.peers.get_mut(&peer_id) {
186 peer.state = PeerState::Available;
187 }
188
189 let response = match StateResponse::decode(response) {
190 Ok(response) => response,
191 Err(error) => {
192 debug!(
193 target: LOG_TARGET,
194 "Failed to decode state response from peer {peer_id:?}: {error:?}.",
195 );
196
197 return Err(BadPeer(*peer_id, rep::BAD_RESPONSE));
198 },
199 };
200
201 debug!(
202 target: LOG_TARGET,
203 "Importing state data from {} with {} keys, {} proof nodes.",
204 peer_id,
205 response.entries.len(),
206 response.proof.len(),
207 );
208
209 match self.state_sync.import(response) {
210 ImportResult::Import(hash, header, state, body, justifications) => {
211 let origin = BlockOrigin::NetworkInitialSync;
212 let block = IncomingBlock {
213 hash,
214 header: Some(header),
215 body,
216 indexed_body: None,
217 justifications,
218 origin: None,
219 allow_missing_state: true,
220 import_existing: true,
221 skip_execution: true,
222 state: Some(state),
223 };
224 debug!(target: LOG_TARGET, "State download is complete. Import is queued");
225 self.actions.push(SyncingAction::ImportBlocks { origin, blocks: vec![block] });
226 Ok(())
227 },
228 ImportResult::Continue => Ok(()),
229 ImportResult::BadResponse => {
230 debug!(target: LOG_TARGET, "Bad state data received from {peer_id}");
231 Err(BadPeer(*peer_id, rep::BAD_STATE))
232 },
233 }
234 }
235
236 pub fn on_blocks_processed(
240 &mut self,
241 imported: usize,
242 count: usize,
243 results: Vec<(Result<BlockImportStatus<NumberFor<B>>, BlockImportError>, B::Hash)>,
244 ) {
245 trace!(target: LOG_TARGET, "State sync: imported {imported} of {count}.");
246
247 let results = results
248 .into_iter()
249 .filter_map(|(result, hash)| {
250 if hash == self.state_sync.target_hash() {
251 Some(result)
252 } else {
253 debug!(
254 target: LOG_TARGET,
255 "Unexpected block processed: {hash} with result {result:?}.",
256 );
257 None
258 }
259 })
260 .collect::<Vec<_>>();
261
262 if !results.is_empty() {
263 results.iter().filter_map(|result| result.as_ref().err()).for_each(|e| {
265 error!(
266 target: LOG_TARGET,
267 "Failed to import target block with state: {e:?}."
268 );
269 });
270 self.succeeded |= results.into_iter().any(|result| result.is_ok());
271 self.actions.push(SyncingAction::Finished);
272 }
273 }
274
275 fn state_request(&mut self) -> Option<(PeerId, StateRequest)> {
277 if self.state_sync.is_complete() {
278 return None;
279 }
280
281 if self
282 .peers
283 .values()
284 .any(|peer| matches!(peer.state, PeerState::DownloadingState))
285 {
286 return None;
288 }
289
290 let peer_id =
291 self.schedule_next_peer(PeerState::DownloadingState, self.state_sync.target_number())?;
292 let request = self.state_sync.next_request();
293 trace!(
294 target: LOG_TARGET,
295 "New state request to {peer_id}: {request:?}.",
296 );
297 Some((peer_id, request))
298 }
299
300 fn schedule_next_peer(
301 &mut self,
302 new_state: PeerState,
303 min_best_number: NumberFor<B>,
304 ) -> Option<PeerId> {
305 let mut targets: Vec<_> = self.peers.values().map(|p| p.best_number).collect();
306 if targets.is_empty() {
307 return None;
308 }
309 targets.sort();
310 let median = targets[targets.len() / 2];
311 let threshold = std::cmp::max(median, min_best_number);
312 for (peer_id, peer) in self.peers.iter_mut() {
315 if peer.state.is_available()
316 && peer.best_number >= threshold
317 && self.disconnected_peers.is_peer_available(peer_id)
318 {
319 peer.state = new_state;
320 return Some(*peer_id);
321 }
322 }
323 None
324 }
325
326 pub fn status(&self) -> SyncStatus<B> {
328 SyncStatus {
329 state: if self.state_sync.is_complete() {
330 SyncState::Idle
331 } else {
332 SyncState::Downloading { target: self.state_sync.target_number() }
333 },
334 best_seen_block: Some(self.state_sync.target_number()),
335 num_peers: self.peers.len().saturated_into(),
336 queued_blocks: 0,
337 state_sync: Some(self.state_sync.progress()),
338 warp_sync: None,
339 }
340 }
341
342 #[must_use]
344 pub fn actions(
345 &mut self,
346 network_service: &NetworkServiceHandle,
347 ) -> impl Iterator<Item = SyncingAction<B>> {
348 let state_request = self.state_request().into_iter().map(|(peer_id, request)| {
349 let (tx, rx) = oneshot::channel();
350
351 network_service.start_request(
352 peer_id,
353 self.protocol_name.clone(),
354 request.encode_to_vec(),
355 tx,
356 IfDisconnected::ImmediateError,
357 );
358
359 SyncingAction::StartRequest {
360 peer_id,
361 key: Self::STRATEGY_KEY,
362 request: async move {
363 Ok(rx.await?.and_then(|(response, protocol_name)| {
364 Ok((Box::new(response) as Box<dyn Any + Send>, protocol_name))
365 }))
366 }
367 .boxed(),
368 remove_obsolete: false,
369 }
370 });
371 self.actions.extend(state_request);
372
373 std::mem::take(&mut self.actions).into_iter()
374 }
375
    /// Returns `true` once an import of the target block has been reported as successful.
    #[must_use]
    pub fn is_succeeded(&self) -> bool {
        self.succeeded
    }
381}
382
383#[cfg(test)]
384mod test {
385 use super::*;
386 use crate::sync::{
387 schema::v1::{StateRequest, StateResponse},
388 service::network::NetworkServiceProvider,
389 strategy::state_sync::{ImportResult, StateSyncProgress, StateSyncProvider},
390 };
391 use codec::Decode;
392 use soil_client::block_builder::BlockBuilderBuilder;
393 use soil_client::client_api::KeyValueStates;
394 use soil_client::import::{ImportedAux, ImportedState};
395 use subsoil::core::H256;
396 use subsoil::runtime::traits::Zero;
397 use soil_test_node_runtime_client::{
398 runtime::{Block, Hash},
399 BlockBuilderExt, DefaultTestClientBuilderExt, TestClientBuilder, TestClientBuilderExt,
400 };
401
    // Generates `MockStateSync`, a mock `StateSyncProvider`. The tests below use it to script
    // provider answers (`import`, `is_complete`, `target_hash`, ...) without a real client.
    mockall::mock! {
        pub StateSync<B: BlockT> {}

        impl<B: BlockT> StateSyncProvider<B> for StateSync<B> {
            fn import(&mut self, response: StateResponse) -> ImportResult<B>;
            fn next_request(&self) -> StateRequest;
            fn is_complete(&self) -> bool;
            fn target_number(&self) -> NumberFor<B>;
            fn target_hash(&self) -> B::Hash;
            fn progress(&self) -> StateSyncProgress;
        }
    }
414
415 #[test]
416 fn no_peer_is_scheduled_if_no_peers_connected() {
417 let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
418 let target_block = BlockBuilderBuilder::new(&*client)
419 .on_parent_block(client.chain_info().best_hash)
420 .with_parent_block_number(client.chain_info().best_number)
421 .build()
422 .unwrap()
423 .build()
424 .unwrap()
425 .block;
426 let target_header = target_block.header().clone();
427
428 let mut state_strategy = StateStrategy::new(
429 client,
430 target_header,
431 None,
432 None,
433 false,
434 std::iter::empty(),
435 ProtocolName::Static(""),
436 );
437
438 assert!(state_strategy
439 .schedule_next_peer(PeerState::DownloadingState, Zero::zero())
440 .is_none());
441 }
442
443 #[test]
444 fn at_least_median_synced_peer_is_scheduled() {
445 let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
446 let target_block = BlockBuilderBuilder::new(&*client)
447 .on_parent_block(client.chain_info().best_hash)
448 .with_parent_block_number(client.chain_info().best_number)
449 .build()
450 .unwrap()
451 .build()
452 .unwrap()
453 .block;
454
455 for _ in 0..100 {
456 let peers = (1..=10)
457 .map(|best_number| (PeerId::random(), best_number))
458 .collect::<HashMap<_, _>>();
459 let initial_peers = peers.iter().map(|(p, n)| (*p, *n));
460
461 let mut state_strategy = StateStrategy::new(
462 client.clone(),
463 target_block.header().clone(),
464 None,
465 None,
466 false,
467 initial_peers,
468 ProtocolName::Static(""),
469 );
470
471 let peer_id =
472 state_strategy.schedule_next_peer(PeerState::DownloadingState, Zero::zero());
473 assert!(*peers.get(&peer_id.unwrap()).unwrap() >= 6);
474 }
475 }
476
477 #[test]
478 fn min_best_number_peer_is_scheduled() {
479 let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
480 let target_block = BlockBuilderBuilder::new(&*client)
481 .on_parent_block(client.chain_info().best_hash)
482 .with_parent_block_number(client.chain_info().best_number)
483 .build()
484 .unwrap()
485 .build()
486 .unwrap()
487 .block;
488
489 for _ in 0..10 {
490 let peers = (1..=10)
491 .map(|best_number| (PeerId::random(), best_number))
492 .collect::<HashMap<_, _>>();
493 let initial_peers = peers.iter().map(|(p, n)| (*p, *n));
494
495 let mut state_strategy = StateStrategy::new(
496 client.clone(),
497 target_block.header().clone(),
498 None,
499 None,
500 false,
501 initial_peers,
502 ProtocolName::Static(""),
503 );
504
505 let peer_id = state_strategy.schedule_next_peer(PeerState::DownloadingState, 10);
506 assert!(*peers.get(&peer_id.unwrap()).unwrap() == 10);
507 }
508 }
509
510 #[test]
511 fn backedoff_number_peer_is_not_scheduled() {
512 let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
513 let target_block = BlockBuilderBuilder::new(&*client)
514 .on_parent_block(client.chain_info().best_hash)
515 .with_parent_block_number(client.chain_info().best_number)
516 .build()
517 .unwrap()
518 .build()
519 .unwrap()
520 .block;
521
522 let peers = (1..=10)
523 .map(|best_number| (PeerId::random(), best_number))
524 .collect::<Vec<(_, _)>>();
525 let ninth_peer = peers[8].0;
526 let tenth_peer = peers[9].0;
527 let initial_peers = peers.iter().map(|(p, n)| (*p, *n));
528
529 let mut state_strategy = StateStrategy::new(
530 client.clone(),
531 target_block.header().clone(),
532 None,
533 None,
534 false,
535 initial_peers,
536 ProtocolName::Static(""),
537 );
538
539 state_strategy.remove_peer(&tenth_peer);
541 assert!(state_strategy.disconnected_peers.is_peer_available(&tenth_peer));
542
543 state_strategy.add_peer(tenth_peer, H256::random(), 10);
545 let peer_id: Option<PeerId> =
546 state_strategy.schedule_next_peer(PeerState::DownloadingState, 10);
547 assert_eq!(tenth_peer, peer_id.unwrap());
548 state_strategy.remove_peer(&tenth_peer);
549
550 assert!(!state_strategy.disconnected_peers.is_peer_available(&tenth_peer));
552
553 state_strategy.add_peer(tenth_peer, H256::random(), 10);
555 let peer_id: Option<PeerId> =
556 state_strategy.schedule_next_peer(PeerState::DownloadingState, 10);
557 assert!(peer_id.is_none());
558
559 let peer_id: Option<PeerId> =
561 state_strategy.schedule_next_peer(PeerState::DownloadingState, 9);
562 assert_eq!(ninth_peer, peer_id.unwrap());
563 }
564
565 #[test]
566 fn state_request_contains_correct_hash() {
567 let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
568 let target_block = BlockBuilderBuilder::new(&*client)
569 .on_parent_block(client.chain_info().best_hash)
570 .with_parent_block_number(client.chain_info().best_number)
571 .build()
572 .unwrap()
573 .build()
574 .unwrap()
575 .block;
576
577 let initial_peers = (1..=10).map(|best_number| (PeerId::random(), best_number));
578
579 let mut state_strategy = StateStrategy::new(
580 client.clone(),
581 target_block.header().clone(),
582 None,
583 None,
584 false,
585 initial_peers,
586 ProtocolName::Static(""),
587 );
588
589 let (_peer_id, request) = state_strategy.state_request().unwrap();
590 let hash = Hash::decode(&mut &*request.block).unwrap();
591
592 assert_eq!(hash, target_block.header().hash());
593 }
594
595 #[test]
596 fn no_parallel_state_requests() {
597 let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
598 let target_block = BlockBuilderBuilder::new(&*client)
599 .on_parent_block(client.chain_info().best_hash)
600 .with_parent_block_number(client.chain_info().best_number)
601 .build()
602 .unwrap()
603 .build()
604 .unwrap()
605 .block;
606
607 let initial_peers = (1..=10).map(|best_number| (PeerId::random(), best_number));
608
609 let mut state_strategy = StateStrategy::new(
610 client.clone(),
611 target_block.header().clone(),
612 None,
613 None,
614 false,
615 initial_peers,
616 ProtocolName::Static(""),
617 );
618
619 assert!(state_strategy.state_request().is_some());
621
622 assert!(state_strategy.state_request().is_none());
624 }
625
626 #[test]
627 fn received_state_response_makes_peer_available_again() {
628 let mut state_sync_provider = MockStateSync::<Block>::new();
629 state_sync_provider.expect_import().return_once(|_| ImportResult::Continue);
630 let peer_id = PeerId::random();
631 let initial_peers = std::iter::once((peer_id, 10));
632 let mut state_strategy = StateStrategy::new_with_provider(
633 Box::new(state_sync_provider),
634 initial_peers,
635 ProtocolName::Static(""),
636 );
637 state_strategy.peers.get_mut(&peer_id).unwrap().state = PeerState::DownloadingState;
639
640 let dummy_response = StateResponse::default().encode_to_vec();
641 state_strategy.on_state_response(&peer_id, dummy_response);
642
643 assert!(state_strategy.peers.get(&peer_id).unwrap().state.is_available());
644 }
645
646 #[test]
647 fn bad_state_response_drops_peer() {
648 let mut state_sync_provider = MockStateSync::<Block>::new();
649 state_sync_provider.expect_import().return_once(|_| ImportResult::BadResponse);
651 let peer_id = PeerId::random();
652 let initial_peers = std::iter::once((peer_id, 10));
653 let mut state_strategy = StateStrategy::new_with_provider(
654 Box::new(state_sync_provider),
655 initial_peers,
656 ProtocolName::Static(""),
657 );
658 state_strategy.peers.get_mut(&peer_id).unwrap().state = PeerState::DownloadingState;
660 let dummy_response = StateResponse::default().encode_to_vec();
661 assert!(matches!(
663 state_strategy.on_state_response_inner(&peer_id, &dummy_response),
664 Err(BadPeer(id, _rep)) if id == peer_id,
665 ));
666 }
667
668 #[test]
669 fn partial_state_response_doesnt_generate_actions() {
670 let mut state_sync_provider = MockStateSync::<Block>::new();
671 state_sync_provider.expect_import().return_once(|_| ImportResult::Continue);
673 let peer_id = PeerId::random();
674 let initial_peers = std::iter::once((peer_id, 10));
675 let mut state_strategy = StateStrategy::new_with_provider(
676 Box::new(state_sync_provider),
677 initial_peers,
678 ProtocolName::Static(""),
679 );
680 state_strategy.peers.get_mut(&peer_id).unwrap().state = PeerState::DownloadingState;
682
683 let dummy_response = StateResponse::default().encode_to_vec();
684 state_strategy.on_state_response(&peer_id, dummy_response);
685
686 assert_eq!(state_strategy.actions.len(), 0)
688 }
689
690 #[test]
691 fn complete_state_response_leads_to_block_import() {
692 let client = Arc::new(TestClientBuilder::new().set_no_genesis().build());
694 let mut block_builder = BlockBuilderBuilder::new(&*client)
695 .on_parent_block(client.chain_info().best_hash)
696 .with_parent_block_number(client.chain_info().best_number)
697 .build()
698 .unwrap();
699 block_builder.push_storage_change(vec![1, 2, 3], Some(vec![4, 5, 6])).unwrap();
700 let block = block_builder.build().unwrap().block;
701 let header = block.header().clone();
702 let hash = header.hash();
703 let body = Some(block.extrinsics().iter().cloned().collect::<Vec<_>>());
704 let state = ImportedState { block: hash, state: KeyValueStates(Vec::new()) };
705 let justifications = Some(Justifications::from((*b"FRNK", Vec::new())));
706
707 let mut state_sync_provider = MockStateSync::<Block>::new();
709 let import = ImportResult::Import(
710 hash,
711 header.clone(),
712 state.clone(),
713 body.clone(),
714 justifications.clone(),
715 );
716 state_sync_provider.expect_import().return_once(move |_| import);
717
718 let expected_origin = BlockOrigin::NetworkInitialSync;
720 let expected_block = IncomingBlock {
721 hash,
722 header: Some(header),
723 body,
724 indexed_body: None,
725 justifications,
726 origin: None,
727 allow_missing_state: true,
728 import_existing: true,
729 skip_execution: true,
730 state: Some(state),
731 };
732 let expected_blocks = vec![expected_block];
733
734 let peer_id = PeerId::random();
736 let initial_peers = std::iter::once((peer_id, 10));
737 let mut state_strategy = StateStrategy::new_with_provider(
738 Box::new(state_sync_provider),
739 initial_peers,
740 ProtocolName::Static(""),
741 );
742 state_strategy.peers.get_mut(&peer_id).unwrap().state = PeerState::DownloadingState;
744
745 let dummy_response = StateResponse::default().encode_to_vec();
747 state_strategy.on_state_response(&peer_id, dummy_response);
748
749 assert_eq!(state_strategy.actions.len(), 1);
750 assert!(matches!(
751 &state_strategy.actions[0],
752 SyncingAction::ImportBlocks { origin, blocks }
753 if *origin == expected_origin && *blocks == expected_blocks,
754 ));
755 }
756
757 #[test]
758 fn importing_unknown_block_doesnt_finish_strategy() {
759 let target_hash = Hash::random();
760 let unknown_hash = Hash::random();
761 let mut state_sync_provider = MockStateSync::<Block>::new();
762 state_sync_provider.expect_target_hash().return_const(target_hash);
763
764 let mut state_strategy = StateStrategy::new_with_provider(
765 Box::new(state_sync_provider),
766 std::iter::empty(),
767 ProtocolName::Static(""),
768 );
769
770 state_strategy.on_blocks_processed(
772 1,
773 1,
774 vec![(
775 Ok(BlockImportStatus::ImportedUnknown(1, ImportedAux::default(), None)),
776 unknown_hash,
777 )],
778 );
779
780 assert_eq!(state_strategy.actions.len(), 0);
782 }
783
784 #[test]
785 fn successfully_importing_target_block_finishes_strategy() {
786 let target_hash = Hash::random();
787 let mut state_sync_provider = MockStateSync::<Block>::new();
788 state_sync_provider.expect_target_hash().return_const(target_hash);
789
790 let mut state_strategy = StateStrategy::new_with_provider(
791 Box::new(state_sync_provider),
792 std::iter::empty(),
793 ProtocolName::Static(""),
794 );
795
796 state_strategy.on_blocks_processed(
798 1,
799 1,
800 vec![(
801 Ok(BlockImportStatus::ImportedUnknown(1, ImportedAux::default(), None)),
802 target_hash,
803 )],
804 );
805
806 assert_eq!(state_strategy.actions.len(), 1);
808 assert!(matches!(&state_strategy.actions[0], SyncingAction::Finished));
809 }
810
811 #[test]
812 fn failure_to_import_target_block_finishes_strategy() {
813 let target_hash = Hash::random();
814 let mut state_sync_provider = MockStateSync::<Block>::new();
815 state_sync_provider.expect_target_hash().return_const(target_hash);
816
817 let mut state_strategy = StateStrategy::new_with_provider(
818 Box::new(state_sync_provider),
819 std::iter::empty(),
820 ProtocolName::Static(""),
821 );
822
823 state_strategy.on_blocks_processed(
825 1,
826 1,
827 vec![(
828 Err(BlockImportError::VerificationFailed(None, String::from("test-error"))),
829 target_hash,
830 )],
831 );
832
833 assert_eq!(state_strategy.actions.len(), 1);
835 assert!(matches!(&state_strategy.actions[0], SyncingAction::Finished));
836 }
837
838 #[test]
839 fn finished_strategy_doesnt_generate_more_actions() {
840 let target_hash = Hash::random();
841 let mut state_sync_provider = MockStateSync::<Block>::new();
842 state_sync_provider.expect_target_hash().return_const(target_hash);
843 state_sync_provider.expect_is_complete().return_const(true);
844
845 let initial_peers = (1..=10).map(|best_number| (PeerId::random(), best_number));
847
848 let mut state_strategy = StateStrategy::new_with_provider(
849 Box::new(state_sync_provider),
850 initial_peers,
851 ProtocolName::Static(""),
852 );
853
854 state_strategy.on_blocks_processed(
855 1,
856 1,
857 vec![(
858 Ok(BlockImportStatus::ImportedUnknown(1, ImportedAux::default(), None)),
859 target_hash,
860 )],
861 );
862
863 let network_provider = NetworkServiceProvider::new();
864 let network_handle = network_provider.handle();
865
866 let actions = state_strategy.actions(&network_handle).collect::<Vec<_>>();
868 assert_eq!(actions.len(), 1);
869 assert!(matches!(&actions[0], SyncingAction::Finished));
870
871 assert_eq!(state_strategy.actions(&network_handle).count(), 0);
873 }
874}