// tycho_core/block_strider/mod.rs
1use std::sync::Arc;
2
3use anyhow::{Context, Result};
4use futures_util::Future;
5use futures_util::stream::{FuturesUnordered, StreamExt};
6use tokio::time::Instant;
7use tycho_block_util::archive::ArchiveData;
8use tycho_block_util::block::{
9    BlockIdExt, BlockIdRelation, BlockStuff, BlockStuffAug, ShardHeights,
10};
11use tycho_types::models::{BlockId, PrevBlockRef};
12use tycho_util::FastHashMap;
13use tycho_util::futures::JoinTask;
14use tycho_util::metrics::HistogramGuard;
15
16pub use self::archive_handler::ArchiveHandler;
17pub use self::block_saver::BlockSaver;
18pub use self::provider::{
19    ArchiveBlockProvider, ArchiveBlockProviderConfig, ArchiveClient, ArchiveDownloadContext,
20    ArchiveResponse, BlockProvider, BlockProviderExt, BlockchainBlockProvider,
21    BlockchainBlockProviderConfig, BoxBlockProvider, ChainBlockProvider, CheckProof,
22    CycleBlockProvider, EmptyBlockProvider, FoundArchive, HybridArchiveClient, IntoArchiveClient,
23    OptionalBlockStuff, ProofChecker, RetryBlockProvider, RetryConfig, StorageBlockProvider,
24};
25pub use self::starter::{
26    ColdBootType, FileZerostateProvider, QueueStateHandler, Starter, StarterBuilder, StarterConfig,
27    ValidateQueueState, ZerostateProvider,
28};
29use self::state::UpdateGcState;
30pub use self::state::{
31    BlockStriderState, CommitMasterBlock, CommitShardBlock, PersistentBlockStriderState,
32    TempBlockStriderState,
33};
34pub use self::state_applier::ShardStateApplier;
35#[cfg(any(test, feature = "test"))]
36pub use self::subscriber::test::PrintSubscriber;
37pub use self::subscriber::{
38    ArchiveSubscriber, ArchiveSubscriberContext, ArchiveSubscriberExt, BlockSubscriber,
39    BlockSubscriberContext, BlockSubscriberExt, ChainSubscriber, DelayedTasks,
40    DelayedTasksJoinHandle, DelayedTasksSpawner, MetricsSubscriber, NoopSubscriber,
41    StateSubscriber, StateSubscriberContext, StateSubscriberExt,
42};
43use crate::storage::CoreStorage;
44
45mod archive_handler;
46mod block_saver;
47mod provider;
48mod starter;
49mod state;
50mod state_applier;
51mod subscriber;
52
/// Typed builder for [`BlockStrider`].
///
/// Each type parameter starts as `()` and is filled in by the corresponding
/// `with_*` method; `build` is only available once all three components are
/// set (enforced by the trait bounds on the final impl).
pub struct BlockStriderBuilder<T, P, B> {
    // Strider state; expected to implement `BlockStriderState` once set.
    state: T,
    // Block provider; expected to implement `BlockProvider` once set.
    provider: P,
    // Block subscriber; expected to implement `BlockSubscriber` once set.
    subscriber: B,
}
58
59impl<T2, T3> BlockStriderBuilder<(), T2, T3> {
60    #[inline]
61    pub fn with_state<T: BlockStriderState>(self, state: T) -> BlockStriderBuilder<T, T2, T3> {
62        BlockStriderBuilder {
63            state,
64            provider: self.provider,
65            subscriber: self.subscriber,
66        }
67    }
68}
69
70impl<T1, T3> BlockStriderBuilder<T1, (), T3> {
71    #[inline]
72    pub fn with_provider<P: BlockProvider>(self, provider: P) -> BlockStriderBuilder<T1, P, T3> {
73        BlockStriderBuilder {
74            state: self.state,
75            provider,
76            subscriber: self.subscriber,
77        }
78    }
79}
80
81impl<T1, T2> BlockStriderBuilder<T1, T2, ()> {
82    #[inline]
83    pub fn with_block_subscriber<B>(self, subscriber: B) -> BlockStriderBuilder<T1, T2, B>
84    where
85        B: BlockSubscriber,
86    {
87        BlockStriderBuilder {
88            state: self.state,
89            provider: self.provider,
90            subscriber,
91        }
92    }
93}
94
95impl<T1, T2> BlockStriderBuilder<T1, T2, ()> {
96    pub fn with_state_subscriber<S>(
97        self,
98        storage: CoreStorage,
99        state_subscriber: S,
100    ) -> BlockStriderBuilder<T1, T2, ShardStateApplier<S>>
101    where
102        S: StateSubscriber,
103    {
104        BlockStriderBuilder {
105            state: self.state,
106            provider: self.provider,
107            subscriber: ShardStateApplier::new(storage, state_subscriber),
108        }
109    }
110}
111
112impl<T, P, B> BlockStriderBuilder<T, P, B>
113where
114    T: BlockStriderState,
115    P: BlockProvider,
116    B: BlockSubscriber,
117{
118    pub fn build(self) -> BlockStrider<T, P, B> {
119        BlockStrider {
120            state: self.state,
121            provider: Arc::new(self.provider),
122            subscriber: Arc::new(self.subscriber),
123        }
124    }
125}
126
/// Drives block processing: fetches masterchain blocks from `provider` and
/// dispatches them (together with their shard blocks) to `subscriber`,
/// tracking progress in `state`.
pub struct BlockStrider<T, P, B> {
    state: T,
    // Behind `Arc` so `'static` fetch/prepare futures can hold a clone
    // (see `fetch_next_master_block` / `process_mc_block`).
    provider: Arc<P>,
    subscriber: Arc<B>,
}
132
133impl BlockStrider<(), (), ()> {
134    pub fn builder() -> BlockStriderBuilder<(), (), ()> {
135        BlockStriderBuilder {
136            state: (),
137            provider: (),
138            subscriber: (),
139        }
140    }
141}
142
143impl<S, P, B> BlockStrider<S, P, B>
144where
145    S: BlockStriderState,
146    P: BlockProvider,
147    B: BlockSubscriber,
148{
149    /// Walks through blocks and handles them.
150    ///
151    /// Stops either when the provider is exhausted or it can't provide a requested block.
152    pub async fn run(self) -> Result<()> {
153        tracing::info!("block strider loop started");
154
155        let mut next_master_fut =
156            JoinTask::new(self.fetch_next_master_block(&self.state.load_last_mc_block_id()));
157
158        while let Some(next) = next_master_fut.await.transpose()? {
159            // NOTE: Start fetching the next master block in parallel to the processing of the current one
160            // If we have a chain of providers, when switching to the next one, since blocks are processed
161            // asynchronously and in parallel with requesting the next block, the processing of the
162            // previous block may already use the new provider. Therefore, the next provider must
163            // necessarily store the previous block.
164            next_master_fut = JoinTask::new(self.fetch_next_master_block(next.id()));
165
166            let mc_seqno = next.id().seqno;
167
168            {
169                let _histogram = HistogramGuard::begin("tycho_core_process_strider_step_time");
170                self.process_mc_block(next.data, next.archive_data).await?;
171            }
172
173            {
174                let _histogram = HistogramGuard::begin("tycho_core_provider_cleanup_time");
175                self.provider.cleanup_until(mc_seqno).await?;
176            }
177        }
178
179        tracing::info!("block strider loop finished");
180        Ok(())
181    }
182
183    /// Processes a single masterchain block and its shard blocks.
184    async fn process_mc_block(&self, block: BlockStuff, archive_data: ArchiveData) -> Result<()> {
185        let mc_block_id = *block.id();
186        tracing::debug!(%mc_block_id, "processing masterchain block");
187
188        let started_at = Instant::now();
189
190        let info = block.load_info()?;
191        metrics::gauge!("tycho_core_last_mc_block_utime").set(info.gen_utime);
192        metrics::gauge!("tycho_core_last_mc_block_seqno").set(mc_block_id.seqno);
193
194        let custom = block.load_custom()?;
195        let is_key_block = custom.config.is_some();
196
197        // Begin preparing master block in the background
198        let (delayed_handle, delayed) = DelayedTasks::new();
199        let prepared_master = {
200            let cx = Box::new(BlockSubscriberContext {
201                mc_block_id,
202                mc_is_key_block: is_key_block,
203                is_key_block,
204                block: block.clone(),
205                archive_data,
206                delayed,
207            });
208            let subscriber = self.subscriber.clone();
209            JoinTask::new(async move {
210                let _histogram = HistogramGuard::begin("tycho_core_prepare_mc_block_time");
211                let prepared = subscriber.prepare_block(&cx).await;
212                (cx, prepared)
213            })
214        };
215
216        // Start downloading shard blocks
217        let mut shard_heights = FastHashMap::default();
218        let mut download_futures = FuturesUnordered::new();
219        for entry in custom.shards.latest_blocks() {
220            let top_block_id = entry?;
221            shard_heights.insert(top_block_id.shard, top_block_id.seqno);
222            download_futures.push(Box::pin(
223                self.download_shard_blocks(mc_block_id, top_block_id),
224            ));
225        }
226
227        // Start processing shard blocks in parallel
228        let mut process_futures = FuturesUnordered::new();
229        while let Some(blocks) = download_futures.next().await.transpose()? {
230            process_futures.push(Box::pin(self.process_shard_blocks(
231                &mc_block_id,
232                is_key_block,
233                blocks,
234            )));
235        }
236        metrics::histogram!("tycho_core_download_sc_blocks_time").record(started_at.elapsed());
237
238        // Wait for all shard blocks to be processed
239        while process_futures.next().await.transpose()?.is_some() {}
240        metrics::histogram!("tycho_core_process_sc_blocks_time").record(started_at.elapsed());
241
242        // Finally handle the masterchain block
243        let delayed_handle = delayed_handle.spawn();
244        let (cx, prepared) = prepared_master.await;
245
246        let _histogram = HistogramGuard::begin("tycho_core_process_mc_block_time");
247        self.subscriber.handle_block(&cx, prepared?).await?;
248
249        // Update GC state.
250        self.state.update_gc_state(UpdateGcState {
251            mc_block_id: &cx.mc_block_id,
252            mc_is_key_block: cx.is_key_block,
253            is_key_block: cx.is_key_block,
254            block: &cx.block,
255        })?;
256
257        // Join delayed tasks after all processing is done.
258        delayed_handle.join().await?;
259
260        // Commit only when everything is ok.
261        let shard_heights = ShardHeights::from(shard_heights);
262        self.state.commit_master(CommitMasterBlock {
263            block_id: &mc_block_id,
264            is_key_block,
265            shard_heights: &shard_heights,
266        });
267
268        // Update "applied" only after commit
269        metrics::gauge!("tycho_core_last_mc_block_applied")
270            .set(tycho_util::time::now_millis() as f64 / 1000.0);
271
272        Ok(())
273    }
274
275    /// Downloads blocks for the single shard in descending order starting from the top block.
276    async fn download_shard_blocks(
277        &self,
278        mc_block_id: BlockId,
279        mut top_block_id: BlockId,
280    ) -> Result<Vec<BlockStuffAug>> {
281        const MAX_DEPTH: u32 = 32;
282
283        tracing::debug!(
284            mc_block_id = %mc_block_id.as_short_id(),
285            %top_block_id,
286            "downloading shard blocks"
287        );
288
289        let mut depth = 0;
290        let mut result = Vec::new();
291        while top_block_id.seqno > 0 && !self.state.is_committed(&top_block_id) {
292            // Download block
293            let block = {
294                let _histogram = HistogramGuard::begin("tycho_core_download_sc_block_time");
295
296                let block = self
297                    .fetch_block(&top_block_id.relative_to(mc_block_id))
298                    .await?;
299                tracing::debug!(
300                    mc_block_id = %mc_block_id.as_short_id(),
301                    block_id = %top_block_id,
302                    "fetched shard block",
303                );
304                debug_assert_eq!(block.id(), &top_block_id);
305
306                block
307            };
308
309            // Parse info in advance to make borrow checker happy
310            let info = block.data.load_info()?;
311
312            // Add new block to result
313            result.push(block.clone());
314
315            // Process block refs
316            if info.after_split || info.after_merge {
317                // Blocks after split or merge are always the first blocks after
318                // the previous master block
319                break;
320            }
321
322            match info.load_prev_ref()? {
323                PrevBlockRef::Single(id) => top_block_id = id.as_block_id(top_block_id.shard),
324                PrevBlockRef::AfterMerge { .. } => anyhow::bail!("unexpected `AfterMerge` ref"),
325            }
326
327            depth += 1;
328            if depth >= MAX_DEPTH {
329                anyhow::bail!("max depth reached");
330            }
331        }
332
333        Ok(result)
334    }
335
336    async fn process_shard_blocks(
337        &self,
338        mc_block_id: &BlockId,
339        mc_is_key_block: bool,
340        mut blocks: Vec<BlockStuffAug>,
341    ) -> Result<()> {
342        let start_preparing_block = |block: BlockStuffAug| {
343            let (delayed_handle, delayed) = DelayedTasks::new();
344
345            let cx = Box::new(BlockSubscriberContext {
346                mc_block_id: *mc_block_id,
347                mc_is_key_block,
348                is_key_block: false,
349                block: block.data,
350                archive_data: block.archive_data,
351                delayed,
352            });
353            let subscriber = self.subscriber.clone();
354            JoinTask::new(async move {
355                let _histogram = HistogramGuard::begin("tycho_core_prepare_sc_block_time");
356
357                let prepared = subscriber.prepare_block(&cx).await;
358                (delayed_handle, cx, prepared)
359            })
360        };
361
362        let mut prepare_task = blocks.pop().map(start_preparing_block);
363
364        while let Some(prepared) = prepare_task.take() {
365            let (delayed_handle, cx, prepared) = prepared.await;
366            let delayed_handle = delayed_handle.spawn();
367
368            prepare_task = blocks.pop().map(start_preparing_block);
369
370            let _histogram = HistogramGuard::begin("tycho_core_process_sc_block_time");
371
372            // Handle block by subscribers chain.
373            self.subscriber.handle_block(&cx, prepared?).await?;
374
375            // Update GC state.
376            self.state.update_gc_state(UpdateGcState {
377                mc_block_id,
378                mc_is_key_block,
379                is_key_block: cx.is_key_block,
380                block: &cx.block,
381            })?;
382
383            // Join delayed tasks after all processing is done.
384            delayed_handle.join().await?;
385
386            // Commit only when everything is ok.
387            self.state.commit_shard(CommitShardBlock {
388                block_id: cx.block.id(),
389            });
390        }
391
392        Ok(())
393    }
394
395    fn fetch_next_master_block(
396        &self,
397        prev_block_id: &BlockId,
398    ) -> impl Future<Output = OptionalBlockStuff> + Send + 'static {
399        let _histogram = HistogramGuard::begin("tycho_core_download_mc_block_time");
400
401        tracing::debug!(%prev_block_id, "fetching next master block");
402
403        let provider = self.provider.clone();
404        let prev_block_id = *prev_block_id;
405        async move {
406            let res = provider.get_next_block(&prev_block_id).await?;
407            Some(res.with_context(|| {
408                format!(
409                    "BUGGY PROVIDER. failed to fetch next master block after prev: {prev_block_id}"
410                )
411            }))
412        }
413    }
414
415    async fn fetch_block(&self, block_id_relation: &BlockIdRelation) -> Result<BlockStuffAug> {
416        match self.provider.get_block(block_id_relation).await {
417            Some(Ok(block)) => Ok(block),
418            Some(Err(e)) => {
419                anyhow::bail!(
420                    "BUGGY PROVIDER. failed to fetch block for {block_id_relation:?}: {e:?}"
421                )
422            }
423            None => {
424                anyhow::bail!("BUGGY PROVIDER. block not found for {block_id_relation:?}")
425            }
426        }
427    }
428}