1use std::sync::Arc;
2
3use anyhow::{Context, Result};
4use futures_util::Future;
5use futures_util::stream::{FuturesUnordered, StreamExt};
6use tokio::time::Instant;
7use tycho_block_util::archive::ArchiveData;
8use tycho_block_util::block::{
9 BlockIdExt, BlockIdRelation, BlockStuff, BlockStuffAug, ShardHeights,
10};
11use tycho_types::models::{BlockId, PrevBlockRef};
12use tycho_util::FastHashMap;
13use tycho_util::futures::JoinTask;
14use tycho_util::metrics::HistogramGuard;
15
16pub use self::archive_handler::ArchiveHandler;
17pub use self::block_saver::BlockSaver;
18pub use self::provider::{
19 ArchiveBlockProvider, ArchiveBlockProviderConfig, ArchiveClient, ArchiveDownloadContext,
20 ArchiveResponse, ArchiveWriter, BlockProvider, BlockProviderExt, BlockchainBlockProvider,
21 BlockchainBlockProviderConfig, BoxBlockProvider, ChainBlockProvider, CheckProof,
22 CycleBlockProvider, EmptyBlockProvider, FoundArchive, HybridArchiveClient, IntoArchiveClient,
23 OptionalBlockStuff, ProofChecker, RetryBlockProvider, RetryConfig, StorageBlockProvider,
24};
25pub use self::starter::{
26 ColdBootType, FileZerostateProvider, QueueStateHandler, Starter, StarterBuilder, StarterConfig,
27 ValidateQueueState, ZerostateProvider,
28};
29pub use self::state::{
30 BlockStriderState, CommitMasterBlock, CommitShardBlock, PersistentBlockStriderState,
31 TempBlockStriderState,
32};
33pub use self::state_applier::ShardStateApplier;
34#[cfg(any(test, feature = "test"))]
35pub use self::subscriber::test::PrintSubscriber;
36pub use self::subscriber::{
37 ArchiveSubscriber, ArchiveSubscriberContext, ArchiveSubscriberExt, BlockSubscriber,
38 BlockSubscriberContext, BlockSubscriberExt, ChainSubscriber, DelayedTasks,
39 DelayedTasksJoinHandle, DelayedTasksSpawner, GcSubscriber, ManualGcTrigger, MetricsSubscriber,
40 NoopSubscriber, PsSubscriber, StateSubscriber, StateSubscriberContext, StateSubscriberExt,
41};
42use crate::storage::CoreStorage;
43
44mod archive_handler;
45mod block_saver;
46mod provider;
47mod starter;
48mod state;
49mod state_applier;
50mod subscriber;
51
52pub struct BlockStriderBuilder<T, P, B> {
53 state: T,
54 provider: P,
55 subscriber: B,
56}
57
58impl<T2, T3> BlockStriderBuilder<(), T2, T3> {
59 #[inline]
60 pub fn with_state<T: BlockStriderState>(self, state: T) -> BlockStriderBuilder<T, T2, T3> {
61 BlockStriderBuilder {
62 state,
63 provider: self.provider,
64 subscriber: self.subscriber,
65 }
66 }
67}
68
69impl<T1, T3> BlockStriderBuilder<T1, (), T3> {
70 #[inline]
71 pub fn with_provider<P: BlockProvider>(self, provider: P) -> BlockStriderBuilder<T1, P, T3> {
72 BlockStriderBuilder {
73 state: self.state,
74 provider,
75 subscriber: self.subscriber,
76 }
77 }
78}
79
80impl<T1, T2> BlockStriderBuilder<T1, T2, ()> {
81 #[inline]
82 pub fn with_block_subscriber<B>(self, subscriber: B) -> BlockStriderBuilder<T1, T2, B>
83 where
84 B: BlockSubscriber,
85 {
86 BlockStriderBuilder {
87 state: self.state,
88 provider: self.provider,
89 subscriber,
90 }
91 }
92}
93
94impl<T1, T2> BlockStriderBuilder<T1, T2, ()> {
95 pub fn with_state_subscriber<S>(
96 self,
97 storage: CoreStorage,
98 state_subscriber: S,
99 ) -> BlockStriderBuilder<T1, T2, ShardStateApplier<S>>
100 where
101 S: StateSubscriber,
102 {
103 BlockStriderBuilder {
104 state: self.state,
105 provider: self.provider,
106 subscriber: ShardStateApplier::new(storage, state_subscriber),
107 }
108 }
109}
110
111impl<T, P, B> BlockStriderBuilder<T, P, B>
112where
113 T: BlockStriderState,
114 P: BlockProvider,
115 B: BlockSubscriber,
116{
117 pub fn build(self) -> BlockStrider<T, P, B> {
118 BlockStrider {
119 state: self.state,
120 provider: Arc::new(self.provider),
121 subscriber: Arc::new(self.subscriber),
122 }
123 }
124}
125
126pub struct BlockStrider<T, P, B> {
127 state: T,
128 provider: Arc<P>,
129 subscriber: Arc<B>,
130}
131
132impl BlockStrider<(), (), ()> {
133 pub fn builder() -> BlockStriderBuilder<(), (), ()> {
134 BlockStriderBuilder {
135 state: (),
136 provider: (),
137 subscriber: (),
138 }
139 }
140}
141
142impl<S, P, B> BlockStrider<S, P, B>
143where
144 S: BlockStriderState,
145 P: BlockProvider,
146 B: BlockSubscriber,
147{
148 pub async fn run(self) -> Result<()> {
152 tracing::info!("block strider loop started");
153
154 let mut next_master_fut =
155 JoinTask::new(self.fetch_next_master_block(&self.state.load_last_mc_block_id()));
156
157 while let Some(next) = next_master_fut.await.transpose()? {
158 next_master_fut = JoinTask::new(self.fetch_next_master_block(next.id()));
164
165 let mc_seqno = next.id().seqno;
166
167 {
168 let _histogram = HistogramGuard::begin("tycho_core_process_strider_step_time");
169 self.process_mc_block(next.data, next.archive_data).await?;
170 }
171
172 {
173 let _histogram = HistogramGuard::begin("tycho_core_provider_cleanup_time");
174 self.provider.cleanup_until(mc_seqno).await?;
175 }
176 }
177
178 tracing::info!("block strider loop finished");
179 Ok(())
180 }
181
182 async fn process_mc_block(&self, block: BlockStuff, archive_data: ArchiveData) -> Result<()> {
184 let mc_block_id = *block.id();
185 tracing::debug!(%mc_block_id, "processing masterchain block");
186
187 let started_at = Instant::now();
188
189 let custom = block.load_custom()?;
190 let is_key_block = custom.config.is_some();
191
192 let (delayed_handle, delayed) = DelayedTasks::new();
194 let prepared_master = {
195 let cx = Box::new(BlockSubscriberContext {
196 mc_block_id,
197 mc_is_key_block: is_key_block,
198 is_key_block,
199 block: block.clone(),
200 archive_data,
201 delayed,
202 });
203 let subscriber = self.subscriber.clone();
204 JoinTask::new(async move {
205 let _histogram = HistogramGuard::begin("tycho_core_prepare_mc_block_time");
206 let prepared = subscriber.prepare_block(&cx).await;
207 (cx, prepared)
208 })
209 };
210
211 let mut shard_heights = FastHashMap::default();
213 let mut download_futures = FuturesUnordered::new();
214 for entry in custom.shards.latest_blocks() {
215 let top_block_id = entry?;
216 shard_heights.insert(top_block_id.shard, top_block_id.seqno);
217 download_futures.push(Box::pin(
218 self.download_shard_blocks(mc_block_id, top_block_id),
219 ));
220 }
221
222 let mut process_futures = FuturesUnordered::new();
224 while let Some(blocks) = download_futures.next().await.transpose()? {
225 process_futures.push(Box::pin(self.process_shard_blocks(
226 &mc_block_id,
227 is_key_block,
228 blocks,
229 )));
230 }
231 metrics::histogram!("tycho_core_download_sc_blocks_time").record(started_at.elapsed());
232
233 while process_futures.next().await.transpose()?.is_some() {}
235 metrics::histogram!("tycho_core_process_sc_blocks_time").record(started_at.elapsed());
236
237 let delayed_handle = delayed_handle.spawn();
239 let (cx, prepared) = prepared_master.await;
240
241 let _histogram = HistogramGuard::begin("tycho_core_process_mc_block_time");
242 self.subscriber.handle_block(&cx, prepared?).await?;
243
244 delayed_handle.join().await?;
246
247 let shard_heights = ShardHeights::from(shard_heights);
249 self.state.commit_master(CommitMasterBlock {
250 block_id: &mc_block_id,
251 is_key_block,
252 shard_heights: &shard_heights,
253 });
254
255 Ok(())
256 }
257
258 async fn download_shard_blocks(
260 &self,
261 mc_block_id: BlockId,
262 mut top_block_id: BlockId,
263 ) -> Result<Vec<BlockStuffAug>> {
264 const MAX_DEPTH: u32 = 32;
265
266 tracing::debug!(
267 mc_block_id = %mc_block_id.as_short_id(),
268 %top_block_id,
269 "downloading shard blocks"
270 );
271
272 let mut depth = 0;
273 let mut result = Vec::new();
274 while top_block_id.seqno > 0 && !self.state.is_committed(&top_block_id) {
275 let block = {
277 let _histogram = HistogramGuard::begin("tycho_core_download_sc_block_time");
278
279 let block = self
280 .fetch_block(&top_block_id.relative_to(mc_block_id))
281 .await?;
282 tracing::debug!(
283 mc_block_id = %mc_block_id.as_short_id(),
284 block_id = %top_block_id,
285 "fetched shard block",
286 );
287 debug_assert_eq!(block.id(), &top_block_id);
288
289 block
290 };
291
292 let info = block.data.load_info()?;
294
295 result.push(block.clone());
297
298 if info.after_split || info.after_merge {
300 break;
303 }
304
305 match info.load_prev_ref()? {
306 PrevBlockRef::Single(id) => top_block_id = id.as_block_id(top_block_id.shard),
307 PrevBlockRef::AfterMerge { .. } => anyhow::bail!("unexpected `AfterMerge` ref"),
308 }
309
310 depth += 1;
311 if depth >= MAX_DEPTH {
312 anyhow::bail!("max depth reached");
313 }
314 }
315
316 Ok(result)
317 }
318
319 async fn process_shard_blocks(
320 &self,
321 mc_block_id: &BlockId,
322 mc_is_key_block: bool,
323 mut blocks: Vec<BlockStuffAug>,
324 ) -> Result<()> {
325 let start_preparing_block = |block: BlockStuffAug| {
326 let (delayed_handle, delayed) = DelayedTasks::new();
327
328 let cx = Box::new(BlockSubscriberContext {
329 mc_block_id: *mc_block_id,
330 mc_is_key_block,
331 is_key_block: false,
332 block: block.data,
333 archive_data: block.archive_data,
334 delayed,
335 });
336 let subscriber = self.subscriber.clone();
337 JoinTask::new(async move {
338 let _histogram = HistogramGuard::begin("tycho_core_prepare_sc_block_time");
339
340 let prepared = subscriber.prepare_block(&cx).await;
341 (delayed_handle, cx, prepared)
342 })
343 };
344
345 let mut prepare_task = blocks.pop().map(start_preparing_block);
346
347 while let Some(prepared) = prepare_task.take() {
348 let (delayed_handle, cx, prepared) = prepared.await;
349 let delayed_handle = delayed_handle.spawn();
350
351 prepare_task = blocks.pop().map(start_preparing_block);
352
353 let _histogram = HistogramGuard::begin("tycho_core_process_sc_block_time");
354
355 self.subscriber.handle_block(&cx, prepared?).await?;
357
358 delayed_handle.join().await?;
360
361 self.state.commit_shard(CommitShardBlock {
363 block_id: cx.block.id(),
364 });
365 }
366
367 Ok(())
368 }
369
370 fn fetch_next_master_block(
371 &self,
372 prev_block_id: &BlockId,
373 ) -> impl Future<Output = OptionalBlockStuff> + Send + 'static {
374 let _histogram = HistogramGuard::begin("tycho_core_download_mc_block_time");
375
376 tracing::debug!(%prev_block_id, "fetching next master block");
377
378 let provider = self.provider.clone();
379 let prev_block_id = *prev_block_id;
380 async move {
381 let res = provider.get_next_block(&prev_block_id).await?;
382 Some(res.with_context(|| {
383 format!(
384 "BUGGY PROVIDER. failed to fetch next master block after prev: {prev_block_id}"
385 )
386 }))
387 }
388 }
389
390 async fn fetch_block(&self, block_id_relation: &BlockIdRelation) -> Result<BlockStuffAug> {
391 match self.provider.get_block(block_id_relation).await {
392 Some(Ok(block)) => Ok(block),
393 Some(Err(e)) => {
394 anyhow::bail!(
395 "BUGGY PROVIDER. failed to fetch block for {block_id_relation:?}: {e:?}"
396 )
397 }
398 None => {
399 anyhow::bail!("BUGGY PROVIDER. block not found for {block_id_relation:?}")
400 }
401 }
402 }
403}