1use std::sync::Arc;
2
3use anyhow::{Context, Result};
4use futures_util::Future;
5use futures_util::stream::{FuturesUnordered, StreamExt};
6use tokio::time::Instant;
7use tycho_block_util::archive::ArchiveData;
8use tycho_block_util::block::{
9 BlockIdExt, BlockIdRelation, BlockStuff, BlockStuffAug, ShardHeights,
10};
11use tycho_types::models::{BlockId, PrevBlockRef};
12use tycho_util::FastHashMap;
13use tycho_util::futures::JoinTask;
14use tycho_util::metrics::HistogramGuard;
15
16pub use self::archive_handler::ArchiveHandler;
17pub use self::block_saver::BlockSaver;
18pub use self::provider::{
19 ArchiveBlockProvider, ArchiveBlockProviderConfig, ArchiveClient, ArchiveDownloadContext,
20 ArchiveResponse, BlockProvider, BlockProviderExt, BlockchainBlockProvider,
21 BlockchainBlockProviderConfig, BoxBlockProvider, ChainBlockProvider, CheckProof,
22 CycleBlockProvider, EmptyBlockProvider, FoundArchive, HybridArchiveClient, IntoArchiveClient,
23 OptionalBlockStuff, ProofChecker, RetryBlockProvider, RetryConfig, StorageBlockProvider,
24};
25pub use self::starter::{
26 ColdBootType, FileZerostateProvider, QueueStateHandler, Starter, StarterBuilder, StarterConfig,
27 ValidateQueueState, ZerostateProvider,
28};
29use self::state::UpdateGcState;
30pub use self::state::{
31 BlockStriderState, CommitMasterBlock, CommitShardBlock, PersistentBlockStriderState,
32 TempBlockStriderState,
33};
34pub use self::state_applier::ShardStateApplier;
35#[cfg(any(test, feature = "test"))]
36pub use self::subscriber::test::PrintSubscriber;
37pub use self::subscriber::{
38 ArchiveSubscriber, ArchiveSubscriberContext, ArchiveSubscriberExt, BlockSubscriber,
39 BlockSubscriberContext, BlockSubscriberExt, ChainSubscriber, DelayedTasks,
40 DelayedTasksJoinHandle, DelayedTasksSpawner, MetricsSubscriber, NoopSubscriber,
41 StateSubscriber, StateSubscriberContext, StateSubscriberExt,
42};
43use crate::storage::CoreStorage;
44
45mod archive_handler;
46mod block_saver;
47mod provider;
48mod starter;
49mod state;
50mod state_applier;
51mod subscriber;
52
/// Staged builder for [`BlockStrider`].
///
/// Every type parameter starts out as `()` (see [`BlockStrider::builder`]) and
/// is replaced by a concrete type through the matching `with_*` method.
/// [`BlockStriderBuilder::build`] is only available once all three slots hold
/// types implementing the required traits.
pub struct BlockStriderBuilder<T, P, B> {
    /// Strider state slot; `build` requires `T: BlockStriderState`.
    state: T,
    /// Block provider slot; `build` requires `P: BlockProvider`.
    provider: P,
    /// Block subscriber slot; `build` requires `B: BlockSubscriber`.
    subscriber: B,
}
58
59impl<T2, T3> BlockStriderBuilder<(), T2, T3> {
60 #[inline]
61 pub fn with_state<T: BlockStriderState>(self, state: T) -> BlockStriderBuilder<T, T2, T3> {
62 BlockStriderBuilder {
63 state,
64 provider: self.provider,
65 subscriber: self.subscriber,
66 }
67 }
68}
69
70impl<T1, T3> BlockStriderBuilder<T1, (), T3> {
71 #[inline]
72 pub fn with_provider<P: BlockProvider>(self, provider: P) -> BlockStriderBuilder<T1, P, T3> {
73 BlockStriderBuilder {
74 state: self.state,
75 provider,
76 subscriber: self.subscriber,
77 }
78 }
79}
80
81impl<T1, T2> BlockStriderBuilder<T1, T2, ()> {
82 #[inline]
83 pub fn with_block_subscriber<B>(self, subscriber: B) -> BlockStriderBuilder<T1, T2, B>
84 where
85 B: BlockSubscriber,
86 {
87 BlockStriderBuilder {
88 state: self.state,
89 provider: self.provider,
90 subscriber,
91 }
92 }
93}
94
95impl<T1, T2> BlockStriderBuilder<T1, T2, ()> {
96 pub fn with_state_subscriber<S>(
97 self,
98 storage: CoreStorage,
99 state_subscriber: S,
100 ) -> BlockStriderBuilder<T1, T2, ShardStateApplier<S>>
101 where
102 S: StateSubscriber,
103 {
104 BlockStriderBuilder {
105 state: self.state,
106 provider: self.provider,
107 subscriber: ShardStateApplier::new(storage, state_subscriber),
108 }
109 }
110}
111
112impl<T, P, B> BlockStriderBuilder<T, P, B>
113where
114 T: BlockStriderState,
115 P: BlockProvider,
116 B: BlockSubscriber,
117{
118 pub fn build(self) -> BlockStrider<T, P, B> {
119 BlockStrider {
120 state: self.state,
121 provider: Arc::new(self.provider),
122 subscriber: Arc::new(self.subscriber),
123 }
124 }
125}
126
/// Walks the chain one masterchain block at a time (see `run`).
///
/// Blocks are obtained from `provider`, handed to `subscriber`, and progress
/// is recorded through `state`.
pub struct BlockStrider<T, P, B> {
    /// Progress tracker: queried for the last masterchain block id and for
    /// already committed shard blocks, and notified about commits.
    state: T,
    /// Source of blocks. Kept in an `Arc` because fetch futures take their own
    /// handle to it.
    provider: Arc<P>,
    /// Consumer of blocks. Kept in an `Arc` because block preparation tasks
    /// take their own handle to it.
    subscriber: Arc<B>,
}
132
impl BlockStrider<(), (), ()> {
    /// Creates an empty builder in which all three slots (state, provider,
    /// subscriber) hold the `()` placeholder.
    pub fn builder() -> BlockStriderBuilder<(), (), ()> {
        BlockStriderBuilder {
            state: (),
            provider: (),
            subscriber: (),
        }
    }
}
142
143impl<S, P, B> BlockStrider<S, P, B>
144where
145 S: BlockStriderState,
146 P: BlockProvider,
147 B: BlockSubscriber,
148{
149 pub async fn run(self) -> Result<()> {
153 tracing::info!("block strider loop started");
154
155 let mut next_master_fut =
156 JoinTask::new(self.fetch_next_master_block(&self.state.load_last_mc_block_id()));
157
158 while let Some(next) = next_master_fut.await.transpose()? {
159 next_master_fut = JoinTask::new(self.fetch_next_master_block(next.id()));
165
166 let mc_seqno = next.id().seqno;
167
168 {
169 let _histogram = HistogramGuard::begin("tycho_core_process_strider_step_time");
170 self.process_mc_block(next.data, next.archive_data).await?;
171 }
172
173 {
174 let _histogram = HistogramGuard::begin("tycho_core_provider_cleanup_time");
175 self.provider.cleanup_until(mc_seqno).await?;
176 }
177 }
178
179 tracing::info!("block strider loop finished");
180 Ok(())
181 }
182
    /// Processes one masterchain block together with the shard blocks it references.
    ///
    /// Steps, in order:
    /// 1. start preparing the masterchain block in a `JoinTask`;
    /// 2. download the not yet committed shard blocks for every top shard block
    ///    listed in the masterchain block;
    /// 3. process the downloaded shard blocks;
    /// 4. handle the masterchain block itself, update the GC state, wait for
    ///    delayed tasks and commit the masterchain block with the shard heights.
    ///
    /// # Errors
    ///
    /// Returns the first error from block parsing, shard block download or
    /// processing, the subscriber, the GC state update, or the delayed tasks.
    async fn process_mc_block(&self, block: BlockStuff, archive_data: ArchiveData) -> Result<()> {
        let mc_block_id = *block.id();
        tracing::debug!(%mc_block_id, "processing masterchain block");

        // Reference point for both shard-block histograms recorded below.
        let started_at = Instant::now();

        let info = block.load_info()?;
        metrics::gauge!("tycho_core_last_mc_block_utime").set(info.gen_utime);
        metrics::gauge!("tycho_core_last_mc_block_seqno").set(mc_block_id.seqno);

        let custom = block.load_custom()?;
        // A masterchain block is treated as a key block when it carries a config.
        let is_key_block = custom.config.is_some();

        // Begin preparing the masterchain block before any shard work is started.
        let (delayed_handle, delayed) = DelayedTasks::new();
        let prepared_master = {
            let cx = Box::new(BlockSubscriberContext {
                mc_block_id,
                mc_is_key_block: is_key_block,
                is_key_block,
                block: block.clone(),
                archive_data,
                delayed,
            });
            let subscriber = self.subscriber.clone();
            JoinTask::new(async move {
                let _histogram = HistogramGuard::begin("tycho_core_prepare_mc_block_time");
                let prepared = subscriber.prepare_block(&cx).await;
                // The context is handed back so `handle_block` can reuse it.
                (cx, prepared)
            })
        };

        // Queue one download per top shard block and remember each shard's height
        // for the final commit.
        let mut shard_heights = FastHashMap::default();
        let mut download_futures = FuturesUnordered::new();
        for entry in custom.shards.latest_blocks() {
            let top_block_id = entry?;
            shard_heights.insert(top_block_id.shard, top_block_id.seqno);
            download_futures.push(Box::pin(
                self.download_shard_blocks(mc_block_id, top_block_id),
            ));
        }

        // Queue a processing future for every finished download.
        // NOTE(review): futures inside a `FuturesUnordered` are only driven while
        // the set itself is polled, so shard processing appears to start only
        // after all downloads have finished - confirm this is intended.
        let mut process_futures = FuturesUnordered::new();
        while let Some(blocks) = download_futures.next().await.transpose()? {
            process_futures.push(Box::pin(self.process_shard_blocks(
                &mc_block_id,
                is_key_block,
                blocks,
            )));
        }
        metrics::histogram!("tycho_core_download_sc_blocks_time").record(started_at.elapsed());

        // Drain the set: wait until every shard has been processed (or one failed).
        while process_futures.next().await.transpose()?.is_some() {}
        // Measured from `started_at`, i.e. this value includes the download phase.
        metrics::histogram!("tycho_core_process_sc_blocks_time").record(started_at.elapsed());

        // Finally handle the masterchain block itself.
        let delayed_handle = delayed_handle.spawn();
        let (cx, prepared) = prepared_master.await;

        // This guard lives until the end of the function, so the recorded time also
        // covers the GC state update, the delayed tasks and the commit.
        let _histogram = HistogramGuard::begin("tycho_core_process_mc_block_time");
        self.subscriber.handle_block(&cx, prepared?).await?;

        // For a masterchain block both key-block flags carry the same value.
        self.state.update_gc_state(UpdateGcState {
            mc_block_id: &cx.mc_block_id,
            mc_is_key_block: cx.is_key_block,
            is_key_block: cx.is_key_block,
            block: &cx.block,
        })?;

        // The block is committed only after all delayed tasks have finished.
        delayed_handle.join().await?;

        let shard_heights = ShardHeights::from(shard_heights);
        self.state.commit_master(CommitMasterBlock {
            block_id: &mc_block_id,
            is_key_block,
            shard_heights: &shard_heights,
        });

        // Current time in seconds (milliseconds precision kept in the fraction).
        metrics::gauge!("tycho_core_last_mc_block_applied")
            .set(tycho_util::time::now_millis() as f64 / 1000.0);

        Ok(())
    }
274
275 async fn download_shard_blocks(
277 &self,
278 mc_block_id: BlockId,
279 mut top_block_id: BlockId,
280 ) -> Result<Vec<BlockStuffAug>> {
281 const MAX_DEPTH: u32 = 32;
282
283 tracing::debug!(
284 mc_block_id = %mc_block_id.as_short_id(),
285 %top_block_id,
286 "downloading shard blocks"
287 );
288
289 let mut depth = 0;
290 let mut result = Vec::new();
291 while top_block_id.seqno > 0 && !self.state.is_committed(&top_block_id) {
292 let block = {
294 let _histogram = HistogramGuard::begin("tycho_core_download_sc_block_time");
295
296 let block = self
297 .fetch_block(&top_block_id.relative_to(mc_block_id))
298 .await?;
299 tracing::debug!(
300 mc_block_id = %mc_block_id.as_short_id(),
301 block_id = %top_block_id,
302 "fetched shard block",
303 );
304 debug_assert_eq!(block.id(), &top_block_id);
305
306 block
307 };
308
309 let info = block.data.load_info()?;
311
312 result.push(block.clone());
314
315 if info.after_split || info.after_merge {
317 break;
320 }
321
322 match info.load_prev_ref()? {
323 PrevBlockRef::Single(id) => top_block_id = id.as_block_id(top_block_id.shard),
324 PrevBlockRef::AfterMerge { .. } => anyhow::bail!("unexpected `AfterMerge` ref"),
325 }
326
327 depth += 1;
328 if depth >= MAX_DEPTH {
329 anyhow::bail!("max depth reached");
330 }
331 }
332
333 Ok(result)
334 }
335
336 async fn process_shard_blocks(
337 &self,
338 mc_block_id: &BlockId,
339 mc_is_key_block: bool,
340 mut blocks: Vec<BlockStuffAug>,
341 ) -> Result<()> {
342 let start_preparing_block = |block: BlockStuffAug| {
343 let (delayed_handle, delayed) = DelayedTasks::new();
344
345 let cx = Box::new(BlockSubscriberContext {
346 mc_block_id: *mc_block_id,
347 mc_is_key_block,
348 is_key_block: false,
349 block: block.data,
350 archive_data: block.archive_data,
351 delayed,
352 });
353 let subscriber = self.subscriber.clone();
354 JoinTask::new(async move {
355 let _histogram = HistogramGuard::begin("tycho_core_prepare_sc_block_time");
356
357 let prepared = subscriber.prepare_block(&cx).await;
358 (delayed_handle, cx, prepared)
359 })
360 };
361
362 let mut prepare_task = blocks.pop().map(start_preparing_block);
363
364 while let Some(prepared) = prepare_task.take() {
365 let (delayed_handle, cx, prepared) = prepared.await;
366 let delayed_handle = delayed_handle.spawn();
367
368 prepare_task = blocks.pop().map(start_preparing_block);
369
370 let _histogram = HistogramGuard::begin("tycho_core_process_sc_block_time");
371
372 self.subscriber.handle_block(&cx, prepared?).await?;
374
375 self.state.update_gc_state(UpdateGcState {
377 mc_block_id,
378 mc_is_key_block,
379 is_key_block: cx.is_key_block,
380 block: &cx.block,
381 })?;
382
383 delayed_handle.join().await?;
385
386 self.state.commit_shard(CommitShardBlock {
388 block_id: cx.block.id(),
389 });
390 }
391
392 Ok(())
393 }
394
395 fn fetch_next_master_block(
396 &self,
397 prev_block_id: &BlockId,
398 ) -> impl Future<Output = OptionalBlockStuff> + Send + 'static {
399 let _histogram = HistogramGuard::begin("tycho_core_download_mc_block_time");
400
401 tracing::debug!(%prev_block_id, "fetching next master block");
402
403 let provider = self.provider.clone();
404 let prev_block_id = *prev_block_id;
405 async move {
406 let res = provider.get_next_block(&prev_block_id).await?;
407 Some(res.with_context(|| {
408 format!(
409 "BUGGY PROVIDER. failed to fetch next master block after prev: {prev_block_id}"
410 )
411 }))
412 }
413 }
414
415 async fn fetch_block(&self, block_id_relation: &BlockIdRelation) -> Result<BlockStuffAug> {
416 match self.provider.get_block(block_id_relation).await {
417 Some(Ok(block)) => Ok(block),
418 Some(Err(e)) => {
419 anyhow::bail!(
420 "BUGGY PROVIDER. failed to fetch block for {block_id_relation:?}: {e:?}"
421 )
422 }
423 None => {
424 anyhow::bail!("BUGGY PROVIDER. block not found for {block_id_relation:?}")
425 }
426 }
427 }
428}