1use std::sync::Arc;
2
3use anyhow::{Context, Result};
4use futures_util::Future;
5use futures_util::stream::{FuturesUnordered, StreamExt};
6use tokio::time::Instant;
7use tycho_block_util::archive::ArchiveData;
8use tycho_block_util::block::{
9 BlockIdExt, BlockIdRelation, BlockStuff, BlockStuffAug, ShardHeights,
10};
11use tycho_types::models::{BlockId, PrevBlockRef};
12use tycho_util::FastHashMap;
13use tycho_util::futures::JoinTask;
14use tycho_util::metrics::HistogramGuard;
15
16pub use self::archive_handler::ArchiveHandler;
17pub use self::block_saver::BlockSaver;
18pub use self::provider::{
19 ArchiveBlockProvider, ArchiveBlockProviderConfig, ArchiveClient, ArchiveDownloadContext,
20 ArchiveResponse, BlockProvider, BlockProviderExt, BlockchainBlockProvider,
21 BlockchainBlockProviderConfig, BoxBlockProvider, ChainBlockProvider, CheckProof,
22 CycleBlockProvider, EmptyBlockProvider, FoundArchive, HybridArchiveClient, IntoArchiveClient,
23 OptionalBlockStuff, ProofChecker, RetryBlockProvider, RetryConfig, StorageBlockProvider,
24};
25pub use self::starter::{
26 ColdBootType, FileZerostateProvider, QueueStateHandler, Starter, StarterBuilder, StarterConfig,
27 ValidateQueueState, ZerostateProvider,
28};
29use self::state::UpdateGcState;
30pub use self::state::{
31 BlockStriderState, CommitMasterBlock, CommitShardBlock, PersistentBlockStriderState,
32 TempBlockStriderState,
33};
34pub use self::state_applier::ShardStateApplier;
35#[cfg(any(test, feature = "test"))]
36pub use self::subscriber::test::PrintSubscriber;
37pub use self::subscriber::{
38 ArchiveSubscriber, ArchiveSubscriberContext, ArchiveSubscriberExt, BlockSubscriber,
39 BlockSubscriberContext, BlockSubscriberExt, ChainSubscriber, DelayedTasks,
40 DelayedTasksJoinHandle, DelayedTasksSpawner, MetricsSubscriber, NoopSubscriber,
41 StateSubscriber, StateSubscriberContext, StateSubscriberExt,
42};
43use crate::storage::CoreStorage;
44
45mod archive_handler;
46mod block_saver;
47mod provider;
48mod starter;
49mod state;
50mod state_applier;
51mod subscriber;
52
53pub struct BlockStriderBuilder<T, P, B> {
54 state: T,
55 provider: P,
56 subscriber: B,
57}
58
59impl<T2, T3> BlockStriderBuilder<(), T2, T3> {
60 #[inline]
61 pub fn with_state<T: BlockStriderState>(self, state: T) -> BlockStriderBuilder<T, T2, T3> {
62 BlockStriderBuilder {
63 state,
64 provider: self.provider,
65 subscriber: self.subscriber,
66 }
67 }
68}
69
70impl<T1, T3> BlockStriderBuilder<T1, (), T3> {
71 #[inline]
72 pub fn with_provider<P: BlockProvider>(self, provider: P) -> BlockStriderBuilder<T1, P, T3> {
73 BlockStriderBuilder {
74 state: self.state,
75 provider,
76 subscriber: self.subscriber,
77 }
78 }
79}
80
81impl<T1, T2> BlockStriderBuilder<T1, T2, ()> {
82 #[inline]
83 pub fn with_block_subscriber<B>(self, subscriber: B) -> BlockStriderBuilder<T1, T2, B>
84 where
85 B: BlockSubscriber,
86 {
87 BlockStriderBuilder {
88 state: self.state,
89 provider: self.provider,
90 subscriber,
91 }
92 }
93}
94
95impl<T1, T2> BlockStriderBuilder<T1, T2, ()> {
96 pub fn with_state_subscriber<S>(
97 self,
98 storage: CoreStorage,
99 state_subscriber: S,
100 ) -> BlockStriderBuilder<T1, T2, ShardStateApplier<S>>
101 where
102 S: StateSubscriber,
103 {
104 BlockStriderBuilder {
105 state: self.state,
106 provider: self.provider,
107 subscriber: ShardStateApplier::new(storage, state_subscriber),
108 }
109 }
110}
111
112impl<T, P, B> BlockStriderBuilder<T, P, B>
113where
114 T: BlockStriderState,
115 P: BlockProvider,
116 B: BlockSubscriber,
117{
118 pub fn build(self) -> BlockStrider<T, P, B> {
119 BlockStrider {
120 state: self.state,
121 provider: Arc::new(self.provider),
122 subscriber: Arc::new(self.subscriber),
123 }
124 }
125}
126
127pub struct BlockStrider<T, P, B> {
128 state: T,
129 provider: Arc<P>,
130 subscriber: Arc<B>,
131}
132
133impl BlockStrider<(), (), ()> {
134 pub fn builder() -> BlockStriderBuilder<(), (), ()> {
135 BlockStriderBuilder {
136 state: (),
137 provider: (),
138 subscriber: (),
139 }
140 }
141}
142
143impl<S, P, B> BlockStrider<S, P, B>
144where
145 S: BlockStriderState,
146 P: BlockProvider,
147 B: BlockSubscriber,
148{
149 pub async fn run(self) -> Result<()> {
153 tracing::info!("block strider loop started");
154
155 let mut next_master_fut =
156 JoinTask::new(self.fetch_next_master_block(&self.state.load_last_mc_block_id()));
157
158 while let Some(next) = next_master_fut.await.transpose()? {
159 next_master_fut = JoinTask::new(self.fetch_next_master_block(next.id()));
165
166 let mc_seqno = next.id().seqno;
167
168 {
169 let _histogram = HistogramGuard::begin("tycho_core_process_strider_step_time");
170 self.process_mc_block(next.data, next.archive_data).await?;
171 }
172
173 {
174 let _histogram = HistogramGuard::begin("tycho_core_provider_cleanup_time");
175 self.provider.cleanup_until(mc_seqno).await?;
176 }
177 }
178
179 tracing::info!("block strider loop finished");
180 Ok(())
181 }
182
183 async fn process_mc_block(&self, block: BlockStuff, archive_data: ArchiveData) -> Result<()> {
185 let mc_block_id = *block.id();
186 tracing::debug!(%mc_block_id, "processing masterchain block");
187
188 let started_at = Instant::now();
189
190 let info = block.load_info()?;
191 metrics::gauge!("tycho_core_last_mc_block_utime").set(info.gen_utime);
192 metrics::gauge!("tycho_core_last_mc_block_seqno").set(mc_block_id.seqno);
193
194 let custom = block.load_custom()?;
195 let is_key_block = custom.config.is_some();
196
197 let (delayed_handle, delayed) = DelayedTasks::new();
199 let prepared_master = {
200 let cx = Box::new(BlockSubscriberContext {
201 mc_block_id,
202 mc_is_key_block: is_key_block,
203 is_key_block,
204 is_top_block: true,
205 block: block.clone(),
206 archive_data,
207 delayed,
208 });
209 let subscriber = self.subscriber.clone();
210 JoinTask::new(async move {
211 let _histogram = HistogramGuard::begin("tycho_core_prepare_mc_block_time");
212 let prepared = subscriber.prepare_block(&cx).await;
213 (cx, prepared)
214 })
215 };
216
217 let mut shard_heights = FastHashMap::default();
219 let mut download_futures = FuturesUnordered::new();
220 for entry in custom.shards.latest_blocks() {
221 let top_block_id = entry?;
222 shard_heights.insert(top_block_id.shard, top_block_id.seqno);
223 download_futures.push(Box::pin(
224 self.download_shard_blocks(mc_block_id, top_block_id),
225 ));
226 }
227
228 let shard_heights = ShardHeights::from(shard_heights);
229
230 let mut process_futures = FuturesUnordered::new();
232 while let Some(blocks) = download_futures.next().await.transpose()? {
233 process_futures.push(Box::pin(self.process_shard_blocks(
234 &mc_block_id,
235 is_key_block,
236 &shard_heights,
237 blocks,
238 )));
239 }
240 metrics::histogram!("tycho_core_download_sc_blocks_time").record(started_at.elapsed());
241
242 while process_futures.next().await.transpose()?.is_some() {}
244 metrics::histogram!("tycho_core_process_sc_blocks_time").record(started_at.elapsed());
245
246 let delayed_handle = delayed_handle.spawn();
248 let (cx, prepared) = prepared_master.await;
249
250 let _histogram = HistogramGuard::begin("tycho_core_process_mc_block_time");
251 self.subscriber.handle_block(&cx, prepared?).await?;
252
253 self.state.update_gc_state(UpdateGcState {
255 mc_block_id: &cx.mc_block_id,
256 mc_is_key_block: cx.is_key_block,
257 is_key_block: cx.is_key_block,
258 block: &cx.block,
259 })?;
260
261 delayed_handle.join().await?;
263
264 self.state.commit_master(CommitMasterBlock {
266 block_id: &mc_block_id,
267 is_key_block,
268 shard_heights: &shard_heights,
269 });
270
271 metrics::gauge!("tycho_core_last_mc_block_applied")
273 .set(tycho_util::time::now_millis() as f64 / 1000.0);
274
275 Ok(())
276 }
277
278 async fn download_shard_blocks(
280 &self,
281 mc_block_id: BlockId,
282 mut top_block_id: BlockId,
283 ) -> Result<Vec<BlockStuffAug>> {
284 const MAX_DEPTH: u32 = 32;
285
286 tracing::debug!(
287 mc_block_id = %mc_block_id.as_short_id(),
288 %top_block_id,
289 "downloading shard blocks"
290 );
291
292 let mut depth = 0;
293 let mut result = Vec::new();
294 while top_block_id.seqno > 0 && !self.state.is_committed(&top_block_id) {
295 let block = {
297 let _histogram = HistogramGuard::begin("tycho_core_download_sc_block_time");
298
299 let block = self
300 .fetch_block(&top_block_id.relative_to(mc_block_id))
301 .await?;
302 tracing::debug!(
303 mc_block_id = %mc_block_id.as_short_id(),
304 block_id = %top_block_id,
305 "fetched shard block",
306 );
307 debug_assert_eq!(block.id(), &top_block_id);
308
309 block
310 };
311
312 let info = block.data.load_info()?;
314
315 result.push(block.clone());
317
318 if info.after_split || info.after_merge {
320 break;
323 }
324
325 match info.load_prev_ref()? {
326 PrevBlockRef::Single(id) => top_block_id = id.as_block_id(top_block_id.shard),
327 PrevBlockRef::AfterMerge { .. } => anyhow::bail!("unexpected `AfterMerge` ref"),
328 }
329
330 depth += 1;
331 if depth >= MAX_DEPTH {
332 anyhow::bail!("max depth reached");
333 }
334 }
335
336 Ok(result)
337 }
338
339 async fn process_shard_blocks(
340 &self,
341 mc_block_id: &BlockId,
342 mc_is_key_block: bool,
343 top_blocks: &ShardHeights,
344 mut blocks: Vec<BlockStuffAug>,
345 ) -> Result<()> {
346 let start_preparing_block = |block: BlockStuffAug| {
347 let (delayed_handle, delayed) = DelayedTasks::new();
348
349 let cx = Box::new(BlockSubscriberContext {
350 mc_block_id: *mc_block_id,
351 mc_is_key_block,
352 is_key_block: false,
353 is_top_block: top_blocks.contains(block.id()),
354 block: block.data,
355 archive_data: block.archive_data,
356 delayed,
357 });
358 let subscriber = self.subscriber.clone();
359 JoinTask::new(async move {
360 let _histogram = HistogramGuard::begin("tycho_core_prepare_sc_block_time");
361
362 let prepared = subscriber.prepare_block(&cx).await;
363 (delayed_handle, cx, prepared)
364 })
365 };
366
367 let mut prepare_task = blocks.pop().map(start_preparing_block);
368
369 while let Some(prepared) = prepare_task.take() {
370 let (delayed_handle, cx, prepared) = prepared.await;
371 let delayed_handle = delayed_handle.spawn();
372
373 prepare_task = blocks.pop().map(start_preparing_block);
374
375 let _histogram = HistogramGuard::begin("tycho_core_process_sc_block_time");
376
377 self.subscriber.handle_block(&cx, prepared?).await?;
379
380 self.state.update_gc_state(UpdateGcState {
382 mc_block_id,
383 mc_is_key_block,
384 is_key_block: cx.is_key_block,
385 block: &cx.block,
386 })?;
387
388 delayed_handle.join().await?;
390
391 self.state.commit_shard(CommitShardBlock {
393 block_id: cx.block.id(),
394 });
395 }
396
397 Ok(())
398 }
399
400 fn fetch_next_master_block(
401 &self,
402 prev_block_id: &BlockId,
403 ) -> impl Future<Output = OptionalBlockStuff> + Send + 'static {
404 let _histogram = HistogramGuard::begin("tycho_core_download_mc_block_time");
405
406 tracing::debug!(%prev_block_id, "fetching next master block");
407
408 let provider = self.provider.clone();
409 let prev_block_id = *prev_block_id;
410 async move {
411 let res = provider.get_next_block(&prev_block_id).await?;
412 Some(res.with_context(|| {
413 format!(
414 "BUGGY PROVIDER. failed to fetch next master block after prev: {prev_block_id}"
415 )
416 }))
417 }
418 }
419
420 async fn fetch_block(&self, block_id_relation: &BlockIdRelation) -> Result<BlockStuffAug> {
421 match self.provider.get_block(block_id_relation).await {
422 Some(Ok(block)) => Ok(block),
423 Some(Err(e)) => {
424 anyhow::bail!(
425 "BUGGY PROVIDER. failed to fetch block for {block_id_relation:?}: {e:?}"
426 )
427 }
428 None => {
429 anyhow::bail!("BUGGY PROVIDER. block not found for {block_id_relation:?}")
430 }
431 }
432 }
433}