Skip to main content

tycho_core/block_strider/provider/
mod.rs

1use std::future::Future;
2use std::pin::pin;
3use std::sync::atomic::{AtomicU32, Ordering};
4use std::sync::{Arc, Mutex};
5use std::time::Duration;
6
7use anyhow::{Context, Result};
8use arc_swap::{ArcSwapAny, ArcSwapOption};
9use futures_util::future::{self, BoxFuture};
10use serde::{Deserialize, Serialize};
11use tycho_block_util::block::{
12    BlockIdRelation, BlockProofStuff, BlockProofStuffAug, BlockStuff, BlockStuffAug,
13    check_with_master_state, check_with_prev_key_block_proof,
14};
15use tycho_block_util::queue::QueueDiffStuffAug;
16use tycho_block_util::state::ShardStateStuff;
17use tycho_types::models::BlockId;
18use tycho_types::prelude::*;
19use tycho_util::metrics::HistogramGuard;
20use tycho_util::serde_helpers;
21
22pub use self::archive_provider::{
23    ArchiveBlockProvider, ArchiveBlockProviderConfig, ArchiveClient, ArchiveDownloadContext,
24    ArchiveResponse, FoundArchive, HybridArchiveClient, IntoArchiveClient,
25};
26pub use self::blockchain_provider::{BlockchainBlockProvider, BlockchainBlockProviderConfig};
27pub use self::box_provider::BoxBlockProvider;
28use self::futures::SelectNonEmptyFut;
29pub use self::storage_provider::StorageBlockProvider;
30use crate::storage::{BlockHandle, CoreStorage, MaybeExistingHandle, NewBlockMeta};
31
32mod archive_provider;
33mod blockchain_provider;
34mod box_provider;
35mod futures;
36mod storage_provider;
37
/// Result of a single provider request.
///
/// The combinators in this module treat `None` as "this provider produced
/// nothing" (they then fall back to another provider or retry), while any
/// `Some(..)` value, block or error, is handed back to the caller unchanged.
pub type OptionalBlockStuff = Option<Result<BlockStuffAug>>;
39
/// Asynchronous source of blocks.
///
/// Block provider *MUST* validate the block before returning it.
pub trait BlockProvider: Send + Sync + 'static {
    /// Future returned by [`BlockProvider::get_next_block`].
    type GetNextBlockFut<'a>: Future<Output = OptionalBlockStuff> + Send + 'a;
    /// Future returned by [`BlockProvider::get_block`].
    type GetBlockFut<'a>: Future<Output = OptionalBlockStuff> + Send + 'a;
    /// Future returned by [`BlockProvider::cleanup_until`].
    type CleanupFut<'a>: Future<Output = Result<()>> + Send + 'a;

    /// Wait for the next block. Mostly used for masterchain blocks.
    fn get_next_block<'a>(&'a self, prev_block_id: &'a BlockId) -> Self::GetNextBlockFut<'a>;
    /// Get the exact block. Provider must return the requested block.
    fn get_block<'a>(&'a self, block_id_relation: &'a BlockIdRelation) -> Self::GetBlockFut<'a>;
    /// Clear resources until (and including) the specified masterchain block seqno.
    fn cleanup_until(&self, mc_seqno: u32) -> Self::CleanupFut<'_>;
}
53
54impl<T: BlockProvider> BlockProvider for Box<T> {
55    type GetNextBlockFut<'a> = T::GetNextBlockFut<'a>;
56    type GetBlockFut<'a> = T::GetBlockFut<'a>;
57    type CleanupFut<'a> = T::CleanupFut<'a>;
58
59    fn get_next_block<'a>(&'a self, prev_block_id: &'a BlockId) -> Self::GetNextBlockFut<'a> {
60        <T as BlockProvider>::get_next_block(self, prev_block_id)
61    }
62
63    fn get_block<'a>(&'a self, block_id_relation: &'a BlockIdRelation) -> Self::GetBlockFut<'a> {
64        <T as BlockProvider>::get_block(self, block_id_relation)
65    }
66
67    fn cleanup_until(&self, mc_seqno: u32) -> Self::CleanupFut<'_> {
68        <T as BlockProvider>::cleanup_until(self, mc_seqno)
69    }
70}
71
72impl<T: BlockProvider> BlockProvider for Arc<T> {
73    type GetNextBlockFut<'a> = T::GetNextBlockFut<'a>;
74    type GetBlockFut<'a> = T::GetBlockFut<'a>;
75    type CleanupFut<'a> = T::CleanupFut<'a>;
76
77    fn get_next_block<'a>(&'a self, prev_block_id: &'a BlockId) -> Self::GetNextBlockFut<'a> {
78        <T as BlockProvider>::get_next_block(self, prev_block_id)
79    }
80
81    fn get_block<'a>(&'a self, block_id_relation: &'a BlockIdRelation) -> Self::GetBlockFut<'a> {
82        <T as BlockProvider>::get_block(self, block_id_relation)
83    }
84
85    fn cleanup_until(&self, mc_seqno: u32) -> Self::CleanupFut<'_> {
86        <T as BlockProvider>::cleanup_until(self, mc_seqno)
87    }
88}
89
/// Combinator helpers available on every [`BlockProvider`].
pub trait BlockProviderExt: Sized {
    /// Converts the provider into a [`BoxBlockProvider`].
    fn boxed(self) -> BoxBlockProvider;

    /// Uses `self` until it has no next block, then switches to `other`.
    fn chain<T: BlockProvider>(self, other: T) -> ChainBlockProvider<Self, T>;

    /// Alternates between `self` and `other` whenever the provider in use has no next block.
    fn cycle<T: BlockProvider>(self, other: T) -> CycleBlockProvider<Self, T>;

    /// Re-polls `get_next_block` of `self` according to `config` while it returns nothing.
    fn retry(self, config: RetryConfig) -> RetryBlockProvider<Self>;
}
99
100impl<B: BlockProvider> BlockProviderExt for B {
101    fn boxed(self) -> BoxBlockProvider {
102        castaway::match_type!(self, {
103            BoxBlockProvider as provider => provider,
104            provider => BoxBlockProvider::new(provider),
105        })
106    }
107
108    fn chain<T: BlockProvider>(self, other: T) -> ChainBlockProvider<Self, T> {
109        ChainBlockProvider::new(self, other)
110    }
111
112    fn cycle<T: BlockProvider>(self, other: T) -> CycleBlockProvider<Self, T> {
113        CycleBlockProvider::new(self, other)
114    }
115
116    fn retry(self, config: RetryConfig) -> RetryBlockProvider<Self> {
117        RetryBlockProvider {
118            inner: self,
119            config,
120        }
121    }
122}
123
124// === Provider combinators ===
125#[derive(Debug, Clone, Copy)]
126pub struct EmptyBlockProvider;
127
128impl BlockProvider for EmptyBlockProvider {
129    type GetNextBlockFut<'a> = future::Ready<OptionalBlockStuff>;
130    type GetBlockFut<'a> = future::Ready<OptionalBlockStuff>;
131    type CleanupFut<'a> = future::Ready<Result<()>>;
132
133    fn get_next_block<'a>(&'a self, _prev_block_id: &'a BlockId) -> Self::GetNextBlockFut<'a> {
134        future::ready(None)
135    }
136
137    fn get_block<'a>(&'a self, _block_id_relation: &'a BlockIdRelation) -> Self::GetBlockFut<'a> {
138        future::ready(None)
139    }
140
141    fn cleanup_until(&self, _mc_seqno: u32) -> Self::CleanupFut<'_> {
142        future::ready(Ok(()))
143    }
144}
145
146pub struct ChainBlockProvider<T1, T2> {
147    left: ArcSwapOption<T1>,
148    right: T2,
149    cleanup_left_at: AtomicU32,
150}
151
152impl<T1, T2> ChainBlockProvider<T1, T2> {
153    pub fn new(left: T1, right: T2) -> Self {
154        Self {
155            left: ArcSwapAny::new(Some(Arc::new(left))),
156            right,
157            cleanup_left_at: AtomicU32::new(u32::MAX),
158        }
159    }
160}
161
162impl<T1: BlockProvider, T2: BlockProvider> BlockProvider for ChainBlockProvider<T1, T2> {
163    type GetNextBlockFut<'a> = BoxFuture<'a, OptionalBlockStuff>;
164    type GetBlockFut<'a> = BoxFuture<'a, OptionalBlockStuff>;
165    type CleanupFut<'a> = BoxFuture<'a, Result<()>>;
166
167    fn get_next_block<'a>(&'a self, prev_block_id: &'a BlockId) -> Self::GetNextBlockFut<'a> {
168        if self.cleanup_left_at.load(Ordering::Acquire) == u32::MAX
169            && let Some(left) = self.left.load_full()
170        {
171            return Box::pin(async move {
172                let res = left.get_next_block(prev_block_id).await;
173                if res.is_some() {
174                    return res;
175                }
176
177                // Schedule left provider cleanup for the next block.
178                self.cleanup_left_at
179                    .store(prev_block_id.seqno.saturating_add(1), Ordering::Release);
180
181                // Fallback to right
182                self.right.get_next_block(prev_block_id).await
183            });
184        }
185
186        Box::pin(self.right.get_next_block(prev_block_id))
187    }
188
189    fn get_block<'a>(&'a self, block_id_relation: &'a BlockIdRelation) -> Self::GetBlockFut<'a> {
190        if self.cleanup_left_at.load(Ordering::Acquire) == u32::MAX
191            && let Some(left) = self.left.load_full()
192        {
193            return Box::pin(async move { left.get_block(block_id_relation).await });
194        }
195
196        Box::pin(self.right.get_block(block_id_relation))
197    }
198
199    fn cleanup_until(&self, mc_seqno: u32) -> Self::CleanupFut<'_> {
200        Box::pin(async move {
201            // Read the cleanup flag:
202            // - 0 means that `left` has been reset;
203            // - u32::MAX means that `left` is still in use;
204            // - other seqno indicates when we need to cleanup `left`.
205            let cleanup_left_at = self.cleanup_left_at.load(Ordering::Acquire);
206
207            if cleanup_left_at > 0 && cleanup_left_at <= mc_seqno {
208                // Cleanup and reset `left` if target block has been set.
209                if let Some(left) = self.left.load_full() {
210                    left.cleanup_until(mc_seqno).await?;
211                    self.left.store(None);
212                    self.cleanup_left_at.store(0, Ordering::Release);
213                }
214            } else if cleanup_left_at == u32::MAX {
215                // Cleanup only `left` until we switch to `right`.
216                if let Some(left) = self.left.load_full() {
217                    return left.cleanup_until(mc_seqno).await;
218                }
219            }
220
221            // In all other cases just cleanup `right`.
222            self.right.cleanup_until(mc_seqno).await
223        })
224    }
225}
226
/// Provider that alternates between `left` and `right`.
pub struct CycleBlockProvider<T1, T2> {
    left: T1,
    right: T2,
    /// Which side is in use and when the next switch is planned.
    state: Mutex<CycleBlockProviderState>,
}

/// Mutable part of [`CycleBlockProvider`].
struct CycleBlockProviderState {
    /// Side that is currently in use.
    current: CycleBlockProviderPart,
    /// Masterchain seqno of the planned switch, `u32::MAX` when none is planned.
    switch_at: u32,
}

impl CycleBlockProviderState {
    /// Returns `true` if a switch is planned and `mc_seqno` has reached it.
    fn switch_due(&self, mc_seqno: u32) -> bool {
        self.switch_at != u32::MAX && self.switch_at <= mc_seqno
    }
}

/// One of the two sides of a [`CycleBlockProvider`].
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
enum CycleBlockProviderPart {
    Left,
    Right,
}

impl std::ops::Not for CycleBlockProviderPart {
    type Output = Self;

    /// Returns the opposite side.
    #[inline]
    fn not(self) -> Self::Output {
        if self == Self::Left {
            Self::Right
        } else {
            Self::Left
        }
    }
}

impl<T1, T2> CycleBlockProvider<T1, T2> {
    /// Creates a cycle that starts on the `left` side with no switch planned.
    pub fn new(left: T1, right: T2) -> Self {
        let initial = CycleBlockProviderState {
            current: CycleBlockProviderPart::Left,
            switch_at: u32::MAX,
        };
        Self {
            left,
            right,
            state: Mutex::new(initial),
        }
    }

    /// Picks the side that must serve a request related to `mc_seqno`.
    ///
    /// - If a switch is planned (`switch_at` != `u32::MAX`) and `mc_seqno` has reached it,
    ///   the NEW side is returned even though `current` has not been updated yet.
    /// - Otherwise the side stored in `current` is returned.
    ///
    /// This ensures that:
    /// - shard blocks are fetched from the same provider as their master block;
    /// - parallel processing of master blocks uses the correct provider while a switch
    ///   is in progress.
    fn choose_provider(&self, mc_seqno: u32) -> CycleBlockProviderPart {
        let state = self.state.lock().unwrap();
        let in_use = state.current;
        if state.switch_due(mc_seqno) {
            // Act as if the switch had already happened; the state itself stays untouched.
            return !in_use;
        }
        in_use
    }

    /// Plans a switch at `mc_seqno` if none is planned, otherwise cancels the planned one.
    ///
    /// Returns the side to try next: the opposite side when a switch has just been
    /// planned, the current side when a planned switch has just been cancelled.
    fn toggle_switch_at(&self, mc_seqno: u32) -> CycleBlockProviderPart {
        let mut state = self.state.lock().unwrap();
        let switch_planned = state.switch_at != u32::MAX;
        if switch_planned {
            state.switch_at = u32::MAX;
            state.current
        } else {
            state.switch_at = mc_seqno;
            !state.current
        }
    }

    /// Makes a planned switch permanent once `mc_seqno` has reached it.
    fn try_apply_switch(&self, mc_seqno: u32) {
        let mut state = self.state.lock().unwrap();
        if !state.switch_due(mc_seqno) {
            return;
        }
        state.current = !state.current;
        state.switch_at = u32::MAX;
    }
}
309
310impl<T1: BlockProvider, T2: BlockProvider> BlockProvider for CycleBlockProvider<T1, T2> {
311    type GetNextBlockFut<'a> = BoxFuture<'a, OptionalBlockStuff>;
312    type GetBlockFut<'a> = BoxFuture<'a, OptionalBlockStuff>;
313    type CleanupFut<'a> = BoxFuture<'a, Result<()>>;
314
315    fn get_next_block<'a>(&'a self, prev_block_id: &'a BlockId) -> Self::GetNextBlockFut<'a> {
316        Box::pin(async move {
317            // Try using the current provider.
318            let mut res = match self.choose_provider(prev_block_id.seqno) {
319                CycleBlockProviderPart::Left => self.left.get_next_block(prev_block_id).await,
320                CycleBlockProviderPart::Right => self.right.get_next_block(prev_block_id).await,
321            };
322            if res.is_some() {
323                return res;
324            }
325
326            loop {
327                // Fallback to the next provider.
328                res = match self.toggle_switch_at(prev_block_id.seqno.saturating_add(1)) {
329                    CycleBlockProviderPart::Left => self.left.get_next_block(prev_block_id).await,
330                    CycleBlockProviderPart::Right => self.right.get_next_block(prev_block_id).await,
331                };
332                if res.is_some() {
333                    return res;
334                }
335
336                // Allow executor to do some work if all these methods are blocking.
337                // FIXME: Add some sleep here in case of complete blocking?
338                tokio::task::yield_now().await;
339            }
340        })
341    }
342
343    fn get_block<'a>(&'a self, block_id_relation: &'a BlockIdRelation) -> Self::GetBlockFut<'a> {
344        match self.choose_provider(block_id_relation.mc_block_id.seqno) {
345            CycleBlockProviderPart::Left => Box::pin(self.left.get_block(block_id_relation)),
346            CycleBlockProviderPart::Right => Box::pin(self.right.get_block(block_id_relation)),
347        }
348    }
349
350    fn cleanup_until(&self, mc_seqno: u32) -> Self::CleanupFut<'_> {
351        Box::pin(async move {
352            self.try_apply_switch(mc_seqno);
353
354            let cleanup_left = self.left.cleanup_until(mc_seqno);
355            let cleanup_right = self.right.cleanup_until(mc_seqno);
356            match futures_util::future::join(cleanup_left, cleanup_right).await {
357                (Err(e), _) | (_, Err(e)) => Err(e),
358                (Ok(()), Ok(())) => Ok(()),
359            }
360        })
361    }
362}
363
364pub struct RetryBlockProvider<T> {
365    inner: T,
366    config: RetryConfig,
367}
368
369impl<T: BlockProvider> BlockProvider for RetryBlockProvider<T> {
370    type GetNextBlockFut<'a> = BoxFuture<'a, OptionalBlockStuff>;
371    type GetBlockFut<'a> = T::GetBlockFut<'a>;
372    type CleanupFut<'a> = T::CleanupFut<'a>;
373
374    fn get_next_block<'a>(&'a self, prev_block_id: &'a BlockId) -> Self::GetNextBlockFut<'a> {
375        Box::pin(async move {
376            let mut attempts = 0usize;
377
378            loop {
379                let res = self.inner.get_next_block(prev_block_id).await;
380                if res.is_some() || attempts >= self.config.attempts {
381                    break res;
382                }
383
384                attempts += 1;
385
386                // TODO: Backoff?
387                tokio::time::sleep(self.config.interval).await;
388            }
389        })
390    }
391
392    fn get_block<'a>(&'a self, block_id_relation: &'a BlockIdRelation) -> Self::GetBlockFut<'a> {
393        self.inner.get_block(block_id_relation)
394    }
395
396    fn cleanup_until(&self, mc_seqno: u32) -> Self::CleanupFut<'_> {
397        self.inner.cleanup_until(mc_seqno)
398    }
399}
400
// Implements `BlockProvider` for a tuple of providers.
//
// Arguments:
// - `$join_fn`: function that joins the cleanup futures of all tuple elements;
// - `|$e| $err_pat`: pattern that matches the joined result when at least one
//   element is `Err($e)`;
// - `{ index: variable = Type, ... }`: tuple index, local variable name and
//   type parameter name for every tuple element.
//
// Generated behavior:
// - `get_next_block` / `get_block` create one future per element and combine
//   them with `SelectNonEmptyFut`.
//   NOTE(review): presumably this resolves with the first non-empty result;
//   confirm in the `futures` submodule.
// - `cleanup_until` runs the cleanup of every element via `$join_fn` and returns
//   the error bound by the first matching alternative of `$err_pat`, if any.
macro_rules! impl_provider_tuple {
    ($join_fn:path, |$e:ident| $err_pat:pat$(,)?, {
        $($n:tt: $var:ident = $ty:ident),*$(,)?
    }) => {
        impl<$($ty),*> BlockProvider for ($($ty),*)
        where
            $($ty: BlockProvider),*
        {
            type GetNextBlockFut<'a> = BoxFuture<'a, OptionalBlockStuff>;
            type GetBlockFut<'a> = BoxFuture<'a, OptionalBlockStuff>;
            type CleanupFut<'a> = BoxFuture<'a, Result<()>>;

            fn get_next_block<'a>(
                &'a self,
                prev_block_id: &'a BlockId,
            ) -> Self::GetNextBlockFut<'a> {
                // Create the futures of all elements up front.
                $(let $var = self.$n.get_next_block(prev_block_id));*;

                Box::pin(async move {
                    // Pin the futures in place so they can be polled by reference.
                    $(let $var = pin!($var));*;
                    SelectNonEmptyFut::from(($($var),*)).await
                })
            }

            fn get_block<'a>(&'a self, block_id_relation: &'a BlockIdRelation) -> Self::GetBlockFut<'a> {
                // Create the futures of all elements up front.
                $(let $var = self.$n.get_block(block_id_relation));*;

                Box::pin(async move {
                    // Pin the futures in place so they can be polled by reference.
                    $(let $var = pin!($var));*;
                    SelectNonEmptyFut::from(($($var),*)).await
                })
            }

            fn cleanup_until(&self, mc_seqno: u32) -> Self::CleanupFut<'_> {
                $(let $var = self.$n.cleanup_until(mc_seqno));*;

                Box::pin(async move {
                    // All cleanups run to completion before the results are inspected.
                    match $join_fn($($var),*).await {
                        $err_pat => Err($e),
                        _ => Ok(())
                    }
                })
            }
        }
    };
}

// `BlockProvider` for pairs.
impl_provider_tuple! {
    futures_util::future::join,
    |e| (Err(e), _) | (_, Err(e)),
    {
        0: a = T0,
        1: b = T1,
    }
}
// `BlockProvider` for triples.
impl_provider_tuple! {
    futures_util::future::join3,
    |e| (Err(e), _, _) | (_, Err(e), _) | (_, _, Err(e)),
    {
        0: a = T0,
        1: b = T1,
        2: c = T2,
    }
}
// `BlockProvider` for 4-tuples.
impl_provider_tuple! {
    futures_util::future::join4,
    |e| (Err(e), _, _, _) | (_, Err(e), _, _) | (_, _, Err(e), _) | (_, _, _, Err(e)),
    {
        0: a = T0,
        1: b = T1,
        2: c = T2,
        3: d = T3,
    }
}
// `BlockProvider` for 5-tuples.
impl_provider_tuple! {
    futures_util::future::join5,
    |e|
        (Err(e), _, _, _, _)
        | (_, Err(e), _, _, _)
        | (_, _, Err(e), _, _)
        | (_, _, _, Err(e), _)
        | (_, _, _, _, Err(e)),
    {
        0: a = T0,
        1: b = T1,
        2: c = T2,
        3: d = T3,
        4: e = T4,
    }
}
491
/// Input of [`ProofChecker::check_proof`].
pub struct CheckProof<'a> {
    /// Masterchain block id; its seqno is recorded as `ref_by_mc_seqno` in the block meta.
    pub mc_block_id: &'a BlockId,
    /// Block being checked.
    pub block: &'a BlockStuff,
    /// Proof (or proof link) for `block`.
    pub proof: &'a BlockProofStuffAug,
    /// Queue diff whose hash must match the one referenced by the proof.
    pub queue_diff: &'a QueueDiffStuffAug,

    /// Whether to store `proof` and `queue_diff` if they are valid.
    pub store_on_success: bool,
}
501
// TODO: Rename to something better since it checks both proofs and queue diffs now,
//       and I don't want to parse block info twice to check queue diff separately.
/// Validates block proofs and queue diffs against data from storage.
pub struct ProofChecker {
    storage: CoreStorage,
    /// Cached zerostate seqno, `UNINIT_MC_SEQNO` until it is first loaded.
    zerostate_seqno: AtomicU32,
    /// Zerostate cached after the first successful load.
    cached_zerostate: ArcSwapAny<Option<ShardStateStuff>>,
    /// Most recently loaded proof of a previous key block.
    cached_prev_key_block_proof: ArcSwapAny<Option<BlockProofStuff>>,
}

impl ProofChecker {
    /// Creates a checker with empty caches.
    pub fn new(storage: CoreStorage) -> Self {
        Self {
            storage,
            zerostate_seqno: AtomicU32::new(UNINIT_MC_SEQNO),
            cached_zerostate: Default::default(),
            cached_prev_key_block_proof: Default::default(),
        }
    }

    /// Checks the proof and the queue diff of a block.
    ///
    /// Steps:
    /// 1. the proof must be made for exactly this block;
    /// 2. masterchain blocks must come with a full proof, all other blocks with a proof link;
    /// 3. the queue diff hash must match the hash referenced by the proof;
    /// 4. masterchain proofs are additionally checked against the zerostate or against
    ///    the proof of the previous key block;
    /// 5. if `store_on_success` is set, the proof and the queue diff are stored.
    ///
    /// Returns the meta of the checked block.
    ///
    /// # Errors
    ///
    /// Fails if any of the checks fails or if the required data cannot be loaded or stored.
    pub async fn check_proof(&self, ctx: CheckProof<'_>) -> Result<NewBlockMeta> {
        // TODO: Add labels with shard?
        let _histogram = HistogramGuard::begin("tycho_core_check_block_proof_time");

        let CheckProof {
            mc_block_id,
            block,
            proof,
            queue_diff,
            store_on_success,
        } = ctx;

        anyhow::ensure!(
            block.id() == &proof.proof().proof_for,
            "proof_for and block id mismatch: proof_for={}, block_id={}",
            proof.proof().proof_for,
            block.id(),
        );

        // Exactly one of "is masterchain" and "is proof link" must hold.
        let is_masterchain = block.id().is_masterchain();
        anyhow::ensure!(is_masterchain ^ proof.is_link(), "unexpected proof type");

        let (virt_block, virt_block_info) = proof.pre_check_block_proof()?;
        let meta = NewBlockMeta {
            is_key_block: virt_block_info.key_block,
            gen_utime: virt_block_info.gen_utime,
            ref_by_mc_seqno: mc_block_id.seqno,
        };

        let block_storage = self.storage.block_storage();

        anyhow::ensure!(
            &virt_block.out_msg_queue_updates.diff_hash == queue_diff.diff_hash(),
            "queue diff mismatch (expected: {}, got: {})",
            virt_block.out_msg_queue_updates.diff_hash,
            queue_diff.diff_hash(),
        );

        if is_masterchain {
            let block_handles = self.storage.block_handle_storage();
            let handle = block_handles
                .load_key_block_handle(virt_block_info.prev_key_block_seqno)
                .ok_or_else(|| {
                    anyhow::anyhow!(
                        "failed to load prev key block handle by prev_key_block_seqno {}",
                        virt_block_info.prev_key_block_seqno
                    )
                })?;

            if handle.id().seqno == self.load_zerostate_seqno() {
                // The previous key block is the zerostate: check against its state.
                let zerostate = self
                    .load_zerostate(&handle)
                    .await
                    .context("failed to load zerostate to check proof")?;

                check_with_master_state(proof, &zerostate, &virt_block, &virt_block_info)?;
            } else {
                let prev_key_block_proof = 'prev_proof: {
                    // Reuse the cached proof if it belongs to the required key block.
                    if let Some(prev_proof) = self.cached_prev_key_block_proof.load_full()
                        && &prev_proof.as_ref().proof_for == handle.id()
                    {
                        break 'prev_proof prev_proof;
                    }

                    let prev_key_block_proof = block_storage
                        .load_block_proof(&handle)
                        .await
                        .context("failed to load prev key block proof")?;

                    // NOTE: Assume that there is only one masterchain block using this cache.
                    // Otherwise, it will be overwritten every time. Maybe use `rcu`.
                    self.cached_prev_key_block_proof
                        .store(Some(prev_key_block_proof.clone()));

                    prev_key_block_proof
                };

                check_with_prev_key_block_proof(
                    proof,
                    &prev_key_block_proof,
                    &virt_block,
                    &virt_block_info,
                )?;
            }
        }

        if store_on_success {
            // Store proof
            let res = block_storage
                .store_block_proof(proof, MaybeExistingHandle::New(meta))
                .await?;

            // Store queue diff
            block_storage
                .store_queue_diff(queue_diff, res.handle.into())
                .await?;
        }

        Ok(meta)
    }

    /// Returns the zerostate seqno, reading it from the node state on first use.
    ///
    /// If the node state has no value, the default one is used and cached.
    fn load_zerostate_seqno(&self) -> u32 {
        let seqno = self.zerostate_seqno.load(Ordering::Acquire);
        if seqno != UNINIT_MC_SEQNO {
            return seqno;
        }

        let seqno = self
            .storage
            .node_state()
            .load_zerostate_mc_seqno()
            .unwrap_or_default();
        self.zerostate_seqno.store(seqno, Ordering::Release);
        seqno
    }

    /// Returns the zerostate for `handle`, using the cached one if it exists.
    ///
    /// The state is built from the zerostate proof stored in the node state when
    /// there is one, and loaded from the shard state storage otherwise.
    async fn load_zerostate(&self, handle: &BlockHandle) -> Result<ShardStateStuff> {
        if let Some(zerostate) = self.cached_zerostate.load_full() {
            return Ok(zerostate);
        }

        let zerostate = if let Some(proof) = self.storage.node_state().load_zerostate_proof() {
            let untracked = self
                .storage
                .shard_state_storage()
                .min_ref_mc_state()
                .insert_untracked();

            ShardStateStuff::from_root(handle.id(), Cell::virtualize(proof), untracked)
                .context("failed to parse zerostate proof")?
        } else {
            // Fallback to the previous behavior.
            self.storage
                .shard_state_storage()
                .load_state(0, handle.id())
                .await?
        };

        self.cached_zerostate.store(Some(zerostate.clone()));
        Ok(zerostate)
    }
}

/// Marker stored in `ProofChecker::zerostate_seqno` until the real value is loaded.
const UNINIT_MC_SEQNO: u32 = u32::MAX;
665
666#[derive(Debug, Copy, Clone, Serialize, Deserialize)]
667#[serde(default)]
668pub struct RetryConfig {
669    /// Retry limit.
670    ///
671    /// Default: 1.
672    pub attempts: usize,
673
674    /// Polling interval.
675    ///
676    /// Default: 1 second.
677    #[serde(with = "serde_helpers::humantime")]
678    pub interval: Duration,
679}
680
681impl Default for RetryConfig {
682    fn default() -> Self {
683        Self {
684            attempts: 1,
685            interval: Duration::from_secs(1),
686        }
687    }
688}
689
690#[cfg(test)]
691mod test {
692    use std::sync::Arc;
693    use std::sync::atomic::{AtomicBool, Ordering};
694
695    use tycho_block_util::block::{BlockIdExt, BlockStuff};
696    use tycho_types::boc::Boc;
697    use tycho_types::models::Block;
698
699    use super::*;
700
701    struct MockBlockProvider {
702        // let's give it some state, pretending it's useful
703        has_block: AtomicBool,
704    }
705
706    impl BlockProvider for MockBlockProvider {
707        type GetNextBlockFut<'a> = BoxFuture<'a, OptionalBlockStuff>;
708        type GetBlockFut<'a> = BoxFuture<'a, OptionalBlockStuff>;
709        type CleanupFut<'a> = future::Ready<Result<()>>;
710
711        fn get_next_block(&self, _prev_block_id: &BlockId) -> Self::GetNextBlockFut<'_> {
712            Box::pin(async {
713                if self.has_block.load(Ordering::Acquire) {
714                    Some(Ok(get_empty_block()))
715                } else {
716                    None
717                }
718            })
719        }
720
721        fn get_block(&self, _block_id: &BlockIdRelation) -> Self::GetBlockFut<'_> {
722            Box::pin(async {
723                if self.has_block.load(Ordering::Acquire) {
724                    Some(Ok(get_empty_block()))
725                } else {
726                    None
727                }
728            })
729        }
730
731        fn cleanup_until(&self, _mc_seqno: u32) -> Self::CleanupFut<'_> {
732            future::ready(Ok(()))
733        }
734    }
735
736    #[tokio::test]
737    async fn chain_block_provider_switches_providers_correctly() {
738        let left_provider = Arc::new(MockBlockProvider {
739            has_block: AtomicBool::new(true),
740        });
741        let right_provider = Arc::new(MockBlockProvider {
742            has_block: AtomicBool::new(false),
743        });
744
745        let chain_provider = ChainBlockProvider::new(left_provider.clone(), right_provider.clone());
746
747        chain_provider
748            .get_next_block(&get_default_block_id())
749            .await
750            .unwrap()
751            .unwrap();
752
753        // Now let's pretend the left provider ran out of blocks.
754        left_provider.has_block.store(false, Ordering::Release);
755        right_provider.has_block.store(true, Ordering::Release);
756
757        chain_provider
758            .get_next_block(&get_default_block_id())
759            .await
760            .unwrap()
761            .unwrap();
762
763        // End of blocks stream for both providers
764        left_provider.has_block.store(false, Ordering::Release);
765        right_provider.has_block.store(false, Ordering::Release);
766
767        assert!(
768            chain_provider
769                .get_next_block(&get_default_block_id())
770                .await
771                .is_none()
772        );
773    }
774
775    #[tokio::test]
776    async fn cycle_block_provider_switches_providers_correctly() {
777        const LEFT_LIMIT: usize = 10;
778        const RIGHT_LIMIT: usize = 1;
779
780        const POLLING_INTERVAL_MS: u64 = 100;
781
782        let left_provider = Arc::new(MockBlockProvider {
783            has_block: AtomicBool::new(true),
784        });
785        let right_provider = Arc::new(MockBlockProvider {
786            has_block: AtomicBool::new(false),
787        });
788
789        let left_config = RetryConfig {
790            attempts: LEFT_LIMIT,
791            interval: Duration::from_millis(POLLING_INTERVAL_MS),
792        };
793
794        let right_config = RetryConfig {
795            attempts: RIGHT_LIMIT,
796            interval: Duration::from_millis(POLLING_INTERVAL_MS),
797        };
798
799        let left = left_provider.clone().retry(left_config);
800        let right = right_provider.clone().retry(right_config);
801        let cycle_provider = left.cycle(right);
802
803        assert_eq!(
804            cycle_provider.state.lock().unwrap().current,
805            CycleBlockProviderPart::Left
806        );
807
808        cycle_provider
809            .get_next_block(&get_default_block_id())
810            .await
811            .unwrap()
812            .unwrap();
813
814        // Now let's pretend the left provider ran out of blocks.
815        left_provider.has_block.store(false, Ordering::Release);
816        right_provider.has_block.store(true, Ordering::Release);
817
818        while cycle_provider
819            .get_next_block(&get_default_block_id())
820            .await
821            .is_none()
822        {}
823
824        cycle_provider
825            .get_next_block(&get_default_block_id())
826            .await
827            .unwrap()
828            .unwrap();
829
830        cycle_provider
831            .cleanup_until(get_default_block_id().seqno + 1)
832            .await
833            .unwrap();
834
835        assert_eq!(
836            cycle_provider.state.lock().unwrap().current,
837            CycleBlockProviderPart::Right
838        );
839
840        // Cycle switch
841        left_provider.has_block.store(true, Ordering::Release);
842        right_provider.has_block.store(false, Ordering::Release);
843
844        while cycle_provider
845            .get_next_block(&get_default_block_id())
846            .await
847            .is_none()
848        {}
849
850        cycle_provider
851            .get_next_block(&get_default_block_id())
852            .await
853            .unwrap()
854            .unwrap();
855
856        cycle_provider
857            .cleanup_until(get_default_block_id().seqno + 1)
858            .await
859            .unwrap();
860
861        assert_eq!(
862            cycle_provider.state.lock().unwrap().current,
863            CycleBlockProviderPart::Left
864        );
865
866        cycle_provider
867            .get_block(&get_default_block_id().relative_to_self())
868            .await
869            .unwrap()
870            .unwrap();
871        assert_eq!(
872            cycle_provider.state.lock().unwrap().current,
873            CycleBlockProviderPart::Left
874        );
875
876        left_provider.has_block.store(false, Ordering::Release);
877        right_provider.has_block.store(true, Ordering::Release);
878
879        let block = cycle_provider
880            .get_block(&get_default_block_id().relative_to_self())
881            .await;
882        assert!(block.is_none());
883    }
884
885    fn get_empty_block() -> BlockStuffAug {
886        let block_data = include_bytes!("../../../tests/data/empty_block.bin");
887        let root = Boc::decode(block_data).unwrap();
888        let block = root.parse::<Block>().unwrap();
889
890        let block_id = BlockId {
891            root_hash: *root.repr_hash(),
892            ..Default::default()
893        };
894
895        BlockStuff::from_block_and_root(&block_id, block, root, block_data.len())
896            .with_archive_data(block_data.as_slice())
897    }
898
899    fn get_default_block_id() -> BlockId {
900        BlockId::default()
901    }
902}