1use std::future::Future;
2use std::pin::pin;
3use std::sync::atomic::{AtomicU32, Ordering};
4use std::sync::{Arc, Mutex};
5use std::time::Duration;
6
7use anyhow::{Context, Result};
8use arc_swap::{ArcSwapAny, ArcSwapOption};
9use futures_util::future::{self, BoxFuture};
10use serde::{Deserialize, Serialize};
11use tycho_block_util::block::{
12 BlockIdRelation, BlockProofStuff, BlockProofStuffAug, BlockStuff, BlockStuffAug,
13 check_with_master_state, check_with_prev_key_block_proof,
14};
15use tycho_block_util::queue::QueueDiffStuffAug;
16use tycho_block_util::state::ShardStateStuff;
17use tycho_types::models::BlockId;
18use tycho_util::metrics::HistogramGuard;
19use tycho_util::serde_helpers;
20
21pub use self::archive_provider::{
22 ArchiveBlockProvider, ArchiveBlockProviderConfig, ArchiveClient, ArchiveDownloadContext,
23 ArchiveResponse, FoundArchive, HybridArchiveClient, IntoArchiveClient,
24};
25pub use self::blockchain_provider::{BlockchainBlockProvider, BlockchainBlockProviderConfig};
26pub use self::box_provider::BoxBlockProvider;
27use self::futures::SelectNonEmptyFut;
28pub use self::storage_provider::StorageBlockProvider;
29use crate::storage::{CoreStorage, MaybeExistingHandle, NewBlockMeta};
30
31mod archive_provider;
32mod blockchain_provider;
33mod box_provider;
34mod futures;
35mod storage_provider;
36
37pub type OptionalBlockStuff = Option<Result<BlockStuffAug>>;
38
39pub trait BlockProvider: Send + Sync + 'static {
41 type GetNextBlockFut<'a>: Future<Output = OptionalBlockStuff> + Send + 'a;
42 type GetBlockFut<'a>: Future<Output = OptionalBlockStuff> + Send + 'a;
43 type CleanupFut<'a>: Future<Output = Result<()>> + Send + 'a;
44
45 fn get_next_block<'a>(&'a self, prev_block_id: &'a BlockId) -> Self::GetNextBlockFut<'a>;
47 fn get_block<'a>(&'a self, block_id_relation: &'a BlockIdRelation) -> Self::GetBlockFut<'a>;
49 fn cleanup_until(&self, mc_seqno: u32) -> Self::CleanupFut<'_>;
51}
52
53impl<T: BlockProvider> BlockProvider for Box<T> {
54 type GetNextBlockFut<'a> = T::GetNextBlockFut<'a>;
55 type GetBlockFut<'a> = T::GetBlockFut<'a>;
56 type CleanupFut<'a> = T::CleanupFut<'a>;
57
58 fn get_next_block<'a>(&'a self, prev_block_id: &'a BlockId) -> Self::GetNextBlockFut<'a> {
59 <T as BlockProvider>::get_next_block(self, prev_block_id)
60 }
61
62 fn get_block<'a>(&'a self, block_id_relation: &'a BlockIdRelation) -> Self::GetBlockFut<'a> {
63 <T as BlockProvider>::get_block(self, block_id_relation)
64 }
65
66 fn cleanup_until(&self, mc_seqno: u32) -> Self::CleanupFut<'_> {
67 <T as BlockProvider>::cleanup_until(self, mc_seqno)
68 }
69}
70
71impl<T: BlockProvider> BlockProvider for Arc<T> {
72 type GetNextBlockFut<'a> = T::GetNextBlockFut<'a>;
73 type GetBlockFut<'a> = T::GetBlockFut<'a>;
74 type CleanupFut<'a> = T::CleanupFut<'a>;
75
76 fn get_next_block<'a>(&'a self, prev_block_id: &'a BlockId) -> Self::GetNextBlockFut<'a> {
77 <T as BlockProvider>::get_next_block(self, prev_block_id)
78 }
79
80 fn get_block<'a>(&'a self, block_id_relation: &'a BlockIdRelation) -> Self::GetBlockFut<'a> {
81 <T as BlockProvider>::get_block(self, block_id_relation)
82 }
83
84 fn cleanup_until(&self, mc_seqno: u32) -> Self::CleanupFut<'_> {
85 <T as BlockProvider>::cleanup_until(self, mc_seqno)
86 }
87}
88
89pub trait BlockProviderExt: Sized {
90 fn boxed(self) -> BoxBlockProvider;
91
92 fn chain<T: BlockProvider>(self, other: T) -> ChainBlockProvider<Self, T>;
93
94 fn cycle<T: BlockProvider>(self, other: T) -> CycleBlockProvider<Self, T>;
95
96 fn retry(self, config: RetryConfig) -> RetryBlockProvider<Self>;
97}
98
99impl<B: BlockProvider> BlockProviderExt for B {
100 fn boxed(self) -> BoxBlockProvider {
101 castaway::match_type!(self, {
102 BoxBlockProvider as provider => provider,
103 provider => BoxBlockProvider::new(provider),
104 })
105 }
106
107 fn chain<T: BlockProvider>(self, other: T) -> ChainBlockProvider<Self, T> {
108 ChainBlockProvider::new(self, other)
109 }
110
111 fn cycle<T: BlockProvider>(self, other: T) -> CycleBlockProvider<Self, T> {
112 CycleBlockProvider::new(self, other)
113 }
114
115 fn retry(self, config: RetryConfig) -> RetryBlockProvider<Self> {
116 RetryBlockProvider {
117 inner: self,
118 config,
119 }
120 }
121}
122
123#[derive(Debug, Clone, Copy)]
125pub struct EmptyBlockProvider;
126
127impl BlockProvider for EmptyBlockProvider {
128 type GetNextBlockFut<'a> = future::Ready<OptionalBlockStuff>;
129 type GetBlockFut<'a> = future::Ready<OptionalBlockStuff>;
130 type CleanupFut<'a> = future::Ready<Result<()>>;
131
132 fn get_next_block<'a>(&'a self, _prev_block_id: &'a BlockId) -> Self::GetNextBlockFut<'a> {
133 future::ready(None)
134 }
135
136 fn get_block<'a>(&'a self, _block_id_relation: &'a BlockIdRelation) -> Self::GetBlockFut<'a> {
137 future::ready(None)
138 }
139
140 fn cleanup_until(&self, _mc_seqno: u32) -> Self::CleanupFut<'_> {
141 future::ready(Ok(()))
142 }
143}
144
145pub struct ChainBlockProvider<T1, T2> {
146 left: ArcSwapOption<T1>,
147 right: T2,
148 cleanup_left_at: AtomicU32,
149}
150
151impl<T1, T2> ChainBlockProvider<T1, T2> {
152 pub fn new(left: T1, right: T2) -> Self {
153 Self {
154 left: ArcSwapAny::new(Some(Arc::new(left))),
155 right,
156 cleanup_left_at: AtomicU32::new(u32::MAX),
157 }
158 }
159}
160
161impl<T1: BlockProvider, T2: BlockProvider> BlockProvider for ChainBlockProvider<T1, T2> {
162 type GetNextBlockFut<'a> = BoxFuture<'a, OptionalBlockStuff>;
163 type GetBlockFut<'a> = BoxFuture<'a, OptionalBlockStuff>;
164 type CleanupFut<'a> = BoxFuture<'a, Result<()>>;
165
166 fn get_next_block<'a>(&'a self, prev_block_id: &'a BlockId) -> Self::GetNextBlockFut<'a> {
167 if self.cleanup_left_at.load(Ordering::Acquire) == u32::MAX
168 && let Some(left) = self.left.load_full()
169 {
170 return Box::pin(async move {
171 let res = left.get_next_block(prev_block_id).await;
172 if res.is_some() {
173 return res;
174 }
175
176 self.cleanup_left_at
178 .store(prev_block_id.seqno.saturating_add(1), Ordering::Release);
179
180 self.right.get_next_block(prev_block_id).await
182 });
183 }
184
185 Box::pin(self.right.get_next_block(prev_block_id))
186 }
187
188 fn get_block<'a>(&'a self, block_id_relation: &'a BlockIdRelation) -> Self::GetBlockFut<'a> {
189 if self.cleanup_left_at.load(Ordering::Acquire) == u32::MAX
190 && let Some(left) = self.left.load_full()
191 {
192 return Box::pin(async move { left.get_block(block_id_relation).await });
193 }
194
195 Box::pin(self.right.get_block(block_id_relation))
196 }
197
198 fn cleanup_until(&self, mc_seqno: u32) -> Self::CleanupFut<'_> {
199 Box::pin(async move {
200 let cleanup_left_at = self.cleanup_left_at.load(Ordering::Acquire);
205
206 if cleanup_left_at > 0 && cleanup_left_at <= mc_seqno {
207 if let Some(left) = self.left.load_full() {
209 left.cleanup_until(mc_seqno).await?;
210 self.left.store(None);
211 self.cleanup_left_at.store(0, Ordering::Release);
212 }
213 } else if cleanup_left_at == u32::MAX {
214 if let Some(left) = self.left.load_full() {
216 return left.cleanup_until(mc_seqno).await;
217 }
218 }
219
220 self.right.cleanup_until(mc_seqno).await
222 })
223 }
224}
225
226pub struct CycleBlockProvider<T1, T2> {
227 left: T1,
228 right: T2,
229 state: Mutex<CycleBlockProviderState>,
230}
231
232struct CycleBlockProviderState {
233 current: CycleBlockProviderPart,
235 switch_at: u32,
237}
238
239#[derive(Debug, Clone, Copy, Eq, PartialEq)]
240enum CycleBlockProviderPart {
241 Left,
242 Right,
243}
244
245impl std::ops::Not for CycleBlockProviderPart {
246 type Output = Self;
247
248 #[inline]
249 fn not(self) -> Self::Output {
250 match self {
251 Self::Left => Self::Right,
252 Self::Right => Self::Left,
253 }
254 }
255}
256
257impl<T1, T2> CycleBlockProvider<T1, T2> {
258 pub fn new(left: T1, right: T2) -> Self {
259 Self {
260 left,
261 right,
262 state: Mutex::new(CycleBlockProviderState {
263 current: CycleBlockProviderPart::Left,
264 switch_at: u32::MAX,
265 }),
266 }
267 }
268
269 fn choose_provider(&self, mc_seqno: u32) -> CycleBlockProviderPart {
280 let state = self.state.lock().unwrap();
281 if state.switch_at != u32::MAX && state.switch_at <= mc_seqno {
282 !state.current
284 } else {
285 state.current
286 }
287 }
288
289 fn toggle_switch_at(&self, mc_seqno: u32) -> CycleBlockProviderPart {
290 let mut state = self.state.lock().unwrap();
291 if state.switch_at == u32::MAX {
292 state.switch_at = mc_seqno;
293 !state.current
294 } else {
295 state.switch_at = u32::MAX;
296 state.current
297 }
298 }
299
300 fn try_apply_switch(&self, mc_seqno: u32) {
301 let mut state = self.state.lock().unwrap();
302 if state.switch_at != u32::MAX && state.switch_at <= mc_seqno {
303 state.current = !state.current;
304 state.switch_at = u32::MAX;
305 }
306 }
307}
308
309impl<T1: BlockProvider, T2: BlockProvider> BlockProvider for CycleBlockProvider<T1, T2> {
310 type GetNextBlockFut<'a> = BoxFuture<'a, OptionalBlockStuff>;
311 type GetBlockFut<'a> = BoxFuture<'a, OptionalBlockStuff>;
312 type CleanupFut<'a> = BoxFuture<'a, Result<()>>;
313
314 fn get_next_block<'a>(&'a self, prev_block_id: &'a BlockId) -> Self::GetNextBlockFut<'a> {
315 Box::pin(async move {
316 let mut res = match self.choose_provider(prev_block_id.seqno) {
318 CycleBlockProviderPart::Left => self.left.get_next_block(prev_block_id).await,
319 CycleBlockProviderPart::Right => self.right.get_next_block(prev_block_id).await,
320 };
321 if res.is_some() {
322 return res;
323 }
324
325 loop {
326 res = match self.toggle_switch_at(prev_block_id.seqno.saturating_add(1)) {
328 CycleBlockProviderPart::Left => self.left.get_next_block(prev_block_id).await,
329 CycleBlockProviderPart::Right => self.right.get_next_block(prev_block_id).await,
330 };
331 if res.is_some() {
332 return res;
333 }
334
335 tokio::task::yield_now().await;
338 }
339 })
340 }
341
342 fn get_block<'a>(&'a self, block_id_relation: &'a BlockIdRelation) -> Self::GetBlockFut<'a> {
343 match self.choose_provider(block_id_relation.mc_block_id.seqno) {
344 CycleBlockProviderPart::Left => Box::pin(self.left.get_block(block_id_relation)),
345 CycleBlockProviderPart::Right => Box::pin(self.right.get_block(block_id_relation)),
346 }
347 }
348
349 fn cleanup_until(&self, mc_seqno: u32) -> Self::CleanupFut<'_> {
350 Box::pin(async move {
351 self.try_apply_switch(mc_seqno);
352
353 let cleanup_left = self.left.cleanup_until(mc_seqno);
354 let cleanup_right = self.right.cleanup_until(mc_seqno);
355 match futures_util::future::join(cleanup_left, cleanup_right).await {
356 (Err(e), _) | (_, Err(e)) => Err(e),
357 (Ok(()), Ok(())) => Ok(()),
358 }
359 })
360 }
361}
362
363pub struct RetryBlockProvider<T> {
364 inner: T,
365 config: RetryConfig,
366}
367
368impl<T: BlockProvider> BlockProvider for RetryBlockProvider<T> {
369 type GetNextBlockFut<'a> = BoxFuture<'a, OptionalBlockStuff>;
370 type GetBlockFut<'a> = T::GetBlockFut<'a>;
371 type CleanupFut<'a> = T::CleanupFut<'a>;
372
373 fn get_next_block<'a>(&'a self, prev_block_id: &'a BlockId) -> Self::GetNextBlockFut<'a> {
374 Box::pin(async move {
375 let mut attempts = 0usize;
376
377 loop {
378 let res = self.inner.get_next_block(prev_block_id).await;
379 if res.is_some() || attempts >= self.config.attempts {
380 break res;
381 }
382
383 attempts += 1;
384
385 tokio::time::sleep(self.config.interval).await;
387 }
388 })
389 }
390
391 fn get_block<'a>(&'a self, block_id_relation: &'a BlockIdRelation) -> Self::GetBlockFut<'a> {
392 self.inner.get_block(block_id_relation)
393 }
394
395 fn cleanup_until(&self, mc_seqno: u32) -> Self::CleanupFut<'_> {
396 self.inner.cleanup_until(mc_seqno)
397 }
398}
399
400macro_rules! impl_provider_tuple {
401 ($join_fn:path, |$e:ident| $err_pat:pat$(,)?, {
402 $($n:tt: $var:ident = $ty:ident),*$(,)?
403 }) => {
404 impl<$($ty),*> BlockProvider for ($($ty),*)
405 where
406 $($ty: BlockProvider),*
407 {
408 type GetNextBlockFut<'a> = BoxFuture<'a, OptionalBlockStuff>;
409 type GetBlockFut<'a> = BoxFuture<'a, OptionalBlockStuff>;
410 type CleanupFut<'a> = BoxFuture<'a, Result<()>>;
411
412 fn get_next_block<'a>(
413 &'a self,
414 prev_block_id: &'a BlockId,
415 ) -> Self::GetNextBlockFut<'a> {
416 $(let $var = self.$n.get_next_block(prev_block_id));*;
417
418 Box::pin(async move {
419 $(let $var = pin!($var));*;
420 SelectNonEmptyFut::from(($($var),*)).await
421 })
422 }
423
424 fn get_block<'a>(&'a self, block_id_relation: &'a BlockIdRelation) -> Self::GetBlockFut<'a> {
425 $(let $var = self.$n.get_block(block_id_relation));*;
426
427 Box::pin(async move {
428 $(let $var = pin!($var));*;
429 SelectNonEmptyFut::from(($($var),*)).await
430 })
431 }
432
433 fn cleanup_until(&self, mc_seqno: u32) -> Self::CleanupFut<'_> {
434 $(let $var = self.$n.cleanup_until(mc_seqno));*;
435
436 Box::pin(async move {
437 match $join_fn($($var),*).await {
438 $err_pat => Err($e),
439 _ => Ok(())
440 }
441 })
442 }
443 }
444 };
445}
446
447impl_provider_tuple! {
448 futures_util::future::join,
449 |e| (Err(e), _) | (_, Err(e)),
450 {
451 0: a = T0,
452 1: b = T1,
453 }
454}
455impl_provider_tuple! {
456 futures_util::future::join3,
457 |e| (Err(e), _, _) | (_, Err(e), _) | (_, _, Err(e)),
458 {
459 0: a = T0,
460 1: b = T1,
461 2: c = T2,
462 }
463}
464impl_provider_tuple! {
465 futures_util::future::join4,
466 |e| (Err(e), _, _, _) | (_, Err(e), _, _) | (_, _, Err(e), _) | (_, _, _, Err(e)),
467 {
468 0: a = T0,
469 1: b = T1,
470 2: c = T2,
471 3: d = T3,
472 }
473}
474impl_provider_tuple! {
475 futures_util::future::join5,
476 |e|
477 (Err(e), _, _, _, _)
478 | (_, Err(e), _, _, _)
479 | (_, _, Err(e), _, _)
480 | (_, _, _, Err(e), _)
481 | (_, _, _, _, Err(e)),
482 {
483 0: a = T0,
484 1: b = T1,
485 2: c = T2,
486 3: d = T3,
487 4: e = T4,
488 }
489}
490
491pub struct CheckProof<'a> {
492 pub mc_block_id: &'a BlockId,
493 pub block: &'a BlockStuff,
494 pub proof: &'a BlockProofStuffAug,
495 pub queue_diff: &'a QueueDiffStuffAug,
496
497 pub store_on_success: bool,
499}
500
501pub struct ProofChecker {
504 storage: CoreStorage,
505 cached_zerostate: ArcSwapAny<Option<ShardStateStuff>>,
506 cached_prev_key_block_proof: ArcSwapAny<Option<BlockProofStuff>>,
507}
508
509impl ProofChecker {
510 pub fn new(storage: CoreStorage) -> Self {
511 Self {
512 storage,
513 cached_zerostate: Default::default(),
514 cached_prev_key_block_proof: Default::default(),
515 }
516 }
517
518 pub async fn check_proof(&self, ctx: CheckProof<'_>) -> Result<NewBlockMeta> {
519 let _histogram = HistogramGuard::begin("tycho_core_check_block_proof_time");
521
522 let CheckProof {
523 mc_block_id,
524 block,
525 proof,
526 queue_diff,
527 store_on_success,
528 } = ctx;
529
530 anyhow::ensure!(
531 block.id() == &proof.proof().proof_for,
532 "proof_for and block id mismatch: proof_for={}, block_id={}",
533 proof.proof().proof_for,
534 block.id(),
535 );
536
537 let is_masterchain = block.id().is_masterchain();
538 anyhow::ensure!(is_masterchain ^ proof.is_link(), "unexpected proof type");
539
540 let (virt_block, virt_block_info) = proof.pre_check_block_proof()?;
541 let meta = NewBlockMeta {
542 is_key_block: virt_block_info.key_block,
543 gen_utime: virt_block_info.gen_utime,
544 ref_by_mc_seqno: mc_block_id.seqno,
545 };
546
547 let block_storage = self.storage.block_storage();
548
549 anyhow::ensure!(
550 &virt_block.out_msg_queue_updates.diff_hash == queue_diff.diff_hash(),
551 "queue diff mismatch (expected: {}, got: {})",
552 virt_block.out_msg_queue_updates.diff_hash,
553 queue_diff.diff_hash(),
554 );
555
556 if is_masterchain {
557 let block_handles = self.storage.block_handle_storage();
558 let handle = block_handles
559 .load_key_block_handle(virt_block_info.prev_key_block_seqno)
560 .ok_or_else(|| {
561 anyhow::anyhow!(
562 "failed to load prev key block handle by prev_key_block_seqno {}",
563 virt_block_info.prev_key_block_seqno
564 )
565 })?;
566
567 if handle.id().seqno == 0 {
568 let zerostate = 'zerostate: {
569 if let Some(zerostate) = self.cached_zerostate.load_full() {
570 break 'zerostate zerostate;
571 }
572
573 let shard_states = self.storage.shard_state_storage();
574 let zerostate = shard_states
575 .load_state(0, handle.id())
576 .await
577 .context("failed to load mc zerostate")?;
578
579 self.cached_zerostate.store(Some(zerostate.clone()));
580
581 zerostate
582 };
583
584 check_with_master_state(proof, &zerostate, &virt_block, &virt_block_info)?;
585 } else {
586 let prev_key_block_proof = 'prev_proof: {
587 if let Some(prev_proof) = self.cached_prev_key_block_proof.load_full()
588 && &prev_proof.as_ref().proof_for == handle.id()
589 {
590 break 'prev_proof prev_proof;
591 }
592
593 let prev_key_block_proof = block_storage
594 .load_block_proof(&handle)
595 .await
596 .context("failed to load prev key block proof")?;
597
598 self.cached_prev_key_block_proof
601 .store(Some(prev_key_block_proof.clone()));
602
603 prev_key_block_proof
604 };
605
606 check_with_prev_key_block_proof(
607 proof,
608 &prev_key_block_proof,
609 &virt_block,
610 &virt_block_info,
611 )?;
612 }
613 }
614
615 if store_on_success {
616 let res = block_storage
618 .store_block_proof(proof, MaybeExistingHandle::New(meta))
619 .await?;
620
621 block_storage
623 .store_queue_diff(queue_diff, res.handle.into())
624 .await?;
625 }
626
627 Ok(meta)
628 }
629}
630
631#[derive(Debug, Copy, Clone, Serialize, Deserialize)]
632#[serde(default)]
633pub struct RetryConfig {
634 pub attempts: usize,
638
639 #[serde(with = "serde_helpers::humantime")]
643 pub interval: Duration,
644}
645
646impl Default for RetryConfig {
647 fn default() -> Self {
648 Self {
649 attempts: 1,
650 interval: Duration::from_secs(1),
651 }
652 }
653}
654
655#[cfg(test)]
656mod test {
657 use std::sync::Arc;
658 use std::sync::atomic::{AtomicBool, Ordering};
659
660 use tycho_block_util::block::{BlockIdExt, BlockStuff};
661 use tycho_types::boc::Boc;
662 use tycho_types::models::Block;
663
664 use super::*;
665
666 struct MockBlockProvider {
667 has_block: AtomicBool,
669 }
670
671 impl BlockProvider for MockBlockProvider {
672 type GetNextBlockFut<'a> = BoxFuture<'a, OptionalBlockStuff>;
673 type GetBlockFut<'a> = BoxFuture<'a, OptionalBlockStuff>;
674 type CleanupFut<'a> = future::Ready<Result<()>>;
675
676 fn get_next_block(&self, _prev_block_id: &BlockId) -> Self::GetNextBlockFut<'_> {
677 Box::pin(async {
678 if self.has_block.load(Ordering::Acquire) {
679 Some(Ok(get_empty_block()))
680 } else {
681 None
682 }
683 })
684 }
685
686 fn get_block(&self, _block_id: &BlockIdRelation) -> Self::GetBlockFut<'_> {
687 Box::pin(async {
688 if self.has_block.load(Ordering::Acquire) {
689 Some(Ok(get_empty_block()))
690 } else {
691 None
692 }
693 })
694 }
695
696 fn cleanup_until(&self, _mc_seqno: u32) -> Self::CleanupFut<'_> {
697 future::ready(Ok(()))
698 }
699 }
700
701 #[tokio::test]
702 async fn chain_block_provider_switches_providers_correctly() {
703 let left_provider = Arc::new(MockBlockProvider {
704 has_block: AtomicBool::new(true),
705 });
706 let right_provider = Arc::new(MockBlockProvider {
707 has_block: AtomicBool::new(false),
708 });
709
710 let chain_provider = ChainBlockProvider::new(left_provider.clone(), right_provider.clone());
711
712 chain_provider
713 .get_next_block(&get_default_block_id())
714 .await
715 .unwrap()
716 .unwrap();
717
718 left_provider.has_block.store(false, Ordering::Release);
720 right_provider.has_block.store(true, Ordering::Release);
721
722 chain_provider
723 .get_next_block(&get_default_block_id())
724 .await
725 .unwrap()
726 .unwrap();
727
728 left_provider.has_block.store(false, Ordering::Release);
730 right_provider.has_block.store(false, Ordering::Release);
731
732 assert!(
733 chain_provider
734 .get_next_block(&get_default_block_id())
735 .await
736 .is_none()
737 );
738 }
739
740 #[tokio::test]
741 async fn cycle_block_provider_switches_providers_correctly() {
742 const LEFT_LIMIT: usize = 10;
743 const RIGHT_LIMIT: usize = 1;
744
745 const POLLING_INTERVAL_MS: u64 = 100;
746
747 let left_provider = Arc::new(MockBlockProvider {
748 has_block: AtomicBool::new(true),
749 });
750 let right_provider = Arc::new(MockBlockProvider {
751 has_block: AtomicBool::new(false),
752 });
753
754 let left_config = RetryConfig {
755 attempts: LEFT_LIMIT,
756 interval: Duration::from_millis(POLLING_INTERVAL_MS),
757 };
758
759 let right_config = RetryConfig {
760 attempts: RIGHT_LIMIT,
761 interval: Duration::from_millis(POLLING_INTERVAL_MS),
762 };
763
764 let left = left_provider.clone().retry(left_config);
765 let right = right_provider.clone().retry(right_config);
766 let cycle_provider = left.cycle(right);
767
768 assert_eq!(
769 cycle_provider.state.lock().unwrap().current,
770 CycleBlockProviderPart::Left
771 );
772
773 cycle_provider
774 .get_next_block(&get_default_block_id())
775 .await
776 .unwrap()
777 .unwrap();
778
779 left_provider.has_block.store(false, Ordering::Release);
781 right_provider.has_block.store(true, Ordering::Release);
782
783 while cycle_provider
784 .get_next_block(&get_default_block_id())
785 .await
786 .is_none()
787 {}
788
789 cycle_provider
790 .get_next_block(&get_default_block_id())
791 .await
792 .unwrap()
793 .unwrap();
794
795 cycle_provider
796 .cleanup_until(get_default_block_id().seqno + 1)
797 .await
798 .unwrap();
799
800 assert_eq!(
801 cycle_provider.state.lock().unwrap().current,
802 CycleBlockProviderPart::Right
803 );
804
805 left_provider.has_block.store(true, Ordering::Release);
807 right_provider.has_block.store(false, Ordering::Release);
808
809 while cycle_provider
810 .get_next_block(&get_default_block_id())
811 .await
812 .is_none()
813 {}
814
815 cycle_provider
816 .get_next_block(&get_default_block_id())
817 .await
818 .unwrap()
819 .unwrap();
820
821 cycle_provider
822 .cleanup_until(get_default_block_id().seqno + 1)
823 .await
824 .unwrap();
825
826 assert_eq!(
827 cycle_provider.state.lock().unwrap().current,
828 CycleBlockProviderPart::Left
829 );
830
831 cycle_provider
832 .get_block(&get_default_block_id().relative_to_self())
833 .await
834 .unwrap()
835 .unwrap();
836 assert_eq!(
837 cycle_provider.state.lock().unwrap().current,
838 CycleBlockProviderPart::Left
839 );
840
841 left_provider.has_block.store(false, Ordering::Release);
842 right_provider.has_block.store(true, Ordering::Release);
843
844 let block = cycle_provider
845 .get_block(&get_default_block_id().relative_to_self())
846 .await;
847 assert!(block.is_none());
848 }
849
850 fn get_empty_block() -> BlockStuffAug {
851 let block_data = include_bytes!("../../../tests/data/empty_block.bin");
852 let root = Boc::decode(block_data).unwrap();
853 let block = root.parse::<Block>().unwrap();
854
855 let block_id = BlockId {
856 root_hash: *root.repr_hash(),
857 ..Default::default()
858 };
859
860 BlockStuff::from_block_and_root(&block_id, block, root, block_data.len())
861 .with_archive_data(block_data.as_slice())
862 }
863
864 fn get_default_block_id() -> BlockId {
865 BlockId::default()
866 }
867}