1use std::sync::Arc;
2
3use anyhow::{Context, Result};
4use futures_util::Future;
5use futures_util::stream::{FuturesUnordered, StreamExt};
6use tokio::time::Instant;
7use tycho_block_util::archive::ArchiveData;
8use tycho_block_util::block::{
9 BlockIdExt, BlockIdRelation, BlockStuff, BlockStuffAug, ShardHeights,
10};
11use tycho_types::models::{BlockId, PrevBlockRef};
12use tycho_util::FastHashMap;
13use tycho_util::futures::JoinTask;
14use tycho_util::metrics::HistogramGuard;
15
16pub use self::archive_handler::ArchiveHandler;
17pub use self::block_saver::BlockSaver;
18pub use self::provider::{
19 ArchiveBlockProvider, ArchiveBlockProviderConfig, BlockProvider, BlockProviderExt,
20 BlockchainBlockProvider, BlockchainBlockProviderConfig, ChainBlockProvider, CheckProof,
21 EmptyBlockProvider, OptionalBlockStuff, ProofChecker, RetryConfig, StorageBlockProvider,
22};
23pub use self::starter::{
24 ColdBootType, FileZerostateProvider, QueueStateHandler, Starter, StarterBuilder, StarterConfig,
25 ZerostateProvider,
26};
27pub use self::state::{
28 BlockStriderState, CommitMasterBlock, CommitShardBlock, PersistentBlockStriderState,
29 TempBlockStriderState,
30};
31pub use self::state_applier::ShardStateApplier;
32#[cfg(any(test, feature = "test"))]
33pub use self::subscriber::test::PrintSubscriber;
34pub use self::subscriber::{
35 ArchiveSubscriber, ArchiveSubscriberContext, ArchiveSubscriberExt, BlockSubscriber,
36 BlockSubscriberContext, BlockSubscriberExt, ChainSubscriber, DelayedTasks,
37 DelayedTasksJoinHandle, DelayedTasksSpawner, GcSubscriber, ManualGcTrigger, MetricsSubscriber,
38 NoopSubscriber, PsSubscriber, StateSubscriber, StateSubscriberContext, StateSubscriberExt,
39};
40use crate::storage::CoreStorage;
41
42mod archive_handler;
43mod block_saver;
44mod provider;
45mod starter;
46mod state;
47mod state_applier;
48mod subscriber;
49
/// Type-state builder for [`BlockStrider`].
///
/// Every slot starts out as `()` (see [`BlockStrider::builder`]) and is
/// replaced by the matching `with_*` method. `build` is only available once
/// all three slots hold types satisfying the required traits.
pub struct BlockStriderBuilder<T, P, B> {
    /// State slot; set by `with_state`.
    state: T,
    /// Block provider slot; set by `with_provider`.
    provider: P,
    /// Subscriber slot; set by `with_block_subscriber` or `with_state_subscriber`.
    subscriber: B,
}
55
56impl<T2, T3> BlockStriderBuilder<(), T2, T3> {
57 #[inline]
58 pub fn with_state<T: BlockStriderState>(self, state: T) -> BlockStriderBuilder<T, T2, T3> {
59 BlockStriderBuilder {
60 state,
61 provider: self.provider,
62 subscriber: self.subscriber,
63 }
64 }
65}
66
67impl<T1, T3> BlockStriderBuilder<T1, (), T3> {
68 #[inline]
69 pub fn with_provider<P: BlockProvider>(self, provider: P) -> BlockStriderBuilder<T1, P, T3> {
70 BlockStriderBuilder {
71 state: self.state,
72 provider,
73 subscriber: self.subscriber,
74 }
75 }
76}
77
78impl<T1, T2> BlockStriderBuilder<T1, T2, ()> {
79 #[inline]
80 pub fn with_block_subscriber<B>(self, subscriber: B) -> BlockStriderBuilder<T1, T2, B>
81 where
82 B: BlockSubscriber,
83 {
84 BlockStriderBuilder {
85 state: self.state,
86 provider: self.provider,
87 subscriber,
88 }
89 }
90}
91
92impl<T1, T2> BlockStriderBuilder<T1, T2, ()> {
93 pub fn with_state_subscriber<S>(
94 self,
95 storage: CoreStorage,
96 state_subscriber: S,
97 ) -> BlockStriderBuilder<T1, T2, ShardStateApplier<S>>
98 where
99 S: StateSubscriber,
100 {
101 BlockStriderBuilder {
102 state: self.state,
103 provider: self.provider,
104 subscriber: ShardStateApplier::new(storage, state_subscriber),
105 }
106 }
107}
108
109impl<T, P, B> BlockStriderBuilder<T, P, B>
110where
111 T: BlockStriderState,
112 P: BlockProvider,
113 B: BlockSubscriber,
114{
115 pub fn build(self) -> BlockStrider<T, P, B> {
116 BlockStrider {
117 state: self.state,
118 provider: Arc::new(self.provider),
119 subscriber: Arc::new(self.subscriber),
120 }
121 }
122}
123
/// Walks masterchain blocks obtained from a provider and passes them,
/// together with their shard blocks, to a subscriber (see `run`).
///
/// Construct it through [`BlockStrider::builder`].
pub struct BlockStrider<T, P, B> {
    /// Strider state: supplies the last masterchain block id, answers
    /// "is this block committed?" and receives master/shard commits.
    state: T,
    /// Source of blocks; kept in an `Arc` so fetch futures can own a handle.
    provider: Arc<P>,
    /// Block handler; kept in an `Arc` so preparation tasks can own a handle.
    subscriber: Arc<B>,
}
129
130impl BlockStrider<(), (), ()> {
131 pub fn builder() -> BlockStriderBuilder<(), (), ()> {
132 BlockStriderBuilder {
133 state: (),
134 provider: (),
135 subscriber: (),
136 }
137 }
138}
139
140impl<S, P, B> BlockStrider<S, P, B>
141where
142 S: BlockStriderState,
143 P: BlockProvider,
144 B: BlockSubscriber,
145{
146 pub async fn run(self) -> Result<()> {
150 tracing::info!("block strider loop started");
151
152 let mut next_master_fut =
153 JoinTask::new(self.fetch_next_master_block(&self.state.load_last_mc_block_id()));
154
155 while let Some(next) = next_master_fut.await.transpose()? {
156 next_master_fut = JoinTask::new(self.fetch_next_master_block(next.id()));
162
163 let mc_seqno = next.id().seqno;
164
165 {
166 let _histogram = HistogramGuard::begin("tycho_core_process_strider_step_time");
167 self.process_mc_block(next.data, next.archive_data).await?;
168 }
169
170 {
171 let _histogram = HistogramGuard::begin("tycho_core_provider_cleanup_time");
172 self.provider.cleanup_until(mc_seqno).await?;
173 }
174 }
175
176 tracing::info!("block strider loop finished");
177 Ok(())
178 }
179
    /// Processes one masterchain block together with the shard blocks it
    /// references, then commits the masterchain block to the state.
    ///
    /// Steps, in order:
    /// 1. start `prepare_block` for the masterchain block inside a `JoinTask`;
    /// 2. download the uncommitted blocks of every shard listed in
    ///    `custom.shards.latest_blocks()`;
    /// 3. process the downloaded shard blocks;
    /// 4. call `handle_block` for the masterchain block and wait for its
    ///    delayed tasks;
    /// 5. commit the masterchain block with the collected shard heights.
    ///
    /// # Errors
    ///
    /// Returns the first error from parsing the block, downloading or
    /// processing shard blocks, preparing/handling the masterchain block, or
    /// joining the delayed tasks. Nothing is committed in that case.
    async fn process_mc_block(&self, block: BlockStuff, archive_data: ArchiveData) -> Result<()> {
        let mc_block_id = *block.id();
        tracing::debug!(%mc_block_id, "processing masterchain block");

        // Reference point for both shard-block histograms recorded below.
        let started_at = Instant::now();

        let custom = block.load_custom()?;
        // A masterchain block is treated as a key block when its custom data
        // carries a config.
        let is_key_block = custom.config.is_some();

        // Start preparing the masterchain block first; the result is only
        // awaited after all shard blocks have been processed.
        let (delayed_handle, delayed) = DelayedTasks::new();
        let prepared_master = {
            let cx = Box::new(BlockSubscriberContext {
                mc_block_id,
                mc_is_key_block: is_key_block,
                is_key_block,
                block: block.clone(),
                archive_data,
                delayed,
            });
            let subscriber = self.subscriber.clone();
            JoinTask::new(async move {
                let _histogram = HistogramGuard::begin("tycho_core_prepare_mc_block_time");
                let prepared = subscriber.prepare_block(&cx).await;
                // Hand the context back so `handle_block` can reuse it.
                (cx, prepared)
            })
        };

        // Queue one download future per shard and record each shard's top
        // seqno for the final commit.
        let mut shard_heights = FastHashMap::default();
        let mut download_futures = FuturesUnordered::new();
        for entry in custom.shards.latest_blocks() {
            let top_block_id = entry?;
            shard_heights.insert(top_block_id.shard, top_block_id.seqno);
            download_futures.push(Box::pin(
                self.download_shard_blocks(mc_block_id, top_block_id),
            ));
        }

        // Drive the downloads to completion. Each finished download is turned
        // into a processing future, but those are only queued here: nothing
        // polls `process_futures` until the loop further down.
        let mut process_futures = FuturesUnordered::new();
        while let Some(blocks) = download_futures.next().await.transpose()? {
            process_futures.push(Box::pin(self.process_shard_blocks(
                &mc_block_id,
                is_key_block,
                blocks,
            )));
        }
        metrics::histogram!("tycho_core_download_sc_blocks_time").record(started_at.elapsed());

        // Wait until every shard has been processed, stopping at the first error.
        while process_futures.next().await.transpose()?.is_some() {}
        // NOTE: measured from `started_at`, so this value includes the
        // download phase as well.
        metrics::histogram!("tycho_core_process_sc_blocks_time").record(started_at.elapsed());

        // Only now finish the masterchain block itself.
        let delayed_handle = delayed_handle.spawn();
        let (cx, prepared) = prepared_master.await;

        // This guard lives until the function returns, so it also covers
        // joining the delayed tasks and the commit.
        let _histogram = HistogramGuard::begin("tycho_core_process_mc_block_time");
        self.subscriber.handle_block(&cx, prepared?).await?;

        // Delayed tasks must finish before the block is committed.
        delayed_handle.join().await?;

        let shard_heights = ShardHeights::from(shard_heights);
        self.state.commit_master(CommitMasterBlock {
            block_id: &mc_block_id,
            is_key_block,
            shard_heights: &shard_heights,
        });

        Ok(())
    }
255
256 async fn download_shard_blocks(
258 &self,
259 mc_block_id: BlockId,
260 mut top_block_id: BlockId,
261 ) -> Result<Vec<BlockStuffAug>> {
262 const MAX_DEPTH: u32 = 32;
263
264 tracing::debug!(
265 mc_block_id = %mc_block_id.as_short_id(),
266 %top_block_id,
267 "downloading shard blocks"
268 );
269
270 let mut depth = 0;
271 let mut result = Vec::new();
272 while top_block_id.seqno > 0 && !self.state.is_committed(&top_block_id) {
273 let block = {
275 let _histogram = HistogramGuard::begin("tycho_core_download_sc_block_time");
276
277 let block = self
278 .fetch_block(&top_block_id.relative_to(mc_block_id))
279 .await?;
280 tracing::debug!(
281 mc_block_id = %mc_block_id.as_short_id(),
282 block_id = %top_block_id,
283 "fetched shard block",
284 );
285 debug_assert_eq!(block.id(), &top_block_id);
286
287 block
288 };
289
290 let info = block.data.load_info()?;
292
293 result.push(block.clone());
295
296 if info.after_split || info.after_merge {
298 break;
301 }
302
303 match info.load_prev_ref()? {
304 PrevBlockRef::Single(id) => top_block_id = id.as_block_id(top_block_id.shard),
305 PrevBlockRef::AfterMerge { .. } => anyhow::bail!("unexpected `AfterMerge` ref"),
306 }
307
308 depth += 1;
309 if depth >= MAX_DEPTH {
310 anyhow::bail!("max depth reached");
311 }
312 }
313
314 Ok(result)
315 }
316
    /// Runs the subscriber over the given blocks of one shard and commits
    /// each of them to the state.
    ///
    /// Blocks are taken from the END of `blocks` (`Vec::pop`), i.e. in the
    /// reverse of the order in which `download_shard_blocks` pushed them.
    /// Preparation of the following block is started before the current one
    /// is handled.
    ///
    /// # Errors
    ///
    /// Returns the first error from `prepare_block`, `handle_block` or from
    /// joining a block's delayed tasks; the failing block and all remaining
    /// ones are not committed.
    async fn process_shard_blocks(
        &self,
        mc_block_id: &BlockId,
        mc_is_key_block: bool,
        mut blocks: Vec<BlockStuffAug>,
    ) -> Result<()> {
        // Builds the subscriber context for one block and wraps its
        // `prepare_block` call in a `JoinTask`. The task yields the delayed
        // tasks handle and the context back together with the result.
        let start_preparing_block = |block: BlockStuffAug| {
            let (delayed_handle, delayed) = DelayedTasks::new();

            let cx = Box::new(BlockSubscriberContext {
                mc_block_id: *mc_block_id,
                mc_is_key_block,
                // Shard blocks are never reported as key blocks here.
                is_key_block: false,
                block: block.data,
                archive_data: block.archive_data,
                delayed,
            });
            let subscriber = self.subscriber.clone();
            JoinTask::new(async move {
                let _histogram = HistogramGuard::begin("tycho_core_prepare_sc_block_time");

                let prepared = subscriber.prepare_block(&cx).await;
                (delayed_handle, cx, prepared)
            })
        };

        // Prime the pipeline with the last block of the vector.
        let mut prepare_task = blocks.pop().map(start_preparing_block);

        while let Some(prepared) = prepare_task.take() {
            let (delayed_handle, cx, prepared) = prepared.await;
            let delayed_handle = delayed_handle.spawn();

            // Start preparing the next block before handling the current one.
            prepare_task = blocks.pop().map(start_preparing_block);

            // Covers handling, joining the delayed tasks and the commit of
            // this block (dropped at the end of the iteration).
            let _histogram = HistogramGuard::begin("tycho_core_process_sc_block_time");

            // Pass the block through the subscriber.
            self.subscriber.handle_block(&cx, prepared?).await?;

            // Delayed tasks must finish before the block is committed.
            delayed_handle.join().await?;

            self.state.commit_shard(CommitShardBlock {
                block_id: cx.block.id(),
            });
        }

        Ok(())
    }
367
368 fn fetch_next_master_block(
369 &self,
370 prev_block_id: &BlockId,
371 ) -> impl Future<Output = OptionalBlockStuff> + Send + 'static {
372 let _histogram = HistogramGuard::begin("tycho_core_download_mc_block_time");
373
374 tracing::debug!(%prev_block_id, "fetching next master block");
375
376 let provider = self.provider.clone();
377 let prev_block_id = *prev_block_id;
378 async move {
379 let res = provider.get_next_block(&prev_block_id).await?;
380 Some(res.with_context(|| {
381 format!(
382 "BUGGY PROVIDER. failed to fetch next master block after prev: {prev_block_id}"
383 )
384 }))
385 }
386 }
387
388 async fn fetch_block(&self, block_id_relation: &BlockIdRelation) -> Result<BlockStuffAug> {
389 match self.provider.get_block(block_id_relation).await {
390 Some(Ok(block)) => Ok(block),
391 Some(Err(e)) => {
392 anyhow::bail!(
393 "BUGGY PROVIDER. failed to fetch block for {block_id_relation:?}: {e:?}"
394 )
395 }
396 None => {
397 anyhow::bail!("BUGGY PROVIDER. block not found for {block_id_relation:?}")
398 }
399 }
400 }
401}