1use std::future::Future;
2use std::pin::pin;
3use std::sync::atomic::{AtomicU32, Ordering};
4use std::sync::{Arc, Mutex};
5use std::time::Duration;
6
7use anyhow::{Context, Result};
8use arc_swap::{ArcSwapAny, ArcSwapOption};
9use futures_util::future::{self, BoxFuture};
10use serde::{Deserialize, Serialize};
11use tycho_block_util::block::{
12 BlockIdRelation, BlockProofStuff, BlockProofStuffAug, BlockStuff, BlockStuffAug,
13 check_with_master_state, check_with_prev_key_block_proof,
14};
15use tycho_block_util::queue::QueueDiffStuffAug;
16use tycho_block_util::state::ShardStateStuff;
17use tycho_types::models::BlockId;
18use tycho_types::prelude::*;
19use tycho_util::metrics::HistogramGuard;
20use tycho_util::serde_helpers;
21
22pub use self::archive_provider::{
23 ArchiveBlockProvider, ArchiveBlockProviderConfig, ArchiveClient, ArchiveDownloadContext,
24 ArchiveResponse, FoundArchive, HybridArchiveClient, IntoArchiveClient,
25};
26pub use self::blockchain_provider::{BlockchainBlockProvider, BlockchainBlockProviderConfig};
27pub use self::box_provider::BoxBlockProvider;
28use self::futures::SelectNonEmptyFut;
29pub use self::storage_provider::StorageBlockProvider;
30use crate::storage::{BlockHandle, CoreStorage, MaybeExistingHandle, NewBlockMeta};
31
32mod archive_provider;
33mod blockchain_provider;
34mod box_provider;
35mod futures;
36mod storage_provider;
37
/// Result of a block request: `None` when the provider has no block to offer,
/// `Some(Err(_))` on failure, `Some(Ok(_))` on success.
pub type OptionalBlockStuff = Option<Result<BlockStuffAug>>;
39
/// An abstract source of blocks for the sync pipeline.
///
/// The futures are generic associated types so that implementations can
/// return concrete (non-boxed) future types where possible.
pub trait BlockProvider: Send + Sync + 'static {
    type GetNextBlockFut<'a>: Future<Output = OptionalBlockStuff> + Send + 'a;
    type GetBlockFut<'a>: Future<Output = OptionalBlockStuff> + Send + 'a;
    type CleanupFut<'a>: Future<Output = Result<()>> + Send + 'a;

    /// Requests the block following `prev_block_id`; resolves to `None`
    /// if this provider cannot supply it.
    fn get_next_block<'a>(&'a self, prev_block_id: &'a BlockId) -> Self::GetNextBlockFut<'a>;

    /// Requests the exact block described by `block_id_relation`; resolves
    /// to `None` if this provider cannot supply it.
    fn get_block<'a>(&'a self, block_id_relation: &'a BlockIdRelation) -> Self::GetBlockFut<'a>;

    /// Releases any resources kept for masterchain blocks up to `mc_seqno`.
    fn cleanup_until(&self, mc_seqno: u32) -> Self::CleanupFut<'_>;
}
53
// Forward the provider impl through `Box`, reusing the inner provider's
// future types so no extra boxing or indirection is introduced.
impl<T: BlockProvider> BlockProvider for Box<T> {
    type GetNextBlockFut<'a> = T::GetNextBlockFut<'a>;
    type GetBlockFut<'a> = T::GetBlockFut<'a>;
    type CleanupFut<'a> = T::CleanupFut<'a>;

    fn get_next_block<'a>(&'a self, prev_block_id: &'a BlockId) -> Self::GetNextBlockFut<'a> {
        <T as BlockProvider>::get_next_block(self, prev_block_id)
    }

    fn get_block<'a>(&'a self, block_id_relation: &'a BlockIdRelation) -> Self::GetBlockFut<'a> {
        <T as BlockProvider>::get_block(self, block_id_relation)
    }

    fn cleanup_until(&self, mc_seqno: u32) -> Self::CleanupFut<'_> {
        <T as BlockProvider>::cleanup_until(self, mc_seqno)
    }
}
71
// Same forwarding as for `Box`, but through `Arc` so shared providers
// can be used directly.
impl<T: BlockProvider> BlockProvider for Arc<T> {
    type GetNextBlockFut<'a> = T::GetNextBlockFut<'a>;
    type GetBlockFut<'a> = T::GetBlockFut<'a>;
    type CleanupFut<'a> = T::CleanupFut<'a>;

    fn get_next_block<'a>(&'a self, prev_block_id: &'a BlockId) -> Self::GetNextBlockFut<'a> {
        <T as BlockProvider>::get_next_block(self, prev_block_id)
    }

    fn get_block<'a>(&'a self, block_id_relation: &'a BlockIdRelation) -> Self::GetBlockFut<'a> {
        <T as BlockProvider>::get_block(self, block_id_relation)
    }

    fn cleanup_until(&self, mc_seqno: u32) -> Self::CleanupFut<'_> {
        <T as BlockProvider>::cleanup_until(self, mc_seqno)
    }
}
89
/// Combinator helpers available on every [`BlockProvider`].
pub trait BlockProviderExt: Sized {
    /// Type-erases the provider into a [`BoxBlockProvider`].
    fn boxed(self) -> BoxBlockProvider;

    /// Uses `self` until it is exhausted, then falls through to `other`.
    fn chain<T: BlockProvider>(self, other: T) -> ChainBlockProvider<Self, T>;

    /// Alternates between `self` and `other` whenever the current side
    /// stops producing blocks.
    fn cycle<T: BlockProvider>(self, other: T) -> CycleBlockProvider<Self, T>;

    /// Retries `get_next_block` according to `config`.
    fn retry(self, config: RetryConfig) -> RetryBlockProvider<Self>;
}
99
impl<B: BlockProvider> BlockProviderExt for B {
    fn boxed(self) -> BoxBlockProvider {
        // Avoid double-boxing when the provider is already a `BoxBlockProvider`.
        castaway::match_type!(self, {
            BoxBlockProvider as provider => provider,
            provider => BoxBlockProvider::new(provider),
        })
    }

    fn chain<T: BlockProvider>(self, other: T) -> ChainBlockProvider<Self, T> {
        ChainBlockProvider::new(self, other)
    }

    fn cycle<T: BlockProvider>(self, other: T) -> CycleBlockProvider<Self, T> {
        CycleBlockProvider::new(self, other)
    }

    fn retry(self, config: RetryConfig) -> RetryBlockProvider<Self> {
        RetryBlockProvider {
            inner: self,
            config,
        }
    }
}
123
/// A provider that never yields blocks; useful as a no-op placeholder.
#[derive(Debug, Clone, Copy)]
pub struct EmptyBlockProvider;
127
impl BlockProvider for EmptyBlockProvider {
    // Everything resolves immediately: no blocks, nothing to clean up.
    type GetNextBlockFut<'a> = future::Ready<OptionalBlockStuff>;
    type GetBlockFut<'a> = future::Ready<OptionalBlockStuff>;
    type CleanupFut<'a> = future::Ready<Result<()>>;

    fn get_next_block<'a>(&'a self, _prev_block_id: &'a BlockId) -> Self::GetNextBlockFut<'a> {
        future::ready(None)
    }

    fn get_block<'a>(&'a self, _block_id_relation: &'a BlockIdRelation) -> Self::GetBlockFut<'a> {
        future::ready(None)
    }

    fn cleanup_until(&self, _mc_seqno: u32) -> Self::CleanupFut<'_> {
        future::ready(Ok(()))
    }
}
145
/// Serves blocks from `left` until it is exhausted, then switches to
/// `right` permanently.
pub struct ChainBlockProvider<T1, T2> {
    // Dropped (set to `None`) once exhausted and cleaned up.
    left: ArcSwapOption<T1>,
    right: T2,
    // `u32::MAX`: left still active; `0`: left removed; any other value is
    // the masterchain seqno at which left may be cleaned up and dropped.
    cleanup_left_at: AtomicU32,
}
151
152impl<T1, T2> ChainBlockProvider<T1, T2> {
153 pub fn new(left: T1, right: T2) -> Self {
154 Self {
155 left: ArcSwapAny::new(Some(Arc::new(left))),
156 right,
157 cleanup_left_at: AtomicU32::new(u32::MAX),
158 }
159 }
160}
161
impl<T1: BlockProvider, T2: BlockProvider> BlockProvider for ChainBlockProvider<T1, T2> {
    type GetNextBlockFut<'a> = BoxFuture<'a, OptionalBlockStuff>;
    type GetBlockFut<'a> = BoxFuture<'a, OptionalBlockStuff>;
    type CleanupFut<'a> = BoxFuture<'a, Result<()>>;

    fn get_next_block<'a>(&'a self, prev_block_id: &'a BlockId) -> Self::GetNextBlockFut<'a> {
        // `u32::MAX` means the left provider has not been exhausted yet.
        if self.cleanup_left_at.load(Ordering::Acquire) == u32::MAX
            && let Some(left) = self.left.load_full()
        {
            return Box::pin(async move {
                let res = left.get_next_block(prev_block_id).await;
                if res.is_some() {
                    return res;
                }

                // Left ran dry: record the seqno after which it may be
                // cleaned up and dropped, then fall through to the right.
                self.cleanup_left_at
                    .store(prev_block_id.seqno.saturating_add(1), Ordering::Release);

                self.right.get_next_block(prev_block_id).await
            });
        }

        Box::pin(self.right.get_next_block(prev_block_id))
    }

    fn get_block<'a>(&'a self, block_id_relation: &'a BlockIdRelation) -> Self::GetBlockFut<'a> {
        // Route to whichever side is currently active (see `get_next_block`).
        if self.cleanup_left_at.load(Ordering::Acquire) == u32::MAX
            && let Some(left) = self.left.load_full()
        {
            return Box::pin(async move { left.get_block(block_id_relation).await });
        }

        Box::pin(self.right.get_block(block_id_relation))
    }

    fn cleanup_until(&self, mc_seqno: u32) -> Self::CleanupFut<'_> {
        Box::pin(async move {
            let cleanup_left_at = self.cleanup_left_at.load(Ordering::Acquire);

            if cleanup_left_at > 0 && cleanup_left_at <= mc_seqno {
                // Left was exhausted before `mc_seqno`: clean it up one last
                // time and drop it for good (`0` marks "left removed").
                if let Some(left) = self.left.load_full() {
                    left.cleanup_until(mc_seqno).await?;
                    self.left.store(None);
                    self.cleanup_left_at.store(0, Ordering::Release);
                }
            } else if cleanup_left_at == u32::MAX {
                // Left is still the active provider; only clean it up.
                if let Some(left) = self.left.load_full() {
                    return left.cleanup_until(mc_seqno).await;
                }
            }

            self.right.cleanup_until(mc_seqno).await
        })
    }
}
226
/// Alternates between two providers, switching whenever the current one
/// stops producing blocks.
pub struct CycleBlockProvider<T1, T2> {
    left: T1,
    right: T2,
    state: Mutex<CycleBlockProviderState>,
}
232
struct CycleBlockProviderState {
    // The side currently serving requests.
    current: CycleBlockProviderPart,
    // Masterchain seqno at which to flip `current`; `u32::MAX` when no
    // switch is scheduled.
    switch_at: u32,
}
239
/// Which half of a [`CycleBlockProvider`] is meant.
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
enum CycleBlockProviderPart {
    Left,
    Right,
}
245
246impl std::ops::Not for CycleBlockProviderPart {
247 type Output = Self;
248
249 #[inline]
250 fn not(self) -> Self::Output {
251 match self {
252 Self::Left => Self::Right,
253 Self::Right => Self::Left,
254 }
255 }
256}
257
impl<T1, T2> CycleBlockProvider<T1, T2> {
    /// Creates a provider that starts on `left` and flips between the two
    /// sides whenever the current one stops producing blocks.
    pub fn new(left: T1, right: T2) -> Self {
        Self {
            left,
            right,
            state: Mutex::new(CycleBlockProviderState {
                current: CycleBlockProviderPart::Left,
                // `u32::MAX` means no switch is scheduled.
                switch_at: u32::MAX,
            }),
        }
    }

    /// Returns the side to query for `mc_seqno`: the scheduled side when a
    /// pending switch already applies at this seqno, otherwise the current one.
    fn choose_provider(&self, mc_seqno: u32) -> CycleBlockProviderPart {
        let state = self.state.lock().unwrap();
        if state.switch_at != u32::MAX && state.switch_at <= mc_seqno {
            !state.current
        } else {
            state.current
        }
    }

    /// Arms a switch at `mc_seqno` and returns the opposite side, or — when a
    /// switch was already pending — disarms it and returns the current side.
    /// Repeated calls therefore alternate between the two sides.
    fn toggle_switch_at(&self, mc_seqno: u32) -> CycleBlockProviderPart {
        let mut state = self.state.lock().unwrap();
        if state.switch_at == u32::MAX {
            state.switch_at = mc_seqno;
            !state.current
        } else {
            state.switch_at = u32::MAX;
            state.current
        }
    }

    /// Commits a pending switch once `mc_seqno` has reached it: flips
    /// `current` and clears the schedule.
    fn try_apply_switch(&self, mc_seqno: u32) {
        let mut state = self.state.lock().unwrap();
        if state.switch_at != u32::MAX && state.switch_at <= mc_seqno {
            state.current = !state.current;
            state.switch_at = u32::MAX;
        }
    }
}
309
impl<T1: BlockProvider, T2: BlockProvider> BlockProvider for CycleBlockProvider<T1, T2> {
    type GetNextBlockFut<'a> = BoxFuture<'a, OptionalBlockStuff>;
    type GetBlockFut<'a> = BoxFuture<'a, OptionalBlockStuff>;
    type CleanupFut<'a> = BoxFuture<'a, Result<()>>;

    fn get_next_block<'a>(&'a self, prev_block_id: &'a BlockId) -> Self::GetNextBlockFut<'a> {
        Box::pin(async move {
            // First try the side selected by the current state (including any
            // switch already scheduled for this seqno).
            let mut res = match self.choose_provider(prev_block_id.seqno) {
                CycleBlockProviderPart::Left => self.left.get_next_block(prev_block_id).await,
                CycleBlockProviderPart::Right => self.right.get_next_block(prev_block_id).await,
            };
            if res.is_some() {
                return res;
            }

            // Otherwise keep alternating sides (arming/disarming a switch at
            // the next seqno on each pass) until one of them answers.
            loop {
                res = match self.toggle_switch_at(prev_block_id.seqno.saturating_add(1)) {
                    CycleBlockProviderPart::Left => self.left.get_next_block(prev_block_id).await,
                    CycleBlockProviderPart::Right => self.right.get_next_block(prev_block_id).await,
                };
                if res.is_some() {
                    return res;
                }

                // Give other tasks a chance to run between polling rounds.
                tokio::task::yield_now().await;
            }
        })
    }

    fn get_block<'a>(&'a self, block_id_relation: &'a BlockIdRelation) -> Self::GetBlockFut<'a> {
        // No retry loop here: a miss on the chosen side is returned as-is.
        match self.choose_provider(block_id_relation.mc_block_id.seqno) {
            CycleBlockProviderPart::Left => Box::pin(self.left.get_block(block_id_relation)),
            CycleBlockProviderPart::Right => Box::pin(self.right.get_block(block_id_relation)),
        }
    }

    fn cleanup_until(&self, mc_seqno: u32) -> Self::CleanupFut<'_> {
        Box::pin(async move {
            // Commit any pending switch before cleaning both sides.
            self.try_apply_switch(mc_seqno);

            // Run both cleanups to completion; report the first error found.
            let cleanup_left = self.left.cleanup_until(mc_seqno);
            let cleanup_right = self.right.cleanup_until(mc_seqno);
            match futures_util::future::join(cleanup_left, cleanup_right).await {
                (Err(e), _) | (_, Err(e)) => Err(e),
                (Ok(()), Ok(())) => Ok(()),
            }
        })
    }
}
363
/// Wraps a provider and retries `get_next_block` according to
/// [`RetryConfig`].
pub struct RetryBlockProvider<T> {
    inner: T,
    config: RetryConfig,
}
368
369impl<T: BlockProvider> BlockProvider for RetryBlockProvider<T> {
370 type GetNextBlockFut<'a> = BoxFuture<'a, OptionalBlockStuff>;
371 type GetBlockFut<'a> = T::GetBlockFut<'a>;
372 type CleanupFut<'a> = T::CleanupFut<'a>;
373
374 fn get_next_block<'a>(&'a self, prev_block_id: &'a BlockId) -> Self::GetNextBlockFut<'a> {
375 Box::pin(async move {
376 let mut attempts = 0usize;
377
378 loop {
379 let res = self.inner.get_next_block(prev_block_id).await;
380 if res.is_some() || attempts >= self.config.attempts {
381 break res;
382 }
383
384 attempts += 1;
385
386 tokio::time::sleep(self.config.interval).await;
388 }
389 })
390 }
391
392 fn get_block<'a>(&'a self, block_id_relation: &'a BlockIdRelation) -> Self::GetBlockFut<'a> {
393 self.inner.get_block(block_id_relation)
394 }
395
396 fn cleanup_until(&self, mc_seqno: u32) -> Self::CleanupFut<'_> {
397 self.inner.cleanup_until(mc_seqno)
398 }
399}
400
/// Implements [`BlockProvider`] for a tuple of providers.
///
/// `get_next_block`/`get_block` poll all elements through
/// `SelectNonEmptyFut` and resolve with the first non-empty result;
/// `cleanup_until` joins every element's cleanup via `$join_fn` and
/// maps any element error (matched by `$err_pat`) to `Err`.
macro_rules! impl_provider_tuple {
    ($join_fn:path, |$e:ident| $err_pat:pat$(,)?, {
        $($n:tt: $var:ident = $ty:ident),*$(,)?
    }) => {
        impl<$($ty),*> BlockProvider for ($($ty),*)
        where
            $($ty: BlockProvider),*
        {
            type GetNextBlockFut<'a> = BoxFuture<'a, OptionalBlockStuff>;
            type GetBlockFut<'a> = BoxFuture<'a, OptionalBlockStuff>;
            type CleanupFut<'a> = BoxFuture<'a, Result<()>>;

            fn get_next_block<'a>(
                &'a self,
                prev_block_id: &'a BlockId,
            ) -> Self::GetNextBlockFut<'a> {
                $(let $var = self.$n.get_next_block(prev_block_id));*;

                Box::pin(async move {
                    $(let $var = pin!($var));*;
                    SelectNonEmptyFut::from(($($var),*)).await
                })
            }

            fn get_block<'a>(&'a self, block_id_relation: &'a BlockIdRelation) -> Self::GetBlockFut<'a> {
                $(let $var = self.$n.get_block(block_id_relation));*;

                Box::pin(async move {
                    $(let $var = pin!($var));*;
                    SelectNonEmptyFut::from(($($var),*)).await
                })
            }

            fn cleanup_until(&self, mc_seqno: u32) -> Self::CleanupFut<'_> {
                $(let $var = self.$n.cleanup_until(mc_seqno));*;

                Box::pin(async move {
                    match $join_fn($($var),*).await {
                        $err_pat => Err($e),
                        _ => Ok(())
                    }
                })
            }
        }
    };
}
447
// `BlockProvider` impls for tuples of 2..=5 providers.
impl_provider_tuple! {
    futures_util::future::join,
    |e| (Err(e), _) | (_, Err(e)),
    {
        0: a = T0,
        1: b = T1,
    }
}
impl_provider_tuple! {
    futures_util::future::join3,
    |e| (Err(e), _, _) | (_, Err(e), _) | (_, _, Err(e)),
    {
        0: a = T0,
        1: b = T1,
        2: c = T2,
    }
}
impl_provider_tuple! {
    futures_util::future::join4,
    |e| (Err(e), _, _, _) | (_, Err(e), _, _) | (_, _, Err(e), _) | (_, _, _, Err(e)),
    {
        0: a = T0,
        1: b = T1,
        2: c = T2,
        3: d = T3,
    }
}
impl_provider_tuple! {
    futures_util::future::join5,
    |e|
    (Err(e), _, _, _, _)
    | (_, Err(e), _, _, _)
    | (_, _, Err(e), _, _)
    | (_, _, _, Err(e), _)
    | (_, _, _, _, Err(e)),
    {
        0: a = T0,
        1: b = T1,
        2: c = T2,
        3: d = T3,
        4: e = T4,
    }
}
491
/// Input bundle for [`ProofChecker::check_proof`].
pub struct CheckProof<'a> {
    /// Masterchain block that references the block being checked.
    pub mc_block_id: &'a BlockId,
    /// The block whose proof is being verified.
    pub block: &'a BlockStuff,
    /// Proof (or proof link, for shard blocks) for `block`.
    pub proof: &'a BlockProofStuffAug,
    /// Queue diff whose hash must match the one committed in the block.
    pub queue_diff: &'a QueueDiffStuffAug,

    /// When set, the proof and queue diff are persisted after verification.
    pub store_on_success: bool,
}
501
/// Verifies block proofs against storage, caching the zerostate and the
/// latest previous key block proof between calls.
pub struct ProofChecker {
    storage: CoreStorage,
    // Zerostate seqno, loaded lazily; `UNINIT_MC_SEQNO` until first use.
    zerostate_seqno: AtomicU32,
    cached_zerostate: ArcSwapAny<Option<ShardStateStuff>>,
    cached_prev_key_block_proof: ArcSwapAny<Option<BlockProofStuff>>,
}
510
impl ProofChecker {
    pub fn new(storage: CoreStorage) -> Self {
        Self {
            storage,
            // Resolved lazily on first use (see `load_zerostate_seqno`).
            zerostate_seqno: AtomicU32::new(UNINIT_MC_SEQNO),
            cached_zerostate: Default::default(),
            cached_prev_key_block_proof: Default::default(),
        }
    }

    /// Verifies a block proof and its queue diff hash, checking masterchain
    /// proofs against the previous key block proof (or the zerostate when
    /// the previous key block is the zerostate), and optionally persists
    /// the proof and queue diff on success.
    ///
    /// Returns the metadata to use for the new block handle.
    pub async fn check_proof(&self, ctx: CheckProof<'_>) -> Result<NewBlockMeta> {
        let _histogram = HistogramGuard::begin("tycho_core_check_block_proof_time");

        let CheckProof {
            mc_block_id,
            block,
            proof,
            queue_diff,
            store_on_success,
        } = ctx;

        anyhow::ensure!(
            block.id() == &proof.proof().proof_for,
            "proof_for and block id mismatch: proof_for={}, block_id={}",
            proof.proof().proof_for,
            block.id(),
        );

        // Masterchain blocks require a full proof; shard blocks a proof link.
        let is_masterchain = block.id().is_masterchain();
        anyhow::ensure!(is_masterchain ^ proof.is_link(), "unexpected proof type");

        let (virt_block, virt_block_info) = proof.pre_check_block_proof()?;
        let meta = NewBlockMeta {
            is_key_block: virt_block_info.key_block,
            gen_utime: virt_block_info.gen_utime,
            ref_by_mc_seqno: mc_block_id.seqno,
        };

        let block_storage = self.storage.block_storage();

        // The queue diff must match the hash committed inside the block.
        anyhow::ensure!(
            &virt_block.out_msg_queue_updates.diff_hash == queue_diff.diff_hash(),
            "queue diff mismatch (expected: {}, got: {})",
            virt_block.out_msg_queue_updates.diff_hash,
            queue_diff.diff_hash(),
        );

        if is_masterchain {
            let block_handles = self.storage.block_handle_storage();
            let handle = block_handles
                .load_key_block_handle(virt_block_info.prev_key_block_seqno)
                .ok_or_else(|| {
                    anyhow::anyhow!(
                        "failed to load prev key block handle by prev_key_block_seqno {}",
                        virt_block_info.prev_key_block_seqno
                    )
                })?;

            if handle.id().seqno == self.load_zerostate_seqno() {
                // The previous key block is the zerostate itself.
                let zerostate = self
                    .load_zerostate(&handle)
                    .await
                    .context("failed to load zerostate to check proof")?;

                check_with_master_state(proof, &zerostate, &virt_block, &virt_block_info)?;
            } else {
                // Check against the previous key block proof, served from the
                // cache when it matches the expected block id.
                let prev_key_block_proof = 'prev_proof: {
                    if let Some(prev_proof) = self.cached_prev_key_block_proof.load_full()
                        && &prev_proof.as_ref().proof_for == handle.id()
                    {
                        break 'prev_proof prev_proof;
                    }

                    let prev_key_block_proof = block_storage
                        .load_block_proof(&handle)
                        .await
                        .context("failed to load prev key block proof")?;

                    self.cached_prev_key_block_proof
                        .store(Some(prev_key_block_proof.clone()));

                    prev_key_block_proof
                };

                check_with_prev_key_block_proof(
                    proof,
                    &prev_key_block_proof,
                    &virt_block,
                    &virt_block_info,
                )?;
            }
        }

        if store_on_success {
            let res = block_storage
                .store_block_proof(proof, MaybeExistingHandle::New(meta))
                .await?;

            block_storage
                .store_queue_diff(queue_diff, res.handle.into())
                .await?;
        }

        Ok(meta)
    }

    /// Returns the zerostate seqno, loading it from node state on first
    /// call and caching it in `zerostate_seqno` afterwards.
    fn load_zerostate_seqno(&self) -> u32 {
        let seqno = self.zerostate_seqno.load(Ordering::Acquire);
        if seqno != UNINIT_MC_SEQNO {
            return seqno;
        }

        let seqno = self
            .storage
            .node_state()
            .load_zerostate_mc_seqno()
            .unwrap_or_default();
        self.zerostate_seqno.store(seqno, Ordering::Release);
        seqno
    }

    /// Loads (and caches) the zerostate, preferring the stored zerostate
    /// proof over a full state load.
    async fn load_zerostate(&self, handle: &BlockHandle) -> Result<ShardStateStuff> {
        if let Some(zerostate) = self.cached_zerostate.load_full() {
            return Ok(zerostate);
        }

        let zerostate = if let Some(proof) = self.storage.node_state().load_zerostate_proof() {
            let untracked = self
                .storage
                .shard_state_storage()
                .min_ref_mc_state()
                .insert_untracked();

            ShardStateStuff::from_root(handle.id(), Cell::virtualize(proof), untracked)
                .context("failed to parse zerostate proof")?
        } else {
            self.storage
                .shard_state_storage()
                .load_state(0, handle.id())
                .await?
        };

        self.cached_zerostate.store(Some(zerostate.clone()));
        Ok(zerostate)
    }
}
663
/// Sentinel for a not-yet-loaded zerostate seqno (see `ProofChecker`).
const UNINIT_MC_SEQNO: u32 = u32::MAX;
665
/// Retry behavior for [`RetryBlockProvider`].
#[derive(Debug, Copy, Clone, Serialize, Deserialize)]
#[serde(default)]
pub struct RetryConfig {
    /// Number of additional attempts after the first try yields nothing.
    pub attempts: usize,

    /// Delay between attempts (humantime format in config files).
    #[serde(with = "serde_helpers::humantime")]
    pub interval: Duration,
}
680
impl Default for RetryConfig {
    // One extra attempt with a one-second pause between tries.
    fn default() -> Self {
        Self {
            attempts: 1,
            interval: Duration::from_secs(1),
        }
    }
}
689
690#[cfg(test)]
691mod test {
692 use std::sync::Arc;
693 use std::sync::atomic::{AtomicBool, Ordering};
694
695 use tycho_block_util::block::{BlockIdExt, BlockStuff};
696 use tycho_types::boc::Boc;
697 use tycho_types::models::Block;
698
699 use super::*;
700
    /// Test provider whose availability is toggled via `has_block`.
    struct MockBlockProvider {
        // When set, both getters yield the empty test block.
        has_block: AtomicBool,
    }
705
    impl BlockProvider for MockBlockProvider {
        type GetNextBlockFut<'a> = BoxFuture<'a, OptionalBlockStuff>;
        type GetBlockFut<'a> = BoxFuture<'a, OptionalBlockStuff>;
        type CleanupFut<'a> = future::Ready<Result<()>>;

        // Yields the empty test block while `has_block` is set, `None` otherwise.
        fn get_next_block(&self, _prev_block_id: &BlockId) -> Self::GetNextBlockFut<'_> {
            Box::pin(async {
                if self.has_block.load(Ordering::Acquire) {
                    Some(Ok(get_empty_block()))
                } else {
                    None
                }
            })
        }

        fn get_block(&self, _block_id: &BlockIdRelation) -> Self::GetBlockFut<'_> {
            Box::pin(async {
                if self.has_block.load(Ordering::Acquire) {
                    Some(Ok(get_empty_block()))
                } else {
                    None
                }
            })
        }

        fn cleanup_until(&self, _mc_seqno: u32) -> Self::CleanupFut<'_> {
            future::ready(Ok(()))
        }
    }
735
    #[tokio::test]
    async fn chain_block_provider_switches_providers_correctly() {
        let left_provider = Arc::new(MockBlockProvider {
            has_block: AtomicBool::new(true),
        });
        let right_provider = Arc::new(MockBlockProvider {
            has_block: AtomicBool::new(false),
        });

        let chain_provider = ChainBlockProvider::new(left_provider.clone(), right_provider.clone());

        // Left has the block: the chain serves it from the left side.
        chain_provider
            .get_next_block(&get_default_block_id())
            .await
            .unwrap()
            .unwrap();

        left_provider.has_block.store(false, Ordering::Release);
        right_provider.has_block.store(true, Ordering::Release);

        // Left is exhausted: the chain falls through to the right side.
        chain_provider
            .get_next_block(&get_default_block_id())
            .await
            .unwrap()
            .unwrap();

        left_provider.has_block.store(false, Ordering::Release);
        right_provider.has_block.store(false, Ordering::Release);

        // Neither side has a block anymore.
        assert!(
            chain_provider
                .get_next_block(&get_default_block_id())
                .await
                .is_none()
        );
    }
774
    #[tokio::test]
    async fn cycle_block_provider_switches_providers_correctly() {
        const LEFT_LIMIT: usize = 10;
        const RIGHT_LIMIT: usize = 1;

        const POLLING_INTERVAL_MS: u64 = 100;

        let left_provider = Arc::new(MockBlockProvider {
            has_block: AtomicBool::new(true),
        });
        let right_provider = Arc::new(MockBlockProvider {
            has_block: AtomicBool::new(false),
        });

        let left_config = RetryConfig {
            attempts: LEFT_LIMIT,
            interval: Duration::from_millis(POLLING_INTERVAL_MS),
        };

        let right_config = RetryConfig {
            attempts: RIGHT_LIMIT,
            interval: Duration::from_millis(POLLING_INTERVAL_MS),
        };

        let left = left_provider.clone().retry(left_config);
        let right = right_provider.clone().retry(right_config);
        let cycle_provider = left.cycle(right);

        // Starts on the left side.
        assert_eq!(
            cycle_provider.state.lock().unwrap().current,
            CycleBlockProviderPart::Left
        );

        cycle_provider
            .get_next_block(&get_default_block_id())
            .await
            .unwrap()
            .unwrap();

        left_provider.has_block.store(false, Ordering::Release);
        right_provider.has_block.store(true, Ordering::Release);

        // Poll until the alternation loop reaches the right side.
        while cycle_provider
            .get_next_block(&get_default_block_id())
            .await
            .is_none()
        {}

        cycle_provider
            .get_next_block(&get_default_block_id())
            .await
            .unwrap()
            .unwrap();

        // Cleanup commits the scheduled switch to the right side.
        cycle_provider
            .cleanup_until(get_default_block_id().seqno + 1)
            .await
            .unwrap();

        assert_eq!(
            cycle_provider.state.lock().unwrap().current,
            CycleBlockProviderPart::Right
        );

        left_provider.has_block.store(true, Ordering::Release);
        right_provider.has_block.store(false, Ordering::Release);

        // And back: the right side dries up, so we cycle to the left again.
        while cycle_provider
            .get_next_block(&get_default_block_id())
            .await
            .is_none()
        {}

        cycle_provider
            .get_next_block(&get_default_block_id())
            .await
            .unwrap()
            .unwrap();

        cycle_provider
            .cleanup_until(get_default_block_id().seqno + 1)
            .await
            .unwrap();

        assert_eq!(
            cycle_provider.state.lock().unwrap().current,
            CycleBlockProviderPart::Left
        );

        // `get_block` routes to the current side without changing state.
        cycle_provider
            .get_block(&get_default_block_id().relative_to_self())
            .await
            .unwrap()
            .unwrap();
        assert_eq!(
            cycle_provider.state.lock().unwrap().current,
            CycleBlockProviderPart::Left
        );

        left_provider.has_block.store(false, Ordering::Release);
        right_provider.has_block.store(true, Ordering::Release);

        // `get_block` does not retry or switch sides: a miss stays a miss.
        let block = cycle_provider
            .get_block(&get_default_block_id().relative_to_self())
            .await;
        assert!(block.is_none());
    }
884
    /// Builds a `BlockStuffAug` from the bundled empty-block fixture.
    fn get_empty_block() -> BlockStuffAug {
        let block_data = include_bytes!("../../../tests/data/empty_block.bin");
        let root = Boc::decode(block_data).unwrap();
        let block = root.parse::<Block>().unwrap();

        // Only the root hash matters for these tests; other id fields stay default.
        let block_id = BlockId {
            root_hash: *root.repr_hash(),
            ..Default::default()
        };

        BlockStuff::from_block_and_root(&block_id, block, root, block_data.len())
            .with_archive_data(block_data.as_slice())
    }
898
    /// Shorthand for the all-default block id used across these tests.
    fn get_default_block_id() -> BlockId {
        BlockId::default()
    }
902}