// tycho_core/block_strider/provider/mod.rs

1use std::future::Future;
2use std::pin::pin;
3use std::sync::atomic::{AtomicU32, Ordering};
4use std::sync::{Arc, Mutex};
5use std::time::Duration;
6
7use anyhow::{Context, Result};
8use arc_swap::{ArcSwapAny, ArcSwapOption};
9use futures_util::future::{self, BoxFuture};
10use serde::{Deserialize, Serialize};
11use tycho_block_util::block::{
12    BlockIdRelation, BlockProofStuff, BlockProofStuffAug, BlockStuff, BlockStuffAug,
13    check_with_master_state, check_with_prev_key_block_proof,
14};
15use tycho_block_util::queue::QueueDiffStuffAug;
16use tycho_block_util::state::ShardStateStuff;
17use tycho_types::models::BlockId;
18use tycho_util::metrics::HistogramGuard;
19use tycho_util::serde_helpers;
20
21pub use self::archive_provider::{
22    ArchiveBlockProvider, ArchiveBlockProviderConfig, ArchiveClient, ArchiveDownloadContext,
23    ArchiveResponse, ArchiveWriter, FoundArchive, HybridArchiveClient, IntoArchiveClient,
24};
25pub use self::blockchain_provider::{BlockchainBlockProvider, BlockchainBlockProviderConfig};
26pub use self::box_provider::BoxBlockProvider;
27use self::futures::SelectNonEmptyFut;
28pub use self::storage_provider::StorageBlockProvider;
29use crate::storage::{CoreStorage, MaybeExistingHandle, NewBlockMeta};
30
31mod archive_provider;
32mod blockchain_provider;
33mod box_provider;
34mod futures;
35mod storage_provider;
36
/// Output of the block-fetching futures of [`BlockProvider`].
///
/// `Some(Ok(_))` carries a block and `Some(Err(_))` carries an error. The
/// combinators in this module treat `None` as "this provider yielded nothing"
/// and react by falling back to another provider or by retrying.
pub type OptionalBlockStuff = Option<Result<BlockStuffAug>>;
38
/// Block provider *MUST* validate the block before returning it.
///
/// Each method returns a named future type (a generic associated type), so an
/// implementation may use either a concrete future or a boxed one.
pub trait BlockProvider: Send + Sync + 'static {
    /// Future returned by [`BlockProvider::get_next_block`].
    type GetNextBlockFut<'a>: Future<Output = OptionalBlockStuff> + Send + 'a;
    /// Future returned by [`BlockProvider::get_block`].
    type GetBlockFut<'a>: Future<Output = OptionalBlockStuff> + Send + 'a;
    /// Future returned by [`BlockProvider::cleanup_until`].
    type CleanupFut<'a>: Future<Output = Result<()>> + Send + 'a;

    /// Wait for the next block. Mostly used for masterchain blocks.
    fn get_next_block<'a>(&'a self, prev_block_id: &'a BlockId) -> Self::GetNextBlockFut<'a>;
    /// Get the exact block. Provider must return the requested block.
    fn get_block<'a>(&'a self, block_id_relation: &'a BlockIdRelation) -> Self::GetBlockFut<'a>;
    /// Clear resources until (and including) the specified masterchain block seqno.
    fn cleanup_until(&self, mc_seqno: u32) -> Self::CleanupFut<'_>;
}
52
53impl<T: BlockProvider> BlockProvider for Box<T> {
54    type GetNextBlockFut<'a> = T::GetNextBlockFut<'a>;
55    type GetBlockFut<'a> = T::GetBlockFut<'a>;
56    type CleanupFut<'a> = T::CleanupFut<'a>;
57
58    fn get_next_block<'a>(&'a self, prev_block_id: &'a BlockId) -> Self::GetNextBlockFut<'a> {
59        <T as BlockProvider>::get_next_block(self, prev_block_id)
60    }
61
62    fn get_block<'a>(&'a self, block_id_relation: &'a BlockIdRelation) -> Self::GetBlockFut<'a> {
63        <T as BlockProvider>::get_block(self, block_id_relation)
64    }
65
66    fn cleanup_until(&self, mc_seqno: u32) -> Self::CleanupFut<'_> {
67        <T as BlockProvider>::cleanup_until(self, mc_seqno)
68    }
69}
70
71impl<T: BlockProvider> BlockProvider for Arc<T> {
72    type GetNextBlockFut<'a> = T::GetNextBlockFut<'a>;
73    type GetBlockFut<'a> = T::GetBlockFut<'a>;
74    type CleanupFut<'a> = T::CleanupFut<'a>;
75
76    fn get_next_block<'a>(&'a self, prev_block_id: &'a BlockId) -> Self::GetNextBlockFut<'a> {
77        <T as BlockProvider>::get_next_block(self, prev_block_id)
78    }
79
80    fn get_block<'a>(&'a self, block_id_relation: &'a BlockIdRelation) -> Self::GetBlockFut<'a> {
81        <T as BlockProvider>::get_block(self, block_id_relation)
82    }
83
84    fn cleanup_until(&self, mc_seqno: u32) -> Self::CleanupFut<'_> {
85        <T as BlockProvider>::cleanup_until(self, mc_seqno)
86    }
87}
88
/// Combinator methods available on every [`BlockProvider`].
pub trait BlockProviderExt: Sized {
    /// Converts the provider into a [`BoxBlockProvider`].
    fn boxed(self) -> BoxBlockProvider;

    /// Builds a [`ChainBlockProvider`] with `self` as the left part and
    /// `other` as the right part.
    fn chain<T: BlockProvider>(self, other: T) -> ChainBlockProvider<Self, T>;

    /// Builds a [`CycleBlockProvider`] with `self` as the left part and
    /// `other` as the right part.
    fn cycle<T: BlockProvider>(self, other: T) -> CycleBlockProvider<Self, T>;

    /// Wraps the provider into a [`RetryBlockProvider`] driven by `config`.
    fn retry(self, config: RetryConfig) -> RetryBlockProvider<Self>;
}
98
99impl<B: BlockProvider> BlockProviderExt for B {
100    fn boxed(self) -> BoxBlockProvider {
101        castaway::match_type!(self, {
102            BoxBlockProvider as provider => provider,
103            provider => BoxBlockProvider::new(provider),
104        })
105    }
106
107    fn chain<T: BlockProvider>(self, other: T) -> ChainBlockProvider<Self, T> {
108        ChainBlockProvider::new(self, other)
109    }
110
111    fn cycle<T: BlockProvider>(self, other: T) -> CycleBlockProvider<Self, T> {
112        CycleBlockProvider::new(self, other)
113    }
114
115    fn retry(self, config: RetryConfig) -> RetryBlockProvider<Self> {
116        RetryBlockProvider {
117            inner: self,
118            config,
119        }
120    }
121}
122
123// === Provider combinators ===
124#[derive(Debug, Clone, Copy)]
125pub struct EmptyBlockProvider;
126
127impl BlockProvider for EmptyBlockProvider {
128    type GetNextBlockFut<'a> = future::Ready<OptionalBlockStuff>;
129    type GetBlockFut<'a> = future::Ready<OptionalBlockStuff>;
130    type CleanupFut<'a> = future::Ready<Result<()>>;
131
132    fn get_next_block<'a>(&'a self, _prev_block_id: &'a BlockId) -> Self::GetNextBlockFut<'a> {
133        future::ready(None)
134    }
135
136    fn get_block<'a>(&'a self, _block_id_relation: &'a BlockIdRelation) -> Self::GetBlockFut<'a> {
137        future::ready(None)
138    }
139
140    fn cleanup_until(&self, _mc_seqno: u32) -> Self::CleanupFut<'_> {
141        future::ready(Ok(()))
142    }
143}
144
/// Uses the `left` provider until its `get_next_block` yields `None`, then
/// switches to `right`.
pub struct ChainBlockProvider<T1, T2> {
    // First provider; replaced with `None` once it has been cleaned up.
    left: ArcSwapOption<T1>,
    // Second provider, used after `left` is exhausted.
    right: T2,
    // Cleanup flag for `left`:
    // - `u32::MAX` means that `left` is still in use;
    // - `0` means that `left` has been reset;
    // - other seqno indicates when `left` must be cleaned up.
    cleanup_left_at: AtomicU32,
}
150
151impl<T1, T2> ChainBlockProvider<T1, T2> {
152    pub fn new(left: T1, right: T2) -> Self {
153        Self {
154            left: ArcSwapAny::new(Some(Arc::new(left))),
155            right,
156            cleanup_left_at: AtomicU32::new(u32::MAX),
157        }
158    }
159}
160
/// Chained lookup: `left` is used while the cleanup flag is `u32::MAX`,
/// afterwards all requests go to `right`.
impl<T1: BlockProvider, T2: BlockProvider> BlockProvider for ChainBlockProvider<T1, T2> {
    type GetNextBlockFut<'a> = BoxFuture<'a, OptionalBlockStuff>;
    type GetBlockFut<'a> = BoxFuture<'a, OptionalBlockStuff>;
    type CleanupFut<'a> = BoxFuture<'a, Result<()>>;

    /// Asks `left` first while it is still in use. When `left` yields `None`,
    /// schedules its cleanup and forwards the same request to `right`.
    fn get_next_block<'a>(&'a self, prev_block_id: &'a BlockId) -> Self::GetNextBlockFut<'a> {
        // `u32::MAX` means that no switch to `right` has been scheduled yet.
        if self.cleanup_left_at.load(Ordering::Acquire) == u32::MAX
            && let Some(left) = self.left.load_full()
        {
            return Box::pin(async move {
                let res = left.get_next_block(prev_block_id).await;
                if res.is_some() {
                    return res;
                }

                // Schedule left provider cleanup for the next block.
                self.cleanup_left_at
                    .store(prev_block_id.seqno.saturating_add(1), Ordering::Release);

                // Fallback to right
                self.right.get_next_block(prev_block_id).await
            });
        }

        Box::pin(self.right.get_next_block(prev_block_id))
    }

    /// Forwards to `left` while it is still in use, otherwise to `right`.
    ///
    /// Unlike `get_next_block`, a `None` from `left` is returned as is: there
    /// is no fallback to `right` and no switch is scheduled here.
    fn get_block<'a>(&'a self, block_id_relation: &'a BlockIdRelation) -> Self::GetBlockFut<'a> {
        if self.cleanup_left_at.load(Ordering::Acquire) == u32::MAX
            && let Some(left) = self.left.load_full()
        {
            // The loaded `Arc` is moved into the future to keep `left` alive.
            return Box::pin(async move { left.get_block(block_id_relation).await });
        }

        Box::pin(self.right.get_block(block_id_relation))
    }

    /// Cleans up whichever provider is active and drops `left` once the
    /// scheduled seqno has been reached.
    fn cleanup_until(&self, mc_seqno: u32) -> Self::CleanupFut<'_> {
        Box::pin(async move {
            // Read the cleanup flag:
            // - 0 means that `left` has been reset;
            // - u32::MAX means that `left` is still in use;
            // - other seqno indicates when we need to cleanup `left`.
            let cleanup_left_at = self.cleanup_left_at.load(Ordering::Acquire);

            if cleanup_left_at > 0 && cleanup_left_at <= mc_seqno {
                // Cleanup and reset `left` if target block has been set.
                if let Some(left) = self.left.load_full() {
                    // On error `left` and the flag are kept, so this branch can run again.
                    left.cleanup_until(mc_seqno).await?;
                    self.left.store(None);
                    self.cleanup_left_at.store(0, Ordering::Release);
                }
            } else if cleanup_left_at == u32::MAX {
                // Cleanup only `left` until we switch to `right`.
                if let Some(left) = self.left.load_full() {
                    return left.cleanup_until(mc_seqno).await;
                }
            }

            // In all other cases just cleanup `right`.
            self.right.cleanup_until(mc_seqno).await
        })
    }
}
225
/// Alternates between two providers: when the one in use yields nothing,
/// a switch to the other one is planned for a specific masterchain seqno.
pub struct CycleBlockProvider<T1, T2> {
    left: T1,
    right: T2,
    state: Mutex<CycleBlockProviderState>,
}

/// Mutable part of [`CycleBlockProvider`], guarded by a mutex.
struct CycleBlockProviderState {
    // Provider that is in use right now.
    current: CycleBlockProviderPart,
    // Masterchain seqno of the next planned switch (`u32::MAX` if none is planned).
    switch_at: u32,
}

impl CycleBlockProviderState {
    /// Returns `true` when a switch is planned and `mc_seqno` has reached it.
    fn switch_is_due(&self, mc_seqno: u32) -> bool {
        self.switch_at != u32::MAX && self.switch_at <= mc_seqno
    }
}

/// Identifies one of the two providers inside [`CycleBlockProvider`].
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
enum CycleBlockProviderPart {
    Left,
    Right,
}

/// `!part` is the opposite provider.
impl std::ops::Not for CycleBlockProviderPart {
    type Output = Self;

    #[inline]
    fn not(self) -> Self::Output {
        if self == Self::Left {
            Self::Right
        } else {
            Self::Left
        }
    }
}

impl<T1, T2> CycleBlockProvider<T1, T2> {
    /// Creates a provider that starts with `left` and has no switch planned.
    pub fn new(left: T1, right: T2) -> Self {
        let initial_state = CycleBlockProviderState {
            current: CycleBlockProviderPart::Left,
            switch_at: u32::MAX,
        };

        Self {
            left,
            right,
            state: Mutex::new(initial_state),
        }
    }

    /// Picks the provider to use for the given masterchain seqno.
    ///
    /// The rules are:
    /// - If a switch is planned (`switch_at` != `u32::MAX`) and `mc_seqno` has reached it,
    ///   the OTHER provider is returned even though `current` hasn't been updated yet
    /// - Otherwise the provider stored in `current` is returned
    ///
    /// This ensures that:
    /// - Shard blocks are fetched from the same provider as their master block
    /// - Parallel processing of master blocks uses the correct provider during the transition
    ///
    /// Panics if the state mutex is poisoned.
    fn choose_provider(&self, mc_seqno: u32) -> CycleBlockProviderPart {
        let guard = self.state.lock().unwrap();
        let active = guard.current;
        if guard.switch_is_due(mc_seqno) {
            // Switch in advance but without changing the state.
            !active
        } else {
            active
        }
    }

    /// Plans a switch at `mc_seqno` if none is planned, or cancels the planned one.
    ///
    /// Returns the provider to try next: the opposite one after planning,
    /// the current one after cancelling. Panics if the state mutex is poisoned.
    fn toggle_switch_at(&self, mc_seqno: u32) -> CycleBlockProviderPart {
        let mut guard = self.state.lock().unwrap();
        let had_planned_switch = guard.switch_at != u32::MAX;
        if had_planned_switch {
            guard.switch_at = u32::MAX;
            guard.current
        } else {
            guard.switch_at = mc_seqno;
            !guard.current
        }
    }

    /// Commits the planned switch once `mc_seqno` has reached it; otherwise does nothing.
    ///
    /// Panics if the state mutex is poisoned.
    fn try_apply_switch(&self, mc_seqno: u32) {
        let mut guard = self.state.lock().unwrap();
        if !guard.switch_is_due(mc_seqno) {
            return;
        }

        guard.current = !guard.current;
        guard.switch_at = u32::MAX;
    }
}
308
309impl<T1: BlockProvider, T2: BlockProvider> BlockProvider for CycleBlockProvider<T1, T2> {
310    type GetNextBlockFut<'a> = BoxFuture<'a, OptionalBlockStuff>;
311    type GetBlockFut<'a> = BoxFuture<'a, OptionalBlockStuff>;
312    type CleanupFut<'a> = BoxFuture<'a, Result<()>>;
313
314    fn get_next_block<'a>(&'a self, prev_block_id: &'a BlockId) -> Self::GetNextBlockFut<'a> {
315        Box::pin(async move {
316            // Try using the current provider.
317            let mut res = match self.choose_provider(prev_block_id.seqno) {
318                CycleBlockProviderPart::Left => self.left.get_next_block(prev_block_id).await,
319                CycleBlockProviderPart::Right => self.right.get_next_block(prev_block_id).await,
320            };
321            if res.is_some() {
322                return res;
323            }
324
325            loop {
326                // Fallback to the next provider.
327                res = match self.toggle_switch_at(prev_block_id.seqno.saturating_add(1)) {
328                    CycleBlockProviderPart::Left => self.left.get_next_block(prev_block_id).await,
329                    CycleBlockProviderPart::Right => self.right.get_next_block(prev_block_id).await,
330                };
331                if res.is_some() {
332                    return res;
333                }
334
335                // Allow executor to do some work if all these methods are blocking.
336                // FIXME: Add some sleep here in case of complete blocking?
337                tokio::task::yield_now().await;
338            }
339        })
340    }
341
342    fn get_block<'a>(&'a self, block_id_relation: &'a BlockIdRelation) -> Self::GetBlockFut<'a> {
343        match self.choose_provider(block_id_relation.mc_block_id.seqno) {
344            CycleBlockProviderPart::Left => Box::pin(self.left.get_block(block_id_relation)),
345            CycleBlockProviderPart::Right => Box::pin(self.right.get_block(block_id_relation)),
346        }
347    }
348
349    fn cleanup_until(&self, mc_seqno: u32) -> Self::CleanupFut<'_> {
350        Box::pin(async move {
351            self.try_apply_switch(mc_seqno);
352
353            let cleanup_left = self.left.cleanup_until(mc_seqno);
354            let cleanup_right = self.right.cleanup_until(mc_seqno);
355            match futures_util::future::join(cleanup_left, cleanup_right).await {
356                (Err(e), _) | (_, Err(e)) => Err(e),
357                (Ok(()), Ok(())) => Ok(()),
358            }
359        })
360    }
361}
362
/// Wrapper that repeats `get_next_block` on the inner provider while it
/// yields `None`, as configured by [`RetryConfig`].
pub struct RetryBlockProvider<T> {
    // Wrapped provider.
    inner: T,
    // Retry limit and pause between attempts.
    config: RetryConfig,
}
367
368impl<T: BlockProvider> BlockProvider for RetryBlockProvider<T> {
369    type GetNextBlockFut<'a> = BoxFuture<'a, OptionalBlockStuff>;
370    type GetBlockFut<'a> = T::GetBlockFut<'a>;
371    type CleanupFut<'a> = T::CleanupFut<'a>;
372
373    fn get_next_block<'a>(&'a self, prev_block_id: &'a BlockId) -> Self::GetNextBlockFut<'a> {
374        Box::pin(async move {
375            let mut attempts = 0usize;
376
377            loop {
378                let res = self.inner.get_next_block(prev_block_id).await;
379                if res.is_some() || attempts >= self.config.attempts {
380                    break res;
381                }
382
383                attempts += 1;
384
385                // TODO: Backoff?
386                tokio::time::sleep(self.config.interval).await;
387            }
388        })
389    }
390
391    fn get_block<'a>(&'a self, block_id_relation: &'a BlockIdRelation) -> Self::GetBlockFut<'a> {
392        self.inner.get_block(block_id_relation)
393    }
394
395    fn cleanup_until(&self, mc_seqno: u32) -> Self::CleanupFut<'_> {
396        self.inner.cleanup_until(mc_seqno)
397    }
398}
399
// Implements `BlockProvider` for a tuple of providers.
//
// Arguments:
// - `$join_fn`: function that joins the cleanup futures of all tuple items;
// - `|$e| $err_pat`: pattern that matches a joined result with at least one
//   error and binds it to `$e`;
// - `{ index: variable = TypeParam, ... }`: one entry per tuple item.
//
// Block requests are started on all items and combined with `SelectNonEmptyFut`;
// cleanup runs on all items and returns the error bound by `$err_pat`, if any.
macro_rules! impl_provider_tuple {
    ($join_fn:path, |$e:ident| $err_pat:pat$(,)?, {
        $($n:tt: $var:ident = $ty:ident),*$(,)?
    }) => {
        impl<$($ty),*> BlockProvider for ($($ty),*)
        where
            $($ty: BlockProvider),*
        {
            type GetNextBlockFut<'a> = BoxFuture<'a, OptionalBlockStuff>;
            type GetBlockFut<'a> = BoxFuture<'a, OptionalBlockStuff>;
            type CleanupFut<'a> = BoxFuture<'a, Result<()>>;

            fn get_next_block<'a>(
                &'a self,
                prev_block_id: &'a BlockId,
            ) -> Self::GetNextBlockFut<'a> {
                // Create one future per tuple item.
                $(let $var = self.$n.get_next_block(prev_block_id));*;

                Box::pin(async move {
                    // Pin the futures in place before handing them to the selector.
                    $(let $var = pin!($var));*;
                    SelectNonEmptyFut::from(($($var),*)).await
                })
            }

            fn get_block<'a>(&'a self, block_id_relation: &'a BlockIdRelation) -> Self::GetBlockFut<'a> {
                // Create one future per tuple item.
                $(let $var = self.$n.get_block(block_id_relation));*;

                Box::pin(async move {
                    // Pin the futures in place before handing them to the selector.
                    $(let $var = pin!($var));*;
                    SelectNonEmptyFut::from(($($var),*)).await
                })
            }

            fn cleanup_until(&self, mc_seqno: u32) -> Self::CleanupFut<'_> {
                // Create one cleanup future per tuple item.
                $(let $var = self.$n.cleanup_until(mc_seqno));*;

                Box::pin(async move {
                    // All cleanups run to completion; an error is reported afterwards.
                    match $join_fn($($var),*).await {
                        $err_pat => Err($e),
                        _ => Ok(())
                    }
                })
            }
        }
    };
}
446
// `BlockProvider` for tuples of 2 providers.
impl_provider_tuple! {
    futures_util::future::join,
    |e| (Err(e), _) | (_, Err(e)),
    {
        0: a = T0,
        1: b = T1,
    }
}
// `BlockProvider` for tuples of 3 providers.
impl_provider_tuple! {
    futures_util::future::join3,
    |e| (Err(e), _, _) | (_, Err(e), _) | (_, _, Err(e)),
    {
        0: a = T0,
        1: b = T1,
        2: c = T2,
    }
}
// `BlockProvider` for tuples of 4 providers.
impl_provider_tuple! {
    futures_util::future::join4,
    |e| (Err(e), _, _, _) | (_, Err(e), _, _) | (_, _, Err(e), _) | (_, _, _, Err(e)),
    {
        0: a = T0,
        1: b = T1,
        2: c = T2,
        3: d = T3,
    }
}
// `BlockProvider` for tuples of 5 providers.
impl_provider_tuple! {
    futures_util::future::join5,
    |e|
        (Err(e), _, _, _, _)
        | (_, Err(e), _, _, _)
        | (_, _, Err(e), _, _)
        | (_, _, _, Err(e), _)
        | (_, _, _, _, Err(e)),
    {
        0: a = T0,
        1: b = T1,
        2: c = T2,
        3: d = T3,
        4: e = T4,
    }
}
490
/// Input of [`ProofChecker::check_proof`].
pub struct CheckProof<'a> {
    /// Masterchain block id; its seqno is stored as `ref_by_mc_seqno` in the block meta.
    pub mc_block_id: &'a BlockId,
    /// Block being checked; its id must match `proof_for` of `proof`.
    pub block: &'a BlockStuff,
    /// Proof for `block` (a proof link is expected for non-masterchain blocks).
    pub proof: &'a BlockProofStuffAug,
    /// Queue diff; its hash must match the diff hash in the proof's virtual block.
    pub queue_diff: &'a QueueDiffStuffAug,

    /// Whether to store `proof` and `queue_diff` if they are valid.
    pub store_on_success: bool,
}
500
// TODO: Rename to something better since it checks proofs queue diffs now,
//       and I don't want to parse block info twice to check queue diff separately.
/// Checks block proofs and queue diffs against data loaded from storage.
pub struct ProofChecker {
    // Source of key block handles, states and proofs; target for stored proofs.
    storage: CoreStorage,
    // Masterchain zerostate, cached after its first load.
    cached_zerostate: ArcSwapAny<Option<ShardStateStuff>>,
    // Most recently loaded previous key block proof.
    cached_prev_key_block_proof: ArcSwapAny<Option<BlockProofStuff>>,
}
508
impl ProofChecker {
    /// Creates a checker with empty caches on top of the given storage.
    pub fn new(storage: CoreStorage) -> Self {
        Self {
            storage,
            cached_zerostate: Default::default(),
            cached_prev_key_block_proof: Default::default(),
        }
    }

    /// Validates `ctx.proof` and `ctx.queue_diff` for `ctx.block` and returns
    /// the meta of the block.
    ///
    /// Steps:
    /// 1. the proof must be made for this block, and must be a full proof for
    ///    masterchain blocks and a proof link for all other blocks;
    /// 2. the queue diff hash must match the one in the proof's virtual block;
    /// 3. masterchain proofs are additionally checked against the zerostate
    ///    (when the previous key block has seqno 0) or against the proof of
    ///    the previous key block;
    /// 4. if `ctx.store_on_success` is set, the proof and the queue diff are stored.
    ///
    /// # Errors
    ///
    /// Returns an error if any check fails, if the previous key block handle,
    /// the zerostate or the previous key block proof cannot be loaded, or if
    /// storing fails.
    pub async fn check_proof(&self, ctx: CheckProof<'_>) -> Result<NewBlockMeta> {
        // TODO: Add labels with shard?
        let _histogram = HistogramGuard::begin("tycho_core_check_block_proof_time");

        let CheckProof {
            mc_block_id,
            block,
            proof,
            queue_diff,
            store_on_success,
        } = ctx;

        anyhow::ensure!(
            block.id() == &proof.proof().proof_for,
            "proof_for and block id mismatch: proof_for={}, block_id={}",
            proof.proof().proof_for,
            block.id(),
        );

        // Exactly one must hold: the block is a masterchain block, or the proof is a link.
        let is_masterchain = block.id().is_masterchain();
        anyhow::ensure!(is_masterchain ^ proof.is_link(), "unexpected proof type");

        let (virt_block, virt_block_info) = proof.pre_check_block_proof()?;
        let meta = NewBlockMeta {
            is_key_block: virt_block_info.key_block,
            gen_utime: virt_block_info.gen_utime,
            ref_by_mc_seqno: mc_block_id.seqno,
        };

        let block_storage = self.storage.block_storage();

        anyhow::ensure!(
            &virt_block.out_msg_queue_updates.diff_hash == queue_diff.diff_hash(),
            "queue diff mismatch (expected: {}, got: {})",
            virt_block.out_msg_queue_updates.diff_hash,
            queue_diff.diff_hash(),
        );

        // Only masterchain proofs are verified against previous key block data.
        if is_masterchain {
            let block_handles = self.storage.block_handle_storage();
            let handle = block_handles
                .load_key_block_handle(virt_block_info.prev_key_block_seqno)
                .ok_or_else(|| {
                    anyhow::anyhow!(
                        "failed to load prev key block handle by prev_key_block_seqno {}",
                        virt_block_info.prev_key_block_seqno
                    )
                })?;

            if handle.id().seqno == 0 {
                // The previous key block is the zerostate: check against the state itself.
                let zerostate = 'zerostate: {
                    // Fast path: reuse the cached zerostate.
                    if let Some(zerostate) = self.cached_zerostate.load_full() {
                        break 'zerostate zerostate;
                    }

                    let shard_states = self.storage.shard_state_storage();
                    let zerostate = shard_states
                        .load_state(0, handle.id())
                        .await
                        .context("failed to load mc zerostate")?;

                    self.cached_zerostate.store(Some(zerostate.clone()));

                    zerostate
                };

                check_with_master_state(proof, &zerostate, &virt_block, &virt_block_info)?;
            } else {
                let prev_key_block_proof = 'prev_proof: {
                    // Fast path: the cached proof is reused only if it belongs to this key block.
                    if let Some(prev_proof) = self.cached_prev_key_block_proof.load_full()
                        && &prev_proof.as_ref().proof_for == handle.id()
                    {
                        break 'prev_proof prev_proof;
                    }

                    let prev_key_block_proof = block_storage
                        .load_block_proof(&handle)
                        .await
                        .context("failed to load prev key block proof")?;

                    // NOTE: Assume that there is only one masterchain block using this cache.
                    // Otherwise, it will be overwritten every time. Maybe use `rcu`.
                    self.cached_prev_key_block_proof
                        .store(Some(prev_key_block_proof.clone()));

                    prev_key_block_proof
                };

                check_with_prev_key_block_proof(
                    proof,
                    &prev_key_block_proof,
                    &virt_block,
                    &virt_block_info,
                )?;
            }
        }

        if store_on_success {
            // Store proof
            let res = block_storage
                .store_block_proof(proof, MaybeExistingHandle::New(meta))
                .await?;

            // Store queue diff
            block_storage
                .store_queue_diff(queue_diff, res.handle.into())
                .await?;
        }

        Ok(meta)
    }
}
630
/// Settings of [`RetryBlockProvider`].
#[derive(Debug, Copy, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct RetryConfig {
    /// Retry limit.
    ///
    /// Number of additional attempts made after the first one yields nothing,
    /// so the inner provider is asked at most `attempts + 1` times.
    ///
    /// Default: 1.
    pub attempts: usize,

    /// Polling interval.
    ///
    /// Pause between two consecutive attempts.
    ///
    /// Default: 1 second.
    #[serde(with = "serde_helpers::humantime")]
    pub interval: Duration,
}
645
646impl Default for RetryConfig {
647    fn default() -> Self {
648        Self {
649            attempts: 1,
650            interval: Duration::from_secs(1),
651        }
652    }
653}
654
// Tests for the `chain` and `cycle` combinators, driven by a mock provider
// whose availability is toggled with an atomic flag.
#[cfg(test)]
mod test {
    use std::sync::Arc;
    use std::sync::atomic::{AtomicBool, Ordering};

    use tycho_block_util::block::{BlockIdExt, BlockStuff};
    use tycho_types::boc::Boc;
    use tycho_types::models::Block;

    use super::*;

    // Provider that yields the same empty block while `has_block` is set
    // and `None` otherwise.
    struct MockBlockProvider {
        // let's give it some state, pretending it's useful
        has_block: AtomicBool,
    }

    impl BlockProvider for MockBlockProvider {
        type GetNextBlockFut<'a> = BoxFuture<'a, OptionalBlockStuff>;
        type GetBlockFut<'a> = BoxFuture<'a, OptionalBlockStuff>;
        type CleanupFut<'a> = future::Ready<Result<()>>;

        fn get_next_block(&self, _prev_block_id: &BlockId) -> Self::GetNextBlockFut<'_> {
            Box::pin(async {
                if self.has_block.load(Ordering::Acquire) {
                    Some(Ok(get_empty_block()))
                } else {
                    None
                }
            })
        }

        fn get_block(&self, _block_id: &BlockIdRelation) -> Self::GetBlockFut<'_> {
            Box::pin(async {
                if self.has_block.load(Ordering::Acquire) {
                    Some(Ok(get_empty_block()))
                } else {
                    None
                }
            })
        }

        fn cleanup_until(&self, _mc_seqno: u32) -> Self::CleanupFut<'_> {
            future::ready(Ok(()))
        }
    }

    // The chain must serve from `left`, fall back to `right` once `left` is
    // empty, and yield `None` when both are empty.
    #[tokio::test]
    async fn chain_block_provider_switches_providers_correctly() {
        let left_provider = Arc::new(MockBlockProvider {
            has_block: AtomicBool::new(true),
        });
        let right_provider = Arc::new(MockBlockProvider {
            has_block: AtomicBool::new(false),
        });

        let chain_provider = ChainBlockProvider::new(left_provider.clone(), right_provider.clone());

        chain_provider
            .get_next_block(&get_default_block_id())
            .await
            .unwrap()
            .unwrap();

        // Now let's pretend the left provider ran out of blocks.
        left_provider.has_block.store(false, Ordering::Release);
        right_provider.has_block.store(true, Ordering::Release);

        chain_provider
            .get_next_block(&get_default_block_id())
            .await
            .unwrap()
            .unwrap();

        // End of blocks stream for both providers
        left_provider.has_block.store(false, Ordering::Release);
        right_provider.has_block.store(false, Ordering::Release);

        assert!(
            chain_provider
                .get_next_block(&get_default_block_id())
                .await
                .is_none()
        );
    }

    // The cycle must go left -> right -> left, committing each switch only in
    // `cleanup_until`, and `get_block` must not change the current provider.
    #[tokio::test]
    async fn cycle_block_provider_switches_providers_correctly() {
        const LEFT_LIMIT: usize = 10;
        const RIGHT_LIMIT: usize = 1;

        const POLLING_INTERVAL_MS: u64 = 100;

        let left_provider = Arc::new(MockBlockProvider {
            has_block: AtomicBool::new(true),
        });
        let right_provider = Arc::new(MockBlockProvider {
            has_block: AtomicBool::new(false),
        });

        let left_config = RetryConfig {
            attempts: LEFT_LIMIT,
            interval: Duration::from_millis(POLLING_INTERVAL_MS),
        };

        let right_config = RetryConfig {
            attempts: RIGHT_LIMIT,
            interval: Duration::from_millis(POLLING_INTERVAL_MS),
        };

        let left = left_provider.clone().retry(left_config);
        let right = right_provider.clone().retry(right_config);
        let cycle_provider = left.cycle(right);

        assert_eq!(
            cycle_provider.state.lock().unwrap().current,
            CycleBlockProviderPart::Left
        );

        cycle_provider
            .get_next_block(&get_default_block_id())
            .await
            .unwrap()
            .unwrap();

        // Now let's pretend the left provider ran out of blocks.
        left_provider.has_block.store(false, Ordering::Release);
        right_provider.has_block.store(true, Ordering::Release);

        while cycle_provider
            .get_next_block(&get_default_block_id())
            .await
            .is_none()
        {}

        cycle_provider
            .get_next_block(&get_default_block_id())
            .await
            .unwrap()
            .unwrap();

        // The planned switch is committed only here.
        cycle_provider
            .cleanup_until(get_default_block_id().seqno + 1)
            .await
            .unwrap();

        assert_eq!(
            cycle_provider.state.lock().unwrap().current,
            CycleBlockProviderPart::Right
        );

        // Cycle switch
        left_provider.has_block.store(true, Ordering::Release);
        right_provider.has_block.store(false, Ordering::Release);

        while cycle_provider
            .get_next_block(&get_default_block_id())
            .await
            .is_none()
        {}

        cycle_provider
            .get_next_block(&get_default_block_id())
            .await
            .unwrap()
            .unwrap();

        cycle_provider
            .cleanup_until(get_default_block_id().seqno + 1)
            .await
            .unwrap();

        assert_eq!(
            cycle_provider.state.lock().unwrap().current,
            CycleBlockProviderPart::Left
        );

        // `get_block` uses the current provider and leaves the state untouched.
        cycle_provider
            .get_block(&get_default_block_id().relative_to_self())
            .await
            .unwrap()
            .unwrap();
        assert_eq!(
            cycle_provider.state.lock().unwrap().current,
            CycleBlockProviderPart::Left
        );

        left_provider.has_block.store(false, Ordering::Release);
        right_provider.has_block.store(true, Ordering::Release);

        // `get_block` does not fall back to the other provider.
        let block = cycle_provider
            .get_block(&get_default_block_id().relative_to_self())
            .await;
        assert!(block.is_none());
    }

    // Builds a block from the bundled `empty_block.bin` test fixture.
    fn get_empty_block() -> BlockStuffAug {
        let block_data = include_bytes!("../../../tests/data/empty_block.bin");
        let root = Boc::decode(block_data).unwrap();
        let block = root.parse::<Block>().unwrap();

        let block_id = BlockId {
            root_hash: *root.repr_hash(),
            ..Default::default()
        };

        BlockStuff::from_block_and_root(&block_id, block, root, block_data.len())
            .with_archive_data(block_data.as_slice())
    }

    // Block id used as the "previous block" in all requests of these tests.
    fn get_default_block_id() -> BlockId {
        BlockId::default()
    }
}