tycho_core/block_strider/
mod.rs

1use std::sync::Arc;
2
3use anyhow::{Context, Result};
4use futures_util::Future;
5use futures_util::stream::{FuturesUnordered, StreamExt};
6use tokio::time::Instant;
7use tycho_block_util::archive::ArchiveData;
8use tycho_block_util::block::{
9    BlockIdExt, BlockIdRelation, BlockStuff, BlockStuffAug, ShardHeights,
10};
11use tycho_types::models::{BlockId, PrevBlockRef};
12use tycho_util::FastHashMap;
13use tycho_util::futures::JoinTask;
14use tycho_util::metrics::HistogramGuard;
15
16pub use self::archive_handler::ArchiveHandler;
17pub use self::block_saver::BlockSaver;
18pub use self::provider::{
19    ArchiveBlockProvider, ArchiveBlockProviderConfig, BlockProvider, BlockProviderExt,
20    BlockchainBlockProvider, BlockchainBlockProviderConfig, ChainBlockProvider, CheckProof,
21    EmptyBlockProvider, OptionalBlockStuff, ProofChecker, RetryConfig, StorageBlockProvider,
22};
23pub use self::starter::{
24    ColdBootType, FileZerostateProvider, QueueStateHandler, Starter, StarterBuilder, StarterConfig,
25    ZerostateProvider,
26};
27pub use self::state::{
28    BlockStriderState, CommitMasterBlock, CommitShardBlock, PersistentBlockStriderState,
29    TempBlockStriderState,
30};
31pub use self::state_applier::ShardStateApplier;
32#[cfg(any(test, feature = "test"))]
33pub use self::subscriber::test::PrintSubscriber;
34pub use self::subscriber::{
35    ArchiveSubscriber, ArchiveSubscriberContext, ArchiveSubscriberExt, BlockSubscriber,
36    BlockSubscriberContext, BlockSubscriberExt, ChainSubscriber, DelayedTasks,
37    DelayedTasksJoinHandle, DelayedTasksSpawner, GcSubscriber, ManualGcTrigger, MetricsSubscriber,
38    NoopSubscriber, PsSubscriber, StateSubscriber, StateSubscriberContext, StateSubscriberExt,
39};
40use crate::storage::CoreStorage;
41
42mod archive_handler;
43mod block_saver;
44mod provider;
45mod starter;
46mod state;
47mod state_applier;
48mod subscriber;
49
/// Type-state builder for [`BlockStrider`].
///
/// Every slot starts out as `()` (see [`BlockStrider::builder`]) and is filled by the
/// matching `with_*` method. `build` is only available once the state, provider and
/// subscriber slots satisfy their respective trait bounds.
pub struct BlockStriderBuilder<T, P, B> {
    /// Progress tracker; `()` until `with_state` is called.
    state: T,
    /// Block source; `()` until `with_provider` is called.
    provider: P,
    /// Block handler; `()` until `with_block_subscriber` or `with_state_subscriber` is called.
    subscriber: B,
}
55
56impl<T2, T3> BlockStriderBuilder<(), T2, T3> {
57    #[inline]
58    pub fn with_state<T: BlockStriderState>(self, state: T) -> BlockStriderBuilder<T, T2, T3> {
59        BlockStriderBuilder {
60            state,
61            provider: self.provider,
62            subscriber: self.subscriber,
63        }
64    }
65}
66
67impl<T1, T3> BlockStriderBuilder<T1, (), T3> {
68    #[inline]
69    pub fn with_provider<P: BlockProvider>(self, provider: P) -> BlockStriderBuilder<T1, P, T3> {
70        BlockStriderBuilder {
71            state: self.state,
72            provider,
73            subscriber: self.subscriber,
74        }
75    }
76}
77
78impl<T1, T2> BlockStriderBuilder<T1, T2, ()> {
79    #[inline]
80    pub fn with_block_subscriber<B>(self, subscriber: B) -> BlockStriderBuilder<T1, T2, B>
81    where
82        B: BlockSubscriber,
83    {
84        BlockStriderBuilder {
85            state: self.state,
86            provider: self.provider,
87            subscriber,
88        }
89    }
90}
91
92impl<T1, T2> BlockStriderBuilder<T1, T2, ()> {
93    pub fn with_state_subscriber<S>(
94        self,
95        storage: CoreStorage,
96        state_subscriber: S,
97    ) -> BlockStriderBuilder<T1, T2, ShardStateApplier<S>>
98    where
99        S: StateSubscriber,
100    {
101        BlockStriderBuilder {
102            state: self.state,
103            provider: self.provider,
104            subscriber: ShardStateApplier::new(storage, state_subscriber),
105        }
106    }
107}
108
109impl<T, P, B> BlockStriderBuilder<T, P, B>
110where
111    T: BlockStriderState,
112    P: BlockProvider,
113    B: BlockSubscriber,
114{
115    pub fn build(self) -> BlockStrider<T, P, B> {
116        BlockStrider {
117            state: self.state,
118            provider: Arc::new(self.provider),
119            subscriber: Arc::new(self.subscriber),
120        }
121    }
122}
123
/// Drives block processing: pulls blocks from a provider, hands them to a subscriber
/// and records progress in a state object.
///
/// Construct it with [`BlockStrider::builder`].
pub struct BlockStrider<T, P, B> {
    /// Progress tracker (last processed / committed blocks).
    state: T,
    /// Block source; cloned into the futures that fetch blocks.
    provider: Arc<P>,
    /// Block handler; cloned into the background block-preparation tasks.
    subscriber: Arc<B>,
}
129
130impl BlockStrider<(), (), ()> {
131    pub fn builder() -> BlockStriderBuilder<(), (), ()> {
132        BlockStriderBuilder {
133            state: (),
134            provider: (),
135            subscriber: (),
136        }
137    }
138}
139
140impl<S, P, B> BlockStrider<S, P, B>
141where
142    S: BlockStriderState,
143    P: BlockProvider,
144    B: BlockSubscriber,
145{
146    /// Walks through blocks and handles them.
147    ///
148    /// Stops either when the provider is exhausted or it can't provide a requested block.
149    pub async fn run(self) -> Result<()> {
150        tracing::info!("block strider loop started");
151
152        let mut next_master_fut =
153            JoinTask::new(self.fetch_next_master_block(&self.state.load_last_mc_block_id()));
154
155        while let Some(next) = next_master_fut.await.transpose()? {
156            // NOTE: Start fetching the next master block in parallel to the processing of the current one
157            // If we have a chain of providers, when switching to the next one, since blocks are processed
158            // asynchronously and in parallel with requesting the next block, the processing of the
159            // previous block may already use the new provider. Therefore, the next provider must
160            // necessarily store the previous block.
161            next_master_fut = JoinTask::new(self.fetch_next_master_block(next.id()));
162
163            let mc_seqno = next.id().seqno;
164
165            {
166                let _histogram = HistogramGuard::begin("tycho_core_process_strider_step_time");
167                self.process_mc_block(next.data, next.archive_data).await?;
168            }
169
170            {
171                let _histogram = HistogramGuard::begin("tycho_core_provider_cleanup_time");
172                self.provider.cleanup_until(mc_seqno).await?;
173            }
174        }
175
176        tracing::info!("block strider loop finished");
177        Ok(())
178    }
179
180    /// Processes a single masterchain block and its shard blocks.
181    async fn process_mc_block(&self, block: BlockStuff, archive_data: ArchiveData) -> Result<()> {
182        let mc_block_id = *block.id();
183        tracing::debug!(%mc_block_id, "processing masterchain block");
184
185        let started_at = Instant::now();
186
187        let custom = block.load_custom()?;
188        let is_key_block = custom.config.is_some();
189
190        // Begin preparing master block in the background
191        let (delayed_handle, delayed) = DelayedTasks::new();
192        let prepared_master = {
193            let cx = Box::new(BlockSubscriberContext {
194                mc_block_id,
195                mc_is_key_block: is_key_block,
196                is_key_block,
197                block: block.clone(),
198                archive_data,
199                delayed,
200            });
201            let subscriber = self.subscriber.clone();
202            JoinTask::new(async move {
203                let _histogram = HistogramGuard::begin("tycho_core_prepare_mc_block_time");
204                let prepared = subscriber.prepare_block(&cx).await;
205                (cx, prepared)
206            })
207        };
208
209        // Start downloading shard blocks
210        let mut shard_heights = FastHashMap::default();
211        let mut download_futures = FuturesUnordered::new();
212        for entry in custom.shards.latest_blocks() {
213            let top_block_id = entry?;
214            shard_heights.insert(top_block_id.shard, top_block_id.seqno);
215            download_futures.push(Box::pin(
216                self.download_shard_blocks(mc_block_id, top_block_id),
217            ));
218        }
219
220        // Start processing shard blocks in parallel
221        let mut process_futures = FuturesUnordered::new();
222        while let Some(blocks) = download_futures.next().await.transpose()? {
223            process_futures.push(Box::pin(self.process_shard_blocks(
224                &mc_block_id,
225                is_key_block,
226                blocks,
227            )));
228        }
229        metrics::histogram!("tycho_core_download_sc_blocks_time").record(started_at.elapsed());
230
231        // Wait for all shard blocks to be processed
232        while process_futures.next().await.transpose()?.is_some() {}
233        metrics::histogram!("tycho_core_process_sc_blocks_time").record(started_at.elapsed());
234
235        // Finally handle the masterchain block
236        let delayed_handle = delayed_handle.spawn();
237        let (cx, prepared) = prepared_master.await;
238
239        let _histogram = HistogramGuard::begin("tycho_core_process_mc_block_time");
240        self.subscriber.handle_block(&cx, prepared?).await?;
241
242        // Join delayed tasks after all processing is done.
243        delayed_handle.join().await?;
244
245        // Commit only when everything is ok.
246        let shard_heights = ShardHeights::from(shard_heights);
247        self.state.commit_master(CommitMasterBlock {
248            block_id: &mc_block_id,
249            is_key_block,
250            shard_heights: &shard_heights,
251        });
252
253        Ok(())
254    }
255
256    /// Downloads blocks for the single shard in descending order starting from the top block.
257    async fn download_shard_blocks(
258        &self,
259        mc_block_id: BlockId,
260        mut top_block_id: BlockId,
261    ) -> Result<Vec<BlockStuffAug>> {
262        const MAX_DEPTH: u32 = 32;
263
264        tracing::debug!(
265            mc_block_id = %mc_block_id.as_short_id(),
266            %top_block_id,
267            "downloading shard blocks"
268        );
269
270        let mut depth = 0;
271        let mut result = Vec::new();
272        while top_block_id.seqno > 0 && !self.state.is_committed(&top_block_id) {
273            // Download block
274            let block = {
275                let _histogram = HistogramGuard::begin("tycho_core_download_sc_block_time");
276
277                let block = self
278                    .fetch_block(&top_block_id.relative_to(mc_block_id))
279                    .await?;
280                tracing::debug!(
281                    mc_block_id = %mc_block_id.as_short_id(),
282                    block_id = %top_block_id,
283                    "fetched shard block",
284                );
285                debug_assert_eq!(block.id(), &top_block_id);
286
287                block
288            };
289
290            // Parse info in advance to make borrow checker happy
291            let info = block.data.load_info()?;
292
293            // Add new block to result
294            result.push(block.clone());
295
296            // Process block refs
297            if info.after_split || info.after_merge {
298                // Blocks after split or merge are always the first blocks after
299                // the previous master block
300                break;
301            }
302
303            match info.load_prev_ref()? {
304                PrevBlockRef::Single(id) => top_block_id = id.as_block_id(top_block_id.shard),
305                PrevBlockRef::AfterMerge { .. } => anyhow::bail!("unexpected `AfterMerge` ref"),
306            }
307
308            depth += 1;
309            if depth >= MAX_DEPTH {
310                anyhow::bail!("max depth reached");
311            }
312        }
313
314        Ok(result)
315    }
316
317    async fn process_shard_blocks(
318        &self,
319        mc_block_id: &BlockId,
320        mc_is_key_block: bool,
321        mut blocks: Vec<BlockStuffAug>,
322    ) -> Result<()> {
323        let start_preparing_block = |block: BlockStuffAug| {
324            let (delayed_handle, delayed) = DelayedTasks::new();
325
326            let cx = Box::new(BlockSubscriberContext {
327                mc_block_id: *mc_block_id,
328                mc_is_key_block,
329                is_key_block: false,
330                block: block.data,
331                archive_data: block.archive_data,
332                delayed,
333            });
334            let subscriber = self.subscriber.clone();
335            JoinTask::new(async move {
336                let _histogram = HistogramGuard::begin("tycho_core_prepare_sc_block_time");
337
338                let prepared = subscriber.prepare_block(&cx).await;
339                (delayed_handle, cx, prepared)
340            })
341        };
342
343        let mut prepare_task = blocks.pop().map(start_preparing_block);
344
345        while let Some(prepared) = prepare_task.take() {
346            let (delayed_handle, cx, prepared) = prepared.await;
347            let delayed_handle = delayed_handle.spawn();
348
349            prepare_task = blocks.pop().map(start_preparing_block);
350
351            let _histogram = HistogramGuard::begin("tycho_core_process_sc_block_time");
352
353            // Handle block by subscribers chain.
354            self.subscriber.handle_block(&cx, prepared?).await?;
355
356            // Join delayed tasks after all processing is done.
357            delayed_handle.join().await?;
358
359            // Commit only when everything is ok.
360            self.state.commit_shard(CommitShardBlock {
361                block_id: cx.block.id(),
362            });
363        }
364
365        Ok(())
366    }
367
368    fn fetch_next_master_block(
369        &self,
370        prev_block_id: &BlockId,
371    ) -> impl Future<Output = OptionalBlockStuff> + Send + 'static {
372        let _histogram = HistogramGuard::begin("tycho_core_download_mc_block_time");
373
374        tracing::debug!(%prev_block_id, "fetching next master block");
375
376        let provider = self.provider.clone();
377        let prev_block_id = *prev_block_id;
378        async move {
379            let res = provider.get_next_block(&prev_block_id).await?;
380            Some(res.with_context(|| {
381                format!(
382                    "BUGGY PROVIDER. failed to fetch next master block after prev: {prev_block_id}"
383                )
384            }))
385        }
386    }
387
388    async fn fetch_block(&self, block_id_relation: &BlockIdRelation) -> Result<BlockStuffAug> {
389        match self.provider.get_block(block_id_relation).await {
390            Some(Ok(block)) => Ok(block),
391            Some(Err(e)) => {
392                anyhow::bail!(
393                    "BUGGY PROVIDER. failed to fetch block for {block_id_relation:?}: {e:?}"
394                )
395            }
396            None => {
397                anyhow::bail!("BUGGY PROVIDER. block not found for {block_id_relation:?}")
398            }
399        }
400    }
401}