tycho_core/block_strider/
mod.rs

1use std::sync::Arc;
2
3use anyhow::{Context, Result};
4use futures_util::Future;
5use futures_util::stream::{FuturesUnordered, StreamExt};
6use tokio::time::Instant;
7use tycho_block_util::archive::ArchiveData;
8use tycho_block_util::block::{
9    BlockIdExt, BlockIdRelation, BlockStuff, BlockStuffAug, ShardHeights,
10};
11use tycho_types::models::{BlockId, PrevBlockRef};
12use tycho_util::FastHashMap;
13use tycho_util::futures::JoinTask;
14use tycho_util::metrics::HistogramGuard;
15
16pub use self::archive_handler::ArchiveHandler;
17pub use self::block_saver::BlockSaver;
18pub use self::provider::{
19    ArchiveBlockProvider, ArchiveBlockProviderConfig, ArchiveClient, ArchiveDownloadContext,
20    ArchiveResponse, ArchiveWriter, BlockProvider, BlockProviderExt, BlockchainBlockProvider,
21    BlockchainBlockProviderConfig, BoxBlockProvider, ChainBlockProvider, CheckProof,
22    CycleBlockProvider, EmptyBlockProvider, FoundArchive, HybridArchiveClient, IntoArchiveClient,
23    OptionalBlockStuff, ProofChecker, RetryBlockProvider, RetryConfig, StorageBlockProvider,
24};
25pub use self::starter::{
26    ColdBootType, FileZerostateProvider, QueueStateHandler, Starter, StarterBuilder, StarterConfig,
27    ValidateQueueState, ZerostateProvider,
28};
29pub use self::state::{
30    BlockStriderState, CommitMasterBlock, CommitShardBlock, PersistentBlockStriderState,
31    TempBlockStriderState,
32};
33pub use self::state_applier::ShardStateApplier;
34#[cfg(any(test, feature = "test"))]
35pub use self::subscriber::test::PrintSubscriber;
36pub use self::subscriber::{
37    ArchiveSubscriber, ArchiveSubscriberContext, ArchiveSubscriberExt, BlockSubscriber,
38    BlockSubscriberContext, BlockSubscriberExt, ChainSubscriber, DelayedTasks,
39    DelayedTasksJoinHandle, DelayedTasksSpawner, GcSubscriber, ManualGcTrigger, MetricsSubscriber,
40    NoopSubscriber, PsSubscriber, StateSubscriber, StateSubscriberContext, StateSubscriberExt,
41};
42use crate::storage::CoreStorage;
43
44mod archive_handler;
45mod block_saver;
46mod provider;
47mod starter;
48mod state;
49mod state_applier;
50mod subscriber;
51
/// Typestate builder for [`BlockStrider`].
///
/// Each type parameter starts as `()` and is replaced with a concrete type
/// by the corresponding `with_*` method; `build` is only available once all
/// three slots are filled (see the bounded impl below).
pub struct BlockStriderBuilder<T, P, B> {
    // Strider progress state (`()` until `with_state` is called).
    state: T,
    // Block source (`()` until `with_provider` is called).
    provider: P,
    // Block consumer (`()` until a `with_*_subscriber` method is called).
    subscriber: B,
}
57
58impl<T2, T3> BlockStriderBuilder<(), T2, T3> {
59    #[inline]
60    pub fn with_state<T: BlockStriderState>(self, state: T) -> BlockStriderBuilder<T, T2, T3> {
61        BlockStriderBuilder {
62            state,
63            provider: self.provider,
64            subscriber: self.subscriber,
65        }
66    }
67}
68
69impl<T1, T3> BlockStriderBuilder<T1, (), T3> {
70    #[inline]
71    pub fn with_provider<P: BlockProvider>(self, provider: P) -> BlockStriderBuilder<T1, P, T3> {
72        BlockStriderBuilder {
73            state: self.state,
74            provider,
75            subscriber: self.subscriber,
76        }
77    }
78}
79
80impl<T1, T2> BlockStriderBuilder<T1, T2, ()> {
81    #[inline]
82    pub fn with_block_subscriber<B>(self, subscriber: B) -> BlockStriderBuilder<T1, T2, B>
83    where
84        B: BlockSubscriber,
85    {
86        BlockStriderBuilder {
87            state: self.state,
88            provider: self.provider,
89            subscriber,
90        }
91    }
92}
93
94impl<T1, T2> BlockStriderBuilder<T1, T2, ()> {
95    pub fn with_state_subscriber<S>(
96        self,
97        storage: CoreStorage,
98        state_subscriber: S,
99    ) -> BlockStriderBuilder<T1, T2, ShardStateApplier<S>>
100    where
101        S: StateSubscriber,
102    {
103        BlockStriderBuilder {
104            state: self.state,
105            provider: self.provider,
106            subscriber: ShardStateApplier::new(storage, state_subscriber),
107        }
108    }
109}
110
111impl<T, P, B> BlockStriderBuilder<T, P, B>
112where
113    T: BlockStriderState,
114    P: BlockProvider,
115    B: BlockSubscriber,
116{
117    pub fn build(self) -> BlockStrider<T, P, B> {
118        BlockStrider {
119            state: self.state,
120            provider: Arc::new(self.provider),
121            subscriber: Arc::new(self.subscriber),
122        }
123    }
124}
125
/// Drives the block processing loop: fetches master and shard blocks from
/// `provider`, feeds them through `subscriber`, and records progress in `state`.
///
/// Constructed via [`BlockStrider::builder`].
pub struct BlockStrider<T, P, B> {
    // Tracks the last committed master/shard blocks.
    state: T,
    // Block source; shared (`Arc`) with spawned fetch tasks.
    provider: Arc<P>,
    // Block consumer; shared (`Arc`) with spawned prepare tasks.
    subscriber: Arc<B>,
}
131
132impl BlockStrider<(), (), ()> {
133    pub fn builder() -> BlockStriderBuilder<(), (), ()> {
134        BlockStriderBuilder {
135            state: (),
136            provider: (),
137            subscriber: (),
138        }
139    }
140}
141
142impl<S, P, B> BlockStrider<S, P, B>
143where
144    S: BlockStriderState,
145    P: BlockProvider,
146    B: BlockSubscriber,
147{
148    /// Walks through blocks and handles them.
149    ///
150    /// Stops either when the provider is exhausted or it can't provide a requested block.
151    pub async fn run(self) -> Result<()> {
152        tracing::info!("block strider loop started");
153
154        let mut next_master_fut =
155            JoinTask::new(self.fetch_next_master_block(&self.state.load_last_mc_block_id()));
156
157        while let Some(next) = next_master_fut.await.transpose()? {
158            // NOTE: Start fetching the next master block in parallel to the processing of the current one
159            // If we have a chain of providers, when switching to the next one, since blocks are processed
160            // asynchronously and in parallel with requesting the next block, the processing of the
161            // previous block may already use the new provider. Therefore, the next provider must
162            // necessarily store the previous block.
163            next_master_fut = JoinTask::new(self.fetch_next_master_block(next.id()));
164
165            let mc_seqno = next.id().seqno;
166
167            {
168                let _histogram = HistogramGuard::begin("tycho_core_process_strider_step_time");
169                self.process_mc_block(next.data, next.archive_data).await?;
170            }
171
172            {
173                let _histogram = HistogramGuard::begin("tycho_core_provider_cleanup_time");
174                self.provider.cleanup_until(mc_seqno).await?;
175            }
176        }
177
178        tracing::info!("block strider loop finished");
179        Ok(())
180    }
181
182    /// Processes a single masterchain block and its shard blocks.
183    async fn process_mc_block(&self, block: BlockStuff, archive_data: ArchiveData) -> Result<()> {
184        let mc_block_id = *block.id();
185        tracing::debug!(%mc_block_id, "processing masterchain block");
186
187        let started_at = Instant::now();
188
189        let custom = block.load_custom()?;
190        let is_key_block = custom.config.is_some();
191
192        // Begin preparing master block in the background
193        let (delayed_handle, delayed) = DelayedTasks::new();
194        let prepared_master = {
195            let cx = Box::new(BlockSubscriberContext {
196                mc_block_id,
197                mc_is_key_block: is_key_block,
198                is_key_block,
199                block: block.clone(),
200                archive_data,
201                delayed,
202            });
203            let subscriber = self.subscriber.clone();
204            JoinTask::new(async move {
205                let _histogram = HistogramGuard::begin("tycho_core_prepare_mc_block_time");
206                let prepared = subscriber.prepare_block(&cx).await;
207                (cx, prepared)
208            })
209        };
210
211        // Start downloading shard blocks
212        let mut shard_heights = FastHashMap::default();
213        let mut download_futures = FuturesUnordered::new();
214        for entry in custom.shards.latest_blocks() {
215            let top_block_id = entry?;
216            shard_heights.insert(top_block_id.shard, top_block_id.seqno);
217            download_futures.push(Box::pin(
218                self.download_shard_blocks(mc_block_id, top_block_id),
219            ));
220        }
221
222        // Start processing shard blocks in parallel
223        let mut process_futures = FuturesUnordered::new();
224        while let Some(blocks) = download_futures.next().await.transpose()? {
225            process_futures.push(Box::pin(self.process_shard_blocks(
226                &mc_block_id,
227                is_key_block,
228                blocks,
229            )));
230        }
231        metrics::histogram!("tycho_core_download_sc_blocks_time").record(started_at.elapsed());
232
233        // Wait for all shard blocks to be processed
234        while process_futures.next().await.transpose()?.is_some() {}
235        metrics::histogram!("tycho_core_process_sc_blocks_time").record(started_at.elapsed());
236
237        // Finally handle the masterchain block
238        let delayed_handle = delayed_handle.spawn();
239        let (cx, prepared) = prepared_master.await;
240
241        let _histogram = HistogramGuard::begin("tycho_core_process_mc_block_time");
242        self.subscriber.handle_block(&cx, prepared?).await?;
243
244        // Join delayed tasks after all processing is done.
245        delayed_handle.join().await?;
246
247        // Commit only when everything is ok.
248        let shard_heights = ShardHeights::from(shard_heights);
249        self.state.commit_master(CommitMasterBlock {
250            block_id: &mc_block_id,
251            is_key_block,
252            shard_heights: &shard_heights,
253        });
254
255        Ok(())
256    }
257
258    /// Downloads blocks for the single shard in descending order starting from the top block.
259    async fn download_shard_blocks(
260        &self,
261        mc_block_id: BlockId,
262        mut top_block_id: BlockId,
263    ) -> Result<Vec<BlockStuffAug>> {
264        const MAX_DEPTH: u32 = 32;
265
266        tracing::debug!(
267            mc_block_id = %mc_block_id.as_short_id(),
268            %top_block_id,
269            "downloading shard blocks"
270        );
271
272        let mut depth = 0;
273        let mut result = Vec::new();
274        while top_block_id.seqno > 0 && !self.state.is_committed(&top_block_id) {
275            // Download block
276            let block = {
277                let _histogram = HistogramGuard::begin("tycho_core_download_sc_block_time");
278
279                let block = self
280                    .fetch_block(&top_block_id.relative_to(mc_block_id))
281                    .await?;
282                tracing::debug!(
283                    mc_block_id = %mc_block_id.as_short_id(),
284                    block_id = %top_block_id,
285                    "fetched shard block",
286                );
287                debug_assert_eq!(block.id(), &top_block_id);
288
289                block
290            };
291
292            // Parse info in advance to make borrow checker happy
293            let info = block.data.load_info()?;
294
295            // Add new block to result
296            result.push(block.clone());
297
298            // Process block refs
299            if info.after_split || info.after_merge {
300                // Blocks after split or merge are always the first blocks after
301                // the previous master block
302                break;
303            }
304
305            match info.load_prev_ref()? {
306                PrevBlockRef::Single(id) => top_block_id = id.as_block_id(top_block_id.shard),
307                PrevBlockRef::AfterMerge { .. } => anyhow::bail!("unexpected `AfterMerge` ref"),
308            }
309
310            depth += 1;
311            if depth >= MAX_DEPTH {
312                anyhow::bail!("max depth reached");
313            }
314        }
315
316        Ok(result)
317    }
318
319    async fn process_shard_blocks(
320        &self,
321        mc_block_id: &BlockId,
322        mc_is_key_block: bool,
323        mut blocks: Vec<BlockStuffAug>,
324    ) -> Result<()> {
325        let start_preparing_block = |block: BlockStuffAug| {
326            let (delayed_handle, delayed) = DelayedTasks::new();
327
328            let cx = Box::new(BlockSubscriberContext {
329                mc_block_id: *mc_block_id,
330                mc_is_key_block,
331                is_key_block: false,
332                block: block.data,
333                archive_data: block.archive_data,
334                delayed,
335            });
336            let subscriber = self.subscriber.clone();
337            JoinTask::new(async move {
338                let _histogram = HistogramGuard::begin("tycho_core_prepare_sc_block_time");
339
340                let prepared = subscriber.prepare_block(&cx).await;
341                (delayed_handle, cx, prepared)
342            })
343        };
344
345        let mut prepare_task = blocks.pop().map(start_preparing_block);
346
347        while let Some(prepared) = prepare_task.take() {
348            let (delayed_handle, cx, prepared) = prepared.await;
349            let delayed_handle = delayed_handle.spawn();
350
351            prepare_task = blocks.pop().map(start_preparing_block);
352
353            let _histogram = HistogramGuard::begin("tycho_core_process_sc_block_time");
354
355            // Handle block by subscribers chain.
356            self.subscriber.handle_block(&cx, prepared?).await?;
357
358            // Join delayed tasks after all processing is done.
359            delayed_handle.join().await?;
360
361            // Commit only when everything is ok.
362            self.state.commit_shard(CommitShardBlock {
363                block_id: cx.block.id(),
364            });
365        }
366
367        Ok(())
368    }
369
370    fn fetch_next_master_block(
371        &self,
372        prev_block_id: &BlockId,
373    ) -> impl Future<Output = OptionalBlockStuff> + Send + 'static {
374        let _histogram = HistogramGuard::begin("tycho_core_download_mc_block_time");
375
376        tracing::debug!(%prev_block_id, "fetching next master block");
377
378        let provider = self.provider.clone();
379        let prev_block_id = *prev_block_id;
380        async move {
381            let res = provider.get_next_block(&prev_block_id).await?;
382            Some(res.with_context(|| {
383                format!(
384                    "BUGGY PROVIDER. failed to fetch next master block after prev: {prev_block_id}"
385                )
386            }))
387        }
388    }
389
390    async fn fetch_block(&self, block_id_relation: &BlockIdRelation) -> Result<BlockStuffAug> {
391        match self.provider.get_block(block_id_relation).await {
392            Some(Ok(block)) => Ok(block),
393            Some(Err(e)) => {
394                anyhow::bail!(
395                    "BUGGY PROVIDER. failed to fetch block for {block_id_relation:?}: {e:?}"
396                )
397            }
398            None => {
399                anyhow::bail!("BUGGY PROVIDER. block not found for {block_id_relation:?}")
400            }
401        }
402    }
403}