1use std::collections::{BTreeMap, VecDeque, hash_map};
2use std::sync::Arc;
3
4use ahash::HashMapExt;
5use anyhow::{Context, Result, anyhow, bail};
6use async_trait::async_trait;
7use futures_util::TryFutureExt;
8use parking_lot::{Mutex, RwLock};
9use tokio::sync::Notify;
10use tokio_util::sync::CancellationToken;
11use tycho_block_util::block::{TopBlocks, ValidatorSubsetInfo, calc_next_block_id_short};
12use tycho_block_util::queue::{QueueKey, QueuePartitionIdx};
13use tycho_block_util::state::ShardStateStuff;
14use tycho_core::global_config::MempoolGlobalConfig;
15use tycho_core::storage::{LoadStateHint, StateNotFound};
16use tycho_crypto::ed25519::KeyPair;
17use tycho_types::models::{
18 BlockId, BlockIdShort, CollationConfig, GlobalCapabilities, ProcessedUptoInfo, ShardIdent,
19 ValidatorDescription,
20};
21use tycho_util::futures::AwaitBlocking;
22use tycho_util::metrics::HistogramGuard;
23use tycho_util::{DashMapEntry, FastDashMap, FastHashMap, FastHashSet};
24use types::{
25 ActiveCollator, ActiveSync, BlockCacheEntry, BlockCacheEntryData, BlockCacheStoreResult,
26 CollatedBlockInfo, CollationState, CollationStatus, CollatorState, HandledBlockFromBcCtx,
27 ImportedAnchorEvent, McBlockSubgraph, NextCollationStep,
28};
29
30use self::blocks_cache::BlocksCache;
31use self::types::{BlockCacheKey, CandidateStatus, CollationSyncState, McBlockSubgraphExtract};
32use self::utils::find_us_in_collators_set;
33use crate::collator::{
34 CollationCancelReason, Collator, CollatorContext, CollatorEventListener, CollatorFactory,
35 ForceMasterCollation,
36};
37use crate::internal_queue::types::diff::{DiffZone, QueueDiffWithMessages};
38use crate::internal_queue::types::message::EnqueuedMessage;
39use crate::internal_queue::types::stats::DiffStatistics;
40use crate::mempool::{
41 MempoolAdapter, MempoolAdapterFactory, MempoolAnchor, MempoolAnchorId, MempoolEventListener,
42 StateUpdateContext,
43};
44use crate::queue_adapter::MessageQueueAdapter;
45use crate::state_node::{StateNodeAdapter, StateNodeAdapterFactory, StateNodeEventListener};
46use crate::types::processed_upto::{
47 BlockSeqno, ProcessedUptoInfoExtension, ProcessedUptoInfoStuff, find_min_processed_to_by_shards,
48};
49use crate::types::{
50 BlockCollationResult, BlockIdExt, CollationSessionId, CollationSessionInfo, CollatorConfig,
51 DebugIter, DisplayAsShortId, DisplayBlockIdsIntoIter, McData, ProcessedToByPartitions,
52 ShardDescriptionShort, ShardDescriptionShortExt, ShardHashesExt, TopBlockId, TopBlockIdUpdated,
53};
54use crate::utils::async_dispatcher::{AsyncDispatcher, STANDARD_ASYNC_DISPATCHER_BUFFER_SIZE};
55use crate::utils::block::detect_top_processed_to_anchor;
56use crate::utils::shard::calc_split_merge_actions;
57use crate::utils::vset_cache::ValidatorSetCache;
58use crate::validator::{AddSession, ValidationStatus, Validator};
59use crate::{method_to_async_closure, tracing_targets};
60
61mod blocks_cache;
62mod types;
63mod utils;
64
65#[cfg(test)]
66#[path = "tests/manager_tests.rs"]
67pub(super) mod tests;
68
/// Capacity of the bounded channel that buffers blocks received from the
/// blockchain until the manager's background task picks them up.
const BLOCKS_FROM_BC_QUEUE_LIMIT: usize = 1_000;
70
71pub struct RunningCollationManager<CF, V>
72where
73 CF: CollatorFactory,
74{
75 cancel_async_tasks: CancellationToken,
76 dispatcher: AsyncDispatcher<CollationManager<CF, V>>,
77 state_node_adapter: Arc<dyn StateNodeAdapter>,
78 mpool_adapter: Arc<dyn MempoolAdapter>,
79 mq_adapter: Arc<dyn MessageQueueAdapter<EnqueuedMessage>>,
80}
81
82impl<CF: CollatorFactory, V> RunningCollationManager<CF, V> {
83 pub fn dispatcher(&self) -> &AsyncDispatcher<CollationManager<CF, V>> {
84 &self.dispatcher
85 }
86
87 pub fn state_node_adapter(&self) -> &Arc<dyn StateNodeAdapter> {
88 &self.state_node_adapter
89 }
90
91 pub fn mpool_adapter(&self) -> &Arc<dyn MempoolAdapter> {
92 &self.mpool_adapter
93 }
94
95 pub fn mq_adapter(&self) -> &Arc<dyn MessageQueueAdapter<EnqueuedMessage>> {
96 &self.mq_adapter
97 }
98}
99
100impl<CF: CollatorFactory, V> Drop for RunningCollationManager<CF, V> {
101 fn drop(&mut self) {
102 self.cancel_async_tasks.cancel();
103 }
104}
105
106pub struct CollationManager<CF, V>
107where
108 CF: CollatorFactory,
109{
110 keypair: Arc<KeyPair>,
111 config: Arc<CollatorConfig>,
112
113 dispatcher: Arc<AsyncDispatcher<Self>>,
114 state_node_adapter: Arc<dyn StateNodeAdapter>,
115 mpool_adapter: Arc<dyn MempoolAdapter>,
116 mq_adapter: Arc<dyn MessageQueueAdapter<EnqueuedMessage>>,
117
118 collator_factory: CF,
119 validator: Arc<V>,
120
121 active_collation_sessions: RwLock<FastHashMap<ShardIdent, Arc<CollationSessionInfo>>>,
122 collation_sessions_to_finish: FastDashMap<CollationSessionId, Arc<CollationSessionInfo>>,
123 active_collators: FastDashMap<ShardIdent, ActiveCollator<Arc<CF::Collator>>>,
124 collators_to_stop: FastDashMap<CollationSessionId, ActiveCollator<Arc<CF::Collator>>>,
125
126 blocks_cache: BlocksCache,
127
128 blocks_from_bc_queue_sender: tokio::sync::mpsc::Sender<HandledBlockFromBcCtx>,
130
131 ready_to_sync: Arc<Notify>,
133
134 last_processed_mc_block_id: Arc<Mutex<Option<BlockId>>>,
136
137 collation_sync_state: Arc<Mutex<CollationSyncState>>,
139
140 validator_set_cache: ValidatorSetCache,
142
143 mempool_config_override: Option<MempoolGlobalConfig>,
145
146 delayed_mc_state_update: Arc<Mutex<Option<Arc<McData>>>>,
148}
149
150#[async_trait]
151impl<CF, V> MempoolEventListener for AsyncDispatcher<CollationManager<CF, V>>
152where
153 CF: CollatorFactory,
154 V: Validator,
155{
156 async fn on_new_anchor(&self, anchor: Arc<MempoolAnchor>) -> Result<()> {
157 self.spawn_task(method_to_async_closure!(
158 process_new_anchor_from_mempool,
159 anchor
160 ))
161 .await
162 }
163}
164
165#[async_trait]
166impl<CF, V> StateNodeEventListener for AsyncDispatcher<CollationManager<CF, V>>
167where
168 CF: CollatorFactory,
169 V: Validator,
170{
171 async fn on_block_accepted(&self, state: &ShardStateStuff) -> Result<()> {
172 let processed_upto = state.state().processed_upto.load()?;
173
174 metrics_report_last_applied_block_and_anchor(state, &processed_upto)?;
175
176 let state_cloned = state.clone();
177
178 self.spawn_task(method_to_async_closure!(
179 detect_top_processed_to_anchor_and_notify_mempool,
180 state_cloned,
181 processed_upto.get_min_externals_processed_to()?.0
182 ))
183 .await?;
184
185 let state_cloned = state.clone();
186 self.spawn_task(move |worker| {
187 Box::pin(async move { worker.cancel_validation_sessions_until_block(state_cloned) })
188 })
189 .await?;
190
191 Ok(())
192 }
193
194 async fn on_block_accepted_external(
195 &self,
196 mc_block_id: &BlockId,
197 state: &ShardStateStuff,
198 ) -> Result<()> {
199 let processed_upto = state.state().processed_upto.load()?;
200
201 metrics_report_last_applied_block_and_anchor(state, &processed_upto)?;
202
203 let state = state.clone();
204 let ctx = HandledBlockFromBcCtx {
205 mc_block_id: *mc_block_id,
206 state,
207 processed_upto,
208 };
209
210 self.enqueue_task(method_to_async_closure!(enqueue_handle_block_from_bc, ctx))
211 .await?;
212
213 Ok(())
214 }
215}
216
217fn metrics_report_last_applied_block_and_anchor(
218 state: &ShardStateStuff,
219 processed_upto: &ProcessedUptoInfo,
220) -> Result<()> {
221 let block_id = state.block_id();
222 let labels = [("workchain", block_id.shard.workchain().to_string())];
223
224 let block_ct = state.get_gen_chain_time();
225 let processed_to_anchor_id = processed_upto.get_min_externals_processed_to()?.0;
226
227 metrics::gauge!("tycho_last_applied_block_seqno", &labels).set(block_id.seqno);
228 metrics::gauge!("tycho_last_processed_to_anchor_id", &labels).set(processed_to_anchor_id);
229
230 tracing::info!(target: tracing_targets::COLLATION_MANAGER,
231 block_id = %DisplayAsShortId(block_id),
232 block_ct,
233 processed_to_anchor_id = processed_to_anchor_id,
234 "last applied block",
235 );
236
237 Ok(())
238}
239
240#[async_trait]
241impl<CF, V> CollatorEventListener for AsyncDispatcher<CollationManager<CF, V>>
242where
243 CF: CollatorFactory,
244 V: Validator,
245{
246 async fn on_skipped(
247 &self,
248 prev_mc_block_id: BlockId,
249 next_block_id_short: BlockIdShort,
250 anchor_chain_time: u64,
251 force_mc_block: ForceMasterCollation,
252 collation_config: Arc<CollationConfig>,
253 ) -> Result<()> {
254 self.spawn_task(method_to_async_closure!(
255 handle_collation_skipped,
256 prev_mc_block_id,
257 next_block_id_short,
258 anchor_chain_time,
259 force_mc_block,
260 collation_config
261 ))
262 .await
263 }
264
265 async fn on_cancelled(
266 &self,
267 prev_mc_block_id: BlockId,
268 next_block_id_short: BlockIdShort,
269 cancel_reason: CollationCancelReason,
270 ) -> Result<()> {
271 self.spawn_task(method_to_async_closure!(
272 handle_collation_cancelled,
273 prev_mc_block_id,
274 next_block_id_short,
275 cancel_reason
276 ))
277 .await
278 }
279
280 async fn on_block_candidate(&self, collation_result: BlockCollationResult) -> Result<()> {
281 self.spawn_task(method_to_async_closure!(
282 handle_collated_block_candidate,
283 collation_result
284 ))
285 .await
286 }
287
288 async fn on_collator_stopped(&self, stop_key: CollationSessionId) -> Result<()> {
289 self.spawn_task(move |worker| {
290 Box::pin(async move { worker.handle_collator_stopped(stop_key) })
291 })
292 .await
293 }
294}
295
296impl<CF, V> CollationManager<CF, V>
297where
298 CF: CollatorFactory,
299 V: Validator,
300{
301 #[allow(clippy::too_many_arguments)]
302 pub fn start<STF, MPF>(
303 keypair: Arc<KeyPair>,
304 config: CollatorConfig,
305 mq_adapter: Arc<dyn MessageQueueAdapter<EnqueuedMessage>>,
306 state_node_adapter_factory: STF,
307 mpool_adapter_factory: MPF,
308 validator: V,
309 collator_factory: CF,
310 mempool_config_override: Option<MempoolGlobalConfig>,
311 ) -> RunningCollationManager<CF, V>
312 where
313 STF: StateNodeAdapterFactory,
314 MPF: MempoolAdapterFactory,
315 {
316 tracing::info!(target: tracing_targets::COLLATION_MANAGER, "Creating collation manager...");
317
318 let (dispatcher, dispatcher_ctx) = AsyncDispatcher::new(
320 "collation_manager_async_dispatcher",
321 STANDARD_ASYNC_DISPATCHER_BUFFER_SIZE,
322 );
323 let arc_dispatcher = Arc::new(dispatcher.clone());
324
325 let state_node_adapter =
327 Arc::new(state_node_adapter_factory.create(arc_dispatcher.clone()));
328
329 let mpool_adapter = mpool_adapter_factory.create(arc_dispatcher.clone());
331
332 let validator = Arc::new(validator);
333
334 let (blocks_from_bc_queue_sender, blocks_from_bc_queue_receiver) =
335 tokio::sync::mpsc::channel::<HandledBlockFromBcCtx>(BLOCKS_FROM_BC_QUEUE_LIMIT);
336
337 let ready_to_sync = Arc::new(Notify::new());
338 ready_to_sync.notify_one();
339
340 let blocks_cache = BlocksCache::new(state_node_adapter.zerostate_id());
341
342 let collation_manager = Self {
343 keypair,
344 config: Arc::new(config),
345 dispatcher: arc_dispatcher.clone(),
346 state_node_adapter: state_node_adapter.clone(),
347 mpool_adapter: mpool_adapter.clone(),
348 mq_adapter: mq_adapter.clone(),
349 collator_factory,
350 validator,
351
352 active_collation_sessions: Default::default(),
353 collation_sessions_to_finish: Default::default(),
354 active_collators: Default::default(),
355 collators_to_stop: Default::default(),
356
357 blocks_cache,
358
359 blocks_from_bc_queue_sender,
360
361 ready_to_sync,
362
363 last_processed_mc_block_id: Default::default(),
364
365 collation_sync_state: Default::default(),
366
367 validator_set_cache: Default::default(),
368
369 mempool_config_override,
370
371 delayed_mc_state_update: Arc::new(Mutex::new(None)),
372 };
373 let collation_manager = Arc::new(collation_manager);
374
375 let cancel_async_tasks = CancellationToken::new();
376
377 tokio::spawn(
379 collation_manager
380 .clone()
381 .process_handle_block_from_bc_queue(
382 blocks_from_bc_queue_receiver,
383 cancel_async_tasks.clone(),
384 )
385 .map_err(|e| {
386 tracing::error!("initial process_handle_block_from_bc_queue failed: {e:?}");
387 }),
388 );
389
390 arc_dispatcher.run(collation_manager, dispatcher_ctx);
392 tracing::trace!(target: tracing_targets::COLLATION_MANAGER, "Tasks dispatchers started");
393
394 tracing::info!(target: tracing_targets::COLLATION_MANAGER, "Collation manager created");
395
396 RunningCollationManager {
397 cancel_async_tasks,
398 dispatcher,
399 state_node_adapter,
400 mpool_adapter,
401 mq_adapter,
402 }
403 }
404
405 pub async fn process_new_anchor_from_mempool(&self, _anchor: Arc<MempoolAnchor>) -> Result<()> {
409 Ok(())
411 }
412
413 #[tracing::instrument(skip_all, fields(block_id = %state.block_id().as_short_id()))]
416 async fn detect_top_processed_to_anchor_and_notify_mempool(
417 &self,
418 state: ShardStateStuff,
419 mc_processed_to_anchor_id: MempoolAnchorId,
420 ) -> Result<()> {
421 if !state.block_id().is_masterchain() {
423 return Ok(());
424 }
425
426 self.detect_top_processed_to_anchor_and_notify_mempool_impl(
427 state.block_id().seqno,
428 state
429 .shards()?
430 .as_vec()?
431 .into_iter()
432 .map(|(_, descr)| descr),
433 mc_processed_to_anchor_id,
434 )
435 .await
436 }
437
438 async fn detect_top_processed_to_anchor_and_notify_mempool_impl<I>(
439 &self,
440 mc_block_seqno: BlockSeqno,
441 mc_top_shards: I,
442 mc_processed_to_anchor_id: MempoolAnchorId,
443 ) -> Result<()>
444 where
445 I: Iterator<Item = ShardDescriptionShort>,
446 {
447 let top_processed_to_anchor =
448 detect_top_processed_to_anchor(mc_top_shards, mc_processed_to_anchor_id);
449
450 tracing::debug!(target: tracing_targets::COLLATION_MANAGER,
451 top_processed_to_anchor,
452 mc_processed_to_anchor_id,
453 "detected minimal top_processed_to_anchor, will notify mempool",
454
455 );
456
457 self.notify_top_processed_to_anchor_to_mempool(mc_block_seqno, top_processed_to_anchor)
458 .await?;
459
460 Ok(())
461 }
462
463 async fn notify_top_processed_to_anchor_to_mempool(
464 &self,
465 mc_block_seqno: BlockSeqno,
466 top_processed_to_anchor: MempoolAnchorId,
467 ) -> Result<()> {
468 tracing::debug!(target: tracing_targets::COLLATION_MANAGER,
469 top_processed_to_anchor,
470 "will notify top_processed_to_anchor to mempool",
471 );
472
473 self.mpool_adapter
474 .handle_signed_mc_block(mc_block_seqno)
475 .await?;
476
477 self.mpool_adapter
478 .clear_anchors_cache(top_processed_to_anchor)?;
479
480 Ok(())
481 }
482
483 #[tracing::instrument(skip_all, fields(block_id = %state.block_id().as_short_id()))]
484 fn cancel_validation_sessions_until_block(&self, state: ShardStateStuff) -> Result<()> {
485 let block_id = state.block_id().as_short_id();
486
487 let session_id = self
488 .active_collation_sessions
489 .read()
490 .get(&block_id.shard)
491 .map(|session_info| session_info.get_validation_session_id());
492
493 self.validator.cancel_validation(&block_id, session_id)?;
494 Ok(())
495 }
496
497 #[tracing::instrument(skip_all, fields(block_id = %block_id))]
498 fn commit_block_queue_diff(
499 mq_adapter: Arc<dyn MessageQueueAdapter<EnqueuedMessage>>,
500 block_id: &BlockId,
501 top_shard_blocks_info: &[TopBlockIdUpdated],
502 partitions: &FastHashSet<QueuePartitionIdx>,
503 ) -> Result<()> {
504 if !block_id.is_masterchain() {
505 return Ok(());
506 }
507
508 let _histogram = HistogramGuard::begin("tycho_collator_commit_queue_diffs_time");
509
510 let mut top_blocks = top_shard_blocks_info.to_vec();
511 top_blocks.push(TopBlockIdUpdated {
512 block: TopBlockId {
513 ref_by_mc_seqno: block_id.seqno,
514 block_id: *block_id,
515 },
516 updated: true,
517 });
518
519 if let Err(err) = mq_adapter.commit_diff(top_blocks, partitions) {
520 bail!(
521 "Error committing message queue diff of block ({}): {:?}",
522 block_id,
523 err,
524 )
525 }
526
527 tracing::info!(target: tracing_targets::COLLATION_MANAGER,
528 "message queue diff was committed",
529 );
530
531 Ok(())
532 }
533
534 fn apply_block_queue_diff_from_entry_stuff(
537 state_node_adapter: &dyn StateNodeAdapter,
538 mq_adapter: Arc<dyn MessageQueueAdapter<EnqueuedMessage>>,
539 block_entry: &BlockCacheEntry,
540 min_processed_to: Option<&QueueKey>,
541 first_required_diffs: &mut FastHashMap<ShardIdent, BlockId>,
542 ) -> Result<Option<BlockId>> {
543 let block_id = block_entry.block_id;
544
545 if block_entry.ref_by_mc_seqno <= state_node_adapter.zerostate_id().seqno {
547 return Ok(None);
548 }
549
550 let queue_diff = match &block_entry.data {
551 BlockCacheEntryData::Collated {
552 candidate_stuff, ..
553 } => &candidate_stuff.candidate.queue_diff_aug.data,
554 BlockCacheEntryData::Received { queue_diff, .. } => queue_diff,
555 };
556
557 if let Some(min_pt) = min_processed_to
559 && queue_diff.as_ref().max_message <= *min_pt
560 {
561 tracing::debug!(target: tracing_targets::COLLATION_MANAGER,
562 "Skipping diff for block {}: max_message {} <= min_processed_to {}",
563 block_id.as_short_id(),
564 queue_diff.as_ref().max_message,
565 min_pt,
566 );
567 return Ok(None);
568 }
569
570 if mq_adapter.is_diff_exists(&block_id.as_short_id())? {
572 tracing::trace!(target: tracing_targets::COLLATION_MANAGER,
573 queue_diff_block_id = %block_id.as_short_id(),
574 "queue diff apply skipped because it is already applied",
575 );
576 first_required_diffs.insert(block_id.shard, BlockId::default());
579 return Ok(None);
580 }
581
582 let out_msgs = match &block_entry.data {
584 BlockCacheEntryData::Collated {
585 candidate_stuff, ..
586 } => &candidate_stuff
587 .candidate
588 .block
589 .data
590 .load_extra()?
591 .out_msg_description
592 .load()?,
593 BlockCacheEntryData::Received { out_msgs, .. } => &out_msgs.load()?,
594 };
595
596 let queue_diff_with_msgs = QueueDiffWithMessages::from_queue_diff(queue_diff, out_msgs)?;
597
598 let statistics = DiffStatistics::from_diff(
599 &queue_diff_with_msgs,
600 queue_diff.block_id().shard,
601 queue_diff.as_ref().min_message,
602 queue_diff.as_ref().max_message,
603 );
604
605 let check_sequence = match first_required_diffs.get(&block_id.shard).copied() {
606 None => {
607 first_required_diffs.insert(block_id.shard, block_id);
610 None
611 }
612 Some(id) if id == block_id => None,
613 _ => Some(DiffZone::Both),
614 };
615
616 mq_adapter
617 .apply_diff(
618 queue_diff_with_msgs,
619 queue_diff.block_id().as_short_id(),
620 queue_diff.diff_hash(),
621 statistics,
622 check_sequence,
623 )
624 .context("apply_block_queue_diff_from_entry_stuff")?;
625
626 Ok(Some(block_id))
627 }
628
629 #[tracing::instrument(skip_all, fields(next_block_id = %next_block_id_short))]
630 async fn handle_collation_cancelled(
631 self: &Arc<Self>,
632 _prev_mc_block_id: BlockId,
633 next_block_id_short: BlockIdShort,
634 cancel_reason: CollationCancelReason,
635 ) -> Result<()> {
636 tracing::info!(target: tracing_targets::COLLATION_MANAGER,
637 ?cancel_reason,
638 "start handle collation cancelled",
639 );
640
641 match cancel_reason {
644 CollationCancelReason::AnchorNotFound(_)
645 | CollationCancelReason::NextAnchorNotFound(_)
646 | CollationCancelReason::ExternalCancel
647 | CollationCancelReason::DiffNotFoundInQueue(_) => {
648 self.ready_to_sync.notified().await;
650 scopeguard::defer!(self.ready_to_sync.notify_one());
651
652 self.set_collator_state(&next_block_id_short.shard, |ac| {
654 ac.state = CollatorState::Cancelled;
655 });
656
657 let has_active = self.active_collators.iter().any(|ac| {
659 ac.state == CollatorState::Active || ac.state == CollatorState::CancelPending
660 });
661 if !has_active {
662 tracing::info!(target: tracing_targets::COLLATION_MANAGER,
663 "no active collators in shards and masterchain, \
664 will run sync to last applied mc block",
665 );
666
667 let (last_collated_mc_block_id, applied_range) = self
669 .blocks_cache
670 .get_last_collated_block_and_applied_mc_queue_range();
671
672 self.sync_to_applied_mc_block_if_exist(
674 last_collated_mc_block_id,
675 applied_range,
676 )
677 .await?;
678 }
679 }
680 }
681 Ok(())
682 }
683
684 #[tracing::instrument(skip_all, fields(next_block_id = %next_block_id_short, ct = anchor_chain_time, ?force_mc_block))]
685 async fn handle_collation_skipped(
686 &self,
687 prev_mc_block_id: BlockId,
688 next_block_id_short: BlockIdShort,
689 anchor_chain_time: u64,
690 force_mc_block: ForceMasterCollation,
691 collation_config: Arc<CollationConfig>,
692 ) -> Result<()> {
693 tracing::debug!(target: tracing_targets::COLLATION_MANAGER,
694 "will run next collation step",
695 );
696
697 self.ready_to_sync.notified().await;
699 scopeguard::defer!(self.ready_to_sync.notify_one());
700
701 let updated_collator_state = self.set_collator_state(&next_block_id_short.shard, |ac| {
703 if ac.state == CollatorState::CancelPending {
704 ac.state = CollatorState::Cancelled;
705 }
706 });
707 if updated_collator_state == Some(CollatorState::Cancelled) {
708 tracing::debug!(target: tracing_targets::COLLATION_MANAGER,
709 shard_id = %next_block_id_short.shard,
710 "collator was cancelled before",
711 );
712 return Ok(());
713 }
714
715 self.run_next_collation_step(
716 &prev_mc_block_id,
717 next_block_id_short.shard,
718 None,
719 DetectNextCollationStepContext::new(
720 anchor_chain_time,
721 force_mc_block,
722 collation_config.mc_block_min_interval_ms as _,
723 collation_config.mc_block_max_interval_ms as _,
724 None,
725 ),
726 )
727 .await
728 }
729
730 async fn run_next_collation_step(
734 &self,
735 prev_mc_block_id: &BlockId,
736 shard_id: ShardIdent,
737 trigger_shard_block_id_opt: Option<BlockId>,
738 ctx: DetectNextCollationStepContext,
739 ) -> Result<()> {
740 let next_step = Self::detect_next_collation_step(
741 &mut self.collation_sync_state.lock(),
742 self.active_collation_sessions
743 .read()
744 .keys()
745 .cloned()
746 .collect(),
747 shard_id,
748 ctx,
749 );
750
751 tracing::debug!(target: tracing_targets::COLLATION_MANAGER,
752 next_step = ?next_step,
753 "detected next collation step",
754 );
755
756 match next_step {
757 NextCollationStep::CollateMaster(next_mc_block_chain_time) => {
758 if !shard_id.is_masterchain() {
759 self.set_collator_state(&shard_id, |ac| ac.state = CollatorState::Waiting);
761 }
762
763 self.set_collator_state(&ShardIdent::MASTERCHAIN, |ac| {
764 ac.state = CollatorState::Active;
765 });
766
767 self.enqueue_mc_block_collation(
768 prev_mc_block_id.get_next_id_short(),
769 next_mc_block_chain_time,
770 trigger_shard_block_id_opt,
771 )
772 .await?;
773 }
774 NextCollationStep::WaitForMasterStatus | NextCollationStep::WaitForShardStatus => {
775 self.set_collator_state(&shard_id, |ac| ac.state = CollatorState::Waiting);
777 }
778 NextCollationStep::ResumeAttemptsIn(shards_to_resume_attempts) => {
779 let mut current_shard_should_wait = true;
784 for shard_ident in shards_to_resume_attempts {
785 if shard_ident == shard_id {
786 current_shard_should_wait = false;
787 }
788 self.set_collator_state(&shard_ident, |ac| ac.state = CollatorState::Active);
789 self.enqueue_try_collate(&shard_ident).await?;
790 }
791 if current_shard_should_wait {
792 self.set_collator_state(&shard_id, |ac| ac.state = CollatorState::Waiting);
793 }
794 }
795 }
796
797 Ok(())
798 }
799
800 #[tracing::instrument(
808 skip_all,
809 fields(block_id = %collation_result.candidate.block.id().as_short_id()),
810 )]
811 pub async fn handle_collated_block_candidate(
812 self: &Arc<Self>,
813 collation_result: BlockCollationResult,
814 ) -> Result<()> {
815 tracing::debug!(target: tracing_targets::COLLATION_MANAGER,
816 ct = collation_result.candidate.chain_time,
817 processed_to_anchor_id = collation_result.candidate.processed_to_anchor_id,
818 "start processing block candidate",
819 );
820
821 let _histogram =
822 HistogramGuard::begin("tycho_collator_handle_collated_block_candidate_time");
823
824 let block_id = *collation_result.candidate.block.id();
825 let candidate_chain_time = collation_result.candidate.chain_time;
826 let consensus_config_changed = collation_result.candidate.consensus_config_changed;
827
828 debug_assert_eq!(
829 block_id.is_masterchain(),
830 collation_result.mc_data.is_some(),
831 );
832
833 self.ready_to_sync.notified().await;
835 scopeguard::defer!(self.ready_to_sync.notify_one());
836
837 let session_info = match self
839 .active_collation_sessions
840 .read()
841 .get(&block_id.shard)
842 .cloned()
843 {
844 Some(session_info) if session_info.id() == collation_result.collation_session_id => {
845 session_info
846 }
847 _ => {
848 tracing::warn!(
851 target: tracing_targets::COLLATION_MANAGER,
852 "there is no active session related to collated block. Skipped",
853 );
854
855 self.set_collator_state(&block_id.shard, |ac| ac.state = CollatorState::Waiting);
856
857 return Ok(());
858 }
859 };
860
861 let updated_collator_state = self.set_collator_state(&block_id.shard, |ac| {
863 if ac.state == CollatorState::CancelPending {
864 ac.state = CollatorState::Cancelled;
865 }
866 });
867 let collator_cancelled = updated_collator_state == Some(CollatorState::Cancelled);
868
869 let store_res = if collator_cancelled {
870 tracing::info!(target: tracing_targets::COLLATION_MANAGER,
871 shard_id = %block_id.shard,
872 "collator was cancelled before",
873 );
874
875 let top_shards = self.blocks_cache.get_last_top_shards();
879 self.mq_adapter.clear_uncommitted_state(&top_shards)?;
880
881 let (last_collated_mc_block_id, applied_mc_queue_range) = self
884 .blocks_cache
885 .get_last_collated_block_and_applied_mc_queue_range();
886
887 let store_res = BlockCacheStoreResult {
888 received_and_collated: false,
889 block_mismatch: false,
890 last_collated_mc_block_id,
891 applied_mc_queue_range,
892 };
893
894 tracing::info!(
895 target: tracing_targets::COLLATION_MANAGER,
896 ?store_res,
897 "block candidate was not saved to cache",
898 );
899
900 store_res
901 } else {
902 let top_shard_blocks_info = collation_result
905 .mc_data
906 .as_ref()
907 .map(|mc_data| {
908 mc_data
909 .shards
910 .iter()
911 .map(|(shard_id, shard_descr)| TopBlockIdUpdated {
912 block: TopBlockId {
913 ref_by_mc_seqno: shard_descr.reg_mc_seqno,
914 block_id: shard_descr.get_block_id(*shard_id),
915 },
916 updated: shard_descr.top_sc_block_updated,
917 })
918 .collect::<Vec<_>>()
919 })
920 .unwrap_or_default();
921 let top_processed_to_anchor = collation_result
922 .mc_data
923 .as_ref()
924 .map(|mc_data| mc_data.top_processed_to_anchor);
925 let store_res = self.blocks_cache.store_collated(
926 collation_result.candidate,
927 top_shard_blocks_info,
928 top_processed_to_anchor,
929 )?;
930
931 if store_res.block_mismatch {
932 let labels = [("workchain", block_id.shard.workchain().to_string())];
933 metrics::counter!("tycho_collator_block_mismatch_count", &labels).increment(1);
934
935 self.set_collator_state(&block_id.shard, |ac| ac.state = CollatorState::Cancelled);
936
937 if block_id.is_masterchain() {
939 for mut ac in self
940 .active_collators
941 .iter_mut()
942 .filter(|ac| ac.key() != &block_id.shard)
943 {
944 ac.state = match ac.state {
947 CollatorState::Waiting | CollatorState::Cancelled => {
948 CollatorState::Cancelled
949 }
950 _ => CollatorState::CancelPending,
951 };
952 }
953 }
954
955 let top_shards = self.blocks_cache.get_last_top_shards();
958 self.mq_adapter.clear_uncommitted_state(&top_shards)?;
959
960 tracing::info!(
961 target: tracing_targets::COLLATION_MANAGER,
962 ?store_res,
963 "saved block candidate to cache",
964 );
965 } else {
966 tracing::debug!(
967 target: tracing_targets::COLLATION_MANAGER,
968 ?store_res,
969 "saved block candidate to cache",
970 );
971 }
972
973 store_res
974 };
975
976 let collation_cancelled = collator_cancelled || store_res.block_mismatch;
977
978 let should_sync_to_last_applied_mc_block = 'check_should_sync: {
980 if let Some((_, applied_range_end)) = store_res.applied_mc_queue_range {
982 let last_received_mc_block_seqno = self.get_last_received_mc_block_seqno();
983 if matches!(
984 last_received_mc_block_seqno,
985 Some(last_received_seqno) if last_received_seqno.saturating_sub(applied_range_end) > 1
986 ) {
987 tracing::info!(target: tracing_targets::COLLATION_MANAGER,
988 last_received_mc_block_seqno,
989 applied_range_end,
990 "check_should_sync: should not sync to last applied mc block \
991 because a newer one already received",
992 );
993 break 'check_should_sync false;
994 }
995 }
996
997 if collation_cancelled {
998 true
999 } else if let Some((_, applied_range_end)) = store_res.applied_mc_queue_range {
1000 if let Some(last_collated_mc_block_id) = store_res.last_collated_mc_block_id {
1001 let applied_range_end_delta =
1002 applied_range_end.saturating_sub(last_collated_mc_block_id.seqno);
1003 if applied_range_end_delta < self.config.min_mc_block_delta_from_bc_to_sync {
1004 tracing::debug!(target: tracing_targets::COLLATION_MANAGER,
1006 "check_should_sync: should collate next own mc block: \
1007 last applied ({}) ahead last collated ({}) on {} < {}",
1008 applied_range_end, last_collated_mc_block_id.seqno,
1009 applied_range_end_delta, self.config.min_mc_block_delta_from_bc_to_sync,
1010 );
1011 false
1012 } else {
1013 tracing::info!(target: tracing_targets::COLLATION_MANAGER,
1015 "check_should_sync: should sync to last applied mc block from bc: \
1016 last applied ({}) ahead last collated ({}) on {} >= {}",
1017 applied_range_end, last_collated_mc_block_id.seqno,
1018 applied_range_end_delta, self.config.min_mc_block_delta_from_bc_to_sync,
1019 );
1020 true
1021 }
1022 } else {
1023 tracing::info!(target: tracing_targets::COLLATION_MANAGER,
1025 "check_should_sync: should sync to last applied mc block from bc: \
1026 last applied ({}) and last collated not exist",
1027 applied_range_end,
1028 );
1029 true
1030 }
1031 } else {
1032 tracing::debug!(target: tracing_targets::COLLATION_MANAGER,
1034 "check_should_sync: should collate next own mc block after because nothing applied ahead",
1035 );
1036 false
1037 }
1038 };
1039
1040 if should_sync_to_last_applied_mc_block {
1042 if block_id.is_masterchain() {
1048 self.sync_to_applied_mc_block_if_exist(
1050 store_res.last_collated_mc_block_id,
1051 store_res.applied_mc_queue_range,
1052 )
1053 .await?;
1054 } else {
1055 tracing::info!(target: tracing_targets::COLLATION_MANAGER,
1057 "sync_to_applied_mc_block: mark shard collator Cancelled for shard",
1058 );
1059
1060 self.set_collator_state(&block_id.shard, |ac| ac.state = CollatorState::Cancelled);
1061
1062 let has_active = self.active_collators.iter().any(|ac| {
1065 ac.state == CollatorState::Active || ac.state == CollatorState::CancelPending
1066 });
1067 if !has_active {
1068 tracing::info!(target: tracing_targets::COLLATION_MANAGER,
1069 "sync_to_applied_mc_block: no active collators in shards and master, \
1070 will run sync to last applied mc block",
1071 );
1072
1073 self.sync_to_applied_mc_block_if_exist(
1075 store_res.last_collated_mc_block_id,
1076 store_res.applied_mc_queue_range,
1077 )
1078 .await?;
1079 }
1080 }
1081 } else if block_id.is_masterchain() {
1082 if consensus_config_changed == Some(true) {
1086 let mut delayed_mc_state_update = self.delayed_mc_state_update.lock();
1087 *delayed_mc_state_update = collation_result.mc_data.clone();
1088 } else {
1089 self.notify_mc_state_update_to_mempool(collation_result.mc_data.clone().unwrap())
1091 .await?;
1092 }
1093
1094 if store_res.received_and_collated {
1096 self.commit_valid_master_block(&block_id).await?;
1100 } else {
1101 let validator = self.validator.clone();
1102 let validation_session_id = session_info.get_validation_session_id();
1103 let dispatcher = self.dispatcher.clone();
1104 tokio::spawn(async move {
1105 let validation_result =
1106 validator.validate(validation_session_id, &block_id).await;
1107
1108 match validation_result {
1109 Ok(status) => {
1110 _ = dispatcher
1111 .spawn_task(method_to_async_closure!(
1112 handle_validated_master_block,
1113 block_id,
1114 status
1115 ))
1116 .await;
1117 }
1118 Err(e) => {
1119 tracing::error!(target: tracing_targets::COLLATION_MANAGER,
1120 "block candidate validation failed: {e:?}",
1121 );
1122 }
1123 }
1124 });
1125 }
1126
1127 if consensus_config_changed != Some(true) {
1129 self.process_mc_state_update(
1130 collation_result.mc_data.unwrap(),
1131 ProcessMcStateUpdateMode::StartCollation {
1132 reset_collators: false,
1133 },
1134 )
1135 .await?;
1136 }
1137 } else {
1138 tracing::debug!(target: tracing_targets::COLLATION_MANAGER,
1142 "will run next collation step",
1143 );
1144
1145 self.run_next_collation_step(
1146 &collation_result.prev_mc_block_id,
1147 block_id.shard,
1148 Some(block_id),
1149 DetectNextCollationStepContext::new(
1150 candidate_chain_time,
1151 collation_result.force_next_mc_block,
1152 collation_result.collation_config.mc_block_min_interval_ms as _,
1153 collation_result.collation_config.mc_block_max_interval_ms as _,
1154 Some(CollatedBlockInfo::new(
1155 collation_result.prev_mc_block_id.seqno,
1156 collation_result.has_processed_externals,
1157 )),
1158 ),
1159 )
1160 .await?;
1161 }
1162
1163 Ok(())
1164 }
1165
1166 #[tracing::instrument(skip_all, fields(block_id = %ctx.state.block_id().as_short_id()))]
1169 pub async fn enqueue_handle_block_from_bc(&self, ctx: HandledBlockFromBcCtx) -> Result<()> {
1170 tracing::trace!(target: tracing_targets::COLLATION_MANAGER,
1175 "cancel sync: enqueue_handle_block_from_bc: started",
1176 );
1177
1178 let labels = [
1179 (
1180 "workchain",
1181 ctx.state.block_id().shard.workchain().to_string(),
1182 ),
1183 ("src", "01_received".to_string()),
1184 ];
1185 metrics::gauge!("tycho_last_block_seqno", &labels).set(ctx.state.block_id().seqno);
1186
1187 self.update_last_received_mc_block_seqno(ctx.state.block_id());
1188
1189 self.finish_active_sync_to_applied(ctx.state.block_id());
1191
1192 self.blocks_from_bc_queue_sender.send(ctx).await?;
1194
1195 Ok(())
1196 }
1197
1198 #[tracing::instrument(skip_all)]
1200 async fn process_handle_block_from_bc_queue(
1201 self: Arc<Self>,
1202 mut blocks_from_bc_queue_receiver: tokio::sync::mpsc::Receiver<HandledBlockFromBcCtx>,
1203 cancel_task: CancellationToken,
1204 ) -> Result<()> {
1205 const BATCH_SIZE: usize = 300;
1206
1207 tracing::info!(target: tracing_targets::COLLATION_MANAGER,
1208 "started",
1209 );
1210
1211 loop {
1212 let mut batch = Vec::with_capacity(BATCH_SIZE);
1213 tokio::select! {
1214 received_count = blocks_from_bc_queue_receiver.recv_many(&mut batch, BATCH_SIZE) => {
1215 if received_count == 0 {
1216 tracing::info!(target: tracing_targets::COLLATION_MANAGER,
1217 "channel closed",
1218 );
1219 break;
1220 }
1221
1222 let mut last_mc_block_id_opt = None;
1225 for HandledBlockFromBcCtx { state, .. } in batch.iter().rev() {
1226 if state.block_id().is_masterchain() {
1227 last_mc_block_id_opt = Some(*state.block_id());
1228 }
1229 }
1230
1231 for ctx in batch {
1232 let is_last_mc_block_in_batch = matches!(
1233 last_mc_block_id_opt, Some(last_mc_block_id) if ctx.state.block_id() == &last_mc_block_id
1234 );
1235
1236 self.handle_block_from_bc(ctx, is_last_mc_block_in_batch).await.map_err(|err| {
1238 tracing::error!(target: tracing_targets::COLLATION_MANAGER,
1239 "error handling block from bc: {err:?}",
1240 );
1241 err
1242 })?;
1243 }
1244 },
1245 _ = cancel_task.cancelled() => {
1246 tracing::info!(target: tracing_targets::COLLATION_MANAGER,
1247 "cancelled",
1248 );
1249 break;
1250 }
1251 }
1252 }
1253
1254 tracing::info!(target: tracing_targets::COLLATION_MANAGER,
1255 "finished",
1256 );
1257
1258 Ok(())
1259 }
1260
    /// Processes one block received from the blockchain.
    ///
    /// Steps, in order:
    /// 1. Waits on `ready_to_sync` and re-notifies it when the function exits.
    /// 2. Stores the block in the blocks cache; returns early when the cache
    ///    reports nothing to do (`store_received` returned `None`).
    /// 3. On block mismatch: bumps the mismatch counter, moves the affected
    ///    collator(s) to `Cancelled`/`CancelPending` and clears uncommitted
    ///    queue state.
    /// 4. Shard blocks stop here. For master blocks the running validation is
    ///    cancelled and it is decided whether to sync to the last applied mc
    ///    block or (when the block was both received and collated) to commit
    ///    it as valid.
    ///
    /// `is_last_mc_block_in_batch` tells whether this block is the last master
    /// block of the batch being processed; sync is only considered for such a
    /// block or when `after_key_block` is set in the state extra.
    #[tracing::instrument(skip_all, fields(block_id = %ctx.state.block_id().as_short_id(), is_last_mc_block_in_batch))]
    async fn handle_block_from_bc(
        self: &Arc<Self>,
        ctx: HandledBlockFromBcCtx,
        is_last_mc_block_in_batch: bool,
    ) -> Result<()> {
        tracing::debug!(target: tracing_targets::COLLATION_MANAGER,
            "start processing block from bc",
        );

        let block_id = *ctx.state.block_id();
        // for a master block the context must reference the block itself
        debug_assert!(!block_id.is_masterchain() || block_id == ctx.mc_block_id);

        let _histogram = HistogramGuard::begin("tycho_collator_handle_block_from_bc_time");

        // wait for the notification and hand it back on any exit path
        // NOTE(review): looks like a single-permit gate serializing this
        // handler with other users of `ready_to_sync` - confirm
        self.ready_to_sync.notified().await;
        scopeguard::defer!(self.ready_to_sync.notify_one());

        let processed_upto = ProcessedUptoInfoStuff::try_from(ctx.processed_upto)?;

        // `None` from the cache means there is nothing more to do for this block
        let Some(store_res) = self
            .blocks_cache
            .store_received(
                self.state_node_adapter.clone(),
                &ctx.mc_block_id,
                ctx.state.clone(),
                processed_upto,
            )
            .await?
        else {
            return Ok(());
        };

        if store_res.block_mismatch {
            let labels = [("workchain", block_id.shard.workchain().to_string())];
            metrics::counter!("tycho_collator_block_mismatch_count", &labels).increment(1);

            // idle collators become Cancelled right away,
            // any other state is switched to CancelPending
            self.set_collator_state(&block_id.shard, |ac| {
                ac.state = match ac.state {
                    CollatorState::Waiting | CollatorState::Cancelled => CollatorState::Cancelled,
                    _ => CollatorState::CancelPending,
                };
            });

            // a mismatched master block affects every other collator as well
            if block_id.is_masterchain() {
                for mut ac in self
                    .active_collators
                    .iter_mut()
                    .filter(|ac| ac.key() != &block_id.shard)
                {
                    ac.state = match ac.state {
                        CollatorState::Waiting | CollatorState::Cancelled => {
                            CollatorState::Cancelled
                        }
                        _ => CollatorState::CancelPending,
                    };
                }
            }

            // drop uncommitted queue state for the last known top shards
            let top_shards = self.blocks_cache.get_last_top_shards();
            self.mq_adapter.clear_uncommitted_state(&top_shards)?;

            tracing::info!(target: tracing_targets::COLLATION_MANAGER,
                ?store_res,
                "saved block from bc to cache",
            );
        } else {
            tracing::debug!(target: tracing_targets::COLLATION_MANAGER,
                ?store_res,
                "saved block from bc to cache",
            );
        }

        // everything below applies to master blocks only
        if !block_id.is_masterchain() {
            return Ok(());
        }

        let is_key_block = ctx.state.state_extra()?.after_key_block;

        // validation session id of the active collation session for this shard, if any
        let session_id = self
            .active_collation_sessions
            .read()
            .get(&block_id.shard)
            .map(|session_info| session_info.get_validation_session_id());

        self.validator
            .cancel_validation(&block_id.as_short_id(), session_id)?;

        let should_sync_to_last_applied_mc_block = 'check_should_sync: {
            // only the last master block of the batch or a key block may trigger sync
            if !is_last_mc_block_in_batch && !is_key_block {
                break 'check_should_sync false;
            }

            if let Some((_, applied_range_end)) = store_res.applied_mc_queue_range {
                // skip sync when the last received mc block is more than
                // one seqno ahead of the end of the applied range
                let last_received_mc_block_seqno = self.get_last_received_mc_block_seqno();
                if matches!(
                    last_received_mc_block_seqno,
                    Some(last_received_seqno) if last_received_seqno.saturating_sub(applied_range_end) > 1
                ) {
                    tracing::info!(target: tracing_targets::COLLATION_MANAGER,
                        last_received_mc_block_seqno,
                        received_is_key_block = is_key_block,
                        "check_should_sync: should not sync to last applied mc block \
                        because a newer one already received",
                    );
                    break 'check_should_sync false;
                }
            }

            if let Some(top_mc_block_id_for_next_collation) =
                self.get_top_mc_block_id_for_next_collation(store_res.last_collated_mc_block_id)
            {
                if let Some((_, applied_range_end)) = store_res.applied_mc_queue_range {
                    let should_sync = {
                        // how far the applied range is ahead of our own top mc block
                        let applied_range_end_delta = applied_range_end
                            .saturating_sub(top_mc_block_id_for_next_collation.seqno);

                        // after a key block with no master collator registered
                        // a delta of 1 is enough, otherwise use the configured minimum
                        let required_min_mc_block_delta = if is_key_block
                            && !self.active_collators.contains_key(&ShardIdent::MASTERCHAIN)
                        {
                            1
                        } else {
                            self.config.min_mc_block_delta_from_bc_to_sync
                        };

                        if applied_range_end_delta < required_min_mc_block_delta {
                            tracing::debug!(target: tracing_targets::COLLATION_MANAGER,
                                last_synced_to_mc_block_id = ?self.get_last_synced_to_mc_block_id().map(|id| id.as_short_id().to_string()),
                                last_collated_mc_block_id = ?store_res.last_collated_mc_block_id.map(|id| id.as_short_id().to_string()),
                                last_processed_mc_block_id = ?self.get_last_processed_mc_block_id().map(|id| id.as_short_id().to_string()),
                                received_is_key_block = is_key_block,
                                "check_should_sync: should wait for next collated own mc block: \
                                last applied ({}) ahead of top for collation ({}) on {} < {}",
                                applied_range_end, top_mc_block_id_for_next_collation.seqno,
                                applied_range_end_delta, required_min_mc_block_delta,
                            );
                            false
                        } else {
                            tracing::info!(target: tracing_targets::COLLATION_MANAGER,
                                last_synced_to_mc_block_id = ?self.get_last_synced_to_mc_block_id().map(|id| id.as_short_id().to_string()),
                                last_collated_mc_block_id = ?store_res.last_collated_mc_block_id.map(|id| id.as_short_id().to_string()),
                                last_processed_mc_block_id = ?self.get_last_processed_mc_block_id().map(|id| id.as_short_id().to_string()),
                                received_is_key_block = is_key_block,
                                "check_should_sync: should sync to last applied mc block from bc: \
                                last applied ({}) ahead of top for collation ({}) on {} >= {}",
                                applied_range_end, top_mc_block_id_for_next_collation.seqno,
                                applied_range_end_delta, required_min_mc_block_delta,
                            );
                            true
                        }
                    };

                    if should_sync {
                        // ask every Active / CancelPending collator to cancel;
                        // sync is postponed while at least one of them exists
                        let mut has_active = false;
                        for active_collator in self.active_collators.iter().filter(|ac| {
                            ac.state == CollatorState::Active
                                || ac.state == CollatorState::CancelPending
                        }) {
                            active_collator.cancel_collation.notify_one();
                            has_active = true;
                        }

                        if has_active {
                            tracing::info!(target: tracing_targets::COLLATION_MANAGER,
                                last_synced_to_mc_block_id = ?self.get_last_synced_to_mc_block_id().map(|id| id.as_short_id().to_string()),
                                last_collated_mc_block_id = ?store_res.last_collated_mc_block_id.map(|id| id.as_short_id().to_string()),
                                last_processed_mc_block_id = ?self.get_last_processed_mc_block_id().map(|id| id.as_short_id().to_string()),
                                received_is_key_block = is_key_block,
                                "check_should_sync: cannot sync when there are active collations, \
                                try to gracefully cancel them",
                            );
                            false
                        } else {
                            tracing::info!(target: tracing_targets::COLLATION_MANAGER,
                                last_synced_to_mc_block_id = ?self.get_last_synced_to_mc_block_id().map(|id| id.as_short_id().to_string()),
                                last_collated_mc_block_id = ?store_res.last_collated_mc_block_id.map(|id| id.as_short_id().to_string()),
                                last_processed_mc_block_id = ?self.get_last_processed_mc_block_id().map(|id| id.as_short_id().to_string()),
                                received_is_key_block = is_key_block,
                                "check_should_sync: can sync to last applied mc block \
                                when all collators were cancelled, or waiting, or there are no collators (node not in set)",
                            );
                            true
                        }
                    } else {
                        false
                    }
                } else {
                    // no applied range in cache
                    tracing::debug!(target: tracing_targets::COLLATION_MANAGER,
                        last_synced_to_mc_block_id = ?self.get_last_synced_to_mc_block_id().map(|id| id.as_short_id().to_string()),
                        last_collated_mc_block_id = ?store_res.last_collated_mc_block_id.map(|id| id.as_short_id().to_string()),
                        last_processed_mc_block_id = ?self.get_last_processed_mc_block_id().map(|id| id.as_short_id().to_string()),
                        "check_should_sync: should collate next own mc block after because nothing applied ahead",
                    );
                    false
                }
            } else {
                // neither a collated nor a synced-to mc block is known yet
                tracing::info!(target: tracing_targets::COLLATION_MANAGER,
                    received_is_key_block = is_key_block,
                    "should sync to last applied mc block when no last collated or prev sync to",
                );
                true
            }
        };

        if should_sync_to_last_applied_mc_block {
            self.sync_to_applied_mc_block_if_exist(
                store_res.last_collated_mc_block_id,
                store_res.applied_mc_queue_range,
            )
            .await?;
        } else {
            // no sync: a block that was both received and collated is committed as valid
            if store_res.received_and_collated {
                self.commit_valid_master_block(&block_id).await?;
            }
        }

        Ok(())
    }
1514
1515 async fn sync_to_applied_mc_block_if_exist(
1516 self: &Arc<Self>,
1517 last_collated_mc_block_id: Option<BlockId>,
1518 applied_range: Option<(BlockSeqno, BlockSeqno)>,
1519 ) -> Result<()> {
1520 if let Some(applied_range) = applied_range {
1521 let this = self.clone();
1522
1523 let res = tokio::task::spawn_blocking(move || {
1524 this.sync_to_applied_mc_block(applied_range, last_collated_mc_block_id)
1525 })
1526 .await??;
1527
1528 if !res {
1529 tracing::info!(target: tracing_targets::COLLATION_MANAGER,
1530 last_applied_mc_block_id = %BlockIdShort {
1531 shard: ShardIdent::MASTERCHAIN,
1532 seqno: applied_range.1,
1533 },
1534 "sync_to_applied_mc_block: unable to sync to last applied mc block, \
1535 need to receive next blocks from bc",
1536 );
1537 }
1538 } else {
1539 tracing::info!(target: tracing_targets::COLLATION_MANAGER,
1540 "sync_to_applied_mc_block: there is no received applied mc blocks in cache, \
1541 will wait for next blocks from bc",
1542 );
1543 }
1544 Ok(())
1545 }
1546
    /// Synchronizes collation manager state to the last applied master block
    /// of `applied_range` (`.0` is the first, `.1` the last applied mc seqno).
    ///
    /// This is a synchronous function: async helpers are driven with
    /// `await_blocking()`, and within this file it is invoked through
    /// `tokio::task::spawn_blocking`.
    ///
    /// Returns `Ok(true)` when the sync completed and `Ok(false)` when the
    /// queue restore produced no last mc state (see `restore_queue`).
    ///
    /// On every exit path the active sync info is cleaned and
    /// `blocks_cache.gc_prev_blocks()` is invoked (scope guards).
    #[tracing::instrument(skip_all, fields(applied_range = ?applied_range))]
    fn sync_to_applied_mc_block(
        &self,
        applied_range: (BlockSeqno, BlockSeqno),
        last_collated_mc_block_id: Option<BlockId>,
    ) -> Result<bool> {
        tracing::info!(target: tracing_targets::COLLATION_MANAGER,
            "start sync to applied mc block",
        );

        let _histogram = HistogramGuard::begin("tycho_collator_sync_to_applied_mc_block_time");
        metrics::counter!("tycho_collator_sync_to_applied_mc_block_count").increment(1);

        let first_applied_mc_block_key = BlockIdShort {
            shard: ShardIdent::MASTERCHAIN,
            seqno: applied_range.0,
        };
        let last_applied_mc_block_key = BlockIdShort {
            shard: ShardIdent::MASTERCHAIN,
            seqno: applied_range.1,
        };

        // register the running sync; the returned handle is checked near the
        // end (`is_cancelled`) to decide whether collation should be started
        let cancelled = self.set_active_sync_info(last_applied_mc_block_key.seqno)?;
        tracing::trace!(target: tracing_targets::COLLATION_MANAGER,
            "cancel sync: sync_to_applied_mc_block: set_active_sync_info for seqno={}",
            last_applied_mc_block_key.seqno,
        );
        // always unregister the sync when leaving the function
        scopeguard::defer!({
            self.clean_active_sync_info();
            tracing::trace!(target: tracing_targets::COLLATION_MANAGER,
                "cancel sync: sync_to_applied_mc_block: active_sync_info cleaned",
            );
        });

        // always run blocks cache gc when leaving the function
        scopeguard::defer!(self.blocks_cache.gc_prev_blocks());

        // when the configured mempool override replaces the genesis of the last
        // applied mc block, uncommitted queue state is dropped
        if let Some(mp_cfg_override) = &self.mempool_config_override {
            let last_consesus_info = self
                .blocks_cache
                .get_consensus_info_for_mc_block(&last_applied_mc_block_key)?;
            if mp_cfg_override
                .genesis_info
                .overrides(&last_consesus_info.genesis_info)
            {
                tracing::info!(target: tracing_targets::COLLATION_MANAGER,
                    prev_genesis_start_round = last_consesus_info.genesis_info.start_round,
                    prev_genesis_time_millis = last_consesus_info.genesis_info.genesis_millis,
                    new_genesis_start_round = mp_cfg_override.genesis_info.start_round,
                    new_genesis_time_millis = mp_cfg_override.genesis_info.genesis_millis,
                    "will drop uncommitted internal messages from queue on new genesis",
                );

                let top_shards = self.blocks_cache.get_last_top_shards();
                self.mq_adapter.clear_uncommitted_state(&top_shards)?;
            }
        }

        // processed-to info of all shards as of the last applied mc block
        let all_shards_processed_to_by_partitions =
            Self::get_all_shards_processed_to_by_partitions_for_mc_block(
                &last_applied_mc_block_key,
                &self.blocks_cache,
                self.state_node_adapter.clone(),
            )
            .await_blocking()?;

        let min_processed_to_by_shards =
            find_min_processed_to_by_shards(&all_shards_processed_to_by_partitions);

        tracing::debug!(target: tracing_targets::COLLATION_MANAGER,
            ?min_processed_to_by_shards,
        );

        // per shard: an optional block id plus a list of previous block ids,
        // read relative to the FIRST applied mc block
        let before_tail_block_ids = self
            .blocks_cache
            .read_before_tail_ids_of_mc_block(&first_applied_mc_block_key)?;

        tracing::debug!(target: tracing_targets::COLLATION_MANAGER,
            tail_block_ids = ?before_tail_block_ids
                .iter()
                .map(|(shard_id, (id, prev_ids))| {
                    format!(
                        "({}, id={:?}, prev_ids={})",
                        shard_id,
                        id.as_ref().map(DisplayAsShortId),
                        DisplayBlockIdsIntoIter(prev_ids),
                    )
                }).collect::<Vec<_>>().as_slice(),
        );

        // mark sync as running for every involved workchain
        for shard in before_tail_block_ids.keys() {
            let labels = [("workchain", shard.workchain().to_string())];
            metrics::gauge!("tycho_collator_sync_is_running", &labels).set(1);
        }

        // fast sync: find the top blocks up to which queue diffs are already
        // applied, so that restore can skip them
        let queue_diffs_applied_to_top_blocks = if self.config.fast_sync
            && let Some(applied_to_mc_block_id) =
                self.get_queue_diffs_applied_to_mc_block_id(last_collated_mc_block_id)?
        {
            Self::get_top_blocks_seqno(
                &applied_to_mc_block_id,
                &self.blocks_cache,
                self.state_node_adapter.clone(),
            )
            .await_blocking()?
        } else {
            None
        };
        if queue_diffs_applied_to_top_blocks.is_some() {
            tracing::debug!(target: tracing_targets::COLLATION_MANAGER,
                ?queue_diffs_applied_to_top_blocks,
                "will use fast sync to restore queue",
            );
        }

        let queue_restore_res = Self::restore_queue(
            &self.blocks_cache,
            self.state_node_adapter.clone(),
            self.mq_adapter.clone(),
            applied_range.0,
            min_processed_to_by_shards,
            before_tail_block_ids,
            queue_diffs_applied_to_top_blocks.unwrap_or_default(),
        )
        .await_blocking()?;

        // no last mc state means the queue restore did not complete
        let Some(last_mc_state) = queue_restore_res.last_mc_state else {
            return Ok(false);
        };

        let last_mc_block_id = *last_mc_state.block_id();

        // chain time is renewed only for blocks above the zerostate
        if last_mc_block_id.seqno > self.state_node_adapter.zerostate_id().seqno {
            Self::renew_mc_block_latest_chain_time(
                &mut self.collation_sync_state.lock(),
                last_mc_state.get_gen_chain_time(),
            );
        }

        Self::reset_collation_sync_status(&mut self.collation_sync_state.lock());

        self.update_last_synced_to_mc_block_id(last_mc_block_id);

        self.blocks_cache.reset_top_shard_blocks_additional_info();

        let mc_data = self.build_mc_data(
            last_mc_state,
            queue_restore_res.prev_mc_state,
            queue_restore_res.prev_mc_block_id,
            all_shards_processed_to_by_partitions,
        )?;

        self.blocks_cache
            .remove_next_collated_blocks_from_cache(&queue_restore_res.synced_to_blocks_keys);

        self.notify_mc_state_update_to_mempool(mc_data.clone())
            .await_blocking()?;

        // a sync cancelled in the meantime must not start collation
        let process_state_update_mode = match cancelled.is_cancelled() {
            true => ProcessMcStateUpdateMode::SkipCollation,
            false => ProcessMcStateUpdateMode::StartCollation {
                reset_collators: true,
            },
        };

        self.process_mc_state_update(mc_data.clone(), process_state_update_mode)
            .await_blocking()?;

        self.notify_top_processed_to_anchor_to_mempool(
            mc_data.block_id.seqno,
            mc_data.top_processed_to_anchor,
        )
        .await_blocking()?;

        // publish seqno of every block we synced to
        for synced_to_block_id in queue_restore_res.synced_to_blocks_keys {
            let labels = [
                (
                    "workchain",
                    synced_to_block_id.shard.workchain().to_string(),
                ),
                ("src", "02_synced_to".to_string()),
            ];
            metrics::gauge!("tycho_last_block_seqno", &labels).set(synced_to_block_id.seqno);
        }

        Ok(true)
    }
1776
1777 fn build_mc_data(
1780 &self,
1781 last_mc_state: ShardStateStuff,
1782 prev_mc_state: Option<ShardStateStuff>,
1783 prev_mc_block_id: Option<BlockId>,
1784 all_shards_processed_to_by_partitions: FastHashMap<
1785 ShardIdent,
1786 (bool, ProcessedToByPartitions),
1787 >,
1788 ) -> Result<Arc<McData>> {
1789 let prev_mc_state = match (prev_mc_state, prev_mc_block_id) {
1790 (Some(mc_state), _) => Some(mc_state),
1791 (None, None) => None,
1792 (None, Some(prev_mc_block_id)) if prev_mc_block_id.seqno <= self.state_node_adapter.zerostate_id().seqno => None,
1793 (None, Some(prev_mc_block_id)) => self
1795 .state_node_adapter
1796 .load_state(0, &prev_mc_block_id, Default::default())
1797 .await_blocking()
1798 .map_err(|err| {
1799 tracing::warn!(target: tracing_targets::COLLATION_MANAGER,
1800 prev_mc_block_id = %prev_mc_block_id.as_short_id(),
1801 error = ?err,
1802 "failed to load previous mc state to build mc data, continue without prev_mc_data",
1803 );
1804 err
1805 }).ok(),
1806 };
1807
1808 McData::load_from_state(
1809 &last_mc_state,
1810 prev_mc_state.as_ref(),
1811 all_shards_processed_to_by_partitions,
1812 )
1813 }
1814
1815 async fn get_all_shards_processed_to_by_partitions_for_mc_block(
1816 mc_block_key: &BlockCacheKey,
1817 blocks_cache: &BlocksCache,
1818 state_node_adapter: Arc<dyn StateNodeAdapter>,
1819 ) -> Result<FastHashMap<ShardIdent, (bool, ProcessedToByPartitions)>> {
1820 let mut result = FastHashMap::default();
1821
1822 let zerostate_mc_seqno = blocks_cache.zerostate_mc_seqno();
1823 if mc_block_key.seqno <= zerostate_mc_seqno {
1824 return Ok(result);
1825 }
1826
1827 let from_cache = blocks_cache.get_top_blocks_processed_to_by_partitions(mc_block_key)?;
1828
1829 for (top_block_id, item) in from_cache {
1830 let processed_to = match item.by {
1831 Some(processed_to) => processed_to,
1832 None => {
1833 if item.ref_by_mc_seqno <= zerostate_mc_seqno {
1834 FastHashMap::default()
1835 } else {
1836 let state = state_node_adapter
1838 .load_state(mc_block_key.seqno, &top_block_id, LoadStateHint {
1839 allow_ignore_direct: false,
1841 })
1842 .await?;
1843 let processed_upto = state.state().processed_upto.load()?;
1844 let processed_upto = ProcessedUptoInfoStuff::try_from(processed_upto)?;
1845 processed_upto.get_internals_processed_to_by_partitions()
1846 }
1847 }
1848 };
1849
1850 result.insert(top_block_id.shard, (item.updated, processed_to));
1851 }
1852
1853 Ok(result)
1854 }
1855
1856 fn get_queue_diffs_applied_to_mc_block_id(
1858 &self,
1859 last_collated_mc_block_id: Option<BlockId>,
1860 ) -> Result<Option<BlockId>> {
1861 let last_queue_comitted_on = self.mq_adapter.get_last_committed_mc_block_id()?;
1862 let last_collated_or_synced_to =
1863 self.get_top_mc_block_id_for_next_collation(last_collated_mc_block_id);
1864
1865 let mc_block_id = match (last_queue_comitted_on, last_collated_or_synced_to) {
1866 (Some(last_queue_comitted_on), Some(last_collated_or_synced_to)) => {
1867 if last_collated_or_synced_to.seqno >= last_queue_comitted_on.seqno {
1870 Some(last_collated_or_synced_to)
1871 } else {
1872 Some(last_queue_comitted_on)
1873 }
1874 }
1875 (Some(mc_block_id), _) | (_, Some(mc_block_id)) => Some(mc_block_id),
1876 _ => None,
1877 };
1878
1879 Ok(mc_block_id)
1880 }
1881
1882 #[tracing::instrument(skip_all)]
1887 fn get_top_mc_block_id_for_next_collation(
1888 &self,
1889 last_collated_mc_block_id: Option<BlockId>,
1890 ) -> Option<BlockId> {
1891 let last_synced_to_mc_block_id = self.get_last_synced_to_mc_block_id();
1892
1893 tracing::debug!(target: tracing_targets::COLLATION_MANAGER,
1894 ?last_synced_to_mc_block_id,
1895 ?last_collated_mc_block_id,
1896 );
1897
1898 match (last_synced_to_mc_block_id, last_collated_mc_block_id) {
1899 (Some(last_synced_to), Some(last_collated)) => {
1900 if last_synced_to.seqno > last_collated.seqno {
1901 Some(last_synced_to)
1902 } else {
1903 Some(last_collated)
1904 }
1905 }
1906 (Some(mc_block_id), _) | (_, Some(mc_block_id)) => Some(mc_block_id),
1907 _ => None,
1908 }
1909 }
1910
    /// Restores the internal messages queue while syncing to applied master
    /// blocks and returns what was synced in a `RestoreQueueResult`.
    ///
    /// Two phases:
    /// 1. For every shard in `min_processed_to_by_shards` that also has an
    ///    entry in `before_tail_block_ids`, walks back through previous
    ///    blocks, loads their queue diffs while they are still required
    ///    (`max_message > min_processed_to`) and applies the collected diffs
    ///    in reverse order of collection.
    /// 2. Pops applied master block subgraphs from the cache starting at
    ///    `from_mc_block_seqno`, applies queue diffs of their blocks, and on
    ///    the last subgraph commits the queue diff and fills the result.
    ///
    /// Blocks with seqno not above the border in
    /// `queue_diffs_applied_to_top_blocks` for their shard are skipped in both
    /// phases.
    ///
    /// When a required previous diff cannot be loaded, the
    /// `tycho_collator_sync_is_running` gauge is reset to 0 and the result is
    /// returned as is, i.e. without `last_mc_state`.
    #[tracing::instrument(skip_all, fields(from_mc_block_seqno))]
    async fn restore_queue(
        blocks_cache: &BlocksCache,
        state_node_adapter: Arc<dyn StateNodeAdapter>,
        mq_adapter: Arc<dyn MessageQueueAdapter<EnqueuedMessage>>,
        from_mc_block_seqno: u32,
        min_processed_to_by_shards: BTreeMap<ShardIdent, QueueKey>,
        before_tail_block_ids: BTreeMap<ShardIdent, (Option<BlockId>, Vec<BlockId>)>,
        queue_diffs_applied_to_top_blocks: FastHashMap<ShardIdent, u32>,
    ) -> Result<RestoreQueueResult> {
        let mut res = RestoreQueueResult::default();

        let init_mc_block_id = state_node_adapter.load_init_block_id();
        // per shard: seqno at (and below) which the init mc block is known to be reached
        let mut init_mc_block_reached_on = FastHashMap::new();

        // per shard: id of the most recently recorded required diff
        // (overwritten while walking back through previous blocks)
        let mut first_required_diffs = FastHashMap::new();
        for (shard_id, min_processed_to) in &min_processed_to_by_shards {
            let mut prev_queue_diffs = vec![];
            let Some((_, prev_block_ids)) = before_tail_block_ids.get(shard_id) else {
                continue;
            };
            let mut prev_block_ids: VecDeque<_> = prev_block_ids.iter().cloned().collect();

            while let Some(prev_block_id) = prev_block_ids.pop_front() {
                // skip blocks with seqno 0 and master blocks not above the zerostate
                if prev_block_id.seqno == 0
                    || prev_block_id.is_masterchain()
                        && prev_block_id.seqno <= state_node_adapter.zerostate_id().seqno
                {
                    continue;
                }

                // skip diffs at or below the already applied border for this shard
                if let Some(border) = queue_diffs_applied_to_top_blocks.get(shard_id)
                    && prev_block_id.seqno <= *border
                {
                    tracing::debug!(target: tracing_targets::COLLATION_MANAGER,
                        prev_block_id = %prev_block_id.as_short_id(),
                        top_applied_seqno = border,
                        "previous queue diff skipped because it below top applied",
                    );
                    continue;
                }

                // skip diffs of blocks that are not above the init block
                if let Some(init_mc_block_id) = init_mc_block_id {
                    let mut skip_diff = false;
                    if prev_block_id.is_masterchain() {
                        if prev_block_id.seqno <= init_mc_block_id.seqno {
                            tracing::debug!(target: tracing_targets::COLLATION_MANAGER,
                                prev_block_id = %prev_block_id.as_short_id(),
                                init_mc_block_id = %init_mc_block_id.as_short_id(),
                                "master block queue diff apply skipped because it is below init block from persistent state",
                            );
                            skip_diff = true;
                        }
                    } else {
                        // reuse the remembered seqno before asking the state node
                        let mut init_mc_block_reached = matches!(
                            init_mc_block_reached_on.get(&prev_block_id.shard),
                            Some(reached_seqno) if prev_block_id.seqno <= *reached_seqno,
                        );
                        if !init_mc_block_reached {
                            // NOTE(review): `unwrap` panics when the state node has
                            // no ref-by-mc-seqno for this block - confirm it cannot happen
                            let prev_ref_by_mc_seqno = state_node_adapter
                                .get_ref_by_mc_seqno(&prev_block_id)
                                .await?
                                .unwrap();
                            init_mc_block_reached = prev_ref_by_mc_seqno <= init_mc_block_id.seqno;
                            if init_mc_block_reached {
                                init_mc_block_reached_on
                                    .insert(prev_block_id.shard, prev_block_id.seqno);
                            }
                        }
                        if init_mc_block_reached {
                            tracing::debug!(target: tracing_targets::COLLATION_MANAGER,
                                prev_block_id = %prev_block_id.as_short_id(),
                                init_mc_block_id = %init_mc_block_id.as_short_id(),
                                "shard block queue diff apply skipped because it is below init block from persistent state",
                            );
                            skip_diff = true;
                        }
                    }
                    if skip_diff {
                        // record a default (placeholder) block id for the shard
                        first_required_diffs.insert(prev_block_id.shard, BlockId::default());
                        continue;
                    }
                }

                let Some(queue_diff_stuff) = state_node_adapter.load_diff(&prev_block_id).await?
                else {
                    tracing::debug!(target: tracing_targets::COLLATION_MANAGER,
                        prev_block_id = %prev_block_id,
                        "unable to load prev diff to sync queue state, cancel sync",
                    );

                    // sync is given up: reset the running gauge for every shard
                    for shard in before_tail_block_ids.keys() {
                        let labels = [("workchain", shard.workchain().to_string())];
                        metrics::gauge!("tycho_collator_sync_is_running", &labels).set(0);
                    }

                    return Ok(res);
                };
                let diff_required = &queue_diff_stuff.as_ref().max_message > min_processed_to;
                tracing::debug!(target: tracing_targets::COLLATION_MANAGER,
                    diff_block_id = %prev_block_id.as_short_id(),
                    diff_required,
                    max_message = %queue_diff_stuff.as_ref().max_message,
                    min_processed_to = %min_processed_to,
                    "check if diff required to restore queue working state on sync:",
                );
                if diff_required {
                    first_required_diffs.insert(prev_block_id.shard, prev_block_id);

                    // NOTE(review): `unwrap` panics when the block itself is
                    // missing although its diff was loaded - confirm
                    let block_stuff = state_node_adapter
                        .load_block(&prev_block_id)
                        .await?
                        .unwrap();
                    let out_msgs = block_stuff.load_extra()?.out_msg_description.load()?;

                    let queue_diff_with_messages =
                        QueueDiffWithMessages::from_queue_diff(&queue_diff_stuff, &out_msgs)?;

                    prev_queue_diffs.push((
                        queue_diff_with_messages,
                        *queue_diff_stuff.diff_hash(),
                        prev_block_id,
                        queue_diff_stuff.as_ref().min_message,
                        queue_diff_stuff.as_ref().max_message,
                    ));

                    // keep walking back: enqueue the predecessor(s) of this block
                    let prev_ids_info = block_stuff.construct_prev_id()?;
                    prev_block_ids.push_back(prev_ids_info.0);
                    if let Some(id) = prev_ids_info.1 {
                        prev_block_ids.push_back(id);
                    }
                }
            }

            // apply collected diffs in reverse order of collection
            while let Some((diff, diff_hash, block_id, min_message, max_message)) =
                prev_queue_diffs.pop()
            {
                let statistics =
                    DiffStatistics::from_diff(&diff, block_id.shard, min_message, max_message);

                // no sequence check for the diff recorded in `first_required_diffs`,
                // every other diff is checked with `DiffZone::Both`
                let check_sequence = match first_required_diffs.get(&block_id.shard) {
                    Some(id) if *id == block_id => None,
                    _ => Some(DiffZone::Both),
                };

                mq_adapter
                    .apply_diff(
                        diff,
                        block_id.as_short_id(),
                        &diff_hash,
                        statistics,
                        check_sequence,
                    )
                    .context("sync_to_applied_mc_block")?;

                res.applied_diffs_ids.insert(block_id);
            }
        }

        let mut prev_mc_state = None;
        let mut last_mc_state;

        loop {
            // take the next applied mc block subgraph; `is_last` marks the final one
            let (mc_block_subgraph_extract, is_last) =
                blocks_cache.pop_front_applied_mc_block_subgraph(from_mc_block_seqno)?;

            let subgraph = match mc_block_subgraph_extract {
                McBlockSubgraphExtract::Extracted(subgraph) => subgraph,
                McBlockSubgraphExtract::AlreadyExtracted => {
                    bail!("mc block subgraph extract result cannot be AlreadyExtracted")
                }
            };

            let mc_block_entry = &subgraph.master_block;

            // queue diffs are applied only for subgraphs above the zerostate
            if subgraph.master_block.block_id.seqno > state_node_adapter.zerostate_id().seqno {
                // master block first, then its shard blocks
                for block_entry in [mc_block_entry]
                    .into_iter()
                    .chain(subgraph.shard_blocks.iter())
                {
                    if let Some(border) =
                        queue_diffs_applied_to_top_blocks.get(&block_entry.block_id.shard)
                        && block_entry.block_id.seqno <= *border
                    {
                        tracing::debug!(target: tracing_targets::COLLATION_MANAGER,
                            received_block_id = %block_entry.block_id.as_short_id(),
                            top_applied_seqno = border,
                            "queue diff apply skipped because it is below top applied",
                        );
                        continue;
                    }

                    let min_processed_to =
                        min_processed_to_by_shards.get(&block_entry.block_id.shard);

                    if let Some(applied_diff_block_id) =
                        Self::apply_block_queue_diff_from_entry_stuff(
                            state_node_adapter.as_ref(),
                            mq_adapter.clone(),
                            block_entry,
                            min_processed_to,
                            &mut first_required_diffs,
                        )?
                    {
                        res.applied_diffs_ids.insert(applied_diff_block_id);
                    }
                }
            }

            // move the cache gc boundary to the top blocks of this master block
            let to_blocks_keys = mc_block_entry.get_top_blocks_keys()?;
            blocks_cache.set_gc_to_boundary(&to_blocks_keys);

            last_mc_state = mc_block_entry.cached_state()?.clone();

            if is_last {
                // commit the queue diff on the last master block
                // and drop the remaining uncommitted queue state
                let partitions = subgraph.get_partitions();
                Self::commit_block_queue_diff(
                    mq_adapter.clone(),
                    &mc_block_entry.block_id,
                    &mc_block_entry.top_shard_blocks_info,
                    &partitions,
                )?;

                let top_shards = blocks_cache.get_last_top_shards();
                mq_adapter.clear_uncommitted_state(&top_shards)?;

                res.last_mc_state = Some(last_mc_state);
                res.prev_mc_state = prev_mc_state;
                res.prev_mc_block_id = mc_block_entry.prev_blocks_ids.first().copied();
                res.synced_to_blocks_keys.extend(to_blocks_keys.into_iter());

                return Ok(res);
            }

            // remember this state as the previous one for the next iteration
            prev_mc_state = Some(last_mc_state.clone());
        }
    }
2183
2184 fn get_last_processed_mc_block_id(&self) -> Option<BlockId> {
2185 *self.last_processed_mc_block_id.lock()
2186 }
2187
2188 fn check_should_process_and_update_last_processed_mc_block(&self, block_id: &BlockId) -> bool {
2189 let mut guard = self.last_processed_mc_block_id.lock();
2190 let last_processed_mc_block_id_opt = guard.as_ref();
2191 let (seqno_delta, is_equal) =
2192 Self::compare_mc_block_with(block_id, last_processed_mc_block_id_opt);
2193 if seqno_delta < 0 || is_equal {
2194 false
2195 } else {
2196 guard.replace(*block_id);
2197 true
2198 }
2199 }
2200
2201 fn compare_mc_block_with(
2204 mc_block_id: &BlockId,
2205 other_mc_block_id_opt: Option<&BlockId>,
2206 ) -> (i32, bool) {
2207 let (seqno_delta, is_equal) = match other_mc_block_id_opt {
2208 None => {
2209 tracing::info!(
2210 target: tracing_targets::COLLATION_MANAGER,
2211 "other mc block is None: current {} other ({:?}): is_equal = false, seqno_delta = 0",
2212 mc_block_id.as_short_id(),
2213 other_mc_block_id_opt.map(|b| b.as_short_id()),
2214 );
2215 (0, false)
2216 }
2217 Some(other_mc_block_id) => (
2218 mc_block_id.seqno as i32 - other_mc_block_id.seqno as i32,
2219 mc_block_id == other_mc_block_id,
2220 ),
2221 };
2222 if seqno_delta < 0 || is_equal {
2223 tracing::info!(
2224 target: tracing_targets::COLLATION_MANAGER,
2225 "mc block is NOT AHEAD of other: current {} other ({:?}): is_equal = {}, seqno_delta = {}",
2226 mc_block_id.as_short_id(),
2227 other_mc_block_id_opt.map(|b| b.as_short_id()),
2228 is_equal, seqno_delta,
2229 );
2230 }
2231 (seqno_delta, is_equal)
2232 }
2233
2234 async fn process_mc_state_update(
2235 &self,
2236 mc_data: Arc<McData>,
2237 mode: ProcessMcStateUpdateMode,
2238 ) -> Result<()> {
2239 tracing::info!(target: tracing_targets::COLLATION_MANAGER,
2240 ?mode,
2241 "will process master state update",
2242 );
2243
2244 if let ProcessMcStateUpdateMode::StartCollation { reset_collators } = mode {
2245 let block_global = mc_data.config.get_global_version()?;
2246 if self.config.supported_block_version >= block_global.version
2247 && block_global
2248 .capabilities
2249 .is_subset_of(self.config.supported_capabilities)
2250 {
2251 self.refresh_collation_sessions(mc_data, reset_collators)
2252 .await?;
2253 } else {
2254 tracing::warn!(target: tracing_targets::COLLATION_MANAGER,
2255 collator_supported_block_version = self.config.supported_block_version,
2256 mc_block_version = block_global.version,
2257 collator_supported_capabilities = ?self.config.supported_capabilities,
2258 mc_block_capabilities = ?block_global.capabilities,
2259 "Refresh collation sessions is skipped: collator does not support mc block version or capabilities",
2260 );
2261 }
2262 }
2263
2264 Ok(())
2265 }
2266
2267 async fn notify_to_mempool_and_process_delayed_mc_state_update(
2269 &self,
2270 block_id: &BlockId,
2271 ) -> Result<Option<MempoolAnchorId>> {
2272 let mut delayed_mc_data = None;
2273 {
2274 let mut guard = self.delayed_mc_state_update.lock();
2275 if let Some(mc_data) = guard.clone()
2276 && mc_data.block_id <= *block_id
2277 {
2278 if mc_data.block_id == *block_id {
2280 delayed_mc_data = Some(mc_data);
2281 }
2282
2283 *guard = None;
2285 }
2286 }
2287 if let Some(mc_data) = delayed_mc_data {
2288 self.notify_mc_state_update_to_mempool(mc_data.clone())
2289 .await?;
2290 self.process_mc_state_update(
2291 mc_data.clone(),
2292 ProcessMcStateUpdateMode::StartCollation {
2293 reset_collators: false,
2294 },
2295 )
2296 .await?;
2297
2298 Ok(Some(mc_data.top_processed_to_anchor))
2299 } else {
2300 Ok(None)
2301 }
2302 }
2303
2304 async fn notify_mc_state_update_to_mempool(&self, mc_data: Arc<McData>) -> Result<()> {
2305 tracing::info!(target: tracing_targets::COLLATION_MANAGER,
2306 block_id = %mc_data.block_id.as_short_id(),
2307 "will notify master state update to mempool",
2308 );
2309
2310 let prev_validator_set = self
2311 .validator_set_cache
2312 .get_prev_validator_set(&mc_data.config)?;
2313 let current_validator_set = self
2314 .validator_set_cache
2315 .get_current_validator_set(&mc_data.config)?;
2316 let next_validator_set = self
2317 .validator_set_cache
2318 .get_next_validator_set(&mc_data.config)?;
2319
2320 let cx = Box::new(StateUpdateContext {
2321 mc_block_id: mc_data.block_id,
2322 mc_block_chain_time: mc_data.gen_chain_time,
2323 top_processed_to_anchor_id: mc_data.top_processed_to_anchor,
2324 consensus_info: mc_data.consensus_info,
2325 shuffle_validators: mc_data.config.get_collation_config()?.shuffle_mc_validators,
2326 consensus_config: mc_data.config.get_consensus_config()?,
2327 prev_validator_set,
2328 current_validator_set,
2329 next_validator_set,
2330 });
2331
2332 self.mpool_adapter.handle_mc_state_update(cx).await
2333 }
2334
/// Reconciles active collation sessions and collators with the shard set
/// described by the given master state.
///
/// Steps performed:
/// 1. Returns early when the master block is not ahead of the last processed one
///    (this also records the block as last processed).
/// 2. Builds the new shards map (masterchain + every shard from `mc_data.shards`).
/// 3. Classifies every shard: keep the existing session, start a new one,
///    finish an outdated one, and/or stop its collator.
/// 4. Starts new sessions (starting a collator when none exists), resumes
///    collation for kept sessions, finishes outdated sessions and stops
///    collators that are no longer needed.
///
/// `reset_collators` is forwarded to `enqueue_resume_collation` of every
/// resumed collator.
///
/// # Errors
/// Fails when split/merge detection, config access, validator subset
/// calculation, adding a validation session, resuming or stopping a collator
/// fails, or when a kept session has no active collator.
#[tracing::instrument(skip_all)]
async fn refresh_collation_sessions(
    &self,
    mc_data: Arc<McData>,
    reset_collators: bool,
) -> Result<()> {
    tracing::debug!(
        target: tracing_targets::COLLATION_MANAGER,
        "Start refresh collation sessions by mc state ({})...",
        mc_data.block_id.as_short_id(),
    );

    let _histogram = HistogramGuard::begin("tycho_collator_refresh_collation_sessions_time");

    // Skip master blocks that are behind or equal to the last processed one.
    if !self.check_should_process_and_update_last_processed_mc_block(&mc_data.block_id) {
        return Ok(());
    }

    tracing::trace!(target: tracing_targets::COLLATION_MANAGER, "mc_data: {:?}", mc_data);

    // Map every shard (including masterchain) to the ids of its latest blocks.
    let mut new_shards_info = FastHashMap::default();
    new_shards_info.insert(ShardIdent::MASTERCHAIN, vec![mc_data.block_id]);
    for (shard_id, descr) in mc_data.shards.iter() {
        let top_block_id = descr.get_block_id(*shard_id);
        new_shards_info.insert(*shard_id, vec![top_block_id]);
    }

    // Snapshot of shards that currently have an active collation session.
    let active_shards_ids: Vec<_> = self
        .active_collation_sessions
        .read()
        .keys()
        .cloned()
        .collect();
    let new_shards_ids: Vec<&ShardIdent> = new_shards_info.keys().collect();
    tracing::debug!(
        target: tracing_targets::COLLATION_MANAGER,
        "Detecting split/merge actions to move from current shards {:?} to new shards {:?}...",
        active_shards_ids.as_slice(),
        new_shards_ids
    );

    // The detected actions are only logged here.
    let split_merge_actions = calc_split_merge_actions(&active_shards_ids, new_shards_ids)?;
    if !split_merge_actions.is_empty() {
        tracing::info!(
            target: tracing_targets::COLLATION_MANAGER,
            "Detected split/merge actions: {:?}",
            split_merge_actions,
        );
    }

    let current_session_seqno = mc_data.validator_info.catchain_seqno;

    let full_validators_set = mc_data.config.get_current_validator_set()?;
    tracing::trace!(target: tracing_targets::COLLATION_MANAGER,
        "full_validators_set: since={}, until={}, main={}, total_weight={}, list={:?}",
        full_validators_set.utime_since, full_validators_set.utime_until,
        full_validators_set.main, full_validators_set.total_weight,
        DebugIter(full_validators_set.list.iter().map(|i| i.public_key)),
    );
    let collation_config = mc_data.config.get_collation_config()?;
    // Memoized per-shard validator subset lookup.
    // NOTE(review): the subset is computed with `compute_mc_subset` for every
    // shard id, so all shards appear to share the masterchain subset — confirm.
    let mut subset_cache = FastHashMap::new();
    let mut get_validator_subset = |shard_id| match subset_cache.entry(shard_id) {
        hash_map::Entry::Occupied(entry) => {
            let (subset, hash_short): &(Arc<FastHashMap<[u8; 32], ValidatorDescription>>, u32) =
                entry.get();
            Result::<_>::Ok((subset.clone(), *hash_short))
        }
        hash_map::Entry::Vacant(entry) => {
            let (subset, hash_short) = full_validators_set
                .compute_mc_subset(current_session_seqno, collation_config.shuffle_mc_validators)
                .ok_or_else(|| anyhow!(
                    "Error calculating subset of validators for session (shard_id = {}, seqno = {})",
                    ShardIdent::MASTERCHAIN,
                    current_session_seqno,
                ))?;

            // Index validators by public key.
            let subset: FastHashMap<_, _> = subset
                .into_iter()
                .map(|vldr| (vldr.public_key.into(), vldr))
                .collect();
            let subset = Arc::new(subset);

            entry.insert((subset.clone(), hash_short));
            Ok((subset, hash_short))
        }
    };

    // Classification results, filled while holding the sessions write lock.
    let mut sessions_to_keep = Vec::new();
    let mut sessions_to_start = Vec::new();
    let mut to_finish_sessions = Vec::new();
    let mut to_stop_collators = Vec::new();
    {
        let mut active_collation_sessions_guard = self.active_collation_sessions.write();
        // Active shards that are absent from the new shard set end up here.
        let mut missed_shards_ids: FastHashSet<_> = active_shards_ids.into_iter().collect();
        for (shard_id, block_ids) in new_shards_info {
            missed_shards_ids.remove(&shard_id);

            let (subset, hash_short) = get_validator_subset(shard_id)?;
            // `Some` when the local keypair is found in the subset.
            let local_pubkey = find_us_in_collators_set(&self.keypair, &subset);

            if local_pubkey.is_none() {
                tracing::debug!(
                    target: tracing_targets::COLLATION_MANAGER,
                    public_key = %self.keypair.public_key,
                    current_session_seqno,
                    hash_short,
                    "Current node was not authorized to collate shard {}. Use TRACE to see subset",
                    shard_id,
                );
                tracing::trace!(target: tracing_targets::COLLATION_MANAGER,
                    subset = ?DebugIter(subset.values()),
                    "Current node was not authorized to collate shard {}",
                    shard_id,
                );
                metrics::gauge!("tycho_node_in_current_vset").set(0);
            } else {
                metrics::gauge!("tycho_node_in_current_vset").set(1);
            }

            match active_collation_sessions_guard.entry(shard_id) {
                hash_map::Entry::Occupied(entry) => {
                    let existing_session_info = entry.get().clone();
                    if local_pubkey.is_some() {
                        // Same subset hash and seqno: the session is unchanged.
                        if existing_session_info.collators().short_hash == hash_short
                            && existing_session_info.seqno() == current_session_seqno
                        {
                            sessions_to_keep.push((shard_id, existing_session_info, block_ids));
                        } else {
                            // Session parameters changed: replace it with a new one.
                            to_finish_sessions.push(entry.remove());
                            sessions_to_start.push((shard_id, block_ids));
                        }
                    } else {
                        // We are no longer in the subset: finish the session
                        // and stop the collator.
                        to_finish_sessions.push(entry.remove());
                        if let Some((_, collator)) = self.active_collators.remove(&shard_id) {
                            to_stop_collators.push((existing_session_info, collator));
                        }
                    }
                }
                hash_map::Entry::Vacant(_) => {
                    if local_pubkey.is_some() {
                        sessions_to_start.push((shard_id, block_ids));
                    }
                }
            }
        }

        // Sessions of shards that are not in the new shard set are finished
        // and their collators are stopped.
        for shard_id in missed_shards_ids {
            if let Some(existing_session_info) =
                active_collation_sessions_guard.remove(&shard_id)
            {
                to_finish_sessions.push(existing_session_info.clone());
                if let Some((_, collator)) = self.active_collators.remove(&shard_id) {
                    to_stop_collators.push((existing_session_info, collator));
                }
            }
        }
    }

    if !sessions_to_start.is_empty() {
        tracing::info!(
            target: tracing_targets::COLLATION_MANAGER,
            "Will start new collation sessions: {:?}",
            DebugIter(sessions_to_start.iter().map(|(s, _)| (s, current_session_seqno))),
        );
    }

    for (shard_id, prev_blocks_ids) in sessions_to_start {
        let (subset, hash_short) = get_validator_subset(shard_id)?;

        let new_session_info = Arc::new(CollationSessionInfo::new(
            shard_id,
            current_session_seqno,
            ValidatorSubsetInfo {
                validators: subset.values().cloned().collect(),
                short_hash: hash_short,
            },
            Some(self.keypair.clone()),
        ));

        tracing::debug!(target: tracing_targets::COLLATION_MANAGER,
            "new_session_info: {:?}",
            new_session_info,
        );

        let next_block_id_short = calc_next_block_id_short(&prev_blocks_ids);

        match self.active_collators.entry(shard_id) {
            DashMapEntry::Occupied(_) => {
                tracing::info!(
                    target: tracing_targets::COLLATION_MANAGER,
                    "Active collator exists for collation session {:?}. Will resume it",
                    new_session_info.id(),
                );
                // Reuse the existing collator: it is resumed together with
                // the kept sessions below.
                sessions_to_keep.push((shard_id, new_session_info.clone(), prev_blocks_ids));
            }
            DashMapEntry::Vacant(entry) => {
                tracing::info!(
                    target: tracing_targets::COLLATION_MANAGER,
                    "There is no active collator for collation session {:?}. Will start it",
                    new_session_info.id(),
                );

                let cancel_collation_notify = Arc::new(Notify::new());

                // NOTE(review): the vacant map entry stays alive across this
                // `.await`; confirm that holding it while the factory runs is fine.
                match self
                    .collator_factory
                    .start(CollatorContext {
                        mq_adapter: self.mq_adapter.clone(),
                        mpool_adapter: self.mpool_adapter.clone(),
                        state_node_adapter: self.state_node_adapter.clone(),
                        config: self.config.clone(),
                        collation_session: new_session_info.clone(),
                        listener: self.dispatcher.clone(),
                        zerostate_id: *self.state_node_adapter.zerostate_id(),
                        shard_id,
                        prev_blocks_ids,
                        mc_data: mc_data.clone(),
                        mempool_config_override: self.mempool_config_override.clone(),
                        cancel_collation: cancel_collation_notify.clone(),
                    })
                    .await
                {
                    // A start failure is only logged; the session is still
                    // registered below.
                    Err(err) => {
                        tracing::error!(target: tracing_targets::COLLATION_MANAGER,
                            session_id = ?new_session_info.id(),
                            ?err,
                            "error starting collator"
                        );
                    }
                    Ok(collator) => {
                        entry.insert(ActiveCollator {
                            collator: Arc::new(collator),
                            state: CollatorState::Active,
                            cancel_collation: cancel_collation_notify,
                        });
                    }
                }
            }
        }

        // A validation session is added only for the masterchain.
        if shard_id.is_masterchain() {
            self.validator.add_session(AddSession {
                shard_ident: shard_id,
                session_id: new_session_info.get_validation_session_id(),
                start_block_seqno: next_block_id_short.seqno,
                validators: &new_session_info.collators().validators,
            })?;
        }

        self.active_collation_sessions
            .write()
            .insert(shard_id, new_session_info);
    }

    tracing::debug!(
        target: tracing_targets::COLLATION_MANAGER,
        "Will keep existing collation sessions: {:?}",
        DebugIter(sessions_to_keep.iter().map(|(_, s, _)| s.id())),
    );

    for (shard_id, new_session_info, prev_blocks_ids) in sessions_to_keep {
        // Mark the collator active and clone its handle inside a block, so the
        // map reference is dropped before awaiting.
        let collator = {
            let Some(mut active_collator) = self.active_collators.get_mut(&shard_id) else {
                bail!(
                    "Collator for shard should exist for active session {:?}",
                    new_session_info.id(),
                )
            };
            active_collator.state = CollatorState::Active;
            active_collator.collator.clone()
        };

        tracing::debug!(
            target: tracing_targets::COLLATION_MANAGER,
            "Resuming collation attempts in shard session {:?}",
            new_session_info.id(),
        );
        collator
            .enqueue_resume_collation(
                mc_data.clone(),
                reset_collators,
                new_session_info,
                prev_blocks_ids,
            )
            .await?;
    }

    if !to_finish_sessions.is_empty() {
        tracing::info!(
            target: tracing_targets::COLLATION_MANAGER,
            "Will finish outdated collation sessions: {:?}",
            DebugIter(to_finish_sessions.iter().map(|s| s.id())),
        );
    }

    for session_info in to_finish_sessions {
        // The session is registered as "to finish" and then finished right away.
        self.collation_sessions_to_finish
            .insert(session_info.id(), session_info.clone());
        self.finish_collation_session(session_info)?;
    }

    if !to_stop_collators.is_empty() {
        tracing::info!(
            target: tracing_targets::COLLATION_MANAGER,
            "Will stop collators for sessions that we do not serve: {:?}",
            DebugIter(to_stop_collators.iter().map(|(s, _)| s.id())),
        );
    }

    for (session_info, active_collator) in to_stop_collators {
        // Keep the collator in `collators_to_stop` while the stop request is enqueued.
        let collator = active_collator.collator.clone();
        self.collators_to_stop
            .insert(session_info.id(), active_collator);
        collator.enqueue_stop().await?;
    }

    Ok(())

}
2678
2679 pub fn finish_collation_session(
2681 &self,
2682 collation_session: Arc<CollationSessionInfo>,
2683 ) -> Result<()> {
2684 tracing::info!(target: tracing_targets::COLLATION_MANAGER,
2685 "finish_collation_session: {:?}", collation_session.id(),
2686 );
2687 self.collation_sessions_to_finish
2688 .remove(&collation_session.id());
2689 Ok(())
2690 }
2691
2692 pub fn handle_collator_stopped(&self, collation_session_id: CollationSessionId) -> Result<()> {
2694 tracing::info!(target: tracing_targets::COLLATION_MANAGER,
2695 "handle_collator_stopped: {:?}", collation_session_id,
2696 );
2697 self.collators_to_stop.remove(&collation_session_id);
2698 Ok(())
2699 }
2700
2701 fn set_collator_state<F>(&self, shard_id: &ShardIdent, f: F) -> Option<CollatorState>
2702 where
2703 F: Fn(&mut ActiveCollator<Arc<CF::Collator>>),
2704 {
2705 match self.active_collators.get_mut(shard_id) {
2706 Some(mut active_collator) => {
2707 f(&mut active_collator);
2708 Some(active_collator.state)
2709 }
2710 None => None,
2711 }
2712 }
2713
2714 fn set_active_sync_info(&self, target_mc_block_seqno: BlockSeqno) -> Result<CancellationToken> {
2715 let mut guard = self.collation_sync_state.lock();
2716 if let Some(active_sync) = &guard.active_sync_to_applied {
2717 bail!(
2718 "previous sync_to_applied_mc_block should be finished \
2719 before: previous seqno={}, target seqno={}",
2720 active_sync.target_mc_block_seqno,
2721 target_mc_block_seqno,
2722 )
2723 }
2724
2725 let cancelled = CancellationToken::new();
2726 guard.active_sync_to_applied = Some(ActiveSync {
2727 target_mc_block_seqno,
2728 cancelled: cancelled.clone(),
2729 });
2730
2731 Ok(cancelled)
2732 }
2733
2734 fn clean_active_sync_info(&self) {
2735 let mut guard = self.collation_sync_state.lock();
2736 guard.active_sync_to_applied = None;
2737 }
2738
2739 fn update_last_received_mc_block_seqno(&self, received_block_id: &BlockId) {
2740 if !received_block_id.is_masterchain() {
2741 return;
2742 }
2743
2744 let mut guard = self.collation_sync_state.lock();
2745 guard.last_received_mc_block_seqno = Some(received_block_id.seqno);
2746 }
2747
2748 fn get_last_received_mc_block_seqno(&self) -> Option<BlockSeqno> {
2749 let guard = self.collation_sync_state.lock();
2750 guard.last_received_mc_block_seqno
2751 }
2752
2753 fn update_last_synced_to_mc_block_id(&self, mc_block_id: BlockId) {
2754 let mut guard = self.collation_sync_state.lock();
2755 guard.last_synced_to_mc_block_id = Some(mc_block_id);
2756 }
2757
2758 fn get_last_synced_to_mc_block_id(&self) -> Option<BlockId> {
2759 let guard = self.collation_sync_state.lock();
2760 guard.last_synced_to_mc_block_id
2761 }
2762
2763 fn finish_active_sync_to_applied(&self, received_block_id: &BlockId) -> bool {
2765 if !received_block_id.is_masterchain() {
2767 return false;
2768 }
2769
2770 let guard = self.collation_sync_state.lock();
2771
2772 if let Some(active_sync_info) = &guard.active_sync_to_applied {
2774 if received_block_id
2776 .seqno
2777 .saturating_sub(active_sync_info.target_mc_block_seqno)
2778 >= self.config.min_mc_block_delta_from_bc_to_sync
2779 {
2780 tracing::debug!(target: tracing_targets::COLLATION_MANAGER,
2781 prev_target_block_id = %BlockIdShort {
2782 shard: ShardIdent::MASTERCHAIN,
2783 seqno: active_sync_info.target_mc_block_seqno,
2784 },
2785 "cancel sync: will force finish previous sync \
2786 to applied master block if not started to process state update",
2787 );
2788 active_sync_info.cancelled.cancel();
2789 return true;
2790 } else {
2791 tracing::trace!(target: tracing_targets::COLLATION_MANAGER,
2793 prev_target_block_id = %BlockIdShort {
2794 shard: ShardIdent::MASTERCHAIN,
2795 seqno: active_sync_info.target_mc_block_seqno,
2796 },
2797 "cancel sync: will not force finish previous sync \
2798 to applied master block, because new block is not far ahead",
2799 );
2800 }
2801 } else {
2802 tracing::trace!(target: tracing_targets::COLLATION_MANAGER,
2803 "cancel sync: route_handle_block_from_bc: no active sync to cancel",
2804 );
2805 }
2806
2807 false
2808 }
2809
2810 fn renew_mc_block_latest_chain_time(guard: &mut CollationSyncState, chain_time: u64) {
2813 if guard.mc_block_latest_chain_time < chain_time {
2814 guard.mc_block_latest_chain_time = chain_time;
2815 }
2816
2817 for (_, collation_state) in guard.states.iter_mut() {
2819 collation_state
2820 .last_imported_anchor_events
2821 .retain(|it| it.ct > chain_time);
2822 }
2823 }
2824
2825 fn reset_collation_sync_status(guard: &mut CollationSyncState) {
2832 for (_, collation_state) in guard.states.iter_mut() {
2833 if collation_state.status == CollationStatus::WaitForMasterStatus {
2834 collation_state.status = CollationStatus::AttemptsInProgress;
2835 }
2836 }
2837 }
2838
/// Registers a newly imported anchor event for `shard_id` and decides what
/// should happen next: collate a master block, resume collation attempts in
/// some shards, or wait for the status of the other side.
///
/// The decision is based on per-shard "facts" calculated from the imported
/// anchor events of all `active_shards`: a master block is collated only when
/// every active shard provides a candidate chain time; the maximum candidate
/// becomes the chain time of the next master block.
///
/// Mutates `guard`: appends the event, updates shard statuses and the force
/// flags, and, when master collation is chosen, renews the latest master
/// block chain time and cleans processed events.
///
/// # Panics
/// Panics if `active_shards` does not contain `shard_id`.
#[tracing::instrument(skip_all)]
fn detect_next_collation_step(
    guard: &mut CollationSyncState,
    active_shards: Vec<ShardIdent>,
    shard_id: ShardIdent,
    ctx: DetectNextCollationStepContext,
) -> NextCollationStep {
    let DetectNextCollationStepContext {
        last_imported_anchor_ct,
        force_mc_block,
        mc_block_min_interval_ms,
        mc_block_max_interval_ms,
        collated_block_info,
    } = ctx;

    // A zero max interval falls back to the min interval.
    let mc_block_max_interval_ms = if mc_block_max_interval_ms == 0 {
        mc_block_min_interval_ms
    } else {
        mc_block_max_interval_ms
    };

    let _histogram = HistogramGuard::begin("detect_next_collation_step_time");
    assert!(
        active_shards.contains(&shard_id),
        "active_shards must include current shard"
    );

    let mc_block_latest_chain_time = guard.mc_block_latest_chain_time;

    // True for the masterchain itself, or when the masterchain state already
    // has at least one imported anchor event.
    let mc_collation_state_exist = shard_id.is_masterchain()
        || guard
            .states
            .get(&ShardIdent::MASTERCHAIN)
            .is_some_and(|state| !state.last_imported_anchor_events.is_empty());

    // Master collation forced by unprocessed messages in the masterchain
    // applies to all shards.
    if shard_id.is_masterchain()
        && matches!(force_mc_block, ForceMasterCollation::ByUnprocessedMessages)
    {
        guard.mc_collation_forced_for_all = true;
    }

    // Remember the chain time on which master collation was forced because of
    // no pending messages after shard blocks.
    if matches!(
        force_mc_block,
        ForceMasterCollation::NoPendingMessagesAfterShardBlocks
    ) {
        guard.mc_forced_by_no_pending_msgs_on_ct = Some(last_imported_anchor_ct);
    }

    let hard_forced_for_all = guard.mc_collation_forced_for_all;
    let mc_forced_by_no_pending_msgs_on_ct = guard.mc_forced_by_no_pending_msgs_on_ct;

    let forced_in_current_shard = force_mc_block.is_forced();

    tracing::trace!(
        target: tracing_targets::COLLATION_MANAGER,
        shard_id=?shard_id,
        last_imported_anchor_ct,
        forced_in_current_shard,
        ?collated_block_info,
        "anchor event appended"
    );
    // Append the new anchor event to the state of the current shard.
    let current_collation_state = guard.states.entry(shard_id).or_default();
    current_collation_state
        .last_imported_anchor_events
        .push(ImportedAnchorEvent {
            ct: last_imported_anchor_ct,
            mc_forced: forced_in_current_shard,
            collated_block_info,
        });

    /// Facts about one shard derived from its imported anchor events.
    #[derive(Debug, Clone)]
    struct ShardFact {
        shard_id: ShardIdent,
        /// Current collation status of the shard.
        status: CollationStatus,
        /// Chain time of the first imported anchor event.
        first_ct: Option<u64>,
        /// Chain time of the first event that forced master collation.
        mc_forced_ct: Option<u64>,
        /// Chain time of an event that passed the min interval since the last
        /// master block (see `calc` for the exact selection rule).
        min_ct: Option<u64>,
        /// Chain time of the first event that passed the max interval since
        /// the last master block.
        max_ct: Option<u64>,
        /// Whether a shard block with processed externals was seen since the
        /// last change of the previous master block seqno.
        has_shard_block_with_externals: bool,
    }
    impl ShardFact {
        /// Creates facts with the given status and no chain times detected.
        fn with_status(shard_id: ShardIdent, status: CollationStatus) -> Self {
            Self {
                shard_id,
                status,
                first_ct: None,
                mc_forced_ct: None,
                min_ct: None,
                max_ct: None,
                has_shard_block_with_externals: false,
            }
        }
        /// Calculates facts from the shard state. `mc_ct` is the latest
        /// master block chain time the intervals are measured from.
        fn calc(
            shard_id: ShardIdent,
            state: &CollationState,
            mc_ct: u64,
            min_interval_ms: u64,
            max_interval_ms: u64,
        ) -> Self {
            let mut fact = Self::with_status(shard_id, state.status);

            let mut last_known_collated_block_info: Option<CollatedBlockInfo> = None;

            for s in &state.last_imported_anchor_events {
                let mut is_first_shard_block_with_externals = false;
                if let Some(curr_b_info) = s.collated_block_info {
                    // The block is the first one after a previous master block
                    // when no block was seen yet or its prev master seqno grew.
                    let is_first_after_prev_master = match &last_known_collated_block_info {
                        Some(last_b_info)
                            if last_b_info.prev_mc_block_seqno
                                < curr_b_info.prev_mc_block_seqno =>
                        {
                            true
                        }
                        None => true,
                        _ => false,
                    };

                    if !shard_id.is_masterchain() {
                        // Externals detection restarts after each new master block.
                        if is_first_after_prev_master {
                            fact.has_shard_block_with_externals = false;
                        }

                        is_first_shard_block_with_externals = curr_b_info
                            .has_processed_externals
                            && !fact.has_shard_block_with_externals;
                        if is_first_shard_block_with_externals {
                            fact.has_shard_block_with_externals = true;
                        }
                    }

                    last_known_collated_block_info = Some(curr_b_info);
                }

                if fact.first_ct.is_none() {
                    fact.first_ct = Some(s.ct);
                }

                if s.mc_forced && fact.mc_forced_ct.is_none() {
                    fact.mc_forced_ct = Some(s.ct);
                }

                // `min_ct` is set by the first event past the min interval and
                // is overwritten by an event that carries the first shard block
                // with externals (if it is past the min interval as well).
                if (fact.min_ct.is_none() || is_first_shard_block_with_externals)
                    && s.ct.saturating_sub(mc_ct) >= min_interval_ms
                {
                    fact.min_ct = Some(s.ct);
                }

                if fact.max_ct.is_none() && s.ct.saturating_sub(mc_ct) >= max_interval_ms {
                    fact.max_ct = Some(s.ct);
                }
            }

            fact
        }
    }

    // Calculate facts for every active shard; shards without a state get
    // empty facts with the `AttemptsInProgress` status.
    let mut facts = Vec::with_capacity(active_shards.len());
    for sid in &active_shards {
        if let Some(st) = guard.states.get(sid) {
            let f = ShardFact::calc(
                *sid,
                st,
                mc_block_latest_chain_time,
                mc_block_min_interval_ms,
                mc_block_max_interval_ms,
            );
            facts.push(f);
        } else {
            facts.push(ShardFact::with_status(
                *sid,
                CollationStatus::AttemptsInProgress,
            ));
        }
    }

    tracing::debug!(
        target: tracing_targets::COLLATION_MANAGER,
        mc_block_latest_chain_time,
        mc_block_min_interval_ms,
        mc_block_max_interval_ms,
        hard_forced_for_all,
        mc_forced_by_no_pending_msgs_on_ct,
        ?facts,
        "calculated shard facts"
    );

    let any_has_shard_block_with_externals =
        facts.iter().any(|f| f.has_shard_block_with_externals);

    /// Chooses the candidate chain time for the next master block from the
    /// facts of one shard. Priority: forced chain time of the shard, then
    /// (when forced for all) the first chain time, then the min-interval
    /// chain time (when any shard has a block with externals or master was
    /// forced by a shard) or the max-interval chain time otherwise.
    fn choose_candidate(
        curr_sid: &ShardIdent,
        f: &ShardFact,
        mc_forced_by_shard_on_ct: Option<u64>,
        any_has_shard_block_with_externals: bool,
        hard_forced_for_all: bool,
    ) -> Option<u64> {
        // The min-interval rule applies to the current shard and to shards
        // that are already waiting or ready to collate master.
        let ready_to_detect_next_step = f.shard_id == *curr_sid
            || matches!(
                f.status,
                CollationStatus::ReadyToCollateMaster
                    | CollationStatus::WaitForMasterStatus
                    | CollationStatus::WaitForShardStatus
            );

        f.mc_forced_ct
            .or(if hard_forced_for_all {
                f.first_ct
            } else {
                None
            })
            .or(
                if any_has_shard_block_with_externals || mc_forced_by_shard_on_ct.is_some() {
                    if ready_to_detect_next_step {
                        // The candidate is never below the chain time on which
                        // master was forced by a shard.
                        f.min_ct.map(|min_ct| {
                            mc_forced_by_shard_on_ct
                                .map_or(min_ct, |forced_ct| min_ct.max(forced_ct))
                        })
                    } else {
                        None
                    }
                }
                else {
                    f.max_ct
                },
            )
    }

    let mut should_collate_by_current_shard = false;

    // One candidate per active shard; `None` means that the shard is not
    // ready for master collation yet.
    let candidates: Vec<_> = facts
        .iter()
        .map(|f| {
            let ct = choose_candidate(
                &shard_id,
                f,
                mc_forced_by_no_pending_msgs_on_ct,
                any_has_shard_block_with_externals,
                hard_forced_for_all,
            );
            if f.shard_id == shard_id && ct.is_some() {
                should_collate_by_current_shard = true;
            }
            ct
        })
        .collect();

    let should_collate_by_every_shard =
        !candidates.is_empty() && candidates.iter().all(|c| c.is_some());

    tracing::debug!(
        target: tracing_targets::COLLATION_MANAGER,
        shard_id=?shard_id,
        ?candidates,
        should_collate_by_current_shard,
        should_collate_by_every_shard,
        any_has_shard_block_with_externals,
        hard_forced_for_all,
        mc_forced_by_no_pending_msgs_on_ct,
        mc_block_min_interval_ms,
        mc_block_max_interval_ms,
        "candidates collected"
    );

    // Not every shard is ready: update statuses and decide who should
    // continue collation attempts.
    if !should_collate_by_every_shard {
        let mut shards_to_resume_attempts = vec![];

        let all_other_shards_ready_to_collate = guard.states.iter().all(|(sid, s)| {
            *sid == shard_id || s.status == CollationStatus::ReadyToCollateMaster
        });

        let current_collation_state = guard.states.entry(shard_id).or_default();
        if !should_collate_by_current_shard {
            if !mc_collation_state_exist {
                current_collation_state.status = CollationStatus::WaitForMasterStatus;
            } else if shard_id.is_masterchain() && !all_other_shards_ready_to_collate {
                current_collation_state.status = CollationStatus::WaitForShardStatus;
            } else {
                current_collation_state.status = CollationStatus::AttemptsInProgress;
                shards_to_resume_attempts.push(shard_id);
            }
        } else {
            current_collation_state.status = CollationStatus::ReadyToCollateMaster;
        };

        // Wake up the other side: a masterchain event resumes shards waiting
        // for the master status, a shard event resumes states waiting for the
        // shard status.
        for (sid, collation_state) in guard.states.iter_mut() {
            if (shard_id.is_masterchain()
                && collation_state.status == CollationStatus::WaitForMasterStatus)
                || (!shard_id.is_masterchain()
                    && collation_state.status == CollationStatus::WaitForShardStatus)
            {
                collation_state.status = CollationStatus::AttemptsInProgress;
                shards_to_resume_attempts.push(*sid);
            }
        }

        let res = if !shards_to_resume_attempts.is_empty() {
            NextCollationStep::ResumeAttemptsIn(shards_to_resume_attempts)
        } else if shard_id.is_masterchain() {
            NextCollationStep::WaitForShardStatus
        } else {
            NextCollationStep::WaitForMasterStatus
        };

        tracing::info!(
            target: tracing_targets::COLLATION_MANAGER,
            shard_id=?shard_id,
            ?candidates,
            should_collate_by_current_shard,
            should_collate_by_every_shard,
            any_has_shard_block_with_externals,
            hard_forced_for_all,
            mc_forced_by_no_pending_msgs_on_ct,
            mc_collation_state_exist,
            mc_block_min_interval_ms,
            mc_block_max_interval_ms,
            decision=?res,
            "step decision"
        );

        return res;
    }

    // Every candidate is `Some` and the list is not empty here, so `max()`
    // always returns a value.
    let next_mc_block_chain_time = candidates.into_iter().flatten().max().unwrap();

    for (sid, st) in guard.states.iter_mut() {
        st.status = CollationStatus::AttemptsInProgress;

        // For the masterchain keep only events up to the chosen chain time.
        if sid.is_masterchain() {
            st.last_imported_anchor_events
                .retain(|s| s.ct <= next_mc_block_chain_time);
        }
    }

    // Renews the latest master chain time and drops events that are not
    // above the chosen chain time in every shard.
    Self::renew_mc_block_latest_chain_time(guard, next_mc_block_chain_time);

    // The force flags are consumed by this master block.
    guard.mc_collation_forced_for_all = false;
    guard.mc_forced_by_no_pending_msgs_on_ct = None;

    let res = NextCollationStep::CollateMaster(next_mc_block_chain_time);

    tracing::info!(
        target: tracing_targets::COLLATION_MANAGER,
        shard_id=?shard_id,
        should_collate_by_current_shard,
        should_collate_by_every_shard,
        any_has_shard_block_with_externals,
        hard_forced_for_all,
        mc_forced_by_no_pending_msgs_on_ct,
        mc_collation_state_exist,
        mc_block_min_interval_ms,
        mc_block_max_interval_ms,
        decision=?res,
        "step decision"
    );

    res
}
3243
3244 async fn enqueue_mc_block_collation(
3246 &self,
3247 next_mc_block_id_short: BlockIdShort,
3248 next_mc_block_chain_time: u64,
3249 _trigger_block_id_opt: Option<BlockId>,
3250 ) -> Result<()> {
3251 let _histogram = HistogramGuard::begin("tycho_collator_enqueue_mc_block_collation_time");
3252
3253 let Some(mc_collator) = self
3255 .active_collators
3256 .get(&ShardIdent::MASTERCHAIN)
3257 .map(|r| r.collator.clone())
3258 else {
3259 bail!("Masterchain collator is not started yet!");
3260 };
3261
3262 let top_shard_blocks_info = self
3269 .blocks_cache
3270 .get_top_shard_blocks_info_for_mc_block(next_mc_block_id_short)?;
3271
3272 mc_collator
3273 .enqueue_do_collate(top_shard_blocks_info, next_mc_block_chain_time)
3274 .await?;
3275
3276 tracing::info!(target: tracing_targets::COLLATION_MANAGER,
3277 "Master block collation enqueued: (block_id={} ct={})",
3278 next_mc_block_id_short,
3279 next_mc_block_chain_time,
3280 );
3281
3282 Ok(())
3283 }
3284
3285 async fn enqueue_try_collate(&self, shard_id: &ShardIdent) -> Result<()> {
3286 let Some(collator) = self
3288 .active_collators
3289 .get(shard_id)
3290 .map(|r| r.collator.clone())
3291 else {
3292 tracing::warn!(
3293 target: tracing_targets::COLLATION_MANAGER,
3294 "Node does not collate blocks for shard {}",
3295 shard_id,
3296 );
3297 return Ok(());
3298 };
3299
3300 collator.enqueue_try_collate().await?;
3301
3302 tracing::info!(
3303 target: tracing_targets::COLLATION_MANAGER,
3304 "Enqueued next attempt to collate block for shard {}",
3305 shard_id,
3306 );
3307
3308 Ok(())
3309 }
3310
3311 #[tracing::instrument(skip_all, fields(block_id = %block_id.as_short_id()))]
3316 pub async fn handle_validated_master_block(
3317 &self,
3318 block_id: BlockId,
3319 status: ValidationStatus,
3320 ) -> Result<()> {
3321 tracing::debug!(
3322 target: tracing_targets::COLLATION_MANAGER,
3323 is_complete = matches!(&status, ValidationStatus::Complete(_)),
3324 "Start processing block validation result...",
3325 );
3326
3327 let _histogram = HistogramGuard::begin("tycho_collator_handle_validated_master_block_time");
3328
3329 let updated = self
3331 .blocks_cache
3332 .store_master_block_validation_result(&block_id, status);
3333 if !updated {
3334 return Ok(());
3335 }
3336
3337 self.ready_to_sync.notified().await;
3338
3339 self.commit_valid_master_block(&block_id).await?;
3341
3342 self.ready_to_sync.notify_one();
3343
3344 Ok(())
3345 }
3346
3347 async fn commit_valid_master_block(&self, mc_block_id: &BlockId) -> Result<()> {
3357 tracing::debug!(
3358 target: tracing_targets::COLLATION_MANAGER,
3359 "Start to commit validated and valid master block ({})...",
3360 mc_block_id.as_short_id(),
3361 );
3362
3363 scopeguard::defer!(self.blocks_cache.gc_prev_blocks());
3365
3366 let mut top_processed_to_anchor_to_notify = self
3368 .notify_to_mempool_and_process_delayed_mc_state_update(mc_block_id)
3369 .await?;
3370
3371 let histogram = HistogramGuard::begin("tycho_collator_commit_valid_master_block_time");
3372 let histogram_extract =
3373 HistogramGuard::begin("tycho_collator_extract_master_block_subgraph_time");
3374 let mut extract_elapsed = Default::default();
3375 let mut sync_elapsed = Default::default();
3376
3377 match self
3379 .blocks_cache
3380 .extract_mc_block_subgraph_for_sync(mc_block_id)
3381 {
3382 McBlockSubgraphExtract::Extracted(subgraph) => {
3383 extract_elapsed = histogram_extract.finish();
3384
3385 let partitions = subgraph.get_partitions();
3386
3387 let McBlockSubgraph {
3388 master_block,
3389 shard_blocks,
3390 } = subgraph;
3391
3392 let to_blocks_keys = master_block.get_top_blocks_keys()?;
3397 self.blocks_cache.set_gc_to_boundary(&to_blocks_keys);
3398
3399 if matches!(&master_block.data, BlockCacheEntryData::Collated {
3401 received_after_collation: false,
3402 ..
3403 }) {
3404 let histogram =
3405 HistogramGuard::begin("tycho_collator_send_blocks_to_sync_time");
3406
3407 self.send_block_to_sync(master_block.data)?;
3408
3409 for shard_block in shard_blocks {
3410 self.send_block_to_sync(shard_block.data)?;
3411 }
3412
3413 sync_elapsed = histogram.finish();
3414 tracing::debug!(target: tracing_targets::COLLATION_MANAGER,
3415 total = sync_elapsed.as_millis(),
3416 "send_blocks_to_sync timings",
3417 );
3418
3419 top_processed_to_anchor_to_notify = None;
3424 } else {
3425 top_processed_to_anchor_to_notify = master_block.top_processed_to_anchor;
3429 }
3430
3431 let _histogram =
3432 HistogramGuard::begin("tycho_collator_send_blocks_to_sync_commit_diffs_time");
3433
3434 Self::commit_block_queue_diff(
3435 self.mq_adapter.clone(),
3436 &master_block.block_id,
3437 &master_block.top_shard_blocks_info,
3438 &partitions,
3439 )?;
3440 }
3441 McBlockSubgraphExtract::AlreadyExtracted => {
3442 tracing::debug!(
3443 target: tracing_targets::COLLATION_MANAGER,
3444 "Master block subgraph is already extracted and cleaned up from cache ({}). Do nothing",
3445 mc_block_id.as_short_id(),
3446 );
3447 }
3448 }
3449
3450 tracing::debug!(target: tracing_targets::COLLATION_MANAGER,
3451 total = histogram.finish().as_millis(),
3452 extract_subgraph = extract_elapsed.as_millis(),
3453 sync = sync_elapsed.as_millis(),
3454 "commit_valid_master_block timings",
3455 );
3456
3457 if let Some(top_processed_to_anchor) = top_processed_to_anchor_to_notify {
3459 self.notify_top_processed_to_anchor_to_mempool(
3460 mc_block_id.seqno,
3461 top_processed_to_anchor,
3462 )
3463 .await?;
3464 }
3465
3466 Ok(())
3467 }
3468
3469 fn send_block_to_sync(&self, data: BlockCacheEntryData) -> Result<()> {
3470 let candidate_stuff = match data {
3471 BlockCacheEntryData::Collated {
3472 candidate_stuff,
3473 status,
3474 received_after_collation: false,
3475 ..
3476 } if status != CandidateStatus::Synced => candidate_stuff,
3477 _ => return Ok(()),
3478 };
3479
3480 let block_id = *candidate_stuff.candidate.block.id();
3481 self.state_node_adapter
3482 .accept_block(candidate_stuff.into_block_for_sync())?;
3483 tracing::debug!(
3484 target: tracing_targets::COLLATION_MANAGER,
3485 "Block was successfully sent to sync ({})",
3486 block_id,
3487 );
3488 Ok(())
3489 }
3490
3491 async fn get_top_blocks_seqno(
3494 mc_block_id: &BlockId,
3495 blocks_cache: &BlocksCache,
3496 state_node_adapter: Arc<dyn StateNodeAdapter>,
3497 ) -> Result<Option<FastHashMap<ShardIdent, BlockSeqno>>> {
3498 let mut result = FastHashMap::default();
3499
3500 result.insert(mc_block_id.shard, mc_block_id.seqno);
3501
3502 if let Some(top_shard_blocks) = blocks_cache.get_top_shard_blocks(mc_block_id.as_short_id())
3504 {
3505 result.extend(top_shard_blocks);
3506 return Ok(Some(result));
3507 }
3508
3509 match state_node_adapter
3511 .load_state(mc_block_id.seqno, mc_block_id, Default::default())
3512 .await
3513 {
3514 Err(err) => match err.downcast_ref::<StateNotFound>() {
3515 Some(_) => {
3516 tracing::warn!(target: tracing_targets::COLLATION_MANAGER,
3517 %mc_block_id,
3518 "master state not found in get_top_blocks_seqno",
3519 );
3520 }
3521 _ => {
3522 tracing::warn!(target: tracing_targets::COLLATION_MANAGER,
3523 ?err,
3524 "error loading master state in get_top_blocks_seqno",
3525 );
3526 }
3527 },
3528 Ok(state) => {
3529 for item in state.shards()?.latest_blocks() {
3530 let top_sb = item?;
3531 result.insert(top_sb.shard, top_sb.seqno);
3532 }
3533 return Ok(Some(result));
3534 }
3535 }
3536
3537 match state_node_adapter.load_block(mc_block_id).await {
3539 Err(err) => {
3540 tracing::warn!(target: tracing_targets::COLLATION_MANAGER,
3541 %mc_block_id,
3542 ?err,
3543 "error loading master block in get_top_blocks_seqno",
3544 );
3545 }
3546 Ok(None) => {
3547 tracing::warn!(target: tracing_targets::COLLATION_MANAGER,
3548 %mc_block_id,
3549 "master block was not found in get_top_blocks_seqno",
3550 );
3551 }
3552 Ok(Some(mc_block_stuff)) => {
3553 let top_blocks = TopBlocks::from_mc_block(&mc_block_stuff)?;
3554 for top_sb in top_blocks.shard_heights().iter() {
3555 result.insert(top_sb.shard, top_sb.seqno);
3556 }
3557 return Ok(Some(result));
3558 }
3559 }
3560
3561 Ok(None)
3563 }
3564}
3565
/// Mode selector for processing a master state update.
///
/// NOTE(review): the code that matches on this enum is outside this chunk, so
/// the variant notes below are derived from the names only — confirm.
#[derive(Debug)]
enum ProcessMcStateUpdateMode {
    /// Presumably: collation is started after the update is processed;
    /// `reset_collators` presumably asks for existing collators to be reset
    /// first — TODO confirm against the consumer.
    StartCollation { reset_collators: bool },
    /// Presumably: the update is processed without starting collation —
    /// TODO confirm against the consumer.
    SkipCollation,
}
3571
/// Values returned together from a queue-restore step.
///
/// `Default` yields `None` for the optional fields and empty collections.
///
/// NOTE(review): the code that fills and reads this struct is outside this
/// chunk; the field notes below are inferred from names and types — confirm.
#[derive(Default)]
struct RestoreQueueResult {
    /// Presumably the latest master state encountered during restore, if any.
    last_mc_state: Option<ShardStateStuff>,
    /// Presumably the master state that precedes `last_mc_state`, if any.
    prev_mc_state: Option<ShardStateStuff>,
    /// Presumably the id of the previous master block, if known.
    prev_mc_block_id: Option<BlockId>,
    /// Presumably the cache keys of the blocks the restore synced to.
    synced_to_blocks_keys: Vec<BlockCacheKey>,
    /// Presumably the ids of the blocks whose queue diffs were applied.
    applied_diffs_ids: FastHashSet<BlockId>,
}
3580
3581trait GlobalCapabilitiesExt {
3583 fn is_subset_of(&self, other: GlobalCapabilities) -> bool;
3586}
3587
3588impl GlobalCapabilitiesExt for GlobalCapabilities {
3589 fn is_subset_of(&self, other: GlobalCapabilities) -> bool {
3590 let this = self.into_inner();
3591 let other = other.into_inner();
3592 this & !other == 0
3593 }
3594}
3595
3596#[derive(Debug)]
3597struct DetectNextCollationStepContext {
3598 pub last_imported_anchor_ct: u64,
3599 pub force_mc_block: ForceMasterCollation,
3600 pub mc_block_min_interval_ms: u64,
3601 pub mc_block_max_interval_ms: u64,
3602 pub collated_block_info: Option<CollatedBlockInfo>,
3603}
3604
3605impl DetectNextCollationStepContext {
3606 pub fn new(
3607 last_imported_anchor_ct: u64,
3608 force_mc_block: ForceMasterCollation,
3609 mc_block_min_interval_ms: u64,
3610 mc_block_max_interval_ms: u64,
3611 collated_block_info: Option<CollatedBlockInfo>,
3612 ) -> Self {
3613 Self {
3614 last_imported_anchor_ct,
3615 force_mc_block,
3616 mc_block_min_interval_ms,
3617 mc_block_max_interval_ms,
3618 collated_block_info,
3619 }
3620 }
3621}