tycho_core/block_strider/provider/
mod.rs

1use std::future::Future;
2use std::pin::pin;
3use std::sync::Arc;
4use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
5use std::time::Duration;
6
7use anyhow::{Context, Result};
8use arc_swap::{ArcSwapAny, ArcSwapOption};
9use futures_util::future::{self, BoxFuture};
10use serde::{Deserialize, Serialize};
11use tycho_block_util::block::{
12    BlockIdRelation, BlockProofStuff, BlockProofStuffAug, BlockStuff, BlockStuffAug,
13    check_with_master_state, check_with_prev_key_block_proof,
14};
15use tycho_block_util::queue::QueueDiffStuffAug;
16use tycho_block_util::state::ShardStateStuff;
17use tycho_types::models::BlockId;
18use tycho_util::metrics::HistogramGuard;
19use tycho_util::serde_helpers;
20
21pub use self::archive_provider::{ArchiveBlockProvider, ArchiveBlockProviderConfig};
22pub use self::blockchain_provider::{BlockchainBlockProvider, BlockchainBlockProviderConfig};
23pub use self::box_provider::BoxBlockProvider;
24use self::futures::SelectNonEmptyFut;
25pub use self::storage_provider::StorageBlockProvider;
26use crate::storage::{CoreStorage, MaybeExistingHandle, NewBlockMeta};
27
28mod archive_provider;
29mod blockchain_provider;
30mod box_provider;
31mod futures;
32mod storage_provider;
33
/// Outcome of a block request.
///
/// - `None`: the provider has nothing to return; the combinators in this
///   module treat it as a signal to try another provider or to retry;
/// - `Some(Ok(_))`: the block together with its auxiliary data;
/// - `Some(Err(_))`: the request failed.
pub type OptionalBlockStuff = Option<Result<BlockStuffAug>>;
35
/// A source of blocks.
///
/// Block provider *MUST* validate the block before returning it.
pub trait BlockProvider: Send + Sync + 'static {
    /// Future returned by [`BlockProvider::get_next_block`].
    type GetNextBlockFut<'a>: Future<Output = OptionalBlockStuff> + Send + 'a;
    /// Future returned by [`BlockProvider::get_block`].
    type GetBlockFut<'a>: Future<Output = OptionalBlockStuff> + Send + 'a;
    /// Future returned by [`BlockProvider::cleanup_until`].
    type CleanupFut<'a>: Future<Output = Result<()>> + Send + 'a;

    /// Wait for the next block. Mostly used for masterchain blocks.
    fn get_next_block<'a>(&'a self, prev_block_id: &'a BlockId) -> Self::GetNextBlockFut<'a>;
    /// Get the exact block. Provider must return the requested block.
    fn get_block<'a>(&'a self, block_id_relation: &'a BlockIdRelation) -> Self::GetBlockFut<'a>;
    /// Clear resources until (and including) the specified masterchain block seqno.
    fn cleanup_until(&self, mc_seqno: u32) -> Self::CleanupFut<'_>;
}
49
50impl<T: BlockProvider> BlockProvider for Box<T> {
51    type GetNextBlockFut<'a> = T::GetNextBlockFut<'a>;
52    type GetBlockFut<'a> = T::GetBlockFut<'a>;
53    type CleanupFut<'a> = T::CleanupFut<'a>;
54
55    fn get_next_block<'a>(&'a self, prev_block_id: &'a BlockId) -> Self::GetNextBlockFut<'a> {
56        <T as BlockProvider>::get_next_block(self, prev_block_id)
57    }
58
59    fn get_block<'a>(&'a self, block_id_relation: &'a BlockIdRelation) -> Self::GetBlockFut<'a> {
60        <T as BlockProvider>::get_block(self, block_id_relation)
61    }
62
63    fn cleanup_until(&self, mc_seqno: u32) -> Self::CleanupFut<'_> {
64        <T as BlockProvider>::cleanup_until(self, mc_seqno)
65    }
66}
67
68impl<T: BlockProvider> BlockProvider for Arc<T> {
69    type GetNextBlockFut<'a> = T::GetNextBlockFut<'a>;
70    type GetBlockFut<'a> = T::GetBlockFut<'a>;
71    type CleanupFut<'a> = T::CleanupFut<'a>;
72
73    fn get_next_block<'a>(&'a self, prev_block_id: &'a BlockId) -> Self::GetNextBlockFut<'a> {
74        <T as BlockProvider>::get_next_block(self, prev_block_id)
75    }
76
77    fn get_block<'a>(&'a self, block_id_relation: &'a BlockIdRelation) -> Self::GetBlockFut<'a> {
78        <T as BlockProvider>::get_block(self, block_id_relation)
79    }
80
81    fn cleanup_until(&self, mc_seqno: u32) -> Self::CleanupFut<'_> {
82        <T as BlockProvider>::cleanup_until(self, mc_seqno)
83    }
84}
85
/// Combinator methods for building composite block providers.
pub trait BlockProviderExt: Sized {
    /// Converts the provider into a [`BoxBlockProvider`].
    fn boxed(self) -> BoxBlockProvider;

    /// Builds a [`ChainBlockProvider`] with `self` as the left (first) provider
    /// and `other` as the right (fallback) provider.
    fn chain<T: BlockProvider>(self, other: T) -> ChainBlockProvider<Self, T>;

    /// Builds a [`CycleBlockProvider`] with `self` as the left provider
    /// and `other` as the right provider.
    fn cycle<T: BlockProvider>(self, other: T) -> CycleBlockProvider<Self, T>;

    /// Wraps the provider into a [`RetryBlockProvider`] driven by `config`.
    fn retry(self, config: RetryConfig) -> RetryBlockProvider<Self>;
}
95
96impl<B: BlockProvider> BlockProviderExt for B {
97    fn boxed(self) -> BoxBlockProvider {
98        castaway::match_type!(self, {
99            BoxBlockProvider as provider => provider,
100            provider => BoxBlockProvider::new(provider),
101        })
102    }
103
104    fn chain<T: BlockProvider>(self, other: T) -> ChainBlockProvider<Self, T> {
105        ChainBlockProvider::new(self, other)
106    }
107
108    fn cycle<T: BlockProvider>(self, other: T) -> CycleBlockProvider<Self, T> {
109        CycleBlockProvider {
110            left: self,
111            right: other,
112            is_right: AtomicBool::new(false),
113        }
114    }
115
116    fn retry(self, config: RetryConfig) -> RetryBlockProvider<Self> {
117        RetryBlockProvider {
118            inner: self,
119            config,
120        }
121    }
122}
123
124// === Provider combinators ===
125#[derive(Debug, Clone, Copy)]
126pub struct EmptyBlockProvider;
127
128impl BlockProvider for EmptyBlockProvider {
129    type GetNextBlockFut<'a> = future::Ready<OptionalBlockStuff>;
130    type GetBlockFut<'a> = future::Ready<OptionalBlockStuff>;
131    type CleanupFut<'a> = future::Ready<Result<()>>;
132
133    fn get_next_block<'a>(&'a self, _prev_block_id: &'a BlockId) -> Self::GetNextBlockFut<'a> {
134        future::ready(None)
135    }
136
137    fn get_block<'a>(&'a self, _block_id_relation: &'a BlockIdRelation) -> Self::GetBlockFut<'a> {
138        future::ready(None)
139    }
140
141    fn cleanup_until(&self, _mc_seqno: u32) -> Self::CleanupFut<'_> {
142        future::ready(Ok(()))
143    }
144}
145
146pub struct ChainBlockProvider<T1, T2> {
147    left: ArcSwapOption<T1>,
148    right: T2,
149    cleanup_left_at: AtomicU32,
150}
151
152impl<T1, T2> ChainBlockProvider<T1, T2> {
153    pub fn new(left: T1, right: T2) -> Self {
154        Self {
155            left: ArcSwapAny::new(Some(Arc::new(left))),
156            right,
157            cleanup_left_at: AtomicU32::new(u32::MAX),
158        }
159    }
160}
161
162impl<T1: BlockProvider, T2: BlockProvider> BlockProvider for ChainBlockProvider<T1, T2> {
163    type GetNextBlockFut<'a> = BoxFuture<'a, OptionalBlockStuff>;
164    type GetBlockFut<'a> = BoxFuture<'a, OptionalBlockStuff>;
165    type CleanupFut<'a> = BoxFuture<'a, Result<()>>;
166
167    fn get_next_block<'a>(&'a self, prev_block_id: &'a BlockId) -> Self::GetNextBlockFut<'a> {
168        if self.cleanup_left_at.load(Ordering::Acquire) == u32::MAX {
169            if let Some(left) = self.left.load_full() {
170                return Box::pin(async move {
171                    let res = left.get_next_block(prev_block_id).await;
172                    if res.is_some() {
173                        return res;
174                    }
175
176                    // Schedule left provider cleanup for the next block.
177                    self.cleanup_left_at
178                        .store(prev_block_id.seqno.saturating_add(1), Ordering::Release);
179
180                    // Fallback to right
181                    self.right.get_next_block(prev_block_id).await
182                });
183            }
184        }
185
186        Box::pin(self.right.get_next_block(prev_block_id))
187    }
188
189    fn get_block<'a>(&'a self, block_id_relation: &'a BlockIdRelation) -> Self::GetBlockFut<'a> {
190        if self.cleanup_left_at.load(Ordering::Acquire) == u32::MAX {
191            if let Some(left) = self.left.load_full() {
192                return Box::pin(async move { left.get_block(block_id_relation).await });
193            }
194        }
195
196        Box::pin(self.right.get_block(block_id_relation))
197    }
198
199    fn cleanup_until(&self, mc_seqno: u32) -> Self::CleanupFut<'_> {
200        Box::pin(async move {
201            // Read the cleanup flag:
202            // - 0 means that `left` has been reset;
203            // - u32::MAX means that `left` is still in use;
204            // - other seqno indicates when we need to cleanup `left`.
205            let cleanup_left_at = self.cleanup_left_at.load(Ordering::Acquire);
206
207            if cleanup_left_at > 0 && cleanup_left_at <= mc_seqno {
208                // Cleanup and reset `left` if target block has been set.
209                if let Some(left) = self.left.load_full() {
210                    left.cleanup_until(mc_seqno).await?;
211                    self.left.store(None);
212                    self.cleanup_left_at.store(0, Ordering::Release);
213                }
214            } else if cleanup_left_at == u32::MAX {
215                // Cleanup only `left` until we switch to `right`.
216                if let Some(left) = self.left.load_full() {
217                    return left.cleanup_until(mc_seqno).await;
218                }
219            }
220
221            // In all other cases just cleanup `right`.
222            self.right.cleanup_until(mc_seqno).await
223        })
224    }
225}
226
227pub struct CycleBlockProvider<T1, T2> {
228    left: T1,
229    right: T2,
230    is_right: AtomicBool,
231}
232
233impl<T1: BlockProvider, T2: BlockProvider> BlockProvider for CycleBlockProvider<T1, T2> {
234    type GetNextBlockFut<'a> = BoxFuture<'a, OptionalBlockStuff>;
235    type GetBlockFut<'a> = BoxFuture<'a, OptionalBlockStuff>;
236    type CleanupFut<'a> = BoxFuture<'a, Result<()>>;
237
238    fn get_next_block<'a>(&'a self, prev_block_id: &'a BlockId) -> Self::GetNextBlockFut<'a> {
239        Box::pin(async {
240            let is_right = self.is_right.load(Ordering::Acquire);
241
242            let res = if !is_right {
243                self.left.get_next_block(prev_block_id).await
244            } else {
245                self.right.get_next_block(prev_block_id).await
246            };
247
248            if res.is_some() {
249                return res;
250            }
251
252            let is_right = !is_right;
253            self.is_right.store(is_right, Ordering::Release);
254
255            if !is_right {
256                self.left.get_next_block(prev_block_id).await
257            } else {
258                self.right.get_next_block(prev_block_id).await
259            }
260        })
261    }
262
263    fn get_block<'a>(&'a self, block_id_relation: &'a BlockIdRelation) -> Self::GetBlockFut<'a> {
264        if self.is_right.load(Ordering::Acquire) {
265            Box::pin(self.right.get_block(block_id_relation))
266        } else {
267            Box::pin(self.left.get_block(block_id_relation))
268        }
269    }
270
271    fn cleanup_until(&self, mc_seqno: u32) -> Self::CleanupFut<'_> {
272        Box::pin(async move {
273            let cleanup_left = self.left.cleanup_until(mc_seqno);
274            let cleanup_right = self.right.cleanup_until(mc_seqno);
275            match futures_util::future::join(cleanup_left, cleanup_right).await {
276                (Err(e), _) | (_, Err(e)) => Err(e),
277                (Ok(()), Ok(())) => Ok(()),
278            }
279        })
280    }
281}
282
283pub struct RetryBlockProvider<T> {
284    inner: T,
285    config: RetryConfig,
286}
287
288impl<T: BlockProvider> BlockProvider for RetryBlockProvider<T> {
289    type GetNextBlockFut<'a> = BoxFuture<'a, OptionalBlockStuff>;
290    type GetBlockFut<'a> = T::GetBlockFut<'a>;
291    type CleanupFut<'a> = T::CleanupFut<'a>;
292
293    fn get_next_block<'a>(&'a self, prev_block_id: &'a BlockId) -> Self::GetNextBlockFut<'a> {
294        Box::pin(async move {
295            let mut attempts = 0usize;
296
297            loop {
298                let res = self.inner.get_next_block(prev_block_id).await;
299                if res.is_some() || attempts >= self.config.attempts {
300                    break res;
301                }
302
303                attempts += 1;
304
305                // TODO: Backoff?
306                tokio::time::sleep(self.config.interval).await;
307            }
308        })
309    }
310
311    fn get_block<'a>(&'a self, block_id_relation: &'a BlockIdRelation) -> Self::GetBlockFut<'a> {
312        self.inner.get_block(block_id_relation)
313    }
314
315    fn cleanup_until(&self, mc_seqno: u32) -> Self::CleanupFut<'_> {
316        self.inner.cleanup_until(mc_seqno)
317    }
318}
319
// Implements `BlockProvider` for a tuple of providers.
//
// Arguments:
// - `$join_fn`: function which joins all cleanup futures of the tuple;
// - `|$e| $err_pat`: pattern over the joined cleanup results which binds
//   an error to `$e`; when it matches, that error is returned;
// - `{ $n: $var = $ty, .. }`: for each tuple element its index, the name of
//   the local variable holding its future and the name of its type parameter.
macro_rules! impl_provider_tuple {
    ($join_fn:path, |$e:ident| $err_pat:pat$(,)?, {
        $($n:tt: $var:ident = $ty:ident),*$(,)?
    }) => {
        impl<$($ty),*> BlockProvider for ($($ty),*)
        where
            $($ty: BlockProvider),*
        {
            type GetNextBlockFut<'a> = BoxFuture<'a, OptionalBlockStuff>;
            type GetBlockFut<'a> = BoxFuture<'a, OptionalBlockStuff>;
            type CleanupFut<'a> = BoxFuture<'a, Result<()>>;

            fn get_next_block<'a>(
                &'a self,
                prev_block_id: &'a BlockId,
            ) -> Self::GetNextBlockFut<'a> {
                // Create one request future per tuple element.
                $(let $var = self.$n.get_next_block(prev_block_id));*;

                Box::pin(async move {
                    // Pin the futures locally and let `SelectNonEmptyFut` combine them.
                    $(let $var = pin!($var));*;
                    SelectNonEmptyFut::from(($($var),*)).await
                })
            }

            fn get_block<'a>(&'a self, block_id_relation: &'a BlockIdRelation) -> Self::GetBlockFut<'a> {
                // Create one request future per tuple element.
                $(let $var = self.$n.get_block(block_id_relation));*;

                Box::pin(async move {
                    // Pin the futures locally and let `SelectNonEmptyFut` combine them.
                    $(let $var = pin!($var));*;
                    SelectNonEmptyFut::from(($($var),*)).await
                })
            }

            fn cleanup_until(&self, mc_seqno: u32) -> Self::CleanupFut<'_> {
                // Create one cleanup future per tuple element.
                $(let $var = self.$n.cleanup_until(mc_seqno));*;

                Box::pin(async move {
                    // Join all cleanups and report an error if `$err_pat` finds one.
                    match $join_fn($($var),*).await {
                        $err_pat => Err($e),
                        _ => Ok(())
                    }
                })
            }
        }
    };
}
366
// `BlockProvider` for a pair of providers.
impl_provider_tuple! {
    futures_util::future::join,
    |e| (Err(e), _) | (_, Err(e)),
    {
        0: a = T0,
        1: b = T1,
    }
}
// `BlockProvider` for a tuple of three providers.
impl_provider_tuple! {
    futures_util::future::join3,
    |e| (Err(e), _, _) | (_, Err(e), _) | (_, _, Err(e)),
    {
        0: a = T0,
        1: b = T1,
        2: c = T2,
    }
}
// `BlockProvider` for a tuple of four providers.
impl_provider_tuple! {
    futures_util::future::join4,
    |e| (Err(e), _, _, _) | (_, Err(e), _, _) | (_, _, Err(e), _) | (_, _, _, Err(e)),
    {
        0: a = T0,
        1: b = T1,
        2: c = T2,
        3: d = T3,
    }
}
// `BlockProvider` for a tuple of five providers.
impl_provider_tuple! {
    futures_util::future::join5,
    |e|
        (Err(e), _, _, _, _)
        | (_, Err(e), _, _, _)
        | (_, _, Err(e), _, _)
        | (_, _, _, Err(e), _)
        | (_, _, _, _, Err(e)),
    {
        0: a = T0,
        1: b = T1,
        2: c = T2,
        3: d = T3,
        4: e = T4,
    }
}
410
/// Input of [`ProofChecker::check_proof`].
pub struct CheckProof<'a> {
    /// Masterchain block id; its seqno is recorded as `ref_by_mc_seqno`
    /// in the resulting block meta.
    pub mc_block_id: &'a BlockId,
    /// Block being checked; its id must match the `proof_for` of `proof`.
    pub block: &'a BlockStuff,
    /// Proof to check.
    pub proof: &'a BlockProofStuffAug,
    /// Queue diff whose hash must match the diff hash found in the proof.
    pub queue_diff: &'a QueueDiffStuffAug,

    /// Whether to store `proof` and `queue_diff` if they are valid.
    pub store_on_success: bool,
}
420
// TODO: Rename to something better since it checks both proofs and queue diffs now,
//       and I don't want to parse block info twice to check queue diff separately.
/// Checks block proofs (and queue diff hashes) against the data in storage.
pub struct ProofChecker {
    storage: CoreStorage,
    /// Masterchain zerostate, cached after the first time it is loaded for a check.
    cached_zerostate: ArcSwapAny<Option<ShardStateStuff>>,
    /// The most recently loaded proof of a previous key block.
    cached_prev_key_block_proof: ArcSwapAny<Option<BlockProofStuff>>,
}
428
impl ProofChecker {
    /// Creates a checker with empty caches on top of the given storage.
    pub fn new(storage: CoreStorage) -> Self {
        Self {
            storage,
            cached_zerostate: Default::default(),
            cached_prev_key_block_proof: Default::default(),
        }
    }

    /// Checks the proof and the queue diff of a block and optionally stores them.
    ///
    /// Returns the block meta built from the proof (key block flag, generation
    /// time) and from the seqno of `ctx.mc_block_id`.
    ///
    /// # Errors
    ///
    /// Fails when:
    /// - the proof was made for a different block;
    /// - a masterchain block comes with a proof link, or any other block
    ///   comes without one;
    /// - the proof pre-check fails;
    /// - the queue diff hash differs from the one in the proof;
    /// - (masterchain only) the previous key block handle, the zerostate or
    ///   the previous key block proof cannot be loaded, or the check against
    ///   them fails;
    /// - storing the proof or the queue diff fails (with `store_on_success`).
    pub async fn check_proof(&self, ctx: CheckProof<'_>) -> Result<NewBlockMeta> {
        // TODO: Add labels with shard?
        let _histogram = HistogramGuard::begin("tycho_core_check_block_proof_time");

        let CheckProof {
            mc_block_id,
            block,
            proof,
            queue_diff,
            store_on_success,
        } = ctx;

        anyhow::ensure!(
            block.id() == &proof.proof().proof_for,
            "proof_for and block id mismatch: proof_for={}, block_id={}",
            proof.proof().proof_for,
            block.id(),
        );

        // Exactly one of these must hold: either the block is a masterchain
        // block, or the proof is a link.
        let is_masterchain = block.id().is_masterchain();
        anyhow::ensure!(is_masterchain ^ proof.is_link(), "unexpected proof type");

        let (virt_block, virt_block_info) = proof.pre_check_block_proof()?;
        let meta = NewBlockMeta {
            is_key_block: virt_block_info.key_block,
            gen_utime: virt_block_info.gen_utime,
            ref_by_mc_seqno: mc_block_id.seqno,
        };

        let block_storage = self.storage.block_storage();

        anyhow::ensure!(
            &virt_block.out_msg_queue_updates.diff_hash == queue_diff.diff_hash(),
            "queue diff mismatch (expected: {}, got: {})",
            virt_block.out_msg_queue_updates.diff_hash,
            queue_diff.diff_hash(),
        );

        // Only masterchain proofs are additionally checked against
        // the previous key block (or the zerostate).
        if is_masterchain {
            let block_handles = self.storage.block_handle_storage();
            let handle = block_handles
                .load_key_block_handle(virt_block_info.prev_key_block_seqno)
                .ok_or_else(|| {
                    anyhow::anyhow!(
                        "failed to load prev key block handle by prev_key_block_seqno {}",
                        virt_block_info.prev_key_block_seqno
                    )
                })?;

            if handle.id().seqno == 0 {
                // The loaded handle has seqno 0: check against the masterchain
                // zerostate, loading it from storage only if not cached yet.
                let zerostate = 'zerostate: {
                    if let Some(zerostate) = self.cached_zerostate.load_full() {
                        break 'zerostate zerostate;
                    }

                    let shard_states = self.storage.shard_state_storage();
                    let zerostate = shard_states
                        .load_state(handle.id())
                        .await
                        .context("failed to load mc zerostate")?;

                    self.cached_zerostate.store(Some(zerostate.clone()));

                    zerostate
                };

                check_with_master_state(proof, &zerostate, &virt_block, &virt_block_info)?;
            } else {
                // Use the cached proof only if it belongs to the required key block,
                // otherwise load the proof from storage and replace the cache.
                let prev_key_block_proof = 'prev_proof: {
                    if let Some(prev_proof) = self.cached_prev_key_block_proof.load_full() {
                        if &prev_proof.as_ref().proof_for == handle.id() {
                            break 'prev_proof prev_proof;
                        }
                    }

                    let prev_key_block_proof = block_storage
                        .load_block_proof(&handle)
                        .await
                        .context("failed to load prev key block proof")?;

                    // NOTE: Assume that there is only one masterchain block using this cache.
                    // Otherwise, it will be overwritten every time. Maybe use `rcu`.
                    self.cached_prev_key_block_proof
                        .store(Some(prev_key_block_proof.clone()));

                    prev_key_block_proof
                };

                check_with_prev_key_block_proof(
                    proof,
                    &prev_key_block_proof,
                    &virt_block,
                    &virt_block_info,
                )?;
            }
        }

        if store_on_success {
            // Store proof
            let res = block_storage
                .store_block_proof(proof, MaybeExistingHandle::New(meta))
                .await?;

            // Store queue diff
            block_storage
                .store_queue_diff(queue_diff, res.handle.into())
                .await?;
        }

        Ok(meta)
    }
}
550
/// Settings of [`RetryBlockProvider`].
#[derive(Debug, Copy, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct RetryConfig {
    /// Retry limit: how many more times `get_next_block` is repeated
    /// after a call that returned nothing.
    ///
    /// Default: 1.
    pub attempts: usize,

    /// Delay between two consecutive attempts.
    ///
    /// Default: 1 second.
    #[serde(with = "serde_helpers::humantime")]
    pub interval: Duration,
}

impl Default for RetryConfig {
    /// One retry with a one second delay.
    fn default() -> Self {
        Self {
            attempts: 1,
            interval: Duration::from_secs(1),
        }
    }
}
574
#[cfg(test)]
mod test {
    use std::sync::Arc;
    use std::sync::atomic::{AtomicBool, Ordering};

    use tycho_block_util::block::{BlockIdExt, BlockStuff};
    use tycho_types::boc::Boc;
    use tycho_types::models::Block;

    use super::*;

    // Provider which returns the same empty block for every request
    // while `has_block` is set, and nothing otherwise.
    struct MockBlockProvider {
        // let's give it some state, pretending it's useful
        has_block: AtomicBool,
    }

    impl BlockProvider for MockBlockProvider {
        type GetNextBlockFut<'a> = BoxFuture<'a, OptionalBlockStuff>;
        type GetBlockFut<'a> = BoxFuture<'a, OptionalBlockStuff>;
        type CleanupFut<'a> = future::Ready<Result<()>>;

        fn get_next_block(&self, _prev_block_id: &BlockId) -> Self::GetNextBlockFut<'_> {
            Box::pin(async {
                if self.has_block.load(Ordering::Acquire) {
                    Some(Ok(get_empty_block()))
                } else {
                    None
                }
            })
        }

        fn get_block(&self, _block_id: &BlockIdRelation) -> Self::GetBlockFut<'_> {
            Box::pin(async {
                if self.has_block.load(Ordering::Acquire) {
                    Some(Ok(get_empty_block()))
                } else {
                    None
                }
            })
        }

        fn cleanup_until(&self, _mc_seqno: u32) -> Self::CleanupFut<'_> {
            future::ready(Ok(()))
        }
    }

    // The chain must serve from `left` first, fall back to `right` once `left`
    // is empty and return nothing when both are empty.
    #[tokio::test]
    async fn chain_block_provider_switches_providers_correctly() {
        let left_provider = Arc::new(MockBlockProvider {
            has_block: AtomicBool::new(true),
        });
        let right_provider = Arc::new(MockBlockProvider {
            has_block: AtomicBool::new(false),
        });

        let chain_provider = ChainBlockProvider::new(left_provider.clone(), right_provider.clone());

        chain_provider
            .get_next_block(&get_default_block_id())
            .await
            .unwrap()
            .unwrap();

        // Now let's pretend the left provider ran out of blocks.
        left_provider.has_block.store(false, Ordering::Release);
        right_provider.has_block.store(true, Ordering::Release);

        chain_provider
            .get_next_block(&get_default_block_id())
            .await
            .unwrap()
            .unwrap();

        // End of blocks stream for both providers
        left_provider.has_block.store(false, Ordering::Release);
        right_provider.has_block.store(false, Ordering::Release);

        assert!(
            chain_provider
                .get_next_block(&get_default_block_id())
                .await
                .is_none()
        );
    }

    // The cycle must switch to the other side whenever the active side has no
    // next block, while `get_block` must only ask the active side.
    #[tokio::test]
    async fn cycle_block_provider_switches_providers_correctly() {
        const LEFT_LIMIT: usize = 10;
        const RIGHT_LIMIT: usize = 1;

        const POLLING_INTERVAL_MS: u64 = 100;

        let left_provider = Arc::new(MockBlockProvider {
            has_block: AtomicBool::new(true),
        });
        let right_provider = Arc::new(MockBlockProvider {
            has_block: AtomicBool::new(false),
        });

        let left_config = RetryConfig {
            attempts: LEFT_LIMIT,
            interval: Duration::from_millis(POLLING_INTERVAL_MS),
        };

        let right_config = RetryConfig {
            attempts: RIGHT_LIMIT,
            interval: Duration::from_millis(POLLING_INTERVAL_MS),
        };

        let left = left_provider.clone().retry(left_config);
        let right = right_provider.clone().retry(right_config);
        let cycle_provider = left.cycle(right);

        // The cycle starts on the left side.
        assert!(!cycle_provider.is_right.load(Ordering::Acquire));

        cycle_provider
            .get_next_block(&get_default_block_id())
            .await
            .unwrap()
            .unwrap();

        // Now let's pretend the left provider ran out of blocks.
        left_provider.has_block.store(false, Ordering::Release);
        right_provider.has_block.store(true, Ordering::Release);

        // Poll until some provider returns a block.
        while cycle_provider
            .get_next_block(&get_default_block_id())
            .await
            .is_none()
        {}

        cycle_provider
            .get_next_block(&get_default_block_id())
            .await
            .unwrap()
            .unwrap();

        assert!(cycle_provider.is_right.load(Ordering::Acquire));

        // Cycle switch
        left_provider.has_block.store(true, Ordering::Release);
        right_provider.has_block.store(false, Ordering::Release);

        // Poll until some provider returns a block.
        while cycle_provider
            .get_next_block(&get_default_block_id())
            .await
            .is_none()
        {}

        cycle_provider
            .get_next_block(&get_default_block_id())
            .await
            .unwrap()
            .unwrap();
        assert!(!cycle_provider.is_right.load(Ordering::Acquire));

        // `get_block` is served by the active (left) side and does not switch sides.
        cycle_provider
            .get_block(&get_default_block_id().relative_to_self())
            .await
            .unwrap()
            .unwrap();
        assert!(!cycle_provider.is_right.load(Ordering::Acquire));

        left_provider.has_block.store(false, Ordering::Release);
        right_provider.has_block.store(true, Ordering::Release);

        // The active (left) side is empty now and `get_block` has no fallback.
        let block = cycle_provider
            .get_block(&get_default_block_id().relative_to_self())
            .await;
        assert!(block.is_none());
    }

    // Loads the block fixture from the test data and wraps it together
    // with its raw bytes as archive data.
    fn get_empty_block() -> BlockStuffAug {
        let block_data = include_bytes!("../../../tests/data/empty_block.bin");
        let root = Boc::decode(block_data).unwrap();
        let block = root.parse::<Block>().unwrap();

        let block_id = BlockId {
            root_hash: *root.repr_hash(),
            ..Default::default()
        };

        BlockStuff::from_block_and_root(&block_id, block, root, block_data.len())
            .with_archive_data(block_data.as_slice())
    }

    // Block id used for every request in these tests.
    fn get_default_block_id() -> BlockId {
        BlockId::default()
    }
}